Решение на Concurrent Tasks от Никола Юруков

Обратно към всички решения

Към профила на Никола Юруков

Резултати

  • 12 точки от тестове
  • 0 бонус точки
  • 12 точки общо
  • 12 успешни тест(а)
  • 1 неуспешни тест(а)

Код

package main
import (
"fmt"
"time"
)
// Task is a unit of work that takes an int argument and produces an int
// result or an error.
type Task interface {
// Execute runs the task with the given argument.
Execute(int) (int, error)
}
// res bundles the two values returned by a Task's Execute call so that they
// can be passed over a channel as a single message.
type res struct {
// r is the int result returned by Execute.
r int
// err is the error returned by Execute (nil on success).
err error
}
// responseTask adapts a plain function to the Task interface.
type responseTask struct {
	// exec is the function that Execute delegates to.
	exec func(int) (int, error)
}

// Execute invokes the wrapped function with arg and hands back its result
// and error unchanged.
func (t responseTask) Execute(arg int) (int, error) {
	value, err := t.exec(arg)
	return value, err
}
// runner executes task with arg and sends the outcome (result and error
// together) on c. The send blocks until c can accept the value.
func runner(task Task, arg int, c chan<- res) {
	value, err := task.Execute(arg)
	c <- res{r: value, err: err}
}
// runWrapper runs task with arg in a background goroutine and forwards its
// outcome to c, unless a value is received on selfDestruct first, in which
// case it returns without delivering anything.
//
// The task reports to a private one-slot channel rather than to c directly.
// This keeps the wrapper from ever receiving (and re-sending) a result that
// belongs to a different task sharing c, and it guarantees the background
// goroutine can always finish its send even after the wrapper has been
// cancelled, so no goroutine is left blocked forever.
func runWrapper(task Task, arg int, c chan res, selfDestruct <-chan struct{}) {
	// Capacity 1: the runner's single send never blocks.
	done := make(chan res, 1)
	go runner(task, arg, done)

	select {
	case result := <-done:
		// Forwarding must stay cancellable too; otherwise a caller that has
		// stopped reading c and is sending cancel signals would deadlock.
		select {
		case c <- result:
		case <-selfDestruct:
		}
	case <-selfDestruct:
	}
}
// Pipeline returns a Task that runs the given tasks one after another,
// feeding the result of each task to the next one as its argument. The first
// task receives the argument passed to the pipeline's Execute.
//
// The returned task fails when no tasks were supplied or as soon as any task
// fails. A failure of the first task is reported with a message that embeds
// the original error; failures of later tasks are returned as-is.
func Pipeline(tasks ...Task) Task {
	run := func(arg int) (int, error) {
		if len(tasks) == 0 {
			return 0, fmt.Errorf("Zero tasks passed to Pipeline.")
		}

		value, err := tasks[0].Execute(arg)
		if err != nil {
			return 0, fmt.Errorf("There was an error %v in the first task.", err)
		}

		// The head of the chain is done; thread the value through the rest.
		for _, next := range tasks[1:] {
			if value, err = next.Execute(value); err != nil {
				return 0, err
			}
		}
		return value, nil
	}
	return &responseTask{exec: run}
}
// Fastest returns a Task that runs all given tasks concurrently with the same
// argument and returns the result (or error) of whichever finishes first.
// Executing it with no tasks yields an error.
//
// The slower tasks are not interrupted; they run to completion in the
// background and their outcomes are discarded.
func Fastest(tasks ...Task) Task {
	return &responseTask{exec: func(arg int) (int, error) {
		if len(tasks) == 0 {
			return 0, fmt.Errorf("No tasks!")
		}

		// One buffer slot per task: every runner can deliver its outcome and
		// exit even though only the first outcome is ever read. With a
		// smaller buffer the late finishers would block on send forever.
		outcomes := make(chan res, len(tasks))
		for _, t := range tasks {
			go runner(t, arg, outcomes)
		}

		first := <-outcomes
		return first.r, first.err
	}}
}
// Timed returns a Task that runs task and returns its result, or an error if
// task has not finished within timeout. A nil task also yields an error.
//
// On timeout the underlying task is not interrupted; it finishes in the
// background and its outcome is discarded.
func Timed(task Task, timeout time.Duration) Task {
	return &responseTask{exec: func(arg int) (int, error) {
		if task == nil {
			return 0, fmt.Errorf("Nil task given!")
		}

		// Capacity 1 lets the runner deliver and exit even after a timeout,
		// when nobody is receiving any more.
		done := make(chan res, 1)
		go runner(task, arg, done)

		// An explicit timer (instead of time.After) can be released as soon
		// as the task finishes rather than lingering until it expires.
		timer := time.NewTimer(timeout)
		defer timer.Stop()

		select {
		case out := <-done:
			return out.r, out.err
		case <-timer.C:
			return 0, fmt.Errorf("Task timed out!")
		}
	}}
}
// ConcurrentMapReduce returns a Task that runs all given tasks concurrently
// with the same argument, gathers their results in the order they complete,
// and returns reduce applied to the gathered slice.
//
// Executing it with no tasks yields an error. If any task fails, a cancel
// signal is queued for every task and that task's error is returned without
// calling reduce.
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	exec := func(arg int) (int, error) {
		total := len(tasks)
		if total == 0 {
			return 0, fmt.Errorf("No tasks!")
		}

		// Both channels have one slot per task, so neither the workers nor
		// the cancellation loop below can block on a send.
		outcomes := make(chan res, total)
		cancel := make(chan struct{}, total)
		for idx := range tasks {
			go runWrapper(tasks[idx], arg, outcomes, cancel)
		}

		collected := make([]int, 0, total)
		for len(collected) < total {
			out := <-outcomes
			if out.err != nil {
				for sent := 0; sent < total; sent++ {
					cancel <- struct{}{}
				}
				return 0, out.err
			}
			collected = append(collected, out.r)
		}
		return reduce(collected), nil
	}
	return &responseTask{exec: exec}
}
// GreatestSearcher returns a Task that starts every task received from the
// tasks channel (until that channel is closed) concurrently with the same
// argument and returns the greatest result among the tasks that succeeded.
//
// It returns an error when more than errorLimit tasks fail, and also when no
// task produced a successful result (for example when the channel was closed
// without delivering any task). Results of failed tasks are ignored.
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
	return &responseTask{exec: func(arg int) (int, error) {
		outcomes := make(chan res)
		// Closing done on return releases every worker that is still waiting
		// to report, so an early exit leaves no goroutine blocked.
		done := make(chan struct{})
		defer close(done)

		pending := 0
		for t := range tasks {
			pending++
			go func(t Task) {
				value, err := t.Execute(arg)
				select {
				case outcomes <- res{r: value, err: err}:
				case <-done:
				}
			}(t)
		}

		// The failures are counted locally rather than by decrementing
		// errorLimit, so the limit is the same on every Execute call.
		failures := 0
		found := false
		greatest := 0
		for ; pending > 0; pending-- {
			out := <-outcomes
			if out.err != nil {
				failures++
				if failures > errorLimit {
					return 0, fmt.Errorf("Error limit exceeded!")
				}
				// A failed task carries no meaningful result.
				continue
			}
			if !found || out.r > greatest {
				greatest = out.r
				found = true
			}
		}

		if !found {
			return 0, fmt.Errorf("No successful tasks!")
		}
		return greatest, nil
	}}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.003s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.003s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.003s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.103s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.203s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.134s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.203s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.003s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.003s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.003s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.048s
