Skip to content

Commit

Permalink
Store: add failing test for potential dedup issue (#6693)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann authored Sep 4, 2023
1 parent 78f21b9 commit 7133977
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
Expand Down Expand Up @@ -3341,3 +3342,99 @@ func TestBucketIndexReader_decodeCachedPostingsErrors(t *testing.T) {
testutil.NotOk(t, err)
})
}

func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) {
t.Skip("Known Issue, Added for debugging in followup PR.")

logger := log.NewNopLogger()
tmpDir := t.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
metaDir := filepath.Join(tmpDir, "meta")
extLset := labels.FromStrings("region", "eu-west")

testutil.Ok(t, os.MkdirAll(metaDir, os.ModePerm))
testutil.Ok(t, os.MkdirAll(auxDir, os.ModePerm))

bkt, err := filesystem.NewBucket(bktDir)
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, bkt.Close()) })

for i := 0; i < 2; i++ {
headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = tmpDir
headOpts.ChunkRange = 1000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, h.Close()) })

app := h.Appender(context.Background())
_, err = app.Append(0, labels.FromStrings("replica", "a", "z", "1"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("replica", "a", "z", "2"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("replica", "b", "z", "1"), 0, 1)
testutil.Ok(t, err)
_, err = app.Append(0, labels.FromStrings("replica", "b", "z", "2"), 0, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())

id := createBlockFromHead(t, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}, nil)
testutil.Ok(t, err)

testutil.Ok(t, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))
}

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(t, err)

metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(10e6),
NewBytesLimiterFactory(10e6),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
)
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, bucketStore.Close()) })

testutil.Ok(t, bucketStore.SyncBlocks(context.Background()))

// make sure to have updated inner label names
bucketStore.UpdateLabelNames()

srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, bucketStore.Series(&storepb.SeriesRequest{
WithoutReplicaLabels: []string{"replica"},
MinTime: timestamp.FromTime(minTime),
MaxTime: timestamp.FromTime(maxTime),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NEQ, Name: "z", Value: ""},
},
}, srv))

testutil.Equals(t, 2, len(srv.SeriesSet))
}

0 comments on commit 7133977

Please sign in to comment.