Решение на Concurrent Tasks от Ралица Великова

Обратно към всички решения

Към профила на Ралица Великова

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 10 успешни тест(а)
  • 3 неуспешни тест(а)

Код

package main
import (
"errors"
"sync"
"time"
)
type taskReturnData struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
/*______________________________Pipeline______________________________*/
type serialTasks struct {
tasks []Task
}
func (s serialTasks) Execute(initialValue int) (result int, err error) {
if len(s.tasks) == 0 {
return result, errors.New("No tasks to execute")
}
for i, task := range s.tasks {
if i == 0 {
result, err = task.Execute(initialValue)
} else {
result, err = task.Execute(result)
}
if err != nil {
return
}
}
return
}
func Pipeline(tasks ...Task) Task {
return serialTasks{tasks}
}
/*______________________________Fastest______________________________*/
type fastTasks struct {
tasks []Task
fastest chan taskReturnData
isDone bool
doneMutex sync.Mutex
}
func (f fastTasks) Execute(initialValue int) (int, error) {
if len(f.tasks) == 0 {
return 0, errors.New("No fast tasks to execute")
}
for _, t := range f.tasks {
go func(task Task) {
currentResult, currentError := task.Execute(initialValue)
f.doneMutex.Lock()
currentIsDone := f.isDone
if !currentIsDone {
f.isDone = true
f.fastest <- taskReturnData{currentResult, currentError}
}
f.doneMutex.Unlock()
}(t)
}
fastestTaskResult := <-f.fastest
close(f.fastest)
return fastestTaskResult.result, fastestTaskResult.err
}
func Fastest(tasks ...Task) Task {
return fastTasks{tasks, make(chan taskReturnData), false, sync.Mutex{}}
}
/*_______________________________Timed______________________________*/
type timedTask struct {
task Task
timeLimit time.Duration
resultChannel chan taskReturnData
timeLimitExceeded bool
tleMutex sync.Mutex
}
func (t timedTask) Execute(initialValue int) (int, error) {
go func() {
result, err := t.task.Execute(initialValue)
t.tleMutex.Lock()
if !t.timeLimitExceeded {
t.resultChannel <- taskReturnData{result, err}
}
t.tleMutex.Unlock()
}()
select {
case timedTaskResult := <-t.resultChannel:
close(t.resultChannel)
return timedTaskResult.result, timedTaskResult.err
case <-time.After(t.timeLimit):
t.tleMutex.Lock()
t.timeLimitExceeded = true
t.tleMutex.Unlock()
return 0, errors.New("Time limit exceeded")
}
}
func Timed(task Task, timeout time.Duration) Task {
return timedTask{task, timeout, make(chan taskReturnData), false, sync.Mutex{}}
}
/*________________________ConcurrentMapReduce______________________*/
type reducibleTasks struct {
tasks []Task
results []int
resultsMutex sync.Mutex
errorChannel chan error
ready chan struct{}
reduce func(results []int) int
}
func (r reducibleTasks) Execute(initialValue int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute")
}
for i, t := range r.tasks {
go func(index int, task Task) {
currentResult, currentError := task.Execute(initialValue)
if currentError != nil {
r.errorChannel <- currentError
} else {
r.resultsMutex.Lock()
r.results = append(r.results, currentResult)
r.resultsMutex.Unlock()
r.ready <- struct{}{}
}
}(i, t)
}
var unexpectedError error
for range r.tasks {
select {
case unexpectedError = <-r.errorChannel:
close(r.errorChannel)
return 0, unexpectedError
case <-r.ready:
}
}
close(r.ready)
return r.reduce(r.results), unexpectedError
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, make(chan error), make(chan struct{}), reduce}
}
/*__________________________GreatestSearcher________________________*/
type greedyConcurrentTasks struct{}
func (g greedyConcurrentTasks) Execute(n int) (int, error) {
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greedyConcurrentTasks{}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.002s
PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.103s
PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.209s
PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.134s
PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.203s
PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.003s
--- FAIL: TestGreatestSearcherSimple (0.00s)
	solution_test.go:289: Received result 0 when expecting 2
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1vdwz42	0.003s
--- FAIL: TestGreatestSearcherComplex (0.00s)
	solution_test.go:302: Received result 0 when expecting 42
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1vdwz42	0.003s
--- FAIL: TestGreatestSearcherErrors (0.00s)
    --- FAIL: TestGreatestSearcherErrors/like_the_example (0.00s)
    	solution_test.go:313: Expected error did not occur instead got 0
    --- FAIL: TestGreatestSearcherErrors/close_immediately (0.00s)
    	solution_test.go:322: Expected error did not occur instead got 0
    --- FAIL: TestGreatestSearcherErrors/only_failure (0.00s)
    	solution_test.go:335: Expected error did not occur instead got 0
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1vdwz42	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vdwz42	0.083s

История (9 версии и 5 коментара)

Ралица обнови решението на 28.11.2016 18:50 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+
+ "fmt"
+)
+
+type taskReturnData struct {
+ result int
+ err error
+}
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+/*______________________________Pipeline______________________________*/
+
+type serialTasks struct {
+ tasks []Task
+}
+
+func (s serialTasks) Execute(initialValue int) (result int, err error) {
+ for i, task := range s.tasks {
+ if i == 0 {
+ result, err = task.Execute(initialValue)
+ } else {
+ result, err = task.Execute(result)
+ }
+ if err != nil {
+ return
+ }
+ }
+
+ return
+}
+
+func Pipeline(tasks ...Task) Task {
+ return serialTasks{tasks}
+}
+
+/*______________________________Fastest______________________________*/
+
+type fastTasks struct {
+ tasks []Task
+
+ fastest chan taskReturnData
+ isDone bool
+ doneMutex sync.RWMutex
+}
+
+func (f fastTasks) Execute(initialValue int) (result int, err error) {
+ if len(f.tasks) == 0 {
+ return result, errors.New("No fast tasks to execute")
+ }
+
+ for _, task := range f.tasks {
+ go func() {
+ currentResult, currentError := task.Execute(initialValue)
+
+ f.doneMutex.RLock()
+ currentIsDone := f.isDone
+ f.doneMutex.RUnlock()
+
+ if !currentIsDone {
+ f.doneMutex.Lock()
+ f.isDone = true
+ f.doneMutex.Unlock()
+
+ f.fastest <- taskReturnData{currentResult, currentError}
+ }
+ }()
+ }
+
+ fastestTaskResult := <-f.fastest
+ return fastestTaskResult.result, fastestTaskResult.err
+}
+
+func Fastest(tasks ...Task) Task {
+ return fastTasks{tasks, make(chan taskReturnData), false, sync.RWMutex{}}
+}
+
+/*_______________________________Timed______________________________*/
+
+type timedTask struct {
+ task Task
+ timeLimit time.Duration
+ resultChannel chan taskReturnData
+}
+
+func (t timedTask) Execute(initialValue int) (int, error) {
+ go func() {
+ result, err := t.task.Execute(initialValue)
+ t.resultChannel <- taskReturnData{result, err}
+ }()
+
+ select {
+ case timedTaskResult := <-t.resultChannel:
+ return timedTaskResult.result, timedTaskResult.err
+ case <-time.After(t.timeLimit):
+ return 0, errors.New("Time limit exceeded")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return timedTask{task, timeout, make(chan taskReturnData)}
+}
+
+/*________________________ConcurrentMapReduce______________________*/
+
+type reducableTasks struct {
+ tasks []Task
+
+ results []int
+ resultsMutex sync.Mutex
+
+ ready sync.WaitGroup
+
+ reduce func(results []int) int
+}
+
+func (r reducableTasks) Execute(initialValue int) (int, error) {
+ if len(r.tasks) == 0 {
+ return 0, errors.New("No tasks to execute")
+ }
+
+ for i, t := range r.tasks {
+ r.ready.Add(1)
+
+ go func(index int, task Task) {
+ currentResult, currentError := task.Execute(initialValue)
+ if currentError != nil {
+ fmt.Println("Noooo") /// todo - return error when single task fails
+ }
+
+ r.resultsMutex.Lock()
+ r.results = append(r.results, currentResult)
+ r.resultsMutex.Unlock()
+
+ r.ready.Done()
+ }(i, t)
+ }
+
+ r.ready.Wait()
+
+ return r.reduce(r.results), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return reducableTasks{tasks, make([]int, 0, 2), sync.Mutex{}, sync.WaitGroup{}, reduce}
+}
+
+/*__________________________GreatestSearcher________________________*/
+
+type dummy struct{}
+
+func (d dummy) Execute(n int) (int, error) {
+ return 0, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return dummy{}
+}

Ралица обнови решението на 28.11.2016 20:45 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
"fmt"
)
type taskReturnData struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
/*______________________________Pipeline______________________________*/
type serialTasks struct {
tasks []Task
}
func (s serialTasks) Execute(initialValue int) (result int, err error) {
for i, task := range s.tasks {
if i == 0 {
result, err = task.Execute(initialValue)
} else {
result, err = task.Execute(result)
}
if err != nil {
return
}
}
return
}
func Pipeline(tasks ...Task) Task {
return serialTasks{tasks}
}
/*______________________________Fastest______________________________*/
type fastTasks struct {
tasks []Task
fastest chan taskReturnData
isDone bool
doneMutex sync.RWMutex
}
func (f fastTasks) Execute(initialValue int) (result int, err error) {
if len(f.tasks) == 0 {
return result, errors.New("No fast tasks to execute")
}
for _, task := range f.tasks {
go func() {
currentResult, currentError := task.Execute(initialValue)
f.doneMutex.RLock()
currentIsDone := f.isDone
f.doneMutex.RUnlock()
if !currentIsDone {
f.doneMutex.Lock()
f.isDone = true
f.doneMutex.Unlock()
f.fastest <- taskReturnData{currentResult, currentError}
}
}()
}
fastestTaskResult := <-f.fastest
return fastestTaskResult.result, fastestTaskResult.err
}
func Fastest(tasks ...Task) Task {
return fastTasks{tasks, make(chan taskReturnData), false, sync.RWMutex{}}
}
/*_______________________________Timed______________________________*/
type timedTask struct {
task Task
timeLimit time.Duration
resultChannel chan taskReturnData
}
func (t timedTask) Execute(initialValue int) (int, error) {
go func() {
result, err := t.task.Execute(initialValue)
t.resultChannel <- taskReturnData{result, err}
}()
select {
case timedTaskResult := <-t.resultChannel:
return timedTaskResult.result, timedTaskResult.err
case <-time.After(t.timeLimit):
return 0, errors.New("Time limit exceeded")
}
}
func Timed(task Task, timeout time.Duration) Task {
return timedTask{task, timeout, make(chan taskReturnData)}
}
/*________________________ConcurrentMapReduce______________________*/
-type reducableTasks struct {
+type reducibleTasks struct {
tasks []Task
results []int
resultsMutex sync.Mutex
ready sync.WaitGroup
reduce func(results []int) int
}
-func (r reducableTasks) Execute(initialValue int) (int, error) {
+func (r reducibleTasks) Execute(initialValue int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute")
}
for i, t := range r.tasks {
r.ready.Add(1)
go func(index int, task Task) {
currentResult, currentError := task.Execute(initialValue)
if currentError != nil {
fmt.Println("Noooo") /// todo - return error when single task fails
}
r.resultsMutex.Lock()
r.results = append(r.results, currentResult)
r.resultsMutex.Unlock()
r.ready.Done()
}(i, t)
}
r.ready.Wait()
return r.reduce(r.results), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
- return reducableTasks{tasks, make([]int, 0, 2), sync.Mutex{}, sync.WaitGroup{}, reduce}
+ return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, sync.WaitGroup{}, reduce}
}
/*__________________________GreatestSearcher________________________*/
type dummy struct{}
func (d dummy) Execute(n int) (int, error) {
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return dummy{}
}

