Николай обнови решението на 26.11.2016 16:38 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+ "unsafe"
+)
+
+//
+// Help functions and types
+//
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type Result struct {
+ res int
+ err error
+}
+
+type SyncInt struct {
+ sync.RWMutex
+ num int
+}
+
+func CreateSyncInt(start int) SyncInt {
+ return SyncInt{num: start}
+}
+
+func (ai *SyncInt) Increment() {
+ ai.Lock()
+ ai.num++
+ ai.Unlock()
+}
+
+func (ai *SyncInt) SetMax(val int) {
+ if val > ai.Get() {
+ ai.Lock()
+ ai.num = val
+ ai.Unlock()
+ }
+}
+
+func (ai *SyncInt) Get() (ret int) {
+ ai.RLock()
+ ret = ai.num
+ ai.RUnlock()
+ return
+}
+
+//----------------------------------------------------------------------------
+
+func Pipeline(tasks ...Task) Task {
+ return Pipeliner{tasks}
+}
+
+type Pipeliner struct {
+ tasks []Task
+}
+
+func (p Pipeliner) Execute(arg int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("No tasks given!")
+ }
+
+ finChan := make(chan Result)
+ prevChan, nextChan := finChan, finChan
+
+ for i := len(p.tasks) - 1; i >= 0; i-- {
+ nextChan = make(chan Result)
+ go chainTask(prevChan, nextChan, p.tasks[i])
+ prevChan = nextChan
+ }
+
+ go func(feedChan chan Result) {
+ feedChan <- Result{arg, nil}
+ }(nextChan)
+
+ result := <-finChan
+ return result.res, result.err
+}
+
+func chainTask(prevChan, nextChan chan Result, task Task) {
+ result := <-nextChan
+
+ if result.err != nil {
+ prevChan <- result
+ } else {
+ res, err := task.Execute(result.res)
+ prevChan <- Result{res, err}
+ }
+}
+
+//----------------------------------------------------------------------------
+
+func Fastest(tasks ...Task) Task {
+ return Timer{tasks}
+}
+
+type Timer struct {
+ tasks []Task
+}
+
+func (t Timer) Execute(arg int) (int, error) {
+ if len(t.tasks) == 0 {
+ return 0, errors.New("No tasks given!")
+ }
+
+ var once sync.Once
+ fastResChan := make(chan Result)
+
+ for _, task := range t.tasks {
+ go func(task Task) {
+ res, err := task.Execute(arg)
+ once.Do(func() {
+ fastResChan <- Result{res, err}
+ })
+ }(task)
+ }
+
+ result := <-fastResChan
+ return result.res, result.err
+}
+
+//----------------------------------------------------------------------------
+
+func Timed(task Task, timeout time.Duration) Task {
+ return Limiter{task, timeout}
+}
+
+type Limiter struct {
+ task Task
+ limit time.Duration
+}
+
+func (l Limiter) Execute(arg int) (int, error) {
+ resultChan := make(chan Result)
+
+ go func() {
+ res, err := l.task.Execute(arg)
+ resultChan <- Result{res, err}
+ }()
+
+ select {
+ case result := <-resultChan:
+ return result.res, result.err
+ case <-time.After(l.limit):
+ return 0, fmt.Errorf("Task didn't finish for %s!", l.limit)
+ }
+}
+
+//----------------------------------------------------------------------------
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return MapReducer{reduce, tasks}
+}
+
+type MapReducer struct {
+ reduce func(results []int) int
+ tasks []Task
+}
+
+func (mr MapReducer) Execute(arg int) (int, error) {
+ if len(mr.tasks) == 0 {
+ return 0, errors.New("No tasks given!")
+ }
+
+ results := make([]int, len(mr.tasks))
+ resultChan := make(chan Result)
+ errCnt := CreateSyncInt(0)
+
+ var taskWg sync.WaitGroup
+ for ind, task := range mr.tasks {
+ taskWg.Add(1)
+ go func(ind int, task Task) {
+ res, err := task.Execute(arg)
+ if err == nil {
+ results[ind] = res
+ } else {
+ resultChan <- Result{0, fmt.Errorf("Error in task %d", ind)}
+ if errCnt.Get() == 0 {
+ errCnt.Increment()
+ }
+ }
+ taskWg.Done()
+ }(ind, task)
+ }
+
+ go func() {
+ taskWg.Wait()
+ if errCnt.Get() == 0 {
+ resultChan <- Result{mr.reduce(results), nil}
+ }
+ }()
+
+ result := <-resultChan
+ return result.res, result.err
+}
+
+//----------------------------------------------------------------------------
+
+func MinInt() int {
+ // 'i' is used to take the size of an int for the current build
+ var i int
+ return -(1 << ((uint(unsafe.Sizeof(i)) * 8) - 1))
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return AsyncExecuter{errorLimit, tasks}
+}
+
+type AsyncExecuter struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func (ae AsyncExecuter) Execute(arg int) (int, error) {
+ var taskWg sync.WaitGroup
+ errorCountSync, maxSync := CreateSyncInt(0), CreateSyncInt(MinInt())
+
+ taskCnt := 0
+ for task := range ae.tasks {
+ taskWg.Add(1)
+ taskCnt++
+ go func(task Task) {
+ res, err := task.Execute(arg)
+ if err == nil {
+ maxSync.SetMax(res)
+ } else {
+ errorCountSync.Increment()
+ }
+ taskWg.Done()
+ }(task)
+ }
+
+ taskWg.Wait()
+ max, errorCnt := maxSync.Get(), errorCountSync.Get()
+ var err error = nil
+ if taskCnt == 0 {
+ err = fmt.Errorf("No tasks given!")
+ } else if taskCnt <= errorCnt {
+ err = fmt.Errorf("All tasks failed!")
+ } else if ae.errorLimit < errorCnt {
+ err = fmt.Errorf("The error limit of %d was exceeded!", ae.errorLimit)
+ }
+ return max, err
+}
- Виж пак условието за
Pipeline()
, реално всичките тези горутини и канали не са ли лек overkill там? - Какво ще стане с горутината, която пускаш в
Execute()
наTimed()
, ако все пак се стигне до timeout? Или в тези, които пускаш вConcurrentMapReduce()
, след като си прочел един резултат отresultChan
и си го върнал? - Вместо
MinInt
(иunsafe
!), помисли за по-добър начин да укажеш "нищо" или "няма стойност". Или разгледай math пакета от стандартната библиотека и ползвай MinInt нещата от там :)