Решение на Concurrent Tasks от Емил Дудев

Обратно към всички решения

Към профила на Емил Дудев

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 10 успешни тест(а)
  • 3 неуспешни тест(а)

Код

package main
import (
"errors"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type PipelineTask struct {
tasks []Task
}
func (p *PipelineTask) Execute(i int) (r int, err error) {
if len(p.tasks) == 0 {
return 0, errors.New("No tasks given")
}
for _, task := range p.tasks {
r, err = task.Execute(i)
if err != nil {
return
}
i = r
}
return
}
func Pipeline(tasks ...Task) Task {
return &PipelineTask{
tasks: tasks,
}
}
type FastestTask struct {
tasks []Task
}
func Fastest(tasks ...Task) Task {
return &FastestTask{
tasks: tasks,
}
}
func (p *FastestTask) Execute(i int) (r int, err error) {
if len(p.tasks) == 0 {
return 0, errors.New("No tasks given")
}
rc := make(chan int)
ec := make(chan error)
for _, task := range p.tasks {
go func(t Task) {
rf, errf := t.Execute(i)
if errf != nil {
ec <- errf
} else {
rc <- rf
}
}(task)
}
select {
case r = <-rc:
case err = <-ec:
}
return
}
type TimedTask struct {
timeout time.Duration
}
func (t *TimedTask) Execute(i int) (r int, err error) {
time.Sleep(t.timeout)
err = errors.New("Timeout")
return
}
func Timed(task Task, timeout time.Duration) Task {
tasks := []Task{task, &TimedTask{timeout}}
return Fastest(tasks...)
}
type ConcurrentMapReduceTask struct {
tasks []Task
reduce func(results []int) int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return &ConcurrentMapReduceTask{
tasks: tasks,
reduce: reduce,
}
}
func (t *ConcurrentMapReduceTask) Execute(i int) (r int, err error) {
if len(t.tasks) == 0 {
return 0, errors.New("No tasks given")
}
rc := make(chan int)
ec := make(chan error)
results := make([]int, 0)
for _, task := range t.tasks {
go func(t Task) {
rf, errf := t.Execute(i)
if errf != nil {
ec <- errf
} else {
rc <- rf
}
}(task)
}
for _, _ = range t.tasks {
select {
case rt := <-rc:
results = append(results, rt)
case err = <-ec:
return
}
}
r = t.reduce(results)
return
}
type GreatestSearcherTask struct {
tasks <-chan Task
errorLimit int
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return &GreatestSearcherTask{
tasks: tasks,
errorLimit: errorLimit,
}
}
func (t *GreatestSearcherTask) Execute(i int) (r int, err error) {
rc := make(chan int)
ec := make(chan error)
first := true
errorCount := t.errorLimit
taskCount := 0
for task := range t.tasks {
taskCount++
go func(t Task) {
rf, errf := t.Execute(i)
if errf != nil {
ec <- errf
} else {
rc <- rf
}
}(task)
}
for taskCount != 0 {
select {
case rt := <-rc:
if first || rt > r {
r = rt
first = false
}
case _ = <-ec:
errorCount--
if errorCount < 0 {
err = errors.New("Error limit exceeded")
return
}
}
taskCount--
}
return
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.003s
PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.006s
PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.003s
PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.103s
--- FAIL: TestFastestWaitsForGoroutines (0.20s)
	solution_test.go:146: Expected that there will be as many goroutines as at the start(3) after all tasks in Fastest have finishes but got 4
		
		BEFORE:
		goroutine profile: total 2
		1 @ 0x42a87a 0x42a96e 0x40469f 0x404365 0x46c3d6 0x46f44d 0x46c071 0x46d0e5 0x46c705 0x401276 0x42a414 0x459ed1
		#	0x46c3d5	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f44c	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c070	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d0e4	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c704	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-c4g23i/_test/_testmain.go:78
		#	0x42a413	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x4c1f3f 0x4c1d40 0x4be9f1 0x472ad1 0x46c071 0x459ed1
		#	0x4c1f3e	runtime/pprof.writeRuntimeProfile+0x9e					/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c1d3f	runtime/pprof.writeGoroutine+0x9f					/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4be9f0	runtime/pprof.(*Profile).WriteTo+0x340					/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x472ad0	_/tmp/d20161129-30451-c4g23i.TestFastestWaitsForGoroutines+0x1a0	/tmp/d20161129-30451-c4g23i/solution_test.go:114
		#	0x46c070	testing.tRunner+0x80							/usr/local/go/src/testing/testing.go:610
		
		
		
		AFTER:
		goroutine profile: total 4
		1 @ 0x42a87a 0x42a96e 0x4039a8 0x40376d 0x475374 0x459ed1
		#	0x475373	_/tmp/d20161129-30451-c4g23i.(*FastestTask).Execute.func1+0xb3	/tmp/d20161129-30451-c4g23i/solution.go:62
		
		1 @ 0x42a87a 0x42a96e 0x40469f 0x404365 0x46c3d6 0x46f44d 0x46c071 0x46d0e5 0x46c705 0x401276 0x42a414 0x459ed1
		#	0x46c3d5	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f44c	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c070	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d0e4	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c704	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-c4g23i/_test/_testmain.go:78
		#	0x42a413	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x42a87a 0x42a96e 0x40469f 0x404365 0x472d00 0x46c071 0x459ed1
		#	0x472cff	_/tmp/d20161129-30451-c4g23i.TestFastestWaitsForGoroutines+0x3cf	/tmp/d20161129-30451-c4g23i/solution_test.go:157
		#	0x46c070	testing.tRunner+0x80							/usr/local/go/src/testing/testing.go:610
		
		1 @ 0x4c1f3f 0x4c1d40 0x4be9f1 0x47587b 0x459ed1
		#	0x4c1f3e	runtime/pprof.writeRuntimeProfile+0x9e					/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c1d3f	runtime/pprof.writeGoroutine+0x9f					/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4be9f0	runtime/pprof.(*Profile).WriteTo+0x340					/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x47587a	_/tmp/d20161129-30451-c4g23i.TestFastestWaitsForGoroutines.func3+0x17a	/tmp/d20161129-30451-c4g23i/solution_test.go:142
		
		
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-c4g23i	0.204s
PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.134s
--- FAIL: TestTimedDoesntLeaveGoroutineHanging (0.20s)
	solution_test.go:204: Expected that there will be 1 more goroutines than at the start(3) after Timed one has started got 5
	solution_test.go:216: Expected that there will be as many goroutines as at the start(3) after Timed task has finished after it has timeouted but got 4
		
		BEFORE:
		goroutine profile: total 2
		1 @ 0x42a87a 0x42a96e 0x40469f 0x404365 0x46c3d6 0x46f44d 0x46c071 0x46d0e5 0x46c705 0x401276 0x42a414 0x459ed1
		#	0x46c3d5	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f44c	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c070	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d0e4	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c704	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-c4g23i/_test/_testmain.go:78
		#	0x42a413	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x4c1f3f 0x4c1d40 0x4be9f1 0x473566 0x46c071 0x459ed1
		#	0x4c1f3e	runtime/pprof.writeRuntimeProfile+0x9e					/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c1d3f	runtime/pprof.writeGoroutine+0x9f					/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4be9f0	runtime/pprof.(*Profile).WriteTo+0x340					/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x473565	_/tmp/d20161129-30451-c4g23i.TestTimedDoesntLeaveGoroutineHanging+0x175	/tmp/d20161129-30451-c4g23i/solution_test.go:191
		#	0x46c070	testing.tRunner+0x80							/usr/local/go/src/testing/testing.go:610
		
		
		
		AFTER:
		goroutine profile: total 4
		1 @ 0x42a87a 0x42a96e 0x4039a8 0x40376d 0x475374 0x459ed1
		#	0x475373	_/tmp/d20161129-30451-c4g23i.(*FastestTask).Execute.func1+0xb3	/tmp/d20161129-30451-c4g23i/solution.go:62
		
		1 @ 0x42a87a 0x42a96e 0x40469f 0x404365 0x46c3d6 0x46f44d 0x46c071 0x46d0e5 0x46c705 0x401276 0x42a414 0x459ed1
		#	0x46c3d5	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f44c	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c070	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d0e4	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c704	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-c4g23i/_test/_testmain.go:78
		#	0x42a413	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x42a87a 0x42a96e 0x40469f 0x404365 0x4736bc 0x46c071 0x459ed1
		#	0x4736bb	_/tmp/d20161129-30451-c4g23i.TestTimedDoesntLeaveGoroutineHanging+0x2cb	/tmp/d20161129-30451-c4g23i/solution_test.go:225
		#	0x46c070	testing.tRunner+0x80							/usr/local/go/src/testing/testing.go:610
		
		1 @ 0x4c1f3f 0x4c1d40 0x4be9f1 0x475e31 0x459ed1
		#	0x4c1f3e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c1d3f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4be9f0	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x475e30	_/tmp/d20161129-30451-c4g23i.TestTimedDoesntLeaveGoroutineHanging.func2+0x1a0	/tmp/d20161129-30451-c4g23i/solution_test.go:212
		
		
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-c4g23i	0.204s
PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.003s
PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.003s
PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.003s
PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/close_immediately (0.00s)
    	solution_test.go:322: Expected error did not occur instead got 0
    --- FAIL: TestGreatestSearcherErrors/only_failure (0.00s)
    	solution_test.go:335: Expected error did not occur instead got 0
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-c4g23i	0.048s
PASS
ok  	_/tmp/d20161129-30451-c4g23i	0.124s

История (1 версия и 1 коментар)

Емил обнови решението на 29.11.2016 13:19 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type PipelineTask struct {
+ tasks []Task
+}
+
+func (p *PipelineTask) Execute(i int) (r int, err error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("No tasks given")
+ }
+
+ for _, task := range p.tasks {
+ r, err = task.Execute(i)
+ if err != nil {
+ return
+ }
+
+ i = r
+ }
+
+ return
+}
+
+func Pipeline(tasks ...Task) Task {
+ return &PipelineTask{
+ tasks: tasks,
+ }
+}
+
+type FastestTask struct {
+ tasks []Task
+}
+
+func Fastest(tasks ...Task) Task {
+ return &FastestTask{
+ tasks: tasks,
+ }
+}
+
+func (p *FastestTask) Execute(i int) (r int, err error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("No tasks given")
+ }
+
+ rc := make(chan int)
+ ec := make(chan error)
+ for _, task := range p.tasks {
+ go func(t Task) {
+ rf, errf := t.Execute(i)
+ if errf != nil {
+ ec <- errf
+ } else {
+ rc <- rf
+ }
+ }(task)
+ }
+
+ select {
+ case r = <-rc:
+ case err = <-ec:
+ }
+
+ return
+}
+
+type TimedTask struct {
+ timeout time.Duration
+}
+
+func (t *TimedTask) Execute(i int) (r int, err error) {
+ time.Sleep(t.timeout)
+ err = errors.New("Timeout")
+ return
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ tasks := []Task{task, &TimedTask{timeout}}
+ return Fastest(tasks...)
+}
+
+type ConcurrentMapReduceTask struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return &ConcurrentMapReduceTask{
+ tasks: tasks,
+ reduce: reduce,
+ }
+}
+
+func (t *ConcurrentMapReduceTask) Execute(i int) (r int, err error) {
+ if len(t.tasks) == 0 {
+ return 0, errors.New("No tasks given")
+ }
+
+ rc := make(chan int)
+ ec := make(chan error)
+ results := make([]int, 0)
+
+ for _, task := range t.tasks {
+ go func(t Task) {
+ rf, errf := t.Execute(i)
+ if errf != nil {
+ ec <- errf
+ } else {
+ rc <- rf
+ }
+ }(task)
+ }
+
+ for _, _ = range t.tasks {
+ select {
+ case rt := <-rc:
+ results = append(results, rt)
+ case err = <-ec:
+ return
+ }
+ }
+
+ r = t.reduce(results)
+ return
+}
+
+type GreatestSearcherTask struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return &GreatestSearcherTask{
+ tasks: tasks,
+ errorLimit: errorLimit,
+ }
+}
+
+func (t *GreatestSearcherTask) Execute(i int) (r int, err error) {
+ rc := make(chan int)
+ ec := make(chan error)
+ first := true
+ errorCount := t.errorLimit
+ taskCount := 0
+
+ for task := range t.tasks {
+ taskCount++
+ go func(t Task) {
+ rf, errf := t.Execute(i)
+ if errf != nil {
+ ec <- errf
+ } else {
+ rc <- rf
+ }
+ }(task)
+ }
+
+ for taskCount != 0 {
+ select {
+ case rt := <-rc:
+ if first || rt > r {
+ r = rt
+ first = false
+ }
+ case _ = <-ec:
+ errorCount--
+ if errorCount < 0 {
+ err = errors.New("Error limit exceeded")
+ return
+ }
+ }
+ taskCount--
+ }
+
+ return
+}