Skip to content

Commit

Permalink
colexecjoin: fix the merge joiner in some cases
Browse files Browse the repository at this point in the history
Previously, we were incorrectly updating the internal state of
`circularGroupsBuffer` when we needed to expand the buffer size. The bug
was introduced several months ago (during the 21.1 cycle) when we made the
buffer grow dynamically. As it turns out, the bug is easily caught by
our unit tests if we randomize the initial buffer size.

Release note (bug fix): CockroachDB could previously return incorrect
results in some cases when executing the merge join operation via the
vectorized engine. The bug is only present in 21.1 test releases.
  • Loading branch information
yuzefovich committed Mar 30, 2021
1 parent 34904ed commit 9f6fce0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/duration", # keep
"//pkg/util/mon",
"@com_github_cockroachdb_apd_v2//:apd", # keep
Expand Down
58 changes: 40 additions & 18 deletions pkg/sql/colexec/colexecjoin/mergejoiner_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package colexecjoin

import "github.com/cockroachdb/cockroach/pkg/util"

// circularGroupsBuffer is a struct designed to store the groups' slices for a
// given column. It starts out small and will grow dynamically if necessary
// until pre-computed maximum capacity (since we know that there is a maximum
Expand Down Expand Up @@ -45,7 +47,12 @@ type circularGroupsBuffer struct {

// groupsBufferInitialSize determines the size used in initial allocations of
// the slices of the circularGroupsBuffer.
const groupsBufferInitialSize = 8
// NOTE(review): this has to be a var rather than a const because it is
// initialized by a function call. Judging by the helper's name and the
// argument labels below, test builds may substitute a value from the
// [min, max] range for the default of 8 — confirm against the definition of
// util.ConstantWithMetamorphicTestRange.
var groupsBufferInitialSize = util.ConstantWithMetamorphicTestRange(
"merge-joiner-groups-buffer",
8, /* defaultValue */
1, /* min */
16, /* max */
)

func makeGroupsBuffer(batchSize int) circularGroupsBuffer {
return circularGroupsBuffer{
Expand Down Expand Up @@ -156,37 +163,52 @@ func (b *circularGroupsBuffer) ensureCapacityForNewGroup() {
newLeftGroups := make([]group, getGroupsSlicesLen(newSize))
newRightGroups := make([]group, getGroupsSlicesLen(newSize))
// Note that this if block is never reached when startIdx == endIdx (because
// that would indicate an empty buffer and we would enough capacity given
// our initialization in makeGroupsBuffer).
// that would indicate an empty buffer and we would have enough capacity
// given our initialization in makeGroupsBuffer).
if b.startIdx <= b.endIdx {
// Current groups are contiguous in the slices, so copying them over is
// simple.
copy(newLeftGroups, b.leftGroups[b.startIdx:b.endIdx])
copy(newRightGroups, b.rightGroups[b.startIdx:b.endIdx])
b.nextColStartIdx -= b.startIdx
} else {
// Current groups are wrapped at position b.capacity. Namely, if we have
// size = 3, capacity = 7, we might have the following:
// buffer = [1, 2, 0', 1', 2', x, 0]
// where startIdx = 6, endIdx = 4, nextColStartIdx = 2, so we need to
// copy over with the adjustments.
// Current groups are wrapped after position b.capacity-1. Namely, if we
// have size = 3, capacity = 7, we might have the following:
// buffer = [1, 2, 0', 1', 2', x, 0] (1)
// where startIdx = 6, endIdx = 5, nextColStartIdx = 2, so we need to
// copy over with the adjustments so that the resulting state is
// buffer = [0, 1, 2, 0', 1', 2', x]
// where startIdx = 0, endIdx = 6, nextColStartIdx = 3.
//
// First, copy over the start of the buffer (which is currently at the
// end of the old slices) into the beginning of the new slices. In the
// example above, we're copying [0].
copy(newLeftGroups, b.leftGroups[b.startIdx:b.capacity])
copy(newRightGroups, b.rightGroups[b.startIdx:b.capacity])
// If non-empty, copy over the end of the buffer (which is currently at
// the beginning of the old slices). In the example above, we're copying
// [1, 2, 0', 1', 2'].
if b.endIdx > 0 {
offset := b.capacity - b.startIdx
copy(newLeftGroups[offset:], b.leftGroups[:b.endIdx])
copy(newRightGroups[offset:], b.rightGroups[:b.endIdx])
b.nextColStartIdx += offset
} else {
// There is a special case when endIdx is 0 - we're simply shifting
// the groups by startIdx to the left (which we have already done)
// which requires adjusting nextColStartIdx accordingly.
//
// Consider the following, size = 3, capacity = 7
// buffer = [x, 0, 1, 2, 0', 1', 2']
// with startIdx = 1, endIdx = 0, nextColStartIdx = 4. We don't need
// to copy anything, but we need to decrement nextColStartIdx by 1.
}
// Now update b.nextColStartIdx. There are two cases:
// 1. it was in the part we copied first. In such a scenario we need to
// shift the index towards the beginning by the same offset as when
// we were copying (by b.startIdx).
// For example, consider the following, size = 3, capacity = 7
// buffer = [2', x, 0, 1, 2, 0', 1']
// with startIdx = 2, endIdx = 1, nextColStartIdx = 5. We need to
// decrement nextColStartIdx by 2.
// 2. it was in the part we copied second. In such a scenario we need to
// shift the index towards the end by the same offset as when we were
// copying (by b.capacity-b.startIdx). Consider the example (1)
// above: we need to increment nextColStartIdx by 1.
if b.nextColStartIdx >= b.startIdx {
b.nextColStartIdx -= b.startIdx
} else {
b.nextColStartIdx += b.capacity - b.startIdx
}
}
b.startIdx = 0
Expand Down

0 comments on commit 9f6fce0

Please sign in to comment.