Ралица обнови решението на 28.11.2016 21:37 (преди над 1 година)

package main
import (
"errors"
"sync"
- "time"
+ "time")
- "fmt"
-)
-
type taskReturnData struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
/*______________________________Pipeline______________________________*/
type serialTasks struct {
tasks []Task
}
func (s serialTasks) Execute(initialValue int) (result int, err error) {
for i, task := range s.tasks {
if i == 0 {
result, err = task.Execute(initialValue)
} else {
result, err = task.Execute(result)
}
if err != nil {
return
}
}
return
}
func Pipeline(tasks ...Task) Task {
return serialTasks{tasks}
}
/*______________________________Fastest______________________________*/
type fastTasks struct {
tasks []Task
fastest chan taskReturnData
isDone bool
doneMutex sync.RWMutex
}
func (f fastTasks) Execute(initialValue int) (result int, err error) {
if len(f.tasks) == 0 {
return result, errors.New("No fast tasks to execute")
}
for _, task := range f.tasks {
go func() {
currentResult, currentError := task.Execute(initialValue)
f.doneMutex.RLock()
currentIsDone := f.isDone
f.doneMutex.RUnlock()
if !currentIsDone {
f.doneMutex.Lock()
f.isDone = true
f.doneMutex.Unlock()
f.fastest <- taskReturnData{currentResult, currentError}
}
}()
}
fastestTaskResult := <-f.fastest
return fastestTaskResult.result, fastestTaskResult.err
}
func Fastest(tasks ...Task) Task {
return fastTasks{tasks, make(chan taskReturnData), false, sync.RWMutex{}}
}
/*_______________________________Timed______________________________*/
type timedTask struct {
task Task
timeLimit time.Duration
resultChannel chan taskReturnData
}
func (t timedTask) Execute(initialValue int) (int, error) {
go func() {
result, err := t.task.Execute(initialValue)
t.resultChannel <- taskReturnData{result, err}
}()
select {
case timedTaskResult := <-t.resultChannel:
return timedTaskResult.result, timedTaskResult.err
case <-time.After(t.timeLimit):
return 0, errors.New("Time limit exceeded")
}
}
func Timed(task Task, timeout time.Duration) Task {
return timedTask{task, timeout, make(chan taskReturnData)}
}
/*________________________ConcurrentMapReduce______________________*/
type reducibleTasks struct {
tasks []Task
results []int
resultsMutex sync.Mutex
- ready sync.WaitGroup
+ errorChannel chan error
+ ready chan struct{}
reduce func(results []int) int
}
func (r reducibleTasks) Execute(initialValue int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute")
}
for i, t := range r.tasks {
- r.ready.Add(1)
go func(index int, task Task) {
+
currentResult, currentError := task.Execute(initialValue)
if currentError != nil {
- fmt.Println("Noooo") /// todo - return error when single task fails
+ r.errorChannel <- currentError
}
r.resultsMutex.Lock()
r.results = append(r.results, currentResult)
r.resultsMutex.Unlock()
-
- r.ready.Done()
+ r.ready <- struct{}{}
}(i, t)
}
- r.ready.Wait()
+ var unexpectedError error
- return r.reduce(r.results), nil
+GuardingLoop:
+ for range r.tasks {
+ select {
+ case unexpectedError = <-r.errorChannel:
+ return 0, unexpectedError
+ case <-r.ready:
+ continue GuardingLoop
+ }
+ }
+
+ return r.reduce(r.results), unexpectedError
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
- return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, sync.WaitGroup{}, reduce}
+ return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, make(chan error), make(chan struct{}), reduce}
}
/*__________________________GreatestSearcher________________________*/
-type dummy struct{}
+type greedyConcurrentTasks struct{}
-func (d dummy) Execute(n int) (int, error) {
+func (g greedyConcurrentTasks) Execute(n int) (int, error) {
+
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
- return dummy{}
+ return greedyConcurrentTasks{}
}

Ралица обнови решението на 28.11.2016 23:14 (преди над 1 година)

package main
import (
"errors"
"sync"
- "time")
+ "time"
+)
type taskReturnData struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
/*______________________________Pipeline______________________________*/
type serialTasks struct {
tasks []Task
}
func (s serialTasks) Execute(initialValue int) (result int, err error) {
+ if len(s.tasks) == 0 {
+ return result, errors.New("No tasks to execute")
+ }
+
for i, task := range s.tasks {
if i == 0 {
result, err = task.Execute(initialValue)
} else {
result, err = task.Execute(result)
}
if err != nil {
return
}
}
return
}
func Pipeline(tasks ...Task) Task {
return serialTasks{tasks}
}
/*______________________________Fastest______________________________*/
type fastTasks struct {
tasks []Task
fastest chan taskReturnData
isDone bool
- doneMutex sync.RWMutex
+ doneMutex sync.Mutex
}
func (f fastTasks) Execute(initialValue int) (result int, err error) {
if len(f.tasks) == 0 {
return result, errors.New("No fast tasks to execute")
}
for _, task := range f.tasks {
go func() {
currentResult, currentError := task.Execute(initialValue)
- f.doneMutex.RLock()
+ f.doneMutex.Lock()
currentIsDone := f.isDone
- f.doneMutex.RUnlock()
-
if !currentIsDone {
- f.doneMutex.Lock()
f.isDone = true
- f.doneMutex.Unlock()
-
f.fastest <- taskReturnData{currentResult, currentError}
}
+ f.doneMutex.Unlock()
}()
}
fastestTaskResult := <-f.fastest
+ close(f.fastest)
return fastestTaskResult.result, fastestTaskResult.err
}
func Fastest(tasks ...Task) Task {
- return fastTasks{tasks, make(chan taskReturnData), false, sync.RWMutex{}}
+ return fastTasks{tasks, make(chan taskReturnData), false, sync.Mutex{}}
}
/*_______________________________Timed______________________________*/
type timedTask struct {
task Task
timeLimit time.Duration
resultChannel chan taskReturnData
}
func (t timedTask) Execute(initialValue int) (int, error) {
go func() {
result, err := t.task.Execute(initialValue)
t.resultChannel <- taskReturnData{result, err}
}()
select {
case timedTaskResult := <-t.resultChannel:
+ close(t.resultChannel)
return timedTaskResult.result, timedTaskResult.err
case <-time.After(t.timeLimit):
return 0, errors.New("Time limit exceeded")
}
}
func Timed(task Task, timeout time.Duration) Task {
return timedTask{task, timeout, make(chan taskReturnData)}
}
/*________________________ConcurrentMapReduce______________________*/
type reducibleTasks struct {
tasks []Task
results []int
resultsMutex sync.Mutex
errorChannel chan error
ready chan struct{}
reduce func(results []int) int
}
func (r reducibleTasks) Execute(initialValue int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute")
}
for i, t := range r.tasks {
go func(index int, task Task) {
currentResult, currentError := task.Execute(initialValue)
if currentError != nil {
r.errorChannel <- currentError
}
r.resultsMutex.Lock()
r.results = append(r.results, currentResult)
r.resultsMutex.Unlock()
r.ready <- struct{}{}
}(i, t)
}
var unexpectedError error
GuardingLoop:
for range r.tasks {
select {
case unexpectedError = <-r.errorChannel:
+ close(r.errorChannel)
return 0, unexpectedError
case <-r.ready:
continue GuardingLoop
}
}
+ close(r.errorChannel)
return r.reduce(r.results), unexpectedError
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, make(chan error), make(chan struct{}), reduce}
}
/*__________________________GreatestSearcher________________________*/
type greedyConcurrentTasks struct{}
func (g greedyConcurrentTasks) Execute(n int) (int, error) {
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greedyConcurrentTasks{}
-}
+}

