From 81e01727447f44e2b3832a560cd21a28db90b703 Mon Sep 17 00:00:00 2001 From: Eric Wang Date: Wed, 6 Jan 2021 12:58:01 -0500 Subject: [PATCH 1/3] sql: qualify table name for alter_table_owner event log Fixes https://github.com/cockroachdb/cockroach/issues/57960. Previously, event logs were not capturing the qualified table names for ALTER TABLE OWNER commands. This PR changes the event logs to use the qualified table name. Tests were fixed to reflect these changes. Release note (bug fix): qualify table name for alter_table_owner event log --- pkg/sql/alter_table.go | 9 ++++++--- pkg/sql/logictest/testdata/logic_test/event_log | 8 ++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index fd2f2cc7a672..c4075159bf2f 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -1290,12 +1290,15 @@ func (p *planner) checkCanAlterTableAndSetNewOwner( privs := desc.GetPrivileges() privs.SetOwner(newOwner) + tn, err := p.getQualifiedTableName(ctx, desc) + if err != nil { + return err + } + return p.logEvent(ctx, desc.ID, &eventpb.AlterTableOwner{ - // TODO(knz): Properly qualify this. - // See: https://github.com/cockroachdb/cockroach/issues/57960 - TableName: desc.Name, + TableName: tn.String(), Owner: newOwner.Normalized(), }) } diff --git a/pkg/sql/logictest/testdata/logic_test/event_log b/pkg/sql/logictest/testdata/logic_test/event_log index 6d2b3befd651..8182ef6b3210 100644 --- a/pkg/sql/logictest/testdata/logic_test/event_log +++ b/pkg/sql/logictest/testdata/logic_test/event_log @@ -802,7 +802,7 @@ ORDER BY "timestamp", info ---- 1 alter_database_owner {"DatabaseName": "atest", "EventType": "alter_database_owner", "Owner": "u", "Statement": "ALTER DATABASE atest OWNER TO u", "User": "root"} 1 alter_schema_owner {"EventType": "alter_schema_owner", "Owner": "u", "SchemaName": "sc", "Statement": "ALTER SCHEMA atest.sc OWNER TO u", "User": "root"} -1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "u", "Statement": "ALTER TABLE atest.sc.t OWNER TO u", "TableName": "t", "User": "root"} +1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "u", "Statement": "ALTER TABLE atest.sc.t OWNER TO u", "TableName": "atest.sc.t", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "u", "Statement": "ALTER TYPE atest.sc.ty OWNER TO u", "TypeName": "ty", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "u", "Statement": "ALTER TYPE atest.sc.ty OWNER TO u", "TypeName": "_ty", "User": "root"} @@ -841,9 +841,9 @@ ORDER BY "timestamp", info ---- 1 alter_database_owner {"DatabaseName": "atest", "EventType": "alter_database_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "User": "root"} 1 alter_schema_owner {"EventType": "alter_schema_owner", "Owner": "v", "SchemaName": "sc", "Statement": "REASSIGN OWNED BY testuser TO v", "User": "root"} -1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "t", "User": "root"} -1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "v", "User": "root"} -1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "s", "User": "root"} +1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "atest.sc.t", "User": 
"root"} +1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "atest.sc.v", "User": "root"} +1 alter_table_owner {"EventType": "alter_table_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TableName": "atest.sc.s", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TypeName": "ty", "User": "root"} 1 alter_type_owner {"EventType": "alter_type_owner", "Owner": "v", "Statement": "REASSIGN OWNED BY testuser TO v", "TypeName": "_ty", "User": "root"} From 777910e276cf3b16591595502e7021ed2aa93a96 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 11 Jan 2021 15:29:28 -0800 Subject: [PATCH 2/3] colexec: fix decimal/interval overload error propagation Release note (bug fix): CockroachDB could previously return an internal error when evaluating a binary expression between a Decimal and an Interval that required a cast to a Float when the value is out of range, and now a more user-friendly error is returned instead. --- .../colexec/execgen/cmd/execgen/overloads_bin.go | 4 ++-- pkg/sql/colexec/proj_const_left_ops.eg.go | 16 ++++++++-------- pkg/sql/colexec/proj_const_right_ops.eg.go | 16 ++++++++-------- pkg/sql/colexec/proj_non_const_ops.eg.go | 16 ++++++++-------- .../testdata/logic_test/vectorize_overloads | 4 ++++ 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/pkg/sql/colexec/execgen/cmd/execgen/overloads_bin.go b/pkg/sql/colexec/execgen/cmd/execgen/overloads_bin.go index d0d535b0207a..4f94fc8c88d9 100644 --- a/pkg/sql/colexec/execgen/cmd/execgen/overloads_bin.go +++ b/pkg/sql/colexec/execgen/cmd/execgen/overloads_bin.go @@ -714,7 +714,7 @@ func (c intervalDecimalCustomizer) getBinOpAssignFunc() assignFunc { return fmt.Sprintf(` f, err := %[3]s.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } %[1]s = %[2]s.MulFloat(f)`, targetElem, leftElem, rightElem) @@ -732,7 +732,7 @@ func (c decimalIntervalCustomizer) getBinOpAssignFunc() assignFunc { return fmt.Sprintf(` f, err := %[2]s.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } %[1]s = %[3]s.MulFloat(f)`, targetElem, leftElem, rightElem) diff --git a/pkg/sql/colexec/proj_const_left_ops.eg.go b/pkg/sql/colexec/proj_const_left_ops.eg.go index 5009e8a23f7f..bc7dc43804d8 100644 --- a/pkg/sql/colexec/proj_const_left_ops.eg.go +++ b/pkg/sql/colexec/proj_const_left_ops.eg.go @@ -11673,7 +11673,7 @@ func (p projMultDecimalConstIntervalOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -11689,7 +11689,7 @@ func (p projMultDecimalConstIntervalOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -11709,7 +11709,7 @@ func (p projMultDecimalConstIntervalOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -11722,7 +11722,7 @@ func (p projMultDecimalConstIntervalOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } 
@@ -14565,7 +14565,7 @@ func (p projMultIntervalConstDecimalOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -14581,7 +14581,7 @@ func (p projMultIntervalConstDecimalOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -14601,7 +14601,7 @@ func (p projMultIntervalConstDecimalOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -14614,7 +14614,7 @@ func (p projMultIntervalConstDecimalOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } diff --git a/pkg/sql/colexec/proj_const_right_ops.eg.go b/pkg/sql/colexec/proj_const_right_ops.eg.go index 8f44266de5f7..2d2a5c05a6bb 100644 --- a/pkg/sql/colexec/proj_const_right_ops.eg.go +++ b/pkg/sql/colexec/proj_const_right_ops.eg.go @@ -11674,7 +11674,7 @@ func (p projMultDecimalIntervalConstOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -11690,7 +11690,7 @@ func (p projMultDecimalIntervalConstOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -11710,7 +11710,7 @@ func (p projMultDecimalIntervalConstOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -11723,7 +11723,7 @@ func (p projMultDecimalIntervalConstOp) Next(ctx context.Context) coldata.Batch f, err := arg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = p.constArg.MulFloat(f) } @@ -14566,7 +14566,7 @@ func (p projMultIntervalDecimalConstOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -14582,7 +14582,7 @@ func (p projMultIntervalDecimalConstOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -14602,7 +14602,7 @@ func (p projMultIntervalDecimalConstOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } @@ -14615,7 +14615,7 @@ func (p projMultIntervalDecimalConstOp) Next(ctx context.Context) coldata.Batch f, err := p.constArg.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg.MulFloat(f) } diff --git a/pkg/sql/colexec/proj_non_const_ops.eg.go b/pkg/sql/colexec/proj_non_const_ops.eg.go index 8c73b9b35cd0..ea28524bd734 100644 --- a/pkg/sql/colexec/proj_non_const_ops.eg.go +++ b/pkg/sql/colexec/proj_non_const_ops.eg.go @@ -12405,7 +12405,7 @@ func (p projMultDecimalIntervalOp) 
Next(ctx context.Context) coldata.Batch { f, err := arg1.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg2.MulFloat(f) } @@ -12425,7 +12425,7 @@ func (p projMultDecimalIntervalOp) Next(ctx context.Context) coldata.Batch { f, err := arg1.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg2.MulFloat(f) } @@ -12446,7 +12446,7 @@ func (p projMultDecimalIntervalOp) Next(ctx context.Context) coldata.Batch { f, err := arg1.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg2.MulFloat(f) } @@ -12462,7 +12462,7 @@ func (p projMultDecimalIntervalOp) Next(ctx context.Context) coldata.Batch { f, err := arg1.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg2.MulFloat(f) } @@ -15473,7 +15473,7 @@ func (p projMultIntervalDecimalOp) Next(ctx context.Context) coldata.Batch { f, err := arg2.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg1.MulFloat(f) } @@ -15493,7 +15493,7 @@ func (p projMultIntervalDecimalOp) Next(ctx context.Context) coldata.Batch { f, err := arg2.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg1.MulFloat(f) } @@ -15514,7 +15514,7 @@ func (p projMultIntervalDecimalOp) Next(ctx context.Context) coldata.Batch { f, err := arg2.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg1.MulFloat(f) } @@ -15530,7 +15530,7 @@ func (p projMultIntervalDecimalOp) Next(ctx context.Context) coldata.Batch { f, err := arg2.Float64() if err != nil { - colexecerror.InternalError(err) + colexecerror.ExpectedError(err) } projCol[i] = arg1.MulFloat(f) } diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize_overloads b/pkg/sql/logictest/testdata/logic_test/vectorize_overloads index 231d75d32c2e..3a69f33a4fcd 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize_overloads +++ b/pkg/sql/logictest/testdata/logic_test/vectorize_overloads @@ -709,3 +709,7 @@ SELECT _int, _int2, _int // _int2 FROM many_types WHERE _int2 <> 0 312 1 312 789 4 197 2 2 1 + +# Regression test for incorrectly propagating an error as internal (#57773). +statement error .* value out of range +SELECT ((-1.234E+401)::DECIMAL * '-53 years -10 mons -377 days -08:33:40.519057'::INTERVAL::INTERVAL)::INTERVAL FROM many_types From e2d602e7bad91e9154890e957079a93d0ad3e12f Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 16 Dec 2020 18:17:58 -0800 Subject: [PATCH 3/3] colexec: fix spilling queue This commit refactors `enqueue` method of the spilling queue to deep-copy the passed-in batches if they are kept in memory. Previous behavior was suboptimal because it was forcing the caller to always allocate a new batch. Additionally, the spilling queue will now perform a coalescing step by attempting to append as many tuples to the tail in-memory batch as possible. The in-memory batches are allocated with dynamically increasing capacity. This allows us to significantly simplify the code of the router outputs which were performing the coalescing step previously. Additionally, this commit fixes a couple of uses of `enqueue` method (the router outputs and the merge joiner) in which they forgot to enqueue a zero-length batch which is necessary when the disk queue is initialized. 
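To make the new contract concrete, the calling pattern inside pkg/sql/colexec now looks roughly like the following. This is only a sketch using the package-internal spillingQueue, allocator, and colexecerror helpers that appear in this diff, not a standalone program. The caller reuses a single scratch batch, since enqueue now deep-copies batches kept in memory, and it finishes with coldata.ZeroBatch so that a queue that has spilled to disk is terminated correctly:

    // Allocate one reusable scratch batch up front instead of a fresh batch
    // per enqueue call (the queue copies whatever it keeps in memory).
    scratch := allocator.NewMemBatchWithFixedCapacity(inputTypes, coldata.BatchSize())
    q := newSpillingQueue(allocator, inputTypes, memoryLimit, diskQueueCfg, fdSemaphore, diskAcc)
    for {
        b := input.Next(ctx)
        if b.Length() == 0 {
            // No more data: enqueue a zero-length batch per the queue's contract.
            if err := q.enqueue(ctx, coldata.ZeroBatch); err != nil {
                colexecerror.InternalError(err)
            }
            break
        }
        scratch.ResetInternalBatch()
        // ... copy the tuples of interest from b into scratch ...
        scratch.SetLength(b.Length())
        if err := q.enqueue(ctx, scratch); err != nil {
            colexecerror.InternalError(err)
        }
    }
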
Release note: None --- pkg/col/coldata/testutils.go | 22 ++- pkg/sql/colcontainer/partitionedqueue.go | 1 + pkg/sql/colexec/external_sort.go | 3 + pkg/sql/colexec/mergejoiner.go | 48 +++-- pkg/sql/colexec/relative_rank.eg.go | 124 +++---------- pkg/sql/colexec/relative_rank_tmpl.go | 38 +--- pkg/sql/colexec/routers.go | 215 +++++------------------ pkg/sql/colexec/routers_test.go | 53 ++++-- pkg/sql/colexec/spilling_queue.go | 156 ++++++++++++++-- pkg/sql/colexec/spilling_queue_test.go | 70 +++++--- 10 files changed, 354 insertions(+), 376 deletions(-) diff --git a/pkg/col/coldata/testutils.go b/pkg/col/coldata/testutils.go index 38ff6f53eeb9..a67c8242976e 100644 --- a/pkg/col/coldata/testutils.go +++ b/pkg/col/coldata/testutils.go @@ -56,11 +56,21 @@ func AssertEquivalentBatches(t testingT, expected, actual Batch) { expectedVec := expected.ColVec(colIdx) actualVec := actual.ColVec(colIdx) require.Equal(t, expectedVec.Type(), actualVec.Type()) - require.Equal( - t, - expectedVec.Nulls().Slice(0, expected.Length()), - actualVec.Nulls().Slice(0, actual.Length()), - ) + // Check whether the nulls bitmaps are the same. Note that we don't + // track precisely the fact whether nulls are present or not in + // 'maybeHasNulls' field, so we override it manually to be 'true' for + // both nulls vectors if it is 'true' for at least one of them. This is + // acceptable since we still check the bitmaps precisely. + expectedNulls := expectedVec.Nulls() + actualNulls := actualVec.Nulls() + oldExpMaybeHasNulls, oldActMaybeHasNulls := expectedNulls.maybeHasNulls, actualNulls.maybeHasNulls + defer func() { + expectedNulls.maybeHasNulls, actualNulls.maybeHasNulls = oldExpMaybeHasNulls, oldActMaybeHasNulls + }() + expectedNulls.maybeHasNulls = expectedNulls.maybeHasNulls || actualNulls.maybeHasNulls + actualNulls.maybeHasNulls = expectedNulls.maybeHasNulls || actualNulls.maybeHasNulls + require.Equal(t, expectedNulls.Slice(0, expected.Length()), actualNulls.Slice(0, actual.Length())) + canonicalTypeFamily := expectedVec.CanonicalTypeFamily() if canonicalTypeFamily == types.BytesFamily { // Cannot use require.Equal for this type. @@ -95,7 +105,7 @@ func AssertEquivalentBatches(t testingT, expected, actual Batch) { t.Fatalf("Interval mismatch at index %d:\nexpected:\n%sactual:\n%s", i, expectedInterval[i], resultInterval[i]) } } - } else if expectedVec.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily { + } else if canonicalTypeFamily == typeconv.DatumVecCanonicalTypeFamily { // Cannot use require.Equal for this type. expectedDatum := expectedVec.Datum().Slice(0 /* start */, expected.Length()) resultDatum := actualVec.Datum().Slice(0 /* start */, actual.Length()) diff --git a/pkg/sql/colcontainer/partitionedqueue.go b/pkg/sql/colcontainer/partitionedqueue.go index 6e969c130f7f..c4fe7c64f94e 100644 --- a/pkg/sql/colcontainer/partitionedqueue.go +++ b/pkg/sql/colcontainer/partitionedqueue.go @@ -27,6 +27,7 @@ type PartitionedQueue interface { // partition at that index does not exist, a new one is created. Existing // partitions may not be Enqueued to after calling // CloseAllOpenWriteFileDescriptors. + // WARNING: Selection vectors are ignored. Enqueue(ctx context.Context, partitionIdx int, batch coldata.Batch) error // Dequeue removes and returns the batch from the front of the // partitionIdx'th partition. 
If the partition is empty, or no partition at diff --git a/pkg/sql/colexec/external_sort.go b/pkg/sql/colexec/external_sort.go index 40a0d9d02497..bed7fb6f2f12 100644 --- a/pkg/sql/colexec/external_sort.go +++ b/pkg/sql/colexec/external_sort.go @@ -262,6 +262,9 @@ func (s *externalSorter) Next(ctx context.Context) coldata.Batch { if s.partitioner == nil { s.partitioner = s.partitionerCreator() } + // Note that b will never have a selection vector set because the + // allSpooler performs a deselection when buffering up the tuples, + // and the in-memory sorter has allSpooler as its input. if err := s.partitioner.Enqueue(ctx, newPartitionIdx, b); err != nil { colexecerror.InternalError(err) } diff --git a/pkg/sql/colexec/mergejoiner.go b/pkg/sql/colexec/mergejoiner.go index adf96210f6a5..10082c4a7bac 100644 --- a/pkg/sql/colexec/mergejoiner.go +++ b/pkg/sql/colexec/mergejoiner.go @@ -107,8 +107,9 @@ type mjBuilderCrossProductState struct { type mjBufferedGroup struct { *spillingQueue // firstTuple stores a single tuple that was first in the buffered group. - firstTuple []coldata.Vec - numTuples int + firstTuple []coldata.Vec + numTuples int + scratchBatch coldata.Batch } func (bg *mjBufferedGroup) reset(ctx context.Context) { @@ -546,6 +547,9 @@ func (o *mergeJoinBase) Init() { o.proberState.lBufferedGroup.firstTuple = o.unlimitedAllocator.NewMemBatchWithFixedCapacity( o.left.sourceTypes, 1, /* capacity */ ).ColVecs() + o.proberState.lBufferedGroup.scratchBatch = o.unlimitedAllocator.NewMemBatchWithFixedCapacity( + o.left.sourceTypes, coldata.BatchSize(), + ) o.proberState.rBufferedGroup.spillingQueue = newRewindableSpillingQueue( o.unlimitedAllocator, o.right.sourceTypes, o.memoryLimit, o.diskQueueCfg, o.fdSemaphore, o.diskAcc, @@ -553,6 +557,9 @@ func (o *mergeJoinBase) Init() { o.proberState.rBufferedGroup.firstTuple = o.unlimitedAllocator.NewMemBatchWithFixedCapacity( o.right.sourceTypes, 1, /* capacity */ ).ColVecs() + o.proberState.rBufferedGroup.scratchBatch = o.unlimitedAllocator.NewMemBatchWithFixedCapacity( + o.right.sourceTypes, coldata.BatchSize(), + ) o.builderState.lGroups = make([]group, 1) o.builderState.rGroups = make([]group, 1) @@ -572,6 +579,8 @@ func (o *mergeJoinBase) resetBuilderCrossProductState() { // same group as the ones in the buffered group that corresponds to the input // source. This needs to happen when a group starts at the end of an input // batch and can continue into the following batches. +// A zero-length batch needs to be appended when no more batches will be +// appended to the buffered group. func (o *mergeJoinBase) appendToBufferedGroup( ctx context.Context, input *mergeJoinInput, @@ -580,9 +589,6 @@ func (o *mergeJoinBase) appendToBufferedGroup( groupStartIdx int, groupLength int, ) { - if groupLength == 0 { - return - } var ( bufferedGroup *mjBufferedGroup sourceTypes []*types.T @@ -594,9 +600,14 @@ func (o *mergeJoinBase) appendToBufferedGroup( sourceTypes = o.right.sourceTypes bufferedGroup = &o.proberState.rBufferedGroup } - // TODO(yuzefovich): reuse the same scratch batches when spillingQueue - // actually copies the enqueued batch when those are kept in memory. - scratchBatch := o.unlimitedAllocator.NewMemBatchWithFixedCapacity(sourceTypes, groupLength) + if batch.Length() == 0 || groupLength == 0 { + // We have finished appending to this buffered group, so we need to + // enqueue a zero-length batch per the contract of the spilling queue. 
+ if err := bufferedGroup.enqueue(ctx, coldata.ZeroBatch); err != nil { + colexecerror.InternalError(err) + } + return + } if bufferedGroup.numTuples == 0 { o.unlimitedAllocator.PerformOperation(bufferedGroup.firstTuple, func() { for colIdx := range sourceTypes { @@ -616,9 +627,10 @@ func (o *mergeJoinBase) appendToBufferedGroup( } bufferedGroup.numTuples += groupLength - o.unlimitedAllocator.PerformOperation(scratchBatch.ColVecs(), func() { + bufferedGroup.scratchBatch.ResetInternalBatch() + o.unlimitedAllocator.PerformOperation(bufferedGroup.scratchBatch.ColVecs(), func() { for colIdx := range input.sourceTypes { - scratchBatch.ColVec(colIdx).Copy( + bufferedGroup.scratchBatch.ColVec(colIdx).Copy( coldata.CopySliceArgs{ SliceArgs: coldata.SliceArgs{ Src: batch.ColVec(colIdx), @@ -630,10 +642,9 @@ func (o *mergeJoinBase) appendToBufferedGroup( }, ) } + bufferedGroup.scratchBatch.SetLength(groupLength) }) - scratchBatch.SetSelection(false) - scratchBatch.SetLength(groupLength) - if err := bufferedGroup.enqueue(ctx, scratchBatch); err != nil { + if err := bufferedGroup.enqueue(ctx, bufferedGroup.scratchBatch); err != nil { colexecerror.InternalError(err) } } @@ -681,6 +692,15 @@ func (o *mergeJoinBase) sourceFinished() bool { return o.proberState.lLength == 0 || o.proberState.rLength == 0 } +// finishBufferedGroup appends a zero-length batch to the buffered group which +// is required by the contract of the spilling queue. +func (o *mergeJoinBase) finishBufferedGroup(ctx context.Context, input *mergeJoinInput) { + o.appendToBufferedGroup( + ctx, input, coldata.ZeroBatch, nil, /* sel */ + 0 /* groupStartIdx */, 0, /* groupLength */ + ) +} + // completeBufferedGroup extends the buffered group corresponding to input. // First, we check that the first row in batch is still part of the same group. // If this is the case, we use the Distinct operator to find the first @@ -696,6 +716,7 @@ func (o *mergeJoinBase) completeBufferedGroup( ) (_ coldata.Batch, idx int, batchLength int) { batchLength = batch.Length() if o.isBufferedGroupFinished(input, batch, rowIdx) { + o.finishBufferedGroup(ctx, input) return batch, rowIdx, batchLength } @@ -752,6 +773,7 @@ func (o *mergeJoinBase) completeBufferedGroup( if batchLength == 0 { // The input has been exhausted, so the buffered group is now complete. isBufferedGroupComplete = true + o.finishBufferedGroup(ctx, input) } } } diff --git a/pkg/sql/colexec/relative_rank.eg.go b/pkg/sql/colexec/relative_rank.eg.go index 4c13716301e6..f291867efff7 100644 --- a/pkg/sql/colexec/relative_rank.eg.go +++ b/pkg/sql/colexec/relative_rank.eg.go @@ -181,6 +181,7 @@ func (r *percentRankNoPartitionOp) Init() { int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) // All rank functions start counting from 1. Before we assign the rank to a // tuple in the batch, we first increment r.rank, so setting this @@ -236,10 +237,7 @@ func (r *percentRankNoPartitionOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( @@ -380,12 +378,14 @@ func (r *percentRankWithPartitionOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction r.bufferedTuples = newSpillingQueue( r.allocator, r.inputTypes, int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) // All rank functions start counting from 1. Before we assign the rank to a // tuple in the batch, we first increment r.rank, so setting this @@ -433,12 +433,6 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { } // We need to flush the last vector of the running partitions // sizes, including the very last partition. - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPartitionsSizesCol := r.partitionsState.runningSizes.ColVec(0).Int64() runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.partitionsState.idx++ @@ -461,10 +455,7 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( @@ -495,13 +486,6 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -512,8 +496,8 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -527,13 +511,6 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -544,8 +521,8 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -698,12 +675,14 @@ func (r *cumeDistNoPartitionOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction r.bufferedTuples = newSpillingQueue( r.allocator, r.inputTypes, int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) } @@ -746,12 +725,6 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { } // We need to flush the last vector of the running peer groups // sizes, including the very last peer group. - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPeerGroupsSizesCol := r.peerGroupsState.runningSizes.ColVec(0).Int64() runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.peerGroupsState.idx++ @@ -772,10 +745,7 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( @@ -810,13 +780,6 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -827,8 +790,8 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } @@ -842,13 +805,6 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -859,8 +815,8 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } @@ -1001,18 +957,21 @@ func (r *cumeDistWithPartitionOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction r.peerGroupsState.spillingQueue = newSpillingQueue( r.allocator, []*types.T{types.Int}, int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction r.bufferedTuples = newSpillingQueue( r.allocator, r.inputTypes, int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) } @@ -1055,12 +1014,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx 
context.Context) coldata.Batch { } // We need to flush the last vector of the running partitions // sizes, including the very last partition. - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPartitionsSizesCol := r.partitionsState.runningSizes.ColVec(0).Int64() runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.partitionsState.idx++ @@ -1073,12 +1026,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { } // We need to flush the last vector of the running peer groups // sizes, including the very last peer group. - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPeerGroupsSizesCol := r.peerGroupsState.runningSizes.ColVec(0).Int64() runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.peerGroupsState.idx++ @@ -1101,10 +1048,7 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( @@ -1135,13 +1079,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -1152,8 +1089,8 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -1167,13 +1104,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -1184,8 +1114,8 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -1205,13 +1135,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -1222,8 +1145,8 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } @@ -1237,13 +1160,6 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -1254,8 +1170,8 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } diff --git a/pkg/sql/colexec/relative_rank_tmpl.go b/pkg/sql/colexec/relative_rank_tmpl.go index ed4eddf958ee..07e7c972e22d 100644 --- a/pkg/sql/colexec/relative_rank_tmpl.go +++ b/pkg/sql/colexec/relative_rank_tmpl.go @@ -143,13 +143,6 @@ func _COMPUTE_PARTITIONS_SIZES(_HAS_SEL bool) { // */}} // We have encountered a start of a new partition, so we // need to save the computed size of the previous one // (if there was one). - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPartitionsSizesCol = r.partitionsState.runningSizes.ColVec(0).Int64() - } if r.numTuplesInPartition > 0 { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.numTuplesInPartition = 0 @@ -160,8 +153,8 @@ func _COMPUTE_PARTITIONS_SIZES(_HAS_SEL bool) { // */}} if err := r.partitionsState.enqueue(ctx, r.partitionsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.partitionsState.runningSizes = nil r.partitionsState.idx = 0 + r.partitionsState.runningSizes.ResetInternalBatch() } } } @@ -184,13 +177,6 @@ func _COMPUTE_PEER_GROUPS_SIZES(_HAS_SEL bool) { // */}} // We have encountered a start of a new peer group, so we // need to save the computed size of the previous one // (if there was one). - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - runningPeerGroupsSizesCol = r.peerGroupsState.runningSizes.ColVec(0).Int64() - } if r.numPeers > 0 { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.numPeers = 0 @@ -201,8 +187,8 @@ func _COMPUTE_PEER_GROUPS_SIZES(_HAS_SEL bool) { // */}} if err := r.peerGroupsState.enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { colexecerror.InternalError(err) } - r.peerGroupsState.runningSizes = nil r.peerGroupsState.idx = 0 + r.peerGroupsState.runningSizes.ResetInternalBatch() } } } @@ -291,6 +277,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction // {{end}} // {{if .IsCumeDist}} @@ -299,6 +286,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Init() { int64(float64(r.memoryLimit)*relativeRankUtilityQueueMemLimitFraction), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) usedMemoryLimitFraction += relativeRankUtilityQueueMemLimitFraction // {{end}} r.bufferedTuples = newSpillingQueue( @@ -306,6 +294,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Init() { int64(float64(r.memoryLimit)*(1.0-usedMemoryLimitFraction)), r.diskQueueCfg, r.fdSemaphore, r.diskAcc, ) + r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, coldata.BatchSize()) r.output = r.allocator.NewMemBatchWithFixedCapacity(append(r.inputTypes, types.Float), coldata.BatchSize()) // {{if .IsPercentRank}} // All rank functions start counting from 1. Before we assign the rank to a @@ -360,12 +349,6 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { // {{if .HasPartition}} // We need to flush the last vector of the running partitions // sizes, including the very last partition. - if r.partitionsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. 
- r.partitionsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPartitionsSizesCol := r.partitionsState.runningSizes.ColVec(0).Int64() runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.partitionsState.idx++ @@ -380,12 +363,6 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { // {{if .IsCumeDist}} // We need to flush the last vector of the running peer groups // sizes, including the very last peer group. - if r.peerGroupsState.runningSizes == nil { - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.peerGroupsState.runningSizes = r.allocator.NewMemBatchWithFixedCapacity([]*types.T{types.Int}, coldata.BatchSize()) - } runningPeerGroupsSizesCol := r.peerGroupsState.runningSizes.ColVec(0).Int64() runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.peerGroupsState.idx++ @@ -414,10 +391,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { sel := batch.Selection() // First, we buffer up all of the tuples. - // TODO(yuzefovich): do not instantiate a new batch here once - // spillingQueues actually copy the batches when those are kept - // in-memory. - r.scratch = r.allocator.NewMemBatchWithFixedCapacity(r.inputTypes, n) + r.scratch.ResetInternalBatch() r.allocator.PerformOperation(r.scratch.ColVecs(), func() { for colIdx, vec := range r.scratch.ColVecs() { vec.Copy( diff --git a/pkg/sql/colexec/routers.go b/pkg/sql/colexec/routers.go index 73c4d1ad2349..7a6b2ec8c60f 100644 --- a/pkg/sql/colexec/routers.go +++ b/pkg/sql/colexec/routers.go @@ -37,10 +37,10 @@ type routerOutput interface { // initWithHashRouter passes a reference to the HashRouter that will be // pushing batches to this output. initWithHashRouter(*HashRouter) - // addBatch adds the elements specified by the selection vector from batch to - // the output. It returns whether or not the output changed its state to + // addBatch adds the elements specified by the selection vector from batch + // to the output. It returns whether or not the output changed its state to // blocked (see implementations). - addBatch(context.Context, coldata.Batch, []int) bool + addBatch(context.Context, coldata.Batch) bool // cancel tells the output to stop producing batches. Optionally forwards an // error if not nil. cancel(context.Context, error) @@ -66,10 +66,15 @@ type routerOutputOpState int const ( // routerOutputOpRunning is the state in which routerOutputOp operates - // normally. The router output transitions into the draining state when - // either it is finished (when a zero-length batch was added or when it was - // canceled) or it encounters an error. + // normally. The router output transitions into routerOutputDoneAdding when + // a zero-length batch was added or routerOutputOpDraining when it + // encounters an error or the drain is requested. routerOutputOpRunning routerOutputOpState = iota + // routerOutputDoneAdding is the state in which a zero-length was batch was + // added to routerOutputOp and no more batches will be added. The router + // output transitions to routerOutputOpDraining when the output is canceled + // (either closed or the drain is requested). + routerOutputDoneAdding // routerOutputOpDraining is the state in which routerOutputOp always // returns zero-length batches on calls to Next. 
routerOutputOpDraining @@ -110,55 +115,13 @@ type routerOutputOp struct { // forwardedErr is an error that was forwarded by the HashRouter. If set, // any subsequent calls to Next will return this error. forwardedErr error - // unlimitedAllocator tracks the memory usage of this router output, - // providing a signal for when it should spill to disk. - // The memory lifecycle is as follows: - // - // o.mu.pendingBatch is allocated as a "staging" area. Tuples are copied - // into it in addBatch. - // A read may come in in this state, in which case pendingBatch is returned - // and references to it are removed. Since batches are unsafe for reuse, - // the batch is also manually released from the allocator. - // If a read does not come in and the batch becomes full of tuples, that - // batch is stored in o.mu.data, which is a queue with an in-memory circular - // buffer backed by disk. If the batch fits in memory, a reference to it - // is retained and a new pendingBatch is allocated. - // - // If a read comes in at this point, the batch is dequeued from o.mu.data - // and returned, but the memory is still accounted for. In fact, memory use - // increases up to when o.mu.data is full and must spill to disk. - // Once it spills to disk, the spillingQueue (o.mu.data), will release - // batches it spills to disk to stop accounting for them. - // The tricky part comes when o.mu.data is dequeued from. In this case, the - // reference for a previously-returned batch is overwritten with an on-disk - // batch, so the memory for the overwritten batch is released, while the - // new batch's memory is retained. Note that if batches are being dequeued - // from disk, it must be the case that the circular buffer is now empty, - // holding references to batches that have been previously returned. - // - // In short, batches whose references are retained are also retained in the - // allocator, but if any references are overwritten or lost, those batches - // are released. - unlimitedAllocator *colmem.Allocator - cond *sync.Cond - // pendingBatch is a partially-filled batch with data added through - // addBatch. Once this batch reaches capacity, it is flushed to data. The - // main use of pendingBatch is coalescing various fragmented batches into - // one. - pendingBatch coldata.Batch + cond *sync.Cond // data is a spillingQueue, a circular buffer backed by a disk queue. data *spillingQueue numUnread int blocked bool } - // pendingBatchCapacity indicates the capacity which the new mu.pendingBatch - // should be allocated with. It'll increase dynamically until - // coldata.BatchSize(). We need to track it ourselves since the pending - // batch ownership is given to the spillingQueue, so when using - // ResetMaybeReallocate, we don't the old batch to check the capacity of. - pendingBatchCapacity int - testingKnobs routerOutputOpTestingKnobs } @@ -183,9 +146,6 @@ type routerOutputOpTestingKnobs struct { // defaultRouterOutputBlockedThreshold but can be modified by tests to test // edge cases. blockedThreshold int - // alwaysFlush, if set to true, will always flush o.mu.pendingBatch to - // o.mu.data. - alwaysFlush bool // addBatchTestInducedErrorCb is called after any function call that could // produce an error if that error is nil. If the callback returns an error, // the router output overwrites the nil error with the returned error. 
@@ -233,7 +193,6 @@ func newRouterOutputOp(args routerOutputOpArgs) *routerOutputOp { unblockedEventsChan: args.unblockedEventsChan, testingKnobs: args.testingKnobs, } - o.mu.unlimitedAllocator = args.unlimitedAllocator o.mu.cond = sync.NewCond(&o.mu) o.mu.data = newSpillingQueue( args.unlimitedAllocator, @@ -266,7 +225,7 @@ func (o *routerOutputOp) nextErrorLocked(ctx context.Context, err error) { func (o *routerOutputOp) Next(ctx context.Context) coldata.Batch { o.mu.Lock() defer o.mu.Unlock() - for o.mu.forwardedErr == nil && o.mu.state == routerOutputOpRunning && o.mu.pendingBatch == nil && o.mu.data.empty() { + for o.mu.forwardedErr == nil && o.mu.state == routerOutputOpRunning && o.mu.data.empty() { // Wait until there is data to read or the output is canceled. o.mu.cond.Wait() } @@ -276,23 +235,12 @@ func (o *routerOutputOp) Next(ctx context.Context) coldata.Batch { if o.mu.state == routerOutputOpDraining { return coldata.ZeroBatch } - var b coldata.Batch - if o.mu.pendingBatch != nil && o.mu.data.empty() { - // o.mu.data is empty (i.e. nothing has been flushed to the spillingQueue), - // but there is a o.mu.pendingBatch that has not been flushed yet. Return - // this batch directly. - b = o.mu.pendingBatch - o.mu.unlimitedAllocator.ReleaseBatch(b) - o.mu.pendingBatch = nil - } else { - var err error - b, err = o.mu.data.dequeue(ctx) - if err == nil && o.testingKnobs.nextTestInducedErrorCb != nil { - err = o.testingKnobs.nextTestInducedErrorCb() - } - if err != nil { - o.nextErrorLocked(ctx, err) - } + b, err := o.mu.data.dequeue(ctx) + if err == nil && o.testingKnobs.nextTestInducedErrorCb != nil { + err = o.testingKnobs.nextTestInducedErrorCb() + } + if err != nil { + o.nextErrorLocked(ctx, err) } o.mu.numUnread -= b.Length() if o.mu.numUnread <= o.testingKnobs.blockedThreshold { @@ -362,113 +310,38 @@ func (o *routerOutputOp) forwardErr(err error) { o.mu.cond.Signal() } -// addBatch copies the columns in batch according to selection into an internal -// buffer. -// The routerOutputOp only adds the elements specified by selection. Therefore, -// an empty selection slice will add no elements. Note that the selection vector -// on the batch is ignored. This is so that callers of addBatch can push the -// same batch with different selection vectors to many different outputs. -// True is returned if the output changes state to blocked (note: if the -// output is already blocked, false is returned). +// addBatch copies the batch (according to its selection vector) into an +// internal buffer. Zero-length batch should be passed-in to indicate that no +// more batches will be added. // TODO(asubiotto): We should explore pipelining addBatch if disk-spilling // performance becomes a concern. The main router goroutine will be writing to // disk as the code is written, meaning that we impact the performance of // writing rows to a fast output if we have to write to disk for a single // slow output. -func (o *routerOutputOp) addBatch(ctx context.Context, batch coldata.Batch, selection []int) bool { - if len(selection) > batch.Length() { - selection = selection[:batch.Length()] - } +func (o *routerOutputOp) addBatch(ctx context.Context, batch coldata.Batch) bool { o.mu.Lock() defer o.mu.Unlock() - if o.mu.state == routerOutputOpDraining { + switch o.mu.state { + case routerOutputDoneAdding: + colexecerror.InternalError(errors.AssertionFailedf("a batch was added to routerOutput in DoneAdding state")) + case routerOutputOpDraining: // This output is draining, discard any data. 
return false } - if batch.Length() == 0 { - if o.mu.pendingBatch != nil { - err := o.mu.data.enqueue(ctx, o.mu.pendingBatch) - if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil { - err = o.testingKnobs.addBatchTestInducedErrorCb() - } - if err != nil { - colexecerror.InternalError(err) - } - } else if o.testingKnobs.addBatchTestInducedErrorCb != nil { - // This is the last chance to run addBatchTestInducedErorCb if it has - // been set. - if err := o.testingKnobs.addBatchTestInducedErrorCb(); err != nil { - colexecerror.InternalError(err) - } - } - o.mu.pendingBatch = coldata.ZeroBatch - o.mu.cond.Signal() - return false + o.mu.numUnread += batch.Length() + err := o.mu.data.enqueue(ctx, batch) + if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil { + err = o.testingKnobs.addBatchTestInducedErrorCb() } - - if len(selection) == 0 { - // Non-zero batch with no selection vector. Nothing to do. - return false + if err != nil { + colexecerror.InternalError(err) } - // Increment o.mu.numUnread before going into the loop, as we will consume - // selection. - o.mu.numUnread += len(selection) - - for toAppend := len(selection); toAppend > 0; { - if o.mu.pendingBatch == nil { - if o.pendingBatchCapacity < coldata.BatchSize() { - // We still haven't reached the maximum capacity, so let's - // calculate the next capacity to use. - if o.pendingBatchCapacity == 0 { - // This is the first set of tuples that are added to this - // router output, so we'll allocate the batch with just - // enough capacity to fill all of these tuples. - o.pendingBatchCapacity = len(selection) - } else { - o.pendingBatchCapacity *= 2 - } - } - // Note: we could have used NewMemBatchWithFixedCapacity here, but - // we choose not to in order to indicate that the capacity of the - // pending batches has dynamic behavior. - o.mu.pendingBatch, _ = o.mu.unlimitedAllocator.ResetMaybeReallocate(o.types, nil /* oldBatch */, o.pendingBatchCapacity) - } - available := o.mu.pendingBatch.Capacity() - o.mu.pendingBatch.Length() - numAppended := toAppend - if toAppend > available { - numAppended = available - } - o.mu.unlimitedAllocator.PerformOperation(o.mu.pendingBatch.ColVecs(), func() { - for i := range o.types { - o.mu.pendingBatch.ColVec(i).Copy( - coldata.CopySliceArgs{ - SliceArgs: coldata.SliceArgs{ - Src: batch.ColVec(i), - Sel: selection[:numAppended], - DestIdx: o.mu.pendingBatch.Length(), - SrcEndIdx: numAppended, - }, - }, - ) - } - }) - newLength := o.mu.pendingBatch.Length() + numAppended - o.mu.pendingBatch.SetLength(newLength) - if o.testingKnobs.alwaysFlush || newLength >= o.mu.pendingBatch.Capacity() { - // The capacity in o.mu.pendingBatch has been filled. - err := o.mu.data.enqueue(ctx, o.mu.pendingBatch) - if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil { - err = o.testingKnobs.addBatchTestInducedErrorCb() - } - if err != nil { - colexecerror.InternalError(err) - } - o.mu.pendingBatch = nil - } - toAppend -= numAppended - selection = selection[numAppended:] + if batch.Length() == 0 { + o.mu.state = routerOutputDoneAdding + o.mu.cond.Signal() + return false } stateChanged := false @@ -500,7 +373,6 @@ func (o *routerOutputOp) resetForTests(ctx context.Context) { o.mu.data.reset(ctx) o.mu.numUnread = 0 o.mu.blocked = false - o.pendingBatchCapacity = 0 } // hashRouterDrainState is a state that specifically describes the hashRouter's @@ -752,16 +624,21 @@ func (r *HashRouter) processNextBatch(ctx context.Context) bool { // Done. 
Push an empty batch to outputs to tell them the data is done as // well. for _, o := range r.outputs { - o.addBatch(ctx, b, nil) + o.addBatch(ctx, b) } return true } selections := r.tupleDistributor.distribute(ctx, b, r.hashCols) for i, o := range r.outputs { - if o.addBatch(ctx, b, selections[i]) { - // This batch blocked the output. - r.numBlockedOutputs++ + if len(selections[i]) > 0 { + b.SetSelection(true) + copy(b.Selection(), selections[i]) + b.SetLength(len(selections[i])) + if o.addBatch(ctx, b) { + // This batch blocked the output. + r.numBlockedOutputs++ + } } } return false diff --git a/pkg/sql/colexec/routers_test.go b/pkg/sql/colexec/routers_test.go index 327f76e87357..4e68e099921e 100644 --- a/pkg/sql/colexec/routers_test.go +++ b/pkg/sql/colexec/routers_test.go @@ -54,7 +54,7 @@ type memoryTestCase struct { expSpill bool } -// getDiskqueueCfgAndMemoryTestCases is a test helper that creates an in-memory +// getDiskQueueCfgAndMemoryTestCases is a test helper that creates an in-memory // DiskQueueCfg that can be used to create a new DiskQueue. A cleanup function // is also returned as well as some default memory limits that are useful to // test with: 0 for an immediate spill, a random memory limit up to 64 MiB, and @@ -89,6 +89,19 @@ func getDataAndFullSelection() (tuples, []*types.T, []int) { return data, []*types.T{types.Int}, fullSelection } +// pushSelectionIntoBatch updates b in-place to have the provided selection +// vector while setting the length to minimum of b's length and len(selection). +// Zero-length batch is not modified. +func pushSelectionIntoBatch(b coldata.Batch, selection []int) { + if b.Length() > 0 { + b.SetSelection(true) + copy(b.Selection(), selection) + if len(selection) < b.Length() { + b.SetLength(len(selection)) + } + } +} + func TestRouterOutputAddBatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -173,7 +186,8 @@ func TestRouterOutputAddBatch(t *testing.T) { in.Init() for { b := in.Next(ctx) - o.addBatch(ctx, b, tc.selection) + pushSelectionIntoBatch(b, tc.selection) + o.addBatch(ctx, b) if b.Length() == 0 { break } @@ -216,7 +230,8 @@ func TestRouterOutputNext(t *testing.T) { unblockEvent: func(in colexecbase.Operator, o *routerOutputOp) { for { b := in.Next(ctx) - o.addBatch(ctx, b, fullSelection) + pushSelectionIntoBatch(b, fullSelection) + o.addBatch(ctx, b) if b.Length() == 0 { break } @@ -229,7 +244,7 @@ func TestRouterOutputNext(t *testing.T) { // ReaderWaitsForZeroBatch verifies that a reader blocking on Next will // also get unblocked with no data other than the zero batch. unblockEvent: func(_ colexecbase.Operator, o *routerOutputOp) { - o.addBatch(ctx, coldata.ZeroBatch, nil /* selection */) + o.addBatch(ctx, coldata.ZeroBatch) }, expected: tuples{}, name: "ReaderWaitsForZeroBatch", @@ -332,7 +347,7 @@ func TestRouterOutputNext(t *testing.T) { unblockedEventsChan: unblockedEventsChan, }, ) - o.addBatch(ctx, coldata.ZeroBatch, fullSelection) + o.addBatch(ctx, coldata.ZeroBatch) o.Next(ctx) o.Next(ctx) select { @@ -391,19 +406,22 @@ func TestRouterOutputNext(t *testing.T) { b := in.Next(ctx) // Make sure the output doesn't consider itself blocked. We're right at the // limit but not over. - if o.addBatch(ctx, b, selection) { + pushSelectionIntoBatch(b, selection) + if o.addBatch(ctx, b) { t.Fatal("unexpectedly blocked") } b = in.Next(ctx) // This addBatch call should now block the output. 
- if !o.addBatch(ctx, b, selection) { + pushSelectionIntoBatch(b, selection) + if !o.addBatch(ctx, b) { t.Fatal("unexpectedly still unblocked") } // Add the rest of the data. for { b = in.Next(ctx) - if o.addBatch(ctx, b, selection) { + pushSelectionIntoBatch(b, selection) + if o.addBatch(ctx, b) { t.Fatal("should only return true when switching from unblocked to blocked") } if b.Length() == 0 { @@ -501,7 +519,8 @@ func TestRouterOutputRandom(t *testing.T) { } } - if o.addBatch(ctx, b, selection) { + pushSelectionIntoBatch(b, selection) + if o.addBatch(ctx, b) { if lastBlockedState { // We might have missed an unblock event during the last loop. select { @@ -575,7 +594,7 @@ func TestRouterOutputRandom(t *testing.T) { type callbackRouterOutput struct { colexecbase.ZeroInputNode - addBatchCb func(coldata.Batch, []int) bool + addBatchCb func(coldata.Batch) bool cancelCb func() forwardedErr error } @@ -584,11 +603,9 @@ var _ routerOutput = &callbackRouterOutput{} func (o *callbackRouterOutput) initWithHashRouter(*HashRouter) {} -func (o *callbackRouterOutput) addBatch( - _ context.Context, batch coldata.Batch, selection []int, -) bool { +func (o *callbackRouterOutput) addBatch(_ context.Context, batch coldata.Batch) bool { if o.addBatchCb != nil { - return o.addBatchCb(batch, selection) + return o.addBatchCb(batch) } return false } @@ -646,8 +663,8 @@ func TestHashRouterComputesDestination(t *testing.T) { // Capture the index. outputIdx := i outputs[i] = &callbackRouterOutput{ - addBatchCb: func(batch coldata.Batch, sel []int) bool { - for _, j := range sel { + addBatchCb: func(batch coldata.Batch) bool { + for _, j := range batch.Selection()[:batch.Length()] { key := batch.ColVec(0).Int64()[j] if _, ok := valsYetToSee[key]; !ok { t.Fatalf("pushed alread seen value to router output: %d", key) @@ -690,7 +707,7 @@ func TestHashRouterCancellation(t *testing.T) { numAddBatches := int64(0) for i := range outputs { outputs[i] = &callbackRouterOutput{ - addBatchCb: func(_ coldata.Batch, _ []int) bool { + addBatchCb: func(_ coldata.Batch) bool { atomic.AddInt64(&numAddBatches, 1) return false }, @@ -814,8 +831,6 @@ func TestHashRouterOneOutput(t *testing.T) { o := newOpTestOutput(routerOutputs[0], expected) ro := routerOutputs[0].(*routerOutputOp) - // Set alwaysFlush so that data is always flushed to the spillingQueue. - ro.testingKnobs.alwaysFlush = true var wg sync.WaitGroup wg.Add(1) diff --git a/pkg/sql/colexec/spilling_queue.go b/pkg/sql/colexec/spilling_queue.go index 4d2e178a8d47..6736e3420f26 100644 --- a/pkg/sql/colexec/spilling_queue.go +++ b/pkg/sql/colexec/spilling_queue.go @@ -54,16 +54,34 @@ type spillingQueue struct { numOnDiskItems int closed bool - diskQueueCfg colcontainer.DiskQueueCfg - diskQueue colcontainer.Queue - fdSemaphore semaphore.Semaphore - dequeueScratch coldata.Batch + // nextInMemBatchCapacity indicates the capacity which the new batch that + // we'll append to items should be allocated with. It'll increase + // dynamically until coldata.BatchSize(). + nextInMemBatchCapacity int + + diskQueueCfg colcontainer.DiskQueueCfg + diskQueue colcontainer.Queue + diskQueueDeselectionScratch coldata.Batch + fdSemaphore semaphore.Semaphore + dequeueScratch coldata.Batch rewindable bool rewindableState struct { numItemsDequeued int } + testingKnobs struct { + // numEnqueues tracks the number of times enqueue() has been called with + // non-zero batch. 
+		numEnqueues int
+		// maxNumBatchesEnqueuedInMemory, if greater than 0, indicates the
+		// maximum number of batches that are attempted to be enqueued to the
+		// in-memory buffer 'items' (other limiting conditions might occur
+		// earlier). Once numEnqueues reaches this limit, all subsequent calls
+		// to enqueue() will use the disk queue.
+		maxNumBatchesEnqueuedInMemory int
+	}
+
 	diskAcc *mon.BoundAccount
 }
 
@@ -134,8 +152,16 @@ func newRewindableSpillingQueue(
 	return q
 }
 
+// enqueue adds the provided batch to the queue. A zero-length batch needs to
+// be added as the last one.
+//
+// The passed-in batch is deeply copied, so it can be safely reused by the
+// caller. The spilling queue coalesces all input tuples into batches of
+// dynamically increasing capacity while kept in memory. It also performs a
+// deselection step if necessary when adding the batch to the disk queue.
 func (q *spillingQueue) enqueue(ctx context.Context, batch coldata.Batch) error {
-	if batch.Length() == 0 {
+	n := batch.Length()
+	if n == 0 {
 		if q.diskQueue != nil {
 			if err := q.diskQueue.Enqueue(ctx, batch); err != nil {
 				return err
@@ -143,18 +169,49 @@ func (q *spillingQueue) enqueue(ctx context.Context, batch coldata.Batch) error
 		}
 		return nil
 	}
+	q.testingKnobs.numEnqueues++
 
-	if q.numOnDiskItems > 0 || q.unlimitedAllocator.Used() > q.maxMemoryLimit ||
-		(q.numInMemoryItems == len(q.items) && q.numInMemoryItems == q.maxItemsLen) {
-		// In this case, there is not enough memory available to keep this batch in
-		// memory, or the in-memory circular buffer has no slots available (we do
-		// an initial estimate of how many batches would fit into the buffer, which
-		// might be wrong). The tail of the queue might also already be on disk, in
-		// which case that is where the batch must be enqueued to maintain order.
+	alreadySpilled := q.numOnDiskItems > 0
+	memoryLimitReached := q.unlimitedAllocator.Used() > q.maxMemoryLimit
+	maxItemsLenReached := q.numInMemoryItems == len(q.items) && q.numInMemoryItems == q.maxItemsLen
+	maxInMemEnqueuesExceeded := q.testingKnobs.maxNumBatchesEnqueuedInMemory != 0 && q.testingKnobs.numEnqueues > q.testingKnobs.maxNumBatchesEnqueuedInMemory
+	if alreadySpilled || memoryLimitReached || maxItemsLenReached || maxInMemEnqueuesExceeded {
+		// In this case, one of the following conditions is true:
+		// 1. the tail of the queue might also already be on disk, in which case
+		//    that is where the batch must be enqueued to maintain order
+		// 2. there is not enough memory available to keep this batch in memory
+		// 3. the in-memory circular buffer has no slots available (we do an
+		//    initial estimate of how many batches would fit into the buffer,
+		//    which might be wrong)
+		// 4. we reached the testing limit on the number of items added to the
+		//    in-memory buffer
+		// so we have to add the batch to the disk queue.
 		if err := q.maybeSpillToDisk(ctx); err != nil {
 			return err
 		}
 		q.unlimitedAllocator.ReleaseBatch(batch)
+		if sel := batch.Selection(); sel != nil {
+			// We need to perform the deselection since the disk queue
+			// ignores the selection vectors.
+ q.diskQueueDeselectionScratch, _ = q.unlimitedAllocator.ResetMaybeReallocate( + q.typs, q.diskQueueDeselectionScratch, n, + ) + q.unlimitedAllocator.PerformOperation(q.diskQueueDeselectionScratch.ColVecs(), func() { + for i := range q.typs { + q.diskQueueDeselectionScratch.ColVec(i).Copy( + coldata.CopySliceArgs{ + SliceArgs: coldata.SliceArgs{ + Src: batch.ColVec(i), + Sel: sel, + SrcEndIdx: n, + }, + }, + ) + } + q.diskQueueDeselectionScratch.SetLength(n) + }) + batch = q.diskQueueDeselectionScratch + } if err := q.diskQueue.Enqueue(ctx, batch); err != nil { return err } @@ -182,7 +239,78 @@ func (q *spillingQueue) enqueue(ctx context.Context, batch coldata.Batch) error q.items = newItems } - q.items[q.curTailIdx] = batch + alreadyCopied := 0 + if q.numInMemoryItems > 0 { + // If we have already enqueued at least one batch, let's try to append + // as many tuples to it as it has the capacity for. + tailBatchIdx := q.curTailIdx - 1 + if tailBatchIdx < 0 { + tailBatchIdx = 0 + } + tailBatch := q.items[tailBatchIdx] + if l, c := tailBatch.Length(), tailBatch.Capacity(); l < c { + alreadyCopied = c - l + if alreadyCopied > n { + alreadyCopied = n + } + q.unlimitedAllocator.PerformOperation(tailBatch.ColVecs(), func() { + for i := range q.typs { + tailBatch.ColVec(i).Append( + coldata.SliceArgs{ + Src: batch.ColVec(i), + Sel: batch.Selection(), + DestIdx: l, + SrcStartIdx: 0, + SrcEndIdx: alreadyCopied, + }, + ) + } + tailBatch.SetLength(l + alreadyCopied) + }) + if alreadyCopied == n { + // We were able to append all of the tuples, so we return early + // since we don't need to update any of the state. + return nil + } + } + } + + var newBatchCapacity int + if q.nextInMemBatchCapacity == coldata.BatchSize() { + // At this point we only allocate batches with maximum capacity. + newBatchCapacity = coldata.BatchSize() + } else { + newBatchCapacity = n - alreadyCopied + if q.nextInMemBatchCapacity > newBatchCapacity { + newBatchCapacity = q.nextInMemBatchCapacity + } + q.nextInMemBatchCapacity = 2 * newBatchCapacity + if q.nextInMemBatchCapacity > coldata.BatchSize() { + q.nextInMemBatchCapacity = coldata.BatchSize() + } + } + + // Note: we could have used NewMemBatchWithFixedCapacity here, but we choose + // not to in order to indicate that the capacity of the new batches has + // dynamic behavior. 
+	newBatch, _ := q.unlimitedAllocator.ResetMaybeReallocate(q.typs, nil /* oldBatch */, newBatchCapacity)
+	q.unlimitedAllocator.PerformOperation(newBatch.ColVecs(), func() {
+		for i := range q.typs {
+			newBatch.ColVec(i).Copy(
+				coldata.CopySliceArgs{
+					SliceArgs: coldata.SliceArgs{
+						Src:         batch.ColVec(i),
+						Sel:         batch.Selection(),
+						SrcStartIdx: alreadyCopied,
+						SrcEndIdx:   n,
+					},
+				},
+			)
+		}
+		newBatch.SetLength(n - alreadyCopied)
+	})
+
+	q.items[q.curTailIdx] = newBatch
 	q.curTailIdx++
 	if q.curTailIdx == len(q.items) {
 		q.curTailIdx = 0
@@ -342,5 +470,7 @@ func (q *spillingQueue) reset(ctx context.Context) {
 	q.numOnDiskItems = 0
 	q.curHeadIdx = 0
 	q.curTailIdx = 0
+	q.nextInMemBatchCapacity = 0
 	q.rewindableState.numItemsDequeued = 0
+	q.testingKnobs.numEnqueues = 0
 }
diff --git a/pkg/sql/colexec/spilling_queue_test.go b/pkg/sql/colexec/spilling_queue_test.go
index 068fed96fb7b..daa0af04b0c7 100644
--- a/pkg/sql/colexec/spilling_queue_test.go
+++ b/pkg/sql/colexec/spilling_queue_test.go
@@ -56,16 +56,32 @@ func TestSpillingQueue(t *testing.T) {
 				prefix = "Rewindable/"
 			}
 			numBatches := 1 + rng.Intn(16)
-			log.Infof(context.Background(), "%sMemoryLimit=%s/DiskQueueCacheMode=%d/AlwaysCompress=%t/NumBatches=%d",
-				prefix, humanizeutil.IBytes(memoryLimit), diskQueueCacheMode, alwaysCompress, numBatches)
+			// Add a limit on the number of batches added to the in-memory
+			// buffer of the spilling queue. We will set it to half of the total
+			// number of batches, which allows us to exercise the case when
+			// spilling to the disk queue occurs after some batches were added
+			// to the in-memory buffer.
+			setInMemEnqueuesLimit := rng.Float64() < 0.5
+			log.Infof(context.Background(), "%sMemoryLimit=%s/DiskQueueCacheMode=%d/AlwaysCompress=%t/NumBatches=%d/InMemEnqueuesLimited=%t",
+				prefix, humanizeutil.IBytes(memoryLimit), diskQueueCacheMode, alwaysCompress, numBatches, setInMemEnqueuesLimit)
+			// Since the spilling queue coalesces tuples to fill in the batches
+			// up to their capacity, we cannot directly use the batches we get
+			// when dequeueing. Instead, we track all of the input tuples and
+			// will compare against a window into them.
+			var tuples *appendOnlyBufferedBatch
 			// Create random input.
-			batches := make([]coldata.Batch, 0, numBatches)
 			op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{
-				NumBatches: cap(batches),
+				NumBatches: numBatches,
 				BatchSize:  1 + rng.Intn(coldata.BatchSize()),
 				Nulls:      true,
 				BatchAccumulator: func(b coldata.Batch, typs []*types.T) {
-					batches = append(batches, coldatatestutils.CopyBatch(b, typs, testColumnFactory))
+					if b.Length() == 0 {
+						return
+					}
+					if tuples == nil {
+						tuples = newAppendOnlyBufferedBatch(testAllocator, typs, nil /* colsToStore */)
+					}
+					tuples.append(b, 0 /* startIdx */, b.Length())
 				},
 			})
 			typs := op.Typs()
@@ -88,11 +104,32 @@ func TestSpillingQueue(t *testing.T) {
 				)
 			}
 
+			if setInMemEnqueuesLimit {
+				q.testingKnobs.maxNumBatchesEnqueuedInMemory = numBatches / 2
+			}
+
 			// Run verification.
var ( - b coldata.Batch - err error + b coldata.Batch + err error + numAlreadyDequeuedTuples int ) + + windowedBatch := coldata.NewMemBatchNoCols(typs, coldata.BatchSize()) + getNextWindowIntoTuples := func(windowLen int) coldata.Batch { + // makeWindowIntoBatch creates a window into tuples in the range + // [numAlreadyDequeuedTuples; tuples.length), but we want the + // range [numAlreadyDequeuedTuples; numAlreadyDequeuedTuples + + // windowLen), so we'll temporarily set the length of tuples to + // the desired value and restore it below. + numTuples := tuples.Length() + tuples.SetLength(numAlreadyDequeuedTuples + windowLen) + makeWindowIntoBatch(windowedBatch, tuples, numAlreadyDequeuedTuples, typs) + tuples.SetLength(numTuples) + numAlreadyDequeuedTuples += windowLen + return windowedBatch + } + ctx := context.Background() for { b = op.Next(ctx) @@ -106,36 +143,29 @@ func TestSpillingQueue(t *testing.T) { } else if b.Length() == 0 { t.Fatal("queue incorrectly considered empty") } - coldata.AssertEquivalentBatches(t, batches[0], b) - batches = batches[1:] + coldata.AssertEquivalentBatches(t, getNextWindowIntoTuples(b.Length()), b) } } + numDequeuedTuplesBeforeReading := numAlreadyDequeuedTuples numReadIterations := 1 if rewindable { numReadIterations = 2 } for i := 0; i < numReadIterations; i++ { - batchIdx := 0 - for batches[batchIdx].Length() > 0 { + for { if b, err = q.dequeue(ctx); err != nil { t.Fatal(err) } else if b == nil { t.Fatal("unexpectedly dequeued nil batch") } else if b.Length() == 0 { - t.Fatal("queue incorrectly considered empty") + break } - coldata.AssertEquivalentBatches(t, batches[batchIdx], b) - batchIdx++ - } - - if b, err := q.dequeue(ctx); err != nil { - t.Fatal(err) - } else if b.Length() != 0 { - t.Fatal("queue should be empty") + coldata.AssertEquivalentBatches(t, getNextWindowIntoTuples(b.Length()), b) } if rewindable { require.NoError(t, q.rewind()) + numAlreadyDequeuedTuples = numDequeuedTuplesBeforeReading } }
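For readers of this patch, the coalescing behavior that the new enqueue implements (top up the tail in-memory batch first, then allocate a new batch whose capacity doubles up to coldata.BatchSize()) can be sketched in isolation. The standalone Go program below is an illustrative model only and is not part of this change: it uses plain int slices instead of coldata batches, and names such as coalescingQueue, maxBatchCap, and nextCap are invented for the sketch.

package main

import "fmt"

// maxBatchCap stands in for coldata.BatchSize(); all names in this sketch are
// illustrative and not part of the CockroachDB code base.
const maxBatchCap = 8

type coalescingQueue struct {
	items   [][]int // in-memory buffers, loosely analogous to spillingQueue.items
	nextCap int     // loosely analogous to nextInMemBatchCapacity
}

// enqueue coalesces the incoming tuples (assumed to be at most maxBatchCap of
// them, like a single batch) into the tail buffer and, once the tail is full,
// allocates a new buffer whose capacity doubles up to maxBatchCap.
func (q *coalescingQueue) enqueue(tuples []int) {
	// First, top up the tail buffer if it has spare capacity.
	if n := len(q.items); n > 0 {
		tail := q.items[n-1]
		if free := cap(tail) - len(tail); free > 0 {
			k := free
			if k > len(tuples) {
				k = len(tuples)
			}
			q.items[n-1] = append(tail, tuples[:k]...)
			tuples = tuples[k:]
		}
	}
	if len(tuples) == 0 {
		return
	}
	// Allocate the next buffer with dynamically increasing capacity.
	newCap := len(tuples)
	if q.nextCap > newCap {
		newCap = q.nextCap
	}
	if newCap > maxBatchCap {
		newCap = maxBatchCap
	}
	q.nextCap = 2 * newCap
	if q.nextCap > maxBatchCap {
		q.nextCap = maxBatchCap
	}
	buf := make([]int, 0, newCap)
	q.items = append(q.items, append(buf, tuples...))
}

func main() {
	q := &coalescingQueue{}
	for _, batch := range [][]int{{1, 2, 3}, {4, 5}, {6, 7, 8, 9}} {
		q.enqueue(batch)
	}
	for i, buf := range q.items {
		fmt.Printf("buffer %d: len=%d cap=%d %v\n", i, len(buf), cap(buf), buf)
	}
}

With these inputs, the second and third calls end up coalesced into a single capacity-6 buffer. That coalescing is also why the updated TestSpillingQueue compares dequeued batches against windows into all enqueued tuples rather than against the original input batches.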