diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 68ed8b9b2190..545cb65df142 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -1,10 +1,8 @@ package planner import ( - "bytes" "context" "fmt" - "io" "math" "sync" "testing" @@ -15,14 +13,12 @@ import ( "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "google.golang.org/grpc" + "github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest" "github.com/grafana/loki/v3/pkg/bloombuild/protos" - "github.com/grafana/loki/v3/pkg/compression" - iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" @@ -30,114 +26,10 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/util/mempool" ) -var testDay = parseDayTime("2023-09-01") -var testTable = config.NewDayTable(testDay, "index_") - -func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier { - return tsdb.SingleTenantTSDBIdentifier{ - TS: time.Unix(int64(n), 0), - } -} - -func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta { - m := bloomshipper.Meta{ - MetaRef: bloomshipper.MetaRef{ - Ref: bloomshipper.Ref{ - TenantID: "fakeTenant", - TableName: testTable.Addr(), - Bounds: v1.NewBounds(min, max), - }, - }, - Blocks: blocks, - } - for _, source := range sources { - m.Sources = append(m.Sources, tsdbID(source)) - } - return m -} - -func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { - startTS, endTS := testDay.Bounds() - return bloomshipper.BlockRef{ - Ref: bloomshipper.Ref{ - TenantID: "fakeTenant", - TableName: testTable.Addr(), - Bounds: v1.NewBounds(min, max), - StartTimestamp: startTS, - EndTimestamp: endTS, - Checksum: 0, - }, - } -} - -func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { - indexBuf := bytes.NewBuffer(nil) - bloomsBuf := bytes.NewBuffer(nil) - writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf) - reader := v1.NewByteReader(indexBuf, bloomsBuf) - - blockOpts := v1.NewBlockOptions(compression.None, 0, 0) - - builder, err := v1.NewBlockBuilder(blockOpts, writer) - if err != nil { - return bloomshipper.Block{}, err - } - - if _, err = builder.BuildFrom(iter.NewEmptyIter[v1.SeriesWithBlooms]()); err != nil { - return bloomshipper.Block{}, err - } - - block := v1.NewBlock(reader, v1.NewMetrics(nil)) - - buf := bytes.NewBuffer(nil) - if err := v1.TarCompress(ref.Codec, buf, block.Reader()); err != nil { - return bloomshipper.Block{}, err - } - - tarReader := bytes.NewReader(buf.Bytes()) - - return bloomshipper.Block{ - BlockRef: ref, - Data: bloomshipper.ClosableReadSeekerAdapter{ReadSeeker: tarReader}, - }, nil -} - -func genSeries(bounds v1.FingerprintBounds) []*v1.Series { - series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)) - for i := bounds.Min; i <= bounds.Max; i++ { - series = append(series, &v1.Series{ - Fingerprint: i, - Chunks: v1.ChunkRefs{ - { - From: 0, - Through: 1, - Checksum: 1, - }, - }, - }) - } - return series -} - -func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask { - tasks := make([]*QueueTask, 0, n) - // Enqueue tasks - for i := 0; i < n; i++ { - task := NewQueueTask( - context.Background(), time.Now(), - protos.NewTask(config.NewDayTable(testDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), tsdbID(1), nil), - resultsCh, - ) - tasks = append(tasks, task) - } - return tasks -} - func createPlanner( t *testing.T, cfg Config, @@ -147,7 +39,7 @@ func createPlanner( schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{ { - From: parseDayTime("2023-09-01"), + From: plannertest.ParseDayTime("2023-09-01"), IndexTables: config.IndexPeriodicTableConfig{ PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", @@ -371,28 +263,6 @@ func Test_BuilderLoop(t *testing.T) { } } -func putMetas(bloomClient bloomshipper.Client, metas []bloomshipper.Meta) error { - for _, meta := range metas { - err := bloomClient.PutMeta(context.Background(), meta) - if err != nil { - return err - } - - for _, block := range meta.Blocks { - writtenBlock, err := genBlock(block) - if err != nil { - return err - } - - err = bloomClient.PutBlock(context.Background(), writtenBlock) - if err != nil { - return err - } - } - } - return nil -} - func Test_processTenantTaskResults(t *testing.T) { for _, tc := range []struct { name string @@ -405,8 +275,8 @@ func Test_processTenantTaskResults(t *testing.T) { { name: "errors", originalMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), - genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), + plannertest.GenMeta(10, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(10, 20)}), }, taskResults: []*protos.TaskResult{ { @@ -420,16 +290,16 @@ func Test_processTenantTaskResults(t *testing.T) { }, expectedMetas: []bloomshipper.Meta{ // The original metas should remain unchanged - genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), - genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), + plannertest.GenMeta(10, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(10, 20)}), }, expectedTasksSucceed: 0, }, { name: "no new metas", originalMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), - genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), + plannertest.GenMeta(10, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(10, 20)}), }, taskResults: []*protos.TaskResult{ { @@ -441,8 +311,8 @@ func Test_processTenantTaskResults(t *testing.T) { }, expectedMetas: []bloomshipper.Meta{ // The original metas should remain unchanged - genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), - genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), + plannertest.GenMeta(10, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(10, 20)}), }, expectedTasksSucceed: 2, }, @@ -452,58 +322,58 @@ func Test_processTenantTaskResults(t *testing.T) { { TaskID: "1", CreatedMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), }, }, { TaskID: "2", CreatedMetas: []bloomshipper.Meta{ - genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + plannertest.GenMeta(10, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(10, 20)}), }, }, }, expectedMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), - genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), + plannertest.GenMeta(10, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(10, 20)}), }, expectedTasksSucceed: 2, }, { name: "single meta covers all original", originalMetas: []bloomshipper.Meta{ - genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}), - genMeta(6, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 10)}), + plannertest.GenMeta(0, 5, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 5)}), + plannertest.GenMeta(6, 10, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(6, 10)}), }, taskResults: []*protos.TaskResult{ { TaskID: "1", CreatedMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + plannertest.GenMeta(0, 10, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), }, }, }, expectedMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + plannertest.GenMeta(0, 10, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), }, expectedTasksSucceed: 1, }, { name: "multi version ordering", originalMetas: []bloomshipper.Meta{ - genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}), - genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), // only part of the range is outdated, must keep + plannertest.GenMeta(0, 5, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 5)}), + plannertest.GenMeta(0, 10, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), // only part of the range is outdated, must keep }, taskResults: []*protos.TaskResult{ { TaskID: "1", CreatedMetas: []bloomshipper.Meta{ - genMeta(8, 10, []int{2}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), + plannertest.GenMeta(8, 10, []int{2}, []bloomshipper.BlockRef{plannertest.GenBlockRef(8, 10)}), }, }, }, expectedMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), - genMeta(8, 10, []int{2}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), + plannertest.GenMeta(0, 10, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), + plannertest.GenMeta(8, 10, []int{2}, []bloomshipper.BlockRef{plannertest.GenBlockRef(8, 10)}), }, expectedTasksSucceed: 1, }, @@ -518,11 +388,11 @@ func Test_processTenantTaskResults(t *testing.T) { } planner := createPlanner(t, cfg, &fakeLimits{}, logger) - bloomClient, err := planner.bloomStore.Client(testDay.ModelTime()) + bloomClient, err := planner.bloomStore.Client(plannertest.TestDay.ModelTime()) require.NoError(t, err) // Create original metas and blocks - err = putMetas(bloomClient, tc.originalMetas) + err = plannertest.PutMetas(bloomClient, tc.originalMetas) require.NoError(t, err) ctx, ctxCancel := context.WithCancel(context.Background()) @@ -536,7 +406,7 @@ func Test_processTenantTaskResults(t *testing.T) { completed, err := planner.processTenantTaskResults( ctx, - testTable, + plannertest.TestTable, "fakeTenant", tc.originalMetas, len(tc.taskResults), @@ -549,7 +419,7 @@ func Test_processTenantTaskResults(t *testing.T) { for _, taskResult := range tc.taskResults { if len(taskResult.CreatedMetas) > 0 { // Emulate builder putting new metas to obj store - err = putMetas(bloomClient, taskResult.CreatedMetas) + err = plannertest.PutMetas(bloomClient, taskResult.CreatedMetas) require.NoError(t, err) } @@ -564,7 +434,7 @@ func Test_processTenantTaskResults(t *testing.T) { context.Background(), bloomshipper.MetaSearchParams{ TenantID: "fakeTenant", - Interval: bloomshipper.NewInterval(testTable.Bounds()), + Interval: bloomshipper.NewInterval(plannertest.TestTable.Bounds()), Keyspace: v1.NewBounds(0, math.MaxUint64), }, ) @@ -607,63 +477,63 @@ func Test_deleteOutdatedMetas(t *testing.T) { { name: "only up to date metas", originalMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), }, newMetas: []bloomshipper.Meta{ - genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + plannertest.GenMeta(10, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(10, 20)}), }, expectedUpToDateMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), - genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), + plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), + plannertest.GenMeta(10, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(10, 20)}), }, }, { name: "outdated metas", originalMetas: []bloomshipper.Meta{ - genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}), + plannertest.GenMeta(0, 5, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 5)}), }, newMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + plannertest.GenMeta(0, 10, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), }, expectedUpToDateMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + plannertest.GenMeta(0, 10, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(0, 10)}), }, }, { name: "new metas reuse blocks from outdated meta", originalMetas: []bloomshipper.Meta{ - genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{ // Outdated - genBlockRef(0, 5), // Reuse - genBlockRef(5, 10), // Delete + plannertest.GenMeta(0, 10, []int{0}, []bloomshipper.BlockRef{ // Outdated + plannertest.GenBlockRef(0, 5), // Reuse + plannertest.GenBlockRef(5, 10), // Delete }), - genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{ // Outdated - genBlockRef(10, 20), // Reuse + plannertest.GenMeta(10, 20, []int{0}, []bloomshipper.BlockRef{ // Outdated + plannertest.GenBlockRef(10, 20), // Reuse }), - genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ // Up to date - genBlockRef(20, 30), + plannertest.GenMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ // Up to date + plannertest.GenBlockRef(20, 30), }), }, newMetas: []bloomshipper.Meta{ - genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{ - genBlockRef(0, 5), // Reused block + plannertest.GenMeta(0, 5, []int{1}, []bloomshipper.BlockRef{ + plannertest.GenBlockRef(0, 5), // Reused block }), - genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{ - genBlockRef(5, 7), // New block - genBlockRef(7, 10), // New block - genBlockRef(10, 20), // Reused block + plannertest.GenMeta(5, 20, []int{1}, []bloomshipper.BlockRef{ + plannertest.GenBlockRef(5, 7), // New block + plannertest.GenBlockRef(7, 10), // New block + plannertest.GenBlockRef(10, 20), // Reused block }), }, expectedUpToDateMetas: []bloomshipper.Meta{ - genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{ - genBlockRef(0, 5), + plannertest.GenMeta(0, 5, []int{1}, []bloomshipper.BlockRef{ + plannertest.GenBlockRef(0, 5), }), - genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{ - genBlockRef(5, 7), - genBlockRef(7, 10), - genBlockRef(10, 20), + plannertest.GenMeta(5, 20, []int{1}, []bloomshipper.BlockRef{ + plannertest.GenBlockRef(5, 7), + plannertest.GenBlockRef(7, 10), + plannertest.GenBlockRef(10, 20), }), - genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ - genBlockRef(20, 30), + plannertest.GenMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ + plannertest.GenBlockRef(20, 30), }), }, }, @@ -678,13 +548,13 @@ func Test_deleteOutdatedMetas(t *testing.T) { } planner := createPlanner(t, cfg, &fakeLimits{}, logger) - bloomClient, err := planner.bloomStore.Client(testDay.ModelTime()) + bloomClient, err := planner.bloomStore.Client(plannertest.TestDay.ModelTime()) require.NoError(t, err) // Create original/new metas and blocks - err = putMetas(bloomClient, tc.originalMetas) + err = plannertest.PutMetas(bloomClient, tc.originalMetas) require.NoError(t, err) - err = putMetas(bloomClient, tc.newMetas) + err = plannertest.PutMetas(bloomClient, tc.newMetas) require.NoError(t, err) // Get all metas @@ -692,7 +562,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { context.Background(), bloomshipper.MetaSearchParams{ TenantID: "fakeTenant", - Interval: bloomshipper.NewInterval(testTable.Bounds()), + Interval: bloomshipper.NewInterval(plannertest.TestTable.Bounds()), Keyspace: v1.NewBounds(0, math.MaxUint64), }, ) @@ -700,7 +570,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { removeLocFromMetasSources(metas) require.ElementsMatch(t, append(tc.originalMetas, tc.newMetas...), metas) - upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.newMetas, tc.originalMetas, phasePlanning) + upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), plannertest.TestTable, "fakeTenant", tc.newMetas, tc.originalMetas, phasePlanning) require.NoError(t, err) require.ElementsMatch(t, tc.expectedUpToDateMetas, upToDate) @@ -709,7 +579,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { context.Background(), bloomshipper.MetaSearchParams{ TenantID: "fakeTenant", - Interval: bloomshipper.NewInterval(testTable.Bounds()), + Interval: bloomshipper.NewInterval(plannertest.TestTable.Bounds()), Keyspace: v1.NewBounds(0, math.MaxUint64), }, ) @@ -841,6 +711,20 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) { }, nil } +func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask { + tasks := make([]*QueueTask, 0, n) + // Enqueue tasks + for i := 0; i < n; i++ { + task := NewQueueTask( + context.Background(), time.Now(), + protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil), + resultsCh, + ) + tasks = append(tasks, task) + } + return tasks +} + type fakeLimits struct { Limits timeout time.Duration @@ -866,27 +750,3 @@ func (f *fakeLimits) BloomBuildMaxBuilders(_ string) int { func (f *fakeLimits) BloomTaskMaxRetries(_ string) int { return f.maxRetries } - -func parseDayTime(s string) config.DayTime { - t, err := time.Parse("2006-01-02", s) - if err != nil { - panic(err) - } - return config.DayTime{ - Time: model.TimeFromUnix(t.Unix()), - } -} - -type DummyReadSeekCloser struct{} - -func (d *DummyReadSeekCloser) Read(_ []byte) (n int, err error) { - return 0, io.EOF -} - -func (d *DummyReadSeekCloser) Seek(_ int64, _ int) (int64, error) { - return 0, nil -} - -func (d *DummyReadSeekCloser) Close() error { - return nil -} diff --git a/pkg/bloombuild/planner/plannertest/utils.go b/pkg/bloombuild/planner/plannertest/utils.go new file mode 100644 index 000000000000..f0c8f0ec7036 --- /dev/null +++ b/pkg/bloombuild/planner/plannertest/utils.go @@ -0,0 +1,137 @@ +package plannertest + +import ( + "bytes" + "context" + "time" + + "github.com/prometheus/common/model" + + "github.com/grafana/loki/v3/pkg/compression" + v2 "github.com/grafana/loki/v3/pkg/iter/v2" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" +) + +var TestDay = ParseDayTime("2023-09-01") +var TestTable = config.NewDayTable(TestDay, "index_") + +func TsdbID(n int) tsdb.SingleTenantTSDBIdentifier { + return tsdb.SingleTenantTSDBIdentifier{ + TS: time.Unix(int64(n), 0), + } +} + +func GenMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta { + m := bloomshipper.Meta{ + MetaRef: bloomshipper.MetaRef{ + Ref: bloomshipper.Ref{ + TenantID: "fakeTenant", + TableName: TestTable.Addr(), + Bounds: v1.NewBounds(min, max), + }, + }, + Blocks: blocks, + } + for _, source := range sources { + m.Sources = append(m.Sources, TsdbID(source)) + } + return m +} + +func GenBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { + startTS, endTS := TestDay.Bounds() + return bloomshipper.BlockRef{ + Ref: bloomshipper.Ref{ + TenantID: "fakeTenant", + TableName: TestTable.Addr(), + Bounds: v1.NewBounds(min, max), + StartTimestamp: startTS, + EndTimestamp: endTS, + Checksum: 0, + }, + } +} + +func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := v1.NewByteReader(indexBuf, bloomsBuf) + + blockOpts := v1.NewBlockOptions(compression.None, 0, 0) + + builder, err := v1.NewBlockBuilder(blockOpts, writer) + if err != nil { + return bloomshipper.Block{}, err + } + + if _, err = builder.BuildFrom(v2.NewEmptyIter[v1.SeriesWithBlooms]()); err != nil { + return bloomshipper.Block{}, err + } + + block := v1.NewBlock(reader, v1.NewMetrics(nil)) + + buf := bytes.NewBuffer(nil) + if err := v1.TarCompress(ref.Codec, buf, block.Reader()); err != nil { + return bloomshipper.Block{}, err + } + + tarReader := bytes.NewReader(buf.Bytes()) + + return bloomshipper.Block{ + BlockRef: ref, + Data: bloomshipper.ClosableReadSeekerAdapter{ReadSeeker: tarReader}, + }, nil +} + +func GenSeries(bounds v1.FingerprintBounds) []*v1.Series { + series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)) + for i := bounds.Min; i <= bounds.Max; i++ { + series = append(series, &v1.Series{ + Fingerprint: i, + Chunks: v1.ChunkRefs{ + { + From: 0, + Through: 1, + Checksum: 1, + }, + }, + }) + } + return series +} + +func PutMetas(bloomClient bloomshipper.Client, metas []bloomshipper.Meta) error { + for _, meta := range metas { + err := bloomClient.PutMeta(context.Background(), meta) + if err != nil { + return err + } + + for _, block := range meta.Blocks { + writtenBlock, err := GenBlock(block) + if err != nil { + return err + } + + err = bloomClient.PutBlock(context.Background(), writtenBlock) + if err != nil { + return err + } + } + } + return nil +} + +func ParseDayTime(s string) config.DayTime { + t, err := time.Parse("2006-01-02", s) + if err != nil { + panic(err) + } + return config.DayTime{ + Time: model.TimeFromUnix(t.Unix()), + } +} diff --git a/pkg/bloombuild/planner/retention_test.go b/pkg/bloombuild/planner/retention_test.go index 6738ac336e74..a309a7fc5301 100644 --- a/pkg/bloombuild/planner/retention_test.go +++ b/pkg/bloombuild/planner/retention_test.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" @@ -23,7 +24,7 @@ import ( "github.com/grafana/loki/v3/pkg/validation" ) -var testTime = parseDayTime("2024-12-31").ModelTime() +var testTime = plannertest.ParseDayTime("2024-12-31").ModelTime() func TestRetention(t *testing.T) { for _, tc := range []struct { diff --git a/pkg/bloombuild/planner/strategies/splitkeyspace_test.go b/pkg/bloombuild/planner/strategies/splitkeyspace_test.go index e5f6781d06c0..7f5a49a54cc2 100644 --- a/pkg/bloombuild/planner/strategies/splitkeyspace_test.go +++ b/pkg/bloombuild/planner/strategies/splitkeyspace_test.go @@ -3,26 +3,21 @@ package strategies import ( "context" "testing" - "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/bloombuild/common" + "github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest" "github.com/grafana/loki/v3/pkg/bloombuild/protos" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" - "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) -var testDay = parseDayTime("2023-09-01") -var testTable = config.NewDayTable(testDay, "index_") - func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { - for _, tc := range []struct { desc string err bool @@ -36,24 +31,24 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { err: true, ownershipRange: v1.NewBounds(0, 10), tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ - tsdbID(0): nil, + plannertest.TsdbID(0): nil, }, metas: []bloomshipper.Meta{ - genMeta(11, 20, []int{0}, nil), + plannertest.GenMeta(11, 20, []int{0}, nil), }, }, { desc: "single tsdb", ownershipRange: v1.NewBounds(0, 10), tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ - tsdbID(0): nil, + plannertest.TsdbID(0): nil, }, metas: []bloomshipper.Meta{ - genMeta(4, 8, []int{0}, nil), + plannertest.GenMeta(4, 8, []int{0}, nil), }, exp: []tsdbGaps{ { - tsdbIdentifier: tsdbID(0), + tsdbIdentifier: plannertest.TsdbID(0), gaps: []v1.FingerprintBounds{ v1.NewBounds(0, 3), v1.NewBounds(9, 10), @@ -65,22 +60,22 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { desc: "multiple tsdbs with separate blocks", ownershipRange: v1.NewBounds(0, 10), tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ - tsdbID(0): nil, - tsdbID(1): nil, + plannertest.TsdbID(0): nil, + plannertest.TsdbID(1): nil, }, metas: []bloomshipper.Meta{ - genMeta(0, 5, []int{0}, nil), - genMeta(6, 10, []int{1}, nil), + plannertest.GenMeta(0, 5, []int{0}, nil), + plannertest.GenMeta(6, 10, []int{1}, nil), }, exp: []tsdbGaps{ { - tsdbIdentifier: tsdbID(0), + tsdbIdentifier: plannertest.TsdbID(0), gaps: []v1.FingerprintBounds{ v1.NewBounds(6, 10), }, }, { - tsdbIdentifier: tsdbID(1), + tsdbIdentifier: plannertest.TsdbID(1), gaps: []v1.FingerprintBounds{ v1.NewBounds(0, 5), }, @@ -91,22 +86,22 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { desc: "multiple tsdbs with the same blocks", ownershipRange: v1.NewBounds(0, 10), tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ - tsdbID(0): nil, - tsdbID(1): nil, + plannertest.TsdbID(0): nil, + plannertest.TsdbID(1): nil, }, metas: []bloomshipper.Meta{ - genMeta(0, 5, []int{0, 1}, nil), - genMeta(6, 8, []int{1}, nil), + plannertest.GenMeta(0, 5, []int{0, 1}, nil), + plannertest.GenMeta(6, 8, []int{1}, nil), }, exp: []tsdbGaps{ { - tsdbIdentifier: tsdbID(0), + tsdbIdentifier: plannertest.TsdbID(0), gaps: []v1.FingerprintBounds{ v1.NewBounds(6, 10), }, }, { - tsdbIdentifier: tsdbID(1), + tsdbIdentifier: plannertest.TsdbID(1), gaps: []v1.FingerprintBounds{ v1.NewBounds(9, 10), }, @@ -137,17 +132,17 @@ func Test_blockPlansForGaps(t *testing.T) { { desc: "single overlapping meta+no overlapping block", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + tsdbs: []tsdb.SingleTenantTSDBIdentifier{plannertest.TsdbID(0)}, metas: []bloomshipper.Meta{ - genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}), + plannertest.GenMeta(5, 20, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(11, 20)}), }, exp: []blockPlan{ { - tsdb: tsdbID(0), + tsdb: plannertest.TsdbID(0), gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 10), - Series: genSeries(v1.NewBounds(0, 10)), + Series: plannertest.GenSeries(v1.NewBounds(0, 10)), }, }, }, @@ -156,18 +151,18 @@ func Test_blockPlansForGaps(t *testing.T) { { desc: "single overlapping meta+one overlapping block", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + tsdbs: []tsdb.SingleTenantTSDBIdentifier{plannertest.TsdbID(0)}, metas: []bloomshipper.Meta{ - genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), + plannertest.GenMeta(5, 20, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(9, 20)}), }, exp: []blockPlan{ { - tsdb: tsdbID(0), + tsdb: plannertest.TsdbID(0), gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 10), - Series: genSeries(v1.NewBounds(0, 10)), - Blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)}, + Series: plannertest.GenSeries(v1.NewBounds(0, 10)), + Blocks: []bloomshipper.BlockRef{plannertest.GenBlockRef(9, 20)}, }, }, }, @@ -179,18 +174,18 @@ func Test_blockPlansForGaps(t *testing.T) { // but we can trim the range needing generation desc: "trims up to date area", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + tsdbs: []tsdb.SingleTenantTSDBIdentifier{plannertest.TsdbID(0)}, metas: []bloomshipper.Meta{ - genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb - genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb + plannertest.GenMeta(9, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(9, 20)}), // block for same tsdb + plannertest.GenMeta(9, 20, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(9, 20)}), // block for different tsdb }, exp: []blockPlan{ { - tsdb: tsdbID(0), + tsdb: plannertest.TsdbID(0), gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 8), - Series: genSeries(v1.NewBounds(0, 8)), + Series: plannertest.GenSeries(v1.NewBounds(0, 8)), }, }, }, @@ -199,19 +194,19 @@ func Test_blockPlansForGaps(t *testing.T) { { desc: "uses old block for overlapping range", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + tsdbs: []tsdb.SingleTenantTSDBIdentifier{plannertest.TsdbID(0)}, metas: []bloomshipper.Meta{ - genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb - genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb + plannertest.GenMeta(9, 20, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(9, 20)}), // block for same tsdb + plannertest.GenMeta(5, 20, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(5, 20)}), // block for different tsdb }, exp: []blockPlan{ { - tsdb: tsdbID(0), + tsdb: plannertest.TsdbID(0), gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 8), - Series: genSeries(v1.NewBounds(0, 8)), - Blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)}, + Series: plannertest.GenSeries(v1.NewBounds(0, 8)), + Blocks: []bloomshipper.BlockRef{plannertest.GenBlockRef(5, 20)}, }, }, }, @@ -220,50 +215,50 @@ func Test_blockPlansForGaps(t *testing.T) { { desc: "multi case", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs + tsdbs: []tsdb.SingleTenantTSDBIdentifier{plannertest.TsdbID(0), plannertest.TsdbID(1)}, // generate for both tsdbs metas: []bloomshipper.Meta{ - genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{ - genBlockRef(0, 1), - genBlockRef(1, 2), + plannertest.GenMeta(0, 2, []int{0}, []bloomshipper.BlockRef{ + plannertest.GenBlockRef(0, 1), + plannertest.GenBlockRef(1, 2), }), // tsdb_0 - genMeta(6, 8, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 8)}), // tsdb_0 + plannertest.GenMeta(6, 8, []int{0}, []bloomshipper.BlockRef{plannertest.GenBlockRef(6, 8)}), // tsdb_0 - genMeta(3, 5, []int{1}, []bloomshipper.BlockRef{genBlockRef(3, 5)}), // tsdb_1 - genMeta(8, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), // tsdb_1 + plannertest.GenMeta(3, 5, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(3, 5)}), // tsdb_1 + plannertest.GenMeta(8, 10, []int{1}, []bloomshipper.BlockRef{plannertest.GenBlockRef(8, 10)}), // tsdb_1 }, exp: []blockPlan{ { - tsdb: tsdbID(0), + tsdb: plannertest.TsdbID(0), gaps: []protos.Gap{ // tsdb (id=0) can source chunks from the blocks built from tsdb (id=1) { Bounds: v1.NewBounds(3, 5), - Series: genSeries(v1.NewBounds(3, 5)), - Blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)}, + Series: plannertest.GenSeries(v1.NewBounds(3, 5)), + Blocks: []bloomshipper.BlockRef{plannertest.GenBlockRef(3, 5)}, }, { Bounds: v1.NewBounds(9, 10), - Series: genSeries(v1.NewBounds(9, 10)), - Blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)}, + Series: plannertest.GenSeries(v1.NewBounds(9, 10)), + Blocks: []bloomshipper.BlockRef{plannertest.GenBlockRef(8, 10)}, }, }, }, // tsdb (id=1) can source chunks from the blocks built from tsdb (id=0) { - tsdb: tsdbID(1), + tsdb: plannertest.TsdbID(1), gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 2), - Series: genSeries(v1.NewBounds(0, 2)), + Series: plannertest.GenSeries(v1.NewBounds(0, 2)), Blocks: []bloomshipper.BlockRef{ - genBlockRef(0, 1), - genBlockRef(1, 2), + plannertest.GenBlockRef(0, 1), + plannertest.GenBlockRef(1, 2), }, }, { Bounds: v1.NewBounds(6, 7), - Series: genSeries(v1.NewBounds(6, 7)), - Blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)}, + Series: plannertest.GenSeries(v1.NewBounds(6, 7)), + Blocks: []bloomshipper.BlockRef{plannertest.GenBlockRef(6, 8)}, }, }, }, @@ -272,28 +267,28 @@ func Test_blockPlansForGaps(t *testing.T) { { desc: "dedupes block refs", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + tsdbs: []tsdb.SingleTenantTSDBIdentifier{plannertest.TsdbID(0)}, metas: []bloomshipper.Meta{ - genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{ - genBlockRef(1, 4), - genBlockRef(9, 20), + plannertest.GenMeta(9, 20, []int{1}, []bloomshipper.BlockRef{ + plannertest.GenBlockRef(1, 4), + plannertest.GenBlockRef(9, 20), }), // blocks for first diff tsdb - genMeta(5, 20, []int{2}, []bloomshipper.BlockRef{ - genBlockRef(5, 10), - genBlockRef(9, 20), // same block references in prior meta (will be deduped) + plannertest.GenMeta(5, 20, []int{2}, []bloomshipper.BlockRef{ + plannertest.GenBlockRef(5, 10), + plannertest.GenBlockRef(9, 20), // same block references in prior meta (will be deduped) }), // block for second diff tsdb }, exp: []blockPlan{ { - tsdb: tsdbID(0), + tsdb: plannertest.TsdbID(0), gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 10), - Series: genSeries(v1.NewBounds(0, 10)), + Series: plannertest.GenSeries(v1.NewBounds(0, 10)), Blocks: []bloomshipper.BlockRef{ - genBlockRef(1, 4), - genBlockRef(5, 10), - genBlockRef(9, 20), + plannertest.GenBlockRef(1, 4), + plannertest.GenBlockRef(5, 10), + plannertest.GenBlockRef(9, 20), }, }, }, @@ -305,7 +300,7 @@ func Test_blockPlansForGaps(t *testing.T) { // We add series spanning the whole FP ownership range tsdbs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries) for _, id := range tc.tsdbs { - tsdbs[id] = newFakeForSeries(genSeries(tc.ownershipRange)) + tsdbs[id] = newFakeForSeries(plannertest.GenSeries(tc.ownershipRange)) } // we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested @@ -328,70 +323,6 @@ func Test_blockPlansForGaps(t *testing.T) { } } -func genSeries(bounds v1.FingerprintBounds) []*v1.Series { - series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)) - for i := bounds.Min; i <= bounds.Max; i++ { - series = append(series, &v1.Series{ - Fingerprint: i, - Chunks: v1.ChunkRefs{ - { - From: 0, - Through: 1, - Checksum: 1, - }, - }, - }) - } - return series -} - -func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta { - m := bloomshipper.Meta{ - MetaRef: bloomshipper.MetaRef{ - Ref: bloomshipper.Ref{ - TenantID: "fakeTenant", - TableName: testTable.Addr(), - Bounds: v1.NewBounds(min, max), - }, - }, - Blocks: blocks, - } - for _, source := range sources { - m.Sources = append(m.Sources, tsdbID(source)) - } - return m -} - -func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { - startTS, endTS := testDay.Bounds() - return bloomshipper.BlockRef{ - Ref: bloomshipper.Ref{ - TenantID: "fakeTenant", - TableName: testTable.Addr(), - Bounds: v1.NewBounds(min, max), - StartTimestamp: startTS, - EndTimestamp: endTS, - Checksum: 0, - }, - } -} - -func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier { - return tsdb.SingleTenantTSDBIdentifier{ - TS: time.Unix(int64(n), 0), - } -} - -func parseDayTime(s string) config.DayTime { - t, err := time.Parse("2006-01-02", s) - if err != nil { - panic(err) - } - return config.DayTime{ - Time: model.TimeFromUnix(t.Unix()), - } -} - type fakeForSeries struct { series []*v1.Series }