Skip to content

Commit

Permalink
colexec: fix hash aggregator when spilling to disk
Browse files Browse the repository at this point in the history
In some cases the aggregation is expected to maintain the required
ordering in order to eliminate an explicit sort afterwards. It is always
the case that the required ordering is a prefix of ordered grouping
columns. With the introduction of disk spilling for the vectorized hash
aggregator in 21.1 release the ordering was no longer maintained if the
spilling occurs. In all previous cases (row-by-row processors and
in-memory columnar operator) the ordering was maintained by
construction, but with `hashBasedPartitioner` the ordering can be
arbitrary.

In order to fix this issue we now do what we did for the external
distinct - we plan an external sort on top of the external hash
aggregator to restore the required ordering. Note that this will only
kick in if the spilling to disk occurred. This required changes to the
AggregatorSpec to propagate the required output ordering.

Release note (bug fix): In 21.1 alpha and beta releases CockroachDB
could return the output in an incorrect order if the query containing
hash aggregation was executed via the vectorized engine and spilling to
temporary storage was required, in some cases.
  • Loading branch information
yuzefovich committed Apr 9, 2021
1 parent d511fb4 commit ef13232
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 253 deletions.
11 changes: 10 additions & 1 deletion pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ func (tc *aggregatorTestCase) init() error {
GroupCols: tc.groupCols,
Aggregations: aggregations,
}
if !tc.unorderedInput {
// If we have ordered on grouping columns input, then we'll require the
// output to also have the same ordering.
outputOrdering := execinfrapb.Ordering{Columns: make([]execinfrapb.Ordering_Column, len(tc.groupCols))}
for i, col := range tc.groupCols {
outputOrdering.Columns[i].ColIdx = col
}
tc.spec.OutputOrdering = outputOrdering
}
return nil
}