panic: runtime error: index out of range [recovered]
	panic: runtime error: index out of range

goroutine 28 [running]:
panic(0x4fb8a0, 0xc4200100f0)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.tRunner.func1(0xc42009a0c0)
	/usr/local/go/src/testing/testing.go:579 +0x25d
panic(0x4fb8a0, 0xc4200100f0)
	/usr/local/go/src/runtime/panic.go:458 +0x243
_/tmp/d20161129-30451-ncjsa3.GreatestSearcher.func1(0xa, 0x20, 0x504ac0, 0x1)
	/tmp/d20161129-30451-ncjsa3/solution.go:189 +0x3f9
_/tmp/d20161129-30451-ncjsa3.responseTask.Execute(0xc4200980e0, 0xa, 0xc4200980e0, 0xc4200aa000, 0xc4200980e0)
	/tmp/d20161129-30451-ncjsa3/solution.go:22 +0x30
_/tmp/d20161129-30451-ncjsa3.(*responseTask).Execute(0xc4200aa000, 0xa, 0x5972c0, 0xc4200aa000, 0x583d9194)
	<autogenerated>:1 +0x5c
_/tmp/d20161129-30451-ncjsa3.TestGreatestSearcherErrors.func2(0xc42009a0c0)
	/tmp/d20161129-30451-ncjsa3/solution_test.go:321 +0x81
