-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[query] Plumb potential tag completion/aggregate through to m3query endpoints #1481
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
[ | ||
{ | ||
"id": "a.bag", | ||
"text": "bag", | ||
"leaf": 1, | ||
"expandable": 0, | ||
"allowChildren": 0 | ||
}, | ||
{ | ||
"id": "a.bar", | ||
"text": "bar", | ||
"leaf": 0, | ||
"expandable": 1, | ||
"allowChildren": 1 | ||
} | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,15 +21,17 @@ | |
package graphite | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"net/http" | ||
"sync" | ||
|
||
"github.com/m3db/m3/src/query/api/v1/handler" | ||
"github.com/m3db/m3/src/query/graphite/graphite" | ||
"github.com/m3db/m3/src/query/storage" | ||
"github.com/m3db/m3/src/query/util/logging" | ||
"github.com/m3db/m3/src/x/net/http" | ||
xerrors "github.com/m3db/m3x/errors" | ||
|
||
"go.uber.org/zap" | ||
) | ||
|
@@ -57,52 +59,94 @@ func NewFindHandler( | |
} | ||
} | ||
|
||
func mergeTags( | ||
terminatedResult *storage.CompleteTagsResult, | ||
childResult *storage.CompleteTagsResult, | ||
) (map[string]bool, error) { | ||
// sanity check: neither result may be a name-only completion, since tag values are merged below. | ||
if terminatedResult.CompleteNameOnly { | ||
return nil, errors.New("terminated result is completing name only") | ||
} | ||
|
||
if childResult.CompleteNameOnly { | ||
return nil, errors.New("child result is completing name only") | ||
} | ||
|
||
mapLength := len(terminatedResult.CompletedTags) + len(childResult.CompletedTags) | ||
tagMap := make(map[string]bool, mapLength) | ||
|
||
for _, tag := range terminatedResult.CompletedTags { | ||
for _, value := range tag.Values { | ||
tagMap[string(value)] = false | ||
} | ||
} | ||
|
||
// NB: fine to overwrite any tags which were present in the `terminatedResult` map | ||
// since if they appear in `childResult`, then they exist AND have children. | ||
for _, tag := range childResult.CompletedTags { | ||
for _, value := range tag.Values { | ||
tagMap[string(value)] = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you just create a set here instead of the true/false thing? I think you can just do something like:
then just set it to an empty struct when you're looping through.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I need to see if the given tag has children here so need the bool |
||
} | ||
} | ||
|
||
return tagMap, nil | ||
} | ||
|
||
func (h *grahiteFindHandler) ServeHTTP( | ||
w http.ResponseWriter, | ||
r *http.Request, | ||
) { | ||
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header) | ||
logger := logging.WithContext(ctx) | ||
w.Header().Set("Content-Type", "application/json") | ||
query, rErr := parseFindParamsToQuery(r) | ||
|
||
// NB: need to run two separate queries, one of which will match only the | ||
// provided matchers, and one which will match the provided matchers with at | ||
// least one more child node. For further information, refer to the comment | ||
// for parseFindParamsToQueries | ||
terminatedQuery, childQuery, raw, rErr := parseFindParamsToQueries(r) | ||
if rErr != nil { | ||
xhttp.Error(w, rErr.Inner(), rErr.Code()) | ||
return | ||
} | ||
|
||
opts := storage.NewFetchOptions() | ||
result, err := h.storage.FetchTags(ctx, query, opts) | ||
if err != nil { | ||
var ( | ||
terminatedResult *storage.CompleteTagsResult | ||
tErr error | ||
childResult *storage.CompleteTagsResult | ||
cErr error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: You could make this (to slightly compact): var (
terminateResult, childResult *storage.CompleteTagsResult
termErr, childErr error
// ...
) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually not a fan of doing it like that, looks messier to me :( If it's a standard we're using will update though 👍 |
||
opts = storage.NewFetchOptions() | ||
|
||
wg sync.WaitGroup | ||
) | ||
|
||
wg.Add(2) | ||
go func() { | ||
terminatedResult, tErr = h.storage.CompleteTags(ctx, terminatedQuery, opts) | ||
wg.Done() | ||
}() | ||
|
||
go func() { | ||
childResult, cErr = h.storage.CompleteTags(ctx, childQuery, opts) | ||
wg.Done() | ||
}() | ||
|
||
wg.Wait() | ||
if err := xerrors.FirstError(tErr, cErr); err != nil { | ||
logger.Error("unable to complete tags", zap.Error(err)) | ||
xhttp.Error(w, err, http.StatusBadRequest) | ||
return | ||
} | ||
|
||
partCount := graphite.CountMetricParts(query.Raw) | ||
partName := graphite.TagName(partCount - 1) | ||
seenMap := make(map[string]bool, len(result.Metrics)) | ||
for _, m := range result.Metrics { | ||
tags := m.Tags.Tags | ||
index := 0 | ||
// TODO: make this more performant by computing the index for the tag name. | ||
for i, tag := range tags { | ||
if bytes.Equal(partName, tag.Name) { | ||
index = i | ||
break | ||
} | ||
} | ||
|
||
value := tags[index].Value | ||
// If this value has already been encountered with extra parts, skip it. | ||
if hadExtra, seen := seenMap[string(value)]; seen && hadExtra { | ||
continue | ||
} | ||
|
||
hasExtraParts := len(tags) > partCount | ||
seenMap[string(value)] = hasExtraParts | ||
// NB: merge results from both queries to specify which series have children | ||
seenMap, err := mergeTags(terminatedResult, childResult) | ||
if err != nil { | ||
logger.Error("unable to complete tags", zap.Error(err)) | ||
xhttp.Error(w, err, http.StatusBadRequest) | ||
return | ||
} | ||
|
||
prefix := graphite.DropLastMetricPart(query.Raw) | ||
prefix := graphite.DropLastMetricPart(raw) | ||
if len(prefix) > 0 { | ||
prefix += "." | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,16 +29,41 @@ import ( | |
"github.com/m3db/m3/src/query/errors" | ||
"github.com/m3db/m3/src/query/graphite/graphite" | ||
graphiteStorage "github.com/m3db/m3/src/query/graphite/storage" | ||
"github.com/m3db/m3/src/query/models" | ||
"github.com/m3db/m3/src/query/storage" | ||
"github.com/m3db/m3/src/query/util/json" | ||
"github.com/m3db/m3/src/x/net/http" | ||
) | ||
|
||
func parseFindParamsToQuery(r *http.Request) ( | ||
*storage.FetchQuery, | ||
*xhttp.ParseError, | ||
// parseFindParamsToQueries parses an incoming request to two find queries, | ||
// which are then combined to give the final result. | ||
// It returns, in order: | ||
// _terminatedQuery, which adds an explicit terminator after the last term in | ||
// the given query; this will return all values for exactly that tag which have | ||
// no child nodes. | ||
// _childQuery, which adds an explicit match all after the last term in the | ||
// given query; this will return all values for exactly that tag which have at | ||
// least one child node. | ||
// _rawQueryString, which is the initial query request (bar final | ||
// matcher), which is used to reconstruct the return values. | ||
// _err, any error encountered during parsing. | ||
// | ||
// As an example, given the query `a.b*`, and metrics `a.bar.c` and `a.biz`, | ||
// terminatedQuery will return only [biz], and childQuery will return only | ||
// [bar]. | ||
func parseFindParamsToQueries(r *http.Request) ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: could you add a comment w/ an example to explain expected input/output There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice, i like it. |
||
_terminatedQuery *storage.CompleteTagsQuery, | ||
_childQuery *storage.CompleteTagsQuery, | ||
_rawQueryString string, | ||
_err *xhttp.ParseError, | ||
) { | ||
values := r.URL.Query() | ||
query := values.Get("query") | ||
if query == "" { | ||
return nil, nil, "", | ||
xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest) | ||
} | ||
|
||
now := time.Now() | ||
fromString, untilString := r.FormValue("from"), r.FormValue("until") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we getting rid of the from and until? I thought the new index supports that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good one, for some reason decided these are always from |
||
if len(fromString) == 0 { | ||
|
@@ -56,8 +81,9 @@ func parseFindParamsToQuery(r *http.Request) ( | |
) | ||
|
||
if err != nil { | ||
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString), | ||
http.StatusBadRequest) | ||
return nil, nil, "", | ||
xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString), | ||
http.StatusBadRequest) | ||
} | ||
|
||
until, err := graphite.ParseTime( | ||
|
@@ -67,28 +93,50 @@ func parseFindParamsToQuery(r *http.Request) ( | |
) | ||
|
||
if err != nil { | ||
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString), | ||
http.StatusBadRequest) | ||
return nil, nil, "", | ||
xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString), | ||
http.StatusBadRequest) | ||
} | ||
|
||
query := values.Get("query") | ||
if query == "" { | ||
return nil, xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest) | ||
matchers, err := graphiteStorage.TranslateQueryToMatchersWithTerminator(query) | ||
if err != nil { | ||
return nil, nil, "", | ||
xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query), | ||
http.StatusBadRequest) | ||
} | ||
|
||
matchers, err := graphiteStorage.TranslateQueryToMatchers(query) | ||
if err != nil { | ||
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query), | ||
// NB: Filter will always be the second last term in the matchers, and the | ||
// matchers should always have a length of at least 2 (term + terminator) | ||
// so this is a sanity check and unexpected in actual execution. | ||
if len(matchers) < 2 { | ||
return nil, nil, "", xhttp.NewParseError(fmt.Errorf("unable to parse "+ | ||
"'query': %s", query), | ||
http.StatusBadRequest) | ||
} | ||
|
||
return &storage.FetchQuery{ | ||
Raw: query, | ||
TagMatchers: matchers, | ||
Start: from, | ||
End: until, | ||
Interval: 0, | ||
}, nil | ||
filter := [][]byte{matchers[len(matchers)-2].Name} | ||
terminatedQuery := &storage.CompleteTagsQuery{ | ||
CompleteNameOnly: false, | ||
FilterNameTags: filter, | ||
TagMatchers: matchers, | ||
Start: from, | ||
End: until, | ||
} | ||
|
||
clonedMatchers := make([]models.Matcher, len(matchers)) | ||
copy(clonedMatchers, matchers) | ||
// NB: change terminator from `MatchNotRegexp` to `MatchRegexp` to ensure | ||
// segments with children are matched. | ||
clonedMatchers[len(clonedMatchers)-1].Type = models.MatchRegexp | ||
childQuery := &storage.CompleteTagsQuery{ | ||
CompleteNameOnly: false, | ||
FilterNameTags: filter, | ||
TagMatchers: clonedMatchers, | ||
Start: from, | ||
End: until, | ||
} | ||
|
||
return terminatedQuery, childQuery, query, nil | ||
} | ||
|
||
func findResultsJSON( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: In future we should use an autogen'd map that can use `[]byte` as the key, and we could then use `SetUnsafe(key, ..., Options{NoCopyKey: true, NoFinalizeKey: true})`.