Merge #74491
74491: colexecwindow: fix disk spilling in some cases r=yuzefovich a=yuzefovich

**colserde: fix possible data corruption scenario during disk spilling**

This commit fixes a possible data corruption scenario (which would result
either in a silent wrong query result or in an internal error) that could
occur when the data is serialized/deserialized in the vectorized engine.
The corruption would occur when the deserialized vectors were appended to,
and it was most likely to occur with Bytes-like types (because their `Set`s
can behave like appends to a certain degree).
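
As a rough illustration of that point, here is a simplified model of a
Bytes-like vector (plain Go, not the actual `coldata.Bytes` implementation):
all values share one flat data buffer plus an offsets slice, so setting the
next unset index amounts to appending to that shared buffer.

```go
// flatBytes is a simplified model of a Bytes-like vector: element i lives in
// data[offsets[i]:offsets[i+1]], and all elements share one flat buffer.
type flatBytes struct {
	data    []byte
	offsets []int32 // starts as []int32{0}
}

// Set writes v as element i. For the next unset index this is effectively an
// append to the shared data buffer, so a Set can write into whatever memory
// sits right after data if the slice's capacity extends that far.
func (b *flatBytes) Set(i int, v []byte) {
	b.data = append(b.data[:b.offsets[i]], v...)
	b.offsets = append(b.offsets[:i+1], int32(len(b.data)))
}
```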

We need to deserialize the data in two paths: in the inbox after reading
from the network and during disk spilling. I believe that the former is safe
(since we don't modify those batches) and the latter is mostly safe (since
we tend not to modify the batches that we read from disk). I think the only
exception is the window functions.

Consider the following scenario: a batch with two Bytes vectors is
serialized. Say
- the first vector is `{data:[foo], offsets:[0, 3]}`
- the second vector is `{data:[bar], offsets:[0, 3]}`.
After serializing both of them we will have a flat buffer with something
like:
  `buf = {1foo031bar03}` (ones represent the lengths of each vector).
Now, when the first vector is being deserialized, its data slice will be
something like:
  `data` = `[foo031bar03]`, `len(data) = 3`, `cap(data) > 3`.
If we don't explicitly cap the slice and deserialize it into a Bytes
vector, then later when we append to that vector, we will overwrite the
data that is actually a part of the second serialized vector, thus,
corrupting it (or the next batch).
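
The same failure mode and fix can be shown with plain Go slices (a standalone
sketch, not the actual colserde code): deserialization hands out sub-slices of
one shared buffer, and the three-index slice expression `s[:len(s):len(s)]`
caps each sub-slice so that a later append is forced to reallocate instead of
overwriting its neighbor.

```go
package main

import "fmt"

func main() {
	// One flat serialized buffer holding the data of two vectors back to
	// back, as in the scenario above (lengths omitted for brevity).
	buf := []byte("foobar")

	// Deserialize the first vector as a sub-slice of the shared buffer; its
	// capacity silently extends over the second vector's bytes.
	first := buf[0:3]  // "foo"
	second := buf[3:6] // "bar"

	// Appending to the uncapped sub-slice overwrites the second vector.
	first = append(first, "XYZ"...)
	fmt.Printf("uncapped: second = %q\n", second) // "XYZ" (corrupted)

	// Same steps with an explicitly capped sub-slice: the append has to
	// reallocate, so the neighboring data stays intact.
	buf = []byte("foobar")
	first = buf[0:3:3] // len(first) == cap(first) == 3
	second = buf[3:6]
	first = append(first, "XYZ"...)
	fmt.Printf("capped:   second = %q\n", second) // "bar" (intact)
}
```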

Release note (bug fix): Previously, CockroachDB could return incorrect
results or internal errors on queries with window functions returning
INT, FLOAT, BYTES, STRING, UUID, or JSON type when disk spilling
occurred. The bug was introduced in 21.2.0 and is now fixed.

**colexecwindow: make bytes-like output vector valid before spilling**

`bufferedWindowOp` is special in the sense that its output vector is
appended (to the batch) by the `vectorTypeEnforcer` that is its input
operator. That output vector is thus part of the input batch, and it is
updated in an incremental fashion (as the results become ready). It is also
possible that the input batch needs to be spilled to disk for the operator
to make progress. Previously, the output vector could be left in an invalid
state (if it was bytes-like) because fewer elements were set on the vector
than the length of the batch. This is now fixed by making the output vector
valid before spilling.
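
As a rough sketch of the invariant being restored here (plain slices; the
real code calls `Bytes().UpdateOffsetsToBeNonDecreasing` or
`JSON().UpdateOffsetsToBeNonDecreasing` on the output column, as the
`buffered_window.go` diff below shows): a bytes-like vector of length n needs
n+1 well-formed, non-decreasing offsets, so when only the first few elements
have been set, the trailing offsets must be filled in before the batch can be
spilled.

```go
// padOffsets makes a partially set bytes-like vector valid for n elements by
// extending its offsets with the last defined value, which gives every unset
// trailing element a length of zero. This mirrors, in simplified form, what
// the real UpdateOffsetsToBeNonDecreasing call achieves.
func padOffsets(offsets []int32, n int) []int32 {
	if len(offsets) == 0 {
		offsets = append(offsets, 0)
	}
	last := offsets[len(offsets)-1]
	for len(offsets) < n+1 {
		offsets = append(offsets, last)
	}
	return offsets
}
```

In the actual fix, this maintenance happens right before the current batch is
enqueued into the spilling queue, so the serialized form of the output column
is always well-formed even though its values are only partially computed.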

Fixes: #70715.

Release note: None (because the previous commit contains very similar
info).

**colexecwindow: fix min/max optimized window functions**

This commit fixes a problem with MIN/MAX optimized window functions in
the vectorized engine. The problem was that we forgot to make an
explicit copy of the value before rewinding the spilling buffer (which
can make the previously retrieved value invalid).
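
A minimal standalone sketch of the bug pattern and the fix (plain byte slices
standing in for the spilling buffer; in the real code the value comes from
`SpillingBuffer.GetVecWithTuple`, whose result, per the new warning in the
diff below, is only valid until the next call): the candidate MIN/MAX value
has to be copied out before the buffer reuses its memory.

```go
package main

import "fmt"

// reusingBuffer is a stand-in for a buffer that reuses one scratch slice for
// whatever value it currently exposes, the way a spilling buffer reuses
// memory across reads and rewinds.
type reusingBuffer struct {
	scratch []byte
	values  [][]byte
}

// get returns the value at idx; the result aliases scratch and is only valid
// until the next call to get.
func (b *reusingBuffer) get(idx int) []byte {
	b.scratch = append(b.scratch[:0], b.values[idx]...)
	return b.scratch
}

func main() {
	b := &reusingBuffer{values: [][]byte{[]byte("zebra"), []byte("apple")}}

	// Buggy pattern: hold on to the aliased result across another read.
	curMax := b.get(0)                       // candidate MAX is "zebra"
	_ = b.get(1)                             // the next read clobbers scratch
	fmt.Printf("without copy: %q\n", curMax) // prints "apple": stale alias

	// Fix: make an explicit copy before the buffer reuses its memory.
	curMax = append([]byte(nil), b.get(0)...)
	_ = b.get(1)
	fmt.Printf("with copy:    %q\n", curMax) // prints "zebra"
}
```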

Fixes: #74476.

Release note (bug fix): CockroachDB could previously incorrectly
calculate MIN/MAX when used as window functions in some cases after
spilling to disk. The bug was introduced in 21.2.0 and is now fixed.

**distsql: force disk spilling in TestWindowFunctionsAgainstProcessor**

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Jan 7, 2022
2 parents 927a4a1 + ef4f3eb commit eddfd51
Showing 7 changed files with 712 additions and 458 deletions.
42 changes: 37 additions & 5 deletions pkg/col/colserde/arrowbatchconverter.go
@@ -444,29 +444,40 @@ func (c *ArrowBatchConverter) ArrowToBatch(
}

default:
// For integers and floats we can just directly cast the slice
// without performing the copy.
//
// However, we have to be careful to set the capacity on each slice
// explicitly to protect memory regions that come after the slice
// from corruption in case the slice will be appended to in the
// future. See an example in deserializeArrowIntoBytes.
var col coldata.Column
switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) {
case types.IntFamily:
switch typ.Width() {
case 16:
intArr := array.NewInt16Data(d)
vec.Nulls().SetNullBitmap(intArr.NullBitmapBytes(), batchLength)
col = coldata.Int16s(intArr.Int16Values())
int16s := coldata.Int16s(intArr.Int16Values())
col = int16s[:len(int16s):len(int16s)]
case 32:
intArr := array.NewInt32Data(d)
vec.Nulls().SetNullBitmap(intArr.NullBitmapBytes(), batchLength)
col = coldata.Int32s(intArr.Int32Values())
int32s := coldata.Int32s(intArr.Int32Values())
col = int32s[:len(int32s):len(int32s)]
case 0, 64:
intArr := array.NewInt64Data(d)
vec.Nulls().SetNullBitmap(intArr.NullBitmapBytes(), batchLength)
col = coldata.Int64s(intArr.Int64Values())
int64s := coldata.Int64s(intArr.Int64Values())
col = int64s[:len(int64s):len(int64s)]
default:
panic(fmt.Sprintf("unexpected int width: %d", typ.Width()))
}
case types.FloatFamily:
floatArr := array.NewFloat64Data(d)
vec.Nulls().SetNullBitmap(floatArr.NullBitmapBytes(), batchLength)
col = coldata.Float64s(floatArr.Float64Values())
float64s := coldata.Float64s(floatArr.Float64Values())
col = float64s[:len(float64s):len(float64s)]
default:
panic(
fmt.Sprintf("unsupported type for conversion to column batch %s", d.DataType().Name()),
@@ -494,5 +505,26 @@ func deserializeArrowIntoBytes(
// corresponds.
b = make([]byte, 0)
}
coldata.BytesFromArrowSerializationFormat(bytes, b, bytesArr.ValueOffsets())
// Cap the data and offsets slices explicitly to protect against possible
// corruption of the memory region that is after the arrow data for this
// Bytes vector.
//
// Consider the following scenario: a batch with two Bytes vectors is
// serialized. Say
// - the first vector is {data:[foo], offsets:[0, 3]}
// - the second vector is {data:[bar], offsets:[0, 3]}.
// After serializing both of them we will have a flat buffer with something
// like:
// buf = {1foo031bar03} (ones represent the lengths of each vector).
// Now, when the first vector is being deserialized, its data slice will be
// something like:
// data = [foo031bar03], len(data) = 3, cap(data) > 3.
// If we don't explicitly cap the slice and deserialize it into a Bytes
// vector, then later when we append to that vector, we will overwrite the
// data that is actually a part of the second serialized vector, thus,
// corrupting it (or the next batch).
offsets := bytesArr.ValueOffsets()
b = b[:len(b):len(b)]
offsets = offsets[:len(offsets):len(offsets)]
coldata.BytesFromArrowSerializationFormat(bytes, b, offsets)
}
28 changes: 26 additions & 2 deletions pkg/col/colserde/file_test.go
@@ -22,22 +22,27 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

func TestFileRoundtrip(t *testing.T) {
defer leaktest.AfterTest(t)()
typs, b := randomBatch(testAllocator)
rng, _ := randutil.NewTestRand()

t.Run(`mem`, func(t *testing.T) {
// Make a copy of the original batch because the converter modifies and
// Make copies of the original batch because the converter modifies and
// casts data without copying for performance reasons.
original := coldatatestutils.CopyBatch(b, typs, testColumnFactory)
bCopy := coldatatestutils.CopyBatch(b, typs, testColumnFactory)

var buf bytes.Buffer
s, err := colserde.NewFileSerializer(&buf, typs)
require.NoError(t, err)
require.NoError(t, s.AppendBatch(b))
// Append the same batch again.
require.NoError(t, s.AppendBatch(bCopy))
require.NoError(t, s.Finish())

// Parts of the deserialization modify things (null bitmaps) in place, so
@@ -50,9 +55,28 @@ func TestFileRoundtrip(t *testing.T) {
require.NoError(t, err)
defer func() { require.NoError(t, d.Close()) }()
require.Equal(t, typs, d.Typs())
require.Equal(t, 1, d.NumBatches())
require.Equal(t, 2, d.NumBatches())

// Check the first batch.
require.NoError(t, d.GetBatch(0, roundtrip))
coldata.AssertEquivalentBatches(t, original, roundtrip)

// Modify the returned batch (by appending some other random
// batch) to make sure that the second serialized batch is
// unchanged.
length := rng.Intn(original.Length()) + 1
r := coldatatestutils.RandomBatch(testAllocator, rng, typs, length, length, rng.Float64())
for vecIdx, vec := range roundtrip.ColVecs() {
vec.Append(coldata.SliceArgs{
Src: r.ColVec(vecIdx),
DestIdx: original.Length(),
SrcEndIdx: length,
})
}
roundtrip.SetLength(original.Length() + length)

// Now check the second batch.
require.NoError(t, d.GetBatch(1, roundtrip))
coldata.AssertEquivalentBatches(t, original, roundtrip)
}()
}
4 changes: 4 additions & 0 deletions pkg/sql/colexec/colexecutils/spilling_buffer.go
@@ -216,6 +216,10 @@ func (b *SpillingBuffer) AppendTuples(
// when tuples from a subsequent batch are accessed. If the index is less than
// zero or greater than or equal to the buffer length, GetVecWithTuple will
// panic.
//
// WARNING: the returned column vector is only valid until the next call to
// GetVecWithTuple. If the caller wants to hold onto the vector, a copy must be
// made.
func (b *SpillingBuffer) GetVecWithTuple(
ctx context.Context, colIdx, idx int,
) (_ coldata.Vec, rowIdx int, length int) {
12 changes: 12 additions & 0 deletions pkg/sql/colexec/colexecwindow/buffered_window.go
@@ -239,6 +239,18 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
// Load the next batch into currentBatch. If currentBatch still has data,
// move it into the queue.
if b.currentBatch != nil && b.currentBatch.Length() > 0 {
// We might have already set some values on the output vector
// within the current batch. If that vector is bytes-like, we
// have to explicitly maintain the invariant of the vector by
// updating the offsets.
// TODO(yuzefovich): it is quite unfortunate that the output
// vector is being spilled to disk. Consider refactoring this.
switch b.outputColFam {
case types.BytesFamily:
b.currentBatch.ColVec(b.outputColIdx).Bytes().UpdateOffsetsToBeNonDecreasing(b.currentBatch.Length())
case types.JsonFamily:
b.currentBatch.ColVec(b.outputColIdx).JSON().UpdateOffsetsToBeNonDecreasing(b.currentBatch.Length())
}
b.bufferQueue.Enqueue(b.Ctx, b.currentBatch)
}
// We have to copy the input batch data because calling Next on the input