Решение на Concurrent Tasks от Александър Александров

Обратно към всички решения

Към профила на Александър Александров

Резултати

  • 13 точки от тестове
  • 0 бонус точки
  • 13 точки общо
  • 13 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
import (
"errors"
"sync"
"time"
)
// Task is a unit of work that takes an integer argument and produces an
// integer result together with an error value.
type Task interface {
	// Execute runs the task with arg and returns its result and error.
	Execute(arg int) (int, error)
}
// pipelineTask chains tasks: the result of each task becomes the argument
// of the next one.
type pipelineTask struct {
	tasks []Task
}

// Execute feeds passed into the first task and then threads every result
// into the following task, in slice order.
//
// It returns the result of the last task. If the pipeline holds no tasks,
// or if any task fails, it returns 0 and an error; a failing task stops the
// chain, so later tasks are not run.
func (pt *pipelineTask) Execute(passed int) (int, error) {
	steps := pt.tasks
	// len of a nil slice is 0, so this covers both nil and empty.
	if len(steps) == 0 {
		return 0, errors.New("Received nil or zero arguments for pipeline.")
	}

	current := passed
	for i := range steps {
		next, err := steps[i].Execute(current)
		if err != nil {
			return 0, err
		}
		current = next
	}
	return current, nil
}
// Pipeline wraps the given tasks in a pipelineTask and returns it as a Task.
// The tasks are stored in the order they were passed.
func Pipeline(tasks ...Task) Task {
	return &pipelineTask{tasks: tasks}
}
// fastestTask runs all of its tasks concurrently and reports the outcome of
// whichever task finishes first.
type fastestTask struct {
	tasks []Task
}

// result bundles the two return values of Task.Execute so that they can be
// sent over a single channel.
type result struct {
	answer int
	err    error
}

// Execute starts every task in its own goroutine with the same argument and
// returns the answer and error of the first task to finish, whether that
// task succeeded or failed.
//
// If there are no tasks it returns 0 and an error without starting anything.
// The remaining tasks are not cancelled; they run to completion in the
// background and their results are discarded.
func (ft *fastestTask) Execute(passed int) (int, error) {
	tasks := ft.tasks
	if len(tasks) == 0 {
		return 0, errors.New("Received nil or zero arguments for fastest task.")
	}

	// The buffer has room for one result per task, so the goroutines that
	// lose the race can still send and exit without a receiver. No extra
	// goroutine is needed to drain the channel, and it is never closed: the
	// channel is reclaimed by the garbage collector once the last sender
	// is done with it.
	resultCh := make(chan result, len(tasks))
	for _, task := range tasks {
		go func(task Task) {
			answer, err := task.Execute(passed)
			resultCh <- result{answer: answer, err: err}
		}(task)
	}

	first := <-resultCh
	return first.answer, first.err
}
// Fastest wraps the given tasks in a fastestTask and returns it as a Task.
func Fastest(tasks ...Task) Task {
	return &fastestTask{tasks: tasks}
}
// timedTask runs a single task but gives up waiting for it after timeout.
type timedTask struct {
	task    Task
	timeout time.Duration
}

// Execute runs the wrapped task in a separate goroutine and waits for it for
// at most tt.timeout.
//
// If the task finishes in time, its answer and error are returned unchanged.
// Otherwise Execute returns 0 and a timeout error. The task itself is not
// cancelled; it keeps running in the background and its result is discarded.
func (tt *timedTask) Execute(passed int) (int, error) {
	// A buffer of one lets the task goroutine deliver its result and exit
	// even when nobody is receiving any more (the timeout case), so no
	// drain goroutine is needed and the channel is never closed.
	resultCh := make(chan result, 1)
	go func(task Task) {
		answer, err := task.Execute(passed)
		resultCh <- result{answer: answer, err: err}
	}(tt.task)

	// An explicit timer, unlike time.After, can be stopped as soon as the
	// task wins instead of staying pending for the rest of the timeout.
	timer := time.NewTimer(tt.timeout)
	defer timer.Stop()

	select {
	case res := <-resultCh:
		return res.answer, res.err
	case <-timer.C:
		return 0, errors.New("Task did not finish in required timeframe.")
	}
}
// Timed wraps task in a timedTask with the given timeout and returns it as
// a Task.
func Timed(task Task, timeout time.Duration) Task {
	return &timedTask{task: task, timeout: timeout}
}
// cmrTask runs its tasks concurrently ("map") and combines their results
// with reduce.
type cmrTask struct {
	reduce func([]int) int
	tasks  []Task
}

