[patch] Revise S3 reader/writer: compatible with IBM Cloud Object Storage (#509)

* 🔧 revise downloading s3 parts

Signed-off-by: Rintaro Okamura <[email protected]>

* ✨ Add internal/io package

Signed-off-by: Rintaro Okamura <[email protected]>

* ✨ Add backoff to s3 reader

Signed-off-by: Rintaro Okamura <[email protected]>

* 🔧 add bytes.Buffer to s3 reader

Signed-off-by: Rintaro Okamura <[email protected]>

* ♻️ use defer for Close()

Signed-off-by: Rintaro Okamura <[email protected]>
rinx authored Jun 25, 2020
1 parent 61fed82 commit c6fcb37
Showing 16 changed files with 448 additions and 45 deletions.
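
A note on the new internal/io package: the commit message lists it, and reader.go below imports it as ctxio for NewReaderWithContext and NewReadCloserWithContext, but its source is not part of this excerpt. The following is only a minimal sketch of what such a context-aware wrapper typically looks like; the nil checks and error text are assumptions, not the package's actual code.

package ctxio

import (
	"context"
	"errors"
	"io"
)

// ctxReader wraps an io.Reader so that Read fails fast once the
// context is canceled.
type ctxReader struct {
	ctx context.Context
	r   io.Reader
}

func (c *ctxReader) Read(p []byte) (n int, err error) {
	select {
	case <-c.ctx.Done():
		return 0, c.ctx.Err()
	default:
		return c.r.Read(p)
	}
}

// NewReaderWithContext mirrors the signature used in reader.go below;
// the validation here is illustrative only.
func NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, error) {
	if ctx == nil || r == nil {
		return nil, errors.New("ctxio: nil context or reader")
	}
	return &ctxReader{ctx: ctx, r: r}, nil
}
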
5 changes: 5 additions & 0 deletions charts/vald/values.yaml
@@ -1442,6 +1442,9 @@ agent:
          # @schema {"name": "agent.sidecar.config.blob_storage.s3.max_part_size", "type": "string", "pattern": "^[0-9]+(kb|mb|gb)$"}
          # agent.sidecar.config.blob_storage.s3.max_part_size -- s3 multipart upload max part size
          max_part_size: 64mb
          # @schema {"name": "agent.sidecar.config.blob_storage.s3.max_chunk_size", "type": "string", "pattern": "^[0-9]+(kb|mb|gb)$"}
          # agent.sidecar.config.blob_storage.s3.max_chunk_size -- s3 download max chunk size
          max_chunk_size: 64mb
      # @schema {"name": "agent.sidecar.config.compress", "type": "object"}
      compress:
        # @schema {"name": "agent.sidecar.config.compress.compress_algorithm", "type": "string", "enum": ["gob", "gzip", "lz4", "zstd"]}
@@ -1536,6 +1539,8 @@ agent:
            retry_count: 100
            # agent.sidecar.config.client.transport.backoff.enable_error_log -- backoff error log enabled
            enable_error_log: true
      # agent.sidecar.config.restore_backoff_enabled -- restore backoff enabled
      restore_backoff_enabled: false
      # @schema {"name": "agent.sidecar.config.restore_backoff", "alias": "backoff"}
      restore_backoff:
        # agent.sidecar.config.restore_backoff.initial_duration -- restore backoff initial duration
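
Note that max_chunk_size accepts the same human-readable size strings as max_part_size. The snippet below is a standalone, illustrative check of candidate values against the schema pattern quoted above; the chart's actual schema validation is not part of this diff.

package main

import (
	"fmt"
	"regexp"
)

// sizePattern mirrors the schema pattern used by max_part_size and
// max_chunk_size in values.yaml.
var sizePattern = regexp.MustCompile(`^[0-9]+(kb|mb|gb)$`)

func main() {
	for _, v := range []string{"64mb", "512kb", "1gb", "1tb"} {
		fmt.Printf("%-6s valid: %v\n", v, sizePattern.MatchString(v))
	}
}
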
4 changes: 3 additions & 1 deletion internal/config/blob.go
@@ -71,7 +71,8 @@ type S3Config struct {
	EnableEndpointDiscovery  bool `json:"enable_endpoint_discovery" yaml:"enable_endpoint_discovery"`
	EnableEndpointHostPrefix bool `json:"enable_endpoint_host_prefix" yaml:"enable_endpoint_host_prefix"`

	MaxPartSize string `json:"max_part_size" yaml:"max_part_size"`
	MaxPartSize  string `json:"max_part_size" yaml:"max_part_size"`
	MaxChunkSize string `json:"max_chunk_size" yaml:"max_chunk_size"`
}

func (b *Blob) Bind() *Blob {
@@ -94,6 +95,7 @@ func (s *S3Config) Bind() *S3Config {
	s.SecretAccessKey = GetActualValue(s.SecretAccessKey)
	s.Token = GetActualValue(s.Token)
	s.MaxPartSize = GetActualValue(s.MaxPartSize)
	s.MaxChunkSize = GetActualValue(s.MaxChunkSize)

	return s
}
3 changes: 3 additions & 0 deletions internal/config/sidecar.go
@@ -42,6 +42,9 @@ type AgentSidecar struct {
	// Compress represents compression configurations
	Compress *CompressCore `yaml:"compress" json:"compress"`

	// RestoreBackoffEnabled represents whether backoff is enabled for the restore process
	RestoreBackoffEnabled bool `yaml:"restore_backoff_enabled" json:"restore_backoff_enabled"`

	// RestoreBackoff represents backoff configurations for the restoring process
	RestoreBackoff *Backoff `yaml:"restore_backoff" json:"restore_backoff"`

35 changes: 35 additions & 0 deletions internal/db/storage/blob/s3/option.go
@@ -19,6 +19,7 @@ package s3
import (
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/vdaas/vald/internal/backoff"
	"github.com/vdaas/vald/internal/errgroup"
	"github.com/vdaas/vald/internal/unit"
)
@@ -70,3 +71,37 @@ func WithMaxPartSize(size string) Option {
		return nil
	}
}

func WithMaxChunkSize(size string) Option {
	return func(c *client) error {
		b, err := unit.ParseBytes(size)
		if err != nil {
			return err
		}

		if int64(b) >= s3manager.MinUploadPartSize {
			c.maxChunkSize = int64(b)
		}

		return nil
	}
}

func WithReaderBackoff(enabled bool) Option {
	return func(c *client) error {
		c.readerBackoffEnabled = enabled
		return nil
	}
}

func WithReaderBackoffOpts(opts ...backoff.Option) Option {
	return func(c *client) error {
		if c.readerBackoffOpts == nil {
			c.readerBackoffOpts = opts
			return nil
		}

		c.readerBackoffOpts = append(c.readerBackoffOpts, opts...)

		return nil
	}
}
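
Taken together, the three new options are applied when the S3 client is constructed. The sketch below shows hypothetical wiring only: s3.New and the three options are confirmed by this diff, while backoff.WithRetryCount is an assumed option name modeled on the chart's retry_count setting, and the session/bucket wiring is omitted.

// hypothetical wiring of the new client options
bucket, err := s3.New(
	s3.WithMaxChunkSize("64mb"),                           // ranged-GET chunk size for downloads
	s3.WithReaderBackoff(true),                            // retry failed chunk downloads
	s3.WithReaderBackoffOpts(backoff.WithRetryCount(100)), // assumed option name
)
if err != nil {
	return err
}
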
25 changes: 25 additions & 0 deletions internal/db/storage/blob/s3/reader/option.go
@@ -18,6 +18,7 @@ package reader

import (
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/vdaas/vald/internal/backoff"
	"github.com/vdaas/vald/internal/errgroup"
)

@@ -26,6 +27,8 @@ type Option func(r *reader)
var (
	defaultOpts = []Option{
		WithErrGroup(errgroup.Get()),
		WithMaxChunkSize(512 * 1024 * 1024),
		WithBackoff(false),
	}
)

@@ -56,3 +59,25 @@ func WithKey(key string) Option {
		r.key = key
	}
}

func WithMaxChunkSize(size int64) Option {
	return func(r *reader) {
		r.maxChunkSize = size
	}
}

func WithBackoff(enabled bool) Option {
	return func(r *reader) {
		r.backoffEnabled = enabled
	}
}

func WithBackoffOpts(opts ...backoff.Option) Option {
	return func(r *reader) {
		if r.backoffOpts == nil {
			r.backoffOpts = opts
			return
		}

		r.backoffOpts = append(r.backoffOpts, opts...)
	}
}
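
Since the constructor presumably applies defaultOpts before the caller's options (New's body is collapsed below), caller-supplied values win over the 512 MiB chunk default and the disabled backoff. A hypothetical construction, with illustrative bucket and key names and the WithService wiring omitted:

// illustrative only
r := New(
	WithBucket("vald-backups"),
	WithKey("agent/ngt-index"),
	WithMaxChunkSize(64*1024*1024), // overrides the 512 MiB default
	WithBackoff(true),              // overrides the disabled default
)
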
132 changes: 113 additions & 19 deletions internal/db/storage/blob/s3/reader/reader.go
@@ -17,15 +17,21 @@
package reader

import (
	"bytes"
	"context"
	"io"
	"io/ioutil"
	"strconv"
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/vdaas/vald/internal/backoff"
	"github.com/vdaas/vald/internal/errgroup"
	"github.com/vdaas/vald/internal/errors"
	ctxio "github.com/vdaas/vald/internal/io"
	"github.com/vdaas/vald/internal/log"
	"github.com/vdaas/vald/internal/safety"
)

@@ -37,6 +43,10 @@ type reader struct {

	pr io.ReadCloser
	wg *sync.WaitGroup

	backoffEnabled bool
	backoffOpts    []backoff.Option
	maxChunkSize   int64
}

type Reader interface {
@@ -54,42 +64,126 @@ func New(opts ...Option) Reader {
}

func (r *reader) Open(ctx context.Context) (err error) {
	input := &s3.GetObjectInput{
		Bucket: aws.String(r.bucket),
		Key:    aws.String(r.key),
	}

	var pw io.WriteCloser

	r.pr, pw = io.Pipe()

	resp, err := r.service.GetObjectWithContext(ctx, input)
	r.wg = new(sync.WaitGroup)
	r.wg.Add(1)
	r.eg.Go(safety.RecoverFunc(func() (err error) {
		defer r.wg.Done()
		defer pw.Close()

		var offset int64

		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}

			body, err := func() (io.Reader, error) {
				if r.backoffEnabled {
					return r.getObjectWithBackoff(ctx, offset, r.maxChunkSize)
				} else {
					return r.getObject(ctx, offset, r.maxChunkSize)
				}
			}()
			if err != nil {
				return err
			}

			body, err = ctxio.NewReaderWithContext(ctx, body)
			if err != nil {
				return err
			}

			chunk, err := io.Copy(pw, body)
			if err != nil {
				return err
			}

			if chunk < r.maxChunkSize {
				log.Debugf("read %d bytes.", offset+chunk)
				return nil
			}

			offset += chunk
		}
	}))

	return nil
}

func (r *reader) getObjectWithBackoff(ctx context.Context, offset, length int64) (io.Reader, error) {
	getFunc := func() (interface{}, error) {
		return r.getObject(ctx, offset, length)
	}

	b := backoff.New(r.backoffOpts...)
	defer b.Close()

	res, err := b.Do(ctx, getFunc)
	if err != nil {
		return nil, err
	}

	return res.(io.Reader), nil
}

func (r *reader) getObject(ctx context.Context, offset, length int64) (io.Reader, error) {
	log.Debugf("reading %d-%d bytes...", offset, offset+length-1)
	resp, err := r.service.GetObjectWithContext(
		ctx,
		&s3.GetObjectInput{
			Bucket: aws.String(r.bucket),
			Key:    aws.String(r.key),
			Range: aws.String("bytes=" +
				strconv.FormatInt(offset, 10) +
				"-" +
				strconv.FormatInt(offset+length-1, 10),
			),
		},
	)

	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case s3.ErrCodeNoSuchBucket:
				return errors.NewErrBlobNoSuchBucket(err, r.bucket)
				log.Error(errors.NewErrBlobNoSuchBucket(err, r.bucket))
				return ioutil.NopCloser(bytes.NewReader(nil)), nil
			case s3.ErrCodeNoSuchKey:
				return errors.NewErrBlobNoSuchKey(err, r.key)
				log.Error(errors.NewErrBlobNoSuchKey(err, r.key))
				return ioutil.NopCloser(bytes.NewReader(nil)), nil
			case "InvalidRange":
				return ioutil.NopCloser(bytes.NewReader(nil)), nil
			}
		}

		return err
		return nil, err
	}

	r.wg = new(sync.WaitGroup)
	r.wg.Add(1)
	res, err := ctxio.NewReadCloserWithContext(ctx, resp.Body)
	if err != nil {
		return nil, err
	}

	r.eg.Go(safety.RecoverFunc(func() (err error) {
		defer r.wg.Done()
		defer resp.Body.Close()
		defer pw.Close()
		buf := new(bytes.Buffer)

		_, err = io.Copy(pw, resp.Body)
		return err
	}))
	defer func() {
		e := res.Close()
		if e != nil {
			log.Warn(e)
		}
	}()

	return nil
	_, err = io.Copy(buf, res)
	if err != nil {
		return nil, err
	}

	return buf, nil
}

func (r *reader) Close() error {
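
The rewritten Open streams the object through an io.Pipe: each loop iteration issues a ranged GET for at most maxChunkSize bytes (Range: bytes=offset-(offset+length-1)), copies the chunk into the pipe, and stops once a chunk comes back short; the "InvalidRange" answer maps to an empty body, so a request landing exactly at EOF also terminates the loop. Because getObject buffers each chunk into a bytes.Buffer before returning it, the backoff wrapper can retry a failed chunk without partial data ever reaching the pipe. The self-contained sketch below reproduces only the loop's arithmetic against an in-memory object; the object and chunk sizes are illustrative.

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
)

// rangedGet stands in for a ranged S3 GET: it returns at most length bytes
// starting at offset, and an empty body once offset is past the end,
// mirroring the reader's "InvalidRange" case.
func rangedGet(obj []byte, offset, length int64) io.Reader {
	if offset >= int64(len(obj)) {
		return bytes.NewReader(nil)
	}
	end := offset + length
	if end > int64(len(obj)) {
		end = int64(len(obj))
	}
	return bytes.NewReader(obj[offset:end])
}

func main() {
	object := make([]byte, 150) // a 150-byte "object"
	const maxChunkSize = int64(64)

	var offset int64
	for {
		chunk, _ := io.Copy(ioutil.Discard, rangedGet(object, offset, maxChunkSize))
		fmt.Printf("requested bytes=%d-%d, read %d bytes\n",
			offset, offset+maxChunkSize-1, chunk)
		if chunk < maxChunkSize {
			fmt.Printf("short chunk: done after %d bytes\n", offset+chunk)
			return
		}
		offset += chunk
	}
}
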
10 changes: 9 additions & 1 deletion internal/db/storage/blob/s3/s3.go
@@ -23,6 +23,7 @@ import (

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/vdaas/vald/internal/backoff"
"github.com/vdaas/vald/internal/db/storage/blob"
"github.com/vdaas/vald/internal/db/storage/blob/s3/reader"
"github.com/vdaas/vald/internal/db/storage/blob/s3/writer"
@@ -36,7 +37,11 @@ type client struct {
	service *s3.S3
	bucket  string

	maxPartSize int64
	maxPartSize  int64
	maxChunkSize int64

	readerBackoffEnabled bool
	readerBackoffOpts    []backoff.Option
}

func New(opts ...Option) (blob.Bucket, error) {
@@ -66,6 +71,9 @@ func (c *client) Reader(ctx context.Context, key string) (io.ReadCloser, error)
		reader.WithService(c.service),
		reader.WithBucket(c.bucket),
		reader.WithKey(key),
		reader.WithMaxChunkSize(c.maxChunkSize),
		reader.WithBackoff(c.readerBackoffEnabled),
		reader.WithBackoffOpts(c.readerBackoffOpts...),
	)

	return r, r.Open(ctx)
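
Reader threads the chunk size and backoff settings through to the reader package and calls Open before handing the stream back. A minimal consumption sketch; the key name and the dst writer are illustrative.

// hypothetical: stream an object out of the bucket
r, err := bucket.Reader(ctx, "backup/ngt-index.tar.gz")
if err != nil {
	return err
}
defer r.Close()

_, err = io.Copy(dst, r) // dst is any io.Writer
return err
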
9 changes: 9 additions & 0 deletions internal/db/storage/blob/s3/writer/option.go
@@ -26,6 +26,7 @@ type Option func(w *writer)
var (
	defaultOpts = []Option{
		WithErrGroup(errgroup.Get()),
		WithContentType("application/octet-stream"),
		WithMaxPartSize(64 * 1024 * 1024),
	}
)
@@ -58,6 +59,14 @@ func WithKey(key string) Option {
	}
}

func WithContentType(ct string) Option {
	return func(w *writer) {
		if ct != "" {
			w.contentType = ct
		}
	}
}

func WithMaxPartSize(max int64) Option {
	return func(w *writer) {
		w.maxPartSize = max
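
WithContentType overwrites the field only for non-empty values, so the new application/octet-stream default survives an unset configuration. A hypothetical override, assuming a writer.New constructor symmetric to reader.New (not shown in this diff):

// hypothetical: upload compressed backups with an explicit content type
w := writer.New(
	writer.WithContentType("application/gzip"),
	writer.WithMaxPartSize(64*1024*1024),
)
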
8 changes: 5 additions & 3 deletions internal/db/storage/blob/s3/writer/writer.go
@@ -36,6 +36,7 @@ type writer struct {
	bucket string
	key    string

	contentType string
	maxPartSize int64

	pw io.WriteCloser
@@ -103,9 +104,10 @@ func (w *writer) upload(ctx context.Context, body io.Reader) (err error) {
		},
	)
	input := &s3manager.UploadInput{
		Bucket: aws.String(w.bucket),
		Key:    aws.String(w.key),
		Body:   body,
		Bucket:      aws.String(w.bucket),
		Key:         aws.String(w.key),
		Body:        body,
		ContentType: aws.String(w.contentType),
	}

	res, err := uploader.UploadWithContext(ctx, input)