diff --git a/scripts/docker-integration-tests/common.sh b/scripts/docker-integration-tests/common.sh index dd08e45b47..56599a1780 100644 --- a/scripts/docker-integration-tests/common.sh +++ b/scripts/docker-integration-tests/common.sh @@ -10,6 +10,7 @@ function retry_with_backoff { local max_attempts=${ATTEMPTS-5} local timeout=${TIMEOUT-1} + local max_timeout=${MAX_TIMEOUT} local attempt=1 local exitCode=0 @@ -27,6 +28,11 @@ function retry_with_backoff { sleep $timeout attempt=$(( attempt + 1 )) timeout=$(( timeout * 2 )) + if [[ $max_timeout != "" ]]; then + if [[ $timeout -gt $max_timeout ]]; then + timeout=$max_timeout + fi + fi done if [[ $exitCode != 0 ]] diff --git a/scripts/docker-integration-tests/prometheus/m3coordinator.yml b/scripts/docker-integration-tests/prometheus/m3coordinator.yml index 6d513c5fb2..4b050a2033 100644 --- a/scripts/docker-integration-tests/prometheus/m3coordinator.yml +++ b/scripts/docker-integration-tests/prometheus/m3coordinator.yml @@ -12,6 +12,10 @@ metrics: samplingRate: 1.0 extended: none +limits: + perQuery: + maxFetchedSeries: 100 + clusters: - namespaces: - namespace: agg diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 04362a8911..280d6a0267 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -24,12 +24,12 @@ docker-compose -f ${COMPOSE_FILE} up -d prometheus01 # Ensure Prometheus can proxy a Prometheus query echo "Wait until the remote write endpoint generates and allows for data to be queried" -ATTEMPTS=6 TIMEOUT=2 retry_with_backoff \ +ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ '[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=prometheus_remote_storage_succeeded_samples_total | jq -r .data.result[].value[1]) -gt 100 ]]' # Make sure we're proxying writes to the unaggregated namespace echo "Wait until data begins being written to remote storage for the unaggregated namespace" 
-ATTEMPTS=6 TIMEOUT=2 retry_with_backoff \ +ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ '[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=database_write_tagged_success\\{namespace=\"unagg\"\\} | jq -r .data.result[0].value[1]) -gt 0 ]]' # Make sure we're proxying writes to the aggregated namespace @@ -37,3 +37,14 @@ echo "Wait until data begins being written to remote storage for the aggregated ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ '[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=database_write_tagged_success\\{namespace=\"agg\"\\} | jq -r .data.result[0].value[1]) -gt 0 ]]' +# Test the default series limit applied when directly querying +# coordinator (limit set to 100 in m3coordinator.yml) +echo "Test query limit with coordinator defaults" +ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ + '[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{__name__!=\"\"\\} | jq -r ".data.result | length") -eq 100 ]]' + +# Test the default series limit applied when directly querying +# coordinator (limit set by header) +echo "Test query limit with coordinator limit header" +ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ + '[[ $(curl -s -H "M3-Limit-Max-Series: 10" 0.0.0.0:7201/api/v1/query?query=\\{__name__!=\"\"\\} | jq -r ".data.result | length") -eq 10 ]]' diff --git a/scripts/docker-integration-tests/setup.sh b/scripts/docker-integration-tests/setup.sh index 1b1859af91..d64409a64b 100755 --- a/scripts/docker-integration-tests/setup.sh +++ b/scripts/docker-integration-tests/setup.sh @@ -7,7 +7,9 @@ cd $GOPATH/src/github.com/m3db/m3 SERVICES=(m3dbnode m3coordinator m3aggregator) REVISION=$(git rev-parse HEAD) -make clean +if [[ $SKIP_CLEAN != "true" ]]; then + make clean +fi mkdir -p ./bin # by keeping all the required files in ./bin, it makes the build context diff --git a/scripts/docker-integration-tests/simple/test.sh b/scripts/docker-integration-tests/simple/test.sh index fa6d330e74..dce45d5ac2 100755 --- a/scripts/docker-integration-tests/simple/test.sh +++ 
b/scripts/docker-integration-tests/simple/test.sh @@ -98,8 +98,8 @@ curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{ "isolation_group": "rack-a", "zone": "embedded", "weight": 1024, - "endpoint": "127.0.0.1::9000", - "hostname": "127.0.0.1:", + "endpoint": "127.0.0.1:9000", + "hostname": "127.0.0.1", "port": 9000 } ] diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 8a2c7836e1..cc0e0aa95d 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -30,6 +30,7 @@ import ( ingestm3msg "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest/m3msg" "github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" @@ -62,6 +63,8 @@ var ( defaultLookbackDuration = 5 * time.Minute defaultCarbonIngesterAggregationType = aggregation.Mean + + defaultStorageQueryLimit = 10000 ) // Configuration is the configuration for the query service. @@ -125,7 +128,11 @@ type Configuration struct { // Cache configurations. // - // Deprecated: cache configurations are no longer supported. Remove from file. + // Deprecated: cache configurations are no longer supported. Remove from file + // when we can make breaking changes. + // (If/when removed it will make existing configurations with the cache + // stanza not able to startup the binary since we parse YAML in strict mode + // by default). DeprecatedCache CacheConfiguration `yaml:"cache"` } @@ -168,8 +175,8 @@ type ResultOptions struct { KeepNans bool `yaml:"keepNans"` } -// LimitsConfiguration represents limitations on resource usage in the query instance. Limits are split between per-query -// and global limits. +// LimitsConfiguration represents limitations on resource usage in the query +// instance. 
Limits are split between per-query and global limits. type LimitsConfiguration struct { // deprecated: use PerQuery.MaxComputedDatapoints instead. DeprecatedMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"` @@ -195,18 +202,22 @@ func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 { return lc.DeprecatedMaxComputedDatapoints } -// GlobalLimitsConfiguration represents limits on resource usage across a query instance. Zero or negative values imply no limit. +// GlobalLimitsConfiguration represents limits on resource usage across a query +// instance. Zero or negative values imply no limit. type GlobalLimitsConfiguration struct { - // MaxFetchedDatapoints limits the total number of datapoints actually fetched by all queries at any given time. + // MaxFetchedDatapoints limits the total number of datapoints actually + // fetched by all queries at any given time. MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"` } -// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints. +// AsLimitManagerOptions converts this configuration to +// cost.LimitManagerOptions for MaxFetchedDatapoints. func (l *GlobalLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions { return toLimitManagerOptions(l.MaxFetchedDatapoints) } -// PerQueryLimitsConfiguration represents limits on resource usage within a single query. Zero or negative values imply no limit. +// PerQueryLimitsConfiguration represents limits on resource usage within a +// single query. Zero or negative values imply no limit. type PerQueryLimitsConfiguration struct { // PrivateMaxComputedDatapoints limits the number of datapoints that can be // returned by a query. It's determined purely @@ -217,15 +228,34 @@ type PerQueryLimitsConfiguration struct { // this field directly. PrivateMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"` - // MaxFetchedDatapoints limits the number of datapoints actually used by a given query. 
+ // MaxFetchedDatapoints limits the number of datapoints actually used by a + // given query. MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"` + + // MaxFetchedSeries limits the number of time series returned by a storage node. + MaxFetchedSeries int64 `yaml:"maxFetchedSeries"` } -// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints. +// AsLimitManagerOptions converts this configuration to +// cost.LimitManagerOptions for MaxFetchedDatapoints. func (l *PerQueryLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions { return toLimitManagerOptions(l.MaxFetchedDatapoints) } +// AsFetchOptionsBuilderOptions converts this configuration to +// handler.FetchOptionsBuilderOptions. +func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handler.FetchOptionsBuilderOptions { + if l.MaxFetchedSeries <= 0 { + return handler.FetchOptionsBuilderOptions{ + Limit: defaultStorageQueryLimit, + } + } + + return handler.FetchOptionsBuilderOptions{ + Limit: int(l.MaxFetchedSeries), + } +} + func toLimitManagerOptions(limit int64) cost.LimitManagerOptions { return cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{ Threshold: cost.Cost(limit), diff --git a/src/query/api/v1/handler/fetch_options.go b/src/query/api/v1/handler/fetch_options.go new file mode 100644 index 0000000000..19eededffe --- /dev/null +++ b/src/query/api/v1/handler/fetch_options.go @@ -0,0 +1,68 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package handler + +import ( + "net/http" + "strconv" + + "github.com/m3db/m3/src/query/storage" + xhttp "github.com/m3db/m3/src/x/net/http" +) + +// FetchOptionsBuilder builds fetch options based on a request and default +// config. +type FetchOptionsBuilder interface { + NewFetchOptions(req *http.Request) (*storage.FetchOptions, *xhttp.ParseError) +} + +// FetchOptionsBuilderOptions provides options to use when creating a +// fetch options builder. +type FetchOptionsBuilderOptions struct { + Limit int +} + +type fetchOptionsBuilder struct { + opts FetchOptionsBuilderOptions +} + +// NewFetchOptionsBuilder returns a new fetch options builder. 
+func NewFetchOptionsBuilder( + opts FetchOptionsBuilderOptions, +) FetchOptionsBuilder { + return fetchOptionsBuilder{opts: opts} +} + +func (b fetchOptionsBuilder) NewFetchOptions( + req *http.Request, +) (*storage.FetchOptions, *xhttp.ParseError) { + fetchOpts := storage.NewFetchOptions() + fetchOpts.Limit = b.opts.Limit + if str := req.Header.Get(LimitMaxSeriesHeader); str != "" { + n, err := strconv.Atoi(str) + if err != nil { + return nil, xhttp.NewParseError(err, http.StatusBadRequest) + } + fetchOpts.Limit = n + } + + return fetchOpts, nil +} diff --git a/src/query/api/v1/handler/fetch_options_test.go b/src/query/api/v1/handler/fetch_options_test.go new file mode 100644 index 0000000000..0ab6418a17 --- /dev/null +++ b/src/query/api/v1/handler/fetch_options_test.go @@ -0,0 +1,83 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package handler + +import ( + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFetchOptionsBuilder(t *testing.T) { + tests := []struct { + name string + defaultLimit int + headers map[string]string + expectedLimit int + expectedErr bool + }{ + { + name: "default limit with no headers", + defaultLimit: 42, + headers: map[string]string{}, + expectedLimit: 42, + }, + { + name: "limit with header", + defaultLimit: 42, + headers: map[string]string{ + LimitMaxSeriesHeader: "4242", + }, + expectedLimit: 4242, + }, + { + name: "bad header", + defaultLimit: 42, + headers: map[string]string{ + LimitMaxSeriesHeader: "not_a_number", + }, + expectedErr: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + builder := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{ + Limit: test.defaultLimit, + }) + + req := httptest.NewRequest("GET", "/foo", nil) + for k, v := range test.headers { + req.Header.Add(k, v) + } + + opts, err := builder.NewFetchOptions(req) + + if !test.expectedErr { + require.NoError(t, err) + require.Equal(t, test.expectedLimit, opts.Limit) + } else { + require.Error(t, err) + } + }) + } +} diff --git a/src/query/api/v1/handler/graphite/find.go b/src/query/api/v1/handler/graphite/find.go index 1068ca8d5f..30f42f73e4 100644 --- a/src/query/api/v1/handler/graphite/find.go +++ b/src/query/api/v1/handler/graphite/find.go @@ -47,15 +47,18 @@ var ( ) type grahiteFindHandler struct { - storage storage.Storage + storage storage.Storage + fetchOptionsBuilder handler.FetchOptionsBuilder } // NewFindHandler returns a new instance of handler. 
func NewFindHandler( storage storage.Storage, + fetchOptionsBuilder handler.FetchOptionsBuilder, ) http.Handler { return &grahiteFindHandler{ - storage: storage, + storage: storage, + fetchOptionsBuilder: fetchOptionsBuilder, } } @@ -110,16 +113,19 @@ func (h *grahiteFindHandler) ServeHTTP( return } + opts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) + if rErr != nil { + xhttp.Error(w, rErr.Inner(), rErr.Code()) + return + } + var ( terminatedResult *storage.CompleteTagsResult tErr error childResult *storage.CompleteTagsResult cErr error - opts = storage.NewFetchOptions() - - wg sync.WaitGroup + wg sync.WaitGroup ) - wg.Add(2) go func() { terminatedResult, tErr = h.storage.CompleteTags(ctx, terminatedQuery, opts) diff --git a/src/query/api/v1/handler/graphite/find_test.go b/src/query/api/v1/handler/graphite/find_test.go index a8490dc519..09c055424e 100644 --- a/src/query/api/v1/handler/graphite/find_test.go +++ b/src/query/api/v1/handler/graphite/find_test.go @@ -31,6 +31,7 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" @@ -203,7 +204,8 @@ func TestFind(t *testing.T) { // setup storage and handler store := setupStorage(ctrl) - handler := NewFindHandler(store) + handler := NewFindHandler(store, + handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{})) // execute the query w := &writer{} diff --git a/src/query/api/v1/handler/graphite/render.go b/src/query/api/v1/handler/graphite/render.go index f050dd0857..bfb370213c 100644 --- a/src/query/api/v1/handler/graphite/render.go +++ b/src/query/api/v1/handler/graphite/render.go @@ -28,6 +28,8 @@ import ( "sort" "sync" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/graphite/common" @@ -63,9 +65,11 @@ type respError struct { // NewRenderHandler returns 
a new render handler around the given storage. func NewRenderHandler( storage storage.Storage, + queryContextOpts models.QueryContextOptions, enforcer cost.ChainedEnforcer, ) http.Handler { - wrappedStore := graphite.NewM3WrappedStorage(storage, enforcer) + wrappedStore := graphite.NewM3WrappedStorage(storage, + enforcer, queryContextOpts) return &renderHandler{ engine: native.NewEngine(wrappedStore), } diff --git a/src/query/api/v1/handler/graphite/render_test.go b/src/query/api/v1/handler/graphite/render_test.go index 83cbc4fe9c..74f11a34bd 100644 --- a/src/query/api/v1/handler/graphite/render_test.go +++ b/src/query/api/v1/handler/graphite/render_test.go @@ -39,7 +39,8 @@ import ( func TestParseNoQuery(t *testing.T) { mockStorage := mock.NewMockStorage() - handler := NewRenderHandler(mockStorage, nil) + handler := NewRenderHandler(mockStorage, + models.QueryContextOptions{}, nil) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, newGraphiteReadHTTPRequest(t)) @@ -51,7 +52,8 @@ func TestParseNoQuery(t *testing.T) { func TestParseQueryNoResults(t *testing.T) { mockStorage := mock.NewMockStorage() mockStorage.SetFetchResult(&storage.FetchResult{}, nil) - handler := NewRenderHandler(mockStorage, nil) + handler := NewRenderHandler(mockStorage, + models.QueryContextOptions{}, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = "target=foo.bar&from=-2h&until=now" @@ -82,7 +84,8 @@ func TestParseQueryResults(t *testing.T) { } mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - handler := NewRenderHandler(mockStorage, nil) + handler := NewRenderHandler(mockStorage, + models.QueryContextOptions{}, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = fmt.Sprintf("target=foo.bar&from=%d&until=%d", @@ -123,7 +126,8 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { } mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - handler := NewRenderHandler(mockStorage, nil) + handler := 
NewRenderHandler(mockStorage, + models.QueryContextOptions{}, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = "target=foo.bar&from=" + startStr + "&until=" + endStr + "&maxDataPoints=1" @@ -158,7 +162,8 @@ func TestParseQueryResultsMultiTarget(t *testing.T) { } mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil) - handler := NewRenderHandler(mockStorage, nil) + handler := NewRenderHandler(mockStorage, + models.QueryContextOptions{}, nil) req := newGraphiteReadHTTPRequest(t) req.URL.RawQuery = fmt.Sprintf("target=foo.bar&target=baz.qux&from=%d&until=%d", diff --git a/src/query/api/v1/handler/headers.go b/src/query/api/v1/handler/headers.go index e44a6aa7eb..3acb033d3c 100644 --- a/src/query/api/v1/handler/headers.go +++ b/src/query/api/v1/handler/headers.go @@ -21,18 +21,22 @@ package handler const ( - // WarningsHeader is the M3 warnings header when to display a warning to a user + // WarningsHeader is the M3 warnings header when to display a warning to a user. WarningsHeader = "M3-Warnings" - // RetryHeader is the M3 retry header to display when it is safe to retry + // RetryHeader is the M3 retry header to display when it is safe to retry. RetryHeader = "M3-Retry" - // ServedByHeader is the M3 query storage execution breakdown + // ServedByHeader is the M3 query storage execution breakdown. ServedByHeader = "M3-Storage-By" - // DeprecatedHeader is the M3 deprecated header + // DeprecatedHeader is the M3 deprecated header. DeprecatedHeader = "M3-Deprecated" + // LimitMaxSeriesHeader is the M3 limit timeseries header that limits + // the number of time series returned by each storage node. + LimitMaxSeriesHeader = "M3-Limit-Max-Series" + // DefaultServiceEnvironment is the default service ID environment. DefaultServiceEnvironment = "default_env" // DefaultServiceZone is the default service ID zone. 
diff --git a/src/query/api/v1/handler/prometheus/native/complete_tags.go b/src/query/api/v1/handler/prometheus/native/complete_tags.go index bd33f59a2a..fcb5b602e1 100644 --- a/src/query/api/v1/handler/prometheus/native/complete_tags.go +++ b/src/query/api/v1/handler/prometheus/native/complete_tags.go @@ -43,15 +43,18 @@ const ( // CompleteTagsHandler represents a handler for search tags endpoint. type CompleteTagsHandler struct { - storage storage.Storage + storage storage.Storage + fetchOptionsBuilder handler.FetchOptionsBuilder } // NewCompleteTagsHandler returns a new instance of handler. func NewCompleteTagsHandler( storage storage.Storage, + fetchOptionsBuilder handler.FetchOptionsBuilder, ) http.Handler { return &CompleteTagsHandler{ - storage: storage, + storage: storage, + fetchOptionsBuilder: fetchOptionsBuilder, } } @@ -66,7 +69,12 @@ func (h *CompleteTagsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) return } - opts := storage.NewFetchOptions() + opts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) + if rErr != nil { + xhttp.Error(w, rErr.Inner(), rErr.Code()) + return + } + result, err := h.storage.CompleteTags(ctx, query, opts) if err != nil { logger.Error("unable to complete tags", zap.Error(err)) diff --git a/src/query/api/v1/handler/prometheus/native/list_tags.go b/src/query/api/v1/handler/prometheus/native/list_tags.go index 4e176518ca..9bf761fc41 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags.go @@ -31,7 +31,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/net/http" + xhttp "github.com/m3db/m3/src/x/net/http" "go.uber.org/zap" ) @@ -48,18 +48,21 @@ var ( // ListTagsHandler represents a handler for list tags endpoint. 
type ListTagsHandler struct { - storage storage.Storage - nowFn clock.NowFn + storage storage.Storage + fetchOptionsBuilder handler.FetchOptionsBuilder + nowFn clock.NowFn } // NewListTagsHandler returns a new instance of handler. func NewListTagsHandler( storage storage.Storage, + fetchOptionsBuilder handler.FetchOptionsBuilder, nowFn clock.NowFn, ) http.Handler { return &ListTagsHandler{ - storage: storage, - nowFn: nowFn, + storage: storage, + fetchOptionsBuilder: fetchOptionsBuilder, + nowFn: nowFn, } } @@ -77,7 +80,12 @@ func (h *ListTagsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { End: h.nowFn(), } - opts := storage.NewFetchOptions() + opts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) + if rErr != nil { + xhttp.Error(w, rErr.Inner(), rErr.Code()) + return + } + result, err := h.storage.CompleteTags(ctx, query, opts) if err != nil { logger.Error("unable to complete tags", zap.Error(err)) diff --git a/src/query/api/v1/handler/prometheus/native/list_tags_test.go b/src/query/api/v1/handler/prometheus/native/list_tags_test.go index eee0845a45..0e130d00d7 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags_test.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags_test.go @@ -28,6 +28,8 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" @@ -97,7 +99,9 @@ func TestListTags(t *testing.T) { return now } - handler := NewListTagsHandler(store, nowFn) + handler := NewListTagsHandler(store, + handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), + nowFn) for _, method := range []string{"GET", "POST"} { matcher := &listTagsMatcher{now: now} store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()). 
@@ -131,7 +135,9 @@ func TestListErrorTags(t *testing.T) { return now } - handler := NewListTagsHandler(store, nowFn) + handler := NewListTagsHandler(store, + handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), + nowFn) for _, method := range []string{"GET", "POST"} { matcher := &listTagsMatcher{now: now} store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()). diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index 7cd6764325..94bfa160ed 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/query/util/httperrors" "github.com/m3db/m3/src/query/util/logging" opentracingutil "github.com/m3db/m3/src/query/util/opentracing" + xhttp "github.com/m3db/m3/src/x/net/http" opentracingext "github.com/opentracing/opentracing-go/ext" opentracinglog "github.com/opentracing/opentracing-go/log" @@ -61,12 +62,13 @@ var ( // PromReadHandler represents a handler for prometheus read endpoint. type PromReadHandler struct { - engine *executor.Engine - tagOpts models.TagOptions - limitsCfg *config.LimitsConfiguration - promReadMetrics promReadMetrics - timeoutOps *prometheus.TimeoutOpts - keepNans bool + engine *executor.Engine + fetchOptionsBuilder handler.FetchOptionsBuilder + tagOpts models.TagOptions + limitsCfg *config.LimitsConfiguration + promReadMetrics promReadMetrics + timeoutOps *prometheus.TimeoutOpts + keepNans bool } type promReadMetrics struct { @@ -106,6 +108,7 @@ type RespError struct { // NewPromReadHandler returns a new instance of handler. 
func NewPromReadHandler( engine *executor.Engine, + fetchOptionsBuilder handler.FetchOptionsBuilder, tagOpts models.TagOptions, limitsCfg *config.LimitsConfiguration, scope tally.Scope, @@ -113,12 +116,13 @@ func NewPromReadHandler( keepNans bool, ) *PromReadHandler { h := &PromReadHandler{ - engine: engine, - tagOpts: tagOpts, - limitsCfg: limitsCfg, - promReadMetrics: newPromReadMetrics(scope), - timeoutOps: timeoutOpts, - keepNans: keepNans, + engine: engine, + fetchOptionsBuilder: fetchOptionsBuilder, + tagOpts: tagOpts, + limitsCfg: limitsCfg, + promReadMetrics: newPromReadMetrics(scope), + timeoutOps: timeoutOpts, + keepNans: keepNans, } h.promReadMetrics.maxDatapoints.Update(float64(limitsCfg.MaxComputedDatapoints())) @@ -128,7 +132,17 @@ func NewPromReadHandler( func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { timer := h.promReadMetrics.fetchTimerSuccess.Start() - result, params, respErr := h.ServeHTTPWithEngine(w, r, h.engine) + fetchOpts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) + if rErr != nil { + xhttp.Error(w, rErr.Inner(), rErr.Code()) + return + } + + result, params, respErr := h.ServeHTTPWithEngine(w, r, h.engine, &executor.EngineOptions{ + QueryContextOptions: models.QueryContextOptions{ + LimitMaxTimeseries: fetchOpts.Limit, + }, + }) if respErr != nil { httperrors.ErrorWithReqInfo(w, r, respErr.Code, respErr.Err) return @@ -151,7 +165,9 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // ServeHTTPWithEngine returns query results from the storage func (h *PromReadHandler) ServeHTTPWithEngine( w http.ResponseWriter, - r *http.Request, engine *executor.Engine, + r *http.Request, + engine *executor.Engine, + opts *executor.EngineOptions, ) ([]*ts.Series, models.RequestParams, *RespError) { ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) logger := logging.WithContext(ctx) @@ -171,7 +187,7 @@ func (h *PromReadHandler) ServeHTTPWithEngine( return nil, 
emptyReqParams, &RespError{Err: err, Code: http.StatusBadRequest} } - result, err := read(ctx, engine, h.tagOpts, w, params) + result, err := read(ctx, engine, opts, h.tagOpts, w, params) if err != nil { sp := opentracingutil.SpanFromContextOrNoop(ctx) sp.LogFields(opentracinglog.Error(err)) diff --git a/src/query/api/v1/handler/prometheus/native/read_common.go b/src/query/api/v1/handler/prometheus/native/read_common.go index b3ab29192e..f2d12f3c1c 100644 --- a/src/query/api/v1/handler/prometheus/native/read_common.go +++ b/src/query/api/v1/handler/prometheus/native/read_common.go @@ -41,6 +41,7 @@ import ( func read( reqCtx context.Context, engine *executor.Engine, + opts *executor.EngineOptions, tagOpts models.TagOptions, w http.ResponseWriter, params models.RequestParams, @@ -57,7 +58,6 @@ func read( opentracingutil.Duration("params.step", params.Step), ) - opts := &executor.EngineOptions{} // Detect clients closing connections handler.CloseWatcher(ctx, cancel, w) diff --git a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go b/src/query/api/v1/handler/prometheus/native/read_instantaneous.go index e5bcdeb6bf..42401a0c8f 100644 --- a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go +++ b/src/query/api/v1/handler/prometheus/native/read_instantaneous.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/util/httperrors" "github.com/m3db/m3/src/query/util/logging" + xhttp "github.com/m3db/m3/src/x/net/http" "go.uber.org/zap" ) @@ -46,21 +47,24 @@ const ( // PromReadInstantHandler represents a handler for prometheus instantaneous read endpoint. type PromReadInstantHandler struct { - engine *executor.Engine - tagOpts models.TagOptions - timeoutOpts *prometheus.TimeoutOpts + engine *executor.Engine + fetchOptionsBuilder handler.FetchOptionsBuilder + tagOpts models.TagOptions + timeoutOpts *prometheus.TimeoutOpts } // NewPromReadInstantHandler returns a new instance of handler. 
func NewPromReadInstantHandler( engine *executor.Engine, + fetchOptionsBuilder handler.FetchOptionsBuilder, tagOpts models.TagOptions, timeoutOpts *prometheus.TimeoutOpts, ) *PromReadInstantHandler { return &PromReadInstantHandler{ - engine: engine, - tagOpts: tagOpts, - timeoutOpts: timeoutOpts, + engine: engine, + fetchOptionsBuilder: fetchOptionsBuilder, + tagOpts: tagOpts, + timeoutOpts: timeoutOpts, } } @@ -77,7 +81,19 @@ func (h *PromReadInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques logger.Info("request params", zap.Any("params", params)) } - result, err := read(ctx, h.engine, h.tagOpts, w, params) + fetchOpts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) + if rErr != nil { + xhttp.Error(w, rErr.Inner(), rErr.Code()) + return + } + + engineOpts := &executor.EngineOptions{ + QueryContextOptions: models.QueryContextOptions{ + LimitMaxTimeseries: fetchOpts.Limit, + }, + } + + result, err := read(ctx, h.engine, engineOpts, h.tagOpts, w, params) if err != nil { logger.Error("unable to fetch data", zap.Error(err)) httperrors.ErrorWithReqInfo(w, r, http.StatusInternalServerError, err) diff --git a/src/query/api/v1/handler/prometheus/native/read_test.go b/src/query/api/v1/handler/prometheus/native/read_test.go index 110bf8d163..c62f97893d 100644 --- a/src/query/api/v1/handler/prometheus/native/read_test.go +++ b/src/query/api/v1/handler/prometheus/native/read_test.go @@ -30,6 +30,8 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/block" @@ -61,7 +63,8 @@ func TestPromReadHandler_Read(t *testing.T) { r, parseErr := parseParams(req, timeoutOpts) require.Nil(t, parseErr) assert.Equal(t, models.FormatPromQL, r.FormatType) - seriesList, err := read(context.TODO(), promRead.engine, promRead.tagOpts, httptest.NewRecorder(), r) + seriesList, err := read(context.TODO(), promRead.engine, 
setup.EngineOpts, + promRead.tagOpts, httptest.NewRecorder(), r) require.NoError(t, err) require.Len(t, seriesList, 2) s := seriesList[0] @@ -119,6 +122,7 @@ func newReadRequest(t *testing.T, params url.Values) *http.Request { type testSetup struct { Storage mock.Storage Handlers testSetupHandlers + EngineOpts *executor.EngineOptions TimeoutOpts *prometheus.TimeoutOpts } @@ -133,15 +137,17 @@ func newTestSetup() *testSetup { scope := tally.NoopScope engine := executor.NewEngine(mockStorage, scope, time.Minute, nil) + fetchOptsBuilderCfg := handler.FetchOptionsBuilderOptions{} + fetchOptsBuilder := handler.NewFetchOptionsBuilder(fetchOptsBuilderCfg) tagOpts := models.NewTagOptions() limitsConfig := &config.LimitsConfiguration{} keepNans := false - read := NewPromReadHandler(engine, tagOpts, limitsConfig, - scope, timeoutOpts, keepNans) + read := NewPromReadHandler(engine, fetchOptsBuilder, tagOpts, + limitsConfig, scope, timeoutOpts, keepNans) - instantRead := NewPromReadInstantHandler(engine, tagOpts, - timeoutOpts) + instantRead := NewPromReadInstantHandler(engine, fetchOptsBuilder, + tagOpts, timeoutOpts) return &testSetup{ Storage: mockStorage, @@ -149,6 +155,7 @@ func newTestSetup() *testSetup { Read: read, InstantRead: instantRead, }, + EngineOpts: &executor.EngineOptions{}, TimeoutOpts: timeoutOpts, } } diff --git a/src/query/api/v1/handler/prometheus/remote/match.go b/src/query/api/v1/handler/prometheus/remote/match.go index e83a444bf5..d8be323c67 100644 --- a/src/query/api/v1/handler/prometheus/remote/match.go +++ b/src/query/api/v1/handler/prometheus/remote/match.go @@ -46,18 +46,21 @@ var ( // PromSeriesMatchHandler represents a handler for prometheus series matcher endpoint. type PromSeriesMatchHandler struct { - tagOptions models.TagOptions - storage storage.Storage + storage storage.Storage + tagOptions models.TagOptions + fetchOptionsBuilder handler.FetchOptionsBuilder } // NewPromSeriesMatchHandler returns a new instance of handler. 
func NewPromSeriesMatchHandler( storage storage.Storage, tagOptions models.TagOptions, + fetchOptionsBuilder handler.FetchOptionsBuilder, ) http.Handler { return &PromSeriesMatchHandler{ - tagOptions: tagOptions, - storage: storage, + tagOptions: tagOptions, + storage: storage, + fetchOptionsBuilder: fetchOptionsBuilder, } } @@ -74,7 +77,12 @@ func (h *PromSeriesMatchHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques return } - opts := storage.NewFetchOptions() + opts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) + if rErr != nil { + xhttp.Error(w, rErr.Inner(), rErr.Code()) + return + } + results := make([]models.Metrics, len(queries)) for i, query := range queries { result, err := h.storage.SearchSeries(ctx, query, opts) diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values.go b/src/query/api/v1/handler/prometheus/remote/tag_values.go index c5a7606267..06b159dec9 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values.go @@ -56,8 +56,9 @@ var ( // TagValuesHandler represents a handler for search tags endpoint. type TagValuesHandler struct { - storage storage.Storage - nowFn clock.NowFn + storage storage.Storage + fetchOptionsBuilder handler.FetchOptionsBuilder + nowFn clock.NowFn } // TagValuesResponse is the response that gets returned to the user @@ -68,11 +69,13 @@ type TagValuesResponse struct { // NewTagValuesHandler returns a new instance of handler. 
func NewTagValuesHandler( storage storage.Storage, + fetchOptionsBuilder handler.FetchOptionsBuilder, nowFn clock.NowFn, ) http.Handler { return &TagValuesHandler{ - storage: storage, - nowFn: nowFn, + storage: storage, + fetchOptionsBuilder: fetchOptionsBuilder, + nowFn: nowFn, } } @@ -88,7 +91,12 @@ func (h *TagValuesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - opts := storage.NewFetchOptions() + opts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r) + if rErr != nil { + xhttp.Error(w, rErr.Inner(), rErr.Code()) + return + } + result, err := h.storage.CompleteTags(ctx, query, opts) if err != nil { logger.Error("unable to get tag values", zap.Error(err)) diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go index 9fda5108e6..3e734a9ecb 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go @@ -29,6 +29,8 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" @@ -102,7 +104,8 @@ func TestTagValues(t *testing.T) { return now } - handler := NewTagValuesHandler(store, nowFn) + handler := NewTagValuesHandler(store, + handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), nowFn) names := []struct { name string }{ @@ -162,7 +165,8 @@ func TestTagValueErrors(t *testing.T) { return now } - handler := NewTagValuesHandler(store, nowFn) + handler := NewTagValuesHandler(store, + handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), nowFn) url := "/label" req, err := http.NewRequest("GET", url, nil) if err != nil { diff --git a/src/query/api/v1/handler/prometheus/validator/handler.go b/src/query/api/v1/handler/prometheus/validator/handler.go index ac251f28fc..a7dad2aff3 100644 --- 
a/src/query/api/v1/handler/prometheus/validator/handler.go +++ b/src/query/api/v1/handler/prometheus/validator/handler.go @@ -110,7 +110,7 @@ func (h *PromDebugHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } engine := executor.NewEngine(s, h.scope.SubScope("debug_engine"), h.lookbackDuration, nil) - results, _, respErr := h.readHandler.ServeHTTPWithEngine(w, r, engine) + results, _, respErr := h.readHandler.ServeHTTPWithEngine(w, r, engine, &executor.EngineOptions{}) if respErr != nil { logger.Error("unable to read data", zap.Error(respErr.Err)) xhttp.Error(w, respErr.Err, respErr.Code) diff --git a/src/query/api/v1/handler/prometheus/validator/handler_test.go b/src/query/api/v1/handler/prometheus/validator/handler_test.go index 06b587c7f2..bf5d9f14de 100644 --- a/src/query/api/v1/handler/prometheus/validator/handler_test.go +++ b/src/query/api/v1/handler/prometheus/validator/handler_test.go @@ -29,6 +29,8 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/native" @@ -303,6 +305,7 @@ func newServer() (*httptest.Server, *PromDebugHandler) { debugHandler := NewPromDebugHandler( native.NewPromReadHandler( executor.NewEngine(mockStorage, tally.NewTestScope("test_engine", nil), defaultLookbackDuration, cost.NoopChainedEnforcer()), + handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), models.NewTagOptions(), &config.LimitsConfiguration{}, tally.NewTestScope("test", nil), diff --git a/src/query/api/v1/handler/search.go b/src/query/api/v1/handler/search.go index 1daf1a4df5..90af9feca8 100644 --- a/src/query/api/v1/handler/search.go +++ b/src/query/api/v1/handler/search.go @@ -29,7 +29,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" - "github.com/m3db/m3/src/x/net/http" + xhttp 
"github.com/m3db/m3/src/x/net/http" "go.uber.org/zap" ) @@ -46,24 +46,31 @@ const ( // SearchHandler represents a handler for the search endpoint type SearchHandler struct { - store storage.Storage + store storage.Storage + fetchOptionsBuilder FetchOptionsBuilder } // NewSearchHandler returns a new instance of handler -func NewSearchHandler(storage storage.Storage) http.Handler { - return &SearchHandler{store: storage} +func NewSearchHandler( + storage storage.Storage, + fetchOptionsBuilder FetchOptionsBuilder, +) http.Handler { + return &SearchHandler{ + store: storage, + fetchOptionsBuilder: fetchOptionsBuilder, + } } func (h *SearchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { logger := logging.WithContext(r.Context()) - query, rErr := h.parseBody(r) - if rErr != nil { - logger.Error("unable to parse request", zap.Error(rErr)) - xhttp.Error(w, rErr.Inner(), rErr.Code()) + query, parseBodyErr := h.parseBody(r) + opts, parseURLParamsErr := h.parseURLParams(r) + if err := firstParseError(parseBodyErr, parseURLParamsErr); err != nil { + logger.Error("unable to parse request", zap.Error(err.Inner())) + xhttp.Error(w, err.Inner(), err.Code()) return } - opts := h.parseURLParams(r) results, err := h.search(r.Context(), query, opts) if err != nil { @@ -90,24 +97,21 @@ func (h *SearchHandler) parseBody(r *http.Request) (*storage.FetchQuery, *xhttp. 
return &fetchQuery, nil } -func (h *SearchHandler) parseURLParams(r *http.Request) *storage.FetchOptions { - var ( - limit int - err error - ) +func (h *SearchHandler) parseURLParams(r *http.Request) (*storage.FetchOptions, *xhttp.ParseError) { + fetchOpts, parseErr := h.fetchOptionsBuilder.NewFetchOptions(r) + if parseErr != nil { + return nil, parseErr + } - limitRaw := r.URL.Query().Get("limit") - if limitRaw != "" { - limit, err = strconv.Atoi(limitRaw) + if str := r.URL.Query().Get("limit"); str != "" { + var err error + fetchOpts.Limit, err = strconv.Atoi(str) if err != nil { - limit = defaultLimit + return nil, xhttp.NewParseError(err, http.StatusBadRequest) } - } else { - limit = defaultLimit } - fetchOptions := newFetchOptions(limit) - return &fetchOptions + return fetchOpts, nil } func (h *SearchHandler) search( @@ -118,8 +122,11 @@ func (h *SearchHandler) search( return h.store.SearchSeries(ctx, query, opts) } -func newFetchOptions(limit int) storage.FetchOptions { - return storage.FetchOptions{ - Limit: limit, +func firstParseError(errs ...*xhttp.ParseError) *xhttp.ParseError { + for _, err := range errs { + if err != nil { + return err + } } + return nil } diff --git a/src/query/api/v1/handler/search_test.go b/src/query/api/v1/handler/search_test.go index c1f5285c07..487986c9ed 100644 --- a/src/query/api/v1/handler/search_test.go +++ b/src/query/api/v1/handler/search_test.go @@ -99,15 +99,19 @@ func searchServer(t *testing.T) *SearchHandler { session.EXPECT().FetchTaggedIDs(gomock.Any(), gomock.Any(), gomock.Any()). 
Return(mockTaggedIDsIter, false, nil).AnyTimes() - search := &SearchHandler{store: storage} - return search + search := NewSearchHandler(storage, + NewFetchOptionsBuilder(FetchOptionsBuilderOptions{})) + h, ok := search.(*SearchHandler) + require.True(t, ok) + return h } func TestSearchResponse(t *testing.T) { searchHandler := searchServer(t) - opts := newFetchOptions(100) - results, err := searchHandler.search(context.TODO(), generateSearchReq(), &opts) + opts := storage.NewFetchOptions() + opts.Limit = 100 + results, err := searchHandler.search(context.TODO(), generateSearchReq(), opts) require.NoError(t, err) assert.Equal(t, []byte(testID), results.Metrics[0].ID) diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 5a6bfcb920..fce837f80c 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -87,6 +87,8 @@ type Handler struct { tagOptions models.TagOptions timeoutOpts *prometheus.TimeoutOpts enforcer cost.ChainedEnforcer + fetchOptionsBuilder handler.FetchOptionsBuilder + queryContextOptions models.QueryContextOptions } // Router returns the http handler registered with all relevant routes for query. 
@@ -104,6 +106,8 @@ func NewHandler( cfg config.Configuration, embeddedDbCfg *dbconfig.DBConfiguration, enforcer cost.ChainedEnforcer, + fetchOptionsBuilder handler.FetchOptionsBuilder, + queryContextOptions models.QueryContextOptions, scope tally.Scope, ) (*Handler, error) { r := mux.NewRouter() @@ -121,7 +125,7 @@ func NewHandler( timeoutOpts.FetchTimeout = *embeddedDbCfg.Client.FetchTimeout } - h := &Handler{ + return &Handler{ router: r, handler: handlerWithMiddleware, storage: downsamplerAndWriter.Storage(), @@ -136,8 +140,9 @@ func NewHandler( tagOptions: tagOptions, timeoutOpts: timeoutOpts, enforcer: enforcer, - } - return h, nil + fetchOptionsBuilder: fetchOptionsBuilder, + queryContextOptions: queryContextOptions, + }, nil } func applyMiddleware(base *mux.Router, tracer opentracing.Tracer) http.Handler { @@ -170,24 +175,20 @@ func (h *Handler) RegisterRoutes() error { h.router.PathPrefix(openapi.StaticURLPrefix).Handler(wrapped(openapi.StaticHandler())) // Prometheus remote read/write endpoints - promRemoteReadHandler := remote.NewPromReadHandler(h.engine, h.scope.Tagged(remoteSource), h.timeoutOpts) + promRemoteReadHandler := remote.NewPromReadHandler(h.engine, + h.scope.Tagged(remoteSource), h.timeoutOpts) promRemoteWriteHandler, err := remote.NewPromWriteHandler( h.downsamplerAndWriter, h.tagOptions, - h.scope.Tagged(remoteSource), - ) + h.scope.Tagged(remoteSource)) if err != nil { return err } - nativePromReadHandler := native.NewPromReadHandler( - h.engine, - h.tagOptions, - &h.config.Limits, - h.scope.Tagged(nativeSource), - h.timeoutOpts, - h.config.ResultOptions.KeepNans, - ) + nativePromReadHandler := native.NewPromReadHandler(h.engine, + h.fetchOptionsBuilder, h.tagOptions, &h.config.Limits, + h.scope.Tagged(nativeSource), h.timeoutOpts, + h.config.ResultOptions.KeepNans) h.router.HandleFunc(remote.PromReadURL, wrapped(promRemoteReadHandler).ServeHTTP, @@ -199,12 +200,14 @@ func (h *Handler) RegisterRoutes() error { 
wrapped(nativePromReadHandler).ServeHTTP, ).Methods(native.PromReadHTTPMethod) h.router.HandleFunc(native.PromReadInstantURL, - wrapped(native.NewPromReadInstantHandler(h.engine, h.tagOptions, h.timeoutOpts)).ServeHTTP, + wrapped(native.NewPromReadInstantHandler(h.engine, h.fetchOptionsBuilder, + h.tagOptions, h.timeoutOpts)).ServeHTTP, ).Methods(native.PromReadInstantHTTPMethod) // Native M3 search and write endpoints h.router.HandleFunc(handler.SearchURL, - wrapped(handler.NewSearchHandler(h.storage)).ServeHTTP, + wrapped(handler.NewSearchHandler(h.storage, + h.fetchOptionsBuilder)).ServeHTTP, ).Methods(handler.SearchHTTPMethod) h.router.HandleFunc(m3json.WriteJSONURL, wrapped(m3json.NewWriteJSONHandler(h.storage)).ServeHTTP, @@ -212,38 +215,45 @@ func (h *Handler) RegisterRoutes() error { // Tag completion endpoints h.router.HandleFunc(native.CompleteTagsURL, - wrapped(native.NewCompleteTagsHandler(h.storage)).ServeHTTP, + wrapped(native.NewCompleteTagsHandler(h.storage, + h.fetchOptionsBuilder)).ServeHTTP, ).Methods(native.CompleteTagsHTTPMethod) h.router.HandleFunc(remote.TagValuesURL, - wrapped(remote.NewTagValuesHandler(h.storage, nowFn)).ServeHTTP, + wrapped(remote.NewTagValuesHandler(h.storage, h.fetchOptionsBuilder, + nowFn)).ServeHTTP, ).Methods(remote.TagValuesHTTPMethod) // List tag endpoints for _, method := range native.ListTagsHTTPMethods { h.router.HandleFunc(native.ListTagsURL, - wrapped(native.NewListTagsHandler(h.storage, nowFn)).ServeHTTP, + wrapped(native.NewListTagsHandler(h.storage, h.fetchOptionsBuilder, + nowFn)).ServeHTTP, ).Methods(method) } // Series match endpoints for _, method := range remote.PromSeriesMatchHTTPMethods { h.router.HandleFunc(remote.PromSeriesMatchURL, - wrapped(remote.NewPromSeriesMatchHandler(h.storage, h.tagOptions)).ServeHTTP, + wrapped(remote.NewPromSeriesMatchHandler(h.storage, + h.tagOptions, h.fetchOptionsBuilder)).ServeHTTP, ).Methods(method) } // Debug endpoints h.router.HandleFunc(validator.PromDebugURL, - 
wrapped(validator.NewPromDebugHandler(nativePromReadHandler, h.scope, *h.config.LookbackDuration)).ServeHTTP, + wrapped(validator.NewPromDebugHandler(nativePromReadHandler, + h.scope, *h.config.LookbackDuration)).ServeHTTP, ).Methods(validator.PromDebugHTTPMethod) // Graphite endpoints h.router.HandleFunc(graphite.ReadURL, - wrapped(graphite.NewRenderHandler(h.storage, h.enforcer)).ServeHTTP, + wrapped(graphite.NewRenderHandler(h.storage, + h.queryContextOptions, h.enforcer)).ServeHTTP, ).Methods(graphite.ReadHTTPMethods...) h.router.HandleFunc(graphite.FindURL, - wrapped(graphite.NewFindHandler(h.storage)).ServeHTTP, + wrapped(graphite.NewFindHandler(h.storage, + h.fetchOptionsBuilder)).ServeHTTP, ).Methods(graphite.FindHTTPMethods...) if h.clusterClient != nil { diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index 17622b5ca1..dab6a4d971 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -28,6 +28,8 @@ import ( "testing" "time" + "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" @@ -72,6 +74,8 @@ func setupHandler(store storage.Storage) (*Handler, error) { config.Configuration{LookbackDuration: &defaultLookbackDuration}, nil, nil, + handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), + models.QueryContextOptions{}, tally.NewTestScope("", nil)) } @@ -87,7 +91,8 @@ func TestHandlerFetchTimeoutError(t *testing.T) { engine := executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil) cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration} _, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine, nil, nil, - cfg, dbconfig, nil, tally.NewTestScope("", nil)) + cfg, dbconfig, nil, 
handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), + models.QueryContextOptions{}, tally.NewTestScope("", nil)) require.Error(t, err) } @@ -104,7 +109,8 @@ func TestHandlerFetchTimeout(t *testing.T) { engine := executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil) cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration} h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine, - nil, nil, cfg, dbconfig, nil, tally.NewTestScope("", nil)) + nil, nil, cfg, dbconfig, nil, handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{}), + models.QueryContextOptions{}, tally.NewTestScope("", nil)) require.NoError(t, err) assert.Equal(t, 4*time.Minute, h.timeoutOpts.FetchTimeout) } diff --git a/src/query/block/column.go b/src/query/block/column.go index 6e171e1631..bd8817f1f2 100644 --- a/src/query/block/column.go +++ b/src/query/block/column.go @@ -285,6 +285,7 @@ func (m *columnBlockSeriesIter) Err() error { } func (m *columnBlockSeriesIter) Next() bool { + fmt.Printf("THIS iter4 next\n") m.idx++ next := m.idx < m.SeriesCount() if !next { diff --git a/src/query/errors/execution.go b/src/query/errors/execution.go index cbaba38692..7069674062 100644 --- a/src/query/errors/execution.go +++ b/src/query/errors/execution.go @@ -26,26 +26,6 @@ import ( ) var ( - // ErrInvalidQuery is returned when executing an unknown query type. - ErrInvalidQuery = errors.New("invalid query") - - // ErrNotExecuted is returned when a statement is not executed in a query. - // This can occur when a previous statement in the same query has errored. - ErrNotExecuted = errors.New("not executed") - - // ErrQueryInterrupted is an error returned when the query is interrupted. - ErrQueryInterrupted = errors.New("query interrupted") - - // ErrQueryAborted is an error returned when the query is aborted. 
- ErrQueryAborted = errors.New("query aborted") - - // ErrQueryEngineShutdown is an error sent when the query cannot be - // created because the query engine was shutdown. - ErrQueryEngineShutdown = errors.New("query engine shutdown") - - // ErrQueryTimeoutLimitExceeded is an error when the query hits the max time allowed to run. - ErrQueryTimeoutLimitExceeded = errors.New("query timeout limit exceeded") - // ErrNoClientAddresses is an error when there are no addresses passed to the remote client ErrNoClientAddresses = errors.New("no client addresses given") ) diff --git a/src/query/executor/engine.go b/src/query/executor/engine.go index b63858f7af..e61c0e49dc 100644 --- a/src/query/executor/engine.go +++ b/src/query/executor/engine.go @@ -42,18 +42,24 @@ type Engine struct { lookbackDuration time.Duration } -// EngineOptions can be used to pass custom flags to engine +// EngineOptions can be used to pass custom flags to engine. type EngineOptions struct { + QueryContextOptions models.QueryContextOptions } -// Query is the result after execution +// Query is the result after execution. type Query struct { Err error Result Result } // NewEngine returns a new instance of QueryExecutor. 
-func NewEngine(store storage.Storage, scope tally.Scope, lookbackDuration time.Duration, factory qcost.ChainedEnforcer) *Engine { +func NewEngine( + store storage.Storage, + scope tally.Scope, + lookbackDuration time.Duration, + factory qcost.ChainedEnforcer, +) *Engine { if factory == nil { factory = qcost.NoopChainedEnforcer() } @@ -120,8 +126,10 @@ func (e *Engine) Execute( results chan *storage.QueryResult, ) { defer close(results) + fetchOpts := storage.NewFetchOptions() - fetchOpts.Limit = 0 + fetchOpts.Limit = opts.QueryContextOptions.LimitMaxTimeseries + result, err := e.store.Fetch(ctx, query, fetchOpts) if err != nil { results <- &storage.QueryResult{Err: err} @@ -171,7 +179,9 @@ func (e *Engine) ExecuteExpr( result := state.resultNode results <- Query{Result: result} - if err := state.Execute(models.NewQueryContext(ctx, e.costScope, perQueryEnforcer)); err != nil { + queryCtx := models.NewQueryContext(ctx, e.costScope, perQueryEnforcer, + opts.QueryContextOptions) + if err := state.Execute(queryCtx); err != nil { result.abort(err) } else { result.done() diff --git a/src/query/executor/state.go b/src/query/executor/state.go index f3e18f2dd9..0bf13b61b6 100644 --- a/src/query/executor/state.go +++ b/src/query/executor/state.go @@ -163,7 +163,7 @@ func (s *ExecutionState) createNode( transformParams, ok := step.Transform.Op.(transform.Params) if !ok { - return nil, fmt.Errorf("invalid transform step, %s", step) + return nil, fmt.Errorf("invalid transform step: %s", step) } transformNode, controller := CreateTransform(step.ID(), transformParams, options) diff --git a/src/query/functions/fetch.go b/src/query/functions/fetch.go index 57ab450c6d..807c682be3 100644 --- a/src/query/functions/fetch.go +++ b/src/query/functions/fetch.go @@ -99,6 +99,7 @@ func (n *FetchNode) fetch(queryCtx *models.QueryContext) (block.Result, error) { endTime := timeSpec.End opts := storage.NewFetchOptions() + opts.Limit = queryCtx.Options.LimitMaxTimeseries opts.BlockType = 
n.blockType opts.Scope = queryCtx.Scope opts.Enforcer = queryCtx.Enforcer diff --git a/src/query/generated/mocks/generate.go b/src/query/generated/mocks/generate.go index 10fc1be0f0..16caadda00 100644 --- a/src/query/generated/mocks/generate.go +++ b/src/query/generated/mocks/generate.go @@ -21,6 +21,7 @@ // mockgen rules for generating mocks for exported interfaces (reflection mode). //go:generate sh -c "mockgen -package=downsample $PACKAGE/src/cmd/services/m3coordinator/downsample Downsampler,MetricsAppender,SamplesAppender | genclean -pkg $PACKAGE/src/cmd/services/m3coordinator/downsample -out $GOPATH/src/$PACKAGE/src/cmd/services/m3coordinator/downsample/downsample_mock.go" //go:generate sh -c "mockgen -package=storage -destination=$GOPATH/src/$PACKAGE/src/query/storage/storage_mock.go $PACKAGE/src/query/storage Storage" +//go:generate sh -c "mockgen -package=m3 -destination=$GOPATH/src/$PACKAGE/src/query/storage/m3/m3_mock.go $PACKAGE/src/query/storage/m3 Storage" //go:generate sh -c "mockgen -package=block -destination=$GOPATH/src/$PACKAGE/src/query/block/block_mock.go $PACKAGE/src/query/block Block,StepIter,SeriesIter,Builder,Step,UnconsolidatedBlock,UnconsolidatedStepIter,UnconsolidatedSeriesIter,UnconsolidatedStep" //go:generate sh -c "mockgen -package=ingest -destination=$GOPATH/src/$PACKAGE/src/cmd/services/m3coordinator/ingest/write_mock.go $PACKAGE/src/cmd/services/m3coordinator/ingest DownsamplerAndWriter" //go:generate sh -c "mockgen -package=transform -destination=$GOPATH/src/$PACKAGE/src/query/executor/transform/types_mock.go $PACKAGE/src/query/executor/transform OpNode" diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index 82a250eb9e..688697871e 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -43,8 +43,9 @@ var ( ) type m3WrappedStore struct { - m3 storage.Storage - enforcer cost.ChainedEnforcer + m3 storage.Storage + enforcer 
cost.ChainedEnforcer + queryContextOpts models.QueryContextOptions } // NewM3WrappedStorage creates a graphite storage wrapper around an m3query @@ -52,12 +53,17 @@ type m3WrappedStore struct { func NewM3WrappedStorage( m3storage storage.Storage, enforcer cost.ChainedEnforcer, + queryContextOpts models.QueryContextOptions, ) Storage { if enforcer == nil { enforcer = cost.NoopChainedEnforcer() } - return &m3WrappedStore{m3: m3storage, enforcer: enforcer} + return &m3WrappedStore{ + m3: m3storage, + enforcer: enforcer, + queryContextOpts: queryContextOpts, + } } // TranslateQueryToMatchersWithTerminator converts a graphite query to tag @@ -162,6 +168,7 @@ func (s *m3WrappedStore) FetchByQuery( m3ctx, cancel := context.WithTimeout(ctx.RequestContext(), opts.Timeout) defer cancel() fetchOptions := storage.NewFetchOptions() + fetchOptions.Limit = s.queryContextOpts.LimitMaxTimeseries perQueryEnforcer := s.enforcer.Child(cost.QueryLevel) defer perQueryEnforcer.Close() diff --git a/src/query/graphite/storage/m3_wrapper_test.go b/src/query/graphite/storage/m3_wrapper_test.go index 58bbd6368f..0a22c81d0f 100644 --- a/src/query/graphite/storage/m3_wrapper_test.go +++ b/src/query/graphite/storage/m3_wrapper_test.go @@ -172,7 +172,8 @@ func TestFetchByQuery(t *testing.T) { enforcer := cost.NewMockChainedEnforcer(ctrl) enforcer.EXPECT().Child(cost.QueryLevel).Return(childEnforcer).MinTimes(1) - wrapper := NewM3WrappedStorage(store, enforcer) + wrapper := NewM3WrappedStorage(store, enforcer, + models.QueryContextOptions{}) ctx := xctx.New() ctx.SetRequestContext(context.TODO()) end := time.Now() @@ -211,7 +212,8 @@ func TestFetchByInvalidQuery(t *testing.T) { query := "a." 
ctx := xctx.New() - wrapper := NewM3WrappedStorage(store, nil) + wrapper := NewM3WrappedStorage(store, nil, + models.QueryContextOptions{}) result, err := wrapper.FetchByQuery(ctx, query, opts) assert.NoError(t, err) require.Equal(t, 0, len(result.SeriesList)) diff --git a/src/query/models/query_context.go b/src/query/models/query_context.go index 5fa1de52bb..84e9a50c2b 100644 --- a/src/query/models/query_context.go +++ b/src/query/models/query_context.go @@ -35,6 +35,14 @@ type QueryContext struct { Ctx context.Context Scope tally.Scope Enforcer cost.ChainedEnforcer + Options QueryContextOptions +} + +// QueryContextOptions contains optional configuration for the query context. +type QueryContextOptions struct { + // LimitMaxTimeseries limits the number of time series returned by each + // storage node. + LimitMaxTimeseries int } // NewQueryContext constructs a QueryContext using the given Enforcer to @@ -42,17 +50,21 @@ type QueryContext struct { func NewQueryContext( ctx context.Context, scope tally.Scope, - enforcer cost.ChainedEnforcer) *QueryContext { + enforcer cost.ChainedEnforcer, + options QueryContextOptions, +) *QueryContext { return &QueryContext{ Ctx: ctx, Scope: scope, Enforcer: enforcer, + Options: options, } } // NoopQueryContext returns a query context with no active components. func NoopQueryContext() *QueryContext { - return NewQueryContext(context.Background(), tally.NoopScope, cost.NoopChainedEnforcer()) + return NewQueryContext(context.Background(), tally.NoopScope, + cost.NoopChainedEnforcer(), QueryContextOptions{}) } // WithContext creates a shallow copy of this QueryContext using the new context. 
diff --git a/src/query/server/server.go b/src/query/server/server.go index 338e839a05..4ad7ef5dec 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -24,6 +24,7 @@ import ( "context" "fmt" "math/rand" + "net" "net/http" "os" "os/signal" @@ -42,6 +43,7 @@ import ( "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/metrics/aggregation" "github.com/m3db/m3/src/metrics/policy" + "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/httpd" m3dbcluster "github.com/m3db/m3/src/query/cluster/m3db" "github.com/m3db/m3/src/query/executor" @@ -174,13 +176,18 @@ func Run(runOpts RunOptions) { } defer buildReporter.Stop() + var ( - backendStorage storage.Storage - clusterClient clusterclient.Client - downsampler downsample.Downsampler - enabled bool + backendStorage storage.Storage + clusterClient clusterclient.Client + downsampler downsample.Downsampler + enabled bool + fetchOptsBuilderCfg = cfg.Limits.PerQuery.AsFetchOptionsBuilderOptions() + fetchOptsBuilder = handler.NewFetchOptionsBuilder(fetchOptsBuilderCfg) + queryCtxOpts = models.QueryContextOptions{ + LimitMaxTimeseries: fetchOptsBuilderCfg.Limit, + } ) - readWorkerPool, writeWorkerPool, err := pools.BuildWorkerPools( instrumentOptions, cfg.ReadWorkerPool, @@ -210,12 +217,8 @@ func Run(runOpts RunOptions) { // For m3db backend, we need to make connections to the m3db cluster which generates a session and use the storage with the session. 
if cfg.Backend == config.GRPCStorageType { poolWrapper := pools.NewPoolsWrapper(pools.BuildIteratorPools()) - backendStorage, enabled, err = remoteClient( - cfg, - tagOptions, - poolWrapper, - readWorkerPool, - ) + backendStorage, enabled, err = remoteClient(cfg, tagOptions, poolWrapper, + readWorkerPool) if err != nil { logger.Fatal("unable to setup grpc backend", zap.Error(err)) } @@ -234,7 +237,7 @@ func Run(runOpts RunOptions) { var cleanup cleanupFn backendStorage, clusterClient, downsampler, cleanup, err = newM3DBStorage( runOpts, cfg, tagOptions, m3dbClusters, m3dbPoolWrapper, - readWorkerPool, writeWorkerPool, instrumentOptions) + readWorkerPool, writeWorkerPool, queryCtxOpts, instrumentOptions) if err != nil { logger.Fatal("unable to setup m3db backend", zap.Error(err)) } @@ -246,15 +249,16 @@ func Run(runOpts RunOptions) { logger.Fatal("unable to setup perQueryEnforcer", zap.Error(err)) } - engine := executor.NewEngine(backendStorage, scope.SubScope("engine"), *cfg.LookbackDuration, perQueryEnforcer) - + engine := executor.NewEngine(backendStorage, scope.SubScope("engine"), + *cfg.LookbackDuration, perQueryEnforcer) downsamplerAndWriter, err := newDownsamplerAndWriter(backendStorage, downsampler) if err != nil { logger.Fatal("unable to create new downsampler and writer", zap.Error(err)) } handler, err := httpd.NewHandler(downsamplerAndWriter, tagOptions, engine, - m3dbClusters, clusterClient, cfg, runOpts.DBConfig, perQueryEnforcer, scope) + m3dbClusters, clusterClient, cfg, runOpts.DBConfig, perQueryEnforcer, + fetchOptsBuilder, queryCtxOpts, scope) if err != nil { logger.Fatal("unable to set up handlers", zap.Error(err)) } @@ -348,6 +352,7 @@ func newM3DBStorage( poolWrapper *pools.PoolWrapper, readWorkerPool xsync.PooledWorkerPool, writeWorkerPool xsync.PooledWorkerPool, + queryContextOptions models.QueryContextOptions, instrumentOptions instrument.Options, ) (storage.Storage, clusterclient.Client, downsample.Downsampler, cleanupFn, error) { var ( @@ 
-389,7 +394,8 @@ func newM3DBStorage( } fanoutStorage, storageCleanup, err := newStorages(clusters, cfg, tagOptions, - poolWrapper, readWorkerPool, writeWorkerPool, instrumentOptions) + poolWrapper, readWorkerPool, writeWorkerPool, queryContextOptions, + instrumentOptions) if err != nil { return nil, nil, nil, nil, errors.Wrap(err, "unable to set up storages") } @@ -602,6 +608,7 @@ func newStorages( poolWrapper *pools.PoolWrapper, readWorkerPool xsync.PooledWorkerPool, writeWorkerPool xsync.PooledWorkerPool, + queryContextOptions models.QueryContextOptions, instrumentOpts instrument.Options, ) (storage.Storage, cleanupFn, error) { var ( @@ -624,7 +631,8 @@ func newStorages( remoteEnabled := false if cfg.RPC != nil && cfg.RPC.Enabled { logger.Info("rpc enabled") - server, err := startGrpcServer(logger, localStorage, poolWrapper, cfg.RPC) + server, err := startGRPCServer(localStorage, queryContextOptions, + poolWrapper, cfg.RPC, logger) if err != nil { return nil, nil, err } @@ -726,26 +734,27 @@ func remoteClient( return nil, false, nil } -func startGrpcServer( - logger *zap.Logger, +func startGRPCServer( storage m3.Storage, + queryContextOptions models.QueryContextOptions, poolWrapper *pools.PoolWrapper, cfg *config.RPCConfiguration, + logger *zap.Logger, ) (*grpc.Server, error) { logger.Info("creating gRPC server") - server := tsdbRemote.CreateNewGrpcServer(storage, poolWrapper) - waitForStart := make(chan struct{}) - var startErr error + server := tsdbRemote.NewGRPCServer(storage, + queryContextOptions, poolWrapper) + listener, err := net.Listen("tcp", cfg.ListenAddress) + if err != nil { + return nil, err + } go func() { - logger.Info("starting gRPC server on port", zap.String("rpc", cfg.ListenAddress)) - err := tsdbRemote.StartNewGrpcServer(server, cfg.ListenAddress, waitForStart) - // TODO: consider removing logger.Fatal here and pass back error through a channel - if err != nil { - startErr = errors.Wrap(err, "unable to start gRPC server") + if err := 
server.Serve(listener); err != nil { + logger.Error("error from serving gRPC server", zap.Error(err)) } }() - <-waitForStart - return server, startErr + + return server, nil } func startCarbonIngestion( diff --git a/src/query/storage/block.go b/src/query/storage/block.go index ed6f830824..8d202c7b48 100644 --- a/src/query/storage/block.go +++ b/src/query/storage/block.go @@ -22,6 +22,7 @@ package storage import ( "errors" + "fmt" "sync" "time" @@ -330,6 +331,7 @@ func (m *multiSeriesBlockSeriesIter) SeriesCount() int { } func (m *multiSeriesBlockSeriesIter) Next() bool { + fmt.Printf("THIS iter3 next\n") m.index++ return m.index < m.SeriesCount() } diff --git a/src/query/storage/consolidated.go b/src/query/storage/consolidated.go index 196a4e0379..e50b2cb3c3 100644 --- a/src/query/storage/consolidated.go +++ b/src/query/storage/consolidated.go @@ -22,6 +22,7 @@ package storage import ( "errors" + "fmt" "github.com/m3db/m3/src/query/block" ) @@ -109,6 +110,7 @@ type consolidatedSeriesIter struct { } func (c *consolidatedSeriesIter) Next() bool { + fmt.Printf("THIS iter2 next\n") return c.unconsolidated.Next() } diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index aa6bf7c20c..53c11baccd 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -296,7 +296,6 @@ func iteratorToTsSeries( // Fall back to sequential decompression if unable to decompress concurrently func decompressSequentially( - iterLength int, iters []encoding.SeriesIterator, enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, @@ -316,14 +315,12 @@ func decompressSequentially( } func decompressConcurrently( - iterLength int, iters []encoding.SeriesIterator, readWorkerPool xsync.PooledWorkerPool, enforcer cost.ChainedEnforcer, tagOptions models.TagOptions, ) (*FetchResult, error) { - seriesList := make([]*ts.Series, iterLength) - var wg sync.WaitGroup + seriesList := make([]*ts.Series, len(iters)) errorCh := make(chan error, 1) done := 
make(chan struct{}) stopped := func() bool { @@ -335,11 +332,10 @@ func decompressConcurrently( } } - wg.Add(iterLength) - + var wg sync.WaitGroup for i, iter := range iters { - i := i - iter := iter + i, iter := i, iter + wg.Add(1) readWorkerPool.Go(func() { defer wg.Done() if stopped() { @@ -384,11 +380,10 @@ func SeriesIteratorsToFetchResult( } iters := seriesIterators.Iters() - iterLength := seriesIterators.Len() if readWorkerPool == nil { - return decompressSequentially(iterLength, iters, enforcer, tagOptions) + return decompressSequentially(iters, enforcer, tagOptions) } - return decompressConcurrently(iterLength, iters, readWorkerPool, + return decompressConcurrently(iters, readWorkerPool, enforcer, tagOptions) } diff --git a/src/query/storage/m3/m3_mock.go b/src/query/storage/m3/m3_mock.go new file mode 100644 index 0000000000..41b8b19032 --- /dev/null +++ b/src/query/storage/m3/m3_mock.go @@ -0,0 +1,193 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/m3db/m3/src/query/storage/m3 (interfaces: Storage) + +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package m3 is a generated GoMock package. +package m3 + +import ( + "context" + "reflect" + + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/storage" + + "github.com/golang/mock/gomock" +) + +// MockStorage is a mock of Storage interface +type MockStorage struct { + ctrl *gomock.Controller + recorder *MockStorageMockRecorder +} + +// MockStorageMockRecorder is the mock recorder for MockStorage +type MockStorageMockRecorder struct { + mock *MockStorage +} + +// NewMockStorage creates a new mock instance +func NewMockStorage(ctrl *gomock.Controller) *MockStorage { + mock := &MockStorage{ctrl: ctrl} + mock.recorder = &MockStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockStorage) EXPECT() *MockStorageMockRecorder { + return m.recorder +} + +// Close mocks base method +func (m *MockStorage) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockStorageMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStorage)(nil).Close)) +} + +// CompleteTags mocks base method +func (m *MockStorage) CompleteTags(arg0 context.Context, arg1 *storage.CompleteTagsQuery, arg2 *storage.FetchOptions) (*storage.CompleteTagsResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CompleteTags", arg0, arg1, arg2) + ret0, _ := ret[0].(*storage.CompleteTagsResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CompleteTags indicates an 
expected call of CompleteTags +func (mr *MockStorageMockRecorder) CompleteTags(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CompleteTags", reflect.TypeOf((*MockStorage)(nil).CompleteTags), arg0, arg1, arg2) +} + +// Fetch mocks base method +func (m *MockStorage) Fetch(arg0 context.Context, arg1 *storage.FetchQuery, arg2 *storage.FetchOptions) (*storage.FetchResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Fetch", arg0, arg1, arg2) + ret0, _ := ret[0].(*storage.FetchResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Fetch indicates an expected call of Fetch +func (mr *MockStorageMockRecorder) Fetch(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fetch", reflect.TypeOf((*MockStorage)(nil).Fetch), arg0, arg1, arg2) +} + +// FetchBlocks mocks base method +func (m *MockStorage) FetchBlocks(arg0 context.Context, arg1 *storage.FetchQuery, arg2 *storage.FetchOptions) (block.Result, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchBlocks", arg0, arg1, arg2) + ret0, _ := ret[0].(block.Result) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchBlocks indicates an expected call of FetchBlocks +func (mr *MockStorageMockRecorder) FetchBlocks(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchBlocks", reflect.TypeOf((*MockStorage)(nil).FetchBlocks), arg0, arg1, arg2) +} + +// FetchCompressed mocks base method +func (m *MockStorage) FetchCompressed(arg0 context.Context, arg1 *storage.FetchQuery, arg2 *storage.FetchOptions) (encoding.SeriesIterators, Cleanup, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchCompressed", arg0, arg1, arg2) + ret0, _ := ret[0].(encoding.SeriesIterators) + ret1, _ := ret[1].(Cleanup) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// 
FetchCompressed indicates an expected call of FetchCompressed +func (mr *MockStorageMockRecorder) FetchCompressed(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchCompressed", reflect.TypeOf((*MockStorage)(nil).FetchCompressed), arg0, arg1, arg2) +} + +// SearchCompressed mocks base method +func (m *MockStorage) SearchCompressed(arg0 context.Context, arg1 *storage.FetchQuery, arg2 *storage.FetchOptions) ([]MultiTagResult, Cleanup, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SearchCompressed", arg0, arg1, arg2) + ret0, _ := ret[0].([]MultiTagResult) + ret1, _ := ret[1].(Cleanup) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// SearchCompressed indicates an expected call of SearchCompressed +func (mr *MockStorageMockRecorder) SearchCompressed(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchCompressed", reflect.TypeOf((*MockStorage)(nil).SearchCompressed), arg0, arg1, arg2) +} + +// SearchSeries mocks base method +func (m *MockStorage) SearchSeries(arg0 context.Context, arg1 *storage.FetchQuery, arg2 *storage.FetchOptions) (*storage.SearchResults, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SearchSeries", arg0, arg1, arg2) + ret0, _ := ret[0].(*storage.SearchResults) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SearchSeries indicates an expected call of SearchSeries +func (mr *MockStorageMockRecorder) SearchSeries(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchSeries", reflect.TypeOf((*MockStorage)(nil).SearchSeries), arg0, arg1, arg2) +} + +// Type mocks base method +func (m *MockStorage) Type() storage.Type { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Type") + ret0, _ := ret[0].(storage.Type) + return ret0 +} + +// Type indicates an expected call of Type +func (mr 
*MockStorageMockRecorder) Type() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Type", reflect.TypeOf((*MockStorage)(nil).Type)) +} + +// Write mocks base method +func (m *MockStorage) Write(arg0 context.Context, arg1 *storage.WriteQuery) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Write", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Write indicates an expected call of Write +func (mr *MockStorageMockRecorder) Write(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockStorage)(nil).Write), arg0, arg1) +} diff --git a/src/query/storage/m3/types.go b/src/query/storage/m3/types.go index 4e45bf8d46..56a6377c69 100644 --- a/src/query/storage/m3/types.go +++ b/src/query/storage/m3/types.go @@ -29,14 +29,14 @@ import ( "github.com/m3db/m3/src/x/ident" ) -// Cleanup is a cleanup function to be called after resources are freed +// Cleanup is a cleanup function to be called after resources are freed. type Cleanup func() error func noop() error { return nil } -// Storage provides an interface for reading and writing to the tsdb +// Storage provides an interface for reading and writing to the TSDB. type Storage interface { genericstorage.Storage Querier @@ -44,13 +44,14 @@ type Storage interface { // Querier handles queries against an M3 instance. type Querier interface { - // FetchCompressed fetches timeseries data based on a query + // FetchCompressed fetches timeseries data based on a query. FetchCompressed( ctx context.Context, query *genericstorage.FetchQuery, options *genericstorage.FetchOptions, ) (encoding.SeriesIterators, Cleanup, error) - // SearchCompressed fetches matching tags based on a query + + // SearchCompressed fetches matching tags based on a query. 
SearchCompressed( ctx context.Context, query *genericstorage.FetchQuery, @@ -67,36 +68,42 @@ type MultiFetchResult interface { iterators encoding.SeriesIterators, err error, ) + // FinalResult returns a series iterators object containing // deduplicated series values. FinalResult() (encoding.SeriesIterators, error) + // FinalResultWithAttrs returns a series iterators object containing // deduplicated series values, attributes corresponding to these iterators, // and any errors encountered. FinalResultWithAttrs() ( - encoding.SeriesIterators, []genericstorage.Attributes, error) + encoding.SeriesIterators, + []genericstorage.Attributes, + error, + ) + // Close releases all resources held by this accumulator. Close() error } -// MultiFetchTagsResult is a deduping accumalator for tag iterators +// MultiFetchTagsResult is a deduping accumalator for tag iterators. type MultiFetchTagsResult interface { - // Add adds tagged ID iterators to the accumulator + // Add adds tagged ID iterators to the accumulator.f Add( newIterator client.TaggedIDsIterator, err error, ) // FinalResult returns a deduped list of tag iterators with - // corresponding series ids + // corresponding series IDs. FinalResult() ([]MultiTagResult, error) - // Close releases all resources held by this accumulator + // Close releases all resources held by this accumulator. Close() error } -// MultiTagResult represents a tag iterator with its string ID +// MultiTagResult represents a tag iterator with its string ID. type MultiTagResult struct { - // ID is the series ID + // ID is the series ID. ID ident.ID - // Iter is the tag iterator for the series + // Iter is the tag iterator for the series. Iter ident.TagIterator } diff --git a/src/query/storage/types.go b/src/query/storage/types.go index ed6ec3cf19..d9e1eee476 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -137,6 +137,12 @@ func NewFetchOptions() *FetchOptions { } } +// Clone will clone and return the fetch options. 
+func (o *FetchOptions) Clone() *FetchOptions { + result := *o + return &result +} + // Querier handles queries against a storage. type Querier interface { // Fetch fetches timeseries data based on a query diff --git a/src/query/ts/m3db/encoded_series_iterator.go b/src/query/ts/m3db/encoded_series_iterator.go index b42119f9e0..5020a2b1e4 100644 --- a/src/query/ts/m3db/encoded_series_iterator.go +++ b/src/query/ts/m3db/encoded_series_iterator.go @@ -21,6 +21,7 @@ package m3db import ( + "fmt" "math" "github.com/m3db/m3/src/dbnode/encoding" @@ -73,6 +74,7 @@ func (it *encodedSeriesIter) Current() block.Series { } func (it *encodedSeriesIter) Next() bool { + fmt.Printf("THIS iter1 next\n") if it.err != nil { return false } diff --git a/src/query/ts/m3db/encoded_unconsolidated_series_iterator.go b/src/query/ts/m3db/encoded_unconsolidated_series_iterator.go index e6bd8ef8f6..f9250b1775 100644 --- a/src/query/ts/m3db/encoded_unconsolidated_series_iterator.go +++ b/src/query/ts/m3db/encoded_unconsolidated_series_iterator.go @@ -21,6 +21,7 @@ package m3db import ( + "fmt" "time" "github.com/m3db/m3/src/dbnode/encoding" @@ -60,6 +61,7 @@ func (it *encodedSeriesIterUnconsolidated) Err() error { } func (it *encodedSeriesIterUnconsolidated) Next() bool { + fmt.Printf("THIS iter0 next\n") if it.err != nil { return false } diff --git a/src/query/tsdb/remote/client.go b/src/query/tsdb/remote/client.go index 58f68c77f7..89a76ae1b6 100644 --- a/src/query/tsdb/remote/client.go +++ b/src/query/tsdb/remote/client.go @@ -60,7 +60,7 @@ type grpcClient struct { const initResultSize = 10 -// NewGRPCClient creates grpc client +// NewGRPCClient creates grpc client. func NewGRPCClient( addresses []string, poolWrapper *pools.PoolWrapper, @@ -93,7 +93,7 @@ func NewGRPCClient( }, nil } -// Fetch reads from remote client storage +// Fetch reads from remote client storage. 
func (c *grpcClient) Fetch( ctx context.Context, query *storage.FetchQuery, diff --git a/src/query/tsdb/remote/compressed_codecs.go b/src/query/tsdb/remote/compressed_codecs.go index a85d205c1e..13b549305f 100644 --- a/src/query/tsdb/remote/compressed_codecs.go +++ b/src/query/tsdb/remote/compressed_codecs.go @@ -26,8 +26,10 @@ import ( "sync" "time" + "github.com/davecgh/go-spew/spew" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/dbnode/x/xpool" @@ -37,7 +39,6 @@ import ( "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/serialize" xtime "github.com/m3db/m3/src/x/time" - "github.com/m3db/m3/src/dbnode/namespace" ) func initializeVars() { @@ -208,6 +209,8 @@ func encodeToCompressedFetchResult( return nil, err } + spew.Dump(series) + seriesList = append(seriesList, series) } @@ -270,7 +273,7 @@ func tagIteratorFromSeries( iteratorPools encoding.IteratorPools, ) (ident.TagIterator, error) { if series != nil && len(series.GetCompressedTags()) > 0 { - return tagIteratorFromCompressedTagsWithDecoder( + return tagIteratorFromCompressedTagsWithDecoder( series.GetCompressedTags(), iteratorPools, ) diff --git a/src/query/tsdb/remote/compressed_codecs_test.go b/src/query/tsdb/remote/compressed_codecs_test.go index 3f786c5975..c2ebc8d5ca 100644 --- a/src/query/tsdb/remote/compressed_codecs_test.go +++ b/src/query/tsdb/remote/compressed_codecs_test.go @@ -81,21 +81,26 @@ func validateSeriesInternals(t *testing.T, it encoding.SeriesIterator) { } } -func validateSeries(t *testing.T, it encoding.SeriesIterator) { - defer it.Close() - - expectedValues := [60]float64{} +func expectedValues() []float64 { + expectedValues := make([]float64, 60) for i := 2; i < 30; i++ { expectedValues[i] = float64(i) + 1 } for i := 0; i < 30; i++ { expectedValues[i+30] = float64(i) + 101 } - for i := 2; i < 60; 
i++ { + return expectedValues[2:] + +} + +func validateSeries(t *testing.T, it encoding.SeriesIterator) { + defer it.Close() + + for i, expectedValue := range expectedValues() { require.True(t, it.Next()) dp, unit, annotation := it.Current() - require.Equal(t, expectedValues[i], dp.Value) - require.Equal(t, seriesStart.Add(time.Duration(i-2)*time.Minute), dp.Timestamp) + require.Equal(t, expectedValue, dp.Value) + require.Equal(t, seriesStart.Add(time.Duration(i)*time.Minute), dp.Timestamp) uv, err := unit.Value() assert.NoError(t, err) assert.Equal(t, time.Second, uv) diff --git a/src/query/tsdb/remote/server.go b/src/query/tsdb/remote/server.go index 9167d5e0f3..5c4ce34b2b 100644 --- a/src/query/tsdb/remote/server.go +++ b/src/query/tsdb/remote/server.go @@ -21,13 +21,13 @@ package remote import ( - "net" "sync" "time" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/query/errors" rpc "github.com/m3db/m3/src/query/generated/proto/rpcpb" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/pools" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3" @@ -41,43 +41,31 @@ const poolTimeout = time.Second * 10 // TODO: add metrics type grpcServer struct { - storage m3.Storage - poolWrapper *pools.PoolWrapper - once sync.Once - pools encoding.IteratorPools - poolErr error + storage m3.Storage + queryContextOpts models.QueryContextOptions + poolWrapper *pools.PoolWrapper + once sync.Once + pools encoding.IteratorPools + poolErr error } -// CreateNewGrpcServer builds a grpc server which must be started later -func CreateNewGrpcServer( +// NewGRPCServer builds a grpc server which must be started later. 
+func NewGRPCServer( store m3.Storage, + queryContextOpts models.QueryContextOptions, poolWrapper *pools.PoolWrapper, ) *grpc.Server { server := grpc.NewServer() grpcServer := &grpcServer{ - storage: store, - poolWrapper: poolWrapper, + storage: store, + queryContextOpts: queryContextOpts, + poolWrapper: poolWrapper, } rpc.RegisterQueryServer(server, grpcServer) return server } -// StartNewGrpcServer starts server on given address, then notifies channel -func StartNewGrpcServer( - server *grpc.Server, - address string, - waitForStart chan<- struct{}, -) error { - lis, err := net.Listen("tcp", address) - if err != nil { - return err - } - - waitForStart <- struct{}{} - return server.Serve(lis) -} - func (s *grpcServer) waitForPools() (encoding.IteratorPools, error) { s.once.Do(func() { s.pools, s.poolErr = s.poolWrapper.WaitForIteratorPools(poolTimeout) @@ -86,7 +74,7 @@ func (s *grpcServer) waitForPools() (encoding.IteratorPools, error) { return s.pools, s.poolErr } -// Fetch reads decompressed series from m3 storage +// Fetch reads decompressed series from M3 storage. 
func (s *grpcServer) Fetch( message *rpc.FetchRequest, stream rpc.Query_FetchServer, @@ -99,11 +87,11 @@ func (s *grpcServer) Fetch( return err } - result, cleanup, err := s.storage.FetchCompressed( - ctx, - storeQuery, - storage.NewFetchOptions(), - ) + // TODO(r): Allow propagation of limit from RPC request + fetchOpts := storage.NewFetchOptions() + fetchOpts.Limit = s.queryContextOpts.LimitMaxTimeseries + + result, cleanup, err := s.storage.FetchCompressed(ctx, storeQuery, fetchOpts) defer cleanup() if err != nil { logger.Error("unable to fetch local query", zap.Error(err)) @@ -144,11 +132,12 @@ func (s *grpcServer) Search( return err } - results, cleanup, err := s.storage.SearchCompressed( - ctx, - searchQuery, - storage.NewFetchOptions(), - ) + // TODO(r): Allow propagation of limit from RPC request + fetchOpts := storage.NewFetchOptions() + fetchOpts.Limit = s.queryContextOpts.LimitMaxTimeseries + + results, cleanup, err := s.storage.SearchCompressed(ctx, searchQuery, + fetchOpts) defer cleanup() if err != nil { logger.Error("unable to search tags", zap.Error(err)) diff --git a/src/query/tsdb/remote/server_test.go b/src/query/tsdb/remote/server_test.go new file mode 100644 index 0000000000..a6d181687c --- /dev/null +++ b/src/query/tsdb/remote/server_test.go @@ -0,0 +1,328 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package remote + +import ( + "context" + "errors" + "net" + "runtime" + "sync" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/query/block" + m3err "github.com/m3db/m3/src/query/errors" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/pools" + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/storage/m3" + "github.com/m3db/m3/src/query/test" + "github.com/m3db/m3/src/query/util/logging" + xsync "github.com/m3db/m3/src/x/sync" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +var ( + errRead = errors.New("read error") + poolsWrapper = pools.NewPoolsWrapper(pools.BuildIteratorPools()) +) + +type mockStorageOptions struct { + err error + iters encoding.SeriesIterators + fetchCompressedSleep time.Duration +} + +func newMockStorage( + t *testing.T, + ctrl *gomock.Controller, + opts mockStorageOptions, +) *m3.MockStorage { + store := m3.NewMockStorage(ctrl) + store.EXPECT(). + FetchCompressed(gomock.Any(), gomock.Any(), gomock.Any()). 
+ DoAndReturn(func( + ctx context.Context, + query *storage.FetchQuery, + options *storage.FetchOptions, + ) (encoding.SeriesIterators, m3.Cleanup, error) { + noopCleanup := func() error { return nil } + + if opts.err != nil { + return nil, noopCleanup, opts.err + } + + if opts.fetchCompressedSleep > 0 { + time.Sleep(opts.fetchCompressedSleep) + } + + iters := opts.iters + if iters == nil { + it, err := test.BuildTestSeriesIterator() + require.NoError(t, err) + iters = encoding.NewSeriesIterators( + []encoding.SeriesIterator{it}, + nil, + ) + } + + return iters, noopCleanup, nil + }). + AnyTimes() + return store +} + +func checkRemoteFetch(t *testing.T, r *storage.FetchResult) { + require.Equal(t, 1, len(r.SeriesList)) + + for _, series := range r.SeriesList { + datapoints := series.Values().Datapoints() + values := make([]float64, 0, len(datapoints)) + for _, d := range datapoints { + values = append(values, d.Value) + } + + require.Equal(t, expectedValues(), values) + } +} + +func startServer(t *testing.T, ctrl *gomock.Controller, store m3.Storage) net.Listener { + server := NewGRPCServer(store, models.QueryContextOptions{}, poolsWrapper) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + go func() { + server.Serve(listener) + }() + + return listener +} + +func createCtxReadOpts(t *testing.T) (context.Context, *storage.FetchQuery, *storage.FetchOptions) { + logging.InitWithCores(nil) + + ctx := context.Background() + read, _, _ := createStorageFetchQuery(t) + readOpts := storage.NewFetchOptions() + return ctx, read, readOpts +} + +func checkFetch(ctx context.Context, t *testing.T, client Client, read *storage.FetchQuery, readOpts *storage.FetchOptions) { + fetch, err := client.Fetch(ctx, read, readOpts) + require.NoError(t, err) + checkRemoteFetch(t, fetch) +} + +func checkErrorFetch(ctx context.Context, t *testing.T, client Client, read *storage.FetchQuery, readOpts *storage.FetchOptions) { + fetch, err := client.Fetch(ctx, read, 
readOpts) + assert.Nil(t, fetch) + assert.Equal(t, errRead.Error(), grpc.ErrorDesc(err)) +} + +func buildClient(t *testing.T, hosts []string) Client { + readWorkerPool, err := xsync.NewPooledWorkerPool(runtime.NumCPU(), + xsync.NewPooledWorkerPoolOptions()) + readWorkerPool.Init() + require.NoError(t, err) + client, err := NewGRPCClient(hosts, poolsWrapper, readWorkerPool, + models.NewTagOptions(), 0, grpc.WithBlock()) + require.NoError(t, err) + return client +} + +func TestRpc(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, read, readOpts := createCtxReadOpts(t) + store := newMockStorage(t, ctrl, mockStorageOptions{}) + listener := startServer(t, ctrl, store) + client := buildClient(t, []string{listener.Addr().String()}) + defer func() { + assert.NoError(t, client.Close()) + }() + + checkFetch(ctx, t, client, read, readOpts) +} + +func TestRpcMultipleRead(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, read, readOpts := createCtxReadOpts(t) + store := newMockStorage(t, ctrl, mockStorageOptions{}) + + listener := startServer(t, ctrl, store) + client := buildClient(t, []string{listener.Addr().String()}) + defer func() { + assert.NoError(t, client.Close()) + }() + + fetch, err := client.Fetch(ctx, read, readOpts) + require.NoError(t, err) + checkRemoteFetch(t, fetch) +} + +func TestRpcStopsStreamingWhenFetchKilledOnClient(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, read, readOpts := createCtxReadOpts(t) + store := newMockStorage(t, ctrl, mockStorageOptions{ + fetchCompressedSleep: time.Second, + }) + + listener := startServer(t, ctrl, store) + client := buildClient(t, []string{listener.Addr().String()}) + defer func() { + assert.NoError(t, client.Close()) + }() + + ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + + fetch, err := client.Fetch(ctx, read, readOpts) + require.Nil(t, fetch) + require.Error(t, err) +} + +func 
TestMultipleClientRpc(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, read, readOpts := createCtxReadOpts(t) + store := newMockStorage(t, ctrl, mockStorageOptions{ + fetchCompressedSleep: 10 * time.Millisecond, + }) + + listener := startServer(t, ctrl, store) + + var wg sync.WaitGroup + clients := make([]Client, 10) + for i := range clients { + clients[i] = buildClient(t, []string{listener.Addr().String()}) + } + + defer func() { + for _, client := range clients { + assert.NoError(t, client.Close()) + } + }() + + for _, client := range clients { + wg.Add(1) + client := client + go func() { + checkFetch(ctx, t, client, read, readOpts) + wg.Done() + }() + } + + wg.Wait() +} + +func TestEmptyAddressListErrors(t *testing.T) { + readWorkerPool, err := xsync.NewPooledWorkerPool(runtime.NumCPU(), + xsync.NewPooledWorkerPoolOptions()) + require.NoError(t, err) + readWorkerPool.Init() + + addresses := []string{} + client, err := NewGRPCClient(addresses, poolsWrapper, readWorkerPool, + models.NewTagOptions(), 0, grpc.WithBlock()) + assert.Nil(t, client) + assert.Equal(t, m3err.ErrNoClientAddresses, err) +} + +func TestErrRpc(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, read, readOpts := createCtxReadOpts(t) + store := newMockStorage(t, ctrl, mockStorageOptions{ + err: errors.New("read error"), + }) + + listener := startServer(t, ctrl, store) + client := buildClient(t, []string{listener.Addr().String()}) + defer func() { + assert.NoError(t, client.Close()) + }() + + checkErrorFetch(ctx, t, client, read, readOpts) +} + +func TestRoundRobinClientRpc(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, read, readOpts := createCtxReadOpts(t) + store := newMockStorage(t, ctrl, mockStorageOptions{}) + errStore := newMockStorage(t, ctrl, mockStorageOptions{ + err: errors.New("read error"), + }) + + listener1 := startServer(t, ctrl, store) + listener2 := startServer(t, ctrl, errStore) + + 
hosts := []string{listener1.Addr().String(), listener2.Addr().String()} + client := buildClient(t, hosts) + defer func() { + assert.NoError(t, client.Close()) + }() + + // Host ordering is not always deterministic; retry several times to ensure at least one + // call is made to both hosts. Giving 10 attempts per host should remove flakiness while guaranteeing + // round robin behaviour + attempts := 20 + + hitHost, hitErrHost := false, false + for i := 0; i < attempts; i++ { + fetch, err := client.Fetch(ctx, read, readOpts) + if fetch == nil { + assert.Equal(t, errRead.Error(), grpc.ErrorDesc(err)) + hitErrHost = true + } else { + checkRemoteFetch(t, fetch) + hitHost = true + } + if hitHost && hitErrHost { + break + } + } + + assert.True(t, hitHost, "round robin did not fetch from host") + assert.True(t, hitErrHost, "round robin did not fetch from error host") +} + +func validateBlockResult(t *testing.T, r block.Result) { + require.Equal(t, 1, len(r.Blocks)) + + _, err := r.Blocks[0].SeriesIter() + require.NoError(t, err) +} diff --git a/src/query/tsdb/remote/server_testgo b/src/query/tsdb/remote/server_testgo deleted file mode 100644 index e9344207cf..0000000000 --- a/src/query/tsdb/remote/server_testgo +++ /dev/null @@ -1,381 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. 
-// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package remote - -import ( - "context" - "errors" - "fmt" - "sync" - "testing" - "time" - - "github.com/m3db/m3/src/dbnode/encoding" - "github.com/m3db/m3/src/query/block" - m3err "github.com/m3db/m3/src/query/errors" - "github.com/m3db/m3/src/query/storage" - "github.com/m3db/m3/src/query/storage/m3" - "github.com/m3db/m3/src/query/test" - "github.com/m3db/m3/src/query/util/logging" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" -) - -var ( - errRead = errors.New("read error") - initialPort = 17762 - testMu sync.Mutex -) - -func generateAddress() string { - testMu.Lock() - defer testMu.Unlock() - address := fmt.Sprintf("localhost:%d", initialPort) - initialPort++ - return address -} - -type mockStorage struct { - t *testing.T - read *storage.FetchQuery - sleepMillis int -} - -func (s *mockStorage) Fetch( - ctx context.Context, - query *storage.FetchQuery, - _ *storage.FetchOptions, -) (*storage.FetchResult, error) { - return nil, m3err.ErrNotImplemented -} - -func (s *mockStorage) FetchTags( - ctx context.Context, - query *storage.FetchQuery, - _ *storage.FetchOptions, -) (*storage.SearchResults, error) { - return nil, nil -} - -func (s *mockStorage) FetchBlocks( - ctx context.Context, - query *storage.FetchQuery, - options *storage.FetchOptions, -) (block.Result, error) { - return block.Result{}, m3err.ErrNotImplemented -} - -func (s *mockStorage) FetchRaw( - ctx context.Context, - query *storage.FetchQuery, - 
options *storage.FetchOptions, -) (encoding.SeriesIterators, error) { - it, _ := test.BuildTestSeriesIterator() - its := encoding.NewSeriesIterators( - []encoding.SeriesIterator{it}, - nil, - ) - - if s.sleepMillis > 0 { - time.Sleep(time.Millisecond * time.Duration(s.sleepMillis)) - } - - return its, nil -} - -func (s *mockStorage) Write(ctx context.Context, query *storage.WriteQuery) error { - return nil -} - -func (s *mockStorage) Type() storage.Type { - return storage.Type(0) -} - -func (s *mockStorage) Close() error { - return nil -} - -func checkRemoteFetch(t *testing.T, its encoding.SeriesIterators) { - iters := its.Iters() - require.Len(t, iters, 1) - validateSeries(t, iters[0]) -} - -func startServer(t *testing.T, host string, store m3.Storage) { - ip := &mockIteratorPool{} - server := CreateNewGrpcServer(store, ip) - waitForStart := make(chan struct{}) - go func() { - err := StartNewGrpcServer(server, host, waitForStart) - assert.NoError(t, err) - }() - - <-waitForStart -} - -func createStorageFetchOptions() *storage.FetchOptions { - return &storage.FetchOptions{ - KillChan: make(chan struct{}), - } -} - -func createCtxReadOpts(t *testing.T) (context.Context, *storage.FetchQuery, *storage.FetchOptions, string) { - logging.InitWithCores(nil) - - ctx := context.Background() - read, _, _ := createStorageFetchQuery(t) - readOpts := createStorageFetchOptions() - host := generateAddress() - return ctx, read, readOpts, host -} - -func checkFetch(ctx context.Context, t *testing.T, client Client, read *storage.FetchQuery, readOpts *storage.FetchOptions) { - fetch, err := client.Fetch(ctx, read, readOpts) - require.NoError(t, err) - checkRemoteFetch(t, fetch) -} - -func checkErrorFetch(ctx context.Context, t *testing.T, client Client, read *storage.FetchQuery, readOpts *storage.FetchOptions) { - fetch, err := client.Fetch(ctx, read, readOpts) - assert.Nil(t, fetch) - assert.Equal(t, errRead.Error(), grpc.ErrorDesc(err)) -} - -func buildClient(t *testing.T, hosts 
[]string) Client { - ip := &mockIteratorPool{} - client, err := NewGrpcClient(hosts, ip, nil, grpc.WithBlock()) - require.NoError(t, err) - return client -} - -func TestRpc(t *testing.T) { - ctx, read, readOpts, host := createCtxReadOpts(t) - store := &mockStorage{ - t: t, - read: read, - } - startServer(t, host, store) - hosts := []string{host} - client := buildClient(t, hosts) - defer func() { - assert.NoError(t, client.Close()) - }() - - checkFetch(ctx, t, client, read, readOpts) -} - -func TestRpcMultipleRead(t *testing.T) { - ctx, read, readOpts, host := createCtxReadOpts(t) - store := &mockStorage{ - t: t, - read: read, - } - - startServer(t, host, store) - hosts := []string{host} - client := buildClient(t, hosts) - defer func() { - assert.NoError(t, client.Close()) - }() - - fetch, err := client.Fetch(ctx, read, readOpts) - require.NoError(t, err) - checkRemoteFetch(t, fetch) -} - -func TestRpcStopsStreamingWhenFetchKilledOnClient(t *testing.T) { - ctx, read, readOpts, host := createCtxReadOpts(t) - sleepMillis := 100 - store := &mockStorage{ - t: t, - read: read, - sleepMillis: sleepMillis, - } - - startServer(t, host, store) - hosts := []string{host} - client := buildClient(t, hosts) - defer func() { - assert.NoError(t, client.Close()) - }() - - go func() { - time.Sleep(time.Millisecond * 10) - readOpts.KillChan <- struct{}{} - }() - - fetch, err := client.Fetch(ctx, read, readOpts) - require.Nil(t, fetch) - assert.Equal(t, err, m3err.ErrQueryInterrupted) -} - -func TestMultipleClientRpc(t *testing.T) { - ctx, read, readOpts, host := createCtxReadOpts(t) - store := &mockStorage{ - t: t, - read: read, - sleepMillis: 10, - } - startServer(t, host, store) - - var wg sync.WaitGroup - clients := make([]Client, 10) - for i := range clients { - hosts := []string{host} - clients[i] = buildClient(t, hosts) - } - - defer func() { - for _, client := range clients { - assert.NoError(t, client.Close()) - } - }() - - for _, client := range clients { - wg.Add(1) - client 
:= client - go func() { - checkFetch(ctx, t, client, read, readOpts) - wg.Done() - }() - } - - wg.Wait() -} - -type errStorage struct { - t *testing.T - read *storage.FetchQuery -} - -func (s *errStorage) Fetch( - ctx context.Context, - query *storage.FetchQuery, - _ *storage.FetchOptions, -) (*storage.FetchResult, error) { - readQueriesAreEqual(s.t, s.read, query) - return nil, m3err.ErrNotImplemented -} - -func (s *errStorage) FetchRaw( - ctx context.Context, - query *storage.FetchQuery, - _ *storage.FetchOptions, -) (encoding.SeriesIterators, error) { - return nil, errRead -} - -func (s *errStorage) FetchBlocks( - ctx context.Context, - query *storage.FetchQuery, - _ *storage.FetchOptions, -) (block.Result, error) { - return block.Result{}, m3err.ErrNotImplemented -} - -func (s *errStorage) FetchTags( - ctx context.Context, - query *storage.FetchQuery, - _ *storage.FetchOptions, -) (*storage.SearchResults, error) { - return nil, m3err.ErrNotImplemented -} - -func (s *errStorage) Write(ctx context.Context, query *storage.WriteQuery) error { - return m3err.ErrNotImplemented -} - -func (s *errStorage) Type() storage.Type { - return storage.Type(-1) -} - -func (s *errStorage) Close() error { - return nil -} - -func TestEmptyAddressListErrors(t *testing.T) { - addresses := []string{} - client, err := NewGrpcClient(addresses, nil, nil) - assert.Nil(t, client) - assert.Equal(t, m3err.ErrNoClientAddresses, err) -} - -func TestErrRpc(t *testing.T) { - ctx, read, readOpts, host := createCtxReadOpts(t) - store := &errStorage{ - t: t, - read: read, - } - startServer(t, host, store) - hosts := []string{host} - client := buildClient(t, hosts) - defer func() { - assert.NoError(t, client.Close()) - }() - - checkErrorFetch(ctx, t, client, read, readOpts) -} - -func TestRoundRobinClientRpc(t *testing.T) { - ctx, read, readOpts, host := createCtxReadOpts(t) - store := &mockStorage{ - t: t, - read: read, - } - startServer(t, host, store) - errHost := generateAddress() - errStore := 
&errStorage{ - t: t, - read: read, - } - startServer(t, errHost, errStore) - - hosts := []string{host, errHost} - client := buildClient(t, hosts) - defer func() { - assert.NoError(t, client.Close()) - }() - - // Host ordering is not always deterministic; retry several times to ensure at least one - // call is made to both hosts. Giving 10 attempts per host should remove flakiness while guaranteeing - // round robin behaviour - attempts := 20 - - hitHost, hitErrHost := false, false - for i := 0; i < attempts; i++ { - fetch, err := client.FetchRaw(ctx, read, readOpts) - if fetch == nil { - assert.Equal(t, errRead.Error(), grpc.ErrorDesc(err)) - hitErrHost = true - } else { - checkRemoteFetch(t, fetch) - hitHost = true - } - if hitHost && hitErrHost { - break - } - } - - assert.True(t, hitHost, "round robin did not fetch from host") - assert.True(t, hitErrHost, "round robin did not fetch from error host") -}