Решение на Concurrent Tasks от Диан Тодоров

Обратно към всички решения

Към профила на Диан Тодоров

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 10 успешни тест(а)
  • 3 неуспешни тест(а)

Код

package main
import (
"errors"
"sync"
"time"
)
// Task ...
type Task interface {
Execute(int) (int, error)
}
type pipelineManager struct {
tasks []Task
}
func (pe pipelineManager) Execute(x int) (int, error) {
if len(pe.tasks) == 0 {
return 0, errors.New("No tasks")
}
res := x
for _, t := range pe.tasks {
r, err := t.Execute(res)
res = r
if err != nil {
return 0, err
}
}
return res, nil
}
// Pipeline ...
func Pipeline(tasks ...Task) Task {
return pipelineManager{tasks: tasks}
}
// ======================================
type fastest struct {
tasks []Task
}
type result struct {
data int
err error
}
func (f fastest) Execute(x int) (int, error) {
if len(f.tasks) == 0 {
return 0, errors.New("No tasks")
}
ch := make(chan result, 1)
for _, task := range f.tasks {
go func(t Task) {
res, err := t.Execute(x)
select {
case ch <- result{res, err}:
default:
}
}(task)
}
finalResult := <-ch
return finalResult.data, finalResult.err
}
// Fastest ..
func Fastest(tasks ...Task) Task {
return fastest{tasks}
}
// ======================================
type timed struct {
task Task
timeout time.Duration
}
func (t timed) Execute(x int) (int, error) {
ch := make(chan result)
go func() {
res, err := t.task.Execute(x)
ch <- result{res, err}
}()
select {
case res := <-ch:
return res.data, res.err
case <-time.After(t.timeout):
return 0, errors.New("main: timed out on Execute")
}
}
// Timed ...
func Timed(task Task, timeout time.Duration) Task {
return timed{task: task, timeout: timeout}
}
// ======================================
type concurrentMapReduce struct {
reduce func(results []int) int
tasks []Task
}
func (c concurrentMapReduce) Execute(x int) (int, error) {
if len(c.tasks) == 0 {
return 0, errors.New("No tasks")
}
ch := make(chan result, 1)
for _, task := range c.tasks {
go func(t Task) {
res, err := t.Execute(x)
select {
case ch <- result{res, err}:
default:
}
}(task)
}
results := make([]int, 0)
for i := 0; i < len(c.tasks); i++ {
r := <-ch
if r.err != nil {
return 0, r.err
}
results = append(results, r.data)
}
return c.reduce(results), nil
}
// ConcurrentMapReduce ...
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return concurrentMapReduce{reduce: reduce, tasks: tasks}
}
// ======================================
type greatestSearcher struct {
tasks <-chan Task
errorLimit int
}
// Execute is hedious.
func (g greatestSearcher) Execute(x int) (int, error) {
ch := make(chan result)
finish := make(chan struct{})
final := make(chan result)
go func() {
results := make([]result, 0)
for {
select {
case res := <-ch:
results = append(results, res)
case <-finish:
close(ch)
errorCount := 0
finalResult := result{}
if len(results) == 0 {
finalResult = result{0, errors.New("No Tasks")}
final <- finalResult
}
max := results[0].data
for _, r := range results[1:] {
if r.data > max {
max = r.data
}
if r.err != nil {
errorCount++
}
}
if errorCount > g.errorLimit {
finalResult = result{0, errors.New("ErrorLimit reached")}
} else {
finalResult = result{max, nil}
}
final <- finalResult
}
}
}()
var wg sync.WaitGroup
for task := range g.tasks {
wg.Add(1)
go func(t Task) {
res, err := t.Execute(x)
ch <- result{res, err}
wg.Done()
}(task)
}
wg.Wait()
finish <- struct{}{}
y := <-final
return y.data, y.err
}
// GreatestSearcher ...
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greatestSearcher{errorLimit: errorLimit, tasks: tasks}
}

Лог от изпълнението

▸ Покажи лога

История (2 версии и 2 коментара)

Диан обнови решението на 29.11.2016 09:36 (преди над 1 година)

