Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle warnings from remote engines #6619

Merged
merged 1 commit into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats"
"github.com/thanos-io/promql-engine/api"

Expand Down Expand Up @@ -234,7 +235,11 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
return &promql.Result{Err: err}
}

result := make(promql.Matrix, 0)
var (
result = make(promql.Matrix, 0)
warnings storage.Warnings
)

for {
msg, err := qry.Recv()
if err == io.EOF {
Expand All @@ -245,7 +250,8 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
}

if warn := msg.GetWarnings(); warn != "" {
return &promql.Result{Err: errors.New(warn)}
warnings = append(warnings, errors.New(warn))
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I am wondering if things need to change since remote engine (can be any implementation) might return both warning and results the same time? Can we safely ignore the results?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, it seems like either way we'd skip to the next iteration couple lines below if there are no series?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we can only have either a warning or a series in a response, at least that's how I understand the oneOf clause in the proto:

message QueryResponse {
oneof result {
/// warnings are additional messages coming from the PromQL engine.
string warnings = 1;
/// timeseries is one series from the result of the executed query.
prometheus_copy.TimeSeries timeseries = 2;
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I didn't realize it's oneof field, then this makes sense, thanks 👍

Copy link
Contributor

@yeya24 yeya24 Aug 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the proto def in Thanos, right? It is defined by us.
For the Prometheus query api itself, it can return both warning and results the same time?
I wonder if this oneof makes sense or not. At least from what I understand Prometheus query api can have both fields set in the response

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the remote engine talks to our grpc API, and that one will send warnings in separate messages even if the result contains series data: https://github.com/thanos-io/thanos/blob/main/pkg/api/query/grpc.go#L131-L135.

We don't have a remote engine to query HTTP endpoints yet. If we decide to add it, we would need to handle cases where warnings and series are sent in the same response.

}

ts := msg.GetTimeseries()
Expand Down Expand Up @@ -273,7 +279,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
}
level.Debug(r.logger).Log("Executed query", "query", r.qs, "time", time.Since(start))

return &promql.Result{Value: result}
return &promql.Result{Value: result, Warnings: warnings}
}

func (r *remoteQuery) Close() { r.Cancel() }
Expand Down
45 changes: 45 additions & 0 deletions pkg/query/remote_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,41 @@
package query

import (
"context"
"io"
"math"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

func TestRemoteEngine_Warnings(t *testing.T) {
client := NewClient(&queryWarnClient{}, "", nil)
engine := newRemoteEngine(log.NewNopLogger(), client, Opts{
Timeout: 1 * time.Second,
})
var (
query = "up"
start = time.Unix(0, 0)
end = time.Unix(120, 0)
step = 30 * time.Second
)
qry, err := engine.NewRangeQuery(context.Background(), nil, query, start, end, step)
testutil.Ok(t, err)
res := qry.Exec(context.Background())
testutil.Ok(t, res.Err)
testutil.Equals(t, 1, len(res.Warnings))
}

func TestRemoteEngine_LabelSets(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -164,3 +188,24 @@ func zLabelSetFromStrings(ss ...string) labelpb.ZLabelSet {
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings(ss...)),
}
}

type queryWarnClient struct {
querypb.QueryClient
}

func (m queryWarnClient) QueryRange(ctx context.Context, in *querypb.QueryRangeRequest, opts ...grpc.CallOption) (querypb.Query_QueryRangeClient, error) {
return &queryRangeWarnClient{}, nil
}

type queryRangeWarnClient struct {
querypb.Query_QueryRangeClient
warnSent bool
}

func (m *queryRangeWarnClient) Recv() (*querypb.QueryRangeResponse, error) {
if m.warnSent {
return nil, io.EOF
}
m.warnSent = true
return querypb.NewQueryRangeWarningsResponse(errors.New("warning")), nil
}