Решение на Concurrent Tasks от Стоян Ефтимов

Обратно към всички решения

Към профила на Стоян Ефтимов

Резултати

  • 12 точки от тестове
  • 0 бонус точки
  • 12 точки общо
  • 12 успешни теста
  • 1 неуспешен тест

Код

package main
import (
"errors"
"sync"
"time"
)
// Task is a unit of work: it receives an integer input and produces either
// an integer result or an error.
type Task interface {
// Execute runs the task on the given input and returns its result or an error.
Execute(int) (int, error)
}
// type adder struct {
// augend int
// }
//
// func (a adder) Execute(addend int) (int, error) {
// result := a.augend + addend
// if result > 127 {
// return 0, fmt.Errorf("Result %d exceeds the adder threshold", a)
// }
// return result, nil
// }
// Pipe is a Task that runs its tasks one after another, feeding the result
// of each task into the next one.
type Pipe struct {
	tasks []Task
}

// Execute passes startValue to the first task and threads every result into
// the following task. It returns the result of the last task, or (0, err)
// as soon as any task fails. A Pipe without tasks always returns an error.
func (p Pipe) Execute(startValue int) (int, error) {
	if len(p.tasks) == 0 {
		return 0, errors.New("There are no tasks to be executed")
	}

	current := startValue
	for i := range p.tasks {
		next, err := p.tasks[i].Execute(current)
		if err != nil {
			// Stop at the first failure; later tasks are never run.
			return 0, err
		}
		current = next
	}
	return current, nil
}

// Pipeline builds a Task that executes the given tasks sequentially.
func Pipeline(tasks ...Task) Task {
	return Pipe{tasks: tasks}
}
// FastExecutor is a Task that runs all of its tasks concurrently and reports
// the outcome of whichever one finishes first.
type FastExecutor struct {
	tasks []Task
}

// Execute starts every task in its own goroutine with the same input value
// and returns the result (or error) of the first task to finish. The
// remaining tasks keep running in the background; their outcomes are
// discarded. A FastExecutor without tasks always returns an error.
func (f FastExecutor) Execute(value int) (int, error) {
	if len(f.tasks) == 0 {
		return 0, errors.New("There are no tasks to be executed")
	}

	// One buffer slot per task lets every goroutine deliver its outcome and
	// exit even after Execute has returned. The channel is deliberately never
	// closed, so a late sender can neither block forever nor panic.
	ch := make(chan Result, len(f.tasks))
	for _, task := range f.tasks {
		go func(task Task) {
			// res and err must be local to each goroutine: sharing one pair
			// across all workers is a data race that lets a slower task
			// overwrite the winner's values before they are sent.
			res, err := task.Execute(value)
			ch <- Result{res, err}
		}(task)
	}

	first := <-ch
	return first.result, first.err
}

// Fastest builds a Task that runs the given tasks concurrently and yields
// the outcome of the first one to complete.
func Fastest(tasks ...Task) Task {
	return FastExecutor{tasks}
}
// type lazyAdder struct {
// adder
// delay time.Duration
// }
//
// func (la lazyAdder) Execute(addend int) (int, error) {
// time.Sleep(la.delay * time.Millisecond)
// return la.adder.Execute(addend)
// }
// TimedExecutor is a Task that gives its wrapped task a limited amount of
// time to finish.
type TimedExecutor struct {
	task    Task
	timeout time.Duration
}

// Result bundles the two return values of Task.Execute so that they can be
// sent over a channel as a single value.
type Result struct {
	result int
	err    error
}

// goExecute runs task.Execute(value) in a new goroutine and returns a channel
// on which exactly one Result is delivered before the channel is closed. The
// channel has room for that one value, so the goroutine never blocks on the
// send, even if nobody ever receives from it.
func goExecute(task Task, value int) chan Result {
	ch := make(chan Result, 1)
	go func() {
		defer close(ch)
		res, err := task.Execute(value)
		ch <- Result{res, err}
	}()
	return ch
}

// Execute runs the wrapped task with the given value. It returns the task's
// own result or error if it finishes within the timeout, and (0, error)
// otherwise. A task that times out is not cancelled; it keeps running in the
// background and its outcome is discarded.
func (te TimedExecutor) Execute(value int) (int, error) {
	done := goExecute(te.task, value)

	// An explicit timer, unlike time.After, can be stopped as soon as the
	// task wins the race, so its resources are released right away.
	timer := time.NewTimer(te.timeout)
	defer timer.Stop()

	select {
	case r := <-done:
		return r.result, r.err
	case <-timer.C:
		return 0, errors.New("Execution timed out.")
	}
}

