Стоян обнови решението на 27.11.2016 13:13 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+// type adder struct {
+// augend int
+// }
+//
+// func (a adder) Execute(addend int) (int, error) {
+// result := a.augend + addend
+// if result > 127 {
+// return 0, fmt.Errorf("Result %d exceeds the adder threshold", a)
+// }
+// return result, nil
+// }
+
+type Pipe struct {
+ tasks []Task
+}
+
+func (p Pipe) Execute(startValue int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("There are no tasks to be executed")
+ }
+ value := startValue
+ for _, task := range p.tasks {
+ if newValue, err := task.Execute(value); err != nil {
+ return 0, err
+ } else {
+ value = newValue
+ }
+ }
+ return value, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ return Pipe{tasks}
+}
+
+type FastExecutor struct {
+ tasks []Task
+}
+
+func (f FastExecutor) Execute(value int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, errors.New("There are no tasks to be executed")
+ }
+ var res int
+ var err error
+ quit := make(chan Result, 1)
+ defer close(quit)
+ for _, task := range f.tasks {
+ go func(task Task) {
+ res, err = task.Execute(value)
+ quit <- Result{res, err}
Какво ще се случи когато има две задачи и след като приключи първата и Execute-а на FastExecute върне, върне и втората ?
+ }(task)
+ }
+ r := <-quit
+ return r.result, r.err
+}
+
+func Fastest(tasks ...Task) Task {
+ return FastExecutor{tasks}
+}
+
+// type lazyAdder struct {
+// adder
+// delay time.Duration
+// }
+//
+// func (la lazyAdder) Execute(addend int) (int, error) {
+// time.Sleep(la.delay * time.Millisecond)
+// return la.adder.Execute(addend)
+// }
+
+type TimedExecutor struct {
+ task Task
+ timeout time.Duration
+}
+
+type Result struct {
+ result int
+ err error
+}
+
+func (te TimedExecutor) Execute(value int) (int, error) {
+ ch := make(chan Result)
+ go func() {
+ res, err := te.task.Execute(value)
+ ch <- Result{res, err}
Какво ще се случи ако select-а отдолу вече е минал ?
Той нали блокира до получаване на стойност по 1 от двата канала ?
така прави и после какво се случва в сценария който съм представил с реда който коментираме ?
Паника заради писане в затворен канал или горутината зависва :(
точно тук не затваряш канала, но да :D
+ }()
+ select {
+ case s := <-ch:
+ return s.result, s.err
+ case <-time.After(te.timeout):
+ return 0, errors.New("Execution timed out.")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return TimedExecutor{task, timeout}
+}
+
+type ConcurrentMapReduceExecutor struct {
+ reduce func([]int) int
+ tasks []Task
+}
+
+func (ce ConcurrentMapReduceExecutor) Execute(value int) (int, error) {
+ if len(ce.tasks) == 0 {
+ return 0, errors.New("There are no tasks to be executed")
+ }
+ ch := make(chan Result)
+ defer close(ch)
+ for _, task := range ce.tasks {
+ go func(task Task) {
+ res, err := task.Execute(value)
+ ch <- Result{res, err}
+ }(task)
+ }
+ results := make([]int, 0)
+ for {
+ select {
+ case r := <-ch:
+ if r.err != nil {
+ return 0, r.err
+ }
+ results = append(results, r.result)
+ if len(results) == len(ce.tasks) {
+ return ce.reduce(results), nil
+ }
+ }
+ }
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return ConcurrentMapReduceExecutor{reduce, tasks}
+}
+
+type GreatestSearcherExecutor struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func max(nums []int) int {
+ m := nums[0]
+ for _, num := range nums {
+ if m < num {
+ m = num
+ }
+ }
+ return m
+}
+
+func (ge GreatestSearcherExecutor) Execute(value int) (int, error) {
+ var mutex sync.Mutex
+ results := make([]int, 0)
+ errorsNum := 0
+ var wg sync.WaitGroup
+ for task := range ge.tasks {
+ wg.Add(1)
+ go func(task Task) {
+ res, err := task.Execute(value)
+ mutex.Lock()
+ if err != nil {
+ errorsNum++
+ } else {
+ results = append(results, res)
+ }
+ mutex.Unlock()
+ wg.Done()
+ }(task)
+ }
+ wg.Wait()
+ if len(results) == 0 && errorsNum == 0 {
+ return 0, errors.New("No tasks were given")
+ }
+
+ if errorsNum > ge.errorLimit {
+ return 0, errors.New("Error limit is passed")
+ } else {
+ return max(results), nil
+ }
+ return max(results), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return GreatestSearcherExecutor{errorLimit, tasks}
+}
Какво ще се случи ако select-а отдолу вече е минал ?
Той нали блокира до получаване на стойност по 1 от двата канала ?
така прави и после какво се случва в сценария който съм представил с реда който коментираме ?
Паника заради писане в затворен канал или горутината зависва :(
точно тук не затваряш канала, но да :D