Commit 158601d
58013: colexec: fix spilling queue r=yuzefovich a=yuzefovich

This commit refactors the `enqueue` method of the spilling queue to
deep-copy the passed-in batches when they are kept in memory. The
previous behavior was suboptimal because it forced the caller to always
allocate a new batch. Additionally, the spilling queue now performs
a coalescing step, appending as many tuples as possible to the tail
in-memory batch. The in-memory batches are allocated with dynamically
increasing capacities.

This allows us to significantly simplify the code of the router outputs,
which previously performed the coalescing step themselves.

Additionally, this commit fixes a couple of callers of the `enqueue`
method (the router outputs and the merge joiner) that forgot to enqueue
a zero-length batch, which is required once the disk queue has been
initialized.
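
To make the new `enqueue` contract concrete, here is a rough Go sketch
(illustrative only, not the actual `spillingQueue` code). It uses plain
int slices in place of `coldata.Batch`, and the `toyQueue` type and its
fields are invented for this example. It shows the three behaviors
described above: enqueued batches are deep-copied into memory the queue
owns, incoming tuples are coalesced onto the tail in-memory batch, and
a zero-length batch marks the end of the input.

package main

import "fmt"

// toyQueue is a toy stand-in for the in-memory tail of the spilling
// queue: it owns copies of the enqueued data, coalesces onto the last
// batch, and grows new batch capacities dynamically.
type toyQueue struct {
  batches [][]int // owned, deep-copied batches
  nextCap int     // dynamically increasing capacity for new batches
  done    bool    // set once the zero-length terminator is seen
}

func (q *toyQueue) enqueue(batch []int) {
  if len(batch) == 0 {
    // Zero-length batch: the caller promises no more input.
    q.done = true
    return
  }
  // Coalesce: copy as many tuples as fit into the tail batch.
  if n := len(q.batches); n > 0 {
    tail := q.batches[n-1]
    if free := cap(tail) - len(tail); free > 0 {
      take := free
      if take > len(batch) {
        take = len(batch)
      }
      q.batches[n-1] = append(tail, batch[:take]...) // deep copy
      batch = batch[take:]
    }
  }
  if len(batch) == 0 {
    return
  }
  // Allocate a new owned batch with dynamically increasing capacity.
  if q.nextCap < len(batch) {
    q.nextCap = len(batch)
  }
  owned := make([]int, 0, q.nextCap)
  q.batches = append(q.batches, append(owned, batch...))
  q.nextCap *= 2
}

func main() {
  q := &toyQueue{nextCap: 4}
  scratch := []int{1, 2, 3}
  q.enqueue(scratch)
  scratch[0] = 99                // safe: the queue holds its own copy
  q.enqueue([]int{4, 5})         // one tuple is coalesced onto the tail
  q.enqueue(nil)                 // zero-length terminator
  fmt.Println(q.batches, q.done) // prints [[1 2 3 4] [5]] true
}

Because the queue owns its copies, a caller may reuse its scratch batch
as soon as `enqueue` returns, which is what lets the router outputs
(and, in the diff below, the merge joiner) drop their own
copy-and-coalesce logic.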

Fixes: #47062.

Release note: None

58504: sql: qualify table name for alter_table_owner event log r=ajwerner a=the-ericwang35

Fixes #57960.

Previously, event logs did not capture the qualified table name for
ALTER TABLE ... OWNER TO commands.
This PR changes the event log to use the qualified table name.
Tests were updated to reflect this change.

Release note (bug fix): The table name is now qualified in the alter_table_owner event log.

58743: colexec: fix decimal/interval overload error propagation r=yuzefovich a=yuzefovich

Fixes: #57773.

Release note (bug fix): CockroachDB could previously return an internal
error when evaluating a binary expression between a Decimal and an
Interval that required a cast to a Float, if the value was out of range
for a Float. A more user-friendly error is now returned instead.
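
For context, here is a minimal sketch of the failure mode, assuming the
`cockroachdb/apd` v2 API (`NewFromString`, `Decimal.Float64`); it is
illustrative and not code from this patch. The Decimal/Interval
overloads first convert the decimal operand to a float64, and it is
that conversion which fails for out-of-range values; the diff below
reports the failure via `colexecerror.ExpectedError` instead of
`colexecerror.InternalError`, so it reaches the client as a regular
error rather than an internal one.

package main

import (
  "fmt"

  "github.com/cockroachdb/apd/v2"
)

func main() {
  // A value that is representable as a Decimal but not as a float64.
  huge, _, err := apd.NewFromString("1e400")
  if err != nil {
    panic(err)
  }
  // The generated Decimal/Interval overloads call Float64() before
  // multiplying by the interval; this is the conversion that fails.
  if _, convErr := huge.Float64(); convErr != nil {
    // With the fix, this error is classified as expected (user-facing)
    // rather than internal.
    fmt.Println("conversion error:", convErr)
  }
}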

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Eric Wang <[email protected]>
3 people committed Jan 12, 2021
4 parents: ee5c4e6 + e2d602e + 81e0172 + 777910e; commit 158601d
Showing 17 changed files with 394 additions and 409 deletions.
22 changes: 16 additions & 6 deletions pkg/col/coldata/testutils.go
@@ -56,11 +56,21 @@ func AssertEquivalentBatches(t testingT, expected, actual Batch) {
     expectedVec := expected.ColVec(colIdx)
     actualVec := actual.ColVec(colIdx)
     require.Equal(t, expectedVec.Type(), actualVec.Type())
-    require.Equal(
-      t,
-      expectedVec.Nulls().Slice(0, expected.Length()),
-      actualVec.Nulls().Slice(0, actual.Length()),
-    )
+    // Check whether the nulls bitmaps are the same. Note that we don't
+    // track precisely the fact whether nulls are present or not in
+    // 'maybeHasNulls' field, so we override it manually to be 'true' for
+    // both nulls vectors if it is 'true' for at least one of them. This is
+    // acceptable since we still check the bitmaps precisely.
+    expectedNulls := expectedVec.Nulls()
+    actualNulls := actualVec.Nulls()
+    oldExpMaybeHasNulls, oldActMaybeHasNulls := expectedNulls.maybeHasNulls, actualNulls.maybeHasNulls
+    defer func() {
+      expectedNulls.maybeHasNulls, actualNulls.maybeHasNulls = oldExpMaybeHasNulls, oldActMaybeHasNulls
+    }()
+    expectedNulls.maybeHasNulls = expectedNulls.maybeHasNulls || actualNulls.maybeHasNulls
+    actualNulls.maybeHasNulls = expectedNulls.maybeHasNulls || actualNulls.maybeHasNulls
+    require.Equal(t, expectedNulls.Slice(0, expected.Length()), actualNulls.Slice(0, actual.Length()))
+
     canonicalTypeFamily := expectedVec.CanonicalTypeFamily()
     if canonicalTypeFamily == types.BytesFamily {
       // Cannot use require.Equal for this type.
@@ -95,7 +105,7 @@ func AssertEquivalentBatches(t testingT, expected, actual Batch) {
           t.Fatalf("Interval mismatch at index %d:\nexpected:\n%sactual:\n%s", i, expectedInterval[i], resultInterval[i])
         }
       }
-    } else if expectedVec.CanonicalTypeFamily() == typeconv.DatumVecCanonicalTypeFamily {
+    } else if canonicalTypeFamily == typeconv.DatumVecCanonicalTypeFamily {
       // Cannot use require.Equal for this type.
       expectedDatum := expectedVec.Datum().Slice(0 /* start */, expected.Length())
       resultDatum := actualVec.Datum().Slice(0 /* start */, actual.Length())
9 changes: 6 additions & 3 deletions pkg/sql/alter_table.go
@@ -1321,12 +1321,15 @@ func (p *planner) checkCanAlterTableAndSetNewOwner(
   privs := desc.GetPrivileges()
   privs.SetOwner(newOwner)

+  tn, err := p.getQualifiedTableName(ctx, desc)
+  if err != nil {
+    return err
+  }
+
   return p.logEvent(ctx,
     desc.ID,
     &eventpb.AlterTableOwner{
-      // TODO(knz): Properly qualify this.
-      // See: https://github.com/cockroachdb/cockroach/issues/57960
-      TableName: desc.Name,
+      TableName: tn.String(),
       Owner:     newOwner.Normalized(),
     })
 }
1 change: 1 addition & 0 deletions pkg/sql/colcontainer/partitionedqueue.go
@@ -27,6 +27,7 @@ type PartitionedQueue interface {
   // partition at that index does not exist, a new one is created. Existing
   // partitions may not be Enqueued to after calling
   // CloseAllOpenWriteFileDescriptors.
+  // WARNING: Selection vectors are ignored.
   Enqueue(ctx context.Context, partitionIdx int, batch coldata.Batch) error
   // Dequeue removes and returns the batch from the front of the
   // partitionIdx'th partition. If the partition is empty, or no partition at
4 changes: 2 additions & 2 deletions pkg/sql/colexec/execgen/cmd/execgen/overloads_bin.go
@@ -714,7 +714,7 @@ func (c intervalDecimalCustomizer) getBinOpAssignFunc() assignFunc {
   return fmt.Sprintf(`
     f, err := %[3]s.Float64()
     if err != nil {
-      colexecerror.InternalError(err)
+      colexecerror.ExpectedError(err)
     }
     %[1]s = %[2]s.MulFloat(f)`,
     targetElem, leftElem, rightElem)
@@ -732,7 +732,7 @@ func (c decimalIntervalCustomizer) getBinOpAssignFunc() assignFunc {
   return fmt.Sprintf(`
     f, err := %[2]s.Float64()
     if err != nil {
-      colexecerror.InternalError(err)
+      colexecerror.ExpectedError(err)
     }
     %[1]s = %[3]s.MulFloat(f)`,
     targetElem, leftElem, rightElem)
3 changes: 3 additions & 0 deletions pkg/sql/colexec/external_sort.go
@@ -262,6 +262,9 @@ func (s *externalSorter) Next(ctx context.Context) coldata.Batch {
     if s.partitioner == nil {
       s.partitioner = s.partitionerCreator()
     }
+    // Note that b will never have a selection vector set because the
+    // allSpooler performs a deselection when buffering up the tuples,
+    // and the in-memory sorter has allSpooler as its input.
     if err := s.partitioner.Enqueue(ctx, newPartitionIdx, b); err != nil {
       colexecerror.InternalError(err)
     }
48 changes: 35 additions & 13 deletions pkg/sql/colexec/mergejoiner.go
@@ -107,8 +107,9 @@ type mjBuilderCrossProductState struct {
 type mjBufferedGroup struct {
   *spillingQueue
   // firstTuple stores a single tuple that was first in the buffered group.
-  firstTuple []coldata.Vec
-  numTuples  int
+  firstTuple   []coldata.Vec
+  numTuples    int
+  scratchBatch coldata.Batch
 }

 func (bg *mjBufferedGroup) reset(ctx context.Context) {
@@ -546,13 +547,19 @@ func (o *mergeJoinBase) Init() {
   o.proberState.lBufferedGroup.firstTuple = o.unlimitedAllocator.NewMemBatchWithFixedCapacity(
     o.left.sourceTypes, 1, /* capacity */
   ).ColVecs()
+  o.proberState.lBufferedGroup.scratchBatch = o.unlimitedAllocator.NewMemBatchWithFixedCapacity(
+    o.left.sourceTypes, coldata.BatchSize(),
+  )
   o.proberState.rBufferedGroup.spillingQueue = newRewindableSpillingQueue(
     o.unlimitedAllocator, o.right.sourceTypes, o.memoryLimit,
     o.diskQueueCfg, o.fdSemaphore, o.diskAcc,
   )
   o.proberState.rBufferedGroup.firstTuple = o.unlimitedAllocator.NewMemBatchWithFixedCapacity(
     o.right.sourceTypes, 1, /* capacity */
   ).ColVecs()
+  o.proberState.rBufferedGroup.scratchBatch = o.unlimitedAllocator.NewMemBatchWithFixedCapacity(
+    o.right.sourceTypes, coldata.BatchSize(),
+  )

   o.builderState.lGroups = make([]group, 1)
   o.builderState.rGroups = make([]group, 1)
@@ -572,6 +579,8 @@ func (o *mergeJoinBase) resetBuilderCrossProductState() {
 // same group as the ones in the buffered group that corresponds to the input
 // source. This needs to happen when a group starts at the end of an input
 // batch and can continue into the following batches.
+// A zero-length batch needs to be appended when no more batches will be
+// appended to the buffered group.
 func (o *mergeJoinBase) appendToBufferedGroup(
   ctx context.Context,
   input *mergeJoinInput,
@@ -580,9 +589,6 @@
   groupStartIdx int,
   groupLength int,
 ) {
-  if groupLength == 0 {
-    return
-  }
   var (
     bufferedGroup *mjBufferedGroup
     sourceTypes   []*types.T
@@ -594,9 +600,14 @@
     sourceTypes = o.right.sourceTypes
     bufferedGroup = &o.proberState.rBufferedGroup
   }
-  // TODO(yuzefovich): reuse the same scratch batches when spillingQueue
-  // actually copies the enqueued batch when those are kept in memory.
-  scratchBatch := o.unlimitedAllocator.NewMemBatchWithFixedCapacity(sourceTypes, groupLength)
+  if batch.Length() == 0 || groupLength == 0 {
+    // We have finished appending to this buffered group, so we need to
+    // enqueue a zero-length batch per the contract of the spilling queue.
+    if err := bufferedGroup.enqueue(ctx, coldata.ZeroBatch); err != nil {
+      colexecerror.InternalError(err)
+    }
+    return
+  }
   if bufferedGroup.numTuples == 0 {
     o.unlimitedAllocator.PerformOperation(bufferedGroup.firstTuple, func() {
       for colIdx := range sourceTypes {
@@ -616,9 +627,10 @@
   }
   bufferedGroup.numTuples += groupLength

-  o.unlimitedAllocator.PerformOperation(scratchBatch.ColVecs(), func() {
+  bufferedGroup.scratchBatch.ResetInternalBatch()
+  o.unlimitedAllocator.PerformOperation(bufferedGroup.scratchBatch.ColVecs(), func() {
     for colIdx := range input.sourceTypes {
-      scratchBatch.ColVec(colIdx).Copy(
+      bufferedGroup.scratchBatch.ColVec(colIdx).Copy(
         coldata.CopySliceArgs{
           SliceArgs: coldata.SliceArgs{
             Src: batch.ColVec(colIdx),
@@ -630,10 +642,9 @@
         },
       )
     }
+    bufferedGroup.scratchBatch.SetLength(groupLength)
   })
-  scratchBatch.SetSelection(false)
-  scratchBatch.SetLength(groupLength)
-  if err := bufferedGroup.enqueue(ctx, scratchBatch); err != nil {
+  if err := bufferedGroup.enqueue(ctx, bufferedGroup.scratchBatch); err != nil {
     colexecerror.InternalError(err)
   }
 }
@@ -681,6 +692,15 @@ func (o *mergeJoinBase) sourceFinished() bool {
   return o.proberState.lLength == 0 || o.proberState.rLength == 0
 }

+// finishBufferedGroup appends a zero-length batch to the buffered group which
+// is required by the contract of the spilling queue.
+func (o *mergeJoinBase) finishBufferedGroup(ctx context.Context, input *mergeJoinInput) {
+  o.appendToBufferedGroup(
+    ctx, input, coldata.ZeroBatch, nil, /* sel */
+    0 /* groupStartIdx */, 0, /* groupLength */
+  )
+}
+
 // completeBufferedGroup extends the buffered group corresponding to input.
 // First, we check that the first row in batch is still part of the same group.
 // If this is the case, we use the Distinct operator to find the first
@@ -696,6 +716,7 @@ func (o *mergeJoinBase) completeBufferedGroup(
 ) (_ coldata.Batch, idx int, batchLength int) {
   batchLength = batch.Length()
   if o.isBufferedGroupFinished(input, batch, rowIdx) {
+    o.finishBufferedGroup(ctx, input)
     return batch, rowIdx, batchLength
   }

@@ -752,6 +773,7 @@ func (o *mergeJoinBase) completeBufferedGroup(
     if batchLength == 0 {
       // The input has been exhausted, so the buffered group is now complete.
       isBufferedGroupComplete = true
+      o.finishBufferedGroup(ctx, input)
     }
   }
 }
16 changes: 8 additions & 8 deletions pkg/sql/colexec/proj_const_left_ops.eg.go

(Generated file; diff not rendered.)

16 changes: 8 additions & 8 deletions pkg/sql/colexec/proj_const_right_ops.eg.go

(Generated file; diff not rendered.)

(The remaining changed files are not rendered here.)