Ралица обнови решението на 28.11.2016 23:15 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type taskReturnData struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
/*______________________________Pipeline______________________________*/
type serialTasks struct {
tasks []Task
}
func (s serialTasks) Execute(initialValue int) (result int, err error) {
if len(s.tasks) == 0 {
return result, errors.New("No tasks to execute")
}
for i, task := range s.tasks {
if i == 0 {
result, err = task.Execute(initialValue)
} else {
result, err = task.Execute(result)
}
if err != nil {
return
}
}
return
}
func Pipeline(tasks ...Task) Task {
return serialTasks{tasks}
}
/*______________________________Fastest______________________________*/
type fastTasks struct {
tasks []Task
fastest chan taskReturnData
isDone bool
doneMutex sync.Mutex
}
func (f fastTasks) Execute(initialValue int) (result int, err error) {
if len(f.tasks) == 0 {
return result, errors.New("No fast tasks to execute")
}
for _, task := range f.tasks {
go func() {
currentResult, currentError := task.Execute(initialValue)
f.doneMutex.Lock()
currentIsDone := f.isDone
if !currentIsDone {
f.isDone = true
f.fastest <- taskReturnData{currentResult, currentError}
}
f.doneMutex.Unlock()
}()
}
fastestTaskResult := <-f.fastest
close(f.fastest)
return fastestTaskResult.result, fastestTaskResult.err
}
func Fastest(tasks ...Task) Task {
return fastTasks{tasks, make(chan taskReturnData), false, sync.Mutex{}}
}
/*_______________________________Timed______________________________*/
type timedTask struct {
task Task
timeLimit time.Duration
resultChannel chan taskReturnData
}
func (t timedTask) Execute(initialValue int) (int, error) {
go func() {
result, err := t.task.Execute(initialValue)
t.resultChannel <- taskReturnData{result, err}
}()
select {
case timedTaskResult := <-t.resultChannel:
close(t.resultChannel)
return timedTaskResult.result, timedTaskResult.err
case <-time.After(t.timeLimit):
return 0, errors.New("Time limit exceeded")
}
}
func Timed(task Task, timeout time.Duration) Task {
return timedTask{task, timeout, make(chan taskReturnData)}
}
/*________________________ConcurrentMapReduce______________________*/
type reducibleTasks struct {
tasks []Task
results []int
resultsMutex sync.Mutex
errorChannel chan error
ready chan struct{}
reduce func(results []int) int
}
func (r reducibleTasks) Execute(initialValue int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute")
}
for i, t := range r.tasks {
go func(index int, task Task) {
currentResult, currentError := task.Execute(initialValue)
if currentError != nil {
r.errorChannel <- currentError
}
r.resultsMutex.Lock()
r.results = append(r.results, currentResult)
r.resultsMutex.Unlock()
r.ready <- struct{}{}
}(i, t)
}
var unexpectedError error
GuardingLoop:
for range r.tasks {
select {
case unexpectedError = <-r.errorChannel:
close(r.errorChannel)
return 0, unexpectedError
case <-r.ready:
continue GuardingLoop
}
}
- close(r.errorChannel)
+ close(r.ready)
return r.reduce(r.results), unexpectedError
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, make(chan error), make(chan struct{}), reduce}
}
/*__________________________GreatestSearcher________________________*/
type greedyConcurrentTasks struct{}
func (g greedyConcurrentTasks) Execute(n int) (int, error) {
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greedyConcurrentTasks{}
}

