Валентин обнови решението на 25.11.2016 23:34 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
// Result bundles the two values returned by a single Task.Execute call so
// that they can be passed through a channel as one item.
type Result struct {
	// result is the integer value returned by the task.
	result int
	// err is the error returned by the same call.
	err error
}
+
// Task is a unit of work that maps an integer argument to an integer result
// or reports an error.
type Task interface {
	// Execute runs the task with the given argument.
	Execute(arg int) (int, error)
}
+
+type Chain struct {
+ tasks []Task
+}
+
+func (c Chain) Execute(arg int) (_ int, e error) {
+ if len(c.tasks) == 0 {
+ return 0, errors.New("No tasks to execute.")
+ }
+ for _, t := range c.tasks {
+ arg, e = t.Execute(arg)
+ if e != nil {
+ return 0, e
+ }
+ }
+ return arg, nil
+}
+func Pipeline(tasks ...Task) Task {
+ return Chain{tasks}
+}
+
+type Concurrent struct {
+ tasks []Task
+ fastestResult chan Result
+}
+
+func (p Concurrent) Execute(arg int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("No tasks to execute.")
+ }
+ once := sync.Once{}
+ for _, t := range p.tasks {
+ go func(t Task) {
+ r, e := t.Execute(arg)
+ once.Do(func() { p.fastestResult <- Result{r, e} })
+ }(t)
+ }
+ result := <-p.fastestResult
+ return result.result, result.err
+}
+
+func Fastest(tasks ...Task) Task {
+ return Concurrent{
+ tasks: tasks,
+ fastestResult: make(chan Result),
+ }
+}
+
+type TimeIsMoney struct {
+ task Task
+ time time.Duration
+}
+
+func (t TimeIsMoney) Execute(arg int) (int, error) {
+ done := make(chan Result)
+ quit := make(chan struct{})
+ go func() {
+ n, e := t.task.Execute(arg)
+ select {
+ case done <- Result{n, e}:
+ case <-quit:
+ }
+ }()
+ select {
+ case <-time.After(t.time):
+ quit <- struct{}{}
+ return 0, errors.New("Task did not finish in time.")
+ case r := <-done:
+ return r.result, r.err
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return TimeIsMoney{
+ task: task,
+ time: timeout,
+ }
+}
+
+type Reducer struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func (r Reducer) Execute(arg int) (int, error) {
+ if len(r.tasks) == 0 {
+ return 0, errors.New("No tasks to execute.")
+ }
+ resultStream := make(chan Result)
+ resultSlice := make([]int, 0, len(r.tasks))
+ for _, t := range r.tasks {
+ go func(t Task) {
+ n, e := t.Execute(arg)
+ defer func() {
+ recover()
+ }()
+ resultStream <- Result{n, e}
+ }(t)
+ }
+ for range r.tasks {
+ res := <-resultStream
+ if res.err != nil {
+ close(resultStream)
+ return 0, res.err
+ }
+ resultSlice = append(resultSlice, res.result)
+ }
+ return r.reduce(resultSlice), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return Reducer{
+ tasks: tasks,
+ reduce: reduce,
+ }
+}
+
+type Searcher struct {
+ tasks <-chan Task
+ errLimit int
+}
+
+func (r Searcher) Execute(arg int) (int, error) {
+ var (
+ firstTask bool = true
+ numberOfErrors int
+ maxNumber int
+ wg sync.WaitGroup
+ resultChannel chan Result = make(chan Result)
+ )
+ go func() {
+ for res := range resultChannel {
+ if res.err != nil {
+ numberOfErrors++
+ }
+ if maxNumber < res.result || firstTask {
+ maxNumber = res.result
+ firstTask = false
+ }
+ }
+ }()
+ for t := range r.tasks {
+ wg.Add(1)
+ go func(t Task) {
+ // wg.Add(1) HERE CAUSES RACE CONDITION!!
+ n, e := t.Execute(arg)
+ resultChannel <- Result{n, e}
+ wg.Done()
+ }(t)
+ }
+ wg.Wait()
+ close(resultChannel)
+ if firstTask {
+ return 0, errors.New("No tasks to execute.")
+ }
+ if numberOfErrors > r.errLimit {
+ return 0, errors.New("Maximum number of errors exceeded.")
+ }
+ return maxNumber, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return Searcher{
+ tasks: tasks,
+ errLimit: errorLimit,
+ }
+}
Имаш race condition в този Execute