Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Treat <500 endpoint errors as invalid params errors #3783

Merged
merged 6 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 30 additions & 11 deletions src/query/server/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,18 +237,37 @@ tagOptions:

promReq := test.GeneratePromWriteRequest()
promReqBody := test.GeneratePromWriteRequestBody(t, promReq)
req, err := http.NewRequestWithContext(
context.TODO(),
http.MethodPost,
fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL),
promReqBody,
)
require.NoError(t, err)
requestURL := fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL)
newRequest := func() *http.Request {
req, err := http.NewRequestWithContext(
context.TODO(),
http.MethodPost,
requestURL,
promReqBody,
)
require.NoError(t, err)
return req
}

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
assert.NotNil(t, externalFakePromServer.GetLastWriteRequest())
t.Run("write request", func(t *testing.T) {
defer externalFakePromServer.Reset()
resp, err := http.DefaultClient.Do(newRequest())
require.NoError(t, err)

assert.NotNil(t, externalFakePromServer.GetLastWriteRequest())
require.NoError(t, resp.Body.Close())
})

t.Run("bad request propagates", func(t *testing.T) {
defer externalFakePromServer.Reset()
externalFakePromServer.SetError("badRequest", http.StatusBadRequest)

resp, err := http.DefaultClient.Do(newRequest())
require.NoError(t, err)

assert.Equal(t, 400, resp.StatusCode)
require.NoError(t, resp.Body.Close())
})
}

