Fix lazy expanded postings cache and non-equal matcher bug (#7220)
* fix lazy expanded postings cache and a bug in non-equal matchers with non-existent values

Signed-off-by: Ben Ye <[email protected]>

* test case for remove keys noop

Signed-off-by: Ben Ye <[email protected]>

* add promqlsmith fuzz test

Signed-off-by: Ben Ye <[email protected]>

* update

Signed-off-by: Ben Ye <[email protected]>

* changelog

Signed-off-by: Ben Ye <[email protected]>

* fix go mod

Signed-off-by: Ben Ye <[email protected]>

* rename test

Signed-off-by: Ben Ye <[email protected]>

* fix series request timestamp

Signed-off-by: Ben Ye <[email protected]>

* skip e2e test

Signed-off-by: Ben Ye <[email protected]>

* handle non lazy expanded case

Signed-off-by: Ben Ye <[email protected]>

* update comment

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
yeya24 authored Mar 27, 2024
1 parent 6c613fc commit 4d7a75f
Showing 9 changed files with 362 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -18,8 +18,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred.
- [#7122](https://github.com/thanos-io/thanos/pull/7122) Store Gateway: Fix lazy expanded postings estimate base cardinality using posting group with remove keys.
- [#7224](https://github.com/thanos-io/thanos/pull/7224) Query-frontend: Add Redis username to the client configuration.
- [#7220](https://github.com/thanos-io/thanos/pull/7220) Store Gateway: Fix lazy expanded postings caching partial expanded postings, and a bug in estimating remove postings with a non-existent value. Added a `PromQLSmith`-based fuzz test to improve correctness (a usage sketch follows this file's diff).

### Added

- [#7194](https://github.com/thanos-io/thanos/pull/7194) Downsample: retry objstore related errors
- [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules
- [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use.
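The `PromQLSmith`-based fuzz test added by this PR (the e2e test file is truncated from this view) generates random, valid PromQL expressions over a seed series set, so that two store gateway configurations (for example, lazy expanded postings on and off) can be compared query by query. Below is a minimal sketch of that pattern; the `New`/`WalkInstantQuery` calls and option names are taken from the promqlsmith library as I understand it and should be treated as assumptions, not as the exact code in this PR.

package main

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/cortexproject/promqlsmith"
    "github.com/prometheus/prometheus/model/labels"
)

func main() {
    rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
    // Series the generator may reference in vector selectors.
    seriesSet := []labels.Labels{
        labels.FromStrings("__name__", "test_metric", "job", "test", "foo", "bar"),
    }
    opts := []promqlsmith.Option{
        promqlsmith.WithEnableOffset(true),
        promqlsmith.WithEnableAtModifier(true),
    }
    ps := promqlsmith.New(rnd, seriesSet, opts...)

    // Generate a random instant query; a fuzz test would send it to both
    // setups and diff the results.
    expr := ps.WalkInstantQuery()
    fmt.Println(expr.Pretty(0))
}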
3 changes: 2 additions & 1 deletion go.mod
@@ -61,7 +61,7 @@ require (
github.com/prometheus/common v0.46.0
github.com/prometheus/exporter-toolkit v0.11.0
// Prometheus maps version 2.x.y to tags v0.x.y.
-github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba
+github.com/prometheus/prometheus v0.50.0
github.com/sony/gobreaker v0.5.0
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20240309075357-e8336a5fd5f3
@@ -116,6 +116,7 @@
)

require (
+github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.27.10
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
8 changes: 4 additions & 4 deletions go.sum
@@ -778,8 +778,8 @@ github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
-github.com/cortexproject/promqlsmith v0.0.0-20240103062231-e3aada49136f h1:0IyHUBbWzCxe/dgjBCQcIv4ZIc7Rvlp2jXVYibcoAW8=
-github.com/cortexproject/promqlsmith v0.0.0-20240103062231-e3aada49136f/go.mod h1:by/B++NlNa8Hp+d2054cy3YR4ijhPNGnuWjUWOUppms=
+github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5 h1:UfuB6no2X1BcKxICY63lzgY+XaPMQ/Wv4woP3p0L+mg=
+github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5/go.mod h1:fcysbw4fOsOipXKeXPXWSh7tXrUQSUr5V4duojv0oCM=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
@@ -1464,8 +1464,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
-github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba h1:bDCs3jd+3KURFIDykicCeNfa573KYVZGhN4F62WHTmI=
-github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
+github.com/prometheus/prometheus v0.50.0 h1:gf+SN6jtbsZ70KkIGg7S3LuB4kHyUfatZLCGwZ1/aec=
+github.com/prometheus/prometheus v0.50.0/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw=
github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU=
19 changes: 14 additions & 5 deletions pkg/store/bucket.go
@@ -1159,10 +1159,11 @@ func (b *blockSeriesClient) nextBatch(tenant string) error {
}
b.i = end

+lazyExpandedPosting := b.lazyPostings.lazyExpanded()
postingsBatch := b.lazyPostings.postings[start:end]
if len(postingsBatch) == 0 {
b.hasMorePostings = false
-if b.lazyPostings.lazyExpanded() {
+if lazyExpandedPosting {
// No need to fetch index version again if lazy posting has 0 length.
if len(b.lazyPostings.postings) > 0 {
v, err := b.indexr.IndexVersion()
@@ -1196,11 +1197,13 @@ OUTER:
if err := b.ctx.Err(); err != nil {
return err
}
-ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt)
+hasMatchedChunks, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt)
if err != nil {
return errors.Wrap(err, "read series")
}
-if !ok {
+// Skip resolving series symbols if we know there are no matched chunks
+// and lazy expanded postings are not enabled.
+if !lazyExpandedPosting && !hasMatchedChunks {
continue
}

@@ -1218,9 +1221,15 @@
continue OUTER
}
}
-if b.lazyPostings.lazyExpanded() {
+if lazyExpandedPosting {
b.expandedPostings = append(b.expandedPostings, postingsBatch[i])
}
+// If lazy expanded postings are enabled then, because of the expanded postings
+// cache, we must check the lazy posting matchers and update expanded postings
+// before moving on to the next series.
+if !hasMatchedChunks {
+continue
+}

completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset)
if b.extLsetToRemove != nil {
@@ -1261,7 +1270,7 @@
b.entries = append(b.entries, s)
}

-if b.lazyPostings.lazyExpanded() {
+if lazyExpandedPosting {
// Apply series limit before fetching chunks, for actual series matched.
if err := b.seriesLimiter.Reserve(uint64(seriesMatched)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
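The reasoning behind the reordered checks above: when lazy expanded postings are enabled, a series whose chunks fall outside the requested [mint, maxt] must still go through the lazy matcher check and be appended to b.expandedPostings, otherwise the expanded postings cache would store a partial postings list. Below is a simplified, runnable sketch of the resulting control flow; the types and helpers are illustrative stand-ins, not the real Thanos code.

package main

import "fmt"

// Illustrative stand-ins for the real Thanos types; this sketch only
// demonstrates the ordering of the checks in nextBatch.
type series struct {
    posting          uint64
    hasMatchedChunks bool // chunk metas overlap the requested [mint, maxt]
    matchesLazy      bool // passes the deferred (lazy) matchers
}

func nextBatchSketch(batch []series, lazyExpandedPosting bool) (expanded, emitted []uint64) {
    for _, s := range batch {
        // The early skip is only safe when lazy expanded postings are disabled:
        // the lazy path must still evaluate the matchers and record the posting,
        // otherwise the expanded postings cache would store a partial list.
        if !lazyExpandedPosting && !s.hasMatchedChunks {
            continue
        }
        if lazyExpandedPosting {
            if !s.matchesLazy {
                continue
            }
            expanded = append(expanded, s.posting)
        }
        if !s.hasMatchedChunks {
            continue // recorded for the cache, but no chunks to return
        }
        emitted = append(emitted, s.posting)
    }
    return expanded, emitted
}

func main() {
    batch := []series{
        {posting: 1, hasMatchedChunks: true, matchesLazy: true},
        {posting: 2, hasMatchedChunks: false, matchesLazy: true}, // cached, not emitted
        {posting: 3, hasMatchedChunks: true, matchesLazy: false}, // filtered by lazy matchers
    }
    expanded, emitted := nextBatchSketch(batch, true)
    fmt.Println(expanded, emitted) // [1 2] [1]
}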
9 changes: 7 additions & 2 deletions pkg/store/lazy_postings.go
@@ -54,10 +54,15 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups
return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name)
}

-// No posting ranges found means empty posting.
-if len(rngs) == 0 {
+// If the posting group adds keys, no posting ranges found means an empty posting.
+if len(pg.addKeys) > 0 && len(rngs) == 0 {
return nil, true, nil
}
+// If the posting group removes keys, finding no posting ranges is fine: the
+// group is a noop, e.g. {job != "some_non_existent_value"}.
+if len(pg.removeKeys) > 0 && len(rngs) == 0 {
+continue
+}
for _, r := range rngs {
if r == indexheader.NotFoundRange {
continue
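To make the add-keys versus remove-keys distinction above concrete: an equality matcher on a missing value, such as {job="some_non_existent_value"}, produces an add-keys posting group that can short-circuit the whole request to empty, while the non-equal matcher {job!="some_non_existent_value"} produces an addAll group with remove keys, where a missing value simply means there is nothing to remove. Here is a small self-contained sketch of that decision; the struct mirrors field names from the diff but is not the real implementation.

package main

import "fmt"

// A simplified model of the decision fixed above.
type postingGroup struct {
    name       string
    addAll     bool
    addKeys    []string
    removeKeys []string
}

// decide reports what to do with a group whose keys matched foundRanges
// posting ranges in the index header.
func decide(pg postingGroup, foundRanges int) string {
    if len(pg.addKeys) > 0 && foundRanges == 0 {
        // e.g. {job="some_non_existent_value"}: nothing to add, so the
        // whole request selects no series.
        return "empty"
    }
    if len(pg.removeKeys) > 0 && foundRanges == 0 {
        // e.g. {job!="some_non_existent_value"}: nothing to subtract from
        // addAll, so the group is a noop and can be skipped.
        return "noop"
    }
    return "keep"
}

func main() {
    eq := postingGroup{name: "job", addKeys: []string{"missing"}}
    neq := postingGroup{name: "job", addAll: true, removeKeys: []string{"missing"}}
    fmt.Println(decide(eq, 0))  // empty
    fmt.Println(decide(neq, 0)) // noop; the old code returned empty here too, which was the bug
}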
41 changes: 36 additions & 5 deletions pkg/store/lazy_postings_test.go
@@ -308,10 +308,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
expectedError: "postings offsets for foo: random",
},
{
name: "posting offsets empty",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 8}},
},
name: "posting offsets empty with add keys, expect empty posting",
inputPostings: map[string]map[string]index.Range{},
seriesMaxSize: 1000,
seriesMatchRatio: 0.5,
postingGroups: []*postingGroup{
@@ -322,7 +320,7 @@
expectedEmptyPosting: true,
},
{
name: "posting group label doesn't exist",
name: "posting group label with add keys doesn't exist, return empty postings",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 8}},
},
@@ -335,6 +333,39 @@
expectedPostingGroups: nil,
expectedEmptyPosting: true,
},
{
name: "posting group label with remove keys doesn't exist, noop",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 8}},
},
seriesMaxSize: 1000,
seriesMatchRatio: 0.5,
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", removeKeys: []string{"foo"}, addAll: true},
},
expectedPostingGroups: []*postingGroup{
{name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true},
{name: "foo", addKeys: []string{"bar"}, cardinality: 1},
},
},
{
name: "posting group label with remove keys exist but no matching value, noop",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 8}},
"bar": {"baz": index.Range{Start: 8, End: 16}},
},
seriesMaxSize: 1000,
seriesMatchRatio: 0.5,
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", removeKeys: []string{"foo"}, addAll: true},
},
expectedPostingGroups: []*postingGroup{
{name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true},
{name: "foo", addKeys: []string{"bar"}, cardinality: 1},
},
},
{
name: "posting group keys partial exist",
inputPostings: map[string]map[string]index.Range{
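The two new cases above assert that a posting group with remove keys whose label or value has no postings survives as a zero-cardinality addAll group instead of collapsing the whole request to empty postings. They can be run in isolation with `go test ./pkg/store -run TestOptimizePostingsFetchByDownloadedBytes`.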
107 changes: 107 additions & 0 deletions pkg/testutil/e2eutil/prometheus.go
@@ -811,3 +811,110 @@ func PutOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error {

return iw.Close()
}

// CreateBlockWithChurn writes a block with the given series. The start time of each
// series is randomized within the given time window to create churn. Only float chunks are supported right now.
func CreateBlockWithChurn(
ctx context.Context,
rnd *rand.Rand,
dir string,
series []labels.Labels,
numSamples int,
mint, maxt int64,
extLset labels.Labels,
resolution int64,
scrapeInterval int64,
seriesSize int64,
) (id ulid.ULID, err error) {
headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = filepath.Join(dir, "chunks")
headOpts.ChunkRange = 10000000000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
if err != nil {
return id, errors.Wrap(err, "create head block")
}
defer func() {
runutil.CloseWithErrCapture(&err, h, "TSDB Head")
if e := os.RemoveAll(headOpts.ChunkDirRoot); e != nil {
err = errors.Wrap(e, "delete chunks dir")
}
}()

app := h.Appender(ctx)
for i := 0; i < len(series); i++ {

var ref storage.SeriesRef
start := RandRange(rnd, mint, maxt)
for j := 0; j < numSamples; j++ {
if ref == 0 {
ref, err = app.Append(0, series[i], start, float64(i+j))
} else {
ref, err = app.Append(ref, series[i], start, float64(i+j))
}
if err != nil {
if rerr := app.Rollback(); rerr != nil {
err = errors.Wrapf(err, "rollback failed: %v", rerr)
}
return id, errors.Wrap(err, "add sample")
}
start += scrapeInterval
if start > maxt {
break
}
}
}
if err := app.Commit(); err != nil {
return id, errors.Wrap(err, "commit")
}

c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil, nil)
if err != nil {
return id, errors.Wrap(err, "create compactor")
}

id, err = c.Write(dir, h, mint, maxt, nil)
if err != nil {
return id, errors.Wrap(err, "write block")
}

if id.Compare(ulid.ULID{}) == 0 {
return id, errors.Errorf("nothing to write, asked for %d samples", numSamples)
}

blockDir := filepath.Join(dir, id.String())
logger := log.NewNopLogger()

if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: resolution},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize},
}, nil); err != nil {
return id, errors.Wrap(err, "finalize block")
}

return id, nil
}

// AddDelay rewrites a given block, shifting its ULID timestamp back by blockDelay.
func AddDelay(blockID ulid.ULID, dir string, blockDelay time.Duration) (ulid.ULID, error) {
id, err := ulid.New(uint64(timestamp.FromTime(timestamp.Time(int64(blockID.Time())).Add(-blockDelay))), bytes.NewReader(blockID.Entropy()))
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create block id")
}

bdir := path.Join(dir, blockID.String())
m, err := metadata.ReadFromDir(bdir)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "open meta file")
}

logger := log.NewNopLogger()
m.ULID = id
m.Compaction.Sources = []ulid.ULID{id}
if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil {
return ulid.ULID{}, errors.Wrap(err, "write meta.json file")
}

return id, os.Rename(path.Join(dir, blockID.String()), path.Join(dir, id.String()))
}
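A usage sketch for the two helpers above, as a fuzz or e2e test might wire them up; the directory, series, and numeric arguments are made-up illustrations, and only the function signatures come from this diff.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"

    "github.com/prometheus/prometheus/model/labels"
    "github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)

func main() {
    ctx := context.Background()
    rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

    series := []labels.Labels{
        labels.FromStrings("__name__", "test_metric", "job", "test"),
    }
    extLset := labels.FromStrings("replica", "0")

    now := time.Now()
    mint := now.Add(-2 * time.Hour).UnixMilli()
    maxt := now.UnixMilli()

    // Series start times are randomized inside [mint, maxt) to create churn.
    id, err := e2eutil.CreateBlockWithChurn(ctx, rnd, "/tmp/blocks", series,
        120, mint, maxt, extLset, 0, int64(15*time.Second/time.Millisecond), 64*1024)
    if err != nil {
        panic(err)
    }

    // Rewrite the block ULID so the block appears older than it is.
    delayedID, err := e2eutil.AddDelay(id, "/tmp/blocks", 30*time.Minute)
    if err != nil {
        panic(err)
    }
    fmt.Println(id, "->", delayedID)
}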
11 changes: 11 additions & 0 deletions pkg/testutil/e2eutil/rand.go
@@ -0,0 +1,11 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package e2eutil

import "math/rand"

// RandRange returns a random int64 in [min, max).
func RandRange(rnd *rand.Rand, min, max int64) int64 {
return rnd.Int63n(max-min) + min
}