diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 6ca10719d577..728e61e5096e 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -3765,7 +3765,7 @@ shard_streams:
 [bloom_creation_enabled: <boolean> | default = false]
 
 # Experimental. Bloom planning strategy to use in bloom creation. Can be one of:
-# 'split_keyspace_by_factor'
+# 'split_keyspace_by_factor', 'split_by_series_chunks_size'
 # CLI flag: -bloom-build.planning-strategy
 [bloom_planning_strategy: <string> | default = "split_keyspace_by_factor"]
 
@@ -3775,6 +3775,10 @@ shard_streams:
 # CLI flag: -bloom-build.split-keyspace-by
 [bloom_split_series_keyspace_by: <int> | default = 256]
 
+# Experimental. Target total chunk size in bytes of the series assigned to each
+# bloom task. Default is 20GB.
+# CLI flag: -bloom-build.split-target-series-chunk-size
+[bloom_task_target_series_chunk_size: <int> | default = 20GB]
+
 # Experimental. Compression algorithm for bloom block pages.
 # CLI flag: -bloom-build.block-encoding
 [bloom_block_encoding: <string> | default = "none"]
diff --git a/integration/bloom_building_test.go b/integration/bloom_building_test.go
index 2c4662eef4cb..b5186b3b23ff 100644
--- a/integration/bloom_building_test.go
+++ b/integration/bloom_building_test.go
@@ -96,6 +96,8 @@ func TestBloomBuilding(t *testing.T) {
 		"-bloom-build.planner.interval=15s",
 		"-bloom-build.planner.min-table-offset=0", // Disable table offset so we process today's data.
 		"-bloom.cache-list-ops=0", // Disable cache list operations to avoid caching issues.
+		"-bloom-build.planning-strategy=split_by_series_chunks_size",
+		"-bloom-build.split-target-series-chunk-size=1KB", // Tiny target size so planning produces multiple tasks.
 	)
 	require.NoError(t, clu.Run())
diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go
index 18c43cfcfa12..33f0bf64c833 100644
--- a/pkg/bloombuild/planner/planner.go
+++ b/pkg/bloombuild/planner/planner.go
@@ -281,8 +281,6 @@ func (p *Planner) runOne(ctx context.Context) error {
 			continue
 		}
 
-		level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(existingMetas))
-
 		var tenantTableEnqueuedTasks int
 		resultsCh := make(chan *protos.TaskResult, len(tasks))
 
@@ -377,13 +375,13 @@ func (p *Planner) computeTasks(
 	table config.DayTable,
 	tenant string,
 ) ([]*protos.Task, []bloomshipper.Meta, error) {
-	logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)
-
 	strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
 	if err != nil {
 		return nil, nil, fmt.Errorf("error creating strategy: %w", err)
 	}
 
+	logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant, "strategy", strategy.Name())
+
 	// Fetch source metas to be used in both build and cleanup of out-of-date metas+blooms
 	metas, err := p.bloomStore.FetchMetas(
 		ctx,
@@ -432,6 +430,7 @@ func (p *Planner) computeTasks(
 		return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
 	}
 
+	level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(metas))
 	return tasks, metas, nil
 }
 
diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go
index 545cb65df142..6b1b1e0beba1 100644
--- a/pkg/bloombuild/planner/planner_test.go
+++ b/pkg/bloombuild/planner/planner_test.go
@@ -18,6 +18,7 @@ import (
 	"google.golang.org/grpc"
 
 	"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
+	"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
 	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
 	"github.com/grafana/loki/v3/pkg/storage"
 	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -750,3 +751,11 @@ func (f *fakeLimits) BloomBuildMaxBuilders(_ string) int {
 func (f *fakeLimits) BloomTaskMaxRetries(_ string) int {
 	return f.maxRetries
 }
+
+func (f *fakeLimits) BloomPlanningStrategy(_ string) string {
+	return strategies.SplitBySeriesChunkSizeStrategyName
+}
+
+func (f *fakeLimits) BloomTaskTargetSeriesChunksSizeBytes(_ string) uint64 {
+	return 1 << 20 // 1MB
+}
diff --git a/pkg/bloombuild/planner/plannertest/utils.go b/pkg/bloombuild/planner/plannertest/utils.go
index f0c8f0ec7036..706e0abdf00a 100644
--- a/pkg/bloombuild/planner/plannertest/utils.go
+++ b/pkg/bloombuild/planner/plannertest/utils.go
@@ -88,8 +88,12 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
 }
 
 func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
-	series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1))
-	for i := bounds.Min; i <= bounds.Max; i++ {
+	return GenSeriesWithStep(bounds, 1)
+}
+
+// GenSeriesWithStep generates a series for every step-th fingerprint in bounds.
+func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
+	series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
+	for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
 		series = append(series, &v1.Series{
 			Fingerprint: i,
 			Chunks: v1.ChunkRefs{
diff --git a/pkg/bloombuild/planner/strategies/chunksize.go b/pkg/bloombuild/planner/strategies/chunksize.go
new file mode 100644
index 000000000000..21f473908dd9
--- /dev/null
+++ b/pkg/bloombuild/planner/strategies/chunksize.go
@@ -0,0 +1,286 @@
+package strategies
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sort"
+
+	"github.com/dustin/go-humanize"
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
+
+	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
+	iter "github.com/grafana/loki/v3/pkg/iter/v2"
+	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
+	"github.com/grafana/loki/v3/pkg/storage/config"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
+)
+
+type ChunkSizeStrategyLimits interface {
+	BloomTaskTargetSeriesChunksSizeBytes(tenantID string) uint64
+}
+
+// ChunkSizeStrategy splits the series keyspace into tasks whose cumulative
+// chunk size is close to the per-tenant target task size.
+type ChunkSizeStrategy struct {
+	limits ChunkSizeStrategyLimits
+	logger log.Logger
+}
+
+func NewChunkSizeStrategy(
+	limits ChunkSizeStrategyLimits,
+	logger log.Logger,
+) (*ChunkSizeStrategy, error) {
+	return &ChunkSizeStrategy{
+		limits: limits,
+		logger: logger,
+	}, nil
+}
+
+func (s *ChunkSizeStrategy) Name() string {
+	return SplitBySeriesChunkSizeStrategyName
+}
+
+func (s *ChunkSizeStrategy) Plan(
+	ctx context.Context,
+	table config.DayTable,
+	tenant string,
+	tsdbs TSDBSet,
+	metas []bloomshipper.Meta,
+) ([]*protos.Task, error) {
+	targetTaskSize := s.limits.BloomTaskTargetSeriesChunksSizeBytes(tenant)
+
+	logger := log.With(s.logger, "table", table.Addr(), "tenant", tenant)
+	level.Debug(logger).Log("msg", "loading work for tenant", "target_task_size", humanize.Bytes(targetTaskSize))
+
+	// Determine which TSDBs have gaps and need to be processed.
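+	// A "gap" is a fingerprint range within a TSDB whose series are not yet
+	// covered by blooms from an up-to-date meta.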
+ tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(v1.NewBounds(0, math.MaxUint64), tsdbs, metas) + if err != nil { + level.Error(logger).Log("msg", "failed to find gaps", "err", err) + return nil, fmt.Errorf("failed to find gaps: %w", err) + } + + if len(tsdbsWithGaps) == 0 { + level.Debug(logger).Log("msg", "blooms exist for all tsdbs") + return nil, nil + } + + sizedIter, iterSize, err := s.sizedSeriesIter(ctx, tenant, tsdbsWithGaps, targetTaskSize) + if err != nil { + return nil, fmt.Errorf("failed to get sized series iter: %w", err) + } + + tasks := make([]*protos.Task, 0, iterSize) + for sizedIter.Next() { + series := sizedIter.At() + if series.Len() == 0 { + // This should never happen, but just in case. + level.Warn(logger).Log("msg", "got empty series batch", "tsdb", series.TSDB().Name()) + continue + } + + bounds := series.Bounds() + + blocks, err := getBlocksMatchingBounds(metas, bounds) + if err != nil { + return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err) + } + + planGap := protos.Gap{ + Bounds: bounds, + Series: series.V1Series(), + Blocks: blocks, + } + + tasks = append(tasks, protos.NewTask(table, tenant, bounds, series.TSDB(), []protos.Gap{planGap})) + } + if err := sizedIter.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate over sized series: %w", err) + } + + return tasks, nil +} + +func getBlocksMatchingBounds(metas []bloomshipper.Meta, bounds v1.FingerprintBounds) ([]bloomshipper.BlockRef, error) { + blocks := make([]bloomshipper.BlockRef, 0, 10) + + for _, meta := range metas { + if meta.Bounds.Intersection(bounds) == nil { + // this meta doesn't overlap the gap, skip + continue + } + + for _, block := range meta.Blocks { + if block.Bounds.Intersection(bounds) == nil { + // this block doesn't overlap the gap, skip + continue + } + // this block overlaps the gap, add it to the plan + // for this gap + blocks = append(blocks, block) + } + } + + // ensure we sort blocks so deduping iterator works as expected + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Bounds.Less(blocks[j].Bounds) + }) + + peekingBlocks := iter.NewPeekIter( + iter.NewSliceIter( + blocks, + ), + ) + + // dedupe blocks which could be in multiple metas + itr := iter.NewDedupingIter( + func(a, b bloomshipper.BlockRef) bool { + return a == b + }, + iter.Identity[bloomshipper.BlockRef], + func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef { + return a + }, + peekingBlocks, + ) + + deduped, err := iter.Collect(itr) + if err != nil { + return nil, fmt.Errorf("failed to dedupe blocks: %w", err) + } + + return deduped, nil +} + +type seriesWithChunks struct { + tsdb tsdb.SingleTenantTSDBIdentifier + fp model.Fingerprint + chunks []index.ChunkMeta +} + +type seriesBatch struct { + series []seriesWithChunks + size uint64 +} + +func newSeriesBatch() seriesBatch { + return seriesBatch{ + series: make([]seriesWithChunks, 0, 100), + } +} + +func (b *seriesBatch) Bounds() v1.FingerprintBounds { + if len(b.series) == 0 { + return v1.NewBounds(0, 0) + } + + // We assume that the series are sorted by fingerprint. + // This is guaranteed since series are iterated in order by the TSDB. 
+	return v1.NewBounds(b.series[0].fp, b.series[len(b.series)-1].fp)
+}
+
+func (b *seriesBatch) V1Series() []*v1.Series {
+	series := make([]*v1.Series, 0, len(b.series))
+	for _, s := range b.series {
+		res := &v1.Series{
+			Fingerprint: s.fp,
+			Chunks:      make(v1.ChunkRefs, 0, len(s.chunks)),
+		}
+		for _, chk := range s.chunks {
+			res.Chunks = append(res.Chunks, v1.ChunkRef{
+				From:     model.Time(chk.MinTime),
+				Through:  model.Time(chk.MaxTime),
+				Checksum: chk.Checksum,
+			})
+		}
+
+		series = append(series, res)
+	}
+
+	return series
+}
+
+func (b *seriesBatch) Append(s seriesWithChunks, size uint64) {
+	b.series = append(b.series, s)
+	b.size += size
+}
+
+func (b *seriesBatch) Len() int {
+	return len(b.series)
+}
+
+func (b *seriesBatch) Size() uint64 {
+	return b.size
+}
+
+func (b *seriesBatch) TSDB() tsdb.SingleTenantTSDBIdentifier {
+	if len(b.series) == 0 {
+		return tsdb.SingleTenantTSDBIdentifier{}
+	}
+	return b.series[0].tsdb
+}
+
+func (s *ChunkSizeStrategy) sizedSeriesIter(
+	ctx context.Context,
+	tenant string,
+	tsdbsWithGaps []tsdbGaps,
+	targetTaskSizeBytes uint64,
+) (iter.Iterator[seriesBatch], int, error) {
+	batches := make([]seriesBatch, 0, 100)
+	currentBatch := newSeriesBatch()
+
+	for _, idx := range tsdbsWithGaps {
+		for _, gap := range idx.gaps {
+			if err := idx.tsdb.ForSeries(
+				ctx,
+				tenant,
+				gap,
+				0, math.MaxInt64,
+				func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
+					select {
+					case <-ctx.Done():
+						return true
+					default:
+						// The TSDB index tracks chunk sizes in KB; convert before
+						// multiplying to avoid uint32 overflow.
+						var seriesSize uint64
+						for _, chk := range chks {
+							seriesSize += uint64(chk.KB) * 1024
+						}
+
+						// Cut a new batch if the current batch is not empty (so every
+						// batch contains at least one series) and adding this series
+						// would exceed the target task size.
+						if currentBatch.Len() > 0 && currentBatch.Size()+seriesSize > targetTaskSizeBytes {
+							batches = append(batches, currentBatch)
+							currentBatch = newSeriesBatch()
+						}
+
+						currentBatch.Append(seriesWithChunks{
+							tsdb:   idx.tsdbIdentifier,
+							fp:     fp,
+							chunks: chks,
+						}, seriesSize)
+						return false
+					}
+				},
+				labels.MustNewMatcher(labels.MatchEqual, "", ""),
+			); err != nil {
+				return nil, 0, err
+			}
+
+			// Flush the remaining batch for this gap if it's not empty.
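+			// Flushing per gap also guarantees that a batch never mixes series
+			// from different gaps or TSDBs, so every task maps to a single TSDB.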
+			if currentBatch.Len() > 0 {
+				batches = append(batches, currentBatch)
+				currentBatch = newSeriesBatch()
+			}
+		}
+	}
+
+	select {
+	case <-ctx.Done():
+		return iter.NewEmptyIter[seriesBatch](), 0, ctx.Err()
+	default:
+		return iter.NewCancelableIter[seriesBatch](ctx, iter.NewSliceIter[seriesBatch](batches)), len(batches), nil
+	}
+}
diff --git a/pkg/bloombuild/planner/strategies/chunksize_test.go b/pkg/bloombuild/planner/strategies/chunksize_test.go
new file mode 100644
index 000000000000..951d033e5c10
--- /dev/null
+++ b/pkg/bloombuild/planner/strategies/chunksize_test.go
@@ -0,0 +1,248 @@
+package strategies
+
+import (
+	"context"
+	"testing"
+
+	"github.com/go-kit/log"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
+	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
+	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
+)
+
+func taskForGap(tsdb tsdb.SingleTenantTSDBIdentifier, bounds v1.FingerprintBounds, blocks []bloomshipper.BlockRef) *protos.Task {
+	return protos.NewTask(plannertest.TestTable, "fake", bounds, tsdb, []protos.Gap{
+		{
+			Bounds: bounds,
+			Series: plannertest.GenSeriesWithStep(bounds, 10),
+			Blocks: blocks,
+		},
+	})
+}
+
+func Test_ChunkSizeStrategy_Plan(t *testing.T) {
+	for _, tc := range []struct {
+		name          string
+		limits        ChunkSizeStrategyLimits
+		originalMetas []bloomshipper.Meta
+		tsdbs         TSDBSet
+		expectedTasks []*protos.Task
+	}{
+		{
+			name:   "no previous blocks and metas",
+			limits: fakeChunkSizeLimits{TargetSize: 200 * 1 << 10}, // 2 series (100KB each) per task
+
+			// Each series has 1 chunk of 100KB.
+			tsdbs: TSDBSet{
+				plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 11 series
+			},
+
+			// We expect 6 tasks: 5 with 2 series each and 1 with the last series.
+			expectedTasks: []*protos.Task{
+				taskForGap(plannertest.TsdbID(0), v1.NewBounds(0, 10), nil),
+				taskForGap(plannertest.TsdbID(0), v1.NewBounds(20, 30), nil),
+				taskForGap(plannertest.TsdbID(0), v1.NewBounds(40, 50), nil),
+				taskForGap(plannertest.TsdbID(0), v1.NewBounds(60, 70), nil),
+				taskForGap(plannertest.TsdbID(0), v1.NewBounds(80, 90), nil),
+				taskForGap(plannertest.TsdbID(0), v1.NewBounds(100, 100), nil),
+			},
+		},
+		{
+			name:   "previous metas with no gaps",
+			limits: fakeChunkSizeLimits{TargetSize: 200 * 1 << 10},
+
+			// Original metas cover the entire range:
+			// one meta for every 2 series w/ 1 block per series.
+			originalMetas: []bloomshipper.Meta{
+				plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(0, 0),
+					plannertest.GenBlockRef(10, 10),
+				}),
+				plannertest.GenMeta(20, 30, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(20, 20),
+					plannertest.GenBlockRef(30, 30),
+				}),
+				plannertest.GenMeta(40, 50, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(40, 40),
+					plannertest.GenBlockRef(50, 50),
+				}),
+				plannertest.GenMeta(60, 70, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(60, 60),
+					plannertest.GenBlockRef(70, 70),
+				}),
+				plannertest.GenMeta(80, 90, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(80, 80),
+					plannertest.GenBlockRef(90, 90),
+				}),
+				plannertest.GenMeta(100, 100, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(100, 100),
+				}),
+			},
+
+			tsdbs: TSDBSet{
+				plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 11 series
+			},
+
+			// We expect no tasks.
+			expectedTasks: []*protos.Task{},
+		},
+		{
+			name:   "Original metas do not cover the entire range",
+			limits: fakeChunkSizeLimits{TargetSize: 200 * 1 << 10},
+
+			// Original metas cover only part of the range:
+			// one meta for every 2 series w/ 1 block per series.
+			originalMetas: []bloomshipper.Meta{
+				plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(0, 0),
+					plannertest.GenBlockRef(10, 10),
+				}),
+				// Missing meta for 20-30
+				plannertest.GenMeta(40, 50, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(40, 40),
+					plannertest.GenBlockRef(50, 50),
+				}),
+				plannertest.GenMeta(60, 70, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(60, 60),
+					plannertest.GenBlockRef(70, 70),
+				}),
+				plannertest.GenMeta(80, 90, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(80, 80),
+					plannertest.GenBlockRef(90, 90),
+				}),
+				plannertest.GenMeta(100, 100, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(100, 100),
+				}),
+			},
+
+			tsdbs: TSDBSet{
+				plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 11 series
+			},
+
+			// We expect 1 task for the missing series.
+			expectedTasks: []*protos.Task{
+				taskForGap(plannertest.TsdbID(0), v1.NewBounds(20, 30), nil),
+			},
+		},
+		{
+			name:   "All metas are outdated",
+			limits: fakeChunkSizeLimits{TargetSize: 200 * 1 << 10},
+
+			originalMetas: []bloomshipper.Meta{
+				plannertest.GenMeta(0, 100, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(0, 0),
+					plannertest.GenBlockRef(10, 10),
+					plannertest.GenBlockRef(20, 20),
+					plannertest.GenBlockRef(30, 30),
+					plannertest.GenBlockRef(40, 40),
+					plannertest.GenBlockRef(50, 50),
+					plannertest.GenBlockRef(60, 60),
+					plannertest.GenBlockRef(70, 70),
+					plannertest.GenBlockRef(80, 80),
+					plannertest.GenBlockRef(90, 90),
+					plannertest.GenBlockRef(100, 100),
+				}),
+			},
+
+			tsdbs: TSDBSet{
+				plannertest.TsdbID(1): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 11 series
+			},
+
+			// We expect 6 tasks: 5 with 2 series each and 1 with the last series.
+			expectedTasks: []*protos.Task{
+				taskForGap(plannertest.TsdbID(1), v1.NewBounds(0, 10), []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(0, 0),
+					plannertest.GenBlockRef(10, 10),
+				}),
+				taskForGap(plannertest.TsdbID(1), v1.NewBounds(20, 30), []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(20, 20),
+					plannertest.GenBlockRef(30, 30),
+				}),
+				taskForGap(plannertest.TsdbID(1), v1.NewBounds(40, 50), []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(40, 40),
+					plannertest.GenBlockRef(50, 50),
+				}),
+				taskForGap(plannertest.TsdbID(1), v1.NewBounds(60, 70), []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(60, 60),
+					plannertest.GenBlockRef(70, 70),
+				}),
+				taskForGap(plannertest.TsdbID(1), v1.NewBounds(80, 90), []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(80, 80),
+					plannertest.GenBlockRef(90, 90),
+				}),
+				taskForGap(plannertest.TsdbID(1), v1.NewBounds(100, 100), []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(100, 100),
+				}),
+			},
+		},
+		{
+			name:   "Some metas are outdated",
+			limits: fakeChunkSizeLimits{TargetSize: 200 * 1 << 10},
+
+			originalMetas: []bloomshipper.Meta{
+				// Outdated meta
+				plannertest.GenMeta(0, 49, []int{0}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(0, 0),
+					plannertest.GenBlockRef(10, 10),
+					plannertest.GenBlockRef(20, 20),
+					plannertest.GenBlockRef(30, 30),
+					plannertest.GenBlockRef(40, 40),
+				}),
+				// Updated meta
+				plannertest.GenMeta(50, 100, []int{1}, []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(50, 50),
+					plannertest.GenBlockRef(60, 60),
+					plannertest.GenBlockRef(70, 70),
+					plannertest.GenBlockRef(80, 80),
+					plannertest.GenBlockRef(90, 90),
+					plannertest.GenBlockRef(100, 100),
+				}),
+			},
+
+			tsdbs: TSDBSet{
+				plannertest.TsdbID(1): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 11 series
+			},
+
+			// We expect 3 tasks for the gap left by the outdated meta:
+			// 2 tasks with 2 series each and 1 task with a single series.
+			expectedTasks: []*protos.Task{
+				taskForGap(plannertest.TsdbID(1), v1.NewBounds(0, 10), []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(0, 0),
+					plannertest.GenBlockRef(10, 10),
+				}),
+				taskForGap(plannertest.TsdbID(1), v1.NewBounds(20, 30), []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(20, 20),
+					plannertest.GenBlockRef(30, 30),
+				}),
+				taskForGap(plannertest.TsdbID(1), v1.NewBounds(40, 40), []bloomshipper.BlockRef{
+					plannertest.GenBlockRef(40, 40),
+				}),
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			logger := log.NewNopLogger()
+
+			strategy, err := NewChunkSizeStrategy(tc.limits, logger)
+			require.NoError(t, err)
+
+			actual, err := strategy.Plan(context.Background(), plannertest.TestTable, "fake", tc.tsdbs, tc.originalMetas)
+			require.NoError(t, err)
+
+			require.ElementsMatch(t, tc.expectedTasks, actual)
+		})
+	}
+}
+
+type fakeChunkSizeLimits struct {
+	TargetSize uint64
+}
+
+func (f fakeChunkSizeLimits) BloomTaskTargetSeriesChunksSizeBytes(_ string) uint64 {
+	return f.TargetSize
+}
diff --git a/pkg/bloombuild/planner/strategies/factory.go b/pkg/bloombuild/planner/strategies/factory.go
index 578f74f855d3..f58f91e51708 100644
--- a/pkg/bloombuild/planner/strategies/factory.go
+++ b/pkg/bloombuild/planner/strategies/factory.go
@@ -14,17 +14,20 @@ import (
 )
 
 const (
-	SplitKeyspaceStrategyName = "split_keyspace_by_factor"
+	SplitKeyspaceStrategyName          = "split_keyspace_by_factor"
+	SplitBySeriesChunkSizeStrategyName = "split_by_series_chunks_size"
 )
 
 type Limits interface {
 	BloomPlanningStrategy(tenantID string) string
-	BloomSplitSeriesKeyspaceBy(tenantID string) int
+	SplitKeyspaceStrategyLimits
+	ChunkSizeStrategyLimits
 }
 
 type TSDBSet = map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries
 
 type PlanningStrategy interface {
+	Name() string
 	// Plan returns a set of tasks for a given tenant-table tuple and TSDBs.
Plan(ctx context.Context, table config.DayTable, tenant string, tsdbs TSDBSet, metas []bloomshipper.Meta) ([]*protos.Task, error) } @@ -39,6 +42,8 @@ func NewStrategy( switch strategy { case SplitKeyspaceStrategyName: return NewSplitKeyspaceStrategy(limits, logger) + case SplitBySeriesChunkSizeStrategyName: + return NewChunkSizeStrategy(limits, logger) default: return nil, fmt.Errorf("unknown bloom planning strategy (%s)", strategy) } diff --git a/pkg/bloombuild/planner/strategies/splitkeyspace.go b/pkg/bloombuild/planner/strategies/splitkeyspace.go index 6aa572bd3af4..2e799d1ed490 100644 --- a/pkg/bloombuild/planner/strategies/splitkeyspace.go +++ b/pkg/bloombuild/planner/strategies/splitkeyspace.go @@ -3,7 +3,6 @@ package strategies import ( "context" "fmt" - "sort" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -17,13 +16,17 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" ) +type SplitKeyspaceStrategyLimits interface { + BloomSplitSeriesKeyspaceBy(tenantID string) int +} + type SplitKeyspaceStrategy struct { - limits Limits + limits SplitKeyspaceStrategyLimits logger log.Logger } func NewSplitKeyspaceStrategy( - limits Limits, + limits SplitKeyspaceStrategyLimits, logger log.Logger, ) (*SplitKeyspaceStrategy, error) { return &SplitKeyspaceStrategy{ @@ -32,6 +35,10 @@ func NewSplitKeyspaceStrategy( }, nil } +func (s *SplitKeyspaceStrategy) Name() string { + return SplitKeyspaceStrategyName +} + func (s *SplitKeyspaceStrategy) Plan( ctx context.Context, table config.DayTable, @@ -185,50 +192,10 @@ func blockPlansForGaps( return nil, fmt.Errorf("failed to collect series: %w", err) } - for _, meta := range metas { - if meta.Bounds.Intersection(gap) == nil { - // this meta doesn't overlap the gap, skip - continue - } - - for _, block := range meta.Blocks { - if block.Bounds.Intersection(gap) == nil { - // this block doesn't overlap the gap, skip - continue - } - // this block overlaps the gap, add it to the plan - // for this gap - planGap.Blocks = append(planGap.Blocks, block) - } - } - - // ensure we sort blocks so deduping iterator works as expected - sort.Slice(planGap.Blocks, func(i, j int) bool { - return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds) - }) - - peekingBlocks := iter.NewPeekIter( - iter.NewSliceIter( - planGap.Blocks, - ), - ) - // dedupe blocks which could be in multiple metas - itr := iter.NewDedupingIter( - func(a, b bloomshipper.BlockRef) bool { - return a == b - }, - iter.Identity[bloomshipper.BlockRef], - func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef { - return a - }, - peekingBlocks, - ) - - deduped, err := iter.Collect(itr) + planGap.Blocks, err = getBlocksMatchingBounds(metas, gap) if err != nil { - return nil, fmt.Errorf("failed to dedupe blocks: %w", err) + return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err) } - planGap.Blocks = deduped plan.gaps = append(plan.gaps, planGap) } diff --git a/pkg/bloombuild/planner/strategies/splitkeyspace_test.go b/pkg/bloombuild/planner/strategies/splitkeyspace_test.go index 7f5a49a54cc2..18480d74c98f 100644 --- a/pkg/bloombuild/planner/strategies/splitkeyspace_test.go +++ b/pkg/bloombuild/planner/strategies/splitkeyspace_test.go @@ -348,6 +348,7 @@ func (f fakeForSeries) ForSeries(_ context.Context, _ string, ff index.Fingerpri MinTime: int64(c.From), MaxTime: int64(c.Through), Checksum: c.Checksum, + KB: 100, }) } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 96a1af0693a2..7129db9a53b6 100644 --- 
a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -61,6 +61,7 @@ const (
 	defaultMaxStructuredMetadataCount = 128
 	defaultBloomBuildMaxBlockSize     = "200MB"
 	defaultBloomBuildMaxBloomSize     = "128MB"
+	defaultBloomTaskTargetChunkSize   = "20GB"
 
 	defaultBlockedIngestionStatusCode = 260 // 260 is a custom status code to indicate blocked ingestion
 )
@@ -207,10 +208,11 @@ type Limits struct {
 	BloomBuildTaskMaxRetries    int           `yaml:"bloom_build_task_max_retries" json:"bloom_build_task_max_retries" category:"experimental"`
 	BloomBuilderResponseTimeout time.Duration `yaml:"bloom_build_builder_response_timeout" json:"bloom_build_builder_response_timeout" category:"experimental"`
 
-	BloomCreationEnabled       bool   `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"`
-	BloomPlanningStrategy      string `yaml:"bloom_planning_strategy" json:"bloom_planning_strategy" category:"experimental"`
-	BloomSplitSeriesKeyspaceBy int    `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"`
-	BloomBlockEncoding         string `yaml:"bloom_block_encoding" json:"bloom_block_encoding" category:"experimental"`
+	BloomCreationEnabled           bool             `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"`
+	BloomPlanningStrategy          string           `yaml:"bloom_planning_strategy" json:"bloom_planning_strategy" category:"experimental"`
+	BloomSplitSeriesKeyspaceBy     int              `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"`
+	BloomTaskTargetSeriesChunkSize flagext.ByteSize `yaml:"bloom_task_target_series_chunk_size" json:"bloom_task_target_series_chunk_size" category:"experimental"`
+	BloomBlockEncoding             string           `yaml:"bloom_block_encoding" json:"bloom_block_encoding" category:"experimental"`
 	BloomMaxBlockSize flagext.ByteSize `yaml:"bloom_max_block_size" json:"bloom_max_block_size" category:"experimental"`
 	BloomMaxBloomSize flagext.ByteSize `yaml:"bloom_max_bloom_size" json:"bloom_max_bloom_size" category:"experimental"`
 
@@ -394,8 +396,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	)
 	f.BoolVar(&l.BloomCreationEnabled, "bloom-build.enable", false, "Experimental. Whether to create blooms for the tenant.")
-	f.StringVar(&l.BloomPlanningStrategy, "bloom-build.planning-strategy", "split_keyspace_by_factor", "Experimental. Bloom planning strategy to use in bloom creation. Can be one of: 'split_keyspace_by_factor'")
-	f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Only if `bloom-build.planning-strategy` is 'split'. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.")
+	f.StringVar(&l.BloomPlanningStrategy, "bloom-build.planning-strategy", "split_keyspace_by_factor", "Experimental. Bloom planning strategy to use in bloom creation. Can be one of: 'split_keyspace_by_factor', 'split_by_series_chunks_size'")
+	f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Only if `bloom-build.planning-strategy` is 'split_keyspace_by_factor'. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.")
+	_ = l.BloomTaskTargetSeriesChunkSize.Set(defaultBloomTaskTargetChunkSize)
+	f.Var(&l.BloomTaskTargetSeriesChunkSize, "bloom-build.split-target-series-chunk-size", fmt.Sprintf("Experimental. Target total chunk size in bytes of the series assigned to each bloom task. Default is %s.", defaultBloomTaskTargetChunkSize))
 	f.IntVar(&l.BloomBuildMaxBuilders, "bloom-build.max-builders", 0, "Experimental. Maximum number of builders to use when building blooms. 0 allows unlimited builders.")
 	f.DurationVar(&l.BloomBuilderResponseTimeout, "bloom-build.builder-response-timeout", 0, "Experimental. Timeout for a builder to finish a task. If a builder does not respond within this time, it is considered failed and the task will be requeued. 0 disables the timeout.")
 	f.IntVar(&l.BloomBuildTaskMaxRetries, "bloom-build.task-max-retries", 3, "Experimental. Maximum number of retries for a failed task. If a task fails more than this number of times, it is considered failed and will not be retried. A value of 0 disables this limit.")
 
@@ -1015,6 +1019,10 @@ func (o *Overrides) BloomSplitSeriesKeyspaceBy(userID string) int {
 	return o.getOverridesForUser(userID).BloomSplitSeriesKeyspaceBy
 }
 
+func (o *Overrides) BloomTaskTargetSeriesChunksSizeBytes(userID string) uint64 {
+	return uint64(o.getOverridesForUser(userID).BloomTaskTargetSeriesChunkSize)
+}
+
 func (o *Overrides) BloomBuildMaxBuilders(userID string) int {
 	return o.getOverridesForUser(userID).BloomBuildMaxBuilders
 }
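
For reference, a minimal sketch of per-tenant runtime overrides exercising the limits added above (the tenant ID and target size are illustrative):

overrides:
  "tenant-a":
    bloom_creation_enabled: true
    bloom_planning_strategy: split_by_series_chunks_size
    bloom_task_target_series_chunk_size: 10GB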