diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 3fc9fd8c670e..3740b86aa214 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -192,7 +192,8 @@ type KVBatch struct { // Progress represents the fraction of the input that generated this row. Progress float32 // KVs is the actual converted KV data. - KVs []roachpb.KeyValue + KVs []roachpb.KeyValue + MemSize int64 } // DatumRowConverter converts Datums into kvs and streams it to the destination @@ -234,6 +235,8 @@ var kvDatumRowConverterBatchSize = util.ConstantWithMetamorphicTestValue( 1, /* metamorphicValue */ ) +const kvDatumRowConverterBatchMemSize = 4 << 20 + // TestingSetDatumRowConverterBatchSize sets kvDatumRowConverterBatchSize and // returns function to reset this setting back to its old value. func TestingSetDatumRowConverterBatchSize(newSize int) func() { @@ -432,6 +435,7 @@ func NewDatumRowConverter( padding := 2 * (len(tableDesc.PublicNonPrimaryIndexes()) + len(tableDesc.GetFamilies())) c.BatchCap = kvDatumRowConverterBatchSize + padding c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap) + c.KvBatch.MemSize = 0 colsOrdered := make([]catalog.Column, len(cols)) for _, col := range c.tableDesc.PublicColumns() { @@ -505,6 +509,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in KVInserter(func(kv roachpb.KeyValue) { kv.Value.InitChecksum(kv.Key) c.KvBatch.KVs = append(c.KvBatch.KVs, kv) + c.KvBatch.MemSize += int64(cap(kv.Key) + cap(kv.Value.RawBytes)) }), insertRow, pm, @@ -514,7 +519,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in return errors.Wrap(err, "insert row") } // If our batch is full, flush it and start a new one. - if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize { + if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize || c.KvBatch.MemSize > kvDatumRowConverterBatchMemSize { if err := c.SendBatch(ctx); err != nil { return err } @@ -540,5 +545,6 @@ func (c *DatumRowConverter) SendBatch(ctx context.Context) error { return ctx.Err() } c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap) + c.KvBatch.MemSize = 0 return nil }