Решение на Concurrent Tasks от Димо Дрънгов

Обратно към всички решения

Към профила на Димо Дрънгов

Резултати

  • 6 точки от тестове
  • 0 бонус точки
  • 6 точки общо
  • 6 успешни тест(а)
  • 7 неуспешни тест(а)

Код

package main
import "errors"
import "sync"
import "time"
import "math"
type Task interface {
Execute(int) (int, error)
}
type Task1 struct {
tasks []Task
}
func (t Task1) Execute(arg int) (int, error) {
res, err := arg, errors.New(" ")
for i := 0; i < len(t.tasks); i++ {
res, err = t.tasks[i].Execute(res)
if err != nil {
return 0, err
}
}
return res, err
}
func Pipeline(tasks ...Task) Task {
return Task1{tasks}
}
type Task2 struct {
tasks []Task
}
func (t Task2) Execute(arg int) (int, error) {
resChan := make(chan struct {
res int
err error
})
for _, task := range t.tasks {
go func() {
res1, err1 := task.Execute(arg)
resChan <- struct {
res int
err error
}{res1, err1}
}()
}
select {
case st := <-resChan:
return st.res, st.err
}
}
func Fastest(tasks ...Task) Task {
return Task2{tasks}
}
type Task3 struct {
task Task
timeout time.Duration
}
func (t Task3) Execute(arg int) (int, error) {
ch := make(chan struct {
res int
err error
})
timeo := make(chan struct{}, 1)
go func() {
res1, err1 := t.task.Execute(arg)
ch <- struct {
res int
err error
}{res1, err1}
}()
go func() {
time.Sleep(t.timeout)
timeo <- struct{}{}
}()
select {
case st := <-ch:
return st.res, st.err
case <-timeo:
//fmt.Println("Tap timeout")
return 0, errors.New("Time out.")
}
}
func Timed(task Task, timeout time.Duration) Task {
return Task3{task, timeout}
}
type Task4 struct {
reduce func([]int) int
tasks []Task
}
func (t Task4) Execute(arg int) (int, error) {
if len(t.tasks) == 0 {
return 0, errors.New("No tasks!")
}
resChan := make(chan struct {
res int
err error
})
for _, task := range t.tasks {
go func() {
res1, err1 := task.Execute(arg)
resChan <- struct {
res int
err error
}{res1, err1}
}()
}
defer close(resChan)
arr := make([]int, 0)
for i := 0; i < len(t.tasks); i++ {
select {
case st := <-resChan:
if st.err != nil {
return 0, st.err
} else {
arr = append(arr, st.res)
if len(arr) == len(t.tasks) {
return t.reduce(arr), nil
}
}
}
}
return 0, errors.New("Unexpected error")
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return Task4{reduce, tasks}
}
type Task5 struct {
tasks <-chan Task
errNum int
mutex sync.Mutex
wg sync.WaitGroup
}
func (t Task5) Execute(arg int) (int, error) {
errs := 0
max := math.MinInt32
for task := range t.tasks {
t.wg.Add(1)
go func() {
res, err := task.Execute(arg)
t.mutex.Lock()
if err != nil {
//t.mutex.Lock()
errs++
//t.mutex.Unlock()
} else {
//t.mutex.Lock()
if res > max {
max = res
}
//t.mutex.Unlock()
}
t.mutex.Unlock()
t.wg.Done()
}()
}
t.wg.Wait()
if errs > t.errNum {
return 0, errors.New("More errors!")
} else if errs > 0 && max == math.MinInt32 {
return 0, errors.New("Only errors!")
} else if errs == 0 && max == math.MinInt32 {
return 0, errors.New("No tasks!")
}
return max, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
var mut sync.Mutex
var wg sync.WaitGroup
return Task5{tasks, errorLimit, mut, wg}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-bb5utl	0.004s
PASS
ok  	_/tmp/d20161129-30451-bb5utl	0.003s
panic: test timed out after 1s

goroutine 17 [running]:
panic(0x4f2620, 0xc42008e010)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200700c0, 0x51e29d, 0x11, 0x52bd90, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200700c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200700c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52be60, 0x5ab980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-bb5utl/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-bb5utl.Task2.Execute(0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-bb5utl/solution.go:50 +0x14f
_/tmp/d20161129-30451-bb5utl.(*Task2).Execute(0xc42000e460, 0x5, 0x0, 0x5985c0, 0xc42000e460)
	<autogenerated>:3 +0x6e
_/tmp/d20161129-30451-bb5utl.TestFastestErrors(0xc420070180)
	/tmp/d20161129-30451-bb5utl/solution_test.go:76 +0x73
testing.tRunner(0xc420070180, 0x52bd90)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161129-30451-bb5utl	1.005s
PASS
ok  	_/tmp/d20161129-30451-bb5utl	0.103s
panic: test timed out after 1s

goroutine 17 [running]:
panic(0x4f2620, 0xc4200b6010)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200700c0, 0x520bed, 0x1d, 0x52bda8, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200700c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200700c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52be60, 0x5ab980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-bb5utl/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-bb5utl.Task2.Execute(0xc42000e780, 0x2, 0x2, 0x1, 0xc42003feb0, 0xc42003fea0, 0x472dd1)
	/tmp/d20161129-30451-bb5utl/solution.go:50 +0x14f
_/tmp/d20161129-30451-bb5utl.(*Task2).Execute(0xc42000e7a0, 0x1, 0xc420010558, 0xc420054420, 0xc420070180)
	<autogenerated>:3 +0x6e
_/tmp/d20161129-30451-bb5utl.TestFastestWaitsForGoroutines(0xc420070180)
	/tmp/d20161129-30451-bb5utl/solution_test.go:151 +0x37e
testing.tRunner(0xc420070180, 0x52bda8)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec

goroutine 7 [chan receive]:
_/tmp/d20161129-30451-bb5utl.TestFastestWaitsForGoroutines.func3(0xc420010558, 0xc420054420, 0xc420070180, 0xc420054480, 0xc420012310, 0xc4200544e0)
	/tmp/d20161129-30451-bb5utl/solution_test.go:131 +0x81
created by _/tmp/d20161129-30451-bb5utl.TestFastestWaitsForGoroutines
	/tmp/d20161129-30451-bb5utl/solution_test.go:150 +0x361

goroutine 8 [chan receive]:
_/tmp/d20161129-30451-bb5utl.TestFastestWaitsForGoroutines.func2(0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-bb5utl/solution_test.go:123 +0x55
_/tmp/d20161129-30451-bb5utl.fTask.Execute(0xc42000e760, 0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-bb5utl/solution_test.go:40 +0x30
_/tmp/d20161129-30451-bb5utl.Task2.Execute.func1(0xc4200109c0, 0x1, 0xc420054a20)
	/tmp/d20161129-30451-bb5utl/solution.go:42 +0x41
created by _/tmp/d20161129-30451-bb5utl.Task2.Execute
	/tmp/d20161129-30451-bb5utl/solution.go:47 +0xca

goroutine 9 [chan receive]:
_/tmp/d20161129-30451-bb5utl.TestFastestWaitsForGoroutines.func2(0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-bb5utl/solution_test.go:123 +0x55
_/tmp/d20161129-30451-bb5utl.fTask.Execute(0xc42000e760, 0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-bb5utl/solution_test.go:40 +0x30
_/tmp/d20161129-30451-bb5utl.Task2.Execute.func1(0xc4200109c0, 0x1, 0xc420054a20)
	/tmp/d20161129-30451-bb5utl/solution.go:42 +0x41
created by _/tmp/d20161129-30451-bb5utl.Task2.Execute
	/tmp/d20161129-30451-bb5utl/solution.go:47 +0xca
exit status 2
FAIL	_/tmp/d20161129-30451-bb5utl	1.007s
PASS
ok  	_/tmp/d20161129-30451-bb5utl	0.134s
--- FAIL: TestTimedDoesntLeaveGoroutineHanging (0.20s)
	solution_test.go:204: Expected that there will be 1 more goroutines than at the start(3) after Timed one has started got 5
	solution_test.go:216: Expected that there will be as many goroutines as at the start(3) after Timed task has finished after it has timeouted but got 4
		
		BEFORE:
		goroutine profile: total 2
		1 @ 0x42a87a 0x42a96e 0x40469f 0x404365 0x46c446 0x46f4bd 0x46c0e1 0x46d155 0x46c775 0x401276 0x42a414 0x459f41
		#	0x46c445	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f4bc	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c0e0	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d154	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c774	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-bb5utl/_test/_testmain.go:78
		#	0x42a413	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x4c304f 0x4c2e50 0x4bfb01 0x473776 0x46c0e1 0x459f41
		#	0x4c304e	runtime/pprof.writeRuntimeProfile+0x9e					/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c2e4f	runtime/pprof.writeGoroutine+0x9f					/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4bfb00	runtime/pprof.(*Profile).WriteTo+0x340					/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x473775	_/tmp/d20161129-30451-bb5utl.TestTimedDoesntLeaveGoroutineHanging+0x175	/tmp/d20161129-30451-bb5utl/solution_test.go:191
		#	0x46c0e0	testing.tRunner+0x80							/usr/local/go/src/testing/testing.go:610
		
		
		
		AFTER:
		goroutine profile: total 4
		1 @ 0x42a87a 0x42a96e 0x4039a8 0x40376d 0x475813 0x459f41
		#	0x475812	_/tmp/d20161129-30451-bb5utl.Task3.Execute.func1+0x92	/tmp/d20161129-30451-bb5utl/solution.go:75
		
		1 @ 0x42a87a 0x42a96e 0x40469f 0x404365 0x46c446 0x46f4bd 0x46c0e1 0x46d155 0x46c775 0x401276 0x42a414 0x459f41
		#	0x46c445	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f4bc	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c0e0	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d154	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c774	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-bb5utl/_test/_testmain.go:78
		#	0x42a413	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x42a87a 0x42a96e 0x40469f 0x404365 0x47390f 0x46c0e1 0x459f41
		#	0x47390e	_/tmp/d20161129-30451-bb5utl.TestTimedDoesntLeaveGoroutineHanging+0x30e	/tmp/d20161129-30451-bb5utl/solution_test.go:225
		#	0x46c0e0	testing.tRunner+0x80							/usr/local/go/src/testing/testing.go:610
		
		1 @ 0x4c304f 0x4c2e50 0x4bfb01 0x476351 0x459f41
		#	0x4c304e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c2e4f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4bfb00	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x476350	_/tmp/d20161129-30451-bb5utl.TestTimedDoesntLeaveGoroutineHanging.func2+0x1a0	/tmp/d20161129-30451-bb5utl/solution_test.go:212
		
		
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-bb5utl	0.204s
PASS
ok  	_/tmp/d20161129-30451-bb5utl	0.008s
--- FAIL: TestConcurrentMapReduceSimple (0.00s)
	solution_test.go:253: Expected result to be 55 but is 77
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-bb5utl	0.003s
--- FAIL: TestGreatestSearcherSimple (0.00s)
	solution_test.go:287: Received an unexpected error More errors!
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-bb5utl	0.003s
--- FAIL: TestGreatestSearcherComplex (0.05s)
	solution_test.go:302: Received result 32 when expecting 42
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-bb5utl	0.048s
PASS
ok  	_/tmp/d20161129-30451-bb5utl	0.048s
panic: test timed out after 1s

goroutine 8 [running]:
panic(0x4f2620, 0xc420010760)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200720c0, 0x51d1a8, 0xb, 0x52be10, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200720c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200720c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52be60, 0x5ab980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-bb5utl/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-bb5utl.Task2.Execute(0xc42000a480, 0x3, 0x3, 0xffffffffffffffd3, 0xffffffffffffffd3, 0x0, 0x0)
	/tmp/d20161129-30451-bb5utl/solution.go:50 +0x14f
_/tmp/d20161129-30451-bb5utl.(*Task2).Execute(0xc42000e3e0, 0xffffffffffffffd3, 0xffffffffffffffd3, 0x0, 0x0)
	<autogenerated>:3 +0x6e
_/tmp/d20161129-30451-bb5utl.Task1.Execute(0xc42001a0a0, 0x5, 0x5, 0xffffffffffffffb5, 0xc42001a0a0, 0x5, 0x5)
	/tmp/d20161129-30451-bb5utl/solution.go:19 +0x9c
_/tmp/d20161129-30451-bb5utl.(*Task1).Execute(0xc42000e460, 0xffffffffffffffb5, 0x5, 0x598580, 0xc42000e460)
	<autogenerated>:2 +0x6e
_/tmp/d20161129-30451-bb5utl.TestThemAll(0xc420072180)
	/tmp/d20161129-30451-bb5utl/solution_test.go:374 +0x73a
testing.tRunner(0xc420072180, 0x52be10)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec

goroutine 7 [chan send]:
_/tmp/d20161129-30451-bb5utl.TestThemAll.func1(0xc420056360)
	/tmp/d20161129-30451-bb5utl/solution_test.go:343 +0x8e
created by _/tmp/d20161129-30451-bb5utl.TestThemAll
	/tmp/d20161129-30451-bb5utl/solution_test.go:349 +0x75

goroutine 17 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-bb5utl.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-bb5utl/solution_test.go:33 +0x32
_/tmp/d20161129-30451-bb5utl.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:8 +0x65
_/tmp/d20161129-30451-bb5utl.Task2.Execute.func1(0xc42008e000, 0xffffffffffffffd3, 0xc42008c000)
	/tmp/d20161129-30451-bb5utl/solution.go:42 +0x41
created by _/tmp/d20161129-30451-bb5utl.Task2.Execute
	/tmp/d20161129-30451-bb5utl/solution.go:47 +0xca

goroutine 18 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-bb5utl.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-bb5utl/solution_test.go:33 +0x32
_/tmp/d20161129-30451-bb5utl.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:8 +0x65
_/tmp/d20161129-30451-bb5utl.Task2.Execute.func1(0xc42008e000, 0xffffffffffffffd3, 0xc42008c000)
	/tmp/d20161129-30451-bb5utl/solution.go:42 +0x41
created by _/tmp/d20161129-30451-bb5utl.Task2.Execute
	/tmp/d20161129-30451-bb5utl/solution.go:47 +0xca

goroutine 19 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-bb5utl.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-bb5utl/solution_test.go:33 +0x32
_/tmp/d20161129-30451-bb5utl.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:8 +0x65
_/tmp/d20161129-30451-bb5utl.Task2.Execute.func1(0xc42008e000, 0xffffffffffffffd3, 0xc42008c000)
	/tmp/d20161129-30451-bb5utl/solution.go:42 +0x41
created by _/tmp/d20161129-30451-bb5utl.Task2.Execute
	/tmp/d20161129-30451-bb5utl/solution.go:47 +0xca
exit status 2
FAIL	_/tmp/d20161129-30451-bb5utl	1.009s

История (1 версия и 0 коментара)

Димо обнови решението на 29.11.2016 16:14 (преди над 1 година)

+package main
+
+import "errors"
+import "sync"
+import "time"
+import "math"
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type Task1 struct {
+ tasks []Task
+}
+
+func (t Task1) Execute(arg int) (int, error) {
+ res, err := arg, errors.New(" ")
+ for i := 0; i < len(t.tasks); i++ {
+ res, err = t.tasks[i].Execute(res)
+ if err != nil {
+ return 0, err
+ }
+ }
+ return res, err
+}
+
+func Pipeline(tasks ...Task) Task {
+ return Task1{tasks}
+}
+
+type Task2 struct {
+ tasks []Task
+}
+
+func (t Task2) Execute(arg int) (int, error) {
+ resChan := make(chan struct {
+ res int
+ err error
+ })
+ for _, task := range t.tasks {
+ go func() {
+ res1, err1 := task.Execute(arg)
+ resChan <- struct {
+ res int
+ err error
+ }{res1, err1}
+ }()
+ }
+ select {
+ case st := <-resChan:
+ return st.res, st.err
+ }
+}
+
+func Fastest(tasks ...Task) Task {
+ return Task2{tasks}
+}
+
+type Task3 struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t Task3) Execute(arg int) (int, error) {
+ ch := make(chan struct {
+ res int
+ err error
+ })
+ timeo := make(chan struct{}, 1)
+ go func() {
+ res1, err1 := t.task.Execute(arg)
+ ch <- struct {
+ res int
+ err error
+ }{res1, err1}
+ }()
+ go func() {
+ time.Sleep(t.timeout)
+ timeo <- struct{}{}
+ }()
+ select {
+ case st := <-ch:
+ return st.res, st.err
+ case <-timeo:
+ //fmt.Println("Tap timeout")
+ return 0, errors.New("Time out.")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return Task3{task, timeout}
+}
+
+type Task4 struct {
+ reduce func([]int) int
+ tasks []Task
+}
+
+func (t Task4) Execute(arg int) (int, error) {
+ if len(t.tasks) == 0 {
+ return 0, errors.New("No tasks!")
+ }
+ resChan := make(chan struct {
+ res int
+ err error
+ })
+ for _, task := range t.tasks {
+ go func() {
+ res1, err1 := task.Execute(arg)
+ resChan <- struct {
+ res int
+ err error
+ }{res1, err1}
+ }()
+ }
+ defer close(resChan)
+ arr := make([]int, 0)
+ for i := 0; i < len(t.tasks); i++ {
+ select {
+ case st := <-resChan:
+ if st.err != nil {
+ return 0, st.err
+ } else {
+ arr = append(arr, st.res)
+ if len(arr) == len(t.tasks) {
+ return t.reduce(arr), nil
+ }
+ }
+ }
+ }
+ return 0, errors.New("Unexpected error")
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return Task4{reduce, tasks}
+}
+
+type Task5 struct {
+ tasks <-chan Task
+ errNum int
+ mutex sync.Mutex
+ wg sync.WaitGroup
+}
+
+func (t Task5) Execute(arg int) (int, error) {
+ errs := 0
+ max := math.MinInt32
+ for task := range t.tasks {
+ t.wg.Add(1)
+ go func() {
+ res, err := task.Execute(arg)
+ t.mutex.Lock()
+ if err != nil {
+ //t.mutex.Lock()
+ errs++
+ //t.mutex.Unlock()
+ } else {
+ //t.mutex.Lock()
+ if res > max {
+ max = res
+ }
+ //t.mutex.Unlock()
+ }
+ t.mutex.Unlock()
+ t.wg.Done()
+ }()
+ }
+ t.wg.Wait()
+ if errs > t.errNum {
+ return 0, errors.New("More errors!")
+ } else if errs > 0 && max == math.MinInt32 {
+ return 0, errors.New("Only errors!")
+ } else if errs == 0 && max == math.MinInt32 {
+ return 0, errors.New("No tasks!")
+ }
+ return max, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ var mut sync.Mutex
+ var wg sync.WaitGroup
+ return Task5{tasks, errorLimit, mut, wg}
+}