Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Restrict query by header tag #2053

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/query/api/v1/handler/fetch_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ import (

"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/query/errors"
"github.com/m3db/m3/src/query/models"
parser "github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/storage"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/prometheus/prometheus/promql"
)

const (
Expand Down Expand Up @@ -103,6 +107,7 @@ func (b fetchOptionsBuilder) NewFetchOptions(
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.Limit = limit
if str := req.Header.Get(MetricsTypeHeader); str != "" {
mt, err := storage.ParseMetricsType(str)
Expand All @@ -111,19 +116,39 @@ func (b fetchOptionsBuilder) NewFetchOptions(
"could not parse metrics type: input=%s, err=%v", str, err)
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.RestrictFetchOptions = newOrExistingRestrictFetchOptions(fetchOpts)
fetchOpts.RestrictFetchOptions.MetricsType = mt
}

if str := req.Header.Get(MetricsStoragePolicyHeader); str != "" {
sp, err := policy.ParseStoragePolicy(str)
if err != nil {
err = fmt.Errorf(
"could not parse storage policy: input=%s, err=%v", str, err)
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.RestrictFetchOptions = newOrExistingRestrictFetchOptions(fetchOpts)
fetchOpts.RestrictFetchOptions.StoragePolicy = sp
}

if str := req.Header.Get(FetchRestrictLabels); str != "" {
promMatchers, err := promql.ParseMetricSelector(str)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.RestrictFetchOptions = newOrExistingRestrictFetchOptions(fetchOpts)
// TODO: filter through TagOptions
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think this still needs to happen yeah? Otherwise we'll get wrong tag options being used for these matchers?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I guess; it's only really relevant (atm) if you're overriding the name tag, but will do it here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as a follow up, this was refactored to no longer be neccessary

m3Matchers, err := parser.LabelMatchersToModelMatcher(promMatchers, models.NewTagOptions())
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

fetchOpts.RestrictFetchOptions.MustApplyMatchers = m3Matchers
}

if restrict := fetchOpts.RestrictFetchOptions; restrict != nil {
if err := restrict.Validate(); err != nil {
err = fmt.Errorf(
Expand Down
6 changes: 6 additions & 0 deletions src/query/api/v1/handler/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ const (
// metrics type.
MetricsStoragePolicyHeader = "M3-Storage-Policy"

// FetchRestrictLabels restricts all fetches to match certain labels.
FetchRestrictLabels = "M3-Fetch-Restrict-Labels"

// FetchStripLabels strips certain labels from the result.
FetchStripLabels = "M3-Fetch-Strip-Labels"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we were moving towards using just one header, called M3-Query-Options-JSON?

Perhaps:

QueryOptionsJSONHeader = "M3-Query-Options-JSON"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah oops forgot to prune this one

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving to M3-Restrict-By-Tags-JSON


// UnaggregatedStoragePolicy specifies the unaggregated storage policy.
UnaggregatedStoragePolicy = "unaggregated"

Expand Down
27 changes: 27 additions & 0 deletions src/query/api/v1/handler/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/query/models"
xpromql "github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/query/util"
"github.com/m3db/m3/src/query/util/json"
xhttp "github.com/m3db/m3/src/x/net/http"
Expand Down Expand Up @@ -632,3 +633,29 @@ type PromDebug struct {
Input Response `json:"input"`
Results Response `json:"results"`
}

// FilterSeriesByOptions removes series tags based on options.
func FilterSeriesByOptions(
series []*ts.Series,
opts *storage.FetchOptions,
) []*ts.Series {
if opts == nil || opts.RestrictFetchOptions == nil {
return series
}

matchers := opts.RestrictFetchOptions.MustApplyMatchers
if len(matchers) == 0 {
return series
}

keys := make([][]byte, 0, len(matchers))
for _, m := range matchers {
keys = append(keys, m.Name)
}

for i, s := range series {
series[i].Tags = s.Tags.TagsWithoutKeys(keys)
}

return series
}
3 changes: 2 additions & 1 deletion src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ func (h *PromReadHandler) ServeHTTPWithEngine(
// TODO: Support multiple result types
w.Header().Set("Content-Type", "application/json")
handler.AddWarningHeaders(w, result.meta)
return result.series, params, nil
series := prometheus.FilterSeriesByOptions(result.series, fetchOpts)
return series, params, nil
}

func (h *PromReadHandler) validateRequest(params *models.RequestParams) error {
Expand Down
4 changes: 4 additions & 0 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ func (h *PromReadHandler) read(
mu.Lock()
meta = meta.CombineMetadata(result.Metadata)
mu.Unlock()
result.SeriesList = prometheus.FilterSeriesByOptions(
result.SeriesList,
fetchOpts,
)
promRes := storage.FetchResultToPromResult(result, h.keepEmpty)
promResults[i] = promRes
}()
Expand Down
8 changes: 4 additions & 4 deletions src/query/api/v1/handler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@

package handler

// HeaderKeyType is the type for the header key
// HeaderKeyType is the type for the header key.
type HeaderKeyType int

const (
// HeaderKey is the key which headers will be added to in the request context
// HeaderKey is the key which headers will be added to in the request context.
HeaderKey HeaderKeyType = iota

// RoutePrefixV1 is the v1 prefix for all coordinator routes
// RoutePrefixV1 is the v1 prefix for all coordinator routes.
RoutePrefixV1 = "/api/v1"

// RoutePrefixExperimenta is the experimental prefix for all coordinator routes
// RoutePrefixExperimental is the experimental prefix for all coordinator routes.
RoutePrefixExperimental = "/api/experimental"
)
3 changes: 2 additions & 1 deletion src/query/executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func (e *engine) Execute(
opts *QueryOptions,
fetchOpts *storage.FetchOptions,
) (*storage.FetchResult, error) {
return e.opts.Store().Fetch(ctx, query, fetchOpts)
result, err := e.opts.Store().Fetch(ctx, query, fetchOpts)
return result, err
}

func (e *engine) ExecuteExpr(
Expand Down
Loading