Skip to content

Commit

Permalink
[query] Fix Graphite aliasByNode & aliasByMetric to use robust path e…
Browse files Browse the repository at this point in the history
…xpr extraction and add neg indexing to substr (#3576)
  • Loading branch information
robskillington authored Jun 30, 2021
1 parent f7dd205 commit 0865ebc
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 107 deletions.
14 changes: 0 additions & 14 deletions src/query/graphite/common/aliasing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"fmt"
"regexp"
"strconv"
"strings"

"github.com/m3db/m3/src/query/graphite/ts"
"github.com/m3db/m3/src/x/errors"
Expand All @@ -45,19 +44,6 @@ func Alias(_ *Context, series ts.SeriesList, a string) (ts.SeriesList, error) {
return series, nil
}

// AliasByMetric takes a seriesList and applies an alias derived from the base
// metric name.
func AliasByMetric(ctx *Context, series ts.SeriesList) (ts.SeriesList, error) {
renamed := make([]*ts.Series, series.Len())
for i, s := range series.Values {
firstPart := strings.Split(s.Name(), ",")[0]
terms := strings.Split(firstPart, ".")
renamed[i] = s.RenamedTo(terms[len(terms)-1])
}
series.Values = renamed
return series, nil
}

// AliasSub runs series names through a regex search/replace.
func AliasSub(_ *Context, input ts.SeriesList, search, replace string) (ts.SeriesList, error) {
regex, err := regexp.Compile(search)
Expand Down
44 changes: 24 additions & 20 deletions src/query/graphite/native/alias_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,19 @@ func alias(ctx *common.Context, series singlePathSpec, a string) (ts.SeriesList,

// aliasByMetric takes a seriesList and applies an alias derived from the base
// metric name.
func aliasByMetric(ctx *common.Context, series singlePathSpec) (ts.SeriesList, error) {
return common.AliasByMetric(ctx, ts.SeriesList(series))
func aliasByMetric(ctx *common.Context, seriesList singlePathSpec) (ts.SeriesList, error) {
return aliasByNode(ctx, seriesList, -1)
}

// aliasByNode renames a time series result according to a subset of the nodes
// in its hierarchy.
func aliasByNode(ctx *common.Context, seriesList singlePathSpec, nodes ...int) (ts.SeriesList, error) {
renamed := make([]*ts.Series, 0, ts.SeriesList(seriesList).Len())
for _, series := range seriesList.Values {
name := getFirstPathExpression(series.Name())
name, err := getFirstPathExpression(series.Name())
if err != nil {
return ts.SeriesList{}, err
}

nameParts := strings.Split(name, ".")
newNameParts := make([]string, 0, len(nodes))
Expand All @@ -66,34 +69,35 @@ func aliasByNode(ctx *common.Context, seriesList singlePathSpec, nodes ...int) (
return ts.SeriesList(seriesList), nil
}

func getFirstPathExpression(name string) string {
expr, err := Compile(name, CompileOptions{})
func getFirstPathExpression(name string) (string, error) {
node, err := ParseGrammar(name, CompileOptions{})
if err != nil {
return name
return "", err
}
if path, ok := getFirstPathExpressionDepthFirst(expr.Arguments()); ok {
return path
if path, ok := getFirstPathExpressionDepthFirst(node); ok {
return path, nil
}
return name
return name, nil
}

func getFirstPathExpressionDepthFirst(args []ArgumentASTNode) (string, bool) {
for i := 0; i < len(args); i++ {
path, ok := args[i].PathExpression()
if ok {
return path, true
}
func getFirstPathExpressionDepthFirst(node ASTNode) (string, bool) {
path, ok := node.PathExpression()
if ok {
return path, true
}

inner, ok := args[i].(CallASTNode)
if !ok {
continue
}
call, ok := node.CallExpression()
if !ok {
return "", false
}

path, ok = getFirstPathExpressionDepthFirst(inner.Arguments())
for _, arg := range call.Arguments() {
path, ok = getFirstPathExpressionDepthFirst(arg)
if ok {
return path, true
}
}

return "", false
}

Expand Down
43 changes: 28 additions & 15 deletions src/query/graphite/native/alias_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func TestAlias(t *testing.T) {
results, err := alias(nil, singlePathSpec{
Values: series,
}, a)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
for _, s := range results.Values {
assert.Equal(t, a, s.Name())
Expand Down Expand Up @@ -145,14 +144,13 @@ func TestAliasByMetric(t *testing.T) {
ts.NewSeries(ctx, "foo.bar.baz.foo01-foo.writes.success", now, values),
ts.NewSeries(ctx, "foo.bar.baz.foo02-foo.writes.success.P99", now, values),
ts.NewSeries(ctx, "foo.bar.baz.foo03-foo.writes.success.P75", now, values),
ts.NewSeries(ctx, "scale(stats.foobar.gauges.quazqux.latency_minutes.foo, 60.123))", now, values),
ts.NewSeries(ctx, "scale(stats.foobar.gauges.quazqux.latency_minutes.foo, 60.123)", now, values),
}

results, err := aliasByMetric(ctx, singlePathSpec{
Values: series,
})
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), len(results.Values))
assert.Equal(t, "success", results.Values[0].Name())
assert.Equal(t, "P99", results.Values[1].Name())
Expand All @@ -176,8 +174,7 @@ func TestAliasByNode(t *testing.T) {
results, err := aliasByNode(ctx, singlePathSpec{
Values: series,
}, 3, 5, 6)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "foo01-foo.success", results.Values[0].Name())
assert.Equal(t, "foo02-foo.success.P99", results.Values[1].Name())
Expand All @@ -186,8 +183,7 @@ func TestAliasByNode(t *testing.T) {
results, err = aliasByNode(nil, singlePathSpec{
Values: series,
}, -1)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "success", results.Values[0].Name())
assert.Equal(t, "P99", results.Values[1].Name())
Expand All @@ -203,18 +199,17 @@ func TestAliasByNodeWithComposition(t *testing.T) {
series := []*ts.Series{
ts.NewSeries(ctx, "derivative(servers.bob02-foo.cpu.load_5)", now, values),
ts.NewSeries(ctx, "derivative(derivative(servers.bob02-foo.cpu.load_5))", now, values),
ts.NewSeries(ctx, "~~~", now, values),
ts.NewSeries(ctx, "fooble", now, values),
ts.NewSeries(ctx, "", now, values),
}
results, err := aliasByNode(ctx, singlePathSpec{
Values: series,
}, 0, 1)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "servers.bob02-foo", results.Values[0].Name())
assert.Equal(t, "servers.bob02-foo", results.Values[1].Name())
assert.Equal(t, "~~~", results.Values[2].Name())
assert.Equal(t, "fooble", results.Values[2].Name())
assert.Equal(t, "", results.Values[3].Name())
}

Expand All @@ -231,9 +226,27 @@ func TestAliasByNodeWithManyPathExpressions(t *testing.T) {
results, err := aliasByNode(ctx, singlePathSpec{
Values: series,
}, 0, 1)
require.Nil(t, err)
require.NotNil(t, results)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "servers.bob02-foo", results.Values[0].Name())
assert.Equal(t, "servers.bob04-foo", results.Values[1].Name())
}

func TestAliasByNodeWitCallSubExpressions(t *testing.T) {
ctx := common.NewTestContext()
defer func() { _ = ctx.Close() }()

now := time.Now()
values := ts.NewConstantValues(ctx, 10.0, 1000, 10)
series := []*ts.Series{
ts.NewSeries(ctx, "asPercent(foo01,sumSeries(bar,baz))", now, values),
ts.NewSeries(ctx, "asPercent(foo02,sumSeries(bar,baz))", now, values),
}
results, err := aliasByNode(ctx, singlePathSpec{
Values: series,
}, 0)
require.NoError(t, err)
require.Equal(t, len(series), results.Len())
assert.Equal(t, "foo01", results.Values[0].Name())
assert.Equal(t, "foo02", results.Values[1].Name())
}
21 changes: 15 additions & 6 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,7 @@ func safeIndex(len, index int) int {
// of the array (if only one integer n is passed) or n - m elements of the array (if two integers n and m
// are passed).
func substr(_ *common.Context, seriesList singlePathSpec, start, stop int) (ts.SeriesList, error) {
origStart, origStop := start, stop
results := make([]*ts.Series, len(seriesList.Values))
re := regexp.MustCompile(",.*$")
for i, series := range seriesList.Values {
Expand All @@ -1676,18 +1677,26 @@ func substr(_ *common.Context, seriesList singlePathSpec, start, stop int) (ts.S
right = safeIndex(length, right)
nameParts := strings.Split(name[left:right], ".")
numParts := len(nameParts)
currStart, currStop := start, stop
// Graphite supports negative indexing, so we need to also.
if currStart < 0 {
currStart += numParts
}
if currStop < 0 {
currStop += numParts
}
// If stop == 0, it's as if stop was unspecified
if start < 0 || start >= numParts || (stop != 0 && stop < start) {
if currStart < 0 || currStart >= numParts || (currStop != 0 && currStop < currStart) {
err := xerrors.NewInvalidParamsError(fmt.Errorf(
"invalid substr params, start=%d, stop=%d", start, stop))
"invalid substr params: start=%d, stop=%d", origStart, origStop))
return ts.NewSeriesList(), err
}
var newName string
if stop == 0 {
newName = strings.Join(nameParts[start:], ".")
if currStop == 0 {
newName = strings.Join(nameParts[currStart:], ".")
} else {
stop = safeIndex(numParts, stop)
newName = strings.Join(nameParts[start:stop], ".")
stop = safeIndex(numParts, currStop)
newName = strings.Join(nameParts[currStart:currStop], ".")
}
newName = re.ReplaceAllString(newName, "")
results[i] = series.RenamedTo(newName)
Expand Down
20 changes: 12 additions & 8 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3244,40 +3244,44 @@ func TestSubstr(t *testing.T) {
Values: []*ts.Series{series},
}, 1, 0)
expected := common.TestSeries{Name: "bar", Data: input.values}
require.Nil(t, err)
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.stepInMilli, input.startTime,
[]common.TestSeries{expected}, results.Values)

results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, 0, 2)
expected = common.TestSeries{Name: "foo.bar", Data: input.values}
require.Nil(t, err)
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.stepInMilli, input.startTime,
[]common.TestSeries{expected}, results.Values)

results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, 0, 0)
expected = common.TestSeries{Name: "foo.bar", Data: input.values}
require.Nil(t, err)
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.stepInMilli, input.startTime,
[]common.TestSeries{expected}, results.Values)

// Negative support -1, 0.
results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, 2, 1)
require.NotNil(t, err)
}, -1, 0)
expected = common.TestSeries{Name: "bar", Data: input.values}
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.stepInMilli, input.startTime,
[]common.TestSeries{expected}, results.Values)

results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, -1, 1)
require.NotNil(t, err)
}, 2, 1)
require.Error(t, err)

results, err = substr(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, 3, 4)
require.NotNil(t, err)
require.Error(t, err)
}

type mockStorage struct{}
Expand Down
Loading

0 comments on commit 0865ebc

Please sign in to comment.