Александър обнови решението на 27.11.2016 13:09 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type pipelineTask struct {
+ tasks []Task
+}
+
+func (pt *pipelineTask) Execute(passed int) (int, error) {
+ tasks := pt.tasks
+ // check the tasks slice
+ if tasks == nil || len(tasks) == 0 {
+ return 0, errors.New("Received nil or zero arguments for pipeline.")
+ }
+ var err error = nil
+ // for each task pass the answer from the last and check for an error
+ for _, task := range tasks {
+ passed, err = task.Execute(passed)
+ if err != nil {
+ return 0, err
+ }
+ }
+ return passed, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ var pt Task = &pipelineTask{tasks: tasks}
+ return pt
+}
+
+type fastestTask struct {
+ tasks []Task
+}
+
+// create a struct to be used in the channel of results
+type result struct {
+ answer int
+ err error
+}
+
+func (ft *fastestTask) Execute(passed int) (int, error) {
+ tasks := ft.tasks
+ // check the tasks slice
+ if tasks == nil || len(tasks) == 0 {
+ return 0, errors.New("Received nil or zero arguments for fastest task.")
+ }
+
+ // make a channel of results for the tasks
+ resultCh := make(chan result, len(tasks))
+ for _, task := range tasks {
+ // execute each task concurrently and write the result to the channel
+ go func(task Task) {
+ answer, err := task.Execute(passed)
+ res := result{answer: answer, err: err}
+ resultCh <- res
+ }(task)
+ }
+
+ // get the first finished result
+ firstRes := <-resultCh
+
+ //spawn a routine to get remaining result and close the channel
+ go func() {
+ for i := 0; i < len(tasks)-1; i++ {
+ <-resultCh
+ }
+ close(resultCh)
+ }()
+
+ return firstRes.answer, firstRes.err
+
+}
+
+func Fastest(tasks ...Task) Task {
+ var ft Task = &fastestTask{tasks: tasks}
+ return ft
+}
+
+type timedTask struct {
+ task Task
+ timeout time.Duration
+}
+
+func (tt *timedTask) Execute(passed int) (int, error) {
+ // make a channel for the task execution
+ resultCh := make(chan result, 1)
+ // execute the task concurrently and pass the result to the channel
+ go func(task Task) {
+ answer, err := task.Execute(passed)
+ resultCh <- result{answer: answer, err: err}
+ }(tt.task)
+ // chose the first result
+ select {
+ case res := <-resultCh:
+ // close the channel after use
+ close(resultCh)
+ return res.answer, res.err
+ case <-time.After(tt.timeout):
+ // spwan a routine to read from the channel and close it after that
+ go func() {
+ <-resultCh
+ close(resultCh)
+ }()
+ return 0, errors.New("Task did not finish in required timeframe.")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ var tt Task = &timedTask{task: task, timeout: timeout}
+ return tt
+}
+
+type cmrTask struct {
+ reduce func([]int) int
+ tasks []Task
+}
+
+func (t *cmrTask) Execute(passed int) (int, error) {
+ tasks := t.tasks
+ // check the tasks slice
+ if tasks == nil || len(tasks) == 0 {
+ return 0, errors.New("Received nil or zero arguments for concurrent map reduce.")
+ }
+
+ // channel for the errors
+ errCh := make(chan error, len(tasks))
+ // channel for the results
+ resultCh := make(chan int, len(tasks))
+ // channel for synchronisation
+ syncCh := make(chan struct{}, 1)
+
+ // spawn a routine that executes all tasks concurrently and waits
+ // for them to finish
+ go func(tasks []Task) {
+ var wg sync.WaitGroup
+ for _, task := range tasks {
+ wg.Add(1)
+ // execute each task concurrently
+ go func(task Task) {
+ answer, err := task.Execute(passed)
+ // if there was an error add it to the errors channel
+ if err != nil {
+ errCh <- err
+ } else {
+ resultCh <- answer
+ }
+ wg.Done()
+ }(task)
+ }
+
+ wg.Wait()
+ close(resultCh)
+ // when all tasks are finished add to the synch channel
+ syncCh <- struct{}{}
+ close(syncCh)
+ }(tasks)
+
+ // wait for all tasks to finish or there is an error
+ select {
+ case <-syncCh:
+ results := make([]int, 0)
+ for {
+ res, ok := <-resultCh
+ if !ok {
+ break
+ }
+ results = append(results, res)
+ }
+ return t.reduce(results), nil
+ case err := <-errCh:
+ return 0, err
+ }
+
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ var cmpt Task = &cmrTask{tasks: tasks, reduce: reduce}
+ return cmpt
+}
+
+type gsTask struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func (t *gsTask) Execute(passed int) (int, error) {
+ results := make([]int, 0)
+ tasks := t.tasks
+ errorLimit := 0
+ // variables used for synchronisation
+ var mutext sync.Mutex
+ var wg sync.WaitGroup
+
+ // while the channel is open execute each task concurrently
+ for {
+ task, ok := <-tasks
+ if !ok {
+ break
+ }
+
+ wg.Add(1)
+ go func(task Task) {
+ answer, err := task.Execute(passed)
+ // depending on the result use the mutex to increment one of the two
+ if err != nil {
+ mutext.Lock()
+ errorLimit++
+ mutext.Unlock()
+ } else {
+ mutext.Lock()
+ results = append(results, answer)
+ mutext.Unlock()
+ }
+ wg.Done()
+ }(task)
+
+ }
+
+ // wait for all tasks to finish
+ wg.Wait()
+
+ if errorLimit > t.errorLimit {
+ return 0, errors.New("Error limit exceeded.")
+ }
+
+ if len(results) == 0 {
+ return 0, errors.New("No results were processed.")
+ }
+
+ biggest := results[0]
+ for i := 1; i < len(results); i++ {
+ if biggest < results[i] {
+ biggest = results[i]
+ }
+ }
+
+ return biggest, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ var gst Task = &gsTask{tasks: tasks, errorLimit: errorLimit}
+ return gst
+}