diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index 416c4f59935d..08cda6320102 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -182,7 +182,8 @@ type KVBatch struct { // Progress represents the fraction of the input that generated this row. Progress float32 // KVs is the actual converted KV data. - KVs []roachpb.KeyValue + KVs []roachpb.KeyValue + MemSize int64 } // DatumRowConverter converts Datums into kvs and streams it to the destination @@ -224,6 +225,8 @@ var kvDatumRowConverterBatchSize = util.ConstantWithMetamorphicTestValue( 1, /* metamorphicValue */ ) +const kvDatumRowConverterBatchMemSize = 4 << 20 + // TestingSetDatumRowConverterBatchSize sets kvDatumRowConverterBatchSize and // returns function to reset this setting back to its old value. func TestingSetDatumRowConverterBatchSize(newSize int) func() { @@ -420,6 +423,7 @@ func NewDatumRowConverter( padding := 2 * (len(tableDesc.PublicNonPrimaryIndexes()) + len(tableDesc.GetFamilies())) c.BatchCap = kvDatumRowConverterBatchSize + padding c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap) + c.KvBatch.MemSize = 0 colsOrdered := make([]catalog.Column, len(cols)) for _, col := range c.tableDesc.PublicColumns() { @@ -493,6 +497,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in KVInserter(func(kv roachpb.KeyValue) { kv.Value.InitChecksum(kv.Key) c.KvBatch.KVs = append(c.KvBatch.KVs, kv) + c.KvBatch.MemSize += int64(cap(kv.Key) + cap(kv.Value.RawBytes)) }), insertRow, pm, @@ -502,7 +507,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in return errors.Wrap(err, "insert row") } // If our batch is full, flush it and start a new one. - if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize { + if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize || c.KvBatch.MemSize > kvDatumRowConverterBatchMemSize { if err := c.SendBatch(ctx); err != nil { return err } @@ -528,5 +533,6 @@ func (c *DatumRowConverter) SendBatch(ctx context.Context) error { return ctx.Err() } c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap) + c.KvBatch.MemSize = 0 return nil }