diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index d64600816e09..10aa76b41cc7 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -50,6 +50,10 @@ type Batch struct { // To be modified directly. Header roachpb.Header reqs []roachpb.RequestUnion + + // approxMutationReqBytes tracks the approximate size of keys and values in + // mutations added to this batch via Put, CPut, InitPut, Del, etc. + approxMutationReqBytes int // Set when AddRawRequest is used, in which case using the "other" // operations renders the batch unusable. raw bool @@ -384,6 +388,7 @@ func (b *Batch) put(key, value interface{}, inline bool) { } else { b.appendReqs(roachpb.NewPut(k, v)) } + b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -489,6 +494,7 @@ func (b *Batch) cputInternal( } else { b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist)) } + b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -512,6 +518,7 @@ func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) { return } b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones)) + b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -613,6 +620,7 @@ func (b *Batch) Del(keys ...interface{}) { return } reqs = append(reqs, roachpb.NewDelete(k)) + b.approxMutationReqBytes += len(k) } b.appendReqs(reqs...) b.initResult(len(reqs), len(reqs), notRaw, nil) @@ -826,3 +834,10 @@ func (b *Batch) migrate(s, e interface{}, version roachpb.Version) { b.appendReqs(req) b.initResult(1, 0, notRaw, nil) } + +// ApproximateMutationBytes returns the approximate byte size of the mutations +// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added +// via AddRawRequest are not tracked. 
+func (b *Batch) ApproximateMutationBytes() int { + return b.approxMutationReqBytes +} diff --git a/pkg/sql/delete.go b/pkg/sql/delete.go index b09ea39ad89d..49dab04b41cc 100644 --- a/pkg/sql/delete.go +++ b/pkg/sql/delete.go @@ -115,7 +115,8 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? - if d.run.td.currentBatchSize >= d.run.td.maxBatchSize { + if d.run.td.currentBatchSize >= d.run.td.maxBatchSize || + d.run.td.b.ApproximateMutationBytes() >= d.run.td.maxBatchByteSize { break } } diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 7fa7b5a3e096..eb1092b198f7 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -238,7 +238,8 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? - if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize { + if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize || + n.run.ti.b.ApproximateMutationBytes() >= n.run.ti.maxBatchByteSize { break } } diff --git a/pkg/sql/mutations/mutations_util.go b/pkg/sql/mutations/mutations_util.go index b1541381781d..488fb034375f 100644 --- a/pkg/sql/mutations/mutations_util.go +++ b/pkg/sql/mutations/mutations_util.go @@ -27,6 +27,13 @@ var defaultMaxBatchSize = int64(util.ConstantWithMetamorphicTestRange( productionMaxBatchSize, /* max */ )) +var testingMaxBatchByteSize = util.ConstantWithMetamorphicTestRange( + "max-batch-byte-size", + 0, // we'll use the cluster setting instead if we see zero. 
+ 1, /* min */ + 32<<20, /* max */ +) + // MaxBatchSize returns the max number of entries in the KV batch for a // mutation operation (delete, insert, update, upsert) - including secondary // index updates, FK cascading updates, etc - before the current KV batch is @@ -54,3 +61,12 @@ func SetMaxBatchSizeForTests(newMaxBatchSize int) { func ResetMaxBatchSizeForTests() { atomic.SwapInt64(&maxBatchSize, defaultMaxBatchSize) } + +// MaxBatchByteSize takes the passed value read from the cluster setting and +// returns it unless the testing metamorphic value overrides it. +func MaxBatchByteSize(clusterSetting int, forceProductionBatchSizes bool) int { + if forceProductionBatchSizes || testingMaxBatchByteSize == 0 { + return clusterSetting + } + return testingMaxBatchByteSize +} diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic index a986a534b264..dcd51f8139ac 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic @@ -383,3 +383,26 @@ WHERE message LIKE '%r$rangeid: sending batch%' dist sender send r37: sending batch 4 CPut, 1 EndTxn to (n1,s1):1 dist sender send r37: sending batch 5 CPut to (n1,s1):1 dist sender send r37: sending batch 1 EndTxn to (n1,s1):1 + +# make a table to hold some big strings. +statement ok +CREATE TABLE blobs (i INT PRIMARY KEY, j STRING, FAMILY (i, j)) + +# insert 24 rows, each with a big (.75mb) string, in a single statement. +statement ok +SET TRACING=ON; + INSERT INTO blobs SELECT generate_series(1, 24), repeat('0123456789ab', 65536); +SET TRACING=OFF; + +# verify insert of 24 rows paginated into 4 batches since they are .75mb each. 
+query TT +SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] +WHERE message LIKE '%r$rangeid: sending batch%' + AND message NOT LIKE '%PushTxn%' + AND message NOT LIKE '%QueryTxn%' +---- +dist sender send r37: sending batch 6 CPut to (n1,s1):1 +dist sender send r37: sending batch 6 CPut to (n1,s1):1 +dist sender send r37: sending batch 6 CPut to (n1,s1):1 +dist sender send r37: sending batch 6 CPut to (n1,s1):1 +dist sender send r37: sending batch 1 EndTxn to (n1,s1):1 diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index cbb33d618cc7..79295104c263 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/mutations" "github.com/cockroachdb/cockroach/pkg/sql/row" @@ -109,6 +110,9 @@ type tableWriterBase struct { // for a mutation operation. By default, it will be set to 10k but can be // a different value in tests. maxBatchSize int + // maxBatchByteSize determines the maximum number of key and value bytes in + // the KV batch for a mutation operation. + maxBatchByteSize int // currentBatchSize is the size of the current batch. It is updated on // every row() call and is reset once a new batch is started. 
currentBatchSize int @@ -123,6 +127,12 @@ type tableWriterBase struct { forceProductionBatchSizes bool } +var maxBatchBytes = settings.RegisterByteSizeSetting( + "sql.mutations.mutation_batch_byte_size", + "byte size - in key and value lengths -- for mutation batches", + 4<<20, +) + func (tb *tableWriterBase) init( txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *tree.EvalContext, ) { @@ -131,6 +141,11 @@ func (tb *tableWriterBase) init( tb.b = txn.NewBatch() tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionBatchSizes tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes) + batchMaxBytes := int(maxBatchBytes.Default()) + if evalCtx != nil { + batchMaxBytes = int(maxBatchBytes.Get(&evalCtx.Settings.SV)) + } + tb.maxBatchByteSize = mutations.MaxBatchByteSize(batchMaxBytes, tb.forceProductionBatchSizes) } // flushAndStartNewBatch shares the common flushAndStartNewBatch() code between diff --git a/pkg/sql/update.go b/pkg/sql/update.go index 262e78a90520..79bdf544ef3d 100644 --- a/pkg/sql/update.go +++ b/pkg/sql/update.go @@ -174,7 +174,8 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? - if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize { + if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize || + u.run.tu.b.ApproximateMutationBytes() >= u.run.tu.maxBatchByteSize { break } }