Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Add config for timeseries limit returned by single DB node #1644

Merged
merged 12 commits into from
May 18, 2019
Merged
6 changes: 6 additions & 0 deletions scripts/docker-integration-tests/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
function retry_with_backoff {
local max_attempts=${ATTEMPTS-5}
local timeout=${TIMEOUT-1}
local max_timeout=${MAX_TIMEOUT}
local attempt=1
local exitCode=0

Expand All @@ -27,6 +28,11 @@ function retry_with_backoff {
sleep $timeout
attempt=$(( attempt + 1 ))
timeout=$(( timeout * 2 ))
if [[ $max_timeout != "" ]]; then
if [[ $timeout -gt $max_timeout ]]; then
timeout=$max_timeout
fi
fi
done

if [[ $exitCode != 0 ]]
Expand Down
4 changes: 4 additions & 0 deletions scripts/docker-integration-tests/prometheus/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ metrics:
samplingRate: 1.0
extended: none

limits:
perQuery:
maxFetchedSeries: 100

clusters:
- namespaces:
- namespace: agg
Expand Down
15 changes: 13 additions & 2 deletions scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,27 @@ docker-compose -f ${COMPOSE_FILE} up -d prometheus01

# Ensure Prometheus can proxy a Prometheus query
echo "Wait until the remote write endpoint generates and allows for data to be queried"
ATTEMPTS=6 TIMEOUT=2 retry_with_backoff \
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=prometheus_remote_storage_succeeded_samples_total | jq -r .data.result[].value[1]) -gt 100 ]]'

