Александър обнови решението на 13.11.2016 17:40 (преди над 1 година)
+package main
+
// ConcurrentRetryExecutor runs every function in tasks, using at most
// concurrentLimit goroutines at a time, and streams the outcome of every
// attempt on the returned channel.
//
// A task whose function returns the empty string is considered failed and is
// retried immediately by the same worker, up to retryLimit attempts in total.
// Every attempt, failed or not, produces one message carrying the task's
// position in tasks (index) and the string it returned (result). The channel
// is closed once every task has either succeeded or used up its attempts.
//
// If tasks is empty, or concurrentLimit or retryLimit is not positive, no task
// is executed and an already closed channel is returned.
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
	index  int
	result string
} {
	// Nothing can run with these arguments. Returning early avoids a panic
	// from make() on a negative size and avoids leaving the feeder goroutine
	// blocked forever when there are no workers to receive from it.
	if len(tasks) == 0 || concurrentLimit <= 0 || retryLimit <= 0 {
		empty := make(chan struct {
			index  int
			result string
		})
		close(empty)
		return empty
	}

	// More workers than tasks would only sit idle, so cap the pool size.
	workers := concurrentLimit
	if len(tasks) < workers {
		workers = len(tasks)
	}

	// Sized for the worst case (every task fails every attempt) so that a
	// worker never blocks on a slow or absent consumer. This is deliberately
	// conservative and trades memory for non-blocking workers.
	results := make(chan struct {
		index  int
		result string
	}, len(tasks)*retryLimit)

	type job struct {
		index int
		run   func() string
	}

	// Feed the workers; one buffered slot per worker is enough to keep them busy.
	jobs := make(chan job, workers)
	go func() {
		for i, f := range tasks {
			jobs <- job{i, f}
		}
		close(jobs)
	}()

	// Each worker signals on done exactly once, after the job queue is drained.
	done := make(chan struct{})
	for w := 0; w < workers; w++ {
		go func() {
			for j := range jobs {
				for attempt := 0; attempt < retryLimit; attempt++ {
					out := j.run()
					results <- struct {
						index  int
						result string
					}{j.index, out}
					// A non-empty result is a success: stop retrying this task.
					if out != "" {
						break
					}
				}
			}
			done <- struct{}{}
		}()
	}

	// Close the result channel only after every worker has finished sending.
	go func() {
		for w := 0; w < workers; w++ {
			<-done
		}
		close(results)
	}()

	return results
}