// Timed builds a Task that fails if task does not complete within timeout.
func Timed(task Task, timeout time.Duration) Task {
	return TimedExecutor{task, timeout}
}
// ConcurrentMapReduceExecutor is a Task that runs all of its tasks
// concurrently on the same input and combines their results with reduce.
type ConcurrentMapReduceExecutor struct {
	reduce func([]int) int
	tasks  []Task
}

// Execute starts every task in its own goroutine with the given value. If
// all of them succeed, their results (in completion order) are passed to
// reduce and its return value is the result. If any task fails, Execute
// returns (0, err) for the first failure it observes without waiting for the
// remaining tasks. An executor without tasks always returns an error.
func (ce ConcurrentMapReduceExecutor) Execute(value int) (int, error) {
	if len(ce.tasks) == 0 {
		return 0, errors.New("There are no tasks to be executed")
	}

	// One buffer slot per task, and the channel is never closed: after an
	// early return on error the remaining workers can still deliver their
	// results and exit. Closing an unbuffered channel here made those
	// workers panic with "send on closed channel".
	ch := make(chan Result, len(ce.tasks))
	for _, task := range ce.tasks {
		go func(task Task) {
			res, err := task.Execute(value)
			ch <- Result{res, err}
		}(task)
	}

	results := make([]int, 0, len(ce.tasks))
	for range ce.tasks {
		r := <-ch
		if r.err != nil {
			return 0, r.err
		}
		results = append(results, r.result)
	}
	return ce.reduce(results), nil
}

// ConcurrentMapReduce builds a Task that runs tasks concurrently and folds
// their results with reduce.
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	return ConcurrentMapReduceExecutor{reduce, tasks}
}
// GreatestSearcherExecutor is a Task that runs every task received on a
// channel concurrently and reports the largest successful result, tolerating
// up to errorLimit failed tasks.
type GreatestSearcherExecutor struct {
	errorLimit int
	tasks      <-chan Task
}

// max returns the largest element of nums. nums must not be empty; indexing
// the first element panics otherwise.
func max(nums []int) int {
	m := nums[0]
	for _, num := range nums {
		if m < num {
			m = num
		}
	}
	return m
}

// Execute reads tasks from the channel until it is closed, running each one
// in its own goroutine with the given value, and waits for all of them to
// finish. It returns the largest result among the successful tasks.
//
// An error is returned when no tasks were received, when no task succeeded,
// or when more than errorLimit tasks failed; the checks are made in that
// order.
func (ge GreatestSearcherExecutor) Execute(value int) (int, error) {
	var (
		mutex     sync.Mutex // guards results and errorsNum
		results   []int
		errorsNum int
		wg        sync.WaitGroup
	)

	for task := range ge.tasks {
		wg.Add(1)
		go func(task Task) {
			defer wg.Done()
			res, err := task.Execute(value)

			mutex.Lock()
			defer mutex.Unlock()
			if err != nil {
				errorsNum++
				return
			}
			results = append(results, res)
		}(task)
	}
	wg.Wait()

	switch {
	case len(results) == 0 && errorsNum == 0:
		return 0, errors.New("No tasks were given")
	case len(results) == 0:
		return 0, errors.New("No task succeeded")
	case errorsNum > ge.errorLimit:
		return 0, errors.New("Error limit is passed")
	}
	return max(results), nil
}

// GreatestSearcher builds a Task that runs the tasks arriving on the channel
// concurrently and yields the greatest result, failing if more than
// errorLimit of them return an error.
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
	return GreatestSearcherExecutor{errorLimit, tasks}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.002s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.003s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.003s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.103s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.203s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.134s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.203s
panic: send on closed channel

goroutine 7 [running]:
panic(0x4fc960, 0xc42008e000)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
_/tmp/d20161129-30451-1iyl8ly.ConcurrentMapReduceExecutor.Execute.func1(0x1, 0xc420054360, 0x5996c0, 0xc420010558)
	/tmp/d20161129-30451-1iyl8ly/solution.go:143 +0x93
created by _/tmp/d20161129-30451-1iyl8ly.ConcurrentMapReduceExecutor.Execute
	/tmp/d20161129-30451-1iyl8ly/solution.go:144 +0x10d