func TestGRPCBackend(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions src/query/storage/promremote/promremotetest/test_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ import (
type TestPromServer struct {
mu sync.Mutex
lastWriteRequest *prompb.WriteRequest
respErr error
respErr *respErr
t *testing.T
svr *httptest.Server
}

// respErr describes the error response the fake server sends from handleWrite
// once SetError has been called.
type respErr struct {
// error is the response body text handed to http.Error.
error string
// status is the HTTP status code handed to http.Error.
status int
}

// NewServer creates new instance of a fake server.
func NewServer(t *testing.T) *TestPromServer {
testPromServer := &TestPromServer{t: t}
Expand All @@ -67,7 +72,7 @@ func (s *TestPromServer) handleWrite(w http.ResponseWriter, r *http.Request) {
}
s.lastWriteRequest = req
if s.respErr != nil {
http.Error(w, s.respErr.Error(), http.StatusInternalServerError)
http.Error(w, s.respErr.error, s.respErr.status)
return
}
}
Expand All @@ -85,10 +90,10 @@ func (s *TestPromServer) WriteAddr() string {
}

// SetError sets error that will be returned for all incoming requests.
func (s *TestPromServer) SetError(err error) {
func (s *TestPromServer) SetError(body string, status int) {
s.mu.Lock()
defer s.mu.Unlock()
s.respErr = err
s.respErr = &respErr{error: body, status: status}
}

// Reset resets state to default.
Expand Down
10 changes: 8 additions & 2 deletions src/query/storage/promremote/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,14 @@ func (p *promStorage) writeSingle(
p.logger.Error("error reading body", zap.Error(err))
response = errorReadingBody
}
return fmt.Errorf("expected status code 2XX: actual=%v, address=%v, resp=%s",
resp.StatusCode, address, response)
genericError := fmt.Errorf(
"expected status code 2XX: actual=%v, address=%v, resp=%s",
resp.StatusCode, address, response,
)
if resp.StatusCode < 500 && resp.StatusCode != http.StatusTooManyRequests {
return xerrors.NewInvalidParamsError(genericError)
}
vpranckaitis marked this conversation as resolved.
Show resolved Hide resolved
return genericError
}
metrics.ReportSuccess(methodDuration)
return nil
Expand Down
162 changes: 92 additions & 70 deletions src/query/storage/promremote/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package promremote

import (
"context"
"errors"
"io"
"math/rand"
"net/http"
"testing"
"time"

Expand All @@ -39,17 +39,20 @@ import (
"github.com/m3db/m3/src/query/storage/m3/storagemetadata"
"github.com/m3db/m3/src/query/storage/promremote/promremotetest"
"github.com/m3db/m3/src/query/ts"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/tallytest"
xtime "github.com/m3db/m3/src/x/time"
)

var logger, _ = zap.NewDevelopment()
var (
logger, _ = zap.NewDevelopment()
scope = tally.NewTestScope("test_scope", map[string]string{})
)

func TestWrite(t *testing.T) {
fakeProm := promremotetest.NewServer(t)
defer fakeProm.Close()

scope := tally.NewTestScope("test_scope", map[string]string{})
promStorage, err := NewStorage(Options{
endpoints: []EndpointOptions{{name: "testEndpoint", address: fakeProm.WriteAddr()}},
scope: scope,
Expand Down Expand Up @@ -119,40 +122,37 @@ func TestWriteBasedOnRetention(t *testing.T) {
promLongRetention2.Reset()
}

scope := tally.NewTestScope("test_scope", map[string]string{})
mediumRetentionAttr := storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Retention: 720 * time.Hour,
Resolution: 5 * time.Minute,
}
shortRetentionAttr := storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Retention: 120 * time.Hour,
Resolution: 15 * time.Second,
}
longRetentionAttr := storagemetadata.Attributes{
Resolution: 10 * time.Minute,
Retention: 8760 * time.Hour,
}
promStorage, err := NewStorage(Options{
endpoints: []EndpointOptions{
{
address: promShortRetention.WriteAddr(),
attributes: storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Retention: 120 * time.Hour,
Resolution: 15 * time.Second,
},
address: promShortRetention.WriteAddr(),
attributes: shortRetentionAttr,
},
{
address: promMediumRetention.WriteAddr(),
attributes: storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Retention: 720 * time.Hour,
Resolution: 5 * time.Minute,
},
address: promMediumRetention.WriteAddr(),
attributes: mediumRetentionAttr,
},
{
address: promLongRetention.WriteAddr(),
attributes: storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Retention: 8760 * time.Hour,
Resolution: 10 * time.Minute,
},
address: promLongRetention.WriteAddr(),
attributes: longRetentionAttr,
},
{
address: promLongRetention2.WriteAddr(),
attributes: storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Retention: 8760 * time.Hour,
Resolution: 10 * time.Minute,
},
address: promLongRetention2.WriteAddr(),
attributes: longRetentionAttr,
},
},
scope: scope,
Expand All @@ -161,31 +161,9 @@ func TestWriteBasedOnRetention(t *testing.T) {
require.NoError(t, err)
defer closeWithCheck(t, promStorage)

sendWrite := func(attr storagemetadata.Attributes) error {
//nolint: gosec
datapoint := ts.Datapoint{Value: rand.Float64(), Timestamp: xtime.Now()}
wq, err := storage.NewWriteQuery(storage.WriteQueryOptions{
Tags: models.Tags{
Opts: models.NewTagOptions(),
Tags: []models.Tag{{
Name: []byte("test_tag_name"),
Value: []byte("test_tag_value"),
}},
},
Datapoints: ts.Datapoints{datapoint},
Unit: xtime.Millisecond,
Attributes: attr,
})
require.NoError(t, err)
return promStorage.Write(context.TODO(), wq)
}

t.Run("send short retention write", func(t *testing.T) {
reset()
err := sendWrite(storagemetadata.Attributes{
Retention: 120 * time.Hour,
Resolution: 15 * time.Second,
})
err := writeTestMetric(t, promStorage, shortRetentionAttr)
require.NoError(t, err)
assert.NotNil(t, promShortRetention.GetLastWriteRequest())
assert.Nil(t, promMediumRetention.GetLastWriteRequest())
Expand All @@ -194,10 +172,7 @@ func TestWriteBasedOnRetention(t *testing.T) {

t.Run("send medium retention write", func(t *testing.T) {
reset()
err := sendWrite(storagemetadata.Attributes{
Resolution: 5 * time.Minute,
Retention: 720 * time.Hour,
})
err := writeTestMetric(t, promStorage, mediumRetentionAttr)
require.NoError(t, err)
assert.Nil(t, promShortRetention.GetLastWriteRequest())
assert.NotNil(t, promMediumRetention.GetLastWriteRequest())
Expand All @@ -206,10 +181,7 @@ func TestWriteBasedOnRetention(t *testing.T) {

t.Run("send write to multiple instances configured with same retention", func(t *testing.T) {
reset()
err := sendWrite(storagemetadata.Attributes{
Resolution: 10 * time.Minute,
Retention: 8760 * time.Hour,
})
err := writeTestMetric(t, promStorage, longRetentionAttr)
require.NoError(t, err)
assert.Nil(t, promShortRetention.GetLastWriteRequest())
assert.Nil(t, promMediumRetention.GetLastWriteRequest())
Expand All @@ -219,14 +191,14 @@ func TestWriteBasedOnRetention(t *testing.T) {

t.Run("send unconfigured retention write", func(t *testing.T) {
reset()
err := sendWrite(storagemetadata.Attributes{
Resolution: 5*time.Minute + 1,
Retention: 720 * time.Hour,
err := writeTestMetric(t, promStorage, storagemetadata.Attributes{
Resolution: mediumRetentionAttr.Resolution + 1,
Retention: mediumRetentionAttr.Retention,
})
require.Error(t, err)
err = sendWrite(storagemetadata.Attributes{
Resolution: 5 * time.Minute,
Retention: 720*time.Hour + 1,
err = writeTestMetric(t, promStorage, storagemetadata.Attributes{
Resolution: mediumRetentionAttr.Resolution,
Retention: mediumRetentionAttr.Retention + 1,
})
require.Error(t, err)
assert.Contains(t, err.Error(), "write did not match any of known endpoints")
Expand All @@ -239,17 +211,67 @@ func TestWriteBasedOnRetention(t *testing.T) {

t.Run("error should not prevent sending to other instances", func(t *testing.T) {
reset()
promLongRetention.SetError(errors.New("test err"))
err := sendWrite(storagemetadata.Attributes{
Resolution: 10 * time.Minute,
Retention: 8760 * time.Hour,
})
promLongRetention.SetError("test err", http.StatusInternalServerError)
err := writeTestMetric(t, promStorage, longRetentionAttr)
require.Error(t, err)
assert.Contains(t, err.Error(), "test err")
assert.NotNil(t, promLongRetention2.GetLastWriteRequest())
})
}

// TestErrorHandling checks how Write classifies error responses from a remote
// endpoint: a 403 response must surface as an invalid params error, while a
// 429 response must not.
func TestErrorHandling(t *testing.T) {
svr := promremotetest.NewServer(t)
defer svr.Close()

// A single endpoint is configured with attr, and every write below uses the
// same attr.
attr := storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Retention: 720 * time.Hour,
Resolution: 5 * time.Minute,
}
promStorage, err := NewStorage(Options{
endpoints: []EndpointOptions{{address: svr.WriteAddr(), attributes: attr}},
scope: scope,
logger: logger,
})
require.NoError(t, err)
defer closeWithCheck(t, promStorage)

// 403 is below 500 and is not 429, so the returned error is expected to be
// an invalid params error.
t.Run("wrap non 5xx errors as invalid params error", func(t *testing.T) {
// Reset the fake server state before configuring this subtest's error.
svr.Reset()
svr.SetError("test err", http.StatusForbidden)
err := writeTestMetric(t, promStorage, attr)
require.Error(t, err)
assert.True(t, xerrors.IsInvalidParams(err))
})

// 429 is below 500 too, but it must still come back as a plain error rather
// than an invalid params error.
t.Run("429 should not be wrapped as invalid params", func(t *testing.T) {
vpranckaitis marked this conversation as resolved.
Show resolved Hide resolved
// Reset the fake server state before configuring this subtest's error.
svr.Reset()
svr.SetError("test err", http.StatusTooManyRequests)
err := writeTestMetric(t, promStorage, attr)
require.Error(t, err)
assert.False(t, xerrors.IsInvalidParams(err))
})
}

// closeWithCheck closes c and fails the test immediately if Close returns an
// error. The tests in this file call it through defer.
func closeWithCheck(t *testing.T, c io.Closer) {
	// Mark this function as a helper so a failure is attributed to the
	// caller's line instead of this one.
	t.Helper()
	require.NoError(t, c.Close())
}

// writeTestMetric builds a write query carrying one datapoint (random value,
// timestamp xtime.Now()) with a single fixed tag and the supplied attributes,
// then writes it to s.
//
// The test fails immediately if the query cannot be constructed. The error
// returned by s.Write is handed back to the caller unchanged so each test can
// assert on it.
func writeTestMetric(t *testing.T, s storage.Storage, attr storagemetadata.Attributes) error {
	// Mark this function as a helper so a failed require below is attributed
	// to the calling test's line instead of this one.
	t.Helper()

	//nolint: gosec
	datapoint := ts.Datapoint{Value: rand.Float64(), Timestamp: xtime.Now()}
	wq, err := storage.NewWriteQuery(storage.WriteQueryOptions{
		Tags: models.Tags{
			Opts: models.NewTagOptions(),
			Tags: []models.Tag{{
				Name:  []byte("test_tag_name"),
				Value: []byte("test_tag_value"),
			}},
		},
		Datapoints: ts.Datapoints{datapoint},
		Unit:       xtime.Millisecond,
		Attributes: attr,
	})
	require.NoError(t, err)
	return s.Write(context.TODO(), wq)
}