Ралица обнови решението на 28.11.2016 23:18 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type taskReturnData struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
/*______________________________Pipeline______________________________*/
type serialTasks struct {
tasks []Task
}
func (s serialTasks) Execute(initialValue int) (result int, err error) {
if len(s.tasks) == 0 {
return result, errors.New("No tasks to execute")
}
for i, task := range s.tasks {
if i == 0 {
result, err = task.Execute(initialValue)
} else {
result, err = task.Execute(result)
}
if err != nil {
return
}
}
return
}
func Pipeline(tasks ...Task) Task {
return serialTasks{tasks}
}
/*______________________________Fastest______________________________*/
type fastTasks struct {
tasks []Task
fastest chan taskReturnData
isDone bool
doneMutex sync.Mutex
}
func (f fastTasks) Execute(initialValue int) (result int, err error) {
if len(f.tasks) == 0 {
return result, errors.New("No fast tasks to execute")
}
for _, task := range f.tasks {
go func() {
currentResult, currentError := task.Execute(initialValue)
f.doneMutex.Lock()
currentIsDone := f.isDone
if !currentIsDone {
f.isDone = true
f.fastest <- taskReturnData{currentResult, currentError}
}
f.doneMutex.Unlock()
}()
}
fastestTaskResult := <-f.fastest
close(f.fastest)
return fastestTaskResult.result, fastestTaskResult.err
}
func Fastest(tasks ...Task) Task {
return fastTasks{tasks, make(chan taskReturnData), false, sync.Mutex{}}
}
/*_______________________________Timed______________________________*/
type timedTask struct {
task Task
timeLimit time.Duration
resultChannel chan taskReturnData
}
func (t timedTask) Execute(initialValue int) (int, error) {
go func() {
result, err := t.task.Execute(initialValue)
t.resultChannel <- taskReturnData{result, err}
}()
select {
case timedTaskResult := <-t.resultChannel:
close(t.resultChannel)
return timedTaskResult.result, timedTaskResult.err
case <-time.After(t.timeLimit):
return 0, errors.New("Time limit exceeded")
}
}
func Timed(task Task, timeout time.Duration) Task {
return timedTask{task, timeout, make(chan taskReturnData)}
}
/*________________________ConcurrentMapReduce______________________*/
type reducibleTasks struct {
tasks []Task
results []int
resultsMutex sync.Mutex
errorChannel chan error
ready chan struct{}
reduce func(results []int) int
}
func (r reducibleTasks) Execute(initialValue int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute")
}
for i, t := range r.tasks {
go func(index int, task Task) {
currentResult, currentError := task.Execute(initialValue)
if currentError != nil {
r.errorChannel <- currentError
}
r.resultsMutex.Lock()
r.results = append(r.results, currentResult)
r.resultsMutex.Unlock()
r.ready <- struct{}{}
}(i, t)
}
var unexpectedError error
-GuardingLoop:
for range r.tasks {
select {
case unexpectedError = <-r.errorChannel:
close(r.errorChannel)
return 0, unexpectedError
case <-r.ready:
- continue GuardingLoop
}
}
close(r.ready)
return r.reduce(r.results), unexpectedError
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, make(chan error), make(chan struct{}), reduce}
}
/*__________________________GreatestSearcher________________________*/
type greedyConcurrentTasks struct{}
func (g greedyConcurrentTasks) Execute(n int) (int, error) {
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greedyConcurrentTasks{}
}

