Решение на Concurrent Tasks от Данислав Киров

Обратно към всички решения

Към профила на Данислав Киров

Резултати

  • 13 точки от тестове
  • 0 бонус точки
  • 13 точки общо
  • 13 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
import (
"errors"
"sync"
"time"
)
// Task is a unit of work that maps one integer to another and may fail.
type Task interface {
	// Execute receives an integer argument and returns an integer result
	// together with an error value.
	Execute(number int) (result int, err error)
}
// Response bundles the two values returned by a Task's Execute call so that
// they can be sent over a channel as a single value.
type Response struct {
// res is the integer result reported by the task.
res int
// err is the error reported by the task; nil when the task succeeded.
err error
}
// Pipeliner is a Task that runs a fixed sequence of tasks one after another,
// feeding the result of each task into the next one.
type Pipeliner struct {
	tasks []Task
}

// Execute passes number to the first task, the first task's result to the
// second task, and so on, returning the result of the last task.
//
// It returns an error when the pipeline holds no tasks, or the error of the
// first task that fails; in both cases the returned integer is 0 and the
// remaining tasks are not run.
func (p *Pipeliner) Execute(number int) (int, error) {
	if len(p.tasks) == 0 {
		return 0, errors.New("No tasks to execute!")
	}

	res := number
	for _, task := range p.tasks {
		var err error
		if res, err = task.Execute(res); err != nil {
			return 0, err
		}
	}
	return res, nil
}

// Pipeline builds a Task that executes the given tasks sequentially, chaining
// each result into the next task.
func Pipeline(tasks ...Task) Task {
	// tasks[:] would only re-slice the caller's backing array. Copy it so that
	// a caller who passed an existing slice with `...` cannot alter the
	// pipeline by mutating that slice afterwards.
	return &Pipeliner{tasks: append([]Task(nil), tasks...)}
}
// Fast is a Task that runs all of its tasks concurrently and reports the
// outcome of whichever one finishes first.
type Fast struct {
	tasks []Task
}

// Execute starts every task in its own goroutine with the same argument and
// returns the result and error of the first task to complete, whether it
// succeeded or failed. It returns an error when there are no tasks.
func (f *Fast) Execute(number int) (int, error) {
	numberOfTasks := len(f.tasks)
	if numberOfTasks == 0 {
		return 0, errors.New("No tasks to execute!")
	}

	// One buffer slot per task: the slower tasks can still deliver their
	// response and exit after Execute has already returned.
	ch := make(chan Response, numberOfTasks)
	for _, task := range f.tasks {
		go func(task Task) {
			res, err := task.Execute(number)
			ch <- Response{res: res, err: err}
		}(task)
	}

	first := <-ch
	return first.res, first.err
}

// Fastest builds a Task that runs the given tasks concurrently and yields the
// outcome of the first one to finish.
func Fastest(tasks ...Task) Task {
	// tasks[:] would only re-slice the caller's backing array. Copy it so that
	// later mutation of a slice passed with `...` cannot change (or race
	// with) the set of tasks being executed.
	return &Fast{tasks: append([]Task(nil), tasks...)}
}
// Timer is a Task that runs another task but gives up waiting for it after a
// fixed timeout.
type Timer struct {
	task    Task
	timeout time.Duration
}

// Execute runs the wrapped task in a separate goroutine and returns its result
// and error if it finishes before the timeout elapses. Otherwise it returns 0
// and a "Timeout" error; the wrapped task is not interrupted and keeps running
// in the background.
func (t *Timer) Execute(number int) (int, error) {
	// A buffer of one lets the worker deliver its response and exit even when
	// nobody is receiving any more because the timeout has already fired.
	ch := make(chan Response, 1)
	go func() {
		res, err := t.task.Execute(number)
		ch <- Response{res: res, err: err}
	}()

	// Use an explicit timer instead of time.After so that it can be released
	// as soon as the task completes rather than lingering until it fires.
	timer := time.NewTimer(t.timeout)
	defer timer.Stop()

	select {
	case s := <-ch:
		return s.res, s.err
	case <-timer.C:
		return 0, errors.New("Timeout")
	}
}

// Timed wraps task so that its execution fails with an error when it takes
// longer than timeout.
func Timed(task Task, timeout time.Duration) Task {
	return &Timer{task: task, timeout: timeout}
}
// Reducer is a Task that runs all of its tasks concurrently with the same
// argument and combines their results with a reduce function.
type Reducer struct {
	reduce func(results []int) int
	tasks  []Task
}

