Ралица обнови решението на 28.11.2016 18:50 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+
+ "fmt"
+)
+
// taskReturnData pairs the two values produced by a single Task.Execute call
// so that they can be passed around (e.g. over a channel) as one value.
//
// The field order matters: the type is constructed with positional literals.
type taskReturnData struct {
	// result is the integer the task returned.
	result int
	// err is the error the task returned.
	err error
}
+
// Task is a unit of work that turns one integer into another and may fail.
type Task interface {
	// Execute runs the task on input and returns its result together with
	// any error the task produced.
	Execute(input int) (result int, err error)
}
+
+/*______________________________Pipeline______________________________*/
+
+type serialTasks struct {
+ tasks []Task
+}
+
+func (s serialTasks) Execute(initialValue int) (result int, err error) {
+ for i, task := range s.tasks {
+ if i == 0 {
+ result, err = task.Execute(initialValue)
+ } else {
+ result, err = task.Execute(result)
+ }
+ if err != nil {
+ return
+ }
+ }
+
+ return
+}
+
+func Pipeline(tasks ...Task) Task {
+ return serialTasks{tasks}
+}
+
+/*______________________________Fastest______________________________*/
+
+type fastTasks struct {
+ tasks []Task
+
+ fastest chan taskReturnData
+ isDone bool
+ doneMutex sync.RWMutex
+}
+
+func (f fastTasks) Execute(initialValue int) (result int, err error) {
+ if len(f.tasks) == 0 {
+ return result, errors.New("No fast tasks to execute")
+ }
+
+ for _, task := range f.tasks {
+ go func() {
+ currentResult, currentError := task.Execute(initialValue)
+
+ f.doneMutex.RLock()
+ currentIsDone := f.isDone
+ f.doneMutex.RUnlock()
+
+ if !currentIsDone {
+ f.doneMutex.Lock()
+ f.isDone = true
+ f.doneMutex.Unlock()
+
+ f.fastest <- taskReturnData{currentResult, currentError}
+ }
+ }()
+ }
+
+ fastestTaskResult := <-f.fastest
+ return fastestTaskResult.result, fastestTaskResult.err
+}
+
+func Fastest(tasks ...Task) Task {
+ return fastTasks{tasks, make(chan taskReturnData), false, sync.RWMutex{}}
+}
+
+/*_______________________________Timed______________________________*/
+
+type timedTask struct {
+ task Task
+ timeLimit time.Duration
+ resultChannel chan taskReturnData
+}
+
+func (t timedTask) Execute(initialValue int) (int, error) {
+ go func() {
+ result, err := t.task.Execute(initialValue)
+ t.resultChannel <- taskReturnData{result, err}
+ }()
+
+ select {
+ case timedTaskResult := <-t.resultChannel:
+ return timedTaskResult.result, timedTaskResult.err
+ case <-time.After(t.timeLimit):
+ return 0, errors.New("Time limit exceeded")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return timedTask{task, timeout, make(chan taskReturnData)}
+}
+
+/*________________________ConcurrentMapReduce______________________*/
+
+type reducableTasks struct {
+ tasks []Task
+
+ results []int
+ resultsMutex sync.Mutex
+
+ ready sync.WaitGroup
+
+ reduce func(results []int) int
+}
+
+func (r reducableTasks) Execute(initialValue int) (int, error) {
+ if len(r.tasks) == 0 {
+ return 0, errors.New("No tasks to execute")
+ }
+
+ for i, t := range r.tasks {
+ r.ready.Add(1)
+
+ go func(index int, task Task) {
+ currentResult, currentError := task.Execute(initialValue)
+ if currentError != nil {
+ fmt.Println("Noooo") /// todo - return error when single task fails
+ }
+
+ r.resultsMutex.Lock()
+ r.results = append(r.results, currentResult)
+ r.resultsMutex.Unlock()
+
+ r.ready.Done()
+ }(i, t)
+ }
+
+ r.ready.Wait()
+
+ return r.reduce(r.results), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return reducableTasks{tasks, make([]int, 0, 2), sync.Mutex{}, sync.WaitGroup{}, reduce}
+}
+
+/*__________________________GreatestSearcher________________________*/
+
+type dummy struct{}
+
+func (d dummy) Execute(n int) (int, error) {
+ return 0, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return dummy{}
+}
Todo: GreatestSearcher