Ралица обнови решението на 29.11.2016 01:36 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type taskReturnData struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
/*______________________________Pipeline______________________________*/
type serialTasks struct {
tasks []Task
}
func (s serialTasks) Execute(initialValue int) (result int, err error) {
if len(s.tasks) == 0 {
return result, errors.New("No tasks to execute")
}
for i, task := range s.tasks {
if i == 0 {
result, err = task.Execute(initialValue)
} else {
result, err = task.Execute(result)
}
if err != nil {
return
}
}
return
}
func Pipeline(tasks ...Task) Task {
return serialTasks{tasks}
}
/*______________________________Fastest______________________________*/
type fastTasks struct {
tasks []Task
fastest chan taskReturnData
isDone bool
doneMutex sync.Mutex
}
func (f fastTasks) Execute(initialValue int) (result int, err error) {
if len(f.tasks) == 0 {
return result, errors.New("No fast tasks to execute")
}
for _, task := range f.tasks {
go func() {
currentResult, currentError := task.Execute(initialValue)
f.doneMutex.Lock()
currentIsDone := f.isDone
if !currentIsDone {
f.isDone = true
f.fastest <- taskReturnData{currentResult, currentError}
}
f.doneMutex.Unlock()
}()
}
fastestTaskResult := <-f.fastest
close(f.fastest)
return fastestTaskResult.result, fastestTaskResult.err
}
func Fastest(tasks ...Task) Task {
return fastTasks{tasks, make(chan taskReturnData), false, sync.Mutex{}}
}
/*_______________________________Timed______________________________*/
type timedTask struct {
task Task
timeLimit time.Duration
resultChannel chan taskReturnData
}
func (t timedTask) Execute(initialValue int) (int, error) {
go func() {
result, err := t.task.Execute(initialValue)
t.resultChannel <- taskReturnData{result, err}
}()
select {
case timedTaskResult := <-t.resultChannel:
close(t.resultChannel)
return timedTaskResult.result, timedTaskResult.err
case <-time.After(t.timeLimit):
return 0, errors.New("Time limit exceeded")
}
}
func Timed(task Task, timeout time.Duration) Task {
return timedTask{task, timeout, make(chan taskReturnData)}
}
/*________________________ConcurrentMapReduce______________________*/
type reducibleTasks struct {
tasks []Task
results []int
resultsMutex sync.Mutex
errorChannel chan error
ready chan struct{}
reduce func(results []int) int
}
func (r reducibleTasks) Execute(initialValue int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute")
}
for i, t := range r.tasks {
go func(index int, task Task) {
currentResult, currentError := task.Execute(initialValue)
if currentError != nil {
r.errorChannel <- currentError
+ } else {
+ r.resultsMutex.Lock()
+ r.results = append(r.results, currentResult)
+ r.resultsMutex.Unlock()
+ r.ready <- struct{}{}
}
-
- r.resultsMutex.Lock()
- r.results = append(r.results, currentResult)
- r.resultsMutex.Unlock()
- r.ready <- struct{}{}
}(i, t)
}
var unexpectedError error
for range r.tasks {
select {
case unexpectedError = <-r.errorChannel:
close(r.errorChannel)
return 0, unexpectedError
case <-r.ready:
}
}
close(r.ready)
return r.reduce(r.results), unexpectedError
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, make(chan error), make(chan struct{}), reduce}
}
/*__________________________GreatestSearcher________________________*/
type greedyConcurrentTasks struct{}
func (g greedyConcurrentTasks) Execute(n int) (int, error) {
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greedyConcurrentTasks{}
-}
+}