// Execute starts every task in its own goroutine with the same argument.
//
// If any task fails, Execute returns 0 and that task's error. It returns as
// soon as the error is observed, without waiting for the remaining tasks,
// which keep running in the background. If all tasks succeed, their answers
// are passed to t.reduce and its value is returned. The answers are ordered
// by when the tasks delivered them, which need not match the order of
// t.tasks.
//
// With no tasks, Execute returns 0 and an error without calling reduce.
func (t *cmrTask) Execute(passed int) (int, error) {
	tasks := t.tasks
	if len(tasks) == 0 {
		return 0, errors.New("Received nil or zero arguments for concurrent map reduce.")
	}

	// Both channels have room for one value per task, so a task goroutine
	// never blocks on its send, even after Execute has already returned.
	errCh := make(chan error, len(tasks))
	resultCh := make(chan int, len(tasks))
	// done is closed once every task has delivered its outcome.
	done := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(len(tasks))
	for _, task := range tasks {
		go func(task Task) {
			defer wg.Done()
			answer, err := task.Execute(passed)
			if err != nil {
				errCh <- err
				return
			}
			resultCh <- answer
		}(task)
	}

	go func() {
		wg.Wait()
		close(resultCh)
		close(done)
	}()

	select {
	case err := <-errCh:
		return 0, err
	case <-done:
	}

	// When a task has failed and all tasks have finished, both cases above
	// are ready and select picks one at random. Re-check for an error so a
	// failure is never dropped in favour of reducing partial results. Every
	// send to errCh happens before done is closed, so a non-blocking
	// receive is sufficient here.
	select {
	case err := <-errCh:
		return 0, err
	default:
	}

	results := make([]int, 0, len(tasks))
	for answer := range resultCh {
		results = append(results, answer)
	}
	return t.reduce(results), nil
}
// ConcurrentMapReduce wraps the reduce function and the given tasks in a
// cmrTask and returns it as a Task.
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	return &cmrTask{reduce: reduce, tasks: tasks}
}
// gsTask runs every task received from a channel concurrently and yields
// the greatest successful result, tolerating up to errorLimit failures.
type gsTask struct {
	tasks      <-chan Task
	errorLimit int
}

// Execute receives tasks from t.tasks until the channel is closed, running
// each one in its own goroutine with the same argument, and then waits for
// all of them to finish.
//
// It returns the largest answer among the tasks that succeeded. It returns
// 0 and an error when more than t.errorLimit tasks failed, or when no task
// produced an answer at all.
func (t *gsTask) Execute(passed int) (int, error) {
	var (
		mu       sync.Mutex // guards failures and answers
		pending  sync.WaitGroup
		failures int
		answers  []int
	)

	for task := range t.tasks {
		pending.Add(1)
		go func(task Task) {
			defer pending.Done()
			answer, err := task.Execute(passed)

			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				failures++
				return
			}
			answers = append(answers, answer)
		}(task)
	}
	pending.Wait()

	// The error limit is checked first, so exceeding it takes precedence
	// over having no answers.
	switch {
	case failures > t.errorLimit:
		return 0, errors.New("Error limit exceeded.")
	case len(answers) == 0:
		return 0, errors.New("No results were processed.")
	}

	biggest := answers[0]
	for _, candidate := range answers[1:] {
		if candidate > biggest {
			biggest = candidate
		}
	}
	return biggest, nil
}
// GreatestSearcher wraps the task channel and the error limit in a gsTask
// and returns it as a Task.
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
	return &gsTask{errorLimit: errorLimit, tasks: tasks}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.003s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.003s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.003s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.103s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.203s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.134s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.203s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.003s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.003s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.003s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.048s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.048s
PASS
ok  	_/tmp/d20161129-30451-1jra1ws	0.123s

История (1 версия и 0 коментара)

