Commit d145e9f
62785: colexecjoin: fix the merge joiner in some cases r=yuzefovich a=yuzefovich

Previously, we were incorrectly updating the internal state of
`circularGroupsBuffer` when we needed to expand the buffer size. The bug
was introduced several months ago (during the 21.1 cycle) when we made the
buffer grow dynamically. As it turns out, the bug is easily caught by
our unit tests if we randomize the initial buffer size.

Fixes: cockroachdb#62520.

Release note (bug fix): CockroachDB could previously return incorrect
results in some cases when executing the merge join operation via the
vectorized engine. The bug is only present in 21.1 test releases.
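
The fix below also replaces the fixed initial buffer size with a metamorphic constant so that unit tests run against many sizes. A minimal sketch of that pattern, assuming a test-only build flag; the helper name, flag, and package here are illustrative rather than the CockroachDB API (the actual change uses util.ConstantWithMetamorphicTestRange, shown in the diff):

package example

import "math/rand"

// metamorphicTestBuild stands in for a build-time flag that is true only in
// test builds; hard-coded here purely for illustration.
const metamorphicTestBuild = true

// constantWithTestRange returns defaultValue in production builds and a
// random value in [min, max) in test builds, so tests exercise small initial
// sizes that force the buffer to grow early and often.
func constantWithTestRange(defaultValue, min, max int) int {
	if metamorphicTestBuild {
		return min + rand.Intn(max-min)
	}
	return defaultValue
}

// groupsBufferInitialSize mirrors the constant the diff below randomizes
// between 1 and 16, with a default of 8.
var groupsBufferInitialSize = constantWithTestRange(8 /* defaultValue */, 1 /* min */, 16 /* max */)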

62828: geo: fix st_segmentize to not panic when passed NaN as param r=otan a=andyyang890

Fixes cockroachdb#62741.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
3 people committed Mar 30, 2021
3 parents fed2225 + 9f6fce0 + 3d969a6 commit d145e9f
Showing 6 changed files with 89 additions and 18 deletions.
3 changes: 3 additions & 0 deletions pkg/geo/geogfn/segmentize.go
@@ -25,6 +25,9 @@ import (
// This works by dividing each segment by a power of 2 to find the
// smallest power less than or equal to the segmentMaxLength.
func Segmentize(geography geo.Geography, segmentMaxLength float64) (geo.Geography, error) {
if math.IsNaN(segmentMaxLength) || math.IsInf(segmentMaxLength, 1 /* sign */) {
return geography, nil
}
geometry, err := geography.AsGeomT()
if err != nil {
return geo.Geography{}, err
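
The comment above Segmentize describes splitting each segment into the smallest power-of-two number of pieces whose length is at most segmentMaxLength. A small self-contained sketch of that arithmetic, assuming a plain length in the same units as the maximum; the helper name is illustrative and not the library's:

package main

import (
	"fmt"
	"math"
)

// segmentCount returns the smallest power of two n such that
// segmentLength/n <= maxLength, mirroring the power-of-two rule in the
// Segmentize comment. Illustrative only; not the CockroachDB helper.
func segmentCount(segmentLength, maxLength float64) int {
	if segmentLength <= maxLength {
		return 1
	}
	k := math.Ceil(math.Log2(segmentLength / maxLength))
	return int(math.Pow(2, k))
}

func main() {
	// A ~555km segment capped at 150km is split into 4 pieces of ~139km each.
	fmt.Println(segmentCount(555000, 150000)) // 4
}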
21 changes: 21 additions & 0 deletions pkg/geo/geogfn/segmentize_test.go
@@ -12,6 +12,7 @@ package geogfn

import (
"fmt"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/geo"
@@ -100,6 +101,26 @@ func TestSegmentize(t *testing.T) {
maxSegmentLength: 150000.0,
expectedWKT: "LINESTRING Z (0 0 25,0 1 0,0.49878052093921765 2.0003038990352664 25,0.9981696941692514 3.0004561476391296 50,1.4984735304805308 4.000380457593079 75,2 5 100)",
},
{
wkt: "LINESTRING(0 0, 1 1)",
maxSegmentLength: math.NaN(),
expectedWKT: "LINESTRING(0 0, 1 1)",
},
{
wkt: "LINESTRING M (0 0 0, 1 1 1)",
maxSegmentLength: math.Sqrt(-1),
expectedWKT: "LINESTRING M (0 0 0, 1 1 1)",
},
{
wkt: "LINESTRING ZM (0 0 0 0, 1 1 1 1)",
maxSegmentLength: -math.NaN(),
expectedWKT: "LINESTRING(0 0 0 0, 1 1 1 1)",
},
{
wkt: "LINESTRING(0 0, 1 1)",
maxSegmentLength: math.Inf(1),
expectedWKT: "LINESTRING(0 0, 1 1)",
},
}
for _, test := range segmentizeTestCases {
t.Run(fmt.Sprintf("%s, maximum segment length: %f", test.wkt, test.maxSegmentLength), func(t *testing.T) {
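
The new test cases produce NaN in three different ways (math.NaN(), math.Sqrt(-1), and a negated NaN) plus +Inf, and all of them are short-circuited by the added guard. A tiny standalone check of that, assuming nothing beyond the standard library:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Every NaN spelling used in the tests satisfies math.IsNaN, and +Inf is
	// matched by math.IsInf with sign 1, so the guard returns the input as is.
	for _, v := range []float64{math.NaN(), math.Sqrt(-1), -math.NaN(), math.Inf(1)} {
		skip := math.IsNaN(v) || math.IsInf(v, 1 /* sign */)
		fmt.Println(v, skip) // all print true
	}
}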
3 changes: 3 additions & 0 deletions pkg/geo/geomfn/segmentize.go
@@ -26,6 +26,9 @@ import (
// between given two-points such that each segment has length less
// than or equal to given maximum segment length.
func Segmentize(g geo.Geometry, segmentMaxLength float64) (geo.Geometry, error) {
if math.IsNaN(segmentMaxLength) || math.IsInf(segmentMaxLength, 1 /* sign */) {
return g, nil
}
geometry, err := g.AsGeomT()
if err != nil {
return geo.Geometry{}, err
21 changes: 21 additions & 0 deletions pkg/geo/geomfn/segmentize_test.go
@@ -12,6 +12,7 @@ package geomfn

import (
"fmt"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/geo"
@@ -125,6 +126,26 @@ func TestSegmentize(t *testing.T) {
maxSegmentLength: -1,
expectedWKT: "MULTIPOINT ((0.0 0.0), (1.0 1.0))",
},
{
wkt: "LINESTRING(0 0, 1 1)",
maxSegmentLength: math.NaN(),
expectedWKT: "LINESTRING(0 0, 1 1)",
},
{
wkt: "LINESTRING M (0 0 0, 1 1 1)",
maxSegmentLength: math.Sqrt(-1),
expectedWKT: "LINESTRING M (0 0 0, 1 1 1)",
},
{
wkt: "LINESTRING ZM (0 0 0 0, 1 1 1 1)",
maxSegmentLength: -math.NaN(),
expectedWKT: "LINESTRING(0 0 0 0, 1 1 1 1)",
},
{
wkt: "LINESTRING(0 0, 1 1)",
maxSegmentLength: math.Inf(1),
expectedWKT: "LINESTRING(0 0, 1 1)",
},
}
for _, test := range segmentizeTestCases {
t.Run(fmt.Sprintf("%s, maximum segment length: %f", test.wkt, test.maxSegmentLength), func(t *testing.T) {
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecjoin/BUILD.bazel
@@ -29,6 +29,7 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/duration", # keep
"//pkg/util/mon",
"@com_github_cockroachdb_apd_v2//:apd", # keep
58 changes: 40 additions & 18 deletions pkg/sql/colexec/colexecjoin/mergejoiner_util.go
@@ -10,6 +10,8 @@

package colexecjoin

import "github.com/cockroachdb/cockroach/pkg/util"

// circularGroupsBuffer is a struct designed to store the groups' slices for a
// given column. It starts out small and will grow dynamically if necessary
// until pre-computed maximum capacity (since we know that there is a maximum
@@ -45,7 +47,12 @@ type circularGroupsBuffer struct {

// groupsBufferInitialSize determines the size used in initial allocations of
// the slices of the circularGroupsBuffer.
const groupsBufferInitialSize = 8
var groupsBufferInitialSize = util.ConstantWithMetamorphicTestRange(
"merge-joiner-groups-buffer",
8, /* defaultValue */
1, /* min */
16, /* max */
)

func makeGroupsBuffer(batchSize int) circularGroupsBuffer {
return circularGroupsBuffer{
@@ -156,37 +163,52 @@ func (b *circularGroupsBuffer) ensureCapacityForNewGroup() {
newLeftGroups := make([]group, getGroupsSlicesLen(newSize))
newRightGroups := make([]group, getGroupsSlicesLen(newSize))
// Note that this if block is never reached when startIdx == endIdx (because
// that would indicate an empty buffer and we would enough capacity given
// our initialization in makeGroupsBuffer).
// that would indicate an empty buffer and we would have enough capacity
// given our initialization in makeGroupsBuffer).
if b.startIdx <= b.endIdx {
// Current groups are contiguous in the slices, so copying them over is
// simple.
copy(newLeftGroups, b.leftGroups[b.startIdx:b.endIdx])
copy(newRightGroups, b.rightGroups[b.startIdx:b.endIdx])
b.nextColStartIdx -= b.startIdx
} else {
// Current groups are wrapped at position b.capacity. Namely, if we have
// size = 3, capacity = 7, we might have the following:
// buffer = [1, 2, 0', 1', 2', x, 0]
// where startIdx = 6, endIdx = 4, nextColStartIdx = 2, so we need to
// copy over with the adjustments.
// Current groups are wrapped after position b.capacity-1. Namely, if we
// have size = 3, capacity = 7, we might have the following:
// buffer = [1, 2, 0', 1', 2', x, 0] (1)
// where startIdx = 6, endIdx = 5, nextColStartIdx = 2, so we need to
// copy over with the adjustments so that the resulting state is
// buffer = [0, 1, 2, 0', 1', 2', x]
// where startIdx = 0, endIdx = 6, nextColStartIdx = 3.
//
// First, copy over the start of the buffer (which is currently at the
// end of the old slices) into the beginning of the new slices. In the
// example above, we're copying [0].
copy(newLeftGroups, b.leftGroups[b.startIdx:b.capacity])
copy(newRightGroups, b.rightGroups[b.startIdx:b.capacity])
// If non-empty, copy over the end of the buffer (which is currently at
// the beginning of the old slices). In the example above, we're copying
// [1, 2, 0', 1', 2'].
if b.endIdx > 0 {
offset := b.capacity - b.startIdx
copy(newLeftGroups[offset:], b.leftGroups[:b.endIdx])
copy(newRightGroups[offset:], b.rightGroups[:b.endIdx])
b.nextColStartIdx += offset
} else {
// There is a special case when endIdx is 0 - we're simply shifting
// the groups by startIdx to the left (which we have already done)
// which requires adjusting nextColStartIdx accordingly.
//
// Consider the following, size = 3, capacity = 7
// buffer = [x, 0, 1, 2, 0', 1', 2']
// with startIdx = 1, endIdx = 0, nextColStartIdx = 4. We don't need
// to copy anything, but we need to decrement nextColStartIdx by 1.
}
// Now update b.nextColStartIdx. There are two cases:
// 1. it was in the part we copied first. In such scenario we need to
// shift the index towards the beginning by the same offset as when
// we were copying (by b.startIdx).
// For example, consider the following, size = 3, capacity = 7
// buffer = [2', x, 0, 1, 2, 0', 1']
// with startIdx = 2, endIdx = 1, nextColStartIdx = 5. We need to
// decrement nextColStartIdx by 2.
// 2. it was in the part we copied second. In such scenario we need to
// shift the index towards the end by the same offset as when we were
// copying (by b.capacity-b.startIdx). Consider the example (1)
// above: we need to increment nextColStartIdx by 1.
if b.nextColStartIdx >= b.startIdx {
b.nextColStartIdx -= b.startIdx
} else {
b.nextColStartIdx += b.capacity - b.startIdx
}
}
b.startIdx = 0
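
To make the two nextColStartIdx cases above concrete, here is a toy, self-contained sketch of growing a wrapped circular buffer while keeping an interior index valid; all names and types are illustrative and independent of the CockroachDB structs, and it replays example (1) from the comments:

package main

import "fmt"

// growWrapped copies a wrapped window [startIdx, capacity) + [0, endIdx) of
// buf into a larger slice starting at index 0 and shifts marker the same way
// the merge joiner shifts nextColStartIdx.
func growWrapped(buf []int, startIdx, endIdx, marker, newSize int) (out []int, newEnd, newMarker int) {
	capacity := len(buf)
	out = make([]int, newSize)
	// Copy the tail of the old buffer to the front of the new one.
	n := copy(out, buf[startIdx:capacity])
	// Then copy the wrapped-around head right after it.
	copy(out[n:], buf[:endIdx])
	// Case 1: marker sat in the first copied chunk, shift left by startIdx.
	// Case 2: marker sat in the second chunk, shift right by capacity-startIdx.
	if marker >= startIdx {
		newMarker = marker - startIdx
	} else {
		newMarker = marker + capacity - startIdx
	}
	newEnd = n + endIdx
	return out, newEnd, newMarker
}

func main() {
	// Example (1): capacity = 7, startIdx = 6, endIdx = 5, nextColStartIdx = 2.
	// 10, 11, 12 stand for 0', 1', 2' and -1 marks the unused slot x.
	buf := []int{1, 2, 10, 11, 12, -1, 0}
	out, end, marker := growWrapped(buf, 6, 5, 2, 14)
	fmt.Println(out[:end], end, marker) // [0 1 2 10 11 12] 6 3
}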