# Make sure we're proxying writes to the unaggregated namespace
echo "Wait until data begins being written to remote storage for the unaggregated namespace"
ATTEMPTS=6 TIMEOUT=2 retry_with_backoff \
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=database_write_tagged_success\\{namespace=\"unagg\"\\} | jq -r .data.result[0].value[1]) -gt 0 ]]'

# Make sure we're proxying writes to the aggregated namespace
echo "Wait until data begins being written to remote storage for the aggregated namespace"
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=database_write_tagged_success\\{namespace=\"agg\"\\} | jq -r .data.result[0].value[1]) -gt 0 ]]'

# Test the default series limit applied when directly querying
# coordinator (limit set to 100 in m3coordinator.yml)
echo "Test query limit with coordinator defaults"
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{name!=\"\"\\} | jq -r ".data.result | length") -eq 100 ]]'

# Test the per-request series limit applied when directly querying
# coordinator (limit overridden via the M3-Limit-Max-Series header)
echo "Test query limit with coordinator limit header"
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Series: 10" 0.0.0.0:7201/api/v1/query?query=\\{name!=\"\"\\} | jq -r ".data.result | length") -eq 10 ]]'
4 changes: 3 additions & 1 deletion scripts/docker-integration-tests/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ cd $GOPATH/src/github.com/m3db/m3

SERVICES=(m3dbnode m3coordinator m3aggregator)
REVISION=$(git rev-parse HEAD)
make clean
# Allow callers (e.g. iterative local runs) to skip the expensive clean
# step by exporting SKIP_CLEAN=true.
if [[ $SKIP_CLEAN != "true" ]]; then
make clean
fi
mkdir -p ./bin

# by keeping all the required files in ./bin, it makes the build context
Expand Down
4 changes: 2 additions & 2 deletions scripts/docker-integration-tests/simple/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{
"isolation_group": "rack-a",
"zone": "embedded",
"weight": 1024,
"endpoint": "127.0.0.1::9000",
"hostname": "127.0.0.1:",
"endpoint": "127.0.0.1:9000",
"hostname": "127.0.0.1",
"port": 9000
}
]
Expand Down
48 changes: 39 additions & 9 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
ingestm3msg "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest/m3msg"
"github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg"
"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
Expand Down Expand Up @@ -62,6 +63,8 @@ var (
defaultLookbackDuration = 5 * time.Minute

defaultCarbonIngesterAggregationType = aggregation.Mean

defaultStorageQueryLimit = 10000
)

// Configuration is the configuration for the query service.
Expand Down Expand Up @@ -125,7 +128,11 @@ type Configuration struct {

// Cache configurations.
//
// Deprecated: cache configurations are no longer supported. Remove from file.
// Deprecated: cache configurations are no longer supported. Remove from file
// when we can make breaking changes.
// (If/when removed it will make existing configurations with the cache
// stanza not able to startup the binary since we parse YAML in strict mode
// by default).
DeprecatedCache CacheConfiguration `yaml:"cache"`
}

Expand Down Expand Up @@ -168,8 +175,8 @@ type ResultOptions struct {
KeepNans bool `yaml:"keepNans"`
}

// LimitsConfiguration represents limitations on resource usage in the query instance. Limits are split between per-query
// and global limits.
// LimitsConfiguration represents limitations on resource usage in the query
// instance. Limits are split between per-query and global limits.
type LimitsConfiguration struct {
// deprecated: use PerQuery.MaxComputedDatapoints instead.
DeprecatedMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`
Expand All @@ -195,18 +202,22 @@ func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 {
return lc.DeprecatedMaxComputedDatapoints
}

// GlobalLimitsConfiguration represents limits on resource usage across a query instance. Zero or negative values imply no limit.
// GlobalLimitsConfiguration represents limits on resource usage across a query
// instance. Zero or negative values imply no limit.
type GlobalLimitsConfiguration struct {
// MaxFetchedDatapoints limits the total number of datapoints actually fetched by all queries at any given time.
// MaxFetchedDatapoints limits the total number of datapoints actually
// fetched by all queries at any given time.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`
}

// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints.
// AsLimitManagerOptions converts this configuration to
// cost.LimitManagerOptions for MaxFetchedDatapoints.
func (l *GlobalLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions {
return toLimitManagerOptions(l.MaxFetchedDatapoints)
}

// PerQueryLimitsConfiguration represents limits on resource usage within a single query. Zero or negative values imply no limit.
// PerQueryLimitsConfiguration represents limits on resource usage within a
// single query. Zero or negative values imply no limit.
type PerQueryLimitsConfiguration struct {
// PrivateMaxComputedDatapoints limits the number of datapoints that can be
// returned by a query. It's determined purely
Expand All @@ -217,15 +228,34 @@ type PerQueryLimitsConfiguration struct {
// this field directly.
PrivateMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`

// MaxFetchedDatapoints limits the number of datapoints actually used by a given query.
// MaxFetchedDatapoints limits the number of datapoints actually used by a
// given query.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`

// MaxFetchedSeries limits the number of time series returned by a storage node.
MaxFetchedSeries int64 `yaml:"maxFetchedSeries"`
}

// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints.
// AsLimitManagerOptions converts this configuration to
// cost.LimitManagerOptions for MaxFetchedDatapoints.
func (l *PerQueryLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions {
return toLimitManagerOptions(l.MaxFetchedDatapoints)
}

// AsFetchOptionsBuilderOptions converts this configuration to
// handler.FetchOptionsBuilderOptions.
func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handler.FetchOptionsBuilderOptions {
if l.MaxFetchedSeries <= 0 {
return handler.FetchOptionsBuilderOptions{
Limit: defaultStorageQueryLimit,
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline


return handler.FetchOptionsBuilderOptions{
Limit: int(l.MaxFetchedSeries),
}
}

func toLimitManagerOptions(limit int64) cost.LimitManagerOptions {
return cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{
Threshold: cost.Cost(limit),
Expand Down
68 changes: 68 additions & 0 deletions src/query/api/v1/handler/fetch_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package handler

import (
"net/http"
"strconv"

"github.com/m3db/m3/src/query/storage"
xhttp "github.com/m3db/m3/src/x/net/http"
)

// FetchOptionsBuilder builds fetch options based on a request and default
// config.
type FetchOptionsBuilder interface {
	// NewFetchOptions parses fetch options from the given HTTP request,
	// returning a parse error if any request override is malformed.
	NewFetchOptions(req *http.Request) (*storage.FetchOptions, *xhttp.ParseError)
}

// FetchOptionsBuilderOptions provides options to use when creating a
// fetch options builder.
type FetchOptionsBuilderOptions struct {
	// Limit is the default series limit applied when the request does not
	// specify its own limit.
	Limit int
}

// fetchOptionsBuilder is the default FetchOptionsBuilder implementation,
// backed by static builder options.
type fetchOptionsBuilder struct {
	opts FetchOptionsBuilderOptions
}

// NewFetchOptionsBuilder returns a new fetch options builder using the
// given default options.
func NewFetchOptionsBuilder(
	opts FetchOptionsBuilderOptions,
) FetchOptionsBuilder {
	builder := fetchOptionsBuilder{opts: opts}
	return builder
}

func (b fetchOptionsBuilder) NewFetchOptions(
req *http.Request,
) (*storage.FetchOptions, *xhttp.ParseError) {
fetchOpts := storage.NewFetchOptions()
fetchOpts.Limit = b.opts.Limit
if str := req.Header.Get(LimitMaxSeriesHeader); str != "" {
n, err := strconv.Atoi(str)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
fetchOpts.Limit = n
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline


return fetchOpts, nil
}
83 changes: 83 additions & 0 deletions src/query/api/v1/handler/fetch_options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package handler

import (
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
)

// TestFetchOptionsBuilder verifies that the configured default limit is
// applied when no header is present, that the M3-Limit-Max-Series header
// overrides it, and that a non-numeric header value yields an error.
func TestFetchOptionsBuilder(t *testing.T) {
	for _, tt := range []struct {
		name          string
		defaultLimit  int
		headers       map[string]string
		expectedLimit int
		expectedErr   bool
	}{
		{
			name:          "default limit with no headers",
			defaultLimit:  42,
			headers:       map[string]string{},
			expectedLimit: 42,
		},
		{
			name:         "limit with header",
			defaultLimit: 42,
			headers: map[string]string{
				LimitMaxSeriesHeader: "4242",
			},
			expectedLimit: 4242,
		},
		{
			name:         "bad header",
			defaultLimit: 42,
			headers: map[string]string{
				LimitMaxSeriesHeader: "not_a_number",
			},
			expectedErr: true,
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			builder := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{
				Limit: tt.defaultLimit,
			})

			req := httptest.NewRequest("GET", "/foo", nil)
			for name, value := range tt.headers {
				req.Header.Add(name, value)
			}

			opts, err := builder.NewFetchOptions(req)
			if tt.expectedErr {
				require.Error(t, err)
				return
			}

			require.NoError(t, err)
			require.Equal(t, tt.expectedLimit, opts.Limit)
		})
	}
}
18 changes: 12 additions & 6 deletions src/query/api/v1/handler/graphite/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ var (
)

type grahiteFindHandler struct {
storage storage.Storage
storage storage.Storage
fetchOptionsBuilder handler.FetchOptionsBuilder
}

// NewFindHandler returns a new instance of handler.
func NewFindHandler(
storage storage.Storage,
fetchOptionsBuilder handler.FetchOptionsBuilder,
) http.Handler {
return &grahiteFindHandler{
storage: storage,
storage: storage,
fetchOptionsBuilder: fetchOptionsBuilder,
}
}

Expand Down Expand Up @@ -110,16 +113,19 @@ func (h *grahiteFindHandler) ServeHTTP(
return
}

opts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r)
if rErr != nil {
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}

var (
terminatedResult *storage.CompleteTagsResult
tErr error
childResult *storage.CompleteTagsResult
cErr error
opts = storage.NewFetchOptions()

wg sync.WaitGroup
wg sync.WaitGroup
)

wg.Add(2)
go func() {
terminatedResult, tErr = h.storage.CompleteTags(ctx, terminatedQuery, opts)
Expand Down
Loading