Решение на Concurrent Tasks от Слави Боянов

Обратно към всички решения

Към профила на Слави Боянов

Резултати

  • 11 точки от тестове
  • 0 бонус точки
  • 11 точки общо
  • 11 успешни тест(а)
  • 2 неуспешни тест(а)

Код

package main
import (
"errors"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
//pipline
type pipeline struct {
tasks []Task
}
func (pl pipeline) Execute(input int) (int, error) {
if len(pl.tasks) == 0 {
return 0, errors.New("no tasks given!")
}
result := input
var err error
for _, task := range pl.tasks {
result, err = task.Execute(result)
if err != nil {
return 0, err
}
}
return result, nil
}
func Pipeline(tasks ...Task) Task {
return pipeline{[]Task(tasks)}
}
//fastest
type fastest struct {
tasks []Task
}
func (fs fastest) Execute(input int) (int, error) {
if len(fs.tasks) == 0 {
return 0, errors.New("No tasks given")
}
var once sync.Once
first := make(chan struct {
result int
err error
})
for _, task := range fs.tasks {
go func(task Task) {
result, err := task.Execute(input)
once.Do(func() {
first <- struct {
result int
err error
}{result, err}
})
}(task)
}
output := <-first
return output.result, output.err
}
func Fastest(tasks ...Task) Task {
return fastest{[]Task(tasks)}
}
//timed
type timed struct {
task Task
timeout time.Duration
}
func (td timed) Execute(input int) (int, error) {
output := make(chan struct {
result int
err error
})
var once sync.Once
go func() {
result, err := td.task.Execute(input)
once.Do(func() {
output <- struct {
result int
err error
}{result, err}
})
}()
select {
case result := <-output:
return result.result, result.err
case <-time.After(td.timeout):
once.Do(func() {})
return 0, errors.New("Task timed out")
}
return 0, nil
}
func Timed(task Task, timeout time.Duration) Task {
return timed{task, timeout}
}
//concurentmapreduce
type mapReducer struct {
tasks []Task
reduce func(results []int) int
}
func (mr mapReducer) Execute(input int) (int, error) {
taskNum := len(mr.tasks)
if taskNum == 0 {
return 0, errors.New("No tasks given")
}
results := make([]int, taskNum, taskNum)
errSignal := make(chan error)
doneSignal := make(chan struct{}, 1)
var (
once sync.Once
wg sync.WaitGroup
)
wg.Add(taskNum)
for index, task := range mr.tasks {
go func(index int, task Task) {
result, err := task.Execute(input)
if err == nil {
results[index] = result
} else {
once.Do(func() { errSignal <- err })
}
wg.Done()
}(index, task)
}
go func() {
wg.Wait()
doneSignal <- struct{}{}
}()
select {
case err := <-errSignal:
return 0, err
case <-doneSignal:
return mr.reduce(results), nil
}
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return mapReducer{[]Task(tasks), reduce}
}
//greatestSearcher
type searcher struct {
tasks <-chan Task
errorLimit int
}
func (gs searcher) Execute(input int) (int, error) {
var (
mtex sync.Mutex
wg sync.WaitGroup
)
errorNum := 0
results := make([]int, 0)
for task := range gs.tasks {
go func(task Task) {
wg.Add(1)
result, err := task.Execute(input)
mtex.Lock()
if err == nil {
results = append(results, result)
} else {
errorNum++
}
mtex.Unlock()
wg.Done()
}(task)
}
wg.Wait()
if len(results) == 0 {
return 0, errors.New("Not task finished succesfuly")
} else if errorNum > gs.errorLimit {
return 0, errors.New("Too many errors")
} else {
max := results[0]
for _, result := range results {
if max < result {
max = result
}
}
return max, nil
}
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return searcher{tasks, errorLimit}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.003s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.003s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.002s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.104s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.204s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.134s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.203s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.003s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.003s
--- FAIL: TestGreatestSearcherSimple (0.00s)
	solution_test.go:289: Received result -1 when expecting 2
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1huw5wt	0.003s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/like_the_example (0.05s)
    	solution_test.go:313: Expected error did not occur instead got 42
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1huw5wt	0.048s
PASS
ok  	_/tmp/d20161129-30451-1huw5wt	0.123s

История (2 версии и 2 коментара)

Слави обнови решението на 29.11.2016 12:04 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+//pipline
+type pipeline struct {
+ tasks []Task
+}
+
+func (pl pipeline) Execute(input int) (int, error) {
+ if len(pl.tasks) == 0 {
+ return 0, errors.New("no tasks given!")
+ }
+ result := input
+ var err error
+ for _, task := range pl.tasks {
+ result, err = task.Execute(result)
+ if err != nil {
+ return 0, err
+ }
+ }
+ return result, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ return pipeline{[]Task(tasks)}
+}
+
+//fastest
+type fastest struct {
+ tasks []Task
+}
+
+func (fs fastest) Execute(input int) (int, error) {
+ if len(fs.tasks) == 0 {
+ return 0, errors.New("No tasks given")
+ }
+ var once sync.Once
+ first := make(chan struct {
+ result int
+ err error
+ })
+ for _, task := range fs.tasks {
+ go func() {
+ result, err := task.Execute(input)
+ once.Do(func() {
+ first <- struct {
+ result int
+ err error
+ }{result, err}
+ })
+ }()
+ }
+ output := <-first
+ return output.result, output.err
+}
+
+func Fastest(tasks ...Task) Task {
+ return fastest{[]Task(tasks)}
+}
+
+//timed
+type timed struct {
+ task Task
+ timeout time.Duration
+}
+
+func (td timed) Execute(input int) (int, error) {
+ output := make(chan struct {
+ result int
+ err error
+ })
+ var once sync.Once
+ go func() {
+ result, err := td.task.Execute(input)
+ once.Do(func() {
+ output <- struct {
+ result int
+ err error
+ }{result, err}
+ })
+ }()
+ select {
+ case result := <-output:
+ return result.result, result.err
+ case <-time.After(td.timeout):
+ once.Do(func() {})
+ return 0, errors.New("Task timed out")
+ }
+ return 0, nil
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return timed{task, timeout}
+}
+
+//concurentmapreduce
+type mapReducer struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func (mr mapReducer) Execute(input int) (int, error) {
+ taskNum := len(mr.tasks)
+ if taskNum == 0 {
+ return 0, errors.New("No tasks given")
+ }
+ results := make([]int, taskNum, taskNum)
+ errSignal := make(chan error)
+ doneSignal := make(chan struct{}, 1)
+ var (
+ once sync.Once
+ wg sync.WaitGroup
+ )
+ wg.Add(taskNum)
+ for index, task := range mr.tasks {
+ go func(index int, task Task) {
+ result, err := task.Execute(input)
+ if err == nil {
+ results[index] = result
+ } else {
+ once.Do(func() { errSignal <- err })
+ }
+ wg.Done()
+ }(index, task)
+ }
+ go func() {
+ wg.Wait()
+ doneSignal <- struct{}{}
+ }()
+ select {
+ case err := <-errSignal:
+ return 0, err
+ case <-doneSignal:
+ return mr.reduce(results), nil
+ }
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return mapReducer{[]Task(tasks), reduce}
+}
+
+//greatestSearcher
+type searcher struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func (gs searcher) Execute(input int) (int, error) {
+ var (
+ mtex sync.Mutex
+ wg sync.WaitGroup
+ )
+ errorNum := 0
+ results := make([]int, 0)
+ for task := range gs.tasks {
+ go func(task Task) {
+ wg.Add(1)
+ result, err := task.Execute(input)
+ mtex.Lock()
+ if err == nil {
+ results = append(results, result)
+ } else {
+ errorNum++
+ }
+ mtex.Unlock()
+ wg.Done()
+ }(task)
+ }
+ wg.Wait()
+ if len(results) == 0 {
+ return 0, errors.New("Not task finished succesfuly")
+ } else if errorNum > gs.errorLimit {
+
+ return 0, errors.New("Too many errors")
+ } else {
+ max := results[0]
+ for _, result := range results {
+ if max < result {
+ max = result
+ }
+ }
+ return max, nil
+ }
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return searcher{tasks, errorLimit}
+}

Слави обнови решението на 29.11.2016 14:01 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
//pipline
type pipeline struct {
tasks []Task
}
func (pl pipeline) Execute(input int) (int, error) {
if len(pl.tasks) == 0 {
return 0, errors.New("no tasks given!")
}
result := input
var err error
for _, task := range pl.tasks {
result, err = task.Execute(result)
if err != nil {
return 0, err
}
}
return result, nil
}
-
func Pipeline(tasks ...Task) Task {
return pipeline{[]Task(tasks)}
}
//fastest
type fastest struct {
tasks []Task
}
func (fs fastest) Execute(input int) (int, error) {
if len(fs.tasks) == 0 {
return 0, errors.New("No tasks given")
}
var once sync.Once
first := make(chan struct {
result int
err error
})
for _, task := range fs.tasks {
- go func() {
+ go func(task Task) {
result, err := task.Execute(input)
once.Do(func() {
first <- struct {
result int
err error
}{result, err}
})
- }()
+ }(task)
}
output := <-first
return output.result, output.err
}
-
func Fastest(tasks ...Task) Task {
return fastest{[]Task(tasks)}
}
//timed
type timed struct {
task Task
timeout time.Duration
}
func (td timed) Execute(input int) (int, error) {
output := make(chan struct {
result int
err error
})
var once sync.Once
go func() {
result, err := td.task.Execute(input)
once.Do(func() {
output <- struct {
result int
err error
}{result, err}
})
}()
select {
case result := <-output:
return result.result, result.err
case <-time.After(td.timeout):
once.Do(func() {})
return 0, errors.New("Task timed out")
}
return 0, nil
}
-
func Timed(task Task, timeout time.Duration) Task {
return timed{task, timeout}
}
//concurentmapreduce
type mapReducer struct {
tasks []Task
reduce func(results []int) int
}
func (mr mapReducer) Execute(input int) (int, error) {
taskNum := len(mr.tasks)
if taskNum == 0 {
return 0, errors.New("No tasks given")
}
results := make([]int, taskNum, taskNum)
errSignal := make(chan error)
doneSignal := make(chan struct{}, 1)
var (
once sync.Once
wg sync.WaitGroup
)
wg.Add(taskNum)
for index, task := range mr.tasks {
go func(index int, task Task) {
result, err := task.Execute(input)
if err == nil {
results[index] = result
} else {
once.Do(func() { errSignal <- err })
}
wg.Done()
}(index, task)
}
go func() {
wg.Wait()
doneSignal <- struct{}{}
}()
select {
case err := <-errSignal:
return 0, err
case <-doneSignal:
return mr.reduce(results), nil
}
}
-
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return mapReducer{[]Task(tasks), reduce}
}
//greatestSearcher
type searcher struct {
tasks <-chan Task
errorLimit int
}
func (gs searcher) Execute(input int) (int, error) {
var (
mtex sync.Mutex
wg sync.WaitGroup
)
errorNum := 0
results := make([]int, 0)
for task := range gs.tasks {
go func(task Task) {
wg.Add(1)
result, err := task.Execute(input)
mtex.Lock()
if err == nil {
results = append(results, result)
} else {
errorNum++
}
mtex.Unlock()
wg.Done()
}(task)
}
wg.Wait()
if len(results) == 0 {
return 0, errors.New("Not task finished succesfuly")
} else if errorNum > gs.errorLimit {
-
return 0, errors.New("Too many errors")
} else {
max := results[0]
for _, result := range results {
if max < result {
max = result
}
}
return max, nil
}
}
-
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return searcher{tasks, errorLimit}
}