diff --git a/pkg/query/remote_engine.go b/pkg/query/remote_engine.go index cb2447780b..234d9db573 100644 --- a/pkg/query/remote_engine.go +++ b/pkg/query/remote_engine.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/stats" "github.com/thanos-io/promql-engine/api" @@ -234,7 +235,11 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { return &promql.Result{Err: err} } - result := make(promql.Matrix, 0) + var ( + result = make(promql.Matrix, 0) + warnings storage.Warnings + ) + for { msg, err := qry.Recv() if err == io.EOF { @@ -245,7 +250,8 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { } if warn := msg.GetWarnings(); warn != "" { - return &promql.Result{Err: errors.New(warn)} + warnings = append(warnings, errors.New(warn)) + continue } ts := msg.GetTimeseries() @@ -273,7 +279,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { } level.Debug(r.logger).Log("Executed query", "query", r.qs, "time", time.Since(start)) - return &promql.Result{Value: result} + return &promql.Result{Value: result, Warnings: warnings} } func (r *remoteQuery) Close() { r.Cancel() } diff --git a/pkg/query/remote_engine_test.go b/pkg/query/remote_engine_test.go index e9b75961e0..7ecfdf2301 100644 --- a/pkg/query/remote_engine_test.go +++ b/pkg/query/remote_engine_test.go @@ -4,17 +4,41 @@ package query import ( + "context" + "io" "math" "testing" + "time" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "google.golang.org/grpc" + "github.com/thanos-io/thanos/pkg/api/query/querypb" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" ) +func TestRemoteEngine_Warnings(t *testing.T) { + client := NewClient(&queryWarnClient{}, "", nil) + engine := newRemoteEngine(log.NewNopLogger(), client, Opts{ + Timeout: 1 * time.Second, + }) + var ( + query = "up" + start = time.Unix(0, 0) + end = time.Unix(120, 0) + step = 30 * time.Second + ) + qry, err := engine.NewRangeQuery(context.Background(), nil, query, start, end, step) + testutil.Ok(t, err) + res := qry.Exec(context.Background()) + testutil.Ok(t, res.Err) + testutil.Equals(t, 1, len(res.Warnings)) +} + func TestRemoteEngine_LabelSets(t *testing.T) { tests := []struct { name string @@ -164,3 +188,24 @@ func zLabelSetFromStrings(ss ...string) labelpb.ZLabelSet { Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings(ss...)), } } + +type queryWarnClient struct { + querypb.QueryClient +} + +func (m queryWarnClient) QueryRange(ctx context.Context, in *querypb.QueryRangeRequest, opts ...grpc.CallOption) (querypb.Query_QueryRangeClient, error) { + return &queryRangeWarnClient{}, nil +} + +type queryRangeWarnClient struct { + querypb.Query_QueryRangeClient + warnSent bool +} + +func (m *queryRangeWarnClient) Recv() (*querypb.QueryRangeResponse, error) { + if m.warnSent { + return nil, io.EOF + } + m.warnSent = true + return querypb.NewQueryRangeWarningsResponse(errors.New("warning")), nil +}