Skip to content

Commit

Permalink
1. Optimize the multi-point download strategy
Browse files Browse the repository at this point in the history
2. Complete the source file upload
  • Loading branch information
lihy authored and TitanNet committed Aug 5, 2024
1 parent c7ac049 commit 6ff46b9
Show file tree
Hide file tree
Showing 13 changed files with 998 additions and 129 deletions.
51 changes: 47 additions & 4 deletions client/webapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,16 @@ type ListAssetSummaryRsp struct {
}

type UploadInfo struct {
UploadURL string
Token string
NodeID string
List []*NodeUploadInfo
AlreadyExists bool
}

type NodeUploadInfo struct {
UploadURL string
Token string
NodeID string
}

type VipInfo struct {
UserID string `json:"uid"`
VIP bool `json:"vip"`
Expand Down Expand Up @@ -657,7 +661,46 @@ func (s *webserver) GetAPPKeyPermissions(ctx context.Context, userID, keyName st

// GetNodeUploadInfo
func (s *webserver) GetNodeUploadInfo(ctx context.Context, userID string) (*UploadInfo, error) {
return nil, nil
url := fmt.Sprintf("%s/api/v1/storage/get_upload_info?encrypted=false", s.url)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}

req.Header.Set("apikey", s.apiKey)

rsp, err := s.client.Do(req)
if err != nil {
return nil, err
}

if rsp.StatusCode != http.StatusOK {
buf, _ := io.ReadAll(rsp.Body)
return nil, fmt.Errorf("status code %d %s", rsp.StatusCode, string(buf))
}

body, err := io.ReadAll(rsp.Body)
if err != nil {
return nil, err
}

ret := &Result{}
err = json.Unmarshal(body, ret)
if err != nil {
return nil, err
}

if ret.Code != 0 {
return nil, fmt.Errorf(fmt.Sprintf("code: %d, err: %d, msg: %s", ret.Code, ret.Err, ret.Msg))
}

uploadNodes := &UploadInfo{}
err = interfaceToStruct(ret.Data, uploadNodes)
if err != nil {
return nil, err
}

return uploadNodes, nil
}

func interfaceToStruct(input interface{}, output interface{}) error {
Expand Down
2 changes: 1 addition & 1 deletion example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ var getFileCmd = &cobra.Command{
log.Fatal("NewStorage error ", err)
}

reader, _, err := s.GetFileWithCid(context.Background(), cid, false)
reader, _, err := s.GetFileWithCid(context.Background(), cid)
if err != nil {
log.Fatal("UploadFilesWithPath ", err)
}
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ require (
)

require (
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect
github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand All @@ -34,10 +36,12 @@ require (
github.com/ipfs/go-ipfs-chunker v0.0.5 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.0 // indirect
github.com/ipfs/go-ipfs-files v0.3.0 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-cbor v0.1.0 // indirect
github.com/ipfs/go-ipld-format v0.6.0 // indirect
github.com/ipfs/go-ipld-legacy v0.2.1 // indirect
github.com/ipfs/go-libipfs v0.6.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/ipfs/go-merkledag v0.11.0 // indirect
Expand All @@ -58,6 +62,7 @@ require (
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/smartystreets/assertions v1.13.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg=
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE=
github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c h1:pFUpOrbxDR6AkioZ1ySsx5yxlDQZ8stG2b88gTPxgJU=
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6UhI8N9EjYm1c2odKpFpAYeR8dsBeM7PtzQhRgxRr9U=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs=
github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001 h1:/ZshrfQzayqRSBDodmp3rhNCHJCff+utvgBuWRbiqu4=
github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001/go.mod h1:kltMsfRMTHSFdMbK66XdS8mfMW77+FZA1fGY1xYMF84=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
Expand Down Expand Up @@ -90,6 +94,8 @@ github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNo
github.com/ipfs/go-ipfs-exchange-interface v0.2.0 h1:8lMSJmKogZYNo2jjhUs0izT+dck05pqUw4mWNW9Pw6Y=
github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA=
github.com/ipfs/go-ipfs-files v0.3.0 h1:fallckyc5PYjuMEitPNrjRfpwl7YFt69heCOUhsbGxQ=
github.com/ipfs/go-ipfs-files v0.3.0/go.mod h1:xAUtYMwB+iu/dtf6+muHNSFQCJG2dSiStR2P6sn9tIM=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-routing v0.3.0 h1:9W/W3N+g+y4ZDeffSgqhgo7BsBSJwPMcyssET9OWevc=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
Expand All @@ -101,6 +107,8 @@ github.com/ipfs/go-ipld-format v0.6.0 h1:VEJlA2kQ3LqFSIm5Vu6eIlSxD/Ze90xtc4Meten
github.com/ipfs/go-ipld-format v0.6.0/go.mod h1:g4QVMTn3marU3qXchwjpKPKgJv+zF+OlaKMyhJ4LHPg=
github.com/ipfs/go-ipld-legacy v0.2.1 h1:mDFtrBpmU7b//LzLSypVrXsD8QxkEWxu5qVxN99/+tk=
github.com/ipfs/go-ipld-legacy v0.2.1/go.mod h1:782MOUghNzMO2DER0FlBR94mllfdCJCkTtDtPM51otM=
github.com/ipfs/go-libipfs v0.6.0 h1:3FuckAJEm+zdHbHbf6lAyk0QUzc45LsFcGw102oBCZM=
github.com/ipfs/go-libipfs v0.6.0/go.mod h1:UjjDIuehp2GzlNP0HEr5I9GfFT7zWgst+YfpUEIThtw=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
Expand Down Expand Up @@ -218,6 +226,7 @@ github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4=
Expand Down Expand Up @@ -350,6 +359,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210601080250-7ecdf8ef093b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
2 changes: 1 addition & 1 deletion mobile/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func GetURL(rootCID string) (string, error) {
}

func GetFileWithCid(rootCID string) (data []byte, err error) {
readerCloser, _, err1 := storage_api.GetFileWithCid(context.Background(), rootCID, false)
readerCloser, _, err1 := storage_api.GetFileWithCid(context.Background(), rootCID)
if err1 != nil {
err = err1
return
Expand Down
215 changes: 215 additions & 0 deletions range/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package byterange

import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"time"

"github.com/eikenb/pipeat"
"github.com/pkg/errors"
)

type dispatcher struct {
fileSize int64
rangeSize int64
todos JobQueue
workers chan worker
resp chan response
writer *pipeat.PipeWriterAt
reader *pipeat.PipeReaderAt
backoff *backoff
}

type worker struct {
c *http.Client
e string
}

type response struct {
offset int64
data []byte
}

type job struct {
index int
start int64
end int64
retry int
}

type backoff struct {
minDelay time.Duration
maxDelay time.Duration
}

func (b *backoff) next(attempt int) time.Duration {
if attempt < 0 {
return b.minDelay
}

minf := float64(b.minDelay)
durf := minf * math.Pow(1.5, float64(attempt))
durf = durf + rand.Float64()*minf

delay := time.Duration(durf)
if delay > b.maxDelay {
return b.maxDelay
}

return delay
}

func (d *dispatcher) generateJobs() {
count := int64(math.Ceil(float64(d.fileSize) / float64(d.rangeSize)))
for i := int64(0); i < count; i++ {
start := i * d.rangeSize
end := (i + 1) * d.rangeSize

if end > d.fileSize {
end = d.fileSize
}

newJob := &job{
index: int(i),
start: start,
end: end,
}

d.todos.Push(newJob)
}
}

func (d *dispatcher) run(ctx context.Context) {
d.generateJobs()
d.writeData(ctx)

var (
counter int64
finished = make(chan int64, 1)
)

go func() {
for {
select {
case w := <-d.workers:
go func() {
j, ok := d.todos.Pop()
if !ok {
d.workers <- w
return
}

data, err := d.fetch(ctx, w, j)
if err != nil {
errMsg := fmt.Sprintf("pull data failed : %v", err)
if j.retry > 0 {
log.Errorf("pull data failed (retries: %d): %v", j.retry, err)
<-time.After(d.backoff.next(j.retry))
}

log.Warnf(errMsg)

j.retry++
d.todos.PushFront(j)
d.workers <- w
return
}

dataLen := j.end - j.start

if int64(len(data)) < dataLen {
log.Errorf("unexpected data size, want %d got %d", dataLen, len(data))
d.todos.PushFront(j)
d.workers <- w
return
}

d.workers <- w
d.resp <- response{
data: data[:dataLen],
offset: j.start,
}
finished <- dataLen
}()
case size := <-finished:
counter += size
if counter >= d.fileSize {
return
}
case <-ctx.Done():
return
}
}
}()

return
}

func (d *dispatcher) writeData(ctx context.Context) {
go func() {
defer d.finally()

var count int64
for {
select {
case r := <-d.resp:
_, err := d.writer.WriteAt(r.data, r.offset)
if err != nil {
log.Errorf("write data failed: %v", err)
continue
}

count += int64(len(r.data))
if count >= d.fileSize {
return
}
case <-ctx.Done():
return
}
}

}()
}

func (d *dispatcher) fetch(ctx context.Context, w worker, j *job) ([]byte, error) {
startTime := time.Now()
req, err := http.NewRequest("GET", w.e, nil)
if err != nil {
return nil, errors.Errorf("new request failed: %v", err)
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", j.start, j.end))
resp, err := w.c.Do(req)
if err != nil {
return nil, errors.Errorf("fetch failed: %v", err)
}

defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()

if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to download chunk: %d-%d, status code: %d", j.start, j.end, resp.StatusCode)
}

data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.Errorf("read data failed: %v", err)
}

elapsed := time.Since(startTime)
log.Infof("Chunk: %fs, Link: %s", elapsed.Seconds(), w.e)

return data, nil
}

func (d *dispatcher) finally() {
if err := d.writer.Close(); err != nil {
log.Errorf("close write failed: %v", err)
}
}
Loading

0 comments on commit 6ff46b9

Please sign in to comment.