// Execute starts every task in its own goroutine, collects their results and
// returns reduce(results). The results are gathered in completion order, not
// in the order the tasks were supplied.
//
// It returns an error when there are no tasks, or the first error received
// from any task; in that case reduce is not called and the returned integer
// is 0.
func (r *Reducer) Execute(number int) (int, error) {
	numberOfTasks := len(r.tasks)
	if numberOfTasks == 0 {
		return 0, errors.New("No tasks to execute!")
	}

	// One buffer slot per task: if Execute returns early on an error, the
	// remaining tasks can still deliver their response and exit.
	ch := make(chan Response, numberOfTasks)
	for _, task := range r.tasks {
		go func(task Task) {
			res, err := task.Execute(number)
			ch <- Response{res: res, err: err}
		}(task)
	}

	results := make([]int, 0, numberOfTasks)
	for i := 0; i < numberOfTasks; i++ {
		s := <-ch
		if s.err != nil {
			return 0, s.err
		}
		results = append(results, s.res)
	}
	return r.reduce(results), nil
}

// ConcurrentMapReduce builds a Task that runs the given tasks concurrently
// and folds their results into a single value with reduce.
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	// tasks[:] would only re-slice the caller's backing array. Copy it so that
	// later mutation of a slice passed with `...` cannot change (or race
	// with) the set of tasks being executed.
	return &Reducer{reduce: reduce, tasks: append([]Task(nil), tasks...)}
}
// Searcher is a Task that consumes tasks from a channel, runs each of them
// concurrently and reports the greatest successful result.
type Searcher struct {
	errorLimit int
	tasks      <-chan Task
}

// Execute reads tasks from the channel until it is closed, running each one in
// its own goroutine with the same argument, and waits for all of them to
// finish. It returns the largest result among the tasks that succeeded.
//
// An error is returned when no task produced a result or when more than
// errorLimit tasks failed.
func (s *Searcher) Execute(number int) (int, error) {
	var (
		guard     sync.Mutex
		pending   sync.WaitGroup
		successes []int
		failures  int
	)

	// run executes a single task and records its outcome under the mutex.
	run := func(t Task) {
		defer pending.Done()

		value, err := t.Execute(number)

		guard.Lock()
		defer guard.Unlock()
		if err == nil {
			successes = append(successes, value)
			return
		}
		failures++
	}

	for t := range s.tasks {
		pending.Add(1)
		go run(t)
	}
	pending.Wait()

	if failures > s.errorLimit || len(successes) == 0 {
		return 0, errors.New("Too many errors!")
	}

	greatest := successes[0]
	for _, candidate := range successes[1:] {
		if candidate > greatest {
			greatest = candidate
		}
	}
	return greatest, nil
}

// GreatestSearcher builds a Task that runs every task received on the tasks
// channel and yields the greatest result, tolerating at most errorLimit
// failed tasks.
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
	return &Searcher{errorLimit: errorLimit, tasks: tasks}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.003s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.003s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.003s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.103s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.203s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.134s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.203s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.003s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.003s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.003s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.048s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.048s
PASS
ok  	_/tmp/d20161129-30451-dayd3b	0.123s

История (4 версии и 5 коментара)

