From 647435ac713d8ac6de6aa32e962e4de18b2f5ca9 Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Thu, 21 Jan 2021 11:40:44 +0100 Subject: [PATCH] pkg/rules/proxy: fix hotlooping when receiving client errors Currently, if we receive an error from the underlying client stream, we continue with trying to receive additional data. This causes a hotloop as we will receive the same error again. This fixes it by returning in the error case and adds a unit test for the proxy logic. Fixes #3717 Signed-off-by: Sergiusz Urbaniak --- CHANGELOG.md | 5 + pkg/rules/proxy.go | 7 +- pkg/rules/proxy_test.go | 253 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 pkg/rules/proxy_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e4b09cbdc87..261c1e000d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ## Unreleased +### Fixed + +- [#TBD](https://github.com/thanos-io/thanos/pull/TBD) pkg/rules/proxy: fix hotlooping when receiving client errors + ## [v0.18.0](https://github.com/thanos-io/thanos/releases/tag/v0.18.0) - 2021.01.19 ### Added @@ -34,6 +38,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3560](https://github.com/thanos-io/thanos/pull/3560) Query Frontend: Allow separate label cache - [#3672](https://github.com/thanos-io/thanos/pull/3672) Rule: Prevent crashing due to `no such host error` when using `dnssrv+` or `dnssrvnoa+`. - [#3461](https://github.com/thanos-io/thanos/pull/3461) Compact, Shipper, Store: Fixed panic when no external labels are set in block metadata. +- [#TBD ](https://github.com/thanos-io/thanos/pull/TBD ) pkg/rules/proxy: fix hotlooping when receiving client errors ### Changed diff --git a/pkg/rules/proxy.go b/pkg/rules/proxy.go index 7910f0ef687..8a1493c9066 100644 --- a/pkg/rules/proxy.go +++ b/pkg/rules/proxy.go @@ -108,6 +108,9 @@ func (stream *rulesStream) receive(ctx context.Context) error { } if err != nil { + // An error happened in Recv(), hence the underlying stream is aborted + // as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125. + // We must not continue receiving additional data from it and must return. err = errors.Wrapf(err, "receiving rules from rules client %v", stream.client) if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { @@ -118,13 +121,15 @@ func (stream *rulesStream) receive(ctx context.Context) error { return errors.Wrapf(err, "sending rules error to server %v", stream.server) } - continue + // Return no error if response strategy is warning. + return nil } if w := rule.GetWarning(); w != "" { if err := stream.server.Send(rulespb.NewWarningRulesResponse(errors.New(w))); err != nil { return errors.Wrapf(err, "sending rules warning to server %v", stream.server) } + // Client stream is not aborted, it is ok to receive additional data. continue } diff --git a/pkg/rules/proxy_test.go b/pkg/rules/proxy_test.go new file mode 100644 index 00000000000..54e112ef000 --- /dev/null +++ b/pkg/rules/proxy_test.go @@ -0,0 +1,253 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package rules + +import ( + "context" + "io" + "os" + "reflect" + "testing" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "google.golang.org/grpc" +) + +type testRulesClient struct { + grpc.ClientStream + rulesErr, recvErr error + response *rulespb.RulesResponse + sentResponse bool +} + +func (t *testRulesClient) String() string { + return "test" +} + +func (t *testRulesClient) Recv() (*rulespb.RulesResponse, error) { + // A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125. + if t.recvErr != nil { + return nil, t.recvErr + } + + if t.sentResponse { + return nil, io.EOF + } + t.sentResponse = true + + return t.response, nil +} + +func (t *testRulesClient) Rules(ctx context.Context, in *rulespb.RulesRequest, opts ...grpc.CallOption) (rulespb.Rules_RulesClient, error) { + return t, t.rulesErr +} + +var _ rulespb.RulesClient = &testRulesClient{} + +type testRulesServer struct { + grpc.ServerStream + sendErr error + response *rulespb.RulesResponse +} + +func (t *testRulesServer) String() string { + return "test" +} + +func (t *testRulesServer) Send(response *rulespb.RulesResponse) error { + if t.sendErr != nil { + return t.sendErr + } + t.response = response + return nil +} + +func (t *testRulesServer) Context() context.Context { + return context.Background() +} + +func TestProxy(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + + for _, tc := range []struct { + name string + request *rulespb.RulesRequest + client rulespb.RulesClient + server *testRulesServer + wantResponse *rulespb.RulesResponse + wantError error + }{ + { + name: "rule group proxy success", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: rulespb.NewRuleGroupRulesResponse(&rulespb.RuleGroup{ + Name: "foo", + }), + recvErr: nil, + }, + server: &testRulesServer{}, + wantResponse: rulespb.NewRuleGroupRulesResponse(&rulespb.RuleGroup{ + Name: "foo", + }), + }, + { + name: "warning proxy success", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: rulespb.NewWarningRulesResponse(errors.New("warning from client")), + recvErr: nil, + }, + server: &testRulesServer{}, + wantResponse: rulespb.NewWarningRulesResponse(errors.New("warning from client")), + }, + { + name: "warn: retreiving rules client failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: nil, + rulesErr: errors.New("retreiving rules failed"), + }, + server: &testRulesServer{}, + wantResponse: rulespb.NewWarningRulesResponse(errors.New("fetching rules from rules client test: retreiving rules failed")), + }, + { + name: "warn: retreiving rules client failed, forward warning failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: nil, + rulesErr: errors.New("retreiving rules failed"), + }, + server: &testRulesServer{ + sendErr: errors.New("forwarding warning response failed"), + }, + wantError: errors.New("forwarding warning response failed"), + }, + { + name: "abort: retreiving rules client failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + client: &testRulesClient{ + response: nil, + rulesErr: errors.New("retreiving rules failed"), + }, + server: &testRulesServer{}, + wantError: errors.New("fetching rules from rules client test: retreiving rules failed"), + }, + { + name: "warn: receive failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: nil, + recvErr: errors.New("503 from Prometheus"), + }, + server: &testRulesServer{}, + wantResponse: rulespb.NewWarningRulesResponse(errors.New("receiving rules from rules client test: 503 from Prometheus")), + }, + { + name: "warn: receive failed, forward warning failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: nil, + recvErr: errors.New("503 from Prometheus"), + }, + server: &testRulesServer{ + sendErr: errors.New("forwarding warning response failed"), + }, + wantError: errors.New("sending rules error to server test: forwarding warning response failed"), + }, + { + name: "abort: receive failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + client: &testRulesClient{ + response: nil, + recvErr: errors.New("503 from Prometheus"), + }, + server: &testRulesServer{}, + wantError: errors.New("receiving rules from rules client test: 503 from Prometheus"), + }, + { + name: "send failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: rulespb.NewRuleGroupRulesResponse(&rulespb.RuleGroup{ + Name: "foo", + }), + recvErr: nil, + }, + server: &testRulesServer{ + sendErr: errors.New("sending message failed"), + }, + wantError: errors.New("rpc error: code = Unknown desc = send rules response: sending message failed"), + }, + { + name: "sending warning response failed", + request: &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + client: &testRulesClient{ + response: rulespb.NewWarningRulesResponse(errors.New("warning from client")), + recvErr: nil, + }, + server: &testRulesServer{ + sendErr: errors.New("sending message failed"), + }, + wantError: errors.New("sending rules warning to server test: sending message failed"), + }, + } { + t.Run(tc.name, func(t *testing.T) { + p := NewProxy(logger, func() []rulespb.RulesClient { + return []rulespb.RulesClient{tc.client} + }) + + err := p.Rules(tc.request, tc.server) + gotErr := "" + if err != nil { + gotErr = err.Error() + } + wantErr := "" + if tc.wantError != nil { + wantErr = tc.wantError.Error() + } + + if gotErr != wantErr { + t.Errorf("want error %q, got %q", wantErr, gotErr) + } + + if !reflect.DeepEqual(tc.wantResponse, tc.server.response) { + t.Errorf("want response %v, got %v", tc.wantResponse, tc.server.response) + } + }) + } +}