Решение на Concurrent Retry Executor от Георги Горанов

Обратно към всички решения

Към профила на Георги Горанов

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 9 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
type Task struct {
index int
task func() string
}
type TaskResult struct {
index int
result string
}
func ExecuteTasks(done chan struct{}, cTasks chan Task, cResults chan TaskResult, retryLimit int) {
for task := range cTasks {
var res TaskResult
res.index = task.index
for attempt := 0; attempt < retryLimit; attempt++ {
res.result = task.task()
cResults <- res
if res.result != "" {
break
}
}
}
done <- struct{}{}
}
func ConcurencyExecutor(cResults chan TaskResult, cTasks chan Task, concurrentLimit int, retryLimit int) {
defer close(cResults)
done := make(chan struct{})
for i := 0; i < concurrentLimit; i++ {
go ExecuteTasks(done, cTasks, cResults, retryLimit)
}
for i := 0; i < concurrentLimit; i++ {
<-done
}
}
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan TaskResult {
cBuff := len(tasks) * retryLimit
cResults := make(chan TaskResult, cBuff)
cTasks := make(chan Task, len(tasks))
for i, task := range tasks {
cTasks <- Task{index: i, task: task}
}
close(cTasks)
go ConcurencyExecutor(cResults, cTasks, concurrentLimit, retryLimit)
return cResults
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161115-21147-15vpjyg	0.003s
PASS
ok  	_/tmp/d20161115-21147-15vpjyg	0.022s
PASS
ok  	_/tmp/d20161115-21147-15vpjyg	0.020s
PASS
ok  	_/tmp/d20161115-21147-15vpjyg	0.003s
PASS
ok  	_/tmp/d20161115-21147-15vpjyg	0.005s
PASS
ok  	_/tmp/d20161115-21147-15vpjyg	0.057s
PASS
ok  	_/tmp/d20161115-21147-15vpjyg	0.420s
PASS
ok  	_/tmp/d20161115-21147-15vpjyg	0.211s
PASS
ok  	_/tmp/d20161115-21147-15vpjyg	0.203s

История (3 версии и 7 коментара)

Георги обнови решението на 13.11.2016 11:34 (преди над 1 година)

+package main
+
+import (
+ "sync"
+)
+
+type Result struct {
+ index int
+ result string
+}
+
+func ExecuteTasks(tasks []func() string, position int, retryLimit int, c chan Result, wg *sync.WaitGroup) {
+ defer wg.Done()
+ //fmt.Println("Thread for tasks", tasks)
+ for _, task := range tasks {
+ //fmt.Println("Started task", position+1)
+ var res Result
+ res.index = position
+ for attempt := 0; attempt < retryLimit; attempt++ {
+ res.result = task()
+ c <- res
+ if res.result != "" {
+ break
+ }
+ }
+ position++
+ }
+}
+
+func ConcurencyExecutor(c chan Result, tasks []func() string, concurrentLimit int, retryLimit int) {
+ var wg sync.WaitGroup
+ elements := len(tasks) / concurrentLimit
+ for i := 0; i < concurrentLimit; i++ {
+ if i == concurrentLimit-1 {
+ //fmt.Println("Starting thread", i+1)
+ wg.Add(1)
+ go ExecuteTasks(tasks[i*elements:], i*elements, retryLimit, c, &wg)
+ } else {
+ //fmt.Println("Starting thread", i+1)
+ wg.Add(1)
+ go ExecuteTasks(tasks[i*elements:(i+1)*elements], i*elements, retryLimit, c, &wg)
+ }
+ }
+ wg.Wait()
+ close(c)
+}
+
+func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan Result {
+ cBuff := len(tasks) * retryLimit
+ c := make(chan Result, cBuff)
+ go ConcurencyExecutor(c, tasks, concurrentLimit, retryLimit)
+ return c
+}

Не изпълняваш задачите подред, което част от условието което е важно за нас :).

В допълнение в реалния свят е възможно да имаш 10 задачи с конкурентост 2 и ако първите 5 отнемат X време а вторите 5*X, няма да изпълняваш оптимално бързо колекцията от задачи.

п.п. Ако тестовете ти не минават е доста сигурно че не изпълняваш условието.

Да, очевидно не изпълнявам услоивието. Но самото условие не е особено ясно. Какво се предполага да означава конкурентно и последователно? Кое трябва да е конкурентно и кое последователно?

