diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md
index 3189856d0ab3..44a995cbc69f 100644
--- a/docs/generated/sql/functions.md
+++ b/docs/generated/sql/functions.md
@@ -337,14 +337,6 @@
json_array_elements_text(input: jsonb) → string | Expands a JSON array to a set of text values.
@@ -1166,24 +1148,6 @@ available replica will error.
Function → Returns | Description | Volatility |
-crdb_internal.filter_multiregion_fields_from_zone_config_sql(val: string) → string | Takes in a CONFIGURE ZONE SQL statement and returns a modified
-SQL statement omitting multi-region related zone configuration fields.
-If the CONFIGURE ZONE statement can be inferred by the database’s or
-table’s zone configuration this will return NULL.
- | Stable |
-crdb_internal.reset_multi_region_zone_configs_for_database(id: int) → bool | Resets the zone configuration for a multi-region database to
-match its original state. No-ops if the given database ID is not multi-region
-enabled.
- | Volatile |
-crdb_internal.reset_multi_region_zone_configs_for_table(id: int) → bool | Resets the zone configuration for a multi-region table to
-match its original state. No-ops if the given table ID is not a multi-region
-table.
- | Volatile |
-crdb_internal.validate_multi_region_zone_configs() → bool | Validates all multi-region zone configurations are correctly setup
-for the current database, including all tables, indexes and partitions underneath.
-Returns an error if validation fails. This builtin uses un-leased versions of the
-each descriptor, requiring extra round trips.
- | Volatile |
default_to_database_primary_region(val: string) → string | Returns the given region if the region has been added to the current database.
Otherwise, this will return the primary region of the current database.
This will error if the current database is not a multi-region database.
@@ -1309,23 +1273,6 @@ the locality flag on node startup. Returns an error if no region is set.
aclexplode(aclitems: string[]) → tuple{oid AS grantor, oid AS grantee, string AS privilege_type, bool AS is_grantable} | Produces a virtual table containing aclitem stuff (returns no rows as this feature is unsupported in CockroachDB)
| Stable |
-crdb_internal.scan(span: bytes[]) → tuple{bytes AS key, bytes AS value, string AS ts} | Returns the raw keys and values from the specified span
- | Stable |
-crdb_internal.scan(start_key: bytes, end_key: bytes) → tuple{bytes AS key, bytes AS value, string AS ts} | Returns the raw keys and values with their timestamp from the specified span
- | Stable |
-crdb_internal.tenant_span_stats() → tuple{int AS database_id, int AS table_id, int AS range_count, int AS approximate_disk_bytes, int AS live_bytes, int AS total_bytes, float AS live_percentage} | Returns statistics (range count, disk size, live range bytes, total range bytes, live range byte percentage) for all of the tenant’s tables.
- | Stable |
-crdb_internal.tenant_span_stats(database_id: int) → tuple{int AS database_id, int AS table_id, int AS range_count, int AS approximate_disk_bytes, int AS live_bytes, int AS total_bytes, float AS live_percentage} | Returns statistics (range count, disk size, live range bytes, total range bytes, live range byte percentage) for tables of the provided database id.
- | Stable |
-crdb_internal.tenant_span_stats(database_id: int, table_id: int) → tuple{int AS database_id, int AS table_id, int AS range_count, int AS approximate_disk_bytes, int AS live_bytes, int AS total_bytes, float AS live_percentage} | Returns statistics (range count, disk size, live range bytes, total range bytes, live range byte percentage) for the provided table id.
- | Stable |
-crdb_internal.tenant_span_stats(spans: tuple[]) → tuple{bytes AS start_key, bytes AS end_key, jsonb AS stats} | Returns SpanStats for the provided spans.
- | Stable |
-crdb_internal.testing_callback(name: string) → int | For internal CRDB testing only. The function calls a callback identified by name registered with the server by the test.
- | Volatile |
-crdb_internal.unary_table() → tuple | Produces a virtual table containing a single row with no values.
-This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
generate_series(start: int, end: int) → int | Produces a virtual table containing the integer values from start to end , inclusive.
| Immutable |
generate_series(start: int, end: int, step: int) → int | Produces a virtual table containing the integer values from start to end , inclusive, by increment of step .
@@ -2768,52 +2715,6 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
| Immutable |
convert_to(str: string, enc: string) → bytes | Encode the string str as a byte array using encoding enc . Supports encodings ‘UTF8’ and ‘LATIN1’.
| Immutable |
-crdb_internal.decode_external_plan_gist(gist: string) → string | Returns rows of output similar to EXPLAIN from a gist such as those found in planGists element of the statistics column of the statement_statistics table without attempting to resolve tables or indexes.
- | Volatile |
-crdb_internal.decode_plan_gist(gist: string) → string | Returns rows of output similar to EXPLAIN from a gist such as those found in planGists element of the statistics column of the statement_statistics table.
- | Volatile |
-crdb_internal.gen_rand_ident(name_pattern: string, count: int) → string | Returns random SQL identifiers.
-gen_rand_ident(pattern, count) is an alias for gen_rand_ident(pattern, count, ‘’).
-See the documentation of the other gen_rand_ident overload for details.
- | Volatile |
-crdb_internal.gen_rand_ident(name_pattern: string, count: int, parameters: jsonb) → string | Returns count random SQL identifiers that resemble the name_pattern.
-The last argument is a JSONB object containing the following optional fields:
-
-- “seed”: the seed to use for the pseudo-random generator (default: random).
-- “number”: whether to add a number to the generated names (default true).
-When enabled, occurrences of the character ‘#’ in the name pattern are
-replaced by the number. If ‘#’ is not present, the number is added at the end.
-- “noise”: whether to add noise to the generated names (default true).
-It adds a non-zero probability for each of the probability options below left to zero.
-(To enable noise generally but disable one type of noise, set its probability to -1.)
-- “punctuate”: probability of adding punctuation.
-- “fmt”: probability of adding random Go/C formatting directives.
-- “escapes”: probability of adding random escape sequences.
-- “quote”: probabiltiy of adding single or double quotes.
-- “emote”: probability of adding emojis.
-- “space”: probability of adding simple spaces.
-- “whitespace”: probability of adding complex whitespace.
-- “capitals”: probability of using capital letters.
-Note: the name pattern must contain ASCII letters already for capital letters to be used.
-- “diacritics”: probability of adding diacritics.
-- “diacritic_depth”: max number of diacritics to add at a time (default 1).
-- “zalgo”: special option that overrides diacritics and diacritic_depth (default false).
-
- | Volatile |
-crdb_internal.show_create_all_schemas(database_name: string) → string | Returns rows of CREATE schema statements.
-The output can be used to recreate a database.’
- | Volatile |
-crdb_internal.show_create_all_tables(database_name: string) → string | Returns rows of CREATE table statements followed by
-ALTER table statements that add table constraints. The rows are ordered
-by dependencies. All foreign keys are added after the creation of the table
-in the alter statements.
-It is not recommended to perform this operation on a database with many
-tables.
-The output can be used to recreate a database.’
- | Volatile |
-crdb_internal.show_create_all_types(database_name: string) → string | Returns rows of CREATE type statements.
-The output can be used to recreate a database.’
- | Volatile |
decode(text: string, format: string) → bytes | Decodes data using format (hex / escape / base64 ).
| Immutable |
decompress(data: bytes, codec: string) → bytes | Decompress data with the specified codec (gzip , ‘lz4’, ‘snappy’, 'zstd).
@@ -3126,215 +3027,6 @@ a CockroachDB HLC in decimal form.
may increase either contention or retry errors, or both.
Returns an error if run in a transaction with an isolation level weaker than SERIALIZABLE.
| Volatile |
-crdb_internal.active_version() → jsonb | Returns the current active cluster version.
- | Volatile |
-crdb_internal.approximate_timestamp(timestamp: decimal) → timestamp | Converts the crdb_internal_mvcc_timestamp column into an approximate timestamp.
- | Immutable |
-crdb_internal.assignment_cast(val: anyelement, type: anyelement) → anyelement | This function is used internally to perform assignment casts during mutations.
- | Stable |
-crdb_internal.check_consistency(stats_only: bool, start_key: bytes, end_key: bytes) → tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail, interval AS duration} | Runs a consistency check on ranges touching the specified key range. an empty start or end key is treated as the minimum and maximum possible, respectively. stats_only should only be set to false when targeting a small number of ranges to avoid overloading the cluster. Each returned row contains the range ID, the status (a roachpb.CheckConsistencyResponse_Status), and verbose detail.
-Example usage:
-SELECT * FROM crdb_internal.check_consistency(true, b'\x02', b'\x04')
- | Volatile |
-crdb_internal.check_password_hash_format(password: bytes) → string | This function checks whether a string is a precomputed password hash. Returns the hash algorithm.
- | Immutable |
-crdb_internal.cluster_id() → uuid | Returns the logical cluster ID for this tenant.
- | Stable |
-crdb_internal.cluster_name() → string | Returns the cluster name.
- | Volatile |
-crdb_internal.cluster_setting_encoded_default(setting: string) → string | Returns the encoded default value of the given cluster setting.
- | Immutable |
-crdb_internal.create_join_token() → string | Creates a join token for use when adding a new node to a secure cluster.
- | Volatile |
-crdb_internal.create_session_revival_token() → bytes | Generate a token that can be used to create a new session for the current user.
- | Volatile |
-crdb_internal.create_sql_schema_telemetry_job() → int | This function is used to create a schema telemetry job instance.
- | Volatile |
-crdb_internal.decode_cluster_setting(setting: string, value: string) → string | Decodes the given encoded value for a cluster setting.
- | Immutable |
-crdb_internal.deserialize_session(session: bytes) → bool | This function deserializes the serialized variables into the current session.
- | Volatile |
-crdb_internal.encode_key(table_id: int, index_id: int, row_tuple: anyelement) → bytes | Generate the key for a row on a particular table and index.
- | Stable |
-crdb_internal.fingerprint(span: bytes[], start_time: decimal, all_revisions: bool) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.fingerprint(span: bytes[], start_time: timestamptz, all_revisions: bool) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.fingerprint(span: bytes[], stripped: bool) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.force_assertion_error(msg: string) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
-crdb_internal.force_error(errorCode: string, msg: string) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
-crdb_internal.force_log_fatal(msg: string) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
-crdb_internal.force_panic(msg: string) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
-crdb_internal.force_retry(val: interval) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
-crdb_internal.generate_test_objects(names: string, counts: int[]) → jsonb | Generates a number of objects whose name follow the provided pattern.
-generate_test_objects(pat, counts) is an alias for
-generate_test_objects(’{“names”:pat, “counts”:counts}’::jsonb)
- | Volatile |
-crdb_internal.generate_test_objects(names: string, number: int) → jsonb | Generates a number of objects whose name follow the provided pattern.
-generate_test_objects(pat, num) is an alias for
-generate_test_objects(’{“names”:pat, “counts”:[num]}’::jsonb)
- | Volatile |
-crdb_internal.generate_test_objects(parameters: jsonb) → jsonb | Generates a number of objects whose name follow the provided pattern.
-Parameters:
-
-- “names”: pattern to use to name the generated objects (default:
-“test”).
-- “counts”: counts of generated objects (default: [10]).
-- “dry_run”: prepare the schema but do not actually write it
-(default: false).
-- “seed”: random seed to use (default: auto).
-- “randomize_columns”: whether to randomize the column names on tables
-(default: true).
-- “table_templates”: table templates to use.
-If the last part of “names” is “_”, the name of the template
-will be used as base pattern during name generation for tables.
-Otherwise, the last part of “names” will be used as pattern.
-If no table templates are specified, a simple template is used.
-- “name_gen”: configuration for the name generation, see below.
-
-Name generation options:
-
-- “number”: whether to add a number to the generated names (default true).
-When enabled, occurrences of the character ‘#’ in the name pattern are
-replaced by the number. If ‘#’ is not present, the number is added at the end.
-- “noise”: whether to add noise to the generated names (default true).
-It adds a non-zero probability for each of the probability options below left to zero.
-(To enable noise generally but disable one type of noise, set its probability to -1.)
-- “punctuate”: probability of adding punctuation.
-- “fmt”: probability of adding random Go/C formatting directives.
-- “escapes”: probability of adding random escape sequences.
-- “quote”: probabiltiy of adding single or double quotes.
-- “emote”: probability of adding emojis.
-- “space”: probability of adding simple spaces.
-- “whitespace”: probability of adding complex whitespace.
-- “capitals”: probability of using capital letters.
-Note: the name pattern must contain ASCII letters already for capital letters to be used.
-- “diacritics”: probability of adding diacritics.
-- “diacritic_depth”: max number of diacritics to add at a time (default 1).
-- “zalgo”: special option that overrides diacritics and diacritic_depth (default false).
-
- | Volatile |
-crdb_internal.get_database_id(name: string) → int | | Stable |
-crdb_internal.get_namespace_id(parent_id: int, name: string) → int | | Stable |
-crdb_internal.get_namespace_id(parent_id: int, parent_schema_id: int, name: string) → int | | Stable |
-crdb_internal.get_vmodule() → string | Returns the vmodule configuration on the gateway node processing this request.
- | Volatile |
-crdb_internal.get_zone_config(namespace_id: int) → bytes | | Stable |
-crdb_internal.has_role_option(option: string) → bool | Returns whether the current user has the specified role option
- | Stable |
-crdb_internal.index_span(table_id: int, index_id: int) → bytes[] | This function returns the span that contains the keys for the given index.
- | Leakproof |
-crdb_internal.is_admin() → bool | Retrieves the current user’s admin status.
- | Stable |
-crdb_internal.is_at_least_version(version: string) → bool | Returns true if the cluster version is not older than the argument.
- | Volatile |
-crdb_internal.is_constraint_active(table_name: string, constraint_name: string) → bool | This function is used to determine if a given constraint is currently.
-active for the current transaction.
- | Volatile |
-crdb_internal.job_execution_details(job_id: int) → jsonb | Output a JSONB version of the specified job’s execution details. The execution details are collectedand persisted during the lifetime of the job and provide more observability into the job’s execution
- | Volatile |
-crdb_internal.lease_holder(key: bytes) → int | This function is used to fetch the leaseholder corresponding to a request key
- | Volatile |
-crdb_internal.list_sql_keys_in_range(range_id: int) → tuple{string AS key, string AS value, string AS ts} | Returns all SQL K/V pairs within the requested range.
- | Volatile |
-crdb_internal.locality_value(key: string) → string | Returns the value of the specified locality key.
- | Stable |
-crdb_internal.node_executable_version() → string | Returns the version of CockroachDB this node is running.
- | Volatile |
-crdb_internal.node_id() → int | Returns the node ID.
- | Stable |
-crdb_internal.notice(msg: string) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
-crdb_internal.notice(severity: string, msg: string) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
-crdb_internal.num_geo_inverted_index_entries(table_id: int, index_id: int, val: geography) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.num_geo_inverted_index_entries(table_id: int, index_id: int, val: geometry) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.num_inverted_index_entries(val: string, version: int) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.num_inverted_index_entries(val: anyelement[]) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.num_inverted_index_entries(val: anyelement[], version: int) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.num_inverted_index_entries(val: jsonb) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.num_inverted_index_entries(val: jsonb, version: int) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.num_inverted_index_entries(val: tsvector, version: int) → int | This function is used only by CockroachDB’s developers for testing purposes.
- | Stable |
-crdb_internal.payloads_for_span(span_id: int) → tuple{string AS payload_type, jsonb AS payload_jsonb} | Returns the payload(s) of the requested span and all its children.
- | Volatile |
-crdb_internal.payloads_for_trace(trace_id: int) → tuple{int AS span_id, string AS payload_type, jsonb AS payload_jsonb} | Returns the payload(s) of the requested trace.
- | Volatile |
-crdb_internal.pretty_key(raw_key: bytes, skip_fields: int) → string | This function is used only by CockroachDB’s developers for testing purposes.
- | Immutable |
-crdb_internal.pretty_span(raw_key_start: bytes, raw_key_end: bytes, skip_fields: int) → string | This function is used only by CockroachDB’s developers for testing purposes.
- | Immutable |
-crdb_internal.pretty_value(raw_value: bytes) → string | This function is used only by CockroachDB’s developers for testing purposes.
- | Immutable |
-crdb_internal.range_stats(key: bytes) → jsonb | This function is used to retrieve range statistics information as a JSON object.
- | Volatile |
-crdb_internal.read_file(uri: string) → bytes | Read the content of the file at the supplied external storage URI
- | Volatile |
-crdb_internal.repair_ttl_table_scheduled_job(oid: oid) → void | Repairs the scheduled job for a TTL table if it is missing.
- | Volatile |
-crdb_internal.request_job_execution_details(jobID: int) → bool | Used to request the collection of execution details for a given job ID
- | Volatile |
-crdb_internal.request_statement_bundle(stmtFingerprint: string, samplingProbability: float, minExecutionLatency: interval, expiresAfter: interval) → bool | Used to request statement bundle for a given statement fingerprint
-that has execution latency greater than the ‘minExecutionLatency’. If the
-‘expiresAfter’ argument is empty, then the statement bundle request never
-expires until the statement bundle is collected
- | Volatile |
-crdb_internal.reset_activity_tables() → bool | This function is used to clear the {statement|transaction} activity system tables.
- | Volatile |
-crdb_internal.reset_index_usage_stats() → bool | This function is used to clear the collected index usage statistics.
- | Volatile |
-crdb_internal.reset_sql_stats() → bool | This function is used to clear the collected SQL statistics.
- | Volatile |
-crdb_internal.revalidate_unique_constraint(table_name: string, constraint_name: string) → void | This function is used to revalidate the given unique constraint in the given
-table. Returns an error if validation fails.
- | Volatile |
-crdb_internal.revalidate_unique_constraints_in_all_tables() → void | This function is used to revalidate all unique constraints in tables
-in the current database. Returns an error if validation fails.
- | Volatile |
-crdb_internal.revalidate_unique_constraints_in_table(table_name: string) → void | This function is used to revalidate all unique constraints in the given
-table. Returns an error if validation fails.
- | Volatile |
-crdb_internal.round_decimal_values(val: decimal, scale: int) → decimal | This function is used internally to round decimal values during mutations.
- | Immutable |
-crdb_internal.round_decimal_values(val: decimal[], scale: int) → decimal[] | This function is used internally to round decimal array values during mutations.
- | Stable |
-crdb_internal.schedule_sql_stats_compaction() → bool | This function is used to start a SQL stats compaction job.
- | Volatile |
-crdb_internal.serialize_session() → bytes | This function serializes the variables in the current session.
- | Volatile |
-crdb_internal.set_trace_verbose(trace_id: int, verbosity: bool) → bool | Returns true if root span was found and verbosity was set, false otherwise.
- | Volatile |
-crdb_internal.set_vmodule(vmodule_string: string) → int | Set the equivalent of the --vmodule flag on the gateway node processing this request; it affords control over the logging verbosity of different files. Example syntax: crdb_internal.set_vmodule('recordio=2,file=1,gfs*=3') . Reset with: crdb_internal.set_vmodule('') . Raising the verbosity can severely affect performance.
- | Volatile |
-crdb_internal.sstable_metrics(node_id: int, store_id: int, start_key: bytes, end_key: bytes) → tuple{int AS node_id, int AS store_id, int AS level, int AS file_num, int AS approximate_span_bytes, jsonb AS metrics} | Returns statistics for the sstables containing keys in the range start_key and end_key for the provided node id.
- | Stable |
-crdb_internal.table_span(table_id: int) → bytes[] | This function returns the span that contains the keys for the given table.
- | Leakproof |
-crdb_internal.trace_id() → int | Returns the current trace ID or an error if no trace is open.
- | Volatile |
-crdb_internal.unsafe_clear_gossip_info(key: string) → bool | This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
-crdb_internal.validate_session_revival_token(token: bytes) → bool | Validate a token that was created by create_session_revival_token. Intended for testing.
- | Volatile |
-crdb_internal.validate_ttl_scheduled_jobs() → void | Validate all TTL tables have a valid scheduled job attached.
- | Volatile |
-crdb_internal.void_func() → void | This function is used only by CockroachDB’s developers for testing purposes.
- | Volatile |
-crdb_internal.write_file(data: bytes, uri: string) → int | Write the content passed to a file at the supplied external storage URI
- | Volatile |
current_database() → string | Returns the current database.
| Stable |
current_schema() → string | Returns the current schema.
@@ -3361,17 +3053,6 @@ table. Returns an error if validation fails.
| Volatile |
|
-### System repair functions
-
-
-Function → Returns | Description | Volatility |
-
-crdb_internal.force_delete_table_data(id: int) → bool | This function can be used to clear the data belonging to a table, when the table cannot be dropped.
- | Volatile |
-crdb_internal.repair_catalog_corruption(descriptor_id: int, corruption: string) → bool | repair_catalog_corruption(descriptor_id,corruption) attempts to repair corrupt records in system tables associated with that descriptor id
- | Volatile |
-
-
### TIMETZ functions
diff --git a/pkg/cmd/docgen/funcs.go b/pkg/cmd/docgen/funcs.go
index 1c65cd6c55d3..df5c6272c8cc 100644
--- a/pkg/cmd/docgen/funcs.go
+++ b/pkg/cmd/docgen/funcs.go
@@ -177,7 +177,7 @@ func generateFunctions(from []string, categorize bool) []byte {
// NB: funcs can appear more than once i.e. upper/lowercase variants for
// faster lookups, so normalize to lowercase and de-dupe using a set.
name = strings.ToLower(name)
- if _, ok := seen[name]; ok {
+ if _, ok := seen[name]; ok || strings.HasPrefix(name, "crdb_internal.") {
continue
}
seen[name] = struct{}{}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 4f6aaff71f4c..82fe799700d1 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -368,6 +368,9 @@ func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics {
}
}
+// MetricStruct implements the metrics.Struct interface.
+func (DistSenderRangeFeedMetrics) MetricStruct() {}
+
// updateCrossLocalityMetricsOnReplicaAddressedBatchRequest updates
// DistSenderMetrics for batch requests that have been divided and are currently
// forwarding to a specific replica for the corresponding range. The metrics
diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go
index 496d611f41a0..0cc4cb34fd81 100644
--- a/pkg/kv/kvnemesis/validator.go
+++ b/pkg/kv/kvnemesis/validator.go
@@ -1232,16 +1232,6 @@ func (v *validator) failIfError(
) (ambiguous, hasError bool) {
exceptions = append(exceptions[:len(exceptions):len(exceptions)], func(err error) bool {
return errors.Is(err, errInjected)
- }, func(err error) bool {
- // Work-around for [1].
- //
- // TODO(arul): find out why we (as of [2]) sometimes leaking
- // *TransactionPushError (wrapped in `UnhandledRetryableError`) from
- // `db.Get`, `db.Scan`, etc.
- //
- // [1]: https://github.com/cockroachdb/cockroach/issues/105330
- // [2]: https://github.com/cockroachdb/cockroach/pull/97779
- return errors.HasType(err, (*kvpb.UnhandledRetryableError)(nil))
})
switch r.Type {
case ResultType_Unknown:
diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go
index ba68798a1435..2efdd4b90b20 100644
--- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go
+++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/must"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
@@ -213,38 +214,9 @@ func PushTxn(
}
reply.PusheeTxn.UpgradePriority(args.PusheeTxn.Priority)
- // If the pusher is aware that the pushee's currently recorded attempt at a
- // parallel commit failed, either because it found intents at a higher
- // timestamp than the parallel commit attempt or because it found intents at
- // a higher epoch than the parallel commit attempt, it should not consider
- // the pushee to be performing a parallel commit. Its commit status is not
- // indeterminate.
- if (knownHigherTimestamp || knownHigherEpoch) && reply.PusheeTxn.Status == roachpb.STAGING {
- reply.PusheeTxn.Status = roachpb.PENDING
- reply.PusheeTxn.InFlightWrites = nil
- // If the pusher is aware that the pushee's currently recorded attempt
- // at a parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs.
- // We don't want to move the transaction back to PENDING, as this is not
- // (currently) allowed by the recovery protocol. We also don't want to
- // move the transaction to a new timestamp while retaining the STAGING
- // status, as this could allow the transaction to enter an implicit
- // commit state without its knowledge, leading to atomicity violations.
- //
- // This has no effect on pushes that fail with a TransactionPushError.
- // Such pushes will still wait on the pushee to retry its commit and
- // eventually commit or abort. It also has no effect on expired pushees,
- // as they would have been aborted anyway. This only impacts pushes
- // which would have succeeded due to priority mismatches. In these
- // cases, the push acts the same as a short-circuited transaction
- // recovery process, because the transaction recovery procedure always
- // finalizes target transactions, even if initiated by a PUSH_TIMESTAMP.
- if pushType == kvpb.PUSH_TIMESTAMP {
- pushType = kvpb.PUSH_ABORT
- }
- }
-
pusherIso, pusheeIso := args.PusherTxn.IsoLevel, reply.PusheeTxn.IsoLevel
pusherPri, pusheePri := args.PusherTxn.Priority, reply.PusheeTxn.Priority
+ pusheeStatus := reply.PusheeTxn.Status
var pusherWins bool
var reason string
switch {
@@ -258,7 +230,7 @@ func PushTxn(
// If just attempting to cleanup old or already-committed txns,
// pusher always fails.
pusherWins = false
- case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
+ case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
reason = "pusher has priority"
pusherWins = true
case args.Force:
@@ -282,13 +254,42 @@ func PushTxn(
// If the pushed transaction is in the staging state, we can't change its
// record without first going through the transaction recovery process and
// attempting to finalize it.
+ pusheeStaging := pusheeStatus == roachpb.STAGING
+ // However, if the pusher is aware that the pushee's currently recorded
+ // attempt at a parallel commit failed, either because it found intents at a
+ // higher timestamp than the parallel commit attempt or because it found
+ // intents at a higher epoch than the parallel commit attempt, it should not
+ // consider the pushee to be performing a parallel commit. Its commit status
+ // is not indeterminate.
+ pusheeStagingFailed := pusheeStaging && (knownHigherTimestamp || knownHigherEpoch)
recoverOnFailedPush := cArgs.EvalCtx.EvalKnobs().RecoverIndeterminateCommitsOnFailedPushes
- if reply.PusheeTxn.Status == roachpb.STAGING && (pusherWins || recoverOnFailedPush) {
+ if pusheeStaging && !pusheeStagingFailed && (pusherWins || recoverOnFailedPush) {
err := kvpb.NewIndeterminateCommitError(reply.PusheeTxn)
log.VEventf(ctx, 1, "%v", err)
return result.Result{}, err
}
+ // If the pusher is aware that the pushee's currently recorded attempt at a
+ // parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs. We don't
+ // want to move the transaction back to PENDING, as this is not (currently)
+ // allowed by the recovery protocol. We also don't want to move the
+ // transaction to a new timestamp while retaining the STAGING status, as this
+ // could allow the transaction to enter an implicit commit state without its
+ // knowledge, leading to atomicity violations.
+ //
+ // This has no effect on pushes that fail with a TransactionPushError. Such
+ // pushes will still wait on the pushee to retry its commit and eventually
+ // commit or abort. It also has no effect on expired pushees, as they would
+ // have been aborted anyway. This only impacts pushes which would have
+ // succeeded due to priority mismatches. In these cases, the push acts the
+ // same as a short-circuited transaction recovery process, because the
+ // transaction recovery procedure always finalizes target transactions, even
+ // if initiated by a PUSH_TIMESTAMP.
+ if pusheeStaging && pusherWins && pushType == kvpb.PUSH_TIMESTAMP {
+ _ = must.True(ctx, pusheeStagingFailed, "parallel commit must be known to have failed for push to succeed")
+ pushType = kvpb.PUSH_ABORT
+ }
+
if !pusherWins {
err := kvpb.NewTransactionPushError(reply.PusheeTxn)
log.VEventf(ctx, 1, "%v", err)
@@ -306,6 +307,8 @@ func PushTxn(
// Forward the timestamp to accommodate AbortSpan GC. See method comment for
// details.
reply.PusheeTxn.WriteTimestamp.Forward(reply.PusheeTxn.LastActive())
+ // If the transaction was previously staging, clear its in-flight writes.
+ reply.PusheeTxn.InFlightWrites = nil
// If the transaction record was already present, persist the updates to it.
// If not, then we don't want to create it. This could allow for finalized
// transactions to be revived. Instead, we obey the invariant that only the
diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go
index e43b29643eaf..5f26e84c0926 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -747,10 +747,11 @@ func (c *cluster) PushTransaction(
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
pusheeIso := pusheeTxn.IsoLevel
pusheePri := pusheeTxn.Priority
+ pusheeStatus := pusheeTxn.Status
// NOTE: this logic is adapted from cmd_push_txn.go.
var pusherWins bool
switch {
- case pusheeTxn.Status.IsFinalized():
+ case pusheeStatus.IsFinalized():
// Already finalized.
return pusheeTxn, nil
case pushType == kvpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
@@ -758,7 +759,7 @@ func (c *cluster) PushTransaction(
return pusheeTxn, nil
case pushType == kvpb.PUSH_TOUCH:
pusherWins = false
- case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
+ case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
pusherWins = true
default:
pusherWins = false
diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go
index 2a91da9c4857..e08e75d7f644 100644
--- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go
+++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -1298,7 +1298,11 @@ func canPushWithPriority(req Request, s waitingState) bool {
}
pusheeIso = s.txn.IsoLevel
pusheePri = s.txn.Priority
- return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri)
+ // We assume that the pushee is in the PENDING state when deciding whether
+ // to push. A push may determine that the pushee is STAGING or has already
+ // been finalized.
+ pusheeStatus := roachpb.PENDING
+ return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus)
}
func logResolveIntent(ctx context.Context, intent roachpb.LockUpdate) {
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index d7b47b029c90..1afa3cd3a34d 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -773,7 +773,7 @@ func (r *Replica) handleTransactionPushError(
dontRetry := r.store.cfg.TestingKnobs.DontRetryPushTxnFailures
if !dontRetry && ba.IsSinglePushTxnRequest() {
pushReq := ba.Requests[0].GetInner().(*kvpb.PushTxnRequest)
- dontRetry = txnwait.ShouldPushImmediately(pushReq)
+ dontRetry = txnwait.ShouldPushImmediately(pushReq, t.PusheeTxn.Status)
}
if dontRetry {
return g, pErr
diff --git a/pkg/kv/kvserver/txn_recovery_integration_test.go b/pkg/kv/kvserver/txn_recovery_integration_test.go
index af78ce8fa482..9e0b9bc1ab44 100644
--- a/pkg/kv/kvserver/txn_recovery_integration_test.go
+++ b/pkg/kv/kvserver/txn_recovery_integration_test.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
@@ -288,6 +290,127 @@ func TestTxnRecoveryFromStagingWithHighPriority(t *testing.T) {
})
}
+// TestTxnRecoveryFromStagingWithoutHighPriority tests that the transaction
+// recovery process is NOT initiated by a normal-priority operation which
+// encounters a staging transaction. Instead, the normal-priority operation
+// waits for the committing transaction to complete. The test contains a subtest
+// for each of the combinations of the following options:
+//
+// - pusheeIsoLevel: configures the isolation level of the pushee (committing)
+// transaction. Isolation levels affect the behavior of pushes of pending
+// transactions, but not of staging transactions.
+//
+// - pusheeCommits: configures whether the staging transaction is
+// implicitly and, eventually, explicitly committed.
+//
+// - pusherWriting: configures whether the conflicting operation is a
+// read (false) or a write (true), which dictates the kind of push operation
+// dispatched against the staging transaction.
+func TestTxnRecoveryFromStagingWithoutHighPriority(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+
+ run := func(t *testing.T, pusheeIsoLevel isolation.Level, pusheeCommits, pusherWriting bool) {
+ stopper := stop.NewStopper()
+ defer stopper.Stop(ctx)
+ manual := timeutil.NewManualTime(timeutil.Unix(0, 123))
+ cfg := TestStoreConfig(hlc.NewClockForTesting(manual))
+ store := createTestStoreWithConfig(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)
+
+ // Create a transaction that will get stuck performing a parallel
+ // commit.
+ keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
+ txn := newTransaction("txn", keyA, 1, store.Clock())
+ txn.IsoLevel = pusheeIsoLevel
+
+ // Issue two writes, which will be considered in-flight at the time of
+ // the transaction's EndTxn request.
+ keyAVal := []byte("value")
+ pArgs := putArgs(keyA, keyAVal)
+ pArgs.Sequence = 1
+ h := kvpb.Header{Txn: txn}
+ _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), h, &pArgs)
+ require.Nil(t, pErr, "error: %s", pErr)
+
+ pArgs = putArgs(keyB, []byte("value2"))
+ pArgs.Sequence = 2
+ h2 := kvpb.Header{Txn: txn.Clone()}
+ if !pusheeCommits {
+ // If we're not going to have the pushee commit, make sure it never enters
+ // the implicit commit state by bumping the timestamp of one of its writes.
+ manual.Advance(100)
+ h2.Txn.WriteTimestamp = store.Clock().Now()
+ }
+ _, pErr = kv.SendWrappedWith(ctx, store.TestSender(), h2, &pArgs)
+ require.Nil(t, pErr, "error: %s", pErr)
+
+ // Issue a parallel commit, which will put the transaction into a
+ // STAGING state. Include both writes as the EndTxn's in-flight writes.
+ et, etH := endTxnArgs(txn, true)
+ et.InFlightWrites = []roachpb.SequencedWrite{
+ {Key: keyA, Sequence: 1},
+ {Key: keyB, Sequence: 2},
+ }
+ etReply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), etH, &et)
+ require.Nil(t, pErr, "error: %s", pErr)
+ require.Equal(t, roachpb.STAGING, etReply.Header().Txn.Status)
+
+ // Issue a conflicting, normal-priority operation.
+ var conflictArgs kvpb.Request
+ if pusherWriting {
+ pArgs = putArgs(keyB, []byte("value3"))
+ conflictArgs = &pArgs
+ } else {
+ gArgs := getArgs(keyB)
+ conflictArgs = &gArgs
+ }
+ manual.Advance(100)
+ pErrC := make(chan *kvpb.Error, 1)
+ require.NoError(t, stopper.RunAsyncTask(ctx, "conflict", func(ctx context.Context) {
+ _, pErr := kv.SendWrapped(ctx, store.TestSender(), conflictArgs)
+ pErrC <- pErr
+ }))
+
+ // Wait for the conflict to push and be queued in the txn wait queue.
+ testutils.SucceedsSoon(t, func() error {
+ select {
+ case pErr := <-pErrC:
+ t.Fatalf("conflicting operation unexpectedly completed: pErr=%s", pErr)
+ default:
+ }
+ if v := store.txnWaitMetrics.PusherWaiting.Value(); v != 1 {
+ return errors.Errorf("expected 1 pusher waiting, found %d", v)
+ }
+ return nil
+ })
+
+ // Finalize the STAGING txn, either by committing it or by aborting it.
+ et2, et2H := endTxnArgs(txn, pusheeCommits)
+ etReply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), et2H, &et2)
+ require.Nil(t, pErr, "error: %s", pErr)
+ expStatus := roachpb.COMMITTED
+ if !pusheeCommits {
+ expStatus = roachpb.ABORTED
+ }
+ require.Equal(t, expStatus, etReply.Header().Txn.Status)
+
+ // This will unblock the conflicting operation, which should succeed.
+ pErr = <-pErrC
+ require.Nil(t, pErr, "error: %s", pErr)
+ }
+
+ for _, pusheeIsoLevel := range isolation.Levels() {
+ t.Run("pushee_iso_level="+pusheeIsoLevel.String(), func(t *testing.T) {
+ testutils.RunTrueAndFalse(t, "pushee_commits", func(t *testing.T, pusheeCommits bool) {
+ testutils.RunTrueAndFalse(t, "pusher_writing", func(t *testing.T, pusherWriting bool) {
+ run(t, pusheeIsoLevel, pusheeCommits, pusherWriting)
+ })
+ })
+ })
+ }
+}
+
// TestTxnClearRangeIntents tests whether a ClearRange call blindly removes
// write intents. This can cause it to remove an intent from an implicitly
// committed STAGING txn. When txn recovery kicks in, it will fail to find the
diff --git a/pkg/kv/kvserver/txnwait/queue.go b/pkg/kv/kvserver/txnwait/queue.go
index 822b0bbba70f..bd716eefecc6 100644
--- a/pkg/kv/kvserver/txnwait/queue.go
+++ b/pkg/kv/kvserver/txnwait/queue.go
@@ -70,7 +70,7 @@ func TestingOverrideTxnLivenessThreshold(t time.Duration) func() {
// proceed without queueing. This is true for pushes which are neither
// ABORT nor TIMESTAMP, but also for ABORT and TIMESTAMP pushes where
// the pushee has min priority or pusher has max priority.
-func ShouldPushImmediately(req *kvpb.PushTxnRequest) bool {
+func ShouldPushImmediately(req *kvpb.PushTxnRequest, pusheeStatus roachpb.TransactionStatus) bool {
if req.Force {
return true
}
@@ -78,15 +78,18 @@ func ShouldPushImmediately(req *kvpb.PushTxnRequest) bool {
req.PushType,
req.PusherTxn.IsoLevel, req.PusheeTxn.IsoLevel,
req.PusherTxn.Priority, req.PusheeTxn.Priority,
+ pusheeStatus,
)
}
// CanPushWithPriority returns true if the pusher can perform the specified push
-// type on the pushee, based on the two txns' isolation levels and priorities.
+// type on the pushee, based on the two txns' isolation levels, their priorities,
+// and the pushee's status.
func CanPushWithPriority(
pushType kvpb.PushTxnType,
pusherIso, pusheeIso isolation.Level,
pusherPri, pusheePri enginepb.TxnPriority,
+ pusheeStatus roachpb.TransactionStatus,
) bool {
// Normalize the transaction priorities so that normal user priorities are
// considered equal for the purposes of pushing.
@@ -103,6 +106,15 @@ func CanPushWithPriority(
case kvpb.PUSH_ABORT:
return pusherPri > pusheePri
case kvpb.PUSH_TIMESTAMP:
+ // If the pushee transaction is STAGING, only let the PUSH_TIMESTAMP through
+ // to disrupt the transaction commit if the pusher has a higher priority. If
+ // the priorities are equal, the PUSH_TIMESTAMP should wait for the commit
+ // to complete.
+ if pusheeStatus == roachpb.STAGING {
+ return pusherPri > pusheePri
+ }
+ // Otherwise, the pushee has not yet started committing...
+
// If the pushee transaction tolerates write skew, the PUSH_TIMESTAMP is
// harmless, so let it through.
return pusheeIso.ToleratesWriteSkew() ||
@@ -476,10 +488,6 @@ func (q *Queue) releaseWaitingQueriesLocked(ctx context.Context, txnID uuid.UUID
func (q *Queue) MaybeWaitForPush(
ctx context.Context, req *kvpb.PushTxnRequest,
) (*kvpb.PushTxnResponse, *kvpb.Error) {
- if ShouldPushImmediately(req) {
- return nil, nil
- }
-
q.mu.Lock()
// If the txn wait queue is not enabled or if the request is not
// contained within the replica, do nothing. The request can fall
@@ -501,6 +509,9 @@ func (q *Queue) MaybeWaitForPush(
if txn := pending.getTxn(); isPushed(req, txn) {
q.mu.Unlock()
return createPushTxnResponse(txn), nil
+ } else if ShouldPushImmediately(req, txn.Status) {
+ q.mu.Unlock()
+ return nil, nil
}
push := &waitingPush{
diff --git a/pkg/kv/kvserver/txnwait/queue_test.go b/pkg/kv/kvserver/txnwait/queue_test.go
index 74c8088a19a1..259d9f1fc957 100644
--- a/pkg/kv/kvserver/txnwait/queue_test.go
+++ b/pkg/kv/kvserver/txnwait/queue_test.go
@@ -161,7 +161,8 @@ func TestShouldPushImmediately(t *testing.T) {
Priority: test.pusheePri,
},
}
- shouldPush := ShouldPushImmediately(&req)
+ pusheeStatus := roachpb.PENDING
+ shouldPush := ShouldPushImmediately(&req, pusheeStatus)
require.Equal(t, test.shouldPush, shouldPush)
})
}
@@ -179,6 +180,7 @@ func testCanPushWithPriorityPushAbort(t *testing.T) {
max := enginepb.MaxTxnPriority
mid1 := enginepb.TxnPriority(1)
mid2 := enginepb.TxnPriority(2)
+ statuses := []roachpb.TransactionStatus{roachpb.PENDING, roachpb.STAGING}
testCases := []struct {
pusherPri enginepb.TxnPriority
pusheePri enginepb.TxnPriority
@@ -201,22 +203,31 @@ func testCanPushWithPriorityPushAbort(t *testing.T) {
{max, mid2, true},
{max, max, false},
}
- // NOTE: the behavior of PUSH_ABORT pushes is agnostic to isolation levels.
- for _, pusherIso := range isolation.Levels() {
- for _, pusheeIso := range isolation.Levels() {
- for _, test := range testCases {
- name := fmt.Sprintf("pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
- pusherIso, pusheeIso, test.pusherPri, test.pusheePri)
- t.Run(name, func(t *testing.T) {
- canPush := CanPushWithPriority(kvpb.PUSH_ABORT, pusherIso, pusheeIso, test.pusherPri, test.pusheePri)
- require.Equal(t, test.exp, canPush)
- })
+ // NOTE: the behavior of PUSH_ABORT pushes is agnostic to pushee status.
+ for _, pusheeStatus := range statuses {
+ // NOTE: the behavior of PUSH_ABORT pushes is agnostic to isolation levels.
+ for _, pusherIso := range isolation.Levels() {
+ for _, pusheeIso := range isolation.Levels() {
+ for _, test := range testCases {
+ name := fmt.Sprintf("pusheeStatus=%s/pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
+ pusheeStatus, pusherIso, pusheeIso, test.pusherPri, test.pusheePri)
+ t.Run(name, func(t *testing.T) {
+ canPush := CanPushWithPriority(
+ kvpb.PUSH_ABORT, pusherIso, pusheeIso, test.pusherPri, test.pusheePri, pusheeStatus)
+ require.Equal(t, test.exp, canPush)
+ })
+ }
}
}
}
}
func testCanPushWithPriorityPushTimestamp(t *testing.T) {
+ t.Run("pusheeStatus="+roachpb.PENDING.String(), testCanPushWithPriorityPushTimestampPusheePending)
+ t.Run("pusheeStatus="+roachpb.STAGING.String(), testCanPushWithPriorityPushTimestampPusheeStaging)
+}
+
+func testCanPushWithPriorityPushTimestampPusheePending(t *testing.T) {
SSI := isolation.Serializable
SI := isolation.Snapshot
RC := isolation.ReadCommitted
@@ -389,30 +400,80 @@ func testCanPushWithPriorityPushTimestamp(t *testing.T) {
name := fmt.Sprintf("pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
test.pusherIso, test.pusheeIso, test.pusherPri, test.pusheePri)
t.Run(name, func(t *testing.T) {
- canPush := CanPushWithPriority(kvpb.PUSH_TIMESTAMP, test.pusherIso, test.pusheeIso, test.pusherPri, test.pusheePri)
+ canPush := CanPushWithPriority(
+ kvpb.PUSH_TIMESTAMP, test.pusherIso, test.pusheeIso, test.pusherPri, test.pusheePri, roachpb.PENDING)
require.Equal(t, test.exp, canPush)
})
}
}
-func testCanPushWithPriorityPushTouch(t *testing.T) {
+func testCanPushWithPriorityPushTimestampPusheeStaging(t *testing.T) {
min := enginepb.MinTxnPriority
max := enginepb.MaxTxnPriority
mid1 := enginepb.TxnPriority(1)
mid2 := enginepb.TxnPriority(2)
- priorities := []enginepb.TxnPriority{min, mid1, mid2, max}
- // NOTE: the behavior of PUSH_TOUCH pushes is agnostic to isolation levels.
+ testCases := []struct {
+ pusherPri enginepb.TxnPriority
+ pusheePri enginepb.TxnPriority
+ exp bool
+ }{
+ {min, min, false},
+ {min, mid1, false},
+ {min, mid2, false},
+ {min, max, false},
+ {mid1, min, true},
+ {mid1, mid1, false},
+ {mid1, mid2, false},
+ {mid1, max, false},
+ {mid2, min, true},
+ {mid2, mid1, false},
+ {mid2, mid2, false},
+ {mid2, max, false},
+ {max, min, true},
+ {max, mid1, true},
+ {max, mid2, true},
+ {max, max, false},
+ }
+ // NOTE: the behavior of PUSH_TIMESTAMP pushes is agnostic to isolation levels
+ // when the pushee transaction is STAGING.
for _, pusherIso := range isolation.Levels() {
for _, pusheeIso := range isolation.Levels() {
- // NOTE: the behavior of PUSH_TOUCH pushes is agnostic to txn priorities.
- for _, pusherPri := range priorities {
- for _, pusheePri := range priorities {
- name := fmt.Sprintf("pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
- pusherIso, pusheeIso, pusherPri, pusheePri)
- t.Run(name, func(t *testing.T) {
- canPush := CanPushWithPriority(kvpb.PUSH_TOUCH, pusherIso, pusheeIso, pusherPri, pusheePri)
- require.True(t, canPush)
- })
+ for _, test := range testCases {
+ name := fmt.Sprintf("pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
+ pusherIso, pusheeIso, test.pusherPri, test.pusheePri)
+ t.Run(name, func(t *testing.T) {
+ canPush := CanPushWithPriority(
+ kvpb.PUSH_TIMESTAMP, pusherIso, pusheeIso, test.pusherPri, test.pusheePri, roachpb.STAGING)
+ require.Equal(t, test.exp, canPush)
+ })
+ }
+ }
+ }
+}
+
+func testCanPushWithPriorityPushTouch(t *testing.T) {
+ min := enginepb.MinTxnPriority
+ max := enginepb.MaxTxnPriority
+ mid1 := enginepb.TxnPriority(1)
+ mid2 := enginepb.TxnPriority(2)
+ priorities := []enginepb.TxnPriority{min, mid1, mid2, max}
+ statuses := []roachpb.TransactionStatus{roachpb.PENDING, roachpb.STAGING}
+ // NOTE: the behavior of PUSH_TOUCH pushes is agnostic to pushee status.
+ for _, pusheeStatus := range statuses {
+ // NOTE: the behavior of PUSH_TOUCH pushes is agnostic to isolation levels.
+ for _, pusherIso := range isolation.Levels() {
+ for _, pusheeIso := range isolation.Levels() {
+ // NOTE: the behavior of PUSH_TOUCH pushes is agnostic to txn priorities.
+ for _, pusherPri := range priorities {
+ for _, pusheePri := range priorities {
+ name := fmt.Sprintf("pusheeStatus=%s/pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
+ pusheeStatus, pusherIso, pusheeIso, pusherPri, pusheePri)
+ t.Run(name, func(t *testing.T) {
+ canPush := CanPushWithPriority(
+ kvpb.PUSH_TOUCH, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus)
+ require.True(t, canPush)
+ })
+ }
}
}
}
diff --git a/pkg/server/node.go b/pkg/server/node.go
index bf1326c7e104..1284d7cfcb86 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -195,6 +195,30 @@ This metric is thus not an indicator of KV health.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
+ metaActiveRangeFeed = metric.Metadata{
+ Name: "rpc.streams.rangefeed.active",
+ Help: `Number of currently running RangeFeed streams`,
+ Measurement: "Streams",
+ Unit: metric.Unit_COUNT,
+ }
+ metaTotalRangeFeed = metric.Metadata{
+ Name: "rpc.streams.rangefeed.recv",
+ Help: `Total number of RangeFeed streams`,
+ Measurement: "Streams",
+ Unit: metric.Unit_COUNT,
+ }
+ metaActiveMuxRangeFeed = metric.Metadata{
+ Name: "rpc.streams.mux_rangefeed.active",
+ Help: `Number of currently running MuxRangeFeed streams`,
+ Measurement: "Streams",
+ Unit: metric.Unit_COUNT,
+ }
+ metaTotalMuxRangeFeed = metric.Metadata{
+ Name: "rpc.streams.mux_rangefeed.recv",
+ Help: `Total number of MuxRangeFeed streams`,
+ Measurement: "Streams",
+ Unit: metric.Unit_COUNT,
+ }
)
// Cluster settings.
@@ -243,6 +267,10 @@ type nodeMetrics struct {
CrossRegionBatchResponseBytes *metric.Counter
CrossZoneBatchRequestBytes *metric.Counter
CrossZoneBatchResponseBytes *metric.Counter
+ NumRangeFeed *metric.Counter
+ ActiveRangeFeed *metric.Gauge
+ NumMuxRangeFeed *metric.Counter
+ ActiveMuxRangeFeed *metric.Gauge
}
func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMetrics {
@@ -263,6 +291,10 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMe
CrossRegionBatchResponseBytes: metric.NewCounter(metaCrossRegionBatchResponse),
CrossZoneBatchRequestBytes: metric.NewCounter(metaCrossZoneBatchRequest),
CrossZoneBatchResponseBytes: metric.NewCounter(metaCrossZoneBatchResponse),
+ ActiveRangeFeed: metric.NewGauge(metaActiveRangeFeed),
+ NumRangeFeed: metric.NewCounter(metaTotalRangeFeed),
+ ActiveMuxRangeFeed: metric.NewGauge(metaActiveMuxRangeFeed),
+ NumMuxRangeFeed: metric.NewCounter(metaTotalMuxRangeFeed),
}
for i := range nm.MethodCounts {
@@ -1630,6 +1662,10 @@ func (n *Node) RangeFeed(args *kvpb.RangeFeedRequest, stream kvpb.Internal_Range
_, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer restore()
+ n.metrics.NumRangeFeed.Inc(1)
+ n.metrics.ActiveRangeFeed.Inc(1)
+ defer n.metrics.ActiveRangeFeed.Inc(-1)
+
if err := errors.CombineErrors(future.Wait(ctx, n.stores.RangeFeed(args, stream))); err != nil {
// Got stream context error, probably won't be able to propagate it to the stream,
// but give it a try anyway.
@@ -1765,6 +1801,10 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
}
defer cleanup()
+ n.metrics.NumMuxRangeFeed.Inc(1)
+ n.metrics.ActiveMuxRangeFeed.Inc(1)
+ defer n.metrics.ActiveMuxRangeFeed.Inc(-1)
+
for {
req, err := stream.Recv()
if err != nil {
@@ -1782,9 +1822,11 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
wrapped: muxStream,
}
- // TODO(yevgeniy): Add observability into actively running rangefeeds.
+ n.metrics.NumMuxRangeFeed.Inc(1)
+ n.metrics.ActiveMuxRangeFeed.Inc(1)
f := n.stores.RangeFeed(req, &sink)
f.WhenReady(func(err error) {
+ n.metrics.ActiveMuxRangeFeed.Inc(-1)
if err == nil {
cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED
if !n.storeCfg.Settings.Version.IsActive(stream.Context(), clusterversion.V23_2) {
diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go
index 70e14bc98ef0..1930dee0555c 100644
--- a/pkg/sql/instrumentation.go
+++ b/pkg/sql/instrumentation.go
@@ -424,8 +424,27 @@ func (ih *instrumentationHelper) Finish(
if pwe, ok := retPayload.(payloadWithError); ok {
payloadErr = pwe.errorCause()
}
+ bundleCtx := ctx
+ if bundleCtx.Err() != nil {
+ // The only two possible errors on the context are the context
+ // cancellation and the context deadline being exceeded. The
+ // former seems more likely, and the cancellation is most likely
+ // to have occurred due to a statement timeout, so we still want
+ // to proceed with saving the statement bundle. Thus, we
+ // override the canceled context, but first we'll log the error
+ // as a warning.
+ log.Warningf(
+ bundleCtx, "context has an error when saving the bundle, proceeding "+
+ "with the background one (with deadline of 10 seconds): %v", bundleCtx.Err(),
+ )
+ // We want to be conservative, so we add a deadline of 10
+ // seconds on top of the background context.
+ var cancel context.CancelFunc
+ bundleCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) // nolint:context
+ defer cancel()
+ }
bundle = buildStatementBundle(
- ctx, ih.explainFlags, cfg.DB, ie.(*InternalExecutor), stmtRawSQL, &p.curPlan,
+ bundleCtx, ih.explainFlags, cfg.DB, ie.(*InternalExecutor), stmtRawSQL, &p.curPlan,
ob.BuildString(), trace, placeholders, res.Err(), payloadErr, retErr,
&p.extendedEvalCtx.Settings.SV,
)
@@ -434,7 +453,7 @@ func (ih *instrumentationHelper) Finish(
// to the current user and aren't included into the bundle.
warnings = append(warnings, bundle.errorStrings...)
bundle.insert(
- ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID, ih.diagRequest,
+ bundleCtx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID, ih.diagRequest,
)
telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter)
}
diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go
index 2b7a3a35b413..ea5757d199c9 100644
--- a/pkg/sql/row/row_converter.go
+++ b/pkg/sql/row/row_converter.go
@@ -489,7 +489,7 @@ func NewDatumRowConverter(
return c, nil
}
-const rowIDBits = 64 - builtinconstants.NodeIDBits
+const rowIDBits = 64 - builtinconstants.UniqueIntNodeIDBits
// Row inserts kv operations into the current kv batch, and triggers a SendBatch
// if necessary.
diff --git a/pkg/sql/sem/builtins/builtinconstants/constants.go b/pkg/sql/sem/builtins/builtinconstants/constants.go
index 13eb43f35465..6e0964fd8e09 100644
--- a/pkg/sql/sem/builtins/builtinconstants/constants.go
+++ b/pkg/sql/sem/builtins/builtinconstants/constants.go
@@ -76,6 +76,28 @@ const (
CreateSchemaTelemetryJobBuiltinName = "crdb_internal.create_sql_schema_telemetry_job"
)
-// NodeIDBits is the number of bits stored in the lower portion of
-// GenerateUniqueInt.
-const NodeIDBits = 15
+// A unique int generated by GenerateUniqueInt is a 64-bit integer with
+// the following format:
+//
+// [1 leading zero bit][48 bits for timestamp][15 bits for nodeID]
+const (
+ // UniqueIntLeadingZeroBits is the number of leading zero bits in a unique
+ // int generated by GenerateUniqueInt.
+ UniqueIntLeadingZeroBits = 1
+
+ // UniqueIntTimestampBits is the number of bits in the timestamp segment
+ // in a unique int generated by GenerateUniqueInt.
+ UniqueIntTimestampBits = 48
+
+ // UniqueIntNodeIDBits is the number of bits in the node ID segment
+ // in a unique int generated by GenerateUniqueInt.
+ UniqueIntNodeIDBits = 15
+
+ // UniqueIntNodeIDMask is a bitmask for the node ID in a unique int
+ // generated by GenerateUniqueInt.
+ UniqueIntNodeIDMask = 1<> 16) << 15)) >> 15
- v := (bits.Reverse64(ts) >> 1) | (val & (1<<15 - 1))
- return v
+ ts := uniqueInt & builtinconstants.UniqueIntTimestampMask
+ nodeID := uniqueInt & builtinconstants.UniqueIntNodeIDMask
+ reversedTS := bits.Reverse64(ts< |