Solution to HTTP Downloader by Анатоли Бързев


Results

  • 8 points from tests
  • 0 bonus points
  • 8 points total
  • 14 passed test(s)
  • 3 failed test(s)

Code

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "sort"
    "sync"
)

// chunk is one contiguous byte range of the file, together with the
// download state for that range.
type chunk struct {
    data      []byte
    from      int64
    to        int64
    pos       int64
    url       string
    err       error
    ctxClosed bool
}

type comm struct {
    jobs chan chunk
    res  chan chunk
    done chan struct{}
}

// chunkSort orders chunks by their starting offset.
type chunkSort []chunk

func (a chunkSort) Len() int           { return len(a) }
func (a chunkSort) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a chunkSort) Less(i, j int) bool { return a[i].from < a[j].from }

type file struct {
    chunks     []chunk
    size       int64
    currSize   int64
    pos        int64
    urls       []string
    comm       *comm
    err        error
    maxWorkers int
    lock       *sync.Mutex
}

func (f *file) Read(p []byte) (n int, err error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    if f.err != nil {
        return f.readChunks(p), f.err
    }
    if f.pos == f.size && f.size > 0 {
        return 0, io.EOF
    }
    return f.readChunks(p), nil
}

// readChunks copies as many already-downloaded, contiguous bytes as fit
// into p, starting at the reader position f.pos.
func (f *file) readChunks(p []byte) (n int) {
    //fmt.Println("read chunks", len(p), f.pos, f.size)
    prevTo := int64(0)
    bytesToRead := len(p)
    bytes := 0
    if bytesToRead == 0 {
        return 0
    }
    for _, c := range f.chunks {
        // Stop at the first gap: chunks are sorted by offset, so a
        // mismatch means the next range has not arrived yet.
        if prevTo != c.from {
            break
        }
        prevTo = c.pos
        if f.pos > c.pos {
            continue // the reader has already consumed past this chunk
        }
        // Offset inside c.data where the reader position falls.
        relPos := int64(len(c.data)) - (c.pos - f.pos)
        //fmt.Println(f.pos, c.pos, relPos, len(c.data))
        for _, d := range c.data[relPos:] {
            p[bytes] = d
            bytes++
            f.pos++
            if bytes == bytesToRead {
                return bytes
            }
        }
    }
    return bytes
}

func (f *file) addChunk(chunk chunk) (err error) {
    if len(chunk.data) == 0 {
        return nil
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.chunks = append(f.chunks, chunk)
    f.currSize += int64(len(chunk.data))
    sort.Sort(chunkSort(f.chunks))
    return nil
}

func (f *file) setErr(err error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.err = err
}

func (f *file) setSize(size int64) {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.size = size
}

func (f *file) isReady() bool {
    return f.currSize == f.size && f.size > 0
}

//DownloadFile No comment
func DownloadFile(ctx context.Context, urls []string) io.Reader {
    fmt.Println("---- NEW DOWNLOADER ----")
    maxWorkers := len(urls)
    urlCount := maxWorkers
    if ctx == nil {
        ctx = context.Background()
    }
    // The caller may cap the number of concurrent connections via the
    // "max-connections" context value.
    if tmpMaxWorkers, ok := ctx.Value("max-connections").(int); ok {
        maxWorkers = tmpMaxWorkers
    }
    comm := &comm{
        jobs: make(chan chunk, urlCount),
        res:  make(chan chunk, urlCount),
        done: make(chan struct{}, 1),
    }
    // create downloaders
    for i := 0; i < maxWorkers; i++ {
        go downloadWorker(ctx, comm)
    }
    f := &file{
        chunks:     make([]chunk, 0),
        lock:       &sync.Mutex{},
        comm:       comm,
        urls:       urls,
        maxWorkers: maxWorkers,
    }
    go startDownloading(ctx, f)
    return f
}

// downloadWorker pulls byte-range jobs off comm.jobs and fetches them
// with HTTP Range requests, resuming from the last received byte until
// the range is complete, the URL fails, or the context is cancelled.
func downloadWorker(ctx context.Context, comm *comm) {
    var client http.Client
Breaking:
    for {
        select {
        case chunk := <-comm.jobs:
            to := chunk.to
            from := chunk.from
            chunk.pos = chunk.from
            for {
                rheader := fmt.Sprintf("bytes=%d-%d", from, to)
                req, _ := http.NewRequest("GET", chunk.url, nil)
                req.Header.Add("Range", rheader)
                resp, rerr := client.Do(req)
                if rerr != nil {
                    chunk.err = rerr
                    comm.res <- chunk
                    continue Breaking
                }
                if resp.StatusCode < 200 || resp.StatusCode > 299 {
                    resp.Body.Close()
                    chunk.err = fmt.Errorf("HTTP status %v", resp.StatusCode)
                    comm.res <- chunk
                    continue Breaking
                }
                buf := make([]byte, 1000)
                var ioerr error
                var read int
                for {
                    read, ioerr = resp.Body.Read(buf)
                    if read > 0 {
                        chunk.data = append(chunk.data, buf[:read]...)
                        chunk.pos = chunk.pos + int64(len(buf[:read]))
                    }
                    if ioerr != nil {
                        break
                    }
                    select {
                    case <-ctx.Done():
                        resp.Body.Close()
                        chunk.err = ctx.Err()
                        chunk.ctxClosed = true
                        comm.res <- chunk
                        return
                    default:
                    }
                }
                resp.Body.Close()
                // Done once the whole range has arrived; otherwise retry
                // from the last received byte.
                if (ioerr == nil || ioerr == io.EOF) && (chunk.pos-1) == chunk.to {
                    break
                }
                from = chunk.pos
            }
            comm.res <- chunk
        case <-comm.done:
            return
        case <-ctx.Done():
            chunk := chunk{
                data: make([]byte, 0),
            }
            chunk.err = ctx.Err()
            chunk.ctxClosed = true
            comm.res <- chunk
            return
        }
    }
    //fmt.Println("Exit worker...")
}

// startDownloading discovers the file size, splits it into one range per
// URL, and redistributes failed ranges among the URLs that still work.
func startDownloading(ctx context.Context, f *file) {
    // while the file is not ready
    defer close(f.comm.done)
    contentLen, err := readContentLen(f.urls)
    if err != nil {
        fmt.Println("Failed to read content len from: ", f.urls)
        f.setErr(err)
        return
    }
    f.setSize(contentLen)
    currUrls := f.urls
    createDownloadJobs(currUrls, f.size, 0, f.comm.jobs)
    ctxErrs := make([]error, 0)
    for {
        select {
        case chunk := <-f.comm.res:
            if chunk.err != nil {
                fmt.Printf(
                    "Failed to download chunk %v-%v(%v): %v\n",
                    chunk.from,
                    chunk.to,
                    chunk.pos,
                    chunk.err,
                )
                f.addChunk(chunk)
                if chunk.ctxClosed {
                    ctxErrs = append(ctxErrs, chunk.err)
                } else {
                    currUrls = discardURL(currUrls, chunk.url)
                    if len(currUrls) == 0 {
                        f.setErr(fmt.Errorf("no valid urls"))
                        return
                    }
                    currLen := chunk.to - chunk.pos
                    currStart := chunk.pos
                    createDownloadJobs(currUrls, currLen, currStart, f.comm.jobs)
                }
            } else {
                f.addChunk(chunk)
            }
            if f.isReady() {
                return
            }
        case <-ctx.Done():
            // Fail only once every worker has reported cancellation.
            if len(ctxErrs) == f.maxWorkers {
                f.setErr(ctxErrs[len(ctxErrs)-1])
                return
            }
        }
    }
}

// readContentLen returns the Content-Length of the first URL whose HEAD
// request succeeds with a usable length.
func readContentLen(urls []string) (size int64, err error) {
    for _, url := range urls {
        resp, tmperr := http.Head(url)
        if tmperr != nil {
            err = fmt.Errorf("[ERROR] HEAD %v: %v", url, tmperr)
            continue
        }
        if resp.StatusCode < 200 || resp.StatusCode > 299 {
            err = fmt.Errorf("[ERROR] HEAD `%v`[%v]", url, resp.StatusCode)
            continue
        }
        if resp.ContentLength == -1 || resp.ContentLength == 0 {
            err = io.EOF
            continue
        }
        size = resp.ContentLength
        err = nil
        break
    }
    return size, err
}

func createDownloadJobs(urls []string, currLen, start int64, jobsChan chan<- chunk) {
    ranges := calcRanges(int64(len(urls)), currLen, start)
    for idx, url := range urls {
        r := ranges[idx]
        fmt.Printf("New job for range %v-%v/%v : %v\n", r[0], r[1], currLen, url)
        job := chunk{
            data: make([]byte, 0),
            from: r[0],
            to:   r[1],
            url:  url,
        }
        jobsChan <- job
    }
}

// calcRanges splits the work starting at start into urlCount inclusive
// [from, to] ranges, spreading the remainder one byte at a time over the
// leading ranges. For example contentLen=10, urlCount=3, start=0 gives
// [0,3], [4,6], [7,9].
func calcRanges(urlCount int64, contentLen, start int64) (ranges [][]int64) {
    var i int64
    lastByte := contentLen % urlCount
    step := contentLen / urlCount
    end := start + step
    ranges = make([][]int64, urlCount)
    if start == 0 {
        end--
    }
    for i = 0; i < urlCount; i++ {
        ranges[i] = make([]int64, 2)
        if lastByte > 0 {
            end++
            lastByte--
        }
        ranges[i][0] = start
        ranges[i][1] = end
        start = end + 1
        end = end + step
    }
    return ranges
}

// discardURL removes url from urls; it returns nil when url is absent.
func discardURL(urls []string, url string) []string {
    for idx, tmpURL := range urls {
        if url == tmpURL {
            fmt.Printf("Discarding url %s ...\n", url)
            return append(urls[:idx], urls[idx+1:]...)
        }
    }
    return nil
}

func main() {
    fmt.Println("ahellow")
}
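
For context, a minimal sketch of how this reader might be consumed. The mirror URLs and the connection limit are placeholders, not part of the solution; the sketch assumes DownloadFile is available in the same package (standing in for the solution's placeholder main):

package main

import (
    "context"
    "fmt"
    "io/ioutil"
)

func main() {
    // Cap the downloader at two concurrent connections via the context
    // key the solution inspects.
    ctx := context.WithValue(context.Background(), "max-connections", 2)
    mirrors := []string{
        "http://localhost:8080/file", // placeholder mirror URLs
        "http://localhost:8081/file",
    }
    r := DownloadFile(ctx, mirrors)
    // Read returns 0 bytes until chunks arrive; ReadAll keeps calling
    // Read until io.EOF or an error surfaces.
    data, err := ioutil.ReadAll(r)
    fmt.Println(len(data), err)
}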


History (8 versions and 2 comments)

Анатоли updated the solution on 02.01.2017 19:42 (over 1 year ago)

Initial submission: the complete first version of the file. It already contained the worker pool, the Range-request retry loop, and the range splitting, but had no ctxClosed flag or maxWorkers field yet, and readChunks still scanned every chunk byte by byte under a Breaking label; the diffs below show how those parts evolved.

Анатоли updated the solution on 03.01.2017 00:28 (over 1 year ago)

readChunks was rewritten to locate the resume offset arithmetically instead of re-scanning every byte from the start:

 func (f *file) readChunks(p []byte) (n int) {
     prevTo := int64(0)
     bytesToRead := len(p)
-    pos := int64(0)
     bytes := 0
     if bytesToRead == 0 {
         return 0
     }
-Breaking:
     for _, c := range f.chunks {
         if prevTo != c.from {
             break
         }
-        for _, d := range c.data {
-            if pos < f.pos {
-                pos++
-                continue
-            }
-            p[bytes] = d
+        prevTo = c.pos
+        if f.pos > c.pos {
+            continue
+        }
+        relPos := int64(len(c.data)) - (c.pos - f.pos)
+        //fmt.Println(f.pos, c.pos, relPos, len(c.data))
+        for _, d := range c.data[relPos:] {
+            p[bytes] = d
             bytes++
             f.pos++
-            pos = f.pos + 1
             if bytes == bytesToRead {
-                break Breaking
+                return bytes
             }
         }
-
-        prevTo = c.pos
     }
     return bytes
 }
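
A worked example of the new offset arithmetic, with made-up numbers (not from the original):

package main

import "fmt"

func main() {
    // A chunk holding bytes 0-9 ends with c.pos == 10 and
    // len(c.data) == 10. If the reader has already consumed 4 bytes
    // (f.pos == 4), copying must resume at offset 4 inside c.data.
    dataLen, chunkPos, filePos := int64(10), int64(10), int64(4)
    relPos := dataLen - (chunkPos - filePos)
    fmt.Println(relPos) // 4
}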

Анатоли updated the solution on 03.01.2017 14:41 (over 1 year ago)

Added a temporary throttling sleep for debugging, and rejected a zero Content-Length as well as an unknown one:

 import (
     ...
     "sync"
+    "time"
 )

             for {
+                if from > 500000 && from < 600000 {
+                    time.Sleep(5 * time.Second)
+                }
                 rheader := fmt.Sprintf("bytes=%d-%d", from, to)

-        if resp.ContentLength == -1 {
+        if resp.ContentLength == -1 || resp.ContentLength == 0 {
             err = io.EOF
             continue
         }
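
The ContentLength check matters because net/http reports -1 when the server does not announce a length; a quick sketch (the URL is a placeholder):

package main

import (
    "fmt"
    "net/http"
)

func main() {
    resp, err := http.Head("http://localhost:8080/file") // placeholder URL
    if err != nil {
        fmt.Println(err)
        return
    }
    defer resp.Body.Close()
    // -1 means no usable length was announced; anything > 0 is usable.
    fmt.Println(resp.ContentLength)
}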

Анатоли updated the solution on 03.01.2017 14:43 (over 1 year ago)

Removed a stray trailing blank line left by the previous update; no code changes otherwise.

Анатоли updated the solution on 03.01.2017 15:47 (over 1 year ago)

Dropped the debugging sleep (and the "time" import), made workers report context cancellation explicitly via a new ctxClosed flag, and taught startDownloading to fail only after all maxWorkers workers have reported it:

 import (
     ...
     "sync"
-    "time"
 )

 type chunk struct {
     ...
+    ctxClosed bool
 }

 type file struct {
     ...
+    maxWorkers int
 }

     f := &file{
         ...
+        maxWorkers: maxWorkers,
     }

             for {
-                if from > 500000 && from < 600000 {
-                    time.Sleep(5 * time.Second)
-                }
                 rheader := fmt.Sprintf("bytes=%d-%d", from, to)

                 select {
                 case <-ctx.Done():
                     resp.Body.Close()
+                    chunk.err = fmt.Errorf("ctx closed")
+                    chunk.ctxClosed = true
                     comm.res <- chunk
-                    break Breaking
+                    return
                 default:
                 }

         case <-comm.done:
             return
         case <-ctx.Done():
+            chunk := chunk{
+                data: make([]byte, 0),
+            }
+            chunk.err = fmt.Errorf("ctx closed")
+            chunk.ctxClosed = true
+            comm.res <- chunk
             return
         }

     createDownloadJobs(currUrls, f.size, 0, f.comm.jobs)
+    ctxErrs := make([]error, 0)
     for {
         select {
         case chunk := <-f.comm.res:
             if chunk.err != nil {
                 ...
                 if len(chunk.data) > 0 {
                     f.addChunk(chunk)
                 }
+                if chunk.ctxClosed {
+                    ctxErrs = append(ctxErrs, chunk.err)
+                    break
+                }
                 currUrls = discardURL(currUrls, chunk.url)
                 if len(currUrls) == 0 {
                     f.setErr(fmt.Errorf("no valid urls"))
                     return
                 }
                 currLen := chunk.to - chunk.pos
                 currStart := chunk.pos
                 createDownloadJobs(currUrls, currLen, currStart, f.comm.jobs)
             } else {
                 f.addChunk(chunk)
             }
-        case <-ctx.Done():
-            return
-        }
-        if f.isReady() {
-            return
-        }
+            if f.isReady() {
+                return
+            }
+        case <-ctx.Done():
+            if len(ctxErrs) == f.maxWorkers {
+                f.setErr(ctxErrs[len(ctxErrs)-1])
+            }
+            return
+        }
     }

Анатоли updated the solution on 03.01.2017 15:57 (over 1 year ago)

On cancellation, keep draining worker results until every worker has reported, instead of returning immediately (a couple of stray blank lines were also introduced):

         case <-ctx.Done():
             if len(ctxErrs) == f.maxWorkers {
                 f.setErr(ctxErrs[len(ctxErrs)-1])
+                return
             }
-            return

Анатоли updated the solution on 03.01.2017 16:11 (over 1 year ago)

Report the context's own error value instead of a hand-rolled one, in both places a worker signals cancellation (the stray blank lines were removed as well):

                     resp.Body.Close()
-                    chunk.err = fmt.Errorf("ctx closed")
+                    chunk.err = ctx.Err()
                     chunk.ctxClosed = true

-            chunk.err = fmt.Errorf("ctx closed")
+            chunk.err = ctx.Err()
             chunk.ctxClosed = true
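
Using ctx.Err() instead of a hand-rolled error lets callers compare against the context package's sentinel values; a minimal illustration:

package main

import (
    "context"
    "fmt"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    // After cancellation ctx.Err() returns the sentinel context.Canceled.
    fmt.Println(ctx.Err() == context.Canceled) // true
}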

Анатоли updated the solution on 03.01.2017 16:20 (over 1 year ago)

addChunk now ignores empty chunks itself, so the error branch simply records cancellations and only discards a URL and resplits the remaining range for genuine failures:

 func (f *file) addChunk(chunk chunk) (err error) {
+    if len(chunk.data) == 0 {
+        return nil
+    }
     f.lock.Lock()

-            if len(chunk.data) > 0 {
-                f.addChunk(chunk)
-            }
+            f.addChunk(chunk)
             if chunk.ctxClosed {
                 ctxErrs = append(ctxErrs, chunk.err)
-                break
-            }
-            currUrls = discardURL(currUrls, chunk.url)
-            if len(currUrls) == 0 {
-                f.setErr(fmt.Errorf("no valid urls"))
-                return
+            } else {
+                currUrls = discardURL(currUrls, chunk.url)
+                if len(currUrls) == 0 {
+                    f.setErr(fmt.Errorf("no valid urls"))
+                    return
+                }
+                currLen := chunk.to - chunk.pos
+                currStart := chunk.pos
+                createDownloadJobs(currUrls, currLen, currStart, f.comm.jobs)
             }
-            currLen := chunk.to - chunk.pos
-            currStart := chunk.pos
-            createDownloadJobs(currUrls, currLen, currStart, f.comm.jobs)