Решение на Concurrent Tasks от Живко Йотов

Обратно към всички решения

Към профила на Живко Йотов

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 10 успешни тест(а)
  • 3 неуспешни тест(а)

Код

package main
import (
"fmt"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type Task1 struct {
result int
err error
ch1, ch2 chan struct{}
}
func (task1 *Task1) Execute(arg int) (int, error) {
task1.result = arg
task1.ch1 <- struct{}{}
<-task1.ch2
return task1.result, task1.err
}
func Pipeline(tasks ...Task) Task {
myTask := Task1{result: 0, err: nil}
myTask.ch1 = make(chan struct{})
myTask.ch2 = make(chan struct{})
go func() {
<-myTask.ch1
for _, task := range tasks {
if myTask.err == nil {
myTask.result, myTask.err = task.Execute(myTask.result)
} else {
break
}
}
if len(tasks) == 0 {
myTask.err = fmt.Errorf("no tasks")
}
myTask.ch2 <- struct{}{}
}()
return &myTask
}
func Fastest(tasks ...Task) Task {
myTask := Task1{err: nil}
myTask.ch1 = make(chan struct{})
myTask.ch2 = make(chan struct{})
var countMtx sync.Mutex
go func() {
<-myTask.ch1
ch3 := make(chan int, len(tasks)+1)
ch4 := make(chan error, len(tasks)+1)
for _, task := range tasks {
go func(currentTask Task) {
curruntResult, currentError := currentTask.Execute(myTask.result)
countMtx.Lock()
ch3 <- curruntResult
ch4 <- currentError
countMtx.Unlock()
}(task)
}
if len(tasks) == 0 {
myTask.err = fmt.Errorf("no tasks")
myTask.ch2 <- struct{}{}
return
}
myTask.result = <-ch3
myTask.err = <-ch4
myTask.ch2 <- struct{}{}
}()
return &myTask
}
func Timed(task Task, timeout time.Duration) Task {
myTask := Task1{err: nil}
myTask.ch1 = make(chan struct{})
myTask.ch2 = make(chan struct{})
go func() {
<-myTask.ch1
ch3 := make(chan struct{})
go func() {
myTask.result, myTask.err = task.Execute(myTask.result)
ch3 <- struct{}{}
}()
select {
case <-ch3:
myTask.ch2 <- struct{}{}
case <-time.After(timeout):
myTask.err = fmt.Errorf("time is over")
myTask.ch2 <- struct{}{}
}
}()
return &myTask
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
myTask := Task1{err: nil}
myTask.ch1 = make(chan struct{})
myTask.ch2 = make(chan struct{})
var countMtx sync.Mutex
go func() {
<-myTask.ch1
ch3 := make(chan int, len(tasks)+1)
ch4 := make(chan error, len(tasks)+1)
ch5 := make(chan struct{})
for _, task := range tasks {
go func(currentTask Task) {
curruntResult, currentError := currentTask.Execute(myTask.result)
if currentError != nil {
myTask.err = currentError
myTask.ch2 <- struct{}{}
}
countMtx.Lock()
ch3 <- curruntResult
ch4 <- currentError
countMtx.Unlock()
ch5 <- struct{}{}
}(task)
}
for i := 0; i < len(tasks); i++ {
<-ch5
}
if len(tasks) == 0 {
myTask.err = fmt.Errorf("no tasks")
myTask.ch2 <- struct{}{}
return
}
for currentError := range ch4 {
if currentError != nil {
return
}
}
results := make([]int, 0, len(tasks))
for curruntResult := range ch3 {
results = append(results, curruntResult)
}
myTask.result = reduce(results)
myTask.ch2 <- struct{}{}
}()
return &myTask
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
myTask := Task1{err: nil}
myTask.ch1 = make(chan struct{})
myTask.ch2 = make(chan struct{})
var countMtx sync.Mutex
ch3 := make(chan int)
ch4 := make(chan error)
count := 0
go func() {
<-myTask.ch1
ch5 := make(chan struct{})
for {
task, ok := <-tasks
if ok == false {
break
}
count = count + 1
go func(currentTask Task) {
curruntResult, currentError := currentTask.Execute(myTask.result)
countMtx.Lock()
ch3 <- curruntResult
ch4 <- currentError
countMtx.Unlock()
ch5 <- struct{}{}
}(task)
}
for i := 0; i < count; i++ {
<-ch5
}
close(ch3)
close(ch4)
}()
go func() {
max := 0
firstResult := true
for {
curruntResult, ok1 := <-ch3
currentError, ok2 := <-ch4
if ok1 == false || ok2 == false {
break
}
if currentError != nil {
errorLimit = errorLimit - 1
} else if firstResult {
max = curruntResult
firstResult = false
} else if max < curruntResult {
max = curruntResult
}
}
if errorLimit < 0 {
myTask.err = fmt.Errorf("too many errors")
myTask.ch2 <- struct{}{}
return
}
if count == 0 {
myTask.err = fmt.Errorf("no tasks")
myTask.ch2 <- struct{}{}
return
}
if firstResult {
myTask.err = fmt.Errorf("only errors")
myTask.ch2 <- struct{}{}
return
}
myTask.result = max
myTask.ch2 <- struct{}{}
}()
return &myTask
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.003s
PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.003s
PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.003s
PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.103s
PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.203s
PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.134s
--- FAIL: TestTimedDoesntLeaveGoroutineHanging (0.20s)
	solution_test.go:204: Expected that there will be 1 more goroutines than at the start(3) after Timed one has started got 5
	solution_test.go:216: Expected that there will be as many goroutines as at the start(3) after Timed task has finished after it has timeouted but got 4
		
		BEFORE:
		goroutine profile: total 2
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x46c456 0x46f4cd 0x46c0f1 0x46d165 0x46c785 0x401276 0x42a494 0x459f51
		#	0x46c455	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f4cc	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c0f0	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d164	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c784	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-1r0olci/_test/_testmain.go:78
		#	0x42a493	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x4c2e3f 0x4c2c40 0x4bf8f1 0x472f36 0x46c0f1 0x459f51
		#	0x4c2e3e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c2c3f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4bf8f0	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x472f35	_/tmp/d20161129-30451-1r0olci.TestTimedDoesntLeaveGoroutineHanging+0x175	/tmp/d20161129-30451-1r0olci/solution_test.go:191
		#	0x46c0f0	testing.tRunner+0x80								/usr/local/go/src/testing/testing.go:610
		
		
		
		AFTER:
		goroutine profile: total 4
		1 @ 0x42a8fa 0x42a9ee 0x4039a8 0x40376d 0x47520f 0x459f51
		#	0x47520e	_/tmp/d20161129-30451-1r0olci.Timed.func1.1+0x8e	/tmp/d20161129-30451-1r0olci/solution.go:86
		
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x46c456 0x46f4cd 0x46c0f1 0x46d165 0x46c785 0x401276 0x42a494 0x459f51
		#	0x46c455	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f4cc	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c0f0	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d164	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c784	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-1r0olci/_test/_testmain.go:78
		#	0x42a493	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x47308c 0x46c0f1 0x459f51
		#	0x47308b	_/tmp/d20161129-30451-1r0olci.TestTimedDoesntLeaveGoroutineHanging+0x2cb	/tmp/d20161129-30451-1r0olci/solution_test.go:225
		#	0x46c0f0	testing.tRunner+0x80								/usr/local/go/src/testing/testing.go:610
		
		1 @ 0x4c2e3f 0x4c2c40 0x4bf8f1 0x476a51 0x459f51
		#	0x4c2e3e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c2c3f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4bf8f0	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x476a50	_/tmp/d20161129-30451-1r0olci.TestTimedDoesntLeaveGoroutineHanging.func2+0x1a0	/tmp/d20161129-30451-1r0olci/solution_test.go:212
		
		
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1r0olci	0.204s
PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.003s
panic: test timed out after 1s

goroutine 17 [running]:
panic(0x4f2540, 0xc4200a8010)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200700c0, 0x52045a, 0x1d, 0x52b560, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200700c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200700c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52b648, 0x5ab880, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-1r0olci/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-1r0olci.(*Task1).Execute(0xc42000a4b0, 0x2c, 0x3, 0x3, 0x5982c0)
	/tmp/d20161129-30451-1r0olci/solution.go:22 +0x78
_/tmp/d20161129-30451-1r0olci.TestConcurrentMapReduceSimple(0xc420070180)
	/tmp/d20161129-30451-1r0olci/solution_test.go:250 +0x18f
testing.tRunner(0xc420070180, 0x52b560)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec

goroutine 7 [chan receive]:
_/tmp/d20161129-30451-1r0olci.ConcurrentMapReduce.func1(0xc42000a4b0, 0xc42000a480, 0x3, 0x3, 0xc420010650, 0x52b620)
	/tmp/d20161129-30451-1r0olci/solution.go:131 +0x217
created by _/tmp/d20161129-30451-1r0olci.ConcurrentMapReduce
	/tmp/d20161129-30451-1r0olci/solution.go:142 +0x13f
exit status 2
FAIL	_/tmp/d20161129-30451-1r0olci	1.005s
PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.003s
PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.049s
PASS
ok  	_/tmp/d20161129-30451-1r0olci	0.048s
--- FAIL: TestThemAll (0.22s)
	solution_test.go:378: Received an unexpected error time is over
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1r0olci	0.223s

История (1 версия и 0 коментара)

Живко обнови решението на 29.11.2016 16:15 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type Task1 struct {
+ result int
+ err error
+ ch1, ch2 chan struct{}
+}
+
+func (task1 *Task1) Execute(arg int) (int, error) {
+ task1.result = arg
+ task1.ch1 <- struct{}{}
+ <-task1.ch2
+ return task1.result, task1.err
+}
+
+func Pipeline(tasks ...Task) Task {
+ myTask := Task1{result: 0, err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ go func() {
+ <-myTask.ch1
+ for _, task := range tasks {
+ if myTask.err == nil {
+ myTask.result, myTask.err = task.Execute(myTask.result)
+ } else {
+ break
+ }
+ }
+ if len(tasks) == 0 {
+ myTask.err = fmt.Errorf("no tasks")
+ }
+ myTask.ch2 <- struct{}{}
+ }()
+ return &myTask
+}
+
+func Fastest(tasks ...Task) Task {
+ myTask := Task1{err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ var countMtx sync.Mutex
+ go func() {
+ <-myTask.ch1
+ ch3 := make(chan int, len(tasks)+1)
+ ch4 := make(chan error, len(tasks)+1)
+ for _, task := range tasks {
+ go func(currentTask Task) {
+ curruntResult, currentError := currentTask.Execute(myTask.result)
+ countMtx.Lock()
+ ch3 <- curruntResult
+ ch4 <- currentError
+ countMtx.Unlock()
+ }(task)
+ }
+ if len(tasks) == 0 {
+ myTask.err = fmt.Errorf("no tasks")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ myTask.result = <-ch3
+ myTask.err = <-ch4
+ myTask.ch2 <- struct{}{}
+ }()
+ return &myTask
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ myTask := Task1{err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ go func() {
+ <-myTask.ch1
+ ch3 := make(chan struct{})
+ go func() {
+ myTask.result, myTask.err = task.Execute(myTask.result)
+ ch3 <- struct{}{}
+ }()
+ select {
+ case <-ch3:
+ myTask.ch2 <- struct{}{}
+ case <-time.After(timeout):
+ myTask.err = fmt.Errorf("time is over")
+ myTask.ch2 <- struct{}{}
+ }
+ }()
+ return &myTask
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ myTask := Task1{err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ var countMtx sync.Mutex
+ go func() {
+ <-myTask.ch1
+ ch3 := make(chan int, len(tasks)+1)
+ ch4 := make(chan error, len(tasks)+1)
+ ch5 := make(chan struct{})
+ for _, task := range tasks {
+ go func(currentTask Task) {
+ curruntResult, currentError := currentTask.Execute(myTask.result)
+ if currentError != nil {
+ myTask.err = currentError
+ myTask.ch2 <- struct{}{}
+ }
+ countMtx.Lock()
+ ch3 <- curruntResult
+ ch4 <- currentError
+ countMtx.Unlock()
+ ch5 <- struct{}{}
+ }(task)
+ }
+ for i := 0; i < len(tasks); i++ {
+ <-ch5
+ }
+ if len(tasks) == 0 {
+ myTask.err = fmt.Errorf("no tasks")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ for currentError := range ch4 {
+ if currentError != nil {
+ return
+ }
+ }
+ results := make([]int, 0, len(tasks))
+ for curruntResult := range ch3 {
+ results = append(results, curruntResult)
+ }
+ myTask.result = reduce(results)
+ myTask.ch2 <- struct{}{}
+ }()
+ return &myTask
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ myTask := Task1{err: nil}
+ myTask.ch1 = make(chan struct{})
+ myTask.ch2 = make(chan struct{})
+ var countMtx sync.Mutex
+ ch3 := make(chan int)
+ ch4 := make(chan error)
+ count := 0
+ go func() {
+ <-myTask.ch1
+ ch5 := make(chan struct{})
+ for {
+ task, ok := <-tasks
+ if ok == false {
+ break
+ }
+ count = count + 1
+ go func(currentTask Task) {
+ curruntResult, currentError := currentTask.Execute(myTask.result)
+ countMtx.Lock()
+ ch3 <- curruntResult
+ ch4 <- currentError
+ countMtx.Unlock()
+ ch5 <- struct{}{}
+ }(task)
+ }
+ for i := 0; i < count; i++ {
+ <-ch5
+ }
+ close(ch3)
+ close(ch4)
+ }()
+ go func() {
+ max := 0
+ firstResult := true
+ for {
+ curruntResult, ok1 := <-ch3
+ currentError, ok2 := <-ch4
+ if ok1 == false || ok2 == false {
+ break
+ }
+ if currentError != nil {
+ errorLimit = errorLimit - 1
+ } else if firstResult {
+ max = curruntResult
+ firstResult = false
+ } else if max < curruntResult {
+ max = curruntResult
+ }
+ }
+ if errorLimit < 0 {
+ myTask.err = fmt.Errorf("too many errors")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ if count == 0 {
+ myTask.err = fmt.Errorf("no tasks")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ if firstResult {
+ myTask.err = fmt.Errorf("only errors")
+ myTask.ch2 <- struct{}{}
+ return
+ }
+ myTask.result = max
+ myTask.ch2 <- struct{}{}
+ }()
+ return &myTask
+}