Анатоли обнови решението на 29.11.2016 12:58 (преди над 1 година)
+package main
+
+import (
+ "fmt"
+ "time"
+ "sync"
+)
+
+// Task ...
+type Task interface {
+ Execute(int) (int, error)
+}
+
+// Pipeline
+
+type tolibPipeline struct {
+ tasks []Task
+}
+
+func (t *tolibPipeline) Execute(i int) (int, error) {
+
+ if len(t.tasks) == 0 {
+ return i, fmt.Errorf("No valid tasks provided")
+ }
+
+ var err error
+ for _, task := range t.tasks {
+ if i, err = task.Execute(i); err != nil {
+ return i, err
+ }
+ }
+
+ return i, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ return &tolibPipeline{tasks}
+}
+
+// Fastest
+
+type tolibFastest struct {
+ tasks []Task
+}
+
+func (t *tolibFastest) Execute(i int) (int, error) {
+
+ if len(t.tasks) == 0 {
+ return i, fmt.Errorf("No valid tasks provided")
+ }
+
+ type myerr struct {
+ r int
+ e error
+ }
+
+ ch := make(chan myerr, len(t.tasks))
+ var wg sync.WaitGroup
+
+ for _, task := range t.tasks {
+ wg.Add(1)
+ go func() {
+ r, e := task.Execute(i)
+ ch <- myerr{r, e}
+ wg.Done()
+ }()
+ }
+
+ res := <-ch
+ wg.Wait()
Предполагам разбираш че идеята на Fastest е че искаме резултата от най-бързата задача възможно най бързо а за другите искаме просто да не хабят goroutine-и но не и да ги чакаме за да си получим резултата
Условието казва: трябва да се изпълнят конкурентно и да се върне резултата (или грешката) на тази задача, която завърши първа
. Това, според мен, не изисква да се върне веднага резултата на най-бързата, само на тази която е завършила първа.
+ close(ch)
+ return res.r, res.e
+}
+
+func Fastest(tasks ...Task) Task {
+ return &tolibFastest{tasks}
+}
+
+// Timed
+
+type tolibTimed struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t *tolibTimed) Execute(i int) (int, error) {
+
+ type myerr struct {
+ r int
+ e error
+ }
+
+ ch := make(chan myerr, 1)
+ res := myerr{i, fmt.Errorf("Timeout")}
+
+ go func() {
+ r, e := t.task.Execute(i)
+ ch <- myerr{r, e}
+ close(ch)
+ }()
+
+ select {
+ case res = <-ch:
+ case <-time.After(t.timeout):
+ }
+
+ return res.r, res.e
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return &tolibTimed{task, timeout}
+}
+
+// MapReduce
+
+type tolibMapReduce struct {
+ tasks []Task
+ f func([]int) int
+}
+
+func (t *tolibMapReduce) Execute(i int) (int, error) {
+
+ if len(t.tasks) == 0 {
+ return i, fmt.Errorf("No valid tasks provided")
+ }
+
+ type myerr struct {
+ r int
+ e error
+ }
+
+ chdone := make(chan myerr, len(t.tasks))
+ cherrs := make(chan myerr, len(t.tasks))
+ res := myerr{i, nil}
+
+ go func() {
+ var wg sync.WaitGroup
+ for _, task := range t.tasks {
+ wg.Add(1)
+ go func() {
+ r, e := task.Execute(i)
+
+ if e != nil {
+ cherrs <- myerr{r, e}
+ } else {
+ chdone <- myerr{r, e}
+ }
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ close(chdone)
+ close(cherrs)
+ }()
+
+ toreduce := make([]int, len(t.tasks))
+
+ for i := 0; i < len(t.tasks); i++ {
+ select {
+ case res = <-chdone:
+ toreduce[i] = res.r
+ case res = <-cherrs:
+ return res.r, res.e
+ }
+ }
+
+ return t.f(toreduce), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return &tolibMapReduce{tasks, reduce}
+}
+
+// GSearch
+
+type tolibGSearch struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func (t *tolibGSearch) Execute(i int) (int, error) {
+
+ chres := make(chan int)
+ cherrs := make(chan struct{})
+
+ go func() {
+ var wg sync.WaitGroup
+
+ for {
+ task, ok := <-t.tasks
+
for task := range t.tasks {
+ if !ok {
+ break
+ }
+
+ wg.Add(1)
+ go func() {
+ res, err := task.Execute(i)
+ if err != nil {
if res, err := task.Execute(i); err != nil {
+ fmt.Println("Error", err)
+ cherrs <- struct{}{}
+ } else {
+ chres <- res
+ }
+ wg.Done()
+ }()
+ }
+
+ wg.Wait()
+ close(cherrs)
+ close(chres)
+ }()
+
+ errors, tasks, max := 0, 0, 0
+ for {
+ select {
+ case res, ok := <-chres:
+ if !ok {
+ if tasks == 0 {
+ return i, fmt.Errorf("No tasks provided")
+ }
+
+ return max, nil
+ }
+
+ tasks++
+ if max <= res {
+ max = res
+ }
+ case _, ok := <-cherrs:
+
+ if ok {
+ errors++
+ }
+
+ if errors > t.errorLimit {
+ return i, fmt.Errorf("Error limit exceeded")
+ }
+ }
+ }
+
+ return 0, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return &tolibGSearch{tasks, errorLimit}
+}