Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.1: sql: add sql.mutations.mutation_batch_byte_size setting #67958

Merged
merged 1 commit into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type Batch struct {
// To be modified directly.
Header roachpb.Header
reqs []roachpb.RequestUnion

// approxMutationReqBytes tracks the approximate size of keys and values in
// mutations added to this batch via Put, CPut, InitPut, Del, etc.
approxMutationReqBytes int
// Set when AddRawRequest is used, in which case using the "other"
// operations renders the batch unusable.
raw bool
Expand Down Expand Up @@ -384,6 +388,7 @@ func (b *Batch) put(key, value interface{}, inline bool) {
} else {
b.appendReqs(roachpb.NewPut(k, v))
}
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

Expand Down Expand Up @@ -489,6 +494,7 @@ func (b *Batch) cputInternal(
} else {
b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist))
}
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

Expand All @@ -512,6 +518,7 @@ func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) {
return
}
b.appendReqs(roachpb.NewInitPut(k, v, failOnTombstones))
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
}

Expand Down Expand Up @@ -613,6 +620,7 @@ func (b *Batch) Del(keys ...interface{}) {
return
}
reqs = append(reqs, roachpb.NewDelete(k))
b.approxMutationReqBytes += len(k)
}
b.appendReqs(reqs...)
b.initResult(len(reqs), len(reqs), notRaw, nil)
Expand Down Expand Up @@ -826,3 +834,10 @@ func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// ApproximateMutationBytes returns the approximate byte size of the mutations
// added to this batch via Put, CPut, InitPut, Del, etc methods. Mutations added
// via AddRawRequest are not tracked.
func (b *Batch) ApproximateMutationBytes() int {
return b.approxMutationReqBytes
}
3 changes: 2 additions & 1 deletion pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
}

// Are we done yet with the current batch?
if d.run.td.currentBatchSize >= d.run.td.maxBatchSize {
if d.run.td.currentBatchSize >= d.run.td.maxBatchSize ||
d.run.td.b.ApproximateMutationBytes() >= d.run.td.maxBatchByteSize {
break
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) {
}

// Are we done yet with the current batch?
if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize {
if n.run.ti.currentBatchSize >= n.run.ti.maxBatchSize ||
n.run.ti.b.ApproximateMutationBytes() >= n.run.ti.maxBatchByteSize {
break
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/mutations/mutations_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ var defaultMaxBatchSize = int64(util.ConstantWithMetamorphicTestRange(
productionMaxBatchSize, /* max */
))

var testingMaxBatchByteSize = util.ConstantWithMetamorphicTestRange(
"max-batch-byte-size",
0, // we'll use the cluster setting instead if we see zero.
1, /* min */
32<<20, /* max */
)

// MaxBatchSize returns the max number of entries in the KV batch for a
// mutation operation (delete, insert, update, upsert) - including secondary
// index updates, FK cascading updates, etc - before the current KV batch is
Expand Down Expand Up @@ -54,3 +61,12 @@ func SetMaxBatchSizeForTests(newMaxBatchSize int) {
func ResetMaxBatchSizeForTests() {
atomic.SwapInt64(&maxBatchSize, defaultMaxBatchSize)
}

// MaxBatchByteSize takes the passed value read from the cluster setting and
// returns it unless the testing metamorphic value overrides it.
func MaxBatchByteSize(clusterSetting int, forceProductionBatchSizes bool) int {
if forceProductionBatchSizes || testingMaxBatchByteSize == 0 {
return clusterSetting
}
return testingMaxBatchByteSize
}
23 changes: 23 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,26 @@ WHERE message LIKE '%r$rangeid: sending batch%'
dist sender send r37: sending batch 4 CPut, 1 EndTxn to (n1,s1):1
dist sender send r37: sending batch 5 CPut to (n1,s1):1
dist sender send r37: sending batch 1 EndTxn to (n1,s1):1

# make a table with some big strings in it.
statement ok
CREATE TABLE blobs (i INT PRIMARY KEY, j STRING, FAMILY (i, j))

# make a table with some big (1mb) strings in it.
statement ok
SET TRACING=ON;
INSERT INTO blobs SELECT generate_series(1, 24), repeat('0123456789ab', 65536);
SET TRACING=OFF;

# verify insert of 24 rows paginated into 4 batches since they are .75mb each.
query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE message LIKE '%r$rangeid: sending batch%'
AND message NOT LIKE '%PushTxn%'
AND message NOT LIKE '%QueryTxn%'
----
dist sender send r37: sending batch 6 CPut to (n1,s1):1
dist sender send r37: sending batch 6 CPut to (n1,s1):1
dist sender send r37: sending batch 6 CPut to (n1,s1):1
dist sender send r37: sending batch 6 CPut to (n1,s1):1
dist sender send r37: sending batch 1 EndTxn to (n1,s1):1
15 changes: 15 additions & 0 deletions pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/mutations"
"github.com/cockroachdb/cockroach/pkg/sql/row"
Expand Down Expand Up @@ -109,6 +110,9 @@ type tableWriterBase struct {
// for a mutation operation. By default, it will be set to 10k but can be
// a different value in tests.
maxBatchSize int
// maxBatchByteSize determines the maximum number of key and value bytes in
// the KV batch for a mutation operation.
maxBatchByteSize int
// currentBatchSize is the size of the current batch. It is updated on
// every row() call and is reset once a new batch is started.
currentBatchSize int
Expand All @@ -123,6 +127,12 @@ type tableWriterBase struct {
forceProductionBatchSizes bool
}

var maxBatchBytes = settings.RegisterByteSizeSetting(
"sql.mutations.mutation_batch_byte_size",
"byte size - in key and value lengths -- for mutation batches",
4<<20,
)

func (tb *tableWriterBase) init(
txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *tree.EvalContext,
) {
Expand All @@ -131,6 +141,11 @@ func (tb *tableWriterBase) init(
tb.b = txn.NewBatch()
tb.forceProductionBatchSizes = evalCtx != nil && evalCtx.TestingKnobs.ForceProductionBatchSizes
tb.maxBatchSize = mutations.MaxBatchSize(tb.forceProductionBatchSizes)
batchMaxBytes := int(maxBatchBytes.Default())
if evalCtx != nil {
batchMaxBytes = int(maxBatchBytes.Get(&evalCtx.Settings.SV))
}
tb.maxBatchByteSize = mutations.MaxBatchByteSize(batchMaxBytes, tb.forceProductionBatchSizes)
}

// flushAndStartNewBatch shares the common flushAndStartNewBatch() code between
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ func (u *updateNode) BatchedNext(params runParams) (bool, error) {
}

// Are we done yet with the current batch?
if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize {
if u.run.tu.currentBatchSize >= u.run.tu.maxBatchSize ||
u.run.tu.b.ApproximateMutationBytes() >= u.run.tu.maxBatchByteSize {
break
}
}
Expand Down