Решение на Concurrent Tasks от Валентин Латунов

Обратно към всички решения

Към профила на Валентин Латунов

Резултати

  • 12 точки от тестове
  • 0 бонус точки
  • 12 точки общо
  • 12 успешни тест(а)
  • 1 неуспешни тест(а)

Код

package main
import (
"errors"
"sync"
"time"
)
type Result struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
type Chain struct {
tasks []Task
}
func (c Chain) Execute(arg int) (_ int, e error) {
if len(c.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
for _, t := range c.tasks {
arg, e = t.Execute(arg)
if e != nil {
return 0, e
}
}
return arg, nil
}
func Pipeline(tasks ...Task) Task {
return Chain{tasks}
}
type Concurrent struct {
tasks []Task
fastestResult chan Result
}
func (p Concurrent) Execute(arg int) (int, error) {
if len(p.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
once := sync.Once{}
for _, t := range p.tasks {
go func(t Task) {
r, e := t.Execute(arg)
once.Do(func() { p.fastestResult <- Result{r, e} })
}(t)
}
result := <-p.fastestResult
return result.result, result.err
}
func Fastest(tasks ...Task) Task {
return Concurrent{
tasks: tasks,
fastestResult: make(chan Result),
}
}
type TimeIsMoney struct {
task Task
time time.Duration
}
func (t TimeIsMoney) Execute(arg int) (int, error) {
done := make(chan Result)
quit := make(chan struct{})
go func() {
n, e := t.task.Execute(arg)
select {
case done <- Result{n, e}:
case <-quit:
}
}()
select {
case <-time.After(t.time):
go func() {
quit <- struct{}{}
}()
return 0, errors.New("Task did not finish in time.")
case r := <-done:
return r.result, r.err
}
}
func Timed(task Task, timeout time.Duration) Task {
return TimeIsMoney{
task: task,
time: timeout,
}
}
type Reducer struct {
tasks []Task
reduce func(results []int) int
}
func (r Reducer) Execute(arg int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
resultStream := make(chan Result)
canSend := make(chan struct{})
resultSlice := make([]int, 0, len(r.tasks))
for _, t := range r.tasks {
go func(t Task) {
n, e := t.Execute(arg)
if _, ok := <-canSend; ok {
resultStream <- Result{n, e}
}
}(t)
}
for range r.tasks {
canSend <- struct{}{}
res := <-resultStream
if res.err != nil {
close(canSend)
return 0, res.err
}
resultSlice = append(resultSlice, res.result)
}
return r.reduce(resultSlice), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return Reducer{
tasks: tasks,
reduce: reduce,
}
}
type Searcher struct {
tasks <-chan Task
errLimit int
}
func (r Searcher) Execute(arg int) (int, error) {
var (
firstTask bool = true
numberOfErrors int
maxNumber int
wg sync.WaitGroup
resultChannel chan Result = make(chan Result)
yield chan struct{} = make(chan struct{})
)
go func() {
for res := range resultChannel {
if res.err != nil {
numberOfErrors++
}
if maxNumber < res.result || firstTask {
maxNumber = res.result
firstTask = false
}
}
yield <- struct{}{}
}()
for t := range r.tasks {
wg.Add(1)
go func(t Task) {
// wg.Add(1) HERE CAUSES RACE CONDITION!!
n, e := t.Execute(arg)
resultChannel <- Result{n, e}
wg.Done()
}(t)
}
wg.Wait()
close(resultChannel)
<-yield
if firstTask {
return 0, errors.New("No tasks to execute.")
}
if numberOfErrors > r.errLimit {
return 0, errors.New("Maximum number of errors exceeded.")
}
return maxNumber, nil
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return Searcher{
tasks: tasks,
errLimit: errorLimit,
}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.003s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.003s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.005s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.104s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.203s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.135s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.204s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.003s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.003s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.003s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/only_failure (0.00s)
    	solution_test.go:335: Expected error did not occur instead got 0
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-7bgrzj	0.048s
PASS
ok  	_/tmp/d20161129-30451-7bgrzj	0.126s

История (4 версии и 5 коментара)

Валентин обнови решението на 25.11.2016 23:34 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Result struct {
+ result int
+ err error
+}
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type Chain struct {
+ tasks []Task
+}
+
+func (c Chain) Execute(arg int) (_ int, e error) {
+ if len(c.tasks) == 0 {
+ return 0, errors.New("No tasks to execute.")
+ }
+ for _, t := range c.tasks {
+ arg, e = t.Execute(arg)
+ if e != nil {
+ return 0, e
+ }
+ }
+ return arg, nil
+}
+func Pipeline(tasks ...Task) Task {
+ return Chain{tasks}
+}
+
+type Concurrent struct {
+ tasks []Task
+ fastestResult chan Result
+}
+
+func (p Concurrent) Execute(arg int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("No tasks to execute.")
+ }
+ once := sync.Once{}
+ for _, t := range p.tasks {
+ go func(t Task) {
+ r, e := t.Execute(arg)
+ once.Do(func() { p.fastestResult <- Result{r, e} })
+ }(t)
+ }
+ result := <-p.fastestResult
+ return result.result, result.err
+}
+
+func Fastest(tasks ...Task) Task {
+ return Concurrent{
+ tasks: tasks,
+ fastestResult: make(chan Result),
+ }
+}
+
+type TimeIsMoney struct {
+ task Task
+ time time.Duration
+}
+
+func (t TimeIsMoney) Execute(arg int) (int, error) {
+ done := make(chan Result)
+ quit := make(chan struct{})
+ go func() {
+ n, e := t.task.Execute(arg)
+ select {
+ case done <- Result{n, e}:
+ case <-quit:
+ }
+ }()
+ select {
+ case <-time.After(t.time):
+ quit <- struct{}{}
+ return 0, errors.New("Task did not finish in time.")
+ case r := <-done:
+ return r.result, r.err
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return TimeIsMoney{
+ task: task,
+ time: timeout,
+ }
+}
+
+type Reducer struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func (r Reducer) Execute(arg int) (int, error) {
+ if len(r.tasks) == 0 {
+ return 0, errors.New("No tasks to execute.")
+ }
+ resultStream := make(chan Result)
+ resultSlice := make([]int, 0, len(r.tasks))
+ for _, t := range r.tasks {
+ go func(t Task) {
+ n, e := t.Execute(arg)
+ defer func() {
+ recover()
+ }()
+ resultStream <- Result{n, e}
+ }(t)
+ }
+ for range r.tasks {
+ res := <-resultStream
+ if res.err != nil {
+ close(resultStream)
+ return 0, res.err
+ }
+ resultSlice = append(resultSlice, res.result)
+ }
+ return r.reduce(resultSlice), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return Reducer{
+ tasks: tasks,
+ reduce: reduce,
+ }
+}
+
+type Searcher struct {
+ tasks <-chan Task
+ errLimit int
+}
+
+func (r Searcher) Execute(arg int) (int, error) {
+ var (
+ firstTask bool = true
+ numberOfErrors int
+ maxNumber int
+ wg sync.WaitGroup
+ resultChannel chan Result = make(chan Result)
+ )
+ go func() {
+ for res := range resultChannel {
+ if res.err != nil {
+ numberOfErrors++
+ }
+ if maxNumber < res.result || firstTask {
+ maxNumber = res.result
+ firstTask = false
+ }
+ }
+ }()
+ for t := range r.tasks {
+ wg.Add(1)
+ go func(t Task) {
+ // wg.Add(1) HERE CAUSES RACE CONDITION!!
+ n, e := t.Execute(arg)
+ resultChannel <- Result{n, e}
+ wg.Done()
+ }(t)
+ }
+ wg.Wait()
+ close(resultChannel)
+ if firstTask {
+ return 0, errors.New("No tasks to execute.")
+ }
+ if numberOfErrors > r.errLimit {
+ return 0, errors.New("Maximum number of errors exceeded.")
+ }
+ return maxNumber, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return Searcher{
+ tasks: tasks,
+ errLimit: errorLimit,
+ }
+}

Валентин обнови решението на 25.11.2016 23:45 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type Result struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
type Chain struct {
tasks []Task
}
func (c Chain) Execute(arg int) (_ int, e error) {
if len(c.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
for _, t := range c.tasks {
arg, e = t.Execute(arg)
if e != nil {
return 0, e
}
}
return arg, nil
}
func Pipeline(tasks ...Task) Task {
return Chain{tasks}
}
type Concurrent struct {
tasks []Task
fastestResult chan Result
}
func (p Concurrent) Execute(arg int) (int, error) {
if len(p.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
once := sync.Once{}
+ start := sync.RWMutex{}
+ start.Lock()
for _, t := range p.tasks {
go func(t Task) {
+ defer start.RUnlock()
+ start.RLock()
r, e := t.Execute(arg)
once.Do(func() { p.fastestResult <- Result{r, e} })
}(t)
}
+ start.Unlock()
result := <-p.fastestResult
return result.result, result.err
}
func Fastest(tasks ...Task) Task {
return Concurrent{
tasks: tasks,
fastestResult: make(chan Result),
}
}
type TimeIsMoney struct {
task Task
time time.Duration
}
func (t TimeIsMoney) Execute(arg int) (int, error) {
done := make(chan Result)
quit := make(chan struct{})
go func() {
n, e := t.task.Execute(arg)
select {
case done <- Result{n, e}:
case <-quit:
}
}()
select {
case <-time.After(t.time):
quit <- struct{}{}
return 0, errors.New("Task did not finish in time.")
case r := <-done:
return r.result, r.err
}
}
func Timed(task Task, timeout time.Duration) Task {
return TimeIsMoney{
task: task,
time: timeout,
}
}
type Reducer struct {
tasks []Task
reduce func(results []int) int
}
func (r Reducer) Execute(arg int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
resultStream := make(chan Result)
resultSlice := make([]int, 0, len(r.tasks))
for _, t := range r.tasks {
go func(t Task) {
n, e := t.Execute(arg)
defer func() {
recover()
}()
resultStream <- Result{n, e}
}(t)
}
for range r.tasks {
res := <-resultStream
if res.err != nil {
close(resultStream)
return 0, res.err
}
resultSlice = append(resultSlice, res.result)
}
return r.reduce(resultSlice), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return Reducer{
tasks: tasks,
reduce: reduce,
}
}
type Searcher struct {
tasks <-chan Task
errLimit int
}
func (r Searcher) Execute(arg int) (int, error) {
var (
firstTask bool = true
numberOfErrors int
maxNumber int
wg sync.WaitGroup
resultChannel chan Result = make(chan Result)
)
go func() {
for res := range resultChannel {
if res.err != nil {
numberOfErrors++
}
if maxNumber < res.result || firstTask {
maxNumber = res.result
firstTask = false
}
}
}()
for t := range r.tasks {
wg.Add(1)
go func(t Task) {
// wg.Add(1) HERE CAUSES RACE CONDITION!!
n, e := t.Execute(arg)
resultChannel <- Result{n, e}
wg.Done()
}(t)
}
wg.Wait()
close(resultChannel)
if firstTask {
return 0, errors.New("No tasks to execute.")
}
if numberOfErrors > r.errLimit {
return 0, errors.New("Maximum number of errors exceeded.")
}
return maxNumber, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return Searcher{
tasks: tasks,
errLimit: errorLimit,
}
}

Валентин обнови решението на 27.11.2016 09:06 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type Result struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
type Chain struct {
tasks []Task
}
func (c Chain) Execute(arg int) (_ int, e error) {
if len(c.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
for _, t := range c.tasks {
arg, e = t.Execute(arg)
if e != nil {
return 0, e
}
}
return arg, nil
}
func Pipeline(tasks ...Task) Task {
return Chain{tasks}
}
type Concurrent struct {
tasks []Task
fastestResult chan Result
}
func (p Concurrent) Execute(arg int) (int, error) {
if len(p.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
once := sync.Once{}
- start := sync.RWMutex{}
- start.Lock()
for _, t := range p.tasks {
go func(t Task) {
- defer start.RUnlock()
- start.RLock()
r, e := t.Execute(arg)
once.Do(func() { p.fastestResult <- Result{r, e} })
}(t)
}
- start.Unlock()
result := <-p.fastestResult
return result.result, result.err
}
func Fastest(tasks ...Task) Task {
return Concurrent{
tasks: tasks,
fastestResult: make(chan Result),
}
}
type TimeIsMoney struct {
task Task
time time.Duration
}
func (t TimeIsMoney) Execute(arg int) (int, error) {
done := make(chan Result)
quit := make(chan struct{})
go func() {
n, e := t.task.Execute(arg)
select {
case done <- Result{n, e}:
case <-quit:
}
}()
select {
case <-time.After(t.time):
- quit <- struct{}{}
+ go func() {
+ quit <- struct{}{}
+ }()
return 0, errors.New("Task did not finish in time.")
case r := <-done:
return r.result, r.err
}
}
func Timed(task Task, timeout time.Duration) Task {
return TimeIsMoney{
task: task,
time: timeout,
}
}
type Reducer struct {
tasks []Task
reduce func(results []int) int
}
func (r Reducer) Execute(arg int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
resultStream := make(chan Result)
+ canSend := make(chan struct{})
resultSlice := make([]int, 0, len(r.tasks))
for _, t := range r.tasks {
go func(t Task) {
n, e := t.Execute(arg)
- defer func() {
- recover()
- }()
- resultStream <- Result{n, e}
+ if _, ok := <-canSend; ok {
+ resultStream <- Result{n, e}
+ }
}(t)
}
for range r.tasks {
+ canSend <- struct{}{}
res := <-resultStream
if res.err != nil {
- close(resultStream)
+ close(canSend)
return 0, res.err
}
resultSlice = append(resultSlice, res.result)
}
return r.reduce(resultSlice), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return Reducer{
tasks: tasks,
reduce: reduce,
}
}
type Searcher struct {
tasks <-chan Task
errLimit int
}
func (r Searcher) Execute(arg int) (int, error) {
var (
firstTask bool = true
numberOfErrors int
maxNumber int
wg sync.WaitGroup
resultChannel chan Result = make(chan Result)
)
go func() {
for res := range resultChannel {
if res.err != nil {
numberOfErrors++
}
if maxNumber < res.result || firstTask {
maxNumber = res.result
firstTask = false
}
}
}()
for t := range r.tasks {
wg.Add(1)
go func(t Task) {
// wg.Add(1) HERE CAUSES RACE CONDITION!!
n, e := t.Execute(arg)
resultChannel <- Result{n, e}
wg.Done()
}(t)
}
wg.Wait()
close(resultChannel)
if firstTask {
return 0, errors.New("No tasks to execute.")
}
if numberOfErrors > r.errLimit {
return 0, errors.New("Maximum number of errors exceeded.")
}
return maxNumber, nil
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return Searcher{
tasks: tasks,
errLimit: errorLimit,
}
}

Валентин обнови решението на 28.11.2016 23:57 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type Result struct {
result int
err error
}
type Task interface {
Execute(int) (int, error)
}
type Chain struct {
tasks []Task
}
func (c Chain) Execute(arg int) (_ int, e error) {
if len(c.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
for _, t := range c.tasks {
arg, e = t.Execute(arg)
if e != nil {
return 0, e
}
}
return arg, nil
}
func Pipeline(tasks ...Task) Task {
return Chain{tasks}
}
type Concurrent struct {
tasks []Task
fastestResult chan Result
}
func (p Concurrent) Execute(arg int) (int, error) {
if len(p.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
once := sync.Once{}
for _, t := range p.tasks {
go func(t Task) {
r, e := t.Execute(arg)
once.Do(func() { p.fastestResult <- Result{r, e} })
}(t)
}
result := <-p.fastestResult
return result.result, result.err
}
func Fastest(tasks ...Task) Task {
return Concurrent{
tasks: tasks,
fastestResult: make(chan Result),
}
}
type TimeIsMoney struct {
task Task
time time.Duration
}
func (t TimeIsMoney) Execute(arg int) (int, error) {
done := make(chan Result)
quit := make(chan struct{})
go func() {
n, e := t.task.Execute(arg)
select {
case done <- Result{n, e}:
case <-quit:
}
}()
select {
case <-time.After(t.time):
go func() {
quit <- struct{}{}
}()
return 0, errors.New("Task did not finish in time.")
case r := <-done:
return r.result, r.err
}
}
func Timed(task Task, timeout time.Duration) Task {
return TimeIsMoney{
task: task,
time: timeout,
}
}
type Reducer struct {
tasks []Task
reduce func(results []int) int
}
func (r Reducer) Execute(arg int) (int, error) {
if len(r.tasks) == 0 {
return 0, errors.New("No tasks to execute.")
}
resultStream := make(chan Result)
canSend := make(chan struct{})
resultSlice := make([]int, 0, len(r.tasks))
for _, t := range r.tasks {
go func(t Task) {
n, e := t.Execute(arg)
if _, ok := <-canSend; ok {
resultStream <- Result{n, e}
}
}(t)
}
for range r.tasks {
canSend <- struct{}{}
res := <-resultStream
if res.err != nil {
close(canSend)
return 0, res.err
}
resultSlice = append(resultSlice, res.result)
}
return r.reduce(resultSlice), nil
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return Reducer{
tasks: tasks,
reduce: reduce,
}
}
type Searcher struct {
tasks <-chan Task
errLimit int
}
func (r Searcher) Execute(arg int) (int, error) {
var (
firstTask bool = true
numberOfErrors int
maxNumber int
wg sync.WaitGroup
resultChannel chan Result = make(chan Result)
+ yield chan struct{} = make(chan struct{})
)
go func() {
for res := range resultChannel {
if res.err != nil {
numberOfErrors++
}
if maxNumber < res.result || firstTask {
maxNumber = res.result
firstTask = false
}
}
+ yield <- struct{}{}
}()
for t := range r.tasks {
wg.Add(1)
go func(t Task) {
// wg.Add(1) HERE CAUSES RACE CONDITION!!
n, e := t.Execute(arg)
resultChannel <- Result{n, e}
wg.Done()
}(t)
}
wg.Wait()
close(resultChannel)
+ <-yield
if firstTask {
return 0, errors.New("No tasks to execute.")
}
if numberOfErrors > r.errLimit {
return 0, errors.New("Maximum number of errors exceeded.")
}
return maxNumber, nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return Searcher{
tasks: tasks,
errLimit: errorLimit,
}
}