Живко обнови решението на 29.11.2016 16:15 (преди над 1 година)
+package main
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type Task1 struct {
+ result int
+ err error
+ ch1, ch2 chan struct{}
+}
+
+func (task1 *Task1) Execute(arg int) (int, error) {
+ task1.result = arg
+ task1.ch1 <- struct{}{}
+ <-task1.ch2
+ return task1.result, task1.err
+}
+
+func Pipeline(tasks ...Task) Task {
+ myTask := Task1{result: 0, err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ go func() {
+ <-myTask.ch1
+ for _, task := range tasks {
+ if myTask.err == nil {
+ myTask.result, myTask.err = task.Execute(myTask.result)
+ } else {
+ break
+ }
+ }
+ if len(tasks) == 0 {
+ myTask.err = fmt.Errorf("no tasks")
+ }
+ myTask.ch2 <- struct{}{}
+ }()
+ return &myTask
+}
+
+func Fastest(tasks ...Task) Task {
+ myTask := Task1{err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ var countMtx sync.Mutex
+ go func() {
+ <-myTask.ch1
+ ch3 := make(chan int, len(tasks)+1)
+ ch4 := make(chan error, len(tasks)+1)
+ for _, task := range tasks {
+ go func(currentTask Task) {
+ curruntResult, currentError := currentTask.Execute(myTask.result)
+ countMtx.Lock()
+ ch3 <- curruntResult
+ ch4 <- currentError
+ countMtx.Unlock()
+ }(task)
+ }
+ if len(tasks) == 0 {
+ myTask.err = fmt.Errorf("no tasks")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ myTask.result = <-ch3
+ myTask.err = <-ch4
+ myTask.ch2 <- struct{}{}
+ }()
+ return &myTask
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ myTask := Task1{err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ go func() {
+ <-myTask.ch1
+ ch3 := make(chan struct{})
+ go func() {
+ myTask.result, myTask.err = task.Execute(myTask.result)
+ ch3 <- struct{}{}
+ }()
+ select {
+ case <-ch3:
+ myTask.ch2 <- struct{}{}
+ case <-time.After(timeout):
+ myTask.err = fmt.Errorf("time is over")
+ myTask.ch2 <- struct{}{}
+ }
+ }()
+ return &myTask
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ myTask := Task1{err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ var countMtx sync.Mutex
+ go func() {
+ <-myTask.ch1
+ ch3 := make(chan int, len(tasks)+1)
+ ch4 := make(chan error, len(tasks)+1)
+ ch5 := make(chan struct{})
+ for _, task := range tasks {
+ go func(currentTask Task) {
+ curruntResult, currentError := currentTask.Execute(myTask.result)
+ if currentError != nil {
+ myTask.err = currentError
+ myTask.ch2 <- struct{}{}
+ }
+ countMtx.Lock()
+ ch3 <- curruntResult
+ ch4 <- currentError
+ countMtx.Unlock()
+ ch5 <- struct{}{}
+ }(task)
+ }
+ for i := 0; i < len(tasks); i++ {
+ <-ch5
+ }
+ if len(tasks) == 0 {
+ myTask.err = fmt.Errorf("no tasks")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ for currentError := range ch4 {
+ if currentError != nil {
+ return
+ }
+ }
+ results := make([]int, 0, len(tasks))
+ for curruntResult := range ch3 {
+ results = append(results, curruntResult)
+ }
+ myTask.result = reduce(results)
+ myTask.ch2 <- struct{}{}
+ }()
+ return &myTask
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ myTask := Task1{err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ var countMtx sync.Mutex
+ ch3 := make(chan int)
+ ch4 := make(chan error)
+ count := 0
+ go func() {
+ <-myTask.ch1
+ ch5 := make(chan struct{})
+ for {
+ task, ok := <-tasks
+ if ok == false {
+ break
+ }
+ count = count + 1
+ go func(currentTask Task) {
+ curruntResult, currentError := currentTask.Execute(myTask.result)
+ countMtx.Lock()
+ ch3 <- curruntResult
+ ch4 <- currentError
+ countMtx.Unlock()
+ ch5 <- struct{}{}
+ }(task)
+ }
+ for i := 0; i < count; i++ {
+ <-ch5
+ }
+ close(ch3)
+ close(ch4)
+ }()
+ go func() {
+ max := 0
+ firstResult := true
+ for {
+ curruntResult, ok1 := <-ch3
+ currentError, ok2 := <-ch4
+ if ok1 == false || ok2 == false {
+ break
+ }
+ if currentError != nil {
+ errorLimit = errorLimit - 1
+ } else if firstResult {
+ max = curruntResult
+ firstResult = false
+ } else if max < curruntResult {
+ max = curruntResult
+ }
+ }
+ if errorLimit < 0 {
+ myTask.err = fmt.Errorf("too many errors")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ if count == 0 {
+ myTask.err = fmt.Errorf("no tasks")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ if firstResult {
+ myTask.err = fmt.Errorf("only errors")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ myTask.result = max
+ myTask.ch2 <- struct{}{}
+ }()
+ return &myTask
+}