From 5f08c41cd7329c42157153df38332a3959de1021 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 13 Jul 2021 16:02:57 +0000 Subject: [PATCH] sql: add sql.mutations.mutation_batch_byte_size setting Previously we always constructed 10k row insert batches, regardless of the size of those rows. With large rows, this could easily exceed the kv size limit of 64MB. This changes batch construction to track the size of added keys and values, and send the batch either when it has 10k entries or when the size of added keys and values exceeds the setting, which defaults to 4MB. Release note (bug fix): INSERT and UPDATE statements which operate on larger rows are split into batches using the sql.mutations.mutation_batch_byte_size setting --- pkg/kv/batch.go | 16 +++++++++++-- pkg/sql/delete.go | 3 ++- pkg/sql/insert.go | 3 ++- pkg/sql/mutations/mutations_util.go | 16 +++++++++++++ .../testdata/show_trace_nonmetamorphic | 23 +++++++++++++++++++ pkg/sql/tablewriter.go | 11 +++++++++ pkg/sql/update.go | 3 ++- 7 files changed, 70 insertions(+), 5 deletions(-) diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 663155f08cdd..057d7c11478b 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -51,8 +51,9 @@ type Batch struct { Header roachpb.Header // The AdmissionHeader which will be used when sending the resulting // BatchRequest. To be modified directly. - AdmissionHeader roachpb.AdmissionHeader - reqs []roachpb.RequestUnion + AdmissionHeader roachpb.AdmissionHeader + reqs []roachpb.RequestUnion + mutationReqBytes int // Set when AddRawRequest is used, in which case using the "other" // operations renders the batch unusable. 
raw bool @@ -385,6 +386,7 @@ func (b *Batch) put(key, value interface{}, inline bool) { } else { b.appendReqs(roachpb.NewPut(k, v)) } + b.mutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -490,6 +492,7 @@ func (b *Batch) cputInternal( } else { b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist)) } + b.mutationReqBytes += len(k) + len(expValue) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -513,6 +516,7 @@ func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) { return } b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones)) + b.mutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -614,6 +618,7 @@ func (b *Batch) Del(keys ...interface{}) { return } reqs = append(reqs, roachpb.NewDelete(k)) + b.mutationReqBytes += len(k) } b.appendReqs(reqs...) b.initResult(len(reqs), len(reqs), notRaw, nil) @@ -805,3 +810,10 @@ func (b *Batch) migrate(s, e interface{}, version roachpb.Version) { b.appendReqs(req) b.initResult(1, 0, notRaw, nil) } + +// ApproximateMutationBytes returns the approximate byte size of the mutations +// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added +// via AddRawRequest are not tracked. +func (b *Batch) ApproximateMutationBytes() int { + return b.mutationReqBytes +} diff --git a/pkg/sql/delete.go b/pkg/sql/delete.go index b09ea39ad89d..49dab04b41cc 100644 --- a/pkg/sql/delete.go +++ b/pkg/sql/delete.go @@ -115,7 +115,8 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? 
- if d.run.td.currentBatchSize >= d.run.td.maxBatchSize { + if d.run.td.currentBatchSize >= d.run.td.maxBatchSize || + d.run.td.b.ApproximateMutationBytes() >= d.run.td.maxBatchByteSize { break } } diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 2914762884bb..dde50306e5c0 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -237,7 +237,8 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? - if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize { + if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize || + n.run.ti.b.ApproximateMutationBytes() >= n.run.ti.maxBatchByteSize { break } } diff --git a/pkg/sql/mutations/mutations_util.go b/pkg/sql/mutations/mutations_util.go index b1541381781d..488fb034375f 100644 --- a/pkg/sql/mutations/mutations_util.go +++ b/pkg/sql/mutations/mutations_util.go @@ -27,6 +27,13 @@ var defaultMaxBatchSize = int64(util.ConstantWithMetamorphicTestRange( productionMaxBatchSize, /* max */ )) +var testingMaxBatchByteSize = util.ConstantWithMetamorphicTestRange( + "max-batch-byte-size", + 0, // we'll use the cluster setting instead if we see zero. + 1, /* min */ + 32<<20, /* max */ +) + // MaxBatchSize returns the max number of entries in the KV batch for a // mutation operation (delete, insert, update, upsert) - including secondary // index updates, FK cascading updates, etc - before the current KV batch is @@ -54,3 +61,12 @@ func SetMaxBatchSizeForTests(newMaxBatchSize int) { func ResetMaxBatchSizeForTests() { atomic.SwapInt64(&maxBatchSize, defaultMaxBatchSize) } + +// MaxBatchByteSize takes the passed value read from the cluster setting and +// returns it unless the testing metamorphic value overrides it. 
+func MaxBatchByteSize(clusterSetting int, forceProductionBatchSizes bool) int { + if forceProductionBatchSizes || testingMaxBatchByteSize == 0 { + return clusterSetting + } + return testingMaxBatchByteSize +} diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic index ed9de6a7248b..bcf5dbaca6bf 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic @@ -383,3 +383,26 @@ WHERE message LIKE '%r$rangeid: sending batch%' dist sender send r40: sending batch 4 CPut, 1 EndTxn to (n1,s1):1 dist sender send r40: sending batch 5 CPut to (n1,s1):1 dist sender send r40: sending batch 1 EndTxn to (n1,s1):1 + +# create a table to hold some big strings. +statement ok +CREATE TABLE blobs (i INT PRIMARY KEY, j STRING) + +# insert some big (.75mb) strings into it. +statement ok +SET TRACING=ON; + INSERT INTO blobs SELECT generate_series(1, 24), repeat('0123456789ab', 65536); +SET TRACING=OFF; + +# verify insert of 24 rows paginated into 4 batches since they are .75mb each. 
+query TT +SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] +WHERE message LIKE '%r$rangeid: sending batch%' + AND message NOT LIKE '%PushTxn%' + AND message NOT LIKE '%QueryTxn%' +---- +dist sender send r40: sending batch 6 CPut to (n1,s1):1 +dist sender send r40: sending batch 6 CPut to (n1,s1):1 +dist sender send r40: sending batch 6 CPut to (n1,s1):1 +dist sender send r40: sending batch 6 CPut to (n1,s1):1 +dist sender send r40: sending batch 1 EndTxn to (n1,s1):1 diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index d6c1ecc4046f..73815c508a92 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/mutations" "github.com/cockroachdb/cockroach/pkg/sql/row" @@ -111,6 +112,9 @@ type tableWriterBase struct { // for a mutation operation. By default, it will be set to 10k but can be // a different value in tests. maxBatchSize int + // maxBatchByteSize determines the maximum number of key and value bytes in + // the KV batch for a mutation operation. + maxBatchByteSize int // currentBatchSize is the size of the current batch. It is updated on // every row() call and is reset once a new batch is started. 
currentBatchSize int @@ -125,6 +129,12 @@ type tableWriterBase struct { forceProductionBatchSizes bool } +var maxBatchBytes = settings.RegisterByteSizeSetting( + "sql.mutations.mutation_batch_byte_size", + "byte size - in key and value lengths -- for mutation batches", + 4<<20, +) + func (tb *tableWriterBase) init( txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *tree.EvalContext, ) { @@ -133,6 +143,7 @@ func (tb *tableWriterBase) init( tb.b = txn.NewBatch() tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionBatchSizes tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes) + tb.maxBatchByteSize = mutations.MaxBatchByteSize(int(maxBatchBytes.Get(&evalCtx.Settings.SV)), evalCtx.TestingKnobs.ForceProductionBatchSizes) } // flushAndStartNewBatch shares the common flushAndStartNewBatch() code between diff --git a/pkg/sql/update.go b/pkg/sql/update.go index 02dd7958f2cb..0fac095eff04 100644 --- a/pkg/sql/update.go +++ b/pkg/sql/update.go @@ -173,7 +173,8 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? - if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize { + if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize || + u.run.tu.b.ApproximateMutationBytes() >= u.run.tu.maxBatchByteSize { break } }