Слави обнови решението на 29.11.2016 12:04 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+//pipline
+type pipeline struct {
+ tasks []Task
+}
+
+func (pl pipeline) Execute(input int) (int, error) {
+ if len(pl.tasks) == 0 {
+ return 0, errors.New("no tasks given!")
+ }
+ result := input
+ var err error
+ for _, task := range pl.tasks {
+ result, err = task.Execute(result)
+ if err != nil {
+ return 0, err
+ }
+ }
+ return result, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ return pipeline{[]Task(tasks)}
+}
+
+//fastest
+type fastest struct {
+ tasks []Task
+}
+
+func (fs fastest) Execute(input int) (int, error) {
+ if len(fs.tasks) == 0 {
+ return 0, errors.New("No tasks given")
+ }
+ var once sync.Once
+ first := make(chan struct {
+ result int
+ err error
+ })
+ for _, task := range fs.tasks {
+ go func() {
+ result, err := task.Execute(input)
+ once.Do(func() {
+ first <- struct {
+ result int
+ err error
+ }{result, err}
+ })
+ }()
+ }
+ output := <-first
+ return output.result, output.err
+}
+
+func Fastest(tasks ...Task) Task {
+ return fastest{[]Task(tasks)}
+}
+
+//timed
+type timed struct {
+ task Task
+ timeout time.Duration
+}
+
+func (td timed) Execute(input int) (int, error) {
+ output := make(chan struct {
+ result int
+ err error
+ })
+ var once sync.Once
+ go func() {
+ result, err := td.task.Execute(input)
+ once.Do(func() {
+ output <- struct {
+ result int
+ err error
+ }{result, err}
+ })
+ }()
+ select {
+ case result := <-output:
+ return result.result, result.err
+ case <-time.After(td.timeout):
+ once.Do(func() {})
+ return 0, errors.New("Task timed out")
+ }
+ return 0, nil
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return timed{task, timeout}
+}
+
+//concurentmapreduce
+type mapReducer struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func (mr mapReducer) Execute(input int) (int, error) {
+ taskNum := len(mr.tasks)
+ if taskNum == 0 {
+ return 0, errors.New("No tasks given")
+ }
+ results := make([]int, taskNum, taskNum)
+ errSignal := make(chan error)
+ doneSignal := make(chan struct{}, 1)
+ var (
+ once sync.Once
+ wg sync.WaitGroup
+ )
+ wg.Add(taskNum)
+ for index, task := range mr.tasks {
+ go func(index int, task Task) {
+ result, err := task.Execute(input)
+ if err == nil {
+ results[index] = result
+ } else {
+ once.Do(func() { errSignal <- err })
+ }
+ wg.Done()
+ }(index, task)
+ }
+ go func() {
+ wg.Wait()
+ doneSignal <- struct{}{}
+ }()
+ select {
+ case err := <-errSignal:
+ return 0, err
+ case <-doneSignal:
+ return mr.reduce(results), nil
+ }
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return mapReducer{[]Task(tasks), reduce}
+}
+
+//greatestSearcher
+type searcher struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func (gs searcher) Execute(input int) (int, error) {
+ var (
+ mtex sync.Mutex
+ wg sync.WaitGroup
+ )
+ errorNum := 0
+ results := make([]int, 0)
+ for task := range gs.tasks {
+ go func(task Task) {
+ wg.Add(1)
+ result, err := task.Execute(input)
+ mtex.Lock()
+ if err == nil {
+ results = append(results, result)
+ } else {
+ errorNum++
+ }
+ mtex.Unlock()
+ wg.Done()
+ }(task)
+ }
+ wg.Wait()
+ if len(results) == 0 {
+ return 0, errors.New("Not task finished succesfuly")
+ } else if errorNum > gs.errorLimit {
+
+ return 0, errors.New("Too many errors")
+ } else {
+ max := results[0]
+ for _, result := range results {
+ if max < result {
+ max = result
+ }
+ }
+ return max, nil
+ }
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return searcher{tasks, errorLimit}
+}
Погледни във Fastest
какво ще бъде task
в цикъла, който въртиш (и горутините, които пускаш).