exit status 2
FAIL	_/tmp/d20161129-30451-1iyl8ly	0.006s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.003s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.003s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.048s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.057s
PASS
ok  	_/tmp/d20161129-30451-1iyl8ly	0.123s

История (3 версии и 6 коментара)

Стоян обнови решението на 27.11.2016 13:13 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+// type adder struct {
+// augend int
+// }
+//
+// func (a adder) Execute(addend int) (int, error) {
+// result := a.augend + addend
+// if result > 127 {
+// return 0, fmt.Errorf("Result %d exceeds the adder threshold", a)
+// }
+// return result, nil
+// }
+
+type Pipe struct {
+ tasks []Task
+}
+
+func (p Pipe) Execute(startValue int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("There are no tasks to be executed")
+ }
+ value := startValue
+ for _, task := range p.tasks {
+ if newValue, err := task.Execute(value); err != nil {
+ return 0, err
+ } else {
+ value = newValue
+ }
+ }
+ return value, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ return Pipe{tasks}
+}
+
+type FastExecutor struct {
+ tasks []Task
+}
+
+func (f FastExecutor) Execute(value int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, errors.New("There are no tasks to be executed")
+ }
+ var res int
+ var err error
+ quit := make(chan Result, 1)
+ defer close(quit)
+ for _, task := range f.tasks {
+ go func(task Task) {
+ res, err = task.Execute(value)
+ quit <- Result{res, err}
+ }(task)
+ }
+ r := <-quit
+ return r.result, r.err
+}
+
+func Fastest(tasks ...Task) Task {
+ return FastExecutor{tasks}
+}
+
+// type lazyAdder struct {
+// adder
+// delay time.Duration
+// }
+//
+// func (la lazyAdder) Execute(addend int) (int, error) {
+// time.Sleep(la.delay * time.Millisecond)
+// return la.adder.Execute(addend)
+// }
+
+type TimedExecutor struct {
+ task Task
+ timeout time.Duration
+}
+
+type Result struct {
+ result int
+ err error
+}
+
+func (te TimedExecutor) Execute(value int) (int, error) {
+ ch := make(chan Result)
+ go func() {
+ res, err := te.task.Execute(value)
+ ch <- Result{res, err}
+ }()
+ select {
+ case s := <-ch:
+ return s.result, s.err
+ case <-time.After(te.timeout):
+ return 0, errors.New("Execution timed out.")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return TimedExecutor{task, timeout}
+}
+
+type ConcurrentMapReduceExecutor struct {
+ reduce func([]int) int
+ tasks []Task
+}
+
+func (ce ConcurrentMapReduceExecutor) Execute(value int) (int, error) {
+ if len(ce.tasks) == 0 {
+ return 0, errors.New("There are no tasks to be executed")
+ }
+ ch := make(chan Result)
+ defer close(ch)
+ for _, task := range ce.tasks {
+ go func(task Task) {
+ res, err := task.Execute(value)
+ ch <- Result{res, err}
+ }(task)
+ }
+ results := make([]int, 0)
+ for {
+ select {
+ case r := <-ch:
+ if r.err != nil {
+ return 0, r.err
+ }
+ results = append(results, r.result)
+ if len(results) == len(ce.tasks) {
+ return ce.reduce(results), nil
+ }
+ }
+ }
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return ConcurrentMapReduceExecutor{reduce, tasks}
+}
+
+type GreatestSearcherExecutor struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func max(nums []int) int {
+ m := nums[0]
+ for _, num := range nums {
+ if m < num {
+ m = num
+ }
+ }
+ return m
+}
+
+func (ge GreatestSearcherExecutor) Execute(value int) (int, error) {
+ var mutex sync.Mutex
+ results := make([]int, 0)
+ errorsNum := 0
+ var wg sync.WaitGroup
+ for task := range ge.tasks {
+ wg.Add(1)
+ go func(task Task) {
+ res, err := task.Execute(value)
+ mutex.Lock()
+ if err != nil {
+ errorsNum++
+ } else {
+ results = append(results, res)
+ }
+ mutex.Unlock()
+ wg.Done()
+ }(task)
+ }
+ wg.Wait()
+ if len(results) == 0 && errorsNum == 0 {
+ return 0, errors.New("No tasks were given")
+ }
+
+ if errorsNum > ge.errorLimit {
+ return 0, errors.New("Error limit is passed")
+ } else {
+ return max(results), nil
+ }
+ return max(results), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return GreatestSearcherExecutor{errorLimit, tasks}
+}

Стоян обнови решението на 28.11.2016 19:54 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
// type adder struct {
// augend int
// }
//
// func (a adder) Execute(addend int) (int, error) {
// result := a.augend + addend
// if result > 127 {
// return 0, fmt.Errorf("Result %d exceeds the adder threshold", a)
// }
// return result, nil
// }
type Pipe struct {
tasks []Task
}
func (p Pipe) Execute(startValue int) (int, error) {
if len(p.tasks) == 0 {
return 0, errors.New("There are no tasks to be executed")
}
value := startValue
for _, task := range p.tasks {
if newValue, err := task.Execute(value); err != nil {
return 0, err
} else {
value = newValue
}
}
return value, nil
}
func Pipeline(tasks ...Task) Task {
return Pipe{tasks}
}
type FastExecutor struct {
tasks []Task
}
func (f FastExecutor) Execute(value int) (int, error) {
if len(f.tasks) == 0 {
return 0, errors.New("There are no tasks to be executed")
}
var res int
var err error
- quit := make(chan Result, 1)
- defer close(quit)
+ ch := make(chan Result, 1)
+ defer close(ch)
+ quit := make(chan struct{}, 1)
+ quit <- struct{}{}
for _, task := range f.tasks {
go func(task Task) {
res, err = task.Execute(value)
- quit <- Result{res, err}
+ select {
+ case <-quit:
+ ch <- Result{res, err}
+ default:
+ }
}(task)
}
- r := <-quit
+ r := <-ch
return r.result, r.err
}
func Fastest(tasks ...Task) Task {
return FastExecutor{tasks}
}
// type lazyAdder struct {
// adder
// delay time.Duration
// }
//
// func (la lazyAdder) Execute(addend int) (int, error) {
// time.Sleep(la.delay * time.Millisecond)
// return la.adder.Execute(addend)
// }
type TimedExecutor struct {
task Task
timeout time.Duration
}
type Result struct {
result int
err error
}
+func goExecute(task Task, value int) chan Result {
+ ch := make(chan Result, 1)
+ go func() {
+ defer close(ch)
+ res, err := task.Execute(value)
+ ch <- Result{res, err}
+ }()
+ return ch
+}
+
func (te TimedExecutor) Execute(value int) (int, error) {
- ch := make(chan Result)
+ ch := make(chan Result, 1)
go func() {
+ defer close(ch)
res, err := te.task.Execute(value)
ch <- Result{res, err}
}()
select {
case s := <-ch:
return s.result, s.err
case <-time.After(te.timeout):
return 0, errors.New("Execution timed out.")
}
}
func Timed(task Task, timeout time.Duration) Task {
return TimedExecutor{task, timeout}
}
type ConcurrentMapReduceExecutor struct {
reduce func([]int) int
tasks []Task
}
func (ce ConcurrentMapReduceExecutor) Execute(value int) (int, error) {
if len(ce.tasks) == 0 {
return 0, errors.New("There are no tasks to be executed")
}
ch := make(chan Result)
defer close(ch)
for _, task := range ce.tasks {
go func(task Task) {
res, err := task.Execute(value)
ch <- Result{res, err}
}(task)
}
results := make([]int, 0)
for {
select {
case r := <-ch:
if r.err != nil {
return 0, r.err
}
results = append(results, r.result)
if len(results) == len(ce.tasks) {
return ce.reduce(results), nil
}
}
}
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return ConcurrentMapReduceExecutor{reduce, tasks}
}
type GreatestSearcherExecutor struct {
errorLimit int
tasks <-chan Task
}
func max(nums []int) int {
m := nums[0]
for _, num := range nums {
if m < num {
m = num
}
}
return m
}
func (ge GreatestSearcherExecutor) Execute(value int) (int, error) {
var mutex sync.Mutex
results := make([]int, 0)
errorsNum := 0
var wg sync.WaitGroup
for task := range ge.tasks {
wg.Add(1)
go func(task Task) {
+ defer wg.Done()
res, err := task.Execute(value)
mutex.Lock()
if err != nil {
errorsNum++
} else {
results = append(results, res)
}
mutex.Unlock()
- wg.Done()
}(task)
}
wg.Wait()
if len(results) == 0 && errorsNum == 0 {
return 0, errors.New("No tasks were given")
}
if errorsNum > ge.errorLimit {
return 0, errors.New("Error limit is passed")
} else {
return max(results), nil
}
return max(results), nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return GreatestSearcherExecutor{errorLimit, tasks}
}

Стоян обнови решението на 29.11.2016 16:25 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
// type adder struct {
// augend int
// }
//
// func (a adder) Execute(addend int) (int, error) {
// result := a.augend + addend
// if result > 127 {
// return 0, fmt.Errorf("Result %d exceeds the adder threshold", a)
// }
// return result, nil
// }
type Pipe struct {
tasks []Task
}
func (p Pipe) Execute(startValue int) (int, error) {
if len(p.tasks) == 0 {
return 0, errors.New("There are no tasks to be executed")
}
value := startValue
for _, task := range p.tasks {
if newValue, err := task.Execute(value); err != nil {
return 0, err
} else {
value = newValue
}
}
return value, nil
}
func Pipeline(tasks ...Task) Task {
return Pipe{tasks}
}
type FastExecutor struct {
tasks []Task
}
func (f FastExecutor) Execute(value int) (int, error) {
if len(f.tasks) == 0 {
return 0, errors.New("There are no tasks to be executed")
}
var res int
var err error
ch := make(chan Result, 1)
defer close(ch)
quit := make(chan struct{}, 1)
quit <- struct{}{}
for _, task := range f.tasks {
go func(task Task) {
res, err = task.Execute(value)
select {
case <-quit:
ch <- Result{res, err}
default:
}
}(task)
}
r := <-ch
return r.result, r.err
}
func Fastest(tasks ...Task) Task {
return FastExecutor{tasks}
}
// type lazyAdder struct {
// adder
// delay time.Duration
// }
//
// func (la lazyAdder) Execute(addend int) (int, error) {
// time.Sleep(la.delay * time.Millisecond)
// return la.adder.Execute(addend)
// }
type TimedExecutor struct {
task Task
timeout time.Duration
}
type Result struct {
result int
err error
}
func goExecute(task Task, value int) chan Result {
ch := make(chan Result, 1)
go func() {
defer close(ch)
res, err := task.Execute(value)
ch <- Result{res, err}
}()
return ch
}
func (te TimedExecutor) Execute(value int) (int, error) {
ch := make(chan Result, 1)
go func() {
defer close(ch)
res, err := te.task.Execute(value)
ch <- Result{res, err}
}()
select {
case s := <-ch:
return s.result, s.err
case <-time.After(te.timeout):
return 0, errors.New("Execution timed out.")
}
}
func Timed(task Task, timeout time.Duration) Task {
return TimedExecutor{task, timeout}
}
type ConcurrentMapReduceExecutor struct {
reduce func([]int) int
tasks []Task
}
func (ce ConcurrentMapReduceExecutor) Execute(value int) (int, error) {
if len(ce.tasks) == 0 {
return 0, errors.New("There are no tasks to be executed")
}
ch := make(chan Result)
defer close(ch)
for _, task := range ce.tasks {
go func(task Task) {
res, err := task.Execute(value)
ch <- Result{res, err}
}(task)
}
results := make([]int, 0)
for {
select {
case r := <-ch:
if r.err != nil {
return 0, r.err
}
results = append(results, r.result)
if len(results) == len(ce.tasks) {
return ce.reduce(results), nil
}
}
}
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return ConcurrentMapReduceExecutor{reduce, tasks}
}
type GreatestSearcherExecutor struct {
errorLimit int
tasks <-chan Task
}
func max(nums []int) int {
m := nums[0]
for _, num := range nums {
if m < num {
m = num
}
}
return m
}
func (ge GreatestSearcherExecutor) Execute(value int) (int, error) {
var mutex sync.Mutex
results := make([]int, 0)
errorsNum := 0
var wg sync.WaitGroup
for task := range ge.tasks {
wg.Add(1)
go func(task Task) {
defer wg.Done()
res, err := task.Execute(value)
mutex.Lock()
if err != nil {
errorsNum++
} else {
results = append(results, res)
}
mutex.Unlock()
}(task)
}
wg.Wait()
if len(results) == 0 && errorsNum == 0 {
return 0, errors.New("No tasks were given")
}
+
+ if len(results) == 0 {
+ return 0, errors.New("No task succeeded")
+ }
if errorsNum > ge.errorLimit {
return 0, errors.New("Error limit is passed")
} else {
return max(results), nil
}
return max(results), nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return GreatestSearcherExecutor{errorLimit, tasks}
}