From 4a54dc59393845cc44420393b46ee614ff563209 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 20 Mar 2019 18:38:02 -0400 Subject: [PATCH] - Fix the old Match endpoint which incorrectly generated a combinatorial result - Update Graphite find endpoint --- src/query/api/v1/handler/graphite/find.go | 36 ++---- .../api/v1/handler/graphite/find_parser.go | 52 ++------ src/query/api/v1/handler/prometheus/common.go | 114 +++++++---------- .../api/v1/handler/prometheus/common_test.go | 121 ++++++++---------- .../api/v1/handler/prometheus/remote/match.go | 23 ++-- src/query/graphite/storage/m3_wrapper.go | 29 ++--- 6 files changed, 132 insertions(+), 243 deletions(-) diff --git a/src/query/api/v1/handler/graphite/find.go b/src/query/api/v1/handler/graphite/find.go index 3e3665deff..60028d4464 100644 --- a/src/query/api/v1/handler/graphite/find.go +++ b/src/query/api/v1/handler/graphite/find.go @@ -21,7 +21,6 @@ package graphite import ( - "bytes" "context" "net/http" @@ -64,47 +63,30 @@ func (h *grahiteFindHandler) ServeHTTP( ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) logger := logging.WithContext(ctx) w.Header().Set("Content-Type", "application/json") - query, rErr := parseFindParamsToQuery(r) + query, raw, rErr := parseFindParamsToQuery(r) if rErr != nil { xhttp.Error(w, rErr.Inner(), rErr.Code()) return } opts := storage.NewFetchOptions() - // FIXME: arnikola, use the tag completion point instead of this one here - // if someone finds this in the PR I owe you a beer - result, err := h.storage.SearchSeries(ctx, query, opts) + result, err := h.storage.CompleteTags(ctx, query, opts) if err != nil { logger.Error("unable to complete tags", zap.Error(err)) xhttp.Error(w, err, http.StatusBadRequest) return } - partCount := graphite.CountMetricParts(query.Raw) - partName := graphite.TagName(partCount - 1) - seenMap := make(map[string]bool, len(result.Metrics)) - for _, m := range result.Metrics { - tags := m.Tags.Tags - index := 0 - // TODO: make this more performant by computing the index for the tag name. - for i, tag := range tags { - if bytes.Equal(partName, tag.Name) { - index = i - break - } + seenMap := make(map[string]bool, len(result.CompletedTags)) + for _, tags := range result.CompletedTags { + for _, value := range tags.Values { + // FIXME: (arnikola) Figure out how to add children; may need to run find + // query twice, once with an additional wildcard matcher on the end. + seenMap[string(value)] = true } - - value := tags[index].Value - // If this value has already been encountered, check if - if hadExtra, seen := seenMap[string(value)]; seen && hadExtra { - continue - } - - hasExtraParts := len(tags) > partCount - seenMap[string(value)] = hasExtraParts } - prefix := graphite.DropLastMetricPart(query.Raw) + prefix := graphite.DropLastMetricPart(raw) if len(prefix) > 0 { prefix += "." } diff --git a/src/query/api/v1/handler/graphite/find_parser.go b/src/query/api/v1/handler/graphite/find_parser.go index bc2ae371ab..4eb0e344cf 100644 --- a/src/query/api/v1/handler/graphite/find_parser.go +++ b/src/query/api/v1/handler/graphite/find_parser.go @@ -24,10 +24,8 @@ import ( "fmt" "io" "net/http" - "time" "github.com/m3db/m3/src/query/errors" - "github.com/m3db/m3/src/query/graphite/graphite" graphiteStorage "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/json" @@ -35,60 +33,26 @@ import ( ) func parseFindParamsToQuery(r *http.Request) ( - *storage.FetchQuery, + *storage.CompleteTagsQuery, + string, *xhttp.ParseError, ) { values := r.URL.Query() - now := time.Now() - fromString, untilString := r.FormValue("from"), r.FormValue("until") - if len(fromString) == 0 { - fromString = "0" - } - - if len(untilString) == 0 { - untilString = "now" - } - - from, err := graphite.ParseTime( - fromString, - now, - tzOffsetForAbsoluteTime, - ) - - if err != nil { - return nil, xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString), - http.StatusBadRequest) - } - - until, err := graphite.ParseTime( - untilString, - now, - tzOffsetForAbsoluteTime, - ) - - if err != nil { - return nil, xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString), - http.StatusBadRequest) - } - query := values.Get("query") if query == "" { - return nil, xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest) + return nil, "", xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest) } matchers, err := graphiteStorage.TranslateQueryToMatchers(query) if err != nil { - return nil, xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query), + return nil, "", xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query), http.StatusBadRequest) } - return &storage.FetchQuery{ - Raw: query, - TagMatchers: matchers, - Start: from, - End: until, - Interval: 0, - }, nil + return &storage.CompleteTagsQuery{ + CompleteNameOnly: false, + TagMatchers: matchers, + }, query, nil } func findResultsJSON( diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 4c0bcce7a4..9f82353e44 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -21,6 +21,7 @@ package prometheus import ( + "bytes" "fmt" "io" "io/ioutil" @@ -41,7 +42,7 @@ import ( ) const ( - // NameReplace is the parameter that gets replaced + // NameReplace is the parameter that gets replaced. NameReplace = "name" queryParam = "query" filterNameTagsParam = "tag" @@ -52,14 +53,15 @@ const ( var ( matchValues = []byte(".*") + roleName = []byte("role") ) -// TimeoutOpts stores options related to various timeout configurations +// TimeoutOpts stores options related to various timeout configurations. type TimeoutOpts struct { FetchTimeout time.Duration } -// ParsePromCompressedRequest parses a snappy compressed request from Prometheus +// ParsePromCompressedRequest parses a snappy compressed request from Prometheus. func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) { body := r.Body if r.Body == nil { @@ -85,7 +87,7 @@ func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) { return reqBuf, nil } -// ParseRequestTimeout parses the input request timeout with a default +// ParseRequestTimeout parses the input request timeout with a default. func ParseRequestTimeout(r *http.Request, configFetchTimeout time.Duration) (time.Duration, error) { timeout := r.Header.Get("timeout") if timeout == "" { @@ -104,7 +106,7 @@ func ParseRequestTimeout(r *http.Request, configFetchTimeout time.Duration) (tim return duration, nil } -// ParseTagCompletionParamsToQuery parses all params from the GET request +// ParseTagCompletionParamsToQuery parses all params from the GET request. func ParseTagCompletionParamsToQuery( r *http.Request, ) (*storage.CompleteTagsQuery, *xhttp.ParseError) { @@ -170,11 +172,11 @@ func parseTimeWithDefault( return defaultTime, nil } -// ParseSeriesMatchQuery parses all params from the GET request +// ParseSeriesMatchQuery parses all params from the GET request. func ParseSeriesMatchQuery( r *http.Request, tagOptions models.TagOptions, -) (*storage.SeriesMatchQuery, *xhttp.ParseError) { +) ([]*storage.FetchQuery, *xhttp.ParseError) { r.ParseForm() matcherValues := r.Form["match[]"] if len(matcherValues) == 0 { @@ -191,7 +193,7 @@ func ParseSeriesMatchQuery( return nil, xhttp.NewParseError(err, http.StatusBadRequest) } - tagMatchers := make([]models.Matchers, len(matcherValues)) + queries := make([]*storage.FetchQuery, len(matcherValues)) for i, s := range matcherValues { promMatchers, err := promql.ParseMetricSelector(s) if err != nil { @@ -203,17 +205,18 @@ func ParseSeriesMatchQuery( return nil, xhttp.NewParseError(err, http.StatusBadRequest) } - tagMatchers[i] = matchers + queries[i] = &storage.FetchQuery{ + Raw: fmt.Sprintf("match[]=%s", s), + TagMatchers: matchers, + Start: start, + End: end, + } } - return &storage.SeriesMatchQuery{ - TagMatchers: tagMatchers, - Start: start, - End: end, - }, nil + return queries, nil } -// ParseTagValuesToQuery parses a tag values request to a complete tags query +// ParseTagValuesToQuery parses a tag values request to a complete tags query. func ParseTagValuesToQuery( r *http.Request, ) (*storage.CompleteTagsQuery, error) { @@ -288,7 +291,7 @@ func renderDefaultTagCompletionResultsJSON( return jw.Close() } -// RenderTagCompletionResultsJSON renders tag completion results to json format +// RenderTagCompletionResultsJSON renders tag completion results to json format. func RenderTagCompletionResultsJSON( w io.Writer, result *storage.CompleteTagsResult, @@ -301,7 +304,7 @@ func RenderTagCompletionResultsJSON( return renderDefaultTagCompletionResultsJSON(w, results) } -// RenderTagValuesResultsJSON renders tag values results to json format +// RenderTagValuesResultsJSON renders tag values results to json format. func RenderTagValuesResultsJSON( w io.Writer, result *storage.CompleteTagsResult, @@ -346,56 +349,11 @@ func RenderTagValuesResultsJSON( return jw.Close() } -type tag struct { - name string - value string -} - -func writeTagsHelper( - jw *json.Writer, - completedTags []storage.CompletedTag, - tags []tag, -) { - if len(completedTags) == 0 { - jw.BeginObject() - for _, tag := range tags { - jw.BeginObjectField(tag.name) - jw.WriteString(tag.value) - } - - jw.EndObject() - return - } - - firstResult := completedTags[0] - name := string(firstResult.Name) - - copiedTags := make([]tag, len(tags)+1) - copy(copiedTags, tags) - for _, value := range firstResult.Values { - copiedTags[len(tags)] = tag{name: name, value: string(value)} - writeTagsHelper(jw, completedTags[1:], copiedTags) - } -} - -func writeTags( - jw *json.Writer, - results []*storage.CompleteTagsResult, -) { - for _, result := range results { - jw.BeginArray() - tags := result.CompletedTags - if len(tags) > 0 { - writeTagsHelper(jw, result.CompletedTags, nil) - } - jw.EndArray() - } -} - -// RenderSeriesMatchResultsJSON renders series match results to json format +// RenderSeriesMatchResultsJSON renders series match results to json format. func RenderSeriesMatchResultsJSON( w io.Writer, - results []*storage.CompleteTagsResult, + results []models.Metrics, + dropRole bool, ) error { jw := json.NewWriter(w) jw.BeginObject() @@ -404,13 +362,33 @@ func RenderSeriesMatchResultsJSON( jw.WriteString("success") jw.BeginObjectField("data") - writeTags(jw, results) + jw.BeginArray() + + for _, result := range results { + for _, tags := range result { + jw.BeginObject() + for _, tag := range tags.Tags.Tags { + if bytes.Equal(tag.Name, roleName) && dropRole { + // NB: When data is written from Prometheus remote write, additional + // `"role":"remote"` tag is added, which should not be included in the + // results. + continue + } + jw.BeginObjectField(string(tag.Name)) + jw.WriteString(string(tag.Value)) + } + jw.EndObject() + } + + } + + jw.EndArray() jw.EndObject() return jw.Close() } -// PromResp represents Prometheus's query response +// PromResp represents Prometheus's query response. type PromResp struct { Status string `json:"status"` Data struct { @@ -424,7 +402,7 @@ type PromResp struct { } `json:"data"` } -// PromDebug represents the input and output that are used in the debug endpoint +// PromDebug represents the input and output that are used in the debug endpoint. type PromDebug struct { Input PromResp `json:"input"` Results PromResp `json:"results"` diff --git a/src/query/api/v1/handler/prometheus/common_test.go b/src/query/api/v1/handler/prometheus/common_test.go index cd02b4d9ac..dc4ec68ff0 100644 --- a/src/query/api/v1/handler/prometheus/common_test.go +++ b/src/query/api/v1/handler/prometheus/common_test.go @@ -22,12 +22,13 @@ package prometheus import ( "bytes" + "fmt" "net/http" "strings" "testing" "time" - "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/test" "github.com/stretchr/testify/assert" @@ -87,84 +88,68 @@ func (w *writer) Write(p []byte) (n int, err error) { return len(p), nil } -func makeResult() []*storage.CompleteTagsResult { - return []*storage.CompleteTagsResult{ - &storage.CompleteTagsResult{ - CompletedTags: []storage.CompletedTag{ - storage.CompletedTag{ - Name: []byte("a"), - Values: [][]byte{[]byte("1"), []byte("2"), []byte("3")}, - }, - storage.CompletedTag{ - Name: []byte("b"), - Values: [][]byte{[]byte("1"), []byte("2")}, - }, - storage.CompletedTag{ - Name: []byte("c"), - Values: [][]byte{[]byte("1"), []byte("2"), []byte("3")}, - }, - }, - }, - } +type tag struct { + name, value string } -func TestRenderSeriesMatchResults(t *testing.T) { - w := &writer{value: ""} - seriesMatchResult := makeResult() - - expectedWhitespace := `{ - "status":"success", - "data":[ - {"a":"1","b":"1","c":"1"}, - {"a":"1","b":"1","c":"2"}, - {"a":"1","b":"1","c":"3"}, - {"a":"1","b":"2","c":"1"}, - {"a":"1","b":"2","c":"2"}, - {"a":"1","b":"2","c":"3"}, - {"a":"2","b":"1","c":"1"}, - {"a":"2","b":"1","c":"2"}, - {"a":"2","b":"1","c":"3"}, - {"a":"2","b":"2","c":"1"}, - {"a":"2","b":"2","c":"2"}, - {"a":"2","b":"2","c":"3"}, - {"a":"3","b":"1","c":"1"}, - {"a":"3","b":"1","c":"2"}, - {"a":"3","b":"1","c":"3"}, - {"a":"3","b":"2","c":"1"}, - {"a":"3","b":"2","c":"2"}, - {"a":"3","b":"2","c":"3"} - ] - }` - - err := RenderSeriesMatchResultsJSON(w, seriesMatchResult) - assert.NoError(t, err) - fields := strings.Fields(expectedWhitespace) - expected := "" - for _, field := range fields { - expected = expected + field +func toTags(name string, tags ...tag) models.Metric { + tagOpts := models.NewTagOptions() + ts := models.NewTags(len(tags), tagOpts) + ts = ts.SetName([]byte(name)) + for _, tag := range tags { + ts = ts.AddTag(models.Tag{Name: []byte(tag.name), Value: []byte(tag.value)}) } - assert.Equal(t, expected, w.value) + return models.Metric{Tags: ts} } func TestRenderSeriesMatchResultsNoTags(t *testing.T) { w := &writer{value: ""} - seriesMatchResult := []*storage.CompleteTagsResult{ - &storage.CompleteTagsResult{}, + tests := []struct { + dropRole bool + additional string + }{ + { + dropRole: true, + additional: "", + }, + { + dropRole: false, + additional: `,"role":"appears"`, + }, } - expectedWhitespace := `{ + seriesMatchResult := []models.Metrics{ + models.Metrics{ + toTags("name", tag{name: "a", value: "b"}, tag{name: "role", value: "appears"}), + toTags("name2", tag{name: "c", value: "d"}, tag{name: "e", value: "f"}), + }, + } + + for _, tt := range tests { + expectedWhitespace := fmt.Sprintf(`{ "status":"success", - "data":[] - }` + "data":[ + { + "__name__":"name", + "a":"b"%s + }, + { + "__name__":"name2", + "c":"d", + "e":"f" + } + ] + }`, tt.additional) - err := RenderSeriesMatchResultsJSON(w, seriesMatchResult) - assert.NoError(t, err) - fields := strings.Fields(expectedWhitespace) - expected := "" - for _, field := range fields { - expected = expected + field - } + err := RenderSeriesMatchResultsJSON(w, seriesMatchResult, tt.dropRole) + assert.NoError(t, err) + fields := strings.Fields(expectedWhitespace) + expected := "" + for _, field := range fields { + expected = expected + field + } - assert.Equal(t, expected, w.value) + assert.Equal(t, expected, w.value) + } } diff --git a/src/query/api/v1/handler/prometheus/remote/match.go b/src/query/api/v1/handler/prometheus/remote/match.go index a7954e6470..33fd631bfd 100644 --- a/src/query/api/v1/handler/prometheus/remote/match.go +++ b/src/query/api/v1/handler/prometheus/remote/match.go @@ -65,7 +65,7 @@ func (h *PromSeriesMatchHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") - query, err := prometheus.ParseSeriesMatchQuery(r, h.tagOptions) + queries, err := prometheus.ParseSeriesMatchQuery(r, h.tagOptions) if err != nil { logger.Error("unable to parse series match values to query", zap.Error(err)) xhttp.Error(w, err, http.StatusBadRequest) @@ -73,29 +73,22 @@ func (h *PromSeriesMatchHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques } opts := storage.NewFetchOptions() - matchers := query.TagMatchers - results := make([]*storage.CompleteTagsResult, len(matchers)) - // TODO: parallel execution - for i, matcher := range matchers { - completeTagsQuery := &storage.CompleteTagsQuery{ - CompleteNameOnly: false, - TagMatchers: matcher, - } - - result, err := h.storage.CompleteTags(ctx, completeTagsQuery, opts) + results := make([]models.Metrics, len(queries)) + for i, query := range queries { + result, err := h.storage.SearchSeries(ctx, query, opts) if err != nil { logger.Error("unable to get matched series", zap.Error(err)) xhttp.Error(w, err, http.StatusBadRequest) return } - results[i] = result + results[i] = result.Metrics } // TODO: Support multiple result types - if renderErr := prometheus.RenderSeriesMatchResultsJSON(w, results); renderErr != nil { - logger.Error("unable to write matched series", zap.Error(renderErr)) - xhttp.Error(w, renderErr, http.StatusBadRequest) + if err := prometheus.RenderSeriesMatchResultsJSON(w, results, true); err != nil { + logger.Error("unable to write matched series", zap.Error(err)) + xhttp.Error(w, err, http.StatusBadRequest) return } } diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index 3c437f9190..6ffbb0b2e0 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -60,18 +60,13 @@ func NewM3WrappedStorage( return &m3WrappedStore{m3: m3storage, enforcer: enforcer} } -// translates a graphite query to tag matcher pairs. -func translateQueryToMatchers( +// TranslateQueryToMatchers converts a graphite query to tag matcher pairs. +func TranslateQueryToMatchers( query string, - withTerminator bool, ) (models.Matchers, error) { metricLength := graphite.CountMetricParts(query) - matchersLength := metricLength - if withTerminator { - // Add space for a terminator character. - matchersLength++ - } - + // Add space for a terminator character. + matchersLength := metricLength + 1 matchers := make(models.Matchers, matchersLength) for i := 0; i < metricLength; i++ { metric := graphite.ExtractNthMetricPart(query, i) @@ -82,20 +77,12 @@ func translateQueryToMatchers( } } - if withTerminator { - // Add a terminator matcher at the end to ensure expansion is terminated at - // the last given metric part. - matchers[metricLength] = matcherTerminator(metricLength) - } - + // Add a terminator matcher at the end to ensure expansion is terminated at + // the last given metric part. + matchers[metricLength] = matcherTerminator(metricLength) return matchers, nil } -// TranslateQueryToMatchers converts a graphite query to tag matcher pairs. -func TranslateQueryToMatchers(query string) (models.Matchers, error) { - return translateQueryToMatchers(query, false) -} - // GetQueryTerminatorTagName will return the name for the terminator matcher in // the given pattern. This is useful for filtering out any additional results. func GetQueryTerminatorTagName(query string) []byte { @@ -104,7 +91,7 @@ func GetQueryTerminatorTagName(query string) []byte { } func translateQuery(query string, opts FetchOptions) (*storage.FetchQuery, error) { - matchers, err := translateQueryToMatchers(query, true) + matchers, err := TranslateQueryToMatchers(query) if err != nil { return nil, err }