Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colexecwindow: fix disk spilling in some cases #74491

Merged
merged 4 commits into from
Jan 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,29 +444,40 @@ func (c *ArrowBatchConverter) ArrowToBatch(
}

default:
// For integers and floats we can just directly cast the slice
// without performing the copy.
//
// However, we have to be careful to set the capacity on each slice
// explicitly to protect memory regions that come after the slice
// from corruption in case the slice is appended to in the
// future. See an example in deserializeArrowIntoBytes.
var col coldata.Column
switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) {
case types.IntFamily:
switch typ.Width() {
case 16:
intArr := array.NewInt16Data(d)
vec.Nulls().SetNullBitmap(intArr.NullBitmapBytes(), batchLength)
col = coldata.Int16s(intArr.Int16Values())
int16s := coldata.Int16s(intArr.Int16Values())
col = int16s[:len(int16s):len(int16s)]
case 32:
intArr := array.NewInt32Data(d)
vec.Nulls().SetNullBitmap(intArr.NullBitmapBytes(), batchLength)
col = coldata.Int32s(intArr.Int32Values())
int32s := coldata.Int32s(intArr.Int32Values())
col = int32s[:len(int32s):len(int32s)]
case 0, 64:
intArr := array.NewInt64Data(d)
vec.Nulls().SetNullBitmap(intArr.NullBitmapBytes(), batchLength)
col = coldata.Int64s(intArr.Int64Values())
int64s := coldata.Int64s(intArr.Int64Values())
col = int64s[:len(int64s):len(int64s)]
default:
panic(fmt.Sprintf("unexpected int width: %d", typ.Width()))
}
case types.FloatFamily:
floatArr := array.NewFloat64Data(d)
vec.Nulls().SetNullBitmap(floatArr.NullBitmapBytes(), batchLength)
col = coldata.Float64s(floatArr.Float64Values())
float64s := coldata.Float64s(floatArr.Float64Values())
col = float64s[:len(float64s):len(float64s)]
default:
panic(
fmt.Sprintf("unsupported type for conversion to column batch %s", d.DataType().Name()),
Expand Down Expand Up @@ -494,5 +505,26 @@ func deserializeArrowIntoBytes(
// corresponds.
b = make([]byte, 0)
}
coldata.BytesFromArrowSerializationFormat(bytes, b, bytesArr.ValueOffsets())
// Cap the data and offsets slices explicitly to protect against possible
// corruption of the memory region that is after the arrow data for this
// Bytes vector.
//
// Consider the following scenario: a batch with two Bytes vectors is
// serialized. Say
// - the first vector is {data:[foo], offsets:[0, 3]}
// - the second vector is {data:[bar], offsets:[0, 3]}.
// After serializing both of them we will have a flat buffer with something
// like:
// buf = {1foo031bar03} (ones represent the lengths of each vector).
// Now, when the first vector is being deserialized, its data slice will be
// something like:
// data = [foo031bar03], len(data) = 3, cap(data) > 3.
// If we don't explicitly cap the slice and deserialize it into a Bytes
// vector, then later when we append to that vector, we will overwrite the
// data that is actually a part of the second serialized vector, thus,
// corrupting it (or the next batch).
offsets := bytesArr.ValueOffsets()
b = b[:len(b):len(b)]
offsets = offsets[:len(offsets):len(offsets)]
coldata.BytesFromArrowSerializationFormat(bytes, b, offsets)
}
28 changes: 26 additions & 2 deletions pkg/col/colserde/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,27 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

func TestFileRoundtrip(t *testing.T) {
defer leaktest.AfterTest(t)()
typs, b := randomBatch(testAllocator)
rng, _ := randutil.NewTestRand()

t.Run(`mem`, func(t *testing.T) {
// Make a copy of the original batch because the converter modifies and
// Make copies of the original batch because the converter modifies and
// casts data without copying for performance reasons.
original := coldatatestutils.CopyBatch(b, typs, testColumnFactory)
bCopy := coldatatestutils.CopyBatch(b, typs, testColumnFactory)

var buf bytes.Buffer
s, err := colserde.NewFileSerializer(&buf, typs)
require.NoError(t, err)
require.NoError(t, s.AppendBatch(b))
// Append the same batch again.
require.NoError(t, s.AppendBatch(bCopy))
require.NoError(t, s.Finish())

// Parts of the deserialization modify things (null bitmaps) in place, so
Expand All @@ -50,9 +55,28 @@ func TestFileRoundtrip(t *testing.T) {
require.NoError(t, err)
defer func() { require.NoError(t, d.Close()) }()
require.Equal(t, typs, d.Typs())
require.Equal(t, 1, d.NumBatches())
require.Equal(t, 2, d.NumBatches())

// Check the first batch.
require.NoError(t, d.GetBatch(0, roundtrip))
coldata.AssertEquivalentBatches(t, original, roundtrip)

// Modify the returned batch (by appending some other random
// batch) to make sure that the second serialized batch is
// unchanged.
length := rng.Intn(original.Length()) + 1
r := coldatatestutils.RandomBatch(testAllocator, rng, typs, length, length, rng.Float64())
for vecIdx, vec := range roundtrip.ColVecs() {
vec.Append(coldata.SliceArgs{
Src: r.ColVec(vecIdx),
DestIdx: original.Length(),
SrcEndIdx: length,
})
}
roundtrip.SetLength(original.Length() + length)

// Now check the second batch.
require.NoError(t, d.GetBatch(1, roundtrip))
coldata.AssertEquivalentBatches(t, original, roundtrip)
}()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/colexec/colexecutils/spilling_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func (b *SpillingBuffer) AppendTuples(
// when tuples from a subsequent batch are accessed. If the index is less than
// zero or greater than or equal to the buffer length, GetVecWithTuple will
// panic.
//
// WARNING: the returned column vector is only valid until the next call to
// GetVecWithTuple. If the caller wants to hold onto the vector, a copy must be
// made.
func (b *SpillingBuffer) GetVecWithTuple(
ctx context.Context, colIdx, idx int,
) (_ coldata.Vec, rowIdx int, length int) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/colexec/colexecwindow/buffered_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,18 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
// Load the next batch into currentBatch. If currentBatch still has data,
// move it into the queue.
if b.currentBatch != nil && b.currentBatch.Length() > 0 {
// We might have already set some values on the output vector
// within the current batch. If that vector is bytes-like, we
// have to explicitly maintain its invariant (non-decreasing
// offsets) by updating the offsets up to the batch length.
// TODO(yuzefovich): it is quite unfortunate that the output
// vector is being spilled to disk. Consider refactoring this.
switch b.outputColFam {
case types.BytesFamily:
b.currentBatch.ColVec(b.outputColIdx).Bytes().UpdateOffsetsToBeNonDecreasing(b.currentBatch.Length())
case types.JsonFamily:
b.currentBatch.ColVec(b.outputColIdx).JSON().UpdateOffsetsToBeNonDecreasing(b.currentBatch.Length())
}
b.bufferQueue.Enqueue(b.Ctx, b.currentBatch)
}
// We have to copy the input batch data because calling Next on the input
Expand Down
Loading