Решение на Concurrent Tasks от Кирил Костов

Обратно към всички решения

Към профила на Кирил Костов

Резултати

  • 13 точки от тестове
  • 0 бонус точки
  • 13 точки общо
  • 13 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
import (
"fmt"
"sync"
"time"
)
// Task is a unit of work that takes an int argument and produces an int
// result or an error. Every combinator in this file both consumes Tasks and
// is itself a Task, so they can be nested freely.
type Task interface {
// Execute runs the task with the given argument and returns its result,
// or a non-nil error if the task failed.
Execute(int) (int, error)
}
// PipelineStruct is the Task returned by Pipeline. Its Execute method runs
// the stored tasks sequentially, feeding each result into the next task.
type PipelineStruct struct {
// tasks are the pipeline stages, in execution order.
tasks []Task
}
// Execute runs the pipeline's tasks one after another. The first task
// receives firstArg and every following task receives the result of the task
// before it.
//
// It returns the result of the last task. An error is returned when the
// pipeline has no tasks or when any task fails; in the latter case the
// remaining tasks are skipped and the task's error is wrapped with %w so that
// callers can still match the underlying cause with errors.Is / errors.As.
func (p PipelineStruct) Execute(firstArg int) (int, error) {
	if len(p.tasks) == 0 {
		return 0, fmt.Errorf("No tasks to execute!")
	}

	result := firstArg
	for _, task := range p.tasks {
		var err error
		result, err = task.Execute(result)
		if err != nil {
			// The value reported by the failing task is passed through
			// unchanged next to the wrapped error.
			return result, fmt.Errorf("A task returned an error : %w", err)
		}
	}
	return result, nil
}
// Pipeline returns a Task (a PipelineStruct) that holds its own copy of the
// given tasks, so later changes to the caller's slice do not affect it.
func Pipeline(tasks ...Task) Task {
	stages := make([]Task, len(tasks))
	copy(stages, tasks)
	return PipelineStruct{tasks: stages}
}
// FastestStruct is the Task returned by Fastest. Its Execute method starts
// all stored tasks concurrently and reports the outcome of the first one to
// finish.
type FastestStruct struct {
// tasks are the competing tasks; each receives the same argument.
tasks []Task
}
// Execute starts every stored task in its own goroutine with firstArg and
// returns the value and error of whichever outcome is delivered first. An
// error is returned immediately when there are no tasks.
func (f FastestStruct) Execute(firstArg int) (int, error) {
	if len(f.tasks) < 1 {
		return 0, fmt.Errorf("No tasks to execute!")
	}

	type outcome struct {
		value int
		fail  error
	}

	// One buffer slot plus a non-blocking send: a goroutine either deposits
	// its outcome or drops it, so no goroutine ever blocks on this channel.
	winner := make(chan outcome, 1)

	for i := range f.tasks {
		go func(contender Task) {
			value, fail := contender.Execute(firstArg)
			select {
			case winner <- outcome{value, fail}:
			default:
				// The slot is already occupied; discard this outcome.
			}
		}(f.tasks[i])
	}

	first := <-winner
	return first.value, first.fail
}
// Fastest returns a Task (a FastestStruct) that holds its own copy of the
// given tasks.
func Fastest(tasks ...Task) Task {
	contenders := make([]Task, len(tasks))
	copy(contenders, tasks)
	return FastestStruct{tasks: contenders}
}
// TimedStruct is the Task returned by Timed. Its Execute method runs the
// wrapped task and fails if no result arrives within the timeout.
type TimedStruct struct {
// task is the wrapped task.
task Task
// timeout is how long Execute waits for the wrapped task's result.
timeout time.Duration
}
// Execute runs the wrapped task with arg in a separate goroutine and waits
// for whichever happens first: the task delivering its outcome, or the
// timeout elapsing.
//
// It returns the task's own result and error when the task finishes in time,
// and (0, "Time out!" error) otherwise. A timed-out task is not interrupted;
// it keeps running and its outcome is discarded.
func (t TimedStruct) Execute(arg int) (int, error) {
	type result struct {
		val int
		err error
	}

	// Buffered so the worker can always deliver its outcome and exit, even
	// when nobody is listening any more because the timeout fired first.
	done := make(chan result, 1)
	go func() {
		val, err := t.task.Execute(arg)
		done <- result{val, err}
	}()

	// An explicit timer (instead of time.After) lets us release it as soon
	// as the task finishes rather than keeping it pending until it expires.
	timer := time.NewTimer(t.timeout)
	defer timer.Stop()

	select {
	case res := <-done:
		return res.val, res.err
	case <-timer.C:
		return 0, fmt.Errorf("Time out!")
	}
}
// Timed returns a Task (a TimedStruct) that wraps task together with the
// given timeout.
func Timed(task Task, timeout time.Duration) Task {
	return TimedStruct{task: task, timeout: timeout}
}
// ConcurrentMapReducer is the Task returned by ConcurrentMapReduce. Its
// Execute method runs all stored tasks concurrently with the same argument
// and combines their results with reduce.
type ConcurrentMapReducer struct {
// reduce folds the collected task results into the final value.
reduce func(results []int) int
// tasks are the tasks whose results are collected.
tasks []Task
}
// Execute runs every stored task concurrently with arg, collects the results
// in the order the tasks finish, and returns reduce applied to them.
//
// It returns an error when there are no tasks, or as soon as any task fails;
// the task's error is wrapped with %w so callers can inspect the cause. Tasks
// still running at that point are not interrupted, but their goroutines are
// guaranteed to exit once the task itself returns.
func (cmr ConcurrentMapReducer) Execute(arg int) (int, error) {
	n := len(cmr.tasks)
	if n == 0 {
		return 0, fmt.Errorf("No tasks to execute!")
	}

	type outcome struct {
		val int
		err error
	}

	// One buffer slot per task: every worker can deliver its outcome and
	// exit even after Execute has already returned on an earlier error.
	// With an unbuffered channel those workers would block forever.
	outcomes := make(chan outcome, n)
	for _, task := range cmr.tasks {
		go func(task Task) {
			val, err := task.Execute(arg)
			outcomes <- outcome{val, err}
		}(task)
	}

	// results is only touched by this goroutine, so no locking is needed.
	results := make([]int, 0, n)
	for i := 0; i < n; i++ {
		out := <-outcomes
		if out.err != nil {
			return 0, fmt.Errorf("A task returned an error : %w", out.err)
		}
		results = append(results, out.val)
	}
	return cmr.reduce(results), nil
}
// ConcurrentMapReduce returns a Task (a ConcurrentMapReducer) that stores
// reduce together with its own copy of the given tasks.
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	mappers := make([]Task, len(tasks))
	copy(mappers, tasks)
	return ConcurrentMapReducer{reduce: reduce, tasks: mappers}
}
// GreatestSearcherStruct is the Task returned by GreatestSearcher. Its
// Execute method runs every task received from the channel and returns the
// greatest successful result.
type GreatestSearcherStruct struct {
// errorLimit is the number of failed tasks tolerated; Execute fails once
// the failure count exceeds it.
errorLimit int
// tasks supplies the tasks to run; Execute reads it until it is closed.
tasks <-chan Task
}
// Execute runs every task received from the tasks channel concurrently with
// arg and returns the greatest result among the tasks that succeeded.
//
// It returns an error when more than errorLimit tasks fail ("Error limit
// reached") or when the channel is closed without any task having succeeded
// ("No successful tasks"). Execute does not return successfully until the
// tasks channel has been closed and all started tasks have finished.
func (gss GreatestSearcherStruct) Execute(arg int) (int, error) {
	type outcome struct {
		val int
		err error
	}

	outcomes := make(chan outcome)

	// quit is closed when Execute returns. Workers that finish after an
	// early return see it and exit instead of blocking forever on a send
	// nobody will receive.
	quit := make(chan struct{})
	defer close(quit)

	go func() {
		var wg sync.WaitGroup
		// Tasks arriving after an early return are still consumed and run,
		// so a producer writing to the channel is never left blocked.
		for task := range gss.tasks {
			wg.Add(1)
			go func(task Task) {
				defer wg.Done()
				val, err := task.Execute(arg)
				select {
				case outcomes <- outcome{val, err}:
				case <-quit:
				}
			}(task)
		}
		wg.Wait()
		// Every worker has delivered (or abandoned) its outcome; closing
		// the channel tells the collector loop below that it is complete.
		close(outcomes)
	}()

	var errorCnt, greatest int
	hasSuccessful := false
	for out := range outcomes {
		if out.err != nil {
			errorCnt++
		} else {
			// Seed greatest from the first success rather than from zero,
			// otherwise all-negative results would wrongly yield 0.
			if !hasSuccessful || out.val > greatest {
				greatest = out.val
			}
			hasSuccessful = true
		}
		if errorCnt > gss.errorLimit {
			return 0, fmt.Errorf("Error limit reached")
		}
	}

	if !hasSuccessful {
		return 0, fmt.Errorf("No successful tasks")
	}
	return greatest, nil
}
// GreatestSearcher returns a Task (a GreatestSearcherStruct) that stores the
// given error limit and task channel.
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
	return GreatestSearcherStruct{
		errorLimit: errorLimit,
		tasks:      tasks,
	}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-ww085f	0.003s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.003s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.003s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.103s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.203s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.134s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.204s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.003s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.003s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.003s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.048s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.048s