testing.tRunner(0xc42009a0c0, 0x52ac78)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161129-30451-ncjsa3	0.050s
PASS
ok  	_/tmp/d20161129-30451-ncjsa3	0.123s

История (1 версия и 0 коментара)

Никола обнови решението на 29.11.2016 15:31 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type res struct {
+ r int
+ err error
+}
+
+type responseTask struct {
+ exec func(int) (int, error)
+}
+
+func (rt responseTask) Execute(arg int) (int, error) {
+ return rt.exec(arg)
+}
+
+func runner(task Task, arg int, c chan<- res) {
+ r, e := task.Execute(arg)
+ ret := new(res)
+ ret.r = r
+ ret.err = e
+ c <- *ret
+ return
+}
+
+func runWrapper(task Task, arg int, c chan res, selfDestruct <-chan struct{}) {
+ go runner(task, arg, c)
+ select {
+ case result := <-c:
+ c <- result
+ return
+ case <-selfDestruct:
+ return
+ }
+}
+
+func Pipeline(tasks ...Task) Task {
+
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+
+ if len(tasks) == 0 {
+ return 0, fmt.Errorf("Zero tasks passed to Pipeline.")
+ }
+
+ execResult, err := tasks[0].Execute(arg)
+ if err != nil {
+ return 0, fmt.Errorf("There was an error %v in the first task.", err)
+ }
+
+ for i, t := range tasks {
+ if i == 0 {
+ continue
+ }
+ execResult, err = t.Execute(execResult)
+ if err != nil {
+ return 0, err
+ }
+ }
+ return execResult, nil
+ }
+
+ return response
+}
+
+func Fastest(tasks ...Task) Task {
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+
+ if len(tasks) == 0 {
+ return 0, fmt.Errorf("No tasks!")
+ }
+
+ agg := make(chan res, 1) // "Who is first" issue
+ destroy := make(chan struct{}, len(tasks))
+
+ for _, t := range tasks {
+ go runWrapper(t, arg, agg, destroy)
+ }
+
+ result := <-agg
+ for i := 0; i < len(tasks); i++ {
+ destroy <- struct{}{}
+ }
+ return result.r, result.err
+ }
+
+ return response
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+
+ if task == nil {
+ return 0, fmt.Errorf("Nil task given!")
+ }
+
+ c := make(chan res, 1)
+ go runner(task, arg, c)
+ select {
+ case r := <-c:
+ return r.r, r.err
+ case <-time.After(timeout):
+ return 0, fmt.Errorf("Task timed out!")
+ }
+ }
+
+ return response
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+
+ if len(tasks) == 0 {
+ return 0, fmt.Errorf("No tasks!")
+ }
+
+ agg := make(chan res, len(tasks))
+ destroy := make(chan struct{}, len(tasks))
+
+ results := make([]int, 0, len(tasks))
+
+ for _, t := range tasks {
+ go runWrapper(t, arg, agg, destroy)
+ }
+
+ for i := 0; i < len(tasks); i++ {
+ msg := <-agg
+ if msg.err != nil {
+ for j := 0; j < len(tasks); j++ {
+ destroy <- struct{}{}
+ }
+ return 0, msg.err
+ }
+ results = append(results, msg.r)
+ }
+ return reduce(results), nil
+ }
+
+ return response
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ response := new(responseTask)
+
+ response.exec = func(arg int) (int, error) {
+ agg := make(chan res)
+ destroy := make(chan struct{})
+ count := 0
+
+ for t := range tasks {
+ go runWrapper(t, arg, agg, destroy)
+ count++
+ }
+
+ results := make([]int, 0, 20)
+ for count > 0 {
+ r := <-agg
+ count--
+ if r.err != nil {
+ errorLimit--
+ if errorLimit == -1 {
+ for count > 0 {
+ destroy <- struct{}{}
+ count--
+ }
+ return 0, fmt.Errorf("Error limit exceeded!")
+ }
+ }
+ results = append(results, r.r)
+ }
+
+ // Get max of results
+ max := results[0]
+ for i := 0; i < len(results); i++ {
+ if results[i] > max {
+ max = results[i]
+ }
+ }
+ return max, nil
+ }
+
+ return response
+}