Изпълнението на задачите - конкуретно трябва да ги изпълнявате, а под последователно се има предвид ако имате 10 задачи и конкурентност 2 да пуснете първо първа и втора и после трета, четвърта, пета, шеста и така нататък когато завършат предишните и съответно освободят бройка в конкурентното изпълнение.

При теб ще се изпълнят първо първа и шеста и после или втора и седма, което не е последователно.

Георги обнови решението на 14.11.2016 12:26 (преди над 1 година)

package main
import (
"sync"
)
-type Result struct {
+type Task struct {
+ index int
+ task func() string
+}
+
+type TaskResult struct {
index int
result string
}
-func ExecuteTasks(tasks []func() string, position int, retryLimit int, c chan Result, wg *sync.WaitGroup) {
+func ExecuteTasks(cTasks chan Task, cResults chan TaskResult, retryLimit int, wg *sync.WaitGroup) {
defer wg.Done()
- //fmt.Println("Thread for tasks", tasks)
- for _, task := range tasks {
- //fmt.Println("Started task", position+1)
- var res Result
- res.index = position
+ for task := range cTasks {
+ var res TaskResult
+ res.index = task.index
for attempt := 0; attempt < retryLimit; attempt++ {
- res.result = task()
- c <- res
+ res.result = task.task()
+ cResults <- res
if res.result != "" {
break
}
}
- position++
}
}
-func ConcurencyExecutor(c chan Result, tasks []func() string, concurrentLimit int, retryLimit int) {
+func ConcurencyExecutor(cResults chan TaskResult, cTasks chan Task, concurrentLimit int, retryLimit int) {
var wg sync.WaitGroup
- elements := len(tasks) / concurrentLimit
+ wg.Add(concurrentLimit)
for i := 0; i < concurrentLimit; i++ {
- if i == concurrentLimit-1 {
- //fmt.Println("Starting thread", i+1)
- wg.Add(1)
- go ExecuteTasks(tasks[i*elements:], i*elements, retryLimit, c, &wg)
- } else {
- //fmt.Println("Starting thread", i+1)
- wg.Add(1)
- go ExecuteTasks(tasks[i*elements:(i+1)*elements], i*elements, retryLimit, c, &wg)
- }
+ go ExecuteTasks(cTasks, cResults, retryLimit, &wg)
}
wg.Wait()
- close(c)
+ close(cResults)
}
-func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan Result {
+func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan TaskResult {
cBuff := len(tasks) * retryLimit
- c := make(chan Result, cBuff)
- go ConcurencyExecutor(c, tasks, concurrentLimit, retryLimit)
+ cResults := make(chan TaskResult, cBuff)
- return c
+ cTasks := make(chan Task, len(tasks))
-}
+ for i, task := range tasks {
+ cTasks <- Task{index: i, task: task}
+ }
+ close(cTasks)
+ go ConcurencyExecutor(cResults, cTasks, concurrentLimit, retryLimit)
+ return cResults
+}

Георги обнови решението на 14.11.2016 14:37 (преди над 1 година)

package main
-import (
- "sync"
-)
-
type Task struct {
index int
task func() string
}
type TaskResult struct {
index int
result string
}
-func ExecuteTasks(cTasks chan Task, cResults chan TaskResult, retryLimit int, wg *sync.WaitGroup) {
- defer wg.Done()
+func ExecuteTasks(done chan struct{}, cTasks chan Task, cResults chan TaskResult, retryLimit int) {
for task := range cTasks {
var res TaskResult
res.index = task.index
for attempt := 0; attempt < retryLimit; attempt++ {
res.result = task.task()
cResults <- res
if res.result != "" {
break
}
}
}
+ done <- struct{}{}
}
func ConcurencyExecutor(cResults chan TaskResult, cTasks chan Task, concurrentLimit int, retryLimit int) {
- var wg sync.WaitGroup
- wg.Add(concurrentLimit)
+ defer close(cResults)
+ done := make(chan struct{})
for i := 0; i < concurrentLimit; i++ {
- go ExecuteTasks(cTasks, cResults, retryLimit, &wg)
+ go ExecuteTasks(done, cTasks, cResults, retryLimit)
}
- wg.Wait()
- close(cResults)
+ for i := 0; i < concurrentLimit; i++ {
+ <-done
+ }
}
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan TaskResult {
cBuff := len(tasks) * retryLimit
cResults := make(chan TaskResult, cBuff)
cTasks := make(chan Task, len(tasks))
for i, task := range tasks {
cTasks <- Task{index: i, task: task}
}
close(cTasks)
go ConcurencyExecutor(cResults, cTasks, concurrentLimit, retryLimit)
return cResults
-}
+}