Skip to content

Commit

Permalink
Merge #67953
Browse files Browse the repository at this point in the history
67953: sql, kv: add sql.mutations.max_row_size guardrails r=rytaft,andreimatei a=michae2

**kv: set Batch.pErr during Batch.prepare**

If we fail to construct a Batch (e.g. fail to marshal a key or value)
then an error will be placed in the resultsBuf and the batch will not
actually be sent to the layers below. In this case we still need to set
Batch.pErr, so that Batch.MustPErr is able to return a roachpb.Error to
higher layers without panicking.

I imagine in practice we never fail to marshal the key or value, so we
have never seen this panic in the wild.

Release note: None

Release justification: Bug fix.

**sql: add sql.mutations.max_row_size.log guardrail (large row logging)**

Addresses: #67400

Add sql.mutations.max_row_size.log, a new cluster setting which
controls large row logging. Rows larger than this size will have their
primary keys logged to the SQL_PERF or SQL_INTERNAL_PERF channels
whenever the SQL layer puts them into the KV layer.

This logging takes place in rowHelper, which is used by both
row.Inserter and row.Updater. Most of the work is plumbing
settings.Values and SessionData into rowHelper, and adding a new
structured event type.

Release note (ops change): A new cluster setting,
sql.mutations.max_row_size.log, was added, which controls large row
logging. Whenever a row larger than this size is written (or a single
column family if multiple column families are in use) a LargeRow event
is logged to the SQL_PERF channel (or a LargeRowInternal event is logged
to SQL_INTERNAL_PERF if the row was added by an internal query). This
could occur for INSERT, UPSERT, UPDATE, CREATE TABLE AS, CREATE INDEX,
ALTER TABLE, ALTER INDEX, IMPORT, or RESTORE statements. SELECT, DELETE,
TRUNCATE, and DROP are not affected by this setting.

Release justification: Low risk, high benefit change to existing
functionality. This adds logging whenever a large row is written to the
database. Default is 64 MiB, which is also the default for
kv.raft.command.max_size, meaning on a cluster with default settings
statements writing these rows will fail with an error anyway.

**sql: add sql.mutations.max_row_size.err guardrail (large row errors)**

Addresses: #67400

Add sql.mutations.max_row_size.err, a new cluster setting similar to
sql.mutations.max_row_size.log, which limits the size of rows written to
the database. Statements trying to write a row larger than this will
fail with an error. (Internal queries will not fail with an error, but
will log a LargeRowInternal event to the SQL_INTERNAL_PERF channel.)

We're reusing eventpb.CommonLargeRowDetails as the error type, out of
convenience.

Release note (ops change): A new cluster setting,
sql.mutations.max_row_size.err, was added, which limits the size of rows
written to the database (or individual column families, if multiple
column families are in use). Statements trying to write a row larger
than this will fail with a code 54000 (program_limit_exceeded) error.
(Internal queries writing a row larger than this will not fail, but will
log a LargeRowInternal event to the SQL_INTERNAL_PERF channel.) This
limit is enforced for INSERT, UPSERT, and UPDATE statements. CREATE
TABLE AS, CREATE INDEX, ALTER TABLE, ALTER INDEX, IMPORT, and RESTORE
will not fail with an error, but will log LargeRowInternal events to the
SQL_INTERNAL_PERF channel. SELECT, DELETE, TRUNCATE, and DROP are not
affected by this limit. **Note that existing rows violating the limit
*cannot* be updated, unless the update shrinks the size of the row
below the limit, but *can* be selected, deleted, altered, backed-up, and
restored.** For this reason we recommend using the accompanying setting
sql.mutations.max_row_size.log in conjunction with
SELECT pg_column_size() queries to detect and fix any existing large
rows before lowering sql.mutations.max_row_size.err.

Release justification: Low risk, high benefit change to existing
functionality. This causes statements adding large rows to fail with an
error. Default is 512 MiB, which was the maximum KV size in 20.2 as of
#61818 and also the default
range_max_bytes in 21.1, meaning rows larger than this were not possible
in 20.2 and are not going to perform well in 21.1.



