-
Notifications
You must be signed in to change notification settings - Fork 454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[m3query] Add graphite function - aggregate #2584
Conversation
case Max, MaxSeries: | ||
return maxSeries(ctx, multiplePathSpecs(series)) | ||
case Avg, Average, AverageSeries: | ||
return averageSeries(ctx,multiplePathSpecs(series)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Need a space between args here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thought go format tool should have fixed that. but yes, let me fix.
// aggregate takes a list of series and returns a new series containing the | ||
// value aggregated across the series at each datapoint using the specified function. | ||
func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.SeriesList, error) { | ||
switch fname { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Believe you're missing support for a few here? Namely:
- avg_zero
- median
- stddev
- range (or rangeOf)
- last (or current)
As per:
https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.aggregate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I was just checking the implemented methods in the aggregation_function.go, forgot to check the whole list in buildin_function.go. (stddev and range seems are implemented already) Will update the support for methods that are already implemented.
For other aggregate functions that are not implemented, I will update the PR to document them for now. and do some query internally to see if any of them are actually being used, if used, I will implement them.
71fe66d
to
a219d40
Compare
return countSeries(ctx, multiplePathSpecs(series)) | ||
case Range, RangeOf, RangeOfSeries: | ||
return rangeOfSeries(ctx, series) | ||
case Last, KeepLastValue: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should we add "current" as alias for this too? Since it seems due to graphite documentation that "current" is an alias of "last"?
This function can be used with aggregation functions average (or avg), avg_zero, median, sum (or total), min, max, diff, stddev, count, range (or rangeOf) , multiply & last (or current).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think "keepLastValue" actually shouldn't be part of the case statement here, "last" and "current" have different implementations completely to "keepLastValue" which is per timeseries (not across timeseries). See comment below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sg, I will take another look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been updated. KeepLastValue
is no longer part of the case statement, and Current
is now an alias for last.
case Range, RangeOf, RangeOfSeries: | ||
return rangeOfSeries(ctx, series) | ||
case Last, KeepLastValue: | ||
return keepLastValue(ctx, series, -1) // using default value, same as graphite-web python implementation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I don't think this is what "last" does for "aggregate" func. It looks like it takes the highest value per step over all series and gives back a single series with std deviation at each step across all time series (per datapoint):
https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/functions/safe.py#L99-L101
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK I just played around with Grafana, and I am fairly certain that the last function does this:
if your seriesList is
[
[A, B, C],
[D, E, F],
[G, H, I]
]
once aggregated with last
, our output will be:
[G, H, I]
because last
just returns the last value at each point in time
case Stdev, Stddev: | ||
// default value for points in python implementation not clear, using an arbitrary 5 data points value should works fine. | ||
// default value for null value ratio is same as graphite-web python implementation. | ||
return stdev(ctx, series, 5, 0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So our stddev function looks like it does something different than normal graphite. Normal graphite looks like it takes the standard deviation across all time series, rather than the time series per series.
Perhaps we should do that for the aggregate function?
https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/functions/safe.py#L86-L96
In fact all series that are returned from "aggregate" in graphite-web seem to only return a single value, so should make sure that all these return just a single series.
Can see this that all results is just a single timeseries from the return of "aggregate" in graphite-web:
https://github.com/graphite-project/graphite-web/blob/master/webapp/graphite/render/functions.py#L195-L197
series = TimeSeries(name, start, end, step, values, xFilesFactor=xFilesFactor, tags=tags)
return [series]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this. will take another look at the stddev func and another pass of all these returns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated this to use the same StdDev logic as the graphite-web
source code.
I'm taking over this PR for Evan. It's ready for another review. |
|
||
return math.Sqrt(variance) | ||
} | ||
func stDevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell, this is also not a base graphite function, there's already a stdev
implementation in builtin_functions.go
; again, not opposed to it if it's useful for your needs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Graphite does support a base stddevSeries
function, so I need to rename this function that I've implemented, and register it as a real function.
Graphite docs --> https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.stddevSeries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One note though: this documentation says stddevSeries
is just an alias for calling aggregate with stddev
. Our implementation of stddev()
does not support that.
stddevSeries
takes multiple series and calculates one standard deviation value at each step. At step n
, stddevSeries
will make a list of every series' n
th value, and calculate the standard deviation of that list. This is different from how stdev
works:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried implementing it in the "conventional" aggregation function way (like this):
func stddevSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) {
return combineSeries(ctx, series, wrapPathExpr(StddevSeries, ts.SeriesList(series)), ts.StdDev)
}
and it didn't work. There is something about the combineSeries()
function that doesn't work for StdDev, so I just implemented from scratch.
count += 1 | ||
} | ||
} | ||
avg := sum / count |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should add a guard here e.g. if count == 0 { return math.NaN() }
for _, value := range values { | ||
if !math.IsNaN(value) { | ||
sum += value | ||
count += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit; ++
count := float64(0) | ||
sum := float64(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: var count, sum float64
is more go-ish
numSteps := firstSeries.Len() | ||
values := ts.NewValues(ctx, firstSeries.MillisPerStep(), numSteps) | ||
for i := 0; i < numSteps; i++ { | ||
valuesAtTime := []float64{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: define valuesAtTime := make([]float64, 0, numSteps)
outside the loop to avoid allocations, then valuesAtTime = valuesAtTime[:0]
within the loop to reset 👍
values := ts.NewValues(ctx, firstSeries.MillisPerStep(), numSteps) | ||
for i := 0; i < numSteps; i++ { | ||
valuesAtTime := []float64{} | ||
for _, series := range seriesList.Values { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually a little dangerous since series within the same query, series may (potentially) have different resolutions here, e.g.
If you have foo.real.time
aggregated to the 10d:1m
namespace, but foo.capacity.planning
is aggregated only to 1y:1d
, a query for foo.*.*
will return both, and they'd have very different amounts of data.
Luckily, this is unlikely to come up unless you have any non-overlapping metrics in multiple Graphite namespaces, and I doubt this issue is local to this function only; wouldn't be surprised to see the same issue in some of the existing functions. Perhaps this should just do a fail-fast sanity check like this for now?
if l := seriesList.Len(); l != numSteps {
return nil, fmt.Errorf("mismatched series length, expected %d, got %d", numSteps, l)
}
If you're interested, it will probably be some downsampling we perform around here: https://github.com/m3db/m3/blob/master/src/query/graphite/storage/m3_wrapper.go#L212 to downsample all series to the largest possible resolution using something like ts.LTTB(series, start, end, newStep)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, added this check.
Average = "average" | ||
AverageSeries = "averageSeries" | ||
Avg = "avg" | ||
Count = "count" | ||
CountSeries = "countSeries" | ||
Current = "current" | ||
Diff = "diff" | ||
DiffSeries = "diffSeries" | ||
Empty = "" | ||
Last = "last" | ||
LastSeries = "lastSeries" | ||
Max = "max" | ||
MaxSeries = "maxSeries" | ||
Min = "min" | ||
MinSeries = "minSeries" | ||
Multiply = "multiply" | ||
MultiplySeries = "multiplySeries" | ||
Range = "range" | ||
RangeOf = "rangeOf" | ||
RangeOfSeries = "rangeOfSeries" | ||
Stdev = "stdev" | ||
Stddev = "stddev" | ||
StddevSeries = "stddevSeries" | ||
Sum = "sum" | ||
SumSeries = "sumSeries" | ||
Total = "total" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: don't think these are exported anywhere, can probably keep em package local here 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed them all to lowercase.
But then I had some duplicate declarations, so also add FnName
to the end of each one.
Hope this is OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with minor nit on lastSeries
implementation series renaming, thanks again for contributing!
What this PR does / why we need it:
Add graphite function - aggregate
Special notes for your reviewer:
Forced local build succeeded, but health check keeps failing for coordinator. Not seeing extra error logs to help telling me what might be the places went wrong. Gonna try to see if CI jobs can help find anything. might be something stupid.
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: