Skip to content

Commit

Permalink
[query] Add determinism to Graphite sorting and reduce functions (#3164)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Feb 4, 2021
1 parent 6653da6 commit 985f866
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 62 deletions.
1 change: 1 addition & 0 deletions src/query/api/v1/handler/graphite/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (h *renderHandler) serveHTTP(
for _, r := range results {
numSeries += r.Len()
if !r.SortApplied {
// Use sort.Stable for deterministic output.
sort.Stable(ts.SeriesByName(r.Values))
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,10 @@ func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, te
wg.Wait()
}

r := ts.NewSeriesList()
// Retain metadata but we definitely did not retain sort order.
r := ts.SeriesList(seriesList)
r.Values = output
r.SortApplied = false
return r, nil
}

Expand Down
61 changes: 36 additions & 25 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func sortByName(_ *common.Context, series singlePathSpec) (ts.SeriesList, error)
sorted[i] = series.Values[i]
}

sort.Sort(ts.SeriesByName(sorted))
// Use sort.Stable for deterministic output.
sort.Stable(ts.SeriesByName(sorted))

r := ts.SeriesList(series)
r.Values = sorted
Expand All @@ -105,13 +106,16 @@ func sortByMaxima(ctx *common.Context, series singlePathSpec) (ts.SeriesList, er
// the response time metric will be plotted only when the maximum value of the
// corresponding request/s metric is > 10
// Example: useSeriesAbove(ganglia.metric1.reqs,10,"reqs","time")
func useSeriesAbove(ctx *common.Context, seriesList singlePathSpec, maxAllowedValue float64, search, replace string) (ts.SeriesList, error) {
func useSeriesAbove(
ctx *common.Context,
seriesList singlePathSpec,
maxAllowedValue float64,
search, replace string,
) (ts.SeriesList, error) {
var (
mu sync.Mutex
wg sync.WaitGroup
multiErr xerrors.MultiError
newNames []string

mu sync.Mutex
multiErr xerrors.MultiError
newNames []string
output = make([]*ts.Series, 0, len(seriesList.Values))
maxConcurrency = runtime.NumCPU() / 2
)
Expand All @@ -124,36 +128,39 @@ func useSeriesAbove(ctx *common.Context, seriesList singlePathSpec, maxAllowedVa
}

for _, newNameChunk := range chunkArrayHelper(newNames, maxConcurrency) {
if multiErr.LastError() != nil {
return ts.NewSeriesList(), multiErr.LastError()
}

var wg sync.WaitGroup
for _, newTarget := range newNameChunk {
newTarget := newTarget
wg.Add(1)
go func() {
defer wg.Done()
resultSeriesList, err := evaluateTarget(ctx, newTarget)

mu.Lock()
defer mu.Unlock()

if err != nil {
mu.Lock()
multiErr = multiErr.Add(err)
mu.Unlock()
return
}

mu.Lock()
for _, resultSeries := range resultSeriesList.Values {
resultSeries.Specification = newTarget
output = append(output, resultSeries)
}
mu.Unlock()
}()
}
wg.Wait()

if err := multiErr.LastError(); err != nil {
return ts.NewSeriesList(), err
}
}

r := ts.NewSeriesList()
// Retain metadata but mark as unsorted since this was done in parallel.
r := ts.SeriesList(seriesList)
r.Values = output
r.SortApplied = false
return r, nil
}

Expand Down Expand Up @@ -263,8 +270,16 @@ func limit(_ *common.Context, series singlePathSpec, n int) (ts.SeriesList, erro
}
upperBound := int(math.Min(float64(len(series.Values)), float64(n)))

if !series.SortApplied {
// If sort not applied then sort to get deterministic results.
// Use sort.Stable for deterministic output.
sort.Stable(ts.SeriesByName(series.Values))
}

r := ts.SeriesList(series)
r.Values = series.Values[0:upperBound]
// Note: If wasn't sorted we applied a sort by name for determinism.
r.SortApplied = true
return r, nil
}

Expand All @@ -283,7 +298,8 @@ func grep(_ *common.Context, seriesList singlePathSpec, regex string) (ts.Series
}
}

r := ts.NewSeriesList()
// Retain sort applied function.
r := ts.SeriesList(seriesList)
r.Values = filtered
return r, nil
}
Expand Down Expand Up @@ -639,16 +655,11 @@ func removeEmptySeries(ctx *common.Context, input singlePathSpec) (ts.SeriesList
}

