Skip to content

Commit

Permalink
Handle warnings from remote engines (thanos-io#6619)
Browse files Browse the repository at this point in the history
The remote engine implementation currently converts warnings to errors.
This prevents using partial response with the distributed engine since every
warning will cause a query to fail.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored and harsh-ps-2003 committed Aug 22, 2023
1 parent 680772b commit d8a6064
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
12 changes: 9 additions & 3 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats"
"github.com/thanos-io/promql-engine/api"

Expand Down Expand Up @@ -234,7 +235,11 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
return &promql.Result{Err: err}
}

result := make(promql.Matrix, 0)
var (
result = make(promql.Matrix, 0)
warnings storage.Warnings
)

for {
msg, err := qry.Recv()
if err == io.EOF {
Expand All @@ -245,7 +250,8 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
}

if warn := msg.GetWarnings(); warn != "" {
return &promql.Result{Err: errors.New(warn)}
warnings = append(warnings, errors.New(warn))
continue
}

ts := msg.GetTimeseries()
Expand Down Expand Up @@ -273,7 +279,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
}
level.Debug(r.logger).Log("Executed query", "query", r.qs, "time", time.Since(start))

return &promql.Result{Value: result}
return &promql.Result{Value: result, Warnings: warnings}
}

func (r *remoteQuery) Close() { r.Cancel() }
Expand Down
45 changes: 45 additions & 0 deletions pkg/query/remote_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,41 @@
package query

import (
"context"
"io"
"math"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

func TestRemoteEngine_Warnings(t *testing.T) {
client := NewClient(&queryWarnClient{}, "", nil)
engine := newRemoteEngine(log.NewNopLogger(), client, Opts{
Timeout: 1 * time.Second,
})
var (
query = "up"
start = time.Unix(0, 0)
end = time.Unix(120, 0)
step = 30 * time.Second
)
qry, err := engine.NewRangeQuery(context.Background(), nil, query, start, end, step)
testutil.Ok(t, err)
res := qry.Exec(context.Background())
testutil.Ok(t, res.Err)
testutil.Equals(t, 1, len(res.Warnings))
}

func TestRemoteEngine_LabelSets(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -164,3 +188,24 @@ func zLabelSetFromStrings(ss ...string) labelpb.ZLabelSet {
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings(ss...)),
}
}

type queryWarnClient struct {
querypb.QueryClient
}

func (m queryWarnClient) QueryRange(ctx context.Context, in *querypb.QueryRangeRequest, opts ...grpc.CallOption) (querypb.Query_QueryRangeClient, error) {
return &queryRangeWarnClient{}, nil
}

type queryRangeWarnClient struct {
querypb.Query_QueryRangeClient
warnSent bool
}

func (m *queryRangeWarnClient) Recv() (*querypb.QueryRangeResponse, error) {
if m.warnSent {
return nil, io.EOF
}
m.warnSent = true
return querypb.NewQueryRangeWarningsResponse(errors.New("warning")), nil
}

0 comments on commit d8a6064

Please sign in to comment.