[query] Add config for timeseries limit returned by single DB node (#…
robskillington authored May 18, 2019
1 parent 1ec3b0b commit 9ce4743
Showing 53 changed files with 1,162 additions and 650 deletions.
6 changes: 6 additions & 0 deletions scripts/docker-integration-tests/common.sh
@@ -10,6 +10,7 @@
function retry_with_backoff {
local max_attempts=${ATTEMPTS-5}
local timeout=${TIMEOUT-1}
local max_timeout=${MAX_TIMEOUT}
local attempt=1
local exitCode=0

@@ -27,6 +28,11 @@ function retry_with_backoff {
sleep $timeout
attempt=$(( attempt + 1 ))
timeout=$(( timeout * 2 ))
if [[ $max_timeout != "" ]]; then
if [[ $timeout -gt $max_timeout ]]; then
timeout=$max_timeout
fi
fi
done

if [[ $exitCode != 0 ]]
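
The change above caps the shell helper's exponential backoff at MAX_TIMEOUT when that variable is set. A minimal Go sketch of the same capped-backoff idea, purely illustrative (the function name and the failing-probe stub are not part of this commit):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff mirrors the shell helper: retry up to maxAttempts times,
// doubling the wait between attempts and capping it at maxTimeout when a
// cap is provided (maxTimeout > 0).
func retryWithBackoff(maxAttempts int, timeout, maxTimeout time.Duration, fn func() error) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(timeout)
		timeout *= 2
		if maxTimeout > 0 && timeout > maxTimeout {
			timeout = maxTimeout
		}
	}
	return err
}

func main() {
	calls := 0
	err := retryWithBackoff(5, 10*time.Millisecond, 40*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("not ready")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
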
4 changes: 4 additions & 0 deletions scripts/docker-integration-tests/prometheus/m3coordinator.yml
@@ -12,6 +12,10 @@ metrics:
samplingRate: 1.0
extended: none

limits:
perQuery:
maxFetchedSeries: 100

clusters:
- namespaces:
- namespace: agg
15 changes: 13 additions & 2 deletions scripts/docker-integration-tests/prometheus/test.sh
@@ -24,16 +24,27 @@ docker-compose -f ${COMPOSE_FILE} up -d prometheus01

# Ensure Prometheus can proxy a Prometheus query
echo "Wait until the remote write endpoint generates and allows for data to be queried"
ATTEMPTS=6 TIMEOUT=2 retry_with_backoff \
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=prometheus_remote_storage_succeeded_samples_total | jq -r .data.result[].value[1]) -gt 100 ]]'

# Make sure we're proxying writes to the unaggregated namespace
echo "Wait until data begins being written to remote storage for the unaggregated namespace"
ATTEMPTS=6 TIMEOUT=2 retry_with_backoff \
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=database_write_tagged_success\\{namespace=\"unagg\"\\} | jq -r .data.result[0].value[1]) -gt 0 ]]'

# Make sure we're proxying writes to the aggregated namespace
echo "Wait until data begins being written to remote storage for the aggregated namespace"
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=database_write_tagged_success\\{namespace=\"agg\"\\} | jq -r .data.result[0].value[1]) -gt 0 ]]'

# Test the default series limit applied when directly querying
# coordinator (limit set to 100 in m3coordinator.yml)
echo "Test query limit with coordinator defaults"
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{name!=\"\"\\} | jq -r ".data.result | length") -eq 100 ]]'

# Test the series limit applied when directly querying
# coordinator (limit set by header)
echo "Test query limit with coordinator limit header"
ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Series: 10" 0.0.0.0:7201/api/v1/query?query=\\{name!=\"\"\\} | jq -r ".data.result | length") -eq 10 ]]'
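
The two new checks above exercise the series limit via curl: the coordinator default of 100 from m3coordinator.yml, and a per-request override through the M3-Limit-Max-Series header. The same header-driven override from Go, as a rough sketch (it assumes a coordinator listening on 127.0.0.1:7201, as in the test environment above):

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	// Same endpoint and selector as the integration test above; the
	// M3-Limit-Max-Series header caps the response at 10 series.
	u := "http://127.0.0.1:7201/api/v1/query?query=" + url.QueryEscape(`{name!=""}`)
	req, err := http.NewRequest(http.MethodGet, u, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("M3-Limit-Max-Series", "10")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // .data.result has at most 10 entries
}
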
4 changes: 3 additions & 1 deletion scripts/docker-integration-tests/setup.sh
@@ -7,7 +7,9 @@ cd $GOPATH/src/github.com/m3db/m3

SERVICES=(m3dbnode m3coordinator m3aggregator)
REVISION=$(git rev-parse HEAD)
make clean
if [[ $SKIP_CLEAN != "true" ]]; then
make clean
fi
mkdir -p ./bin

# by keeping all the required files in ./bin, it makes the build context
4 changes: 2 additions & 2 deletions scripts/docker-integration-tests/simple/test.sh
@@ -98,8 +98,8 @@ curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{
"isolation_group": "rack-a",
"zone": "embedded",
"weight": 1024,
"endpoint": "127.0.0.1::9000",
"hostname": "127.0.0.1:",
"endpoint": "127.0.0.1:9000",
"hostname": "127.0.0.1",
"port": 9000
}
]
48 changes: 39 additions & 9 deletions src/cmd/services/m3query/config/config.go
@@ -30,6 +30,7 @@ import (
ingestm3msg "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest/m3msg"
"github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg"
"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
@@ -62,6 +63,8 @@ var (
defaultLookbackDuration = 5 * time.Minute

defaultCarbonIngesterAggregationType = aggregation.Mean

defaultStorageQueryLimit = 10000
)

// Configuration is the configuration for the query service.
@@ -125,7 +128,11 @@ type Configuration struct {

// Cache configurations.
//
// Deprecated: cache configurations are no longer supported. Remove from file.
// Deprecated: cache configurations are no longer supported. Remove from file
// when we can make breaking changes.
// (If/when removed, existing configurations that still contain the cache
// stanza will fail to start the binary, since YAML is parsed in strict
// mode by default.)
DeprecatedCache CacheConfiguration `yaml:"cache"`
}

@@ -168,8 +175,8 @@ type ResultOptions struct {
KeepNans bool `yaml:"keepNans"`
}

// LimitsConfiguration represents limitations on resource usage in the query instance. Limits are split between per-query
// and global limits.
// LimitsConfiguration represents limitations on resource usage in the query
// instance. Limits are split between per-query and global limits.
type LimitsConfiguration struct {
// deprecated: use PerQuery.MaxComputedDatapoints instead.
DeprecatedMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`
@@ -195,18 +202,22 @@ func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 {
return lc.DeprecatedMaxComputedDatapoints
}

// GlobalLimitsConfiguration represents limits on resource usage across a query instance. Zero or negative values imply no limit.
// GlobalLimitsConfiguration represents limits on resource usage across a query
// instance. Zero or negative values imply no limit.
type GlobalLimitsConfiguration struct {
// MaxFetchedDatapoints limits the total number of datapoints actually fetched by all queries at any given time.
// MaxFetchedDatapoints limits the total number of datapoints actually
// fetched by all queries at any given time.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`
}

// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints.
// AsLimitManagerOptions converts this configuration to
// cost.LimitManagerOptions for MaxFetchedDatapoints.
func (l *GlobalLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions {
return toLimitManagerOptions(l.MaxFetchedDatapoints)
}

// PerQueryLimitsConfiguration represents limits on resource usage within a single query. Zero or negative values imply no limit.
// PerQueryLimitsConfiguration represents limits on resource usage within a
// single query. Zero or negative values imply no limit.
type PerQueryLimitsConfiguration struct {
// PrivateMaxComputedDatapoints limits the number of datapoints that can be
// returned by a query. It's determined purely
@@ -217,15 +228,34 @@ type PerQueryLimitsConfiguration struct {
// this field directly.
PrivateMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`

// MaxFetchedDatapoints limits the number of datapoints actually used by a given query.
// MaxFetchedDatapoints limits the number of datapoints actually used by a
// given query.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`

// MaxFetchedSeries limits the number of time series returned by a storage node.
MaxFetchedSeries int64 `yaml:"maxFetchedSeries"`
}

// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints.
// AsLimitManagerOptions converts this configuration to
// cost.LimitManagerOptions for MaxFetchedDatapoints.
func (l *PerQueryLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions {
return toLimitManagerOptions(l.MaxFetchedDatapoints)
}

// AsFetchOptionsBuilderOptions converts this configuration to
// handler.FetchOptionsBuilderOptions.
func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handler.FetchOptionsBuilderOptions {
if l.MaxFetchedSeries <= 0 {
return handler.FetchOptionsBuilderOptions{
Limit: defaultStorageQueryLimit,
}
}

return handler.FetchOptionsBuilderOptions{
Limit: int(l.MaxFetchedSeries),
}
}

func toLimitManagerOptions(limit int64) cost.LimitManagerOptions {
return cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{
Threshold: cost.Cost(limit),
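
For reference, the new MaxFetchedSeries option flows into the fetch options builder via AsFetchOptionsBuilderOptions: an unset (zero or negative) value falls back to the 10000 default, and an explicit value is passed through. A small sketch of that behavior (the literal values here are illustrative):

package main

import (
	"fmt"

	"github.com/m3db/m3/src/cmd/services/m3query/config"
)

func main() {
	// Zero/negative MaxFetchedSeries falls back to the default storage
	// query limit of 10000.
	var unset config.PerQueryLimitsConfiguration
	fmt.Println(unset.AsFetchOptionsBuilderOptions().Limit) // 10000

	// An explicit value (e.g. the 100 used by the docker integration test)
	// is passed through as the builder's default limit.
	set := config.PerQueryLimitsConfiguration{MaxFetchedSeries: 100}
	fmt.Println(set.AsFetchOptionsBuilderOptions().Limit) // 100
}
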
68 changes: 68 additions & 0 deletions src/query/api/v1/handler/fetch_options.go
@@ -0,0 +1,68 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package handler

import (
"net/http"
"strconv"

"github.com/m3db/m3/src/query/storage"
xhttp "github.com/m3db/m3/src/x/net/http"
)

// FetchOptionsBuilder builds fetch options based on a request and default
// config.
type FetchOptionsBuilder interface {
NewFetchOptions(req *http.Request) (*storage.FetchOptions, *xhttp.ParseError)
}

// FetchOptionsBuilderOptions provides options to use when creating a
// fetch options builder.
type FetchOptionsBuilderOptions struct {
Limit int
}

type fetchOptionsBuilder struct {
opts FetchOptionsBuilderOptions
}

// NewFetchOptionsBuilder returns a new fetch options builder.
func NewFetchOptionsBuilder(
opts FetchOptionsBuilderOptions,
) FetchOptionsBuilder {
return fetchOptionsBuilder{opts: opts}
}

func (b fetchOptionsBuilder) NewFetchOptions(
req *http.Request,
) (*storage.FetchOptions, *xhttp.ParseError) {
fetchOpts := storage.NewFetchOptions()
fetchOpts.Limit = b.opts.Limit
if str := req.Header.Get(LimitMaxSeriesHeader); str != "" {
n, err := strconv.Atoi(str)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
fetchOpts.Limit = n
}

return fetchOpts, nil
}
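
As a usage illustration, a handler constructed with this builder can derive per-request fetch options and honor the M3-Limit-Max-Series header; this toy wiring is an assumption for demonstration only, not the coordinator's actual route setup:

package main

import (
	"fmt"
	"net/http"

	"github.com/m3db/m3/src/query/api/v1/handler"
	xhttp "github.com/m3db/m3/src/x/net/http"
)

// newLimitDemoHandler is illustrative only: it shows a route resolving its
// per-request series limit through the fetch options builder.
func newLimitDemoHandler(builder handler.FetchOptionsBuilder) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		opts, rErr := builder.NewFetchOptions(r)
		if rErr != nil {
			xhttp.Error(w, rErr.Inner(), rErr.Code())
			return
		}
		// opts.Limit is the configured default unless the request carried
		// an M3-Limit-Max-Series header override.
		fmt.Fprintf(w, "series limit for this request: %d\n", opts.Limit)
	})
}

func main() {
	builder := handler.NewFetchOptionsBuilder(handler.FetchOptionsBuilderOptions{Limit: 100})
	http.Handle("/limit-demo", newLimitDemoHandler(builder))
	_ = http.ListenAndServe("127.0.0.1:8080", nil)
}
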
83 changes: 83 additions & 0 deletions src/query/api/v1/handler/fetch_options_test.go
@@ -0,0 +1,83 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package handler

import (
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
)

func TestFetchOptionsBuilder(t *testing.T) {
tests := []struct {
name string
defaultLimit int
headers map[string]string
expectedLimit int
expectedErr bool
}{
{
name: "default limit with no headers",
defaultLimit: 42,
headers: map[string]string{},
expectedLimit: 42,
},
{
name: "limit with header",
defaultLimit: 42,
headers: map[string]string{
LimitMaxSeriesHeader: "4242",
},
expectedLimit: 4242,
},
{
name: "bad header",
defaultLimit: 42,
headers: map[string]string{
LimitMaxSeriesHeader: "not_a_number",
},
expectedErr: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
builder := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{
Limit: test.defaultLimit,
})

req := httptest.NewRequest("GET", "/foo", nil)
for k, v := range test.headers {
req.Header.Add(k, v)
}

opts, err := builder.NewFetchOptions(req)

if !test.expectedErr {
require.NoError(t, err)
require.Equal(t, test.expectedLimit, opts.Limit)
} else {
require.Error(t, err)
}
})
}
}
18 changes: 12 additions & 6 deletions src/query/api/v1/handler/graphite/find.go
@@ -47,15 +47,18 @@
)

type grahiteFindHandler struct {
storage storage.Storage
storage storage.Storage
fetchOptionsBuilder handler.FetchOptionsBuilder
}

// NewFindHandler returns a new instance of handler.
func NewFindHandler(
storage storage.Storage,
fetchOptionsBuilder handler.FetchOptionsBuilder,
) http.Handler {
return &grahiteFindHandler{
storage: storage,
storage: storage,
fetchOptionsBuilder: fetchOptionsBuilder,
}
}

@@ -110,16 +113,19 @@ func (h *grahiteFindHandler) ServeHTTP(
return
}

opts, rErr := h.fetchOptionsBuilder.NewFetchOptions(r)
if rErr != nil {
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}

var (
terminatedResult *storage.CompleteTagsResult
tErr error
childResult *storage.CompleteTagsResult
cErr error
opts = storage.NewFetchOptions()

wg sync.WaitGroup
wg sync.WaitGroup
)

wg.Add(2)
go func() {
terminatedResult, tErr = h.storage.CompleteTags(ctx, terminatedQuery, opts)