Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Fix Graphite aliasByNode & aliasByMetric to use robust path expr extraction and add neg indexing to substr #3576

Merged
merged 4 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions src/query/graphite/common/aliasing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"fmt"
"regexp"
"strconv"
"strings"

"github.com/m3db/m3/src/query/graphite/ts"
"github.com/m3db/m3/src/x/errors"
Expand All @@ -45,19 +44,6 @@ func Alias(_ *Context, series ts.SeriesList, a string) (ts.SeriesList, error) {
return series, nil
}

// AliasByMetric takes a seriesList and applies an alias derived from the base
// metric name.
func AliasByMetric(ctx *Context, series ts.SeriesList) (ts.SeriesList, error) {
renamed := make([]*ts.Series, series.Len())
for i, s := range series.Values {
firstPart := strings.Split(s.Name(), ",")[0]
terms := strings.Split(firstPart, ".")
renamed[i] = s.RenamedTo(terms[len(terms)-1])
}
series.Values = renamed
return series, nil
}

// AliasSub runs series names through a regex search/replace.
func AliasSub(_ *Context, input ts.SeriesList, search, replace string) (ts.SeriesList, error) {
regex, err := regexp.Compile(search)
Expand Down
44 changes: 24 additions & 20 deletions src/query/graphite/native/alias_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,19 @@ func alias(ctx *common.Context, series singlePathSpec, a string) (ts.SeriesList,

// aliasByMetric takes a seriesList and applies an alias derived from the base
// metric name.
func aliasByMetric(ctx *common.Context, series singlePathSpec) (ts.SeriesList, error) {
return common.AliasByMetric(ctx, ts.SeriesList(series))
func aliasByMetric(ctx *common.Context, seriesList singlePathSpec) (ts.SeriesList, error) {
return aliasByNode(ctx, seriesList, -1)
}

// aliasByNode renames a time series result according to a subset of the nodes
// in its hierarchy.
func aliasByNode(ctx *common.Context, seriesList singlePathSpec, nodes ...int) (ts.SeriesList, error) {
renamed := make([]*ts.Series, 0, ts.SeriesList(seriesList).Len())
for _, series := range seriesList.Values {
name := getFirstPathExpression(series.Name())
name, err := getFirstPathExpression(series.Name())
if err != nil {
return ts.SeriesList{}, err
}

nameParts := strings.Split(name, ".")
newNameParts := make([]string, 0, len(nodes))
Expand All @@ -66,34 +69,35 @@ func aliasByNode(ctx *common.Context, seriesList singlePathSpec, nodes ...int) (
return ts.SeriesList(seriesList), nil
}

func getFirstPathExpression(name string) string {
expr, err := Compile(name, CompileOptions{})
func getFirstPathExpression(name string) (string, error) {
node, err := ParseGrammar(name, CompileOptions{})
if err != nil {
return name
return "", err
}
if path, ok := getFirstPathExpressionDepthFirst(expr.Arguments()); ok {
return path
if path, ok := getFirstPathExpressionDepthFirst(node); ok {
return path, nil
}
return name
return name, nil
}

func getFirstPathExpressionDepthFirst(args []ArgumentASTNode) (string, bool) {
for i := 0; i < len(args); i++ {
path, ok := args[i].PathExpression()
if ok {
return path, true
}
func getFirstPathExpressionDepthFirst(node ASTNode) (string, bool) {
path, ok := node.PathExpression()
if ok {
return path, true
}

inner, ok := args[i].(CallASTNode)
if !ok {
continue
}
call, ok := node.CallExpression()
if !ok {
return "", false
}

path, ok = getFirstPathExpressionDepthFirst(inner.Arguments())
for _, arg := range call.Arguments() {
path, ok = getFirstPathExpressionDepthFirst(arg)
if ok {
return path, true
}
}

return "", false
}

Expand Down
39 changes: 26 additions & 13 deletions src/query/graphite/native/alias_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func TestAlias(t *testing.T) {
results, err := alias(nil, singlePathSpec{
Values: series,
}, a)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
for _, s := range results.Values {
assert.Equal(t, a, s.Name())
Expand Down Expand Up @@ -145,14 +144,13 @@ func TestAliasByMetric(t *testing.T) {
ts.NewSeries(ctx, "foo.bar.baz.foo01-foo.writes.success", now, values),
ts.NewSeries(ctx, "foo.bar.baz.foo02-foo.writes.success.P99", now, values),
ts.NewSeries(ctx, "foo.bar.baz.foo03-foo.writes.success.P75", now, values),
ts.NewSeries(ctx, "scale(stats.foobar.gauges.quazqux.latency_minutes.foo, 60.123))", now, values),
ts.NewSeries(ctx, "scale(stats.foobar.gauges.quazqux.latency_minutes.foo, 60.123)", now, values),
}

results, err := aliasByMetric(ctx, singlePathSpec{
Values: series,
})
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), len(results.Values))
assert.Equal(t, "success", results.Values[0].Name())
assert.Equal(t, "P99", results.Values[1].Name())
Expand All @@ -176,8 +174,7 @@ func TestAliasByNode(t *testing.T) {
results, err := aliasByNode(ctx, singlePathSpec{
Values: series,
}, 3, 5, 6)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "foo01-foo.success", results.Values[0].Name())
assert.Equal(t, "foo02-foo.success.P99", results.Values[1].Name())
Expand All @@ -186,8 +183,7 @@ func TestAliasByNode(t *testing.T) {
results, err = aliasByNode(nil, singlePathSpec{
Values: series,
}, -1)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "success", results.Values[0].Name())
assert.Equal(t, "P99", results.Values[1].Name())
Expand All @@ -209,8 +205,7 @@ func TestAliasByNodeWithComposition(t *testing.T) {
results, err := aliasByNode(ctx, singlePathSpec{
Values: series,
}, 0, 1)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "servers.bob02-foo", results.Values[0].Name())
assert.Equal(t, "servers.bob02-foo", results.Values[1].Name())
Expand All @@ -231,9 +226,27 @@ func TestAliasByNodeWithManyPathExpressions(t *testing.T) {
results, err := aliasByNode(ctx, singlePathSpec{
Values: series,
}, 0, 1)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "servers.bob02-foo", results.Values[0].Name())
assert.Equal(t, "servers.bob04-foo", results.Values[1].Name())
}

func TestAliasByNodeWitCallSubExpressions(t *testing.T) {
ctx := common.NewTestContext()
defer func() { _ = ctx.Close() }()

now := time.Now()
values := ts.NewConstantValues(ctx, 10.0, 1000, 10)
series := []*ts.Series{
ts.NewSeries(ctx, "asPercent(foo01,sumSeries(bar,baz))", now, values),
ts.NewSeries(ctx, "asPercent(foo02,sumSeries(bar,baz))", now, values),
}
results, err := aliasByNode(ctx, singlePathSpec{
Values: series,
}, 0)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "foo01", results.Values[0].Name())
assert.Equal(t, "foo02", results.Values[1].Name())
}
21 changes: 15 additions & 6 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,7 @@ func safeIndex(len, index int) int {
// of the array (if only one integer n is passed) or n - m elements of the array (if two integers n and m
// are passed).
func substr(_ *common.Context, seriesList singlePathSpec, start, stop int) (ts.SeriesList, error) {
origStart, origStop := start, stop
results := make([]*ts.Series, len(seriesList.Values))
re := regexp.MustCompile(",.*$")
for i, series := range seriesList.Values {
Expand All @@ -1676,18 +1677,26 @@ func substr(_ *common.Context, seriesList singlePathSpec, start, stop int) (ts.S
right = safeIndex(length, right)
nameParts := strings.Split(name[left:right], ".")
numParts := len(nameParts)
currStart, currStop := start, stop
// Graphite supports negative indexing, so we need to also.
if currStart < 0 {
currStart += numParts
}
if currStop < 0 {
currStop += numParts
}
// If stop == 0, it's as if stop was unspecified
if start < 0 || start >= numParts || (stop != 0 && stop < start) {
if currStart < 0 || currStart >= numParts || (currStop != 0 && currStop < currStart) {
err := xerrors.NewInvalidParamsError(fmt.Errorf(
"invalid substr params, start=%d, stop=%d", start, stop))
"invalid substr params: start=%d, stop=%d", origStart, origStop))
return ts.NewSeriesList(), err
}
var newName string
if stop == 0 {
newName = strings.Join(nameParts[start:], ".")
if currStop == 0 {
newName = strings.Join(nameParts[currStart:], ".")
} else {
stop = safeIndex(numParts, stop)
newName = strings.Join(nameParts[start:stop], ".")
stop = safeIndex(numParts, currStop)
newName = strings.Join(nameParts[currStart:currStop], ".")
}
newName = re.ReplaceAllString(newName, "")
results[i] = series.RenamedTo(newName)
Expand Down
20 changes: 12 additions & 8 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3244,40 +3244,44 @@ func TestSubstr(t *testing.T) {
Values: []*ts.Series{series},
}, 1, 0)
expected := common.TestSeries{Name: "bar", Data: input.values}
require.Nil(t, err)
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.stepInMilli, input.startTime,
[]common.TestSeries{expected}, results.Values)

results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, 0, 2)
expected = common.TestSeries{Name: "foo.bar", Data: input.values}
require.Nil(t, err)
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.stepInMilli, input.startTime,
[]common.TestSeries{expected}, results.Values)

results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, 0, 0)
expected = common.TestSeries{Name: "foo.bar", Data: input.values}
require.Nil(t, err)
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.stepInMilli, input.startTime,
[]common.TestSeries{expected}, results.Values)

// Negative support -1, 0.
results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, 2, 1)
require.NotNil(t, err)
}, -1, 0)
expected = common.TestSeries{Name: "bar", Data: input.values}
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.stepInMilli, input.startTime,
[]common.TestSeries{expected}, results.Values)

results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, -1, 1)
require.NotNil(t, err)
}, 2, 1)
require.Error(t, err)

results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, 3, 4)
require.NotNil(t, err)
require.Error(t, err)
}

type mockStorage struct{}
Expand Down
Loading