Александър обнови решението на 27.11.2016 13:09 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type pipelineTask struct {
+ tasks []Task
+}
+
+func (pt *pipelineTask) Execute(passed int) (int, error) {
+ tasks := pt.tasks
+ // check the tasks slice
+ if tasks == nil || len(tasks) == 0 {
+ return 0, errors.New("Received nil or zero arguments for pipeline.")
+ }
+ var err error = nil
+ // for each task pass the answer from the last and check for an error
+ for _, task := range tasks {
+ passed, err = task.Execute(passed)
+ if err != nil {
+ return 0, err
+ }
+ }
+ return passed, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ var pt Task = &pipelineTask{tasks: tasks}
+ return pt
+}
+
+type fastestTask struct {
+ tasks []Task
+}
+
+// create a struct to be used in the channel of results
+type result struct {
+ answer int
+ err error
+}
+
+func (ft *fastestTask) Execute(passed int) (int, error) {
+ tasks := ft.tasks
+ // check the tasks slice
+ if tasks == nil || len(tasks) == 0 {
+ return 0, errors.New("Received nil or zero arguments for fastest task.")
+ }
+
+ // make a channel of results for the tasks
+ resultCh := make(chan result, len(tasks))
+ for _, task := range tasks {
+ // execute each task concurrently and write the result to the channel
+ go func(task Task) {
+ answer, err := task.Execute(passed)
+ res := result{answer: answer, err: err}
+ resultCh <- res
+ }(task)
+ }
+
+ // get the first finished result
+ firstRes := <-resultCh
+
+ //spawn a routine to get remaining result and close the channel
+ go func() {
+ for i := 0; i < len(tasks)-1; i++ {
+ <-resultCh
+ }
+ close(resultCh)
+ }()
+
+ return firstRes.answer, firstRes.err
+
+}
+
+func Fastest(tasks ...Task) Task {
+ var ft Task = &fastestTask{tasks: tasks}
+ return ft
+}
+
+type timedTask struct {
+ task Task
+ timeout time.Duration
+}
+
+func (tt *timedTask) Execute(passed int) (int, error) {
+ // make a channel for the task execution
+ resultCh := make(chan result, 1)
+ // execute the task concurrently and pass the result to the channel
+ go func(task Task) {
+ answer, err := task.Execute(passed)
+ resultCh <- result{answer: answer, err: err}
+ }(tt.task)
+ // choose the first result
+ select {
+ case res := <-resultCh:
+ // close the channel after use
+ close(resultCh)
+ return res.answer, res.err
+ case <-time.After(tt.timeout):
+ // spawn a routine to read from the channel and close it after that
+ go func() {
+ <-resultCh
+ close(resultCh)
+ }()
+ return 0, errors.New("Task did not finish in required timeframe.")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ var tt Task = &timedTask{task: task, timeout: timeout}
+ return tt
+}
+
+type cmrTask struct {
+ reduce func([]int) int
+ tasks []Task
+}
+
+func (t *cmrTask) Execute(passed int) (int, error) {
+ tasks := t.tasks
+ // check the tasks slice
+ if tasks == nil || len(tasks) == 0 {
+ return 0, errors.New("Received nil or zero arguments for concurrent map reduce.")
+ }
+
+ // channel for the errors
+ errCh := make(chan error, len(tasks))
+ // channel for the results
+ resultCh := make(chan int, len(tasks))
+ // channel for synchronisation
+ syncCh := make(chan struct{}, 1)
+
+ // spawn a routine that executes all tasks concurrently and waits
+ // for them to finish
+ go func(tasks []Task) {
+ var wg sync.WaitGroup
+ for _, task := range tasks {
+ wg.Add(1)
+ // execute each task concurrently
+ go func(task Task) {
+ answer, err := task.Execute(passed)
+ // if there was an error add it to the errors channel
+ if err != nil {
+ errCh <- err
+ } else {
+ resultCh <- answer
+ }
+ wg.Done()
+ }(task)
+ }
+
+ wg.Wait()
+ close(resultCh)
+ // when all tasks are finished add to the sync channel
+ syncCh <- struct{}{}
+ close(syncCh)
+ }(tasks)
+
+ // wait for all tasks to finish or there is an error
+ select {
+ case <-syncCh:
+ results := make([]int, 0)
+ for {
+ res, ok := <-resultCh
+ if !ok {
+ break
+ }
+ results = append(results, res)
+ }
+ return t.reduce(results), nil
+ case err := <-errCh:
+ return 0, err
+ }
+
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ var cmpt Task = &cmrTask{tasks: tasks, reduce: reduce}
+ return cmpt
+}
+
+type gsTask struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func (t *gsTask) Execute(passed int) (int, error) {
+ results := make([]int, 0)
+ tasks := t.tasks
+ errorLimit := 0
+ // variables used for synchronisation
+ var mutext sync.Mutex
+ var wg sync.WaitGroup
+
+ // while the channel is open execute each task concurrently
+ for {
+ task, ok := <-tasks
+ if !ok {
+ break
+ }
+
+ wg.Add(1)
+ go func(task Task) {
+ answer, err := task.Execute(passed)
+ // depending on the result use the mutex to increment one of the two
+ if err != nil {
+ mutext.Lock()
+ errorLimit++
+ mutext.Unlock()
+ } else {
+ mutext.Lock()
+ results = append(results, answer)
+ mutext.Unlock()
+ }
+ wg.Done()
+ }(task)
+
+ }
+
+ // wait for all tasks to finish
+ wg.Wait()
+
+ if errorLimit > t.errorLimit {
+ return 0, errors.New("Error limit exceeded.")
+ }
+
+ if len(results) == 0 {
+ return 0, errors.New("No results were processed.")
+ }
+
+ biggest := results[0]
+ for i := 1; i < len(results); i++ {
+ if biggest < results[i] {
+ biggest = results[i]
+ }
+ }
+
+ return biggest, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ var gst Task = &gsTask{tasks: tasks, errorLimit: errorLimit}
+ return gst
+}