Skip to content

Commit

Permalink
- Fix the old Match endpoint which incorrectly generated a combinator…
Browse files Browse the repository at this point in the history
…ial result

- Update Graphite find endpoint
  • Loading branch information
arnikola committed Mar 20, 2019
1 parent 5a2ac6a commit 4a54dc5
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 243 deletions.
36 changes: 9 additions & 27 deletions src/query/api/v1/handler/graphite/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package graphite

import (
"bytes"
"context"
"net/http"

Expand Down Expand Up @@ -64,47 +63,30 @@ func (h *grahiteFindHandler) ServeHTTP(
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
logger := logging.WithContext(ctx)
w.Header().Set("Content-Type", "application/json")
query, rErr := parseFindParamsToQuery(r)
query, raw, rErr := parseFindParamsToQuery(r)
if rErr != nil {
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}

opts := storage.NewFetchOptions()
// FIXME: arnikola, use the tag completion point instead of this one here
// if someone finds this in the PR I owe you a beer
result, err := h.storage.SearchSeries(ctx, query, opts)
result, err := h.storage.CompleteTags(ctx, query, opts)
if err != nil {
logger.Error("unable to complete tags", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}

partCount := graphite.CountMetricParts(query.Raw)
partName := graphite.TagName(partCount - 1)
seenMap := make(map[string]bool, len(result.Metrics))
for _, m := range result.Metrics {
tags := m.Tags.Tags
index := 0
// TODO: make this more performant by computing the index for the tag name.
for i, tag := range tags {
if bytes.Equal(partName, tag.Name) {
index = i
break
}
seenMap := make(map[string]bool, len(result.CompletedTags))
for _, tags := range result.CompletedTags {
for _, value := range tags.Values {
// FIXME: (arnikola) Figure out how to add children; may need to run find
// query twice, once with an additional wildcard matcher on the end.
seenMap[string(value)] = true
}

value := tags[index].Value
// If this value has already been encountered, check if
if hadExtra, seen := seenMap[string(value)]; seen && hadExtra {
continue
}

hasExtraParts := len(tags) > partCount
seenMap[string(value)] = hasExtraParts
}

prefix := graphite.DropLastMetricPart(query.Raw)
prefix := graphite.DropLastMetricPart(raw)
if len(prefix) > 0 {
prefix += "."
}
Expand Down
52 changes: 8 additions & 44 deletions src/query/api/v1/handler/graphite/find_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,71 +24,35 @@ import (
"fmt"
"io"
"net/http"
"time"

"github.com/m3db/m3/src/query/errors"
"github.com/m3db/m3/src/query/graphite/graphite"
graphiteStorage "github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/json"
"github.com/m3db/m3/src/x/net/http"
)

func parseFindParamsToQuery(r *http.Request) (
*storage.FetchQuery,
*storage.CompleteTagsQuery,
string,
*xhttp.ParseError,
) {
values := r.URL.Query()
now := time.Now()
fromString, untilString := r.FormValue("from"), r.FormValue("until")
if len(fromString) == 0 {
fromString = "0"
}

if len(untilString) == 0 {
untilString = "now"
}

from, err := graphite.ParseTime(
fromString,
now,
tzOffsetForAbsoluteTime,
)

if err != nil {
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString),
http.StatusBadRequest)
}

until, err := graphite.ParseTime(
untilString,
now,
tzOffsetForAbsoluteTime,
)

if err != nil {
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString),
http.StatusBadRequest)
}

query := values.Get("query")
if query == "" {
return nil, xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest)
return nil, "", xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest)
}

matchers, err := graphiteStorage.TranslateQueryToMatchers(query)
if err != nil {
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query),
return nil, "", xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query),
http.StatusBadRequest)
}

return &storage.FetchQuery{
Raw: query,
TagMatchers: matchers,
Start: from,
End: until,
Interval: 0,
}, nil
return &storage.CompleteTagsQuery{
CompleteNameOnly: false,
TagMatchers: matchers,
}, query, nil
}

