Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lazy expanded postings cache and bug of non equal matcher #7220

Merged
merged 11 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred.
- [#7122](https://github.com/thanos-io/thanos/pull/7122) Store Gateway: Fix lazy expanded postings estimate base cardinality using posting group with remove keys.
- [#7224](https://github.com/thanos-io/thanos/pull/7224) Query-frontend: Add Redis username to the client configuration.
- [#7220](https://github.com/thanos-io/thanos/pull/7220) Store Gateway: Fix lazy expanded postings caching partial expanded postings and bug of estimating remove postings with non existent value. Added `PromQLSmith` based fuzz test to improve correctness.

### Added

- [#7194](https://github.com/thanos-io/thanos/pull/7194) Downsample: retry objstore related errors
- [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules
- [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use.
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ require (
github.com/prometheus/common v0.46.0
github.com/prometheus/exporter-toolkit v0.11.0
// Prometheus maps version 2.x.y to tags v0.x.y.
github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba
github.com/prometheus/prometheus v0.50.0
github.com/sony/gobreaker v0.5.0
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20240309075357-e8336a5fd5f3
Expand Down Expand Up @@ -116,6 +116,7 @@ require (
)

require (
github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.27.10
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -778,8 +778,8 @@ github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cortexproject/promqlsmith v0.0.0-20240103062231-e3aada49136f h1:0IyHUBbWzCxe/dgjBCQcIv4ZIc7Rvlp2jXVYibcoAW8=
github.com/cortexproject/promqlsmith v0.0.0-20240103062231-e3aada49136f/go.mod h1:by/B++NlNa8Hp+d2054cy3YR4ijhPNGnuWjUWOUppms=
github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5 h1:UfuB6no2X1BcKxICY63lzgY+XaPMQ/Wv4woP3p0L+mg=
github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5/go.mod h1:fcysbw4fOsOipXKeXPXWSh7tXrUQSUr5V4duojv0oCM=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -1464,8 +1464,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba h1:bDCs3jd+3KURFIDykicCeNfa573KYVZGhN4F62WHTmI=
github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
github.com/prometheus/prometheus v0.50.0 h1:gf+SN6jtbsZ70KkIGg7S3LuB4kHyUfatZLCGwZ1/aec=
github.com/prometheus/prometheus v0.50.0/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw=
github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU=
Expand Down
19 changes: 14 additions & 5 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,10 +1159,11 @@ func (b *blockSeriesClient) nextBatch(tenant string) error {
}
b.i = end

lazyExpandedPosting := b.lazyPostings.lazyExpanded()
postingsBatch := b.lazyPostings.postings[start:end]
if len(postingsBatch) == 0 {
b.hasMorePostings = false
if b.lazyPostings.lazyExpanded() {
if lazyExpandedPosting {
// No need to fetch index version again if lazy posting has 0 length.
if len(b.lazyPostings.postings) > 0 {
v, err := b.indexr.IndexVersion()
Expand Down Expand Up @@ -1196,11 +1197,13 @@ OUTER:
if err := b.ctx.Err(); err != nil {
return err
}
ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt)
hasMatchedChunks, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt)
if err != nil {
return errors.Wrap(err, "read series")
}
if !ok {
// Skip getting series symbols if we know there is no matched chunks
// and lazy expanded posting not enabled.
if !lazyExpandedPosting && !hasMatchedChunks {
continue
}

Expand All @@ -1218,9 +1221,15 @@ OUTER:
continue OUTER
}
}
if b.lazyPostings.lazyExpanded() {
if lazyExpandedPosting {
b.expandedPostings = append(b.expandedPostings, postingsBatch[i])
}
// If lazy expanded postings enabled, due to expanded postings cache, we need to
// make sure we check lazy posting matchers and update expanded postings before
// going to next series.
if !hasMatchedChunks {
continue
}

completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset)
if b.extLsetToRemove != nil {
Expand Down Expand Up @@ -1261,7 +1270,7 @@ OUTER:
b.entries = append(b.entries, s)
}

if b.lazyPostings.lazyExpanded() {
if lazyExpandedPosting {
// Apply series limit before fetching chunks, for actual series matched.
if err := b.seriesLimiter.Reserve(uint64(seriesMatched)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
Expand Down
9 changes: 7 additions & 2 deletions pkg/store/lazy_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups
return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name)
}

// No posting ranges found means empty posting.
if len(rngs) == 0 {
// If the posting group adds keys, no posting ranges found means empty posting.
if len(pg.addKeys) > 0 && len(rngs) == 0 {
return nil, true, nil
}
// If the posting group removes keys, no posting ranges found is fine. It means
// that the posting group is a noop. {job != "some_non_existent_value"}
if len(pg.removeKeys) > 0 && len(rngs) == 0 {
continue
}
for _, r := range rngs {
if r == indexheader.NotFoundRange {
continue
Expand Down
41 changes: 36 additions & 5 deletions pkg/store/lazy_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,8 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
expectedError: "postings offsets for foo: random",
},
{
name: "posting offsets empty",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 8}},
},
name: "posting offsets empty with add keys, expect empty posting",
inputPostings: map[string]map[string]index.Range{},
seriesMaxSize: 1000,
seriesMatchRatio: 0.5,
postingGroups: []*postingGroup{
Expand All @@ -322,7 +320,7 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
expectedEmptyPosting: true,
},
{
name: "posting group label doesn't exist",
name: "posting group label with add keys doesn't exist, return empty postings",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 8}},
},
Expand All @@ -335,6 +333,39 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
expectedPostingGroups: nil,
expectedEmptyPosting: true,
},
{
name: "posting group label with remove keys doesn't exist, noop",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 8}},
},
seriesMaxSize: 1000,
seriesMatchRatio: 0.5,
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", removeKeys: []string{"foo"}, addAll: true},
},
expectedPostingGroups: []*postingGroup{
{name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true},
{name: "foo", addKeys: []string{"bar"}, cardinality: 1},
},
},
{
name: "posting group label with remove keys exist but no matching value, noop",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 8}},
"bar": {"baz": index.Range{Start: 8, End: 16}},
},
seriesMaxSize: 1000,
seriesMatchRatio: 0.5,
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", removeKeys: []string{"foo"}, addAll: true},
},
expectedPostingGroups: []*postingGroup{
{name: "bar", removeKeys: []string{"foo"}, cardinality: 0, addAll: true},
{name: "foo", addKeys: []string{"bar"}, cardinality: 1},
},
},
{
name: "posting group keys partial exist",
inputPostings: map[string]map[string]index.Range{
Expand Down
107 changes: 107 additions & 0 deletions pkg/testutil/e2eutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,3 +811,110 @@ func PutOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error {

return iw.Close()
}

// CreateBlockWithChurn writes a block with the given series. Start time of each series
// will be randomized in the given time window to create churn. Only float chunk is supported right now.
func CreateBlockWithChurn(
ctx context.Context,
rnd *rand.Rand,
dir string,
series []labels.Labels,
numSamples int,
mint, maxt int64,
extLset labels.Labels,
resolution int64,
scrapeInterval int64,
seriesSize int64,
) (id ulid.ULID, err error) {
headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = filepath.Join(dir, "chunks")
headOpts.ChunkRange = 10000000000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
if err != nil {
return id, errors.Wrap(err, "create head block")
}
defer func() {
runutil.CloseWithErrCapture(&err, h, "TSDB Head")
if e := os.RemoveAll(headOpts.ChunkDirRoot); e != nil {
err = errors.Wrap(e, "delete chunks dir")
}
}()

app := h.Appender(ctx)
for i := 0; i < len(series); i++ {

var ref storage.SeriesRef
start := RandRange(rnd, mint, maxt)
for j := 0; j < numSamples; j++ {
if ref == 0 {
ref, err = app.Append(0, series[i], start, float64(i+j))
} else {
ref, err = app.Append(ref, series[i], start, float64(i+j))
}
if err != nil {
if rerr := app.Rollback(); rerr != nil {
err = errors.Wrapf(err, "rollback failed: %v", rerr)
}
return id, errors.Wrap(err, "add sample")
}
start += scrapeInterval
if start > maxt {
break
}
}
}
if err := app.Commit(); err != nil {
return id, errors.Wrap(err, "commit")
}

c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil, nil)
if err != nil {
return id, errors.Wrap(err, "create compactor")
}

id, err = c.Write(dir, h, mint, maxt, nil)
if err != nil {
return id, errors.Wrap(err, "write block")
}

if id.Compare(ulid.ULID{}) == 0 {
return id, errors.Errorf("nothing to write, asked for %d samples", numSamples)
}

blockDir := filepath.Join(dir, id.String())
logger := log.NewNopLogger()

if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: resolution},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize},
}, nil); err != nil {
return id, errors.Wrap(err, "finalize block")
}

return id, nil
}

// AddDelay rewrites a given block with delay.
func AddDelay(blockID ulid.ULID, dir string, blockDelay time.Duration) (ulid.ULID, error) {
id, err := ulid.New(uint64(timestamp.FromTime(timestamp.Time(int64(blockID.Time())).Add(-blockDelay))), bytes.NewReader(blockID.Entropy()))
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create block id")
}

bdir := path.Join(dir, blockID.String())
m, err := metadata.ReadFromDir(bdir)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "open meta file")
}

logger := log.NewNopLogger()
m.ULID = id
m.Compaction.Sources = []ulid.ULID{id}
if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil {
return ulid.ULID{}, errors.Wrap(err, "write meta.json file")
}

return id, os.Rename(path.Join(dir, blockID.String()), path.Join(dir, id.String()))
}
11 changes: 11 additions & 0 deletions pkg/testutil/e2eutil/rand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package e2eutil

import "math/rand"

// RandRange returns a random int64 from [min, max].
func RandRange(rnd *rand.Rand, min, max int64) int64 {
return rnd.Int63n(max-min) + min
}
Loading
Loading