▸ Покажи разликите
+package main
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+// Task ...
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type pipelineManager struct {
+ tasks []Task
+}
+
+func (pe pipelineManager) Execute(x int) (int, error) {
+ if len(pe.tasks) == 0 {
+ return 0, nil
+ }
+ res := x
+ for _, t := range pe.tasks {
+ r, err := t.Execute(res)
+ res = r
+ if err != nil {
+ return 0, err
+ }
+ }
+ return res, nil
+}
+
+// Pipeline ...
+func Pipeline(tasks ...Task) Task {
+ return pipelineManager{tasks: tasks}
+}
+
+// ======================================
+type fastest struct {
+ tasks []Task
+}
+
+type result struct {
+ data int
+ err error
+}
+
+func (f fastest) Execute(x int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, nil
+ }
+ ch := make(chan result, 1)
+ for _, task := range f.tasks {
+ go func(t Task) {
+ res, err := t.Execute(x)
+ select {
+ case ch <- result{res, err}:
+ default:
+ }
+ }(task)
+ }
+ finalResult := <-ch
+ return finalResult.data, finalResult.err
+}
+
+// Fastest ..
+func Fastest(tasks ...Task) Task {
+ return fastest{tasks}
+}
+
+// ======================================
+type timed struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t timed) Execute(x int) (int, error) {
+ ch := make(chan result)
+ go func() {
+ res, err := t.task.Execute(x)
+ ch <- result{res, err}
+ }()
+ select {
+ case res := <-ch:
+ return res.data, res.err
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("main: timed out on Execute")
+ }
+}
+
+// Timed ...
+func Timed(task Task, timeout time.Duration) Task {
+ return timed{task: task, timeout: timeout}
+}
+
+// ======================================
+type concurrentMapReduce struct {
+ reduce func(results []int) int
+ tasks []Task
+}
+
+func (c concurrentMapReduce) Execute(x int) (int, error) {
+ if len(c.tasks) == 0 {
+ return 0, nil
+ }
+ ch := make(chan result, 1)
+ for _, task := range c.tasks {
+ go func(t Task) {
+ res, err := t.Execute(x)
+ select {
+ case ch <- result{res, err}:
+ default:
+ }
+ }(task)
+ }
+ results := make([]int, 0)
+ for i := 0; i < len(c.tasks); i++ {
+ r := <-ch
+ if r.err != nil {
+ return 0, r.err
+ }
+ results = append(results, r.data)
+ }
+ return c.reduce(results), nil
+}
+
+// ConcurrentMapReduce ...
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return concurrentMapReduce{reduce: reduce, tasks: tasks}
+}
+
+// ======================================
+type greatestSearcher struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+// Execute is hedious.
+func (g greatestSearcher) Execute(x int) (int, error) {
+ ch := make(chan result)
+ finish := make(chan struct{})
+ final := make(chan result)
+
+ go func() {
+ results := make([]result, 0)
+ for {
+ select {
+ case res := <-ch:
+ results = append(results, res)
+ case <-finish:
+ close(ch)
+ errorCount := 0
+ finalResult := result{}
+ if len(results) == 0 {
+ finalResult = result{0, fmt.Errorf("No Tasks")}
+ final <- finalResult
+ }
+
+ max := results[0].data
+ for _, r := range results[1:] {
+ if r.data > max {
+ max = r.data
+ }
+ if r.err != nil {
+ errorCount++
+ }
+ }
+
+ if errorCount > g.errorLimit {
+ finalResult = result{0, fmt.Errorf("ErrorLimit reached")}
+ } else {
+ finalResult = result{max, nil}
+ }
+ final <- finalResult
+ }
+ }
+ }()
+
+ var wg sync.WaitGroup
+ for task := range g.tasks {
+ wg.Add(1)
+ go func(t Task) {
+ res, err := t.Execute(x)
+ ch <- result{res, err}
+ wg.Done()
+ }(task)
+ }
+
+ wg.Wait()
+ finish <- struct{}{}
+ y := <-final
+ return y.data, y.err
+}
+
+// GreatestSearcher ...
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return greatestSearcher{errorLimit: errorLimit, tasks: tasks}
+}

