Skip to content

Commit

Permalink
♻️ use io.pipe for reading s3 buckets
Browse files Browse the repository at this point in the history
Signed-off-by: Rintaro Okamura <[email protected]>
  • Loading branch information
rinx authored and actions-user committed Jun 12, 2020
1 parent 8a608c7 commit ac623f0
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 9 deletions.
17 changes: 15 additions & 2 deletions internal/db/storage/blob/s3/reader/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,27 @@

package reader

import "github.com/aws/aws-sdk-go/service/s3"
import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/vdaas/vald/internal/errgroup"
)

type Option func(r *reader)

var (
defaultOpts = []Option{}
defaultOpts = []Option{
WithErrGroup(errgroup.Get()),
}
)

func WithErrGroup(eg errgroup.Group) Option {
return func(r *reader) {
if eg != nil {
r.eg = eg
}
}
}

func WithService(s *s3.S3) Option {
return func(r *reader) {
if s != nil {
Expand Down
43 changes: 36 additions & 7 deletions internal/db/storage/blob/s3/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@ package reader
import (
"context"
"io"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/safety"
)

type reader struct {
eg errgroup.Group
service *s3.S3
bucket string
key string

resp *s3.GetObjectOutput
pr io.ReadCloser
wg *sync.WaitGroup
}

type Reader interface {
Expand All @@ -53,23 +58,47 @@ func (r *reader) Open(ctx context.Context) (err error) {
Key: aws.String(r.key),
}

r.resp, err = r.service.GetObjectWithContext(ctx, input)
r.wg = new(sync.WaitGroup)

return err
var pw io.WriteCloser

r.pr, pw = io.Pipe()

resp, err := r.service.GetObjectWithContext(ctx, input)
if err != nil {
return err
}

r.wg.Add(1)

r.eg.Go(safety.RecoverFunc(func() (err error) {
defer r.wg.Done()
defer resp.Body.Close()
defer pw.Close()

_, err = io.Copy(pw, resp.Body)
return err
}))

return nil
}

func (r *reader) Close() error {
if r.resp != nil {
return r.resp.Body.Close()
if r.pr != nil {
return r.pr.Close()
}

if r.wg != nil {
r.wg.Wait()
}

return nil
}

func (r *reader) Read(p []byte) (n int, err error) {
if r.resp == nil {
if r.pr == nil {
return 0, errors.ErrStorageReaderNotOpened
}

return r.resp.Body.Read(p)
return r.pr.Read(p)
}
1 change: 1 addition & 0 deletions internal/db/storage/blob/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (c *client) Close() error {

func (c *client) Reader(ctx context.Context, key string) (io.ReadCloser, error) {
r := reader.New(
reader.WithErrGroup(c.eg),
reader.WithService(c.service),
reader.WithBucket(c.bucket),
reader.WithKey(key),
Expand Down

0 comments on commit ac623f0

Please sign in to comment.