Skip to content

Commit

Permalink
Handle warnings from remote engines (thanos-io#6619)
Browse files Browse the repository at this point in the history
The remote engine implementation currently converts warnings to errors.
This prevents using partial response with the distributed engine since every
warning will cause a query to fail.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored and coleenquadros committed Sep 18, 2023
1 parent 08d5d74 commit fd659f7
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
12 changes: 9 additions & 3 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats"
"github.com/thanos-io/promql-engine/api"

Expand Down Expand Up @@ -234,7 +235,11 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
return &promql.Result{Err: err}
}

result := make(promql.Matrix, 0)
var (
result = make(promql.Matrix, 0)
warnings storage.Warnings
)

for {
msg, err := qry.Recv()
if err == io.EOF {
Expand All @@ -245,7 +250,8 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
}

if warn := msg.GetWarnings(); warn != "" {
return &promql.Result{Err: errors.New(warn)}
warnings = append(warnings, errors.New(warn))
continue
}

ts := msg.GetTimeseries()
Expand Down Expand Up @@ -273,7 +279,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
}
level.Debug(r.logger).Log("Executed query", "query", r.qs, "time", time.Since(start))

return &promql.Result{Value: result}
return &promql.Result{Value: result, Warnings: warnings}
}

func (r *remoteQuery) Close() { r.Cancel() }
Expand Down
45 changes: 45 additions & 0 deletions pkg/query/remote_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,41 @@
package query

import (
"context"
"io"
"math"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

func TestRemoteEngine_Warnings(t *testing.T) {
client := NewClient(&queryWarnClient{}, "", nil)
engine := newRemoteEngine(log.NewNopLogger(), client, Opts{
Timeout: 1 * time.Second,
})
var (
query = "up"
start = time.Unix(0, 0)
end = time.Unix(120, 0)
step = 30 * time.Second
)
qry, err := engine.NewRangeQuery(context.Background(), nil, query, start, end, step)
testutil.Ok(t, err)
res := qry.Exec(context.Background())
testutil.Ok(t, res.Err)
testutil.Equals(t, 1, len(res.Warnings))
}

func TestRemoteEngine_LabelSets(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -164,3 +188,24 @@ func zLabelSetFromStrings(ss ...string) labelpb.ZLabelSet {
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings(ss...)),
}
}

type queryWarnClient struct {
querypb.QueryClient
}

func (m queryWarnClient) QueryRange(ctx context.Context, in *querypb.QueryRangeRequest, opts ...grpc.CallOption) (querypb.Query_QueryRangeClient, error) {
return &queryRangeWarnClient{}, nil
}

type queryRangeWarnClient struct {
querypb.Query_QueryRangeClient
warnSent bool
}

func (m *queryRangeWarnClient) Recv() (*querypb.QueryRangeResponse, error) {
if m.warnSent {
return nil, io.EOF
}
m.warnSent = true
return querypb.NewQueryRangeWarningsResponse(errors.New("warning")), nil
}

0 comments on commit fd659f7

Please sign in to comment.