Александър обнови решението на 27.11.2016 15:53 (преди над 1 година)
+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+// Pipeline stuff
+type pipelineTask struct {
+ tasks []Task
+}
+
+func (p pipelineTask) Execute(input int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks to execute in Pipeline Task")
+ }
+
+ var (
+ output int
+ err error
+ )
+ for _, t := range p.tasks {
+ output, err = t.Execute(input)
+ if err != nil {
+ return 0, err
+ }
+ input = output
+ }
+
+ return output, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ var t pipelineTask
+ t.tasks = tasks
+ return t
+}
+
+// Fastest stuff
+type fastestTask struct {
+ tasks []Task
+}
+
+func (f fastestTask) Execute(in int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("No input tasks")
+ }
+
+ doneSync := make(chan struct{}, 1)
+ setOutputSync := make(chan struct{}, 1)
+ var (
+ output int
+ outputError error
+ )
+
+ for _, t := range f.tasks {
+ go func(task Task) {
+ out, err := task.Execute(in)
+ if _, ok := <-setOutputSync; ok {
+ output = out
+ outputError = err
+ doneSync <- struct{}{}
+ }
+ }(t)
+ }
+ setOutputSync <- struct{}{}
+ <-doneSync
+ close(setOutputSync)
+ close(doneSync)
+ return output, outputError
+}
+
+func Fastest(tasks ...Task) Task {
+ var f fastestTask
+ f.tasks = tasks
+ return f
+}
+
+// Timed stuff
+type timedTask struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t timedTask) Execute(in int) (int, error) {
+ doneSync := make(chan struct{}, 1)
+ var (
+ output int
+ outputError error
+ )
+ go func() {
+ out, err := t.task.Execute(in)
+ output = out
+ outputError = err
+ doneSync <- struct{}{}
+ }()
+
+ select {
+ case <-doneSync:
+ return output, outputError
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("Timed out waiting to task")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ var t timedTask
+ t.task = task
+ t.timeout = timeout
+ return t
+}
+
+// ConcurrentMapReduce
+type concurrentMapReduceTask struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func (c concurrentMapReduceTask) Execute(in int) (int, error) {
+ numTasks := len(c.tasks)
+ if numTasks == 0 {
+ return 0, fmt.Errorf("No input tasks")
+ }
+
+ results := make([]int, 0, numTasks)
+ errorChannel := make(chan error)
+ resultChannel := make(chan int)
+ syncChannel := make(chan struct{})
+ defer close(syncChannel)
+ defer close(errorChannel)
+ defer close(resultChannel)
+
+ for _, t := range c.tasks {
+ go func(task Task) {
+ output, outputError := task.Execute(in)
+ if _, ok := <-syncChannel; ok {
+ if outputError == nil {
+ resultChannel <- output
+ } else {
+ errorChannel <- outputError
+ }
+ }
+ }(t)
+ }
+
+ for i := 0; i < numTasks; i++ {
+ syncChannel <- struct{}{}
+ select {
+ case res := <-resultChannel:
+ results = append(results, res)
+ case err := <-errorChannel:
+ return 0, err
+ }
+ }
+
+ return c.reduce(results), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ var c concurrentMapReduceTask
+ c.reduce = reduce
+ c.tasks = tasks
+ return c
+}
+
+// GreatestSearcher
+type greatestSearcherTask struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func (g greatestSearcherTask) Execute(in int) (int, error) {
+ errorChannel := make(chan struct{})
+ resultChannel := make(chan int)
+ currentErrors := 0
+ maxNumber := 0
+ var (
+ counter int64
+ shouldQuit bool
+ hadAnyTasks bool
+ )
+
+ for !shouldQuit || counter != 0 {
+ select {
+ case <-errorChannel:
+ currentErrors++
+ counter--
+ case res := <-resultChannel:
+ if res > maxNumber {
+ maxNumber = res
+ }
+ counter--
+ default:
+ if t, ok := <-g.tasks; ok {
+ hadAnyTasks = true
+ counter++
+ go func(task Task) {
+ out, err := task.Execute(in)
+ if err != nil {
+ errorChannel <- struct{}{}
+ } else {
+ resultChannel <- out
+ }
+ }(t)
+ } else {
+ shouldQuit = true
+ }
+ }
+ }
+
+ if !hadAnyTasks {
+ return 0, fmt.Errorf("No input tasks")
+ }
+
+ if currentErrors > g.errorLimit {
+ return 0, fmt.Errorf("Too much errors generated from tasks")
+ }
+
+ return maxNumber, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ var g greatestSearcherTask
+ g.errorLimit = errorLimit
+ g.tasks = tasks
+ return g
+}