Решение на Concurrent Tasks от Георги Серев

Обратно към всички решения

Към профила на Георги Серев

Резултати

  • 11 точки от тестове
  • 0 бонус точки
  • 11 точки общо
  • 11 успешни тест(а)
  • 2 неуспешни тест(а)

Код

package main
import (
"errors"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type TaskOutput struct {
result int
err error
}
// SequentialExecutor
type SequentialExecutor struct {
tasks []Task
}
func NewSequentialExecutor() *SequentialExecutor {
return &SequentialExecutor{}
}
func (s *SequentialExecutor) AppendTask(task Task) {
s.tasks = append(s.tasks, task)
}
func (s *SequentialExecutor) Execute(arg int) (int, error) {
if len(s.tasks) == 0 {
return -1, errors.New("There aren't any tasks to be executed.")
}
execArg := arg
var err error
for _, task := range s.tasks {
execArg, err = task.Execute(execArg)
if err != nil {
return -1, err
}
}
return execArg, err
}
// ConcurrentExecutor
type ConcurrentExecutor struct {
tasks []Task
}
func NewConcurrentExecutor() *ConcurrentExecutor {
return &ConcurrentExecutor{}
}
func (c *ConcurrentExecutor) AppendTask(task Task) {
c.tasks = append(c.tasks, task)
}
func (c *ConcurrentExecutor) Execute(arg int) (int, error) {
if len(c.tasks) == 0 {
return -1, errors.New("The aren't any tasks to be executed.")
}
resultChan := make(chan TaskOutput, 1)
for _, task := range c.tasks {
// Run new goroutines only if we still don't have a completed task
select {
case res := <-resultChan:
return res.result, res.err
default:
go func(t Task, arg int) {
res, err := t.Execute(arg)
resultChan <- TaskOutput{res, err}
}(task, arg)
}
}
// In case the last task is the fastest one
result := <-resultChan
return result.result, result.err
}
// TimedExecutor
type TimedExecutor struct {
task Task
timeout time.Duration
}
func NewTimedExecutor(task Task, timeout time.Duration) *TimedExecutor {
return &TimedExecutor{task, timeout}
}
func (t *TimedExecutor) Execute(arg int) (int, error) {
timer := time.Now()
resultChan := make(chan TaskOutput, 1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
resultChan <- TaskOutput{res, err}
}(t.task, arg)
final := <-resultChan
if t.timeout > time.Since(timer) {
return final.result, final.err
}
return -1, errors.New("The task execution time exceeded the provided timeout")
}
// ConcurrentMapExecutor
type ConcurrentMapExecutor struct {
reduce func(result []int) int
tasks []Task
}
func NewConcurrentMapExecutor(reduce func(result []int) int) *ConcurrentMapExecutor {
obj := &ConcurrentMapExecutor{}
obj.reduce = reduce
return obj
}
func (c *ConcurrentMapExecutor) AppendTask(task Task) {
c.tasks = append(c.tasks, task)
}
func (c *ConcurrentMapExecutor) Execute(arg int) (int, error) {
if len(c.tasks) == 0 {
return -1, errors.New("The aren't any tasks to be executed.")
}
var wg sync.WaitGroup
resultChan := make(chan TaskOutput, len(c.tasks))
errorChan := make(chan error, 1)
for _, task := range c.tasks {
wg.Add(1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
// Not a proper way to do it
if err != nil && len(errorChan) == 0 {
errorChan <- err
}
resultChan <- TaskOutput{res, err}
wg.Done()
}(task, arg)
}
wg.Wait()
close(resultChan)
if len(errorChan) > 0 {
return -1, <-errorChan
}
results := []int{}
for output := range resultChan {
results = append(results, output.result)
}
return c.reduce(results), nil
}
// GreedilyTaskExecutor
type GreedilyTaskExecutor struct {
errorLimit int
tasks <-chan Task
}
func NewGreedilyTaskExecutor(errorLimit int, tasks <-chan Task) *GreedilyTaskExecutor {
return &GreedilyTaskExecutor{errorLimit, tasks}
}
func (g *GreedilyTaskExecutor) Execute(arg int) (int, error) {
var wg sync.WaitGroup
var mtx sync.Mutex
results := []TaskOutput{}
for {
task, ok := <-g.tasks
if ok {
wg.Add(1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
mtx.Lock()
results = append(results, TaskOutput{res, err})
mtx.Unlock()
wg.Done()
}(task, arg)
} else {
break
}
}
wg.Wait()
if len(results) == 0 {
return -1, errors.New("No tasks were executed.")
}
max := math.MinInt32
errCount := 0
for _, output := range results {
if output.result > max {
max = output.result
}
if output.err != nil {
if errCount++; errCount > g.errorLimit {
return -1, errors.New("Exceeded error limit.")
}
}
}
return max, nil
}
// Functions
func Pipeline(tasks ...Task) Task {
se := NewSequentialExecutor()
for _, task := range tasks {
se.AppendTask(task)
}
return se
}
func Fastest(tasks ...Task) Task {
ce := NewConcurrentExecutor()
for _, task := range tasks {
ce.AppendTask(task)
}
return ce
}
func Timed(task Task, timeout time.Duration) Task {
return NewTimedExecutor(task, timeout)
}
func ConcurrentMapReduce(reduce func(result []int) int, tasks ...Task) Task {
cmr := NewConcurrentMapExecutor(reduce)
for _, task := range tasks {
cmr.AppendTask(task)
}
return cmr
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return NewGreedilyTaskExecutor(errorLimit, tasks)
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.103s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.203s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.183s
panic: test timed out after 1s

goroutine 17 [running]:
panic(0x4ef4e0, 0xc4200c2010)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200720c0, 0x51e907, 0x24, 0x528438, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200720c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200720c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x528478, 0x5a7980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-1vtqx54/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-1vtqx54.(*TimedExecutor).Execute(0xc42000e700, 0x1, 0xc420010558, 0xc420054360, 0xc420072180)
	/tmp/d20161129-30451-1vtqx54/solution.go:112 +0xf7
_/tmp/d20161129-30451-1vtqx54.TestTimedDoesntLeaveGoroutineHanging(0xc420072180)
	/tmp/d20161129-30451-1vtqx54/solution_test.go:221 +0x295
testing.tRunner(0xc420072180, 0x528438)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec

goroutine 7 [chan receive]:
_/tmp/d20161129-30451-1vtqx54.TestTimedDoesntLeaveGoroutineHanging.func2(0xc420010558, 0xc420054360, 0xc420072180, 0xc420054420, 0xc4200543c0, 0xc420012310, 0xc420054480)
	/tmp/d20161129-30451-1vtqx54/solution_test.go:206 +0xc8
created by _/tmp/d20161129-30451-1vtqx54.TestTimedDoesntLeaveGoroutineHanging
	/tmp/d20161129-30451-1vtqx54/solution_test.go:220 +0x27a

goroutine 8 [chan receive]:
_/tmp/d20161129-30451-1vtqx54.TestTimedDoesntLeaveGoroutineHanging.func1(0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1vtqx54/solution_test.go:196 +0x56
_/tmp/d20161129-30451-1vtqx54.fTask.Execute(0xc42000e6e0, 0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1vtqx54/solution_test.go:40 +0x30
_/tmp/d20161129-30451-1vtqx54.(*TimedExecutor).Execute.func1(0xc4200549c0, 0x594700, 0xc42000e6e0, 0x1)
	/tmp/d20161129-30451-1vtqx54/solution.go:108 +0x3f
created by _/tmp/d20161129-30451-1vtqx54.(*TimedExecutor).Execute
	/tmp/d20161129-30451-1vtqx54/solution.go:110 +0xb5
exit status 2
FAIL	_/tmp/d20161129-30451-1vtqx54	1.006s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/only_failure (0.00s)
    	solution_test.go:335: Expected error did not occur instead got 0
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1vtqx54	0.048s
PASS
ok  	_/tmp/d20161129-30451-1vtqx54	0.123s

История (6 версии и 3 коментара)

Георги обнови решението на 28.11.2016 19:31 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type TaskOutput struct {
+ result int
+ err error
+}
+
+// SequentialExecutor
+
+type SequentialExecutor struct {
+ tasks []Task
+}
+
+func NewSequentialExecutor() *SequentialExecutor {
+ return &SequentialExecutor{}
+}
+
+func (s *SequentialExecutor) AppendTask(task Task) {
+ s.tasks = append(s.tasks, task)
+}
+
+func (s *SequentialExecutor) Execute(arg int) (int, error) {
+ if len(s.tasks) == 0 {
+ return -1, errors.New("There aren't any tasks to be executed.")
+ }
+
+ execArg := arg
+ var err error
+
+ for _, task := range s.tasks {
+ execArg, err = task.Execute(execArg)
+
+ if err != nil {
+ return -1, err
+ }
+ }
+
+ return execArg, err
+}
+
+// ConcurrentExecutor
+
+type ConcurrentExecutor struct {
+ tasks []Task
+}
+
+func NewConcurrentExecutor() *ConcurrentExecutor {
+ return &ConcurrentExecutor{}
+}
+
+func (c *ConcurrentExecutor) AppendTask(task Task) {
+ c.tasks = append(c.tasks, task)
+}
+
+func (c *ConcurrentExecutor) Execute(arg int) (int, error) {
+ if len(c.tasks) == 0 {
+ return -1, errors.New("The aren't any tasks to be executed.")
+ }
+
+ resultChan := make(chan TaskOutput, 1)
+
+ for _, task := range c.tasks {
+ // Run new goroutines only if we still don't have a completed task
+ select {
+ case res := <-resultChan:
+ return res.result, res.err
+ default:
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+ resultChan <- TaskOutput{res, err}
+ }(task, arg)
+ }
+ }
+
+ // In case the last task is the fastest one
+ result := <-resultChan
+
+ return result.result, result.err
+}
+
+// TimedExecutor
+
+type TimedExecutor struct {
+ task Task
+ timeout time.Duration
+}
+
+func NewTimedExecutor(task Task, timeout time.Duration) *TimedExecutor {
+ return &TimedExecutor{task, timeout}
+}
+
+func (t *TimedExecutor) Execute(arg int) (int, error) {
+ timer := time.Now()
+ resultChan := make(chan TaskOutput, 1)
+
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+ resultChan <- TaskOutput{res, err}
+ }(t.task, arg)
+
+ final := <-resultChan
+
+ if t.timeout > time.Since(timer) {
+ return final.result, final.err
+ }
+
+ return -1, errors.New("The task execution time exceeded the provided timeout")
+}
+
+// ConcurrentMapExecutor
+
+type ConcurrentMapExecutor struct {
+ reduce func(result []int) int
+ tasks []Task
+}
+
+func NewConcurrentMapExecutor(reduce func(result []int) int) *ConcurrentMapExecutor {
+ obj := &ConcurrentMapExecutor{}
+ obj.reduce = reduce
+
+ return obj
+}
+
+func (c *ConcurrentMapExecutor) AppendTask(task Task) {
+ c.tasks = append(c.tasks, task)
+}
+
+func (c *ConcurrentMapExecutor) Execute(arg int) (int, error) {
+ if len(c.tasks) == 0 {
+ return -1, errors.New("The aren't any tasks to be executed.")
+ }
+
+ var wg sync.WaitGroup
+ resultChan := make(chan TaskOutput, len(c.tasks))
+ errorChan := make(chan error, 1)
+
+ for _, task := range c.tasks {
+ select {
+ case err := <-errorChan:
+ return -1, err
+ default:
+ wg.Add(1)
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+
+ if err != nil {
+ errorChan <- err
+ }
+
+ resultChan <- TaskOutput{res, err}
+ wg.Done()
+ }(task, arg)
+ }
+ }
+
+ wg.Wait()
+ close(resultChan)
+
+ results := []int{}
+ for output := range resultChan {
+ results = append(results, output.result)
+ }
+
+ return c.reduce(results), nil
+}
+
+// GreedilyTaskExecutor
+type GreedilyTaskExecutor struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func NewGreedilyTaskExecutor(errorLimit int, tasks <-chan Task) *GreedilyTaskExecutor {
+ return &GreedilyTaskExecutor{errorLimit, tasks}
+}
+
+func (g *GreedilyTaskExecutor) Execute(arg int) (int, error) {
+ var wg sync.WaitGroup
+ var mtx sync.Mutex
+ results := []TaskOutput{}
+
+ for {
+ task, ok := <-g.tasks
+
+ if ok {
+ wg.Add(1)
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+ mtx.Lock()
+ results = append(results, TaskOutput{res, err})
+ mtx.Unlock()
+ wg.Done()
+ }(task, arg)
+ } else {
+ break
+ }
+ }
+
+ wg.Wait()
+
+ if len(results) == 0 {
+ return -1, errors.New("No tasks were executed.")
+ }
+
+ max := math.MinInt32
+ errCount := 0
+ for _, output := range results {
+ if output.result > max {
+ max = output.result
+ }
+ if output.err != nil {
+ errCount++
+ }
+ }
+
+ if errCount > g.errorLimit {
+ return -1, errors.New("Exceeded error limit.")
+ }
+
+ return max, nil
+}
+
+// Functions
+
+func Pipeline(tasks ...Task) Task {
+ se := NewSequentialExecutor()
+
+ for _, task := range tasks {
+ se.AppendTask(task)
+ }
+
+ return se
+}
+
+func Fastest(tasks ...Task) Task {
+ ce := NewConcurrentExecutor()
+
+ for _, task := range tasks {
+ ce.AppendTask(task)
+ }
+
+ return ce
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return NewTimedExecutor(task, timeout)
+}
+
+func ConcurrentMapReduce(reduce func(result []int) int, tasks ...Task) Task {
+ cmr := NewConcurrentMapExecutor(reduce)
+
+ for _, task := range tasks {
+ cmr.AppendTask(task)
+ }
+
+ return cmr
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return NewGreedilyTaskExecutor(errorLimit, tasks)
+}
+
+// TESTING
+
+// Pipeline stuff
+type adder struct {
+ augend int
+}
+
+func (a adder) Execute(addend int) (int, error) {
+ result := a.augend + addend
+ if result > 127 {
+ return 0, fmt.Errorf("Result %d exceeds the adder threshold", a)
+ }
+ return result, nil
+}
+
+// Fastest and Timed stuff
+type lazyAdder struct {
+ adder
+ delay time.Duration
+}
+
+func (la lazyAdder) Execute(addend int) (int, error) {
+ time.Sleep(la.delay * time.Millisecond)
+ return la.adder.Execute(addend)
+}
+
+func main() {
+
+}

