Иван обнови решението на 29.11.2016 14:53 (преди над 1 година)
+// hw3
+package main
+
+import (
+        "fmt"
+        "sync"
+        "time"
+)
+
+// Task is a unit of work that takes an int argument and produces an int
+// result or an error.
+type Task interface {
+        // Execute runs the task with the given argument.
+        Execute(int) (int, error)
+}
+
+// PipeType is the Task implementation returned by Pipeline. Its Execute
+// method runs the stored tasks one after another.
+type PipeType struct {
+        // tasks are the pipeline stages, in execution order.
+        tasks []Task
+}
+
+func (m *PipeType) Execute(op int) (int, error) {
+
+        var nextVal int
+        var err error
+
+        if len(m.tasks) == 0 {
+                return 0, fmt.Errorf("no params!")
+        }
+
+        if nextVal, err = m.tasks[0].Execute(op); err != nil {
+
+                return 0, err
+
+        }
+
+        for i := 1; i < len(m.tasks); i++ {
+
+                if nextVal, err = m.tasks[i].Execute(nextVal); err != nil {
+
+                        return 0, err
+
+                }
+
+        }
+
+        return nextVal, err
+
+}
+
+// Pipeline wraps the given tasks in a PipeType, whose Execute runs them in
+// the order they were passed.
+func Pipeline(tasks ...Task) Task {
+        pipe := &PipeType{tasks: tasks}
+        return pipe
+}
+
+//second part
+// FastType is the Task implementation returned by Fastest.
+type FastType struct {
+        // tasks are the candidates that are all started by Execute.
+        tasks []Task
+}
+
+// Fastest wraps the given tasks in a FastType.
+func Fastest(tasks ...Task) Task {
+        racer := &FastType{tasks: tasks}
+        return racer
+}
+
+// Res bundles the (int, error) pair returned by a Task's Execute so that it
+// can be sent over a channel as a single value.
+type Res struct {
+        // r is the result value.
+        r int
+        // e is the error returned alongside r.
+        e error
+}
+
+// makeRes packs a result/error pair into a Res. Its parameter list matches
+// the return values of Task.Execute, so a call to Execute can be passed to
+// it directly.
+func makeRes(r int, e error) Res {
+        return Res{r: r, e: e}
+}
+
+func (f *FastType) Execute(op int) (int, error) {
+
+        if len(f.tasks) == 0 {
+                return 0, fmt.Errorf("no params!")
+        }
+
+        ch := make(chan Res, 1)
+
+        for _, t := range f.tasks {
+                go func() {
+                        select {
+                        case ch <- makeRes(t.Execute(op)):
+
+                        default:
+                        }
+                }()
+        }
+
+        var ret Res = <-ch
+
+        return ret.r, ret.e
+
+}
+
+//third part
+
+// TimeType is the Task implementation returned by Timed.
+type TimeType struct {
+        // ts is the wrapped task.
+        ts Task
+        // tm is how long Execute waits for ts before giving up.
+        tm time.Duration
+}
+
+// Timed wraps task in a TimeType that gives the task at most timeout to
+// finish.
+func Timed(task Task, timeout time.Duration) Task {
+        timed := &TimeType{ts: task, tm: timeout}
+        return timed
+}
+
+// Execute runs the wrapped task with op in its own goroutine and waits for
+// it for at most the configured duration. If the task finishes in time its
+// result and error are returned as they are; otherwise a "time out!" error
+// is returned. The task itself is not interrupted on timeout: it keeps
+// running and its outcome is dropped.
+func (t TimeType) Execute(op int) (int, error) {
+        // Capacity 1 lets the goroutine deliver its outcome and exit even
+        // when nobody is receiving any more because the timeout won.
+        done := make(chan Res, 1)
+
+        go func() {
+                done <- makeRes(t.ts.Execute(op))
+        }()
+
+        // An explicit timer (rather than time.After) so that it can be
+        // stopped as soon as the task finishes instead of staying pending
+        // until it fires.
+        timer := time.NewTimer(t.tm)
+        defer timer.Stop()
+
+        select {
+        case res := <-done:
+                return res.r, res.e
+
+        case <-timer.C:
+                return 0, fmt.Errorf("time out!")
+        }
+}
+
+//fourth part
+// MapType is the Task implementation returned by ConcurrentMapReduce.
+type MapType struct {
+        // tasks are the tasks whose results are collected.
+        tasks []Task
+
+        // f is the reduce function applied to the collected results.
+        f func([]int) int
+}
+
+// ConcurrentMapReduce wraps the given tasks and the reduce function in a
+// MapType.
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        mapper := &MapType{tasks: tasks, f: reduce}
+        return mapper
+}
+
+// Execute runs every task concurrently with the same argument op and, once
+// all of them have succeeded, returns the value of the reduce function
+// applied to their results. The results slice handed to reduce is ordered
+// like the tasks themselves (results[i] comes from the i-th task).
+//
+// If any task fails, Execute returns an error as soon as that failure is
+// observed, without waiting for the remaining tasks and without calling
+// reduce. Calling it with no tasks is an error as well.
+func (m *MapType) Execute(op int) (int, error) {
+        if len(m.tasks) == 0 {
+                return 0, fmt.Errorf("no params!")
+        }
+
+        // outcome is what a single task reports back to the collector.
+        type outcome struct {
+                idx int
+                val int
+                err error
+        }
+
+        // One buffer slot per task: a goroutine never blocks on its send, so
+        // none of them is leaked when Execute returns early on an error.
+        outcomes := make(chan outcome, len(m.tasks))
+
+        for i, t := range m.tasks {
+                // Index and task are passed as arguments instead of being
+                // captured: before Go 1.22 the loop variables are shared
+                // between iterations. Nothing is locked around the call to
+                // Execute, so the tasks really do run side by side.
+                go func(idx int, task Task) {
+                        val, err := task.Execute(op)
+                        outcomes <- outcome{idx: idx, val: val, err: err}
+                }(i, t)
+        }
+
+        // Only this goroutine writes to results, so no lock is needed.
+        results := make([]int, len(m.tasks))
+        for range m.tasks {
+                o := <-outcomes
+                if o.err != nil {
+                        return 0, fmt.Errorf("an error has occurred")
+                }
+                results[o.idx] = o.val
+        }
+
+        return m.f(results), nil
+}
+
+//fifth part
+
+// MIN is the value GreatestSearcher stores in the greatest field of every
+// GreatestType it creates.
+var MIN int = -100000000
+
+// GreatestType is the Task implementation returned by GreatestSearcher.
+type GreatestType struct {
+        // tasks is the channel the tasks to run are received from.
+        tasks <-chan Task
+        // errLimit is the error limit passed to GreatestSearcher.
+        errLimit int
+        // greatest is initialised to MIN by GreatestSearcher.
+        greatest int
+        // The embedded mutex gives *GreatestType Lock and Unlock methods.
+        sync.Mutex
+}
+
+// GreatestSearcher wraps the task channel and the error limit in a
+// GreatestType whose greatest field starts at MIN and whose mutex is
+// unlocked.
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        searcher := &GreatestType{
+                tasks:    tasks,
+                errLimit: errorLimit,
+                greatest: MIN,
+        }
+        return searcher
+}
+
+// Execute receives tasks from the channel until it is closed, runs each of
+// them concurrently with the same argument op, and returns the greatest
+// result among the tasks that succeeded.
+//
+// It returns an error when more tasks failed than the configured error limit
+// allows, or when there is no successful result to report (for example
+// because the channel was closed without any task being sent).
+//
+// All bookkeeping is local to the call: the receiver's errLimit is only
+// read, and its greatest field and embedded mutex are not touched, so a
+// later call does not see counts or results left over from an earlier one.
+func (g *GreatestType) Execute(op int) (int, error) {
+        var (
+                wg       sync.WaitGroup
+                mu       sync.Mutex // guards best, found and failures
+                best     int
+                found    bool
+                failures int
+        )
+
+        for curTask := range g.tasks {
+                wg.Add(1)
+                go func(task Task) {
+                        defer wg.Done()
+
+                        // Run the task before taking the lock so that the
+                        // tasks execute concurrently; only the bookkeeping
+                        // below is serialised.
+                        res, err := task.Execute(op)
+
+                        mu.Lock()
+                        defer mu.Unlock()
+
+                        if err != nil {
+                                failures++
+                                return
+                        }
+
+                        // found (not a sentinel value) marks whether best is
+                        // valid, so any int result can be the maximum.
+                        if !found || res > best {
+                                best, found = res, true
+                        }
+                }(curTask)
+        }
+
+        // The channel is closed; wait for the tasks that are still running.
+        wg.Wait()
+
+        if failures > g.errLimit {
+                return 0, fmt.Errorf("error limit exceeded")
+        }
+
+        if !found {
+                return 0, fmt.Errorf("no tasks has been sent")
+        }
+
+        return best, nil
+}
