Skip to content

Commit

Permalink
[query] Add ability to set restrict by tags defaults in config (#2430)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Jul 1, 2020
1 parent b4be7b1 commit 630946b
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 35 deletions.
9 changes: 9 additions & 0 deletions scripts/docker-integration-tests/prometheus/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,12 @@ clusters:

tagOptions:
idScheme: quoted

query:
restrictTags:
match:
- name: restricted_metrics_type
type: NOTEQUAL
value: hidden
strip:
- restricted_metrics_type
3 changes: 3 additions & 0 deletions scripts/docker-integration-tests/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ remote_read:

remote_write:
- url: http://coordinator01:7201/api/v1/prom/remote/write
write_relabel_configs:
- target_label: metrics_storage
replacement: m3db_remote
34 changes: 31 additions & 3 deletions scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ function test_query_limits_applied {
# NB: ensure that the limit is not exceeded (it may be below limit).
echo "Test query limit with coordinator defaults"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{__name__!=\"\"\\} | jq -r ".data.result | length") -lt 101 ]]'
'[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{metrics_storage=\"m3db_remote\"\\} | jq -r ".data.result | length") -lt 101 ]]'

# Test the series limit applied when directly querying
# coordinator (series limit set by header)
echo "Test query series limit with coordinator limit header"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Series: 10" 0.0.0.0:7201/api/v1/query?query=\\{__name__!=\"\"\\} | jq -r ".data.result | length") -eq 10 ]]'
'[[ $(curl -s -H "M3-Limit-Max-Series: 10" 0.0.0.0:7201/api/v1/query?query=\\{metrics_storage=\"m3db_remote\"\\} | jq -r ".data.result | length") -eq 10 ]]'

echo "Test query series limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
Expand All @@ -206,7 +206,7 @@ function test_query_limits_applied {
# coordinator (docs limit set by header)
echo "Test query docs limit with coordinator limit header"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/query?query=\\{__name__!=\"\"\\} | jq -r ".data.result | length") -lt 101 ]]'
'[[ $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/query?query=\\{metrics_storage=\"m3db_remote\"\\} | jq -r ".data.result | length") -lt 101 ]]'

echo "Test query docs limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
Expand Down Expand Up @@ -277,6 +277,33 @@ function test_query_restrict_metrics_type {
retry_with_backoff prometheus_query_native
}

function test_query_restrict_tags {
# Test the default restrict tags is applied when directly querying
# coordinator (restrict tags set to hide any restricted_metrics_type="hidden"
# in m3coordinator.yml)

# First write some hidden metrics.
echo "Test write with unaggregated metrics type works as expected"
TAG_NAME_0="restricted_metrics_type" TAG_VALUE_0="hidden" \
TAG_NAME_1="foo_tag" TAG_VALUE_1="foo_tag_value" \
prometheus_remote_write \
some_hidden_metric now 42.42 \
true "Expected request to succeed" \
200 "Expected request to return status code 200"

# Check that we can see them with zero restrictions applied as an
# override (we do this check first so that when we test that they
# don't appear by default we know that the metrics are already visible).
echo "Test restrict by tags with header override to remove restrict works"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Restrict-By-Tags-JSON: {}" 0.0.0.0:7201/api/v1/query?query=\\{restricted_metrics_type=\"hidden\"\\} | jq -r ".data.result | length") -eq 1 ]]'

# Now test that the defaults will hide the metrics altogether.
echo "Test restrict by tags with coordinator defaults"
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{restricted_metrics_type=\"hidden\"\\} | jq -r ".data.result | length") -eq 0 ]]'
}

echo "Running prometheus tests"
test_prometheus_remote_read
test_prometheus_remote_write_multi_namespaces
Expand All @@ -287,6 +314,7 @@ test_prometheus_remote_write_too_old_returns_400_status_code
test_prometheus_remote_write_restrict_metrics_type
test_query_limits_applied
test_query_restrict_metrics_type
test_query_restrict_tags
test_prometheus_remote_write_map_tags

echo "Running function correctness tests"
Expand Down
51 changes: 47 additions & 4 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ type QueryConfiguration struct {
ConsolidationConfiguration ConsolidationConfiguration `yaml:"consolidation"`
// Prometheus is prometheus client configuration.
Prometheus PrometheusQueryConfiguration `yaml:"prometheus"`
// RestrictTags is an optional configuration that can be set to restrict
// all queries with certain tags by.
RestrictTags *RestrictTagsConfiguration `yaml:"restrictTags"`
}

// TimeoutOrDefault returns the configured timeout or default value.
Expand All @@ -228,6 +231,46 @@ func (c QueryConfiguration) TimeoutOrDefault() time.Duration {
return defaultQueryTimeout
}

// RestrictTagsAsStorageRestrictByTag returns restrict tags as
// storage options to restrict all queries by default.
func (c QueryConfiguration) RestrictTagsAsStorageRestrictByTag() (*storage.RestrictByTag, bool, error) {
if c.RestrictTags == nil {
return nil, false, nil
}

var (
cfg = *c.RestrictTags
result = handleroptions.StringTagOptions{
Restrict: make([]handleroptions.StringMatch, 0, len(cfg.Restrict)),
Strip: cfg.Strip,
}
)
for _, elem := range cfg.Restrict {
value := handleroptions.StringMatch(elem)
result.Restrict = append(result.Restrict, value)
}

opts, err := result.StorageOptions()
if err != nil {
return nil, false, err
}

return opts, true, nil
}

// RestrictTagsConfiguration applies tag restriction to all queries.
type RestrictTagsConfiguration struct {
Restrict []StringMatch `yaml:"match"`
Strip []string `yaml:"strip"`
}

// StringMatch is an easy to use representation of models.Matcher.
type StringMatch struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Value string `yaml:"value"`
}

// ConsolidationConfiguration are configs for consolidating fetched queries.
type ConsolidationConfiguration struct {
// MatchType determines the options by which series should match.
Expand Down Expand Up @@ -328,9 +371,9 @@ func (l *PerQueryLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerO
return toLimitManagerOptions(l.MaxFetchedDatapoints)
}

// AsFetchOptionsBuilderOptions converts this configuration to
// handler.FetchOptionsBuilderOptions.
func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handleroptions.FetchOptionsBuilderOptions {
// AsFetchOptionsBuilderLimitsOptions converts this configuration to
// handleroptions.FetchOptionsBuilderLimitsOptions.
func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderLimitsOptions() handleroptions.FetchOptionsBuilderLimitsOptions {
seriesLimit := defaultStorageQuerySeriesLimit
if v := l.MaxFetchedSeries; v > 0 {
seriesLimit = v
Expand All @@ -341,7 +384,7 @@ func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handleropti
docsLimit = v
}

return handleroptions.FetchOptionsBuilderOptions{
return handleroptions.FetchOptionsBuilderLimitsOptions{
SeriesLimit: int(seriesLimit),
DocsLimit: int(docsLimit),
RequireExhaustive: l.RequireExhaustive,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ type FetchOptionsBuilder interface {
// FetchOptionsBuilderOptions provides options to use when creating a
// fetch options builder.
type FetchOptionsBuilderOptions struct {
Limits FetchOptionsBuilderLimitsOptions
RestrictByTag *storage.RestrictByTag
}

// FetchOptionsBuilderLimitsOptions provides limits options to use when
// creating a fetch options builder.
type FetchOptionsBuilderLimitsOptions struct {
SeriesLimit int
DocsLimit int
RequireExhaustive bool
Expand Down Expand Up @@ -128,21 +135,21 @@ func (b fetchOptionsBuilder) NewFetchOptions(
) (*storage.FetchOptions, *xhttp.ParseError) {
fetchOpts := storage.NewFetchOptions()

seriesLimit, err := ParseLimit(req, LimitMaxSeriesHeader, "limit", b.opts.SeriesLimit)
seriesLimit, err := ParseLimit(req, LimitMaxSeriesHeader, "limit", b.opts.Limits.SeriesLimit)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.SeriesLimit = seriesLimit

docsLimit, err := ParseLimit(req, LimitMaxDocsHeader, "docsLimit", b.opts.DocsLimit)
docsLimit, err := ParseLimit(req, LimitMaxDocsHeader, "docsLimit", b.opts.Limits.DocsLimit)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.DocsLimit = docsLimit

requireExhaustive, err := ParseRequireExhaustive(req, b.opts.RequireExhaustive)
requireExhaustive, err := ParseRequireExhaustive(req, b.opts.Limits.RequireExhaustive)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
Expand Down Expand Up @@ -178,18 +185,23 @@ func (b fetchOptionsBuilder) NewFetchOptions(
}

if str := req.Header.Get(RestrictByTagsJSONHeader); str != "" {
// Allow header to override any default restrict by tags config.
var opts StringTagOptions
if err := json.Unmarshal([]byte(str), &opts); err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

tagOpts, err := opts.toOptions()
tagOpts, err := opts.StorageOptions()
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.RestrictQueryOptions = newOrExistingRestrictQueryOptions(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByTag = tagOpts
} else if defaultTagOpts := b.opts.RestrictByTag; defaultTagOpts != nil {
// Apply defaults if not overridden by header.
fetchOpts.RestrictQueryOptions = newOrExistingRestrictQueryOptions(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByTag = defaultTagOpts
}

if restrict := fetchOpts.RestrictQueryOptions; restrict != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"testing"
"time"

Expand All @@ -44,14 +45,15 @@ func TestFetchOptionsBuilder(t *testing.T) {
}

tests := []struct {
name string
defaultLimit int
headers map[string]string
query string
expectedLimit int
expectedRestrict *storage.RestrictQueryOptions
expectedLookback *expectedLookback
expectedErr bool
name string
defaultLimit int
defaultRestrictByTag *storage.RestrictByTag
headers map[string]string
query string
expectedLimit int
expectedRestrict *storage.RestrictQueryOptions
expectedLookback *expectedLookback
expectedErr bool
}{
{
name: "default limit with no headers",
Expand Down Expand Up @@ -150,12 +152,72 @@ func TestFetchOptionsBuilder(t *testing.T) {
query: "lookback=step&step=-1",
expectedErr: true,
},
{
name: "restrict by tags json header",
headers: map[string]string{
RestrictByTagsJSONHeader: stripSpace(`{
"match":[{"name":"foo", "value":"bar", "type":"EQUAL"}],
"strip":["foo"]
}`),
},
expectedRestrict: &storage.RestrictQueryOptions{
RestrictByTag: &storage.RestrictByTag{
Restrict: models.Matchers{
mustMatcher("foo", "bar", models.MatchEqual),
},
Strip: toStrip("foo"),
},
},
},
{
name: "restrict by tags json defaults",
defaultRestrictByTag: &storage.RestrictByTag{
Restrict: models.Matchers{
mustMatcher("foo", "bar", models.MatchEqual),
},
Strip: toStrip("foo"),
},
expectedRestrict: &storage.RestrictQueryOptions{
RestrictByTag: &storage.RestrictByTag{
Restrict: models.Matchers{
mustMatcher("foo", "bar", models.MatchEqual),
},
Strip: toStrip("foo"),
},
},
},
{
name: "restrict by tags json default override by header",
defaultRestrictByTag: &storage.RestrictByTag{
Restrict: models.Matchers{
mustMatcher("foo", "bar", models.MatchEqual),
},
Strip: toStrip("foo"),
},
headers: map[string]string{
RestrictByTagsJSONHeader: stripSpace(`{
"match":[{"name":"qux", "value":"qaz", "type":"EQUAL"}],
"strip":["qux"]
}`),
},
expectedRestrict: &storage.RestrictQueryOptions{
RestrictByTag: &storage.RestrictByTag{
Restrict: models.Matchers{
mustMatcher("qux", "qaz", models.MatchEqual),
},
Strip: toStrip("qux"),
},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
builder := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{
SeriesLimit: test.defaultLimit,
Limits: FetchOptionsBuilderLimitsOptions{
SeriesLimit: test.defaultLimit,
},
RestrictByTag: test.defaultRestrictByTag,
})

url := "/foo"
Expand Down Expand Up @@ -297,7 +359,11 @@ func TestFetchOptionsWithHeader(t *testing.T) {
}`,
}

builder := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{SeriesLimit: 5})
builder := NewFetchOptionsBuilder(FetchOptionsBuilderOptions{
Limits: FetchOptionsBuilderLimitsOptions{
SeriesLimit: 5,
},
})
req := httptest.NewRequest("GET", "/", nil)
for k, v := range headers {
req.Header.Add(k, v)
Expand Down Expand Up @@ -326,3 +392,7 @@ func TestFetchOptionsWithHeader(t *testing.T) {

require.Equal(t, ex, opts.RestrictQueryOptions)
}

func stripSpace(str string) string {
return regexp.MustCompile(`\s+`).ReplaceAllString(str, "")
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ func (m StringMatch) toMatcher() (models.Matcher, error) {
return models.NewMatcher(t, []byte(m.Name), []byte(m.Value))
}

func (o *StringTagOptions) toOptions() (*storage.RestrictByTag, error) {
// Validate validates the string tag options.
func (o *StringTagOptions) Validate() error {
_, err := o.StorageOptions()
return err
}

// StorageOptions returns the corresponding storage.RestrictByTag options.
func (o *StringTagOptions) StorageOptions() (*storage.RestrictByTag, error) {
if len(o.Restrict) == 0 && len(o.Strip) == 0 {
return nil, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,14 @@ func TestParse(t *testing.T) {
err := json.Unmarshal([]byte(tt.json), &opts)
require.NoError(t, err)

a, err := opts.toOptions()
validateErr := opts.Validate()
if tt.expectedError {
require.Error(t, validateErr)
} else {
require.NoError(t, validateErr)
}

a, err := opts.StorageOptions()
if tt.expectedError {
require.Error(t, err)
require.Nil(t, a)
Expand Down
Loading

0 comments on commit 630946b

Please sign in to comment.