sql: add txn rows written/read guardrails
This commit introduces the guardrails on the number of rows written/read
by a single txn. The limits are enforced after each statement of a txn
has been fully executed (i.e. we don't proactively cancel work in the
middle of the execution if the txn has just reached the limits). This is
done in the connExecutor since it is a very convenient place to enforce
the limits so that they apply only to the desired statements. Notably,
statements that should be exempt from the limits (BACKUP, IMPORT,
CREATE STATISTICS, etc.) don't go through the connExecutor and, thus,
aren't affected.

The accumulation of the number of rows read by a txn has already been in
place, and this commit introduces the explicit collection of the number
of rows written via the same mechanism - by propagating "rows written"
metrics during the draining of the execution flow. Initially, we
considered using "rows affected" values, but those have a different
meaning from what we want. This metrics collection required teaching the
planNodeToRowSource adapter to ask the mutation planNodes for the number
of rows written.

Note that in many cases, the internal executor doesn't have the
sessionData properly set (i.e. the default values are used), so we'll
never log anything then. This seems acceptable since the focus of these
guardrails is on the externally initiated queries.

Release note (ops change): New cluster settings
`sql.defaults.transaction_rows_written.log`,
`sql.defaults.transaction_rows_written.err`,
`sql.defaults.transaction_rows_read.log`, and
`sql.defaults.transaction_rows_read.err` (as well as the corresponding
session variables) have been introduced. These settings determine the
"size" of a transaction, in rows written and rows read, at which the
transaction is logged or rejected. The logging goes to the
SQL_PERF logging channel. Note that internal queries (i.e. those
issued by CockroachDB internally) cannot error out but can be logged
instead to the SQL_INTERNAL_PERF logging channel. The "written" limits
apply to INSERT, INSERT INTO SELECT FROM, INSERT ON CONFLICT, UPSERT,
UPDATE, and DELETE, whereas the "read" limits apply to SELECT statements
in addition to all of these. These limits do not apply to CREATE
TABLE AS SELECT, IMPORT, TRUNCATE, DROP, ALTER TABLE, BACKUP, RESTORE,
or CREATE STATISTICS statements.
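
As a rough illustration of how an operator might turn these guardrails on, the sketch below renders `SET CLUSTER SETTING` statements for the four settings named above. The helper name `setClusterSettingStmt`, the example threshold of 1000, and the rejection of negative values are choices made for this sketch only; they are not part of this commit.

```go
package main

import "fmt"

// guardrailSettings lists the four cluster settings named in the release note.
var guardrailSettings = []string{
	"sql.defaults.transaction_rows_written.log",
	"sql.defaults.transaction_rows_written.err",
	"sql.defaults.transaction_rows_read.log",
	"sql.defaults.transaction_rows_read.err",
}

// setClusterSettingStmt is a hypothetical helper that renders the SQL needed
// to set one guardrail. A limit of 0 disables the guardrail. Rejecting
// negative values is an assumption of this sketch.
func setClusterSettingStmt(name string, limit int64) (string, error) {
	if limit < 0 {
		return "", fmt.Errorf("limit for %s must be non-negative, got %d", name, limit)
	}
	return fmt.Sprintf("SET CLUSTER SETTING %s = %d", name, limit), nil
}

func main() {
	for _, name := range guardrailSettings {
		stmt, err := setClusterSettingStmt(name, 1000)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(stmt + ";")
	}
}
```

Since these are `sql.defaults.*` settings, they presumably only seed the session variables of new sessions, so existing sessions would likely keep their current values.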

Release justification: low-risk, high benefit change to existing
functionality.
yuzefovich committed Aug 24, 2021
1 parent 7c36a9d commit 724c709
Showing 37 changed files with 3,154 additions and 618 deletions.
112 changes: 112 additions & 0 deletions docs/generated/eventlog.md
@@ -1939,6 +1939,61 @@ set to a non-zero value, AND
| `FullIndexScan` | Whether the query contains a full secondary index scan. | no |
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |

### `txn_rows_read_limit`

An event of type `txn_rows_read_limit` is recorded when a transaction tries to read more rows than
cluster setting `sql.defaults.transaction_rows_read.log`. There will only be
a single record for a single transaction (unless it is retried) even if there
are more statements within the transaction that haven't been executed yet.




#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `Statement` | A normalized copy of the SQL statement that triggered the event. The statement string contains a mix of sensitive and non-sensitive details (it is redactable). | partially |
| `Tag` | The statement tag. This is separate from the statement string, since the statement string can contain sensitive information. The tag is guaranteed not to. | no |
| `User` | The user account that triggered the event. The special usernames `root` and `node` are not considered sensitive. | depends |
| `DescriptorID` | The primary object descriptor affected by the operation. Set to zero for operations that don't affect descriptors. | no |
| `ApplicationName` | The application name for the session where the event was emitted. This is included in the event to ease filtering of logging output by application. Application names starting with a dollar sign (`$`) are not considered sensitive. | depends |
| `PlaceholderValues` | The mapping of SQL placeholders to their values, for prepared statements. | yes |
| `TxnID` | | no |
| `SessionID` | | no |
| `ViolatesTxnRowsLimitErr` | | no |
| `ReadKind` | ReadKind indicates that the "rows read" limit is reached if true and the "rows written" limit otherwise. | no |

### `txn_rows_written_limit`

An event of type `txn_rows_written_limit` is recorded when a transaction tries to write more rows
than cluster setting `sql.defaults.transaction_rows_written.log`. There will
only be a single record for a single transaction (unless it is retried) even
if there are more mutation statements within the transaction that haven't
been executed yet.




#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `Statement` | A normalized copy of the SQL statement that triggered the event. The statement string contains a mix of sensitive and non-sensitive details (it is redactable). | partially |
| `Tag` | The statement tag. This is separate from the statement string, since the statement string can contain sensitive information. The tag is guaranteed not to. | no |
| `User` | The user account that triggered the event. The special usernames `root` and `node` are not considered sensitive. | depends |
| `DescriptorID` | The primary object descriptor affected by the operation. Set to zero for operations that don't affect descriptors. | no |
| `ApplicationName` | The application name for the session where the event was emitted. This is included in the event to ease filtering of logging output by application. Application names starting with a dollar sign (`$`) are not considered sensitive. | depends |
| `PlaceholderValues` | The mapping of SQL placeholders to their values, for prepared statements. | yes |
| `TxnID` | | no |
| `SessionID` | | no |
| `ViolatesTxnRowsLimitErr` | | no |
| `ReadKind` | ReadKind indicates that the "rows read" limit is reached if true and the "rows written" limit otherwise. | no |
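
A minimal sketch of how a log consumer might interpret the last two fields of this table. It assumes the event payload is available as a JSON object keyed by the field names above; the struct `txnRowsLimitEvent`, the helpers, and the sample payload are hypothetical and exist only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// txnRowsLimitEvent mirrors a subset of the fields documented in the table.
// Treating the payload as JSON keyed by these names is an assumption.
type txnRowsLimitEvent struct {
	EventType               string
	TxnID                   string
	SessionID               string
	ViolatesTxnRowsLimitErr bool
	ReadKind                bool
}

// parseTxnRowsLimitEvent decodes one payload. Fields that are absent from the
// payload keep their zero values.
func parseTxnRowsLimitEvent(payload []byte) (txnRowsLimitEvent, error) {
	var ev txnRowsLimitEvent
	err := json.Unmarshal(payload, &ev)
	return ev, err
}

// describe summarizes which guardrail fired: ReadKind selects "read" versus
// "written", and ViolatesTxnRowsLimitErr selects the "err" versus "log" limit.
func describe(ev txnRowsLimitEvent) string {
	kind := "written"
	if ev.ReadKind {
		kind = "read"
	}
	level := "log"
	if ev.ViolatesTxnRowsLimitErr {
		level = "err"
	}
	return fmt.Sprintf("txn %s reached the rows-%s %s limit", ev.TxnID, kind, level)
}

func main() {
	// Made-up payload for illustration only.
	sample := []byte(`{"EventType":"txn_rows_written_limit","TxnID":"hypothetical-txn-id","SessionID":"hypothetical-session-id","ViolatesTxnRowsLimitErr":true}`)
	ev, err := parseTxnRowsLimitEvent(sample)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(describe(ev))
}
```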

## SQL Slow Query Log (Internal)

Events in this category report slow query execution by
@@ -2006,6 +2061,63 @@ the "slow query" condition.
| `FullIndexScan` | Whether the query contains a full secondary index scan. | no |
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |

### `txn_rows_read_limit_internal`

An event of type `txn_rows_read_limit_internal` is recorded when an internal transaction tries to
read more rows than cluster setting `sql.defaults.transaction_rows_read.log`
or `sql.defaults.transaction_rows_read.err`. There will only be a single
record for a single transaction (unless it is retried) even if there are more
statements within the transaction that haven't been executed yet.




#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `Statement` | A normalized copy of the SQL statement that triggered the event. The statement string contains a mix of sensitive and non-sensitive details (it is redactable). | partially |
| `Tag` | The statement tag. This is separate from the statement string, since the statement string can contain sensitive information. The tag is guaranteed not to. | no |
| `User` | The user account that triggered the event. The special usernames `root` and `node` are not considered sensitive. | depends |
| `DescriptorID` | The primary object descriptor affected by the operation. Set to zero for operations that don't affect descriptors. | no |
| `ApplicationName` | The application name for the session where the event was emitted. This is included in the event to ease filtering of logging output by application. Application names starting with a dollar sign (`$`) are not considered sensitive. | depends |
| `PlaceholderValues` | The mapping of SQL placeholders to their values, for prepared statements. | yes |
| `TxnID` | | no |
| `SessionID` | | no |
| `ViolatesTxnRowsLimitErr` | | no |
| `ReadKind` | ReadKind indicates that the "rows read" limit is reached if true and the "rows written" limit otherwise. | no |

### `txn_rows_written_limit_internal`

An event of type `txn_rows_written_limit_internal` is recorded when an internal transaction tries to
write more rows than cluster setting
`sql.defaults.transaction_rows_written.log` or
`sql.defaults.transaction_rows_written.err`. There will only be a single
record for a single transaction (unless it is retried) even if there are more
mutation statements within the transaction that haven't been executed yet.




#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `Statement` | A normalized copy of the SQL statement that triggered the event. The statement string contains a mix of sensitive and non-sensitive details (it is redactable). | partially |
| `Tag` | The statement tag. This is separate from the statement string, since the statement string can contain sensitive information. The tag is guaranteed not to. | no |
| `User` | The user account that triggered the event. The special usernames `root` and `node` are not considered sensitive. | depends |
| `DescriptorID` | The primary object descriptor affected by the operation. Set to zero for operations that don't affect descriptors. | no |
| `ApplicationName` | The application name for the session where the event was emitted. This is included in the event to ease filtering of logging output by application. Application names starting with a dollar sign (`$`) are not considered sensitive. | depends |
| `PlaceholderValues` | The mapping of SQL placeholders to their values, for prepared statements. | yes |
| `TxnID` | | no |
| `SessionID` | | no |
| `ViolatesTxnRowsLimitErr` | | no |
| `ReadKind` | ReadKind indicates that the "rows read" limit is reached if true and the "rows written" limit otherwise. | no |

## SQL User and Role operations

Events in this category pertain to SQL statements that modify the
4 changes: 4 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -110,6 +110,10 @@ sql.defaults.results_buffer.size byte size 16 KiB default size of the buffer tha
sql.defaults.serial_normalization enumeration rowid default handling of SERIAL in table definitions [rowid = 0, virtual_sequence = 1, sql_sequence = 2, sql_sequence_cached = 3]
sql.defaults.statement_timeout duration 0s default value for the statement_timeout; default value for the statement_timeout session setting; controls the duration a query is permitted to run before it is canceled; if set to 0, there is no timeout
sql.defaults.stub_catalog_tables.enabled boolean true default value for stub_catalog_tables session setting
sql.defaults.transaction_rows_read.err integer 0 the limit for the number of rows read by a SQL transaction which - once reached - will fail the transaction (or will trigger a logging event to SQL_INTERNAL_PERF for internal transactions); use 0 to disable
sql.defaults.transaction_rows_read.log integer 0 the threshold for the number of rows read by a SQL transaction which - once reached - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable
sql.defaults.transaction_rows_written.err integer 0 the limit for the number of rows written by a SQL transaction which - once reached - will fail the transaction (or will trigger a logging event to SQL_INTERNAL_PERF for internal transactions); use 0 to disable
sql.defaults.transaction_rows_written.log integer 0 the threshold for the number of rows written by a SQL transaction which - once reached - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable
sql.defaults.vectorize enumeration on default vectorize mode [on = 0, on = 2, experimental_always = 3, off = 4]
sql.defaults.zigzag_join.enabled boolean true default value for enable_zigzag_join session setting; allows use of zig-zag join by default
sql.distsql.max_running_flows integer 500 maximum number of concurrent flows that can be run on a node
4 changes: 4 additions & 0 deletions docs/generated/settings/settings.html
@@ -114,6 +114,10 @@
<tr><td><code>sql.defaults.serial_normalization</code></td><td>enumeration</td><td><code>rowid</code></td><td>default handling of SERIAL in table definitions [rowid = 0, virtual_sequence = 1, sql_sequence = 2, sql_sequence_cached = 3]</td></tr>
<tr><td><code>sql.defaults.statement_timeout</code></td><td>duration</td><td><code>0s</code></td><td>default value for the statement_timeout; default value for the statement_timeout session setting; controls the duration a query is permitted to run before it is canceled; if set to 0, there is no timeout</td></tr>
<tr><td><code>sql.defaults.stub_catalog_tables.enabled</code></td><td>boolean</td><td><code>true</code></td><td>default value for stub_catalog_tables session setting</td></tr>
<tr><td><code>sql.defaults.transaction_rows_read.err</code></td><td>integer</td><td><code>0</code></td><td>the limit for the number of rows read by a SQL transaction which - once reached - will fail the transaction (or will trigger a logging event to SQL_INTERNAL_PERF for internal transactions); use 0 to disable</td></tr>
<tr><td><code>sql.defaults.transaction_rows_read.log</code></td><td>integer</td><td><code>0</code></td><td>the threshold for the number of rows read by a SQL transaction which - once reached - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable</td></tr>
<tr><td><code>sql.defaults.transaction_rows_written.err</code></td><td>integer</td><td><code>0</code></td><td>the limit for the number of rows written by a SQL transaction which - once reached - will fail the transaction (or will trigger a logging event to SQL_INTERNAL_PERF for internal transactions); use 0 to disable</td></tr>
<tr><td><code>sql.defaults.transaction_rows_written.log</code></td><td>integer</td><td><code>0</code></td><td>the threshold for the number of rows written by a SQL transaction which - once reached - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable</td></tr>
<tr><td><code>sql.defaults.vectorize</code></td><td>enumeration</td><td><code>on</code></td><td>default vectorize mode [on = 0, on = 2, experimental_always = 3, off = 4]</td></tr>
<tr><td><code>sql.defaults.zigzag_join.enabled</code></td><td>boolean</td><td><code>true</code></td><td>default value for enable_zigzag_join session setting; allows use of zig-zag join by default</td></tr>
<tr><td><code>sql.distsql.max_running_flows</code></td><td>integer</td><td><code>500</code></td><td>maximum number of concurrent flows that can be run on a node</td></tr>
20 changes: 20 additions & 0 deletions pkg/sql/conn_executor.go
@@ -305,6 +305,10 @@ type Metrics struct {

// StatsMetrics contains metrics for SQL statistics collection.
StatsMetrics StatsMetrics

// GuardrailMetrics contains metrics related to different guardrails in the
// SQL layer.
GuardrailMetrics GuardrailMetrics
}

// NewServer creates a new Server. Start() needs to be called before the Server
@@ -414,6 +418,12 @@ func makeMetrics(cfg *ExecutorConfig, internal bool) Metrics {
getMetricMeta(MetaSQLStatsFlushDuration, internal), 6*metricsSampleInterval,
),
},
GuardrailMetrics: GuardrailMetrics{
TxnRowsWrittenLogCount: metric.NewCounter(getMetricMeta(MetaTxnRowsWrittenLog, internal)),
TxnRowsWrittenErrCount: metric.NewCounter(getMetricMeta(MetaTxnRowsWrittenErr, internal)),
TxnRowsReadLogCount: metric.NewCounter(getMetricMeta(MetaTxnRowsReadLog, internal)),
TxnRowsReadErrCount: metric.NewCounter(getMetricMeta(MetaTxnRowsReadErr, internal)),
},
}
}

@@ -1216,6 +1226,16 @@ type connExecutor struct {
rowsRead int64
bytesRead int64

// rowsWritten tracks the number of rows written (modified) by all
// statements in this txn so far.
rowsWritten int64

// rowsWrittenLogged and rowsReadLogged indicate whether we have
// already logged an event about reaching the written/read rows limit,
// respectively.
rowsWrittenLogged bool
rowsReadLogged bool

// hasAdminRole is used to cache if the user running the transaction
// has admin privilege. hasAdminRoleCache is set for the first statement
// in a transaction.
123 changes: 123 additions & 0 deletions pkg/sql/conn_executor_exec.go
@@ -47,6 +47,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
@@ -1020,6 +1022,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(

ex.extraTxnState.rowsRead += stats.rowsRead
ex.extraTxnState.bytesRead += stats.bytesRead
ex.extraTxnState.rowsWritten += stats.rowsWritten

// Record the statement summary. This also closes the plan if the
// plan has not been closed earlier.
@@ -1031,6 +1034,120 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
}

if limitsErr := ex.handleTxnRowsWrittenReadLimits(ctx); limitsErr != nil && res.Err() == nil {
res.SetError(limitsErr)
}

return err
}

// handleTxnRowsGuardrails handles either "written" or "read" rows guardrails.
func (ex *connExecutor) handleTxnRowsGuardrails(
	ctx context.Context,
	numRows, logLimit, errLimit int64,
	readKind bool,
	logCounter, errCounter *metric.Counter,
) error {
	var err error
	shouldLog := logLimit != 0 && numRows >= logLimit
	shouldErr := errLimit != 0 && numRows >= errLimit
	if shouldLog || shouldErr {
		commonSQLEventDetails := ex.planner.getCommonSQLEventDetails()
		info := eventpb.CommonTxnRowsLimitDetails{
			TxnID:                   ex.state.mu.txn.ID().String(),
			SessionID:               ex.sessionID.String(),
			ViolatesTxnRowsLimitErr: shouldErr,
			ReadKind:                readKind,
		}
		if shouldErr && ex.executorType == executorTypeInternal {
			// Internal work should never err and always log if violating either
			// limit.
			shouldErr = false
			shouldLog = true
		}
		if readKind {
			if ex.extraTxnState.rowsReadLogged {
				// We have already logged an event about this transaction.
				shouldLog = false
			} else {
				ex.extraTxnState.rowsReadLogged = shouldLog
			}
		} else {
			if ex.extraTxnState.rowsWrittenLogged {
				// We have already logged an event about this transaction.
				shouldLog = false
			} else {
				ex.extraTxnState.rowsWrittenLogged = shouldLog
			}
		}
		if shouldLog {
			var event eventpb.EventPayload
			if ex.executorType == executorTypeInternal {
				if readKind {
					event = &eventpb.TxnRowsReadLimitInternal{
						CommonSQLEventDetails:     *commonSQLEventDetails,
						CommonTxnRowsLimitDetails: info,
					}
				} else {
					event = &eventpb.TxnRowsWrittenLimitInternal{
						CommonSQLEventDetails:     *commonSQLEventDetails,
						CommonTxnRowsLimitDetails: info,
					}
				}
			} else {
				if readKind {
					event = &eventpb.TxnRowsReadLimit{
						CommonSQLEventDetails:     *commonSQLEventDetails,
						CommonTxnRowsLimitDetails: info,
					}
				} else {
					event = &eventpb.TxnRowsWrittenLimit{
						CommonSQLEventDetails:     *commonSQLEventDetails,
						CommonTxnRowsLimitDetails: info,
					}
				}
			}
			log.StructuredEvent(ctx, event)
			logCounter.Inc(1)
		}
		if shouldErr {
			err = pgerror.WithCandidateCode(&info, pgcode.ProgramLimitExceeded)
			errCounter.Inc(1)
		}
	}
	return err
}

// handleTxnRowsWrittenReadLimits checks whether the current transaction has
// reached the limits on the number of rows written/read and logs the
// corresponding event or returns an error. It should be called after executing
// a single statement.
func (ex *connExecutor) handleTxnRowsWrittenReadLimits(ctx context.Context) error {
	// Note that in many cases, the internal executor doesn't have the
	// sessionData properly set (i.e. the default values are used), so we'll
	// never log anything then. This seems acceptable since the focus of these
	// guardrails is on the externally initiated queries.
	sd := ex.sessionData
	err := ex.handleTxnRowsGuardrails(
		ctx,
		ex.extraTxnState.rowsWritten,
		sd.TxnRowsWrittenLog,
		sd.TxnRowsWrittenErr,
		false, /* readKind */
		ex.metrics.GuardrailMetrics.TxnRowsWrittenLogCount,
		ex.metrics.GuardrailMetrics.TxnRowsWrittenErrCount,
	)
	if readErr := ex.handleTxnRowsGuardrails(
		ctx,
		ex.extraTxnState.rowsRead,
		sd.TxnRowsReadLog,
		sd.TxnRowsReadErr,
		true, /* readKind */
		ex.metrics.GuardrailMetrics.TxnRowsReadLogCount,
		ex.metrics.GuardrailMetrics.TxnRowsReadErrCount,
	); readErr != nil && err == nil {
		err = readErr
	}
	return err
}
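
The decision logic in `handleTxnRowsGuardrails` can be modelled in isolation. The following is a simplified, standalone sketch rather than the code from this commit: `guardrailState`, `decision`, and `check` are hypothetical names, and event emission and metrics are reduced to two booleans. It keeps the three rules visible in the hunk above: a limit of 0 disables a check, internal work is downgraded from an error to a log event, and at most one event is logged per transaction.

```go
package main

import "fmt"

// guardrailState is a hypothetical stand-in for the per-transaction "logged"
// flag kept in extraTxnState (one such flag each for reads and writes).
type guardrailState struct {
	logged bool
}

// decision reports what should happen after a statement has finished.
type decision struct {
	log bool // emit a structured event
	err bool // fail the statement with an error
}

// check applies the same rules as the hunk above to a running row count.
func (s *guardrailState) check(numRows, logLimit, errLimit int64, internal bool) decision {
	shouldLog := logLimit != 0 && numRows >= logLimit
	shouldErr := errLimit != 0 && numRows >= errLimit
	if shouldErr && internal {
		// Internal work never errors; it logs instead.
		shouldErr = false
		shouldLog = true
	}
	if s.logged {
		// An event was already logged for this transaction.
		shouldLog = false
	} else {
		s.logged = shouldLog
	}
	return decision{log: shouldLog, err: shouldErr}
}

func main() {
	var s guardrailState
	var total int64
	// Rows written by three consecutive statements of one transaction.
	for _, rows := range []int64{40, 70, 100} {
		total += rows // the count is accumulated after each statement completes
		d := s.check(total, 100 /* logLimit */, 200 /* errLimit */, false)
		fmt.Printf("total=%d log=%t err=%t\n", total, d.log, d.err)
	}
}
```

Because the check only runs once a statement has completed, the statement that crosses a limit still executes in full. With the values above, the second statement triggers the single log event and the third one triggers the error.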

@@ -1074,6 +1191,8 @@ type topLevelQueryStats struct {
bytesRead int64
// rowsRead is the number of rows read from disk.
rowsRead int64
// rowsWritten is the number of rows written.
rowsWritten int64
}

// execWithDistSQLEngine converts a plan to a distributed SQL physical plan and
@@ -1652,6 +1771,9 @@ func (ex *connExecutor) recordTransactionStart() (
ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{}
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.bytesRead = 0
ex.extraTxnState.rowsWritten = 0
ex.extraTxnState.rowsWrittenLogged = false
ex.extraTxnState.rowsReadLogged = false
if txnExecStatsSampleRate := collectTxnStatsSampleRate.Get(&ex.server.GetExecutorConfig().Settings.SV); txnExecStatsSampleRate > 0 {
ex.extraTxnState.shouldCollectTxnExecutionStats = txnExecStatsSampleRate > ex.rng.Float64()
}
@@ -1678,6 +1800,7 @@ func (ex *connExecutor) recordTransactionStart() (
ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{}
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.bytesRead = 0
ex.extraTxnState.rowsWritten = 0

if ex.server.cfg.TestingKnobs.BeforeRestart != nil {
ex.server.cfg.TestingKnobs.BeforeRestart(ex.Ctx(), ex.extraTxnState.autoRetryReason)