Merge #50933

50933: colexec: speed up aggregator benchmark r=yuzefovich a=yuzefovich

Our existing BenchmarkAggregator takes an unreasonably long time to run
because we benchmark every aggregate function against 48 different
configurations (2 aggregator kinds x 3 types x 4 group sizes x 2 nulls
configs). However, most of those numbers follow the same pattern and
don't provide much useful data, so this commit splits the benchmark in
two and reduces the number of configs. One benchmark measures the
performance of the aggregator operators themselves, using only a single
aggregate function and a single input type, which is sufficient for
that purpose. The other runs all aggregate functions with just
4 configurations, with the goal of giving us a sense of the relative
speeds of the different functions. I kept the naming of the first
benchmark's runs the same as in the original benchmark so that we can
still perform a limited comparison against old branches.
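
The reduced matrix still gives decent coverage: BenchmarkAggregator now
runs the single min aggregate over 2 aggregator kinds x 6 group sizes x
2 nulls configs = 24 cases, while BenchmarkAllAggregateFunctions runs
every supported function over 2 aggregator kinds x 2 group sizes =
4 configurations each. Each benchmark can also be run on its own with
the standard Go tooling, e.g.

  go test ./pkg/sql/colexec -run '^$' -bench BenchmarkAggregator
  go test ./pkg/sql/colexec -run '^$' -bench BenchmarkAllAggregateFunctions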

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Jul 2, 2020
2 parents bfa4f8a + 0ac85ad commit fd47d01
218 changes: 111 additions & 107 deletions pkg/sql/colexec/aggregators_test.go
@@ -753,119 +753,123 @@ func TestAggregatorRandom(t *testing.T) {
 	}
 }
 
-func BenchmarkAggregator(b *testing.B) {
+func benchmarkAggregateFunction(
+	b *testing.B,
+	agg aggType,
+	aggFn execinfrapb.AggregatorSpec_Func,
+	typ *types.T,
+	groupSize int,
+	nullProb float64,
+) {
 	rng, _ := randutil.NewPseudoRand()
 	ctx := context.Background()
 
+	const numInputBatches = 64
 	const bytesFixedLength = 8
-	for _, aggFn := range []execinfrapb.AggregatorSpec_Func{
-		execinfrapb.AggregatorSpec_ANY_NOT_NULL,
-		execinfrapb.AggregatorSpec_AVG,
-		execinfrapb.AggregatorSpec_COUNT_ROWS,
-		execinfrapb.AggregatorSpec_COUNT,
-		execinfrapb.AggregatorSpec_SUM,
-		execinfrapb.AggregatorSpec_SUM_INT,
-		execinfrapb.AggregatorSpec_MIN,
-		execinfrapb.AggregatorSpec_MAX,
-		execinfrapb.AggregatorSpec_BOOL_AND,
-		execinfrapb.AggregatorSpec_BOOL_OR,
-	} {
-		fName := execinfrapb.AggregatorSpec_Func_name[int32(aggFn)]
-		b.Run(fName, func(b *testing.B) {
-			for _, agg := range aggTypes {
-				for typIdx, typ := range []*types.T{types.Int, types.Decimal, types.Bytes} {
-					for _, groupSize := range []int{1, 2, coldata.BatchSize() / 2, coldata.BatchSize()} {
-						for _, hasNulls := range []bool{false, true} {
-							for _, numInputBatches := range []int{64} {
-								if aggFn == execinfrapb.AggregatorSpec_BOOL_AND || aggFn == execinfrapb.AggregatorSpec_BOOL_OR {
-									typ = types.Bool
-									if typIdx > 0 {
-										// We don't need to run the benchmark of bool_and and
-										// bool_or multiple times, so we skip all runs except
-										// for the first one.
-										continue
-									}
-								}
-								if aggFn == execinfrapb.AggregatorSpec_SUM_INT && typ.Family() != types.IntFamily {
-									// sum_int only works on integers.
-									continue
-								}
-								b.Run(fmt.Sprintf("%s/%s/groupSize=%d/hasNulls=%t/numInputBatches=%d", agg.name, typ.String(),
-									groupSize, hasNulls, numInputBatches),
-									func(b *testing.B) {
-										typs := []*types.T{types.Int, typ}
-										nTuples := numInputBatches * coldata.BatchSize()
-										cols := []coldata.Vec{
-											testAllocator.NewMemColumn(types.Int, nTuples),
-											testAllocator.NewMemColumn(typ, nTuples),
-										}
-										groups := cols[0].Int64()
-										curGroup := -1
-										for i := 0; i < nTuples; i++ {
-											if groupSize == 1 || i%groupSize == 0 {
-												curGroup++
-											}
-											groups[i] = int64(curGroup)
-										}
-										nullProb := 0.0
-										if hasNulls {
-											nullProb = nullProbability
-										}
-										coldatatestutils.RandomVec(coldatatestutils.RandomVecArgs{
-											Rand:             rng,
-											Vec:              cols[1],
-											N:                nTuples,
-											NullProbability:  nullProb,
-											BytesFixedLength: bytesFixedLength,
-										})
-										if typ.Identical(types.Int) && aggFn == execinfrapb.AggregatorSpec_SUM_INT {
-											// Integer summation of random Int64 values can lead
-											// to overflow, and we will panic. To go around it, we
-											// restrict the range of values.
-											vals := cols[1].Int64()
-											for i := range vals {
-												vals[i] = vals[i] % 1024
-											}
-										}
-										source := newChunkingBatchSource(typs, cols, nTuples)
+	typs := []*types.T{types.Int, typ}
+	nTuples := numInputBatches * coldata.BatchSize()
+	cols := []coldata.Vec{
+		testAllocator.NewMemColumn(types.Int, nTuples),
+		testAllocator.NewMemColumn(typ, nTuples),
+	}
+	groups := cols[0].Int64()
+	curGroup := -1
+	for i := 0; i < nTuples; i++ {
+		if groupSize == 1 || i%groupSize == 0 {
+			curGroup++
+		}
+		groups[i] = int64(curGroup)
+	}
+	coldatatestutils.RandomVec(coldatatestutils.RandomVecArgs{
+		Rand:             rng,
+		Vec:              cols[1],
+		N:                nTuples,
+		NullProbability:  nullProb,
+		BytesFixedLength: bytesFixedLength,
+	})
+	if aggFn == execinfrapb.AggregatorSpec_SUM_INT {
+		// Integer summation of random Int64 values can lead
+		// to overflow, and we will panic. To go around it, we
+		// restrict the range of values.
+		vals := cols[1].Int64()
+		for i := range vals {
+			vals[i] = vals[i] % 1024
+		}
+	}
+	source := newChunkingBatchSource(typs, cols, nTuples)
 
-										nCols := 1
-										if aggFn == execinfrapb.AggregatorSpec_COUNT_ROWS {
-											nCols = 0
-										}
-										a, err := agg.new(
-											testAllocator,
-											source,
-											typs,
-											[]execinfrapb.AggregatorSpec_Func{aggFn},
-											[]uint32{0},
-											[][]uint32{[]uint32{1}[:nCols]},
-											false, /* isScalar */
-										)
-										if err != nil {
-											b.Skip()
-										}
-										a.Init()
-
-										b.ResetTimer()
-
-										// Only count the int64 column.
-										b.SetBytes(int64(8 * nTuples))
-										for i := 0; i < b.N; i++ {
-											a.(resetter).reset(ctx)
-											source.reset()
-											// Exhaust aggregator until all batches have been read.
-											for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {
-											}
-										}
-									},
-								)
-							}
-						}
-					}
-				}
-			}
-		})
-	}
-}
+	nCols := 1
+	if aggFn == execinfrapb.AggregatorSpec_COUNT_ROWS {
+		nCols = 0
+	}
+	a, err := agg.new(
+		testAllocator,
+		source,
+		typs,
+		[]execinfrapb.AggregatorSpec_Func{aggFn},
+		[]uint32{0},
+		[][]uint32{[]uint32{1}[:nCols]},
+		false, /* isScalar */
+	)
+	if err != nil {
+		b.Skip()
+	}
+	a.Init()
+
+	b.ResetTimer()
+
+	fName := execinfrapb.AggregatorSpec_Func_name[int32(aggFn)]
+	// Only count the aggregation column.
+	elementSize := 8
+	if typ.Identical(types.Bool) {
+		elementSize = 1
+	}
+	b.Run(fmt.Sprintf(
+		"%s/%s/%s/groupSize=%d/hasNulls=%t/numInputBatches=%d",
+		fName, agg.name, typ.String(), groupSize, nullProb > 0, numInputBatches),
+		func(b *testing.B) {
+			b.SetBytes(int64(elementSize * nTuples))
+			for i := 0; i < b.N; i++ {
+				a.(resetter).reset(ctx)
+				source.reset()
+				// Exhaust aggregator until all batches have been read.
+				for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {
+				}
+			}
+		})
+}
+
+// BenchmarkAggregator runs the benchmark of both aggregators with diverse
+// data source parameters but using a single aggregate function. The goal of
+// this benchmark is to measure the performance of the aggregators themselves
+// depending on the parameters of the input.
+func BenchmarkAggregator(b *testing.B) {
+	aggFn := execinfrapb.AggregatorSpec_MIN
+	for _, agg := range aggTypes {
+		for _, groupSize := range []int{1, 2, 32, 128, coldata.BatchSize() / 2, coldata.BatchSize()} {
+			for _, nullProb := range []float64{0.0, nullProbability} {
+				benchmarkAggregateFunction(b, agg, aggFn, types.Int, groupSize, nullProb)
+			}
+		}
+	}
+}
+
+// BenchmarkAllAggregateFunctions runs the benchmark of all supported
+// aggregate functions in 4 configurations (hash vs ordered, and small groups
+// vs big groups). These configurations were chosen since they provide a good
+// enough signal on the speeds of aggregate functions. For more diverse
+// configurations look at BenchmarkAggregator.
+func BenchmarkAllAggregateFunctions(b *testing.B) {
+	for _, aggFn := range SupportedAggFns {
+		for _, agg := range aggTypes {
+			typ := types.Int
+			if aggFn == execinfrapb.AggregatorSpec_BOOL_AND || aggFn == execinfrapb.AggregatorSpec_BOOL_OR {
+				typ = types.Bool
+			}
+			for _, groupSize := range []int{1, coldata.BatchSize()} {
+				benchmarkAggregateFunction(b, agg, aggFn, typ, groupSize, nullProbability)
+			}
+		}
+	}
+}
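
For context on the MB/s column these benchmarks report: b.SetBytes declares
how many bytes one iteration of the b.N loop consumes, and the testing
package divides that by the measured time per iteration. Below is a minimal,
self-contained sketch of the same accounting (illustrative only, not
CockroachDB code; nTuples and elementSize stand in for
numInputBatches * coldata.BatchSize() and the 8-byte int64 elements above).

package demo // illustrative; save as a *_test.go file

import "testing"

// BenchmarkThroughputSketch mirrors the SetBytes pattern from the diff:
// declare the bytes processed per iteration once, then let the b.N loop
// do the work; `go test -bench .` reports MB/s alongside ns/op.
func BenchmarkThroughputSketch(b *testing.B) {
	const nTuples = 64 * 1024 // stand-in for numInputBatches * coldata.BatchSize()
	const elementSize = 8     // bytes per int64 element in the aggregation column
	vals := make([]int64, nTuples)
	b.SetBytes(int64(elementSize * nTuples)) // bytes consumed per iteration
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		var sum int64
		for _, v := range vals {
			sum += v // stand-in for the aggregator consuming the column
		}
		_ = sum
	}
}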