PASS
ok  	_/tmp/d20161129-30451-ww085f	0.123s

История (4 версии и 1 коментар)

Кирил обнови решението на 28.11.2016 23:14 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type PipelineStruct struct {
+ tasks []Task
+}
+
+func (p PipelineStruct) Execute(firstArg int) (int, error) {
+
+ if len(p.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks to execute!")
+ }
+ var taskArg int = firstArg
+ var err error = nil
+
+ n := len(p.tasks)
+
+ for i := 0; i < n; i++ {
+ taskArg, err = p.tasks[i].Execute(taskArg)
+ if err != nil {
+ return taskArg, fmt.Errorf("A task returned an error : %s", err)
+ }
+ }
+
+ return taskArg, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ pl := PipelineStruct{tasks: make([]Task, 0)}
+
+ for _, task := range tasks {
+ pl.tasks = append(pl.tasks, task)
+ }
+
+ return pl
+}
+
+type FastestStruct struct {
+ tasks []Task
+}
+
+func (f FastestStruct) Execute(firstArg int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks to execute!")
+ }
+
+ type result struct {
+ val int
+ err error
+ }
+ retChan := make(chan result, 1)
+
+ ftask := func(task Task) result {
+ retVal, retErr := task.Execute(firstArg)
+ return result{retVal, retErr}
+ }
+ for _, task := range f.tasks {
+ go func(task Task) {
+ select {
+ case retChan <- ftask(task):
+ default:
+ }
+ }(task)
+ }
+ retStruct := <-retChan
+ return retStruct.val, retStruct.err
+}
+
+func Fastest(tasks ...Task) Task {
+ fast := FastestStruct{tasks: make([]Task, 0)}
+ for _, task := range tasks {
+ fast.tasks = append(fast.tasks, task)
+ }
+ return fast
+}
+
+type TimedStruct struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t TimedStruct) Execute(arg int) (int, error) {
+ type result struct {
+ val int
+ err error
+ }
+ ftask := func(task Task) <-chan result {
+ retChan := make(chan result, 1)
+ go func() {
+ retVal, retErr := task.Execute(arg)
+ retChan <- result{retVal, retErr}
+ }()
+
+ return retChan
+ }
+
+ select {
+ case result := <-ftask(t.task):
+ return result.val, result.err
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("Time out!")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ timedStruct := TimedStruct{task, timeout}
+ return timedStruct
+}

Кирил обнови решението на 29.11.2016 00:14 (преди над 1 година)

package main
import (
"fmt"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type PipelineStruct struct {
tasks []Task
}
func (p PipelineStruct) Execute(firstArg int) (int, error) {
if len(p.tasks) == 0 {
return 0, fmt.Errorf("No tasks to execute!")
}
var taskArg int = firstArg
var err error = nil
n := len(p.tasks)
for i := 0; i < n; i++ {
taskArg, err = p.tasks[i].Execute(taskArg)
if err != nil {
return taskArg, fmt.Errorf("A task returned an error : %s", err)
}
}
return taskArg, nil
}
func Pipeline(tasks ...Task) Task {
pl := PipelineStruct{tasks: make([]Task, 0)}
for _, task := range tasks {
pl.tasks = append(pl.tasks, task)
}
return pl
}
type FastestStruct struct {
tasks []Task
}
func (f FastestStruct) Execute(firstArg int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("No tasks to execute!")
}
type result struct {
val int
err error
}
retChan := make(chan result, 1)
ftask := func(task Task) result {
retVal, retErr := task.Execute(firstArg)
return result{retVal, retErr}
}
for _, task := range f.tasks {
go func(task Task) {
select {
case retChan <- ftask(task):
default:
}
}(task)
}
retStruct := <-retChan
return retStruct.val, retStruct.err
}
func Fastest(tasks ...Task) Task {
fast := FastestStruct{tasks: make([]Task, 0)}
for _, task := range tasks {
fast.tasks = append(fast.tasks, task)
}
return fast
}
type TimedStruct struct {
task Task
timeout time.Duration
}
func (t TimedStruct) Execute(arg int) (int, error) {
type result struct {
val int
err error
}
ftask := func(task Task) <-chan result {
retChan := make(chan result, 1)
go func() {
retVal, retErr := task.Execute(arg)
retChan <- result{retVal, retErr}
}()
return retChan
}
select {
case result := <-ftask(t.task):
return result.val, result.err
case <-time.After(t.timeout):
return 0, fmt.Errorf("Time out!")
}
}
func Timed(task Task, timeout time.Duration) Task {
timedStruct := TimedStruct{task, timeout}
return timedStruct
+}
+
+type ConcurrentMapReducer struct {
+ reduce func(results []int) int
+ tasks []Task
+}
+
+func (cmr ConcurrentMapReducer) Execute(int) (int, error) {
+ return 0, fmt.Errorf("Error")
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ cmr := ConcurrentMapReducer{reduce: reduce, tasks: make([]Task, 0)}
+ for _, task := range tasks {
+ cmr.tasks = append(cmr.tasks, task)
+ }
+
+ return cmr
+}
+
+type GreatestSearcherStruct struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func (gss GreatestSearcherStruct) Execute(int) (int, error) {
+ return 0, fmt.Errorf("Error")
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ gss := GreatestSearcherStruct{errorLimit: errorLimit, tasks: tasks}
+ return gss
}

Кирил обнови решението на 29.11.2016 01:32 (преди над 1 година)

package main
import (
- "fmt"
- "time"
+ "fmt"
+ "time"
+ "sync"
)
type Task interface {
Execute(int) (int, error)
}
type PipelineStruct struct {
tasks []Task
}
func (p PipelineStruct) Execute(firstArg int) (int, error) {
if len(p.tasks) == 0 {
return 0, fmt.Errorf("No tasks to execute!")
}
var taskArg int = firstArg
var err error = nil
n := len(p.tasks)
for i := 0; i < n; i++ {
taskArg, err = p.tasks[i].Execute(taskArg)
if err != nil {
return taskArg, fmt.Errorf("A task returned an error : %s", err)
}
}
return taskArg, nil
}
func Pipeline(tasks ...Task) Task {
pl := PipelineStruct{tasks: make([]Task, 0)}
for _, task := range tasks {
pl.tasks = append(pl.tasks, task)
}
return pl
}
type FastestStruct struct {
tasks []Task
}
func (f FastestStruct) Execute(firstArg int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("No tasks to execute!")
}
type result struct {
val int
err error
}
retChan := make(chan result, 1)
ftask := func(task Task) result {
retVal, retErr := task.Execute(firstArg)
return result{retVal, retErr}
}
for _, task := range f.tasks {
go func(task Task) {
select {
case retChan <- ftask(task):
default:
}
}(task)
}
retStruct := <-retChan
return retStruct.val, retStruct.err
}
func Fastest(tasks ...Task) Task {
fast := FastestStruct{tasks: make([]Task, 0)}
for _, task := range tasks {
fast.tasks = append(fast.tasks, task)
}
return fast
}
type TimedStruct struct {
task Task
timeout time.Duration
}
func (t TimedStruct) Execute(arg int) (int, error) {
type result struct {
val int
err error
}
ftask := func(task Task) <-chan result {
retChan := make(chan result, 1)
go func() {
retVal, retErr := task.Execute(arg)
retChan <- result{retVal, retErr}
}()
return retChan
}
select {
case result := <-ftask(t.task):
return result.val, result.err
case <-time.After(t.timeout):
return 0, fmt.Errorf("Time out!")
}
}
func Timed(task Task, timeout time.Duration) Task {
timedStruct := TimedStruct{task, timeout}
return timedStruct
}
type ConcurrentMapReducer struct {
reduce func(results []int) int
tasks []Task
}
-func (cmr ConcurrentMapReducer) Execute(int) (int, error) {
- return 0, fmt.Errorf("Error")
+func (cmr ConcurrentMapReducer) Execute(arg int) (int, error) {
+ if len(cmr.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks to execute!")
+ }
+
+ resultChannel := make(chan int)
+ results := make([]int, 0)
+ errorChannel := make(chan struct{})
+ var mut sync.Mutex
+
+ go func() {
+ for _, task := range cmr.tasks {
+ go func(task Task) {
+ retVal, retErr := task.Execute(arg)
+ if retErr != nil {
+ errorChannel <- struct{}{}
+ } else {
+ resultChannel <- retVal
+ }
+ }(task)
+ }
+
+ }()
+ n := len(cmr.tasks)
+ for i := 0; i < n; i++ {
+ select {
+ case <-errorChannel:
+ return 0, fmt.Errorf("Error")
+ case val := <-resultChannel:
+ mut.Lock()
+ results = append(results, val)
+ mut.Unlock()
+ }
+ }
+
+ return cmr.reduce(results), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
cmr := ConcurrentMapReducer{reduce: reduce, tasks: make([]Task, 0)}
for _, task := range tasks {
cmr.tasks = append(cmr.tasks, task)
}
return cmr
}
type GreatestSearcherStruct struct {
errorLimit int
tasks <-chan Task
}
func (gss GreatestSearcherStruct) Execute(int) (int, error) {
return 0, fmt.Errorf("Error")
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
gss := GreatestSearcherStruct{errorLimit: errorLimit, tasks: tasks}
return gss
}

Кирил обнови решението на 29.11.2016 12:25 (преди над 1 година)

package main
import (
"fmt"
- "time"
"sync"
+ "time"
)
type Task interface {
- Execute(int) (int, error)
+ Execute(int) (int, error)
}
type PipelineStruct struct {
- tasks []Task
+ tasks []Task
}
func (p PipelineStruct) Execute(firstArg int) (int, error) {
- if len(p.tasks) == 0 {
- return 0, fmt.Errorf("No tasks to execute!")
- }
- var taskArg int = firstArg
- var err error = nil
+ if len(p.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks to execute!")
+ }
+ var taskArg int = firstArg
+ var err error = nil
- n := len(p.tasks)
+ n := len(p.tasks)
- for i := 0; i < n; i++ {
- taskArg, err = p.tasks[i].Execute(taskArg)
- if err != nil {
- return taskArg, fmt.Errorf("A task returned an error : %s", err)
- }
- }
+ for i := 0; i < n; i++ {
+ taskArg, err = p.tasks[i].Execute(taskArg)
+ if err != nil {
+ return taskArg, fmt.Errorf("A task returned an error : %s", err)
+ }
+ }
- return taskArg, nil
+ return taskArg, nil
}
func Pipeline(tasks ...Task) Task {
- pl := PipelineStruct{tasks: make([]Task, 0)}
+ pl := PipelineStruct{tasks: make([]Task, 0)}
- for _, task := range tasks {
- pl.tasks = append(pl.tasks, task)
- }
+ for _, task := range tasks {
+ pl.tasks = append(pl.tasks, task)
+ }
- return pl
+ return pl
}
type FastestStruct struct {
- tasks []Task
+ tasks []Task
}
func (f FastestStruct) Execute(firstArg int) (int, error) {
- if len(f.tasks) == 0 {
- return 0, fmt.Errorf("No tasks to execute!")
- }
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks to execute!")
+ }
- type result struct {
- val int
- err error
- }
- retChan := make(chan result, 1)
+ type result struct {
+ val int
+ err error
+ }
+ retChan := make(chan result, 1)
- ftask := func(task Task) result {
- retVal, retErr := task.Execute(firstArg)
- return result{retVal, retErr}
- }
- for _, task := range f.tasks {
- go func(task Task) {
- select {
- case retChan <- ftask(task):
- default:
- }
- }(task)
- }
- retStruct := <-retChan
- return retStruct.val, retStruct.err
+ ftask := func(task Task) result {
+ retVal, retErr := task.Execute(firstArg)
+ return result{retVal, retErr}
+ }
+ for _, task := range f.tasks {
+ go func(task Task) {
+ select {
+ case retChan <- ftask(task):
+ default:
+ }
+ }(task)
+ }
+ retStruct := <-retChan
+ return retStruct.val, retStruct.err
}
func Fastest(tasks ...Task) Task {
- fast := FastestStruct{tasks: make([]Task, 0)}
- for _, task := range tasks {
- fast.tasks = append(fast.tasks, task)
- }
- return fast
+ fast := FastestStruct{tasks: make([]Task, 0)}
+ for _, task := range tasks {
+ fast.tasks = append(fast.tasks, task)
+ }
+ return fast
}
type TimedStruct struct {
- task Task
- timeout time.Duration
+ task Task
+ timeout time.Duration
}
func (t TimedStruct) Execute(arg int) (int, error) {
- type result struct {
- val int
- err error
- }
- ftask := func(task Task) <-chan result {
- retChan := make(chan result, 1)
- go func() {
- retVal, retErr := task.Execute(arg)
- retChan <- result{retVal, retErr}
- }()
+ type result struct {
+ val int
+ err error
+ }
+ ftask := func(task Task) <-chan result {
+ retChan := make(chan result, 1)
+ go func() {
+ retVal, retErr := task.Execute(arg)
+ retChan <- result{retVal, retErr}
+ }()
- return retChan
- }
+ return retChan
+ }
- select {
- case result := <-ftask(t.task):
- return result.val, result.err
- case <-time.After(t.timeout):
- return 0, fmt.Errorf("Time out!")
- }
+ select {
+ case result := <-ftask(t.task):
+ return result.val, result.err
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("Time out!")
+ }
}
func Timed(task Task, timeout time.Duration) Task {
- timedStruct := TimedStruct{task, timeout}
- return timedStruct
+ timedStruct := TimedStruct{task, timeout}
+ return timedStruct
}
type ConcurrentMapReducer struct {
reduce func(results []int) int
tasks []Task
}
func (cmr ConcurrentMapReducer) Execute(arg int) (int, error) {
if len(cmr.tasks) == 0 {
return 0, fmt.Errorf("No tasks to execute!")
}
resultChannel := make(chan int)
results := make([]int, 0)
errorChannel := make(chan struct{})
var mut sync.Mutex
go func() {
for _, task := range cmr.tasks {
go func(task Task) {
retVal, retErr := task.Execute(arg)
if retErr != nil {
errorChannel <- struct{}{}
} else {
resultChannel <- retVal
}
}(task)
}
}()
n := len(cmr.tasks)
for i := 0; i < n; i++ {
select {
case <-errorChannel:
return 0, fmt.Errorf("Error")
case val := <-resultChannel:
mut.Lock()
results = append(results, val)
mut.Unlock()
}
}
return cmr.reduce(results), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
cmr := ConcurrentMapReducer{reduce: reduce, tasks: make([]Task, 0)}
for _, task := range tasks {
cmr.tasks = append(cmr.tasks, task)
}
return cmr
}
type GreatestSearcherStruct struct {
errorLimit int
tasks <-chan Task
}
-func (gss GreatestSearcherStruct) Execute(int) (int, error) {
- return 0, fmt.Errorf("Error")
+func (gss GreatestSearcherStruct) Execute(arg int) (int, error) {
+ var errorCnt, greatest int
+ resultChannel := make(chan int)
+ errorChannel := make(chan struct{})
+ var mut sync.Mutex
+ done := make(chan struct{})
+
+ go func() {
+ var wg sync.WaitGroup
+ for task := range gss.tasks {
+ wg.Add(1)
+ go func(task Task) {
+ retVal, retErr := task.Execute(arg)
+ if retErr != nil {
+ errorChannel <- struct{}{}
+ } else {
+ resultChannel <- retVal
+ }
+ wg.Done()
+ }(task)
+ }
+ wg.Wait()
+ done <- struct{}{}
+ }()
+ hasSuccessful := false
+ for {
+ select {
+ case <-errorChannel:
+ errorCnt++
+ case val := <-resultChannel:
+ mut.Lock()
+ hasSuccessful = true
+ if greatest < val {
+ greatest = val
+ }
+ mut.Unlock()
+ case <-done:
+ if !hasSuccessful {
+ return 0, fmt.Errorf("No successful tasks")
+ } else {
+ return greatest, nil
+ }
+ }
+ if errorCnt > gss.errorLimit {
+ return 0, fmt.Errorf("Error limit reached")
+ }
+ }
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
gss := GreatestSearcherStruct{errorLimit: errorLimit, tasks: tasks}
return gss
-}
+}