Диан обнови решението на 29.11.2016 14:39 (преди над 1 година)

▸ Покажи разликите
package main
import (
- "fmt"
+ "errors"
"sync"
"time"
)
// Task ...
type Task interface {
Execute(int) (int, error)
}
type pipelineManager struct {
tasks []Task
}
func (pe pipelineManager) Execute(x int) (int, error) {
if len(pe.tasks) == 0 {
- return 0, nil
+ return 0, errors.New("No tasks")
}
res := x
for _, t := range pe.tasks {
r, err := t.Execute(res)
res = r
if err != nil {
return 0, err
}
}
return res, nil
}
// Pipeline ...
func Pipeline(tasks ...Task) Task {
return pipelineManager{tasks: tasks}
}
// ======================================
type fastest struct {
tasks []Task
}
type result struct {
data int
err error
}
func (f fastest) Execute(x int) (int, error) {
if len(f.tasks) == 0 {
- return 0, nil
+ return 0, errors.New("No tasks")
}
ch := make(chan result, 1)
for _, task := range f.tasks {
go func(t Task) {
res, err := t.Execute(x)
select {
case ch <- result{res, err}:
default:
}
}(task)
}
finalResult := <-ch
return finalResult.data, finalResult.err
}
// Fastest ..
func Fastest(tasks ...Task) Task {
return fastest{tasks}
}
// ======================================
type timed struct {
task Task
timeout time.Duration
}
func (t timed) Execute(x int) (int, error) {
ch := make(chan result)
go func() {
res, err := t.task.Execute(x)
ch <- result{res, err}
}()
select {
case res := <-ch:
return res.data, res.err
case <-time.After(t.timeout):
- return 0, fmt.Errorf("main: timed out on Execute")
+ return 0, errors.New("main: timed out on Execute")
}
}
// Timed ...
func Timed(task Task, timeout time.Duration) Task {
return timed{task: task, timeout: timeout}
}
// ======================================
type concurrentMapReduce struct {
reduce func(results []int) int
tasks []Task
}
func (c concurrentMapReduce) Execute(x int) (int, error) {
if len(c.tasks) == 0 {
- return 0, nil
+ return 0, errors.New("No tasks")
}
ch := make(chan result, 1)
for _, task := range c.tasks {
go func(t Task) {
res, err := t.Execute(x)
select {
case ch <- result{res, err}:
default:
}
}(task)
}
results := make([]int, 0)
for i := 0; i < len(c.tasks); i++ {
r := <-ch
if r.err != nil {
return 0, r.err
}
results = append(results, r.data)
}
return c.reduce(results), nil
}
// ConcurrentMapReduce ...
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return concurrentMapReduce{reduce: reduce, tasks: tasks}
}
// ======================================
type greatestSearcher struct {
tasks <-chan Task
errorLimit int
}
// Execute is hedious.
func (g greatestSearcher) Execute(x int) (int, error) {
ch := make(chan result)
finish := make(chan struct{})
final := make(chan result)
go func() {
results := make([]result, 0)
for {
select {
case res := <-ch:
results = append(results, res)
case <-finish:
close(ch)
errorCount := 0
finalResult := result{}
if len(results) == 0 {
- finalResult = result{0, fmt.Errorf("No Tasks")}
+ finalResult = result{0, errors.New("No Tasks")}
final <- finalResult
}
max := results[0].data
for _, r := range results[1:] {
if r.data > max {
max = r.data
}
if r.err != nil {
errorCount++
}
}
if errorCount > g.errorLimit {
- finalResult = result{0, fmt.Errorf("ErrorLimit reached")}
+ finalResult = result{0, errors.New("ErrorLimit reached")}
} else {
finalResult = result{max, nil}
}
final <- finalResult
}
}
}()
var wg sync.WaitGroup
for task := range g.tasks {
wg.Add(1)
go func(t Task) {
res, err := t.Execute(x)
ch <- result{res, err}
wg.Done()
}(task)
}
wg.Wait()
finish <- struct{}{}
y := <-final
return y.data, y.err
}
// GreatestSearcher ...
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greatestSearcher{errorLimit: errorLimit, tasks: tasks}
}