From a9c31fc45274f8e3e5d29a463ab9d53bded5791e Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Tue, 21 Sep 2021 14:14:50 -0400 Subject: [PATCH 1/2] streamingccl: unskip TestStreamIngestionFrontierProcessor This test does not flake anymore. There have been fixes around leaked goroutines that have been checked in in the recent past but the test mistakenly remained skipped: https://github.com/cockroachdb/cockroach/pull/69262 I got 3000 runs under stress. Fixes: #68704 --- .../streamingest/stream_ingestion_frontier_processor_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go index 70e0a055f57b..7c0d79358057 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -37,7 +36,6 @@ type partitionToEvent map[streamingccl.PartitionAddress][]streamingccl.Event func TestStreamIngestionFrontierProcessor(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 68795, "flaky test") ctx := context.Background() tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) From d3c354d8240d027db04ab7355eb329b0d8007e0f Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Sat, 25 Sep 2021 10:32:52 -0700 Subject: [PATCH 2/2] colencoding: reuse scratch space when key decoding bytes and decimals When we're key decoding bytes-like columns, we need to use the scratch byte slice to decode into (in case of decimals, we might need the space temporarily). Previously, we would always allocate a new byte slice only to deep copy it later when calling `coldata.Bytes.Set`. This commit teaches the cFetcher and the relevant decoding methods to reuse the same scratch space which should reduce the memory allocations. One notable change is that now when we're calling `DecodeBytesAscending`, we have to make sure to perform a deep copy so that it is safe to reuse the returned value as the scratch space in the future. Release note: None --- pkg/sql/colencoding/key_encoding.go | 73 ++++++++++++++++------------- pkg/sql/colfetcher/cfetcher.go | 22 +++++++-- 2 files changed, 60 insertions(+), 35 deletions(-) diff --git a/pkg/sql/colencoding/key_encoding.go b/pkg/sql/colencoding/key_encoding.go index a2eaa4f27d99..9339b90c4ae7 100644 --- a/pkg/sql/colencoding/key_encoding.go +++ b/pkg/sql/colencoding/key_encoding.go @@ -52,7 +52,8 @@ func DecodeIndexKeyToCols( colDirs []descpb.IndexDescriptor_Direction, key roachpb.Key, invertedColIdx int, -) (remainingKey roachpb.Key, matches bool, foundNull bool, _ error) { + scratch []byte, +) (remainingKey roachpb.Key, matches bool, foundNull bool, retScratch []byte, _ error) { var decodedTableID descpb.ID var decodedIndexID descpb.IndexID var err error @@ -68,7 +69,7 @@ func DecodeIndexKeyToCols( lastKeyComponentLength := len(key) key, decodedTableID, decodedIndexID, err = rowenc.DecodePartialTableIDIndexID(key) if err != nil { - return nil, false, false, err + return nil, false, false, scratch, err } if decodedTableID != ancestor.TableID || decodedIndexID != ancestor.IndexID { // We don't match. Return a key with the table ID / index ID we're @@ -76,7 +77,7 @@ func DecodeIndexKeyToCols( curPos := len(origKey) - lastKeyComponentLength // Prevent unwanted aliasing on the origKey by setting the capacity. key = rowenc.EncodePartialTableIDIndexID(origKey[:curPos:curPos], ancestor.TableID, ancestor.IndexID) - return key, false, false, nil + return key, false, false, scratch, nil } } @@ -84,12 +85,12 @@ func DecodeIndexKeyToCols( // We don't care about whether this call to DecodeKeyVals found a null or not, because // it is a interleaving ancestor. var isNull bool - key, isNull, err = DecodeKeyValsToCols( + key, isNull, scratch, err = DecodeKeyValsToCols( da, vecs, idx, indexColIdx[:length], checkAllColsForNull, types[:length], - colDirs[:length], nil /* unseen */, key, invertedColIdx, + colDirs[:length], nil /* unseen */, key, invertedColIdx, scratch, ) if err != nil { - return nil, false, false, err + return nil, false, false, scratch, err } indexColIdx, types, colDirs = indexColIdx[length:], types[length:], colDirs[length:] foundNull = foundNull || isNull @@ -103,14 +104,14 @@ func DecodeIndexKeyToCols( curPos := len(origKey) - len(key) // Prevent unwanted aliasing on the origKey by setting the capacity. key = encoding.EncodeInterleavedSentinel(origKey[:curPos:curPos]) - return key, false, false, nil + return key, false, false, scratch, nil } } lastKeyComponentLength := len(key) key, decodedTableID, decodedIndexID, err = rowenc.DecodePartialTableIDIndexID(key) if err != nil { - return nil, false, false, err + return nil, false, false, scratch, err } if decodedTableID != desc.GetID() || decodedIndexID != index.GetID() { // We don't match. Return a key with the table ID / index ID we're @@ -118,16 +119,17 @@ func DecodeIndexKeyToCols( curPos := len(origKey) - lastKeyComponentLength // Prevent unwanted aliasing on the origKey by setting the capacity. key = rowenc.EncodePartialTableIDIndexID(origKey[:curPos:curPos], desc.GetID(), index.GetID()) - return key, false, false, nil + return key, false, false, scratch, nil } } var isNull bool - key, isNull, err = DecodeKeyValsToCols( - da, vecs, idx, indexColIdx, checkAllColsForNull, types, colDirs, nil /* unseen */, key, invertedColIdx, + key, isNull, scratch, err = DecodeKeyValsToCols( + da, vecs, idx, indexColIdx, checkAllColsForNull, types, colDirs, + nil /* unseen */, key, invertedColIdx, scratch, ) if err != nil { - return nil, false, false, err + return nil, false, false, scratch, err } foundNull = foundNull || isNull @@ -139,10 +141,10 @@ func DecodeIndexKeyToCols( curPos := len(origKey) - lastKeyComponentLength // Prevent unwanted aliasing on the origKey by setting the capacity. key = encoding.EncodeNullDescending(origKey[:curPos:curPos]) - return key, false, false, nil + return key, false, false, scratch, nil } - return key, true, foundNull, nil + return key, true, foundNull, scratch, nil } // DecodeKeyValsToCols decodes the values that are part of the key, writing the @@ -171,7 +173,8 @@ func DecodeKeyValsToCols( unseen *util.FastIntSet, key []byte, invertedColIdx int, -) (remainingKey []byte, foundNull bool, _ error) { + scratch []byte, +) (remainingKey []byte, foundNull bool, retScratch []byte, _ error) { for j := range types { var err error i := indexColIdx[j] @@ -188,14 +191,14 @@ func DecodeKeyValsToCols( } var isNull bool isInverted := invertedColIdx == i - key, isNull, err = decodeTableKeyToCol(da, vecs[i], idx, types[j], key, directions[j], isInverted) + key, isNull, scratch, err = decodeTableKeyToCol(da, vecs[i], idx, types[j], key, directions[j], isInverted, scratch) foundNull = isNull || foundNull } if err != nil { - return nil, false, err + return nil, false, scratch, err } } - return key, foundNull, nil + return key, foundNull, scratch, nil } // decodeTableKeyToCol decodes a value encoded by EncodeTableKey, writing the result @@ -210,14 +213,15 @@ func decodeTableKeyToCol( key []byte, dir descpb.IndexDescriptor_Direction, isInverted bool, -) ([]byte, bool, error) { + scratch []byte, +) (_ []byte, _ bool, retScratch []byte, _ error) { if (dir != descpb.IndexDescriptor_ASC) && (dir != descpb.IndexDescriptor_DESC) { - return nil, false, errors.AssertionFailedf("invalid direction: %d", log.Safe(dir)) + return nil, false, scratch, errors.AssertionFailedf("invalid direction: %d", log.Safe(dir)) } var isNull bool if key, isNull = encoding.DecodeIfNull(key); isNull { vec.Nulls().SetNull(idx) - return key, true, nil + return key, true, scratch, nil } // We might have read a NULL value in the interleaved child table which // would update the nulls vector, so we need to explicitly unset the null @@ -229,10 +233,10 @@ func decodeTableKeyToCol( if isInverted { keyLen, err := encoding.PeekLength(key) if err != nil { - return nil, false, err + return nil, false, scratch, err } vec.Bytes().Set(idx, key[:keyLen]) - return key[keyLen:], false, nil + return key[keyLen:], false, scratch, nil } var rkey []byte @@ -272,21 +276,26 @@ func decodeTableKeyToCol( case types.DecimalFamily: var d apd.Decimal if dir == descpb.IndexDescriptor_ASC { - rkey, d, err = encoding.DecodeDecimalAscending(key, nil) + rkey, d, err = encoding.DecodeDecimalAscending(key, scratch[:0]) } else { - rkey, d, err = encoding.DecodeDecimalDescending(key, nil) + rkey, d, err = encoding.DecodeDecimalDescending(key, scratch[:0]) } vec.Decimal()[idx] = d case types.BytesFamily, types.StringFamily, types.UuidFamily: - var r []byte if dir == descpb.IndexDescriptor_ASC { - // No need to perform the deep copy since Set() below will do that - // for us. - rkey, r, err = encoding.DecodeBytesAscending(key, nil) + // We ask for the deep copy to be made so that scratch doesn't + // reference the memory of key - this allows us to return scratch + // to the caller to be reused. The deep copy additionally ensures + // that the memory of the BatchResponse (where key came from) can be + // GCed. + rkey, scratch, err = encoding.DecodeBytesAscendingDeepCopy(key, scratch[:0]) } else { - rkey, r, err = encoding.DecodeBytesDescending(key, nil) + rkey, scratch, err = encoding.DecodeBytesDescending(key, scratch[:0]) } - vec.Bytes().Set(idx, r) + // Set() performs a deep copy, so it is safe to return the scratch slice + // to the caller. Any modifications to the scratch slice made by the + // caller will not affect the value in the vector. + vec.Bytes().Set(idx, scratch) case types.TimestampFamily, types.TimestampTZFamily: var t time.Time if dir == descpb.IndexDescriptor_ASC { @@ -319,7 +328,7 @@ func decodeTableKeyToCol( d, rkey, err = rowenc.DecodeTableKey(da, valType, key, encDir) vec.Datum().Set(idx, d) } - return rkey, false, err + return rkey, false, scratch, err } // UnmarshalColumnValueToCol decodes the value from a roachpb.Value using the diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index ba0a7953f24c..95fbeb41948f 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -329,6 +329,10 @@ type cFetcher struct { tableoidCol coldata.DatumVec } + // scratch is a scratch space used when decoding bytes-like and decimal + // keys. + scratch []byte + typs []*types.T accountingHelper colmem.SetAccountingHelper memoryLimit int64 @@ -512,12 +516,14 @@ func (rf *cFetcher) Init( } indexColOrdinals := table.indexColOrdinals _ = indexColOrdinals[len(indexColumnIDs)-1] + needToDecodeDecimalKey := false for i, id := range indexColumnIDs { colIdx, ok := tableArgs.ColIdxMap.Get(id) if (ok && neededCols.Contains(int(id))) || rf.traceKV { //gcassert:bce indexColOrdinals[i] = colIdx rf.mustDecodeIndexKey = true + needToDecodeDecimalKey = needToDecodeDecimalKey || typs[colIdx].Family() == types.DecimalFamily // A composite column might also have a value encoding which must be // decoded. Others can be removed from neededValueColsByIdx. if compositeColumnIDs.Contains(int(id)) { @@ -533,6 +539,13 @@ func (rf *cFetcher) Init( } } } + if needToDecodeDecimalKey && cap(rf.scratch) < 64 { + // If we need to decode the decimal key encoding, it might use a scratch + // byte slice internally, so we'll allocate such a space to be reused + // for every decimal. + // TODO(yuzefovich): 64 was chosen arbitrarily, tune it. + rf.scratch = make([]byte, 64) + } table.invertedColOrdinal = -1 if table.index.GetType() == descpb.IndexDescriptor_INVERTED { id := table.index.InvertedColumnID() @@ -892,7 +905,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { // to determine whether a KV belongs to the same row as the // previous KV or a different row. checkAllColsForNull := rf.table.isSecondaryIndex && rf.table.index.IsUnique() && rf.table.desc.NumFamilies() != 1 - key, matches, foundNull, err = colencoding.DecodeIndexKeyToCols( + key, matches, foundNull, rf.scratch, err = colencoding.DecodeIndexKeyToCols( &rf.table.da, rf.machine.colvecs, rf.machine.rowIdx, @@ -904,6 +917,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { rf.table.indexColumnDirs, rf.machine.nextKV.Key[rf.table.knownPrefixLength:], rf.table.invertedColOrdinal, + rf.scratch, ) if err != nil { return nil, err @@ -1261,7 +1275,7 @@ func (rf *cFetcher) processValue(ctx context.Context, familyID descpb.FamilyID) if table.isSecondaryIndex && table.index.IsUnique() { // This is a unique secondary index; decode the extra // column values from the value. - valueBytes, _, err = colencoding.DecodeKeyValsToCols( + valueBytes, _, rf.scratch, err = colencoding.DecodeKeyValsToCols( &table.da, rf.machine.colvecs, rf.machine.rowIdx, @@ -1272,6 +1286,7 @@ func (rf *cFetcher) processValue(ctx context.Context, familyID descpb.FamilyID) &rf.machine.remainingValueColsByIdx, valueBytes, rf.table.invertedColOrdinal, + rf.scratch, ) if err != nil { return scrub.WrapError(scrub.SecondaryIndexKeyExtraValueDecodingError, err) @@ -1577,7 +1592,8 @@ func (rf *cFetcher) Release() { *rf = cFetcher{ // The types are small objects, so we don't bother deeply resetting this // slice. - typs: rf.typs[:0], + typs: rf.typs[:0], + scratch: rf.scratch[:0], } cFetcherPool.Put(rf) }