diff --git a/pkg/col/colserde/arrowbatchconverter.go b/pkg/col/colserde/arrowbatchconverter.go
index a8c0b63c5b29..e80f70daf7aa 100644
--- a/pkg/col/colserde/arrowbatchconverter.go
+++ b/pkg/col/colserde/arrowbatchconverter.go
@@ -444,6 +444,13 @@ func (c *ArrowBatchConverter) ArrowToBatch(
}
default:
+ // For integers and floats we can just directly cast the slice
+ // without performing the copy.
+ //
+ // However, we have to be careful to set the capacity on each slice
+ // explicitly to protect memory regions that come after the slice
+ // from corruption in case the slice will be appended to in the
+ // future. See an example in deserializeArrowIntoBytes.
var col coldata.Column
switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) {
case types.IntFamily:
@@ -451,22 +458,26 @@
case 16:
intArr := array.NewInt16Data(d)
vec.Nulls().SetNullBitmap(intArr.NullBitmapBytes(), batchLength)
- col = coldata.Int16s(intArr.Int16Values())
+ int16s := coldata.Int16s(intArr.Int16Values())
+ col = int16s[:len(int16s):len(int16s)]
case 32:
intArr := array.NewInt32Data(d)
vec.Nulls().SetNullBitmap(intArr.NullBitmapBytes(), batchLength)
- col = coldata.Int32s(intArr.Int32Values())
+ int32s := coldata.Int32s(intArr.Int32Values())
+ col = int32s[:len(int32s):len(int32s)]
case 0, 64:
intArr := array.NewInt64Data(d)
vec.Nulls().SetNullBitmap(intArr.NullBitmapBytes(), batchLength)
- col = coldata.Int64s(intArr.Int64Values())
+ int64s := coldata.Int64s(intArr.Int64Values())
+ col = int64s[:len(int64s):len(int64s)]
default:
panic(fmt.Sprintf("unexpected int width: %d", typ.Width()))
}
case types.FloatFamily:
floatArr := array.NewFloat64Data(d)
vec.Nulls().SetNullBitmap(floatArr.NullBitmapBytes(), batchLength)
- col = coldata.Float64s(floatArr.Float64Values())
+ float64s := coldata.Float64s(floatArr.Float64Values())
+ col = float64s[:len(float64s):len(float64s)]
default:
panic(
fmt.Sprintf("unsupported type for conversion to column batch %s", d.DataType().Name()),
@@ -494,5 +505,26 @@ func deserializeArrowIntoBytes(
// corresponds.
b = make([]byte, 0)
}
- coldata.BytesFromArrowSerializationFormat(bytes, b, bytesArr.ValueOffsets())
+ // Cap the data and offsets slices explicitly to protect against possible
+ // corruption of the memory region that is after the arrow data for this
+ // Bytes vector.
+ //
+ // Consider the following scenario: a batch with two Bytes vectors is
+ // serialized. Say
+ // - the first vector is {data:[foo], offsets:[0, 3]}
+ // - the second vector is {data:[bar], offsets:[0, 3]}.
+ // After serializing both of them we will have a flat buffer with something
+ // like:
+ // buf = {1foo031bar03} (the ones represent the length of each vector).
+ // Now, when the first vector is being deserialized, its data slice will be
+ // something like:
+ // data = [foo031bar03], len(data) = 3, cap(data) > 3.
+ // If we don't explicitly cap the slice and deserialize it into a Bytes
+ // vector, then later when we append to that vector, we will overwrite the
+ // data that is actually a part of the second serialized vector, thus,
+ // corrupting it (or the next batch).
+ offsets := bytesArr.ValueOffsets() + b = b[:len(b):len(b)] + offsets = offsets[:len(offsets):len(offsets)] + coldata.BytesFromArrowSerializationFormat(bytes, b, offsets) } diff --git a/pkg/col/colserde/file_test.go b/pkg/col/colserde/file_test.go index bcecc2f6792d..8eb74252aeea 100644 --- a/pkg/col/colserde/file_test.go +++ b/pkg/col/colserde/file_test.go @@ -22,22 +22,27 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/stretchr/testify/require" ) func TestFileRoundtrip(t *testing.T) { defer leaktest.AfterTest(t)() typs, b := randomBatch(testAllocator) + rng, _ := randutil.NewTestRand() t.Run(`mem`, func(t *testing.T) { - // Make a copy of the original batch because the converter modifies and + // Make copies of the original batch because the converter modifies and // casts data without copying for performance reasons. original := coldatatestutils.CopyBatch(b, typs, testColumnFactory) + bCopy := coldatatestutils.CopyBatch(b, typs, testColumnFactory) var buf bytes.Buffer s, err := colserde.NewFileSerializer(&buf, typs) require.NoError(t, err) require.NoError(t, s.AppendBatch(b)) + // Append the same batch again. + require.NoError(t, s.AppendBatch(bCopy)) require.NoError(t, s.Finish()) // Parts of the deserialization modify things (null bitmaps) in place, so @@ -50,9 +55,28 @@ func TestFileRoundtrip(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, d.Close()) }() require.Equal(t, typs, d.Typs()) - require.Equal(t, 1, d.NumBatches()) + require.Equal(t, 2, d.NumBatches()) + + // Check the first batch. require.NoError(t, d.GetBatch(0, roundtrip)) + coldata.AssertEquivalentBatches(t, original, roundtrip) + // Modify the returned batch (by appending some other random + // batch) to make sure that the second serialized batch is + // unchanged. + length := rng.Intn(original.Length()) + 1 + r := coldatatestutils.RandomBatch(testAllocator, rng, typs, length, length, rng.Float64()) + for vecIdx, vec := range roundtrip.ColVecs() { + vec.Append(coldata.SliceArgs{ + Src: r.ColVec(vecIdx), + DestIdx: original.Length(), + SrcEndIdx: length, + }) + } + roundtrip.SetLength(original.Length() + length) + + // Now check the second batch. + require.NoError(t, d.GetBatch(1, roundtrip)) coldata.AssertEquivalentBatches(t, original, roundtrip) }() } diff --git a/pkg/sql/colexec/colexecutils/spilling_buffer.go b/pkg/sql/colexec/colexecutils/spilling_buffer.go index a91040ed505a..eb71f78d867a 100644 --- a/pkg/sql/colexec/colexecutils/spilling_buffer.go +++ b/pkg/sql/colexec/colexecutils/spilling_buffer.go @@ -216,6 +216,10 @@ func (b *SpillingBuffer) AppendTuples( // when tuples from a subsequent batch are accessed. If the index is less than // zero or greater than or equal to the buffer length, GetVecWithTuple will // panic. +// +// WARNING: the returned column vector is only valid until the next call to +// GetVecWithTuple. If the caller wants to hold onto the vector, a copy must be +// made. 
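+//
+// For example, a caller that needs to compare the current value against
+// previously buffered tuples should copy the value out of the returned vector
+// before calling GetVecWithTuple again (see the valCopy variables in
+// min_max_removable_agg.eg.go).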
func (b *SpillingBuffer) GetVecWithTuple( ctx context.Context, colIdx, idx int, ) (_ coldata.Vec, rowIdx int, length int) { diff --git a/pkg/sql/colexec/colexecwindow/buffered_window.go b/pkg/sql/colexec/colexecwindow/buffered_window.go index 398c374cb67c..a2cfce874a43 100644 --- a/pkg/sql/colexec/colexecwindow/buffered_window.go +++ b/pkg/sql/colexec/colexecwindow/buffered_window.go @@ -239,6 +239,18 @@ func (b *bufferedWindowOp) Next() coldata.Batch { // Load the next batch into currentBatch. If currentBatch still has data, // move it into the queue. if b.currentBatch != nil && b.currentBatch.Length() > 0 { + // We might have already set some values on the output vector + // within the current batch. If that vector is bytes-like, we + // have to explicitly maintain the invariant of the vector by + // updating the offsets. + // TODO(yuzefovich): it is quite unfortunate that the output + // vector is being spilled to disk. Consider refactoring this. + switch b.outputColFam { + case types.BytesFamily: + b.currentBatch.ColVec(b.outputColIdx).Bytes().UpdateOffsetsToBeNonDecreasing(b.currentBatch.Length()) + case types.JsonFamily: + b.currentBatch.ColVec(b.outputColIdx).JSON().UpdateOffsetsToBeNonDecreasing(b.currentBatch.Length()) + } b.bufferQueue.Enqueue(b.Ctx, b.currentBatch) } // We have to copy the input batch data because calling Next on the input diff --git a/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go b/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go index dd7f1b118761..e2415fb674b2 100644 --- a/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go +++ b/pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go @@ -299,31 +299,37 @@ func (a *minBoolAggregator) aggregateOverIntervals(intervals []windowInterval) { // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Bool().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy bool + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Bool().Get(cmpIdx) - { - var cmpResult int + { + var cmpResult int - if !cmpVal && val { - cmpResult = -1 - } else if cmpVal && !val { - cmpResult = 1 - } else { - cmpResult = 0 - } + if !cmpVal && valCopy { + cmpResult = -1 + } else if cmpVal && !valCopy { + cmpResult = 1 + } else { + cmpResult = 0 + } - cmp = cmpResult < 0 - } + cmp = cmpResult < 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -454,23 +460,29 @@ func (a *minBytesAggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. 
Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Bytes().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy []byte + valCopy = append(valCopy[:0], val...) + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Bytes().Get(cmpIdx) - { - var cmpResult int - cmpResult = bytes.Compare(cmpVal, val) - cmp = cmpResult < 0 - } + { + var cmpResult int + cmpResult = bytes.Compare(cmpVal, valCopy) + cmp = cmpResult < 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -603,23 +615,29 @@ func (a *minDecimalAggregator) aggregateOverIntervals(intervals []windowInterval // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Decimal().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy apd.Decimal + valCopy.Set(&val) + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Decimal().Get(cmpIdx) - { - var cmpResult int - cmpResult = tree.CompareDecimals(&cmpVal, &val) - cmp = cmpResult < 0 - } + { + var cmpResult int + cmpResult = tree.CompareDecimals(&cmpVal, &valCopy) + cmp = cmpResult < 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -763,34 +781,40 @@ func (a *minInt16Aggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Int16().Get(cmpIdx) - - { - var cmpResult int + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. 
+ var valCopy int16 + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Int16().Get(cmpIdx) { - a, b := int64(cmpVal), int64(val) - if a < b { - cmpResult = -1 - } else if a > b { - cmpResult = 1 - } else { - cmpResult = 0 + var cmpResult int + + { + a, b := int64(cmpVal), int64(valCopy) + if a < b { + cmpResult = -1 + } else if a > b { + cmpResult = 1 + } else { + cmpResult = 0 + } } - } - cmp = cmpResult < 0 - } + cmp = cmpResult < 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -934,34 +958,40 @@ func (a *minInt32Aggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Int32().Get(cmpIdx) - - { - var cmpResult int + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy int32 + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Int32().Get(cmpIdx) { - a, b := int64(cmpVal), int64(val) - if a < b { - cmpResult = -1 - } else if a > b { - cmpResult = 1 - } else { - cmpResult = 0 + var cmpResult int + + { + a, b := int64(cmpVal), int64(valCopy) + if a < b { + cmpResult = -1 + } else if a > b { + cmpResult = 1 + } else { + cmpResult = 0 + } } - } - cmp = cmpResult < 0 - } + cmp = cmpResult < 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -1105,34 +1135,40 @@ func (a *minInt64Aggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Int64().Get(cmpIdx) - - { - var cmpResult int + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. 
+ var valCopy int64 + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Int64().Get(cmpIdx) { - a, b := int64(cmpVal), int64(val) - if a < b { - cmpResult = -1 - } else if a > b { - cmpResult = 1 - } else { - cmpResult = 0 + var cmpResult int + + { + a, b := int64(cmpVal), int64(valCopy) + if a < b { + cmpResult = -1 + } else if a > b { + cmpResult = 1 + } else { + cmpResult = 0 + } } - } - cmp = cmpResult < 0 - } + cmp = cmpResult < 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -1284,42 +1320,48 @@ func (a *minFloat64Aggregator) aggregateOverIntervals(intervals []windowInterval // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Float64().Get(cmpIdx) - - { - var cmpResult int + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy float64 + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Float64().Get(cmpIdx) { - a, b := float64(cmpVal), float64(val) - if a < b { - cmpResult = -1 - } else if a > b { - cmpResult = 1 - } else if a == b { - cmpResult = 0 - } else if math.IsNaN(a) { - if math.IsNaN(b) { + var cmpResult int + + { + a, b := float64(cmpVal), float64(valCopy) + if a < b { + cmpResult = -1 + } else if a > b { + cmpResult = 1 + } else if a == b { cmpResult = 0 + } else if math.IsNaN(a) { + if math.IsNaN(b) { + cmpResult = 0 + } else { + cmpResult = -1 + } } else { - cmpResult = -1 + cmpResult = 1 } - } else { - cmpResult = 1 } - } - cmp = cmpResult < 0 - } + cmp = cmpResult < 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -1459,30 +1501,36 @@ func (a *minTimestampAggregator) aggregateOverIntervals(intervals []windowInterv // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. 
- for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Timestamp().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy time.Time + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Timestamp().Get(cmpIdx) - { - var cmpResult int + { + var cmpResult int - if cmpVal.Before(val) { - cmpResult = -1 - } else if val.Before(cmpVal) { - cmpResult = 1 - } else { - cmpResult = 0 + if cmpVal.Before(valCopy) { + cmpResult = -1 + } else if valCopy.Before(cmpVal) { + cmpResult = 1 + } else { + cmpResult = 0 + } + cmp = cmpResult < 0 } - cmp = cmpResult < 0 - } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -1615,23 +1663,29 @@ func (a *minIntervalAggregator) aggregateOverIntervals(intervals []windowInterva // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Interval().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy duration.Duration + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Interval().Get(cmpIdx) - { - var cmpResult int - cmpResult = cmpVal.Compare(val) - cmp = cmpResult < 0 - } + { + var cmpResult int + cmpResult = cmpVal.Compare(valCopy) + cmp = cmpResult < 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -1790,29 +1844,46 @@ func (a *minJSONAggregator) aggregateOverIntervals(intervals []windowInterval) { // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.JSON().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. 
+ var valCopy json.JSON - { - var cmpResult int + var _err error + var _bytes []byte + _bytes, _err = json.EncodeJSON(nil, val) + if _err != nil { + colexecerror.ExpectedError(_err) + } + valCopy, _err = json.FromEncoding(_bytes) + if _err != nil { + colexecerror.ExpectedError(_err) + } - var err error - cmpResult, err = cmpVal.Compare(val) - if err != nil { - colexecerror.ExpectedError(err) - } + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.JSON().Get(cmpIdx) - cmp = cmpResult < 0 - } + { + var cmpResult int - if cmp { - break + var err error + cmpResult, err = cmpVal.Compare(valCopy) + if err != nil { + colexecerror.ExpectedError(err) + } + + cmp = cmpResult < 0 + } + + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -1947,25 +2018,31 @@ func (a *minDatumAggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Datum().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy interface{} + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Datum().Get(cmpIdx) - { - var cmpResult int + { + var cmpResult int - cmpResult = coldataext.CompareDatum(cmpVal, col, val) + cmpResult = coldataext.CompareDatum(cmpVal, col, valCopy) - cmp = cmpResult < 0 - } + cmp = cmpResult < 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -2189,31 +2266,37 @@ func (a *maxBoolAggregator) aggregateOverIntervals(intervals []windowInterval) { // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Bool().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. 
+ var valCopy bool + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Bool().Get(cmpIdx) - { - var cmpResult int + { + var cmpResult int - if !cmpVal && val { - cmpResult = -1 - } else if cmpVal && !val { - cmpResult = 1 - } else { - cmpResult = 0 - } + if !cmpVal && valCopy { + cmpResult = -1 + } else if cmpVal && !valCopy { + cmpResult = 1 + } else { + cmpResult = 0 + } - cmp = cmpResult > 0 - } + cmp = cmpResult > 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -2344,23 +2427,29 @@ func (a *maxBytesAggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Bytes().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy []byte + valCopy = append(valCopy[:0], val...) + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Bytes().Get(cmpIdx) - { - var cmpResult int - cmpResult = bytes.Compare(cmpVal, val) - cmp = cmpResult > 0 - } + { + var cmpResult int + cmpResult = bytes.Compare(cmpVal, valCopy) + cmp = cmpResult > 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -2493,23 +2582,29 @@ func (a *maxDecimalAggregator) aggregateOverIntervals(intervals []windowInterval // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Decimal().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. 
+ var valCopy apd.Decimal + valCopy.Set(&val) + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Decimal().Get(cmpIdx) - { - var cmpResult int - cmpResult = tree.CompareDecimals(&cmpVal, &val) - cmp = cmpResult > 0 - } + { + var cmpResult int + cmpResult = tree.CompareDecimals(&cmpVal, &valCopy) + cmp = cmpResult > 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -2653,34 +2748,40 @@ func (a *maxInt16Aggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Int16().Get(cmpIdx) - - { - var cmpResult int + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy int16 + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Int16().Get(cmpIdx) { - a, b := int64(cmpVal), int64(val) - if a < b { - cmpResult = -1 - } else if a > b { - cmpResult = 1 - } else { - cmpResult = 0 + var cmpResult int + + { + a, b := int64(cmpVal), int64(valCopy) + if a < b { + cmpResult = -1 + } else if a > b { + cmpResult = 1 + } else { + cmpResult = 0 + } } - } - cmp = cmpResult > 0 - } + cmp = cmpResult > 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -2824,34 +2925,40 @@ func (a *maxInt32Aggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Int32().Get(cmpIdx) - - { - var cmpResult int + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. 
+ var valCopy int32 + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Int32().Get(cmpIdx) { - a, b := int64(cmpVal), int64(val) - if a < b { - cmpResult = -1 - } else if a > b { - cmpResult = 1 - } else { - cmpResult = 0 + var cmpResult int + + { + a, b := int64(cmpVal), int64(valCopy) + if a < b { + cmpResult = -1 + } else if a > b { + cmpResult = 1 + } else { + cmpResult = 0 + } } - } - cmp = cmpResult > 0 - } + cmp = cmpResult > 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -2995,34 +3102,40 @@ func (a *maxInt64Aggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Int64().Get(cmpIdx) - - { - var cmpResult int + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy int64 + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Int64().Get(cmpIdx) { - a, b := int64(cmpVal), int64(val) - if a < b { - cmpResult = -1 - } else if a > b { - cmpResult = 1 - } else { - cmpResult = 0 + var cmpResult int + + { + a, b := int64(cmpVal), int64(valCopy) + if a < b { + cmpResult = -1 + } else if a > b { + cmpResult = 1 + } else { + cmpResult = 0 + } } - } - cmp = cmpResult > 0 - } + cmp = cmpResult > 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -3174,42 +3287,48 @@ func (a *maxFloat64Aggregator) aggregateOverIntervals(intervals []windowInterval // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Float64().Get(cmpIdx) - - { - var cmpResult int + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. 
+ var valCopy float64 + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Float64().Get(cmpIdx) { - a, b := float64(cmpVal), float64(val) - if a < b { - cmpResult = -1 - } else if a > b { - cmpResult = 1 - } else if a == b { - cmpResult = 0 - } else if math.IsNaN(a) { - if math.IsNaN(b) { + var cmpResult int + + { + a, b := float64(cmpVal), float64(valCopy) + if a < b { + cmpResult = -1 + } else if a > b { + cmpResult = 1 + } else if a == b { cmpResult = 0 + } else if math.IsNaN(a) { + if math.IsNaN(b) { + cmpResult = 0 + } else { + cmpResult = -1 + } } else { - cmpResult = -1 + cmpResult = 1 } - } else { - cmpResult = 1 } - } - cmp = cmpResult > 0 - } + cmp = cmpResult > 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -3349,30 +3468,36 @@ func (a *maxTimestampAggregator) aggregateOverIntervals(intervals []windowInterv // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Timestamp().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy time.Time + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Timestamp().Get(cmpIdx) - { - var cmpResult int + { + var cmpResult int - if cmpVal.Before(val) { - cmpResult = -1 - } else if val.Before(cmpVal) { - cmpResult = 1 - } else { - cmpResult = 0 + if cmpVal.Before(valCopy) { + cmpResult = -1 + } else if valCopy.Before(cmpVal) { + cmpResult = 1 + } else { + cmpResult = 0 + } + cmp = cmpResult > 0 } - cmp = cmpResult > 0 - } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -3505,23 +3630,29 @@ func (a *maxIntervalAggregator) aggregateOverIntervals(intervals []windowInterva // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. 
- for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Interval().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy duration.Duration + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Interval().Get(cmpIdx) - { - var cmpResult int - cmpResult = cmpVal.Compare(val) - cmp = cmpResult > 0 - } + { + var cmpResult int + cmpResult = cmpVal.Compare(valCopy) + cmp = cmpResult > 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -3680,29 +3811,46 @@ func (a *maxJSONAggregator) aggregateOverIntervals(intervals []windowInterval) { // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.JSON().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy json.JSON - { - var cmpResult int + var _err error + var _bytes []byte + _bytes, _err = json.EncodeJSON(nil, val) + if _err != nil { + colexecerror.ExpectedError(_err) + } + valCopy, _err = json.FromEncoding(_bytes) + if _err != nil { + colexecerror.ExpectedError(_err) + } - var err error - cmpResult, err = cmpVal.Compare(val) - if err != nil { - colexecerror.ExpectedError(err) - } + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.JSON().Get(cmpIdx) - cmp = cmpResult > 0 - } + { + var cmpResult int - if cmp { - break + var err error + cmpResult, err = cmpVal.Compare(valCopy) + if err != nil { + colexecerror.ExpectedError(err) + } + + cmp = cmpResult > 0 + } + + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index @@ -3837,25 +3985,31 @@ func (a *maxDatumAggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. 
- for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.Datum().Get(cmpIdx) + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy interface{} + valCopy = val + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.Datum().Get(cmpIdx) - { - var cmpResult int + { + var cmpResult int - cmpResult = coldataext.CompareDatum(cmpVal, col, val) + cmpResult = coldataext.CompareDatum(cmpVal, col, valCopy) - cmp = cmpResult > 0 - } + cmp = cmpResult > 0 + } - if cmp { - break + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. Keep track of the first index diff --git a/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go b/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go index 6649c3b27470..4f466289b141 100644 --- a/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/min_max_removable_agg_tmpl.go @@ -268,17 +268,23 @@ func (a *_AGG_TYPEAggregator) aggregateOverIntervals(intervals []windowInterval) // keep it in the queue. Iterate from the end of the queue, removing any // values that are dominated by the current one. Add the current value // once the last value in the queue is better than the current one. - for !a.queue.isEmpty() { - cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) - cmpVal := cmpVec.TemplateType().Get(cmpIdx) - _ASSIGN_CMP(cmp, cmpVal, val, _, col, _) - if cmp { - break + if !a.queue.isEmpty() { + // We have to make a copy of val because GetVecWithTuple + // calls below might reuse the same underlying vector. + var valCopy _GOTYPE + execgen.COPYVAL(valCopy, val) + for !a.queue.isEmpty() { + cmpVec, cmpIdx, _ := a.buffer.GetVecWithTuple(a.Ctx, argColIdx, int(a.queue.getLast())) + cmpVal := cmpVec.TemplateType().Get(cmpIdx) + _ASSIGN_CMP(cmp, cmpVal, valCopy, _, col, _) + if cmp { + break + } + // Any values that could not fit in the queue would also have been + // dominated by the current one, so reset omittedIndex. + a.queue.removeLast() + a.omittedIndex = -1 } - // Any values that could not fit in the queue would also have been - // dominated by the current one, so reset omittedIndex. - a.queue.removeLast() - a.omittedIndex = -1 } if a.queue.addLast(idxToAdd) && a.omittedIndex == -1 { // The value couldn't fit in the queue. 
Keep track of the first index diff --git a/pkg/sql/distsql/columnar_operators_test.go b/pkg/sql/distsql/columnar_operators_test.go index 668991a5ccdb..6bbbab9ceb50 100644 --- a/pkg/sql/distsql/columnar_operators_test.go +++ b/pkg/sql/distsql/columnar_operators_test.go @@ -1171,43 +1171,65 @@ func TestWindowFunctionsAgainstProcessor(t *testing.T) { ResultTypes: append(inputTypes, outputType), } args := verifyColOperatorArgs{ + rng: rng, anyOrder: true, inputTypes: [][]*types.T{inputTypes}, inputs: []rowenc.EncDatumRows{rows}, pspec: pspec, + // Some window functions don't buffer anything, so they + // won't ever spill to disk. Rather than examining each + // function and checking whether it buffers or not, + // we're being lazy and don't require the spilling to + // occur. + forcedDiskSpillMightNotOccur: true, } - if err := verifyColOperator(t, args); err != nil { - if strings.Contains(err.Error(), "different errors returned") { - // Columnar and row-based windowers are likely to hit - // different errors, and we will swallow those and move - // on. - continue - } - if strings.Contains(err.Error(), "integer out of range") && - fun.AggregateFunc != nil && *fun.AggregateFunc == execinfrapb.SumInt { - // The columnar implementation of this window function uses the - // sliding window optimization, but the row engine version - // doesn't. As a result, in some cases the row engine will - // overflow while the vectorized engine doesn't. + for _, spillForced := range []bool{false, true} { + if spillForced && nRows == manyRows { + // Don't force disk spilling with many rows since it + // might take a while. continue } - fmt.Printf("window function: %s\n", funcName) - fmt.Printf("partitionCols: %v\n", partitionBy) - fmt.Print("ordering: ") - for i := range ordering.Columns { - fmt.Printf("%v %v, ", ordering.Columns[i].ColIdx, ordering.Columns[i].Direction) + args.forceDiskSpill = spillForced + if err := verifyColOperator(t, args); err != nil { + if strings.Contains(err.Error(), "different errors returned") { + // Columnar and row-based windowers are likely to hit + // different errors, and we will swallow those and move + // on. + continue + } + if strings.Contains(err.Error(), "Err:windower-limited: memory budget exceeded") { + // The row-based windower can hit a memory error + // because some of its state cannot be spilled + // to disk. Ignore such cases. + continue + } + if strings.Contains(err.Error(), "integer out of range") && + fun.AggregateFunc != nil && *fun.AggregateFunc == execinfrapb.SumInt { + // The columnar implementation of this window function uses the + // sliding window optimization, but the row engine version + // doesn't. As a result, in some cases the row engine will + // overflow while the vectorized engine doesn't. 
+ continue + } + fmt.Printf("force disk spill: %t\n", spillForced) + fmt.Printf("window function: %s\n", funcName) + fmt.Printf("partitionCols: %v\n", partitionBy) + fmt.Print("ordering: ") + for i := range ordering.Columns { + fmt.Printf("%v %v, ", ordering.Columns[i].ColIdx, ordering.Columns[i].Direction) + } + fmt.Println() + fmt.Printf("argIdxs: %v\n", argsIdxs) + frame := windowerSpec.WindowFns[0].Frame + fmt.Printf("frame mode: %v\n", frame.Mode) + fmt.Printf("start bound: %v\n", frame.Bounds.Start) + fmt.Printf("end bound: %v\n", *frame.Bounds.End) + fmt.Printf("frame exclusion: %v\n", frame.Exclusion) + fmt.Printf("seed = %d\n", seed) + prettyPrintTypes(inputTypes, "t" /* tableName */) + prettyPrintInput(rows, inputTypes, "t" /* tableName */) + t.Fatal(err) } - fmt.Println() - fmt.Printf("argIdxs: %v\n", argsIdxs) - frame := windowerSpec.WindowFns[0].Frame - fmt.Printf("frame mode: %v\n", frame.Mode) - fmt.Printf("start bound: %v\n", frame.Bounds.Start) - fmt.Printf("end bound: %v\n", *frame.Bounds.End) - fmt.Printf("frame exclusion: %v\n", frame.Exclusion) - fmt.Printf("seed = %d\n", seed) - prettyPrintTypes(inputTypes, "t" /* tableName */) - prettyPrintInput(rows, inputTypes, "t" /* tableName */) - t.Fatal(err) } } }
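
Aside for reviewers (illustrative only, not part of the patch): the s[:len(s):len(s)] pattern used throughout arrowbatchconverter.go is Go's full slice expression, which pins the capacity so that a later append cannot write past the slice into neighboring memory. Below is a minimal, self-contained sketch of the failure mode and of why capping fixes it; the buffer contents and variable names are made up for illustration.

package main

import "fmt"

func main() {
	// Pretend "foo" and "bar" are two Bytes vectors serialized back to back
	// into one flat buffer, as in the deserializeArrowIntoBytes comment above.
	buf := []byte("foobar")

	// Uncapped: len(first) == 3 but cap(first) == 6, so append writes in place
	// and clobbers the bytes that belong to the second vector.
	first := buf[:3]
	first = append(first, '!')
	fmt.Println(string(buf)) // prints "foo!ar" -- the second vector is corrupted

	// Capped with a full slice expression: len == cap == 3, so append must
	// allocate a fresh backing array and the original buffer stays intact.
	buf = []byte("foobar")
	first = buf[:3:3]
	first = append(first, '!')
	fmt.Println(string(buf)) // prints "foobar"
}

The same idea protects the data and offsets slices in deserializeArrowIntoBytes and the int and float slices in ArrowToBatch.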