Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
add seed downloader
Browse files Browse the repository at this point in the history
Signed-off-by: allen.wq <[email protected]>
  • Loading branch information
wangforthinker committed Apr 23, 2020
1 parent 06d2368 commit 1da7fba
Show file tree
Hide file tree
Showing 5 changed files with 507 additions and 0 deletions.
130 changes: 130 additions & 0 deletions dfdaemon/seed/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package seed

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/pkg/limitreader"
"github.com/dragonflyoss/Dragonfly/pkg/ratelimiter"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// downloader manage the downloading of seed file.
type downloader interface {
DownloadToWriterAt(ctx context.Context, rangeStruct httputils.RangeStruct, timeout time.Duration, writeOff int64, writerAt io.WriterAt, rateLimit bool) (length int64, err error)
}

func newLocalDownloader(url string, header map[string][]string, rate *ratelimiter.RateLimiter, copyCache bool) downloader {
return &localDownloader{
url: url,
header: header,
rate: rate,
copyCache: copyCache,
}
}

type localDownloader struct {
url string
header map[string][]string

// downloader will limit the rate.
rate *ratelimiter.RateLimiter

// if copyCache sets, the response body will store to memory cache and transfer to writer
copyCache bool
}

func (ld *localDownloader) DownloadToWriterAt(ctx context.Context, rangeStruct httputils.RangeStruct, timeout time.Duration,
writeOff int64, writerAt io.WriterAt, rateLimit bool) (length int64, err error) {
return ld.download(ctx, rangeStruct, timeout, writeOff, writerAt, rateLimit)
}

func (ld *localDownloader) download(ctx context.Context, rangeStruct httputils.RangeStruct, timeout time.Duration,
writeOff int64, writerAt io.WriterAt, rateLimit bool) (length int64, err error) {
var (
written int64
n int
rd io.Reader
)

header := map[string]string{}
for k, v := range ld.header {
header[k] = v[0]
}

header[config.StrRange] = fmt.Sprintf("bytes=%d-%d", rangeStruct.StartIndex, rangeStruct.EndIndex)
resp, err := httputils.HTTPWithHeaders("GET", ld.url, header, timeout, nil)
if err != nil {
return 0, err
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
return 0, errortypes.NewHTTPError(resp.StatusCode, "resp code is not 200 or 206")
}

expectedLen := rangeStruct.EndIndex - rangeStruct.StartIndex + 1

defer resp.Body.Close()
rd = resp.Body
if rateLimit {
rd = limitreader.NewLimitReaderWithLimiter(ld.rate, resp.Body, false)
}

// in copyCache pattern, the bytes buffer will be transferred to io.WriterAt, and will be held by io.WriterAt.
if ld.copyCache {
buf := bytes.NewBuffer(nil)
buf.Grow(int(expectedLen))
written, err = io.CopyN(buf, rd, expectedLen)
if err != nil && err != io.EOF {
logrus.Errorf("failed to read data [%d, %d] from resp.body: %v", rangeStruct.StartIndex, rangeStruct.EndIndex, err)
}

if written < expectedLen {
return 0, errors.Wrap(io.ErrShortWrite, fmt.Sprintf("download from [%d,%d], expecte read %d, but got %d", rangeStruct.StartIndex, rangeStruct.EndIndex, expectedLen, written))
}

n, err = writerAt.WriteAt(buf.Bytes(), writeOff)
written = int64(n)
} else {
written, err = CopyBufferToWriterAt(writeOff, writerAt, rd)
}

if err == io.EOF {
err = nil
}

if err != nil {
return 0, errors.Wrap(err, fmt.Sprintf("failed to download from [%d,%d]", rangeStruct.StartIndex, rangeStruct.EndIndex))
}

if written < expectedLen {
return 0, errors.Wrap(io.ErrShortWrite, fmt.Sprintf("download from [%d,%d], expecte read %d, but got %d", rangeStruct.StartIndex, rangeStruct.EndIndex, expectedLen, written))
}

return written, err
}
102 changes: 102 additions & 0 deletions dfdaemon/seed/downloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package seed

import (
"bytes"
"context"
"fmt"

"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/pkg/ratelimiter"

"github.com/go-check/check"
)