Георги обнови решението на 28.11.2016 19:33 (преди над 1 година)

package main
import (
"errors"
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type TaskOutput struct {
result int
err error
}
// SequentialExecutor
type SequentialExecutor struct {
tasks []Task
}
func NewSequentialExecutor() *SequentialExecutor {
return &SequentialExecutor{}
}
func (s *SequentialExecutor) AppendTask(task Task) {
s.tasks = append(s.tasks, task)
}
func (s *SequentialExecutor) Execute(arg int) (int, error) {
if len(s.tasks) == 0 {
return -1, errors.New("There aren't any tasks to be executed.")
}
execArg := arg
var err error
for _, task := range s.tasks {
execArg, err = task.Execute(execArg)
if err != nil {
return -1, err
}
}
return execArg, err
}
// ConcurrentExecutor
type ConcurrentExecutor struct {
tasks []Task
}
func NewConcurrentExecutor() *ConcurrentExecutor {
return &ConcurrentExecutor{}
}
func (c *ConcurrentExecutor) AppendTask(task Task) {
c.tasks = append(c.tasks, task)
}
func (c *ConcurrentExecutor) Execute(arg int) (int, error) {
if len(c.tasks) == 0 {
return -1, errors.New("The aren't any tasks to be executed.")
}
resultChan := make(chan TaskOutput, 1)
for _, task := range c.tasks {
// Run new goroutines only if we still don't have a completed task
select {
case res := <-resultChan:
return res.result, res.err
default:
go func(t Task, arg int) {
res, err := t.Execute(arg)
resultChan <- TaskOutput{res, err}
}(task, arg)
}
}
// In case the last task is the fastest one
result := <-resultChan
return result.result, result.err
}
// TimedExecutor
type TimedExecutor struct {
task Task
timeout time.Duration
}
func NewTimedExecutor(task Task, timeout time.Duration) *TimedExecutor {
return &TimedExecutor{task, timeout}
}
func (t *TimedExecutor) Execute(arg int) (int, error) {
timer := time.Now()
resultChan := make(chan TaskOutput, 1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
resultChan <- TaskOutput{res, err}
}(t.task, arg)
final := <-resultChan
if t.timeout > time.Since(timer) {
return final.result, final.err
}
return -1, errors.New("The task execution time exceeded the provided timeout")
}
// ConcurrentMapExecutor
type ConcurrentMapExecutor struct {
reduce func(result []int) int
tasks []Task
}
func NewConcurrentMapExecutor(reduce func(result []int) int) *ConcurrentMapExecutor {
obj := &ConcurrentMapExecutor{}
obj.reduce = reduce
return obj
}
func (c *ConcurrentMapExecutor) AppendTask(task Task) {
c.tasks = append(c.tasks, task)
}
func (c *ConcurrentMapExecutor) Execute(arg int) (int, error) {
if len(c.tasks) == 0 {
return -1, errors.New("The aren't any tasks to be executed.")
}
var wg sync.WaitGroup
resultChan := make(chan TaskOutput, len(c.tasks))
errorChan := make(chan error, 1)
for _, task := range c.tasks {
select {
case err := <-errorChan:
return -1, err
default:
wg.Add(1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
if err != nil {
errorChan <- err
}
resultChan <- TaskOutput{res, err}
wg.Done()
}(task, arg)
}
}
wg.Wait()
close(resultChan)
results := []int{}
for output := range resultChan {
results = append(results, output.result)
}
return c.reduce(results), nil
}
// GreedilyTaskExecutor
type GreedilyTaskExecutor struct {
errorLimit int
tasks <-chan Task
}
func NewGreedilyTaskExecutor(errorLimit int, tasks <-chan Task) *GreedilyTaskExecutor {
return &GreedilyTaskExecutor{errorLimit, tasks}
}
func (g *GreedilyTaskExecutor) Execute(arg int) (int, error) {
var wg sync.WaitGroup
var mtx sync.Mutex
results := []TaskOutput{}
for {
task, ok := <-g.tasks
if ok {
wg.Add(1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
mtx.Lock()
results = append(results, TaskOutput{res, err})
mtx.Unlock()
wg.Done()
}(task, arg)
} else {
break
}
}
wg.Wait()
if len(results) == 0 {
return -1, errors.New("No tasks were executed.")
}
max := math.MinInt32
errCount := 0
for _, output := range results {
if output.result > max {
max = output.result
}
if output.err != nil {
errCount++
}
}
if errCount > g.errorLimit {
return -1, errors.New("Exceeded error limit.")
}
return max, nil
}
// Functions
func Pipeline(tasks ...Task) Task {
se := NewSequentialExecutor()
for _, task := range tasks {
se.AppendTask(task)
}
return se
}
func Fastest(tasks ...Task) Task {
ce := NewConcurrentExecutor()
for _, task := range tasks {
ce.AppendTask(task)
}
return ce
}
func Timed(task Task, timeout time.Duration) Task {
return NewTimedExecutor(task, timeout)
}
func ConcurrentMapReduce(reduce func(result []int) int, tasks ...Task) Task {
cmr := NewConcurrentMapExecutor(reduce)
for _, task := range tasks {
cmr.AppendTask(task)
}
return cmr
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return NewGreedilyTaskExecutor(errorLimit, tasks)
}
-// TESTING
-
// Pipeline stuff
type adder struct {
augend int
}
func (a adder) Execute(addend int) (int, error) {
result := a.augend + addend
if result > 127 {
return 0, fmt.Errorf("Result %d exceeds the adder threshold", a)
}
return result, nil
}
// Fastest and Timed stuff
type lazyAdder struct {
adder
delay time.Duration
}
func (la lazyAdder) Execute(addend int) (int, error) {
time.Sleep(la.delay * time.Millisecond)
return la.adder.Execute(addend)
}
func main() {
}

Георги обнови решението на 28.11.2016 19:33 (преди над 1 година)

package main
import (
"errors"
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type TaskOutput struct {
result int
err error
}
// SequentialExecutor
type SequentialExecutor struct {
tasks []Task
}
func NewSequentialExecutor() *SequentialExecutor {
return &SequentialExecutor{}
}
func (s *SequentialExecutor) AppendTask(task Task) {
s.tasks = append(s.tasks, task)
}
func (s *SequentialExecutor) Execute(arg int) (int, error) {
if len(s.tasks) == 0 {
return -1, errors.New("There aren't any tasks to be executed.")
}
execArg := arg
var err error
for _, task := range s.tasks {
execArg, err = task.Execute(execArg)
if err != nil {
return -1, err
}
}
return execArg, err
}
// ConcurrentExecutor
type ConcurrentExecutor struct {
tasks []Task
}
func NewConcurrentExecutor() *ConcurrentExecutor {
return &ConcurrentExecutor{}
}
func (c *ConcurrentExecutor) AppendTask(task Task) {
c.tasks = append(c.tasks, task)
}
func (c *ConcurrentExecutor) Execute(arg int) (int, error) {
if len(c.tasks) == 0 {
return -1, errors.New("The aren't any tasks to be executed.")
}
resultChan := make(chan TaskOutput, 1)
for _, task := range c.tasks {
// Run new goroutines only if we still don't have a completed task
select {
case res := <-resultChan:
return res.result, res.err
default:
go func(t Task, arg int) {
res, err := t.Execute(arg)
resultChan <- TaskOutput{res, err}
}(task, arg)
}
}
// In case the last task is the fastest one
result := <-resultChan
return result.result, result.err
}
// TimedExecutor
type TimedExecutor struct {
task Task
timeout time.Duration
}
func NewTimedExecutor(task Task, timeout time.Duration) *TimedExecutor {
return &TimedExecutor{task, timeout}
}
func (t *TimedExecutor) Execute(arg int) (int, error) {
timer := time.Now()
resultChan := make(chan TaskOutput, 1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
resultChan <- TaskOutput{res, err}
}(t.task, arg)
final := <-resultChan
if t.timeout > time.Since(timer) {
return final.result, final.err
}
return -1, errors.New("The task execution time exceeded the provided timeout")
}
// ConcurrentMapExecutor
type ConcurrentMapExecutor struct {
reduce func(result []int) int
tasks []Task
}
func NewConcurrentMapExecutor(reduce func(result []int) int) *ConcurrentMapExecutor {
obj := &ConcurrentMapExecutor{}
obj.reduce = reduce
return obj
}
func (c *ConcurrentMapExecutor) AppendTask(task Task) {
c.tasks = append(c.tasks, task)
}
func (c *ConcurrentMapExecutor) Execute(arg int) (int, error) {
if len(c.tasks) == 0 {
return -1, errors.New("The aren't any tasks to be executed.")
}
var wg sync.WaitGroup
resultChan := make(chan TaskOutput, len(c.tasks))
errorChan := make(chan error, 1)
for _, task := range c.tasks {
select {
case err := <-errorChan:
return -1, err
default:
wg.Add(1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
if err != nil {
errorChan <- err
}
resultChan <- TaskOutput{res, err}
wg.Done()
}(task, arg)
}
}
wg.Wait()
close(resultChan)
results := []int{}
for output := range resultChan {
results = append(results, output.result)
}
return c.reduce(results), nil
}
// GreedilyTaskExecutor
type GreedilyTaskExecutor struct {
errorLimit int
tasks <-chan Task
}
func NewGreedilyTaskExecutor(errorLimit int, tasks <-chan Task) *GreedilyTaskExecutor {
return &GreedilyTaskExecutor{errorLimit, tasks}
}
func (g *GreedilyTaskExecutor) Execute(arg int) (int, error) {
var wg sync.WaitGroup
var mtx sync.Mutex
results := []TaskOutput{}
for {
task, ok := <-g.tasks
if ok {
wg.Add(1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
mtx.Lock()
results = append(results, TaskOutput{res, err})
mtx.Unlock()
wg.Done()
}(task, arg)
} else {
break
}
}
wg.Wait()
if len(results) == 0 {
return -1, errors.New("No tasks were executed.")
}
max := math.MinInt32
errCount := 0
for _, output := range results {
if output.result > max {
max = output.result
}
if output.err != nil {
errCount++
}
}
if errCount > g.errorLimit {
return -1, errors.New("Exceeded error limit.")
}
return max, nil
}
// Functions
func Pipeline(tasks ...Task) Task {
se := NewSequentialExecutor()
for _, task := range tasks {
se.AppendTask(task)
}
return se
}
func Fastest(tasks ...Task) Task {
ce := NewConcurrentExecutor()
for _, task := range tasks {
ce.AppendTask(task)
}
return ce
}
func Timed(task Task, timeout time.Duration) Task {
return NewTimedExecutor(task, timeout)
}
func ConcurrentMapReduce(reduce func(result []int) int, tasks ...Task) Task {
cmr := NewConcurrentMapExecutor(reduce)
for _, task := range tasks {
cmr.AppendTask(task)
}
return cmr
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return NewGreedilyTaskExecutor(errorLimit, tasks)
}
-// Pipeline stuff
-type adder struct {
- augend int
-}
-func (a adder) Execute(addend int) (int, error) {
- result := a.augend + addend
- if result > 127 {
- return 0, fmt.Errorf("Result %d exceeds the adder threshold", a)
- }
- return result, nil
-}
-
-// Fastest and Timed stuff
-type lazyAdder struct {
- adder
- delay time.Duration
-}
-
-func (la lazyAdder) Execute(addend int) (int, error) {
- time.Sleep(la.delay * time.Millisecond)
- return la.adder.Execute(addend)
-}
-
func main() {
-
+fmt.Println("")
}

Георги обнови решението на 28.11.2016 20:46 (преди над 1 година)

package main
-import (
- "errors"
- "fmt"
- "math"
- "sync"
- "time"
-)
+import "fmt"
-type Task interface {
- Execute(int) (int, error)
-}
-
-type TaskOutput struct {
- result int
- err error
-}
-
-// SequentialExecutor
-
-type SequentialExecutor struct {
- tasks []Task
-}
-
-func NewSequentialExecutor() *SequentialExecutor {
- return &SequentialExecutor{}
-}
-
-func (s *SequentialExecutor) AppendTask(task Task) {
- s.tasks = append(s.tasks, task)
-}
-
-func (s *SequentialExecutor) Execute(arg int) (int, error) {
- if len(s.tasks) == 0 {
- return -1, errors.New("There aren't any tasks to be executed.")
- }
-
- execArg := arg
- var err error
-
- for _, task := range s.tasks {
- execArg, err = task.Execute(execArg)
-
- if err != nil {
- return -1, err
- }
- }
-
- return execArg, err
-}
-
-// ConcurrentExecutor
-
-type ConcurrentExecutor struct {
- tasks []Task
-}
-
-func NewConcurrentExecutor() *ConcurrentExecutor {
- return &ConcurrentExecutor{}
-}
-
-func (c *ConcurrentExecutor) AppendTask(task Task) {
- c.tasks = append(c.tasks, task)
-}
-
-func (c *ConcurrentExecutor) Execute(arg int) (int, error) {
- if len(c.tasks) == 0 {
- return -1, errors.New("The aren't any tasks to be executed.")
- }
-
- resultChan := make(chan TaskOutput, 1)
-
- for _, task := range c.tasks {
- // Run new goroutines only if we still don't have a completed task
- select {
- case res := <-resultChan:
- return res.result, res.err
- default:
- go func(t Task, arg int) {
- res, err := t.Execute(arg)
- resultChan <- TaskOutput{res, err}
- }(task, arg)
- }
- }
-
- // In case the last task is the fastest one
- result := <-resultChan
-
- return result.result, result.err
-}
-
-// TimedExecutor
-
-type TimedExecutor struct {
- task Task
- timeout time.Duration
-}
-
-func NewTimedExecutor(task Task, timeout time.Duration) *TimedExecutor {
- return &TimedExecutor{task, timeout}
-}
-
-func (t *TimedExecutor) Execute(arg int) (int, error) {
- timer := time.Now()
- resultChan := make(chan TaskOutput, 1)
-
- go func(t Task, arg int) {
- res, err := t.Execute(arg)
- resultChan <- TaskOutput{res, err}
- }(t.task, arg)
-
- final := <-resultChan
-
- if t.timeout > time.Since(timer) {
- return final.result, final.err
- }
-
- return -1, errors.New("The task execution time exceeded the provided timeout")
-}
-
-// ConcurrentMapExecutor
-
-type ConcurrentMapExecutor struct {
- reduce func(result []int) int
- tasks []Task
-}
-
-func NewConcurrentMapExecutor(reduce func(result []int) int) *ConcurrentMapExecutor {
- obj := &ConcurrentMapExecutor{}
- obj.reduce = reduce
-
- return obj
-}
-
-func (c *ConcurrentMapExecutor) AppendTask(task Task) {
- c.tasks = append(c.tasks, task)
-}
-
-func (c *ConcurrentMapExecutor) Execute(arg int) (int, error) {
- if len(c.tasks) == 0 {
- return -1, errors.New("The aren't any tasks to be executed.")
- }
-
- var wg sync.WaitGroup
- resultChan := make(chan TaskOutput, len(c.tasks))
- errorChan := make(chan error, 1)
-
- for _, task := range c.tasks {
- select {
- case err := <-errorChan:
- return -1, err
- default:
- wg.Add(1)
- go func(t Task, arg int) {
- res, err := t.Execute(arg)
-
- if err != nil {
- errorChan <- err
- }
-
- resultChan <- TaskOutput{res, err}
- wg.Done()
- }(task, arg)
- }
- }
-
- wg.Wait()
- close(resultChan)
-
- results := []int{}
- for output := range resultChan {
- results = append(results, output.result)
- }
-
- return c.reduce(results), nil
-}
-
-// GreedilyTaskExecutor
-type GreedilyTaskExecutor struct {
- errorLimit int
- tasks <-chan Task
-}
-
-func NewGreedilyTaskExecutor(errorLimit int, tasks <-chan Task) *GreedilyTaskExecutor {
- return &GreedilyTaskExecutor{errorLimit, tasks}
-}
-
-func (g *GreedilyTaskExecutor) Execute(arg int) (int, error) {
- var wg sync.WaitGroup
- var mtx sync.Mutex
- results := []TaskOutput{}
-
- for {
- task, ok := <-g.tasks
-
- if ok {
- wg.Add(1)
- go func(t Task, arg int) {
- res, err := t.Execute(arg)
- mtx.Lock()
- results = append(results, TaskOutput{res, err})
- mtx.Unlock()
- wg.Done()
- }(task, arg)
- } else {
- break
- }
- }
-
- wg.Wait()
-
- if len(results) == 0 {
- return -1, errors.New("No tasks were executed.")
- }
-
- max := math.MinInt32
- errCount := 0
- for _, output := range results {
- if output.result > max {
- max = output.result
- }
- if output.err != nil {
- errCount++
- }
- }
-
- if errCount > g.errorLimit {
- return -1, errors.New("Exceeded error limit.")
- }
-
- return max, nil
-}
-
-// Functions
-
-func Pipeline(tasks ...Task) Task {
- se := NewSequentialExecutor()
-
- for _, task := range tasks {
- se.AppendTask(task)
- }
-
- return se
-}
-
-func Fastest(tasks ...Task) Task {
- ce := NewConcurrentExecutor()
-
- for _, task := range tasks {
- ce.AppendTask(task)
- }
-
- return ce
-}
-
-func Timed(task Task, timeout time.Duration) Task {
- return NewTimedExecutor(task, timeout)
-}
-
-func ConcurrentMapReduce(reduce func(result []int) int, tasks ...Task) Task {
- cmr := NewConcurrentMapExecutor(reduce)
-
- for _, task := range tasks {
- cmr.AppendTask(task)
- }
-
- return cmr
-}
-
-func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
- return NewGreedilyTaskExecutor(errorLimit, tasks)
-}
-
-
func main() {
-fmt.Println("")
+ fmt.Println("asdsad")
}

Георги обнови решението на 28.11.2016 20:48 (преди над 1 година)

package main
-import "fmt"
+import (
+ "errors"
+ "math"
+ "sync"
+ "time"
+)
-func main() {
- fmt.Println("asdsad")
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type TaskOutput struct {
+ result int
+ err error
+}
+
+// SequentialExecutor
+
+type SequentialExecutor struct {
+ tasks []Task
+}
+
+func NewSequentialExecutor() *SequentialExecutor {
+ return &SequentialExecutor{}
+}
+
+func (s *SequentialExecutor) AppendTask(task Task) {
+ s.tasks = append(s.tasks, task)
+}
+
+func (s *SequentialExecutor) Execute(arg int) (int, error) {
+ if len(s.tasks) == 0 {
+ return -1, errors.New("There aren't any tasks to be executed.")
+ }
+
+ execArg := arg
+ var err error
+
+ for _, task := range s.tasks {
+ execArg, err = task.Execute(execArg)
+
+ if err != nil {
+ return -1, err
+ }
+ }
+
+ return execArg, err
+}
+
+// ConcurrentExecutor
+
+type ConcurrentExecutor struct {
+ tasks []Task
+}
+
+func NewConcurrentExecutor() *ConcurrentExecutor {
+ return &ConcurrentExecutor{}
+}
+
+func (c *ConcurrentExecutor) AppendTask(task Task) {
+ c.tasks = append(c.tasks, task)
+}
+
+func (c *ConcurrentExecutor) Execute(arg int) (int, error) {
+ if len(c.tasks) == 0 {
+ return -1, errors.New("The aren't any tasks to be executed.")
+ }
+
+ resultChan := make(chan TaskOutput, 1)
+
+ for _, task := range c.tasks {
+ // Run new goroutines only if we still don't have a completed task
+ select {
+ case res := <-resultChan:
+ return res.result, res.err
+ default:
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+ resultChan <- TaskOutput{res, err}
+ }(task, arg)
+ }
+ }
+
+ // In case the last task is the fastest one
+ result := <-resultChan
+
+ return result.result, result.err
+}
+
+// TimedExecutor
+
+type TimedExecutor struct {
+ task Task
+ timeout time.Duration
+}
+
+func NewTimedExecutor(task Task, timeout time.Duration) *TimedExecutor {
+ return &TimedExecutor{task, timeout}
+}
+
+func (t *TimedExecutor) Execute(arg int) (int, error) {
+ timer := time.Now()
+ resultChan := make(chan TaskOutput, 1)
+
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+ resultChan <- TaskOutput{res, err}
+ }(t.task, arg)
+
+ final := <-resultChan
+
+ if t.timeout > time.Since(timer) {
+ return final.result, final.err
+ }
+
+ return -1, errors.New("The task execution time exceeded the provided timeout")
+}
+
+// ConcurrentMapExecutor
+
+type ConcurrentMapExecutor struct {
+ reduce func(result []int) int
+ tasks []Task
+}
+
+func NewConcurrentMapExecutor(reduce func(result []int) int) *ConcurrentMapExecutor {
+ obj := &ConcurrentMapExecutor{}
+ obj.reduce = reduce
+
+ return obj
+}
+
+func (c *ConcurrentMapExecutor) AppendTask(task Task) {
+ c.tasks = append(c.tasks, task)
+}
+
+func (c *ConcurrentMapExecutor) Execute(arg int) (int, error) {
+ if len(c.tasks) == 0 {
+ return -1, errors.New("The aren't any tasks to be executed.")
+ }
+
+ var wg sync.WaitGroup
+ resultChan := make(chan TaskOutput, len(c.tasks))
+ errorChan := make(chan error, 1)
+
+ for _, task := range c.tasks {
+ select {
+ case err := <-errorChan:
+ return -1, err
+ default:
+ wg.Add(1)
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+
+ if err != nil {
+ errorChan <- err
+ }
+
+ resultChan <- TaskOutput{res, err}
+ wg.Done()
+ }(task, arg)
+ }
+ }
+
+ wg.Wait()
+ close(resultChan)
+
+ results := []int{}
+ for output := range resultChan {
+ results = append(results, output.result)
+ }
+
+ return c.reduce(results), nil
+}
+
+// GreedilyTaskExecutor
+type GreedilyTaskExecutor struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func NewGreedilyTaskExecutor(errorLimit int, tasks <-chan Task) *GreedilyTaskExecutor {
+ return &GreedilyTaskExecutor{errorLimit, tasks}
+}
+
+func (g *GreedilyTaskExecutor) Execute(arg int) (int, error) {
+ var wg sync.WaitGroup
+ var mtx sync.Mutex
+ results := []TaskOutput{}
+
+ for {
+ task, ok := <-g.tasks
+
+ if ok {
+ wg.Add(1)
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
+ mtx.Lock()
+ results = append(results, TaskOutput{res, err})
+ mtx.Unlock()
+ wg.Done()
+ }(task, arg)
+ } else {
+ break
+ }
+ }
+
+ wg.Wait()
+
+ if len(results) == 0 {
+ return -1, errors.New("No tasks were executed.")
+ }
+
+ max := math.MinInt32
+ errCount := 0
+ for _, output := range results {
+ if output.result > max {
+ max = output.result
+ }
+ if output.err != nil {
+ errCount++
+ }
+ }
+
+ if errCount > g.errorLimit {
+ return -1, errors.New("Exceeded error limit.")
+ }
+
+ return max, nil
+}
+
+// Functions
+
+func Pipeline(tasks ...Task) Task {
+ se := NewSequentialExecutor()
+
+ for _, task := range tasks {
+ se.AppendTask(task)
+ }
+
+ return se
+}
+
+func Fastest(tasks ...Task) Task {
+ ce := NewConcurrentExecutor()
+
+ for _, task := range tasks {
+ ce.AppendTask(task)
+ }
+
+ return ce
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return NewTimedExecutor(task, timeout)
+}
+
+func ConcurrentMapReduce(reduce func(result []int) int, tasks ...Task) Task {
+ cmr := NewConcurrentMapExecutor(reduce)
+
+ for _, task := range tasks {
+ cmr.AppendTask(task)
+ }
+
+ return cmr
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return NewGreedilyTaskExecutor(errorLimit, tasks)
}

