From 2ccc5e3f6bebfc03c530b7ab1b3a690c3e620704 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 29 Mar 2022 12:40:39 +0000 Subject: [PATCH] sql/row: limit row converter batches to 4MB These small batches are used to reduce channel overhead when passing KVs to the IMPORT ingest process, where we actually have more sophisticated memory monitoring in place around the big buffers. They are expected to be small and ephemeral, but for tables with very large rows, 5k KVs can actually become a non-trivial amount of used memory. This adds a second limit that triggers a flush if >4MB of key/value slice cap has been added to the batch. Release note: none. --- pkg/sql/row/row_converter.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 416c4f59935d..08cda6320102 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -182,7 +182,8 @@ type KVBatch struct { // Progress represents the fraction of the input that generated this row. Progress float32 // KVs is the actual converted KV data. - KVs []roachpb.KeyValue + KVs []roachpb.KeyValue + MemSize int64 } // DatumRowConverter converts Datums into kvs and streams it to the destination @@ -224,6 +225,8 @@ var kvDatumRowConverterBatchSize = util.ConstantWithMetamorphicTestValue( 1, /* metamorphicValue */ ) +const kvDatumRowConverterBatchMemSize = 4 << 20 + // TestingSetDatumRowConverterBatchSize sets kvDatumRowConverterBatchSize and // returns function to reset this setting back to its old value. 
func TestingSetDatumRowConverterBatchSize(newSize int) func() { @@ -420,6 +423,7 @@ func NewDatumRowConverter( padding := 2 * (len(tableDesc.PublicNonPrimaryIndexes()) + len(tableDesc.GetFamilies())) c.BatchCap = kvDatumRowConverterBatchSize + padding c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap) + c.KvBatch.MemSize = 0 colsOrdered := make([]catalog.Column, len(cols)) for _, col := range c.tableDesc.PublicColumns() { @@ -493,6 +497,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in KVInserter(func(kv roachpb.KeyValue) { kv.Value.InitChecksum(kv.Key) c.KvBatch.KVs = append(c.KvBatch.KVs, kv) + c.KvBatch.MemSize += int64(cap(kv.Key) + cap(kv.Value.RawBytes)) }), insertRow, pm, @@ -502,7 +507,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in return errors.Wrap(err, "insert row") } // If our batch is full, flush it and start a new one. - if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize { + if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize || c.KvBatch.MemSize > kvDatumRowConverterBatchMemSize { if err := c.SendBatch(ctx); err != nil { return err } @@ -528,5 +533,6 @@ func (c *DatumRowConverter) SendBatch(ctx context.Context) error { return ctx.Err() } c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap) + c.KvBatch.MemSize = 0 return nil }