func takeByFunction(input singlePathSpec, n int, sr ts.SeriesReducer, sort ts.Direction) (ts.SeriesList, error) {
series, err := ts.SortSeries(input.Values, sr, sort)
result, err := ts.SortSeries(ts.SeriesList(input), sr, sort)
if err != nil {
return ts.NewSeriesList(), err
}
r := ts.SeriesList{
Values: series,
SortApplied: true,
Metadata: input.Metadata,
}
return common.Head(r, n)
return common.Head(result, n)
}

func getReducer(f string) (ts.SeriesReducer, error) {
Expand Down Expand Up @@ -2116,7 +2127,7 @@ func newMovingBinaryTransform(

results := make([]*ts.Series, 0, original.Len())
maxWindowPoints := 0
for i, _ := range bootstrapList.Values {
for i := range bootstrapList.Values {
series := original.Values[i]
windowPoints := windowPointsLength(series, interval)
if windowPoints <= 0 {
Expand Down
90 changes: 67 additions & 23 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package native
import (
"fmt"
"math"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -691,11 +692,11 @@ func TestTransformNull(t *testing.T) {
},
42.5,
[]common.TestSeries{
common.TestSeries{
{
Name: "transformNull(foo1,42.500)",
Data: []float64{0, 42.5, 2.0, 42.5, 3.0},
},
common.TestSeries{
{
Name: "transformNull(foo2,42.500)",
Data: []float64{42.5, 7, 2.0, 6.5, 42.5},
},
Expand All @@ -710,11 +711,11 @@ func TestTransformNull(t *testing.T) {
},
-0.5,
[]common.TestSeries{
common.TestSeries{
{
Name: "transformNull(foo1,-0.500)",
Data: []float64{0, 1.0, 2.0, -0.5, 3.0},
},
common.TestSeries{
{
Name: "transformNull(foo2,-0.500)",
Data: []float64{-0.5, 7, -0.5, 6.5, -0.5},
},
Expand Down Expand Up @@ -1470,18 +1471,18 @@ func TestFallbackSeries(t *testing.T) {
}{
{
nil,
[]common.TestSeries{common.TestSeries{"output", []float64{0, 1.0}}},
[]common.TestSeries{common.TestSeries{"output", []float64{0, 1.0}}},
[]common.TestSeries{{"output", []float64{0, 1.0}}},
[]common.TestSeries{{"output", []float64{0, 1.0}}},
},
{
[]common.TestSeries{},
[]common.TestSeries{common.TestSeries{"output", []float64{0, 1.0}}},
[]common.TestSeries{common.TestSeries{"output", []float64{0, 1.0}}},
[]common.TestSeries{{"output", []float64{0, 1.0}}},
[]common.TestSeries{{"output", []float64{0, 1.0}}},
},
{
[]common.TestSeries{common.TestSeries{"output", []float64{0, 2.0}}},
[]common.TestSeries{common.TestSeries{"fallback", []float64{0, 1.0}}},
[]common.TestSeries{common.TestSeries{"output", []float64{0, 2.0}}},
[]common.TestSeries{{"output", []float64{0, 2.0}}},
[]common.TestSeries{{"fallback", []float64{0, 1.0}}},
[]common.TestSeries{{"output", []float64{0, 2.0}}},
},
}

Expand Down Expand Up @@ -2377,6 +2378,49 @@ func TestLimit(t *testing.T) {
require.Equal(t, len(testInput), testSeries.Len())
}

func TestLimitSortStable(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()

constValues := common.NewTestSeriesValues(ctx, 1000, []float64{1, 2, 3, 4})
series := []*ts.Series{
ts.NewSeries(ctx, "qux", time.Now(), constValues),
ts.NewSeries(ctx, "bar", time.Now(), constValues),
ts.NewSeries(ctx, "foo", time.Now(), constValues),
ts.NewSeries(ctx, "baz", time.Now(), constValues),
}

// Check that if input order is random that the same first
// series is chosen deterministically each time if the results weren't
// already ordered.
var lastOrder []string
for i := 0; i < 100; i++ {
rand.Shuffle(len(series), func(i, j int) {
series[i], series[j] = series[j], series[i]
})

result, err := limit(ctx, singlePathSpec(ts.SeriesList{
Values: series,
SortApplied: false,
}), 2)
require.NoError(t, err)

order := make([]string, 0, len(result.Values))
for _, series := range result.Values {
order = append(order, series.Name())
}

expectedOrder := lastOrder
lastOrder = order
if expectedOrder == nil {
continue
}

require.Equal(t, expectedOrder, order)
}

}

func TestHitCount(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -2701,8 +2745,8 @@ func TestSquareRoot(t *testing.T) {
inputSeries = append(inputSeries, series)
}
expected := []common.TestSeries{
common.TestSeries{Name: "squareRoot(foo)", Data: []float64{1.0, nan, 1.73205, nan}},
common.TestSeries{Name: "squareRoot(bar)", Data: []float64{2.0}},
{Name: "squareRoot(foo)", Data: []float64{1.0, nan, 1.73205, nan}},
{Name: "squareRoot(bar)", Data: []float64{2.0}},
}
results, err := squareRoot(ctx, singlePathSpec{
Values: inputSeries,
Expand Down Expand Up @@ -2744,7 +2788,7 @@ func TestStdev(t *testing.T) {
inputSeries = append(inputSeries, series)
}
expected := []common.TestSeries{
common.TestSeries{Name: "stddev(foo,3)", Data: []float64{0.0, 0.5, 0.8165, 0.8165, 0.5, 0.0, nan, 0.0, 0.5, 0.5, 0.0}},
{Name: "stddev(foo,3)", Data: []float64{0.0, 0.5, 0.8165, 0.8165, 0.5, 0.0, nan, 0.0, 0.5, 0.5, 0.0}},
}
results, err := stdev(ctx, singlePathSpec{
Values: inputSeries,
Expand Down Expand Up @@ -2828,11 +2872,11 @@ func testPercentileFunction(t *testing.T, f percentileFunction, expected []commo

func TestNPercentile(t *testing.T) {
expected := []common.TestSeries{
common.TestSeries{
{
Name: "nPercentile(bar, 40.123)",
Data: []float64{3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0},
},
common.TestSeries{
{
Name: "nPercentile(baz, 40.123)",
Data: []float64{1.0},
},
Expand All @@ -2843,15 +2887,15 @@ func TestNPercentile(t *testing.T) {
func TestRemoveAbovePercentile(t *testing.T) {
nan := math.NaN()
expected := []common.TestSeries{
common.TestSeries{
{
Name: "removeAbovePercentile(foo, 40.123)",
Data: []float64{nan, nan, nan, nan, nan},
},
common.TestSeries{
{
Name: "removeAbovePercentile(bar, 40.123)",
Data: []float64{3.0, 2.0, nan, nan, 1.0, nan, nan, nan},
},
common.TestSeries{
{
Name: "removeAbovePercentile(baz, 40.123)",
Data: []float64{1.0},
},
Expand All @@ -2864,15 +2908,15 @@ func TestRemoveBelowPercentile(t *testing.T) {
nan := math.NaN()

expected := []common.TestSeries{
common.TestSeries{
{
Name: "removeBelowPercentile(foo, 40.123)",
Data: []float64{nan, nan, nan, nan, nan},
},
common.TestSeries{
{
Name: "removeBelowPercentile(bar, 40.123)",
Data: []float64{3.0, nan, 4.0, nan, nan, 6.0, nan, 5.0},
},
common.TestSeries{
{
Name: "removeBelowPercentile(baz, 40.123)",
Data: []float64{1.0},
},
Expand Down Expand Up @@ -2975,7 +3019,7 @@ func TestChanged(t *testing.T) {
)

expected := []common.TestSeries{
common.TestSeries{
{
Name: "changed(foo)",
Data: []float64{0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0},
},
Expand Down
5 changes: 3 additions & 2 deletions src/query/graphite/native/summarize.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ func smartSummarize(
results[i] = series.RenamedTo(newName)
}

r := ts.NewSeriesList()
// Retain whether sort was applied or not and metadata.
r := ts.SeriesList(series)
r.Values = results
return r, nil
}
Expand Down Expand Up @@ -242,7 +243,7 @@ var (
specificationFunc: averageSpecificationFunc,
}
medianFuncInfo = funcInfo{
fname: "median",
fname: "median",
// median does not have a consolidationFunc
specificationFunc: medianSpecificationFunc,
}
Expand Down
Loading

0 comments on commit 985f866

Please sign in to comment.