Георги обнови решението на 28.11.2016 22:22 (преди над 1 година)

package main
import (
"errors"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type TaskOutput struct {
result int
err error
}
// SequentialExecutor
type SequentialExecutor struct {
tasks []Task
}
func NewSequentialExecutor() *SequentialExecutor {
return &SequentialExecutor{}
}
func (s *SequentialExecutor) AppendTask(task Task) {
s.tasks = append(s.tasks, task)
}
func (s *SequentialExecutor) Execute(arg int) (int, error) {
if len(s.tasks) == 0 {
return -1, errors.New("There aren't any tasks to be executed.")
}
execArg := arg
var err error
for _, task := range s.tasks {
execArg, err = task.Execute(execArg)
if err != nil {
return -1, err
}
}
return execArg, err
}
// ConcurrentExecutor
type ConcurrentExecutor struct {
tasks []Task
}
func NewConcurrentExecutor() *ConcurrentExecutor {
return &ConcurrentExecutor{}
}
func (c *ConcurrentExecutor) AppendTask(task Task) {
c.tasks = append(c.tasks, task)
}
func (c *ConcurrentExecutor) Execute(arg int) (int, error) {
if len(c.tasks) == 0 {
return -1, errors.New("The aren't any tasks to be executed.")
}
resultChan := make(chan TaskOutput, 1)
for _, task := range c.tasks {
// Run new goroutines only if we still don't have a completed task
select {
case res := <-resultChan:
return res.result, res.err
default:
go func(t Task, arg int) {
res, err := t.Execute(arg)
resultChan <- TaskOutput{res, err}
}(task, arg)
}
}
// In case the last task is the fastest one
result := <-resultChan
return result.result, result.err
}
// TimedExecutor
type TimedExecutor struct {
task Task
timeout time.Duration
}
func NewTimedExecutor(task Task, timeout time.Duration) *TimedExecutor {
return &TimedExecutor{task, timeout}
}
func (t *TimedExecutor) Execute(arg int) (int, error) {
timer := time.Now()
resultChan := make(chan TaskOutput, 1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
resultChan <- TaskOutput{res, err}
}(t.task, arg)
final := <-resultChan
if t.timeout > time.Since(timer) {
return final.result, final.err
}
return -1, errors.New("The task execution time exceeded the provided timeout")
}
// ConcurrentMapExecutor
type ConcurrentMapExecutor struct {
reduce func(result []int) int
tasks []Task
}
func NewConcurrentMapExecutor(reduce func(result []int) int) *ConcurrentMapExecutor {
obj := &ConcurrentMapExecutor{}
obj.reduce = reduce
return obj
}
func (c *ConcurrentMapExecutor) AppendTask(task Task) {
c.tasks = append(c.tasks, task)
}
func (c *ConcurrentMapExecutor) Execute(arg int) (int, error) {
if len(c.tasks) == 0 {
return -1, errors.New("The aren't any tasks to be executed.")
}
var wg sync.WaitGroup
resultChan := make(chan TaskOutput, len(c.tasks))
errorChan := make(chan error, 1)
for _, task := range c.tasks {
- select {
- case err := <-errorChan:
- return -1, err
- default:
- wg.Add(1)
- go func(t Task, arg int) {
- res, err := t.Execute(arg)
+ wg.Add(1)
+ go func(t Task, arg int) {
+ res, err := t.Execute(arg)
- if err != nil {
- errorChan <- err
- }
+ // Not a proper way to do it
+ if err != nil && len(errorChan) == 0 {
+ errorChan <- err
+ }
- resultChan <- TaskOutput{res, err}
- wg.Done()
- }(task, arg)
- }
+ resultChan <- TaskOutput{res, err}
+ wg.Done()
+ }(task, arg)
}
wg.Wait()
close(resultChan)
+ if len(errorChan) > 0 {
+ return -1, <-errorChan
+ }
+
results := []int{}
for output := range resultChan {
results = append(results, output.result)
}
return c.reduce(results), nil
}
// GreedilyTaskExecutor
type GreedilyTaskExecutor struct {
errorLimit int
tasks <-chan Task
}
func NewGreedilyTaskExecutor(errorLimit int, tasks <-chan Task) *GreedilyTaskExecutor {
return &GreedilyTaskExecutor{errorLimit, tasks}
}
func (g *GreedilyTaskExecutor) Execute(arg int) (int, error) {
var wg sync.WaitGroup
var mtx sync.Mutex
results := []TaskOutput{}
for {
task, ok := <-g.tasks
if ok {
wg.Add(1)
go func(t Task, arg int) {
res, err := t.Execute(arg)
mtx.Lock()
results = append(results, TaskOutput{res, err})
mtx.Unlock()
wg.Done()
}(task, arg)
} else {
break
}
}
wg.Wait()
if len(results) == 0 {
return -1, errors.New("No tasks were executed.")
}
max := math.MinInt32
errCount := 0
for _, output := range results {
if output.result > max {
max = output.result
}
if output.err != nil {
- errCount++
+ if errCount++; errCount > g.errorLimit {
+ return -1, errors.New("Exceeded error limit.")
+ }
}
- }
-
- if errCount > g.errorLimit {
- return -1, errors.New("Exceeded error limit.")
}
return max, nil
}
// Functions
func Pipeline(tasks ...Task) Task {
se := NewSequentialExecutor()
for _, task := range tasks {
se.AppendTask(task)
}
return se
}
func Fastest(tasks ...Task) Task {
ce := NewConcurrentExecutor()
for _, task := range tasks {
ce.AppendTask(task)
}
return ce
}
func Timed(task Task, timeout time.Duration) Task {
return NewTimedExecutor(task, timeout)
}
func ConcurrentMapReduce(reduce func(result []int) int, tasks ...Task) Task {
cmr := NewConcurrentMapExecutor(reduce)
for _, task := range tasks {
cmr.AppendTask(task)
}
return cmr
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return NewGreedilyTaskExecutor(errorLimit, tasks)
}