Никола обнови решението на 29.11.2016 15:31 (преди над 1 година)
+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type res struct {
+ r int
+ err error
+}
+
+type responseTask struct {
+ exec func(int) (int, error)
+}
+
+func (rt responseTask) Execute(arg int) (int, error) {
+ return rt.exec(arg)
+}
+
+func runner(task Task, arg int, c chan<- res) {
+ r, e := task.Execute(arg)
+ ret := new(res)
+ ret.r = r
+ ret.err = e
+ c <- *ret
+ return
+}
+
+func runWrapper(task Task, arg int, c chan res, selfDestruct <-chan struct{}) {
+ go runner(task, arg, c)
+ select {
+ case result := <-c:
+ c <- result
+ return
+ case <-selfDestruct:
+ return
+ }
+}
+
+func Pipeline(tasks ...Task) Task {
+
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+
+ if len(tasks) == 0 {
+ return 0, fmt.Errorf("Zero tasks passed to Pipeline.")
+ }
+
+ execResult, err := tasks[0].Execute(arg)
+ if err != nil {
+ return 0, fmt.Errorf("There was an error %v in the first task.", err)
+ }
+
+ for i, t := range tasks {
+ if i == 0 {
+ continue
+ }
+ execResult, err = t.Execute(execResult)
+ if err != nil {
+ return 0, err
+ }
+ }
+ return execResult, nil
+ }
+
+ return response
+}
+
+func Fastest(tasks ...Task) Task {
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+
+ if len(tasks) == 0 {
+ return 0, fmt.Errorf("No tasks!")
+ }
+
+ agg := make(chan res, 1) // "Who is first" issue
+ destroy := make(chan struct{}, len(tasks))
+
+ for _, t := range tasks {
+ go runWrapper(t, arg, agg, destroy)
+ }
+
+ result := <-agg
+ for i := 0; i < len(tasks); i++ {
+ destroy <- struct{}{}
+ }
+ return result.r, result.err
+ }
+
+ return response
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+
+ if task == nil {
+ return 0, fmt.Errorf("Nil task given!")
+ }
+
+ c := make(chan res, 1)
+ go runner(task, arg, c)
+ select {
+ case r := <-c:
+ return r.r, r.err
+ case <-time.After(timeout):
+ return 0, fmt.Errorf("Task timed out!")
+ }
+ }
+
+ return response
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+
+ if len(tasks) == 0 {
+ return 0, fmt.Errorf("No tasks!")
+ }
+
+ agg := make(chan res, len(tasks))
+ destroy := make(chan struct{}, len(tasks))
+
+ results := make([]int, 0, len(tasks))
+
+ for _, t := range tasks {
+ go runWrapper(t, arg, agg, destroy)
+ }
+
+ for i := 0; i < len(tasks); i++ {
+ msg := <-agg
+ if msg.err != nil {
+ for j := 0; j < len(tasks); j++ {
+ destroy <- struct{}{}
+ }
+ return 0, msg.err
+ }
+ results = append(results, msg.r)
+ }
+ return reduce(results), nil
+ }
+
+ return response
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+ agg := make(chan res)
+ destroy := make(chan struct{})
+ count := 0
+
+ for t := range tasks {
+ go runWrapper(t, arg, agg, destroy)
+ count++
+ }
+
+ results := make([]int, 0, 20)
+ for count > 0 {
+ r := <-agg
+ count--
+ if r.err != nil {
+ errorLimit--
+ if errorLimit == -1 {
+ for count > 0 {
+ destroy <- struct{}{}
+ count--
+ }
+ return 0, fmt.Errorf("Error limit exceeded!")
+ }
+ }
+ results = append(results, r.r)
+ }
+
+ // Get max of results
+ max := results[0]
+ for i := 0; i < len(results); i++ {
+ if results[i] > max {
+ max = results[i]
+ }
+ }
+ return max, nil
+ }
+
+ return response
+}