Ралица обнови решението на 29.11.2016 11:20 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type taskReturnData struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
/*______________________________Pipeline______________________________*/
type serialTasks struct {
tasks []Task
}
func (s serialTasks) Execute(initialValue int) (result int, err error) {
if len(s.tasks) == 0 {
return result, errors.New("No tasks to execute")
}
for i, task := range s.tasks {
if i == 0 {
result, err = task.Execute(initialValue)
} else {
result, err = task.Execute(result)
}
if err != nil {
return
}
}
return
}
func Pipeline(tasks ...Task) Task {
return serialTasks{tasks}
}
/*______________________________Fastest______________________________*/
type fastTasks struct {
tasks []Task
fastest chan taskReturnData
isDone bool
doneMutex sync.Mutex
}
-func (f fastTasks) Execute(initialValue int) (result int, err error) {
+func (f fastTasks) Execute(initialValue int) (int, error) {
if len(f.tasks) == 0 {
- return result, errors.New("No fast tasks to execute")
+ return 0, errors.New("No fast tasks to execute")
}
- for _, task := range f.tasks {
- go func() {
+ for _, t := range f.tasks {
+ go func(task Task) {
currentResult, currentError := task.Execute(initialValue)
f.doneMutex.Lock()
currentIsDone := f.isDone
if !currentIsDone {
f.isDone = true
f.fastest <- taskReturnData{currentResult, currentError}
}
f.doneMutex.Unlock()
- }()
+ }(t)
}
fastestTaskResult := <-f.fastest
close(f.fastest)
return fastestTaskResult.result, fastestTaskResult.err
}
func Fastest(tasks ...Task) Task {
return fastTasks{tasks, make(chan taskReturnData), false, sync.Mutex{}}
}
/*_______________________________Timed______________________________*/
type timedTask struct {
task Task
timeLimit time.Duration
resultChannel chan taskReturnData
}
func (t timedTask) Execute(initialValue int) (int, error) {
go func() {
result, err := t.task.Execute(initialValue)
t.resultChannel <- taskReturnData{result, err}
}()
select {
case timedTaskResult := <-t.resultChannel:
close(t.resultChannel)
return timedTaskResult.result, timedTaskResult.err
case <-time.After(t.timeLimit):
return 0, errors.New("Time limit exceeded")
}
}
func Timed(task Task, timeout time.Duration) Task {
return timedTask{task, timeout, make(chan taskReturnData)}
}
/*________________________ConcurrentMapReduce______________________*/
type reducibleTasks struct {
tasks []Task
results []int
resultsMutex sync.Mutex
errorChannel chan error
ready chan struct{}
reduce func(results []int) int
}
func (r reducibleTasks) Execute(initialValue int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute")
}
for i, t := range r.tasks {
go func(index int, task Task) {
currentResult, currentError := task.Execute(initialValue)
if currentError != nil {
r.errorChannel <- currentError
} else {
r.resultsMutex.Lock()
r.results = append(r.results, currentResult)
r.resultsMutex.Unlock()
r.ready <- struct{}{}
}
}(i, t)
}
var unexpectedError error
for range r.tasks {
select {
case unexpectedError = <-r.errorChannel:
close(r.errorChannel)
return 0, unexpectedError
case <-r.ready:
}
}
close(r.ready)
return r.reduce(r.results), unexpectedError
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, make(chan error), make(chan struct{}), reduce}
}
/*__________________________GreatestSearcher________________________*/
type greedyConcurrentTasks struct{}
func (g greedyConcurrentTasks) Execute(n int) (int, error) {
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greedyConcurrentTasks{}
-}
+}