Данислав обнови решението на 26.11.2016 10:56 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type Pipeliner struct {
+ tasks []Task
+}
+
+func (p *Pipeliner) Execute(number int) (int, error) {
+ numberOfTasks := len(p.tasks)
+ if numberOfTasks == 0 {
+ return 0, errors.New("No tasks to execute!")
+ }
+
+ res := number
+ err := errors.New("")
+ for _, task := range p.tasks {
+ res, err = task.Execute(res)
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ return res, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ p := new(Pipeliner)
+ p.tasks = tasks[:]
+ return p
+}
+
+type Fast struct {
+ tasks []Task
+}
+
+func (f *Fast) Execute(number int) (int, error) {
+ numberOfTasks := len(f.tasks)
+ if numberOfTasks == 0 {
+ return 0, errors.New("No tasks to execute!")
+ }
+
+ ch := make(chan struct {
+ res int
+ err error
+ }, numberOfTasks)
+ for _, task := range f.tasks {
+ go func(task Task) {
+ res, err := task.Execute(number)
+ ch <- struct {
+ res int
+ err error
+ }{res, err}
+ }(task)
+ }
+
+ first := <-ch
+ return first.res, first.err
+}
+
+func Fastest(tasks ...Task) Task {
+ f := new(Fast)
+ f.tasks = tasks[:]
+ return f
+}
+
+type Timer struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t *Timer) Execute(number int) (int, error) {
+ ch := make(chan struct {
+ res int
+ err error
+ }, 1)
+ go func() {
+ res, err := t.task.Execute(number)
+ ch <- struct {
+ res int
+ err error
+ }{res, err}
+ close(ch)
+ }()
+
+ select {
+ case s := <-ch:
+ return s.res, s.err
+ case <-time.After(t.timeout):
+ return 0, errors.New("Timeout")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ t := new(Timer)
+ t.task = task
+ t.timeout = timeout
+ return t
+}
+
+type Reducer struct {
+ reduce func(results []int) int
+ tasks []Task
+}
+
+func (r *Reducer) Execute(number int) (int, error) {
+ numberOfTasks := len(r.tasks)
+ if numberOfTasks == 0 {
+ return 0, errors.New("No tasks to execute!")
+ }
+
+ var results []int
+ ch := make(chan struct {
+ res int
+ err error
+ }, numberOfTasks)
+ for _, task := range r.tasks {
+ go func(task Task) {
+ res, err := task.Execute(number)
+ ch <- struct {
+ res int
+ err error
+ }{res, err}
+ }(task)
+ }
+
+ for i := 0; i < numberOfTasks; i++ {
+ s := <-ch
+ if s.err != nil {
+ return 0, s.err
+ }
+ results = append(results, s.res)
+ }
+ return r.reduce(results), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ r := new(Reducer)
+ r.reduce = reduce
+ r.tasks = tasks[:]
+ return r
+}
+
+type Searcher struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func (s *Searcher) Execute(number int) (int, error) {
+ var results []int
+ errs := int64(0)
+ var wg sync.WaitGroup
+ for task := range s.tasks {
+ wg.Add(1)
+ go func(task Task) {
+ res, err := task.Execute(number)
+ if err != nil {
+ errs = atomic.AddInt64(&errs, 1)
+ } else {
+ results = append(results, res)
+ }
+ wg.Done()
+ }(task)
+ }
+
+ wg.Wait()
+ if len(results) == 0 && errs == 0 {
+ return 0, errors.New("No tasks to execute!")
+ }
+ if errs > int64(s.errorLimit) {
+ return 0, errors.New("Too many errors!")
+ }
+
+ max := results[0]
+ for _, res := range results {
+ if res > max {
+ max = res
+ }
+ }
+
+ return int(max), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ s := new(Searcher)
+ s.errorLimit = errorLimit
+ s.tasks = tasks
+ return s
+}

Може да си дефинираш някакъв твой тип за тази структура с грешката и резултата, за да не се налага да му пишеш дефиницията навсякъде.

Иначе изглежда супер. Още не сме написали всичките тестове и не мога да кажа със сигурност, но на пръв поглед не виждам някакви очевидни синхронизационни проблеми или висящи горутини.

Данислав обнови решението на 28.11.2016 22:44 (преди над 1 година)

package main
import (
"errors"
"sync"
"sync/atomic"
"time"
)
type Task interface {
Execute(int) (int, error)
}
+type Response struct {
+ res int
+ err error
+}
+
type Pipeliner struct {
tasks []Task
}
func (p *Pipeliner) Execute(number int) (int, error) {
numberOfTasks := len(p.tasks)
if numberOfTasks == 0 {
return 0, errors.New("No tasks to execute!")
}
res := number
- err := errors.New("")
+ var err error
for _, task := range p.tasks {
res, err = task.Execute(res)
if err != nil {
return 0, err
}
}
return res, nil
}
func Pipeline(tasks ...Task) Task {
p := new(Pipeliner)
p.tasks = tasks[:]
return p
}
type Fast struct {
tasks []Task
}
func (f *Fast) Execute(number int) (int, error) {
numberOfTasks := len(f.tasks)
if numberOfTasks == 0 {
return 0, errors.New("No tasks to execute!")
}
- ch := make(chan struct {
- res int
- err error
- }, numberOfTasks)
+ ch := make(chan Response, numberOfTasks)
for _, task := range f.tasks {
go func(task Task) {
res, err := task.Execute(number)
- ch <- struct {
- res int
- err error
- }{res, err}
+ ch <- Response{res, err}
}(task)
}
first := <-ch
return first.res, first.err
}
func Fastest(tasks ...Task) Task {
f := new(Fast)
f.tasks = tasks[:]
return f
}
type Timer struct {
task Task
timeout time.Duration
}
func (t *Timer) Execute(number int) (int, error) {
- ch := make(chan struct {
- res int
- err error
- }, 1)
+ ch := make(chan Response, 1)
go func() {
res, err := t.task.Execute(number)
- ch <- struct {
- res int
- err error
- }{res, err}
+ ch <- Response{res, err}
close(ch)
}()
select {
case s := <-ch:
return s.res, s.err
case <-time.After(t.timeout):
return 0, errors.New("Timeout")
}
}
func Timed(task Task, timeout time.Duration) Task {
t := new(Timer)
t.task = task
t.timeout = timeout
return t
}
type Reducer struct {
reduce func(results []int) int
tasks []Task
}
func (r *Reducer) Execute(number int) (int, error) {
numberOfTasks := len(r.tasks)
if numberOfTasks == 0 {
return 0, errors.New("No tasks to execute!")
}
var results []int
- ch := make(chan struct {
- res int
- err error
- }, numberOfTasks)
+ ch := make(chan Response, numberOfTasks)
for _, task := range r.tasks {
go func(task Task) {
res, err := task.Execute(number)
- ch <- struct {
- res int
- err error
- }{res, err}
+ ch <- Response{res, err}
}(task)
}
for i := 0; i < numberOfTasks; i++ {
s := <-ch
if s.err != nil {
return 0, s.err
}
results = append(results, s.res)
}
return r.reduce(results), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
r := new(Reducer)
r.reduce = reduce
r.tasks = tasks[:]
return r
}
type Searcher struct {
errorLimit int
tasks <-chan Task
}
func (s *Searcher) Execute(number int) (int, error) {
var results []int
errs := int64(0)
var wg sync.WaitGroup
for task := range s.tasks {
wg.Add(1)
go func(task Task) {
res, err := task.Execute(number)
if err != nil {
errs = atomic.AddInt64(&errs, 1)
} else {
results = append(results, res)
}
wg.Done()
}(task)
}
wg.Wait()
if len(results) == 0 && errs == 0 {
return 0, errors.New("No tasks to execute!")
}
if errs > int64(s.errorLimit) {
return 0, errors.New("Too many errors!")
}
max := results[0]
for _, res := range results {
if res > max {
max = res
}
}
return int(max), nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
s := new(Searcher)
s.errorLimit = errorLimit
s.tasks = tasks
return s
}

Данислав обнови решението на 29.11.2016 15:26 (преди над 1 година)

package main
import (
"errors"
"sync"
"sync/atomic"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type Response struct {
res int
err error
}
type Pipeliner struct {
tasks []Task
}
func (p *Pipeliner) Execute(number int) (int, error) {
numberOfTasks := len(p.tasks)
if numberOfTasks == 0 {
return 0, errors.New("No tasks to execute!")
}
res := number
var err error
for _, task := range p.tasks {
res, err = task.Execute(res)
if err != nil {
return 0, err
}
}
return res, nil
}
func Pipeline(tasks ...Task) Task {
p := new(Pipeliner)
p.tasks = tasks[:]
return p
}
type Fast struct {
tasks []Task
}
func (f *Fast) Execute(number int) (int, error) {
numberOfTasks := len(f.tasks)
if numberOfTasks == 0 {
return 0, errors.New("No tasks to execute!")
}
ch := make(chan Response, numberOfTasks)
for _, task := range f.tasks {
go func(task Task) {
res, err := task.Execute(number)
ch <- Response{res, err}
}(task)
}
first := <-ch
return first.res, first.err
}
func Fastest(tasks ...Task) Task {
f := new(Fast)
f.tasks = tasks[:]
return f
}
type Timer struct {
task Task
timeout time.Duration
}
func (t *Timer) Execute(number int) (int, error) {
ch := make(chan Response, 1)
go func() {
res, err := t.task.Execute(number)
ch <- Response{res, err}
close(ch)
}()
select {
case s := <-ch:
return s.res, s.err
case <-time.After(t.timeout):
return 0, errors.New("Timeout")
}
}
func Timed(task Task, timeout time.Duration) Task {
t := new(Timer)
t.task = task
t.timeout = timeout
return t
}
type Reducer struct {
reduce func(results []int) int
tasks []Task
}
func (r *Reducer) Execute(number int) (int, error) {
numberOfTasks := len(r.tasks)
if numberOfTasks == 0 {
return 0, errors.New("No tasks to execute!")
}
var results []int
ch := make(chan Response, numberOfTasks)
for _, task := range r.tasks {
go func(task Task) {
res, err := task.Execute(number)
ch <- Response{res, err}
}(task)
}
for i := 0; i < numberOfTasks; i++ {
s := <-ch
if s.err != nil {
return 0, s.err
}
results = append(results, s.res)
}
return r.reduce(results), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
r := new(Reducer)
r.reduce = reduce
r.tasks = tasks[:]
return r
}
type Searcher struct {
errorLimit int
tasks <-chan Task
}
func (s *Searcher) Execute(number int) (int, error) {
var results []int
errs := int64(0)
var wg sync.WaitGroup
for task := range s.tasks {
wg.Add(1)
go func(task Task) {
res, err := task.Execute(number)
if err != nil {
errs = atomic.AddInt64(&errs, 1)
} else {
results = append(results, res)
}
wg.Done()
}(task)
}
wg.Wait()
- if len(results) == 0 && errs == 0 {
- return 0, errors.New("No tasks to execute!")
- }
- if errs > int64(s.errorLimit) {
+ if len(results) == 0 || errs > int64(s.errorLimit) {
return 0, errors.New("Too many errors!")
}
max := results[0]
for _, res := range results {
if res > max {
max = res
}
}
return int(max), nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
s := new(Searcher)
s.errorLimit = errorLimit
s.tasks = tasks
return s
}

Данислав обнови решението на 29.11.2016 15:53 (преди над 1 година)

package main
import (
"errors"
"sync"
- "sync/atomic"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type Response struct {
res int
err error
}
type Pipeliner struct {
tasks []Task
}
func (p *Pipeliner) Execute(number int) (int, error) {
numberOfTasks := len(p.tasks)
if numberOfTasks == 0 {
return 0, errors.New("No tasks to execute!")
}
res := number
var err error
for _, task := range p.tasks {
res, err = task.Execute(res)
if err != nil {
return 0, err
}
}
return res, nil
}
func Pipeline(tasks ...Task) Task {
p := new(Pipeliner)
p.tasks = tasks[:]
return p
}
type Fast struct {
tasks []Task
}
func (f *Fast) Execute(number int) (int, error) {
numberOfTasks := len(f.tasks)
if numberOfTasks == 0 {
return 0, errors.New("No tasks to execute!")
}
ch := make(chan Response, numberOfTasks)
for _, task := range f.tasks {
go func(task Task) {
res, err := task.Execute(number)
ch <- Response{res, err}
}(task)
}
first := <-ch
return first.res, first.err
}
func Fastest(tasks ...Task) Task {
f := new(Fast)
f.tasks = tasks[:]
return f
}
type Timer struct {
task Task
timeout time.Duration
}
func (t *Timer) Execute(number int) (int, error) {
ch := make(chan Response, 1)
go func() {
res, err := t.task.Execute(number)
ch <- Response{res, err}
close(ch)
}()
select {
case s := <-ch:
return s.res, s.err
case <-time.After(t.timeout):
return 0, errors.New("Timeout")
}
}
func Timed(task Task, timeout time.Duration) Task {
t := new(Timer)
t.task = task
t.timeout = timeout
return t
}
type Reducer struct {
reduce func(results []int) int
tasks []Task
}
func (r *Reducer) Execute(number int) (int, error) {
numberOfTasks := len(r.tasks)
if numberOfTasks == 0 {
return 0, errors.New("No tasks to execute!")
}
var results []int
ch := make(chan Response, numberOfTasks)
for _, task := range r.tasks {
go func(task Task) {
res, err := task.Execute(number)
ch <- Response{res, err}
}(task)
}
for i := 0; i < numberOfTasks; i++ {
s := <-ch
if s.err != nil {
return 0, s.err
}
results = append(results, s.res)
}
return r.reduce(results), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
r := new(Reducer)
r.reduce = reduce
r.tasks = tasks[:]
return r
}
type Searcher struct {
errorLimit int
tasks <-chan Task
}
func (s *Searcher) Execute(number int) (int, error) {
var results []int
- errs := int64(0)
+ errs := 0
var wg sync.WaitGroup
+ var mtx sync.Mutex
for task := range s.tasks {
wg.Add(1)
go func(task Task) {
res, err := task.Execute(number)
if err != nil {
- errs = atomic.AddInt64(&errs, 1)
+ mtx.Lock()
+ errs++
+ mtx.Unlock()
} else {
+ mtx.Lock()
results = append(results, res)
+ mtx.Unlock()
}
wg.Done()
}(task)
}
wg.Wait()
- if len(results) == 0 || errs > int64(s.errorLimit) {
+ if len(results) == 0 || errs > s.errorLimit {
return 0, errors.New("Too many errors!")
}
max := results[0]
for _, res := range results {
if res > max {
max = res
}
}
- return int(max), nil
+ return max, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
s := new(Searcher)
s.errorLimit = errorLimit
s.tasks = tasks
return s
}