Skip to content

Commit

Permalink
[query] Restrict query by header tag (#2053)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Dec 3, 2019
1 parent 65b7f92 commit 4c44f1d
Show file tree
Hide file tree
Showing 34 changed files with 2,065 additions and 618 deletions.
225 changes: 225 additions & 0 deletions scripts/docker-integration-tests/query_fanout/restrict.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package main

import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"runtime"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/models"

"github.com/stretchr/testify/require"
)

func main() {
var ts int
flag.IntVar(&ts, "t", -1, "metric name to search")
flag.Parse()

require.True(t, ts > 0, "no timestamp supplied")
name = fmt.Sprintf("foo_%d", ts)
instant := fmt.Sprintf("http://0.0.0.0:7201/api/v1/query?query=%s", name)
rnge := fmt.Sprintf("http://0.0.0.0:7201/api/v1/query_range?query=%s"+
"&start=%d&end=%d&step=100", name, ts/100*100, (ts/100+1)*100)

for _, url := range []string{instant, rnge} {
singleClusterDefaultStrip(url)
bothClusterCustomStrip(url)
bothClusterDefaultStrip(url)
bothClusterNoStrip(url)
bothClusterMultiStrip(url)
}
}

func queryWithHeader(url string, h string) (prometheus.Response, error) {
var result prometheus.Response
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return result, err
}

req.Header.Add(handler.RestrictByTagsJSONHeader, h)
client := http.DefaultClient
resp, err := client.Do(req)
if err != nil {
return result, err
}

if resp.StatusCode != http.StatusOK {
return result, fmt.Errorf("response failed with code %s", resp.Status)
}

defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return result, err
}

json.Unmarshal(data, &result)
return result, err
}

func mustMatcher(t models.MatchType, n string, v string) models.Matcher {
m, err := models.NewMatcher(models.MatchEqual, []byte("val"), []byte("1"))
if err != nil {
panic(err)
}

return m
}

type tester struct{}

// Ensure tester is a TestingT and set a global `t`.
var t require.TestingT = &tester{}

// name is global and set on startup.
var name string

func (t *tester) Errorf(format string, args ...interface{}) {
_, fn, line, _ := runtime.Caller(4)
args[2] = fmt.Sprintf(" at %s:%d:\n%v", fn, line, args[2])
fmt.Printf(format, args...)
}

func (t *tester) FailNow() {
os.Exit(1)
}

func mustParseOpts(o handler.StringTagOptions) string {
m, err := json.Marshal(o)
require.NoError(t, err, "cannot marshal to json")
return string(m)
}

func bothClusterDefaultStrip(url string) {
m := mustParseOpts(handler.StringTagOptions{
Restrict: []handler.StringMatch{
handler.StringMatch{Name: "val", Type: "EQUAL", Value: "1"},
},
})

resp, err := queryWithHeader(url, string(m))
require.NoError(t, err, "failed to query")

data := resp.Data.Result
data.Sort()
require.Equal(t, len(data), 2)
clusters := []string{"coordinator-cluster-a", "coordinator-cluster-b"}
for i, d := range data {
require.Equal(t, 2, len(d.Metric))
require.Equal(t, name, d.Metric["__name__"])
require.Equal(t, clusters[i], d.Metric["cluster"])
}
}

func bothClusterCustomStrip(url string) {
m := mustParseOpts(handler.StringTagOptions{
Restrict: []handler.StringMatch{
handler.StringMatch{Name: "val", Type: "EQUAL", Value: "1"},
},
Strip: []string{"__name__"},
})

resp, err := queryWithHeader(url, string(m))
require.NoError(t, err, "failed to query")

data := resp.Data.Result
data.Sort()
require.Equal(t, len(data), 2)
clusters := []string{"coordinator-cluster-a", "coordinator-cluster-b"}
for i, d := range data {
require.Equal(t, 2, len(d.Metric))
require.Equal(t, clusters[i], d.Metric["cluster"])
require.Equal(t, "1", d.Metric["val"])
}
}

func bothClusterNoStrip(url string) {
m := mustParseOpts(handler.StringTagOptions{
Restrict: []handler.StringMatch{
handler.StringMatch{Name: "val", Type: "EQUAL", Value: "1"},
},
Strip: []string{},
})

resp, err := queryWithHeader(url, string(m))
require.NoError(t, err, "failed to query")

data := resp.Data.Result
data.Sort()
require.Equal(t, len(data), 2)
clusters := []string{"coordinator-cluster-a", "coordinator-cluster-b"}
for i, d := range data {
require.Equal(t, 3, len(d.Metric))
require.Equal(t, name, d.Metric["__name__"])
require.Equal(t, clusters[i], d.Metric["cluster"])
require.Equal(t, "1", d.Metric["val"])
}
}

func bothClusterMultiStrip(url string) {
m := mustParseOpts(handler.StringTagOptions{
Restrict: []handler.StringMatch{
handler.StringMatch{Name: "val", Type: "EQUAL", Value: "1"},
},
Strip: []string{"val", "__name__"},
})

resp, err := queryWithHeader(url, string(m))
require.NoError(t, err, "failed to query")

data := resp.Data.Result
data.Sort()
require.Equal(t, len(data), 2)
clusters := []string{"coordinator-cluster-a", "coordinator-cluster-b"}
for i, d := range data {
require.Equal(t, 1, len(d.Metric))
require.Equal(t, clusters[i], d.Metric["cluster"])
}
}

