From 1833d899ea98e77dc7d239521c48cfb42ba029e6 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Mon, 27 Sep 2021 16:07:19 +0300 Subject: [PATCH 1/4] [coordinator] Make <500 endpoint errors as invalid params error --- src/query/server/query_test.go | 45 ++++++++++++++----- .../promremote/promremotetest/test_server.go | 13 ++++-- src/query/storage/promremote/storage.go | 10 ++++- src/query/storage/promremote/storage_test.go | 27 ++++++++++- 4 files changed, 76 insertions(+), 19 deletions(-) diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index ab57f29b50..5c5a86675e 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -237,18 +237,41 @@ tagOptions: promReq := test.GeneratePromWriteRequest() promReqBody := test.GeneratePromWriteRequestBody(t, promReq) - req, err := http.NewRequestWithContext( - context.TODO(), - http.MethodPost, - fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL), - promReqBody, - ) - require.NoError(t, err) + requestURL := fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL) + + t.Run("write request", func(t *testing.T) { + defer externalFakePromServer.Reset() + req, err := http.NewRequestWithContext( + context.TODO(), + http.MethodPost, + requestURL, + promReqBody, + ) + require.NoError(t, err) - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - assert.NotNil(t, externalFakePromServer.GetLastWriteRequest()) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + assert.NotNil(t, externalFakePromServer.GetLastWriteRequest()) + }) + + t.Run("bad request propagates", func(t *testing.T) { + defer externalFakePromServer.Reset() + externalFakePromServer.SetError("badRequest", http.StatusBadRequest) + req, err := http.NewRequestWithContext( + context.TODO(), + http.MethodPost, + requestURL, + promReqBody, + ) + require.NoError(t, err) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + assert.Equal(t, 400, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + assert.NotNil(t, externalFakePromServer.GetLastWriteRequest()) + }) } func TestGRPCBackend(t *testing.T) { diff --git a/src/query/storage/promremote/promremotetest/test_server.go b/src/query/storage/promremote/promremotetest/test_server.go index 913d24de36..a3570f873e 100644 --- a/src/query/storage/promremote/promremotetest/test_server.go +++ b/src/query/storage/promremote/promremotetest/test_server.go @@ -37,11 +37,16 @@ import ( type TestPromServer struct { mu sync.Mutex lastWriteRequest *prompb.WriteRequest - respErr error + respErr *respErr t *testing.T svr *httptest.Server } +type respErr struct { + error string + status int +} + // NewServer creates new instance of a fake server. func NewServer(t *testing.T) *TestPromServer { testPromServer := &TestPromServer{t: t} @@ -67,7 +72,7 @@ func (s *TestPromServer) handleWrite(w http.ResponseWriter, r *http.Request) { } s.lastWriteRequest = req if s.respErr != nil { - http.Error(w, s.respErr.Error(), http.StatusInternalServerError) + http.Error(w, s.respErr.error, s.respErr.status) return } } @@ -85,10 +90,10 @@ func (s *TestPromServer) WriteAddr() string { } // SetError sets error that will be returned for all incoming requests. -func (s *TestPromServer) SetError(err error) { +func (s *TestPromServer) SetError(body string, status int) { s.mu.Lock() defer s.mu.Unlock() - s.respErr = err + s.respErr = &respErr{error: body, status: status} } // Reset resets state to default. diff --git a/src/query/storage/promremote/storage.go b/src/query/storage/promremote/storage.go index b514596821..442ee34eea 100644 --- a/src/query/storage/promremote/storage.go +++ b/src/query/storage/promremote/storage.go @@ -164,8 +164,14 @@ func (p *promStorage) writeSingle( p.logger.Error("error reading body", zap.Error(err)) response = errorReadingBody } - return fmt.Errorf("expected status code 2XX: actual=%v, address=%v, resp=%s", - resp.StatusCode, address, response) + genericError := fmt.Errorf( + "expected status code 2XX: actual=%v, address=%v, resp=%s", + resp.StatusCode, address, response, + ) + if resp.StatusCode < 500 && resp.StatusCode != http.StatusTooManyRequests { + return xerrors.NewInvalidParamsError(genericError) + } + return genericError } metrics.ReportSuccess(methodDuration) return nil diff --git a/src/query/storage/promremote/storage_test.go b/src/query/storage/promremote/storage_test.go index 4fa312c760..0d825a7163 100644 --- a/src/query/storage/promremote/storage_test.go +++ b/src/query/storage/promremote/storage_test.go @@ -22,9 +22,9 @@ package promremote import ( "context" - "errors" "io" "math/rand" + "net/http" "testing" "time" @@ -39,6 +39,7 @@ import ( "github.com/m3db/m3/src/query/storage/m3/storagemetadata" "github.com/m3db/m3/src/query/storage/promremote/promremotetest" "github.com/m3db/m3/src/query/ts" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/tallytest" xtime "github.com/m3db/m3/src/x/time" ) @@ -239,7 +240,7 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("error should not prevent sending to other instances", func(t *testing.T) { reset() - promLongRetention.SetError(errors.New("test err")) + promLongRetention.SetError("test err", http.StatusInternalServerError) err := sendWrite(storagemetadata.Attributes{ Resolution: 10 * time.Minute, Retention: 8760 * time.Hour, @@ -248,6 +249,28 @@ func TestWriteBasedOnRetention(t *testing.T) { assert.Contains(t, err.Error(), "test err") assert.NotNil(t, promLongRetention2.GetLastWriteRequest()) }) + + t.Run("wrap non 5xx errors into invalid params", func(t *testing.T) { + reset() + promLongRetention.SetError("test err", http.StatusForbidden) + err := sendWrite(storagemetadata.Attributes{ + Resolution: 10 * time.Minute, + Retention: 8760 * time.Hour, + }) + require.Error(t, err) + assert.True(t, xerrors.IsInvalidParams(err)) + }) + + t.Run("429 should not be mapped to invalid params", func(t *testing.T) { + reset() + promLongRetention.SetError("test err", http.StatusTooManyRequests) + err := sendWrite(storagemetadata.Attributes{ + Resolution: 10 * time.Minute, + Retention: 8760 * time.Hour, + }) + require.Error(t, err) + assert.False(t, xerrors.IsInvalidParams(err)) + }) } func closeWithCheck(t *testing.T, c io.Closer) { From 91b4259f13447b67c3f090696bc0f7aa90e08ee6 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Mon, 27 Sep 2021 16:14:08 +0300 Subject: [PATCH 2/4] centai --- src/query/storage/promremote/storage_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/storage/promremote/storage_test.go b/src/query/storage/promremote/storage_test.go index 0d825a7163..3ed1de77c7 100644 --- a/src/query/storage/promremote/storage_test.go +++ b/src/query/storage/promremote/storage_test.go @@ -250,7 +250,7 @@ func TestWriteBasedOnRetention(t *testing.T) { assert.NotNil(t, promLongRetention2.GetLastWriteRequest()) }) - t.Run("wrap non 5xx errors into invalid params", func(t *testing.T) { + t.Run("wrap non 5xx errors as invalid params error", func(t *testing.T) { reset() promLongRetention.SetError("test err", http.StatusForbidden) err := sendWrite(storagemetadata.Attributes{ @@ -261,7 +261,7 @@ func TestWriteBasedOnRetention(t *testing.T) { assert.True(t, xerrors.IsInvalidParams(err)) }) - t.Run("429 should not be mapped to invalid params", func(t *testing.T) { + t.Run("429 should not be wrapped as invalid params", func(t *testing.T) { reset() promLongRetention.SetError("test err", http.StatusTooManyRequests) err := sendWrite(storagemetadata.Attributes{ From 076ab76622cb54a2382afee559bc02733dc572b3 Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Mon, 27 Sep 2021 16:27:39 +0300 Subject: [PATCH 3/4] fix test --- src/query/server/query_test.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index 5c5a86675e..5a4eb6de97 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -238,9 +238,7 @@ tagOptions: promReq := test.GeneratePromWriteRequest() promReqBody := test.GeneratePromWriteRequestBody(t, promReq) requestURL := fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL) - - t.Run("write request", func(t *testing.T) { - defer externalFakePromServer.Reset() + newRequest := func() *http.Request { req, err := http.NewRequestWithContext( context.TODO(), http.MethodPost, @@ -248,29 +246,27 @@ tagOptions: promReqBody, ) require.NoError(t, err) + return req + } - resp, err := http.DefaultClient.Do(req) + t.Run("write request", func(t *testing.T) { + defer externalFakePromServer.Reset() + resp, err := http.DefaultClient.Do(newRequest()) require.NoError(t, err) - require.NoError(t, resp.Body.Close()) + assert.NotNil(t, externalFakePromServer.GetLastWriteRequest()) + require.NoError(t, resp.Body.Close()) }) t.Run("bad request propagates", func(t *testing.T) { defer externalFakePromServer.Reset() externalFakePromServer.SetError("badRequest", http.StatusBadRequest) - req, err := http.NewRequestWithContext( - context.TODO(), - http.MethodPost, - requestURL, - promReqBody, - ) - require.NoError(t, err) - resp, err := http.DefaultClient.Do(req) + resp, err := http.DefaultClient.Do(newRequest()) require.NoError(t, err) + assert.Equal(t, 400, resp.StatusCode) require.NoError(t, resp.Body.Close()) - assert.NotNil(t, externalFakePromServer.GetLastWriteRequest()) }) } From 1583d16688520f82f7a7a041e81751c128c9263e Mon Sep 17 00:00:00 2001 From: Antanas Bastys Date: Mon, 4 Oct 2021 09:33:50 +0300 Subject: [PATCH 4/4] split test --- src/query/storage/promremote/storage_test.go | 159 +++++++++---------- 1 file changed, 79 insertions(+), 80 deletions(-) diff --git a/src/query/storage/promremote/storage_test.go b/src/query/storage/promremote/storage_test.go index 3ed1de77c7..2e6a71e4b6 100644 --- a/src/query/storage/promremote/storage_test.go +++ b/src/query/storage/promremote/storage_test.go @@ -44,13 +44,15 @@ import ( xtime "github.com/m3db/m3/src/x/time" ) -var logger, _ = zap.NewDevelopment() +var ( + logger, _ = zap.NewDevelopment() + scope = tally.NewTestScope("test_scope", map[string]string{}) +) func TestWrite(t *testing.T) { fakeProm := promremotetest.NewServer(t) defer fakeProm.Close() - scope := tally.NewTestScope("test_scope", map[string]string{}) promStorage, err := NewStorage(Options{ endpoints: []EndpointOptions{{name: "testEndpoint", address: fakeProm.WriteAddr()}}, scope: scope, @@ -120,40 +122,37 @@ func TestWriteBasedOnRetention(t *testing.T) { promLongRetention2.Reset() } - scope := tally.NewTestScope("test_scope", map[string]string{}) + mediumRetentionAttr := storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Retention: 720 * time.Hour, + Resolution: 5 * time.Minute, + } + shortRetentionAttr := storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Retention: 120 * time.Hour, + Resolution: 15 * time.Second, + } + longRetentionAttr := storagemetadata.Attributes{ + Resolution: 10 * time.Minute, + Retention: 8760 * time.Hour, + } promStorage, err := NewStorage(Options{ endpoints: []EndpointOptions{ { - address: promShortRetention.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 120 * time.Hour, - Resolution: 15 * time.Second, - }, + address: promShortRetention.WriteAddr(), + attributes: shortRetentionAttr, }, { - address: promMediumRetention.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 720 * time.Hour, - Resolution: 5 * time.Minute, - }, + address: promMediumRetention.WriteAddr(), + attributes: mediumRetentionAttr, }, { - address: promLongRetention.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 8760 * time.Hour, - Resolution: 10 * time.Minute, - }, + address: promLongRetention.WriteAddr(), + attributes: longRetentionAttr, }, { - address: promLongRetention2.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 8760 * time.Hour, - Resolution: 10 * time.Minute, - }, + address: promLongRetention2.WriteAddr(), + attributes: longRetentionAttr, }, }, scope: scope, @@ -162,31 +161,9 @@ func TestWriteBasedOnRetention(t *testing.T) { require.NoError(t, err) defer closeWithCheck(t, promStorage) - sendWrite := func(attr storagemetadata.Attributes) error { - //nolint: gosec - datapoint := ts.Datapoint{Value: rand.Float64(), Timestamp: xtime.Now()} - wq, err := storage.NewWriteQuery(storage.WriteQueryOptions{ - Tags: models.Tags{ - Opts: models.NewTagOptions(), - Tags: []models.Tag{{ - Name: []byte("test_tag_name"), - Value: []byte("test_tag_value"), - }}, - }, - Datapoints: ts.Datapoints{datapoint}, - Unit: xtime.Millisecond, - Attributes: attr, - }) - require.NoError(t, err) - return promStorage.Write(context.TODO(), wq) - } - t.Run("send short retention write", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Retention: 120 * time.Hour, - Resolution: 15 * time.Second, - }) + err := writeTestMetric(t, promStorage, shortRetentionAttr) require.NoError(t, err) assert.NotNil(t, promShortRetention.GetLastWriteRequest()) assert.Nil(t, promMediumRetention.GetLastWriteRequest()) @@ -195,10 +172,7 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("send medium retention write", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Resolution: 5 * time.Minute, - Retention: 720 * time.Hour, - }) + err := writeTestMetric(t, promStorage, mediumRetentionAttr) require.NoError(t, err) assert.Nil(t, promShortRetention.GetLastWriteRequest()) assert.NotNil(t, promMediumRetention.GetLastWriteRequest()) @@ -207,10 +181,7 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("send write to multiple instances configured with same retention", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Resolution: 10 * time.Minute, - Retention: 8760 * time.Hour, - }) + err := writeTestMetric(t, promStorage, longRetentionAttr) require.NoError(t, err) assert.Nil(t, promShortRetention.GetLastWriteRequest()) assert.Nil(t, promMediumRetention.GetLastWriteRequest()) @@ -220,14 +191,14 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("send unconfigured retention write", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Resolution: 5*time.Minute + 1, - Retention: 720 * time.Hour, + err := writeTestMetric(t, promStorage, storagemetadata.Attributes{ + Resolution: mediumRetentionAttr.Resolution + 1, + Retention: mediumRetentionAttr.Retention, }) require.Error(t, err) - err = sendWrite(storagemetadata.Attributes{ - Resolution: 5 * time.Minute, - Retention: 720*time.Hour + 1, + err = writeTestMetric(t, promStorage, storagemetadata.Attributes{ + Resolution: mediumRetentionAttr.Resolution, + Retention: mediumRetentionAttr.Retention + 1, }) require.Error(t, err) assert.Contains(t, err.Error(), "write did not match any of known endpoints") @@ -241,33 +212,42 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("error should not prevent sending to other instances", func(t *testing.T) { reset() promLongRetention.SetError("test err", http.StatusInternalServerError) - err := sendWrite(storagemetadata.Attributes{ - Resolution: 10 * time.Minute, - Retention: 8760 * time.Hour, - }) + err := writeTestMetric(t, promStorage, longRetentionAttr) require.Error(t, err) assert.Contains(t, err.Error(), "test err") assert.NotNil(t, promLongRetention2.GetLastWriteRequest()) }) +} + +func TestErrorHandling(t *testing.T) { + svr := promremotetest.NewServer(t) + defer svr.Close() + + attr := storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Retention: 720 * time.Hour, + Resolution: 5 * time.Minute, + } + promStorage, err := NewStorage(Options{ + endpoints: []EndpointOptions{{address: svr.WriteAddr(), attributes: attr}}, + scope: scope, + logger: logger, + }) + require.NoError(t, err) + defer closeWithCheck(t, promStorage) t.Run("wrap non 5xx errors as invalid params error", func(t *testing.T) { - reset() - promLongRetention.SetError("test err", http.StatusForbidden) - err := sendWrite(storagemetadata.Attributes{ - Resolution: 10 * time.Minute, - Retention: 8760 * time.Hour, - }) + svr.Reset() + svr.SetError("test err", http.StatusForbidden) + err := writeTestMetric(t, promStorage, attr) require.Error(t, err) assert.True(t, xerrors.IsInvalidParams(err)) }) t.Run("429 should not be wrapped as invalid params", func(t *testing.T) { - reset() - promLongRetention.SetError("test err", http.StatusTooManyRequests) - err := sendWrite(storagemetadata.Attributes{ - Resolution: 10 * time.Minute, - Retention: 8760 * time.Hour, - }) + svr.Reset() + svr.SetError("test err", http.StatusTooManyRequests) + err := writeTestMetric(t, promStorage, attr) require.Error(t, err) assert.False(t, xerrors.IsInvalidParams(err)) }) @@ -276,3 +256,22 @@ func TestWriteBasedOnRetention(t *testing.T) { func closeWithCheck(t *testing.T, c io.Closer) { require.NoError(t, c.Close()) } + +func writeTestMetric(t *testing.T, s storage.Storage, attr storagemetadata.Attributes) error { + //nolint: gosec + datapoint := ts.Datapoint{Value: rand.Float64(), Timestamp: xtime.Now()} + wq, err := storage.NewWriteQuery(storage.WriteQueryOptions{ + Tags: models.Tags{ + Opts: models.NewTagOptions(), + Tags: []models.Tag{{ + Name: []byte("test_tag_name"), + Value: []byte("test_tag_value"), + }}, + }, + Datapoints: ts.Datapoints{datapoint}, + Unit: xtime.Millisecond, + Attributes: attr, + }) + require.NoError(t, err) + return s.Write(context.TODO(), wq) +}