Co-authored-by: Michael Erickson <[email protected]>
  • Loading branch information
craig[bot] and michae2 committed Aug 24, 2021
2 parents 46cef2c + 7d3495f commit 6c05f99
Show file tree
Hide file tree
Showing 38 changed files with 1,768 additions and 183 deletions.
44 changes: 44 additions & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -1879,6 +1879,29 @@ are only emitted via external logging.
Events in this category are logged to the `SQL_PERF` channel.


### `large_row`

An event of type `large_row` is recorded when a statement tries to write a row larger than
cluster setting `sql.mutations.max_row_size.log` to the database. Multiple
LargeRow events will be recorded for statements writing multiple large rows.
LargeRow events are recorded before the transaction commits, so in the case
of transaction abort there will not be a corresponding row in the database.




#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `RowSize` | | no |
| `TableID` | | no |
| `FamilyID` | | no |
| `PrimaryKey` | | yes |
| `ViolatesMaxRowSizeErr` | | no |

### `slow_query`

An event of type `slow_query` is recorded when a query triggers the "slow query" condition.
Expand Down Expand Up @@ -1929,6 +1952,27 @@ are only emitted via external logging.
Events in this category are logged to the `SQL_INTERNAL_PERF` channel.


### `large_row_internal`

An event of type `large_row_internal` is recorded when an internal query tries to write a row
larger than cluster settings `sql.mutations.max_row_size.log` or
`sql.mutations.max_row_size.err` to the database.




#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `RowSize` | | no |
| `TableID` | | no |
| `FamilyID` | | no |
| `PrimaryKey` | | yes |
| `ViolatesMaxRowSizeErr` | | no |

### `slow_query_internal`

An event of type `slow_query_internal` is recorded when a query triggers the "slow query" condition,
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ sql.metrics.statement_details.plan_collection.period duration 5m0s the time unti
sql.metrics.statement_details.threshold duration 0s minimum execution time to cause statement statistics to be collected. If configured, no transaction stats are collected.
sql.metrics.transaction_details.enabled boolean true collect per-application transaction statistics
sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region
sql.mutations.max_row_size.err byte size 512 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; setting to 0 disables large row errors
sql.mutations.max_row_size.log byte size 64 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); setting to 0 disables large row logging
sql.notices.enabled boolean true enable notices in the server/client protocol being sent
sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability
sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@
<tr><td><code>sql.metrics.statement_details.threshold</code></td><td>duration</td><td><code>0s</code></td><td>minimum execution time to cause statement statistics to be collected. If configured, no transaction stats are collected.</td></tr>
<tr><td><code>sql.metrics.transaction_details.enabled</code></td><td>boolean</td><td><code>true</code></td><td>collect per-application transaction statistics</td></tr>
<tr><td><code>sql.multiregion.drop_primary_region.enabled</code></td><td>boolean</td><td><code>true</code></td><td>allows dropping the PRIMARY REGION of a database if it is the last region</td></tr>
<tr><td><code>sql.mutations.max_row_size.err</code></td><td>byte size</td><td><code>512 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; setting to 0 disables large row errors</td></tr>
<tr><td><code>sql.mutations.max_row_size.log</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); setting to 0 disables large row logging</td></tr>
<tr><td><code>sql.notices.enabled</code></td><td>boolean</td><td><code>true</code></td><td>enable notices in the server/client protocol being sent</td></tr>
<tr><td><code>sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability</td></tr>
<tr><td><code>sql.spatial.experimental_box2d_comparison_operators.enabled</code></td><td>boolean</td><td><code>false</code></td><td>enables the use of certain experimental box2d comparison operators</td></tr>
Expand Down
50 changes: 50 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/max-row-size
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
new-server name=m1
----

exec-sql
CREATE DATABASE orig;
USE orig;
CREATE TABLE maxrow (i INT PRIMARY KEY, s STRING);
INSERT INTO maxrow VALUES (1, repeat('x', 20000));
----

