From 9f6fce0a5bf2ec154d8cd21357ebc6909fdfc2f6 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 29 Mar 2021 22:24:13 -0700 Subject: [PATCH] colexecjoin: fix the merge joiner in some cases Previously, we were incorrectly updating the internal state of `circularGroupsBuffer` when we needed to expand the buffer size. The bug was introduced several months ago (during 21.1 cycle) when we made the buffer grow dynamically. As it turns out, the bug is easily caught by our unit tests if we randomize the initial buffer size. Release note (bug fix): CockroachDB could previously return incorrect results in some cases when executing the merge join operation via the vectorized engine. The bug is only present in 21.1 test releases. --- pkg/sql/colexec/colexecjoin/BUILD.bazel | 1 + .../colexec/colexecjoin/mergejoiner_util.go | 58 +++++++++++++------ 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/pkg/sql/colexec/colexecjoin/BUILD.bazel b/pkg/sql/colexec/colexecjoin/BUILD.bazel index 11a21e7ee9e8..a44ed456ad3e 100644 --- a/pkg/sql/colexec/colexecjoin/BUILD.bazel +++ b/pkg/sql/colexec/colexecjoin/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//pkg/sql/execinfrapb", "//pkg/sql/sem/tree", # keep "//pkg/sql/types", + "//pkg/util", "//pkg/util/duration", # keep "//pkg/util/mon", "@com_github_cockroachdb_apd_v2//:apd", # keep diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_util.go b/pkg/sql/colexec/colexecjoin/mergejoiner_util.go index 78b299b0a1bd..74df84023034 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_util.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_util.go @@ -10,6 +10,8 @@ package colexecjoin +import "github.com/cockroachdb/cockroach/pkg/util" + // circularGroupsBuffer is a struct designed to store the groups' slices for a // given column. It starts out small and will grow dynamically if necessary // until pre-computed maximum capacity (since we know that there is a maximum @@ -45,7 +47,12 @@ type circularGroupsBuffer struct { // groupsBufferInitialSize determines the size used in initial allocations of // the slices of the circularGroupsBuffer. -const groupsBufferInitialSize = 8 +var groupsBufferInitialSize = util.ConstantWithMetamorphicTestRange( + "merge-joiner-groups-buffer", + 8, /* defaultValue */ + 1, /* min */ + 16, /* max */ +) func makeGroupsBuffer(batchSize int) circularGroupsBuffer { return circularGroupsBuffer{ @@ -156,8 +163,8 @@ func (b *circularGroupsBuffer) ensureCapacityForNewGroup() { newLeftGroups := make([]group, getGroupsSlicesLen(newSize)) newRightGroups := make([]group, getGroupsSlicesLen(newSize)) // Note that this if block is never reached when startIdx == endIdx (because - // that would indicate an empty buffer and we would enough capacity given - // our initialization in makeGroupsBuffer). + // that would indicate an empty buffer and we would have enough capacity + // given our initialization in makeGroupsBuffer). if b.startIdx <= b.endIdx { // Current groups are contiguous in the slices, so copying them over is // simple. @@ -165,28 +172,43 @@ func (b *circularGroupsBuffer) ensureCapacityForNewGroup() { copy(newRightGroups, b.rightGroups[b.startIdx:b.endIdx]) b.nextColStartIdx -= b.startIdx } else { - // Current groups are wrapped at position b.capacity. Namely, if we have - // size = 3, capacity = 7, we might have the following: - // buffer = [1, 2, 0', 1', 2', x, 0] - // where startIdx = 6, endIdx = 4, nextColStartIdx = 2, so we need to - // copy over with the adjustments. + // Current groups are wrapped after position b.capacity-1. Namely, if we + // have size = 3, capacity = 7, we might have the following: + // buffer = [1, 2, 0', 1', 2', x, 0] (1) + // where startIdx = 6, endIdx = 5, nextColStartIdx = 2, so we need to + // copy over with the adjustments so that the resulting state is + // buffer = [0, 1, 2, 0', 1', 2', x] + // where startIdx = 0, endIdx = 6, nextColStartIdx = 3. + // + // First, copy over the start of the buffer (which is currently at the + // end of the old slices) into the beginning of the new slices. In the + // example above, we're copying [0]. copy(newLeftGroups, b.leftGroups[b.startIdx:b.capacity]) copy(newRightGroups, b.rightGroups[b.startIdx:b.capacity]) + // If non-empty, copy over the end of the buffer (which is currently at + // the beginning of the old slices). In the example above, we're copying + // [1, 2, 0', 1', 2']. if b.endIdx > 0 { offset := b.capacity - b.startIdx copy(newLeftGroups[offset:], b.leftGroups[:b.endIdx]) copy(newRightGroups[offset:], b.rightGroups[:b.endIdx]) - b.nextColStartIdx += offset - } else { - // There is a special case when endIdx is 0 - we're simply shifting - // the groups by startIdx to the left (which we have already done) - // which requires adjusting nextColStartIdx accordingly. - // - // Consider the following, size = 3, capacity = 7 - // buffer = [x, 0, 1, 2, 0', 1', 2'] - // with startIdx = 1, endIdx = 0, nextColStartIdx = 4. We don't need - // to copy anything, but we need to decrement nextColStartIdx by 1. + } + // Now update b.nextColStartIdx. There are two cases: + // 1. it was in the part we copied first. In such scenario we need to + // shift the index towards the beginning by the same offset as when + // we were copying (by b.startIdx). + // For example, consider the following, size = 3, capacity = 7 + // buffer = [2', x, 0, 1, 2, 0', 1'] + // with startIdx = 2, endIdx = 1, nextColStartIdx = 5. We need to + // decrement nextColStartIdx by 2. + // 2. it was in the part we copied second. In such scenario we need to + // shift the index towards the end by the same offset as when we were + // copying (by b.capacity-b.startIdx). Consider the example (1) + // above: we need to increment nextColStartIdx by 1. + if b.nextColStartIdx >= b.startIdx { b.nextColStartIdx -= b.startIdx + } else { + b.nextColStartIdx += b.capacity - b.startIdx } } b.startIdx = 0