From 4a2a4555d24665a52c3ed43e007301dd492af9b3 Mon Sep 17 00:00:00 2001 From: Filip Petkovski <filip.petkovsky@gmail.com> Date: Thu, 21 Mar 2024 09:57:33 +0100 Subject: [PATCH 1/2] Update thanos-io/promql-engine (#7215) * Update thanos-io/promql-engine This commit updates the promql-engine module to latest main and modifies to remote engine based on the breaking change. Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Fix lint Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> --------- Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> --- cmd/thanos/rule.go | 7 ++++++- go.mod | 2 +- go.sum | 4 ++-- pkg/query/remote_engine.go | 18 +++++++++--------- pkg/query/remote_engine_test.go | 5 ++++- 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 1be8a8b406..999f418ba1 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -951,7 +951,12 @@ func queryFuncCreator( queryAPIClients := grpcEndpointSet.GetQueryAPIClients() for _, i := range rand.Perm(len(queryAPIClients)) { e := query.NewRemoteEngine(logger, queryAPIClients[i], query.Opts{}) - q, err := e.NewInstantQuery(ctx, nil, qs, t) + expr, err := parser.ParseExpr(qs) + if err != nil { + level.Error(logger).Log("err", err, "query", qs) + continue + } + q, err := e.NewInstantQuery(ctx, nil, expr, t) if err != nil { level.Error(logger).Log("err", err, "query", qs) continue diff --git a/go.mod b/go.mod index 58f4171a4b..c1d4718cdb 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( github.com/sony/gobreaker v0.5.0 github.com/stretchr/testify v1.8.4 github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98 - github.com/thanos-io/promql-engine v0.0.0-20240125175542-4a8e9731acba + github.com/thanos-io/promql-engine v0.0.0-20240318110350-23714ea2522d github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible // indirect github.com/vimeo/galaxycache v0.0.0-20210323154928-b7e5d71c067a diff --git a/go.sum b/go.sum index 11eb3def35..d56dcd1e74 100644 --- a/go.sum +++ b/go.sum @@ -1536,8 +1536,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1 github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98 h1:gx2MTto1UQRumGoJzY3aFPQ31Ov3nOV7NaD7j6q288k= github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98/go.mod h1:JauBAcJ61tRSv9widgISVmA6akQXDeUMXBrVmWW4xog= -github.com/thanos-io/promql-engine v0.0.0-20240125175542-4a8e9731acba h1:BFohBPqCWBpbqNO3F3lC2uZ0egSfPGQoSDloTRraPHU= -github.com/thanos-io/promql-engine v0.0.0-20240125175542-4a8e9731acba/go.mod h1:YGk7VqhYDfhUyZjWK7ZU1JmBQKSvr5mT5Txut8oK1MA= +github.com/thanos-io/promql-engine v0.0.0-20240318110350-23714ea2522d h1:/6Gy8ul/6iKHaAg3OhaoPmph2TRAlansv4z+VAbTOKk= +github.com/thanos-io/promql-engine v0.0.0-20240318110350-23714ea2522d/go.mod h1:YGk7VqhYDfhUyZjWK7ZU1JmBQKSvr5mT5Txut8oK1MA= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek= diff --git a/pkg/query/remote_engine.go b/pkg/query/remote_engine.go index 1896d7b641..c48768b233 100644 --- a/pkg/query/remote_engine.go +++ b/pkg/query/remote_engine.go @@ -181,26 +181,26 @@ func (r *remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos { return infos } -func (r *remoteEngine) NewRangeQuery(_ context.Context, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { +func (r *remoteEngine) NewRangeQuery(_ context.Context, _ promql.QueryOpts, plan parser.Expr, start, end time.Time, interval time.Duration) (promql.Query, error) { return &remoteQuery{ logger: r.logger, client: r.client, opts: r.opts, - qs: qs, + plan: plan, start: start, end: end, interval: interval, }, nil } -func (r *remoteEngine) NewInstantQuery(_ context.Context, _ promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { +func (r *remoteEngine) NewInstantQuery(_ context.Context, _ promql.QueryOpts, plan parser.Expr, ts time.Time) (promql.Query, error) { return &remoteQuery{ logger: r.logger, client: r.client, opts: r.opts, - qs: qs, + plan: plan, start: ts, end: ts, interval: 0, @@ -212,7 +212,7 @@ type remoteQuery struct { client Client opts Opts - qs string + plan parser.Expr start time.Time end time.Time interval time.Duration @@ -235,7 +235,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { // Instant query. if r.start == r.end { request := &querypb.QueryRequest{ - Query: r.qs, + Query: r.plan.String(), TimeSeconds: r.start.Unix(), TimeoutSeconds: int64(r.opts.Timeout.Seconds()), EnablePartialResponse: r.opts.EnablePartialResponse, @@ -286,7 +286,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { } request := &querypb.QueryRangeRequest{ - Query: r.qs, + Query: r.plan.String(), StartTimeSeconds: r.start.Unix(), EndTimeSeconds: r.end.Unix(), IntervalSeconds: int64(r.interval.Seconds()), @@ -349,7 +349,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { } result = append(result, series) } - level.Debug(r.logger).Log("msg", "Executed query", "query", r.qs, "time", time.Since(start)) + level.Debug(r.logger).Log("msg", "Executed query", "query", r.plan, "time", time.Since(start)) return &promql.Result{Value: result, Warnings: warnings} } @@ -360,7 +360,7 @@ func (r *remoteQuery) Statement() parser.Statement { return nil } func (r *remoteQuery) Stats() *stats.Statistics { return nil } -func (r *remoteQuery) String() string { return r.qs } +func (r *remoteQuery) String() string { return r.plan.String() } func (r *remoteQuery) Cancel() { if r.cancel != nil { diff --git a/pkg/query/remote_engine_test.go b/pkg/query/remote_engine_test.go index 224acc6039..46c1159d1c 100644 --- a/pkg/query/remote_engine_test.go +++ b/pkg/query/remote_engine_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/api/query/querypb" @@ -27,11 +28,13 @@ func TestRemoteEngine_Warnings(t *testing.T) { Timeout: 1 * time.Second, }) var ( - query = "up" start = time.Unix(0, 0) end = time.Unix(120, 0) step = 30 * time.Second ) + query, err := parser.ParseExpr("up") + testutil.Ok(t, err) + qry, err := engine.NewRangeQuery(context.Background(), nil, query, start, end, step) testutil.Ok(t, err) res := qry.Exec(context.Background()) From deb615fff62c3ca0301e229e2fb0c50fb99ae32f Mon Sep 17 00:00:00 2001 From: Ben Ye <benye@amazon.com> Date: Sat, 23 Mar 2024 02:05:27 -0700 Subject: [PATCH 2/2] expose NewPromSeriesSet (#7214) Signed-off-by: Ben Ye <benye@amazon.com> --- pkg/query/iter.go | 11 +++++++++++ pkg/query/querier.go | 28 ++++++++++++++-------------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pkg/query/iter.go b/pkg/query/iter.go index d55027df69..7bee002df2 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -27,6 +27,17 @@ type promSeriesSet struct { warns annotations.Annotations } +// NewPromSeriesSet constructs a promSeriesSet. +func NewPromSeriesSet(seriesSet storepb.SeriesSet, mint, maxt int64, aggrs []storepb.Aggr, warns annotations.Annotations) storage.SeriesSet { + return &promSeriesSet{ + set: seriesSet, + mint: mint, + maxt: maxt, + aggrs: aggrs, + warns: warns, + } +} + func (s *promSeriesSet) Next() bool { return s.set.Next() } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 0cfcc2ad21..d55285b459 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -364,25 +364,25 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . warns := annotations.New().Merge(resp.warnings) if !q.isDedupEnabled() { - return &promSeriesSet{ - mint: q.mint, - maxt: q.maxt, - set: newStoreSeriesSet(resp.seriesSet), - aggrs: aggrs, - warns: warns, - }, resp.seriesSetStats, nil + return NewPromSeriesSet( + newStoreSeriesSet(resp.seriesSet), + q.mint, + q.maxt, + aggrs, + warns, + ), resp.seriesSetStats, nil } // TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger(). // This however require big refactor, caring about correct AggrChunk to iterator conversion and counter reset apply. // For now we apply simple logic that splits potential overlapping chunks into separate replica series, so we can split the work. - set := &promSeriesSet{ - mint: q.mint, - maxt: q.maxt, - set: dedup.NewOverlapSplit(newStoreSeriesSet(resp.seriesSet)), - aggrs: aggrs, - warns: warns, - } + set := NewPromSeriesSet( + dedup.NewOverlapSplit(newStoreSeriesSet(resp.seriesSet)), + q.mint, + q.maxt, + aggrs, + warns, + ) return dedup.NewSeriesSet(set, hints.Func), resp.seriesSetStats, nil }