diff --git a/pkg/sql/colexec/colexecagg/bool_and_or_agg_tmpl.go b/pkg/sql/colexec/colexecagg/bool_and_or_agg_tmpl.go index f5280c75edcb..b61b29fb789a 100644 --- a/pkg/sql/colexec/colexecagg/bool_and_or_agg_tmpl.go +++ b/pkg/sql/colexec/colexecagg/bool_and_or_agg_tmpl.go @@ -60,9 +60,11 @@ type bool_OP_TYPE_AGGKINDAgg struct { // {{else}} hashAggregateFuncBase // {{end}} - col []bool - sawNonNull bool - curAgg bool + col []bool + curAgg bool + // foundNonNullForCurrentGroup tracks if we have seen any non-null values + // for the group that is currently being aggregated. + foundNonNullForCurrentGroup bool } var _ AggregateFunc = &bool_OP_TYPE_AGGKINDAgg{} @@ -134,7 +136,7 @@ func (a *bool_OP_TYPE_AGGKINDAgg) Flush(outputIdx int) { outputIdx = a.curIdx a.curIdx++ // {{end}} - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(outputIdx) } else { a.col[outputIdx] = a.curAgg @@ -151,6 +153,7 @@ func (a *bool_OP_TYPE_AGGKINDAgg) Reset() { // _DEFAULT_VAL is false. // */}} a.curAgg = _DEFAULT_VAL + a.foundNonNullForCurrentGroup = false } type bool_OP_TYPE_AGGKINDAggAlloc struct { @@ -190,7 +193,7 @@ func _ACCUMULATE_BOOLEAN( // {{end}} if groups[i] { if !a.isFirstGroup { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(a.curIdx) } else { a.col[a.curIdx] = a.curAgg @@ -199,7 +202,7 @@ func _ACCUMULATE_BOOLEAN( // {{with .Global}} a.curAgg = _DEFAULT_VAL // {{end}} - a.sawNonNull = false + a.foundNonNullForCurrentGroup = false } a.isFirstGroup = false } @@ -218,7 +221,7 @@ func _ACCUMULATE_BOOLEAN( // {{with .Global}} _ASSIGN_BOOL_OP(a.curAgg, a.curAgg, col[i]) // {{end}} - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } // {{end}} diff --git a/pkg/sql/colexec/colexecagg/hash_bool_and_or_agg.eg.go b/pkg/sql/colexec/colexecagg/hash_bool_and_or_agg.eg.go index 3e45a0d67fe7..f20b1fc60ff0 100644 --- a/pkg/sql/colexec/colexecagg/hash_bool_and_or_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/hash_bool_and_or_agg.eg.go @@ -31,9 +31,11 @@ func newBoolAndHashAggAlloc( type boolAndHashAgg struct { hashAggregateFuncBase - col []bool - sawNonNull bool - curAgg bool + col []bool + curAgg bool + // foundNonNullForCurrentGroup tracks if we have seen any non-null values + // for the group that is currently being aggregated. + foundNonNullForCurrentGroup bool } var _ AggregateFunc = &boolAndHashAgg{} @@ -59,7 +61,7 @@ func (a *boolAndHashAgg) Compute( isNull = nulls.NullAt(i) if !isNull { a.curAgg = a.curAgg && col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -70,7 +72,7 @@ func (a *boolAndHashAgg) Compute( isNull = false if !isNull { a.curAgg = a.curAgg && col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -85,7 +87,7 @@ func (a *boolAndHashAgg) Compute( } func (a *boolAndHashAgg) Flush(outputIdx int) { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(outputIdx) } else { a.col[outputIdx] = a.curAgg @@ -94,6 +96,7 @@ func (a *boolAndHashAgg) Flush(outputIdx int) { func (a *boolAndHashAgg) Reset() { a.curAgg = true + a.foundNonNullForCurrentGroup = false } type boolAndHashAggAlloc struct { @@ -129,9 +132,11 @@ func newBoolOrHashAggAlloc( type boolOrHashAgg struct { hashAggregateFuncBase - col []bool - sawNonNull bool - curAgg bool + col []bool + curAgg bool + // foundNonNullForCurrentGroup tracks if we have seen any non-null values + // for the group that is currently being aggregated. + foundNonNullForCurrentGroup bool } var _ AggregateFunc = &boolOrHashAgg{} @@ -157,7 +162,7 @@ func (a *boolOrHashAgg) Compute( isNull = nulls.NullAt(i) if !isNull { a.curAgg = a.curAgg || col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -168,7 +173,7 @@ func (a *boolOrHashAgg) Compute( isNull = false if !isNull { a.curAgg = a.curAgg || col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -183,7 +188,7 @@ func (a *boolOrHashAgg) Compute( } func (a *boolOrHashAgg) Flush(outputIdx int) { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(outputIdx) } else { a.col[outputIdx] = a.curAgg @@ -192,6 +197,7 @@ func (a *boolOrHashAgg) Flush(outputIdx int) { func (a *boolOrHashAgg) Reset() { a.curAgg = false + a.foundNonNullForCurrentGroup = false } type boolOrHashAggAlloc struct { diff --git a/pkg/sql/colexec/colexecagg/ordered_bool_and_or_agg.eg.go b/pkg/sql/colexec/colexecagg/ordered_bool_and_or_agg.eg.go index 222d1a11729c..59b1930ff17c 100644 --- a/pkg/sql/colexec/colexecagg/ordered_bool_and_or_agg.eg.go +++ b/pkg/sql/colexec/colexecagg/ordered_bool_and_or_agg.eg.go @@ -31,9 +31,11 @@ func newBoolAndOrderedAggAlloc( type boolAndOrderedAgg struct { orderedAggregateFuncBase - col []bool - sawNonNull bool - curAgg bool + col []bool + curAgg bool + // foundNonNullForCurrentGroup tracks if we have seen any non-null values + // for the group that is currently being aggregated. + foundNonNullForCurrentGroup bool } var _ AggregateFunc = &boolAndOrderedAgg{} @@ -62,14 +64,14 @@ func (a *boolAndOrderedAgg) Compute( //gcassert:bce if groups[i] { if !a.isFirstGroup { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(a.curIdx) } else { a.col[a.curIdx] = a.curAgg } a.curIdx++ a.curAgg = true - a.sawNonNull = false + a.foundNonNullForCurrentGroup = false } a.isFirstGroup = false } @@ -79,7 +81,7 @@ func (a *boolAndOrderedAgg) Compute( if !isNull { //gcassert:bce a.curAgg = a.curAgg && col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -88,14 +90,14 @@ func (a *boolAndOrderedAgg) Compute( //gcassert:bce if groups[i] { if !a.isFirstGroup { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(a.curIdx) } else { a.col[a.curIdx] = a.curAgg } a.curIdx++ a.curAgg = true - a.sawNonNull = false + a.foundNonNullForCurrentGroup = false } a.isFirstGroup = false } @@ -105,7 +107,7 @@ func (a *boolAndOrderedAgg) Compute( if !isNull { //gcassert:bce a.curAgg = a.curAgg && col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -116,14 +118,14 @@ func (a *boolAndOrderedAgg) Compute( for _, i := range sel { if groups[i] { if !a.isFirstGroup { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(a.curIdx) } else { a.col[a.curIdx] = a.curAgg } a.curIdx++ a.curAgg = true - a.sawNonNull = false + a.foundNonNullForCurrentGroup = false } a.isFirstGroup = false } @@ -132,7 +134,7 @@ func (a *boolAndOrderedAgg) Compute( isNull = nulls.NullAt(i) if !isNull { a.curAgg = a.curAgg && col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -140,14 +142,14 @@ func (a *boolAndOrderedAgg) Compute( for _, i := range sel { if groups[i] { if !a.isFirstGroup { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(a.curIdx) } else { a.col[a.curIdx] = a.curAgg } a.curIdx++ a.curAgg = true - a.sawNonNull = false + a.foundNonNullForCurrentGroup = false } a.isFirstGroup = false } @@ -156,7 +158,7 @@ func (a *boolAndOrderedAgg) Compute( isNull = false if !isNull { a.curAgg = a.curAgg && col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -175,7 +177,7 @@ func (a *boolAndOrderedAgg) Flush(outputIdx int) { _ = outputIdx outputIdx = a.curIdx a.curIdx++ - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(outputIdx) } else { a.col[outputIdx] = a.curAgg @@ -185,6 +187,7 @@ func (a *boolAndOrderedAgg) Flush(outputIdx int) { func (a *boolAndOrderedAgg) Reset() { a.orderedAggregateFuncBase.Reset() a.curAgg = true + a.foundNonNullForCurrentGroup = false } type boolAndOrderedAggAlloc struct { @@ -220,9 +223,11 @@ func newBoolOrOrderedAggAlloc( type boolOrOrderedAgg struct { orderedAggregateFuncBase - col []bool - sawNonNull bool - curAgg bool + col []bool + curAgg bool + // foundNonNullForCurrentGroup tracks if we have seen any non-null values + // for the group that is currently being aggregated. + foundNonNullForCurrentGroup bool } var _ AggregateFunc = &boolOrOrderedAgg{} @@ -251,14 +256,14 @@ func (a *boolOrOrderedAgg) Compute( //gcassert:bce if groups[i] { if !a.isFirstGroup { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(a.curIdx) } else { a.col[a.curIdx] = a.curAgg } a.curIdx++ a.curAgg = false - a.sawNonNull = false + a.foundNonNullForCurrentGroup = false } a.isFirstGroup = false } @@ -268,7 +273,7 @@ func (a *boolOrOrderedAgg) Compute( if !isNull { //gcassert:bce a.curAgg = a.curAgg || col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -277,14 +282,14 @@ func (a *boolOrOrderedAgg) Compute( //gcassert:bce if groups[i] { if !a.isFirstGroup { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(a.curIdx) } else { a.col[a.curIdx] = a.curAgg } a.curIdx++ a.curAgg = false - a.sawNonNull = false + a.foundNonNullForCurrentGroup = false } a.isFirstGroup = false } @@ -294,7 +299,7 @@ func (a *boolOrOrderedAgg) Compute( if !isNull { //gcassert:bce a.curAgg = a.curAgg || col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -305,14 +310,14 @@ func (a *boolOrOrderedAgg) Compute( for _, i := range sel { if groups[i] { if !a.isFirstGroup { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(a.curIdx) } else { a.col[a.curIdx] = a.curAgg } a.curIdx++ a.curAgg = false - a.sawNonNull = false + a.foundNonNullForCurrentGroup = false } a.isFirstGroup = false } @@ -321,7 +326,7 @@ func (a *boolOrOrderedAgg) Compute( isNull = nulls.NullAt(i) if !isNull { a.curAgg = a.curAgg || col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -329,14 +334,14 @@ func (a *boolOrOrderedAgg) Compute( for _, i := range sel { if groups[i] { if !a.isFirstGroup { - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(a.curIdx) } else { a.col[a.curIdx] = a.curAgg } a.curIdx++ a.curAgg = false - a.sawNonNull = false + a.foundNonNullForCurrentGroup = false } a.isFirstGroup = false } @@ -345,7 +350,7 @@ func (a *boolOrOrderedAgg) Compute( isNull = false if !isNull { a.curAgg = a.curAgg || col[i] - a.sawNonNull = true + a.foundNonNullForCurrentGroup = true } } @@ -364,7 +369,7 @@ func (a *boolOrOrderedAgg) Flush(outputIdx int) { _ = outputIdx outputIdx = a.curIdx a.curIdx++ - if !a.sawNonNull { + if !a.foundNonNullForCurrentGroup { a.nulls.SetNull(outputIdx) } else { a.col[outputIdx] = a.curAgg @@ -374,6 +379,7 @@ func (a *boolOrOrderedAgg) Flush(outputIdx int) { func (a *boolOrOrderedAgg) Reset() { a.orderedAggregateFuncBase.Reset() a.curAgg = false + a.foundNonNullForCurrentGroup = false } type boolOrOrderedAggAlloc struct { diff --git a/pkg/sql/colexec/external_distinct_test.go b/pkg/sql/colexec/external_distinct_test.go index 7d7d2feb4101..5a482f6ec17f 100644 --- a/pkg/sql/colexec/external_distinct_test.go +++ b/pkg/sql/colexec/external_distinct_test.go @@ -56,6 +56,8 @@ func TestExternalDistinct(t *testing.T) { accounts []*mon.BoundAccount monitors []*mon.BytesMonitor ) + rng, _ := randutil.NewPseudoRand() + numForcedRepartitions := rng.Intn(5) // Test the case in which the default memory is used as well as the case in // which the distinct spills to disk. for _, spillForced := range []bool{false, true} { @@ -83,8 +85,8 @@ func TestExternalDistinct(t *testing.T) { outputOrdering = convertDistinctColsToOrdering(tc.distinctCols) } distinct, newAccounts, newMonitors, closers, err := createExternalDistinct( - ctx, flowCtx, input, tc.typs, tc.distinctCols, - outputOrdering, queueCfg, sem, nil, /* spillingCallbackFn */ + ctx, flowCtx, input, tc.typs, tc.distinctCols, outputOrdering, + queueCfg, sem, nil /* spillingCallbackFn */, numForcedRepartitions, ) // Check that the external distinct and the disk-backed sort // were added as Closers. @@ -185,6 +187,7 @@ func TestExternalDistinctSpilling(t *testing.T) { var numRuns, numSpills int var semsToCheck []semaphore.Semaphore + numForcedRepartitions := rng.Intn(5) runTestsWithoutAllNullsInjection( t, []tuples{tups}, @@ -202,7 +205,7 @@ func TestExternalDistinctSpilling(t *testing.T) { var outputOrdering execinfrapb.Ordering distinct, newAccounts, newMonitors, closers, err := createExternalDistinct( ctx, flowCtx, input, typs, distinctCols, outputOrdering, queueCfg, - sem, func() { numSpills++ }, + sem, func() { numSpills++ }, numForcedRepartitions, ) require.NoError(t, err) // Check that the external distinct and the disk-backed sort @@ -318,8 +321,8 @@ func BenchmarkExternalDistinct(b *testing.B) { } op, accs, mons, _, err := createExternalDistinct( ctx, flowCtx, []colexecbase.Operator{input}, typs, - distinctCols, outputOrdering, queueCfg, - &colexecbase.TestingSemaphore{}, nil, /* spillingCallbackFn */ + distinctCols, outputOrdering, queueCfg, &colexecbase.TestingSemaphore{}, + nil /* spillingCallbackFn */, 0, /* numForcedRepartitions */ ) memAccounts = append(memAccounts, accs...) memMonitors = append(memMonitors, mons...) @@ -355,6 +358,7 @@ func createExternalDistinct( diskQueueCfg colcontainer.DiskQueueCfg, testingSemaphore semaphore.Semaphore, spillingCallbackFn func(), + numForcedRepartitions int, ) (colexecbase.Operator, []*mon.BoundAccount, []*mon.BytesMonitor, []colexecbase.Closer, error) { distinctSpec := &execinfrapb.DistinctSpec{ DistinctColumns: distinctCols, @@ -376,6 +380,7 @@ func createExternalDistinct( FDSemaphore: testingSemaphore, } args.TestingKnobs.SpillingCallbackFn = spillingCallbackFn + args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions // External sorter relies on different memory accounts to // understand when to start a new partition, so we will not use // the streaming memory account. diff --git a/pkg/sql/colexec/external_hash_aggregator_test.go b/pkg/sql/colexec/external_hash_aggregator_test.go index d472e813ec05..c6cb996d7f41 100644 --- a/pkg/sql/colexec/external_hash_aggregator_test.go +++ b/pkg/sql/colexec/external_hash_aggregator_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/marusama/semaphore" "github.com/stretchr/testify/require" ) @@ -54,6 +55,8 @@ func TestExternalHashAggregator(t *testing.T) { accounts []*mon.BoundAccount monitors []*mon.BytesMonitor ) + rng, _ := randutil.NewPseudoRand() + numForcedRepartitions := rng.Intn(5) // Test the case in which the default memory is used as well as the case in // which the hash aggregator spills to disk. for _, spillForced := range []bool{false, true} { @@ -70,7 +73,7 @@ func TestExternalHashAggregator(t *testing.T) { // aggregator in the fallback strategy. continue } - log.Infof(ctx, "spillForced=%t/%s", spillForced, tc.name) + log.Infof(ctx, "spillForced=%t/numRepartitions=%d/%s", spillForced, numForcedRepartitions, tc.name) constructors, constArguments, outputTypes, err := colexecagg.ProcessAggregations( &evalCtx, nil /* semaCtx */, tc.spec.Aggregations, tc.typs, ) @@ -97,7 +100,7 @@ func TestExternalHashAggregator(t *testing.T) { ConstArguments: constArguments, OutputTypes: outputTypes, }, - queueCfg, sem, + queueCfg, sem, numForcedRepartitions, ) accounts = append(accounts, accs...) monitors = append(monitors, mons...) @@ -159,7 +162,8 @@ func BenchmarkExternalHashAggregator(b *testing.B) { b, aggType{ new: func(args *colexecagg.NewAggregatorArgs) (ResettableOperator, error) { op, accs, mons, _, err := createExternalHashAggregator( - ctx, flowCtx, args, queueCfg, &colexecbase.TestingSemaphore{}, + ctx, flowCtx, args, queueCfg, + &colexecbase.TestingSemaphore{}, 0, /* numForcedRepartitions */ ) memAccounts = append(memAccounts, accs...) memMonitors = append(memMonitors, mons...) @@ -197,6 +201,7 @@ func createExternalHashAggregator( newAggArgs *colexecagg.NewAggregatorArgs, diskQueueCfg colcontainer.DiskQueueCfg, testingSemaphore semaphore.Semaphore, + numForcedRepartitions int, ) (colexecbase.Operator, []*mon.BoundAccount, []*mon.BytesMonitor, []colexecbase.Closer, error) { spec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: newAggArgs.InputTypes}}, @@ -213,6 +218,7 @@ func createExternalHashAggregator( DiskQueueCfg: diskQueueCfg, FDSemaphore: testingSemaphore, } + args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions result, err := TestNewColOperator(ctx, flowCtx, args) return result.Op, result.OpAccounts, result.OpMonitors, result.ToClose, err } diff --git a/pkg/sql/colexec/external_hash_joiner_test.go b/pkg/sql/colexec/external_hash_joiner_test.go index afaf362f3b39..844b0b137b19 100644 --- a/pkg/sql/colexec/external_hash_joiner_test.go +++ b/pkg/sql/colexec/external_hash_joiner_test.go @@ -57,6 +57,7 @@ func TestExternalHashJoiner(t *testing.T) { monitors []*mon.BytesMonitor ) rng, _ := randutil.NewPseudoRand() + numForcedRepartitions := rng.Intn(5) // Test the case in which the default memory is used as well as the case in // which the joiner spills to disk. for _, spillForced := range []bool{false, true} { @@ -64,7 +65,8 @@ func TestExternalHashJoiner(t *testing.T) { for _, tcs := range [][]*joinTestCase{getHJTestCases(), getMJTestCases()} { for _, tc := range tcs { delegateFDAcquisitions := rng.Float64() < 0.5 - log.Infof(ctx, "spillForced=%t/%s/delegateFDAcquisitions=%t", spillForced, tc.description, delegateFDAcquisitions) + log.Infof(ctx, "spillForced=%t/numRepartitions=%d/%s/delegateFDAcquisitions=%t", + spillForced, numForcedRepartitions, tc.description, delegateFDAcquisitions) var semsToCheck []semaphore.Semaphore oldSkipAllNullsInjection := tc.skipAllNullsInjection if !tc.onExpr.Empty() { @@ -86,7 +88,7 @@ func TestExternalHashJoiner(t *testing.T) { // will not be drained. hjOp, newAccounts, newMonitors, closers, err := createDiskBackedHashJoiner( ctx, flowCtx, spec, sources, func() {}, queueCfg, - 2 /* numForcedPartitions */, delegateFDAcquisitions, sem, + numForcedRepartitions, delegateFDAcquisitions, sem, ) // Expect three closers. These are the external hash joiner, and // one external sorter for each input. diff --git a/pkg/sql/colexec/external_sort_test.go b/pkg/sql/colexec/external_sort_test.go index fe2191208b72..e190c5f6c2e3 100644 --- a/pkg/sql/colexec/external_sort_test.go +++ b/pkg/sql/colexec/external_sort_test.go @@ -49,7 +49,8 @@ func TestExternalSort(t *testing.T) { }, } - const numForcedRepartitions = 3 + rng, _ := randutil.NewPseudoRand() + numForcedRepartitions := rng.Intn(5) queueCfg, cleanup := colcontainerutils.NewTestingDiskQueueCfg(t, true /* inMem */) defer cleanup() @@ -71,7 +72,7 @@ func TestExternalSort(t *testing.T) { } for _, tcs := range [][]sortTestCase{sortAllTestCases, topKSortTestCases, sortChunksTestCases} { for _, tc := range tcs { - log.Infof(context.Background(), "spillForced=%t/%s", spillForced, tc.description) + log.Infof(context.Background(), "spillForced=%t/numRepartitions=%d/%s", spillForced, numForcedRepartitions, tc.description) var semsToCheck []semaphore.Semaphore runTestsWithTyps( t, diff --git a/pkg/sql/colexec/ordered_aggregator.go b/pkg/sql/colexec/ordered_aggregator.go index 5654d141f569..06a0beb580fe 100644 --- a/pkg/sql/colexec/ordered_aggregator.go +++ b/pkg/sql/colexec/ordered_aggregator.go @@ -367,7 +367,9 @@ func (a *orderedAggregator) reset(ctx context.Context) { r.reset(ctx) } a.state = orderedAggregatorAggregating - a.scratch.shouldResetInternalBatch = true + // In some cases we might reset the aggregator before Next() is called for + // the first time, so there might not be a scratch batch allocated yet. + a.scratch.shouldResetInternalBatch = a.scratch.Batch != nil a.scratch.resumeIdx = 0 a.lastReadBatch = nil a.seenNonEmptyBatch = false