From 19c3085a3b20ca36d751ff4459cd8e4992bfe289 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 15 Feb 2021 10:57:23 +0100 Subject: [PATCH] Refactoring: allow to pass BytesPool to store.NewBucketStore() Signed-off-by: Marco Pracucci --- cmd/thanos/store.go | 7 ++++++- pkg/pool/pool.go | 2 +- pkg/store/bucket.go | 12 ++++++------ pkg/store/bucket_e2e_test.go | 5 ++++- pkg/store/bucket_test.go | 30 ++++++++++++++++++++++++------ 5 files changed, 41 insertions(+), 15 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index eadcf9340f..217e89c9e3 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -300,6 +300,11 @@ func runStore( queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency) + chunkPool, err := store.NewDefaultChunkBytesPool(chunkPoolSizeBytes) + if err != nil { + return errors.Wrap(err, "create chunk pool") + } + bs, err := store.NewBucketStore( logger, reg, @@ -308,7 +313,7 @@ func runStore( dataDir, indexCache, queriesGate, - chunkPoolSizeBytes, + chunkPool, store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. store.NewSeriesLimiterFactory(maxSeriesCount), verbose, diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index ede9e667bf..25eec7c736 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -61,7 +61,7 @@ func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64) // ErrPoolExhausted is returned if a pool cannot provide the request bytes. var ErrPoolExhausted = errors.New("pool exhausted") -// Get returns a new byte slices that fits the given size. +// Get returns a new byte slice that fits the given size. func (p *BucketedBytesPool) Get(sz int) (*[]byte, error) { p.mtx.Lock() defer p.mtx.Unlock() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 26b02ab8ec..51200ada59 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -300,7 +300,7 @@ func NewBucketStore( dir string, indexCache storecache.IndexCache, queryGate gate.Gate, - maxChunkPoolBytes uint64, + chunkPool pool.BytesPool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, debugLogging bool, @@ -316,11 +316,6 @@ func NewBucketStore( logger = log.NewNopLogger() } - chunkPool, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes) - if err != nil { - return nil, errors.Wrap(err, "create chunk pool") - } - s := &BucketStore{ logger: logger, bkt: bkt, @@ -2449,3 +2444,8 @@ func (s queryStats) merge(o *queryStats) *queryStats { return &s } + +// NewDefaultChunkBytesPool returns a chunk bytes pool with default settings. +func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.BytesPool, error) { + return pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes) +} diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 6626fb41a8..b5b329cc3d 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -154,6 +154,9 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m }, nil) testutil.Ok(t, err) + chunkPool, err := NewDefaultChunkBytesPool(0) + testutil.Ok(t, err) + store, err := NewBucketStore( s.logger, nil, @@ -162,7 +165,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m dir, s.cache, nil, - 0, + chunkPool, NewChunksLimiterFactory(maxChunksLimit), NewSeriesLimiterFactory(0), false, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index ac9a33ffcb..19153f5fbf 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -565,6 +565,9 @@ func TestBucketStore_Info(t *testing.T) { defer testutil.Ok(t, os.RemoveAll(dir)) + chunkPool, err := NewDefaultChunkBytesPool(2e5) + testutil.Ok(t, err) + bucketStore, err := NewBucketStore( nil, nil, @@ -573,7 +576,7 @@ func TestBucketStore_Info(t *testing.T) { dir, noopCache{}, nil, - 2e5, + chunkPool, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), false, @@ -817,6 +820,9 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul }, nil) testutil.Ok(t, err) + chunkPool, err := NewDefaultChunkBytesPool(0) + testutil.Ok(t, err) + bucketStore, err := NewBucketStore( logger, nil, @@ -825,7 +831,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul dir, noopCache{}, nil, - 0, + chunkPool, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), false, @@ -1636,6 +1642,9 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) testutil.Ok(tb, err) + chunkPool, err := NewDefaultChunkBytesPool(1000000) + testutil.Ok(t, err) + store, err := NewBucketStore( logger, nil, @@ -1644,7 +1653,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { tmpDir, indexCache, nil, - 1000000, + chunkPool, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), false, @@ -1730,6 +1739,9 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) testutil.Ok(tb, err) + chunkPool, err := NewDefaultChunkBytesPool(1000000) + testutil.Ok(t, err) + store, err := NewBucketStore( logger, nil, @@ -1738,7 +1750,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { tmpDir, indexCache, nil, - 1000000, + chunkPool, NewChunksLimiterFactory(100000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), false, @@ -1875,6 +1887,9 @@ func TestBlockWithLargeChunks(t *testing.T) { indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) testutil.Ok(t, err) + chunkPool, err := NewDefaultChunkBytesPool(1000000) + testutil.Ok(t, err) + store, err := NewBucketStore( logger, nil, @@ -1883,7 +1898,7 @@ func TestBlockWithLargeChunks(t *testing.T) { tmpDir, indexCache, nil, - 1000000, + chunkPool, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), false, @@ -2036,6 +2051,9 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) testutil.Ok(tb, err) + chunkPool, err := NewDefaultChunkBytesPool(1000000) + testutil.Ok(t, err) + store, err := NewBucketStore( logger, nil, @@ -2044,7 +2062,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb tmpDir, indexCache, nil, - 1000000, + chunkPool, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), false,