Данислав обнови решението на 26.11.2016 10:56 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type Pipeliner struct {
+ tasks []Task
+}
+
+func (p *Pipeliner) Execute(number int) (int, error) {
+ numberOfTasks := len(p.tasks)
+ if numberOfTasks == 0 {
+ return 0, errors.New("No tasks to execute!")
+ }
+
+ res := number
+ err := errors.New("")
тук май може да е просто var err error
+ for _, task := range p.tasks {
+ res, err = task.Execute(res)
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ return res, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ p := new(Pipeliner)
+ p.tasks = tasks[:]
+ return p
+}
+
+type Fast struct {
+ tasks []Task
+}
+
+func (f *Fast) Execute(number int) (int, error) {
+ numberOfTasks := len(f.tasks)
+ if numberOfTasks == 0 {
+ return 0, errors.New("No tasks to execute!")
+ }
+
+ ch := make(chan struct {
+ res int
+ err error
+ }, numberOfTasks)
+ for _, task := range f.tasks {
+ go func(task Task) {
+ res, err := task.Execute(number)
+ ch <- struct {
+ res int
+ err error
+ }{res, err}
+ }(task)
+ }
+
+ first := <-ch
+ return first.res, first.err
+}
+
+func Fastest(tasks ...Task) Task {
+ f := new(Fast)
+ f.tasks = tasks[:]
+ return f
+}
+
+type Timer struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t *Timer) Execute(number int) (int, error) {
+ ch := make(chan struct {
+ res int
+ err error
+ }, 1)
+ go func() {
+ res, err := t.task.Execute(number)
+ ch <- struct {
+ res int
+ err error
+ }{res, err}
+ close(ch)
+ }()
+
+ select {
+ case s := <-ch:
+ return s.res, s.err
+ case <-time.After(t.timeout):
+ return 0, errors.New("Timeout")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ t := new(Timer)
+ t.task = task
+ t.timeout = timeout
+ return t
+}
+
+type Reducer struct {
+ reduce func(results []int) int
+ tasks []Task
+}
+
+func (r *Reducer) Execute(number int) (int, error) {
+ numberOfTasks := len(r.tasks)
+ if numberOfTasks == 0 {
+ return 0, errors.New("No tasks to execute!")
+ }
+
+ var results []int
+ ch := make(chan struct {
+ res int
+ err error
+ }, numberOfTasks)
+ for _, task := range r.tasks {
+ go func(task Task) {
+ res, err := task.Execute(number)
+ ch <- struct {
+ res int
+ err error
+ }{res, err}
+ }(task)
+ }
+
+ for i := 0; i < numberOfTasks; i++ {
+ s := <-ch
+ if s.err != nil {
+ return 0, s.err
+ }
+ results = append(results, s.res)
+ }
+ return r.reduce(results), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ r := new(Reducer)
+ r.reduce = reduce
+ r.tasks = tasks[:]
+ return r
+}
+
+type Searcher struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func (s *Searcher) Execute(number int) (int, error) {
+ var results []int
+ errs := int64(0)
+ var wg sync.WaitGroup
+ for task := range s.tasks {
+ wg.Add(1)
+ go func(task Task) {
+ res, err := task.Execute(number)
+ if err != nil {
+ errs = atomic.AddInt64(&errs, 1)
+ } else {
+ results = append(results, res)
+ }
+ wg.Done()
+ }(task)
+ }
+
+ wg.Wait()
+ if len(results) == 0 && errs == 0 {
+ return 0, errors.New("No tasks to execute!")
+ }
+ if errs > int64(s.errorLimit) {
+ return 0, errors.New("Too many errors!")
+ }
+
+ max := results[0]
+ for _, res := range results {
+ if res > max {
+ max = res
+ }
+ }
+
+ return int(max), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ s := new(Searcher)
+ s.errorLimit = errorLimit
+ s.tasks = tasks
+ return s
+}
Може да си дефинираш някакъв твой тип за тази структура с грешката и резултата, за да не се налага да му пишеш дефиницията навсякъде.
Иначе изглежда супер. Оше не сме написали всичките тестове и не мога да кажа със сигурност, на на пръв поглед не виждам някакив очевидни синхронизационни проблеми или висящи горутини.