From 6ff46b96891f82a420b8964f2cd4a890781a0116 Mon Sep 17 00:00:00 2001 From: lihy Date: Mon, 5 Aug 2024 16:21:47 +0800 Subject: [PATCH] 1. Optimize the multi-point download strategy 2. Complete the source file upload --- client/webapi.go | 51 ++++++++- example/main.go | 2 +- go.mod | 5 + go.sum | 10 ++ mobile/main.go | 2 +- range/dispatcher.go | 215 +++++++++++++++++++++++++++++++++++++ range/queue.go | 51 +++++++++ range/range.go | 166 ++++++++++++++++++++++++++++ request/request.go | 205 +++++++++++++++++++++++++++++++++++ request/request_builder.go | 121 +++++++++++++++++++++ request/types.go | 40 +++++++ storage.go | 213 +++++++++++++++++++----------------- storage_test.go | 46 ++++---- 13 files changed, 998 insertions(+), 129 deletions(-) create mode 100644 range/dispatcher.go create mode 100644 range/queue.go create mode 100644 range/range.go create mode 100644 request/request.go create mode 100644 request/request_builder.go create mode 100644 request/types.go diff --git a/client/webapi.go b/client/webapi.go index 2a854e0..a3fbcab 100644 --- a/client/webapi.go +++ b/client/webapi.go @@ -147,12 +147,16 @@ type ListAssetSummaryRsp struct { } type UploadInfo struct { - UploadURL string - Token string - NodeID string + List []*NodeUploadInfo AlreadyExists bool } +type NodeUploadInfo struct { + UploadURL string + Token string + NodeID string +} + type VipInfo struct { UserID string `json:"uid"` VIP bool `json:"vip"` @@ -657,7 +661,46 @@ func (s *webserver) GetAPPKeyPermissions(ctx context.Context, userID, keyName st // GetNodeUploadInfo func (s *webserver) GetNodeUploadInfo(ctx context.Context, userID string) (*UploadInfo, error) { - return nil, nil + url := fmt.Sprintf("%s/api/v1/storage/get_upload_info?encrypted=false", s.url) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, err + } + + req.Header.Set("apikey", s.apiKey) + + rsp, err := s.client.Do(req) + if err != nil { + return nil, err + } + + if rsp.StatusCode != http.StatusOK { + buf, _ := io.ReadAll(rsp.Body) + return nil, fmt.Errorf("status code %d %s", rsp.StatusCode, string(buf)) + } + + body, err := io.ReadAll(rsp.Body) + if err != nil { + return nil, err + } + + ret := &Result{} + err = json.Unmarshal(body, ret) + if err != nil { + return nil, err + } + + if ret.Code != 0 { + return nil, fmt.Errorf(fmt.Sprintf("code: %d, err: %d, msg: %s", ret.Code, ret.Err, ret.Msg)) + } + + uploadNodes := &UploadInfo{} + err = interfaceToStruct(ret.Data, uploadNodes) + if err != nil { + return nil, err + } + + return uploadNodes, nil } func interfaceToStruct(input interface{}, output interface{}) error { diff --git a/example/main.go b/example/main.go index d078f13..77e09cf 100644 --- a/example/main.go +++ b/example/main.go @@ -166,7 +166,7 @@ var getFileCmd = &cobra.Command{ log.Fatal("NewStorage error ", err) } - reader, _, err := s.GetFileWithCid(context.Background(), cid, false) + reader, _, err := s.GetFileWithCid(context.Background(), cid) if err != nil { log.Fatal("UploadFilesWithPath ", err) } diff --git a/go.mod b/go.mod index 51c3e92..1e892eb 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,8 @@ require ( ) require ( + github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect + github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -34,10 +36,12 @@ require ( github.com/ipfs/go-ipfs-chunker v0.0.5 // indirect 
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-exchange-interface v0.2.0 // indirect + github.com/ipfs/go-ipfs-files v0.3.0 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect github.com/ipfs/go-ipld-cbor v0.1.0 // indirect github.com/ipfs/go-ipld-format v0.6.0 // indirect github.com/ipfs/go-ipld-legacy v0.2.1 // indirect + github.com/ipfs/go-libipfs v0.6.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/ipfs/go-merkledag v0.11.0 // indirect @@ -58,6 +62,7 @@ require ( github.com/multiformats/go-varint v0.0.7 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/smartystreets/assertions v1.13.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect diff --git a/go.sum b/go.sum index 7238693..b254757 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg= +github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -14,6 +16,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c h1:pFUpOrbxDR6AkioZ1ySsx5yxlDQZ8stG2b88gTPxgJU= github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6UhI8N9EjYm1c2odKpFpAYeR8dsBeM7PtzQhRgxRr9U= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= +github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001 h1:/ZshrfQzayqRSBDodmp3rhNCHJCff+utvgBuWRbiqu4= +github.com/eikenb/pipeat v0.0.0-20210730190139-06b3e6902001/go.mod h1:kltMsfRMTHSFdMbK66XdS8mfMW77+FZA1fGY1xYMF84= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= @@ -90,6 +94,8 @@ github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNo github.com/ipfs/go-ipfs-exchange-interface v0.2.0 h1:8lMSJmKogZYNo2jjhUs0izT+dck05pqUw4mWNW9Pw6Y= github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y= github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA= +github.com/ipfs/go-ipfs-files v0.3.0 h1:fallckyc5PYjuMEitPNrjRfpwl7YFt69heCOUhsbGxQ= +github.com/ipfs/go-ipfs-files v0.3.0/go.mod h1:xAUtYMwB+iu/dtf6+muHNSFQCJG2dSiStR2P6sn9tIM= github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY= github.com/ipfs/go-ipfs-routing v0.3.0 
h1:9W/W3N+g+y4ZDeffSgqhgo7BsBSJwPMcyssET9OWevc= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= @@ -101,6 +107,8 @@ github.com/ipfs/go-ipld-format v0.6.0 h1:VEJlA2kQ3LqFSIm5Vu6eIlSxD/Ze90xtc4Meten github.com/ipfs/go-ipld-format v0.6.0/go.mod h1:g4QVMTn3marU3qXchwjpKPKgJv+zF+OlaKMyhJ4LHPg= github.com/ipfs/go-ipld-legacy v0.2.1 h1:mDFtrBpmU7b//LzLSypVrXsD8QxkEWxu5qVxN99/+tk= github.com/ipfs/go-ipld-legacy v0.2.1/go.mod h1:782MOUghNzMO2DER0FlBR94mllfdCJCkTtDtPM51otM= +github.com/ipfs/go-libipfs v0.6.0 h1:3FuckAJEm+zdHbHbf6lAyk0QUzc45LsFcGw102oBCZM= +github.com/ipfs/go-libipfs v0.6.0/go.mod h1:UjjDIuehp2GzlNP0HEr5I9GfFT7zWgst+YfpUEIThtw= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= @@ -218,6 +226,7 @@ github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+ github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4= @@ -350,6 +359,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210601080250-7ecdf8ef093b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/mobile/main.go b/mobile/main.go index 0fbaf0e..800cc9b 100644 --- a/mobile/main.go +++ b/mobile/main.go @@ -101,7 +101,7 @@ func GetURL(rootCID string) (string, error) { } func GetFileWithCid(rootCID string) (data []byte, err error) { - readerCloser, _, err1 := storage_api.GetFileWithCid(context.Background(), rootCID, false) + readerCloser, _, err1 := storage_api.GetFileWithCid(context.Background(), rootCID) if err1 != nil { err = err1 return diff --git a/range/dispatcher.go b/range/dispatcher.go new file mode 100644 index 0000000..a34d849 --- /dev/null +++ b/range/dispatcher.go @@ -0,0 +1,215 @@ +package byterange + +import ( + "context" + "fmt" + "io" + "math" + "math/rand" + "net/http" + "time" + + "github.com/eikenb/pipeat" + "github.com/pkg/errors" +) + +type dispatcher struct { + fileSize int64 + rangeSize int64 + todos JobQueue + workers chan worker + resp chan response + writer *pipeat.PipeWriterAt + reader *pipeat.PipeReaderAt + backoff *backoff +} + +type worker struct { + c *http.Client + e string +} + +type response struct { + offset 
int64 + data []byte +} + +type job struct { + index int + start int64 + end int64 + retry int +} + +type backoff struct { + minDelay time.Duration + maxDelay time.Duration +} + +func (b *backoff) next(attempt int) time.Duration { + if attempt < 0 { + return b.minDelay + } + + minf := float64(b.minDelay) + durf := minf * math.Pow(1.5, float64(attempt)) + durf = durf + rand.Float64()*minf + + delay := time.Duration(durf) + if delay > b.maxDelay { + return b.maxDelay + } + + return delay +} + +func (d *dispatcher) generateJobs() { + count := int64(math.Ceil(float64(d.fileSize) / float64(d.rangeSize))) + for i := int64(0); i < count; i++ { + start := i * d.rangeSize + end := (i + 1) * d.rangeSize + + if end > d.fileSize { + end = d.fileSize + } + + newJob := &job{ + index: int(i), + start: start, + end: end, + } + + d.todos.Push(newJob) + } +} + +func (d *dispatcher) run(ctx context.Context) { + d.generateJobs() + d.writeData(ctx) + + var ( + counter int64 + finished = make(chan int64, 1) + ) + + go func() { + for { + select { + case w := <-d.workers: + go func() { + j, ok := d.todos.Pop() + if !ok { + d.workers <- w + return + } + + data, err := d.fetch(ctx, w, j) + if err != nil { + errMsg := fmt.Sprintf("pull data failed : %v", err) + if j.retry > 0 { + log.Errorf("pull data failed (retries: %d): %v", j.retry, err) + <-time.After(d.backoff.next(j.retry)) + } + + log.Warnf(errMsg) + + j.retry++ + d.todos.PushFront(j) + d.workers <- w + return + } + + dataLen := j.end - j.start + + if int64(len(data)) < dataLen { + log.Errorf("unexpected data size, want %d got %d", dataLen, len(data)) + d.todos.PushFront(j) + d.workers <- w + return + } + + d.workers <- w + d.resp <- response{ + data: data[:dataLen], + offset: j.start, + } + finished <- dataLen + }() + case size := <-finished: + counter += size + if counter >= d.fileSize { + return + } + case <-ctx.Done(): + return + } + } + }() + + return +} + +func (d *dispatcher) writeData(ctx context.Context) { + go func() { + defer d.finally() + + var count int64 + for { + select { + case r := <-d.resp: + _, err := d.writer.WriteAt(r.data, r.offset) + if err != nil { + log.Errorf("write data failed: %v", err) + continue + } + + count += int64(len(r.data)) + if count >= d.fileSize { + return + } + case <-ctx.Done(): + return + } + } + + }() +} + +func (d *dispatcher) fetch(ctx context.Context, w worker, j *job) ([]byte, error) { + startTime := time.Now() + req, err := http.NewRequest("GET", w.e, nil) + if err != nil { + return nil, errors.Errorf("new request failed: %v", err) + } + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", j.start, j.end)) + resp, err := w.c.Do(req) + if err != nil { + return nil, errors.Errorf("fetch failed: %v", err) + } + + defer func() { + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + }() + + if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to download chunk: %d-%d, status code: %d", j.start, j.end, resp.StatusCode) + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, errors.Errorf("read data failed: %v", err) + } + + elapsed := time.Since(startTime) + log.Infof("Chunk: %fs, Link: %s", elapsed.Seconds(), w.e) + + return data, nil +} + +func (d *dispatcher) finally() { + if err := d.writer.Close(); err != nil { + log.Errorf("close write failed: %v", err) + } +} diff --git a/range/queue.go b/range/queue.go new file mode 100644 index 0000000..abe054a --- /dev/null +++ b/range/queue.go @@ -0,0 +1,51 @@ +package byterange + 
+import (
+	"sync"
+)
+
+type JobQueue struct {
+	items []*job
+	sync.Mutex
+}
+
+func (q *JobQueue) Len() int { return len(q.items) }
+
+func (q *JobQueue) Less(i, j int) bool {
+	return q.items[i].index < q.items[j].index
+}
+
+func (q *JobQueue) Swap(i, j int) {
+	q.Lock()
+	defer q.Unlock()
+	q.items[i], q.items[j] = q.items[j], q.items[i]
+}
+
+func (q *JobQueue) Push(item *job) {
+	q.Lock()
+	defer q.Unlock()
+	q.items = append(q.items, item)
+}
+
+func (q *JobQueue) PushFront(item *job) {
+	q.Lock()
+	defer q.Unlock()
+
+	q.items = append(q.items, nil)
+	copy(q.items[1:], q.items)
+	q.items[0] = item
+}
+
+func (q *JobQueue) Pop() (*job, bool) {
+	q.Lock()
+	defer q.Unlock()
+
+	if len(q.items) == 0 {
+		return nil, false
+	}
+
+	item := q.items[0]
+	q.items = q.items[1:]
+
+	return item, true
+}
diff --git a/range/range.go b/range/range.go
new file mode 100644
index 0000000..e91e5a3
--- /dev/null
+++ b/range/range.go
@@ -0,0 +1,166 @@
+package byterange
+
+import (
+	"context"
+	"crypto/tls"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/Filecoin-Titan/titan-storage-sdk/client"
+	"github.com/Filecoin-Titan/titan-storage-sdk/request"
+	"github.com/eikenb/pipeat"
+	logging "github.com/ipfs/go-log"
+)
+
+const (
+	minBackoffDelay = 100 * time.Millisecond
+	maxBackoffDelay = 3 * time.Second
+)
+
+var log = logging.Logger("range")
+
+type Range struct {
+	size int64
+	c    *http.Client
+}
+
+func New(size int64) *Range {
+	return &Range{
+		size: size,
+		c: &http.Client{
+			Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
+			Timeout:   3 * time.Second,
+		},
+	}
+}
+
+func (r *Range) GetFile(ctx context.Context, resources *client.ShareAssetResult) (io.ReadCloser, error) {
+	workerChan, err := r.makeWorkerChan(ctx, resources)
+	if err != nil {
+		return nil, err
+	}
+
+	fileSize, err := r.getFileSize(ctx, workerChan)
+	if err != nil {
+		return nil, err
+	}
+
+	reader, writer, err := pipeat.Pipe()
+	if err != nil {
+		return nil, err
+	}
+
+	(&dispatcher{
+		fileSize:  fileSize,
+		rangeSize: r.size,
+		reader:    reader,
+		writer:    writer,
+		workers:   workerChan,
+		resp:      make(chan response, len(workerChan)),
+		backoff: &backoff{
+			minDelay: minBackoffDelay,
+			maxDelay: maxBackoffDelay,
+		},
+	}).run(ctx)
+
+	return reader, nil
+}
+
+func (r *Range) getFileSize(ctx context.Context, workerChan chan worker) (int64, error) {
+	var (
+		start int64 = 0
+		size  int64 = 1
+	)
+
+	for {
+		select {
+		case w := <-workerChan:
+			req, err := http.NewRequest("GET", w.e, nil)
+			if err != nil {
+				log.Errorf("new request failed: %v", err)
+				continue
+			}
+			req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, start+size))
+			resp, err := w.c.Do(req)
+			if err != nil {
+				log.Errorf("fetch failed: %v", err)
+				continue
+			}
+			defer func() {
+				if resp != nil && resp.Body != nil {
+					resp.Body.Close()
+				}
+			}()
+			v := resp.Header.Get("Content-Range")
+			if v != "" {
+				subs := strings.Split(v, "/")
+				if len(subs) == 2 {
+					return strconv.ParseInt(subs[1], 10, 64)
+				}
+				log.Errorf("invalid content range: %s", v)
+			}
+
+		case <-ctx.Done():
+			return 0, ctx.Err()
+		}
+	}
+}
+
+func (r *Range) makeWorkerChan(ctx context.Context, res *client.ShareAssetResult) (chan worker, error) {
+	workerChan := make(chan worker, len(res.URLs))
+
+	var wg sync.WaitGroup
+	wg.Add(len(res.URLs))
+
+	for _, endpoint := range res.URLs {
+		go func(e string) {
+			defer wg.Done()
+
+			client := &http.Client{
+				// Transport: &http3.RoundTripper{TLSClientConfig: tls.Config{
+				//
InsecureSkipVerify: true, + // }}, + Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + Timeout: 3 * time.Second, + } + + u, err := url.Parse(e) + if err != nil { + log.Errorf("parse url failed: %v", err) + return + } + + req := request.Request{ + Jsonrpc: "2.0", + ID: "1", + Method: "titan.Version", + Params: nil, + } + + rpcUrl := fmt.Sprintf("%s/rpc/v0", u.Host) + _, err = request.PostJsonRPC(client, rpcUrl, req, nil) + if err != nil { + log.Errorf("send packet failed: %v", err) + return + } + + workerChan <- worker{ + c: client, + e: e, + } + }(endpoint) + } + wg.Wait() + + if len(workerChan) == 0 { + return nil, fmt.Errorf("no worker available") + } + + return workerChan, nil +} diff --git a/request/request.go b/request/request.go new file mode 100644 index 0000000..93c8bcc --- /dev/null +++ b/request/request.go @@ -0,0 +1,205 @@ +package request + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strings" + + "github.com/pkg/errors" + + files "github.com/ipfs/go-ipfs-files" +) + +type request struct { + Ctx context.Context + ApiBase string + Namespace string + Args []string + Opts map[string]string + Body io.Reader + Headers http.Header +} + +func PostJsonRPC(client *http.Client, url string, in Request, requestHeader http.Header) ([]byte, error) { + b, err := json.Marshal(&in) + if err != nil { + return nil, errors.Errorf("marshalling request: %v", err) + } + + var out Response + err = NewBuilder(client, url, "rpc", requestHeader).BodyBytes(b).Exec(context.Background(), &out) + if err != nil { + return nil, errors.Errorf("send request: %v", err) + } + + if out.Error != nil { + return nil, out.Error + } + + return json.Marshal(out.Result) +} + +func NewRequest(ctx context.Context, url, namespace string, header http.Header) *request { + if !strings.HasPrefix(url, "http") { + url = "https://" + url + } + + return &request{ + Ctx: ctx, + ApiBase: url, + Namespace: namespace, + Headers: header, + } +} + +type trailerReader struct { + resp *http.Response +} + +func (r *trailerReader) Read(b []byte) (int, error) { + n, err := r.resp.Body.Read(b) + if err != nil { + if e := r.resp.Trailer.Get("X-Stream-Error"); e != "" { + err = errors.New(e) + } + } + return n, err +} + +func (r *trailerReader) Close() error { + return r.resp.Body.Close() +} + +type response struct { + Output io.ReadCloser + Error *Error + Header http.Header +} + +func (r *response) Close() error { + if r.Output != nil { + // always drain output (response body) + _, err1 := io.Copy(io.Discard, r.Output) + err2 := r.Output.Close() + if err1 != nil { + return err1 + } + if err2 != nil { + return err2 + } + } + return nil +} + +func (r *response) Decode(dec interface{}) error { + defer r.Close() + if r.Error != nil { + return r.Error + } + + return json.NewDecoder(r.Output).Decode(dec) +} + +type Error struct { + Namespace string + Message string + Code int +} + +func (e *Error) Error() string { + var out string + if e.Namespace != "" { + out = e.Namespace + ": " + } + if e.Code != 0 { + out = fmt.Sprintf("%s%d: ", out, e.Code) + } + return out + e.Message +} + +func (r *request) Send(c *http.Client, method string) (*response, error) { + url := r.getURL() + req, err := http.NewRequest(method, url, r.Body) + if err != nil { + return nil, err + } + + req = req.WithContext(r.Ctx) + + // Add any headers that were supplied via the Builder. 
+	if r.Headers != nil { req.Header = r.Headers.Clone() }
+
+	if fr, ok := r.Body.(*files.MultiFileReader); ok {
+		req.Header.Set("Content-Type", "multipart/form-data; boundary="+fr.Boundary())
+		req.Header.Set("Content-Disposition", "form-data; name=\"files\"")
+	}
+
+	resp, err := c.Do(req)
+	if err != nil {
+		return nil, err
+	}
+
+	nresp := new(response)
+	nresp.Header = resp.Header.Clone()
+
+	contentType := resp.Header.Get("Content-Type")
+	parts := strings.Split(contentType, ";")
+	contentType = parts[0]
+
+	nresp.Output = &trailerReader{resp}
+	if resp.StatusCode >= http.StatusBadRequest {
+		e := &Error{
+			Namespace: r.Namespace,
+		}
+
+		switch {
+		case resp.StatusCode == http.StatusNotFound:
+			e.Message = "endpoint not found"
+		case contentType == "text/plain":
+			out, err := io.ReadAll(resp.Body)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "warning! response (%d) read error: %s\n", resp.StatusCode, err)
+			}
+			e.Message = string(out)
+		case contentType == "application/json":
+			if err = json.NewDecoder(resp.Body).Decode(e); err != nil {
+				fmt.Fprintf(os.Stderr, "warning! response (%d) unmarshall error: %s\n", resp.StatusCode, err)
+			}
+		default:
+			out, err := io.ReadAll(resp.Body)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "response (%d) read error: %s\n", resp.StatusCode, err)
+			}
+			e.Message = string(out)
+		}
+		nresp.Error = e
+		nresp.Output = nil
+
+		// drain body and close
+		io.Copy(io.Discard, resp.Body)
+		resp.Body.Close()
+	}
+
+	return nresp, nil
+}
+
+func (r *request) getURL() string {
+	values := make(url.Values)
+	for _, arg := range r.Args {
+		values.Add("arg", arg)
+	}
+	for k, v := range r.Opts {
+		values.Add(k, v)
+	}
+
+	if r.Namespace == "rpc" {
+		return r.ApiBase
+	}
+
+	return fmt.Sprintf("%s/%s?%s", r.ApiBase, r.Namespace, values.Encode())
+}
diff --git a/request/request_builder.go b/request/request_builder.go
new file mode 100644
index 0000000..69b00fa
--- /dev/null
+++ b/request/request_builder.go
@@ -0,0 +1,121 @@
+package request
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"strconv"
+	"strings"
+)
+
+// Builder is an IPFS commands request builder.
+type Builder struct {
+	namespace string
+	opts      map[string]string
+	headers   http.Header
+	body      io.Reader
+	baseApi   string
+	client    *http.Client
+}
+
+func NewBuilder(c *http.Client, baseApi, namespace string, requestHeader http.Header) *Builder {
+	return &Builder{
+		namespace: namespace,
+		baseApi:   baseApi,
+		client:    c,
+		headers:   requestHeader,
+	}
+}
+
+// BodyString sets the request body to the given string.
+func (r *Builder) BodyString(body string) *Builder {
+	return r.Body(strings.NewReader(body))
+}
+
+// BodyBytes sets the request body to the given buffer.
+func (r *Builder) BodyBytes(body []byte) *Builder {
+	return r.Body(bytes.NewReader(body))
+}
+
+// Body sets the request body to the given reader.
+func (r *Builder) Body(body io.Reader) *Builder {
+	r.body = body
+	return r
+}
+
+// Option sets the given config.
+func (r *Builder) Option(key string, value interface{}) *Builder {
+	var s string
+	switch v := value.(type) {
+	case bool:
+		s = strconv.FormatBool(v)
+	case string:
+		s = v
+	case []byte:
+		s = string(v)
+	default:
+		// slow case.
+		s = fmt.Sprint(value)
+	}
+	if r.opts == nil {
+		r.opts = make(map[string]string, 1)
+	}
+	r.opts[key] = s
+	return r
+}
+
+// Header sets the given header.
+func (r *Builder) Header(name, value string) *Builder { + if r.headers == nil { + r.headers = http.Header{} + } + r.headers.Set(name, value) + return r +} + +// Post sends the post request and return the response. +func (r *Builder) Post(ctx context.Context) (*response, error) { + req := NewRequest(ctx, r.baseApi, r.namespace, r.headers) + req.Opts = r.opts + req.Body = r.body + return req.Send(r.client, http.MethodPost) +} + +// Get sends the get request and return the response. +func (r *Builder) Get(ctx context.Context) (*response, error) { + req := NewRequest(ctx, r.baseApi, r.namespace, r.headers) + req.Opts = r.opts + req.Body = r.body + return req.Send(r.client, http.MethodGet) +} + +// Head sends the head request and return the response. +func (r *Builder) Head(ctx context.Context) (http.Header, error) { + req := NewRequest(ctx, r.baseApi, r.namespace, r.headers) + req.Opts = r.opts + resp, err := r.client.Head(req.getURL()) + if err != nil { + return nil, err + } + return resp.Header, nil +} + +// Exec sends the request a request and decodes the response. +func (r *Builder) Exec(ctx context.Context, res interface{}) error { + httpRes, err := r.Post(ctx) + if err != nil { + return err + } + + if res == nil { + lateErr := httpRes.Close() + if httpRes.Error != nil { + return httpRes.Error + } + return lateErr + } + + return httpRes.Decode(res) +} diff --git a/request/types.go b/request/types.go new file mode 100644 index 0000000..5d46d21 --- /dev/null +++ b/request/types.go @@ -0,0 +1,40 @@ +package request + +import ( + "encoding/json" + "fmt" +) + +type ErrorCode int + +// Request defines a JSON RPC request from the spec +// http://www.jsonrpc.org/specification#request_object +type Request struct { + Jsonrpc string `json:"jsonrpc"` + ID interface{} `json:"id,omitempty"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + Meta map[string]string `json:"meta,omitempty"` +} + +// Response defines a JSON RPC response from the spec +// http://www.jsonrpc.org/specification#response_object +type Response struct { + Jsonrpc string `json:"jsonrpc"` + Result interface{} `json:"result,omitempty"` + ID interface{} `json:"id"` + Error *respError `json:"error,omitempty"` +} + +type respError struct { + Code ErrorCode `json:"code"` + Message string `json:"message"` + Meta json.RawMessage `json:"meta,omitempty"` +} + +func (e *respError) Error() string { + if e.Code >= -32768 && e.Code <= -32000 { + return fmt.Sprintf("RPC error (%d): %s", e.Code, e.Message) + } + return e.Message +} diff --git a/storage.go b/storage.go index 9a1e691..d73ab07 100644 --- a/storage.go +++ b/storage.go @@ -6,8 +6,6 @@ import ( "encoding/json" "fmt" "io" - "log" - "math" "mime/multipart" "net/http" "net/url" @@ -20,6 +18,7 @@ import ( "github.com/Filecoin-Titan/titan-storage-sdk/client" "github.com/Filecoin-Titan/titan-storage-sdk/memfile" + byterange "github.com/Filecoin-Titan/titan-storage-sdk/range" "github.com/ipfs/go-cid" ) @@ -68,7 +67,7 @@ type Storage interface { // GetFileWithCid retrieves the file content associated with the specified rootCID from the titan storage. // parallel means multiple concurrent download tasks. // It returns an io.ReadCloser for reading the file content and filename and any error encountered during the retrieval process. 
- GetFileWithCid(ctx context.Context, rootCID string, parallel bool) (io.ReadCloser, string, error) + GetFileWithCid(ctx context.Context, rootCID string) (io.ReadCloser, string, error) // CreateGroup create a group CreateGroup(ctx context.Context, name string, parentID int) error // ListGroup list groups @@ -155,7 +154,15 @@ func (s *storage) UploadFilesWithPath(ctx context.Context, filePath string, prog rsp, err := s.webAPI.GetNodeUploadInfo(ctx, s.userID) if err != nil { - return cid.Cid{}, nil + return cid.Cid{}, err + } + + if rsp.AlreadyExists { + return cid.Cid{}, fmt.Errorf("file already exists") + } + + if len(rsp.List) == 0 { + return cid.Cid{}, fmt.Errorf("endpoints is empty") } f, err := os.Open(filePath) @@ -164,7 +171,9 @@ func (s *storage) UploadFilesWithPath(ctx context.Context, filePath string, prog } defer f.Close() - ret, err := s.uploadFileWithForm(ctx, f, f.Name(), rsp.UploadURL, rsp.Token, progress) + node := rsp.List[0] + + ret, err := s.uploadFileWithForm(ctx, f, f.Name(), node.UploadURL, node.Token, progress) if err != nil { return cid.Cid{}, fmt.Errorf("upload file with form failed, %s", err.Error()) } @@ -195,7 +204,7 @@ func (s *storage) UploadFilesWithPath(ctx context.Context, filePath string, prog AssetName: fileInfo.Name(), AssetSize: fileInfo.Size(), AssetType: fileType, - NodeID: rsp.NodeID, + NodeID: node.NodeID, GroupID: s.groupID, } @@ -423,115 +432,119 @@ func (s *storage) UploadStream(ctx context.Context, r io.Reader, name string, pr } // GetFileWithCid gets a single file by rootCID -func (s *storage) GetFileWithCid(ctx context.Context, rootCID string, parallel bool) (io.ReadCloser, string, error) { +func (s *storage) GetFileWithCid(ctx context.Context, rootCID string) (io.ReadCloser, string, error) { res, err := s.GetURL(ctx, rootCID) if err != nil { return nil, "", err } - taskCount := int64(len(res.URLs)) - if !parallel { - taskCount = 1 - } - - type downloadReq struct { - url string - start, end int64 - index int - } - - var ( - wg sync.WaitGroup - mu sync.Mutex - chunkSize = res.Size / taskCount - chunks = make([][]byte, taskCount) - firstFailChan = make(chan downloadReq, taskCount) - fastestTime = math.MaxInt - fastestTask downloadReq - ) + r := byterange.New(1 << 20) + reader, err := r.GetFile(ctx, res) - downloadChunk := func(d downloadReq, fc chan downloadReq) { - defer wg.Done() - start := time.Now() - - req, err := http.NewRequest("GET", d.url, nil) - if err != nil { - fc <- d - return - } - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", d.start, d.end)) - resp, err := http.DefaultClient.Do(req) - if err != nil { - fc <- d - return - } - - defer func() { - if resp != nil && resp.Body != nil { - resp.Body.Close() - } - }() - - if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK { - log.Printf("failed to download chunk: %d-%d, status code: %d", d.start, d.end, resp.StatusCode) - fc <- d - return - } + return reader, res.FileName, err + // taskCount := int64(len(res.URLs)) + // if !parallel { + // taskCount = 1 + // } - data, err := io.ReadAll(resp.Body) - if err != nil { - fc <- d - return - } - elapsed := time.Since(start) - log.Printf("Chunk: %fs, Link: %s", elapsed.Seconds(), d.url) + // type downloadReq struct { + // url string + // start, end int64 + // index int + // } - mu.Lock() - if int(elapsed.Milliseconds()) < fastestTime { - fastestTime = int(elapsed.Milliseconds()) - fastestTask = d - } - chunks[d.index] = data - mu.Unlock() - } + // var ( + // wg sync.WaitGroup + // mu sync.Mutex + // chunkSize 
= res.Size / taskCount + // chunks = make([][]byte, taskCount) + // firstFailChan = make(chan downloadReq, taskCount) + // fastestTime = math.MaxInt + // fastestTask downloadReq + // ) + + // downloadChunk := func(d downloadReq, fc chan downloadReq) { + // defer wg.Done() + // start := time.Now() + + // req, err := http.NewRequest("GET", d.url, nil) + // if err != nil { + // fc <- d + // return + // } + // req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", d.start, d.end)) + // resp, err := http.DefaultClient.Do(req) + // if err != nil { + // fc <- d + // return + // } + + // defer func() { + // if resp != nil && resp.Body != nil { + // resp.Body.Close() + // } + // }() + + // if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK { + // log.Printf("failed to download chunk: %d-%d, status code: %d", d.start, d.end, resp.StatusCode) + // fc <- d + // return + // } + + // data, err := io.ReadAll(resp.Body) + // if err != nil { + // fc <- d + // return + // } + // elapsed := time.Since(start) + // log.Printf("Chunk: %fs, Link: %s", elapsed.Seconds(), d.url) + + // mu.Lock() + // if int(elapsed.Milliseconds()) < fastestTime { + // fastestTime = int(elapsed.Milliseconds()) + // fastestTask = d + // } + // chunks[d.index] = data + // mu.Unlock() + // } - for i := 0; i < int(taskCount); i++ { - wg.Add(1) - start := int64(i) * chunkSize - end := start + chunkSize - 1 - if i == int(taskCount)-1 { - end = res.Size - 1 // Ensure last chunk covers the remainder - } - go downloadChunk(downloadReq{res.URLs[i], start, end, i}, firstFailChan) - } + // for i := 0; i < int(taskCount); i++ { + // wg.Add(1) + // start := int64(i) * chunkSize + // end := start + chunkSize - 1 + // if i == int(taskCount)-1 { + // end = res.Size - 1 // Ensure last chunk covers the remainder + // } + // go downloadChunk(downloadReq{res.URLs[i], start, end, i}, firstFailChan) + // } - // wait normal - wg.Wait() - close(firstFailChan) + // // wait normal + // wg.Wait() + // close(firstFailChan) - secFailChan := make(chan downloadReq, len(firstFailChan)) - for fail := range firstFailChan { - wg.Add(1) - fail.url = fastestTask.url - log.Printf("retry download chunk: %d-%d", fail.start, fail.end) - downloadChunk(fail, secFailChan) - } - // wait failed - wg.Wait() - close(secFailChan) + // secFailChan := make(chan downloadReq, len(firstFailChan)) + // for fail := range firstFailChan { + // wg.Add(1) + // fail.url = fastestTask.url + // log.Printf("retry download chunk: %d-%d", fail.start, fail.end) + // downloadChunk(fail, secFailChan) + // } + // // wait failed + // wg.Wait() + // close(secFailChan) - if len(secFailChan) > 0 { - return nil, "", fmt.Errorf("failed to download all chunks") - } + // if len(secFailChan) > 0 { + // return nil, "", fmt.Errorf("failed to download all chunks") + // } - readers := make([]io.Reader, len(chunks)) - for i, chunk := range chunks { - readers[i] = bytes.NewReader(chunk) - } + // readers := make([]io.Reader, len(chunks)) + // for i, chunk := range chunks { + // readers[i] = bytes.NewReader(chunk) + // } - multiReader := io.MultiReader(readers...) + // multiReader := io.MultiReader(readers...) 
- return io.NopCloser(multiReader), res.FileName, nil + // return io.NopCloser(multiReader), res.FileName, nil } // errAssetNotExist returns an error indicating that the asset does not exist diff --git a/storage_test.go b/storage_test.go index 9f2f673..10bdd5b 100644 --- a/storage_test.go +++ b/storage_test.go @@ -32,7 +32,7 @@ func TestCalculateCarCID(t *testing.T) { } func TestCreateCarWithFile(t *testing.T) { - // } + input := "./example/example.exe" output := "./example/example.car" @@ -137,32 +137,32 @@ func TestGetFile(t *testing.T) { t.Fatal("NewStorage error ", err) } - storageObject := s.(*storage) - t.Log("candidate node ", storageObject.candidateID) + // storageObject := s.(*storage) + // t.Log("candidate node ", storageObject.candidateID) - progress := func(doneSize int64, totalSize int64) { - t.Logf("upload %d of %d", doneSize, totalSize) - } + // progress := func(doneSize int64, totalSize int64) { + // t.Logf("upload %d of %d", doneSize, totalSize) + // } - filePath := "./storage_test.go" - f, err := os.Open(filePath) - if err != nil { - t.Fatal(err) - } + // filePath := "./storage_test.go" + // f, err := os.Open(filePath) + // if err != nil { + // t.Fatal(err) + // } - cid, err := s.UploadStream(context.Background(), f, f.Name(), progress) - if err != nil { - t.Fatal("upload file failed ", err.Error()) - } + // cid, err := s.UploadStream(context.Background(), f, f.Name(), progress) + // if err != nil { + // t.Fatal("upload file failed ", err.Error()) + // } - res, err := s.GetURL(context.Background(), cid.String()) - if err != nil { - t.Fatal("get url ", err) - } + // res, err := s.GetURL(context.Background(), cid.String()) + // if err != nil { + // t.Fatal("get url ", err) + // } - t.Log("url:", res.URLs) + // t.Log("url:", res.URLs) - reader, _, err := s.GetFileWithCid(context.Background(), cid.String(), true) + reader, fn, err := s.GetFileWithCid(context.Background(), "bafybeibhwmjstnkv3pvlswwcgs2as24bvlbr5tvba6mthtgf77mw4uown4") if err != nil { t.Fatal("get url ", err) } @@ -174,7 +174,7 @@ func TestGetFile(t *testing.T) { t.Fatal("get url ", err) } - newFilePath := fmt.Sprintf("./example/%s", res.FileName) + newFilePath := fmt.Sprintf("./example/%s", fn) newFile, err := os.Create(newFilePath) if err != nil { t.Fatal("Create file", err) @@ -183,7 +183,7 @@ func TestGetFile(t *testing.T) { newFile.Write(data) - t.Logf("write file %s %d", res.FileName, len(data)) + t.Logf("write file %s %d", fn, len(data)) } func TestUploadFileWithURL(t *testing.T) {
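Usage sketch (not part of this patch): after this change GetFileWithCid takes only a context and a root CID, and the byterange dispatcher streams ranges into a pipe as they arrive from multiple nodes. The helper below is illustrative only; the saveAsset name, the destDir parameter, and the "package storage" clause for the SDK root package are assumptions, while the Storage value s is assumed to come from NewStorage as in the tests above.

package storage // assumed name of the SDK root package, for illustration only

import (
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// saveAsset downloads an asset with the new single-argument GetFileWithCid
// and writes it to destDir under the filename returned alongside the reader.
func saveAsset(ctx context.Context, s Storage, rootCID, destDir string) error {
	reader, filename, err := s.GetFileWithCid(ctx, rootCID)
	if err != nil {
		return fmt.Errorf("get file %s: %w", rootCID, err)
	}
	defer reader.Close()

	out, err := os.Create(filepath.Join(destDir, filename))
	if err != nil {
		return err
	}
	defer out.Close()

	// Ranges are written into the pipe as they are fetched, so io.Copy
	// streams the download to disk without buffering the whole file in memory.
	_, err = io.Copy(out, reader)
	return err
}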