From 4821ba33b81aa6469302c442d1d3cbcfdb9ec89f Mon Sep 17 00:00:00 2001
From: Xiaochao Dong
Date: Sat, 18 Mar 2023 04:04:14 +0800
Subject: [PATCH 1/9] Implement grpc.Compressor.DecompressedSize for snappy to
 optimize memory allocations (#5213)

Signed-off-by: Xiaochao Dong (@damnever)
Signed-off-by: Alex Le
---
 CHANGELOG.md                           |  1 +
 pkg/util/grpcencoding/snappy/snappy.go | 14 ++++++++++++++
 2 files changed, 15 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 49ef11c3eb..aeca128bd8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@
 * [ENHANCEMENT] Update Go version to 1.20.1. #5159
 * [ENHANCEMENT] Distributor: Reuse byte slices when serializing requests from distributors to ingesters. #5193
 * [ENHANCEMENT] Query Frontend: Add number of chunks and samples fetched in query stats. #5198
+* [ENHANCEMENT] Implement grpc.Compressor.DecompressedSize for snappy to optimize memory allocations. #5213
 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972
 * [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
diff --git a/pkg/util/grpcencoding/snappy/snappy.go b/pkg/util/grpcencoding/snappy/snappy.go
index fe01b4ca35..6ffc4b678e 100644
--- a/pkg/util/grpcencoding/snappy/snappy.go
+++ b/pkg/util/grpcencoding/snappy/snappy.go
@@ -51,6 +51,20 @@ func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
 	return reader{dr, &c.readersPool}, nil
 }
 
+// If a Compressor implements DecompressedSize(compressedBytes []byte) int,
+// gRPC will call it to determine the size of the buffer allocated for the
+// result of decompression.
+// Return -1 to indicate unknown size.
+//
+// This is an EXPERIMENTAL feature of grpc-go.
+func (c *compressor) DecompressedSize(compressedBytes []byte) int {
+	decompressedSize, err := snappy.DecodedLen(compressedBytes)
+	if err != nil {
+		return -1
+	}
+	return decompressedSize
+}
+
 type writeCloser struct {
 	writer *snappy.Writer
 	pool   *sync.Pool
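A note on why this is effectively free for snappy: the block format prefixes the compressed payload with its uncompressed length as a varint, so snappy.DecodedLen parses only that header and never decompresses anything. The following standalone sketch illustrates the behavior the codec above relies on (an illustrative example, not part of the patch):

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	payload := []byte("some payload that will be snappy-encoded")
	compressed := snappy.Encode(nil, payload)

	// DecodedLen reads only the varint length header at the start of the
	// block; it does not perform any decompression work.
	n, err := snappy.DecodedLen(compressed)
	if err != nil {
		// A gRPC codec would report -1 (unknown size) here.
		fmt.Println("cannot determine size:", err)
		return
	}
	fmt.Printf("compressed %d bytes -> decompresses to %d bytes\n", len(compressed), n)
}

With the size known up front, grpc-go can allocate the receive buffer once instead of growing it repeatedly during decompression, which is where the memory savings come from.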
From 5a5e6622c4b64e81d1a6995a0dddb6d9c8ea444f Mon Sep 17 00:00:00 2001
From: Alex Le
Date: Sun, 19 Mar 2023 14:10:07 -0700
Subject: [PATCH 2/9] Fix S3 BucketWithRetries upload empty content issue

Signed-off-by: Alex Le
---
 pkg/storage/bucket/s3/bucket_client.go      | 13 ++-
 pkg/storage/bucket/s3/bucket_client_test.go | 96 +++++++++++++++++++++
 2 files changed, 108 insertions(+), 1 deletion(-)
 create mode 100644 pkg/storage/bucket/s3/bucket_client_test.go

diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go
index 68a8ee714f..94fbe2af91 100644
--- a/pkg/storage/bucket/s3/bucket_client.go
+++ b/pkg/storage/bucket/s3/bucket_client.go
@@ -1,6 +1,7 @@
 package s3
 
 import (
+	"bytes"
 	"context"
 	"io"
 	"time"
@@ -153,8 +154,18 @@ func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists bool, err error) {
 }
 
 func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader) error {
+	// Convert Reader to Seeker
+	var buf bytes.Buffer
+	if _, err := buf.ReadFrom(r); err != nil {
+		return err
+	}
+	s := bytes.NewReader(buf.Bytes())
 	return b.retry(ctx, func() error {
-		return b.bucket.Upload(ctx, name, r)
+		err := b.bucket.Upload(ctx, name, s)
+		if _, err := s.Seek(0, io.SeekStart); err != nil {
+			return err
+		}
+		return err
 	})
 }
 
diff --git a/pkg/storage/bucket/s3/bucket_client_test.go b/pkg/storage/bucket/s3/bucket_client_test.go
new file mode 100644
index 0000000000..816b434cb1
--- /dev/null
+++ b/pkg/storage/bucket/s3/bucket_client_test.go
@@ -0,0 +1,96 @@
+package s3
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
+)
+
+func TestBucketWithRetries_Upload(t *testing.T) {
+	t.Parallel()
+
+	m := mockBucket{
+		MaxFailCount: 3,
+	}
+	b := BucketWithRetries{
+		bucket:           &m,
+		operationRetries: 5,
+		retryMinBackoff:  10 * time.Millisecond,
+		retryMaxBackoff:  time.Second,
+	}
+
+	input := []byte("test input")
+	err := b.Upload(context.Background(), "dummy", bytes.NewReader(input))
+	require.NoError(t, err)
+	require.Equal(t, input, m.uploadedContent)
+}
+
+type mockBucket struct {
+	MaxFailCount    int
+	uploadedContent []byte
+}
+
+// Upload mocks objstore.Bucket.Upload()
+func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+	var buf bytes.Buffer
+	if _, err := buf.ReadFrom(r); err != nil {
+		return err
+	}
+	m.uploadedContent = buf.Bytes()
+	if m.MaxFailCount > 0 {
+		m.MaxFailCount--
+		return fmt.Errorf("failed upload: %d", m.MaxFailCount)
+	}
+	return nil
+}
+
+// Delete mocks objstore.Bucket.Delete()
+func (m *mockBucket) Delete(ctx context.Context, name string) error {
+	return nil
+}
+
+// Name mocks objstore.Bucket.Name()
+func (m *mockBucket) Name() string {
+	return "mock"
+}
+
+// Iter mocks objstore.Bucket.Iter()
+func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
+	return nil
+}
+
+// Get mocks objstore.Bucket.Get()
+func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
+	return nil, nil
+}
+
+// GetRange mocks objstore.Bucket.GetRange()
+func (m *mockBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
+	return nil, nil
+}
+
+// Exists mocks objstore.Bucket.Exists()
+func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) {
+	return false, nil
+}
+
+// IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr()
+func (m *mockBucket) IsObjNotFoundErr(err error) bool {
+	return false
+}
+
+// ObjectSize mocks objstore.Bucket.Attributes()
+func (m *mockBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
+	return objstore.ObjectAttributes{Size: 0, LastModified: time.Now()}, nil
+}
+
+// Close mocks objstore.Bucket.Close()
+func (m *mockBucket) Close() error {
+	return nil
+}
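The failure mode behind this fix: an io.Reader can only be consumed once, so when a first Upload attempt fails after draining r, every retry re-sends an already-empty reader. A self-contained illustration of that behavior and of the rewind the patch performs (names here are illustrative, not Cortex code):

package main

import (
	"bytes"
	"fmt"
	"io"
)

// attempt stands in for an upload try that consumes its reader.
func attempt(r io.Reader) string {
	b, _ := io.ReadAll(r)
	return string(b)
}

func main() {
	r := bytes.NewReader([]byte("test input"))

	fmt.Printf("attempt 1: %q\n", attempt(r)) // "test input"
	fmt.Printf("attempt 2: %q\n", attempt(r)) // "" - drained, would upload empty content

	// Rewinding restores the content for the next attempt, which is what
	// the patched Upload does between retries via Seek(0, io.SeekStart).
	if _, err := r.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	fmt.Printf("attempt 3: %q\n", attempt(r)) // "test input" again
}

Buffering the whole reader into a bytes.Reader makes every payload seekable, but at the cost of holding the full object in memory; patch 5 below replaces this with a type assertion so that only already-seekable readers are retried.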
From c0517e2a368b56508e3047ce3a04028b61b4aedf Mon Sep 17 00:00:00 2001
From: Alex Le
Date: Mon, 20 Mar 2023 01:01:58 -0700
Subject: [PATCH 3/9] Update CHANGELOG

Signed-off-by: Alex Le
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index aeca128bd8..7b24d561ba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -49,6 +49,7 @@
 * [BUGFIX] Query-frontend: Fix shardable instant queries do not produce sorted results for `sort`, `sort_desc`, `topk`, `bottomk` functions. #5148, #5170
 * [BUGFIX] Querier: Fix `/api/v1/series` returning 5XX instead of 4XX when limits are hit. #5169
 * [BUGFIX] Compactor: Fix issue that shuffle sharding planner return error if block is under visit by other compactor. #5188
+* [BUGFIX] Fix S3 BucketWithRetries upload empty content issue. #5217
 * [FEATURE] Alertmanager: Add support for time_intervals. #5102
 
 ## 1.14.0 2022-12-02

From 0f5980ea8fa5b2917a2c04e52d6de90741104bf7 Mon Sep 17 00:00:00 2001
From: Alex Le
Date: Mon, 20 Mar 2023 01:03:17 -0700
Subject: [PATCH 4/9] Revert "Implement grpc.Compressor.DecompressedSize for
 snappy to optimize memory allocations (#5213)"

This reverts commit 4821ba33b81aa6469302c442d1d3cbcfdb9ec89f.

Signed-off-by: Alex Le
---
 CHANGELOG.md                           |  1 -
 pkg/util/grpcencoding/snappy/snappy.go | 14 --------------
 2 files changed, 15 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7b24d561ba..f682e74cd9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,7 +23,6 @@
 * [ENHANCEMENT] Update Go version to 1.20.1. #5159
 * [ENHANCEMENT] Distributor: Reuse byte slices when serializing requests from distributors to ingesters. #5193
 * [ENHANCEMENT] Query Frontend: Add number of chunks and samples fetched in query stats. #5198
-* [ENHANCEMENT] Implement grpc.Compressor.DecompressedSize for snappy to optimize memory allocations. #5213
 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972
 * [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
diff --git a/pkg/util/grpcencoding/snappy/snappy.go b/pkg/util/grpcencoding/snappy/snappy.go
index 6ffc4b678e..fe01b4ca35 100644
--- a/pkg/util/grpcencoding/snappy/snappy.go
+++ b/pkg/util/grpcencoding/snappy/snappy.go
@@ -51,20 +51,6 @@ func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
 	return reader{dr, &c.readersPool}, nil
 }
 
-// If a Compressor implements DecompressedSize(compressedBytes []byte) int,
-// gRPC will call it to determine the size of the buffer allocated for the
-// result of decompression.
-// Return -1 to indicate unknown size.
-//
-// This is an EXPERIMENTAL feature of grpc-go.
-func (c *compressor) DecompressedSize(compressedBytes []byte) int {
-	decompressedSize, err := snappy.DecodedLen(compressedBytes)
-	if err != nil {
-		return -1
-	}
-	return decompressedSize
-}
-
 type writeCloser struct {
 	writer *snappy.Writer
 	pool   *sync.Pool
From c507d33fa85b6f92f7f620ebf3752b07df49818f Mon Sep 17 00:00:00 2001
From: Alex Le
Date: Mon, 20 Mar 2023 11:30:32 -0700
Subject: [PATCH 5/9] Only retry if input reader is seekable

Signed-off-by: Alex Le
---
 pkg/storage/bucket/s3/bucket_client.go      | 14 +++-----
 pkg/storage/bucket/s3/bucket_client_test.go | 39 +++++++++++++++++----
 2 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go
index 94fbe2af91..f25984e2b9 100644
--- a/pkg/storage/bucket/s3/bucket_client.go
+++ b/pkg/storage/bucket/s3/bucket_client.go
@@ -1,7 +1,6 @@
 package s3
 
 import (
-	"bytes"
 	"context"
 	"io"
 	"time"
@@ -154,18 +153,15 @@ func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists bool, err error) {
 }
 
 func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader) error {
-	// Convert Reader to Seeker
-	var buf bytes.Buffer
-	if _, err := buf.ReadFrom(r); err != nil {
-		return err
+	rs, ok := r.(io.ReadSeeker)
+	if !ok {
+		return b.bucket.Upload(ctx, name, r)
 	}
-	s := bytes.NewReader(buf.Bytes())
 	return b.retry(ctx, func() error {
-		err := b.bucket.Upload(ctx, name, s)
-		if _, err := s.Seek(0, io.SeekStart); err != nil {
+		if _, err := rs.Seek(0, io.SeekStart); err != nil {
 			return err
 		}
-		return err
+		return b.bucket.Upload(ctx, name, rs)
 	})
 }
 
diff --git a/pkg/storage/bucket/s3/bucket_client_test.go b/pkg/storage/bucket/s3/bucket_client_test.go
index 816b434cb1..b78c2d4015 100644
--- a/pkg/storage/bucket/s3/bucket_client_test.go
+++ b/pkg/storage/bucket/s3/bucket_client_test.go
@@ -12,11 +12,11 @@ import (
 	"github.com/thanos-io/objstore"
 )
 
-func TestBucketWithRetries_Upload(t *testing.T) {
+func TestBucketWithRetries_UploadSeekable(t *testing.T) {
 	t.Parallel()
 
 	m := mockBucket{
-		MaxFailCount: 3,
+		FailCount: 3,
 	}
 	b := BucketWithRetries{
 		bucket:           &m,
@@ -31,8 +31,35 @@ func TestBucketWithRetries_Upload(t *testing.T) {
 	require.Equal(t, input, m.uploadedContent)
 }
 
+func TestBucketWithRetries_UploadNonSeekable(t *testing.T) {
+	t.Parallel()
+
+	maxFailCount := 3
+	m := mockBucket{
+		FailCount: maxFailCount,
+	}
+	b := BucketWithRetries{
+		bucket:           &m,
+		operationRetries: 5,
+		retryMinBackoff:  10 * time.Millisecond,
+		retryMaxBackoff:  time.Second,
+	}
+
+	input := &FakeReader{}
+	err := b.Upload(context.Background(), "dummy", input)
+	require.Errorf(t, err, "empty byte slice")
+	require.Equal(t, maxFailCount, m.FailCount)
+}
+
+type FakeReader struct {
+}
+
+func (f *FakeReader) Read(p []byte) (n int, err error) {
+	return 0, fmt.Errorf("empty byte slice")
+}
+
 type mockBucket struct {
-	MaxFailCount    int
+	FailCount       int
 	uploadedContent []byte
 }
 
@@ -43,9 +70,9 @@ func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error
 		return err
 	}
 	m.uploadedContent = buf.Bytes()
-	if m.MaxFailCount > 0 {
-		m.MaxFailCount--
-		return fmt.Errorf("failed upload: %d", m.MaxFailCount)
+	if m.FailCount > 0 {
+		m.FailCount--
+		return fmt.Errorf("failed upload: %d", m.FailCount)
 	}
 	return nil
 }

From f6077b85db29c81e7aa95e82ef3106c5408fcdcd Mon Sep 17 00:00:00 2001
From: Alex Le
Date: Mon, 20 Mar 2023 11:33:32 -0700
Subject: [PATCH 6/9] Rename mock type

Signed-off-by: Alex Le
---
 pkg/storage/bucket/s3/bucket_client_test.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pkg/storage/bucket/s3/bucket_client_test.go b/pkg/storage/bucket/s3/bucket_client_test.go
index b78c2d4015..38e020426b 100644
--- a/pkg/storage/bucket/s3/bucket_client_test.go
+++ b/pkg/storage/bucket/s3/bucket_client_test.go
@@ -45,16 +45,16 @@ func TestBucketWithRetries_UploadNonSeekable(t *testing.T) {
 		retryMaxBackoff:  time.Second,
 	}
 
-	input := &FakeReader{}
+	input := &fakeReader{}
 	err := b.Upload(context.Background(), "dummy", input)
 	require.Errorf(t, err, "empty byte slice")
 	require.Equal(t, maxFailCount, m.FailCount)
 }
 
-type FakeReader struct {
+type fakeReader struct {
 }
 
-func (f *FakeReader) Read(p []byte) (n int, err error) {
+func (f *fakeReader) Read(p []byte) (n int, err error) {
 	return 0, fmt.Errorf("empty byte slice")
 }
 
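The shape patch 5 settles on — probe for io.ReadSeeker, rewind before every attempt, fall back to a single attempt otherwise — generalizes to any retried consumer of an io.Reader. A condensed sketch of the pattern follows; uploadWithRetries and its callback are illustrative names, not the Cortex API:

package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

func uploadWithRetries(r io.Reader, attempts int, upload func(io.Reader) error) error {
	rs, ok := r.(io.ReadSeeker)
	if !ok {
		// Non-seekable input cannot be replayed, so a single attempt is
		// all that can be done safely.
		return upload(r)
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		// Rewind first so every attempt sees the full content.
		if _, err := rs.Seek(0, io.SeekStart); err != nil {
			return err
		}
		if lastErr = upload(rs); lastErr == nil {
			return nil
		}
	}
	return lastErr
}

func main() {
	failures := 2
	err := uploadWithRetries(strings.NewReader("payload"), 5, func(r io.Reader) error {
		b, _ := io.ReadAll(r)
		if failures > 0 {
			failures--
			return errors.New("transient failure")
		}
		fmt.Printf("uploaded %q\n", b)
		return nil
	})
	fmt.Println("err:", err)
}

Seeking before the attempt rather than after it also covers the very first try, in case a caller hands over a reader that is not positioned at the start.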
From 4f742d303d9809158f35c7afac1ad36ac9e31da9 Mon Sep 17 00:00:00 2001
From: Alex Le
Date: Mon, 20 Mar 2023 12:33:01 -0700
Subject: [PATCH 7/9] Add logging

Signed-off-by: Alex Le
---
 pkg/storage/bucket/s3/bucket_client.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go
index f25984e2b9..f59634bf7f 100644
--- a/pkg/storage/bucket/s3/bucket_client.go
+++ b/pkg/storage/bucket/s3/bucket_client.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/prometheus/common/model"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/providers/s3"
@@ -92,6 +93,7 @@ func newS3Config(cfg Config) (s3.Config, error) {
 }
 
 type BucketWithRetries struct {
+	logger           log.Logger
 	bucket           objstore.Bucket
 	operationRetries int
 	retryMinBackoff  time.Duration
@@ -115,6 +117,9 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error) error {
 		}
 		retries.Wait()
 	}
+	if lastErr != nil {
+		level.Error(b.logger).Log("msg", "bucket operation failed after retries", "err", lastErr)
+	}
 	return lastErr
 }
 

From b47a3f947ff2af411913f949e912712914e68b54 Mon Sep 17 00:00:00 2001
From: Alex Le
Date: Mon, 20 Mar 2023 14:29:32 -0700
Subject: [PATCH 8/9] nit fixing

Signed-off-by: Alex Le
---
 pkg/storage/bucket/s3/bucket_client.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go
index f59634bf7f..ab3724dee7 100644
--- a/pkg/storage/bucket/s3/bucket_client.go
+++ b/pkg/storage/bucket/s3/bucket_client.go
@@ -30,6 +30,7 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
 		return nil, err
 	}
 	return &BucketWithRetries{
+		logger:           logger,
 		bucket:           bucket,
 		operationRetries: defaultOperationRetries,
 		retryMinBackoff:  defaultRetryMinBackoff,
@@ -49,6 +50,7 @@ func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore.BucketReader, error) {
 		return nil, err
 	}
 	return &BucketWithRetries{
+		logger:           logger,
 		bucket:           bucket,
 		operationRetries: defaultOperationRetries,
 		retryMinBackoff:  defaultRetryMinBackoff,
@@ -120,7 +122,8 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error) error {
 	if lastErr != nil {
 		level.Error(b.logger).Log("msg", "bucket operation failed after retries", "err", lastErr)
+		return lastErr
 	}
-	return lastErr
+	return nil
 }
 
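For reference, the retry helper these two patches touch is a capped exponential backoff loop. A minimal self-contained equivalent of its semantics (the actual helper delegates to a backoff utility whose construction is not shown in these hunks; this loop only mirrors the min/max/retries fields on BucketWithRetries):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func retryWithBackoff(ctx context.Context, retries int, min, max time.Duration, f func() error) error {
	var lastErr error
	delay := min
	for i := 0; i < retries; i++ {
		if lastErr = f(); lastErr == nil {
			return nil
		}
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
		// Double the delay up to the configured cap.
		if delay *= 2; delay > max {
			delay = max
		}
	}
	// Surface the final error: swallowing it here would make a failed
	// operation look successful to the caller, which is why retry()
	// must return lastErr on the error path.
	return lastErr
}

func main() {
	calls := 0
	err := retryWithBackoff(context.Background(), 5, 10*time.Millisecond, time.Second, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}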
From 437d2b15b9ea1d6e7ab7109fdd1a803633db0f1c Mon Sep 17 00:00:00 2001
From: Alex Le
Date: Mon, 20 Mar 2023 14:48:40 -0700
Subject: [PATCH 9/9] add comment

Signed-off-by: Alex Le
---
 pkg/storage/bucket/s3/bucket_client.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go
index ab3724dee7..ad766ffb30 100644
--- a/pkg/storage/bucket/s3/bucket_client.go
+++ b/pkg/storage/bucket/s3/bucket_client.go
@@ -162,6 +162,8 @@ func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists bool, err error) {
 func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader) error {
 	rs, ok := r.(io.ReadSeeker)
 	if !ok {
+		// Skip retry if the incoming Reader is not seekable, to avoid
+		// loading the entire content into memory
 		return b.bucket.Upload(ctx, name, r)
 	}
 	return b.retry(ctx, func() error {
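Taken together, the series leaves Upload with a simple contract: payloads that implement io.ReadSeeker (files, bytes.Reader, strings.Reader) are retried with a rewind between attempts, while pure streams get exactly one attempt. One subtlety worth noting for callers, shown in this sketch (the seekable helper is illustrative, not part of the patched code):

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"os"
)

// seekable reports whether Upload would retry r, using the same type
// assertion as the patched code.
func seekable(r io.Reader) bool {
	_, ok := r.(io.ReadSeeker)
	return ok
}

func main() {
	fmt.Println(seekable(bytes.NewReader([]byte("x")))) // true: retried with rewind
	fmt.Println(seekable(bufio.NewReader(os.Stdin)))    // false: single attempt

	// Caveat: *os.File always satisfies io.ReadSeeker, even when the
	// underlying descriptor is a pipe that cannot actually seek. In that
	// case the assertion succeeds but the rewind fails at runtime, and
	// the retried Upload returns that Seek error instead of uploading.
	fmt.Println(seekable(os.Stdin)) // true, yet seeking stdin may fail
}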