Добромир обнови решението на 28.11.2016 00:02 (преди над 1 година)
+package main
+
+import "sync"
+import "time"
+import "fmt"
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type pipelinedTasks struct {
+ tasks []Task
+}
+
+type fastestTask struct {
+ once sync.Once
+ start sync.WaitGroup
+ tasks []Task
+}
+
+type timedTask struct {
+ task Task
+ duration time.Duration
+}
+
+type mapReduce struct {
+ err bool
+ tasks []Task
+ reduce func([]int) int
+ lock sync.Mutex
+}
+
+type faultTollerantTaskExecutor struct {
+ errorLimit int
+ tasks <-chan Task
+ running sync.WaitGroup
+}
+
+func Pipeline(tasks ...Task) Task {
+ return pipelinedTasks{tasks}
+}
+
+func (pipeline pipelinedTasks) Execute(firstArg int) (int, error) {
+ result := firstArg
+ var err error
+ for i := 0; i < len(pipeline.tasks); i++ {
+ result, err = pipeline.tasks[i].Execute(result)
+ if err != nil {
+ return 0, err
+ }
+ }
+ return result, nil
+}
+
+func Fastest(tasks ...Task) Task {
+ runner := &fastestTask{}
+ runner.tasks = tasks
+ return runner
+}
+
+func (fastest *fastestTask) Execute(arg int) (int, error) {
+ if len(fastest.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given to Fastest")
+ }
+
+ resChan := make(chan struct {
+ res int
+ err error
+ })
+ fastest.start.Add(len(fastest.tasks))
+
+ for _, task := range fastest.tasks {
+ go func(t Task, arg int, resChan chan struct {
+ res int
+ err error
+ }) {
+ fastest.start.Wait()
+
+ result, err := t.Execute(arg)
+ fastest.once.Do(func() {
+ resChan <- struct {
+ res int
+ err error
+ }{result, err}
+ })
+ }(task, arg, resChan)
+ fastest.start.Done()
+ }
+
+ res := <-resChan
+ return res.res, res.err
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return timedTask{task, timeout}
+}
+
+func (timeLimited timedTask) Execute(arg int) (int, error) {
+ start := time.Now()
+ res, err := timeLimited.task.Execute(arg)
+ duration := time.Since(start)
+
+ if timeLimited.duration >= duration {
+ return res, err
+ }
+
+ return 0, fmt.Errorf("")
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ mr := &mapReduce{}
+ mr.err = false
+ mr.reduce = reduce
+ mr.tasks = tasks
+
+ return mr
+}
+
+func (mr *mapReduce) Execute(arg int) (int, error) {
+ if len(mr.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given!")
+ }
+
+ results := make([]int, 0)
+ resultChan := make(chan int)
+ errorChan := make(chan error)
+
+ for _, task := range mr.tasks {
+ go func(task Task, resultChan chan int, errorChan chan error, mr *mapReduce) {
+ res, err := task.Execute(arg)
+
+ mr.lock.Lock()
+ defer func() { mr.lock.Unlock() }()
+
+ if mr.err {
+ return
+ }
+
+ if err != nil {
+ errorChan <- err
+ mr.err = true
+ } else {
+ resultChan <- res
+ }
+ }(task, resultChan, errorChan, mr)
+ }
+
+ done := false
+ for !done {
+ select {
+ case result, _ := <-resultChan:
+ results = append(results, result)
+ if len(results) == len(mr.tasks) {
+ done = true
+ break
+ }
+ case err := <-errorChan:
+ return 0, err
+ }
+ }
+
+ return mr.reduce(results), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ executor := &faultTollerantTaskExecutor{}
+
+ executor.errorLimit = errorLimit
+ executor.tasks = tasks
+
+ return executor
+}
+
+func (executor *faultTollerantTaskExecutor) Execute(arg int) (int, error) {
+ resultChan := make(chan int)
+ errorChan := make(chan error)
+ tasksRun := 0
+
+ for task := range(executor.tasks) {
+ tasksRun++
+ executor.running.Add(1)
+ go func(task Task, resultChan chan int, errorChan chan error) {
+ defer func() { executor.running.Done() }()
+ res, err := task.Execute(arg)
+ if err != nil {
+ errorChan <- err
+ } else {
+ resultChan <- res
+ }
+ }(task, resultChan, errorChan)
+ }
+
+ if tasksRun == 0 {
+ return 0, fmt.Errorf("No tasks given!")
+ }
+
+ result, errors := 0, 0
+ go func() {
+ for tasksRun > 0 {
+ select {
+ case res, _ := <-resultChan:
+ if res > result {
+ result = res
+ }
+ tasksRun--
+ case <-errorChan:
+ tasksRun--
+ errors++
+ }
+ }
+ }()
+
+ executor.running.Wait()
+ if errors >= executor.errorLimit {
+ return 0, fmt.Errorf("More tasks errored out than allowed limit!")
+ }
+
+ return result, nil
+}
Идеята на Timed е че не искаме да чакаме 2 часа за задача която ни трябва резултата в следващите 10 секунди. Така че трябва след като изтече timeout-а Timed да върне че не е стигнало времето. В допълнение е хубаво ако не оставя goroutine които няма да завършат преди края на програмата.