Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/db/storage/blob/s3: remove ctx from struct #473

Merged
merged 3 commits into from
Jun 13, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions internal/db/storage/blob/s3/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ type writer struct {

pw io.WriteCloser
wg *sync.WaitGroup

ctx context.Context
}

type Writer interface {
Expand All @@ -59,8 +57,6 @@ func New(opts ...Option) Writer {
}

func (w *writer) Open(ctx context.Context) (err error) {
w.ctx = ctx

w.wg = new(sync.WaitGroup)

var pr io.ReadCloser
Expand All @@ -73,7 +69,7 @@ func (w *writer) Open(ctx context.Context) (err error) {
defer w.wg.Done()
defer pr.Close()

return w.upload(pr)
return w.upload(ctx, pr)
}))

return err
Expand All @@ -92,14 +88,14 @@ func (w *writer) Close() error {
}

func (w *writer) Write(p []byte) (n int, err error) {
if w.ctx == nil || w.pw == nil {
if w.pw == nil {
return 0, errors.ErrStorageWriterNotOpened
}

return w.pw.Write(p)
}

func (w *writer) upload(body io.Reader) (err error) {
func (w *writer) upload(ctx context.Context, body io.Reader) (err error) {
uploader := s3manager.NewUploaderWithClient(
w.service,
func(u *s3manager.Uploader) {
Expand All @@ -112,7 +108,7 @@ func (w *writer) upload(body io.Reader) (err error) {
Body: body,
}

res, err := uploader.UploadWithContext(w.ctx, input)
res, err := uploader.UploadWithContext(ctx, input)
if err != nil {
log.Error("upload failed with error: ", err)
return err
Expand Down
31 changes: 9 additions & 22 deletions internal/db/storage/blob/s3/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNew(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
Expand Down Expand Up @@ -112,7 +112,6 @@ func Test_writer_Open(t *testing.T) {
maxPartSize int64
pw io.WriteCloser
wg *sync.WaitGroup
ctx context.Context
}
type want struct {
err error
Expand Down Expand Up @@ -148,7 +147,6 @@ func Test_writer_Open(t *testing.T) {
maxPartSize: 0,
pw: nil,
wg: sync.WaitGroup{},
ctx: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -171,7 +169,6 @@ func Test_writer_Open(t *testing.T) {
maxPartSize: 0,
pw: nil,
wg: sync.WaitGroup{},
ctx: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -182,7 +179,7 @@ func Test_writer_Open(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
Expand All @@ -200,7 +197,6 @@ func Test_writer_Open(t *testing.T) {
maxPartSize: test.fields.maxPartSize,
pw: test.fields.pw,
wg: test.fields.wg,
ctx: test.fields.ctx,
}

err := w.Open(test.args.ctx)
Expand All @@ -221,7 +217,6 @@ func Test_writer_Close(t *testing.T) {
maxPartSize int64
pw io.WriteCloser
wg *sync.WaitGroup
ctx context.Context
}
type want struct {
err error
Expand Down Expand Up @@ -253,7 +248,6 @@ func Test_writer_Close(t *testing.T) {
maxPartSize: 0,
pw: nil,
wg: sync.WaitGroup{},
ctx: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -273,7 +267,6 @@ func Test_writer_Close(t *testing.T) {
maxPartSize: 0,
pw: nil,
wg: sync.WaitGroup{},
ctx: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -284,7 +277,7 @@ func Test_writer_Close(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc()
}
Expand All @@ -302,7 +295,6 @@ func Test_writer_Close(t *testing.T) {
maxPartSize: test.fields.maxPartSize,
pw: test.fields.pw,
wg: test.fields.wg,
ctx: test.fields.ctx,
}

err := w.Close()
Expand All @@ -326,7 +318,6 @@ func Test_writer_Write(t *testing.T) {
maxPartSize int64
pw io.WriteCloser
wg *sync.WaitGroup
ctx context.Context
}
type want struct {
wantN int
Expand Down Expand Up @@ -366,7 +357,6 @@ func Test_writer_Write(t *testing.T) {
maxPartSize: 0,
pw: nil,
wg: sync.WaitGroup{},
ctx: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -389,7 +379,6 @@ func Test_writer_Write(t *testing.T) {
maxPartSize: 0,
pw: nil,
wg: sync.WaitGroup{},
ctx: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -400,7 +389,7 @@ func Test_writer_Write(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
Expand All @@ -418,7 +407,6 @@ func Test_writer_Write(t *testing.T) {
maxPartSize: test.fields.maxPartSize,
pw: test.fields.pw,
wg: test.fields.wg,
ctx: test.fields.ctx,
}

gotN, err := w.Write(test.args.p)
Expand All @@ -432,6 +420,7 @@ func Test_writer_Write(t *testing.T) {

func Test_writer_upload(t *testing.T) {
type args struct {
ctx context.Context
body io.Reader
}
type fields struct {
Expand All @@ -442,7 +431,6 @@ func Test_writer_upload(t *testing.T) {
maxPartSize int64
pw io.WriteCloser
wg *sync.WaitGroup
ctx context.Context
}
type want struct {
err error
Expand All @@ -468,6 +456,7 @@ func Test_writer_upload(t *testing.T) {
{
name: "test_case_1",
args: args {
ctx: nil,
body: nil,
},
fields: fields {
Expand All @@ -478,7 +467,6 @@ func Test_writer_upload(t *testing.T) {
maxPartSize: 0,
pw: nil,
wg: sync.WaitGroup{},
ctx: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -491,6 +479,7 @@ func Test_writer_upload(t *testing.T) {
return test {
name: "test_case_2",
args: args {
ctx: nil,
body: nil,
},
fields: fields {
Expand All @@ -501,7 +490,6 @@ func Test_writer_upload(t *testing.T) {
maxPartSize: 0,
pw: nil,
wg: sync.WaitGroup{},
ctx: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
Expand All @@ -512,7 +500,7 @@ func Test_writer_upload(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
Expand All @@ -530,10 +518,9 @@ func Test_writer_upload(t *testing.T) {
maxPartSize: test.fields.maxPartSize,
pw: test.fields.pw,
wg: test.fields.wg,
ctx: test.fields.ctx,
}

err := w.upload(test.args.body)
err := w.upload(test.args.ctx, test.args.body)
if err := test.checkFunc(test.want, err); err != nil {
tt.Errorf("error = %v", err)
}
Expand Down