diff --git a/pkg/col/coldata/testutils.go b/pkg/col/coldata/testutils.go index 38ff6f53eeb9..a67c8242976e 100644 --- a/pkg/col/coldata/testutils.go +++ b/pkg/col/coldata/testutils.go @@ -56,11 +56,21 @@ func AssertEquivalentBatches(t testingT, expected, actual Batch) { expectedVec := expected.ColVec(colIdx) actualVec := actual.ColVec(colIdx) require.Equal(t, expectedVec.Type(), actualVec.Type()) - require.Equal( - t, - expectedVec.Nulls().Slice(0, expected.Length()), - actualVec.Nulls().Slice(0, actual.Length()), - ) + // Check whether the nulls bitmaps are the same. Note that we don't + // track precisely the fact whether nulls are present or not in + // 'maybeHasNulls' field, so we override it manually to be 'true' for + // both nulls vectors if it is 'true' for at least one of them. This is + // acceptable since we still check the bitmaps precisely. + expectedNulls := expectedVec.Nulls() + actualNulls := actualVec.Nulls() + oldExpMaybeHasNulls, oldActMaybeHasNulls := expectedNulls.maybeHasNulls, actualNulls.maybeHasNulls + defer func() { + expectedNulls.maybeHasNulls, actualNulls.maybeHasNulls = oldExpMaybeHasNulls, oldActMaybeHasNulls + }() + expectedNulls.maybeHasNulls = expectedNulls.maybeHasNulls || actualNulls.maybeHasNulls + actualNulls.maybeHasNulls = expectedNulls.maybeHasNulls || actualNulls.maybeHasNulls + require.Equal(t, expectedNulls.Slice(0, expected.Length()), actualNulls.Slice(0, actual.Length())) + canonicalTypeFamily := expectedVec.CanonicalTypeFamily() if canonicalTypeFamily == types.BytesFamily { // Cannot use require.Equal for this type. @@ -95,7 +105,7 @@ func AssertEquivalentBatches(t testingT, expected, actual Batch) { t.Fatalf("Interval mismatch at index %d:\nexpected:\n%sactual:\n%s", i, expectedInterval[i], resultInterval[i]) } } - } else if expectedVec.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily { + } else if canonicalTypeFamily == typeconv.DatumVecCanonicalTypeFamily { // Cannot use require.Equal for this type. expectedDatum := expectedVec.Datum().Slice(0 /* start */, expected.Length()) resultDatum := actualVec.Datum().Slice(0 /* start */, actual.Length()) diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index 53a9726cbe5b..8bf2057d1d4f 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -1321,12 +1321,15 @@ func (p *planner) checkCanAlterTableAndSetNewOwner( privs := desc.GetPrivileges() privs.SetOwner(newOwner) + tn, err := p.getQualifiedTableName(ctx, desc) + if err != nil { + return err + } + return p.logEvent(ctx, desc.ID, &eventpb.AlterTableOwner{ - // TODO(knz): Properly qualify this. - // See: https://github.com/cockroachdb/cockroach/issues/57960 - TableName: desc.Name, + TableName: tn.String(), Owner: newOwner.Normalized(), }) } diff --git a/pkg/sql/colcontainer/partitionedqueue.go b/pkg/sql/colcontainer/partitionedqueue.go index 6e969c130f7f..c4fe7c64f94e 100644 --- a/pkg/sql/colcontainer/partitionedqueue.go +++ b/pkg/sql/colcontainer/partitionedqueue.go @@ -27,6 +27,7 @@ type PartitionedQueue interface { // partition at that index does not exist, a new one is created. Existing // partitions may not be Enqueued to after calling // CloseAllOpenWriteFileDescriptors. + // WARNING: Selection vectors are ignored. Enqueue(ctx context.Context, partitionIdx int, batch coldata.Batch) error // Dequeue removes and returns the batch from the front of the // partitionIdx'th partition. 
If the partition is empty, or no partition at diff --git a/pkg/sql/colexec/execgen/cmd/execgen/overloads_bin.go b/pkg/sql/colexec/execgen/cmd/execgen/overloads_bin.go index d0d535b0207a..4f94fc8c88d9 100644 --- a/pkg/sql/colexec/execgen/cmd/execgen/overloads_bin.go +++ b/pkg/sql/colexec/execgen/cmd/execgen/overloads_bin.go @@ -714,7 +714,7 @@ func (c intervalDecimalCustomizer) getBinOpAssignFunc() assignFunc { return fmt.Sprintf(` f, err := %[3]s.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } %[1]s = %[2]s.MulFloat(f)`, targetElem, leftElem, rightElem) @@ -732,7 +732,7 @@ func (c decimalIntervalCustomizer) getBinOpAssignFunc() assignFunc { return fmt.Sprintf(` f, err := %[2]s.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } %[1]s = %[3]s.MulFloat(f)`, targetElem, leftElem, rightElem) diff --git a/pkg/sql/colexec/external_sort.go b/pkg/sql/colexec/external_sort.go index 40a0d9d02497..bed7fb6f2f12 100644 --- a/pkg/sql/colexec/external_sort.go +++ b/pkg/sql/colexec/external_sort.go @@ -262,6 +262,9 @@ func (s *externalSorter) Next(ctx context.Context) coldata.Batch { if s.partitioner == nil { s.partitioner = s.partitionerCreator() } + // Note that b will never have a selection vector set because the + // allSpooler performs a deselection when buffering up the tuples, + // and the in-memory sorter has allSpooler as its input. if err := s.partitioner.Enqueue(ctx, newPartitionIdx, b); err != nil { colexecerror.InternalError(err) } diff --git a/pkg/sql/colexec/mergejoiner.go b/pkg/sql/colexec/mergejoiner.go index adf96210f6a5..10082c4a7bac 100644 --- a/pkg/sql/colexec/mergejoiner.go +++ b/pkg/sql/colexec/mergejoiner.go @@ -107,8 +107,9 @@ type mjBuilderCrossProductState struct { type mjBufferedGroup struct { *spillingQueue // firstTuple stores a single tuple that was first in the buffered group. - firstTuple []coldata.Vec - numTuples int + firstTuple []coldata.Vec + numTuples int + scratchBatch coldata.Batch } func (bg *mjBufferedGroup) reset(ctx context.Context) { @@ -546,6 +547,9 @@ func (o *mergeJoinBase) Init() { o.proberState.lBufferedGroup.firstTuple = o.unlimitedAllocator.NewMemBatchWithFixedCapacity( o.left.sourceTypes, 1, /* capacity */ ).ColVecs() + o.proberState.lBufferedGroup.scratchBatch = o.unlimitedAllocator.NewMemBatchWithFixedCapacity( + o.left.sourceTypes, coldata.BatchSize(), + ) o.proberState.rBufferedGroup.spillingQueue = newRewindableSpillingQueue( o.unlimitedAllocator, o.right.sourceTypes, o.memoryLimit, o.diskQueueCfg, o.fdSemaphore, o.diskAcc, @@ -553,6 +557,9 @@ func (o *mergeJoinBase) Init() { o.proberState.rBufferedGroup.firstTuple = o.unlimitedAllocator.NewMemBatchWithFixedCapacity( o.right.sourceTypes, 1, /* capacity */ ).ColVecs() + o.proberState.rBufferedGroup.scratchBatch = o.unlimitedAllocator.NewMemBatchWithFixedCapacity( + o.right.sourceTypes, coldata.BatchSize(), + ) o.builderState.lGroups = make([]group, 1) o.builderState.rGroups = make([]group, 1) @@ -572,6 +579,8 @@ func (o *mergeJoinBase) resetBuilderCrossProductState() { // same group as the ones in the buffered group that corresponds to the input // source. This needs to happen when a group starts at the end of an input // batch and can continue into the following batches. +// A zero-length batch needs to be appended when no more batches will be +// appended to the buffered group. 
func (o *mergeJoinBase) appendToBufferedGroup( ctx context.Context, input *mergeJoinInput, @@ -580,9 +589,6 @@ func (o *mergeJoinBase) appendToBufferedGroup( groupStartIdx int, groupLength int, ) { - if groupLength == 0 { - return - } var ( bufferedGroup *mjBufferedGroup sourceTypes []*types.T @@ -594,9 +600,14 @@ func (o *mergeJoinBase) appendToBufferedGroup( sourceTypes = o.right.sourceTypes bufferedGroup = &o.proberState.rBufferedGroup } - // TODO(yuzefovich): reuse the same scratch batches when spillingQueue - // actually copies the enqueued batch when those are kept in memory. - scratchBatch := o.unlimitedAllocator.NewMemBatchWithFixedCapacity(sourceTypes, groupLength) + if batch.Length() == 0 || groupLength == 0 { + // We have finished appending to this buffered group, so we need to + // enqueue a zero-length batch per the contract of the spilling queue. + if err := bufferedGroup.enqueue(ctx, coldata.ZeroBatch); err != nil { + colexecerror.InternalError(err) + } + return + } if bufferedGroup.numTuples == 0 { o.unlimitedAllocator.PerformOperation(bufferedGroup.firstTuple, func() { for colIdx := range sourceTypes { @@ -616,9 +627,10 @@ func (o *mergeJoinBase) appendToBufferedGroup( } bufferedGroup.numTuples += groupLength - o.unlimitedAllocator.PerformOperation(scratchBatch.ColVecs(), func() { + bufferedGroup.scratchBatch.ResetInternalBatch() + o.unlimitedAllocator.PerformOperation(bufferedGroup.scratchBatch.ColVecs(), func() { for colIdx := range input.sourceTypes { - scratchBatch.ColVec(colIdx).Copy( + bufferedGroup.scratchBatch.ColVec(colIdx).Copy( coldata.CopySliceArgs{ SliceArgs: coldata.SliceArgs{ Src: batch.ColVec(colIdx), @@ -630,10 +642,9 @@ func (o *mergeJoinBase) appendToBufferedGroup( }, ) } + bufferedGroup.scratchBatch.SetLength(groupLength) }) - scratchBatch.SetSelection(false) - scratchBatch.SetLength(groupLength) - if err := bufferedGroup.enqueue(ctx, scratchBatch); err != nil { + if err := bufferedGroup.enqueue(ctx, bufferedGroup.scratchBatch); err != nil { colexecerror.InternalError(err) } } @@ -681,6 +692,15 @@ func (o *mergeJoinBase) sourceFinished() bool { return o.proberState.lLength == 0 || o.proberState.rLength == 0 } +// finishBufferedGroup appends a zero-length batch to the buffered group which +// is required by the contract of the spilling queue. +func (o *mergeJoinBase) finishBufferedGroup(ctx context.Context, input *mergeJoinInput) { + o.appendToBufferedGroup( + ctx, input, coldata.ZeroBatch, nil, /* sel */ + 0 /* groupStartIdx */, 0, /* groupLength */ + ) +} + // completeBufferedGroup extends the buffered group corresponding to input. // First, we check that the first row in batch is still part of the same group. // If this is the case, we use the Distinct operator to find the first @@ -696,6 +716,7 @@ func (o *mergeJoinBase) completeBufferedGroup( ) (_ coldata.Batch, idx int, batchLength int) { batchLength = batch.Length() if o.isBufferedGroupFinished(input, batch, rowIdx) { + o.finishBufferedGroup(ctx, input) return batch, rowIdx, batchLength } @@ -752,6 +773,7 @@ func (o *mergeJoinBase) completeBufferedGroup( if batchLength == 0 { // The input has been exhausted, so the buffered group is now complete. 
isBufferedGroupComplete = true + o.finishBufferedGroup(ctx, input) } } } diff --git a/pkg/sql/colexec/proj_const_left_ops.eg.go b/pkg/sql/colexec/proj_const_left_ops.eg.go index 5009e8a23f7f..bc7dc43804d8 100644 --- a/pkg/sql/colexec/proj_const_left_ops.eg.go +++ b/pkg/sql/colexec/proj_const_left_ops.eg.go @@ -11673,7 +11673,7 @@ func (p projMultDecimalConstIntervalOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -11689,7 +11689,7 @@ func (p projMultDecimalConstIntervalOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -11709,7 +11709,7 @@ func (p projMultDecimalConstIntervalOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -11722,7 +11722,7 @@ func (p projMultDecimalConstIntervalOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -14565,7 +14565,7 @@ func (p projMultIntervalConstDecimalOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -14581,7 +14581,7 @@ func (p projMultIntervalConstDecimalOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -14601,7 +14601,7 @@ func (p projMultIntervalConstDecimalOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -14614,7 +14614,7 @@ func (p projMultIntervalConstDecimalOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } diff --git a/pkg/sql/colexec/proj_const_right_ops.eg.go b/pkg/sql/colexec/proj_const_right_ops.eg.go index 8f44266de5f7..2d2a5c05a6bb 100644 --- a/pkg/sql/colexec/proj_const_right_ops.eg.go +++ b/pkg/sql/colexec/proj_const_right_ops.eg.go @@ -11674,7 +11674,7 @@ func (p projMultDecimalIntervalConstOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -11690,7 +11690,7 @@ func (p projMultDecimalIntervalConstOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -11710,7 +11710,7 @@ func (p projMultDecimalIntervalConstOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -11723,7 +11723,7 @@ func (p projMultDecimalIntervalConstOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = 
p.constArg.MulFloat(f) } @@ -14566,7 +14566,7 @@ func (p projMultIntervalDecimalConstOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -14582,7 +14582,7 @@ func (p projMultIntervalDecimalConstOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -14602,7 +14602,7 @@ func (p projMultIntervalDecimalConstOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -14615,7 +14615,7 @@ func (p projMultIntervalDecimalConstOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } diff --git a/pkg/sql/colexec/proj_non_const_ops.eg.go b/pkg/sql/colexec/proj_non_const_ops.eg.go index 8c73b9b35cd0..ea28524bd734 100644 --- a/pkg/sql/colexec/proj_non_const_ops.eg.go +++ b/pkg/sql/colexec/proj_non_const_ops.eg.go @@ -12405,7 +12405,7 @@ func (p projMultDecimalIntervalOp) Next(ctx context.Context) coldata.Batch { f, err := arg1.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg2.MulFloat(f) } @@ -12425,7 +12425,7 @@ func (p projMultDecimalIntervalOp) Next(ctx context.Context) coldata.Batch { f, err := arg1.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg2.MulFloat(f) } @@ -12446,7 +12446,7 @@ func (p projMultDecimalIntervalOp) Next(ctx context.Context) coldata.Batch { f, err := arg1.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg2.MulFloat(f) } @@ -12462,7 +12462,7 @@ func (p projMultDecimalIntervalOp) Next(ctx context.Context) coldata.Batch { f, err := arg1.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg2.MulFloat(f) } @@ -15473,7 +15473,7 @@ func (p projMultIntervalDecimalOp) Next(ctx context.Context) coldata.Batch { f, err := arg2.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg1.MulFloat(f) } @@ -15493,7 +15493,7 @@ func (p projMultIntervalDecimalOp) Next(ctx context.Context) coldata.Batch { f, err := arg2.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg1.MulFloat(f) } @@ -15514,7 +15514,7 @@ func (p projMultIntervalDecimalOp) Next(ctx context.Context) coldata.Batch { f, err := arg2.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg1.MulFloat(f) } @@ -15530,7 +15530,7 @@ func (p projMultIntervalDecimalOp) Next(ctx context.Context) coldata.Batch { f, err := arg2.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg1.MulFloat(f) } diff --git a/pkg/sql/colexec/relative_rank.eg.go b/pkg/sql/colexec/relative_rank.eg.go index 4c13716301e6..f291867efff7 100644 --- a/pkg/sql/colexec/relative_rank.eg.go +++ b/pkg/sql/colexec/relative_rank.eg.go @@ -181,6 +181,7 @@ func (r *percentRankNoPartitionOp) Init() { int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), 
r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) // All rank functions start counting from 1. Before we assign the rank to a // tuple in the batch, we first increment r.rank, so setting this @@ -236,10 +237,7 @@ func (r *percentRankNoPartitionOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( @@ -380,12 +378,14 @@ func (r *percentRankWithPartitionOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction r.bufferedTuples = newSpillingQueue( r.allocator, r.inputTypes, int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) // All rank functions start counting from 1. Before we assign the rank to a // tuple in the batch, we first increment r.rank, so setting this @@ -433,12 +433,6 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { } // We need to flush the last vector of the running partitions // sizes, including the very last partition. - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPartitionsSizesCol := r.partitionsState.runningSizes.ColVec(0).Int64() runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.partitionsState.idx++ @@ -461,10 +455,7 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( @@ -495,13 +486,6 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -512,8 +496,8 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -527,13 +511,6 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -544,8 +521,8 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -698,12 +675,14 @@ func (r *cumeDistNoPartitionOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction r.bufferedTuples = newSpillingQueue( r.allocator, r.inputTypes, int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) } @@ -746,12 +725,6 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { } // We need to flush the last vector of the running peer groups // sizes, including the very last peer group. - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPeerGroupsSizesCol := r.peerGroupsState.runningSizes.ColVec(0).Int64() runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.peerGroupsState.idx++ @@ -772,10 +745,7 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( @@ -810,13 +780,6 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -827,8 +790,8 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } @@ -842,13 +805,6 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -859,8 +815,8 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } @@ -1001,18 +957,21 @@ func (r *cumeDistWithPartitionOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction r.peerGroupsState.spillingQueue = newSpillingQueue( r.allocator, []*types.T{types.Int}, int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction r.bufferedTuples = newSpillingQueue( r.allocator, r.inputTypes, int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) } @@ -1055,12 +1014,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx 
context.Context) coldata.Batch { } // We need to flush the last vector of the running partitions // sizes, including the very last partition. - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPartitionsSizesCol := r.partitionsState.runningSizes.ColVec(0).Int64() runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.partitionsState.idx++ @@ -1073,12 +1026,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { } // We need to flush the last vector of the running peer groups // sizes, including the very last peer group. - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPeerGroupsSizesCol := r.peerGroupsState.runningSizes.ColVec(0).Int64() runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.peerGroupsState.idx++ @@ -1101,10 +1048,7 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( @@ -1135,13 +1079,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -1152,8 +1089,8 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -1167,13 +1104,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -1184,8 +1114,8 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -1205,13 +1135,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -1222,8 +1145,8 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } @@ -1237,13 +1160,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -1254,8 +1170,8 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } diff --git a/pkg/sql/colexec/relative_rank_tmpl.go b/pkg/sql/colexec/relative_rank_tmpl.go index ed4eddf958ee..07e7c972e22d 100644 --- a/pkg/sql/colexec/relative_rank_tmpl.go +++ b/pkg/sql/colexec/relative_rank_tmpl.go @@ -143,13 +143,6 @@ func _COMPUTE_PARTITIONS_SIZES(_HAS_SEL bool) { // */}} // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -160,8 +153,8 @@ func _COMPUTE_PARTITIONS_SIZES(_HAS_SEL bool) { // */}} if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -184,13 +177,6 @@ func _COMPUTE_PEER_GROUPS_SIZES(_HAS_SEL bool) { // */}} // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -201,8 +187,8 @@ func _COMPUTE_PEER_GROUPS_SIZES(_HAS_SEL bool) { // */}} if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } @@ -291,6 +277,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction // {{end}} // {{if .IsCumeDist}} @@ -299,6 +286,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction // {{end}} r.bufferedTuples = newSpillingQueue( @@ -306,6 +294,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Init() { int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) // {{if .IsPercentRank}} // All rank functions start counting from 1. Before we assign the rank to a @@ -360,12 +349,6 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { // {{if .HasPartition}} // We need to flush the last vector of the running partitions // sizes, including the very last partition. - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPartitionsSizesCol := r.partitionsState.runningSizes.ColVec(0).Int64() runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.partitionsState.idx++ @@ -380,12 +363,6 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { // {{if .IsCumeDist}} // We need to flush the last vector of the running peer groups // sizes, including the very last peer group. - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPeerGroupsSizesCol := r.peerGroupsState.runningSizes.ColVec(0).Int64() runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.peerGroupsState.idx++ @@ -414,10 +391,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( diff --git a/pkg/sql/colexec/routers.go b/pkg/sql/colexec/routers.go index 73c4d1ad2349..7a6b2ec8c60f 100644 --- a/pkg/sql/colexec/routers.go +++ b/pkg/sql/colexec/routers.go @@ -37,10 +37,10 @@ type routerOutput interface { // initWithHashRouter passes a reference to the HashRouter that will be // pushing batches to this output. initWithHashRouter(*HashRouter) - // addBatch adds the elements specified by the selection vector from batch to - // the output. It returns whether or not the output changed its state to + // addBatch adds the elements specified by the selection vector from batch + // to the output. It returns whether or not the output changed its state to // blocked (see implementations). - addBatch(context.Context, coldata.Batch, []int) bool + addBatch(context.Context, coldata.Batch) bool // cancel tells the output to stop producing batches. Optionally forwards an // error if not nil. cancel(context.Context, error) @@ -66,10 +66,15 @@ type routerOutputOpState int const ( // routerOutputOpRunning is the state in which routerOutputOp operates - // normally. The router output transitions into the draining state when - // either it is finished (when a zero-length batch was added or when it was - // canceled) or it encounters an error. + // normally. The router output transitions into routerOutputDoneAdding when + // a zero-length batch was added or routerOutputOpDraining when it + // encounters an error or the drain is requested. routerOutputOpRunning routerOutputOpState = iota + // routerOutputDoneAdding is the state in which a zero-length batch was + // added to routerOutputOp and no more batches will be added. The router + // output transitions to routerOutputOpDraining when the output is canceled + // (either closed or the drain is requested). + routerOutputDoneAdding // routerOutputOpDraining is the state in which routerOutputOp always // returns zero-length batches on calls to Next.
routerOutputOpDraining @@ -110,55 +115,13 @@ type routerOutputOp struct { // forwardedErr is an error that was forwarded by the HashRouter. If set, // any subsequent calls to Next will return this error. forwardedErr error - // unlimitedAllocator tracks the memory usage of this router output, - // providing a signal for when it should spill to disk. - // The memory lifecycle is as follows: - // - // o.mu.pendingBatch is allocated as a "staging" area. Tuples are copied - // into it in addBatch. - // A read may come in in this state, in which case pendingBatch is returned - // and references to it are removed. Since batches are unsafe for reuse, - // the batch is also manually released from the allocator. - // If a read does not come in and the batch becomes full of tuples, that - // batch is stored in o.mu.data, which is a queue with an in-memory circular - // buffer backed by disk. If the batch fits in memory, a reference to it - // is retained and a new pendingBatch is allocated. - // - // If a read comes in at this point, the batch is dequeued from o.mu.data - // and returned, but the memory is still accounted for. In fact, memory use - // increases up to when o.mu.data is full and must spill to disk. - // Once it spills to disk, the spillingQueue (o.mu.data), will release - // batches it spills to disk to stop accounting for them. - // The tricky part comes when o.mu.data is dequeued from. In this case, the - // reference for a previously-returned batch is overwritten with an on-disk - // batch, so the memory for the overwritten batch is released, while the - // new batch's memory is retained. Note that if batches are being dequeued - // from disk, it must be the case that the circular buffer is now empty, - // holding references to batches that have been previously returned. - // - // In short, batches whose references are retained are also retained in the - // allocator, but if any references are overwritten or lost, those batches - // are released. - unlimitedAllocator *colmem.Allocator - cond *sync.Cond - // pendingBatch is a partially-filled batch with data added through - // addBatch. Once this batch reaches capacity, it is flushed to data. The - // main use of pendingBatch is coalescing various fragmented batches into - // one. - pendingBatch coldata.Batch + cond *sync.Cond // data is a spillingQueue, a circular buffer backed by a disk queue. data *spillingQueue numUnread int blocked bool } - // pendingBatchCapacity indicates the capacity which the new mu.pendingBatch - // should be allocated with. It'll increase dynamically until - // coldata.BatchSize(). We need to track it ourselves since the pending - // batch ownership is given to the spillingQueue, so when using - // ResetMaybeReallocate, we don't the old batch to check the capacity of. - pendingBatchCapacity int - testingKnobs routerOutputOpTestingKnobs } @@ -183,9 +146,6 @@ type routerOutputOpTestingKnobs struct { // defaultRouterOutputBlockedThreshold but can be modified by tests to test // edge cases. blockedThreshold int - // alwaysFlush, if set to true, will always flush o.mu.pendingBatch to - // o.mu.data. - alwaysFlush bool // addBatchTestInducedErrorCb is called after any function call that could // produce an error if that error is nil. If the callback returns an error, // the router output overwrites the nil error with the returned error. 
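The hunks above replace the router output's hand-rolled pendingBatch coalescing with a three-state lifecycle (running, done adding, draining) and push the buffering into the spillingQueue. The following standalone Go sketch illustrates only the state transitions described by the new comments; the names (toyRouterOutput, addBatch, cancel) are illustrative stand-ins and not the actual cockroach types.

package main

import "fmt"

// state mirrors routerOutputOpState: running -> doneAdding once a zero-length
// batch is added, and any state -> draining when the output is canceled or
// asked to drain.
type state int

const (
	running state = iota
	doneAdding
	draining
)

// toyRouterOutput is a hypothetical stand-in that tracks only the state
// machine, not the buffered data.
type toyRouterOutput struct{ s state }

// addBatch returns an error if a batch arrives after the zero-length batch,
// which the real operator treats as an assertion failure.
func (o *toyRouterOutput) addBatch(batchLen int) error {
	switch o.s {
	case draining:
		return nil // draining outputs discard any data
	case doneAdding:
		return fmt.Errorf("batch added after the zero-length batch")
	}
	if batchLen == 0 {
		o.s = doneAdding
	}
	return nil
}

// cancel moves the output into the draining state from any state.
func (o *toyRouterOutput) cancel() { o.s = draining }

func main() {
	o := &toyRouterOutput{}
	_ = o.addBatch(512) // stays running
	_ = o.addBatch(0)   // running -> doneAdding
	o.cancel()          // doneAdding -> draining
	fmt.Println(o.s == draining) // prints true
}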
@@ -233,7 +193,6 @@ func newRouterOutputOp(args routerOutputOpArgs) *routerOutputOp { unblockedEventsChan: args.unblockedEventsChan, testingKnobs: args.testingKnobs, } - o.mu.unlimitedAllocator = args.unlimitedAllocator o.mu.cond = sync.NewCond(&o.mu) o.mu.data = newSpillingQueue( args.unlimitedAllocator, @@ -266,7 +225,7 @@ func (o *routerOutputOp) nextErrorLocked(ctx context.Context, err error) { func (o *routerOutputOp) Next(ctx context.Context) coldata.Batch { o.mu.Lock() defer o.mu.Unlock() - for o.mu.forwardedErr == nil && o.mu.state == routerOutputOpRunning && o.mu.pendingBatch == nil && o.mu.data.empty() { + for o.mu.forwardedErr == nil && o.mu.state == routerOutputOpRunning && o.mu.data.empty() { // Wait until there is data to read or the output is canceled. o.mu.cond.Wait() } @@ -276,23 +235,12 @@ func (o *routerOutputOp) Next(ctx context.Context) coldata.Batch { if o.mu.state == routerOutputOpDraining { return coldata.ZeroBatch } - var b coldata.Batch - if o.mu.pendingBatch != nil && o.mu.data.empty() { - // o.mu.data is empty (i.e. nothing has been flushed to the spillingQueue), - // but there is a o.mu.pendingBatch that has not been flushed yet. Return - // this batch directly. - b = o.mu.pendingBatch - o.mu.unlimitedAllocator.ReleaseBatch(b) - o.mu.pendingBatch = nil - } else { - var err error - b, err = o.mu.data.dequeue(ctx) - if err == nil && o.testingKnobs.nextTestInducedErrorCb != nil { - err = o.testingKnobs.nextTestInducedErrorCb() - } - if err != nil { - o.nextErrorLocked(ctx, err) - } + b, err := o.mu.data.dequeue(ctx) + if err == nil && o.testingKnobs.nextTestInducedErrorCb != nil { + err = o.testingKnobs.nextTestInducedErrorCb() + } + if err != nil { + o.nextErrorLocked(ctx, err) } o.mu.numUnread -= b.Length() if o.mu.numUnread <= o.testingKnobs.blockedThreshold { @@ -362,113 +310,38 @@ func (o *routerOutputOp) forwardErr(err error) { o.mu.cond.Signal() } -// addBatch copies the columns in batch according to selection into an internal -// buffer. -// The routerOutputOp only adds the elements specified by selection. Therefore, -// an empty selection slice will add no elements. Note that the selection vector -// on the batch is ignored. This is so that callers of addBatch can push the -// same batch with different selection vectors to many different outputs. -// True is returned if the output changes state to blocked (note: if the -// output is already blocked, false is returned). +// addBatch copies the batch (according to its selection vector) into an +// internal buffer. Zero-length batch should be passed-in to indicate that no +// more batches will be added. // TODO(asubiotto): We should explore pipelining addBatch if disk-spilling // performance becomes a concern. The main router goroutine will be writing to // disk as the code is written, meaning that we impact the performance of // writing rows to a fast output if we have to write to disk for a single // slow output. -func (o *routerOutputOp) addBatch(ctx context.Context, batch coldata.Batch, selection []int) bool { - if len(selection) > batch.Length() { - selection = selection[:batch.Length()] - } +func (o *routerOutputOp) addBatch(ctx context.Context, batch coldata.Batch) bool { o.mu.Lock() defer o.mu.Unlock() - if o.mu.state == routerOutputOpDraining { + switch o.mu.state { + case routerOutputDoneAdding: + colexecerror.InternalError(errors.AssertionFailedf("a batch was added to routerOutput in DoneAdding state")) + case routerOutputOpDraining: // This output is draining, discard any data. 
return false } - if batch.Length() == 0 { - if o.mu.pendingBatch != nil { - err := o.mu.data.enqueue(ctx, o.mu.pendingBatch) - if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil { - err = o.testingKnobs.addBatchTestInducedErrorCb() - } - if err != nil { - colexecerror.InternalError(err) - } - } else if o.testingKnobs.addBatchTestInducedErrorCb != nil { - // This is the last chance to run addBatchTestInducedErorCb if it has - // been set. - if err := o.testingKnobs.addBatchTestInducedErrorCb(); err != nil { - colexecerror.InternalError(err) - } - } - o.mu.pendingBatch = coldata.ZeroBatch - o.mu.cond.Signal() - return false + o.mu.numUnread += batch.Length() + err := o.mu.data.enqueue(ctx, batch) + if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil { + err = o.testingKnobs.addBatchTestInducedErrorCb() } - - if len(selection) == 0 { - // Non-zero batch with no selection vector. Nothing to do. - return false + if err != nil { + colexecerror.InternalError(err) } - // Increment o.mu.numUnread before going into the loop, as we will consume - // selection. - o.mu.numUnread += len(selection) - - for toAppend := len(selection); toAppend > 0; { - if o.mu.pendingBatch == nil { - if o.pendingBatchCapacity < coldata.BatchSize() { - // We still haven't reached the maximum capacity, so let's - // calculate the next capacity to use. - if o.pendingBatchCapacity == 0 { - // This is the first set of tuples that are added to this - // router output, so we'll allocate the batch with just - // enough capacity to fill all of these tuples. - o.pendingBatchCapacity = len(selection) - } else { - o.pendingBatchCapacity *= 2 - } - } - // Note: we could have used NewMemBatchWithFixedCapacity here, but - // we choose not to in order to indicate that the capacity of the - // pending batches has dynamic behavior. - o.mu.pendingBatch, _ = o.mu.unlimitedAllocator.ResetMaybeReallocate(o.types, nil /* oldBatch */, o.pendingBatchCapacity) - } - available := o.mu.pendingBatch.Capacity() - o.mu.pendingBatch.Length() - numAppended := toAppend - if toAppend > available { - numAppended = available - } - o.mu.unlimitedAllocator.PerformOperation(o.mu.pendingBatch.ColVecs(), func() { - for i := range o.types { - o.mu.pendingBatch.ColVec(i).Copy( - coldata.CopySliceArgs{ - SliceArgs: coldata.SliceArgs{ - Src: batch.ColVec(i), - Sel: selection[:numAppended], - DestIdx: o.mu.pendingBatch.Length(), - SrcEndIdx: numAppended, - }, - }, - ) - } - }) - newLength := o.mu.pendingBatch.Length() + numAppended - o.mu.pendingBatch.SetLength(newLength) - if o.testingKnobs.alwaysFlush || newLength >= o.mu.pendingBatch.Capacity() { - // The capacity in o.mu.pendingBatch has been filled. - err := o.mu.data.enqueue(ctx, o.mu.pendingBatch) - if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil { - err = o.testingKnobs.addBatchTestInducedErrorCb() - } - if err != nil { - colexecerror.InternalError(err) - } - o.mu.pendingBatch = nil - } - toAppend -= numAppended - selection = selection[numAppended:] + if batch.Length() == 0 { + o.mu.state = routerOutputDoneAdding + o.mu.cond.Signal() + return false } stateChanged := false @@ -500,7 +373,6 @@ func (o *routerOutputOp) resetForTests(ctx context.Context) { o.mu.data.reset(ctx) o.mu.numUnread = 0 o.mu.blocked = false - o.pendingBatchCapacity = 0 } // hashRouterDrainState is a state that specifically describes the hashRouter's @@ -752,16 +624,21 @@ func (r *HashRouter) processNextBatch(ctx context.Context) bool { // Done. 
Push an empty batch to outputs to tell them the data is done as // well. for _, o := range r.outputs { - o.addBatch(ctx, b, nil) + o.addBatch(ctx, b) } return true } selections := r.tupleDistributor.distribute(ctx, b, r.hashCols) for i, o := range r.outputs { - if o.addBatch(ctx, b, selections[i]) { - // This batch blocked the output. - r.numBlockedOutputs++ + if len(selections[i]) > 0 { + b.SetSelection(true) + copy(b.Selection(), selections[i]) + b.SetLength(len(selections[i])) + if o.addBatch(ctx, b) { + // This batch blocked the output. + r.numBlockedOutputs++ + } } } return false diff --git a/pkg/sql/colexec/routers_test.go b/pkg/sql/colexec/routers_test.go index 327f76e87357..4e68e099921e 100644 --- a/pkg/sql/colexec/routers_test.go +++ b/pkg/sql/colexec/routers_test.go @@ -54,7 +54,7 @@ type memoryTestCase struct { expSpill bool } -// getDiskqueueCfgAndMemoryTestCases is a test helper that creates an in-memory +// getDiskQueueCfgAndMemoryTestCases is a test helper that creates an in-memory // DiskQueueCfg that can be used to create a new DiskQueue. A cleanup function // is also returned as well as some default memory limits that are useful to // test with: 0 for an immediate spill, a random memory limit up to 64 MiB, and @@ -89,6 +89,19 @@ func getDataAndFullSelection() (tuples, []*types.T, []int) { return data, []*types.T{types.Int}, fullSelection } +// pushSelectionIntoBatch updates b in-place to have the provided selection +// vector while setting the length to minimum of b's length and len(selection). +// Zero-length batch is not modified. +func pushSelectionIntoBatch(b coldata.Batch, selection []int) { + if b.Length() > 0 { + b.SetSelection(true) + copy(b.Selection(), selection) + if len(selection) < b.Length() { + b.SetLength(len(selection)) + } + } +} + func TestRouterOutputAddBatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -173,7 +186,8 @@ func TestRouterOutputAddBatch(t *testing.T) { in.Init() for { b := in.Next(ctx) - o.addBatch(ctx, b, tc.selection) + pushSelectionIntoBatch(b, tc.selection) + o.addBatch(ctx, b) if b.Length() == 0 { break } @@ -216,7 +230,8 @@ func TestRouterOutputNext(t *testing.T) { unblockEvent: func(in colexecbase.Operator, o *routerOutputOp) { for { b := in.Next(ctx) - o.addBatch(ctx, b, fullSelection) + pushSelectionIntoBatch(b, fullSelection) + o.addBatch(ctx, b) if b.Length() == 0 { break } @@ -229,7 +244,7 @@ func TestRouterOutputNext(t *testing.T) { // ReaderWaitsForZeroBatch verifies that a reader blocking on Next will // also get unblocked with no data other than the zero batch. unblockEvent: func(_ colexecbase.Operator, o *routerOutputOp) { - o.addBatch(ctx, coldata.ZeroBatch, nil /* selection */) + o.addBatch(ctx, coldata.ZeroBatch) }, expected: tuples{}, name: "ReaderWaitsForZeroBatch", @@ -332,7 +347,7 @@ func TestRouterOutputNext(t *testing.T) { unblockedEventsChan: unblockedEventsChan, }, ) - o.addBatch(ctx, coldata.ZeroBatch, fullSelection) + o.addBatch(ctx, coldata.ZeroBatch) o.Next(ctx) o.Next(ctx) select { @@ -391,19 +406,22 @@ func TestRouterOutputNext(t *testing.T) { b := in.Next(ctx) // Make sure the output doesn't consider itself blocked. We're right at the // limit but not over. - if o.addBatch(ctx, b, selection) { + pushSelectionIntoBatch(b, selection) + if o.addBatch(ctx, b) { t.Fatal("unexpectedly blocked") } b = in.Next(ctx) // This addBatch call should now block the output. 
- if !o.addBatch(ctx, b, selection) { + pushSelectionIntoBatch(b, selection) + if !o.addBatch(ctx, b) { t.Fatal("unexpectedly still unblocked") } // Add the rest of the data. for { b = in.Next(ctx) - if o.addBatch(ctx, b, selection) { + pushSelectionIntoBatch(b, selection) + if o.addBatch(ctx, b) { t.Fatal("should only return true when switching from unblocked to blocked") } if b.Length() == 0 { @@ -501,7 +519,8 @@ func TestRouterOutputRandom(t *testing.T) { } } - if o.addBatch(ctx, b, selection) { + pushSelectionIntoBatch(b, selection) + if o.addBatch(ctx, b) { if lastBlockedState { // We might have missed an unblock event during the last loop. select { @@ -575,7 +594,7 @@ func TestRouterOutputRandom(t *testing.T) { type callbackRouterOutput struct { colexecbase.ZeroInputNode - addBatchCb func(coldata.Batch, []int) bool + addBatchCb func(coldata.Batch) bool cancelCb func() forwardedErr error } @@ -584,11 +603,9 @@ var _ routerOutput = &callbackRouterOutput{} func (o *callbackRouterOutput) initWithHashRouter(*HashRouter) {} -func (o *callbackRouterOutput) addBatch( - _ context.Context, batch coldata.Batch, selection []int, -) bool { +func (o *callbackRouterOutput) addBatch(_ context.Context, batch coldata.Batch) bool { if o.addBatchCb != nil { - return o.addBatchCb(batch, selection) + return o.addBatchCb(batch) } return false } @@ -646,8 +663,8 @@ func TestHashRouterComputesDestination(t *testing.T) { // Capture the index. outputIdx := i outputs[i] = &callbackRouterOutput{ - addBatchCb: func(batch coldata.Batch, sel []int) bool { - for _, j := range sel { + addBatchCb: func(batch coldata.Batch) bool { + for _, j := range batch.Selection()[:batch.Length()] { key := batch.ColVec(0).Int64()[j] if _, ok := valsYetToSee[key]; !ok { t.Fatalf("pushed alread seen value to router output: %d", key) @@ -690,7 +707,7 @@ func TestHashRouterCancellation(t *testing.T) { numAddBatches := int64(0) for i := range outputs { outputs[i] = &callbackRouterOutput{ - addBatchCb: func(_ coldata.Batch, _ []int) bool { + addBatchCb: func(_ coldata.Batch) bool { atomic.AddInt64(&numAddBatches, 1) return false }, @@ -814,8 +831,6 @@ func TestHashRouterOneOutput(t *testing.T) { o := newOpTestOutput(routerOutputs[0], expected) ro := routerOutputs[0].(*routerOutputOp) - // Set alwaysFlush so that data is always flushed to the spillingQueue. - ro.testingKnobs.alwaysFlush = true var wg sync.WaitGroup wg.Add(1) diff --git a/pkg/sql/colexec/spilling_queue.go b/pkg/sql/colexec/spilling_queue.go index 4d2e178a8d47..6736e3420f26 100644 --- a/pkg/sql/colexec/spilling_queue.go +++ b/pkg/sql/colexec/spilling_queue.go @@ -54,16 +54,34 @@ type spillingQueue struct { numOnDiskItems int closed bool - diskQueueCfg colcontainer.DiskQueueCfg - diskQueue colcontainer.Queue - fdSemaphore semaphore.Semaphore - dequeueScratch coldata.Batch + // nextInMemBatchCapacity indicates the capacity which the new batch that + // we'll append to items should be allocated with. It'll increase + // dynamically until coldata.BatchSize(). + nextInMemBatchCapacity int + + diskQueueCfg colcontainer.DiskQueueCfg + diskQueue colcontainer.Queue + diskQueueDeselectionScratch coldata.Batch + fdSemaphore semaphore.Semaphore + dequeueScratch coldata.Batch rewindable bool rewindableState struct { numItemsDequeued int } + testingKnobs struct { + // numEnqueues tracks the number of times enqueue() has been called with + // non-zero batch. 
+ numEnqueues int + // maxNumBatchesEnqueuedInMemory, if greater than 0, indicates the + // maximum number of batches that are attempted to be enqueued to the + // in-memory buffer 'items' (other limiting conditions might occur + // earlier). Once numEnqueues reaches this limit, all subsequent calls + // to enqueue() will use the disk queue. + maxNumBatchesEnqueuedInMemory int + } + diskAcc *mon.BoundAccount } @@ -134,8 +152,16 @@ func newRewindableSpillingQueue( return q } +// enqueue adds the provided batch to the queue. A zero-length batch needs to +// be added as the last one. +// +// The passed-in batch is deeply copied, so it can be safely reused by the +// caller. The spilling queue coalesces all input tuples into batches of +// dynamically increasing capacity while those are kept in memory. It also +// performs a deselection step if necessary when adding the batch to the disk queue. func (q *spillingQueue) enqueue(ctx context.Context, batch coldata.Batch) error { - if batch.Length() == 0 { + n := batch.Length() + if n == 0 { if q.diskQueue != nil { if err := q.diskQueue.Enqueue(ctx, batch); err != nil { return err @@ -143,18 +169,49 @@ func (q *spillingQueue) enqueue(ctx context.Context, batch coldata.Batch) error } return nil } + q.testingKnobs.numEnqueues++ - if q.numOnDiskItems > 0 || q.unlimitedAllocator.Used() > q.maxMemoryLimit || - (q.numInMemoryItems == len(q.items) && q.numInMemoryItems == q.maxItemsLen) { - // In this case, there is not enough memory available to keep this batch in - // memory, or the in-memory circular buffer has no slots available (we do - // an initial estimate of how many batches would fit into the buffer, which - // might be wrong). The tail of the queue might also already be on disk, in - // which case that is where the batch must be enqueued to maintain order. + alreadySpilled := q.numOnDiskItems > 0 + memoryLimitReached := q.unlimitedAllocator.Used() > q.maxMemoryLimit + maxItemsLenReached := q.numInMemoryItems == len(q.items) && q.numInMemoryItems == q.maxItemsLen + maxInMemEnqueuesExceeded := q.testingKnobs.maxNumBatchesEnqueuedInMemory != 0 && q.testingKnobs.numEnqueues > q.testingKnobs.maxNumBatchesEnqueuedInMemory + if alreadySpilled || memoryLimitReached || maxItemsLenReached || maxInMemEnqueuesExceeded { + // In this case, one of the following conditions is true: + // 1. the tail of the queue might already be on disk, in which case + // that is where the batch must be enqueued to maintain order + // 2. there is not enough memory available to keep this batch in memory + // 3. the in-memory circular buffer has no slots available (we do an + // initial estimate of how many batches would fit into the buffer, + // which might be wrong) + // 4. we reached the testing limit on the number of items added to the + // in-memory buffer + // so we have to add the batch to the disk queue. if err := q.maybeSpillToDisk(ctx); err != nil { return err } q.unlimitedAllocator.ReleaseBatch(batch) + if sel := batch.Selection(); sel != nil { + // We need to perform the deselection since the disk queue + // ignores the selection vectors.
+ q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocate( + q.typs, q.diskQueueDeselectionScratch, n, + ) + q.unlimitedAllocator.PerformOperation(q.diskQueueDeselectionScratch.ColVecs(), func() { + for i := range q.typs { + q.diskQueueDeselectionScratch.ColVec(i).Copy( + coldata.CopySliceArgs{ + SliceArgs: coldata.SliceArgs{ + Src: batch.ColVec(i), + Sel: sel, + SrcEndIdx: n, + }, + }, + ) + } + q.diskQueueDeselectionScratch.SetLength(n) + }) + batch = q.diskQueueDeselectionScratch + } if err := q.diskQueue.Enqueue(ctx, batch); err != nil { return err } @@ -182,7 +239,78 @@ func (q *spillingQueue) enqueue(ctx context.Context, batch coldata.Batch) error q.items = newItems } - q.items[q.curTailIdx] = batch + alreadyCopied := 0 + if q.numInMemoryItems > 0 { + // If we have already enqueued at least one batch, let's try to append + // as many tuples to it as it has the capacity for. + tailBatchIdx := q.curTailIdx - 1 + if tailBatchIdx < 0 { + tailBatchIdx = 0 + } + tailBatch := q.items[tailBatchIdx] + if l, c := tailBatch.Length(), tailBatch.Capacity(); l < c { + alreadyCopied = c - l + if alreadyCopied > n { + alreadyCopied = n + } + q.unlimitedAllocator.PerformOperation(tailBatch.ColVecs(), func() { + for i := range q.typs { + tailBatch.ColVec(i).Append( + coldata.SliceArgs{ + Src: batch.ColVec(i), + Sel: batch.Selection(), + DestIdx: l, + SrcStartIdx: 0, + SrcEndIdx: alreadyCopied, + }, + ) + } + tailBatch.SetLength(l + alreadyCopied) + }) + if alreadyCopied == n { + // We were able to append all of the tuples, so we return early + // since we don't need to update any of the state. + return nil + } + } + } + + var newBatchCapacity int + if q.nextInMemBatchCapacity == coldata.BatchSize() { + // At this point we only allocate batches with maximum capacity. + newBatchCapacity = coldata.BatchSize() + } else { + newBatchCapacity = n - alreadyCopied + if q.nextInMemBatchCapacity > newBatchCapacity { + newBatchCapacity = q.nextInMemBatchCapacity + } + q.nextInMemBatchCapacity = 2 * newBatchCapacity + if q.nextInMemBatchCapacity > coldata.BatchSize() { + q.nextInMemBatchCapacity = coldata.BatchSize() + } + } + + // Note: we could have used NewMemBatchWithFixedCapacity here, but we choose + // not to in order to indicate that the capacity of the new batches has + // dynamic behavior. 
+ newBatch, _ := q.unlimitedAllocator.ResetMaybeReallocate(q.typs, nil /* oldBatch */, newBatchCapacity) + q.unlimitedAllocator.PerformOperation(newBatch.ColVecs(), func() { + for i := range q.typs { + newBatch.ColVec(i).Copy( + coldata.CopySliceArgs{ + SliceArgs: coldata.SliceArgs{ + Src: batch.ColVec(i), + Sel: batch.Selection(), + SrcStartIdx: alreadyCopied, + SrcEndIdx: n, + }, + }, + ) + } + newBatch.SetLength(n - alreadyCopied) + }) + + q.items[q.curTailIdx] = newBatch q.curTailIdx++ if q.curTailIdx == len(q.items) { q.curTailIdx = 0 @@ -342,5 +470,7 @@ func (q *spillingQueue) reset(ctx context.Context) { q.numOnDiskItems = 0 q.curHeadIdx = 0 q.curTailIdx = 0 + q.nextInMemBatchCapacity = 0 q.rewindableState.numItemsDequeued = 0 + q.testingKnobs.numEnqueues = 0 } diff --git a/pkg/sql/colexec/spilling_queue_test.go b/pkg/sql/colexec/spilling_queue_test.go index 068fed96fb7b..daa0af04b0c7 100644 --- a/pkg/sql/colexec/spilling_queue_test.go +++ b/pkg/sql/colexec/spilling_queue_test.go @@ -56,16 +56,32 @@ func TestSpillingQueue(t *testing.T) { prefix = "Rewindable/" } numBatches := 1 + rng.Intn(16) - log.Infof(context.Background(), "%sMemoryLimit=%s/DiskQueueCacheMode=%d/AlwaysCompress=%t/NumBatches=%d", - prefix, humanizeutil.IBytes(memoryLimit), diskQueueCacheMode, alwaysCompress, numBatches) + // Add a limit on the number of batches added to the in-memory + // buffer of the spilling queue. We will set it to half of the total + // number of batches, which allows us to exercise the case when + // spilling to the disk queue occurs after some batches were added to + // the in-memory buffer. + setInMemEnqueuesLimit := rng.Float64() < 0.5 + log.Infof(context.Background(), "%sMemoryLimit=%s/DiskQueueCacheMode=%d/AlwaysCompress=%t/NumBatches=%d/InMemEnqueuesLimited=%t", - prefix, humanizeutil.IBytes(memoryLimit), diskQueueCacheMode, alwaysCompress, numBatches, setInMemEnqueuesLimit) + // Since the spilling queue coalesces tuples to fill in the batches + // up to their capacity, we cannot use the batches we get when + // dequeueing directly. Instead, we are tracking all of the input + // tuples and will be comparing against a window into them. + var tuples *appendOnlyBufferedBatch // Create random input. - batches := make([]coldata.Batch, 0, numBatches) op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{ - NumBatches: cap(batches), + NumBatches: numBatches, BatchSize: 1 + rng.Intn(coldata.BatchSize()), Nulls: true, BatchAccumulator: func(b coldata.Batch, typs []*types.T) { - batches = append(batches, coldatatestutils.CopyBatch(b, typs, testColumnFactory)) + if b.Length() == 0 { + return + } + if tuples == nil { + tuples = newAppendOnlyBufferedBatch(testAllocator, typs, nil /* colsToStore */) + } + tuples.append(b, 0 /* startIdx */, b.Length()) }, }) typs := op.Typs() @@ -88,11 +104,32 @@ func TestSpillingQueue(t *testing.T) { ) } + if setInMemEnqueuesLimit { + q.testingKnobs.maxNumBatchesEnqueuedInMemory = numBatches / 2 + } + // Run verification.
var ( - b coldata.Batch - err error + b coldata.Batch + err error + numAlreadyDequeuedTuples int ) + + windowedBatch := coldata.NewMemBatchNoCols(typs, coldata.BatchSize()) + getNextWindowIntoTuples := func(windowLen int) coldata.Batch { + // makeWindowIntoBatch creates a window into tuples in the range + // [numAlreadyDequeuedTuples; tuples.length), but we want the + // range [numAlreadyDequeuedTuples; numAlreadyDequeuedTuples + + // windowLen), so we'll temporarily set the length of tuples to + // the desired value and restore it below. + numTuples := tuples.Length() + tuples.SetLength(numAlreadyDequeuedTuples + windowLen) + makeWindowIntoBatch(windowedBatch, tuples, numAlreadyDequeuedTuples, typs) + tuples.SetLength(numTuples) + numAlreadyDequeuedTuples += windowLen + return windowedBatch + } + ctx := context.Background() for { b = op.Next(ctx) @@ -106,36 +143,29 @@ func TestSpillingQueue(t *testing.T) { } else if b.Length() == 0 { t.Fatal("queue incorrectly considered empty") } - coldata.AssertEquivalentBatches(t, batches[0], b) - batches = batches[1:] + coldata.AssertEquivalentBatches(t, getNextWindowIntoTuples(b.Length()), b) } } + numDequeuedTuplesBeforeReading := numAlreadyDequeuedTuples numReadIterations := 1 if rewindable { numReadIterations = 2 } for i := 0; i < numReadIterations; i++ { - batchIdx := 0 - for batches[batchIdx].Length() > 0 { + for { if b, err = q.dequeue(ctx); err != nil { t.Fatal(err) } else if b == nil { t.Fatal("unexpectedly dequeued nil batch") } else if b.Length() == 0 { - t.Fatal("queue incorrectly considered empty") + break } - coldata.AssertEquivalentBatches(t, batches[batchIdx], b) - batchIdx++ - } - - if b, err := q.dequeue(ctx); err != nil { - t.Fatal(err) - } else if b.Length() != 0 { - t.Fatal("queue should be empty") + coldata.AssertEquivalentBatches(t, getNextWindowIntoTuples(b.Length()), b) } if rewindable { require.NoError(t, q.rewind()) + numAlreadyDequeuedTuples = numDequeuedTuplesBeforeReading } } diff --git a/pkg/sql/logictest/testdata/logic_test/event_log b/pkg/sql/logictest/testdata/logic_test/event_log index 6d2b3befd651..8182ef6b3210 100644 --- a/pkg/sql/logictest/testdata/logic_test/event_log +++ b/pkg/sql/logictest/testdata/logic_test/event_log @@ -802,7 +802,7 @@ ORDER BY "timestamp", info ---- 1 alter_database_owner {"DatabaseName": "atest", "EventType": "alter_database_owner", "Owner": "u", "Statement": "ALTER DATABASE atest OWNER TO u", "User": "root"} 1 alter_schema_owner {"EventType": "alter_schema_owner", "Owner": "u", "SchemaName": "sc", "Statement": "ALTER SCHEMA atest.sc OWNER TO u", "User": "root"} -1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "u", "Statement": "ALTER TABLE atest.sc.t OWNER TO u", "TableName": "t", "User": "root"} +1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "u", "Statement": "ALTER TABLE atest.sc.t OWNER TO u", "TableName": "atest.sc.t", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "u", "Statement": "ALTER TYPE atest.sc.ty OWNER TO u", "TypeName": "ty", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "u", "Statement": "ALTER TYPE atest.sc.ty OWNER TO u", "TypeName": "_ty", "User": "root"} @@ -841,9 +841,9 @@ ORDER BY "timestamp", info ---- 1 alter_database_owner {"DatabaseName": "atest", "EventType": "alter_database_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "User": "root"} 1 alter_schema_owner {"EventType": "alter_schema_owner", "Owner": "v", "SchemaName": "sc", "Statement": 
"REASSIGN OWNED BY testuser TO v", "User": "root"} -1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "t", "User": "root"} -1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "v", "User": "root"} -1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "s", "User": "root"} +1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "atest.sc.t", "User": "root"} +1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "atest.sc.v", "User": "root"} +1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "atest.sc.s", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TypeName": "ty", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TypeName": "_ty", "User": "root"} diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_overloads b/pkg/sql/logictest/testdata/logic_test/vectorize_overloads index 231d75d32c2e..3a69f33a4fcd 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_overloads +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_overloads @@ -709,3 +709,7 @@ SELECT _int, _int2, _int // _int2 FROM many_types WHERE _int2 <> 0 312 1 312 789 4 197 2 2 1 + +# Regression test for incorrectly propagating an error as internal (#57773). +statement error .* value out of range +SELECT ((-1.234E+401)::DECIMAL * '-53 years -10 mons -377 days -08:33:40.519057'::INTERVAL::INTERVAL)::INTERVAL FROM many_types