Емил обнови решението на 29.11.2016 13:19 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "time"
+)
+
// Task is a unit of work that maps an integer input to an integer result,
// or reports an error when it cannot produce one.
type Task interface {
	// Execute runs the task on input and returns its result or an error.
	Execute(input int) (result int, err error)
}
+
+type PipelineTask struct {
+ tasks []Task
+}
+
+func (p *PipelineTask) Execute(i int) (r int, err error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("No tasks given")
+ }
+
+ for _, task := range p.tasks {
+ r, err = task.Execute(i)
+ if err != nil {
+ return
+ }
+
+ i = r
+ }
+
+ return
+}
+
+func Pipeline(tasks ...Task) Task {
+ return &PipelineTask{
+ tasks: tasks,
+ }
+}
+
+type FastestTask struct {
+ tasks []Task
+}
+
+func Fastest(tasks ...Task) Task {
+ return &FastestTask{
+ tasks: tasks,
+ }
+}
+
+func (p *FastestTask) Execute(i int) (r int, err error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("No tasks given")
+ }
+
+ rc := make(chan int)
+ ec := make(chan error)
+ for _, task := range p.tasks {
+ go func(t Task) {
+ rf, errf := t.Execute(i)
+ if errf != nil {
+ ec <- errf
+ } else {
+ rc <- rf
+ }
+ }(task)
+ }
+
+ select {
+ case r = <-rc:
+ case err = <-ec:
+ }
+
+ return
+}
+
+type TimedTask struct {
+ timeout time.Duration
+}
+
+func (t *TimedTask) Execute(i int) (r int, err error) {
+ time.Sleep(t.timeout)
+ err = errors.New("Timeout")
+ return
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ tasks := []Task{task, &TimedTask{timeout}}
+ return Fastest(tasks...)
+}
+
+type ConcurrentMapReduceTask struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return &ConcurrentMapReduceTask{
+ tasks: tasks,
+ reduce: reduce,
+ }
+}
+
+func (t *ConcurrentMapReduceTask) Execute(i int) (r int, err error) {
+ if len(t.tasks) == 0 {
+ return 0, errors.New("No tasks given")
+ }
+
+ rc := make(chan int)
+ ec := make(chan error)
+ results := make([]int, 0)
+
+ for _, task := range t.tasks {
+ go func(t Task) {
+ rf, errf := t.Execute(i)
+ if errf != nil {
+ ec <- errf
+ } else {
+ rc <- rf
+ }
+ }(task)
+ }
+
+ for _, _ = range t.tasks {
+ select {
+ case rt := <-rc:
+ results = append(results, rt)
+ case err = <-ec:
+ return
+ }
+ }
+
+ r = t.reduce(results)
+ return
+}
+
+type GreatestSearcherTask struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return &GreatestSearcherTask{
+ tasks: tasks,
+ errorLimit: errorLimit,
+ }
+}
+
+func (t *GreatestSearcherTask) Execute(i int) (r int, err error) {
+ rc := make(chan int)
+ ec := make(chan error)
+ first := true
+ errorCount := t.errorLimit
+ taskCount := 0
+
+ for task := range t.tasks {
+ taskCount++
+ go func(t Task) {
+ rf, errf := t.Execute(i)
+ if errf != nil {
+ ec <- errf
+ } else {
+ rc <- rf
+ }
+ }(task)
+ }
+
+ for taskCount != 0 {
+ select {
+ case rt := <-rc:
+ if first || rt > r {
+ r = rt
+ first = false
+ }
+ case _ = <-ec:
+ errorCount--
+ if errorCount < 0 {
+ err = errors.New("Error limit exceeded")
+ return
+ }
+ }
+ taskCount--
+ }
+
+ return
+}
Погледни темата във форума и си погледни error handling-а на GreatestSearcher
и какво се случва с пуснатите от теб горутини във Fastest