Решение на Concurrent Tasks от Анатоли Бързев

Обратно към всички решения

Към профила на Анатоли Бързев

Резултати

  • 13 точки от тестове
  • 0 бонус точки
  • 13 точки общо
  • 13 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
import (
"fmt"
"sync"
"time"
)
type tolib struct {
r int
e error
}
// Task ...
type Task interface {
Execute(int) (int, error)
}
// Pipeline
type tolibPipeline struct {
tasks []Task
}
func (t *tolibPipeline) Execute(i int) (int, error) {
if len(t.tasks) == 0 {
return i, fmt.Errorf("No valid tasks provided")
}
var err error
for _, task := range t.tasks {
if i, err = task.Execute(i); err != nil {
return i, err
}
}
return i, nil
}
func Pipeline(tasks ...Task) Task {
return &tolibPipeline{tasks}
}
// Fastest
type tolibFastest struct {
tasks []Task
}
func (t *tolibFastest) Execute(i int) (int, error) {
if len(t.tasks) == 0 {
return i, fmt.Errorf("No valid tasks provided")
}
ch := make(chan tolib, len(t.tasks))
var wg sync.WaitGroup
for _, task := range t.tasks {
wg.Add(1)
go func(t Task) {
r, e := t.Execute(i)
ch <- tolib{r, e}
wg.Done()
}(task)
}
go func() {
wg.Wait()
close(ch)
}()
res := <-ch
return res.r, res.e
}
func Fastest(tasks ...Task) Task {
return &tolibFastest{tasks}
}
// Timed
type tolibTimed struct {
task Task
timeout time.Duration
}
func (t *tolibTimed) Execute(i int) (int, error) {
ch := make(chan tolib, 1)
res := tolib{i, fmt.Errorf("Timeout")}
go func() {
r, e := t.task.Execute(i)
ch <- tolib{r, e}
close(ch)
}()
select {
case res = <-ch:
case <-time.After(t.timeout):
}
return res.r, res.e
}
func Timed(task Task, timeout time.Duration) Task {
return &tolibTimed{task, timeout}
}
// MapReduce
type tolibMapReduce struct {
tasks []Task
f func([]int) int
}
func (t *tolibMapReduce) Execute(i int) (int, error) {
if len(t.tasks) == 0 {
return i, fmt.Errorf("No valid tasks provided")
}
chDone := make(chan tolib, len(t.tasks))
chErr := make(chan tolib, len(t.tasks))
res := tolib{i, nil}
var wg sync.WaitGroup
for _, task := range t.tasks {
wg.Add(1)
go func(t Task) {
if r, e := t.Execute(i); e != nil {
chErr <- tolib{r, e}
} else {
chDone <- tolib{r, e}
}
wg.Done()
}(task)
}
go func() {
wg.Wait()
close(chDone)
close(chErr)
}()
toreduce := make([]int, len(t.tasks))
for i := 0; i < len(t.tasks); i++ {
select {
case res = <-chDone:
toreduce[i] = res.r
case res = <-chErr:
return res.r, res.e
}
}
return t.f(toreduce), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return &tolibMapReduce{tasks, reduce}
}
// GSearch
type tolibGSearch struct {
tasks <-chan Task
errorLimit int
}
func (t *tolibGSearch) Execute(i int) (int, error) {
chDone := make(chan int)
chErr := make(chan struct{})
var wg sync.WaitGroup
for task := range t.tasks{
fmt.Println(task)
wg.Add(1)
go func(t Task) {
res, err := t.Execute(i)
if err != nil {
chErr <- struct{}{}
} else {
chDone <- res
}
wg.Done()
}(task)
}
go func() {
wg.Wait()
close(chErr)
close(chDone)
}()
errors, tasks, max := 0, 0, 0
for {
select {
case res, ok := <-chDone:
if !ok {
if tasks == 0 {
return i, fmt.Errorf("No tasks provided")
}
return max, nil
}
tasks++
if max <= res {
max = res
}
case _, ok := <-chErr:
if ok {
errors++
}
if errors > t.errorLimit {
return i, fmt.Errorf("Error limit exceeded")
}
}
}
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return &tolibGSearch{tasks, errorLimit}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-10xpism	0.002s
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.003s
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.003s
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.103s
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.203s
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.138s
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.204s
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.005s
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.003s
{-2}
0x475bf0
{1}
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.003s
{4}
{{22} 20}
{127}
{32}
0x475bf0
{5}
0x475bf0
{-1}
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.048s
{4}
{{22} 20}
{127}
{32}
0x475bf0
{5}
0x475bf0
{-1}
0x475bf0
0x475bf0
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.049s
{10}
0x475bf0
{60}
PASS
ok  	_/tmp/d20161129-30451-10xpism	0.123s

История (3 версии и 5 коментара)

Анатоли обнови решението на 29.11.2016 12:58 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "time"
+ "sync"
+)
+
+// Task ...
+type Task interface {
+ Execute(int) (int, error)
+}
+
+// Pipeline
+
+type tolibPipeline struct {
+ tasks []Task
+}
+
+func (t *tolibPipeline) Execute(i int) (int, error) {
+
+ if len(t.tasks) == 0 {
+ return i, fmt.Errorf("No valid tasks provided")
+ }
+
+ var err error
+ for _, task := range t.tasks {
+ if i, err = task.Execute(i); err != nil {
+ return i, err
+ }
+ }
+
+ return i, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ return &tolibPipeline{tasks}
+}
+
+// Fastest
+
+type tolibFastest struct {
+ tasks []Task
+}
+
+func (t *tolibFastest) Execute(i int) (int, error) {
+
+ if len(t.tasks) == 0 {
+ return i, fmt.Errorf("No valid tasks provided")
+ }
+
+ type myerr struct {
+ r int
+ e error
+ }
+
+ ch := make(chan myerr, len(t.tasks))
+ var wg sync.WaitGroup
+
+ for _, task := range t.tasks {
+ wg.Add(1)
+ go func() {
+ r, e := task.Execute(i)
+ ch <- myerr{r, e}
+ wg.Done()
+ }()
+ }
+
+ res := <-ch
+ wg.Wait()

Предполагам разбираш че идеята на Fastest е че искаме резултата от най-бързата задача възможно най бързо а за другите искаме просто да не хабят goroutine-и но не и да ги чакаме за да си получим резултата

Условието казва: трябва да се изпълнят конкурентно и да се върне резултата (или грешката) на тази задача, която завърши първа. Това, според мен, не изисква да се върне веднага резултата на най-бързата, само на тази която е завършила първа.

+ close(ch)
+ return res.r, res.e
+}
+
+func Fastest(tasks ...Task) Task {
+ return &tolibFastest{tasks}
+}
+
+// Timed
+
+type tolibTimed struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t *tolibTimed) Execute(i int) (int, error) {
+
+ type myerr struct {
+ r int
+ e error
+ }
+
+ ch := make(chan myerr, 1)
+ res := myerr{i, fmt.Errorf("Timeout")}
+
+ go func() {
+ r, e := t.task.Execute(i)
+ ch <- myerr{r, e}
+ close(ch)
+ }()
+
+ select {
+ case res = <-ch:
+ case <-time.After(t.timeout):
+ }
+
+ return res.r, res.e
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return &tolibTimed{task, timeout}
+}
+
+// MapReduce
+
+type tolibMapReduce struct {
+ tasks []Task
+ f func([]int) int
+}
+
+func (t *tolibMapReduce) Execute(i int) (int, error) {
+
+ if len(t.tasks) == 0 {
+ return i, fmt.Errorf("No valid tasks provided")
+ }
+
+ type myerr struct {
+ r int
+ e error
+ }
+
+ chdone := make(chan myerr, len(t.tasks))
+ cherrs := make(chan myerr, len(t.tasks))
+ res := myerr{i, nil}
+
+ go func() {
+ var wg sync.WaitGroup
+ for _, task := range t.tasks {
+ wg.Add(1)
+ go func() {
+ r, e := task.Execute(i)
+
+ if e != nil {
+ cherrs <- myerr{r, e}
+ } else {
+ chdone <- myerr{r, e}
+ }
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ close(chdone)
+ close(cherrs)
+ }()
+
+ toreduce := make([]int, len(t.tasks))
+
+ for i := 0; i < len(t.tasks); i++ {
+ select {
+ case res = <-chdone:
+ toreduce[i] = res.r
+ case res = <-cherrs:
+ return res.r, res.e
+ }
+ }
+
+ return t.f(toreduce), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return &tolibMapReduce{tasks, reduce}
+}
+
+// GSearch
+
+type tolibGSearch struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func (t *tolibGSearch) Execute(i int) (int, error) {
+
+ chres := make(chan int)
+
+ go func() {
+ var wg sync.WaitGroup
+
+ for {
+ task, ok := <-t.tasks
+ if !ok {
+ break
+ }
+
+ wg.Add(1)
+ go func() {
+ res, err := task.Execute(i)
+ if err != nil {
+ fmt.Println("Error", err)
+ cherrs <- struct{}{}
+ } else {
+ chres <- res
+ }
+ wg.Done()
+ }()
+ }
+
+ wg.Wait()
+ close(cherrs)
+ close(chres)
+ }()
+
+ errors, tasks, max := 0, 0, 0
+ for {
+ select {
+ case res, ok := <-chres:
+ if !ok {
+ if tasks == 0 {
+ return i, fmt.Errorf("No tasks provided")
+ }
+
+ return max, nil
+ }
+
+ tasks++
+ if max <= res {
+ max = res
+ }
+ case _, ok := <-cherrs:
+
+ if ok {
+ errors++
+ }
+
+ if errors > t.errorLimit {
+ return i, fmt.Errorf("Error limit exceeded")
+ }
+ }
+ }
+
+ return 0, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return &tolibGSearch{tasks, errorLimit}
+}

Анатоли обнови решението на 29.11.2016 14:58 (преди над 1 година)

package main
import (
"fmt"
- "time"
"sync"
+ "time"
)
+type tolib struct {
+ r int
+ e error
+}
+
// Task ...
type Task interface {
Execute(int) (int, error)
}
// Pipeline
type tolibPipeline struct {
tasks []Task
}
func (t *tolibPipeline) Execute(i int) (int, error) {
if len(t.tasks) == 0 {
return i, fmt.Errorf("No valid tasks provided")
}
var err error
for _, task := range t.tasks {
if i, err = task.Execute(i); err != nil {
return i, err
}
}
return i, nil
}
func Pipeline(tasks ...Task) Task {
return &tolibPipeline{tasks}
}
// Fastest
type tolibFastest struct {
tasks []Task
}
func (t *tolibFastest) Execute(i int) (int, error) {
-
if len(t.tasks) == 0 {
return i, fmt.Errorf("No valid tasks provided")
}
- type myerr struct {
- r int
- e error
- }
-
- ch := make(chan myerr, len(t.tasks))
+ ch := make(chan tolib, len(t.tasks))
var wg sync.WaitGroup
for _, task := range t.tasks {
wg.Add(1)
go func() {
r, e := task.Execute(i)
- ch <- myerr{r, e}
+ ch <- tolib{r, e}
wg.Done()
}()
}
+ go func() {
+ wg.Wait()
+ close(ch)
+ }()
+
res := <-ch
- wg.Wait()
- close(ch)
return res.r, res.e
}
func Fastest(tasks ...Task) Task {
return &tolibFastest{tasks}
}
// Timed
type tolibTimed struct {
task Task
timeout time.Duration
}
func (t *tolibTimed) Execute(i int) (int, error) {
+ ch := make(chan tolib, 1)
+ res := tolib{i, fmt.Errorf("Timeout")}
- type myerr struct {
- r int
- e error
- }
-
- ch := make(chan myerr, 1)
- res := myerr{i, fmt.Errorf("Timeout")}
-
go func() {
r, e := t.task.Execute(i)
- ch <- myerr{r, e}
+ ch <- tolib{r, e}
close(ch)
}()
select {
case res = <-ch:
case <-time.After(t.timeout):
}
return res.r, res.e
}
func Timed(task Task, timeout time.Duration) Task {
return &tolibTimed{task, timeout}
}
// MapReduce
type tolibMapReduce struct {
tasks []Task
f func([]int) int
}
func (t *tolibMapReduce) Execute(i int) (int, error) {
-
if len(t.tasks) == 0 {
return i, fmt.Errorf("No valid tasks provided")
}
- type myerr struct {
- r int
- e error
+ chDone := make(chan tolib, len(t.tasks))
+ chErr := make(chan tolib, len(t.tasks))
+ res := tolib{i, nil}
+ var wg sync.WaitGroup
+ for _, task := range t.tasks {
+ wg.Add(1)
+ go func() {
+ if r, e := task.Execute(i); e != nil {
+ chErr <- tolib{r, e}
+ } else {
+ chDone <- tolib{r, e}
+ }
+ wg.Done()
+ }()
}
- chdone := make(chan myerr, len(t.tasks))
- cherrs := make(chan myerr, len(t.tasks))
- res := myerr{i, nil}
-
go func() {
- var wg sync.WaitGroup
- for _, task := range t.tasks {
- wg.Add(1)
- go func() {
- r, e := task.Execute(i)
-
- if e != nil {
- cherrs <- myerr{r, e}
- } else {
- chdone <- myerr{r, e}
- }
- wg.Done()
- }()
- }
wg.Wait()
- close(chdone)
- close(cherrs)
+ close(chDone)
+ close(chErr)
}()
toreduce := make([]int, len(t.tasks))
for i := 0; i < len(t.tasks); i++ {
select {
- case res = <-chdone:
+ case res = <-chDone:
toreduce[i] = res.r
- case res = <-cherrs:
+ case res = <-chErr:
return res.r, res.e
}
}
return t.f(toreduce), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return &tolibMapReduce{tasks, reduce}
}
// GSearch
type tolibGSearch struct {
tasks <-chan Task
errorLimit int
}
func (t *tolibGSearch) Execute(i int) (int, error) {
- chres := make(chan int)
- cherrs := make(chan struct{})
+ chDone := make(chan int)
+ chErr := make(chan struct{})
- go func() {
- var wg sync.WaitGroup
+ var wg sync.WaitGroup
- for {
- task, ok := <-t.tasks
-
- if !ok {
- break
+ for task := range t.tasks{
+ fmt.Println(task)
+ wg.Add(1)
+ go func(t Task) {
+ res, err := t.Execute(i)
+ if err != nil {
+ fmt.Println(err)
+ chErr <- struct{}{}
+ } else {
+ fmt.Println(res)
+ chDone <- res
}
+ wg.Done()
+ }(task)
+ }
- wg.Add(1)
- go func() {
- res, err := task.Execute(i)
- if err != nil {
- fmt.Println("Error", err)
- cherrs <- struct{}{}
- } else {
- chres <- res
- }
- wg.Done()
- }()
- }
-
+ go func() {
wg.Wait()
- close(cherrs)
- close(chres)
+ close(chErr)
+ close(chDone)
}()
errors, tasks, max := 0, 0, 0
+
for {
select {
- case res, ok := <-chres:
+ case res, ok := <-chDone:
if !ok {
if tasks == 0 {
return i, fmt.Errorf("No tasks provided")
}
return max, nil
}
tasks++
if max <= res {
max = res
}
- case _, ok := <-cherrs:
-
+ case _, ok := <-chErr:
if ok {
errors++
}
if errors > t.errorLimit {
return i, fmt.Errorf("Error limit exceeded")
}
}
}
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return &tolibGSearch{tasks, errorLimit}
-}
+}

Анатоли обнови решението на 29.11.2016 15:04 (преди над 1 година)

package main
import (
"fmt"
"sync"
"time"
)
type tolib struct {
r int
e error
}
// Task ...
type Task interface {
Execute(int) (int, error)
}
// Pipeline
type tolibPipeline struct {
tasks []Task
}
func (t *tolibPipeline) Execute(i int) (int, error) {
if len(t.tasks) == 0 {
return i, fmt.Errorf("No valid tasks provided")
}
var err error
for _, task := range t.tasks {
if i, err = task.Execute(i); err != nil {
return i, err
}
}
return i, nil
}
func Pipeline(tasks ...Task) Task {
return &tolibPipeline{tasks}
}
// Fastest
type tolibFastest struct {
tasks []Task
}
func (t *tolibFastest) Execute(i int) (int, error) {
if len(t.tasks) == 0 {
return i, fmt.Errorf("No valid tasks provided")
}
ch := make(chan tolib, len(t.tasks))
var wg sync.WaitGroup
for _, task := range t.tasks {
wg.Add(1)
- go func() {
- r, e := task.Execute(i)
+ go func(t Task) {
+ r, e := t.Execute(i)
ch <- tolib{r, e}
wg.Done()
- }()
+ }(task)
}
go func() {
wg.Wait()
close(ch)
}()
res := <-ch
return res.r, res.e
}
func Fastest(tasks ...Task) Task {
return &tolibFastest{tasks}
}
// Timed
type tolibTimed struct {
task Task
timeout time.Duration
}
func (t *tolibTimed) Execute(i int) (int, error) {
ch := make(chan tolib, 1)
res := tolib{i, fmt.Errorf("Timeout")}
go func() {
r, e := t.task.Execute(i)
ch <- tolib{r, e}
close(ch)
}()
select {
case res = <-ch:
case <-time.After(t.timeout):
}
return res.r, res.e
}
func Timed(task Task, timeout time.Duration) Task {
return &tolibTimed{task, timeout}
}
// MapReduce
type tolibMapReduce struct {
tasks []Task
f func([]int) int
}
func (t *tolibMapReduce) Execute(i int) (int, error) {
if len(t.tasks) == 0 {
return i, fmt.Errorf("No valid tasks provided")
}
chDone := make(chan tolib, len(t.tasks))
chErr := make(chan tolib, len(t.tasks))
res := tolib{i, nil}
var wg sync.WaitGroup
for _, task := range t.tasks {
wg.Add(1)
- go func() {
- if r, e := task.Execute(i); e != nil {
+ go func(t Task) {
+ if r, e := t.Execute(i); e != nil {
chErr <- tolib{r, e}
} else {
chDone <- tolib{r, e}
}
wg.Done()
- }()
+ }(task)
}
go func() {
wg.Wait()
close(chDone)
close(chErr)
}()
toreduce := make([]int, len(t.tasks))
for i := 0; i < len(t.tasks); i++ {
select {
case res = <-chDone:
toreduce[i] = res.r
case res = <-chErr:
return res.r, res.e
}
}
return t.f(toreduce), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return &tolibMapReduce{tasks, reduce}
}
// GSearch
type tolibGSearch struct {
tasks <-chan Task
errorLimit int
}
func (t *tolibGSearch) Execute(i int) (int, error) {
chDone := make(chan int)
chErr := make(chan struct{})
var wg sync.WaitGroup
for task := range t.tasks{
fmt.Println(task)
wg.Add(1)
go func(t Task) {
res, err := t.Execute(i)
if err != nil {
- fmt.Println(err)
chErr <- struct{}{}
} else {
- fmt.Println(res)
chDone <- res
}
wg.Done()
}(task)
}
go func() {
wg.Wait()
close(chErr)
close(chDone)
}()
errors, tasks, max := 0, 0, 0
for {
select {
case res, ok := <-chDone:
if !ok {
if tasks == 0 {
return i, fmt.Errorf("No tasks provided")
}
return max, nil
}
tasks++
if max <= res {
max = res
}
case _, ok := <-chErr:
if ok {
errors++
}
if errors > t.errorLimit {
return i, fmt.Errorf("Error limit exceeded")
}
}
}
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return &tolibGSearch{tasks, errorLimit}
-}
+}