Ралица обнови решението на 29.11.2016 15:26 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type taskReturnData struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
/*______________________________Pipeline______________________________*/
type serialTasks struct {
tasks []Task
}
func (s serialTasks) Execute(initialValue int) (result int, err error) {
if len(s.tasks) == 0 {
return result, errors.New("No tasks to execute")
}
for i, task := range s.tasks {
if i == 0 {
result, err = task.Execute(initialValue)
} else {
result, err = task.Execute(result)
}
if err != nil {
return
}
}
return
}
func Pipeline(tasks ...Task) Task {
return serialTasks{tasks}
}
/*______________________________Fastest______________________________*/
type fastTasks struct {
tasks []Task
fastest chan taskReturnData
isDone bool
doneMutex sync.Mutex
}
func (f fastTasks) Execute(initialValue int) (int, error) {
if len(f.tasks) == 0 {
return 0, errors.New("No fast tasks to execute")
}
for _, t := range f.tasks {
go func(task Task) {
currentResult, currentError := task.Execute(initialValue)
f.doneMutex.Lock()
currentIsDone := f.isDone
if !currentIsDone {
f.isDone = true
f.fastest <- taskReturnData{currentResult, currentError}
}
f.doneMutex.Unlock()
}(t)
}
fastestTaskResult := <-f.fastest
close(f.fastest)
return fastestTaskResult.result, fastestTaskResult.err
}
func Fastest(tasks ...Task) Task {
return fastTasks{tasks, make(chan taskReturnData), false, sync.Mutex{}}
}
/*_______________________________Timed______________________________*/
type timedTask struct {
task Task
timeLimit time.Duration
resultChannel chan taskReturnData
+
+ timeLimitExceeded bool
+ tleMutex sync.Mutex
}
func (t timedTask) Execute(initialValue int) (int, error) {
go func() {
result, err := t.task.Execute(initialValue)
- t.resultChannel <- taskReturnData{result, err}
+ t.tleMutex.Lock()
+ if !t.timeLimitExceeded {
+ t.resultChannel <- taskReturnData{result, err}
+ }
+ t.tleMutex.Unlock()
}()
select {
case timedTaskResult := <-t.resultChannel:
close(t.resultChannel)
return timedTaskResult.result, timedTaskResult.err
case <-time.After(t.timeLimit):
+ t.tleMutex.Lock()
+ t.timeLimitExceeded = true
+ t.tleMutex.Unlock()
return 0, errors.New("Time limit exceeded")
}
}
func Timed(task Task, timeout time.Duration) Task {
- return timedTask{task, timeout, make(chan taskReturnData)}
+ return timedTask{task, timeout, make(chan taskReturnData), false, sync.Mutex{}}
}
/*________________________ConcurrentMapReduce______________________*/
type reducibleTasks struct {
tasks []Task
results []int
resultsMutex sync.Mutex
errorChannel chan error
ready chan struct{}
reduce func(results []int) int
}
func (r reducibleTasks) Execute(initialValue int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute")
}
for i, t := range r.tasks {
go func(index int, task Task) {
currentResult, currentError := task.Execute(initialValue)
if currentError != nil {
r.errorChannel <- currentError
} else {
r.resultsMutex.Lock()
r.results = append(r.results, currentResult)
r.resultsMutex.Unlock()
r.ready <- struct{}{}
}
}(i, t)
}
var unexpectedError error
for range r.tasks {
select {
case unexpectedError = <-r.errorChannel:
close(r.errorChannel)
return 0, unexpectedError
case <-r.ready:
}
}
close(r.ready)
return r.reduce(r.results), unexpectedError
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return reducibleTasks{tasks, make([]int, 0, 2), sync.Mutex{}, make(chan error), make(chan struct{}), reduce}
}
/*__________________________GreatestSearcher________________________*/
type greedyConcurrentTasks struct{}
func (g greedyConcurrentTasks) Execute(n int) (int, error) {
return 0, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greedyConcurrentTasks{}
-}
+}