Николай обнови решението на 29.11.2016 02:12 (преди над 1 година)
+package main
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+func Pipeline(tasks ...Task) Task {
+ return &pipeline{tasks}
+}
+
+type pipeline struct {
+ tasks []Task
+}
+
+func (p pipeline) Execute(addend int) (int, error) {
+ var err error
+ if len(p.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given")
+ }
+ val := addend
+ for _, task := range p.tasks {
+ if val, err = task.Execute(val); err != nil {
+ return 0, err
+ }
+ }
+ return val, nil
+}
+
+func Fastest(tasks ...Task) Task {
+ return &fastest{tasks}
+}
+
+type fastest struct {
+ tasks []Task
+}
+
+type taskResult struct {
+ val int
+ err error
+}
+
+func (f fastest) Execute(addend int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given")
+ }
+ c := make(chan taskResult)
+ executeTask := func(i int) {
+ val, err := f.tasks[i].Execute(addend)
+ c <- taskResult{val, err}
+ }
+ for i := range f.tasks {
+ go executeTask(i)
+ }
+ fastestTaskResult := <-c
+ return fastestTaskResult.val, fastestTaskResult.err
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return &timed{task, timeout}
+}
+
+type timed struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t timed) Execute(addend int) (int, error) {
+ c := make(chan taskResult)
+ go func() {
+ val, err := t.task.Execute(addend)
+ c <- taskResult{val, err}
+ }()
+ select {
+ case result := <-c:
+ close(c)
+ return result.val, result.err
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("Execution timed out after %v", t.timeout)
+ }
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return &concurrentMapReduce{tasks, reduce}
+}
+
+type concurrentMapReduce struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func (cmr concurrentMapReduce) Execute(addend int) (int, error) {
+ if len(cmr.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given")
+ }
+ group := sync.WaitGroup{}
+ c := make(chan taskResult)
+ results := make([]int, 0)
+
+ executeTask := func(i int) {
+ val, err := cmr.tasks[i].Execute(addend)
+ c <- taskResult{val, err}
+ }
+ for i := range cmr.tasks {
+ group.Add(1)
+ go executeTask(i)
+ }
+ go func() {
+ group.Wait()
+ close(c)
+ }()
+ for result := range c {
+ val, err := result.val, result.err
+ if err == nil {
+ results = append(results, val)
+ } else {
+ return 0, err
+ }
+ group.Done()
+ }
+ return cmr.reduce(results), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return &greatestSearcher{tasks, errorLimit}
+}
+
+type greatestSearcher struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func (g greatestSearcher) Execute(addend int) (int, error) {
+ errorCount := 0
+ max := math.MinInt64
+ hasSuccessfulTask, hasFailedTask := false, false
+ group := sync.WaitGroup{}
+
+ c := concurentHelper(g.tasks, &group, addend)
+
+ for taskResult := range c {
+ val, err := taskResult.val, taskResult.err
+ if err == nil {
+ hasSuccessfulTask = true
+ if val > max {
+ max = val
+ }
+ } else {
+ hasFailedTask = true
+ errorCount++
+ if errorCount > g.errorLimit {
+ return 0, fmt.Errorf("There are [%v] errors, but the limit is [%v]", errorCount, g.errorLimit)
+ }
+ }
+ group.Done()
+ }
+ if !hasFailedTask && !hasSuccessfulTask {
+ return 0, fmt.Errorf("No task were given")
+ } else if !hasSuccessfulTask {
+ return 0, fmt.Errorf("No successful tasks were executed")
+ }
+ return max, nil
+}
+
+func concurentHelper(tasks <-chan Task, group *sync.WaitGroup, addend int) <-chan taskResult {
+ c := make(chan taskResult)
+ go func() {
+ defer close(c)
+ defer group.Wait()
+ for task := range tasks {
+ group.Add(1)
+ go func(task Task) {
+ val, err := task.Execute(addend)
+ c <- taskResult{val, err}
+ }(task)
+ }
+ }()
+ return c
+}