sql: add sql.mutations.mutation_batch_byte_size setting
Previously we always constructed 10k-row insert batches, regardless of the
size of those rows. With large rows, this could easily exceed the KV size
limit of 64MB. This changes batch construction to track the size of added
keys and values, and to send the batch either when it has 10k entries or when
the size of added keys and values exceeds the setting, which defaults to 4MB.

Release note (bug fix): INSERT and UPDATE statements that operate on large
rows are now split into batches using the sql.mutations.mutation_batch_byte_size
setting.
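To make the new flush rule concrete, here is a minimal, self-contained Go sketch of the dual cutoff described above. It is an illustration only: sketchBatcher and its fields are hypothetical stand-ins for the real tableWriterBase/kv.Batch state, with the 10k entry limit and 4MB byte limit from this commit plugged in.

package main

import "fmt"

// sketchBatcher mimics the flush rule added in this commit: a batch is sent
// once it holds maxBatchSize entries OR its accumulated key+value bytes reach
// maxBatchByteSize, whichever comes first.
type sketchBatcher struct {
	maxBatchSize     int // entry-count limit (10k in production)
	maxBatchByteSize int // byte limit (4MB default from the new setting)
	entries          int
	approxBytes      int
}

// add records one mutation of the given key+value size and reports whether
// the batch should now be flushed.
func (b *sketchBatcher) add(kvBytes int) bool {
	b.entries++
	b.approxBytes += kvBytes
	return b.entries >= b.maxBatchSize || b.approxBytes >= b.maxBatchByteSize
}

func main() {
	b := sketchBatcher{maxBatchSize: 10000, maxBatchByteSize: 4 << 20}
	batches := 0
	for row := 0; row < 24; row++ {
		if b.add(786432) { // ~0.75MB per row, as in the new logic test below
			batches++
			b.entries, b.approxBytes = 0, 0
		}
	}
	fmt.Println("batches sent:", batches) // 4: six ~0.75MB rows cross 4MB
}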
dt authored and yuzefovich committed Aug 3, 2021
1 parent 0e5432e commit 192e83b
Showing 10 changed files with 80 additions and 11 deletions.
15 changes: 15 additions & 0 deletions pkg/kv/batch.go
@@ -49,6 +49,10 @@ type Batch struct {
 	// To be modified directly.
 	Header roachpb.Header
 	reqs   []roachpb.RequestUnion
+
+	// approxMutationReqBytes tracks the approximate size of keys and values in
+	// mutations added to this batch via Put, CPut, InitPut, Del, etc.
+	approxMutationReqBytes int
 	// Set when AddRawRequest is used, in which case using the "other"
 	// operations renders the batch unusable.
 	raw bool
@@ -369,6 +373,7 @@ func (b *Batch) put(key, value interface{}, inline bool) {
 	} else {
 		b.appendReqs(roachpb.NewPut(k, v))
 	}
+	b.approxMutationReqBytes += len(k) + len(v.RawBytes)
 	b.initResult(1, 1, notRaw, nil)
 }
 
@@ -443,6 +448,7 @@ func (b *Batch) cputInternal(key, value interface{}, expValue []byte, allowNotEx
 		return
 	}
 	b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist))
+	b.approxMutationReqBytes += len(k) + len(v.RawBytes)
 	b.initResult(1, 1, notRaw, nil)
 }
 
@@ -466,6 +472,7 @@ func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) {
 		return
 	}
 	b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones))
+	b.approxMutationReqBytes += len(k) + len(v.RawBytes)
 	b.initResult(1, 1, notRaw, nil)
 }
 
@@ -567,6 +574,7 @@ func (b *Batch) Del(keys ...interface{}) {
 			return
 		}
 		reqs = append(reqs, roachpb.NewDelete(k))
+		b.approxMutationReqBytes += len(k)
 	}
 	b.appendReqs(reqs...)
 	b.initResult(len(reqs), len(reqs), notRaw, nil)
@@ -754,3 +762,10 @@ func (b *Batch) addSSTable(
 	b.appendReqs(req)
 	b.initResult(1, 0, notRaw, nil)
 }
+
+// ApproximateMutationBytes returns the approximate byte size of the mutations
+// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added
+// via AddRawRequest are not tracked.
+func (b *Batch) ApproximateMutationBytes() int {
+	return b.approxMutationReqBytes
+}
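The new accessor supports byte-based pagination by callers that build batches incrementally. Below is a hedged sketch of that usage pattern: writeAll and its arguments are hypothetical, while NewBatch, Put, Run, and the ApproximateMutationBytes method added above are the real client APIs.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
)

// writeAll (hypothetical) accumulates Puts until the tracked key+value bytes
// reach the limit, then sends the batch and starts a new one.
func writeAll(ctx context.Context, txn *kv.Txn, keys, vals [][]byte) error {
	const maxBatchByteSize = 4 << 20 // mirrors the new setting's 4MB default
	b := txn.NewBatch()
	for i := range keys {
		b.Put(keys[i], vals[i])
		if b.ApproximateMutationBytes() >= maxBatchByteSize {
			if err := txn.Run(ctx, b); err != nil {
				return err
			}
			b = txn.NewBatch()
		}
	}
	return txn.Run(ctx, b) // send whatever remains
}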
3 changes: 2 additions & 1 deletion pkg/sql/delete.go
@@ -118,7 +118,8 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
 		}
 
 		// Are we done yet with the current batch?
-		if d.run.td.currentBatchSize >= d.run.td.maxBatchSize {
+		if d.run.td.currentBatchSize >= d.run.td.maxBatchSize ||
+			d.run.td.b.ApproximateMutationBytes() >= d.run.td.maxBatchByteSize {
 			break
 		}
 	}
3 changes: 2 additions & 1 deletion pkg/sql/insert.go
@@ -240,7 +240,8 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) {
 		}
 
 		// Are we done yet with the current batch?
-		if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize {
+		if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize ||
+			n.run.ti.b.ApproximateMutationBytes() >= n.run.ti.maxBatchByteSize {
 			break
 		}
 	}
35 changes: 35 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/show_trace_mutations
@@ -0,0 +1,35 @@
+# LogicTest: local
+
+# Make a table that will hold some big strings.
+statement ok
+CREATE TABLE blobs (i INT PRIMARY KEY, j STRING, FAMILY (i, j))
+
+# Get the range id.
+let $rangeid
+SELECT range_id FROM [ SHOW RANGES FROM TABLE blobs ]
+
+# Populate the table descriptor cache.
+query IT
+SELECT * FROM blobs
+----
+
+# Insert 24 rows with big (~0.75mb) strings in them.
+statement ok
+SET TRACING=ON;
+INSERT INTO blobs SELECT generate_series(1, 24), repeat('0123456789ab', 65536);
+SET TRACING=OFF;
+
+# Verify the 24-row insert paginated into 4 batches of 6 rows: at ~0.75mb per row, six rows (~4.5mb) cross the 4mb default byte limit.
+query TT
+SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
+WHERE message LIKE '%r$rangeid: sending batch%'
+AND message NOT LIKE '%PushTxn%'
+AND message NOT LIKE '%QueryTxn%'
+AND operation NOT LIKE '%[async]%'
+AND message NOT LIKE '%HeartbeatTxn%'
+----
+dist sender send r35: sending batch 6 CPut to (n1,s1):1
+dist sender send r35: sending batch 6 CPut to (n1,s1):1
+dist sender send r35: sending batch 6 CPut to (n1,s1):1
+dist sender send r35: sending batch 6 CPut to (n1,s1):1
+dist sender send r35: sending batch 1 EndTxn to (n1,s1):1
18 changes: 17 additions & 1 deletion pkg/sql/tablewriter.go
@@ -14,6 +14,7 @@ import (
 	"context"
 
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/mutations"
 	"github.com/cockroachdb/cockroach/pkg/sql/row"
@@ -109,6 +110,9 @@ type tableWriterBase struct {
 	// for a mutation operation. By default, it will be set to 10k but can be
 	// a different value in tests.
 	maxBatchSize int
+	// maxBatchByteSize determines the maximum number of key and value bytes in
+	// the KV batch for a mutation operation.
+	maxBatchByteSize int
 	// currentBatchSize is the size of the current batch. It is updated on
 	// every row() call and is reset once a new batch is started.
 	currentBatchSize int
@@ -120,11 +124,23 @@ type tableWriterBase struct {
 	rows *rowcontainer.RowContainer
 }
 
-func (tb *tableWriterBase) init(txn *kv.Txn, tableDesc catalog.TableDescriptor) {
+var maxBatchBytes = settings.RegisterByteSizeSetting(
+	"sql.mutations.mutation_batch_byte_size",
+	"byte size - in key and value lengths - for mutation batches",
+	4<<20,
+)
+
+func (tb *tableWriterBase) init(
+	txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *tree.EvalContext,
+) {
 	tb.txn = txn
 	tb.desc = tableDesc
 	tb.b = txn.NewBatch()
 	tb.maxBatchSize = mutations.MaxBatchSize()
+	tb.maxBatchByteSize = int(maxBatchBytes.Default())
+	if evalCtx != nil {
+		tb.maxBatchByteSize = int(maxBatchBytes.Get(&evalCtx.Settings.SV))
+	}
 }
 
 // flushAndStartNewBatch shares the common flushAndStartNewBatch() code between
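A subtlety in init above is the nil-evalCtx branch: callers without an EvalContext (some tests) fall back to the setting's compiled-in default rather than reading cluster state. The sketch below isolates that pattern; batchByteSizeLimit is a hypothetical helper that assumes the maxBatchBytes variable and tree import from the diff above.

// batchByteSizeLimit (hypothetical) prefers the live cluster setting and
// falls back to the compiled-in 4MB default when no EvalContext is available.
func batchByteSizeLimit(evalCtx *tree.EvalContext) int {
	limit := int(maxBatchBytes.Default())
	if evalCtx != nil {
		// A real session reads the current cluster-wide value.
		limit = int(maxBatchBytes.Get(&evalCtx.Settings.SV))
	}
	return limit
}

Operators can then raise or lower the limit at runtime with something like SET CLUSTER SETTING sql.mutations.mutation_batch_byte_size = '8MiB'; statements pick up the new value the next time a table writer is initialized.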
4 changes: 2 additions & 2 deletions pkg/sql/tablewriter_delete.go
@@ -42,8 +42,8 @@ func (*tableDeleter) desc() string { return "deleter" }
 func (td *tableDeleter) walkExprs(_ func(desc string, index int, expr tree.TypedExpr)) {}
 
 // init is part of the tableWriter interface.
-func (td *tableDeleter) init(_ context.Context, txn *kv.Txn, _ *tree.EvalContext) error {
-	td.tableWriterBase.init(txn, td.tableDesc())
+func (td *tableDeleter) init(_ context.Context, txn *kv.Txn, evalCtx *tree.EvalContext) error {
+	td.tableWriterBase.init(txn, td.tableDesc(), evalCtx)
 	return nil
 }
 
4 changes: 2 additions & 2 deletions pkg/sql/tablewriter_insert.go
@@ -31,8 +31,8 @@ var _ tableWriter = &tableInserter{}
 func (*tableInserter) desc() string { return "inserter" }
 
 // init is part of the tableWriter interface.
-func (ti *tableInserter) init(_ context.Context, txn *kv.Txn, _ *tree.EvalContext) error {
-	ti.tableWriterBase.init(txn, ti.tableDesc())
+func (ti *tableInserter) init(_ context.Context, txn *kv.Txn, evalCtx *tree.EvalContext) error {
+	ti.tableWriterBase.init(txn, ti.tableDesc(), evalCtx)
 	return nil
 }
 
4 changes: 2 additions & 2 deletions pkg/sql/tablewriter_update.go
@@ -31,8 +31,8 @@ var _ tableWriter = &tableUpdater{}
 func (*tableUpdater) desc() string { return "updater" }
 
 // init is part of the tableWriter interface.
-func (tu *tableUpdater) init(_ context.Context, txn *kv.Txn, _ *tree.EvalContext) error {
-	tu.tableWriterBase.init(txn, tu.tableDesc())
+func (tu *tableUpdater) init(_ context.Context, txn *kv.Txn, evalCtx *tree.EvalContext) error {
+	tu.tableWriterBase.init(txn, tu.tableDesc(), evalCtx)
 	return nil
 }
 
2 changes: 1 addition & 1 deletion pkg/sql/tablewriter_upsert_opt.go
@@ -99,7 +99,7 @@ var _ tableWriter = &optTableUpserter{}
 func (tu *optTableUpserter) init(
 	ctx context.Context, txn *kv.Txn, evalCtx *tree.EvalContext,
 ) error {
-	tu.tableWriterBase.init(txn, tu.ri.Helper.TableDesc)
+	tu.tableWriterBase.init(txn, tu.ri.Helper.TableDesc, evalCtx)
 
 	// rowsNeeded, set upon initialization, indicates whether or not we want
 	// rows returned from the operation.
3 changes: 2 additions & 1 deletion pkg/sql/update.go
@@ -176,7 +176,8 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) {
 		}
 
 		// Are we done yet with the current batch?
-		if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize {
+		if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize ||
+			u.run.tu.b.ApproximateMutationBytes() >= u.run.tu.maxBatchByteSize {
 			break
 		}
 	}
