From eea6199b69e1de0901ddbf3a5fa2a72a3cb5dea9 Mon Sep 17 00:00:00 2001 From: arnikola Date: Fri, 17 Apr 2020 10:54:12 -0400 Subject: [PATCH] [query] Refactor query code, add warnings to prom output (#2265) --- src/cmd/services/m3query/config/config.go | 14 +- .../services/m3query/config/config_test.go | 74 ++--- src/query/api/v1/handler/close.go | 106 +++++-- src/query/api/v1/handler/close_test.go | 14 +- .../database/config_bootstrappers_get_test.go | 14 +- .../database/config_bootstrappers_set_test.go | 48 ++-- .../api/v1/handler/database/create_test.go | 240 ++++++++-------- .../api/v1/handler/namespace/add_test.go | 14 +- .../api/v1/handler/namespace/schema_test.go | 38 +-- .../api/v1/handler/placement/set_test.go | 7 +- .../v1/handler/prometheus/native/common.go | 37 ++- .../handler/prometheus/native/common_test.go | 269 ++++++++++-------- .../prometheus/native/parse_query_test.go | 4 +- .../prometheus/native/parse_threshold_test.go | 4 +- .../api/v1/handler/prometheus/native/read.go | 220 ++++---------- .../handler/prometheus/native/read_common.go | 158 ++++++++-- .../prometheus/native/read_instantaneous.go | 127 --------- .../native/read_instantaneous_test.go | 63 ++-- .../v1/handler/prometheus/native/read_test.go | 123 ++++---- .../api/v1/handler/prometheus/remote/read.go | 146 ++++++---- .../v1/handler/prometheus/remote/read_test.go | 172 +++++------ src/query/api/v1/httpd/handler.go | 22 +- src/query/api/v1/httpd/handler_test.go | 10 +- src/query/api/v1/options/handler.go | 17 +- src/query/block/meta.go | 19 ++ src/query/server/cost_reporters_test.go | 2 +- src/query/server/query.go | 11 +- src/query/storage/types.go | 2 + src/x/json/json.go | 45 +++ src/x/test/diff.go | 13 +- 30 files changed, 1105 insertions(+), 928 deletions(-) delete mode 100644 src/query/api/v1/handler/prometheus/native/read_instantaneous.go create mode 100644 src/x/json/json.go diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 303687a8b4..5e2f5b1948 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -217,7 +217,7 @@ func (c QueryConfiguration) TimeoutOrDefault() time.Duration { // instance. Limits are split between per-query and global limits. type LimitsConfiguration struct { // deprecated: use PerQuery.MaxComputedDatapoints instead. - DeprecatedMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"` + DeprecatedMaxComputedDatapoints int `yaml:"maxComputedDatapoints"` // Global configures limits which apply across all queries running on this // instance. @@ -232,7 +232,7 @@ type LimitsConfiguration struct { // LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints. See // LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints for a comment on // the semantics. -func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 { +func (lc LimitsConfiguration) MaxComputedDatapoints() int { if lc.PerQuery.PrivateMaxComputedDatapoints != 0 { return lc.PerQuery.PrivateMaxComputedDatapoints } @@ -245,7 +245,7 @@ func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 { type GlobalLimitsConfiguration struct { // MaxFetchedDatapoints limits the total number of datapoints actually // fetched by all queries at any given time. 
- MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"` + MaxFetchedDatapoints int `yaml:"maxFetchedDatapoints"` } // AsLimitManagerOptions converts this configuration to @@ -264,14 +264,14 @@ type PerQueryLimitsConfiguration struct { // N.B.: the hacky "Private" prefix is to indicate that callers should use // LimitsConfiguration.MaxComputedDatapoints() instead of accessing // this field directly. - PrivateMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"` + PrivateMaxComputedDatapoints int `yaml:"maxComputedDatapoints"` // MaxFetchedDatapoints limits the number of datapoints actually used by a // given query. - MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"` + MaxFetchedDatapoints int `yaml:"maxFetchedDatapoints"` // MaxFetchedSeries limits the number of time series returned by a storage node. - MaxFetchedSeries int64 `yaml:"maxFetchedSeries"` + MaxFetchedSeries int `yaml:"maxFetchedSeries"` } // AsLimitManagerOptions converts this configuration to @@ -294,7 +294,7 @@ func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handleropti } } -func toLimitManagerOptions(limit int64) cost.LimitManagerOptions { +func toLimitManagerOptions(limit int) cost.LimitManagerOptions { return cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{ Threshold: cost.Cost(limit), Enabled: limit > 0, diff --git a/src/cmd/services/m3query/config/config_test.go b/src/cmd/services/m3query/config/config_test.go index d77dbe49df..82a3fc6756 100644 --- a/src/cmd/services/m3query/config/config_test.go +++ b/src/cmd/services/m3query/config/config_test.go @@ -72,36 +72,36 @@ func TestTagOptionsFromConfig(t *testing.T) { assert.Equal(t, []byte(name), opts.MetricName()) } -func TestLimitsConfiguration_AsLimitManagerOptions(t *testing.T) { +func TestLimitsConfigurationAsLimitManagerOptions(t *testing.T) { cases := []struct { - Input interface { + input interface { AsLimitManagerOptions() cost.LimitManagerOptions } - ExpectedDefault int64 + expectedDefault int }{{ - Input: &PerQueryLimitsConfiguration{ + input: &PerQueryLimitsConfiguration{ MaxFetchedDatapoints: 5, }, - ExpectedDefault: 5, + expectedDefault: 5, }, { - Input: &GlobalLimitsConfiguration{ + input: &GlobalLimitsConfiguration{ MaxFetchedDatapoints: 6, }, - ExpectedDefault: 6, + expectedDefault: 6, }} for _, tc := range cases { - t.Run(fmt.Sprintf("type_%T", tc.Input), func(t *testing.T) { - res := tc.Input.AsLimitManagerOptions() + t.Run(fmt.Sprintf("type_%T", tc.input), func(t *testing.T) { + res := tc.input.AsLimitManagerOptions() assert.Equal(t, cost.Limit{ - Threshold: cost.Cost(tc.ExpectedDefault), + Threshold: cost.Cost(tc.expectedDefault), Enabled: true, }, res.DefaultLimit()) }) } } -func TestLimitsConfiguration_MaxComputedDatapoints(t *testing.T) { +func TestLimitsConfigurationMaxComputedDatapoints(t *testing.T) { t.Run("uses PerQuery value if provided", func(t *testing.T) { lc := &LimitsConfiguration{ DeprecatedMaxComputedDatapoints: 6, @@ -110,7 +110,7 @@ func TestLimitsConfiguration_MaxComputedDatapoints(t *testing.T) { }, } - assert.Equal(t, int64(5), lc.MaxComputedDatapoints()) + assert.Equal(t, 5, lc.MaxComputedDatapoints()) }) t.Run("uses deprecated value if PerQuery not provided", func(t *testing.T) { @@ -118,41 +118,41 @@ func TestLimitsConfiguration_MaxComputedDatapoints(t *testing.T) { DeprecatedMaxComputedDatapoints: 6, } - assert.Equal(t, int64(6), lc.MaxComputedDatapoints()) + assert.Equal(t, 6, lc.MaxComputedDatapoints()) }) } func TestToLimitManagerOptions(t *testing.T) { cases := []struct { - Name 
string - Input int64 - ExpectedLimit cost.Limit + name string + input int + expectedLimit cost.Limit }{{ - Name: "negative is disabled", - Input: -5, - ExpectedLimit: cost.Limit{ + name: "negative is disabled", + input: -5, + expectedLimit: cost.Limit{ Threshold: cost.Cost(-5), Enabled: false, }, }, { - Name: "zero is disabled", - Input: 0, - ExpectedLimit: cost.Limit{ + name: "zero is disabled", + input: 0, + expectedLimit: cost.Limit{ Threshold: cost.Cost(0), Enabled: false, }, }, { - Name: "positive is enabled", - Input: 5, - ExpectedLimit: cost.Limit{ + name: "positive is enabled", + input: 5, + expectedLimit: cost.Limit{ Threshold: cost.Cost(5), Enabled: true, }, }} for _, tc := range cases { - t.Run(tc.Name, func(t *testing.T) { - assert.Equal(t, tc.ExpectedLimit, toLimitManagerOptions(tc.Input).DefaultLimit()) + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expectedLimit, toLimitManagerOptions(tc.input).DefaultLimit()) }) } } @@ -185,25 +185,25 @@ func TestConfigValidation(t *testing.T) { // limits configuration limitsCfgCases := []struct { - Name string - Limit int64 + name string + limit int }{{ - Name: "empty LimitsConfiguration is valid (implies disabled)", - Limit: 0, + name: "empty LimitsConfiguration is valid (implies disabled)", + limit: 0, }, { - Name: "LimitsConfiguration with positive limit is valid", - Limit: 5, + name: "LimitsConfiguration with positive limit is valid", + limit: 5, }, {}, { - Name: "LimitsConfiguration with negative limit is valid (implies disabled)", - Limit: -5, + name: "LimitsConfiguration with negative limit is valid (implies disabled)", + limit: -5, }} for _, tc := range limitsCfgCases { - t.Run(tc.Name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { cfg := baseCfg(t) cfg.Limits = LimitsConfiguration{ PerQuery: PerQueryLimitsConfiguration{ - PrivateMaxComputedDatapoints: tc.Limit, + PrivateMaxComputedDatapoints: tc.limit, }} assert.NoError(t, validator.Validate(cfg)) diff --git a/src/query/api/v1/handler/close.go b/src/query/api/v1/handler/close.go index 21d0231541..1487e47851 100644 --- a/src/query/api/v1/handler/close.go +++ b/src/query/api/v1/handler/close.go @@ -28,29 +28,101 @@ import ( "github.com/m3db/m3/src/x/instrument" ) -// CloseWatcher watches for CloseNotify and context timeout. It is best effort and may sometimes not close the channel relying on gc +// CancelWatcher is an interface that wraps a WatchForCancel method. +// TODO: make this generic middleware, rather than applied per function. +type CancelWatcher interface { + // WatchForCancel watches on the given context, and applies + // the given cancellation function. 
+ WatchForCancel(context.Context, context.CancelFunc) +} + +type canceller struct { + notifier http.CloseNotifier + iOpts instrument.Options +} + +func (c *canceller) WatchForCancel( + ctx context.Context, + cancel context.CancelFunc, +) { + logger := logging.WithContext(ctx, c.iOpts) + notify := c.notifier.CloseNotify() + go func() { + // Wait for either the request to finish + // or for the client to disconnect + select { + case <-notify: + logger.Warn("connection closed by client") + cancel() + case <-ctx.Done(): + // We only care about the time out case and not other cancellations + if ctx.Err() == context.DeadlineExceeded { + logger.Warn("request timed out") + } + } + }() +} + +type ctxCanceller struct { + iOpts instrument.Options +} + +func (c *ctxCanceller) WatchForCancel( + ctx context.Context, _ context.CancelFunc, +) { + logger := logging.WithContext(ctx, c.iOpts) + go func() { + select { + case <-ctx.Done(): + // We only care about the time out case and not other cancellations + if ctx.Err() == context.DeadlineExceeded { + logger.Warn("request timed out") + } + } + }() +} + +// NewResponseWriterCanceller creates a canceller on the given context with +// the given response writer. +func NewResponseWriterCanceller( + w http.ResponseWriter, + iOpts instrument.Options, +) CancelWatcher { + notifier, ok := w.(http.CloseNotifier) + if !ok { + return &ctxCanceller{iOpts: iOpts} + } + + return &canceller{notifier: notifier, iOpts: iOpts} +} + +// CloseWatcher watches for CloseNotify and context timeout. +// It is best effort and may sometimes not close the channel relying on GC. func CloseWatcher( ctx context.Context, cancel context.CancelFunc, w http.ResponseWriter, instrumentOpts instrument.Options, ) { + notifier, ok := w.(http.CloseNotifier) + if !ok { + return + } + logger := logging.WithContext(ctx, instrumentOpts) - if notifier, ok := w.(http.CloseNotifier); ok { - notify := notifier.CloseNotify() - go func() { - // Wait for either the request to finish - // or for the client to disconnect - select { - case <-notify: - logger.Warn("connection closed by client") - cancel() - case <-ctx.Done(): - // We only care about the time out case and not other cancellations - if ctx.Err() == context.DeadlineExceeded { - logger.Warn("request timed out") - } + notify := notifier.CloseNotify() + go func() { + // Wait for either the request to finish + // or for the client to disconnect + select { + case <-notify: + logger.Warn("connection closed by client") + cancel() + case <-ctx.Done(): + // We only care about the time out case and not other cancellations + if ctx.Err() == context.DeadlineExceeded { + logger.Warn("request timed out") } - }() - } + } + }() } diff --git a/src/query/api/v1/handler/close_test.go b/src/query/api/v1/handler/close_test.go index a9222962ff..1a748ce39a 100644 --- a/src/query/api/v1/handler/close_test.go +++ b/src/query/api/v1/handler/close_test.go @@ -35,7 +35,17 @@ func TestCloseWatcher(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) w := httptest.NewRecorder() CloseWatcher(ctx, cancel, w, instrument.NewOptions()) - assert.Nil(t, ctx.Err()) + assert.NoError(t, ctx.Err()) time.Sleep(100 * time.Millisecond) - assert.NotNil(t, ctx.Err()) + assert.Error(t, ctx.Err()) +} + +func TestResponseWriteCanceller(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + w := httptest.NewRecorder() + canceller := NewResponseWriterCanceller(w, instrument.NewOptions()) + canceller.WatchForCancel(ctx, 
cancel) + assert.NoError(t, ctx.Err()) + time.Sleep(100 * time.Millisecond) + assert.Error(t, ctx.Err()) } diff --git a/src/query/api/v1/handler/database/config_bootstrappers_get_test.go b/src/query/api/v1/handler/database/config_bootstrappers_get_test.go index b9cd53d92c..df46f40f98 100644 --- a/src/query/api/v1/handler/database/config_bootstrappers_get_test.go +++ b/src/query/api/v1/handler/database/config_bootstrappers_get_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/dbnode/kvconfig" "github.com/m3db/m3/src/x/instrument" + xjson "github.com/m3db/m3/src/x/json" xtest "github.com/m3db/m3/src/x/test" "github.com/gogo/protobuf/proto" @@ -71,13 +72,14 @@ func TestConfigGetBootstrappersHandler(t *testing.T) { assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) - expectedResponse := ` - { - "values": ["filesystem", "commitlog", "peers", "uninitialized_topology"] + expectedResp := xjson.Map{ + "values": xjson.Array{"filesystem", "commitlog", "peers", "uninitialized_topology"}, } - ` - assert.Equal(t, stripAllWhitespace(expectedResponse), string(body), - xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body)))) + + expected := xtest.MustPrettyJSONMap(t, expectedResp) + actual := xtest.MustPrettyJSONString(t, string(body)) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } func TestConfigGetBootstrappersHandlerNotFound(t *testing.T) { diff --git a/src/query/api/v1/handler/database/config_bootstrappers_set_test.go b/src/query/api/v1/handler/database/config_bootstrappers_set_test.go index 4faeea5174..8ae3a3fe8e 100644 --- a/src/query/api/v1/handler/database/config_bootstrappers_set_test.go +++ b/src/query/api/v1/handler/database/config_bootstrappers_set_test.go @@ -24,12 +24,12 @@ import ( "io/ioutil" "net/http" "net/http/httptest" - "strings" "testing" "github.com/m3db/m3/src/cluster/generated/proto/commonpb" "github.com/m3db/m3/src/dbnode/kvconfig" "github.com/m3db/m3/src/x/instrument" + xjson "github.com/m3db/m3/src/x/json" xtest "github.com/m3db/m3/src/x/test" "github.com/golang/mock/gomock" @@ -46,11 +46,9 @@ func TestConfigSetBootstrappersHandler(t *testing.T) { instrument.NewOptions()) w := httptest.NewRecorder() - jsonInput := ` - { - "values": ["filesystem", "commitlog", "peers", "uninitialized_topology"] - } - ` + jsonInput := xjson.Map{ + "values": xjson.Array{"filesystem", "commitlog", "peers", "uninitialized_topology"}, + } mockStore.EXPECT(). Set(kvconfig.BootstrapperKey, gomock.Any()). 
@@ -61,7 +59,8 @@ func TestConfigSetBootstrappersHandler(t *testing.T) { }, value.Values) }) - req := httptest.NewRequest("POST", "/database/config/bootstrappers", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/config/bootstrappers", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) handler.ServeHTTP(w, req) @@ -71,13 +70,14 @@ func TestConfigSetBootstrappersHandler(t *testing.T) { assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) - expectedResponse := ` - { - "values": ["filesystem", "commitlog", "peers", "uninitialized_topology"] + expectedResp := xjson.Map{ + "values": xjson.Array{"filesystem", "commitlog", "peers", "uninitialized_topology"}, } - ` - assert.Equal(t, stripAllWhitespace(expectedResponse), string(body), - xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body)))) + + expected := xtest.MustPrettyJSONMap(t, expectedResp) + actual := xtest.MustPrettyJSONString(t, string(body)) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } func TestConfigSetBootstrappersHandlerNoValues(t *testing.T) { @@ -89,13 +89,12 @@ func TestConfigSetBootstrappersHandlerNoValues(t *testing.T) { instrument.NewOptions()) w := httptest.NewRecorder() - jsonInput := ` - { - "values": [] - } - ` + jsonInput := xjson.Map{ + "values": xjson.Array{}, + } - req := httptest.NewRequest("POST", "/database/config/bootstrappers", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/config/bootstrappers", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) handler.ServeHTTP(w, req) @@ -113,13 +112,12 @@ func TestConfigSetBootstrappersHandlerInvalidValue(t *testing.T) { instrument.NewOptions()) w := httptest.NewRecorder() - jsonInput := ` - { - "values": ["filesystem", "foo"] - } - ` + jsonInput := xjson.Map{ + "values": xjson.Array{"filesystem", "foo"}, + } - req := httptest.NewRequest("POST", "/database/config/bootstrappers", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/config/bootstrappers", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) handler.ServeHTTP(w, req) diff --git a/src/query/api/v1/handler/database/create_test.go b/src/query/api/v1/handler/database/create_test.go index 3c676ffef1..452255bc44 100644 --- a/src/query/api/v1/handler/database/create_test.go +++ b/src/query/api/v1/handler/database/create_test.go @@ -21,14 +21,14 @@ package database import ( + "bytes" + "encoding/json" "fmt" "io/ioutil" "net/http" "net/http/httptest" - "strings" "testing" "time" - "unicode" "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cluster/generated/proto/placementpb" @@ -40,6 +40,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/namespace" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/x/instrument" + xjson "github.com/m3db/m3/src/x/json" xtest "github.com/m3db/m3/src/x/test" "github.com/golang/mock/gomock" @@ -102,14 +103,13 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) { require.NoError(t, err) w := httptest.NewRecorder() - jsonInput := fmt.Sprintf(` - { - "namespaceName": "testNamespace", - "type": "%s" - } - `, providedType) + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": providedType, + } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) 
require.NotNil(t, req) mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) @@ -202,8 +202,11 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) { } } ` - assert.Equal(t, stripAllWhitespace(expectedResponse), string(body), - xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body)))) + + expected := xtest.MustPrettyJSONString(t, expectedResponse) + actual := xtest.MustPrettyJSONString(t, string(body)) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } func TestLocalTypeClusteredPlacementAlreadyExists(t *testing.T) { @@ -216,14 +219,13 @@ func TestLocalTypeClusteredPlacementAlreadyExists(t *testing.T) { require.NoError(t, err) w := httptest.NewRecorder() - jsonInput := ` - { - "namespaceName": "testNamespace", - "type": "local" - } - ` + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": "local", + } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) placementProto := &placementpb.Placement{ @@ -264,15 +266,14 @@ func TestLocalTypeWithNumShards(t *testing.T) { w := httptest.NewRecorder() - jsonInput := ` - { - "namespaceName": "testNamespace", - "type": "local", - "numShards": 51 - } - ` + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": "local", + "numShards": 51, + } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) @@ -360,8 +361,10 @@ func TestLocalTypeWithNumShards(t *testing.T) { } } ` - assert.Equal(t, stripAllWhitespace(expectedResponse), string(body), - xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body)))) + expected := xtest.MustPrettyJSONString(t, expectedResponse) + actual := xtest.MustPrettyJSONString(t, string(body)) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } func TestLocalWithBlockSizeNanos(t *testing.T) { ctrl := gomock.NewController(t) @@ -374,15 +377,14 @@ func TestLocalWithBlockSizeNanos(t *testing.T) { require.NoError(t, err) w := httptest.NewRecorder() - jsonInput := ` - { - "namespaceName": "testNamespace", - "type": "local", - "blockSize": {"time": "3h"} - } - ` + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": "local", + "blockSize": xjson.Map{"time": "3h"}, + } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) @@ -470,8 +472,10 @@ func TestLocalWithBlockSizeNanos(t *testing.T) { } } ` - assert.Equal(t, stripAllWhitespace(expectedResponse), string(body), - xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body)))) + expected := xtest.MustPrettyJSONString(t, expectedResponse) + actual := xtest.MustPrettyJSONString(t, string(body)) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { @@ -488,15 +492,16 @@ func 
TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { min := minRecommendCalculateBlockSize desiredBlockSize := min + 5*time.Minute - jsonInput := fmt.Sprintf(` - { - "namespaceName": "testNamespace", - "type": "local", - "blockSize": {"expectedSeriesDatapointsPerHour": %d} - } - `, int64(float64(blockSizeFromExpectedSeriesScalar)/float64(desiredBlockSize))) + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": "local", + "blockSize": xjson.Map{ + "expectedSeriesDatapointsPerHour": int64(float64(blockSizeFromExpectedSeriesScalar) / float64(desiredBlockSize)), + }, + } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) @@ -585,8 +590,10 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { } `, desiredBlockSize, desiredBlockSize) - assert.Equal(t, stripAllWhitespace(expectedResponse), string(body), - xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body)))) + expected := xtest.MustPrettyJSONString(t, expectedResponse) + actual := xtest.MustPrettyJSONString(t, string(body)) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } func TestClusterTypeHosts(t *testing.T) { @@ -608,15 +615,14 @@ func TestClusterTypeHostsPlacementAlreadyExistsHostsProvided(t *testing.T) { require.NoError(t, err) w := httptest.NewRecorder() - jsonInput := ` - { - "namespaceName": "testNamespace", - "type": "cluster", - "hosts": [{"id": "host1"}, {"id": "host2"}] - } - ` + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": "cluster", + "hosts": xjson.Array{xjson.Map{"id": "host1"}, xjson.Map{"id": "host2"}}, + } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) placementProto := &placementpb.Placement{ @@ -664,14 +670,13 @@ func TestClusterTypeHostsPlacementAlreadyExistsExistingIsLocal(t *testing.T) { require.NoError(t, err) w := httptest.NewRecorder() - jsonInput := ` - { - "namespaceName": "testNamespace", - "type": "cluster" - } - ` + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": "cluster", + } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) placementProto := &placementpb.Placement{ @@ -711,26 +716,25 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) { require.NoError(t, err) w := httptest.NewRecorder() - var jsonInput string + var jsonInput xjson.Map if placementExists { - jsonInput = ` - { + jsonInput = xjson.Map{ "namespaceName": "testNamespace", - "type": "cluster" + "type": "cluster", } - ` } else { - jsonInput = ` - { + jsonInput = xjson.Map{ "namespaceName": "testNamespace", - "type": "cluster", - "hosts": [{"id": "host1"}, {"id": "host2"}] + "type": "cluster", + "hosts": xjson.Array{xjson.Map{"id": "host1"}, xjson.Map{"id": "host2"}}, } - ` } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + reqBody := bytes.NewBuffer(nil) + require.NoError(t, json.NewEncoder(reqBody).Encode(jsonInput)) + + req := httptest.NewRequest("POST", "/database/create", 
reqBody) require.NotNil(t, req) mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) @@ -843,8 +847,11 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) { } } ` - assert.Equal(t, stripAllWhitespace(expectedResponse), string(body), - xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body)))) + + expected := xtest.MustPrettyJSONString(t, expectedResponse) + actual := xtest.MustPrettyJSONString(t, string(body)) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { @@ -859,15 +866,17 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { require.NoError(t, err) w := httptest.NewRecorder() - jsonInput := ` - { - "namespaceName": "testNamespace", - "type": "cluster", - "hosts": [{"id":"host1", "isolationGroup":"group1"}, {"id":"host2", "isolationGroup":"group2"}] - } - ` + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": "cluster", + "hosts": xjson.Array{ + xjson.Map{"id": "host1", "isolationGroup": "group1"}, + xjson.Map{"id": "host2", "isolationGroup": "group2"}, + }, + } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) @@ -975,8 +984,11 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { } } ` - assert.Equal(t, stripAllWhitespace(expectedResponse), string(body), - xtest.Diff(xtest.MustPrettyJSON(t, expectedResponse), xtest.MustPrettyJSON(t, string(body)))) + + expected := xtest.MustPrettyJSONString(t, expectedResponse) + actual := xtest.MustPrettyJSONString(t, string(body)) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } func TestClusterTypeMissingHostnames(t *testing.T) { ctrl := gomock.NewController(t) @@ -990,14 +1002,13 @@ func TestClusterTypeMissingHostnames(t *testing.T) { require.NoError(t, err) w := httptest.NewRecorder() - jsonInput := ` - { - "namespaceName": "testNamespace", - "type": "cluster" - } - ` + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": "cluster", + } - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) createHandler.ServeHTTP(w, req) @@ -1006,7 +1017,13 @@ func TestClusterTypeMissingHostnames(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) assert.NoError(t, err) assert.Equal(t, http.StatusBadRequest, resp.StatusCode) - assert.Equal(t, withEndline(`{"error":"missing required field"}`), string(body)) + assert.Equal(t, + xtest.MustPrettyJSONMap(t, + xjson.Map{ + "error": "missing required field", + }, + ), + xtest.MustPrettyJSONString(t, string(body))) } func TestBadType(t *testing.T) { @@ -1021,13 +1038,13 @@ func TestBadType(t *testing.T) { require.NoError(t, err) w := httptest.NewRecorder() - jsonInput := ` - { - "namespaceName": "testNamespace", - "type": "badtype" - } - ` - req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + jsonInput := xjson.Map{ + "namespaceName": "testNamespace", + "type": "badtype", + } + + req := httptest.NewRequest("POST", "/database/create", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) createHandler.ServeHTTP(w, req) @@ -1035,18 +1052,11 @@ 
func TestBadType(t *testing.T) { body, err := ioutil.ReadAll(resp.Body) assert.NoError(t, err) assert.Equal(t, http.StatusBadRequest, resp.StatusCode) - assert.Equal(t, withEndline(`{"error":"invalid database type"}`), string(body)) -} - -func stripAllWhitespace(str string) string { - return strings.Map(func(r rune) rune { - if unicode.IsSpace(r) { - return -1 - } - return r - }, str) -} - -func withEndline(str string) string { - return str + "\n" + assert.Equal(t, + xtest.MustPrettyJSONMap(t, + xjson.Map{ + "error": "invalid database type", + }, + ), + xtest.MustPrettyJSONString(t, string(body))) } diff --git a/src/query/api/v1/handler/namespace/add_test.go b/src/query/api/v1/handler/namespace/add_test.go index c361ec49a8..a797f5ffb2 100644 --- a/src/query/api/v1/handler/namespace/add_test.go +++ b/src/query/api/v1/handler/namespace/add_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/cluster/kv" nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/x/instrument" + xjson "github.com/m3db/m3/src/x/json" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -73,14 +74,13 @@ func TestNamespaceAddHandler(t *testing.T) { // Error case where required fields are not set w := httptest.NewRecorder() - jsonInput := ` - { - "name": "testNamespace", - "options": {} - } - ` + jsonInput := xjson.Map{ + "name": "testNamespace", + "options": xjson.Map{}, + } - req := httptest.NewRequest("POST", "/namespace", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/namespace", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) addHandler.ServeHTTP(svcDefaults, w, req) diff --git a/src/query/api/v1/handler/namespace/schema_test.go b/src/query/api/v1/handler/namespace/schema_test.go index d40a2e465e..c95e7e2572 100644 --- a/src/query/api/v1/handler/namespace/schema_test.go +++ b/src/query/api/v1/handler/namespace/schema_test.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace/kvadmin" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/x/instrument" + xjson "github.com/m3db/m3/src/x/json" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -141,13 +142,12 @@ func TestSchemaDeploy_KVKeyNotFound(t *testing.T) { // Error case where required fields are not set w := httptest.NewRecorder() - jsonInput := ` - { - "name": "testNamespace" - } - ` + jsonInput := xjson.Map{ + "name": "testNamespace", + } - req := httptest.NewRequest("POST", "/schema", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/schema", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) mockKV.EXPECT().Get(M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound) @@ -209,14 +209,13 @@ func TestSchemaDeploy_NamespaceNotFound(t *testing.T) { schemaHandler := NewSchemaHandler(mockClient, instrument.NewOptions()) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil) - jsonInput := ` - { - "name": "no-such-namespace" - } - ` + jsonInput := xjson.Map{ + "name": "no-such-namespace", + } // Ensure adding to an non-existing namespace returns 404 - req := httptest.NewRequest("POST", "/namespace", strings.NewReader(jsonInput)) + req := httptest.NewRequest("POST", "/namespace", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) registry := nsproto.Registry{ @@ -269,12 +268,12 @@ func TestSchemaReset(t *testing.T) { w := httptest.NewRecorder() - jsonInput := ` - { - "name": "testNamespace" - } - ` - req := httptest.NewRequest("DELETE", "/schema", 
strings.NewReader(jsonInput)) + jsonInput := xjson.Map{ + "name": "testNamespace", + } + + req := httptest.NewRequest("DELETE", "/schema", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) schemaHandler.ServeHTTP(svcDefaults, w, req) @@ -284,7 +283,8 @@ func TestSchemaReset(t *testing.T) { assert.Equal(t, http.StatusBadRequest, resp.StatusCode) w = httptest.NewRecorder() - req = httptest.NewRequest("DELETE", "/schema", strings.NewReader(jsonInput)) + req = httptest.NewRequest("DELETE", "/schema", + xjson.MustNewTestReader(t, jsonInput)) require.NotNil(t, req) req.Header.Add("Force", "true") diff --git a/src/query/api/v1/handler/placement/set_test.go b/src/query/api/v1/handler/placement/set_test.go index a08187bdb0..047a862175 100644 --- a/src/query/api/v1/handler/placement/set_test.go +++ b/src/query/api/v1/handler/placement/set_test.go @@ -144,7 +144,10 @@ func TestPlacementSetHandler(t *testing.T) { DryRun: !setTestPlacementReqProto.Confirm, }) require.NoError(t, err) - assert.Equal(t, expectedBody, body, - xtest.Diff(xtest.MustPrettyJSON(t, expectedBody), xtest.MustPrettyJSON(t, body))) + + expected := xtest.MustPrettyJSONString(t, expectedBody) + actual := xtest.MustPrettyJSONString(t, body) + + assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) }) } diff --git a/src/query/api/v1/handler/prometheus/native/common.go b/src/query/api/v1/handler/prometheus/native/common.go index 5968972f9f..4ece65135a 100644 --- a/src/query/api/v1/handler/prometheus/native/common.go +++ b/src/query/api/v1/handler/prometheus/native/common.go @@ -291,10 +291,15 @@ func filterNaNSeries( func renderResultsJSON( w io.Writer, - series []*ts.Series, + result ReadResult, params models.RequestParams, keepNans bool, ) { + var ( + series = result.Series + warnings = result.Meta.WarningStrings() + ) + // NB: if dropping NaNs, drop series with only NaNs from output entirely. 
if !keepNans { series = filterNaNSeries(series, params.Start, params.End) @@ -306,6 +311,16 @@ func renderResultsJSON( jw.BeginObjectField("status") jw.WriteString("success") + if len(warnings) > 0 { + jw.BeginObjectField("warnings") + jw.BeginArray() + for _, warn := range warnings { + jw.WriteString(warn) + } + + jw.EndArray() + } + jw.BeginObjectField("data") jw.BeginObject() @@ -349,8 +364,8 @@ func renderResultsJSON( jw.WriteString(utils.FormatFloat(dp.Value)) jw.EndArray() } - jw.EndArray() + jw.EndArray() fixedStep, ok := s.Values().(ts.FixedResolutionMutableValues) if ok { jw.BeginObjectField("step_size_ms") @@ -359,7 +374,6 @@ func renderResultsJSON( jw.EndObject() } jw.EndArray() - jw.EndObject() jw.EndObject() @@ -368,14 +382,29 @@ func renderResultsJSON( func renderResultsInstantaneousJSON( w io.Writer, - series []*ts.Series, + result ReadResult, ) { + var ( + series = result.Series + warnings = result.Meta.WarningStrings() + ) + jw := json.NewWriter(w) jw.BeginObject() jw.BeginObjectField("status") jw.WriteString("success") + if len(warnings) > 0 { + jw.BeginObjectField("warnings") + jw.BeginArray() + for _, warn := range warnings { + jw.WriteString(warn) + } + + jw.EndArray() + } + jw.BeginObjectField("data") jw.BeginObject() diff --git a/src/query/api/v1/handler/prometheus/native/common_test.go b/src/query/api/v1/handler/prometheus/native/common_test.go index 52c23e9393..6cbe6492bd 100644 --- a/src/query/api/v1/handler/prometheus/native/common_test.go +++ b/src/query/api/v1/handler/prometheus/native/common_test.go @@ -32,12 +32,14 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" + "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/test" "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/x/instrument" + xjson "github.com/m3db/m3/src/x/json" xhttp "github.com/m3db/m3/src/x/net/http" xtest "github.com/m3db/m3/src/x/test" @@ -169,86 +171,90 @@ func TestRenderResultsJSON(t *testing.T) { series := []*ts.Series{ ts.NewSeries([]byte("foo"), valsWithNaN, test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("bar"), Value: []byte("baz")}, - models.Tag{Name: []byte("qux"), Value: []byte("qaz")}, + {Name: []byte("bar"), Value: []byte("baz")}, + {Name: []byte("qux"), Value: []byte("qaz")}, })), ts.NewSeries([]byte("bar"), - ts.NewFixedStepValues(10*time.Second, 2, 2, start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("baz"), Value: []byte("bar")}, - models.Tag{Name: []byte("qaz"), Value: []byte("qux")}, + ts.NewFixedStepValues(10*time.Second, 2, 2, start), + test.TagSliceToTags([]models.Tag{ + {Name: []byte("baz"), Value: []byte("bar")}, + {Name: []byte("qaz"), Value: []byte("qux")}, })), ts.NewSeries([]byte("foobar"), - ts.NewFixedStepValues(10*time.Second, 2, math.NaN(), start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("biz"), Value: []byte("baz")}, - models.Tag{Name: []byte("qux"), Value: []byte("qaz")}, + ts.NewFixedStepValues(10*time.Second, 2, math.NaN(), start), + test.TagSliceToTags([]models.Tag{ + {Name: []byte("biz"), Value: []byte("baz")}, + {Name: []byte("qux"), Value: []byte("qaz")}, })), } - renderResultsJSON(buffer, series, params, true) + readResult := ReadResult{Series: series} + renderResultsJSON(buffer, readResult, params, true) - expected := xtest.MustPrettyJSON(t, ` - { + expected := 
xtest.MustPrettyJSONMap(t, xjson.Map{ "status": "success", - "data": { + "warnings": xjson.Array{ + "m3db exceeded query limit: results not exhaustive", + }, + "data": xjson.Map{ "resultType": "matrix", - "result": [ - { - "metric": { + "result": xjson.Array{ + xjson.Map{ + "metric": xjson.Map{ "bar": "baz", - "qux": "qaz" + "qux": "qaz", }, - "values": [ - [ + "values": xjson.Array{ + xjson.Array{ 1535948880, - "1" - ], - [ + "1", + }, + xjson.Array{ 1535948890, - "NaN" - ] - ], - "step_size_ms": 10000 + "NaN", + }, + }, + "step_size_ms": 10000, }, - { - "metric": { + xjson.Map{ + "metric": xjson.Map{ "baz": "bar", - "qaz": "qux" + "qaz": "qux", }, - "values": [ - [ + "values": xjson.Array{ + xjson.Array{ 1535948880, - "2" - ], - [ + "2", + }, + xjson.Array{ 1535948890, - "2" - ] - ], - "step_size_ms": 10000 + "2", + }, + }, + "step_size_ms": 10000, }, - { - "metric": { + xjson.Map{ + "metric": xjson.Map{ "biz": "baz", - "qux": "qaz" + "qux": "qaz", }, - "values": [ - [ + "values": xjson.Array{ + xjson.Array{ 1535948880, - "NaN" - ], - [ + "NaN", + }, + xjson.Array{ 1535948890, - "NaN" - ] - ], - "step_size_ms": 10000 - } - ] - } - } - `) + "NaN", + }, + }, + "step_size_ms": 10000, + }, + }, + }, + }) - actual := xtest.MustPrettyJSON(t, buffer.String()) + actual := xtest.MustPrettyJSONString(t, buffer.String()) assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } @@ -268,65 +274,76 @@ func TestRenderResultsJSONWithDroppedNaNs(t *testing.T) { series := []*ts.Series{ ts.NewSeries([]byte("foo"), valsWithNaN, test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("bar"), Value: []byte("baz")}, - models.Tag{Name: []byte("qux"), Value: []byte("qaz")}, + {Name: []byte("bar"), Value: []byte("baz")}, + {Name: []byte("qux"), Value: []byte("qaz")}, })), ts.NewSeries([]byte("bar"), - ts.NewFixedStepValues(step, 2, 2, start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("baz"), Value: []byte("bar")}, - models.Tag{Name: []byte("qaz"), Value: []byte("qux")}, + ts.NewFixedStepValues(step, 2, 2, start), + test.TagSliceToTags([]models.Tag{ + {Name: []byte("baz"), Value: []byte("bar")}, + {Name: []byte("qaz"), Value: []byte("qux")}, })), ts.NewSeries([]byte("foobar"), - ts.NewFixedStepValues(step, 2, math.NaN(), start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("biz"), Value: []byte("baz")}, - models.Tag{Name: []byte("qux"), Value: []byte("qaz")}, + ts.NewFixedStepValues(step, 2, math.NaN(), start), + test.TagSliceToTags([]models.Tag{ + {Name: []byte("biz"), Value: []byte("baz")}, + {Name: []byte("qux"), Value: []byte("qaz")}, })), } - renderResultsJSON(buffer, series, params, false) + meta := block.NewResultMetadata() + meta.AddWarning("foo", "bar") + meta.AddWarning("baz", "qux") + readResult := ReadResult{ + Series: series, + Meta: meta, + } - expected := xtest.MustPrettyJSON(t, ` - { + renderResultsJSON(buffer, readResult, params, false) + expected := xtest.MustPrettyJSONMap(t, xjson.Map{ "status": "success", - "data": { + "warnings": xjson.Array{ + "foo_bar", + "baz_qux", + }, + "data": xjson.Map{ "resultType": "matrix", - "result": [ - { - "metric": { + "result": xjson.Array{ + xjson.Map{ + "metric": xjson.Map{ "bar": "baz", - "qux": "qaz" + "qux": "qaz", }, - "values": [ - [ + "values": xjson.Array{ + xjson.Array{ 1535948880, - "1" - ] - ], - "step_size_ms": 10000 + "1", + }, + }, + "step_size_ms": 10000, }, - { - "metric": { + xjson.Map{ + "metric": xjson.Map{ "baz": "bar", - "qaz": "qux" + "qaz": "qux", }, - "values": [ - [ + "values": xjson.Array{ 
+ xjson.Array{ 1535948880, - "2" - ], - [ + "2", + }, + xjson.Array{ 1535948890, - "2" - ] - ], - "step_size_ms": 10000 - } - ] - } - } - `) + "2", + }, + }, + "step_size_ms": 10000, + }, + }, + }, + }) - actual := xtest.MustPrettyJSON(t, buffer.String()) + actual := xtest.MustPrettyJSONString(t, buffer.String()) assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } @@ -335,50 +352,54 @@ func TestRenderInstantaneousResultsJSON(t *testing.T) { buffer := bytes.NewBuffer(nil) series := []*ts.Series{ ts.NewSeries([]byte("foo"), - ts.NewFixedStepValues(10*time.Second, 1, 1, start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("bar"), Value: []byte("baz")}, - models.Tag{Name: []byte("qux"), Value: []byte("qaz")}, + ts.NewFixedStepValues(10*time.Second, 1, 1, start), + test.TagSliceToTags([]models.Tag{ + {Name: []byte("bar"), Value: []byte("baz")}, + {Name: []byte("qux"), Value: []byte("qaz")}, })), ts.NewSeries([]byte("bar"), - ts.NewFixedStepValues(10*time.Second, 1, 2, start), test.TagSliceToTags([]models.Tag{ - models.Tag{Name: []byte("baz"), Value: []byte("bar")}, - models.Tag{Name: []byte("qaz"), Value: []byte("qux")}, + ts.NewFixedStepValues(10*time.Second, 1, 2, start), + test.TagSliceToTags([]models.Tag{ + {Name: []byte("baz"), Value: []byte("bar")}, + {Name: []byte("qaz"), Value: []byte("qux")}, })), } - renderResultsInstantaneousJSON(buffer, series) + readResult := ReadResult{ + Series: series, + Meta: block.NewResultMetadata(), + } - expected := xtest.MustPrettyJSON(t, ` - { + renderResultsInstantaneousJSON(buffer, readResult) + expected := xtest.MustPrettyJSONMap(t, xjson.Map{ "status": "success", - "data": { + "data": xjson.Map{ "resultType": "vector", - "result": [ - { - "metric": { + "result": xjson.Array{ + xjson.Map{ + "metric": xjson.Map{ "bar": "baz", - "qux": "qaz" + "qux": "qaz", }, - "value": [ + "value": xjson.Array{ 1535948880, - "1" - ] + "1", + }, }, - { - "metric": { + xjson.Map{ + "metric": xjson.Map{ "baz": "bar", - "qaz": "qux" + "qaz": "qux", }, - "value": [ + "value": xjson.Array{ 1535948880, - "2" - ] - } - ] - } - } - `) - actual := xtest.MustPrettyJSON(t, buffer.String()) + "2", + }, + }, + }, + }, + }) + actual := xtest.MustPrettyJSONString(t, buffer.String()) assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } diff --git a/src/query/api/v1/handler/prometheus/native/parse_query_test.go b/src/query/api/v1/handler/prometheus/native/parse_query_test.go index 646ac9d48d..fe0d485138 100644 --- a/src/query/api/v1/handler/prometheus/native/parse_query_test.go +++ b/src/query/api/v1/handler/prometheus/native/parse_query_test.go @@ -128,8 +128,8 @@ func TestParse(t *testing.T) { r, err := ioutil.ReadAll(body) require.NoError(t, err) - ex := xtest.MustPrettyJSON(t, tt.ex) - actual := xtest.MustPrettyJSON(t, string(r)) + ex := xtest.MustPrettyJSONString(t, tt.ex) + actual := xtest.MustPrettyJSONString(t, string(r)) require.Equal(t, ex, actual, fmt.Sprintf("Run %d:\n%s", i, xtest.Diff(ex, actual))) } diff --git a/src/query/api/v1/handler/prometheus/native/parse_threshold_test.go b/src/query/api/v1/handler/prometheus/native/parse_threshold_test.go index 519dd24c60..d6e4334fad 100644 --- a/src/query/api/v1/handler/prometheus/native/parse_threshold_test.go +++ b/src/query/api/v1/handler/prometheus/native/parse_threshold_test.go @@ -200,8 +200,8 @@ func TestParseThreshold(t *testing.T) { r, err := ioutil.ReadAll(body) require.NoError(t, err) - ex := xtest.MustPrettyJSON(t, tt.ex) - actual := xtest.MustPrettyJSON(t, string(r)) + ex := 
xtest.MustPrettyJSONString(t, tt.ex) + actual := xtest.MustPrettyJSONString(t, string(r)) require.Equal(t, ex, actual, fmt.Sprintf("Run %d:\n%s", i, xtest.Diff(ex, actual))) } diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index b8eac61e1d..3a867d045c 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -22,227 +22,129 @@ package native import ( "context" - "fmt" "net/http" - "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler" - "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/options" - "github.com/m3db/m3/src/query/block" - "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" - "github.com/m3db/m3/src/query/storage" - "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" xopentracing "github.com/m3db/m3/src/x/opentracing" opentracingext "github.com/opentracing/opentracing-go/ext" opentracinglog "github.com/opentracing/opentracing-go/log" - "github.com/uber-go/tally" "go.uber.org/zap" ) const ( // PromReadURL is the url for native prom read handler, this matches the - // default URL for the query range endpoint found on a Prometheus server + // default URL for the query range endpoint found on a Prometheus server. PromReadURL = handler.RoutePrefixV1 + "/query_range" - // TODO: Move to config - initialBlockAlloc = 10 + // PromReadInstantURL is the url for native instantaneous prom read + // handler, this matches the default URL for the query endpoint + // found on a Prometheus server. + PromReadInstantURL = handler.RoutePrefixV1 + "/query" ) var ( - // PromReadHTTPMethods are the HTTP methods for this handler. + // PromReadHTTPMethods are the HTTP methods for the read handler. PromReadHTTPMethods = []string{ http.MethodGet, http.MethodPost, } - emptySeriesList = []*ts.Series{} - emptyReqParams = models.RequestParams{} + // PromReadInstantHTTPMethods are the HTTP methods for the instant handler. + PromReadInstantHTTPMethods = []string{ + http.MethodGet, + http.MethodPost, + } ) -// PromReadHandler represents a handler for prometheus read endpoint. -type PromReadHandler struct { - keepEmpty bool - limitsCfg *config.LimitsConfiguration - timeoutOps *prometheus.TimeoutOpts - engine executor.Engine - fetchOptionsBuilder handleroptions.FetchOptionsBuilder - tagOpts models.TagOptions - promReadMetrics promReadMetrics - instrumentOpts instrument.Options -} - -type promReadMetrics struct { - fetchSuccess tally.Counter - fetchErrorsServer tally.Counter - fetchErrorsClient tally.Counter - fetchTimerSuccess tally.Timer - maxDatapoints tally.Gauge +// promReadHandler represents a handler for prometheus read endpoint. +type promReadHandler struct { + instant bool + promReadMetrics promReadMetrics + opts options.HandlerOptions } -func newPromReadMetrics(scope tally.Scope) promReadMetrics { - return promReadMetrics{ - fetchSuccess: scope.Counter("fetch.success"), - fetchErrorsServer: scope.Tagged(map[string]string{"code": "5XX"}). - Counter("fetch.errors"), - fetchErrorsClient: scope.Tagged(map[string]string{"code": "4XX"}). 
- Counter("fetch.errors"), - fetchTimerSuccess: scope.Timer("fetch.success.latency"), - maxDatapoints: scope.Gauge("max_datapoints"), - } -} - -// ReadResponse is the response that gets returned to the user -type ReadResponse struct { - Results []ts.Series `json:"results,omitempty"` +// NewPromReadHandler returns a new prometheus-compatible read handler. +func NewPromReadHandler(opts options.HandlerOptions) http.Handler { + return newHandler(opts, false) } -type blockWithMeta struct { - block block.Block - meta block.Metadata +// NewPromReadInstantHandler returns a new pro instance of handler. +func NewPromReadInstantHandler(opts options.HandlerOptions) http.Handler { + return newHandler(opts, true) } -// RespError wraps error and status code -type RespError struct { - Err error - Code int -} +// newHandler returns a new pro instance of handler. +func newHandler(opts options.HandlerOptions, instant bool) http.Handler { + name := "native-read" + if instant { + name = "native-instant-read" + } -// NewPromReadHandler returns a new instance of handler. -func NewPromReadHandler(opts options.HandlerOptions) *PromReadHandler { taggedScope := opts.InstrumentOpts().MetricsScope(). - Tagged(map[string]string{"handler": "native-read"}) - limits := opts.Config().Limits - - h := &PromReadHandler{ - engine: opts.Engine(), - fetchOptionsBuilder: opts.FetchOptionsBuilder(), - tagOpts: opts.TagOptions(), - limitsCfg: &limits, - promReadMetrics: newPromReadMetrics(taggedScope), - timeoutOps: opts.TimeoutOpts(), - keepEmpty: opts.Config().ResultOptions.KeepNans, - instrumentOpts: opts.InstrumentOpts(), + Tagged(map[string]string{"handler": name}) + h := &promReadHandler{ + promReadMetrics: newPromReadMetrics(taggedScope), + opts: opts, + instant: instant, } - pointCount := float64(limits.MaxComputedDatapoints()) - h.promReadMetrics.maxDatapoints.Update(pointCount) + maxDatapoints := opts.Config().Limits.MaxComputedDatapoints() + h.promReadMetrics.maxDatapoints.Update(float64(maxDatapoints)) return h } -func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { timer := h.promReadMetrics.fetchTimerSuccess.Start() - fetchOpts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) - if rErr != nil { - xhttp.Error(w, rErr.Inner(), rErr.Code()) - return - } - - queryOpts := &executor.QueryOptions{ - QueryContextOptions: models.QueryContextOptions{ - LimitMaxTimeseries: fetchOpts.Limit, - }} - - restrictOpts := fetchOpts.RestrictQueryOptions.GetRestrictByType() - if restrictOpts != nil { - restrict := &models.RestrictFetchTypeQueryContextOptions{ - MetricsType: uint(restrictOpts.MetricsType), - StoragePolicy: restrictOpts.StoragePolicy, - } - queryOpts.QueryContextOptions.RestrictFetchType = restrict - } - - result, params, respErr := h.ServeHTTPWithEngine(w, r, h.engine, queryOpts, fetchOpts) - if respErr != nil { - xhttp.Error(w, respErr.Err, respErr.Code) - return - } - - w.Header().Set("Content-Type", "application/json") - if params.FormatType == models.FormatM3QL { - renderM3QLResultsJSON(w, result, params) - h.promReadMetrics.fetchSuccess.Inc(1) - timer.Stop() - return - } - - h.promReadMetrics.fetchSuccess.Inc(1) - timer.Stop() - // TODO: Support multiple result types - renderResultsJSON(w, result, params, h.keepEmpty) -} + defer timer.Stop() -// ServeHTTPWithEngine returns query results from the storage -func (h *PromReadHandler) ServeHTTPWithEngine( - w http.ResponseWriter, - r *http.Request, - engine 
executor.Engine, - queryOpts *executor.QueryOptions, - fetchOpts *storage.FetchOptions, -) ([]*ts.Series, models.RequestParams, *RespError) { ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) - logger := logging.WithContext(ctx, h.instrumentOpts) + logger := logging.WithContext(ctx, h.opts.InstrumentOpts()) - params, rErr := parseParams(r, engine.Options(), - h.timeoutOps, fetchOpts, h.instrumentOpts) + parsedOptions, rErr := ParseRequest(ctx, r, h.instant, h.opts) if rErr != nil { h.promReadMetrics.fetchErrorsClient.Inc(1) - return nil, emptyReqParams, &RespError{Err: rErr.Inner(), Code: rErr.Code()} - } - - if params.Debug { - logger.Info("request params", zap.Any("params", params)) + logger.Error("could not parse request", zap.Error(rErr.Inner())) + xhttp.Error(w, rErr.Inner(), rErr.Code()) + return } - if err := h.validateRequest(¶ms); err != nil { - h.promReadMetrics.fetchErrorsClient.Inc(1) - return nil, emptyReqParams, &RespError{Err: err, Code: http.StatusBadRequest} - } + watcher := handler.NewResponseWriterCanceller(w, h.opts.InstrumentOpts()) + parsedOptions.CancelWatcher = watcher - result, err := read(ctx, engine, queryOpts, fetchOpts, h.tagOpts, - w, params, h.instrumentOpts) + result, err := read(ctx, parsedOptions, h.opts) if err != nil { sp := xopentracing.SpanFromContextOrNoop(ctx) sp.LogFields(opentracinglog.Error(err)) opentracingext.Error.Set(sp, true) logger.Error("range query error", zap.Error(err), - zap.Any("params", params), - zap.Any("queryOpts", queryOpts), - zap.Any("fetchOpts", fetchOpts)) + zap.Any("parsedOptions", parsedOptions)) h.promReadMetrics.fetchErrorsServer.Inc(1) - return nil, emptyReqParams, &RespError{ - Err: err, - Code: http.StatusInternalServerError, - } + + xhttp.Error(w, err, http.StatusInternalServerError) + return } - // TODO: Support multiple result types w.Header().Set("Content-Type", "application/json") - handleroptions.AddWarningHeaders(w, result.meta) - return result.series, params, nil -} + handleroptions.AddWarningHeaders(w, result.Meta) + h.promReadMetrics.fetchSuccess.Inc(1) -func (h *PromReadHandler) validateRequest(params *models.RequestParams) error { - // Impose a rough limit on the number of returned time series. This is intended to prevent things like - // querying from the beginning of time with a 1s step size. - // Approach taken directly from prom. - numSteps := int64(params.End.Sub(params.Start) / params.Step) - maxComputedDatapoints := h.limitsCfg.MaxComputedDatapoints() - if maxComputedDatapoints > 0 && numSteps > maxComputedDatapoints { - return fmt.Errorf( - "querying from %v to %v with step size %v would result in too many datapoints "+ - "(end - start / step > %d). 
Either decrease the query resolution (?step=XX), decrease the time window, "+ - "or increase the limit (`limits.maxComputedDatapoints`)", - params.Start, params.End, params.Step, maxComputedDatapoints, - ) + if h.instant { + renderResultsInstantaneousJSON(w, result) + return + } + + if parsedOptions.Params.FormatType == models.FormatM3QL { + renderM3QLResultsJSON(w, result.Series, parsedOptions.Params) + return } - return nil + keepNans := h.opts.Config().ResultOptions.KeepNans + renderResultsJSON(w, result, parsedOptions.Params, keepNans) } diff --git a/src/query/api/v1/handler/prometheus/native/read_common.go b/src/query/api/v1/handler/prometheus/native/read_common.go index 8a20f318fd..a983fc0d26 100644 --- a/src/query/api/v1/handler/prometheus/native/read_common.go +++ b/src/query/api/v1/handler/prometheus/native/read_common.go @@ -22,41 +22,150 @@ package native import ( "context" + "fmt" "math" "net/http" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" + "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser/promql" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/ts" - "github.com/m3db/m3/src/x/instrument" + xhttp "github.com/m3db/m3/src/x/net/http" xopentracing "github.com/m3db/m3/src/x/opentracing" + "github.com/uber-go/tally" opentracinglog "github.com/opentracing/opentracing-go/log" ) -type readResult struct { - series []*ts.Series - meta block.ResultMetadata +type promReadMetrics struct { + fetchSuccess tally.Counter + fetchErrorsServer tally.Counter + fetchErrorsClient tally.Counter + fetchTimerSuccess tally.Timer + maxDatapoints tally.Gauge } -func read( - reqCtx context.Context, - engine executor.Engine, - opts *executor.QueryOptions, - fetchOpts *storage.FetchOptions, - tagOpts models.TagOptions, - w http.ResponseWriter, - params models.RequestParams, - instrumentOpts instrument.Options, -) (readResult, error) { - ctx, cancel := context.WithTimeout(reqCtx, params.Timeout) - defer cancel() +func newPromReadMetrics(scope tally.Scope) promReadMetrics { + return promReadMetrics{ + fetchSuccess: scope.Counter("fetch.success"), + fetchErrorsServer: scope.Tagged(map[string]string{"code": "5XX"}). + Counter("fetch.errors"), + fetchErrorsClient: scope.Tagged(map[string]string{"code": "4XX"}). + Counter("fetch.errors"), + fetchTimerSuccess: scope.Timer("fetch.success.latency"), + maxDatapoints: scope.Gauge("max_datapoints"), + } +} + +// ReadResponse is the response that gets returned to the user +type ReadResponse struct { + Results []ts.Series `json:"results,omitempty"` +} + +// ReadResult is a result from a remote read. +type ReadResult struct { + Series []*ts.Series + Meta block.ResultMetadata +} + +// ParseRequest parses the given request. 
+func ParseRequest( + ctx context.Context, + r *http.Request, + instantaneous bool, + opts options.HandlerOptions, +) (ParsedOptions, *xhttp.ParseError) { + fetchOpts, rErr := opts.FetchOptionsBuilder().NewFetchOptions(r) + if rErr != nil { + return ParsedOptions{}, rErr + } + + queryOpts := &executor.QueryOptions{ + QueryContextOptions: models.QueryContextOptions{ + LimitMaxTimeseries: fetchOpts.Limit, + }} + + restrictOpts := fetchOpts.RestrictQueryOptions.GetRestrictByType() + if restrictOpts != nil { + restrict := &models.RestrictFetchTypeQueryContextOptions{ + MetricsType: uint(restrictOpts.MetricsType), + StoragePolicy: restrictOpts.StoragePolicy, + } + + queryOpts.QueryContextOptions.RestrictFetchType = restrict + } + + engine := opts.Engine() + var params models.RequestParams + if instantaneous { + params, rErr = parseInstantaneousParams(r, engine.Options(), + opts.TimeoutOpts(), fetchOpts, opts.InstrumentOpts()) + } else { + params, rErr = parseParams(r, engine.Options(), + opts.TimeoutOpts(), fetchOpts, opts.InstrumentOpts()) + } + + if rErr != nil { + return ParsedOptions{}, rErr + } + + maxPoints := opts.Config().Limits.MaxComputedDatapoints() + if err := validateRequest(params, maxPoints); err != nil { + return ParsedOptions{}, xhttp.NewParseError(err, http.StatusBadRequest) + } + + return ParsedOptions{ + QueryOpts: queryOpts, + FetchOpts: fetchOpts, + Params: params, + }, nil +} + +func validateRequest(params models.RequestParams, maxPoints int) error { + // Impose a rough limit on the number of returned time series. + // This is intended to prevent things like querying from the beginning of + // time with a 1s step size. + numSteps := int(params.End.Sub(params.Start) / params.Step) + if maxPoints > 0 && numSteps > maxPoints { + return fmt.Errorf( + "querying from %v to %v with step size %v would result in too many "+ + "datapoints (end - start / step > %d). Either decrease the query "+ + "resolution (?step=XX), decrease the time window, or increase "+ + "the limit (`limits.maxComputedDatapoints`)", + params.Start, params.End, params.Step, maxPoints, + ) + } + return nil +} + +// ParsedOptions are parsed options for the query. +type ParsedOptions struct { + QueryOpts *executor.QueryOptions + FetchOpts *storage.FetchOptions + Params models.RequestParams + CancelWatcher handler.CancelWatcher +} + +func read( + ctx context.Context, + parsed ParsedOptions, + handlerOpts options.HandlerOptions, +) (ReadResult, error) { + var ( + opts = parsed.QueryOpts + fetchOpts = parsed.FetchOpts + params = parsed.Params + cancelWatcher = parsed.CancelWatcher + + tagOpts = handlerOpts.TagOptions() + engine = handlerOpts.Engine() + ) sp := xopentracing.SpanFromContextOrNoop(ctx) sp.LogFields( opentracinglog.String("params.query", params.Query), @@ -66,9 +175,7 @@ func read( xopentracing.Duration("params.step", params.Step), ) - // Detect clients closing connections. - handler.CloseWatcher(ctx, cancel, w, instrumentOpts) - emptyResult := readResult{meta: block.NewResultMetadata()} + emptyResult := ReadResult{Meta: block.NewResultMetadata()} // TODO: Capture timing parseOpts := engine.Options().ParseOptions() @@ -77,6 +184,14 @@ func read( return emptyResult, err } + // Detect clients closing connections. 
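// Aside: a minimal, self-contained sketch of the datapoint-limit arithmetic that
// validateRequest applies above; the helper and values here are illustrative only.
// A query is rejected when the number of steps between start and end exceeds the
// configured maxComputedDatapoints (a non-positive limit disables the check).
package main

import (
	"fmt"
	"time"
)

func wouldExceedLimit(start, end time.Time, step time.Duration, maxPoints int) bool {
	numSteps := int(end.Sub(start) / step)
	return maxPoints > 0 && numSteps > maxPoints
}

func main() {
	start := time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(time.Hour)

	// 3600 one-second steps: allowed at a limit of 3600, rejected at 3599,
	// matching the "at limit" / "over limit" cases in the tests below.
	fmt.Println(wouldExceedLimit(start, end, time.Second, 3600)) // false
	fmt.Println(wouldExceedLimit(start, end, time.Second, 3599)) // true
}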
+ if cancelWatcher != nil { + ctx, cancel := context.WithTimeout(ctx, fetchOpts.Timeout) + defer cancel() + + cancelWatcher.WatchForCancel(ctx, cancel) + } + bl, err := engine.ExecuteExpr(ctx, parser, opts, fetchOpts, params) if err != nil { return emptyResult, err @@ -130,8 +245,5 @@ func read( } seriesList = prometheus.FilterSeriesByOptions(seriesList, fetchOpts) - return readResult{ - series: seriesList, - meta: resultMeta, - }, nil + return ReadResult{Series: seriesList, Meta: resultMeta}, nil } diff --git a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go b/src/query/api/v1/handler/prometheus/native/read_instantaneous.go deleted file mode 100644 index a278c160b0..0000000000 --- a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package native - -import ( - "context" - "net/http" - - "github.com/m3db/m3/src/query/api/v1/handler" - "github.com/m3db/m3/src/query/api/v1/handler/prometheus" - "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" - "github.com/m3db/m3/src/query/api/v1/options" - "github.com/m3db/m3/src/query/executor" - "github.com/m3db/m3/src/query/models" - "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/instrument" - xhttp "github.com/m3db/m3/src/x/net/http" - - "go.uber.org/zap" -) - -const ( - // PromReadInstantURL is the url for native instantaneous prom read - // handler, this matches the default URL for the query endpoint - // found on a Prometheus server - PromReadInstantURL = handler.RoutePrefixV1 + "/query" -) - -var ( - // PromReadInstantHTTPMethods are the HTTP methods for this handler. - PromReadInstantHTTPMethods = []string{ - http.MethodGet, - http.MethodPost, - } -) - -// PromReadInstantHandler represents a handler for prometheus instantaneous read endpoint. -type PromReadInstantHandler struct { - engine executor.Engine - fetchOptionsBuilder handleroptions.FetchOptionsBuilder - tagOpts models.TagOptions - timeoutOpts *prometheus.TimeoutOpts - instrumentOpts instrument.Options -} - -// NewPromReadInstantHandler returns a new instance of handler. 
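// Aside: a minimal sketch of the CancelWatcher wiring used above, where a
// ResponseWriterCanceller built from the http.ResponseWriter cancels the
// per-request context once the client disconnects. Only calls that appear in
// this patch are assumed; the timeout value and helper name are illustrative.
package example

import (
	"context"
	"net/http"
	"time"

	"github.com/m3db/m3/src/query/api/v1/handler"
	"github.com/m3db/m3/src/x/instrument"
)

// watchClientClose returns a context cancelled either on timeout or when the
// client closes the connection.
func watchClientClose(
	w http.ResponseWriter,
	timeout time.Duration,
) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)

	watcher := handler.NewResponseWriterCanceller(w, instrument.NewOptions())
	watcher.WatchForCancel(ctx, cancel)

	return ctx, cancel
}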
-func NewPromReadInstantHandler( - opts options.HandlerOptions) *PromReadInstantHandler { - return &PromReadInstantHandler{ - engine: opts.Engine(), - fetchOptionsBuilder: opts.FetchOptionsBuilder(), - tagOpts: opts.TagOptions(), - timeoutOpts: opts.TimeoutOpts(), - instrumentOpts: opts.InstrumentOpts(), - } -} - -func (h *PromReadInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) - logger := logging.WithContext(ctx, h.instrumentOpts) - - fetchOpts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) - if rErr != nil { - xhttp.Error(w, rErr.Inner(), rErr.Code()) - return - } - - params, rErr := parseInstantaneousParams(r, h.engine.Options(), - h.timeoutOpts, fetchOpts, h.instrumentOpts) - if rErr != nil { - xhttp.Error(w, rErr, rErr.Code()) - return - } - - if params.Debug { - logger.Info("request params", zap.Any("params", params)) - } - - queryOpts := &executor.QueryOptions{ - QueryContextOptions: models.QueryContextOptions{ - LimitMaxTimeseries: fetchOpts.Limit, - }} - - restrictOpts := fetchOpts.RestrictQueryOptions.GetRestrictByType() - if restrictOpts != nil { - restrict := &models.RestrictFetchTypeQueryContextOptions{ - MetricsType: uint(restrictOpts.MetricsType), - StoragePolicy: restrictOpts.StoragePolicy, - } - queryOpts.QueryContextOptions.RestrictFetchType = restrict - } - - result, err := read(ctx, h.engine, queryOpts, fetchOpts, - h.tagOpts, w, params, h.instrumentOpts) - if err != nil { - logger.Error("instant query error", - zap.Error(err), - zap.Any("params", params), - zap.Any("queryOpts", queryOpts), - zap.Any("fetchOpts", queryOpts)) - xhttp.Error(w, err, http.StatusInternalServerError) - return - } - - // TODO: Support multiple result types - w.Header().Set("Content-Type", "application/json") - handleroptions.AddWarningHeaders(w, result.meta) - renderResultsInstantaneousJSON(w, result.series) -} diff --git a/src/query/api/v1/handler/prometheus/native/read_instantaneous_test.go b/src/query/api/v1/handler/prometheus/native/read_instantaneous_test.go index 11dda2858d..259f9ca1f9 100644 --- a/src/query/api/v1/handler/prometheus/native/read_instantaneous_test.go +++ b/src/query/api/v1/handler/prometheus/native/read_instantaneous_test.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/test" + xjson "github.com/m3db/m3/src/x/json" xtest "github.com/m3db/m3/src/x/test" "github.com/stretchr/testify/assert" @@ -79,21 +80,23 @@ func (v vectorResultValues) parse() (time.Time, int, error) { } func TestPromReadInstantHandler(t *testing.T) { - testPromReadInstantHandler(t, block.NewResultMetadata(), "") - testPromReadInstantHandler(t, buildWarningMeta("foo", "bar"), "foo_bar") + testPromReadInstantHandler(t, block.NewResultMetadata(), "", "") + testPromReadInstantHandler(t, buildWarningMeta("foo", "bar"), "foo_bar", "foo_bar") testPromReadInstantHandler(t, block.ResultMetadata{Exhaustive: false}, - handleroptions.LimitHeaderSeriesLimitApplied) + handleroptions.LimitHeaderSeriesLimitApplied, + "m3db exceeded query limit: results not exhaustive") } func testPromReadInstantHandler( t *testing.T, resultMeta block.ResultMetadata, ex string, + jsonWarning string, ) { values, bounds := test.GenerateValuesAndBounds(nil, nil) setup := newTestSetup() - promReadInstant := setup.Handlers.InstantRead + promReadInstant := setup.Handlers.instantRead seriesMeta := test.NewSeriesMeta("dummy", len(values)) meta := block.Metadata{ @@ 
-130,43 +133,47 @@ func testPromReadInstantHandler( at1, value1, err := result.Data.Result[1].Value.parse() require.NoError(t, err) - expected := xtest.MustPrettyJSON(t, fmt.Sprintf(` - { + expectedResp := xjson.Map{ "status": "success", - "data": { + "data": xjson.Map{ "resultType": "vector", - "result": [ - { - "metric": { + "result": xjson.Array{ + xjson.Map{ + "metric": xjson.Map{ "__name__": "dummy0", - "dummy0": "dummy0" + "dummy0": "dummy0", + }, + "value": xjson.Array{ + at0.Unix(), + strconv.Itoa(value0), }, - "value": [ - %d, - "%d" - ] }, - { - "metric": { + xjson.Map{ + "metric": xjson.Map{ "__name__": "dummy1", - "dummy1": "dummy1" + "dummy1": "dummy1", + }, + "value": xjson.Array{ + at1.Unix(), + strconv.Itoa(value1), }, - "value": [ - %d, - "%d" - ] - } - ] - } + }, + }, + }, } - `, at0.Unix(), value0, at1.Unix(), value1)) - actual := xtest.MustPrettyJSON(t, recorder.Body.String()) + + if len(jsonWarning) != 0 { + expectedResp["warnings"] = xjson.Array{jsonWarning} + } + + expected := xtest.MustPrettyJSONMap(t, expectedResp) + actual := xtest.MustPrettyJSONString(t, recorder.Body.String()) assert.Equal(t, expected, actual, xtest.Diff(expected, actual)) } func TestPromReadInstantHandlerStorageError(t *testing.T) { setup := newTestSetup() - promReadInstant := setup.Handlers.InstantRead + promReadInstant := setup.Handlers.instantRead storageErr := fmt.Errorf("storage err") setup.Storage.SetFetchBlocksResult(block.Result{}, storageErr) diff --git a/src/query/api/v1/handler/prometheus/native/read_test.go b/src/query/api/v1/handler/prometheus/native/read_test.go index d31fcfd0f5..d17976a744 100644 --- a/src/query/api/v1/handler/prometheus/native/read_test.go +++ b/src/query/api/v1/handler/prometheus/native/read_test.go @@ -61,7 +61,7 @@ func testPromReadHandlerRead( values, bounds := test.GenerateValuesAndBounds(nil, nil) setup := newTestSetup() - promRead := setup.Handlers.Read + promRead := setup.Handlers.read seriesMeta := test.NewSeriesMeta("dummy", len(values)) m := block.Metadata{ @@ -79,12 +79,16 @@ func testPromReadHandlerRead( r, parseErr := testParseParams(req) require.Nil(t, parseErr) assert.Equal(t, models.FormatPromQL, r.FormatType) - result, err := read(context.TODO(), promRead.engine, - setup.QueryOpts, setup.FetchOpts, promRead.tagOpts, httptest.NewRecorder(), - r, instrument.NewOptions()) + parsed := ParsedOptions{ + QueryOpts: setup.QueryOpts, + FetchOpts: setup.FetchOpts, + Params: r, + } - seriesList := result.series + result, err := read(context.TODO(), parsed, promRead.opts) require.NoError(t, err) + seriesList := result.Series + require.Len(t, seriesList, 2) s := seriesList[0] @@ -116,7 +120,7 @@ func testM3PromReadHandlerRead( values, bounds := test.GenerateValuesAndBounds(nil, nil) setup := newTestSetup() - promRead := setup.Handlers.Read + promRead := setup.Handlers.read seriesMeta := test.NewSeriesMeta("dummy", len(values)) meta := block.Metadata{ @@ -166,11 +170,12 @@ type testSetup struct { QueryOpts *executor.QueryOptions FetchOpts *storage.FetchOptions TimeoutOpts *prometheus.TimeoutOpts + options options.HandlerOptions } type testSetupHandlers struct { - Read *PromReadHandler - InstantRead *PromReadInstantHandler + read *promReadHandler + instantRead *promReadHandler } func newTestSetup() *testSetup { @@ -202,28 +207,32 @@ func newTestSetup() *testSetup { }, }) - read := NewPromReadHandler(opts) - instantRead := NewPromReadInstantHandler(opts) + read := NewPromReadHandler(opts).(*promReadHandler) + instantRead := 
NewPromReadInstantHandler(opts).(*promReadHandler) return &testSetup{ Storage: mockStorage, Handlers: testSetupHandlers{ - Read: read, - InstantRead: instantRead, + read: read, + instantRead: instantRead, }, QueryOpts: &executor.QueryOptions{}, FetchOpts: storage.NewFetchOptions(), TimeoutOpts: timeoutOpts, + options: opts, } } -func TestPromReadHandler_ServeHTTP_maxComputedDatapoints(t *testing.T) { +func TestPromReadHandlerServeHTTPMaxComputedDatapoints(t *testing.T) { setup := newTestSetup() - setup.Handlers.Read.limitsCfg = &config.LimitsConfiguration{ - PerQuery: config.PerQueryLimitsConfiguration{ - PrivateMaxComputedDatapoints: 3599, + opts := setup.Handlers.read.opts + setup.Handlers.read.opts = opts.SetConfig(config.Configuration{ + Limits: config.LimitsConfiguration{ + PerQuery: config.PerQueryLimitsConfiguration{ + PrivateMaxComputedDatapoints: 3599, + }, }, - } + }) params := defaultParams() params.Set(startParam, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC). @@ -234,7 +243,7 @@ func TestPromReadHandler_ServeHTTP_maxComputedDatapoints(t *testing.T) { req := newReadRequest(t, params) recorder := httptest.NewRecorder() - setup.Handlers.Read.ServeHTTP(recorder, req) + setup.Handlers.read.ServeHTTP(recorder, req) resp := recorder.Result() assert.Equal(t, http.StatusBadRequest, resp.StatusCode) @@ -260,87 +269,79 @@ func TestPromReadHandler_validateRequest(t *testing.T) { } cases := []struct { - Name string - Params *models.RequestParams - Max int64 - ErrorExpected bool + name string + params models.RequestParams + max int + errorExpected bool }{{ - Name: "under limit", - Params: &models.RequestParams{ + name: "under limit", + params: models.RequestParams{ Step: time.Second, Start: dt(2018, 1, 1, 0), End: dt(2018, 1, 1, 1), }, - Max: 3601, - ErrorExpected: false, + max: 3601, + errorExpected: false, }, { - Name: "at limit", - Params: &models.RequestParams{ + name: "at limit", + params: models.RequestParams{ Step: time.Second, Start: dt(2018, 1, 1, 0), End: dt(2018, 1, 1, 1), }, - Max: 3600, - ErrorExpected: false, + max: 3600, + errorExpected: false, }, { - Name: "over limit", - Params: &models.RequestParams{ + name: "over limit", + params: models.RequestParams{ Step: time.Second, Start: dt(2018, 1, 1, 0), End: dt(2018, 1, 1, 1), }, - Max: 3599, - ErrorExpected: true, + max: 3599, + errorExpected: true, }, { - Name: "large query, limit disabled (0)", - Params: &models.RequestParams{ + name: "large query, limit disabled (0)", + params: models.RequestParams{ Step: time.Second, Start: dt(2018, 1, 1, 0), End: dt(2018, 1, 1, 1), }, - Max: 0, - ErrorExpected: false, + max: 0, + errorExpected: false, }, { - Name: "large query, limit disabled (negative)", - Params: &models.RequestParams{ + name: "large query, limit disabled (negative)", + params: models.RequestParams{ Step: time.Second, Start: dt(2018, 1, 1, 0), End: dt(2018, 1, 1, 1), }, - Max: -50, - ErrorExpected: false, + max: -50, + errorExpected: false, }, { - Name: "uneven step over limit", - Params: &models.RequestParams{ + name: "uneven step over limit", + params: models.RequestParams{ Step: 34 * time.Minute, Start: dt(2018, 1, 1, 0), End: dt(2018, 1, 1, 11), }, - Max: 1, - ErrorExpected: true, + max: 1, + errorExpected: true, }, { - Name: "uneven step under limit", - Params: &models.RequestParams{ + name: "uneven step under limit", + params: models.RequestParams{ Step: 34 * time.Minute, Start: dt(2018, 1, 1, 0), End: dt(2018, 1, 1, 1), }, - Max: 2, - ErrorExpected: false}, + max: 2, + errorExpected: false}, } for _, tc := range 
cases { - t.Run(tc.Name, func(t *testing.T) { - setup := newTestSetup() - setup.Handlers.Read.limitsCfg = &config.LimitsConfiguration{ - PerQuery: config.PerQueryLimitsConfiguration{ - PrivateMaxComputedDatapoints: tc.Max, - }, - } - - err := setup.Handlers.Read.validateRequest(tc.Params) - - if tc.ErrorExpected { + t.Run(tc.name, func(t *testing.T) { + err := validateRequest(tc.params, tc.max) + if tc.errorExpected { require.Error(t, err) } else { require.NoError(t, err) diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index 06118f5cdc..5517196f80 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -25,7 +25,6 @@ import ( "context" "net/http" "sync" - "time" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" @@ -38,7 +37,6 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" "github.com/golang/protobuf/proto" @@ -55,27 +53,19 @@ const ( PromReadHTTPMethod = http.MethodPost ) -// PromReadHandler represents a handler for prometheus read endpoint. -type PromReadHandler struct { - engine executor.Engine - promReadMetrics promReadMetrics - timeoutOpts *prometheus.TimeoutOpts - fetchOptionsBuilder handleroptions.FetchOptionsBuilder - keepEmpty bool - instrumentOpts instrument.Options +// promReadHandler is a handler for the prometheus remote read endpoint. +type promReadHandler struct { + promReadMetrics promReadMetrics + opts options.HandlerOptions } // NewPromReadHandler returns a new instance of handler. func NewPromReadHandler(opts options.HandlerOptions) http.Handler { taggedScope := opts.InstrumentOpts().MetricsScope(). 
Tagged(map[string]string{"handler": "remote-read"}) - return &PromReadHandler{ - engine: opts.Engine(), - promReadMetrics: newPromReadMetrics(taggedScope), - timeoutOpts: opts.TimeoutOpts(), - fetchOptionsBuilder: opts.FetchOptionsBuilder(), - keepEmpty: opts.Config().ResultOptions.KeepNans, - instrumentOpts: opts.InstrumentOpts(), + return &promReadHandler{ + promReadMetrics: newPromReadMetrics(taggedScope), + opts: opts, } } @@ -98,71 +88,79 @@ func newPromReadMetrics(scope tally.Scope) promReadMetrics { } } -func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { timer := h.promReadMetrics.fetchTimerSuccess.Start() + defer timer.Stop() + ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) - logger := logging.WithContext(ctx, h.instrumentOpts) - req, rErr := h.parseRequest(r) + logger := logging.WithContext(ctx, h.opts.InstrumentOpts()) + req, fetchOpts, rErr := parseRequest(ctx, r, h.opts) if rErr != nil { - xhttp.Error(w, rErr.Inner(), rErr.Code()) - return - } - - timeout, err := prometheus.ParseRequestTimeout(r, h.timeoutOpts.FetchTimeout) - if err != nil { + err := rErr.Inner() h.promReadMetrics.fetchErrorsClient.Inc(1) - xhttp.Error(w, err, http.StatusBadRequest) - return - } - - fetchOpts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) - if rErr != nil { - xhttp.Error(w, rErr.Inner(), rErr.Code()) + logger.Error("remote read query parse error", + zap.Error(err), + zap.Any("req", req), + zap.Any("fetchOpts", fetchOpts)) + xhttp.Error(w, err, rErr.Code()) return } - readResult, err := h.read(ctx, w, req, timeout, fetchOpts) + cancelWatcher := handler.NewResponseWriterCanceller(w, h.opts.InstrumentOpts()) + readResult, err := Read(ctx, cancelWatcher, req, fetchOpts, h.opts) if err != nil { h.promReadMetrics.fetchErrorsServer.Inc(1) logger.Error("remote read query error", zap.Error(err), zap.Any("req", req), - zap.Duration("timeout", timeout), zap.Any("fetchOpts", fetchOpts)) xhttp.Error(w, err, http.StatusInternalServerError) return } + // NB: if this errors, all relevant headers and information should already + // be sent to the writer; so it is not necessary to do anything here other + // than increment success/failure metrics. + err = WriteSnappyCompressed(w, readResult, logger) + if err != nil { + h.promReadMetrics.fetchErrorsServer.Inc(1) + } else { + h.promReadMetrics.fetchSuccess.Inc(1) + } +} + +// WriteSnappyCompressed writes snappy compressed results to the given writer. 
+func WriteSnappyCompressed( + w http.ResponseWriter, + readResult ReadResult, + logger *zap.Logger, +) error { resp := &prompb.ReadResponse{ - Results: readResult.result, + Results: readResult.Result, } data, err := proto.Marshal(resp) if err != nil { - h.promReadMetrics.fetchErrorsServer.Inc(1) logger.Error("unable to marshal read results to protobuf", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) - return + return err } w.Header().Set("Content-Type", "application/x-protobuf") w.Header().Set("Content-Encoding", "snappy") - handleroptions.AddWarningHeaders(w, readResult.meta) + handleroptions.AddWarningHeaders(w, readResult.Meta) compressed := snappy.Encode(nil, data) if _, err := w.Write(compressed); err != nil { - h.promReadMetrics.fetchErrorsServer.Inc(1) logger.Error("unable to encode read results to snappy", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) - return } - timer.Stop() - h.promReadMetrics.fetchSuccess.Inc(1) + return err } -func (h *PromReadHandler) parseRequest( +func parseCompressedRequest( r *http.Request, ) (*prompb.ReadRequest, *xhttp.ParseError) { result, err := prometheus.ParsePromCompressedRequest(r) @@ -178,18 +176,45 @@ func (h *PromReadHandler) parseRequest( return &req, nil } -type readResult struct { - meta block.ResultMetadata - result []*prompb.QueryResult +// ReadResult is a read result. +type ReadResult struct { + Meta block.ResultMetadata + Result []*prompb.QueryResult } -func (h *PromReadHandler) read( - reqCtx context.Context, - w http.ResponseWriter, +func parseRequest( + ctx context.Context, + r *http.Request, + opts options.HandlerOptions, +) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) { + req, rErr := parseCompressedRequest(r) + if rErr != nil { + return nil, nil, rErr + } + + timeout := opts.TimeoutOpts().FetchTimeout + timeout, err := prometheus.ParseRequestTimeout(r, timeout) + if err != nil { + return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) + } + + fetchOpts, rErr := opts.FetchOptionsBuilder().NewFetchOptions(r) + if rErr != nil { + return nil, nil, rErr + } + + fetchOpts.Timeout = timeout + return req, fetchOpts, nil +} + +// Read performs a remote read on the given engine. +func Read( + ctx context.Context, + cancelWatcher handler.CancelWatcher, r *prompb.ReadRequest, - timeout time.Duration, fetchOpts *storage.FetchOptions, -) (readResult, error) { + opts options.HandlerOptions, +) (ReadResult, error) { var ( queryCount = len(r.Queries) cancelFuncs = make([]context.CancelFunc, queryCount) @@ -200,6 +225,8 @@ func (h *PromReadHandler) read( LimitMaxTimeseries: fetchOpts.Limit, }} + engine = opts.Engine() + wg sync.WaitGroup mu sync.Mutex multiErr xerrors.MultiError @@ -210,7 +237,7 @@ func (h *PromReadHandler) read( i, promQuery := i, promQuery // Capture vars for lambda. go func() { defer wg.Done() - ctx, cancel := context.WithTimeout(reqCtx, timeout) + ctx, cancel := context.WithTimeout(ctx, fetchOpts.Timeout) cancelFuncs[i] = cancel query, err := storage.PromReadQueryToM3(promQuery) if err != nil { @@ -220,9 +247,12 @@ func (h *PromReadHandler) read( return } - // Detect clients closing connections - handler.CloseWatcher(ctx, cancel, w, h.instrumentOpts) - result, err := h.engine.ExecuteProm(ctx, query, queryOpts, fetchOpts) + // Detect clients closing connections. 
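// Aside: the exported remote-read pieces above (Read and WriteSnappyCompressed)
// compose outside of ServeHTTP, for example from a custom handler. Import paths
// are assumed from the file layout in this patch; passing a nil CancelWatcher is
// fine because Read nil-checks it before watching.
package example

import (
	"context"
	"net/http"

	"github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote"
	"github.com/m3db/m3/src/query/api/v1/options"
	"github.com/m3db/m3/src/query/generated/proto/prompb"
	"github.com/m3db/m3/src/query/storage"

	"go.uber.org/zap"
)

func serveRemoteRead(
	ctx context.Context,
	w http.ResponseWriter,
	req *prompb.ReadRequest,
	fetchOpts *storage.FetchOptions,
	opts options.HandlerOptions,
	logger *zap.Logger,
) error {
	result, err := remote.Read(ctx, nil, req, fetchOpts, opts)
	if err != nil {
		return err
	}

	// WriteSnappyCompressed sets the protobuf/snappy headers, attaches warning
	// headers from result.Meta, and writes the compressed body.
	return remote.WriteSnappyCompressed(w, result, logger)
}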
+ if cancelWatcher != nil { + cancelWatcher.WatchForCancel(ctx, cancel) + } + + result, err := engine.ExecuteProm(ctx, query, queryOpts, fetchOpts) if err != nil { mu.Lock() multiErr = multiErr.Add(err) @@ -245,10 +275,10 @@ func (h *PromReadHandler) read( } if err := multiErr.FinalError(); err != nil { - return readResult{result: nil, meta: meta}, err + return ReadResult{Result: nil, Meta: meta}, err } - return readResult{result: queryResults, meta: meta}, nil + return ReadResult{Result: queryResults, Meta: meta}, nil } // filterResults removes series tags based on options. diff --git a/src/query/api/v1/handler/prometheus/remote/read_test.go b/src/query/api/v1/handler/prometheus/remote/read_test.go index cc1d7399e8..28da6ed868 100644 --- a/src/query/api/v1/handler/prometheus/remote/read_test.go +++ b/src/query/api/v1/handler/prometheus/remote/read_test.go @@ -26,12 +26,14 @@ import ( "net/http" "net/http/httptest" "strings" + "sync" "testing" "time" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/options" @@ -45,6 +47,7 @@ import ( "github.com/m3db/m3/src/query/test/m3" xclock "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/instrument" + xtest "github.com/m3db/m3/src/x/test" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -77,7 +80,7 @@ func newEngine( } func setupServer(t *testing.T) *httptest.Server { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) // No calls expected on session object lstore, session := m3.NewStorageAndSession(t, ctrl) session.EXPECT(). @@ -89,87 +92,80 @@ func setupServer(t *testing.T) *httptest.Server { return server } -func readHandler(store storage.Storage, timeoutOpts *prometheus.TimeoutOpts) *PromReadHandler { - opts := handleroptions.FetchOptionsBuilderOptions{Limit: 100} - engine := newEngine(store, defaultLookbackDuration, nil, - instrument.NewOptions()) - return &PromReadHandler{ - engine: engine, - promReadMetrics: promReadTestMetrics, - timeoutOpts: timeoutOpts, - fetchOptionsBuilder: handleroptions.NewFetchOptionsBuilder(opts), - instrumentOpts: instrument.NewOptions(), - } +func readHandler(store storage.Storage, + timeoutOpts *prometheus.TimeoutOpts) http.Handler { + fetchOpts := handleroptions.FetchOptionsBuilderOptions{Limit: 100} + iOpts := instrument.NewOptions() + engine := newEngine(store, defaultLookbackDuration, nil, iOpts) + opts := options.EmptyHandlerOptions(). + SetEngine(engine). + SetInstrumentOpts(iOpts). + SetFetchOptionsBuilder(handleroptions.NewFetchOptionsBuilder(fetchOpts)). + SetTimeoutOpts(timeoutOpts) + + return NewPromReadHandler(opts) } func TestPromReadParsing(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) storage, _ := m3.NewStorageAndSession(t, ctrl) - opts := handleroptions.FetchOptionsBuilderOptions{Limit: 100} + builderOpts := handleroptions.FetchOptionsBuilderOptions{Limit: 100} engine := newEngine(storage, defaultLookbackDuration, nil, instrument.NewOptions()) - promRead := &PromReadHandler{ - engine: engine, - promReadMetrics: promReadTestMetrics, - fetchOptionsBuilder: handleroptions.NewFetchOptionsBuilder(opts), - } + + opts := options.EmptyHandlerOptions(). + SetEngine(engine). 
+ SetFetchOptionsBuilder(handleroptions.NewFetchOptionsBuilder(builderOpts)). + SetTimeoutOpts(timeoutOpts) req := httptest.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t)) - r, err := promRead.parseRequest(req) + r, fetchOpts, err := parseRequest(context.TODO(), req, opts) require.Nil(t, err, "unable to parse request") require.Equal(t, len(r.Queries), 1) + fmt.Println(fetchOpts) } func TestPromFetchTimeoutParsing(t *testing.T) { - ctrl := gomock.NewController(t) - storage, _ := m3.NewStorageAndSession(t, ctrl) - opts := handleroptions.FetchOptionsBuilderOptions{Limit: 100} - engine := newEngine(storage, defaultLookbackDuration, nil, - instrument.NewOptions()) - promRead := &PromReadHandler{ - engine: engine, - promReadMetrics: promReadTestMetrics, - timeoutOpts: &prometheus.TimeoutOpts{ - FetchTimeout: 2 * time.Minute, - }, - fetchOptionsBuilder: handleroptions.NewFetchOptionsBuilder(opts), - } - - req := httptest.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t)) - dur, err := prometheus.ParseRequestTimeout(req, promRead.timeoutOpts.FetchTimeout) + url := fmt.Sprintf("%s?timeout=2m", PromReadURL) + req := httptest.NewRequest("POST", url, test.GeneratePromReadBody(t)) + dur, err := prometheus.ParseRequestTimeout(req, time.Second) require.NoError(t, err) assert.Equal(t, 2*time.Minute, dur) } func TestPromReadParsingBad(t *testing.T) { - ctrl := gomock.NewController(t) - storage, _ := m3.NewStorageAndSession(t, ctrl) - promRead := readHandler(storage, timeoutOpts) req := httptest.NewRequest("POST", PromReadURL, strings.NewReader("bad body")) - _, err := promRead.parseRequest(req) + _, _, err := parseRequest(context.TODO(), req, options.EmptyHandlerOptions()) require.NotNil(t, err, "unable to parse request") } func TestPromReadStorageWithFetchError(t *testing.T) { - ctrl := gomock.NewController(t) - store, session := m3.NewStorageAndSession(t, ctrl) - session.EXPECT().FetchTagged(gomock.Any(), gomock.Any(), gomock.Any()). - Return(nil, client.FetchResponseMetadata{Exhaustive: true}, fmt.Errorf("unable to get data")) - session.EXPECT().IteratorPools(). - Return(nil, nil) - promRead := readHandler(store, timeoutOpts) - req := test.GeneratePromReadRequest() - recorder := httptest.NewRecorder() - res, err := promRead.read(context.TODO(), recorder, - req, time.Hour, storage.NewFetchOptions()) + ctrl := xtest.NewController(t) + watcher := &cancelWatcher{} + readRequest := &prompb.ReadRequest{ + Queries: []*prompb.Query{ + {}, + }, + } + + fetchOpts := &storage.FetchOptions{} + result := storage.PromResult{Metadata: block.ResultMetadata{ + Exhaustive: true, LocalOnly: true}} + engine := executor.NewMockEngine(ctrl) + engine.EXPECT(). + ExecuteProm(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
+ Return(result, fmt.Errorf("expr err")) + + opts := options.EmptyHandlerOptions().SetEngine(engine) + res, err := Read(context.TODO(), watcher, readRequest, fetchOpts, opts) require.Error(t, err, "unable to read from storage") - header := recorder.Header().Get(handleroptions.LimitHeader) - assert.Equal(t, 0, len(header)) - meta := res.meta + meta := res.Meta assert.True(t, meta.Exhaustive) assert.True(t, meta.LocalOnly) assert.Equal(t, 0, len(meta.Warnings)) + + assert.Equal(t, 1, watcher.count) } func TestQueryMatchMustBeEqual(t *testing.T) { @@ -208,7 +204,7 @@ func TestQueryKillOnTimeout(t *testing.T) { } func TestReadErrorMetricsCount(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) storage, session := m3.NewStorageAndSession(t, ctrl) session.EXPECT().FetchTagged(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil, client.FetchResponseMetadata{Exhaustive: true}, fmt.Errorf("unable to get data")) @@ -219,15 +215,16 @@ func TestReadErrorMetricsCount(t *testing.T) { scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Millisecond) defer closer.Close() readMetrics := newPromReadMetrics(scope) - opts := handleroptions.FetchOptionsBuilderOptions{Limit: 100} + buildOpts := handleroptions.FetchOptionsBuilderOptions{Limit: 100} engine := newEngine(storage, defaultLookbackDuration, nil, instrument.NewOptions()) - promRead := &PromReadHandler{ - engine: engine, - promReadMetrics: readMetrics, - timeoutOpts: timeoutOpts, - fetchOptionsBuilder: handleroptions.NewFetchOptionsBuilder(opts), - instrumentOpts: instrument.NewOptions(), + opts := options.EmptyHandlerOptions(). + SetEngine(engine). + SetTimeoutOpts(&prometheus.TimeoutOpts{FetchTimeout: time.Minute}). + SetFetchOptionsBuilder(handleroptions.NewFetchOptionsBuilder(buildOpts)) + promRead := &promReadHandler{ + promReadMetrics: readMetrics, + opts: opts, } req := httptest.NewRequest("POST", PromReadURL, test.GeneratePromReadBody(t)) @@ -240,7 +237,7 @@ func TestReadErrorMetricsCount(t *testing.T) { } func TestMultipleRead(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() now := time.Now() @@ -249,7 +246,7 @@ func TestMultipleRead(t *testing.T) { r := storage.PromResult{ PromResult: &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ - &prompb.TimeSeries{ + { Samples: []prompb.Sample{{Value: 1, Timestamp: promNow}}, Labels: []prompb.Label{{Name: []byte("a"), Value: []byte("b")}}, }, @@ -258,14 +255,14 @@ func TestMultipleRead(t *testing.T) { Metadata: block.ResultMetadata{ Exhaustive: true, LocalOnly: true, - Warnings: []block.Warning{block.Warning{Name: "foo", Message: "bar"}}, + Warnings: []block.Warning{{Name: "foo", Message: "bar"}}, }, } rTwo := storage.PromResult{ PromResult: &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ - &prompb.TimeSeries{ + { Samples: []prompb.Sample{{Value: 2, Timestamp: promNow}}, Labels: []prompb.Label{{Name: []byte("c"), Value: []byte("d")}}, }, @@ -305,35 +302,38 @@ func TestMultipleRead(t *testing.T) { }, }) - h := NewPromReadHandler(handlerOpts).(*PromReadHandler) - res, err := h.read(context.TODO(), nil, req, 0, storage.NewFetchOptions()) + fetchOpts := &storage.FetchOptions{} + watcher := &cancelWatcher{} + res, err := Read(context.TODO(), watcher, req, fetchOpts, handlerOpts) require.NoError(t, err) expected := &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ - &prompb.TimeSeries{ + { Labels: []prompb.Label{{Name: []byte("a"), Value: []byte("b")}}, Samples: 
[]prompb.Sample{{Timestamp: promNow, Value: 1}}, }, - &prompb.TimeSeries{ + { Labels: []prompb.Label{{Name: []byte("c"), Value: []byte("d")}}, Samples: []prompb.Sample{{Timestamp: promNow, Value: 2}}, }, }, } - result := res.result + result := res.Result assert.Equal(t, expected.Timeseries[0], result[0].Timeseries[0]) assert.Equal(t, expected.Timeseries[1], result[1].Timeseries[0]) - meta := res.meta + meta := res.Meta assert.False(t, meta.Exhaustive) assert.True(t, meta.LocalOnly) require.Equal(t, 1, len(meta.Warnings)) assert.Equal(t, "foo_bar", meta.Warnings[0].Header()) + + assert.Equal(t, 2, watcher.count) } func TestReadWithOptions(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() now := time.Now() @@ -342,7 +342,7 @@ func TestReadWithOptions(t *testing.T) { r := storage.PromResult{ PromResult: &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ - &prompb.TimeSeries{ + { Samples: []prompb.Sample{{Value: 1, Timestamp: promNow}}, Labels: []prompb.Label{ {Name: []byte("a"), Value: []byte("b")}, @@ -366,8 +366,8 @@ func TestReadWithOptions(t *testing.T) { ExecuteProm(gomock.Any(), q, gomock.Any(), gomock.Any()). Return(r, nil) - opts := storage.NewFetchOptions() - opts.RestrictQueryOptions = &storage.RestrictQueryOptions{ + fetchOpts := storage.NewFetchOptions() + fetchOpts.RestrictQueryOptions = &storage.RestrictQueryOptions{ RestrictByTag: &storage.RestrictByTag{ Strip: [][]byte{[]byte("remove")}, }, @@ -380,18 +380,30 @@ func TestReadWithOptions(t *testing.T) { }, }) - h := NewPromReadHandler(handlerOpts).(*PromReadHandler) - res, err := h.read(context.TODO(), nil, req, 0, opts) + res, err := Read(context.TODO(), nil, req, fetchOpts, handlerOpts) require.NoError(t, err) expected := &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ - &prompb.TimeSeries{ + { Labels: []prompb.Label{{Name: []byte("a"), Value: []byte("b")}}, Samples: []prompb.Sample{{Timestamp: promNow, Value: 1}}, }, }, } - result := res.result + result := res.Result assert.Equal(t, expected.Timeseries[0], result[0].Timeseries[0]) } + +type cancelWatcher struct { + sync.Mutex + count int +} + +var _ handler.CancelWatcher = (*cancelWatcher)(nil) + +func (c *cancelWatcher) WatchForCancel(context.Context, context.CancelFunc) { + c.Lock() + c.count++ + c.Unlock() +} diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 6a37244a32..81f06d9364 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -156,6 +156,17 @@ func (h *Handler) RegisterRoutes() error { Tagged(v1APIGroup), )) + // Register custom endpoints. + for _, custom := range h.customHandlers { + handler, err := custom.Handler(nativeSourceOpts) + if err != nil { + return err + } + + h.router.HandleFunc(custom.Route(), handler.ServeHTTP). + Methods(custom.Methods()...) + } + nativePromReadHandler := native.NewPromReadHandler(nativeSourceOpts) h.router.HandleFunc(remote.PromReadURL, wrapped(promRemoteReadHandler).ServeHTTP, @@ -280,17 +291,6 @@ func (h *Handler) RegisterRoutes() error { } } - // Register custom endpoints. - for _, custom := range h.customHandlers { - handler, err := custom.Handler(h.options) - if err != nil { - return err - } - - h.router.HandleFunc(custom.Route(), handler.ServeHTTP). - Methods(custom.Methods()...) 
- } - h.registerHealthEndpoints() h.registerProfileEndpoints() h.registerRoutesEndpoint() diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index afa733b54f..719ce1ad08 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -157,7 +157,7 @@ func TestPromRemoteReadGet(t *testing.T) { err = h.RegisterRoutes() require.NoError(t, err, "unable to register routes") h.Router().ServeHTTP(res, req) - require.Equal(t, res.Code, http.StatusMethodNotAllowed, "GET method not defined") + require.Equal(t, http.StatusMethodNotAllowed, res.Code) } func TestPromRemoteReadPost(t *testing.T) { @@ -171,7 +171,7 @@ func TestPromRemoteReadPost(t *testing.T) { err = h.RegisterRoutes() require.NoError(t, err, "unable to register routes") h.Router().ServeHTTP(res, req) - require.Equal(t, res.Code, http.StatusBadRequest, "Empty request") + require.Equal(t, http.StatusBadRequest, res.Code, "Empty request") } func TestPromNativeReadGet(t *testing.T) { @@ -184,7 +184,7 @@ func TestPromNativeReadGet(t *testing.T) { require.NoError(t, err, "unable to setup handler") h.RegisterRoutes() h.Router().ServeHTTP(res, req) - require.Equal(t, res.Code, http.StatusBadRequest, "Empty request") + require.Equal(t, http.StatusBadRequest, res.Code, "Empty request") } func TestPromNativeReadPost(t *testing.T) { @@ -197,7 +197,7 @@ func TestPromNativeReadPost(t *testing.T) { require.NoError(t, err, "unable to setup handler") h.RegisterRoutes() h.Router().ServeHTTP(res, req) - require.Equal(t, res.Code, http.StatusBadRequest, "Empty request") + require.Equal(t, http.StatusBadRequest, res.Code, "Empty request") } func TestJSONWritePost(t *testing.T) { @@ -210,7 +210,7 @@ func TestJSONWritePost(t *testing.T) { require.NoError(t, err, "unable to setup handler") h.RegisterRoutes() h.Router().ServeHTTP(res, req) - require.Equal(t, res.Code, http.StatusBadRequest, "Empty request") + require.Equal(t, http.StatusBadRequest, res.Code, "Empty request") } func TestRoutesGet(t *testing.T) { diff --git a/src/query/api/v1/options/handler.go b/src/query/api/v1/options/handler.go index 40f6450481..95ea7912de 100644 --- a/src/query/api/v1/options/handler.go +++ b/src/query/api/v1/options/handler.go @@ -21,6 +21,7 @@ package options import ( + "io" "net/http" "time" @@ -35,12 +36,22 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3" + "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/instrument" ) const defaultTimeout = 30 * time.Second +// OptionTransformFn transforms given handler options. +type OptionTransformFn func(opts HandlerOptions) HandlerOptions + +// CustomHandlerOptions is a list of custom handler options. +type CustomHandlerOptions struct { + CustomHandlers []CustomHandler + OptionTransformFn OptionTransformFn +} + // CustomHandler allows for custom third party http handlers. type CustomHandler interface { // Route is the custom handler route. @@ -51,6 +62,10 @@ type CustomHandler interface { Handler(handlerOptions HandlerOptions) (http.Handler, error) } +// RemoteReadRenderer renders remote read output. +type RemoteReadRenderer func(io.Writer, []*ts.Series, + models.RequestParams, bool) + // HandlerOptions represents handler options. type HandlerOptions interface { // CreatedAt returns the time the options were created. @@ -136,7 +151,7 @@ type HandlerOptions interface { // SetNowFn sets the now function. 
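// Aside: a sketch of a third-party handler implementing the CustomHandler
// interface above and wired in through the new RunOptions.CustomHandlerOptions
// (see src/query/server/query.go further down). The route, method, response body
// and the Route()/Methods() signatures are assumptions inferred from how the
// router registers custom handlers in this patch.
package example

import (
	"net/http"

	"github.com/m3db/m3/src/query/api/v1/options"
	"github.com/m3db/m3/src/query/server"
)

type pingHandler struct{}

func (pingHandler) Route() string     { return "/custom/ping" }
func (pingHandler) Methods() []string { return []string{http.MethodGet} }

func (pingHandler) Handler(
	handlerOptions options.HandlerOptions,
) (http.Handler, error) {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}), nil
}

func runOptions() server.RunOptions {
	return server.RunOptions{
		CustomHandlerOptions: options.CustomHandlerOptions{
			CustomHandlers: []options.CustomHandler{pingHandler{}},
			// OptionTransformFn lets callers adjust handler options before the
			// routes are registered; an identity transform is shown here.
			OptionTransformFn: func(opts options.HandlerOptions) options.HandlerOptions {
				return opts
			},
		},
	}
}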
SetNowFn(f clock.NowFn) HandlerOptions - // InstrumentOpts returns the instrumentation optoins. + // InstrumentOpts returns the instrumentation options. InstrumentOpts() instrument.Options // SetInstrumentOpts sets instrumentation options. SetInstrumentOpts(opts instrument.Options) HandlerOptions diff --git a/src/query/block/meta.go b/src/query/block/meta.go index 1fd70e5e29..97a9b6444e 100644 --- a/src/query/block/meta.go +++ b/src/query/block/meta.go @@ -159,6 +159,25 @@ func (w Warnings) addWarnings(warnings ...Warning) Warnings { return w } +// WarningStrings converts warnings to a slice of strings for presentation. +func (m ResultMetadata) WarningStrings() []string { + size := len(m.Warnings) + if !m.Exhaustive { + size++ + } + + strs := make([]string, 0, size) + for _, warn := range m.Warnings { + strs = append(strs, warn.Header()) + } + + if !m.Exhaustive { + strs = append(strs, "m3db exceeded query limit: results not exhaustive") + } + + return strs +} + // Warning is a message that indicates potential partial or incomplete results. type Warning struct { // Name is the name of the store originating the warning. diff --git a/src/query/server/cost_reporters_test.go b/src/query/server/cost_reporters_test.go index 9ac86cebc7..a301123bb8 100644 --- a/src/query/server/cost_reporters_test.go +++ b/src/query/server/cost_reporters_test.go @@ -47,7 +47,7 @@ func (c enforcerTestCtx) Close() { } func TestNewConfiguredChainedEnforcer(t *testing.T) { - setup := func(t *testing.T, perQueryLimit, globalLimit int64) enforcerTestCtx { + setup := func(t *testing.T, perQueryLimit, globalLimit int) enforcerTestCtx { s := tally.NewTestScope("", nil) iopts := instrument.NewOptions().SetMetricsScope(s) diff --git a/src/query/server/query.go b/src/query/server/query.go index d6a0d4bc74..abd8fece09 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -129,8 +129,8 @@ type RunOptions struct { // ready signal once it is open. DownsamplerReadyCh chan<- struct{} - // CustomHandlers is a list of custom 3rd party handlers. - CustomHandlers []options.CustomHandler + // CustomHandlerOptions contains custom handler options. + CustomHandlerOptions options.CustomHandlerOptions // CustomPromQLParseFunction is a custom PromQL parsing function. CustomPromQLParseFunction promql.ParseFn @@ -372,7 +372,12 @@ func Run(runOpts RunOptions) { logger.Fatal("unable to set up handler options", zap.Error(err)) } - handler := httpd.NewHandler(handlerOptions, runOpts.CustomHandlers...) + if fn := runOpts.CustomHandlerOptions.OptionTransformFn; fn != nil { + handlerOptions = fn(handlerOptions) + } + + customHandlers := runOpts.CustomHandlerOptions.CustomHandlers + handler := httpd.NewHandler(handlerOptions, customHandlers...) if err := handler.RegisterRoutes(); err != nil { logger.Fatal("unable to register routes", zap.Error(err)) } diff --git a/src/query/storage/types.go b/src/query/storage/types.go index 93935be85a..82d528af16 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -127,6 +127,8 @@ type FetchOptions struct { // IncludeResolution if set, appends resolution information to fetch results. // Currently only used for graphite queries. IncludeResolution bool + // Timeout is the timeout for the request. 
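// Aside: what the new ResultMetadata.WarningStrings above produces for a
// non-exhaustive result that also carries a store warning; these are the strings
// surfaced in the prom JSON "warnings" array added by this change. Minimal
// runnable sketch, values illustrative.
package main

import (
	"fmt"

	"github.com/m3db/m3/src/query/block"
)

func main() {
	meta := block.ResultMetadata{
		Exhaustive: false,
		Warnings:   []block.Warning{{Name: "foo", Message: "bar"}},
	}

	// Prints: [foo_bar m3db exceeded query limit: results not exhaustive]
	fmt.Println(meta.WarningStrings())
}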
+ Timeout time.Duration } // FanoutOptions describes which namespaces should be fanned out to for diff --git a/src/x/json/json.go b/src/x/json/json.go new file mode 100644 index 0000000000..621d819d3c --- /dev/null +++ b/src/x/json/json.go @@ -0,0 +1,45 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package json allows for easy duck typing of JSON, i.e. for test stubs +// to compare JSON output against (such as src/x/test.MustPrettyJSON). +package json + +import ( + "bytes" + "encoding/json" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +// Map is an untyped JSON map representation. +type Map map[string]interface{} + +// Array is an untyped JSON array representation. +type Array []interface{} + +// MustNewTestReader returns a io.Reader reader to the JSON value. +func MustNewTestReader(t *testing.T, value interface{}) io.Reader { + reader := bytes.NewBuffer(nil) + require.NoError(t, json.NewEncoder(reader).Encode(value)) + return reader +} diff --git a/src/x/test/diff.go b/src/x/test/diff.go index d8db0f9c48..669a828090 100644 --- a/src/x/test/diff.go +++ b/src/x/test/diff.go @@ -24,6 +24,8 @@ import ( "encoding/json" "testing" + xjson "github.com/m3db/m3/src/x/json" + "github.com/sergi/go-diff/diffmatchpatch" "github.com/stretchr/testify/require" ) @@ -36,8 +38,15 @@ func Diff(expected, actual string) string { return dmp.DiffPrettyText(diffs) } -// MustPrettyJSON returns an indented version of the JSON. -func MustPrettyJSON(t *testing.T, str string) string { +// MustPrettyJSONMap returns an indented JSON string of the object. +func MustPrettyJSONMap(t *testing.T, value xjson.Map) string { + pretty, err := json.MarshalIndent(value, "", " ") + require.NoError(t, err) + return string(pretty) +} + +// MustPrettyJSONString returns an indented version of the JSON. +func MustPrettyJSONString(t *testing.T, str string) string { var unmarshalled map[string]interface{} err := json.Unmarshal([]byte(str), &unmarshalled) require.NoError(t, err)
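// Aside: how the new x/json helpers above are meant to be used in tests — build
// the expected payload as xjson.Map/xjson.Array instead of a formatted string,
// then pretty-print both sides before diffing, mirroring the instant read test
// in this patch. The helper name and field values are illustrative.
package example

import (
	"testing"

	xjson "github.com/m3db/m3/src/x/json"
	xtest "github.com/m3db/m3/src/x/test"

	"github.com/stretchr/testify/assert"
)

func assertVectorResponse(t *testing.T, body string, ts int64, value string) {
	expected := xjson.Map{
		"status": "success",
		"data": xjson.Map{
			"resultType": "vector",
			"result": xjson.Array{
				xjson.Map{
					"metric": xjson.Map{"__name__": "dummy0", "dummy0": "dummy0"},
					"value":  xjson.Array{ts, value},
				},
			},
		},
	}

	want := xtest.MustPrettyJSONMap(t, expected)
	got := xtest.MustPrettyJSONString(t, body)
	assert.Equal(t, want, got, xtest.Diff(want, got))
}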