Solution to Concurrent Tasks by Добромир Иванов

Results

  • 8 points from tests
  • 0 bonus points
  • 8 points total
  • 8 passing test(s)
  • 5 failing test(s)

Code

package main

import (
	"fmt"
	"sync"
	"time"
)

type Task interface {
	Execute(int) (int, error)
}

type pipelinedTasks struct {
	tasks []Task
}

type fastestTask struct {
	once  sync.Once
	start sync.WaitGroup
	tasks []Task
}

type timedTask struct {
	task     Task
	duration time.Duration
	done     bool
}

type mapReduce struct {
	err    bool
	tasks  []Task
	reduce func([]int) int
	lock   sync.Mutex
}

type faultTollerantTaskExecutor struct {
	errorLimit int
	tasks      <-chan Task
	running    sync.WaitGroup
}

func Pipeline(tasks ...Task) Task {
	return pipelinedTasks{tasks}
}

func (pipeline pipelinedTasks) Execute(firstArg int) (int, error) {
	result := firstArg
	var err error
	for i := 0; i < len(pipeline.tasks); i++ {
		result, err = pipeline.tasks[i].Execute(result)
		if err != nil {
			return 0, err
		}
	}
	return result, nil
}

func Fastest(tasks ...Task) Task {
	runner := &fastestTask{}
	runner.tasks = tasks
	return runner
}

func (fastest *fastestTask) Execute(arg int) (int, error) {
	if len(fastest.tasks) == 0 {
		return 0, fmt.Errorf("No tasks given to Fastest")
	}
	resChan := make(chan struct {
		res int
		err error
	})
	fastest.start.Add(len(fastest.tasks))
	for _, task := range fastest.tasks {
		go func(t Task, arg int, resChan chan struct {
			res int
			err error
		}) {
			fastest.start.Wait()
			result, err := t.Execute(arg)
			fastest.once.Do(func() {
				resChan <- struct {
					res int
					err error
				}{result, err}
			})
		}(task, arg, resChan)
		fastest.start.Done()
	}
	res := <-resChan
	return res.res, res.err
}

func Timed(task Task, timeout time.Duration) Task {
	timed := &timedTask{}
	timed.task = task
	timed.duration = timeout
	timed.done = false
	return timed
}

func (timeLimited *timedTask) Execute(arg int) (int, error) {
	start := time.Now()
	resultChan := make(chan struct {
		res int
		err error
	})
	go func(task Task, resultChan chan struct {
		res int
		err error
	}) {
		result, err := task.Execute(arg)
		if timeLimited.done == false {
			timeLimited.done = true
			resultChan <- struct {
				res int
				err error
			}{result, err}
		}
	}(timeLimited.task, resultChan)
	for {
		select {
		case res, _ := <-resultChan:
			return res.res, res.err
		default:
			duration := time.Since(start)
			if duration > timeLimited.duration {
				if timeLimited.done {
					res := <-resultChan
					return res.res, res.err
				}
				timeLimited.done = true
				return 0, fmt.Errorf("")
			}
		}
	}
}

func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	mr := &mapReduce{}
	mr.err = false
	mr.reduce = reduce
	mr.tasks = tasks
	return mr
}

func (mr *mapReduce) Execute(arg int) (int, error) {
	if len(mr.tasks) == 0 {
		return 0, fmt.Errorf("No tasks given!")
	}
	results := make([]int, 0)
	resultChan := make(chan int)
	errorChan := make(chan error)
	for _, task := range mr.tasks {
		go func(task Task, resultChan chan int, errorChan chan error, mr *mapReduce) {
			res, err := task.Execute(arg)
			mr.lock.Lock()
			defer func() { mr.lock.Unlock() }()
			if mr.err {
				return
			}
			if err != nil {
				errorChan <- err
				mr.err = true
			} else {
				resultChan <- res
			}
		}(task, resultChan, errorChan, mr)
	}
	done := false
	for !done {
		select {
		case result, _ := <-resultChan:
			results = append(results, result)
			if len(results) == len(mr.tasks) {
				done = true
				break
			}
		case err := <-errorChan:
			return 0, err
		}
	}
	return mr.reduce(results), nil
}

func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
	executor := &faultTollerantTaskExecutor{}
	executor.errorLimit = errorLimit
	executor.tasks = tasks
	return executor
}

func (executor *faultTollerantTaskExecutor) Execute(arg int) (int, error) {
	resultChan := make(chan int)
	errorChan := make(chan error)
	tasksRun := 0
	for task := range executor.tasks {
		tasksRun++
		executor.running.Add(1)
		go func(task Task, resultChan chan int, errorChan chan error) {
			defer func() { executor.running.Done() }()
			res, err := task.Execute(arg)
			if err != nil {
				errorChan <- err
			} else {
				resultChan <- res
			}
		}(task, resultChan, errorChan)
	}
	if tasksRun == 0 {
		return 0, fmt.Errorf("No tasks given!")
	}
	result, errors := 0, 0
	go func() {
		for tasksRun > 0 {
			select {
			case res, _ := <-resultChan:
				if res > result {
					result = res
				}
				tasksRun--
			case <-errorChan:
				tasksRun--
				errors++
			}
		}
	}()
	executor.running.Wait()
	if errors >= executor.errorLimit {
		return 0, fmt.Errorf("More tasks errored out than allowed limit!")
	}
	return result, nil
}

Execution log

PASS
ok  	_/tmp/d20161129-30451-19jxo87	0.003s
--- FAIL: TestPipelineErrors (0.00s)
	solution_test.go:63: Expected error did not occur instead got 1
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-19jxo87	0.003s
PASS
ok  	_/tmp/d20161129-30451-19jxo87	0.003s
PASS
ok  	_/tmp/d20161129-30451-19jxo87	0.103s
PASS
ok  	_/tmp/d20161129-30451-19jxo87	0.203s
PASS
ok  	_/tmp/d20161129-30451-19jxo87	0.140s
PASS
ok  	_/tmp/d20161129-30451-19jxo87	0.204s
PASS
ok  	_/tmp/d20161129-30451-19jxo87	0.003s
PASS
ok  	_/tmp/d20161129-30451-19jxo87	0.003s
--- FAIL: TestGreatestSearcherSimple (0.00s)
	solution_test.go:287: Received an unexpected error More tasks errored out than allowed limit!
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-19jxo87	0.003s
--- FAIL: TestGreatestSearcherComplex (0.05s)
	solution_test.go:300: Received an unexpected error More tasks errored out than allowed limit!
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-19jxo87	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/only_failure (0.00s)
    	solution_test.go:335: Expected error did not occur instead got 0
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-19jxo87	0.048s
--- FAIL: TestThemAll (0.12s)
	solution_test.go:378: Received an unexpected error More tasks errored out than allowed limit!
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-19jxo87	0.123s

History (3 versions and 1 comment)

Добромир updated the solution on 28.11.2016 00:02 (over a year ago)

+package main
+
+import "sync"
+import "time"
+import "fmt"
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type pipelinedTasks struct {
+ tasks []Task
+}
+
+type fastestTask struct {
+ once sync.Once
+ start sync.WaitGroup
+ tasks []Task
+}
+
+type timedTask struct {
+ task Task
+ duration time.Duration
+}
+
+type mapReduce struct {
+ err bool
+ tasks []Task
+ reduce func([]int) int
+ lock sync.Mutex
+}
+
+type faultTollerantTaskExecutor struct {
+ errorLimit int
+ tasks <-chan Task
+ running sync.WaitGroup
+}
+
+func Pipeline(tasks ...Task) Task {
+ return pipelinedTasks{tasks}
+}
+
+func (pipeline pipelinedTasks) Execute(firstArg int) (int, error) {
+ result := firstArg
+ var err error
+ for i := 0; i < len(pipeline.tasks); i++ {
+ result, err = pipeline.tasks[i].Execute(result)
+ if err != nil {
+ return 0, err
+ }
+ }
+ return result, nil
+}
+
+func Fastest(tasks ...Task) Task {
+ runner := &fastestTask{}
+ runner.tasks = tasks
+ return runner
+}
+
+func (fastest *fastestTask) Execute(arg int) (int, error) {
+ if len(fastest.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given to Fastest")
+ }
+
+ resChan := make(chan struct {
+ res int
+ err error
+ })
+ fastest.start.Add(len(fastest.tasks))
+
+ for _, task := range fastest.tasks {
+ go func(t Task, arg int, resChan chan struct {
+ res int
+ err error
+ }) {
+ fastest.start.Wait()
+
+ result, err := t.Execute(arg)
+ fastest.once.Do(func() {
+ resChan <- struct {
+ res int
+ err error
+ }{result, err}
+ })
+ }(task, arg, resChan)
+ fastest.start.Done()
+ }
+
+ res := <-resChan
+ return res.res, res.err
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return timedTask{task, timeout}
+}
+
+func (timeLimited timedTask) Execute(arg int) (int, error) {
+ start := time.Now()
+ res, err := timeLimited.task.Execute(arg)
+ duration := time.Since(start)
+
+ if timeLimited.duration >= duration {
+ return res, err
+ }
+
+ return 0, fmt.Errorf("")
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ mr := &mapReduce{}
+ mr.err = false
+ mr.reduce = reduce
+ mr.tasks = tasks
+
+ return mr
+}
+
+func (mr *mapReduce) Execute(arg int) (int, error) {
+ if len(mr.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given!")
+ }
+
+ results := make([]int, 0)
+ resultChan := make(chan int)
+ errorChan := make(chan error)
+
+ for _, task := range mr.tasks {
+ go func(task Task, resultChan chan int, errorChan chan error, mr *mapReduce) {
+ res, err := task.Execute(arg)
+
+ mr.lock.Lock()
+ defer func() { mr.lock.Unlock() }()
+
+ if mr.err {
+ return
+ }
+
+ if err != nil {
+ errorChan <- err
+ mr.err = true
+ } else {
+ resultChan <- res
+ }
+ }(task, resultChan, errorChan, mr)
+ }
+
+ done := false
+ for !done {
+ select {
+ case result, _ := <-resultChan:
+ results = append(results, result)
+ if len(results) == len(mr.tasks) {
+ done = true
+ break
+ }
+ case err := <-errorChan:
+ return 0, err
+ }
+ }
+
+ return mr.reduce(results), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ executor := &faultTollerantTaskExecutor{}
+
+ executor.errorLimit = errorLimit
+ executor.tasks = tasks
+
+ return executor
+}
+
+func (executor *faultTollerantTaskExecutor) Execute(arg int) (int, error) {
+ resultChan := make(chan int)
+ errorChan := make(chan error)
+ tasksRun := 0
+
+ for task := range(executor.tasks) {
+ tasksRun++
+ executor.running.Add(1)
+ go func(task Task, resultChan chan int, errorChan chan error) {
+ defer func() { executor.running.Done() }()
+ res, err := task.Execute(arg)
+ if err != nil {
+ errorChan <- err
+ } else {
+ resultChan <- res
+ }
+ }(task, resultChan, errorChan)
+ }
+
+ if tasksRun == 0 {
+ return 0, fmt.Errorf("No tasks given!")
+ }
+
+ result, errors := 0, 0
+ go func() {
+ for tasksRun > 0 {
+ select {
+ case res, _ := <-resultChan:
+ if res > result {
+ result = res
+ }
+ tasksRun--
+ case <-errorChan:
+ tasksRun--
+ errors++
+ }
+ }
+ }()
+
+ executor.running.Wait()
+ if errors >= executor.errorLimit {
+ return 0, fmt.Errorf("More tasks errored out than allowed limit!")
+ }
+
+ return result, nil
+}

The idea behind Timed is that we don't want to wait two hours for a task whose result we need within the next 10 seconds. So once the timeout expires, Timed should report that it ran out of time. In addition, it is good if it doesn't leave behind goroutines that won't finish before the program ends.
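For illustration only (not part of any submitted version), here is a minimal sketch of the pattern that comment describes, assuming the same Task interface plus the time and fmt imports; the names timedSketch and outcome are invented for the example. A select over the result channel and time.After returns whichever comes first, and the one-slot buffer lets the worker goroutine deliver its result and exit even after the timeout has already been reported, so no goroutine stays blocked:

type timedSketch struct {
	task    Task
	timeout time.Duration
}

func (t timedSketch) Execute(arg int) (int, error) {
	type outcome struct {
		res int
		err error
	}
	// One-slot buffer: the send below never blocks, so the worker goroutine
	// can finish even if the timeout has already been returned to the caller.
	out := make(chan outcome, 1)
	go func() {
		res, err := t.task.Execute(arg)
		out <- outcome{res, err}
	}()
	select {
	case o := <-out:
		// The wrapped task finished in time; pass its result through.
		return o.res, o.err
	case <-time.After(t.timeout):
		// Time ran out; report an error instead of waiting for the task.
		return 0, fmt.Errorf("task timed out after %v", t.timeout)
	}
}

Using time.After also avoids the busy default branch that the later versions use to poll time.Since(start) in a loop.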

Добромир updated the solution on 29.11.2016 16:22 (over a year ago)

package main
import "sync"
import "time"
import "fmt"
type Task interface {
Execute(int) (int, error)
}
type pipelinedTasks struct {
tasks []Task
}
type fastestTask struct {
once sync.Once
start sync.WaitGroup
tasks []Task
}
type timedTask struct {
task Task
duration time.Duration
+ done bool
}
type mapReduce struct {
err bool
tasks []Task
reduce func([]int) int
lock sync.Mutex
}
type faultTollerantTaskExecutor struct {
errorLimit int
tasks <-chan Task
running sync.WaitGroup
}
func Pipeline(tasks ...Task) Task {
return pipelinedTasks{tasks}
}
func (pipeline pipelinedTasks) Execute(firstArg int) (int, error) {
result := firstArg
var err error
for i := 0; i < len(pipeline.tasks); i++ {
result, err = pipeline.tasks[i].Execute(result)
if err != nil {
return 0, err
}
}
return result, nil
}
func Fastest(tasks ...Task) Task {
runner := &fastestTask{}
runner.tasks = tasks
return runner
}
func (fastest *fastestTask) Execute(arg int) (int, error) {
if len(fastest.tasks) == 0 {
return 0, fmt.Errorf("No tasks given to Fastest")
}
resChan := make(chan struct {
res int
err error
})
fastest.start.Add(len(fastest.tasks))
for _, task := range fastest.tasks {
go func(t Task, arg int, resChan chan struct {
res int
err error
}) {
fastest.start.Wait()
result, err := t.Execute(arg)
fastest.once.Do(func() {
resChan <- struct {
res int
err error
}{result, err}
})
}(task, arg, resChan)
fastest.start.Done()
}
res := <-resChan
return res.res, res.err
}
func Timed(task Task, timeout time.Duration) Task {
- return timedTask{task, timeout}
+ timed := &timedTask{}
+
+ timed.task = task
+ timed.duration = timeout
+ timed.done = false
+
+ return timed
}
-func (timeLimited timedTask) Execute(arg int) (int, error) {
+func (timeLimited *timedTask) Execute(arg int) (int, error) {
start := time.Now()
- res, err := timeLimited.task.Execute(arg)
- duration := time.Since(start)
+ resultChan := make(chan struct{res int; err error})
- if timeLimited.duration >= duration {
- return res, err
- }
+ go func(task Task, resultChan chan struct{res int; err error}) {
+ result, err := task.Execute(arg)
- return 0, fmt.Errorf("")
+ if timeLimited.done == false {
+ timeLimited.done = true
+ resultChan <- struct{res int; err error}{result, err}
+ }
+
+ }(timeLimited.task, resultChan)
+
+ for ;; {
+ select {
+ case res, _ := <-resultChan:
+ return res.res, res.err
+ default:
+ duration := time.Since(start)
+ if duration > timeLimited.duration {
+ if timeLimited.done {
+ res := <-resultChan
+ return res.res, res.err
+ }
+
+ timeLimited.done = true
+ return 0, fmt.Errorf("")
+ }
+ }
+ }
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
mr := &mapReduce{}
mr.err = false
mr.reduce = reduce
mr.tasks = tasks
return mr
}
func (mr *mapReduce) Execute(arg int) (int, error) {
if len(mr.tasks) == 0 {
return 0, fmt.Errorf("No tasks given!")
}
results := make([]int, 0)
resultChan := make(chan int)
errorChan := make(chan error)
for _, task := range mr.tasks {
go func(task Task, resultChan chan int, errorChan chan error, mr *mapReduce) {
res, err := task.Execute(arg)
mr.lock.Lock()
defer func() { mr.lock.Unlock() }()
if mr.err {
return
}
if err != nil {
errorChan <- err
mr.err = true
} else {
resultChan <- res
}
}(task, resultChan, errorChan, mr)
}
done := false
for !done {
select {
case result, _ := <-resultChan:
results = append(results, result)
if len(results) == len(mr.tasks) {
done = true
break
}
case err := <-errorChan:
return 0, err
}
}
return mr.reduce(results), nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
executor := &faultTollerantTaskExecutor{}
executor.errorLimit = errorLimit
executor.tasks = tasks
return executor
}
func (executor *faultTollerantTaskExecutor) Execute(arg int) (int, error) {
resultChan := make(chan int)
errorChan := make(chan error)
tasksRun := 0
for task := range(executor.tasks) {
tasksRun++
executor.running.Add(1)
go func(task Task, resultChan chan int, errorChan chan error) {
defer func() { executor.running.Done() }()
res, err := task.Execute(arg)
if err != nil {
errorChan <- err
} else {
resultChan <- res
}
}(task, resultChan, errorChan)
}
if tasksRun == 0 {
return 0, fmt.Errorf("No tasks given!")
}
result, errors := 0, 0
go func() {
for tasksRun > 0 {
select {
case res, _ := <-resultChan:
if res > result {
result = res
}
tasksRun--
case <-errorChan:
tasksRun--
errors++
}
}
}()
executor.running.Wait()
if errors >= executor.errorLimit {
return 0, fmt.Errorf("More tasks errored out than allowed limit!")
}
return result, nil
}

Добромир updated the solution on 29.11.2016 16:22 (over a year ago)

package main
import "sync"
import "time"
import "fmt"
type Task interface {
Execute(int) (int, error)
}
type pipelinedTasks struct {
tasks []Task
}
type fastestTask struct {
once sync.Once
start sync.WaitGroup
tasks []Task
}
type timedTask struct {
task Task
duration time.Duration
- done bool
+ done bool
}
type mapReduce struct {
err bool
tasks []Task
reduce func([]int) int
lock sync.Mutex
}
type faultTollerantTaskExecutor struct {
errorLimit int
tasks <-chan Task
- running sync.WaitGroup
+ running sync.WaitGroup
}
func Pipeline(tasks ...Task) Task {
return pipelinedTasks{tasks}
}
func (pipeline pipelinedTasks) Execute(firstArg int) (int, error) {
result := firstArg
var err error
for i := 0; i < len(pipeline.tasks); i++ {
result, err = pipeline.tasks[i].Execute(result)
if err != nil {
return 0, err
}
}
return result, nil
}
func Fastest(tasks ...Task) Task {
runner := &fastestTask{}
runner.tasks = tasks
return runner
}
func (fastest *fastestTask) Execute(arg int) (int, error) {
if len(fastest.tasks) == 0 {
return 0, fmt.Errorf("No tasks given to Fastest")
}
resChan := make(chan struct {
res int
err error
})
fastest.start.Add(len(fastest.tasks))
for _, task := range fastest.tasks {
go func(t Task, arg int, resChan chan struct {
res int
err error
}) {
fastest.start.Wait()
result, err := t.Execute(arg)
fastest.once.Do(func() {
resChan <- struct {
res int
err error
}{result, err}
})
}(task, arg, resChan)
fastest.start.Done()
}
res := <-resChan
return res.res, res.err
}
func Timed(task Task, timeout time.Duration) Task {
timed := &timedTask{}
timed.task = task
timed.duration = timeout
timed.done = false
return timed
}
func (timeLimited *timedTask) Execute(arg int) (int, error) {
start := time.Now()
- resultChan := make(chan struct{res int; err error})
+ resultChan := make(chan struct {
+ res int
+ err error
+ })
- go func(task Task, resultChan chan struct{res int; err error}) {
+ go func(task Task, resultChan chan struct {
+ res int
+ err error
+ }) {
result, err := task.Execute(arg)
if timeLimited.done == false {
timeLimited.done = true
- resultChan <- struct{res int; err error}{result, err}
+ resultChan <- struct {
+ res int
+ err error
+ }{result, err}
}
}(timeLimited.task, resultChan)
- for ;; {
+ for {
select {
- case res, _ := <-resultChan:
- return res.res, res.err
- default:
- duration := time.Since(start)
- if duration > timeLimited.duration {
- if timeLimited.done {
- res := <-resultChan
- return res.res, res.err
- }
-
- timeLimited.done = true
- return 0, fmt.Errorf("")
+ case res, _ := <-resultChan:
+ return res.res, res.err
+ default:
+ duration := time.Since(start)
+ if duration > timeLimited.duration {
+ if timeLimited.done {
+ res := <-resultChan
+ return res.res, res.err
}
+
+ timeLimited.done = true
+ return 0, fmt.Errorf("")
+ }
}
}
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
mr := &mapReduce{}
mr.err = false
mr.reduce = reduce
mr.tasks = tasks
return mr
}
func (mr *mapReduce) Execute(arg int) (int, error) {
if len(mr.tasks) == 0 {
return 0, fmt.Errorf("No tasks given!")
}
results := make([]int, 0)
resultChan := make(chan int)
errorChan := make(chan error)
for _, task := range mr.tasks {
go func(task Task, resultChan chan int, errorChan chan error, mr *mapReduce) {
res, err := task.Execute(arg)
mr.lock.Lock()
defer func() { mr.lock.Unlock() }()
if mr.err {
return
}
if err != nil {
errorChan <- err
mr.err = true
} else {
resultChan <- res
}
}(task, resultChan, errorChan, mr)
}
done := false
for !done {
select {
case result, _ := <-resultChan:
results = append(results, result)
if len(results) == len(mr.tasks) {
done = true
break
}
case err := <-errorChan:
return 0, err
}
}
return mr.reduce(results), nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
executor := &faultTollerantTaskExecutor{}
executor.errorLimit = errorLimit
executor.tasks = tasks
return executor
}
func (executor *faultTollerantTaskExecutor) Execute(arg int) (int, error) {
resultChan := make(chan int)
errorChan := make(chan error)
tasksRun := 0
- for task := range(executor.tasks) {
+ for task := range executor.tasks {
tasksRun++
executor.running.Add(1)
go func(task Task, resultChan chan int, errorChan chan error) {
defer func() { executor.running.Done() }()
res, err := task.Execute(arg)
if err != nil {
errorChan <- err
} else {
resultChan <- res
}
}(task, resultChan, errorChan)
}
if tasksRun == 0 {
return 0, fmt.Errorf("No tasks given!")
}
result, errors := 0, 0
go func() {
for tasksRun > 0 {
select {
- case res, _ := <-resultChan:
- if res > result {
- result = res
- }
- tasksRun--
- case <-errorChan:
- tasksRun--
- errors++
+ case res, _ := <-resultChan:
+ if res > result {
+ result = res
+ }
+ tasksRun--
+ case <-errorChan:
+ tasksRun--
+ errors++
}
}
}()
executor.running.Wait()
if errors >= executor.errorLimit {
return 0, fmt.Errorf("More tasks errored out than allowed limit!")
}
return result, nil
}