Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customising the partitioner used by the BucketStore #3802

Merged
merged 1 commit into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func runStore(
chunkPoolSizeBytes,
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
store.NewSeriesLimiterFactory(maxSeriesCount),
store.NewGapBasedPartitioner(store.PartitionerMaxGapSize),
verbose,
blockSyncConcurrency,
filterConf,
Expand Down
19 changes: 13 additions & 6 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const (
// not too small (too much memory).
DefaultPostingOffsetInMemorySampling = 32

partitionerMaxGapSize = 512 * 1024
PartitionerMaxGapSize = 512 * 1024

// Labels for metrics.
labelEncode = "encode"
Expand Down Expand Up @@ -278,7 +278,7 @@ type BucketStore struct {
chunksLimiterFactory ChunksLimiterFactory
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call.
seriesLimiterFactory SeriesLimiterFactory
partitioner partitioner
partitioner Partitioner

filterConfig *FilterConfig
advLabelSets []labelpb.ZLabelSet
Expand All @@ -303,6 +303,7 @@ func NewBucketStore(
maxChunkPoolBytes uint64,
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
partitioner Partitioner,
debugLogging bool,
blockSyncConcurrency int,
filterConfig *FilterConfig,
Expand Down Expand Up @@ -337,7 +338,7 @@ func NewBucketStore(
queryGate: queryGate,
chunksLimiterFactory: chunksLimiterFactory,
seriesLimiterFactory: seriesLimiterFactory,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
partitioner: partitioner,
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
Expand Down Expand Up @@ -1383,7 +1384,7 @@ type bucketBlock struct {

pendingReaders sync.WaitGroup

partitioner partitioner
partitioner Partitioner

// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
// request hints' BlockMatchers.
Expand All @@ -1400,7 +1401,7 @@ func newBucketBlock(
indexCache storecache.IndexCache,
chunkPool pool.BytesPool,
indexHeadReader indexheader.Reader,
p partitioner,
p Partitioner,
) (b *bucketBlock, err error) {
b = &bucketBlock{
logger: logger,
Expand Down Expand Up @@ -2039,7 +2040,7 @@ type part struct {
elemRng [2]int
}

type partitioner interface {
type Partitioner interface {
// Partition partitions length entries into n <= length ranges that cover all
// input ranges
// It supports overlapping ranges.
Expand All @@ -2051,6 +2052,12 @@ type gapBasedPartitioner struct {
maxGapSize uint64
}

// NewGapBasedPartitioner returns a Partitioner that combines requested byte
// ranges separated by a gap of at most maxGapSize bytes, so that multiple
// small object-storage reads can be merged into bigger, more efficient/cheaper
// ones (see gapBasedPartitioner.Partition).
func NewGapBasedPartitioner(maxGapSize uint64) Partitioner {
	return gapBasedPartitioner{
		maxGapSize: maxGapSize,
	}
}

// Partition partitions length entries into n <= length ranges that cover all
// input ranges by combining entries that are separated by reasonably small gaps.
// It is used to combine multiple small ranges from object storage into bigger, more efficient/cheaper ones.
Expand Down
1 change: 1 addition & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
0,
NewChunksLimiterFactory(maxChunksLimit),
NewSeriesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
false,
20,
filterConf,
Expand Down
14 changes: 10 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ func TestBucketStore_Info(t *testing.T) {
2e5,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -828,6 +829,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
0,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -1140,7 +1142,7 @@ func benchmarkExpandedPostings(
indexCache: noopCache{},
bkt: bkt,
meta: &metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: id}},
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
}

indexr := newBucketIndexReader(context.Background(), b)
Expand Down Expand Up @@ -1256,7 +1258,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer
metrics: m,
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
extLset: extLset,
Expand Down Expand Up @@ -1430,7 +1432,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
Expand Down Expand Up @@ -1469,7 +1471,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
Expand Down Expand Up @@ -1647,6 +1649,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
1000000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
false,
10,
nil,
Expand Down Expand Up @@ -1741,6 +1744,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
1000000,
NewChunksLimiterFactory(100000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
false,
10,
nil,
Expand Down Expand Up @@ -1886,6 +1890,7 @@ func TestBlockWithLargeChunks(t *testing.T) {
1000000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
false,
10,
nil,
Expand Down Expand Up @@ -2047,6 +2052,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
1000000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
false,
10,
nil,
Expand Down