Skip to content

Commit

Permalink
🔧 add bytes.Buffer to s3 reader
Browse files Browse the repository at this point in the history
Signed-off-by: Rintaro Okamura <[email protected]>
  • Loading branch information
rinx committed Jun 25, 2020
1 parent e5f4182 commit e13c15d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 38 deletions.
88 changes: 55 additions & 33 deletions internal/db/storage/blob/s3/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ func (r *reader) Open(ctx context.Context) (err error) {
default:
}

body, err := r.getObject(ctx, offset, r.maxChunkSize)
body, err := func() (io.Reader, error) {
if r.backoffEnabled {
return r.getObjectWithBackoff(ctx, offset, r.maxChunkSize)
} else {
return r.getObject(ctx, offset, r.maxChunkSize)
}
}()
if err != nil {
return err
}

body, err = ctxio.NewReadCloserWithContext(ctx, body)
body, err = ctxio.NewReaderWithContext(ctx, body)
if err != nil {
return err
}
Expand All @@ -104,53 +110,52 @@ func (r *reader) Open(ctx context.Context) (err error) {
}

offset += chunk

err = body.Close()
if err != nil {
return err
}
}
}))

return nil
}

func (r *reader) getObject(ctx context.Context, offset, length int64) (io.ReadCloser, error) {
func (r *reader) getObjectWithBackoff(ctx context.Context, offset, length int64) (io.Reader, error) {
getFunc := func() (interface{}, error) {
log.Debugf("reading %d-%d bytes...", offset, offset+length-1)
return r.service.GetObjectWithContext(
ctx,
&s3.GetObjectInput{
Bucket: aws.String(r.bucket),
Key: aws.String(r.key),
Range: aws.String("bytes=" +
strconv.FormatInt(offset, 10) +
"-" +
strconv.FormatInt(offset+length-1, 10),
),
},
)
return r.getObject(ctx, offset, length)
}

var resp interface{}
var err error

if r.backoffEnabled {
b := backoff.New(r.backoffOpts...)
defer b.Close()
b := backoff.New(r.backoffOpts...)
defer b.Close()

resp, err = b.Do(ctx, getFunc)
} else {
resp, err = getFunc()
res, err := b.Do(ctx, getFunc)
if err != nil {
return nil, err
}

return res.(io.Reader), nil
}

func (r *reader) getObject(ctx context.Context, offset, length int64) (io.Reader, error) {
log.Debugf("reading %d-%d bytes...", offset, offset+length-1)
resp, err := r.service.GetObjectWithContext(
ctx,
&s3.GetObjectInput{
Bucket: aws.String(r.bucket),
Key: aws.String(r.key),
Range: aws.String("bytes=" +
strconv.FormatInt(offset, 10) +
"-" +
strconv.FormatInt(offset+length-1, 10),
),
},
)

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchBucket:
return nil, errors.NewErrBlobNoSuchBucket(err, r.bucket)
log.Error(errors.NewErrBlobNoSuchBucket(err, r.bucket))
return ioutil.NopCloser(bytes.NewReader(nil)), nil
case s3.ErrCodeNoSuchKey:
return nil, errors.NewErrBlobNoSuchKey(err, r.key)
log.Error(errors.NewErrBlobNoSuchKey(err, r.key))
return ioutil.NopCloser(bytes.NewReader(nil)), nil
case "InvalidRange":
return ioutil.NopCloser(bytes.NewReader(nil)), nil
}
Expand All @@ -159,7 +164,24 @@ func (r *reader) getObject(ctx context.Context, offset, length int64) (io.ReadCl
return nil, err
}

return resp.(*s3.GetObjectOutput).Body, nil
res, err := ctxio.NewReadCloserWithContext(ctx, resp.Body)
if err != nil {
return nil, err
}

buf := new(bytes.Buffer)

_, err = io.Copy(buf, res)
if err != nil {
return nil, err
}

err = res.Close()
if err != nil {
log.Warn(err)
}

return buf, nil
}

func (r *reader) Close() error {
Expand Down
5 changes: 0 additions & 5 deletions pkg/agent/sidecar/service/restorer/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,6 @@ func (r *restorer) startRestore(ctx context.Context) (<-chan error, error) {
if err != nil {
log.Errorf("restoring failed: %s", err)

if errors.IsErrBlobNoSuchBucket(err) ||
errors.IsErrBlobNoSuchKey(err) {
return nil, nil
}

return nil, err
}

Expand Down

0 comments on commit e13c15d

Please sign in to comment.