Skip to content

Commit

Permalink
Fix S3 BucketWithRetries upload empty content issue (#5217)
Browse files Browse the repository at this point in the history
* Implement grpc.Compressor.DecompressedSize for snappy to optimize memory allocations (#5213)

Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>
Signed-off-by: Alex Le <[email protected]>

* Fix S3 BucketWithRetries upload empty content issue

Signed-off-by: Alex Le <[email protected]>

* Update CHANGELOG

Signed-off-by: Alex Le <[email protected]>

* Revert "Implement grpc.Compressor.DecompressedSize for snappy to optimize memory allocations (#5213)"

This reverts commit 4821ba3.

Signed-off-by: Alex Le <[email protected]>

* Only retry if input reader is seekable

Signed-off-by: Alex Le <[email protected]>

* Rename mock type

Signed-off-by: Alex Le <[email protected]>

* Add logging

Signed-off-by: Alex Le <[email protected]>

* nit fixing

Signed-off-by: Alex Le <[email protected]>

* add comment

Signed-off-by: Alex Le <[email protected]>

---------

Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>
Signed-off-by: Alex Le <[email protected]>
Co-authored-by: Xiaochao Dong <[email protected]>
  • Loading branch information
alexqyle and damnever authored Mar 20, 2023
1 parent 5779116 commit 37f6451
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* [BUGFIX] Query-frontend: Fix shardable instant queries do not produce sorted results for `sort`, `sort_desc`, `topk`, `bottomk` functions. #5148, #5170
* [BUGFIX] Querier: Fix `/api/v1/series` returning 5XX instead of 4XX when limits are hit. #5169
* [BUGFIX] Compactor: Fix issue that shuffle sharding planner return error if block is under visit by other compactor. #5188
* [BUGFIX] Fix S3 BucketWithRetries upload empty content issue #5217
* [FEATURE] Alertmanager: Add support for time_intervals. #5102

## 1.14.0 2022-12-02
Expand Down
20 changes: 18 additions & 2 deletions pkg/storage/bucket/s3/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/s3"
Expand All @@ -29,6 +30,7 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke
return nil, err
}
return &BucketWithRetries{
logger: logger,
bucket: bucket,
operationRetries: defaultOperationRetries,
retryMinBackoff: defaultRetryMinBackoff,
Expand All @@ -48,6 +50,7 @@ func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore
return nil, err
}
return &BucketWithRetries{
logger: logger,
bucket: bucket,
operationRetries: defaultOperationRetries,
retryMinBackoff: defaultRetryMinBackoff,
Expand Down Expand Up @@ -92,6 +95,7 @@ func newS3Config(cfg Config) (s3.Config, error) {
}

type BucketWithRetries struct {
logger log.Logger
bucket objstore.Bucket
operationRetries int
retryMinBackoff time.Duration
Expand All @@ -115,7 +119,10 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error) error {
}
retries.Wait()
}
return lastErr
if lastErr != nil {
level.Error(b.logger).Log("msg", "bucket operation fail after retries", "err", lastErr)
}
return nil
}

func (b *BucketWithRetries) Name() string {
Expand Down Expand Up @@ -153,8 +160,17 @@ func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists boo
}

func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader) error {
return b.retry(ctx, func() error {
rs, ok := r.(io.ReadSeeker)
if !ok {
// Skip retry if incoming Reader is not seekable to avoid
// loading entire content into memory
return b.bucket.Upload(ctx, name, r)
}
return b.retry(ctx, func() error {
if _, err := rs.Seek(0, io.SeekStart); err != nil {
return err
}
return b.bucket.Upload(ctx, name, rs)
})
}

Expand Down
123 changes: 123 additions & 0 deletions pkg/storage/bucket/s3/bucket_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package s3

import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
)

// TestBucketWithRetries_UploadSeekable verifies that a seekable reader is
// retried until the underlying bucket stops failing, and that the content
// finally stored is the complete, un-truncated input.
func TestBucketWithRetries_UploadSeekable(t *testing.T) {
	t.Parallel()

	underlying := mockBucket{FailCount: 3}
	retryBucket := BucketWithRetries{
		bucket:           &underlying,
		operationRetries: 5, // more retries than failures, so Upload must succeed
		retryMinBackoff:  10 * time.Millisecond,
		retryMaxBackoff:  time.Second,
	}

	payload := []byte("test input")
	require.NoError(t, retryBucket.Upload(context.Background(), "dummy", bytes.NewReader(payload)))
	require.Equal(t, payload, underlying.uploadedContent)
}

// TestBucketWithRetries_UploadNonSeekable verifies that a non-seekable reader
// is uploaded exactly once: the read error is propagated to the caller and no
// retry is attempted, so mockBucket.FailCount is never decremented.
func TestBucketWithRetries_UploadNonSeekable(t *testing.T) {
	t.Parallel()

	maxFailCount := 3
	m := mockBucket{
		FailCount: maxFailCount,
	}
	b := BucketWithRetries{
		bucket:           &m,
		operationRetries: 5,
		retryMinBackoff:  10 * time.Millisecond,
		retryMaxBackoff:  time.Second,
	}

	input := &fakeReader{}
	err := b.Upload(context.Background(), "dummy", input)
	// require.Errorf only asserted err != nil (its trailing args are the
	// failure message, not a matcher); ErrorContains actually checks that
	// the fakeReader's error text reached the caller.
	require.ErrorContains(t, err, "empty byte slice")
	// FailCount untouched: mockBucket.Upload fails while draining the reader,
	// before it ever decrements the counter, and no retry happens.
	require.Equal(t, maxFailCount, m.FailCount)
}

// fakeReader is a non-seekable io.Reader whose reads always fail, standing in
// for a stream that cannot be rewound and retried.
type fakeReader struct{}

// Read implements io.Reader and unconditionally returns an error.
func (f *fakeReader) Read(_ []byte) (int, error) {
	return 0, fmt.Errorf("empty byte slice")
}

type mockBucket struct {
FailCount int
uploadedContent []byte
}

// Upload mocks objstore.Bucket.Upload()
func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
var buf bytes.Buffer
if _, err := buf.ReadFrom(r); err != nil {
return err
}
m.uploadedContent = buf.Bytes()
if m.FailCount > 0 {
m.FailCount--
return fmt.Errorf("failed upload: %d", m.FailCount)
}
return nil
}

// Delete mocks objstore.Bucket.Delete(); it is a no-op that always succeeds.
func (m *mockBucket) Delete(ctx context.Context, name string) error {
	return nil
}

// Name mocks objstore.Bucket.Name() with a fixed identifier.
func (m *mockBucket) Name() string {
	return "mock"
}

// Iter mocks objstore.Bucket.Iter(); it never invokes f and always succeeds.
func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
	return nil
}

// Get mocks objstore.Bucket.Get(); it returns a nil reader and no error.
func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	return nil, nil
}

// GetRange mocks objstore.Bucket.GetRange(); it returns a nil reader and no error.
func (m *mockBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
	return nil, nil
}

// Exists mocks objstore.Bucket.Exists(); every object is reported as absent.
func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) {
	return false, nil
}

// IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr(); no error is ever
// classified as "object not found".
func (m *mockBucket) IsObjNotFoundErr(err error) bool {
	return false
}

// Attributes mocks objstore.Bucket.Attributes() with zero size and the
// current time.
func (m *mockBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
	return objstore.ObjectAttributes{Size: 0, LastModified: time.Now()}, nil
}

// Close mocks objstore.Bucket.Close(); there is nothing to release.
func (m *mockBucket) Close() error {
	return nil
}

0 comments on commit 37f6451

Please sign in to comment.