query-sql
SELECT i, pg_column_size(s) FROM maxrow ORDER BY i;
----
1 20004

exec-sql
SET CLUSTER SETTING sql.mutations.max_row_size.err = '16KiB';
----

query-sql
INSERT INTO maxrow VALUES (2, repeat('x', 20000))
----
pq: row larger than max row size: table 55 family 0 primary key /Table/55/1/2/0 size 20013

exec-sql
BACKUP maxrow TO 'nodelocal://1/maxrow';
CREATE DATABASE d2;
RESTORE maxrow FROM 'nodelocal://1/maxrow' WITH into_db='d2';
----

query-sql
SELECT i, pg_column_size(s) FROM d2.maxrow ORDER BY i;
----
1 20004

query-sql
INSERT INTO d2.maxrow VALUES (2, repeat('y', 20000));
----
pq: row larger than max row size: table 57 family 0 primary key /Table/57/1/2/0 size 20013

exec-sql
SET CLUSTER SETTING sql.mutations.max_row_size.err = default;
INSERT INTO d2.maxrow VALUES (2, repeat('y', 20000));
----

query-sql
SELECT i, pg_column_size(s) FROM d2.maxrow ORDER BY i;
----
1 20004
2 20004
8 changes: 4 additions & 4 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestConverterFlushesBatches(t *testing.T) {
}

ctx := context.Background()
evalCtx := tree.MakeTestingEvalContext(nil)
evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())