Expand Down Expand Up @@ -762,7 +771,7 @@ func TestAggregators(t *testing.T) {
}
log.Infof(ctx, "%s/%s", tc.name, agg.name)
verifier := colexectestutils.OrderedVerifier
if agg.name == "hash" {
if tc.unorderedInput {
verifier = colexectestutils.UnorderedVerifier
}
colexectestutils.RunTestsWithTyps(t, testAllocator, []colexectestutils.Tuples{tc.input}, [][]*types.T{tc.typs}, tc.expected, verifier,
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colexectestutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ func RunTestsWithoutAllNullsInjection(
if err != nil {
t.Fatal(err)
}
// We might short-circuit, so defer the closing of the operator.
defer closeIfCloser(ctx, t, op)
op.Init()
b := op.Next(ctx)
if b.Length() == 0 {
Expand Down Expand Up @@ -495,7 +497,6 @@ func RunTestsWithoutAllNullsInjection(
assert.False(t, maybeHasNulls(b))
}
}
closeIfCloser(ctx, t, op)
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/external_distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func NewExternalDistinct(
// No particular output ordering is required.
return ed
}
// TODO(yuzefovich): the fact that we're planning an additional external
// sort isn't accounted for when considering the number file descriptors to
// acquire. Not urgent, but it should be fixed.
maxNumberActivePartitions := calculateMaxNumberActivePartitions(flowCtx, args, numRequiredActivePartitions)
return createDiskBackedSorter(ed, inputTypes, outputOrdering.Columns, maxNumberActivePartitions)
}
19 changes: 18 additions & 1 deletion pkg/sql/colexec/external_hash_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewExternalHashAggregator(
}
return diskBackedFallbackOp
}
return newHashBasedPartitioner(
eha := newHashBasedPartitioner(
newAggArgs.Allocator,
flowCtx,
args,
Expand All @@ -84,6 +84,23 @@ func NewExternalHashAggregator(
diskAcc,
ehaNumRequiredActivePartitions,
)
// The last thing we need to do is making sure that the output has the
// desired ordering if any is required. Note that since the input is assumed
// to be already ordered according to the desired ordering, for the
// in-memory hash aggregation we get it for "free" since it doesn't change
// the ordering of tuples. However, that is not that the case with the
// hash-based partitioner, so we might need to plan an external sort on top
// of it.
outputOrdering := args.Spec.Core.Aggregator.OutputOrdering
if len(outputOrdering.Columns) == 0 {
// No particular output ordering is required.
return eha
}
// TODO(yuzefovich): the fact that we're planning an additional external
// sort isn't accounted for when considering the number file descriptors to
// acquire. Not urgent, but it should be fixed.
maxNumberActivePartitions := calculateMaxNumberActivePartitions(flowCtx, args, ehaNumRequiredActivePartitions)
return createDiskBackedSorter(eha, args.Spec.ResultTypes, outputOrdering.Columns, maxNumberActivePartitions)
}

// HashAggregationDiskSpillingEnabled is a cluster setting that allows to
Expand Down
33 changes: 22 additions & 11 deletions pkg/sql/colexec/external_hash_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,33 @@ func TestExternalHashAggregator(t *testing.T) {
&evalCtx, nil /* semaCtx */, tc.spec.Aggregations, tc.typs,
)
require.NoError(t, err)
verifier := colexectestutils.OrderedVerifier
if tc.unorderedInput {
verifier = colexectestutils.UnorderedVerifier
}
var numExpectedClosers int
if diskSpillingEnabled {
// The external sorter and the disk spiller should be added
// as Closers (the latter is responsible for closing the
// in-memory hash aggregator as well as the external one).
numExpectedClosers = 2
if len(tc.spec.OutputOrdering.Columns) > 0 {
// When the output ordering is required, we also plan
// another external sort.
numExpectedClosers++
}
} else {
// Only the in-memory hash aggregator should be added.
numExpectedClosers = 1
}
var semsToCheck []semaphore.Semaphore
colexectestutils.RunTestsWithTyps(
t,
testAllocator,
[]colexectestutils.Tuples{tc.input},
[][]*types.T{tc.typs},
tc.expected,
colexectestutils.UnorderedVerifier,
verifier,
func(input []colexecop.Operator) (colexecop.Operator, error) {
sem := colexecop.NewTestingSemaphore(ehaNumRequiredFDs)
semsToCheck = append(semsToCheck, sem)
Expand All @@ -112,16 +131,8 @@ func TestExternalHashAggregator(t *testing.T) {
)
accounts = append(accounts, accs...)
monitors = append(monitors, mons...)
if diskSpillingEnabled {
// Check that the external sorter and the disk
// spiller were added as Closers (the latter is
// responsible for closing the in-memory hash
// aggregator as well as the external one).
require.Equal(t, 2, len(closers))
} else {
// Only the in-memory hash aggregator has been
// created.
require.Equal(t, 1, len(closers))
require.Equal(t, numExpectedClosers, len(closers))
if !diskSpillingEnabled {
// Sanity check that indeed only the in-memory hash
// aggregator was created.
_, isHashAgg := op.(*hashAggregator)
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/colexec/hash_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ func TestHashAggregator(t *testing.T) {
&evalCtx, nil /* semaCtx */, tc.spec.Aggregations, tc.typs,
)
require.NoError(t, err)
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.input}, tc.expected, colexectestutils.UnorderedVerifier, func(sources []colexecop.Operator) (colexecop.Operator, error) {
verifier := colexectestutils.OrderedVerifier
if tc.unorderedInput {
verifier = colexectestutils.UnorderedVerifier
}
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.input}, tc.expected, verifier, func(sources []colexecop.Operator) (colexecop.Operator, error) {
return NewHashAggregator(&colexecagg.NewAggregatorArgs{
Allocator: testAllocator,
MemAccount: testMemAcc,
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,12 +1593,17 @@ func (dsp *DistSQLPlanner) planAggregators(
var finalAggsSpec execinfrapb.AggregatorSpec
var finalAggsPost execinfrapb.PostProcessSpec

// Note that we pass in nil as the second argument because we will have a
// simple 1-to-1 PlanToStreamColMap in the end.
finalOutputOrdering := dsp.convertOrdering(info.reqOrdering, nil /* planToStreamColMap */)

if !multiStage {
finalAggsSpec = execinfrapb.AggregatorSpec{
Type: aggType,
Aggregations: info.aggregations,
GroupCols: groupCols,
OrderedGroupCols: orderedGroupCols,
OutputOrdering: finalOutputOrdering,
}
} else {
// Some aggregations might need multiple aggregation as part of
Expand Down Expand Up @@ -1820,6 +1825,7 @@ func (dsp *DistSQLPlanner) planAggregators(
Aggregations: localAggs,
GroupCols: groupCols,
OrderedGroupCols: orderedGroupCols,
OutputOrdering: execinfrapb.Ordering{Columns: ordCols},
}

p.AddNoGroupingStage(
Expand All @@ -1834,6 +1840,7 @@ func (dsp *DistSQLPlanner) planAggregators(
Aggregations: finalAggs,
GroupCols: finalGroupCols,
OrderedGroupCols: finalOrderedGroupCols,
OutputOrdering: finalOutputOrdering,
}

if needRender {
Expand Down
Loading

0 comments on commit ef13232

Please sign in to comment.