Skip to content

Commit

Permalink
sql: add sql.mutations.mutation_batch_byte_size setting
Browse files Browse the repository at this point in the history
Previously we always constructed 10k row insert batches, regardless of the
size of those rows. With large rows, this could easily exceed the kv size
limit of 64MB. This changes batch construction to track the size of added
keys and values, and send the batch either when it has 10k entries of when
the size of added keys and values exceeds the setting, which defaults to 4MB.

Release note (bug fix): INSERT and UPDATE statements which operate on larger rows are split into batches using the sql.mutations.mutation_batch_byte_size setting
  • Loading branch information
dt committed Jul 13, 2021
1 parent 6acf9ff commit 5f08c41
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 5 deletions.
16 changes: 14 additions & 2 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type Batch struct {
Header roachpb.Header
// The AdmissionHeader which will be used when sending the resulting
// BatchRequest. To be modified directly.
AdmissionHeader roachpb.AdmissionHeader
reqs []roachpb.RequestUnion
AdmissionHeader roachpb.AdmissionHeader
reqs []roachpb.RequestUnion
mutationReqBytes int
// Set when AddRawRequest is used, in which case using the "other"
// operations renders the batch unusable.
raw bool
Expand Down Expand Up @@ -385,6 +386,7 @@ func (b *Batch) put(key, value interface{}, inline bool) {
} else {
b.appendReqs(roachpb.NewPut(k, v))
}
b.mutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

Expand Down Expand Up @@ -490,6 +492,7 @@ func (b *Batch) cputInternal(
} else {
b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist))
}
b.mutationReqBytes += len(k) + len(expValue) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

Expand All @@ -513,6 +516,7 @@ func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) {
return
}
b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones))
b.mutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

Expand Down Expand Up @@ -614,6 +618,7 @@ func (b *Batch) Del(keys ...interface{}) {
return
}
reqs = append(reqs, roachpb.NewDelete(k))
b.mutationReqBytes += len(k)
}
b.appendReqs(reqs...)
b.initResult(len(reqs), len(reqs), notRaw, nil)
Expand Down Expand Up @@ -805,3 +810,10 @@ func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// ApproximateMutationBytes returns the approximate byte size of the mutations
// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added
// via AddRawRequest are not tracked.
func (b *Batch) ApproximateMutationBytes() int {
return b.mutationReqBytes
}
3 changes: 2 additions & 1 deletion pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
}

// Are we done yet with the current batch?
if d.run.td.currentBatchSize >= d.run.td.maxBatchSize {
if d.run.td.currentBatchSize >= d.run.td.maxBatchSize ||
d.run.td.b.ApproximateMutationBytes() >= d.run.td.maxBatchByteSize {
break
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) {
}

// Are we done yet with the current batch?
if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize {
if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize ||
n.run.ti.b.ApproximateMutationBytes() >= n.run.ti.maxBatchByteSize {
break
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/mutations/mutations_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ var defaultMaxBatchSize = int64(util.ConstantWithMetamorphicTestRange(
productionMaxBatchSize, /* max */
))

var testingMaxBatchByteSize = util.ConstantWithMetamorphicTestRange(
"max-batch-byte-size",
0, // we'll use the cluster setting instead if we see zero.
1, /* min */
32<<20, /* max */
)

// MaxBatchSize returns the max number of entries in the KV batch for a
// mutation operation (delete, insert, update, upsert) - including secondary
// index updates, FK cascading updates, etc - before the current KV batch is
Expand Down Expand Up @@ -54,3 +61,12 @@ func SetMaxBatchSizeForTests(newMaxBatchSize int) {
func ResetMaxBatchSizeForTests() {
atomic.SwapInt64(&maxBatchSize, defaultMaxBatchSize)
}

// MaxBatchByteSize takes the passed value read from the cluster setting and
// returns it unless the testing metamorphic value overrides it.
func MaxBatchByteSize(clusterSetting int, forceProductionBatchSizes bool) int {
if forceProductionBatchSizes || testingMaxBatchByteSize == 0 {
return clusterSetting
}
return testingMaxBatchByteSize
}
23 changes: 23 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,26 @@ WHERE message LIKE '%r$rangeid: sending batch%'
dist sender send r40: sending batch 4 CPut, 1 EndTxn to (n1,s1):1
dist sender send r40: sending batch 5 CPut to (n1,s1):1
dist sender send r40: sending batch 1 EndTxn to (n1,s1):1

# make a table with some big strings in it.
statement ok
CREATE TABLE blobs (i INT PRIMARY KEY, j STRING)

# make a table with some big (1mb) strings in it.
statement ok
SET TRACING=ON;
INSERT INTO blobs SELECT generate_series(1, 24), repeat('0123456789ab', 65536);
SET TRACING=OFF;

# verify insert of 24 rows paginated into 4 batches since they are .75mb each.
query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE message LIKE '%r$rangeid: sending batch%'
AND message NOT LIKE '%PushTxn%'
AND message NOT LIKE '%QueryTxn%'
----
dist sender send r40: sending batch 6 CPut to (n1,s1):1
dist sender send r40: sending batch 6 CPut to (n1,s1):1
dist sender send r40: sending batch 6 CPut to (n1,s1):1
dist sender send r40: sending batch 6 CPut to (n1,s1):1
dist sender send r40: sending batch 1 EndTxn to (n1,s1):1
11 changes: 11 additions & 0 deletions pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/mutations"
"github.com/cockroachdb/cockroach/pkg/sql/row"
Expand Down Expand Up @@ -111,6 +112,9 @@ type tableWriterBase struct {
// for a mutation operation. By default, it will be set to 10k but can be
// a different value in tests.
maxBatchSize int
// maxBatchByteSize determines the maximum number of key and value bytes in
// the KV batch for a mutation operation.
maxBatchByteSize int
// currentBatchSize is the size of the current batch. It is updated on
// every row() call and is reset once a new batch is started.
currentBatchSize int
Expand All @@ -125,6 +129,12 @@ type tableWriterBase struct {
forceProductionBatchSizes bool
}

var maxBatchBytes = settings.RegisterByteSizeSetting(
"sql.mutations.mutation_batch_byte_size",
"byte size - in key and value lengths -- for mutation batches",
4<<20,
)

func (tb *tableWriterBase) init(
txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *tree.EvalContext,
) {
Expand All @@ -133,6 +143,7 @@ func (tb *tableWriterBase) init(
tb.b = txn.NewBatch()
tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionBatchSizes
tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes)
tb.maxBatchByteSize = mutations.MaxBatchByteSize(int(maxBatchBytes.Get(&evalCtx.Settings.SV)), evalCtx.TestingKnobs.ForceProductionBatchSizes)
}

// flushAndStartNewBatch shares the common flushAndStartNewBatch() code between
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) {
}

// Are we done yet with the current batch?
if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize {
if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize ||
u.run.tu.b.ApproximateMutationBytes() >= u.run.tu.maxBatchByteSize {
break
}
}
Expand Down

0 comments on commit 5f08c41

Please sign in to comment.