From 9f6fce0a5bf2ec154d8cd21357ebc6909fdfc2f6 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 29 Mar 2021 22:24:13 -0700 Subject: [PATCH 1/2] colexecjoin: fix the merge joiner in some cases Previously, we were incorrectly updating the internal state of `circularGroupsBuffer` when we needed to expand the buffer size. The bug was introduced several months ago (during 21.1 cycle) when we made the buffer grow dynamically. As it turns out, the bug is easily caught by our unit tests if we randomize the initial buffer size. Release note (bug fix): CockroachDB could previously return incorrect results in some cases when executing the merge join operation via the vectorized engine. The bug is only present in 21.1 test releases. --- pkg/sql/colexec/colexecjoin/BUILD.bazel | 1 + .../colexec/colexecjoin/mergejoiner_util.go | 58 +++++++++++++------ 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/pkg/sql/colexec/colexecjoin/BUILD.bazel b/pkg/sql/colexec/colexecjoin/BUILD.bazel index 11a21e7ee9e8..a44ed456ad3e 100644 --- a/pkg/sql/colexec/colexecjoin/BUILD.bazel +++ b/pkg/sql/colexec/colexecjoin/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//pkg/sql/execinfrapb", "//pkg/sql/sem/tree", # keep "//pkg/sql/types", + "//pkg/util", "//pkg/util/duration", # keep "//pkg/util/mon", "@com_github_cockroachdb_apd_v2//:apd", # keep diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner_util.go b/pkg/sql/colexec/colexecjoin/mergejoiner_util.go index 78b299b0a1bd..74df84023034 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner_util.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner_util.go @@ -10,6 +10,8 @@ package colexecjoin +import "github.com/cockroachdb/cockroach/pkg/util" + // circularGroupsBuffer is a struct designed to store the groups' slices for a // given column. It starts out small and will grow dynamically if necessary // until pre-computed maximum capacity (since we know that there is a maximum @@ -45,7 +47,12 @@ type circularGroupsBuffer struct { // groupsBufferInitialSize determines the size used in initial allocations of // the slices of the circularGroupsBuffer. -const groupsBufferInitialSize = 8 +var groupsBufferInitialSize = util.ConstantWithMetamorphicTestRange( + "merge-joiner-groups-buffer", + 8, /* defaultValue */ + 1, /* min */ + 16, /* max */ +) func makeGroupsBuffer(batchSize int) circularGroupsBuffer { return circularGroupsBuffer{ @@ -156,8 +163,8 @@ func (b *circularGroupsBuffer) ensureCapacityForNewGroup() { newLeftGroups := make([]group, getGroupsSlicesLen(newSize)) newRightGroups := make([]group, getGroupsSlicesLen(newSize)) // Note that this if block is never reached when startIdx == endIdx (because - // that would indicate an empty buffer and we would enough capacity given - // our initialization in makeGroupsBuffer). + // that would indicate an empty buffer and we would have enough capacity + // given our initialization in makeGroupsBuffer). if b.startIdx <= b.endIdx { // Current groups are contiguous in the slices, so copying them over is // simple. @@ -165,28 +172,43 @@ func (b *circularGroupsBuffer) ensureCapacityForNewGroup() { copy(newRightGroups, b.rightGroups[b.startIdx:b.endIdx]) b.nextColStartIdx -= b.startIdx } else { - // Current groups are wrapped at position b.capacity. Namely, if we have - // size = 3, capacity = 7, we might have the following: - // buffer = [1, 2, 0', 1', 2', x, 0] - // where startIdx = 6, endIdx = 4, nextColStartIdx = 2, so we need to - // copy over with the adjustments. + // Current groups are wrapped after position b.capacity-1. Namely, if we + // have size = 3, capacity = 7, we might have the following: + // buffer = [1, 2, 0', 1', 2', x, 0] (1) + // where startIdx = 6, endIdx = 5, nextColStartIdx = 2, so we need to + // copy over with the adjustments so that the resulting state is + // buffer = [0, 1, 2, 0', 1', 2', x] + // where startIdx = 0, endIdx = 6, nextColStartIdx = 3. + // + // First, copy over the start of the buffer (which is currently at the + // end of the old slices) into the beginning of the new slices. In the + // example above, we're copying [0]. copy(newLeftGroups, b.leftGroups[b.startIdx:b.capacity]) copy(newRightGroups, b.rightGroups[b.startIdx:b.capacity]) + // If non-empty, copy over the end of the buffer (which is currently at + // the beginning of the old slices). In the example above, we're copying + // [1, 2, 0', 1', 2']. if b.endIdx > 0 { offset := b.capacity - b.startIdx copy(newLeftGroups[offset:], b.leftGroups[:b.endIdx]) copy(newRightGroups[offset:], b.rightGroups[:b.endIdx]) - b.nextColStartIdx += offset - } else { - // There is a special case when endIdx is 0 - we're simply shifting - // the groups by startIdx to the left (which we have already done) - // which requires adjusting nextColStartIdx accordingly. - // - // Consider the following, size = 3, capacity = 7 - // buffer = [x, 0, 1, 2, 0', 1', 2'] - // with startIdx = 1, endIdx = 0, nextColStartIdx = 4. We don't need - // to copy anything, but we need to decrement nextColStartIdx by 1. + } + // Now update b.nextColStartIdx. There are two cases: + // 1. it was in the part we copied first. In such scenario we need to + // shift the index towards the beginning by the same offset as when + // we were copying (by b.startIdx). + // For example, consider the following, size = 3, capacity = 7 + // buffer = [2', x, 0, 1, 2, 0', 1'] + // with startIdx = 2, endIdx = 1, nextColStartIdx = 5. We need to + // decrement nextColStartIdx by 2. + // 2. it was in the part we copied second. In such scenario we need to + // shift the index towards the end by the same offset as when we were + // copying (by b.capacity-b.startIdx). Consider the example (1) + // above: we need to increment nextColStartIdx by 1. + if b.nextColStartIdx >= b.startIdx { b.nextColStartIdx -= b.startIdx + } else { + b.nextColStartIdx += b.capacity - b.startIdx } } b.startIdx = 0 From 3d969a6f571696bb7be89754f8e4c700e94a6278 Mon Sep 17 00:00:00 2001 From: Andy Yang Date: Tue, 30 Mar 2021 16:40:24 -0400 Subject: [PATCH 2/2] geo: fix st_segmentize to not panic when passed NaN as param Release note: None --- pkg/geo/geogfn/segmentize.go | 3 +++ pkg/geo/geogfn/segmentize_test.go | 21 +++++++++++++++++++++ pkg/geo/geomfn/segmentize.go | 3 +++ pkg/geo/geomfn/segmentize_test.go | 21 +++++++++++++++++++++ 4 files changed, 48 insertions(+) diff --git a/pkg/geo/geogfn/segmentize.go b/pkg/geo/geogfn/segmentize.go index af5b17de0cb8..9c8f1d345e4f 100644 --- a/pkg/geo/geogfn/segmentize.go +++ b/pkg/geo/geogfn/segmentize.go @@ -25,6 +25,9 @@ import ( // This works by dividing each segment by a power of 2 to find the // smallest power less than or equal to the segmentMaxLength. func Segmentize(geography geo.Geography, segmentMaxLength float64) (geo.Geography, error) { + if math.IsNaN(segmentMaxLength) || math.IsInf(segmentMaxLength, 1 /* sign */) { + return geography, nil + } geometry, err := geography.AsGeomT() if err != nil { return geo.Geography{}, err diff --git a/pkg/geo/geogfn/segmentize_test.go b/pkg/geo/geogfn/segmentize_test.go index 0a2f3723eee1..16fc396093f2 100644 --- a/pkg/geo/geogfn/segmentize_test.go +++ b/pkg/geo/geogfn/segmentize_test.go @@ -12,6 +12,7 @@ package geogfn import ( "fmt" + "math" "testing" "github.com/cockroachdb/cockroach/pkg/geo" @@ -100,6 +101,26 @@ func TestSegmentize(t *testing.T) { maxSegmentLength: 150000.0, expectedWKT: "LINESTRING Z (0 0 25,0 1 0,0.49878052093921765 2.0003038990352664 25,0.9981696941692514 3.0004561476391296 50,1.4984735304805308 4.000380457593079 75,2 5 100)", }, + { + wkt: "LINESTRING(0 0, 1 1)", + maxSegmentLength: math.NaN(), + expectedWKT: "LINESTRING(0 0, 1 1)", + }, + { + wkt: "LINESTRING M (0 0 0, 1 1 1)", + maxSegmentLength: math.Sqrt(-1), + expectedWKT: "LINESTRING M (0 0 0, 1 1 1)", + }, + { + wkt: "LINESTRING ZM (0 0 0 0, 1 1 1 1)", + maxSegmentLength: -math.NaN(), + expectedWKT: "LINESTRING(0 0 0 0, 1 1 1 1)", + }, + { + wkt: "LINESTRING(0 0, 1 1)", + maxSegmentLength: math.Inf(1), + expectedWKT: "LINESTRING(0 0, 1 1)", + }, } for _, test := range segmentizeTestCases { t.Run(fmt.Sprintf("%s, maximum segment length: %f", test.wkt, test.maxSegmentLength), func(t *testing.T) { diff --git a/pkg/geo/geomfn/segmentize.go b/pkg/geo/geomfn/segmentize.go index 06560f259c39..057cbdb21fa5 100644 --- a/pkg/geo/geomfn/segmentize.go +++ b/pkg/geo/geomfn/segmentize.go @@ -26,6 +26,9 @@ import ( // between given two-points such that each segment has length less // than or equal to given maximum segment length. func Segmentize(g geo.Geometry, segmentMaxLength float64) (geo.Geometry, error) { + if math.IsNaN(segmentMaxLength) || math.IsInf(segmentMaxLength, 1 /* sign */) { + return g, nil + } geometry, err := g.AsGeomT() if err != nil { return geo.Geometry{}, err diff --git a/pkg/geo/geomfn/segmentize_test.go b/pkg/geo/geomfn/segmentize_test.go index 649c97856869..1f31f5989090 100644 --- a/pkg/geo/geomfn/segmentize_test.go +++ b/pkg/geo/geomfn/segmentize_test.go @@ -12,6 +12,7 @@ package geomfn import ( "fmt" + "math" "testing" "github.com/cockroachdb/cockroach/pkg/geo" @@ -125,6 +126,26 @@ func TestSegmentize(t *testing.T) { maxSegmentLength: -1, expectedWKT: "MULTIPOINT ((0.0 0.0), (1.0 1.0))", }, + { + wkt: "LINESTRING(0 0, 1 1)", + maxSegmentLength: math.NaN(), + expectedWKT: "LINESTRING(0 0, 1 1)", + }, + { + wkt: "LINESTRING M (0 0 0, 1 1 1)", + maxSegmentLength: math.Sqrt(-1), + expectedWKT: "LINESTRING M (0 0 0, 1 1 1)", + }, + { + wkt: "LINESTRING ZM (0 0 0 0, 1 1 1 1)", + maxSegmentLength: -math.NaN(), + expectedWKT: "LINESTRING(0 0 0 0, 1 1 1 1)", + }, + { + wkt: "LINESTRING(0 0, 1 1)", + maxSegmentLength: math.Inf(1), + expectedWKT: "LINESTRING(0 0, 1 1)", + }, } for _, test := range segmentizeTestCases { t.Run(fmt.Sprintf("%s, maximum segment length: %f", test.wkt, test.maxSegmentLength), func(t *testing.T) {