Георги обнови решението на 28.11.2016 19:31 (преди над 1 година)
+package main
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "sync"
+ "time"
+)
+
// Task is a unit of work that turns an integer argument into an integer
// result, or reports an error explaining why it could not.
type Task interface {
	// Execute runs the task with arg and returns its result and error.
	Execute(arg int) (result int, err error)
}
+
// TaskOutput pairs the two values returned by a single Task.Execute call so
// that they can be passed around as one value.
type TaskOutput struct {
	result int   // value returned by the task
	err    error // error returned by the task; nil on success
}
+
+// SequentialExecutor
+
+type SequentialExecutor struct {
+ tasks []Task
+}
+
+func NewSequentialExecutor() *SequentialExecutor {
+ return &SequentialExecutor{}
+}
+
+func (s *SequentialExecutor) AppendTask(task Task) {
+ s.tasks = append(s.tasks, task)
+}
+
+func (s *SequentialExecutor) Execute(arg int) (int, error) {
+ if len(s.tasks) == 0 {
+ return -1, errors.New("There aren't any tasks to be executed.")
+ }
+
+ execArg := arg
+ var err error
+
+ for _, task := range s.tasks {
+ execArg, err = task.Execute(execArg)
+
+ if err != nil {
+ return -1, err
+ }
+ }
+
+ return execArg, err
+}
+
+// ConcurrentExecutor
+
+type ConcurrentExecutor struct {
+ tasks []Task
+}
+
+func NewConcurrentExecutor() *ConcurrentExecutor {
+ return &ConcurrentExecutor{}
+}
+
+func (c *ConcurrentExecutor) AppendTask(task Task) {
+ c.tasks = append(c.tasks, task)
+}
+
+func (c *ConcurrentExecutor) Execute(arg int) (int, error) {
+ if len(c.tasks) == 0 {
+ return -1, errors.New("The aren't any tasks to be executed.")
+ }
+
+ resultChan := make(chan TaskOutput, 1)
+
+ for _, task := range c.tasks {
+ // Run new goroutines only if we still don't have a completed task
+ select {
+ case res := <-resultChan:
+ return res.result, res.err
+ default:
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+ resultChan <- TaskOutput{res, err}
+ }(task, arg)
+ }
+ }
+
+ // In case the last task is the fastest one
+ result := <-resultChan
+
+ return result.result, result.err
+}
+
+// TimedExecutor
+
+type TimedExecutor struct {
+ task Task
+ timeout time.Duration
+}
+
+func NewTimedExecutor(task Task, timeout time.Duration) *TimedExecutor {
+ return &TimedExecutor{task, timeout}
+}
+
+func (t *TimedExecutor) Execute(arg int) (int, error) {
+ timer := time.Now()
+ resultChan := make(chan TaskOutput, 1)
+
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+ resultChan <- TaskOutput{res, err}
+ }(t.task, arg)
+
+ final := <-resultChan
+
+ if t.timeout > time.Since(timer) {
+ return final.result, final.err
+ }
+
+ return -1, errors.New("The task execution time exceeded the provided timeout")
+}
+
+// ConcurrentMapExecutor
+
+type ConcurrentMapExecutor struct {
+ reduce func(result []int) int
+ tasks []Task
+}
+
+func NewConcurrentMapExecutor(reduce func(result []int) int) *ConcurrentMapExecutor {
+ obj := &ConcurrentMapExecutor{}
+ obj.reduce = reduce
+
+ return obj
+}
+
+func (c *ConcurrentMapExecutor) AppendTask(task Task) {
+ c.tasks = append(c.tasks, task)
+}
+
+func (c *ConcurrentMapExecutor) Execute(arg int) (int, error) {
+ if len(c.tasks) == 0 {
+ return -1, errors.New("The aren't any tasks to be executed.")
+ }
+
+ var wg sync.WaitGroup
+ resultChan := make(chan TaskOutput, len(c.tasks))
+ errorChan := make(chan error, 1)
+
+ for _, task := range c.tasks {
+ select {
+ case err := <-errorChan:
+ return -1, err
+ default:
+ wg.Add(1)
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+
+ if err != nil {
+ errorChan <- err
+ }
+
+ resultChan <- TaskOutput{res, err}
+ wg.Done()
+ }(task, arg)
+ }
+ }
+
+ wg.Wait()
+ close(resultChan)
+
+ results := []int{}
+ for output := range resultChan {
+ results = append(results, output.result)
+ }
+
+ return c.reduce(results), nil
+}
+
+// GreedilyTaskExecutor
+type GreedilyTaskExecutor struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func NewGreedilyTaskExecutor(errorLimit int, tasks <-chan Task) *GreedilyTaskExecutor {
+ return &GreedilyTaskExecutor{errorLimit, tasks}
+}
+
+func (g *GreedilyTaskExecutor) Execute(arg int) (int, error) {
+ var wg sync.WaitGroup
+ var mtx sync.Mutex
+ results := []TaskOutput{}
+
+ for {
+ task, ok := <-g.tasks
+
+ if ok {
+ wg.Add(1)
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+ mtx.Lock()
+ results = append(results, TaskOutput{res, err})
+ mtx.Unlock()
+ wg.Done()
+ }(task, arg)
+ } else {
+ break
+ }
+ }
+
+ wg.Wait()
+
+ if len(results) == 0 {
+ return -1, errors.New("No tasks were executed.")
+ }
+
+ max := math.MinInt32
+ errCount := 0
+ for _, output := range results {
+ if output.result > max {
+ max = output.result
+ }
+ if output.err != nil {
+ errCount++
+ }
+ }
+
+ if errCount > g.errorLimit {
+ return -1, errors.New("Exceeded error limit.")
+ }
+
+ return max, nil
+}
+
+// Functions
+
+func Pipeline(tasks ...Task) Task {
+ se := NewSequentialExecutor()
+
+ for _, task := range tasks {
+ se.AppendTask(task)
+ }
+
+ return se
+}
+
+func Fastest(tasks ...Task) Task {
+ ce := NewConcurrentExecutor()
+
+ for _, task := range tasks {
+ ce.AppendTask(task)
+ }
+
+ return ce
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return NewTimedExecutor(task, timeout)
+}
+
+func ConcurrentMapReduce(reduce func(result []int) int, tasks ...Task) Task {
+ cmr := NewConcurrentMapExecutor(reduce)
+
+ for _, task := range tasks {
+ cmr.AppendTask(task)
+ }
+
+ return cmr
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return NewGreedilyTaskExecutor(errorLimit, tasks)
+}
+
+// TESTING
+
// adder is a sample Task, used to exercise Pipeline, that adds a fixed augend
// to its argument.
type adder struct {
	augend int
}

// Execute returns a.augend + addend. When the sum is greater than 127 it
// returns 0 and an error that names the offending sum.
func (a adder) Execute(addend int) (int, error) {
	result := a.augend + addend
	if result > 127 {
		// Format the sum, not the receiver: passing a to %d would print the
		// struct (e.g. "{100}") instead of the value that broke the threshold.
		return 0, fmt.Errorf("Result %d exceeds the adder threshold", result)
	}
	return result, nil
}
+
+// Fastest and Timed stuff
+type lazyAdder struct {
+ adder
+ delay time.Duration
+}
+
+func (la lazyAdder) Execute(addend int) (int, error) {
+ time.Sleep(la.delay * time.Millisecond)
+ return la.adder.Execute(addend)
+}
+
// main does nothing; this file only defines the executors and sample tasks.
func main() {}
Какво ще се случи с goroutine-ите, които не са най-бързи?
Това не е
for task := range g.tasks {
защото?
Какво очакваш да е result при задача, която е върнала грешка?