diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 6539bf87c290..518a4dfafa35 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -629,8 +629,6 @@
STORAGE | rpc.method.writebatch.recv | Number of WriteBatch requests processed | RPCs | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | rpc.streams.mux_rangefeed.active | Number of currently running MuxRangeFeed streams | Streams | GAUGE | COUNT | AVG | NONE |
STORAGE | rpc.streams.mux_rangefeed.recv | Total number of MuxRangeFeed streams | Streams | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
-STORAGE | rpc.streams.rangefeed.active | Number of currently running RangeFeed streams | Streams | GAUGE | COUNT | AVG | NONE |
-STORAGE | rpc.streams.rangefeed.recv | Total number of RangeFeed streams | Streams | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | spanconfig.kvsubscriber.oldest_protected_record_nanos | Difference between the current time and the oldest protected timestamp (sudden drops indicate a record being released; an ever increasing number indicates that the oldest record is around and preventing GC if > configured GC TTL) | Nanoseconds | GAUGE | NANOSECONDS | AVG | NONE |
STORAGE | spanconfig.kvsubscriber.protected_record_count | Number of protected timestamp records, as seen by KV | Records | GAUGE | COUNT | AVG | NONE |
STORAGE | spanconfig.kvsubscriber.update_behind_nanos | Difference between the current time and when the KVSubscriber received its last update (an ever increasing number indicates that we're no longer receiving updates) | Nanoseconds | GAUGE | NANOSECONDS | AVG | NONE |
diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go
index 2b8f857e7746..641353a1884c 100644
--- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go
+++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go
@@ -1102,8 +1102,16 @@ func selectClusterSessionIDs(t *testing.T, conn *sqlutils.SQLRunner) []string {
func testTenantStatusCancelSession(t *testing.T, helper serverccl.TenantTestHelper) {
// Open a SQL session on tenant SQL pod 0.
- sqlPod0 := helper.TestCluster().TenantConn(0)
- sqlPod0.Exec(t, "SELECT 1")
+ ctx := context.Background()
+ // Open two different SQL sessions on tenant SQL pod 0.
+ sqlPod0 := helper.TestCluster().TenantDB(0)
+ sqlPod0SessionToCancel, err := sqlPod0.Conn(ctx)
+ require.NoError(t, err)
+ sqlPod0SessionForIntrospection, err := sqlPod0.Conn(ctx)
+ require.NoError(t, err)
+ _, err = sqlPod0SessionToCancel.ExecContext(ctx, "SELECT 1")
+ require.NoError(t, err)
+ introspectionRunner := sqlutils.MakeSQLRunner(sqlPod0SessionForIntrospection)
// See the session over HTTP on tenant SQL pod 1.
httpPod1 := helper.TestCluster().TenantAdminHTTPClient(t, 1)
@@ -1122,7 +1130,7 @@ func testTenantStatusCancelSession(t *testing.T, helper serverccl.TenantTestHelp
// See the session over SQL on tenant SQL pod 0.
sessionID := hex.EncodeToString(session.ID)
require.Eventually(t, func() bool {
- return strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID)
+ return strings.Contains(strings.Join(selectClusterSessionIDs(t, introspectionRunner), ","), sessionID)
}, 5*time.Second, 100*time.Millisecond)
// Cancel the session over HTTP from tenant SQL pod 1.
@@ -1134,7 +1142,7 @@ func testTenantStatusCancelSession(t *testing.T, helper serverccl.TenantTestHelp
// No longer see the session over SQL from tenant SQL pod 0.
// (The SQL client maintains an internal connection pool and automatically reconnects.)
require.Eventually(t, func() bool {
- return !strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID)
+ return !strings.Contains(strings.Join(selectClusterSessionIDs(t, introspectionRunner), ","), sessionID)
}, 5*time.Second, 100*time.Millisecond)
// Attempt to cancel the session again over HTTP from tenant SQL pod 1, so that we can see the error message.
diff --git a/pkg/cli/testdata/zip/testzip_fallback b/pkg/cli/testdata/zip/testzip_fallback
new file mode 100644
index 000000000000..4d4ba0fa0a09
--- /dev/null
+++ b/pkg/cli/testdata/zip/testzip_fallback
@@ -0,0 +1,275 @@
+----
+debug zip --concurrency=1 --cpu-profile-duration=1s /dev/null
+[cluster] discovering virtual clusters... done
+[cluster] creating output file /dev/null... done
+[cluster] establishing RPC connection to ...
+[cluster] using SQL address: ...
+[cluster] requesting data for debug/events... received response... writing JSON output: debug/events.json... done
+[cluster] requesting data for debug/rangelog... received response... writing JSON output: debug/rangelog.json... done
+[cluster] requesting data for debug/settings... received response... writing JSON output: debug/settings.json... done
+[cluster] requesting data for debug/reports/problemranges... received response... writing JSON output: debug/reports/problemranges.json... done
+[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_stream_checkpoints... writing output: debug/crdb_internal.cluster_replication_node_stream_checkpoints.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_stream_spans... writing output: debug/crdb_internal.cluster_replication_node_stream_spans.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_streams... writing output: debug/crdb_internal.cluster_replication_node_streams.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_spans... writing output: debug/crdb_internal.cluster_replication_spans.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_function_statements... writing output: debug/crdb_internal.create_function_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_procedure_statements... writing output: debug/crdb_internal.create_procedure_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_schema_statements... writing output: debug/crdb_internal.create_schema_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_statements... writing output: debug/crdb_internal.create_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_type_statements... writing output: debug/crdb_internal.create_type_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.logical_replication_node_processors... writing output: debug/crdb_internal.logical_replication_node_processors.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_contention_events... writing output: debug/crdb_internal.cluster_contention_events.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_database_privileges... writing output: debug/crdb_internal.cluster_database_privileges.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_distsql_flows... writing output: debug/crdb_internal.cluster_distsql_flows.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_locks... writing output: debug/crdb_internal.cluster_locks.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_queries... writing output: debug/crdb_internal.cluster_queries.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_sessions... writing output: debug/crdb_internal.cluster_sessions.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_settings... writing output: debug/crdb_internal.cluster_settings.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_transactions... writing output: debug/crdb_internal.cluster_transactions.txt... done
+[cluster] retrieving SQL data for crdb_internal.default_privileges... writing output: debug/crdb_internal.default_privileges.txt... done
+[cluster] retrieving SQL data for crdb_internal.index_usage_statistics... writing output: debug/crdb_internal.index_usage_statistics.txt... done
+[cluster] retrieving SQL data for crdb_internal.invalid_objects... writing output: debug/crdb_internal.invalid_objects.txt... done
+[cluster] retrieving SQL data for crdb_internal.jobs... writing output: debug/crdb_internal.jobs.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_node_liveness... writing output: debug/crdb_internal.kv_node_liveness.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_node_status... writing output: debug/crdb_internal.kv_node_status.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_protected_ts_records... writing output: debug/crdb_internal.kv_protected_ts_records.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_store_status... writing output: debug/crdb_internal.kv_store_status.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_system_privileges... writing output: debug/crdb_internal.kv_system_privileges.txt... done
+[cluster] retrieving SQL data for crdb_internal.partitions... writing output: debug/crdb_internal.partitions.txt... done
+[cluster] retrieving SQL data for crdb_internal.probe_ranges_1s_read_limit_100... writing output: debug/crdb_internal.probe_ranges_1s_read_limit_100.txt... done
+[cluster] retrieving SQL data for crdb_internal.regions... writing output: debug/crdb_internal.regions.txt... done
+[cluster] retrieving SQL data for crdb_internal.schema_changes... writing output: debug/crdb_internal.schema_changes.txt... done
+[cluster] retrieving SQL data for crdb_internal.super_regions... writing output: debug/crdb_internal.super_regions.txt... done
+[cluster] retrieving SQL data for crdb_internal.system_jobs... writing output: debug/crdb_internal.system_jobs.txt... done
+[cluster] retrieving SQL data for crdb_internal.table_indexes... writing output: debug/crdb_internal.table_indexes.txt... done
+[cluster] retrieving SQL data for crdb_internal.transaction_contention_events... writing output: debug/crdb_internal.transaction_contention_events.txt...
+[cluster] retrieving SQL data for crdb_internal.transaction_contention_events: last request failed: ERROR: column "fail" does not exist (SQLSTATE 42703)
+[cluster] retrieving SQL data for crdb_internal.transaction_contention_events: creating error output: debug/crdb_internal.transaction_contention_events.txt.err.txt... done
+[cluster] retrieving SQL data for crdb_internal.zones... writing output: debug/crdb_internal.zones.txt... done
+[cluster] retrieving SQL data for system.database_role_settings... writing output: debug/system.database_role_settings.txt... done
+[cluster] retrieving SQL data for system.descriptor... writing output: debug/system.descriptor.txt... done
+[cluster] retrieving SQL data for system.eventlog... writing output: debug/system.eventlog.txt... done
+[cluster] retrieving SQL data for system.external_connections... writing output: debug/system.external_connections.txt... done
+[cluster] retrieving SQL data for system.job_info... writing output: debug/system.job_info.txt... done
+[cluster] retrieving SQL data for system.jobs... writing output: debug/system.jobs.txt... done
+[cluster] retrieving SQL data for system.lease... writing output: debug/system.lease.txt... done
+[cluster] retrieving SQL data for system.locations... writing output: debug/system.locations.txt... done
+[cluster] retrieving SQL data for system.migrations... writing output: debug/system.migrations.txt... done
+[cluster] retrieving SQL data for system.namespace... writing output: debug/system.namespace.txt... done
+[cluster] retrieving SQL data for system.privileges... writing output: debug/system.privileges.txt... done
+[cluster] retrieving SQL data for system.protected_ts_meta... writing output: debug/system.protected_ts_meta.txt... done
+[cluster] retrieving SQL data for system.protected_ts_records... writing output: debug/system.protected_ts_records.txt... done
+[cluster] retrieving SQL data for system.rangelog... writing output: debug/system.rangelog.txt... done
+[cluster] retrieving SQL data for system.replication_constraint_stats... writing output: debug/system.replication_constraint_stats.txt... done
+[cluster] retrieving SQL data for system.replication_critical_localities... writing output: debug/system.replication_critical_localities.txt... done
+[cluster] retrieving SQL data for system.replication_stats... writing output: debug/system.replication_stats.txt... done
+[cluster] retrieving SQL data for system.reports_meta... writing output: debug/system.reports_meta.txt... done
+[cluster] retrieving SQL data for system.role_id_seq... writing output: debug/system.role_id_seq.txt... done
+[cluster] retrieving SQL data for system.role_members... writing output: debug/system.role_members.txt... done
+[cluster] retrieving SQL data for system.role_options... writing output: debug/system.role_options.txt... done
+[cluster] retrieving SQL data for system.scheduled_jobs... writing output: debug/system.scheduled_jobs.txt... done
+[cluster] retrieving SQL data for system.settings... writing output: debug/system.settings.txt... done
+[cluster] retrieving SQL data for system.span_configurations... writing output: debug/system.span_configurations.txt... done
+[cluster] retrieving SQL data for system.sql_instances... writing output: debug/system.sql_instances.txt... done
+[cluster] retrieving SQL data for system.sql_stats_cardinality... writing output: debug/system.sql_stats_cardinality.txt... done
+[cluster] retrieving SQL data for system.sqlliveness... writing output: debug/system.sqlliveness.txt... done
+[cluster] retrieving SQL data for system.statement_diagnostics... writing output: debug/system.statement_diagnostics.txt... done
+[cluster] retrieving SQL data for system.statement_diagnostics_requests... writing output: debug/system.statement_diagnostics_requests.txt... done
+[cluster] retrieving SQL data for system.statement_statistics_limit_5000... writing output: debug/system.statement_statistics_limit_5000.txt... done
+[cluster] retrieving SQL data for system.table_statistics... writing output: debug/system.table_statistics.txt... done
+[cluster] retrieving SQL data for system.task_payloads... writing output: debug/system.task_payloads.txt... done
+[cluster] retrieving SQL data for system.tenant_settings... writing output: debug/system.tenant_settings.txt... done
+[cluster] retrieving SQL data for system.tenant_tasks... writing output: debug/system.tenant_tasks.txt... done
+[cluster] retrieving SQL data for system.tenant_usage... writing output: debug/system.tenant_usage.txt... done
+[cluster] retrieving SQL data for system.tenants... writing output: debug/system.tenants.txt... done
+[cluster] requesting nodes... received response... writing JSON output: debug/nodes.json... done
+[cluster] requesting liveness... received response... writing JSON output: debug/liveness.json... done
+[cluster] requesting tenant ranges... received response...
+[cluster] requesting tenant ranges: last request failed: rpc error: ...
+[cluster] requesting tenant ranges: creating error output: debug/tenant_ranges.err.txt... done
+[cluster] collecting the inflight traces for jobs... received response... done
+[cluster] requesting CPU profiles
+[cluster] profiles generated
+[cluster] profile for node 1... writing binary output: debug/nodes/1/cpu.pprof... done
+[node 1] node status... writing JSON output: debug/nodes/1/status.json... done
+[node 1] using SQL connection URL: postgresql://...
+[node 1] retrieving SQL data for crdb_internal.active_range_feeds... writing output: debug/nodes/1/crdb_internal.active_range_feeds.txt... done
+[node 1] retrieving SQL data for crdb_internal.feature_usage... writing output: debug/nodes/1/crdb_internal.feature_usage.txt... done
+[node 1] retrieving SQL data for crdb_internal.gossip_alerts... writing output: debug/nodes/1/crdb_internal.gossip_alerts.txt... done
+[node 1] retrieving SQL data for crdb_internal.gossip_liveness... writing output: debug/nodes/1/crdb_internal.gossip_liveness.txt... done
+[node 1] retrieving SQL data for crdb_internal.gossip_nodes... writing output: debug/nodes/1/crdb_internal.gossip_nodes.txt... done
+[node 1] retrieving SQL data for crdb_internal.kv_session_based_leases... writing output: debug/nodes/1/crdb_internal.kv_session_based_leases.txt... done
+[node 1] retrieving SQL data for crdb_internal.leases... writing output: debug/nodes/1/crdb_internal.leases.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_build_info... writing output: debug/nodes/1/crdb_internal.node_build_info.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_contention_events... writing output: debug/nodes/1/crdb_internal.node_contention_events.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_distsql_flows... writing output: debug/nodes/1/crdb_internal.node_distsql_flows.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_execution_insights... writing output: debug/nodes/1/crdb_internal.node_execution_insights.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_inflight_trace_spans... writing output: debug/nodes/1/crdb_internal.node_inflight_trace_spans.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_memory_monitors... writing output: debug/nodes/1/crdb_internal.node_memory_monitors.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_metrics... writing output: debug/nodes/1/crdb_internal.node_metrics.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_queries... writing output: debug/nodes/1/crdb_internal.node_queries.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_runtime_info... writing output: debug/nodes/1/crdb_internal.node_runtime_info.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_sessions... writing output: debug/nodes/1/crdb_internal.node_sessions.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_statement_statistics... writing output: debug/nodes/1/crdb_internal.node_statement_statistics.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_tenant_capabilities_cache... writing output: debug/nodes/1/crdb_internal.node_tenant_capabilities_cache.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_transaction_statistics... writing output: debug/nodes/1/crdb_internal.node_transaction_statistics.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_transactions... writing output: debug/nodes/1/crdb_internal.node_transactions.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_txn_execution_insights... writing output: debug/nodes/1/crdb_internal.node_txn_execution_insights.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_txn_stats... writing output: debug/nodes/1/crdb_internal.node_txn_stats.txt... done
+[node 1] requesting data for debug/nodes/1/details... received response... writing JSON output: debug/nodes/1/details.json... done
+[node 1] requesting data for debug/nodes/1/gossip... received response... writing JSON output: debug/nodes/1/gossip.json... done
+[node 1] requesting stacks... received response... writing binary output: debug/nodes/1/stacks.txt... done
+[node 1] requesting stacks with labels... received response... writing binary output: debug/nodes/1/stacks_with_labels.txt... done
+[node 1] requesting heap profile... received response... writing binary output: debug/nodes/1/heap.pprof... done
+[node 1] requesting engine stats... received response... writing binary output: debug/nodes/1/lsm.txt... done
+[node 1] requesting heap profile list... received response... done
+[node ?] ? heap profiles found
+[node 1] requesting goroutine dump list... received response... done
+[node ?] ? goroutine dumps found
+[node 1] requesting cpu profile list... received response... done
+[node ?] ? cpu profiles found
+[node 1] requesting log files list... received response... done
+[node ?] ? log files found
+[node 1] requesting ranges... received response... done
+[node 1] writing ranges... writing JSON output: debug/nodes/1/ranges.json... done
+[cluster] pprof summary script... writing binary output: debug/pprof-summary.sh... done
+[cluster] hot range summary script... writing binary output: debug/hot-ranges.sh... done
+[cluster] tenant hot range summary script... writing binary output: debug/hot-ranges-tenant.sh... done
+----
+debug zip --concurrency=1 --cpu-profile-duration=1s /dev/null
+[cluster] discovering virtual clusters... done
+[cluster] creating output file /dev/null... done
+[cluster] establishing RPC connection to ...
+[cluster] using SQL address: ...
+[cluster] requesting data for debug/events... received response... writing JSON output: debug/events.json... done
+[cluster] requesting data for debug/rangelog... received response... writing JSON output: debug/rangelog.json... done
+[cluster] requesting data for debug/settings... received response... writing JSON output: debug/settings.json... done
+[cluster] requesting data for debug/reports/problemranges... received response... writing JSON output: debug/reports/problemranges.json... done
+[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_stream_checkpoints... writing output: debug/crdb_internal.cluster_replication_node_stream_checkpoints.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_stream_spans... writing output: debug/crdb_internal.cluster_replication_node_stream_spans.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_streams... writing output: debug/crdb_internal.cluster_replication_node_streams.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_spans... writing output: debug/crdb_internal.cluster_replication_spans.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_function_statements... writing output: debug/crdb_internal.create_function_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_procedure_statements... writing output: debug/crdb_internal.create_procedure_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_schema_statements... writing output: debug/crdb_internal.create_schema_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_statements... writing output: debug/crdb_internal.create_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.create_type_statements... writing output: debug/crdb_internal.create_type_statements.txt... done
+[cluster] retrieving SQL data for "".crdb_internal.logical_replication_node_processors... writing output: debug/crdb_internal.logical_replication_node_processors.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_contention_events... writing output: debug/crdb_internal.cluster_contention_events.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_database_privileges... writing output: debug/crdb_internal.cluster_database_privileges.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_distsql_flows... writing output: debug/crdb_internal.cluster_distsql_flows.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_locks... writing output: debug/crdb_internal.cluster_locks.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_queries... writing output: debug/crdb_internal.cluster_queries.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_sessions... writing output: debug/crdb_internal.cluster_sessions.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_settings... writing output: debug/crdb_internal.cluster_settings.txt... done
+[cluster] retrieving SQL data for crdb_internal.cluster_transactions... writing output: debug/crdb_internal.cluster_transactions.txt... done
+[cluster] retrieving SQL data for crdb_internal.default_privileges... writing output: debug/crdb_internal.default_privileges.txt... done
+[cluster] retrieving SQL data for crdb_internal.index_usage_statistics... writing output: debug/crdb_internal.index_usage_statistics.txt... done
+[cluster] retrieving SQL data for crdb_internal.invalid_objects... writing output: debug/crdb_internal.invalid_objects.txt... done
+[cluster] retrieving SQL data for crdb_internal.jobs... writing output: debug/crdb_internal.jobs.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_node_liveness... writing output: debug/crdb_internal.kv_node_liveness.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_node_status... writing output: debug/crdb_internal.kv_node_status.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_protected_ts_records... writing output: debug/crdb_internal.kv_protected_ts_records.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_store_status... writing output: debug/crdb_internal.kv_store_status.txt... done
+[cluster] retrieving SQL data for crdb_internal.kv_system_privileges... writing output: debug/crdb_internal.kv_system_privileges.txt... done
+[cluster] retrieving SQL data for crdb_internal.partitions... writing output: debug/crdb_internal.partitions.txt... done
+[cluster] retrieving SQL data for crdb_internal.probe_ranges_1s_read_limit_100... writing output: debug/crdb_internal.probe_ranges_1s_read_limit_100.txt... done
+[cluster] retrieving SQL data for crdb_internal.regions... writing output: debug/crdb_internal.regions.txt... done
+[cluster] retrieving SQL data for crdb_internal.schema_changes... writing output: debug/crdb_internal.schema_changes.txt... done
+[cluster] retrieving SQL data for crdb_internal.super_regions... writing output: debug/crdb_internal.super_regions.txt... done
+[cluster] retrieving SQL data for crdb_internal.system_jobs... writing output: debug/crdb_internal.system_jobs.txt... done
+[cluster] retrieving SQL data for crdb_internal.table_indexes... writing output: debug/crdb_internal.table_indexes.txt... done
+[cluster] retrieving SQL data for crdb_internal.transaction_contention_events... writing output: debug/crdb_internal.transaction_contention_events.txt...
+[cluster] retrieving SQL data for crdb_internal.transaction_contention_events: last request failed: ERROR: column "fail" does not exist (SQLSTATE 42703)
+[cluster] retrieving SQL data for crdb_internal.transaction_contention_events: creating error output: debug/crdb_internal.transaction_contention_events.txt.err.txt... done
+[cluster] retrieving SQL data for crdb_internal.transaction_contention_events (fallback)... writing output: debug/crdb_internal.transaction_contention_events.fallback.txt... done
+[cluster] retrieving SQL data for crdb_internal.zones... writing output: debug/crdb_internal.zones.txt... done
+[cluster] retrieving SQL data for system.database_role_settings... writing output: debug/system.database_role_settings.txt... done
+[cluster] retrieving SQL data for system.descriptor... writing output: debug/system.descriptor.txt... done
+[cluster] retrieving SQL data for system.eventlog... writing output: debug/system.eventlog.txt... done
+[cluster] retrieving SQL data for system.external_connections... writing output: debug/system.external_connections.txt... done
+[cluster] retrieving SQL data for system.job_info... writing output: debug/system.job_info.txt... done
+[cluster] retrieving SQL data for system.jobs... writing output: debug/system.jobs.txt... done
+[cluster] retrieving SQL data for system.lease... writing output: debug/system.lease.txt... done
+[cluster] retrieving SQL data for system.locations... writing output: debug/system.locations.txt... done
+[cluster] retrieving SQL data for system.migrations... writing output: debug/system.migrations.txt... done
+[cluster] retrieving SQL data for system.namespace... writing output: debug/system.namespace.txt... done
+[cluster] retrieving SQL data for system.privileges... writing output: debug/system.privileges.txt... done
+[cluster] retrieving SQL data for system.protected_ts_meta... writing output: debug/system.protected_ts_meta.txt... done
+[cluster] retrieving SQL data for system.protected_ts_records... writing output: debug/system.protected_ts_records.txt... done
+[cluster] retrieving SQL data for system.rangelog... writing output: debug/system.rangelog.txt... done
+[cluster] retrieving SQL data for system.replication_constraint_stats... writing output: debug/system.replication_constraint_stats.txt... done
+[cluster] retrieving SQL data for system.replication_critical_localities... writing output: debug/system.replication_critical_localities.txt... done
+[cluster] retrieving SQL data for system.replication_stats... writing output: debug/system.replication_stats.txt... done
+[cluster] retrieving SQL data for system.reports_meta... writing output: debug/system.reports_meta.txt... done
+[cluster] retrieving SQL data for system.role_id_seq... writing output: debug/system.role_id_seq.txt... done
+[cluster] retrieving SQL data for system.role_members... writing output: debug/system.role_members.txt... done
+[cluster] retrieving SQL data for system.role_options... writing output: debug/system.role_options.txt... done
+[cluster] retrieving SQL data for system.scheduled_jobs... writing output: debug/system.scheduled_jobs.txt... done
+[cluster] retrieving SQL data for system.settings... writing output: debug/system.settings.txt... done
+[cluster] retrieving SQL data for system.span_configurations... writing output: debug/system.span_configurations.txt... done
+[cluster] retrieving SQL data for system.sql_instances... writing output: debug/system.sql_instances.txt... done
+[cluster] retrieving SQL data for system.sql_stats_cardinality... writing output: debug/system.sql_stats_cardinality.txt... done
+[cluster] retrieving SQL data for system.sqlliveness... writing output: debug/system.sqlliveness.txt... done
+[cluster] retrieving SQL data for system.statement_diagnostics... writing output: debug/system.statement_diagnostics.txt... done
+[cluster] retrieving SQL data for system.statement_diagnostics_requests... writing output: debug/system.statement_diagnostics_requests.txt... done
+[cluster] retrieving SQL data for system.statement_statistics_limit_5000... writing output: debug/system.statement_statistics_limit_5000.txt... done
+[cluster] retrieving SQL data for system.table_statistics... writing output: debug/system.table_statistics.txt... done
+[cluster] retrieving SQL data for system.task_payloads... writing output: debug/system.task_payloads.txt... done
+[cluster] retrieving SQL data for system.tenant_settings... writing output: debug/system.tenant_settings.txt... done
+[cluster] retrieving SQL data for system.tenant_tasks... writing output: debug/system.tenant_tasks.txt... done
+[cluster] retrieving SQL data for system.tenant_usage... writing output: debug/system.tenant_usage.txt... done
+[cluster] retrieving SQL data for system.tenants... writing output: debug/system.tenants.txt... done
+[cluster] requesting nodes... received response... writing JSON output: debug/nodes.json... done
+[cluster] requesting liveness... received response... writing JSON output: debug/liveness.json... done
+[cluster] requesting tenant ranges... received response...
+[cluster] requesting tenant ranges: last request failed: rpc error: ...
+[cluster] requesting tenant ranges: creating error output: debug/tenant_ranges.err.txt... done
+[cluster] collecting the inflight traces for jobs... received response... done
+[cluster] requesting CPU profiles
+[cluster] profiles generated
+[cluster] profile for node 1... writing binary output: debug/nodes/1/cpu.pprof... done
+[node 1] node status... writing JSON output: debug/nodes/1/status.json... done
+[node 1] using SQL connection URL: postgresql://...
+[node 1] retrieving SQL data for crdb_internal.active_range_feeds... writing output: debug/nodes/1/crdb_internal.active_range_feeds.txt... done
+[node 1] retrieving SQL data for crdb_internal.feature_usage... writing output: debug/nodes/1/crdb_internal.feature_usage.txt... done
+[node 1] retrieving SQL data for crdb_internal.gossip_alerts... writing output: debug/nodes/1/crdb_internal.gossip_alerts.txt... done
+[node 1] retrieving SQL data for crdb_internal.gossip_liveness... writing output: debug/nodes/1/crdb_internal.gossip_liveness.txt... done
+[node 1] retrieving SQL data for crdb_internal.gossip_nodes... writing output: debug/nodes/1/crdb_internal.gossip_nodes.txt... done
+[node 1] retrieving SQL data for crdb_internal.kv_session_based_leases... writing output: debug/nodes/1/crdb_internal.kv_session_based_leases.txt... done
+[node 1] retrieving SQL data for crdb_internal.leases... writing output: debug/nodes/1/crdb_internal.leases.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_build_info... writing output: debug/nodes/1/crdb_internal.node_build_info.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_contention_events... writing output: debug/nodes/1/crdb_internal.node_contention_events.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_distsql_flows... writing output: debug/nodes/1/crdb_internal.node_distsql_flows.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_execution_insights... writing output: debug/nodes/1/crdb_internal.node_execution_insights.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_inflight_trace_spans... writing output: debug/nodes/1/crdb_internal.node_inflight_trace_spans.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_memory_monitors... writing output: debug/nodes/1/crdb_internal.node_memory_monitors.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_metrics... writing output: debug/nodes/1/crdb_internal.node_metrics.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_queries... writing output: debug/nodes/1/crdb_internal.node_queries.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_runtime_info... writing output: debug/nodes/1/crdb_internal.node_runtime_info.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_sessions... writing output: debug/nodes/1/crdb_internal.node_sessions.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_statement_statistics... writing output: debug/nodes/1/crdb_internal.node_statement_statistics.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_tenant_capabilities_cache... writing output: debug/nodes/1/crdb_internal.node_tenant_capabilities_cache.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_transaction_statistics... writing output: debug/nodes/1/crdb_internal.node_transaction_statistics.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_transactions... writing output: debug/nodes/1/crdb_internal.node_transactions.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_txn_execution_insights... writing output: debug/nodes/1/crdb_internal.node_txn_execution_insights.txt... done
+[node 1] retrieving SQL data for crdb_internal.node_txn_stats... writing output: debug/nodes/1/crdb_internal.node_txn_stats.txt... done
+[node 1] requesting data for debug/nodes/1/details... received response... writing JSON output: debug/nodes/1/details.json... done
+[node 1] requesting data for debug/nodes/1/gossip... received response... writing JSON output: debug/nodes/1/gossip.json... done
+[node 1] requesting stacks... received response... writing binary output: debug/nodes/1/stacks.txt... done
+[node 1] requesting stacks with labels... received response... writing binary output: debug/nodes/1/stacks_with_labels.txt... done
+[node 1] requesting heap profile... received response... writing binary output: debug/nodes/1/heap.pprof... done
+[node 1] requesting engine stats... received response... writing binary output: debug/nodes/1/lsm.txt... done
+[node 1] requesting heap profile list... received response... done
+[node ?] ? heap profiles found
+[node 1] requesting goroutine dump list... received response... done
+[node ?] ? goroutine dumps found
+[node 1] requesting cpu profile list... received response... done
+[node ?] ? cpu profiles found
+[node 1] requesting log files list... received response... done
+[node ?] ? log files found
+[node 1] requesting ranges... received response... done
+[node 1] writing ranges... writing JSON output: debug/nodes/1/ranges.json... done
+[cluster] pprof summary script... writing binary output: debug/pprof-summary.sh... done
+[cluster] hot range summary script... writing binary output: debug/hot-ranges.sh... done
+[cluster] tenant hot range summary script... writing binary output: debug/hot-ranges-tenant.sh... done
diff --git a/pkg/cli/zip.go b/pkg/cli/zip.go
index eb5eaa6ccab6..e2c2044b503f 100644
--- a/pkg/cli/zip.go
+++ b/pkg/cli/zip.go
@@ -490,7 +490,7 @@ INNER JOIN latestprogress ON j.id = latestprogress.job_id;`,
// An error is returned by this function if it is unable to write to
// the output file or some other unrecoverable error is encountered.
func (zc *debugZipContext) dumpTableDataForZip(
- zr *zipReporter, conn clisqlclient.Conn, base, table, query string,
+ zr *zipReporter, conn clisqlclient.Conn, base, table string, tableQuery TableQuery,
) error {
ctx := context.Background()
baseName := base + "/" + sanitizeFilename(table)
@@ -498,6 +498,10 @@ func (zc *debugZipContext) dumpTableDataForZip(
s := zr.start("retrieving SQL data for %s", table)
const maxRetries = 5
suffix := ""
+
+ query := tableQuery.query
+ fallback := tableQuery.fallback != ""
+
for numRetries := 1; numRetries <= maxRetries; numRetries++ {
name := baseName + suffix + "." + zc.clusterPrinter.sqlOutputFilenameExtension
s.progress("writing output: %s", name)
@@ -545,7 +549,18 @@ func (zc *debugZipContext) dumpTableDataForZip(
break
}
if pgcode.MakeCode(pgErr.Code) != pgcode.SerializationFailure {
- // A non-retry error. We've printed the error, and
+ // A non-retry error. If we have a fallback, try with that.
+ if fallback {
+ fallback = false
+
+ query = tableQuery.fallback
+ numRetries = 1 // Reset counter since this is a different query.
+ baseName = baseName + ".fallback"
+ s = zr.start("retrieving SQL data for %s (fallback)", table)
+
+ continue
+ }
+ // A non-retry error, no fallback. We've printed the error, and
// there's nothing to retry. Stop here.
break
}
diff --git a/pkg/cli/zip_table_registry.go b/pkg/cli/zip_table_registry.go
index 395adfd5d2d9..c8d1913f62d9 100644
--- a/pkg/cli/zip_table_registry.go
+++ b/pkg/cli/zip_table_registry.go
@@ -58,6 +58,15 @@ type TableRegistryConfig struct {
// customQueryRedacted should NOT be a `SELECT * FROM table` type query, as
// this could leak newly added sensitive columns into the output.
customQueryRedacted string
+
+ // customQueryUnredactedFallback is an alternative query that will be
+ // attempted if `customQueryUnredacted` does not return within the
+ // timeout or fails. If empty it will be ignored.
+ customQueryUnredactedFallback string
+ // customQueryRedactedFallback is an alternative query that will be
+ // attempted if `customQueryRedacted` does not return within the
+ // timeout or fails. If empty it will be ignored.
+ customQueryRedactedFallback string
}
// DebugZipTableRegistry is a registry of `crdb_internal` and `system` tables
@@ -74,26 +83,35 @@ type TableRegistryConfig struct {
// may be a way to avoid having to completely omit entire columns.
type DebugZipTableRegistry map[string]TableRegistryConfig
+// TableQuery holds two sql query strings together that are used to
+// dump tables when generating a debug zip. `query` is the primary
+// query to run, and `fallback` is one to try if the primary fails or
+// times out.
+type TableQuery struct {
+ query string
+ fallback string
+}
+
// QueryForTable produces the appropriate query for `debug zip` for the given
// table to use, taking redaction into account. If the provided tableName does
// not exist in the registry, or no redacted config exists in the registry for
// the tableName, an error is returned.
-func (r DebugZipTableRegistry) QueryForTable(tableName string, redact bool) (string, error) {
+func (r DebugZipTableRegistry) QueryForTable(tableName string, redact bool) (TableQuery, error) {
tableConfig, ok := r[tableName]
if !ok {
- return "", errors.Newf("no entry found in table registry for: %s", tableName)
+ return TableQuery{}, errors.Newf("no entry found in table registry for: %s", tableName)
}
if !redact {
if tableConfig.customQueryUnredacted != "" {
- return tableConfig.customQueryUnredacted, nil
+ return TableQuery{tableConfig.customQueryUnredacted, tableConfig.customQueryUnredactedFallback}, nil
}
- return fmt.Sprintf("TABLE %s", tableName), nil
+ return TableQuery{fmt.Sprintf("TABLE %s", tableName), ""}, nil
}
if tableConfig.customQueryRedacted != "" {
- return tableConfig.customQueryRedacted, nil
+ return TableQuery{tableConfig.customQueryRedacted, tableConfig.customQueryRedactedFallback}, nil
}
if len(tableConfig.nonSensitiveCols) == 0 {
- return "", errors.Newf("requested redacted query for table %s, but no non-sensitive columns defined", tableName)
+ return TableQuery{}, errors.Newf("requested redacted query for table %s, but no non-sensitive columns defined", tableName)
}
var colsString strings.Builder
for i, colName := range tableConfig.nonSensitiveCols {
@@ -103,7 +121,7 @@ func (r DebugZipTableRegistry) QueryForTable(tableName string, redact bool) (str
colsString.WriteString(", ")
}
}
- return fmt.Sprintf("SELECT %s FROM %s", colsString.String(), tableName), nil
+ return TableQuery{fmt.Sprintf("SELECT %s FROM %s", colsString.String(), tableName), ""}, nil
}
// GetTables returns all the table names within the registry. Useful for
@@ -611,6 +629,20 @@ WHERE ss.transaction_fingerprint_id != '\x0000000000000000' AND s.fingerprint_id
GROUP BY collection_ts, contention_duration, waiting_txn_id, waiting_txn_fingerprint_id, blocking_txn_id,
blocking_txn_fingerprint_id, waiting_stmt_fingerprint_id, contending_pretty_key, s.metadata ->> 'query',
index_name, table_name, database_name
+`,
+ customQueryUnredactedFallback: `
+SELECT collection_ts,
+ contention_duration,
+ waiting_txn_id,
+ waiting_txn_fingerprint_id,
+ waiting_stmt_fingerprint_id,
+ blocking_txn_id,
+ blocking_txn_fingerprint_id,
+ contending_pretty_key,
+ index_name,
+ table_name,
+ database_name
+FROM crdb_internal.transaction_contention_events
`,
// `contending_key` column contains the contended key, which may
// contain sensitive row-level data. So, we will only fetch if the
diff --git a/pkg/cli/zip_table_registry_test.go b/pkg/cli/zip_table_registry_test.go
index 643624825708..1f88be28746a 100644
--- a/pkg/cli/zip_table_registry_test.go
+++ b/pkg/cli/zip_table_registry_test.go
@@ -42,6 +42,11 @@ func TestQueryForTable(t *testing.T) {
nonSensitiveCols: NonSensitiveColumns{"x", "crdb_internal.pretty_key(y, 0) as y", "z"},
customQueryUnredacted: "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query",
},
+ "table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback": {
+ nonSensitiveCols: NonSensitiveColumns{"x", "crdb_internal.pretty_key(y, 0) as y", "z"},
+ customQueryUnredacted: "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback",
+ customQueryUnredactedFallback: "SELECT x FROM table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback",
+ },
}
t.Run("errors if no table config present in registry", func(t *testing.T) {
@@ -53,7 +58,7 @@ func TestQueryForTable(t *testing.T) {
t.Run("produces `TABLE` query when unredacted with no custom query", func(t *testing.T) {
table := "table_with_sensitive_cols"
- expected := "TABLE table_with_sensitive_cols"
+ expected := TableQuery{query: "TABLE table_with_sensitive_cols"}
actual, err := reg.QueryForTable(table, false /* redact */)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
@@ -61,7 +66,7 @@ func TestQueryForTable(t *testing.T) {
t.Run("produces custom query when unredacted and custom query supplied", func(t *testing.T) {
table := "table_with_custom_queries"
- expected := "SELECT * FROM table_with_custom_queries"
+ expected := TableQuery{query: "SELECT * FROM table_with_custom_queries"}
actual, err := reg.QueryForTable(table, false /* redact */)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
@@ -69,7 +74,7 @@ func TestQueryForTable(t *testing.T) {
t.Run("produces query with only non-sensitive columns when redacted and no custom query", func(t *testing.T) {
table := "table_with_sensitive_cols"
- expected := `SELECT x, y, z FROM table_with_sensitive_cols`
+ expected := TableQuery{query: `SELECT x, y, z FROM table_with_sensitive_cols`}
actual, err := reg.QueryForTable(table, true /* redact */)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
@@ -77,7 +82,7 @@ func TestQueryForTable(t *testing.T) {
t.Run("produces custom when redacted and custom query supplied", func(t *testing.T) {
table := "table_with_custom_queries"
- expected := "SELECT a, b, c FROM table_with_custom_queries"
+ expected := TableQuery{query: "SELECT a, b, c FROM table_with_custom_queries"}
actual, err := reg.QueryForTable(table, true /* redact */)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
@@ -93,7 +98,7 @@ func TestQueryForTable(t *testing.T) {
t.Run("produces query when a combination of nonSensitiveCols and customQueryUnredacted is supplied", func(t *testing.T) {
table := "table_with_non_sensitive_cols_and_custom_unredacted_query"
- expected := "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query"
+ expected := TableQuery{query: "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query"}
t.Run("with redact flag", func(t *testing.T) {
actual, err := reg.QueryForTable(table, true /* redact */)
@@ -107,6 +112,17 @@ func TestQueryForTable(t *testing.T) {
assert.Equal(t, expected, actual)
})
})
+
+ t.Run("with fallback query", func(t *testing.T) {
+ table := "table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback"
+ expected := TableQuery{
+ query: "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback",
+ fallback: "SELECT x FROM table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback",
+ }
+ actual, err := reg.QueryForTable(table, false /* redact */)
+ assert.NoError(t, err)
+ assert.Equal(t, expected, actual)
+ })
}
func TestNoForbiddenSystemTablesInDebugZip(t *testing.T) {
@@ -125,8 +141,8 @@ func TestNoForbiddenSystemTablesInDebugZip(t *testing.T) {
"system.transaction_activity",
}
for _, forbiddenTable := range forbiddenSysTables {
- query, err := zipSystemTables.QueryForTable(forbiddenTable, false /* redact */)
- assert.Equal(t, "", query)
+ tableQuery, err := zipSystemTables.QueryForTable(forbiddenTable, false /* redact */)
+ assert.Equal(t, "", tableQuery.query)
assert.Error(t, err)
assert.Equal(t, fmt.Sprintf("no entry found in table registry for: %s", forbiddenTable), err.Error())
}
diff --git a/pkg/cli/zip_test.go b/pkg/cli/zip_test.go
index ea718146c39d..7dc13c3fc193 100644
--- a/pkg/cli/zip_test.go
+++ b/pkg/cli/zip_test.go
@@ -177,6 +177,46 @@ func TestZip(t *testing.T) {
})
}
+// This tests the operation of zip over secure clusters.
+func TestZipQueryFallback(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ skip.UnderRace(t, "test too slow under race")
+
+ existing := zipInternalTablesPerCluster["crdb_internal.transaction_contention_events"]
+ zipInternalTablesPerCluster["crdb_internal.transaction_contention_events"] = TableRegistryConfig{
+ nonSensitiveCols: existing.nonSensitiveCols,
+ // We want this to fail to trigger the fallback.
+ customQueryUnredacted: "SELECT FAIL;",
+ customQueryUnredactedFallback: existing.customQueryUnredactedFallback,
+ }
+
+ dir, cleanupFn := testutils.TempDir(t)
+ defer cleanupFn()
+
+ c := NewCLITest(TestCLIParams{
+ StoreSpecs: []base.StoreSpec{{
+ Path: dir,
+ }},
+ })
+ defer c.Cleanup()
+
+ out, err := c.RunWithCapture("debug zip --concurrency=1 --cpu-profile-duration=1s " + os.DevNull)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Strip any non-deterministic messages.
+ out = eraseNonDeterministicZipOutput(out)
+
+ // We use datadriven simply to read the golden output file; we don't actually
+ // run any commands. Using datadriven allows TESTFLAGS=-rewrite.
+ datadriven.RunTest(t, datapathutils.TestDataPath(t, "zip", "testzip_fallback"), func(t *testing.T, td *datadriven.TestData) string {
+ return out
+ })
+}
+
// This tests the operation of redacted zip over secure clusters.
func TestZipRedacted(t *testing.T) {
defer leaktest.AfterTest(t)()
@@ -801,7 +841,7 @@ func TestZipRetries(t *testing.T) {
sqlConn,
"test",
`generate_series(1,15000) as t(x)`,
- `select if(x<11000,x,crdb_internal.force_retry('1h')) from generate_series(1,15000) as t(x)`,
+ TableQuery{query: `select if(x<11000,x,crdb_internal.force_retry('1h')) from generate_series(1,15000) as t(x)`},
); err != nil {
t.Fatal(err)
}
@@ -830,6 +870,95 @@ test/generate_series(1,15000) as t(x).4.json.err.txt
assert.Equal(t, expected, fileList.String())
}
+// This checks that SQL retry errors are properly handled.
+func TestZipFallback(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ s := serverutils.StartServerOnly(t, base.TestServerArgs{Insecure: true})
+ defer s.Stopper().Stop(context.Background())
+
+ dir, cleanupFn := testutils.TempDir(t)
+ defer cleanupFn()
+
+ zipName := filepath.Join(dir, "test.zip")
+
+ func() {
+ out, err := os.Create(zipName)
+ if err != nil {
+ t.Fatal(err)
+ }
+ z := newZipper(out)
+ defer func() {
+ if err := z.close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // Lower the buffer size so that an error is returned when running the
+ // generate_series query.
+ sqlURL := url.URL{
+ Scheme: "postgres",
+ User: url.User(username.RootUser),
+ Host: s.AdvSQLAddr(),
+ }
+ sqlConn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, sqlURL.String())
+ defer func() {
+ if err := sqlConn.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ zr := zipCtx.newZipReporter("test")
+ zr.sqlOutputFilenameExtension = "json"
+ zc := debugZipContext{
+ z: z,
+ clusterPrinter: zr,
+ timeout: 3 * time.Second,
+ }
+ if err := zc.dumpTableDataForZip(
+ zr,
+ sqlConn,
+ "test",
+ `test_table_fail`,
+ TableQuery{
+ query: `SELECT blah`,
+ },
+ ); err != nil {
+ t.Fatal(err)
+ }
+ if err := zc.dumpTableDataForZip(
+ zr,
+ sqlConn,
+ "test",
+ `test_table_succeed`,
+ TableQuery{
+ query: `SELECT blah`,
+ fallback: `SELECT 1`,
+ },
+ ); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ r, err := zip.OpenReader(zipName)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() { _ = r.Close() }()
+ var fileList bytes.Buffer
+ for _, f := range r.File {
+ fmt.Fprintln(&fileList, f.Name)
+ }
+ const expected = `test/test_table_fail.json
+test/test_table_fail.json.err.txt
+test/test_table_succeed.json
+test/test_table_succeed.json.err.txt
+test/test_table_succeed.fallback.json
+`
+ assert.Equal(t, expected, fileList.String())
+}
+
// This test the operation of zip over secure clusters.
func TestToHex(t *testing.T) {
defer leaktest.AfterTest(t)()
diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
index e5ba37ce8890..5faedb453121 100644
--- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
+++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
@@ -1638,6 +1638,8 @@ func (c *channelSink) Context() context.Context {
return c.ctx
}
+func (c *channelSink) SendIsThreadSafe() {}
+
func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error {
select {
case c.ch <- e:
diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go
index 9180a09107d6..0507872f9575 100644
--- a/pkg/kv/kvpb/api.go
+++ b/pkg/kv/kvpb/api.go
@@ -2518,8 +2518,15 @@ func (s *ScanStats) String() string {
// RangeFeedEventSink is an interface for sending a single rangefeed event.
type RangeFeedEventSink interface {
+ // Context returns the context for this stream.
Context() context.Context
+ // Send blocks until it sends the RangeFeedEvent, the stream is done, or the
+ // stream breaks. Send must be safe to call on the same stream in different
+ // goroutines.
Send(*RangeFeedEvent) error
+ // SendIsThreadSafe is a no-op declaration method. It is a contract that the
+ // interface has a thread-safe Send method.
+ SendIsThreadSafe()
}
// RangeFeedEventProducer is an adapter for receiving rangefeed events with either
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 30788d39ca78..12c45129b105 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -74,6 +74,7 @@ go_library(
"replica_send.go",
"replica_split_load.go",
"replica_sst_snapshot_storage.go",
+ "replica_store_liveness.go",
"replica_tscache.go",
"replica_write.go",
"replicate_queue.go",
@@ -166,6 +167,8 @@ go_library(
"//pkg/kv/kvserver/spanset",
"//pkg/kv/kvserver/split",
"//pkg/kv/kvserver/stateloader",
+ "//pkg/kv/kvserver/storeliveness",
+ "//pkg/kv/kvserver/storeliveness/storelivenesspb",
"//pkg/kv/kvserver/tenantrate",
"//pkg/kv/kvserver/tscache",
"//pkg/kv/kvserver/txnrecovery",
@@ -176,6 +179,7 @@ go_library(
"//pkg/multitenant/tenantcostmodel",
"//pkg/raft",
"//pkg/raft/raftpb",
+ "//pkg/raft/raftstoreliveness",
"//pkg/raft/tracker",
"//pkg/roachpb",
"//pkg/rpc",
diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
index dc1950ee21ca..356bf691ce94 100644
--- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
+++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -457,6 +457,8 @@ func (s *dummyStream) Context() context.Context {
return s.ctx
}
+func (s *dummyStream) SendIsThreadSafe() {}
+
func (s *dummyStream) Send(ev *kvpb.RangeFeedEvent) error {
if ev.Val == nil && ev.Error == nil {
return nil
diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go
index 50a2c39cdfec..af59ca71bf5e 100644
--- a/pkg/kv/kvserver/rangefeed/bench_test.go
+++ b/pkg/kv/kvserver/rangefeed/bench_test.go
@@ -207,3 +207,7 @@ func (s *noopStream) Send(*kvpb.RangeFeedEvent) error {
s.events++
return nil
}
+
+// Note that Send itself is not thread-safe, but it is written to be used only
+// in a single threaded environment in this test, ensuring thread-safety.
+func (s *noopStream) SendIsThreadSafe() {}
diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go
index 954b179ebcb2..b0e30c67c112 100644
--- a/pkg/kv/kvserver/rangefeed/processor_test.go
+++ b/pkg/kv/kvserver/rangefeed/processor_test.go
@@ -1811,6 +1811,8 @@ func newConsumer(blockAfter int) *consumer {
}
}
+func (c *consumer) SendIsThreadSafe() {}
+
func (c *consumer) Send(e *kvpb.RangeFeedEvent) error {
if e.Val != nil {
v := int(atomic.AddInt32(&c.sentValues, 1))
diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go
index a65cbf36a3fe..532485616962 100644
--- a/pkg/kv/kvserver/rangefeed/registry.go
+++ b/pkg/kv/kvserver/rangefeed/registry.go
@@ -30,11 +30,7 @@ import (
// Stream is a object capable of transmitting RangeFeedEvents.
type Stream interface {
- // Context returns the context for this stream.
- Context() context.Context
- // Send blocks until it sends m, the stream is done, or the stream breaks.
- // Send must be safe to call on the same stream in different goroutines.
- Send(*kvpb.RangeFeedEvent) error
+ kvpb.RangeFeedEventSink
}
// Shared event is an entry stored in registration channel. Each entry is
diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go
index f3e43fc36a89..4b3e76dede3a 100644
--- a/pkg/kv/kvserver/rangefeed/registry_test.go
+++ b/pkg/kv/kvserver/rangefeed/registry_test.go
@@ -65,6 +65,8 @@ func (s *testStream) Cancel() {
s.ctxDone()
}
+func (s *testStream) SendIsThreadSafe() {}
+
func (s *testStream) Send(e *kvpb.RangeFeedEvent) error {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index 1ab02c394c19..a8c72337179c 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -294,11 +294,12 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error {
ctx := r.AnnotateCtx(context.Background())
rg, err := raft.NewRawNode(newRaftConfig(
ctx,
- raft.Storage((*replicaRaftStorage)(r)),
+ (*replicaRaftStorage)(r),
raftpb.PeerID(r.replicaID),
r.mu.state.RaftAppliedIndex,
r.store.cfg,
&raftLogger{ctx: ctx},
+ (*replicaRLockedStoreLiveness)(r),
))
if err != nil {
return err
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index e11ca36b9a9f..6e6f5475a505 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -37,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
- "github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -122,24 +121,6 @@ const defaultEventChanCap = 4096
var defaultEventChanTimeout = envutil.EnvOrDefaultDuration(
"COCKROACH_RANGEFEED_SEND_TIMEOUT", 50*time.Millisecond)
-// lockedRangefeedStream is an implementation of rangefeed.Stream which provides
-// support for concurrent calls to Send. Note that the default implementation of
-// grpc.Stream is not safe for concurrent calls to Send.
-type lockedRangefeedStream struct {
- wrapped kvpb.RangeFeedEventSink
- sendMu syncutil.Mutex
-}
-
-func (s *lockedRangefeedStream) Context() context.Context {
- return s.wrapped.Context()
-}
-
-func (s *lockedRangefeedStream) Send(e *kvpb.RangeFeedEvent) error {
- s.sendMu.Lock()
- defer s.sendMu.Unlock()
- return s.wrapped.Send(e)
-}
-
// rangefeedTxnPusher is a shim around intentResolver that implements the
// rangefeed.TxnPusher interface.
type rangefeedTxnPusher struct {
@@ -294,8 +275,6 @@ func (r *Replica) RangeFeed(
checkTS = r.Clock().Now()
}
- lockedStream := &lockedRangefeedStream{wrapped: stream}
-
// If we will be using a catch-up iterator, wait for the limiter here before
// locking raftMu.
usingCatchUpIter := false
@@ -352,7 +331,7 @@ func (r *Replica) RangeFeed(
var done future.ErrorFuture
p := r.registerWithRangefeedRaftMuLocked(
- ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, lockedStream, &done,
+ ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, stream, &done,
)
r.raftMu.Unlock()
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index 08d188b65849..2ba4716d6d71 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -79,6 +79,8 @@ func (s *testStream) Cancel() {
s.cancel()
}
+func (s *testStream) SendIsThreadSafe() {}
+
func (s *testStream) Send(e *kvpb.RangeFeedEvent) error {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/pkg/kv/kvserver/replica_store_liveness.go b/pkg/kv/kvserver/replica_store_liveness.go
new file mode 100644
index 000000000000..d6d4c4aa4291
--- /dev/null
+++ b/pkg/kv/kvserver/replica_store_liveness.go
@@ -0,0 +1,75 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvserver
+
+import (
+ slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"
+ "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/util/hlc"
+)
+
+// replicaRLockedStoreLiveness implements the raftstoreliveness.StoreLiveness
+// interface. The interface methods assume that Replica.mu is held in read mode
+// by their callers.
+type replicaRLockedStoreLiveness Replica
+
+var _ raftstoreliveness.StoreLiveness = (*replicaRLockedStoreLiveness)(nil)
+
+func (r *replicaRLockedStoreLiveness) getStoreIdent(replicaID uint64) (slpb.StoreIdent, bool) {
+ r.mu.AssertRHeld()
+ desc, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(replicaID))
+ if !ok {
+ return slpb.StoreIdent{}, false
+ }
+ return slpb.StoreIdent{NodeID: desc.NodeID, StoreID: desc.StoreID}, true
+}
+
+// SupportFor implements the raftstoreliveness.StoreLiveness interface.
+func (r *replicaRLockedStoreLiveness) SupportFor(
+ replicaID uint64,
+) (raftstoreliveness.StoreLivenessEpoch, bool) {
+ storeID, ok := r.getStoreIdent(replicaID)
+ if !ok {
+ return 0, false
+ }
+ epoch, ok := r.store.storeLiveness.SupportFor(storeID)
+ if !ok {
+ return 0, false
+ }
+ return raftstoreliveness.StoreLivenessEpoch(epoch), true
+}
+
+// SupportFrom implements the raftstoreliveness.StoreLiveness interface.
+func (r *replicaRLockedStoreLiveness) SupportFrom(
+ replicaID uint64,
+) (raftstoreliveness.StoreLivenessEpoch, hlc.Timestamp, bool) {
+ storeID, ok := r.getStoreIdent(replicaID)
+ if !ok {
+ return 0, hlc.Timestamp{}, false
+ }
+ epoch, exp, ok := r.store.storeLiveness.SupportFrom(storeID)
+ if !ok {
+ return 0, hlc.Timestamp{}, false
+ }
+ return raftstoreliveness.StoreLivenessEpoch(epoch), exp, true
+}
+
+// SupportFromEnabled implements the raftstoreliveness.StoreLiveness interface.
+func (r *replicaRLockedStoreLiveness) SupportFromEnabled() bool {
+ // TODO(nvanbenschoten): hook this up to a version check and cluster setting.
+ return false
+}
+
+// SupportExpired implements the raftstoreliveness.StoreLiveness interface.
+func (r *replicaRLockedStoreLiveness) SupportExpired(ts hlc.Timestamp) bool {
+ return ts.Less(r.store.Clock().Now())
+}
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index aff99ad05f1f..c7300abaea80 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -54,6 +54,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnrecovery"
@@ -61,6 +62,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
+ "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
@@ -387,6 +389,7 @@ func newRaftConfig(
appliedIndex kvpb.RaftIndex,
storeCfg StoreConfig,
logger raft.Logger,
+ storeLiveness raftstoreliveness.StoreLiveness,
) *raft.Config {
return &raft.Config{
ID: id,
@@ -402,6 +405,7 @@ func newRaftConfig(
MaxInflightBytes: storeCfg.RaftMaxInflightBytes,
Storage: strg,
Logger: logger,
+ StoreLiveness: storeLiveness,
// We only set this on replica initialization, so replicas without
// StepDownOnRemoval may remain on 23.2 nodes until they restart. That's
@@ -893,6 +897,7 @@ type Store struct {
metrics *StoreMetrics
intentResolver *intentresolver.IntentResolver
recoveryMgr txnrecovery.Manager
+ storeLiveness storeliveness.Fabric
syncWaiter *logstore.SyncWaiterLoop
raftEntryCache *raftentry.Cache
limiters batcheval.Limiters
@@ -2154,6 +2159,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
)
s.metrics.registry.AddMetricStruct(s.recoveryMgr.Metrics())
+ // TODO(mira): create the store liveness support manager here.
+ // s.storeLiveness = ...
+
s.rangeIDAlloc = idAlloc
now := s.cfg.Clock.Now()
diff --git a/pkg/raft/raftstoreliveness/store_liveness.go b/pkg/raft/raftstoreliveness/store_liveness.go
index fc37bfb913de..758101ba7070 100644
--- a/pkg/raft/raftstoreliveness/store_liveness.go
+++ b/pkg/raft/raftstoreliveness/store_liveness.go
@@ -64,7 +64,9 @@ type StoreLiveness interface {
// active or not, which is what prompts the "SupportFrom" prefix.
SupportFromEnabled() bool
- // SupportInPast returns whether the supplied timestamp is before the current
- // time.
- SupportInPast(ts hlc.Timestamp) bool
+ // SupportExpired returns whether the supplied expiration timestamp is before
+ // the present time and has therefore expired. If the method returns false,
+ // the timestamp is still in the future and still provides support up to that
+ // point in time.
+ SupportExpired(ts hlc.Timestamp) bool
}
diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go
index f9ec1f5baca9..6f7044b8af0d 100644
--- a/pkg/roachpb/data.go
+++ b/pkg/roachpb/data.go
@@ -1853,10 +1853,15 @@ func (l Lease) SafeFormat(w redact.SafePrinter, _ rune) {
return
}
w.Printf("repl=%s seq=%d start=%s", l.Replica, l.Sequence, l.Start)
- if l.Type() == LeaseExpiration {
+ switch l.Type() {
+ case LeaseExpiration:
w.Printf(" exp=%s", l.Expiration)
- } else {
+ case LeaseEpoch:
w.Printf(" epo=%d min-exp=%s", l.Epoch, l.MinExpiration)
+ case LeaseLeader:
+ w.Printf(" term=%d min-exp=%s", l.Term, l.MinExpiration)
+ default:
+ panic("unexpected lease type")
}
w.Printf(" pro=%s", l.ProposedTS)
}
@@ -1883,14 +1888,23 @@ const (
// LeaseEpoch allows range operations while the node liveness epoch
// is equal to the lease epoch.
LeaseEpoch
+ // LeaseLeader allows range operations while the replica is guaranteed
+ // to be the range's raft leader.
+ LeaseLeader
)
// Type returns the lease type.
func (l Lease) Type() LeaseType {
- if l.Epoch == 0 {
- return LeaseExpiration
+ if l.Epoch != 0 && l.Term != 0 {
+ panic("lease cannot have both epoch and term")
+ }
+ if l.Epoch != 0 {
+ return LeaseEpoch
}
- return LeaseEpoch
+ if l.Term != 0 {
+ return LeaseLeader
+ }
+ return LeaseExpiration
}
// Speculative returns true if this lease instance doesn't correspond to a
@@ -1914,7 +1928,9 @@ func (l Lease) Speculative() bool {
// expToEpochEquiv indicates whether an expiration-based lease
// can be considered equivalent to an epoch-based lease during
// a promotion from expiration-based to epoch-based. It is used
-// for mixed-version compatibility.
+// for mixed-version compatibility. No such flag is needed for
+// expiration-based to leader lease promotion, because there is
+// no need for mixed-version compatibility.
//
// NB: Lease.Equivalent is NOT symmetric. For expiration-based
// leases, a lease is equivalent to another with an equal or
@@ -1935,14 +1951,14 @@ func (l Lease) Speculative() bool {
// times are the same, the leases could turn out to be non-equivalent -- in
// that case they will share a start time but not the sequence.
//
-// NB: we do not allow transitions from epoch-based or leader leases (not
-// yet implemented) to expiration-based leases to be equivalent. This was
-// because both of the former lease types don't have an expiration in the
-// lease, while the latter does. We can introduce safety violations by
-// shortening the lease expiration if we allow this transition, since the
-// new lease may not apply at the leaseholder until much after it applies at
-// some other replica, so the leaseholder may continue acting as one based
-// on an old lease, while the other replica has stepped up as leaseholder.
+// NB: we do not allow transitions from epoch-based or leader leases to
+// expiration-based leases to be equivalent. This was because both of the
+// former lease types don't have an expiration in the lease, while the
+// latter does. We can introduce safety violations by shortening the lease
+// expiration if we allow this transition, since the new lease may not apply
+// at the leaseholder until much after it applies at some other replica, so
+// the leaseholder may continue acting as one based on an old lease, while
+// the other replica has stepped up as leaseholder.
func (l Lease) Equivalent(newL Lease, expToEpochEquiv bool) bool {
// Ignore proposed timestamp & deprecated start stasis.
l.ProposedTS, newL.ProposedTS = hlc.ClockTimestamp{}, hlc.ClockTimestamp{}
@@ -1977,6 +1993,17 @@ func (l Lease) Equivalent(newL Lease, expToEpochEquiv bool) bool {
if l.MinExpiration.LessEq(newL.MinExpiration) {
l.MinExpiration, newL.MinExpiration = hlc.Timestamp{}, hlc.Timestamp{}
}
+
+ case LeaseLeader:
+ if l.Term == newL.Term {
+ l.Term, newL.Term = 0, 0
+ }
+ // For leader leases, extensions to the minimum expiration are considered
+ // equivalent.
+ if l.MinExpiration.LessEq(newL.MinExpiration) {
+ l.MinExpiration, newL.MinExpiration = hlc.Timestamp{}, hlc.Timestamp{}
+ }
+
case LeaseExpiration:
switch newL.Type() {
case LeaseEpoch:
@@ -1999,6 +2026,27 @@ func (l Lease) Equivalent(newL Lease, expToEpochEquiv bool) bool {
newL.MinExpiration = hlc.Timestamp{}
}
+ case LeaseLeader:
+ // An expiration-based lease being promoted to a leader lease. This
+ // transition occurs after a successful lease transfer if the setting
+ // kv.transfer_expiration_leases_first.enabled is enabled and leader
+ // leases are in use.
+ //
+ // Expiration-based leases carry a local expiration timestamp. Leader
+ // leases extend their expiration indirectly through the leadership
+ // fortification protocol and associated Store Liveness heartbeats. We
+ // assume that this promotion is only proposed if the leader support
+ // expiration (and associated min expiration) is equal to or later than
+ // previous expiration carried by the expiration-based lease. This is a
+ // case where Equivalent is not commutative, as the reverse transition
+ // (from leader lease to expiration-based) requires a sequence increment.
+ //
+ // Ignore expiration, term, and min expiration. The remaining fields
+ // which are compared are Replica and Start.
+ l.Expiration = nil
+ newL.Term = 0
+ newL.MinExpiration = hlc.Timestamp{}
+
case LeaseExpiration:
// See the comment above, though this field's nullability wasn't
// changed. We nil it out for completeness only.
@@ -2090,6 +2138,9 @@ func (l *Lease) Equal(that interface{}) bool {
if !l.MinExpiration.Equal(&that1.MinExpiration) {
return false
}
+ if l.Term != that1.Term {
+ return false
+ }
return true
}
diff --git a/pkg/roachpb/data.proto b/pkg/roachpb/data.proto
index 718ab027f73d..b3b5217c9f8a 100644
--- a/pkg/roachpb/data.proto
+++ b/pkg/roachpb/data.proto
@@ -704,9 +704,13 @@ message Lease {
(gogoproto.nullable) = false,
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"];
- // The epoch of the lease holder's node liveness entry. This field is only set
- // for epoch-based leases. If this value is non-zero, the expiration field and
- // the term field (not yet implemented) are ignored.
+ // The epoch of the lease holder's node liveness record. The lease inherits
+ // the expiration of the node liveness record for as long as the node liveness
+ // record retains this epoch. The lease is invalid if the node liveness record
+ // is updated with a different epoch.
+ //
+ // This field is only set for epoch-based leases. If this value is non-zero,
+ // the expiration field and the term field must not be set.
int64 epoch = 6;
// A zero-indexed sequence number which is incremented during the acquisition
@@ -726,13 +730,22 @@ message Lease {
// The minimum expiration at which the lease expires, independent of any other
// expiry condition. This field can be used to place a floor on the expiration
- // for epoch-based leases and leader leases (not yet implemented) to prevent
- // expiration regressions when upgrading from an expiration-based lease. It is
- // not supported for expiration-based leases.
+ // for epoch-based leases and leader leases to prevent expiration regressions
+ // when upgrading from an expiration-based lease. It is not supported for
+ // expiration-based leases.
//
// Like expiration above, this is an exclusive value, i.e. the lease is valid
// in [start, max(min_expiration, )).
util.hlc.Timestamp min_expiration = 9 [(gogoproto.nullable) = false];
+
+ // The term of the raft leader that a leader lease is associated with. The
+ // lease is valid for as long as the raft leader has a guarantee from store
+ // liveness that it remains the leader under this term. The lease is invalid
+ // if the raft leader loses leadership (i.e. changes its term).
+ //
+ // This field is only set for leader leases. If non-zero, the expiration field
+ // and the epoch field must not be set.
+ uint64 term = 10;
}
// AbortSpanEntry contains information about a transaction which has
diff --git a/pkg/roachpb/data_test.go b/pkg/roachpb/data_test.go
index ff594f6f1196..101a7c34fd72 100644
--- a/pkg/roachpb/data_test.go
+++ b/pkg/roachpb/data_test.go
@@ -1121,6 +1121,22 @@ func TestLeaseStringAndSafeFormat(t *testing.T) {
},
exp: "repl=(n1,s1):1 seq=3 start=0.000000001,1 epo=4 min-exp=0.000000002,1 pro=0.000000001,0",
},
+ {
+ name: "leader",
+ lease: Lease{
+ Replica: ReplicaDescriptor{
+ NodeID: 1,
+ StoreID: 1,
+ ReplicaID: 1,
+ },
+ Start: makeClockTS(1, 1),
+ ProposedTS: makeClockTS(1, 0),
+ Sequence: 3,
+ MinExpiration: makeTS(2, 1),
+ Term: 5,
+ },
+ exp: "repl=(n1,s1):1 seq=3 start=0.000000001,1 term=5 min-exp=0.000000002,1 pro=0.000000001,0",
+ },
} {
t.Run(tc.name, func(t *testing.T) {
// String.
@@ -1133,6 +1149,13 @@ func TestLeaseStringAndSafeFormat(t *testing.T) {
}
}
+func TestLeaseType(t *testing.T) {
+ require.Equal(t, LeaseExpiration, Lease{}.Type())
+ require.Equal(t, LeaseEpoch, Lease{Epoch: 1}.Type())
+ require.Equal(t, LeaseLeader, Lease{Term: 1}.Type())
+ require.Panics(t, func() { Lease{Epoch: 1, Term: 1}.Type() })
+}
+
func TestLeaseEquivalence(t *testing.T) {
r1 := ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
r2 := ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2}
@@ -1150,6 +1173,11 @@ func TestLeaseEquivalence(t *testing.T) {
expire1TS2 := Lease{Replica: r1, Start: ts2, Expiration: ts2.ToTimestamp().Clone()}
expire2 := Lease{Replica: r1, Start: ts1, Expiration: ts3.ToTimestamp().Clone()}
expire2R2TS2 := Lease{Replica: r2, Start: ts2, Expiration: ts3.ToTimestamp().Clone()}
+ leader1 := Lease{Replica: r1, Start: ts1, Term: 1}
+ leader1R2 := Lease{Replica: r2, Start: ts1, Term: 1}
+ leader1TS2 := Lease{Replica: r1, Start: ts2, Term: 1}
+ leader2 := Lease{Replica: r1, Start: ts1, Term: 2}
+ leader2R2TS2 := Lease{Replica: r2, Start: ts2, Term: 2}
proposed1 := Lease{Replica: r1, Start: ts1, Epoch: 1, ProposedTS: ts1}
proposed2 := Lease{Replica: r1, Start: ts1, Epoch: 2, ProposedTS: ts1}
@@ -1167,6 +1195,9 @@ func TestLeaseEquivalence(t *testing.T) {
epoch1MinExp2 := Lease{Replica: r1, Start: ts1, Epoch: 1, MinExpiration: ts2.ToTimestamp()}
epoch1MinExp3 := Lease{Replica: r1, Start: ts1, Epoch: 1, MinExpiration: ts3.ToTimestamp()}
epoch2MinExp2 := Lease{Replica: r1, Start: ts1, Epoch: 2, MinExpiration: ts2.ToTimestamp()}
+ leader1MinExp2 := Lease{Replica: r1, Start: ts1, Term: 1, MinExpiration: ts2.ToTimestamp()}
+ leader1MinExp3 := Lease{Replica: r1, Start: ts1, Term: 1, MinExpiration: ts3.ToTimestamp()}
+ leader2MinExp2 := Lease{Replica: r1, Start: ts1, Term: 2, MinExpiration: ts2.ToTimestamp()}
testCases := []struct {
l, ol Lease
@@ -1174,6 +1205,7 @@ func TestLeaseEquivalence(t *testing.T) {
}{
{epoch1, epoch1, true}, // same epoch lease
{expire1, expire1, true}, // same expiration lease
+ {leader1, leader1, true}, // same leader lease
{epoch1, epoch1R2, false}, // different epoch leases
{epoch1, epoch1TS2, false}, // different epoch leases
{epoch1, epoch2, false}, // different epoch leases
@@ -1183,12 +1215,20 @@ func TestLeaseEquivalence(t *testing.T) {
{expire1, expire2R2TS2, false}, // different expiration leases
{expire1, expire2, true}, // same expiration lease, extended
{expire2, expire1, false}, // same expiration lease, extended but backwards
+ {leader1, leader1R2, false}, // different leader leases
+ {leader1, leader1TS2, false}, // different leader leases
+ {leader1, leader2, false}, // different leader leases
+ {leader1, leader2R2TS2, false}, // different leader leases
{epoch1, expire1, false}, // epoch and expiration leases, same replica and start time
{epoch1, expire1R2, false}, // epoch and expiration leases, different replica
{epoch1, expire1TS2, false}, // epoch and expiration leases, different start time
{expire1, epoch1, true}, // expiration and epoch leases, same replica and start time
{expire1, epoch1R2, false}, // expiration and epoch leases, different replica
{expire1, epoch1TS2, false}, // expiration and epoch leases, different start time
+ {epoch1, leader1, false}, // epoch and leader leases, same replica and start time
+ {leader1, epoch1, false}, // leader and epoch leases, same replica and start time
+ {expire1, leader1, true}, // expiration and leader leases, same replica and start time
+ {leader1, expire1, false}, // leader and expiration leases, same replica and start time
{proposed1, proposed1, true}, // exact leases with identical timestamps
{proposed1, proposed2, false}, // same proposed timestamps, but diff epochs
{proposed1, proposed3, true}, // different proposed timestamps, same lease
@@ -1196,7 +1236,7 @@ func TestLeaseEquivalence(t *testing.T) {
{epoch1, epoch1Voter, true}, // same epoch lease, different replica type
{epoch1, epoch1Learner, true}, // same epoch lease, different replica type
{epoch1Voter, epoch1Learner, true}, // same epoch lease, different replica type
- // Test minimum expiration.
+ // Test minimum expiration with epoch leases.
{epoch1, epoch1MinExp2, true}, // different epoch leases, newer min expiration
{epoch1, epoch1MinExp3, true}, // different epoch leases, newer min expiration
{epoch1MinExp2, epoch1, false}, // different epoch leases, older min expiration
@@ -1206,6 +1246,16 @@ func TestLeaseEquivalence(t *testing.T) {
{epoch1MinExp3, epoch1MinExp2, false}, // different epoch leases, older min expiration
{epoch1MinExp3, epoch1MinExp3, true}, // same epoch leases, same min expiration
{epoch1MinExp2, epoch2MinExp2, false}, // different epoch leases
+ // Test minimum expiration with leader leases.
+ {leader1, leader1MinExp2, true}, // different leader leases, newer min expiration
+ {leader1, leader1MinExp3, true}, // different leader leases, newer min expiration
+ {leader1MinExp2, leader1, false}, // different leader leases, older min expiration
+ {leader1MinExp2, leader1MinExp2, true}, // same leader leases, same min expiration
+ {leader1MinExp2, leader1MinExp3, true}, // different leader leases, newer min expiration
+ {leader1MinExp3, leader1, false}, // different leader leases, older min expiration
+ {leader1MinExp3, leader1MinExp2, false}, // different leader leases, older min expiration
+ {leader1MinExp3, leader1MinExp3, true}, // same leader leases, same min expiration
+ {leader1MinExp2, leader2MinExp2, false}, // different leader leases
}
for i, tc := range testCases {
@@ -1257,6 +1307,7 @@ func TestLeaseEqual(t *testing.T) {
Sequence LeaseSequence
AcquisitionType LeaseAcquisitionType
MinExpiration hlc.Timestamp
+ Term uint64
}
// Verify that the lease structure does not change unexpectedly. If a compile
// error occurs on the following line of code, update the expectedLease
@@ -1312,6 +1363,7 @@ func TestLeaseEqual(t *testing.T) {
{Sequence: 1},
{AcquisitionType: 1},
{MinExpiration: ts},
+ {Term: 1},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go
index f16f56c683e5..958cb29dc738 100644
--- a/pkg/rpc/context_test.go
+++ b/pkg/rpc/context_test.go
@@ -288,6 +288,11 @@ func (s *rangefeedEventSink) Context() context.Context {
return s.ctx
}
+// Note that Send itself is not thread-safe (grpc stream is not thread-safe),
+// but tests were written in a way that sends sequentially, ensuring
+// thread-safety for Send.
+func (s *rangefeedEventSink) SendIsThreadSafe() {}
+
func (s *rangefeedEventSink) Send(event *kvpb.RangeFeedEvent) error {
return s.stream.Send(&kvpb.MuxRangeFeedEvent{RangeFeedEvent: *event})
}
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 7021625a552b..7d700eb99cd3 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -191,18 +191,6 @@ This metric is thus not an indicator of KV health.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
- metaActiveRangeFeed = metric.Metadata{
- Name: "rpc.streams.rangefeed.active",
- Help: `Number of currently running RangeFeed streams`,
- Measurement: "Streams",
- Unit: metric.Unit_COUNT,
- }
- metaTotalRangeFeed = metric.Metadata{
- Name: "rpc.streams.rangefeed.recv",
- Help: `Total number of RangeFeed streams`,
- Measurement: "Streams",
- Unit: metric.Unit_COUNT,
- }
metaActiveMuxRangeFeed = metric.Metadata{
Name: "rpc.streams.mux_rangefeed.active",
Help: `Number of currently running MuxRangeFeed streams`,
@@ -270,8 +258,6 @@ type nodeMetrics struct {
CrossRegionBatchResponseBytes *metric.Counter
CrossZoneBatchRequestBytes *metric.Counter
CrossZoneBatchResponseBytes *metric.Counter
- NumRangeFeed *metric.Counter
- ActiveRangeFeed *metric.Gauge
NumMuxRangeFeed *metric.Counter
ActiveMuxRangeFeed *metric.Gauge
}
@@ -293,8 +279,6 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMe
CrossRegionBatchResponseBytes: metric.NewCounter(metaCrossRegionBatchResponse),
CrossZoneBatchRequestBytes: metric.NewCounter(metaCrossZoneBatchRequest),
CrossZoneBatchResponseBytes: metric.NewCounter(metaCrossZoneBatchResponse),
- ActiveRangeFeed: metric.NewGauge(metaActiveRangeFeed),
- NumRangeFeed: metric.NewCounter(metaTotalRangeFeed),
ActiveMuxRangeFeed: metric.NewGauge(metaActiveMuxRangeFeed),
NumMuxRangeFeed: metric.NewCounter(metaTotalMuxRangeFeed),
}
@@ -1832,10 +1816,10 @@ func (n *Node) RangeLookup(
return resp, nil
}
-// setRangeIDEventSink annotates each response with range and stream IDs.
-// This is used by MuxRangeFeed.
-// TODO: This code can be removed in 22.2 once MuxRangeFeed is the default, and
-// the old style RangeFeed deprecated.
+// setRangeIDEventSink is an implementation of rangefeed.Stream which annotates
+// each response with rangeID and streamID. It is used by MuxRangeFeed. Note
+// that the wrapped stream is a locked mux stream, ensuring safe concurrent Send
+// calls.
type setRangeIDEventSink struct {
ctx context.Context
cancel context.CancelFunc
@@ -1857,10 +1841,14 @@ func (s *setRangeIDEventSink) Send(event *kvpb.RangeFeedEvent) error {
return s.wrapped.Send(response)
}
+// Wrapped stream is a locked mux stream, ensuring safe concurrent Send.
+func (s *setRangeIDEventSink) SendIsThreadSafe() {}
+
var _ kvpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil)
-// lockedMuxStream provides support for concurrent calls to Send.
-// The underlying MuxRangeFeedServer is not safe for concurrent calls to Send.
+// lockedMuxStream provides support for concurrent calls to Send. The underlying
+// MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to
+// Send.
type lockedMuxStream struct {
wrapped kvpb.Internal_MuxRangeFeedServer
sendMu syncutil.Mutex
diff --git a/pkg/sql/flowinfra/flow_registry_test.go b/pkg/sql/flowinfra/flow_registry_test.go
index 877158806323..e48a1197814b 100644
--- a/pkg/sql/flowinfra/flow_registry_test.go
+++ b/pkg/sql/flowinfra/flow_registry_test.go
@@ -717,6 +717,8 @@ type delayedErrorServerStream struct {
err error
}
+func (s *delayedErrorServerStream) SendIsThreadSafe() {}
+
func (s *delayedErrorServerStream) Send(*execinfrapb.ConsumerSignal) error {
s.rpcCalledCh <- struct{}{}
<-s.delayCh
diff --git a/pkg/sql/sqlstats/insights/integration/insights_test.go b/pkg/sql/sqlstats/insights/integration/insights_test.go
index 7053728ff870..4cdccd39433a 100644
--- a/pkg/sql/sqlstats/insights/integration/insights_test.go
+++ b/pkg/sql/sqlstats/insights/integration/insights_test.go
@@ -696,8 +696,6 @@ func TestInsightsIntegrationForContention(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
- skip.WithIssue(t, 121986)
-
// Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.)
ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
@@ -766,13 +764,11 @@ func TestInsightsIntegrationForContention(t *testing.T) {
// lookup this id will result in the resolver potentially missing the event.
txnIDCache := tc.ApplicationLayer(0).SQLServer().(*sql.Server).GetTxnIDCache()
txnIDCache.DrainWriteBuffer()
- var expectedWaitingTxnFingerprintID appstatspb.TransactionFingerprintID
testutils.SucceedsSoon(t, func() error {
waitingTxnFingerprintID, ok := txnIDCache.Lookup(waitingTxnID)
if !ok || waitingTxnFingerprintID == appstatspb.InvalidTransactionFingerprintID {
return fmt.Errorf("waiting txn fingerprint not found in cache")
}
- expectedWaitingTxnFingerprintID = waitingTxnFingerprintID
return nil
})
@@ -849,11 +845,6 @@ func TestInsightsIntegrationForContention(t *testing.T) {
continue
}
- if waitingTxnFingerprintID == "0000000000000000" || waitingTxnFingerprintID == "" {
- lastErr = fmt.Errorf("expected waitingTxnFingerprintID to be %d, but got %s. \nScanned row: \n%s", expectedWaitingTxnFingerprintID, waitingTxnFingerprintID, prettyPrintRow)
- continue
- }
-
foundRow = true
break
}
diff --git a/pkg/ui/workspaces/db-console/src/util/events.spec.ts b/pkg/ui/workspaces/db-console/src/util/events.spec.ts
index 1aa9576860a3..5c4fbb12fe28 100644
--- a/pkg/ui/workspaces/db-console/src/util/events.spec.ts
+++ b/pkg/ui/workspaces/db-console/src/util/events.spec.ts
@@ -8,7 +8,9 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
-import { EventInfo, getDroppedObjectsText } from "src/util/events";
+import { api as clusterUiApi } from "@cockroachlabs/cluster-ui";
+
+import {EventInfo, getDroppedObjectsText, getEventDescription} from "src/util/events";
describe("getDroppedObjectsText", function () {
// The key indicating which objects were dropped in a DROP_DATABASE event has been
@@ -42,3 +44,40 @@ describe("getDroppedObjectsText", function () {
});
});
});
+
+describe("getEventDescription", function () {
+ it("ignores the options field when empty for role changes", function () {
+ interface TestCase {
+ event: Partial;
+ expected: string;
+ }
+ const tcs: TestCase[] = [
+ {
+ event: {
+ eventType: "alter_role",
+ info: '{"User": "abc", "RoleName": "123"}',
+ },
+ expected: "Role Altered: User abc altered role 123",
+ },
+ {
+ event: {
+ eventType: "alter_role",
+ info: '{"User": "abc", "RoleName": "123", "Options": []}',
+ },
+ expected: "Role Altered: User abc altered role 123",
+ },
+ {
+ event: {
+ eventType: "alter_role",
+ info: '{"User": "abc", "RoleName": "123", "Options": ["o1", "o2"]}',
+ },
+ expected: "Role Altered: User abc altered role 123 with options o1,o2",
+ },
+ ];
+ tcs.forEach(tc => {
+ expect(getEventDescription(tc.event as clusterUiApi.EventColumns)).toEqual(
+ tc.expected,
+ );
+ });
+ });
+});
diff --git a/pkg/ui/workspaces/db-console/src/util/events.ts b/pkg/ui/workspaces/db-console/src/util/events.ts
index 00ac42bff2a3..1bcd1c9a0222 100644
--- a/pkg/ui/workspaces/db-console/src/util/events.ts
+++ b/pkg/ui/workspaces/db-console/src/util/events.ts
@@ -179,7 +179,11 @@ export function getEventDescription(e: clusterUiApi.EventColumns): string {
case eventTypes.DROP_ROLE:
return `Role Dropped: User ${info.User} dropped role ${info.RoleName}`;
case eventTypes.ALTER_ROLE:
- return `Role Altered: User ${info.User} altered role ${info.RoleName} with options ${info.Options}`;
+ if (info.Options && info.Options.length > 0) {
+ return `Role Altered: User ${info.User} altered role ${info.RoleName} with options ${info.Options}`;
+ } else {
+ return `Role Altered: User ${info.User} altered role ${info.RoleName}`;
+ }
case eventTypes.IMPORT:
return `Import Job: User ${info.User} has a job ${info.JobID} running with status ${info.Status}`;
case eventTypes.RESTORE: