diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 54dd9a09e4..8d75ed6b04 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -34,6 +34,7 @@ import ( promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -3341,3 +3342,99 @@ func TestBucketIndexReader_decodeCachedPostingsErrors(t *testing.T) { testutil.NotOk(t, err) }) } + +func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { + t.Skip("Known Issue, Added for debugging in followup PR.") + + logger := log.NewNopLogger() + tmpDir := t.TempDir() + bktDir := filepath.Join(tmpDir, "bkt") + auxDir := filepath.Join(tmpDir, "aux") + metaDir := filepath.Join(tmpDir, "meta") + extLset := labels.FromStrings("region", "eu-west") + + testutil.Ok(t, os.MkdirAll(metaDir, os.ModePerm)) + testutil.Ok(t, os.MkdirAll(auxDir, os.ModePerm)) + + bkt, err := filesystem.NewBucket(bktDir) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, bkt.Close()) }) + + for i := 0; i < 2; i++ { + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = tmpDir + headOpts.ChunkRange = 1000 + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, h.Close()) }) + + app := h.Appender(context.Background()) + _, err = app.Append(0, labels.FromStrings("replica", "a", "z", "1"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("replica", "a", "z", "2"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("replica", "b", "z", "1"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("replica", "b", "z", "2"), 0, 1) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + id := createBlockFromHead(t, auxDir, h) + + auxBlockDir := filepath.Join(auxDir, id.String()) + _, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + }, nil) + testutil.Ok(t, err) + + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc)) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc)) + } + + chunkPool, err := NewDefaultChunkBytesPool(2e5) + testutil.Ok(t, err) + + metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), + }) + testutil.Ok(t, err) + + bucketStore, err := NewBucketStore( + objstore.WithNoopInstr(bkt), + metaFetcher, + "", + NewChunksLimiterFactory(10e6), + NewSeriesLimiterFactory(10e6), + NewBytesLimiterFactory(10e6), + NewGapBasedPartitioner(PartitionerMaxGapSize), + 20, + true, + DefaultPostingOffsetInMemorySampling, + false, + false, + 1*time.Minute, + WithChunkPool(chunkPool), + WithFilterConfig(allowAllFilterConf), + ) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, bucketStore.Close()) }) + + testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) + + // make sure to have updated inner label names + bucketStore.UpdateLabelNames() + + srv := newStoreSeriesServer(context.Background()) + testutil.Ok(t, bucketStore.Series(&storepb.SeriesRequest{ + WithoutReplicaLabels: []string{"replica"}, + MinTime: timestamp.FromTime(minTime), + MaxTime: timestamp.FromTime(maxTime), + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_NEQ, Name: "z", Value: ""}, + }, + }, srv)) + + testutil.Equals(t, 2, len(srv.SeriesSet)) +}