Решение на Concurrent Tasks от Веселин Стоянов

Обратно към всички решения

Към профила на Веселин Стоянов

Резултати

  • 5 точки от тестове
  • 0 бонус точки
  • 5 точки общо
  • 5 успешни тест(а)
  • 8 неуспешни тест(а)

Код

package main
import "errors"
import "time"
// Common
type Task interface {
Execute(int) (int, error)
}
type FunctionResult struct {
res int
err error
}
// Pipeline
type PipelineResult struct {
Tasks []Task
}
func (r PipelineResult) Execute(x int) (int, error) {
if len(r.Tasks) == 0 {
return 0, errors.New("No tasks provided.")
}
var last_result int
var err error
for i, task := range r.Tasks {
if i == 0 {
last_result, err = task.Execute(x)
} else {
last_result, err = task.Execute(last_result)
}
if err != nil {
return 0, errors.New("One of the tasks' execution returned an error.")
}
}
return last_result, err
}
func Pipeline(tasks ...Task) Task {
return PipelineResult{tasks}
}
// Fastest
type FastestResult struct {
tasks []Task
}
func (fr FastestResult) fastestExecutor(x int) chan FunctionResult {
ch := make(chan FunctionResult)
go func() {
for _, task := range fr.tasks {
go func() {
result, err := task.Execute(x)
res := FunctionResult{res: result, err: err}
ch <- res
}()
}
}()
return ch
}
func (fr FastestResult) Execute(x int) (int, error) {
select {
case r := <-fr.fastestExecutor(x):
return r.res, r.err
}
}
func Fastest(tasks ...Task) Task {
return FastestResult{tasks}
}
// Timed
type TimedResult struct {
task Task
timeout time.Duration
}
func (tr TimedResult) timedExecutor(x int) chan FunctionResult {
ch := make(chan FunctionResult)
go func() {
if result, err := tr.task.Execute(x); err == nil {
res := FunctionResult{res: result, err: err}
ch <- res
}
}()
return ch
}
func (tr TimedResult) Execute(x int) (int, error) {
select {
case fr := <-tr.timedExecutor(x):
return fr.res, fr.err
case <-time.After(tr.timeout):
return 0, errors.New("The task didn't finish in the specified time.")
}
}
func Timed(task Task, timeout time.Duration) Task {
return TimedResult{task, timeout}
}
// ConcurrentMapReduce
type ConcurrentMapReduceResult struct {
reduce func(results []int) int
tasks []Task
}
func (cmrr ConcurrentMapReduceResult) concurrentMapReduceExecutor(x int) chan FunctionResult {
ch := make(chan FunctionResult)
go func() {
for _, task := range cmrr.tasks {
go func() {
result, err := task.Execute(x)
res := FunctionResult{res: result, err: err}
ch <- res
}()
}
}()
return ch
}
func (cmrr ConcurrentMapReduceResult) Execute(x int) (int, error) {
if len(cmrr.tasks) == 0 {
return 0, errors.New("No tasks provided.")
}
results := make([]int, 0)
i := 0
ch := cmrr.concurrentMapReduceExecutor(x)
for r := range ch {
if r.err == nil {
results = append(results, r.res)
i++
if i == len(cmrr.tasks) {
break
}
} else {
return 0, r.err
}
}
var err error
return cmrr.reduce(results), err
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return ConcurrentMapReduceResult{reduce, tasks}
}
// GreatestSearcher
type GreatestSearcherResult struct {
errorLimit int
tasks_channel <-chan Task
}
func (gsr GreatestSearcherResult) Execute(x int) (int, error) {
max, errs, i := 0, 0, 0
for task := range gsr.tasks_channel {
go func() {
result, err := task.Execute(x)
if result > max {
max = result
}
if err != nil {
errs++
}
i++
}()
}
if i == 0 {
return 0, errors.New("No tasks provided.")
}
if errs > gsr.errorLimit {
return 0, errors.New("Too many errors.")
}
var err error
return max, err
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return GreatestSearcherResult{errorLimit, tasks}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-13zggrj	0.003s
PASS
ok  	_/tmp/d20161129-30451-13zggrj	0.003s
panic: test timed out after 1s

goroutine 17 [running]:
panic(0x4f26c0, 0xc42008e010)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200720c0, 0x51e1e0, 0x11, 0x52bc90, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200720c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200720c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52bd68, 0x5ab980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-13zggrj/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-13zggrj.FastestResult.Execute(0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-13zggrj/solution.go:74 +0x91
_/tmp/d20161129-30451-13zggrj.(*FastestResult).Execute(0xc42000e460, 0x5, 0x0, 0x5985c0, 0xc42000e460)
	<autogenerated>:4 +0x6e
_/tmp/d20161129-30451-13zggrj.TestFastestErrors(0xc420072180)
	/tmp/d20161129-30451-13zggrj/solution_test.go:76 +0x73
testing.tRunner(0xc420072180, 0x52bc90)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161129-30451-13zggrj	1.005s
PASS
ok  	_/tmp/d20161129-30451-13zggrj	0.103s
panic: test timed out after 1s

goroutine 17 [running]:
panic(0x4f26c0, 0xc4200a8010)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200700c0, 0x520b71, 0x1d, 0x52bca8, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200700c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200700c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52bd68, 0x5ab980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-13zggrj/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-13zggrj.FastestResult.Execute(0xc42000e780, 0x2, 0x2, 0x1, 0xc42003feb0, 0xc42003fea0, 0x472d11)
	/tmp/d20161129-30451-13zggrj/solution.go:74 +0x91
_/tmp/d20161129-30451-13zggrj.(*FastestResult).Execute(0xc42000e7a0, 0x1, 0xc420010558, 0xc420054420, 0xc420070180)
	<autogenerated>:4 +0x6e
_/tmp/d20161129-30451-13zggrj.TestFastestWaitsForGoroutines(0xc420070180)
	/tmp/d20161129-30451-13zggrj/solution_test.go:151 +0x37e
testing.tRunner(0xc420070180, 0x52bca8)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec

goroutine 7 [chan receive]:
_/tmp/d20161129-30451-13zggrj.TestFastestWaitsForGoroutines.func3(0xc420010558, 0xc420054420, 0xc420070180, 0xc420054480, 0xc420012310, 0xc4200544e0)
	/tmp/d20161129-30451-13zggrj/solution_test.go:131 +0x81
created by _/tmp/d20161129-30451-13zggrj.TestFastestWaitsForGoroutines
	/tmp/d20161129-30451-13zggrj/solution_test.go:150 +0x361

goroutine 9 [chan receive]:
_/tmp/d20161129-30451-13zggrj.TestFastestWaitsForGoroutines.func2(0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-13zggrj/solution_test.go:123 +0x55
_/tmp/d20161129-30451-13zggrj.fTask.Execute(0xc42000e760, 0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-13zggrj/solution_test.go:40 +0x30
_/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1.1(0xc4200109c0, 0x1, 0xc420054a20)
	/tmp/d20161129-30451-13zggrj/solution.go:62 +0x3d
created by _/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1
	/tmp/d20161129-30451-13zggrj/solution.go:65 +0xa4

goroutine 10 [chan receive]:
_/tmp/d20161129-30451-13zggrj.TestFastestWaitsForGoroutines.func2(0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-13zggrj/solution_test.go:123 +0x55
_/tmp/d20161129-30451-13zggrj.fTask.Execute(0xc42000e760, 0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-13zggrj/solution_test.go:40 +0x30
_/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1.1(0xc4200109c0, 0x1, 0xc420054a20)
	/tmp/d20161129-30451-13zggrj/solution.go:62 +0x3d
created by _/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1
	/tmp/d20161129-30451-13zggrj/solution.go:65 +0xa4
exit status 2
FAIL	_/tmp/d20161129-30451-13zggrj	1.006s
PASS
ok  	_/tmp/d20161129-30451-13zggrj	0.234s
--- FAIL: TestTimedDoesntLeaveGoroutineHanging (0.20s)
	solution_test.go:216: Expected that there will be as many goroutines as at the start(3) after Timed task has finished after it has timeouted but got 4
		
		BEFORE:
		goroutine profile: total 2
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x46c4c6 0x46f53d 0x46c161 0x46d1d5 0x46c7f5 0x401276 0x42a494 0x459fc1
		#	0x46c4c5	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f53c	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c160	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d1d4	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c7f4	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-13zggrj/_test/_testmain.go:78
		#	0x42a493	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x4c2c4f 0x4c2a50 0x4bf701 0x4736b6 0x46c161 0x459fc1
		#	0x4c2c4e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c2a4f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4bf700	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x4736b5	_/tmp/d20161129-30451-13zggrj.TestTimedDoesntLeaveGoroutineHanging+0x175	/tmp/d20161129-30451-13zggrj/solution_test.go:191
		#	0x46c160	testing.tRunner+0x80								/usr/local/go/src/testing/testing.go:610
		
		
		
		AFTER:
		goroutine profile: total 4
		1 @ 0x42a8fa 0x42a9ee 0x4039a8 0x40376d 0x4756fc 0x459fc1
		#	0x4756fb	_/tmp/d20161129-30451-13zggrj.TimedResult.timedExecutor.func1+0x8b	/tmp/d20161129-30451-13zggrj/solution.go:96
		
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x46c4c6 0x46f53d 0x46c161 0x46d1d5 0x46c7f5 0x401276 0x42a494 0x459fc1
		#	0x46c4c5	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f53c	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c160	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d1d4	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c7f4	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-13zggrj/_test/_testmain.go:78
		#	0x42a493	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x47384f 0x46c161 0x459fc1
		#	0x47384e	_/tmp/d20161129-30451-13zggrj.TestTimedDoesntLeaveGoroutineHanging+0x30e	/tmp/d20161129-30451-13zggrj/solution_test.go:225
		#	0x46c160	testing.tRunner+0x80								/usr/local/go/src/testing/testing.go:610
		
		1 @ 0x4c2c4f 0x4c2a50 0x4bf701 0x476281 0x459fc1
		#	0x4c2c4e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c2a4f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4bf700	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x476280	_/tmp/d20161129-30451-13zggrj.TestTimedDoesntLeaveGoroutineHanging.func2+0x1a0	/tmp/d20161129-30451-13zggrj/solution_test.go:212
		
		
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-13zggrj	0.204s
PASS
ok  	_/tmp/d20161129-30451-13zggrj	0.004s
--- FAIL: TestConcurrentMapReduceSimple (0.00s)
	solution_test.go:253: Expected result to be 55 but is 77
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-13zggrj	0.003s
--- FAIL: TestGreatestSearcherSimple (0.00s)
	solution_test.go:287: Received an unexpected error Too many errors.
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-13zggrj	0.004s
--- FAIL: TestGreatestSearcherComplex (0.05s)
	solution_test.go:302: Received result 32 when expecting 42
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-13zggrj	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/like_the_example (0.05s)
    	solution_test.go:313: Expected error did not occur instead got 32
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-13zggrj	0.048s
panic: test timed out after 1s

goroutine 8 [running]:
panic(0x4f26c0, 0xc420010760)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200720c0, 0x51d0dd, 0xb, 0x52bd10, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200720c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200720c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52bd68, 0x5ab980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-13zggrj/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-13zggrj.FastestResult.Execute(0xc42000a480, 0x3, 0x3, 0xffffffffffffffd3, 0xffffffffffffffd3, 0x0, 0x0)
	/tmp/d20161129-30451-13zggrj/solution.go:74 +0x91
_/tmp/d20161129-30451-13zggrj.(*FastestResult).Execute(0xc42000e3e0, 0xffffffffffffffd3, 0xffffffffffffffd3, 0x0, 0x0)
	<autogenerated>:4 +0x6e
_/tmp/d20161129-30451-13zggrj.PipelineResult.Execute(0xc42001a0f0, 0x5, 0x5, 0xffffffffffffffb5, 0xc42001a0f0, 0x5, 0x5)
	/tmp/d20161129-30451-13zggrj/solution.go:35 +0x161
_/tmp/d20161129-30451-13zggrj.(*PipelineResult).Execute(0xc42000e460, 0xffffffffffffffb5, 0x5, 0x598640, 0xc42000e460)
	<autogenerated>:2 +0x6e
_/tmp/d20161129-30451-13zggrj.TestThemAll(0xc420072180)
	/tmp/d20161129-30451-13zggrj/solution_test.go:374 +0x6d3
testing.tRunner(0xc420072180, 0x52bd10)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec

goroutine 7 [chan send]:
_/tmp/d20161129-30451-13zggrj.TestThemAll.func1(0xc420054360)
	/tmp/d20161129-30451-13zggrj/solution_test.go:343 +0x8e
created by _/tmp/d20161129-30451-13zggrj.TestThemAll
	/tmp/d20161129-30451-13zggrj/solution_test.go:349 +0x75

goroutine 18 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-13zggrj.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-13zggrj/solution_test.go:33 +0x32
_/tmp/d20161129-30451-13zggrj.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:11 +0x65
_/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1.1(0xc420090000, 0xffffffffffffffd3, 0xc42008a000)
	/tmp/d20161129-30451-13zggrj/solution.go:62 +0x3d
created by _/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1
	/tmp/d20161129-30451-13zggrj/solution.go:65 +0xa4

goroutine 19 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-13zggrj.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-13zggrj/solution_test.go:33 +0x32
_/tmp/d20161129-30451-13zggrj.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:11 +0x65
_/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1.1(0xc420090000, 0xffffffffffffffd3, 0xc42008a000)
	/tmp/d20161129-30451-13zggrj/solution.go:62 +0x3d
created by _/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1
	/tmp/d20161129-30451-13zggrj/solution.go:65 +0xa4

goroutine 20 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-13zggrj.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-13zggrj/solution_test.go:33 +0x32
_/tmp/d20161129-30451-13zggrj.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:11 +0x65
_/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1.1(0xc420090000, 0xffffffffffffffd3, 0xc42008a000)
	/tmp/d20161129-30451-13zggrj/solution.go:62 +0x3d
created by _/tmp/d20161129-30451-13zggrj.FastestResult.fastestExecutor.func1
	/tmp/d20161129-30451-13zggrj/solution.go:65 +0xa4
exit status 2
FAIL	_/tmp/d20161129-30451-13zggrj	1.006s

История (1 версия и 2 коментара)

Веселин обнови решението на 28.11.2016 18:27 (преди над 1 година)

+package main
+
+import "errors"
+import "time"
+
+// Common
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type FunctionResult struct {
+ res int
+ err error
+}
+
+// Pipeline
+
+type PipelineResult struct {
+ Tasks []Task
+}
+
+func (r PipelineResult) Execute(x int) (int, error) {
+ if len(r.Tasks) == 0 {
+ return 0, errors.New("No tasks provided.")
+ }
+
+ var last_result int
+ var err error
+
+ for i, task := range r.Tasks {
+ if i == 0 {
+ last_result, err = task.Execute(x)
+ } else {
+ last_result, err = task.Execute(last_result)
+ }
+
+ if err != nil {
+ return 0, errors.New("One of the tasks' execution returned an error.")
+ }
+ }
+
+ return last_result, err
+}
+
+func Pipeline(tasks ...Task) Task {
+ return PipelineResult{tasks}
+}
+
+// Fastest
+
+type FastestResult struct {
+ tasks []Task
+}
+
+func (fr FastestResult) fastestExecutor(x int) chan FunctionResult {
+ ch := make(chan FunctionResult)
+
+ go func() {
+ for _, task := range fr.tasks {
+ go func() {
+ result, err := task.Execute(x)
+ res := FunctionResult{res: result, err: err}
+ ch <- res
+ }()
+ }
+ }()
+
+ return ch
+}
+
+func (fr FastestResult) Execute(x int) (int, error) {
+ select {
+ case r := <-fr.fastestExecutor(x):
+ return r.res, r.err
+ }
+}
+
+func Fastest(tasks ...Task) Task {
+ return FastestResult{tasks}
+}
+
+// Timed
+
+type TimedResult struct {
+ task Task
+ timeout time.Duration
+}
+
+func (tr TimedResult) timedExecutor(x int) chan FunctionResult {
+ ch := make(chan FunctionResult)
+
+ go func() {
+ if result, err := tr.task.Execute(x); err == nil {
+ res := FunctionResult{res: result, err: err}
+ ch <- res
+ }
+ }()
+
+ return ch
+}
+
+func (tr TimedResult) Execute(x int) (int, error) {
+ select {
+ case fr := <-tr.timedExecutor(x):
+ return fr.res, fr.err
+ case <-time.After(tr.timeout):
+ return 0, errors.New("The task didn't finish in the specified time.")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return TimedResult{task, timeout}
+}
+
+// ConcurrentMapReduce
+
+type ConcurrentMapReduceResult struct {
+ reduce func(results []int) int
+ tasks []Task
+}
+
+func (cmrr ConcurrentMapReduceResult) concurrentMapReduceExecutor(x int) chan FunctionResult {
+ ch := make(chan FunctionResult)
+
+ go func() {
+ for _, task := range cmrr.tasks {
+ go func() {
+ result, err := task.Execute(x)
+ res := FunctionResult{res: result, err: err}
+ ch <- res
+ }()
+ }
+ }()
+
+ return ch
+}
+
+func (cmrr ConcurrentMapReduceResult) Execute(x int) (int, error) {
+ if len(cmrr.tasks) == 0 {
+ return 0, errors.New("No tasks provided.")
+ }
+
+ results := make([]int, 0)
+ i := 0
+
+ ch := cmrr.concurrentMapReduceExecutor(x)
+
+ for r := range ch {
+ if r.err == nil {
+ results = append(results, r.res)
+ i++
+
+ if i == len(cmrr.tasks) {
+ break
+ }
+
+ } else {
+ return 0, r.err
+ }
+ }
+
+ var err error
+ return cmrr.reduce(results), err
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return ConcurrentMapReduceResult{reduce, tasks}
+}
+
+// GreatestSearcher
+
+type GreatestSearcherResult struct {
+ errorLimit int
+ tasks_channel <-chan Task
+}
+
+func (gsr GreatestSearcherResult) Execute(x int) (int, error) {
+ max, errs, i := 0, 0, 0
+
+ for task := range gsr.tasks_channel {
+ go func() {
+ result, err := task.Execute(x)
+ if result > max {
+ max = result
+ }
+
+ if err != nil {
+ errs++
+ }
+
+ i++
+ }()
+ }
+
+ if i == 0 {
+ return 0, errors.New("No tasks provided.")
+ }
+
+ if errs > gsr.errorLimit {
+ return 0, errors.New("Too many errors.")
+ }
+
+ var err error
+ return max, err
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return GreatestSearcherResult{errorLimit, tasks}
+}