From 192e83ba8d0d55c0c96583e4898fb141232d72a1 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 13 Jul 2021 16:02:57 +0000 Subject: [PATCH] sql: add sql.mutations.mutation_batch_byte_size setting Previously we always constructed 10k row insert batches, regardless of the size of those rows. With large rows, this could easily exceed the kv size limit of 64MB. This changes batch construction to track the size of added keys and values, and send the batch either when it has 10k entries or when the size of added keys and values exceeds the setting, which defaults to 4MB. Release note (bug fix): INSERT and UPDATE statements which operate on larger rows are split into batches using the sql.mutations.mutation_batch_byte_size setting. --- pkg/kv/batch.go | 15 ++++++++ pkg/sql/delete.go | 3 +- pkg/sql/insert.go | 3 +- .../execbuilder/testdata/show_trace_mutations | 35 +++++++++++++++++++ pkg/sql/tablewriter.go | 18 +++++++++- pkg/sql/tablewriter_delete.go | 4 +-- pkg/sql/tablewriter_insert.go | 4 +-- pkg/sql/tablewriter_update.go | 4 +-- pkg/sql/tablewriter_upsert_opt.go | 2 +- pkg/sql/update.go | 3 +- 10 files changed, 80 insertions(+), 11 deletions(-) create mode 100644 pkg/sql/opt/exec/execbuilder/testdata/show_trace_mutations diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 098d9336d619..b579d1755789 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -49,6 +49,10 @@ type Batch struct { // To be modified directly. Header roachpb.Header reqs []roachpb.RequestUnion + + // approxMutationReqBytes tracks the approximate size of keys and values in + // mutations added to this batch via Put, CPut, InitPut, Del, etc. + approxMutationReqBytes int // Set when AddRawRequest is used, in which case using the "other" // operations renders the batch unusable. 
raw bool @@ -369,6 +373,7 @@ func (b *Batch) put(key, value interface{}, inline bool) { } else { b.appendReqs(roachpb.NewPut(k, v)) } + b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -443,6 +448,7 @@ func (b *Batch) cputInternal(key, value interface{}, expValue []byte, allowNotEx return } b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist)) + b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -466,6 +472,7 @@ func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) { return } b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones)) + b.approxMutationReqBytes += len(k) + len(v.RawBytes) b.initResult(1, 1, notRaw, nil) } @@ -567,6 +574,7 @@ func (b *Batch) Del(keys ...interface{}) { return } reqs = append(reqs, roachpb.NewDelete(k)) + b.approxMutationReqBytes += len(k) } b.appendReqs(reqs...) b.initResult(len(reqs), len(reqs), notRaw, nil) @@ -754,3 +762,10 @@ func (b *Batch) addSSTable( b.appendReqs(req) b.initResult(1, 0, notRaw, nil) } + +// ApproximateMutationBytes returns the approximate byte size of the mutations +// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added +// via AddRawRequest are not tracked. +func (b *Batch) ApproximateMutationBytes() int { + return b.approxMutationReqBytes +} diff --git a/pkg/sql/delete.go b/pkg/sql/delete.go index d960676c9c45..352b0a9d5afb 100644 --- a/pkg/sql/delete.go +++ b/pkg/sql/delete.go @@ -118,7 +118,8 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? 
- if d.run.td.currentBatchSize >= d.run.td.maxBatchSize { + if d.run.td.currentBatchSize >= d.run.td.maxBatchSize || + d.run.td.b.ApproximateMutationBytes() >= d.run.td.maxBatchByteSize { break } } diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 20283dd8ee52..a97c4d4830ee 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -240,7 +240,8 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? - if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize { + if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize || + n.run.ti.b.ApproximateMutationBytes() >= n.run.ti.maxBatchByteSize { break } } diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_mutations b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_mutations new file mode 100644 index 000000000000..9e5bf6ca1062 --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_mutations @@ -0,0 +1,35 @@ +# LogicTest: local + +# make a table with some big strings in it. +statement ok +CREATE TABLE blobs (i INT PRIMARY KEY, j STRING, FAMILY (i, j)) + +# Get the range id. +let $rangeid +SELECT range_id FROM [ SHOW RANGES FROM TABLE blobs ] + +# Populate table descriptor cache. +query IT +SELECT * FROM blobs +---- + +# insert 24 rows, each with a big (.75mb) string, while tracing. +statement ok +SET TRACING=ON; + INSERT INTO blobs SELECT generate_series(1, 24), repeat('0123456789ab', 65536); +SET TRACING=OFF; + +# verify insert of 24 rows paginated into 4 batches since they are .75mb each. 
+query TT +SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] +WHERE message LIKE '%r$rangeid: sending batch%' + AND message NOT LIKE '%PushTxn%' + AND message NOT LIKE '%QueryTxn%' + AND operation NOT LIKE '%[async]%' + AND message NOT LIKE '%HeartbeatTxn%' +---- +dist sender send r35: sending batch 6 CPut to (n1,s1):1 +dist sender send r35: sending batch 6 CPut to (n1,s1):1 +dist sender send r35: sending batch 6 CPut to (n1,s1):1 +dist sender send r35: sending batch 6 CPut to (n1,s1):1 +dist sender send r35: sending batch 1 EndTxn to (n1,s1):1 diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index c5bb3b2922e1..332e27fc326e 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/mutations" "github.com/cockroachdb/cockroach/pkg/sql/row" @@ -109,6 +110,9 @@ type tableWriterBase struct { // for a mutation operation. By default, it will be set to 10k but can be // a different value in tests. maxBatchSize int + // maxBatchByteSize determines the maximum number of key and value bytes in + // the KV batch for a mutation operation. + maxBatchByteSize int // currentBatchSize is the size of the current batch. It is updated on // every row() call and is reset once a new batch is started. 
currentBatchSize int @@ -120,11 +124,23 @@ type tableWriterBase struct { rows *rowcontainer.RowContainer } -func (tb *tableWriterBase) init(txn *kv.Txn, tableDesc catalog.TableDescriptor) { +var maxBatchBytes = settings.RegisterByteSizeSetting( + "sql.mutations.mutation_batch_byte_size", + "byte size - in key and value lengths -- for mutation batches", + 4<<20, +) + +func (tb *tableWriterBase) init( + txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *tree.EvalContext, +) { tb.txn = txn tb.desc = tableDesc tb.b = txn.NewBatch() tb.maxBatchSize = mutations.MaxBatchSize() + tb.maxBatchByteSize = int(maxBatchBytes.Default()) + if evalCtx != nil { + tb.maxBatchByteSize = int(maxBatchBytes.Get(&evalCtx.Settings.SV)) + } } // flushAndStartNewBatch shares the common flushAndStartNewBatch() code between diff --git a/pkg/sql/tablewriter_delete.go b/pkg/sql/tablewriter_delete.go index fdb81d14debe..10a86b89e65f 100644 --- a/pkg/sql/tablewriter_delete.go +++ b/pkg/sql/tablewriter_delete.go @@ -42,8 +42,8 @@ func (*tableDeleter) desc() string { return "deleter" } func (td *tableDeleter) walkExprs(_ func(desc string, index int, expr tree.TypedExpr)) {} // init is part of the tableWriter interface. -func (td *tableDeleter) init(_ context.Context, txn *kv.Txn, _ *tree.EvalContext) error { - td.tableWriterBase.init(txn, td.tableDesc()) +func (td *tableDeleter) init(_ context.Context, txn *kv.Txn, evalCtx *tree.EvalContext) error { + td.tableWriterBase.init(txn, td.tableDesc(), evalCtx) return nil } diff --git a/pkg/sql/tablewriter_insert.go b/pkg/sql/tablewriter_insert.go index b0f579f2e805..7dbedc157ea5 100644 --- a/pkg/sql/tablewriter_insert.go +++ b/pkg/sql/tablewriter_insert.go @@ -31,8 +31,8 @@ var _ tableWriter = &tableInserter{} func (*tableInserter) desc() string { return "inserter" } // init is part of the tableWriter interface. 
-func (ti *tableInserter) init(_ context.Context, txn *kv.Txn, _ *tree.EvalContext) error { - ti.tableWriterBase.init(txn, ti.tableDesc()) +func (ti *tableInserter) init(_ context.Context, txn *kv.Txn, evalCtx *tree.EvalContext) error { + ti.tableWriterBase.init(txn, ti.tableDesc(), evalCtx) return nil } diff --git a/pkg/sql/tablewriter_update.go b/pkg/sql/tablewriter_update.go index ce0178c461d7..a74ad230d230 100644 --- a/pkg/sql/tablewriter_update.go +++ b/pkg/sql/tablewriter_update.go @@ -31,8 +31,8 @@ var _ tableWriter = &tableUpdater{} func (*tableUpdater) desc() string { return "updater" } // init is part of the tableWriter interface. -func (tu *tableUpdater) init(_ context.Context, txn *kv.Txn, _ *tree.EvalContext) error { - tu.tableWriterBase.init(txn, tu.tableDesc()) +func (tu *tableUpdater) init(_ context.Context, txn *kv.Txn, evalCtx *tree.EvalContext) error { + tu.tableWriterBase.init(txn, tu.tableDesc(), evalCtx) return nil } diff --git a/pkg/sql/tablewriter_upsert_opt.go b/pkg/sql/tablewriter_upsert_opt.go index cfe1b61c40d9..e791ab219954 100644 --- a/pkg/sql/tablewriter_upsert_opt.go +++ b/pkg/sql/tablewriter_upsert_opt.go @@ -99,7 +99,7 @@ var _ tableWriter = &optTableUpserter{} func (tu *optTableUpserter) init( ctx context.Context, txn *kv.Txn, evalCtx *tree.EvalContext, ) error { - tu.tableWriterBase.init(txn, tu.ri.Helper.TableDesc) + tu.tableWriterBase.init(txn, tu.ri.Helper.TableDesc, evalCtx) // rowsNeeded, set upon initialization, indicates whether or not we want // rows returned from the operation. diff --git a/pkg/sql/update.go b/pkg/sql/update.go index 14c260379704..0e9a3c28125b 100644 --- a/pkg/sql/update.go +++ b/pkg/sql/update.go @@ -176,7 +176,8 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) { } // Are we done yet with the current batch? 
- if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize { + if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize || + u.run.tu.b.ApproximateMutationBytes() >= u.run.tu.maxBatchByteSize { break } }