Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Plumb potential tag completion/aggregate through to m3query endpoints #1481

Merged
merged 2 commits into from
Mar 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ function read_carbon {

# Send metric values 40 and 44 every second
echo "Sending unaggregated carbon metrics to m3coordinator"
bash -c 'for i in $(seq 1 100); do t=$(date +%s); echo "foo.bar.baz 40 $t" | nc 0.0.0.0 7204; echo "foo.bar.baz 44 $t" | nc 0.0.0.0 7204; sleep 1; done' &
bash -c 'while true; do t=$(date +%s); echo "foo.bar.baz 40 $t" | nc 0.0.0.0 7204; echo "foo.bar.baz 44 $t" | nc 0.0.0.0 7204; sleep 1; done' &

# Track PID to kill on exit
METRIC_EMIT_PID="$!"
Expand Down
16 changes: 16 additions & 0 deletions scripts/docker-integration-tests/carbon/expected/a.ba.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[
{
"id": "a.bag",
"text": "bag",
"leaf": 1,
"expandable": 0,
"allowChildren": 0
},
{
"id": "a.bar",
"text": "bar",
"leaf": 0,
"expandable": 1,
"allowChildren": 1
}
]
3 changes: 2 additions & 1 deletion scripts/docker-integration-tests/carbon/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function read_carbon {
function find_carbon {
query=$1
expected_file=$2
RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/graphite/metrics/find?query=$query")
RESPONSE=$(curl -sSg "http://localhost:7201/api/v1/graphite/metrics/find?query=$query")
ACTUAL=$(echo $RESPONSE | jq '. | sort')
EXPECTED=$(cat $EXPECTED_PATH/$expected_file | jq '. | sort')
if [ "$ACTUAL" == "$EXPECTED" ]
Expand Down Expand Up @@ -83,6 +83,7 @@ echo "a.bar.caw.daz 0 $t" | nc 0.0.0.0 7204
echo "a.bag 0 $t" | nc 0.0.0.0 7204
ATTEMPTS=5 TIMEOUT=1 retry_with_backoff find_carbon a* a.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.b* a.b.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.ba[rg] a.ba.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.b*.c* a.b.c.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon a.b*.caw.* a.b.c.d.json
ATTEMPTS=2 TIMEOUT=1 retry_with_backoff find_carbon x none.json
Expand Down
100 changes: 72 additions & 28 deletions src/query/api/v1/handler/graphite/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
package graphite

import (
"bytes"
"context"
"errors"
"net/http"
"sync"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/net/http"
xerrors "github.com/m3db/m3x/errors"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -57,52 +59,94 @@ func NewFindHandler(
}
}

func mergeTags(
terminatedResult *storage.CompleteTagsResult,
childResult *storage.CompleteTagsResult,
) (map[string]bool, error) {
// sanity check the case.
if terminatedResult.CompleteNameOnly {
return nil, errors.New("terminated result is completing name only")
}

if childResult.CompleteNameOnly {
return nil, errors.New("child result is completing name only")
}

mapLength := len(terminatedResult.CompletedTags) + len(childResult.CompletedTags)
tagMap := make(map[string]bool, mapLength)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: In future we should use an autogen'd map that can use []byte as the key and we could then use SetUnsafe(key, ..., Options{NoCopyKey: true, NoFinalizeKey: true}).


for _, tag := range terminatedResult.CompletedTags {
for _, value := range tag.Values {
tagMap[string(value)] = false
}
}

// NB: fine to overwrite any tags which were present in the `terminatedResult` map
// since if they appear in `childResult`, then they exist AND have children.
for _, tag := range childResult.CompletedTags {
for _, value := range tag.Values {
tagMap[string(value)] = true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you just create a set here instead of the true/false thing? I think you can just do something like:

tagMap := make(map[string]struct{}, mapLength)

then just set it to an empty struct when you're looping through.

		for _, value := range tag.Values {
			tagMap[string(value)] = struct{}{}
		}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I need to see if the given tag has children here so need the bool

}
}

return tagMap, nil
}

func (h *grahiteFindHandler) ServeHTTP(
w http.ResponseWriter,
r *http.Request,
) {
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
logger := logging.WithContext(ctx)
w.Header().Set("Content-Type", "application/json")
query, rErr := parseFindParamsToQuery(r)

// NB: need to run two separate queries, one of which will match only the
// provided matchers, and one which will match the provided matchers with at
// least one more child node. For further information, refer to the comment
// for parseFindParamsToQueries
terminatedQuery, childQuery, raw, rErr := parseFindParamsToQueries(r)
if rErr != nil {
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}

opts := storage.NewFetchOptions()
result, err := h.storage.FetchTags(ctx, query, opts)
if err != nil {
var (
terminatedResult *storage.CompleteTagsResult
tErr error
childResult *storage.CompleteTagsResult
cErr error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You could make this (to slightly compact):

var (
  terminateResult, childResult *storage.CompleteTagsResult
  termErr, childErr error
  // ...
)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually not a fan of doing it like that, looks messier to me :(

If it's a standard we're using will update though 👍

opts = storage.NewFetchOptions()

wg sync.WaitGroup
)

wg.Add(2)
go func() {
terminatedResult, tErr = h.storage.CompleteTags(ctx, terminatedQuery, opts)
wg.Done()
}()

go func() {
childResult, cErr = h.storage.CompleteTags(ctx, childQuery, opts)
wg.Done()
}()

wg.Wait()
if err := xerrors.FirstError(tErr, cErr); err != nil {
logger.Error("unable to complete tags", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}

partCount := graphite.CountMetricParts(query.Raw)
partName := graphite.TagName(partCount - 1)
seenMap := make(map[string]bool, len(result.Metrics))
for _, m := range result.Metrics {
tags := m.Tags.Tags
index := 0
// TODO: make this more performant by computing the index for the tag name.
for i, tag := range tags {
if bytes.Equal(partName, tag.Name) {
index = i
break
}
}

value := tags[index].Value
// If this value has already been encountered, check if
if hadExtra, seen := seenMap[string(value)]; seen && hadExtra {
continue
}

hasExtraParts := len(tags) > partCount
seenMap[string(value)] = hasExtraParts
// NB: merge results from both queries to specify which series have children
seenMap, err := mergeTags(terminatedResult, childResult)
if err != nil {
logger.Error("unable to complete tags", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}

prefix := graphite.DropLastMetricPart(query.Raw)
prefix := graphite.DropLastMetricPart(raw)
if len(prefix) > 0 {
prefix += "."
}
Expand Down
88 changes: 68 additions & 20 deletions src/query/api/v1/handler/graphite/find_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,41 @@ import (
"github.com/m3db/m3/src/query/errors"
"github.com/m3db/m3/src/query/graphite/graphite"
graphiteStorage "github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/json"
"github.com/m3db/m3/src/x/net/http"
)

func parseFindParamsToQuery(r *http.Request) (
*storage.FetchQuery,
*xhttp.ParseError,
// parseFindParamsToQueries parses an incoming request to two find queries,
// which are then combined to give the final result.
// It returns, in order:
// the given query; this will return all values for exactly that tag which have
// _terminatedQuery, which adds an explicit terminator after the last term in
// no child nodes
// _childQuery, which adds an explicit match all after the last term in the
// given query; this will return all values for exactly that tag which have at
// least one child node.
// _rawQueryString, which is the initial query request (bar final
// matcher), which is used to reconstruct the return values.
// _err, any error encountered during parsing.
//
// As an example, given the query `a.b*`, and metrics `a.bar.c` and `a.biz`,
// terminatedQuery will return only [biz], and childQuery will return only
// [bar].
func parseFindParamsToQueries(r *http.Request) (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you add a comment w/ an example to explain expected input/output

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, i like it.

_terminatedQuery *storage.CompleteTagsQuery,
_childQuery *storage.CompleteTagsQuery,
_rawQueryString string,
_err *xhttp.ParseError,
) {
values := r.URL.Query()
query := values.Get("query")
if query == "" {
return nil, nil, "",
xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest)
}

now := time.Now()
fromString, untilString := r.FormValue("from"), r.FormValue("until")
if len(fromString) == 0 {
Expand All @@ -56,8 +81,9 @@ func parseFindParamsToQuery(r *http.Request) (
)

if err != nil {
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString),
http.StatusBadRequest)
return nil, nil, "",
xhttp.NewParseError(fmt.Errorf("invalid 'from': %s", fromString),
http.StatusBadRequest)
}

until, err := graphite.ParseTime(
Expand All @@ -67,28 +93,50 @@ func parseFindParamsToQuery(r *http.Request) (
)

if err != nil {
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString),
http.StatusBadRequest)
return nil, nil, "",
xhttp.NewParseError(fmt.Errorf("invalid 'until': %s", untilString),
http.StatusBadRequest)
}

query := values.Get("query")
if query == "" {
return nil, xhttp.NewParseError(errors.ErrNoQueryFound, http.StatusBadRequest)
matchers, err := graphiteStorage.TranslateQueryToMatchersWithTerminator(query)
if err != nil {
return nil, nil, "",
xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query),
http.StatusBadRequest)
}

matchers, err := graphiteStorage.TranslateQueryToMatchers(query)
if err != nil {
return nil, xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query),
// NB: Filter will always be the second last term in the matchers, and the
// matchers should always have a length of at least 2 (term + terminator)
// so this is a sanity check and unexpected in actual execution.
if len(matchers) < 2 {
return nil, nil, "", xhttp.NewParseError(fmt.Errorf("unable to parse "+
"'query': %s", query),
http.StatusBadRequest)
}

return &storage.FetchQuery{
Raw: query,
TagMatchers: matchers,
Start: from,
End: until,
Interval: 0,
}, nil
filter := [][]byte{matchers[len(matchers)-2].Name}
terminatedQuery := &storage.CompleteTagsQuery{
CompleteNameOnly: false,
FilterNameTags: filter,
TagMatchers: matchers,
Start: from,
End: until,
}

clonedMatchers := make([]models.Matcher, len(matchers))
copy(clonedMatchers, matchers)
// NB: change terminator from `MatchNotRegexp` to `MatchRegexp` to ensure
// segments with children are matched.
clonedMatchers[len(clonedMatchers)-1].Type = models.MatchRegexp
childQuery := &storage.CompleteTagsQuery{
CompleteNameOnly: false,
FilterNameTags: filter,
TagMatchers: clonedMatchers,
Start: from,
End: until,
}

return terminatedQuery, childQuery, query, nil
}

func findResultsJSON(
Expand Down
Loading