// NB: cluster 1 is expected to have metrics with vals in range: [1,5]
// and cluster 2 is expected to have metrics with vals in range: [1,10]
// so setting the value to be in (5..10] should hit only a single metric.
func singleClusterDefaultStrip(url string) {
m := mustParseOpts(handler.StringTagOptions{
Restrict: []handler.StringMatch{
handler.StringMatch{Name: "val", Type: "EQUAL", Value: "9"},
},
})

resp, err := queryWithHeader(url, string(m))
require.NoError(t, err, "failed to query")

data := resp.Data.Result
require.Equal(t, len(data), 1, url)
require.Equal(t, 2, len(data[0].Metric))
require.Equal(t, name, data[0].Metric["__name__"], "single")
require.Equal(t, "coordinator-cluster-b", data[0].Metric["cluster"])
}
19 changes: 19 additions & 0 deletions scripts/docker-integration-tests/query_fanout/restrict.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env bash

set -ex
TEST_PATH=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests
FANOUT_PATH=$TEST_PATH/query_fanout
source $TEST_PATH/common.sh
source $FANOUT_PATH/warning.sh

function test_restrictions {
t=$(date +%s)
METRIC_NAME="foo_$t"
# # write 5 metrics to cluster a
write_metrics coordinator-cluster-a 5
# write 10 metrics to cluster b
write_metrics coordinator-cluster-b 10

# unlimited query against cluster a has no header
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff go run $FANOUT_PATH/restrict.go -t $t
}
10 changes: 8 additions & 2 deletions scripts/docker-integration-tests/query_fanout/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

set -xe

source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh
source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/query_fanout/warning.sh
TEST_PATH=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests
FANOUT_PATH=$TEST_PATH/query_fanout
source $TEST_PATH/common.sh
source $FANOUT_PATH/warning.sh
source $FANOUT_PATH/restrict.sh

REVISION=$(git rev-parse HEAD)
COMPOSE_FILE=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/query_fanout/docker-compose.yml
Expand Down Expand Up @@ -216,3 +219,6 @@ ATTEMPTS=5 TIMEOUT=1 retry_with_backoff complete_tags

echo "running fanout warning tests"
test_fanout_warnings

echo "running restrict tests"
test_restrictions
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/query_fanout/warning.sh
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ function test_fanout_warning_missing_zone {

ATTEMPTS=3 TIMEOUT=1 retry_with_backoff find_carbon 16 remote_store_cluster-c_complete_tags_warning
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff find_carbon 9 max_fetch_series_limit_applied,remote_store_cluster-c_complete_tags_warning

docker-compose -f ${COMPOSE_FILE} start coordinator-cluster-c
}

function test_fanout_warnings {
Expand Down
52 changes: 43 additions & 9 deletions src/query/api/v1/handler/fetch_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package handler

import (
"encoding/json"
"fmt"
"math"
"net/http"
Expand Down Expand Up @@ -103,6 +104,7 @@ func (b fetchOptionsBuilder) NewFetchOptions(
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.Limit = limit
if str := req.Header.Get(MetricsTypeHeader); str != "" {
mt, err := storage.ParseMetricsType(str)
Expand All @@ -111,20 +113,43 @@ func (b fetchOptionsBuilder) NewFetchOptions(
"could not parse metrics type: input=%s, err=%v", str, err)
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
fetchOpts.RestrictFetchOptions = newOrExistingRestrictFetchOptions(fetchOpts)
fetchOpts.RestrictFetchOptions.MetricsType = mt

fetchOpts.RestrictQueryOptions = newOrExistingRestrictQueryOptions(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByType =
newOrExistingRestrictQueryOptionsRestrictByType(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByType.MetricsType = mt
}

if str := req.Header.Get(MetricsStoragePolicyHeader); str != "" {
sp, err := policy.ParseStoragePolicy(str)
if err != nil {
err = fmt.Errorf(
"could not parse storage policy: input=%s, err=%v", str, err)
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
fetchOpts.RestrictFetchOptions = newOrExistingRestrictFetchOptions(fetchOpts)
fetchOpts.RestrictFetchOptions.StoragePolicy = sp

fetchOpts.RestrictQueryOptions = newOrExistingRestrictQueryOptions(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByType =
newOrExistingRestrictQueryOptionsRestrictByType(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByType.StoragePolicy = sp
}
if restrict := fetchOpts.RestrictFetchOptions; restrict != nil {

if str := req.Header.Get(RestrictByTagsJSONHeader); str != "" {
var opts StringTagOptions
if err := json.Unmarshal([]byte(str), &opts); err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

tagOpts, err := opts.toOptions()
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.RestrictQueryOptions = newOrExistingRestrictQueryOptions(fetchOpts)
fetchOpts.RestrictQueryOptions.RestrictByTag = tagOpts
}

if restrict := fetchOpts.RestrictQueryOptions; restrict != nil {
if err := restrict.Validate(); err != nil {
err = fmt.Errorf(
"could not validate restrict options: err=%v", err)
Expand Down Expand Up @@ -152,13 +177,22 @@ func (b fetchOptionsBuilder) NewFetchOptions(
return fetchOpts, nil
}

func newOrExistingRestrictFetchOptions(
func newOrExistingRestrictQueryOptions(
fetchOpts *storage.FetchOptions,
) *storage.RestrictQueryOptions {
if v := fetchOpts.RestrictQueryOptions; v != nil {
return v
}
return &storage.RestrictQueryOptions{}
}

func newOrExistingRestrictQueryOptionsRestrictByType(
fetchOpts *storage.FetchOptions,
) *storage.RestrictFetchOptions {
if v := fetchOpts.RestrictFetchOptions; v != nil {
) *storage.RestrictByType {
if v := fetchOpts.RestrictQueryOptions.RestrictByType; v != nil {
return v
}
return &storage.RestrictFetchOptions{}
return &storage.RestrictByType{}
}

// ParseStep parses the step duration for an HTTP request.
Expand Down
Loading

0 comments on commit 4c44f1d

Please sign in to comment.