diff --git a/CHANGELOG.md b/CHANGELOG.md index f19286e4e8..8804ccdcd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,8 +18,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred. - [#7122](https://github.com/thanos-io/thanos/pull/7122) Store Gateway: Fix lazy expanded postings estimate base cardinality using posting group with remove keys. - [#7224](https://github.com/thanos-io/thanos/pull/7224) Query-frontend: Add Redis username to the client configuration. +- [#7220](https://github.com/thanos-io/thanos/pull/7220) Store Gateway: Fix lazy expanded postings caching partial expanded postings and bug of estimating remove postings with non existent value. Added `PromQLSmith` based fuzz test to improve correctness. ### Added + - [#7194](https://github.com/thanos-io/thanos/pull/7194) Downsample: retry objstore related errors - [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules - [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use. diff --git a/go.mod b/go.mod index e492a9fcdf..2a3957c41b 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,7 @@ require ( github.com/prometheus/common v0.46.0 github.com/prometheus/exporter-toolkit v0.11.0 // Prometheus maps version 2.x.y to tags v0.x.y. - github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba + github.com/prometheus/prometheus v0.50.0 github.com/sony/gobreaker v0.5.0 github.com/stretchr/testify v1.8.4 github.com/thanos-io/objstore v0.0.0-20240309075357-e8336a5fd5f3 @@ -116,6 +116,7 @@ require ( ) require ( + github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5 github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.27.10 github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 diff --git a/go.sum b/go.sum index 59b07307f0..7d8774fcbc 100644 --- a/go.sum +++ b/go.sum @@ -778,8 +778,8 @@ github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/cortexproject/promqlsmith v0.0.0-20240103062231-e3aada49136f h1:0IyHUBbWzCxe/dgjBCQcIv4ZIc7Rvlp2jXVYibcoAW8= -github.com/cortexproject/promqlsmith v0.0.0-20240103062231-e3aada49136f/go.mod h1:by/B++NlNa8Hp+d2054cy3YR4ijhPNGnuWjUWOUppms= +github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5 h1:UfuB6no2X1BcKxICY63lzgY+XaPMQ/Wv4woP3p0L+mg= +github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5/go.mod h1:fcysbw4fOsOipXKeXPXWSh7tXrUQSUr5V4duojv0oCM= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -1464,8 +1464,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= -github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba h1:bDCs3jd+3KURFIDykicCeNfa573KYVZGhN4F62WHTmI= -github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU= +github.com/prometheus/prometheus v0.50.0 h1:gf+SN6jtbsZ70KkIGg7S3LuB4kHyUfatZLCGwZ1/aec= +github.com/prometheus/prometheus v0.50.0/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw= github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU= diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 20cf10e657..a45150681f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1159,10 +1159,11 @@ func (b *blockSeriesClient) nextBatch(tenant string) error { } b.i = end + lazyExpandedPosting := b.lazyPostings.lazyExpanded() postingsBatch := b.lazyPostings.postings[start:end] if len(postingsBatch) == 0 { b.hasMorePostings = false - if b.lazyPostings.lazyExpanded() { + if lazyExpandedPosting { // No need to fetch index version again if lazy posting has 0 length. if len(b.lazyPostings.postings) > 0 { v, err := b.indexr.IndexVersion() @@ -1196,11 +1197,13 @@ OUTER: if err := b.ctx.Err(); err != nil { return err } - ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt) + hasMatchedChunks, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt) if err != nil { return errors.Wrap(err, "read series") } - if !ok { + // Skip getting series symbols if we know there is no matched chunks + // and lazy expanded posting not enabled. + if !lazyExpandedPosting && !hasMatchedChunks { continue } @@ -1218,9 +1221,15 @@ OUTER: continue OUTER } } - if b.lazyPostings.lazyExpanded() { + if lazyExpandedPosting { b.expandedPostings = append(b.expandedPostings, postingsBatch[i]) } + // If lazy expanded postings enabled, due to expanded postings cache, we need to + // make sure we check lazy posting matchers and update expanded postings before + // going to next series. + if !hasMatchedChunks { + continue + } completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset) if b.extLsetToRemove != nil { @@ -1261,7 +1270,7 @@ OUTER: b.entries = append(b.entries, s) } - if b.lazyPostings.lazyExpanded() { + if lazyExpandedPosting { // Apply series limit before fetching chunks, for actual series matched. if err := b.seriesLimiter.Reserve(uint64(seriesMatched)); err != nil { return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index cb245180cf..0f4038bc61 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -54,10 +54,15 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name) } - // No posting ranges found means empty posting. - if len(rngs) == 0 { + // If the posting group adds keys, no posting ranges found means empty posting. + if len(pg.addKeys) > 0 && len(rngs) == 0 { return nil, true, nil } + // If the posting group removes keys, no posting ranges found is fine. It means + // that the posting group is a noop. {job != "some_non_existent_value"} + if len(pg.removeKeys) > 0 && len(rngs) == 0 { + continue + } for _, r := range rngs { if r == indexheader.NotFoundRange { continue diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index 86cd5e3bf8..21a04c3c93 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -308,10 +308,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { expectedError: "postings offsets for foo: random", }, { - name: "posting offsets empty", - inputPostings: map[string]map[string]index.Range{ - "foo": {"bar": index.Range{End: 8}}, - }, + name: "posting offsets empty with add keys, expect empty posting", + inputPostings: map[string]map[string]index.Range{}, seriesMaxSize: 1000, seriesMatchRatio: 0.5, postingGroups: []*postingGroup{ @@ -322,7 +320,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { expectedEmptyPosting: true, }, { - name: "posting group label doesn't exist", + name: "posting group label with add keys doesn't exist, return empty postings", inputPostings: map[string]map[string]index.Range{ "foo": {"bar": index.Range{End: 8}}, }, @@ -335,6 +333,39 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { expectedPostingGroups: nil, expectedEmptyPosting: true, }, + { + name: "posting group label with remove keys doesn't exist, noop", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}, addAll: true}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + }, + }, + { + name: "posting group label with remove keys exist but no matching value, noop", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 8}}, + "bar": {"baz": index.Range{Start: 8, End: 16}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.5, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}, addAll: true}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true}, + {name: "foo", addKeys: []string{"bar"}, cardinality: 1}, + }, + }, { name: "posting group keys partial exist", inputPostings: map[string]map[string]index.Range{ diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index 453048a173..3bf797af39 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -811,3 +811,110 @@ func PutOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error { return iw.Close() } + +// CreateBlockWithChurn writes a block with the given series. Start time of each series +// will be randomized in the given time window to create churn. Only float chunk is supported right now. +func CreateBlockWithChurn( + ctx context.Context, + rnd *rand.Rand, + dir string, + series []labels.Labels, + numSamples int, + mint, maxt int64, + extLset labels.Labels, + resolution int64, + scrapeInterval int64, + seriesSize int64, +) (id ulid.ULID, err error) { + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = filepath.Join(dir, "chunks") + headOpts.ChunkRange = 10000000000 + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + if err != nil { + return id, errors.Wrap(err, "create head block") + } + defer func() { + runutil.CloseWithErrCapture(&err, h, "TSDB Head") + if e := os.RemoveAll(headOpts.ChunkDirRoot); e != nil { + err = errors.Wrap(e, "delete chunks dir") + } + }() + + app := h.Appender(ctx) + for i := 0; i < len(series); i++ { + + var ref storage.SeriesRef + start := RandRange(rnd, mint, maxt) + for j := 0; j < numSamples; j++ { + if ref == 0 { + ref, err = app.Append(0, series[i], start, float64(i+j)) + } else { + ref, err = app.Append(ref, series[i], start, float64(i+j)) + } + if err != nil { + if rerr := app.Rollback(); rerr != nil { + err = errors.Wrapf(err, "rollback failed: %v", rerr) + } + return id, errors.Wrap(err, "add sample") + } + start += scrapeInterval + if start > maxt { + break + } + } + } + if err := app.Commit(); err != nil { + return id, errors.Wrap(err, "commit") + } + + c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil, nil) + if err != nil { + return id, errors.Wrap(err, "create compactor") + } + + id, err = c.Write(dir, h, mint, maxt, nil) + if err != nil { + return id, errors.Wrap(err, "write block") + } + + if id.Compare(ulid.ULID{}) == 0 { + return id, errors.Errorf("nothing to write, asked for %d samples", numSamples) + } + + blockDir := filepath.Join(dir, id.String()) + logger := log.NewNopLogger() + + if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: resolution}, + Source: metadata.TestSource, + IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize}, + }, nil); err != nil { + return id, errors.Wrap(err, "finalize block") + } + + return id, nil +} + +// AddDelay rewrites a given block with delay. +func AddDelay(blockID ulid.ULID, dir string, blockDelay time.Duration) (ulid.ULID, error) { + id, err := ulid.New(uint64(timestamp.FromTime(timestamp.Time(int64(blockID.Time())).Add(-blockDelay))), bytes.NewReader(blockID.Entropy())) + if err != nil { + return ulid.ULID{}, errors.Wrap(err, "create block id") + } + + bdir := path.Join(dir, blockID.String()) + m, err := metadata.ReadFromDir(bdir) + if err != nil { + return ulid.ULID{}, errors.Wrap(err, "open meta file") + } + + logger := log.NewNopLogger() + m.ULID = id + m.Compaction.Sources = []ulid.ULID{id} + if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil { + return ulid.ULID{}, errors.Wrap(err, "write meta.json file") + } + + return id, os.Rename(path.Join(dir, blockID.String()), path.Join(dir, id.String())) +} diff --git a/pkg/testutil/e2eutil/rand.go b/pkg/testutil/e2eutil/rand.go new file mode 100644 index 0000000000..5cac2d6f07 --- /dev/null +++ b/pkg/testutil/e2eutil/rand.go @@ -0,0 +1,11 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package e2eutil + +import "math/rand" + +// RandRange returns a random int64 from [min, max]. +func RandRange(rnd *rand.Rand, min, max int64) int64 { + return rnd.Int63n(max-min) + min +} diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index e13f79a495..6e5e0eb756 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -7,15 +7,18 @@ import ( "bytes" "context" "fmt" + "math/rand" "net/http" "os" "path" "path/filepath" + "reflect" "strconv" "strings" "testing" "time" + "github.com/cortexproject/promqlsmith" "github.com/efficientgo/core/testutil" "github.com/efficientgo/e2e" e2edb "github.com/efficientgo/e2e/db" @@ -23,10 +26,12 @@ import ( "github.com/efficientgo/e2e/monitoring/matchers" e2eobs "github.com/efficientgo/e2e/observable" "github.com/go-kit/log" + "github.com/google/go-cmp/cmp" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/timestamp" + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/client" @@ -1260,3 +1265,177 @@ func TestStoreGatewayLazyExpandedPostingsEnabled(t *testing.T) { testutil.Ok(t, s1.WaitSumMetrics(e2emon.GreaterOrEqual(1), "thanos_bucket_store_lazy_expanded_postings_total"), e2emon.WaitMissingMetrics()) testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total"), e2emon.WaitMissingMetrics()) } + +var labelSetsComparer = cmp.Comparer(func(x, y []map[string]string) bool { + if len(x) != len(y) { + return false + } + for i := 0; i < len(x); i++ { + if !reflect.DeepEqual(x[i], y[i]) { + return false + } + } + return true +}) + +func TestStoreGatewayLazyExpandedPostingsPromQLSmithFuzz(t *testing.T) { + t.Skip("Skipping the testcase in CI due to its randomness.") + + t.Parallel() + + e, err := e2e.NewDockerEnvironment("fuzz-sg-lazy") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + const bucket = "fuzz-store-gateway-lazy-expanded-postings-test" + m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(m)) + + // Create 2 store gateways, one with lazy expanded postings enabled and another one disabled. + s1 := e2ethanos.NewStoreGW( + e, + "1", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), + }, + "", + "", + []string{"--store.enable-lazy-expanded-postings"}, + ) + s2 := e2ethanos.NewStoreGW( + e, + "2", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), + }, + "", + "", + nil, + ) + testutil.Ok(t, e2e.StartAndWaitReady(s1, s2)) + + q1 := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc")).Init() + q2 := e2ethanos.NewQuerierBuilder(e, "2", s2.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q1, q2)) + + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + t.Cleanup(cancel) + + rnd := rand.New(rand.NewSource(time.Now().Unix())) + + now := time.Now() + start := now.Add(-time.Minute * 20) + startMs := start.UnixMilli() + end := now.Add(-time.Minute * 10) + endMs := end.UnixMilli() + numSeries := 1000 + numSamples := 50 + lbls := make([]labels.Labels, 0, numSeries) + scrapeInterval := (10 * time.Second).Milliseconds() + metricName := "http_requests_total" + statusCodes := []string{"200", "400", "404", "500", "502"} + extLset := labels.FromStrings("ext1", "value1", "replica", "1") + for i := 0; i < numSeries; i++ { + lbl := labels.Labels{ + {Name: labels.MetricName, Value: metricName}, + {Name: "job", Value: "test"}, + {Name: "series", Value: strconv.Itoa(i % 200)}, + {Name: "status_code", Value: statusCodes[i%5]}, + } + lbls = append(lbls, lbl) + } + id, err := e2eutil.CreateBlockWithChurn(ctx, rnd, dir, lbls, numSamples, startMs, endMs, extLset, 0, scrapeInterval, 10) + testutil.Ok(t, err) + id, err = e2eutil.AddDelay(id, dir, 30*time.Minute) + testutil.Ok(t, err) + + l := log.NewLogfmtLogger(os.Stdout) + bkt, err := s3.NewBucketWithConfig(l, + e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed") + testutil.Ok(t, err) + + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String())) + + // Wait for store to sync blocks. + // thanos_blocks_meta_synced: 1x loadedMeta 0x labelExcludedMeta 0x TooFreshMeta. + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) + + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced")) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) + + opts := []promqlsmith.Option{ + promqlsmith.WithEnforceLabelMatchers([]*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), + labels.MustNewMatcher(labels.MatchEqual, "job", "test"), + }), + } + ps := promqlsmith.New(rnd, lbls, opts...) + + type testCase struct { + matchers string + res1, newRes1, res2, newRes2 []map[string]string + } + + cases := make([]*testCase, 0, 1000) + + client := promclient.NewDefaultClient() + + u1 := urlParse(t, "http://"+q1.Endpoint("http")) + u2 := urlParse(t, "http://"+q2.Endpoint("http")) + matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName) + // Wait until series can be queried. + series(t, ctx, q1.Endpoint("http"), []*labels.Matcher{matcher}, startMs, endMs, func(res []map[string]string) bool { + return len(res) > 0 + }) + series(t, ctx, q2.Endpoint("http"), []*labels.Matcher{matcher}, startMs, endMs, func(res []map[string]string) bool { + return len(res) > 0 + }) + + for i := 0; i < 1000; i++ { + matchers := ps.WalkSelectors() + matcherStrings := storepb.PromMatchersToString(matchers...) + minT := e2eutil.RandRange(rnd, startMs, endMs) + maxT := e2eutil.RandRange(rnd, minT+1, endMs) + + res1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT) + testutil.Ok(t, err) + res2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT) + testutil.Ok(t, err) + + // Try again with a different timestamp and let requests hit posting cache. + minT = e2eutil.RandRange(rnd, startMs, endMs) + maxT = e2eutil.RandRange(rnd, minT+1, endMs) + newRes1, err := client.SeriesInGRPC(ctx, u1, matchers, minT, maxT) + testutil.Ok(t, err) + newRes2, err := client.SeriesInGRPC(ctx, u2, matchers, minT, maxT) + testutil.Ok(t, err) + + cases = append(cases, &testCase{ + matchers: matcherStrings, + res1: res1, + newRes1: newRes1, + res2: res2, + newRes2: newRes2, + }) + } + + failures := 0 + for i, tc := range cases { + if !cmp.Equal(tc.res1, tc.res2, labelSetsComparer) { + t.Logf("case %d results mismatch for the first attempt.\n%s\nres1 len: %d data: %s\nres2 len: %d data: %s\n", i, tc.matchers, len(tc.res1), tc.res1, len(tc.res2), tc.res2) + failures++ + } else if !cmp.Equal(tc.newRes1, tc.newRes2, labelSetsComparer) { + t.Logf("case %d results mismatch for the second attempt.\n%s\nres1 len: %d data: %s\nres2 len: %d data: %s\n", i, tc.matchers, len(tc.newRes1), tc.newRes1, len(tc.newRes2), tc.newRes2) + failures++ + } + } + if failures > 0 { + require.Failf(t, "finished store gateway lazy expanded posting fuzzing tests", "%d test cases failed", failures) + } +}