type mockBufferWriterAt struct {
buf *bytes.Buffer
}

func newMockBufferWriterAt() *mockBufferWriterAt {
return &mockBufferWriterAt{
buf: bytes.NewBuffer(nil),
}
}

func (mb *mockBufferWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
if off != int64(mb.buf.Len()) {
return 0, fmt.Errorf("failed to seek to %d", off)
}

return mb.buf.Write(p)
}

func (mb *mockBufferWriterAt) Bytes() []byte {
return mb.buf.Bytes()
}

func (suite *SeedTestSuite) checkLocalDownloadDataFromFileServer(c *check.C, path string, off int64, size int64) {
buf := newMockBufferWriterAt()

ld := newLocalDownloader(fmt.Sprintf("http://%s/%s", suite.host, path), nil, ratelimiter.NewRateLimiter(0, 0), false)

length, err := ld.DownloadToWriterAt(context.Background(), httputils.RangeStruct{StartIndex: off, EndIndex: off + size - 1}, 0, 0, buf, true)
c.Check(err, check.IsNil)
c.Check(size, check.Equals, length)

expectData, err := suite.readFromFileServer(path, off, size)
c.Check(err, check.IsNil)
c.Check(string(buf.Bytes()), check.Equals, string(expectData))

buf2 := newMockBufferWriterAt()
ld2 := newLocalDownloader(fmt.Sprintf("http://%s/%s", suite.host, path), nil, ratelimiter.NewRateLimiter(0, 0), true)

length2, err := ld2.DownloadToWriterAt(context.Background(), httputils.RangeStruct{StartIndex: off, EndIndex: off + size - 1}, 0, 0, buf2, false)
c.Check(err, check.IsNil)
c.Check(size, check.Equals, length2)
c.Check(string(buf2.Bytes()), check.Equals, string(expectData))
}

func (suite *SeedTestSuite) TestLocalDownload(c *check.C) {
// test read fileA
suite.checkLocalDownloadDataFromFileServer(c, "fileA", 0, 500*1024)
suite.checkLocalDownloadDataFromFileServer(c, "fileA", 0, 100*1024)
for i := 0; i < 5; i++ {
suite.checkLocalDownloadDataFromFileServer(c, "fileA", int64(i*100*1024), 100*1024)
}

// test read fileB
suite.checkLocalDownloadDataFromFileServer(c, "fileB", 0, 1024*1024)
suite.checkLocalDownloadDataFromFileServer(c, "fileB", 0, 100*1024)
for i := 0; i < 20; i++ {
suite.checkLocalDownloadDataFromFileServer(c, "fileB", int64(i*50*1024), 50*1024)
}
suite.checkLocalDownloadDataFromFileServer(c, "fileB", 1000*1024, 24*1024)

// test read fileC
suite.checkLocalDownloadDataFromFileServer(c, "fileC", 0, 1500*1024)
suite.checkLocalDownloadDataFromFileServer(c, "fileC", 0, 100*1024)
suite.checkLocalDownloadDataFromFileServer(c, "fileC", 1400*1024, 100*1024)
for i := 0; i < 75; i++ {
suite.checkLocalDownloadDataFromFileServer(c, "fileC", int64(i*20*1024), 20*1024)
}

//test read fileF
for i := 0; i < 50; i++ {
suite.checkLocalDownloadDataFromFileServer(c, "fileF", int64(i*20000), 20000)
}
}
48 changes: 48 additions & 0 deletions dfdaemon/seed/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package seed

import (
"bytes"
"io"
)

// CopyBufferToWriterAt copy data from rd and write to io.WriterAt.
func CopyBufferToWriterAt(off int64, writerAt io.WriterAt, rd io.Reader) (n int64, err error) {
buffer := bytes.NewBuffer(nil)
bufSize := int64(256 * 1024)

for {
_, err := io.CopyN(buffer, rd, bufSize)
if err != nil && err != io.EOF {
return 0, err
}

wcount, werr := writerAt.WriteAt(buffer.Bytes(), off)
n += int64(wcount)
if werr != nil {
return n, err
}

if err == io.EOF {
return n, io.EOF
}

buffer.Reset()
off += int64(wcount)
}
}
Loading

0 comments on commit 1da7fba

Please sign in to comment.