tests := []testSpec{
newTestSpec(ctx, t, csvFormat(), "testdata/csv/data-0"),
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestImportIgnoresProcessedFiles(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()

evalCtx := tree.MakeTestingEvalContext(nil)
evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Expand Down Expand Up @@ -324,7 +324,7 @@ func TestImportHonorsResumePosition(t *testing.T) {
pkBulkAdder := &doNothingKeyAdder{}
ctx := context.Background()

evalCtx := tree.MakeTestingEvalContext(nil)
evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestImportHandlesDuplicateKVs(t *testing.T) {

batchSize := 13
defer row.TestingSetDatumRowConverterBatchSize(batchSize)()
evalCtx := tree.MakeTestingEvalContext(nil)
evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2517,7 +2517,9 @@ func (r *importResumer) dropTables(
// older-format (v1.1) descriptor. This enables ClearTableData to use a
// RangeClear for faster data removal, rather than removing by chunks.
empty[i].TableDesc().DropTime = dropTime
if err := gcjob.ClearTableData(ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, empty[i]); err != nil {
if err := gcjob.ClearTableData(
ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, &execCfg.Settings.SV, empty[i],
); err != nil {
return errors.Wrapf(err, "clearing data for table %d", empty[i].GetID())
}
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,15 @@ func (b *Batch) MustPErr() *roachpb.Error {
return b.pErr
}

func (b *Batch) prepare() error {
for _, r := range b.Results {
if r.Err != nil {
return r.Err
}
// validate that there were no errors while marshaling keys and values.
func (b *Batch) validate() error {
err := b.resultErr()
if err != nil {
// Set pErr just as sendAndFill does, so that higher layers can find it
// using MustPErr.
b.pErr = roachpb.NewError(err)
}
return nil
return err
}

func (b *Batch) initResult(calls, numRows int, raw bool, err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ func sendAndFill(ctx context.Context, send SenderFunc, b *Batch) error {
// operation. The order of the results matches the order the operations were
// added to the batch.
func (db *DB) Run(ctx context.Context, b *Batch) error {
if err := b.prepare(); err != nil {
if err := b.validate(); err != nil {
return err
}
return sendAndFill(ctx, db.send, b)
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,15 @@ func TestBatch(t *testing.T) {
"bb": []byte("2"),
}
checkResults(t, expected, b.Results)

b2 := &kv.Batch{}
b2.Put(42, "the answer")
if err := db.Run(context.Background(), b2); !testutils.IsError(err, "unable to marshal key") {
t.Fatal("expected marshaling error from running bad put")
}
if err := b2.MustPErr(); !testutils.IsPError(err, "unable to marshal key") {
t.Fatal("expected marshaling error from MustPErr")
}
}

func TestDB_Scan(t *testing.T) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,20 @@ var disableSyncRaftLog = settings.RegisterBoolSetting(
false,
)

// MaxCommandSizeFloor is the minimum allowed value for the MaxCommandSize
// cluster setting.
const MaxCommandSizeFloor = 4 << 20 // 4MB
const (
// MaxCommandSizeFloor is the minimum allowed value for the
// kv.raft.command.max_size cluster setting.
MaxCommandSizeFloor = 4 << 20 // 4MB
// MaxCommandSizeDefault is the default for the kv.raft.command.max_size
// cluster setting.
MaxCommandSizeDefault = 64 << 20
)

// MaxCommandSize wraps "kv.raft.command.max_size".
var MaxCommandSize = settings.RegisterByteSizeSetting(
"kv.raft.command.max_size",
"maximum size of a raft command",
64<<20,
MaxCommandSizeDefault,
func(size int64) error {
if size < MaxCommandSizeFloor {
return fmt.Errorf("max_size must be greater than %s", humanizeutil.IBytes(MaxCommandSizeFloor))
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (txn *Txn) DelRange(ctx context.Context, begin, end interface{}) error {
// operation. The order of the results matches the order the operations were
// added to the batch.
func (txn *Txn) Run(ctx context.Context, b *Batch) error {
if err := b.prepare(); err != nil {
if err := b.validate(); err != nil {
return err
}
return sendAndFill(ctx, txn.Send, b)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ go_test(
"//pkg/util/log/channel",
"//pkg/util/log/eventpb",
"//pkg/util/log/logconfig",
"//pkg/util/log/logpb",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
Expand Down
14 changes: 11 additions & 3 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,9 @@ func TruncateInterleavedIndexes(
resumeAt := resume
// Make a new txn just to drop this chunk.
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
rd := row.MakeDeleter(codec, table, nil /* requestedCols */)
rd := row.MakeDeleter(
codec, table, nil /* requestedCols */, &execCfg.Settings.SV, true, /* internal */
)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, nil /* *tree.EvalContext */); err != nil {
return err
Expand Down Expand Up @@ -878,7 +880,10 @@ func (sc *SchemaChanger) truncateIndexes(
if err != nil {
return err
}
rd := row.MakeDeleter(sc.execCfg.Codec, tableDesc, nil /* requestedCols */)
rd := row.MakeDeleter(
sc.execCfg.Codec, tableDesc, nil /* requestedCols */, &sc.settings.SV,
true, /* internal */
)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, nil /* *tree.EvalContext */); err != nil {
return err
Expand Down Expand Up @@ -2479,7 +2484,10 @@ func indexTruncateInTxn(
alloc := &rowenc.DatumAlloc{}
var sp roachpb.Span
for done := false; !done; done = sp.Key == nil {
rd := row.MakeDeleter(execCfg.Codec, tableDesc, nil /* requestedCols */)
rd := row.MakeDeleter(
execCfg.Codec, tableDesc, nil /* requestedCols */, &execCfg.Settings.SV,
evalCtx.SessionData.Internal,
)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, evalCtx); err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
requestedCols,
row.UpdaterOnlyColumns,
&cb.alloc,
&cb.evalCtx.Settings.SV,
cb.evalCtx.SessionData.Internal,
)
if err != nil {
return roachpb.Key{}, err
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ func (n *createTableNode) startExec(params runParams) error {
params.ExecCfg().Codec,
desc.ImmutableCopy().(catalog.TableDescriptor),
desc.PublicColumns(),
params.p.alloc)
params.p.alloc,
&params.ExecCfg().Settings.SV,
params.p.SessionData().Internal,
)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 6c05f99

Please sign in to comment.