Константин обнови решението на 27.11.2016 22:54 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type TaskResult struct {
+ result int
+ err error
+}
+
+type asyncTask struct {
+ tasks []Task
+ concurrent bool
+ timeout time.Duration
+ reduceFunc func(results []int) int
+
+ errorLimit int
+ taskChannel <-chan Task
+}
+
+func CreateAsyncTask(tasks []Task, concurrent bool, timeout time.Duration, reduceFunc func(results []int) int, errorLimit int, taskChannel <-chan Task) Task {
+ var newTask = new(asyncTask)
+ newTask.tasks = tasks
+ newTask.concurrent = concurrent
+ newTask.timeout = timeout
+ newTask.reduceFunc = reduceFunc
+ newTask.errorLimit = errorLimit
+ newTask.taskChannel = taskChannel
+ return newTask
+}
Това че всичко е един тип не ми харесва - хаби място когато половината полета не се ползват и прави долния метод ненужно сложен като между временно не печелиш нищо освен че ще трябва да се грижиш долния метод да не стане грешен всеки път когато добавяш нов "тип" задача
+
+func (a asyncTask) Execute(number int) (int, error) {
+ if (a.tasks == nil || len(a.tasks) == 0) && a.taskChannel == nil {
+ return 0, errors.New("No tasks given")
+ }
+
+ var taskResult TaskResult
+
+ if a.taskChannel != nil {
+ taskResult = a.StartTaskExecutorChannel(number)
+ } else if a.concurrent {
+ taskResult = a.ExecuteTasksAsync(number)
+ } else {
+ taskResult = a.ExecuteTasks(number)
+ }
+
+ return taskResult.result, taskResult.err
+}
+
+func (a asyncTask) ExecuteTasks(number int) TaskResult {
+ var currentResult = number
+ var err error
+ var nextResult = 0
+
+ for i := 0; i < len(a.tasks); i++ {
+ // If there is set timeout, then we must use it
+ // Otherwise, wait forever...
+ if a.timeout != 0 {
+ select {
+ case taskResult := <-fetchResult(a.tasks[i], currentResult):
+ nextResult = taskResult.result
+ err = taskResult.err
+ case <-time.After(a.timeout):
+ return TaskResult{0, errors.New("Timed out")}
+ }
+ } else {
+ nextResult, err = a.tasks[i].Execute(currentResult)
+ }
+
+ currentResult = nextResult
+
+ // If error occured, we leave without running the next tasks
+ if err != nil {
+ return TaskResult{currentResult, err}
+ }
+ }
+
+ return TaskResult{currentResult, nil}
+}
+
+func (a asyncTask) ExecuteTasksAsync(number int) TaskResult {
+ var wg sync.WaitGroup
+ var syncMtx sync.Mutex
+ var errorMtx sync.Mutex
+
+ var result = 0
+ var err error
+
+ var results = []int{}
+
+ var finished = false
+
+ for i := 0; i < len(a.tasks); i++ {
+ // Add every task to the wait group, so we can finish only when there are no running tasks
+ wg.Add(1)
+ go func(index int) {
+ currentResult, currentErr := a.tasks[index].Execute(number)
+ if a.reduceFunc != nil {
+ if currentErr != nil {
+ errorMtx.Lock()
+ err = currentErr
+ errorMtx.Unlock()
+ wg.Done()
+ return
+ }
+
+ // Add the result to the current list of results of all tasks
+ syncMtx.Lock()
+ results = append(results, currentResult)
+ if len(results) == len(a.tasks) { // Then it's the last task
+ result = a.reduceFunc(results)
+ err = nil
+ }
+ syncMtx.Unlock()
+
+ } else {
+ syncMtx.Lock()
+ // We only want the result from the fastest task
+ if !finished {
+ result = currentResult
+ err = currentErr
+ finished = true
+ }
+ syncMtx.Unlock()
+
+ }
+
+ wg.Done()
+ }(i)
+ }
+
+ wg.Wait()
+ return TaskResult{result, err}
+}
+
+func fetchResult(task Task, parameter int) <-chan TaskResult {
+ ch := make(chan TaskResult)
+
+ go func() {
+ if response, err := task.Execute(parameter); err == nil {
+ ch <- TaskResult{response, err}
+ }
+ }()
+
+ return ch
+}
+
+func (a asyncTask) StartTaskExecutorChannel(number int) TaskResult {
+ var maxTaskResult = 0
+ var errorsOccured = 0
+
+ var wg sync.WaitGroup
+
+ var maxResultMtx sync.Mutex
+ var errorMtx sync.Mutex
+
+ wg.Add(1)
+ go func() {
+ for taskForExecuting := range a.taskChannel {
+ // Add every task to the wait group, so we can finish only when there are no running tasks
+ wg.Add(1)
+ go func(task Task) {
+ currentResult, currentErr := task.Execute(number)
+
+ if currentErr != nil {
+ errorMtx.Lock()
+ errorsOccured++
+ errorMtx.Unlock()
+ }
+
+ maxResultMtx.Lock()
+ if currentErr == nil && currentResult > maxTaskResult {
+ maxTaskResult = currentResult
+ }
+ maxResultMtx.Unlock()
+ wg.Done()
+
+ }(taskForExecuting)
+ }
+ wg.Done()
+ }()
+
+ wg.Wait()
+ if errorsOccured > a.errorLimit {
+ return TaskResult{0, errors.New("Max number of errors occured!")}
+ }
+
+ return TaskResult{maxTaskResult, nil}
+}
+
+func Pipeline(tasks ...Task) Task {
+ newTask := CreateAsyncTask(tasks, false, 0, nil, 0, nil)
+ return newTask
Тук и на всички по долу може директно да правиш return
+}
+
+func Fastest(tasks ...Task) Task {
+ newTask := CreateAsyncTask(tasks, true, 0, nil, 0, nil)
+ return newTask
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ newTask := CreateAsyncTask([]Task{task}, false, timeout, nil, 0, nil)
+ return newTask
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ newTask := CreateAsyncTask(tasks, true, 0, reduce, 0, nil)
+ return newTask
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ newTask := CreateAsyncTask(nil, false, 0, nil, errorLimit, tasks)
+ return newTask
+}
- Fastest имплементацията ти не връща веднага
- Timed leak-ва goroutine-а когато timeout-не
Това че всичко е един тип не ми харесва - хаби място когато половината полета не се ползват и прави долния метод ненужно сложен като между временно не печелиш нищо освен че ще трябва да се грижиш долния метод да не стане грешен всеки път когато добавяш нов "тип" задача