sql/row: limit row converter batches to 4MB
These small batches are used to reduce channel overhead when passing KVs
to the IMPORT ingest process, where we actually have more sophisticated
memory monitoring in place around the big buffers. They are expected to
be small and ephemeral, but for tables with very large rows, 5k KVs can
become a non-trivial amount of used memory. This adds a second limit
that triggers a flush once >4MB of key/value slice capacity has been
added to the batch.

Release note: none.
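
For illustration, a minimal, self-contained Go sketch of the flush policy this
change introduces. The `batcher` and `kv` types and the limit names here are
hypothetical stand-ins, not the actual row_converter API; the real code tracks
cap(key)+cap(value) the same way, but hands each batch to the IMPORT ingest
pipeline rather than printing it.

package main

import "fmt"

// kv mimics the shape of a roachpb.KeyValue for illustration only.
type kv struct {
	key, val []byte
}

const (
	batchSizeLimit = 5000    // count limit, like kvDatumRowConverterBatchSize
	batchMemLimit  = 4 << 20 // ~4MB byte limit, like kvDatumRowConverterBatchMemSize
)

// batcher accumulates KVs and flushes when either limit trips,
// mirroring the dual condition added in this commit.
type batcher struct {
	kvs     []kv
	memSize int64
}

func (b *batcher) add(k kv) {
	b.kvs = append(b.kvs, k)
	// Account for the capacity of the backing arrays rather than their
	// lengths, since capacity is what actually pins memory.
	b.memSize += int64(cap(k.key) + cap(k.val))
	if len(b.kvs) >= batchSizeLimit || b.memSize > batchMemLimit {
		b.flush()
	}
}

func (b *batcher) flush() {
	fmt.Printf("flushing %d KVs (~%d bytes)\n", len(b.kvs), b.memSize)
	// Allocate a fresh slice, as the real code does after handing the
	// previous batch off for ingestion.
	b.kvs = make([]kv, 0, batchSizeLimit)
	b.memSize = 0
}

func main() {
	var b batcher
	// With ~1KB values, the 4MB byte limit trips near 4k KVs,
	// well before the 5k count limit would.
	for i := 0; i < 10000; i++ {
		b.add(kv{key: make([]byte, 16), val: make([]byte, 1024)})
	}
}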
dt committed Mar 29, 2022
1 parent 081231f commit 2ccc5e3
pkg/sql/row/row_converter.go (8 additions, 2 deletions)
@@ -182,7 +182,8 @@ type KVBatch struct {
 	// Progress represents the fraction of the input that generated this row.
 	Progress float32
 	// KVs is the actual converted KV data.
-	KVs []roachpb.KeyValue
+	KVs     []roachpb.KeyValue
+	MemSize int64
 }
 
 // DatumRowConverter converts Datums into kvs and streams it to the destination
@@ -224,6 +225,8 @@ var kvDatumRowConverterBatchSize = util.ConstantWithMetamorphicTestValue(
 	1, /* metamorphicValue */
 )
 
+const kvDatumRowConverterBatchMemSize = 4 << 20
+
 // TestingSetDatumRowConverterBatchSize sets kvDatumRowConverterBatchSize and
 // returns function to reset this setting back to its old value.
 func TestingSetDatumRowConverterBatchSize(newSize int) func() {
@@ -420,6 +423,7 @@ func NewDatumRowConverter(
 	padding := 2 * (len(tableDesc.PublicNonPrimaryIndexes()) + len(tableDesc.GetFamilies()))
 	c.BatchCap = kvDatumRowConverterBatchSize + padding
 	c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap)
+	c.KvBatch.MemSize = 0
 
 	colsOrdered := make([]catalog.Column, len(cols))
 	for _, col := range c.tableDesc.PublicColumns() {
@@ -493,6 +497,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
 		KVInserter(func(kv roachpb.KeyValue) {
 			kv.Value.InitChecksum(kv.Key)
 			c.KvBatch.KVs = append(c.KvBatch.KVs, kv)
+			c.KvBatch.MemSize += int64(cap(kv.Key) + cap(kv.Value.RawBytes))
 		}),
 		insertRow,
 		pm,
@@ -502,7 +507,7 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
 		return errors.Wrap(err, "insert row")
 	}
 	// If our batch is full, flush it and start a new one.
-	if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize {
+	if len(c.KvBatch.KVs) >= kvDatumRowConverterBatchSize || c.KvBatch.MemSize > kvDatumRowConverterBatchMemSize {
 		if err := c.SendBatch(ctx); err != nil {
 			return err
 		}
@@ -528,5 +533,6 @@ func (c *DatumRowConverter) SendBatch(ctx context.Context) error {
 		return ctx.Err()
 	}
 	c.KvBatch.KVs = make([]roachpb.KeyValue, 0, c.BatchCap)
+	c.KvBatch.MemSize = 0
 	return nil
 }
