Иван обнови решението на 29.11.2016 14:53 (преди над 1 година)
+// hw3
+package main
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type PipeType struct {
+ tasks []Task
+}
+
+func (m *PipeType) Execute(op int) (int, error) {
+
+ var nextVal int
+ var err error
+
+ if len(m.tasks) == 0 {
+ return 0, fmt.Errorf("no params!")
+ }
+
+ if nextVal, err = m.tasks[0].Execute(op); err != nil {
+
+ return 0, err
+
+ }
+
+ for i := 1; i < len(m.tasks); i++ {
+
+ if nextVal, err = m.tasks[i].Execute(nextVal); err != nil {
+
+ return 0, err
+
+ }
+
+ }
+
+ return nextVal, err
+
+}
+
+func Pipeline(tasks ...Task) Task {
+
+ return &PipeType{tasks}
+
+}
+
+//second part
+type FastType struct {
+ tasks []Task
+}
+
+func Fastest(tasks ...Task) Task {
+
+ return &FastType{tasks}
+
+}
+
+type Res struct {
+ r int
+ e error
+}
+
+func makeRes(r int, e error) Res {
+ return Res{r, e}
+}
+
+func (f *FastType) Execute(op int) (int, error) {
+
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("no params!")
+ }
+
+ ch := make(chan Res, 1)
+
+ for _, t := range f.tasks {
+ go func() {
+ select {
+ case ch <- makeRes(t.Execute(op)):
+
+ default:
+ }
+ }()
+ }
+
+ var ret Res = <-ch
+
+ return ret.r, ret.e
+
+}
+
+//third part
+
+type TimeType struct {
+ ts Task
+ tm time.Duration
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+
+ return &TimeType{task, timeout}
+
+}
+
+func (t TimeType) Execute(op int) (int, error) {
+
+ ch := make(chan Res, 1)
+
+ go func() {
+
+ ch <- makeRes(t.ts.Execute(op))
+
+ }()
+
+ select {
+
+ case res := <-ch:
+ return res.r, res.e
+
+ case <-time.After(t.tm):
+ return 0, fmt.Errorf("time out!")
+
+ }
+
+}
+
+//fourth part
+type MapType struct {
+ tasks []Task
+
+ f func([]int) int
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+
+ return &MapType{tasks, reduce}
+
+}
+
+func (m *MapType) Execute(op int) (int, error) {
+
+ if len(m.tasks) == 0 {
+ return 0, fmt.Errorf("no params!")
+ }
+
+ var wg sync.WaitGroup
+
+ lock := make(chan struct{}, 1) //for res slice protection
+ errDetector := make(chan struct{}, 1)
+ alright := make(chan struct{}, 1)
+
+ res := make([]int, 0)
+
+ for _, i := range m.tasks {
+
+ wg.Add(1)
+ go func() {
+
+ lock <- struct{}{}
+
+ if curRes, err := i.Execute(op); err != nil {
+
+ errDetector <- struct{}{}
+
+ } else {
+
+ res = append(res, curRes)
+ <-lock
+ wg.Done()
+ }
+
+ }()
+ }
+
+ go func() {
+
+ wg.Wait()
+ alright <- struct{}{}
+ }()
+
+ select {
+
+ case <-alright:
+ //we are sure that all routines have been passed
+ return m.f(res), nil
+
+ case <-errDetector:
+ return 0, fmt.Errorf("an error has occurred")
+
+ }
+
+}
+
+//fifth part
+
+var MIN int = -100000000
+
+type GreatestType struct {
+ tasks <-chan Task
+ errLimit int
+ greatest int
+ sync.Mutex
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+
+ return &GreatestType{tasks, errorLimit, MIN, sync.Mutex{}}
+
+}
+
+func (g *GreatestType) Execute(op int) (int, error) {
+
+ var wg sync.WaitGroup
+
+ for {
+
+ curTask, ok := <-g.tasks
+
+ if !ok { //the chan is closed
+
+ wg.Wait()
+
+ if g.errLimit < 0 {
+ return 0, fmt.Errorf("error limit exceeded")
+ }
+
+ if g.greatest == MIN { //the channel has been closed without sending any tasks
+ return 0, fmt.Errorf("no tasks has been sent")
+ } else {
+ return g.greatest, nil
+ }
+ } else {
+
+ wg.Add(1)
+ go func() {
+
+ g.Lock()
+
+ defer g.Unlock()
+
+ if res, err := curTask.Execute(op); err != nil {
+
+ if g.errLimit > 0 {
+
+ g.errLimit--
+
+ }
+
+ } else {
+
+ if res > g.greatest {
+ g.greatest = res
+ }
+
+ }
+ wg.Done()
+ }()
+ }
+
+ }
+
+}