Skip to content

Commit

Permalink
Merge pull request #78957 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-78945

release-22.1: sql/row: limit row converter batches to 4MB
  • Loading branch information
dt authored Apr 13, 2022
2 parents 7c14d67 + 2ccc5e3 commit d632696
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions pkg/sql/row/row_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ type KVBatch struct {
// Progress represents the fraction of the input that generated this row.
Progress float32
// KVs is the actual converted KV data.
KVs []roachpb.KeyValue
KVs []roachpb.KeyValue
MemSize int64
}

// DatumRowConverter converts Datums into kvs and streams it to the destination
Expand Down Expand Up @@ -224,6 +225,8 @@ var kvDatumRowConverterBatchSize = util.ConstantWithMetamorphicTestValue(
1, /* metamorphicValue */
)

const kvDatumRowConverterBatchMemSize = 4 << 20

// TestingSetDatumRowConverterBatchSize sets kvDatumRowConverterBatchSize and
// returns function to reset this setting back to its old value.
func TestingSetDatumRowConverterBatchSize(newSize int) func() {
Expand Down Expand Up @@ -420,6 +423,7 @@ func NewDatumRowConverter(
padding := 2 * (len(tableDesc.PublicNonPrimaryIndexes()) + len(tableDesc.GetFamilies()))
c.BatchCap = kvDatumRowConverterBatchSize + padding
c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap)
c.KvBatch.MemSize = 0

colsOrdered := make([]catalog.Column, len(cols))
for _, col := range c.tableDesc.PublicColumns() {
Expand Down Expand Up @@ -493,6 +497,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
KVInserter(func(kv roachpb.KeyValue) {
kv.Value.InitChecksum(kv.Key)
c.KvBatch.KVs = append(c.KvBatch.KVs, kv)
c.KvBatch.MemSize += int64(cap(kv.Key) + cap(kv.Value.RawBytes))
}),
insertRow,
pm,
Expand All @@ -502,7 +507,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
return errors.Wrap(err, "insert row")
}
// If our batch is full, flush it and start a new one.
if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize {
if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize || c.KvBatch.MemSize > kvDatumRowConverterBatchMemSize {
if err := c.SendBatch(ctx); err != nil {
return err
}
Expand All @@ -528,5 +533,6 @@ func (c *DatumRowConverter) SendBatch(ctx context.Context) error {
return ctx.Err()
}
c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap)
c.KvBatch.MemSize = 0
return nil
}

0 comments on commit d632696

Please sign in to comment.