func findResultsJSON(
Expand Down
114 changes: 46 additions & 68 deletions src/query/api/v1/handler/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package prometheus

import (
"bytes"
"fmt"
"io"
"io/ioutil"
Expand All @@ -41,7 +42,7 @@ import (
)

const (
// NameReplace is the parameter that gets replaced
// NameReplace is the parameter that gets replaced.
NameReplace = "name"
queryParam = "query"
filterNameTagsParam = "tag"
Expand All @@ -52,14 +53,15 @@ const (

var (
matchValues = []byte(".*")
roleName = []byte("role")
)

// TimeoutOpts stores options related to various timeout configurations
// TimeoutOpts stores options related to various timeout configurations.
type TimeoutOpts struct {
FetchTimeout time.Duration
}

// ParsePromCompressedRequest parses a snappy compressed request from Prometheus
// ParsePromCompressedRequest parses a snappy compressed request from Prometheus.
func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) {
body := r.Body
if r.Body == nil {
Expand All @@ -85,7 +87,7 @@ func ParsePromCompressedRequest(r *http.Request) ([]byte, *xhttp.ParseError) {
return reqBuf, nil
}

// ParseRequestTimeout parses the input request timeout with a default
// ParseRequestTimeout parses the input request timeout with a default.
func ParseRequestTimeout(r *http.Request, configFetchTimeout time.Duration) (time.Duration, error) {
timeout := r.Header.Get("timeout")
if timeout == "" {
Expand All @@ -104,7 +106,7 @@ func ParseRequestTimeout(r *http.Request, configFetchTimeout time.Duration) (tim
return duration, nil
}

// ParseTagCompletionParamsToQuery parses all params from the GET request
// ParseTagCompletionParamsToQuery parses all params from the GET request.
func ParseTagCompletionParamsToQuery(
r *http.Request,
) (*storage.CompleteTagsQuery, *xhttp.ParseError) {
Expand Down Expand Up @@ -170,11 +172,11 @@ func parseTimeWithDefault(
return defaultTime, nil
}

// ParseSeriesMatchQuery parses all params from the GET request
// ParseSeriesMatchQuery parses all params from the GET request.
func ParseSeriesMatchQuery(
r *http.Request,
tagOptions models.TagOptions,
) (*storage.SeriesMatchQuery, *xhttp.ParseError) {
) ([]*storage.FetchQuery, *xhttp.ParseError) {
r.ParseForm()
matcherValues := r.Form["match[]"]
if len(matcherValues) == 0 {
Expand All @@ -191,7 +193,7 @@ func ParseSeriesMatchQuery(
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

tagMatchers := make([]models.Matchers, len(matcherValues))
queries := make([]*storage.FetchQuery, len(matcherValues))
for i, s := range matcherValues {
promMatchers, err := promql.ParseMetricSelector(s)
if err != nil {
Expand All @@ -203,17 +205,18 @@ func ParseSeriesMatchQuery(
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

tagMatchers[i] = matchers
queries[i] = &storage.FetchQuery{
Raw: fmt.Sprintf("match[]=%s", s),
TagMatchers: matchers,
Start: start,
End: end,
}
}

return &storage.SeriesMatchQuery{
TagMatchers: tagMatchers,
Start: start,
End: end,
}, nil
return queries, nil
}

// ParseTagValuesToQuery parses a tag values request to a complete tags query
// ParseTagValuesToQuery parses a tag values request to a complete tags query.
func ParseTagValuesToQuery(
r *http.Request,
) (*storage.CompleteTagsQuery, error) {
Expand Down Expand Up @@ -288,7 +291,7 @@ func renderDefaultTagCompletionResultsJSON(
return jw.Close()
}

// RenderTagCompletionResultsJSON renders tag completion results to json format
// RenderTagCompletionResultsJSON renders tag completion results to json format.
func RenderTagCompletionResultsJSON(
w io.Writer,
result *storage.CompleteTagsResult,
Expand All @@ -301,7 +304,7 @@ func RenderTagCompletionResultsJSON(
return renderDefaultTagCompletionResultsJSON(w, results)
}

// RenderTagValuesResultsJSON renders tag values results to json format
// RenderTagValuesResultsJSON renders tag values results to json format.
func RenderTagValuesResultsJSON(
w io.Writer,
result *storage.CompleteTagsResult,
Expand Down Expand Up @@ -346,56 +349,11 @@ func RenderTagValuesResultsJSON(
return jw.Close()
}

type tag struct {
name string
value string
}

func writeTagsHelper(
jw *json.Writer,
completedTags []storage.CompletedTag,
tags []tag,
) {
if len(completedTags) == 0 {
jw.BeginObject()
for _, tag := range tags {
jw.BeginObjectField(tag.name)
jw.WriteString(tag.value)
}

jw.EndObject()
return
}

firstResult := completedTags[0]
name := string(firstResult.Name)

copiedTags := make([]tag, len(tags)+1)
copy(copiedTags, tags)
for _, value := range firstResult.Values {
copiedTags[len(tags)] = tag{name: name, value: string(value)}
writeTagsHelper(jw, completedTags[1:], copiedTags)
}
}

func writeTags(
jw *json.Writer,
results []*storage.CompleteTagsResult,
) {
for _, result := range results {
jw.BeginArray()
tags := result.CompletedTags
if len(tags) > 0 {
writeTagsHelper(jw, result.CompletedTags, nil)
}
jw.EndArray()
}
}

// RenderSeriesMatchResultsJSON renders series match results to json format
// RenderSeriesMatchResultsJSON renders series match results to json format.
func RenderSeriesMatchResultsJSON(
w io.Writer,
results []*storage.CompleteTagsResult,
results []models.Metrics,
dropRole bool,
) error {
jw := json.NewWriter(w)
jw.BeginObject()
Expand All @@ -404,13 +362,33 @@ func RenderSeriesMatchResultsJSON(
jw.WriteString("success")

jw.BeginObjectField("data")
writeTags(jw, results)
jw.BeginArray()

for _, result := range results {
for _, tags := range result {
jw.BeginObject()
for _, tag := range tags.Tags.Tags {
if bytes.Equal(tag.Name, roleName) && dropRole {
// NB: When data is written from Prometheus remote write, additional
// `"role":"remote"` tag is added, which should not be included in the
// results.
continue
}
jw.BeginObjectField(string(tag.Name))
jw.WriteString(string(tag.Value))
}
jw.EndObject()
}

}

jw.EndArray()
jw.EndObject()

return jw.Close()
}

// PromResp represents Prometheus's query response
// PromResp represents Prometheus's query response.
type PromResp struct {
Status string `json:"status"`
Data struct {
Expand All @@ -424,7 +402,7 @@ type PromResp struct {
} `json:"data"`
}

// PromDebug represents the input and output that are used in the debug endpoint
// PromDebug represents the input and output that are used in the debug endpoint.
type PromDebug struct {
Input PromResp `json:"input"`
Results PromResp `json:"results"`
Expand Down
Loading

0 comments on commit 4a54dc5

Please sign in to comment.