Ралица обнови решението на 28.11.2016 18:50 (преди над 1 година)
+package main
+
+import (
+        "errors"
+        "sync"
+        "time"
+
+        "fmt"
+)
+
+type taskReturnData struct {
+        result int
+        err    error
+}
+
+type Task interface {
+        Execute(int) (int, error)
+}
+
+/*______________________________Pipeline______________________________*/
+
+type serialTasks struct {
+        tasks []Task
+}
+
+func (s serialTasks) Execute(initialValue int) (result int, err error) {
+        for i, task := range s.tasks {
+                if i == 0 {
+                        result, err = task.Execute(initialValue)
+                } else {
+                        result, err = task.Execute(result)
+                }
+                if err != nil {
+                        return
+                }
+        }
+
+        return
+}
+
+func Pipeline(tasks ...Task) Task {
+        return serialTasks{tasks}
+}
+
+/*______________________________Fastest______________________________*/
+
+type fastTasks struct {
+        tasks []Task
+
+        fastest   chan taskReturnData
+        isDone    bool
+        doneMutex sync.RWMutex
+}
+
+func (f fastTasks) Execute(initialValue int) (result int, err error) {
+        if len(f.tasks) == 0 {
+                return result, errors.New("No fast tasks to execute")
+        }
+
+        for _, task := range f.tasks {
+                go func() {
+                        currentResult, currentError := task.Execute(initialValue)
+
+                        f.doneMutex.RLock()
+                        currentIsDone := f.isDone
+                        f.doneMutex.RUnlock()
+
+                        if !currentIsDone {
+                                f.doneMutex.Lock()
+                                f.isDone = true
+                                f.doneMutex.Unlock()
+
+                                f.fastest <- taskReturnData{currentResult, currentError}
+                        }
+                }()
+        }
+
+        fastestTaskResult := <-f.fastest
+        return fastestTaskResult.result, fastestTaskResult.err
+}
+
+func Fastest(tasks ...Task) Task {
+        return fastTasks{tasks, make(chan taskReturnData), false, sync.RWMutex{}}
+}
+
+/*_______________________________Timed______________________________*/
+
+type timedTask struct {
+        task          Task
+        timeLimit     time.Duration
+        resultChannel chan taskReturnData
+}
+
+func (t timedTask) Execute(initialValue int) (int, error) {
+        go func() {
+                result, err := t.task.Execute(initialValue)
+                t.resultChannel <- taskReturnData{result, err}
+        }()
+
+        select {
+        case timedTaskResult := <-t.resultChannel:
+                return timedTaskResult.result, timedTaskResult.err
+        case <-time.After(t.timeLimit):
+                return 0, errors.New("Time limit exceeded")
+        }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+        return timedTask{task, timeout, make(chan taskReturnData)}
+}
+
+/*________________________ConcurrentMapReduce______________________*/
+
+type reducableTasks struct {
+        tasks []Task
+
+        results      []int
+        resultsMutex sync.Mutex
+
+        ready sync.WaitGroup
+
+        reduce func(results []int) int
+}
+
+func (r reducableTasks) Execute(initialValue int) (int, error) {
+        if len(r.tasks) == 0 {
+                return 0, errors.New("No tasks to execute")
+        }
+
+        for i, t := range r.tasks {
+                r.ready.Add(1)
+
+                go func(index int, task Task) {
+                        currentResult, currentError := task.Execute(initialValue)
+                        if currentError != nil {
+                                fmt.Println("Noooo") /// todo - return error when single task fails
+                        }
+
+                        r.resultsMutex.Lock()
+                        r.results = append(r.results, currentResult)
+                        r.resultsMutex.Unlock()
+
+                        r.ready.Done()
+                }(i, t)
+        }
+
+        r.ready.Wait()
+
+        return r.reduce(r.results), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        return reducableTasks{tasks, make([]int, 0, 2), sync.Mutex{}, sync.WaitGroup{}, reduce}
+}
+
+/*__________________________GreatestSearcher________________________*/
+
+type dummy struct{}
+
+func (d dummy) Execute(n int) (int, error) {
+        return 0, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        return dummy{}
+}
Todo: GreatestSearcher
