From 535bef46d3e62d9f280c1eff809fbb100a216c37 Mon Sep 17 00:00:00 2001 From: David Hartunian Date: Thu, 27 Jun 2024 10:24:54 -0400 Subject: [PATCH 1/9] cli: add fallback query support for debug zip Previously, when SQL queries for dumping tables to debug zip would fail, we would have no follow-up. Engineers can now define "fallback" queries for tables in debug zip in order to make a second attempt with a simpler query. Often we want to run a more complex query to gather more debug data but these queries can fail when the cluster is experiencing problems. This change gives us a chance to define a simpler approach that can be attempted when necessary. In order to define a fallback, there are two new optional fields in the `TableRegistryConfig` struct for redacted and unredacted queries respectively. Debug zip output will still include the failed attempts at the original query along with the error message file as before. If a fallback query is defined, that query will produce its own output (and error) file with an additional `.fallback` suffix added to the base table name to identify it. Resolves: #123964 Epic: CRDB-35278 Release note: None --- pkg/cli/testdata/zip/testzip_fallback | 275 ++++++++++++++++++++++++++ pkg/cli/zip.go | 19 +- pkg/cli/zip_table_registry.go | 46 ++++- pkg/cli/zip_table_registry_test.go | 30 ++- pkg/cli/zip_test.go | 131 +++++++++++- 5 files changed, 484 insertions(+), 17 deletions(-) create mode 100644 pkg/cli/testdata/zip/testzip_fallback diff --git a/pkg/cli/testdata/zip/testzip_fallback b/pkg/cli/testdata/zip/testzip_fallback new file mode 100644 index 000000000000..4d4ba0fa0a09 --- /dev/null +++ b/pkg/cli/testdata/zip/testzip_fallback @@ -0,0 +1,275 @@ +---- +debug zip --concurrency=1 --cpu-profile-duration=1s /dev/null +[cluster] discovering virtual clusters... done +[cluster] creating output file /dev/null... done +[cluster] establishing RPC connection to ... +[cluster] using SQL address: ... +[cluster] requesting data for debug/events... received response... writing JSON output: debug/events.json... done +[cluster] requesting data for debug/rangelog... received response... writing JSON output: debug/rangelog.json... done +[cluster] requesting data for debug/settings... received response... writing JSON output: debug/settings.json... done +[cluster] requesting data for debug/reports/problemranges... received response... writing JSON output: debug/reports/problemranges.json... done +[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_stream_checkpoints... writing output: debug/crdb_internal.cluster_replication_node_stream_checkpoints.txt... done +[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_stream_spans... writing output: debug/crdb_internal.cluster_replication_node_stream_spans.txt... done +[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_streams... writing output: debug/crdb_internal.cluster_replication_node_streams.txt... done +[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_spans... writing output: debug/crdb_internal.cluster_replication_spans.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_function_statements... writing output: debug/crdb_internal.create_function_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_procedure_statements... writing output: debug/crdb_internal.create_procedure_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_schema_statements... 
writing output: debug/crdb_internal.create_schema_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_statements... writing output: debug/crdb_internal.create_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_type_statements... writing output: debug/crdb_internal.create_type_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.logical_replication_node_processors... writing output: debug/crdb_internal.logical_replication_node_processors.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_contention_events... writing output: debug/crdb_internal.cluster_contention_events.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_database_privileges... writing output: debug/crdb_internal.cluster_database_privileges.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_distsql_flows... writing output: debug/crdb_internal.cluster_distsql_flows.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_locks... writing output: debug/crdb_internal.cluster_locks.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_queries... writing output: debug/crdb_internal.cluster_queries.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_sessions... writing output: debug/crdb_internal.cluster_sessions.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_settings... writing output: debug/crdb_internal.cluster_settings.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_transactions... writing output: debug/crdb_internal.cluster_transactions.txt... done +[cluster] retrieving SQL data for crdb_internal.default_privileges... writing output: debug/crdb_internal.default_privileges.txt... done +[cluster] retrieving SQL data for crdb_internal.index_usage_statistics... writing output: debug/crdb_internal.index_usage_statistics.txt... done +[cluster] retrieving SQL data for crdb_internal.invalid_objects... writing output: debug/crdb_internal.invalid_objects.txt... done +[cluster] retrieving SQL data for crdb_internal.jobs... writing output: debug/crdb_internal.jobs.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_node_liveness... writing output: debug/crdb_internal.kv_node_liveness.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_node_status... writing output: debug/crdb_internal.kv_node_status.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_protected_ts_records... writing output: debug/crdb_internal.kv_protected_ts_records.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_store_status... writing output: debug/crdb_internal.kv_store_status.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_system_privileges... writing output: debug/crdb_internal.kv_system_privileges.txt... done +[cluster] retrieving SQL data for crdb_internal.partitions... writing output: debug/crdb_internal.partitions.txt... done +[cluster] retrieving SQL data for crdb_internal.probe_ranges_1s_read_limit_100... writing output: debug/crdb_internal.probe_ranges_1s_read_limit_100.txt... done +[cluster] retrieving SQL data for crdb_internal.regions... writing output: debug/crdb_internal.regions.txt... done +[cluster] retrieving SQL data for crdb_internal.schema_changes... writing output: debug/crdb_internal.schema_changes.txt... done +[cluster] retrieving SQL data for crdb_internal.super_regions... writing output: debug/crdb_internal.super_regions.txt... 
done +[cluster] retrieving SQL data for crdb_internal.system_jobs... writing output: debug/crdb_internal.system_jobs.txt... done +[cluster] retrieving SQL data for crdb_internal.table_indexes... writing output: debug/crdb_internal.table_indexes.txt... done +[cluster] retrieving SQL data for crdb_internal.transaction_contention_events... writing output: debug/crdb_internal.transaction_contention_events.txt... +[cluster] retrieving SQL data for crdb_internal.transaction_contention_events: last request failed: ERROR: column "fail" does not exist (SQLSTATE 42703) +[cluster] retrieving SQL data for crdb_internal.transaction_contention_events: creating error output: debug/crdb_internal.transaction_contention_events.txt.err.txt... done +[cluster] retrieving SQL data for crdb_internal.zones... writing output: debug/crdb_internal.zones.txt... done +[cluster] retrieving SQL data for system.database_role_settings... writing output: debug/system.database_role_settings.txt... done +[cluster] retrieving SQL data for system.descriptor... writing output: debug/system.descriptor.txt... done +[cluster] retrieving SQL data for system.eventlog... writing output: debug/system.eventlog.txt... done +[cluster] retrieving SQL data for system.external_connections... writing output: debug/system.external_connections.txt... done +[cluster] retrieving SQL data for system.job_info... writing output: debug/system.job_info.txt... done +[cluster] retrieving SQL data for system.jobs... writing output: debug/system.jobs.txt... done +[cluster] retrieving SQL data for system.lease... writing output: debug/system.lease.txt... done +[cluster] retrieving SQL data for system.locations... writing output: debug/system.locations.txt... done +[cluster] retrieving SQL data for system.migrations... writing output: debug/system.migrations.txt... done +[cluster] retrieving SQL data for system.namespace... writing output: debug/system.namespace.txt... done +[cluster] retrieving SQL data for system.privileges... writing output: debug/system.privileges.txt... done +[cluster] retrieving SQL data for system.protected_ts_meta... writing output: debug/system.protected_ts_meta.txt... done +[cluster] retrieving SQL data for system.protected_ts_records... writing output: debug/system.protected_ts_records.txt... done +[cluster] retrieving SQL data for system.rangelog... writing output: debug/system.rangelog.txt... done +[cluster] retrieving SQL data for system.replication_constraint_stats... writing output: debug/system.replication_constraint_stats.txt... done +[cluster] retrieving SQL data for system.replication_critical_localities... writing output: debug/system.replication_critical_localities.txt... done +[cluster] retrieving SQL data for system.replication_stats... writing output: debug/system.replication_stats.txt... done +[cluster] retrieving SQL data for system.reports_meta... writing output: debug/system.reports_meta.txt... done +[cluster] retrieving SQL data for system.role_id_seq... writing output: debug/system.role_id_seq.txt... done +[cluster] retrieving SQL data for system.role_members... writing output: debug/system.role_members.txt... done +[cluster] retrieving SQL data for system.role_options... writing output: debug/system.role_options.txt... done +[cluster] retrieving SQL data for system.scheduled_jobs... writing output: debug/system.scheduled_jobs.txt... done +[cluster] retrieving SQL data for system.settings... writing output: debug/system.settings.txt... done +[cluster] retrieving SQL data for system.span_configurations... 
writing output: debug/system.span_configurations.txt... done +[cluster] retrieving SQL data for system.sql_instances... writing output: debug/system.sql_instances.txt... done +[cluster] retrieving SQL data for system.sql_stats_cardinality... writing output: debug/system.sql_stats_cardinality.txt... done +[cluster] retrieving SQL data for system.sqlliveness... writing output: debug/system.sqlliveness.txt... done +[cluster] retrieving SQL data for system.statement_diagnostics... writing output: debug/system.statement_diagnostics.txt... done +[cluster] retrieving SQL data for system.statement_diagnostics_requests... writing output: debug/system.statement_diagnostics_requests.txt... done +[cluster] retrieving SQL data for system.statement_statistics_limit_5000... writing output: debug/system.statement_statistics_limit_5000.txt... done +[cluster] retrieving SQL data for system.table_statistics... writing output: debug/system.table_statistics.txt... done +[cluster] retrieving SQL data for system.task_payloads... writing output: debug/system.task_payloads.txt... done +[cluster] retrieving SQL data for system.tenant_settings... writing output: debug/system.tenant_settings.txt... done +[cluster] retrieving SQL data for system.tenant_tasks... writing output: debug/system.tenant_tasks.txt... done +[cluster] retrieving SQL data for system.tenant_usage... writing output: debug/system.tenant_usage.txt... done +[cluster] retrieving SQL data for system.tenants... writing output: debug/system.tenants.txt... done +[cluster] requesting nodes... received response... writing JSON output: debug/nodes.json... done +[cluster] requesting liveness... received response... writing JSON output: debug/liveness.json... done +[cluster] requesting tenant ranges... received response... +[cluster] requesting tenant ranges: last request failed: rpc error: ... +[cluster] requesting tenant ranges: creating error output: debug/tenant_ranges.err.txt... done +[cluster] collecting the inflight traces for jobs... received response... done +[cluster] requesting CPU profiles +[cluster] profiles generated +[cluster] profile for node 1... writing binary output: debug/nodes/1/cpu.pprof... done +[node 1] node status... writing JSON output: debug/nodes/1/status.json... done +[node 1] using SQL connection URL: postgresql://... +[node 1] retrieving SQL data for crdb_internal.active_range_feeds... writing output: debug/nodes/1/crdb_internal.active_range_feeds.txt... done +[node 1] retrieving SQL data for crdb_internal.feature_usage... writing output: debug/nodes/1/crdb_internal.feature_usage.txt... done +[node 1] retrieving SQL data for crdb_internal.gossip_alerts... writing output: debug/nodes/1/crdb_internal.gossip_alerts.txt... done +[node 1] retrieving SQL data for crdb_internal.gossip_liveness... writing output: debug/nodes/1/crdb_internal.gossip_liveness.txt... done +[node 1] retrieving SQL data for crdb_internal.gossip_nodes... writing output: debug/nodes/1/crdb_internal.gossip_nodes.txt... done +[node 1] retrieving SQL data for crdb_internal.kv_session_based_leases... writing output: debug/nodes/1/crdb_internal.kv_session_based_leases.txt... done +[node 1] retrieving SQL data for crdb_internal.leases... writing output: debug/nodes/1/crdb_internal.leases.txt... done +[node 1] retrieving SQL data for crdb_internal.node_build_info... writing output: debug/nodes/1/crdb_internal.node_build_info.txt... done +[node 1] retrieving SQL data for crdb_internal.node_contention_events... 
writing output: debug/nodes/1/crdb_internal.node_contention_events.txt... done +[node 1] retrieving SQL data for crdb_internal.node_distsql_flows... writing output: debug/nodes/1/crdb_internal.node_distsql_flows.txt... done +[node 1] retrieving SQL data for crdb_internal.node_execution_insights... writing output: debug/nodes/1/crdb_internal.node_execution_insights.txt... done +[node 1] retrieving SQL data for crdb_internal.node_inflight_trace_spans... writing output: debug/nodes/1/crdb_internal.node_inflight_trace_spans.txt... done +[node 1] retrieving SQL data for crdb_internal.node_memory_monitors... writing output: debug/nodes/1/crdb_internal.node_memory_monitors.txt... done +[node 1] retrieving SQL data for crdb_internal.node_metrics... writing output: debug/nodes/1/crdb_internal.node_metrics.txt... done +[node 1] retrieving SQL data for crdb_internal.node_queries... writing output: debug/nodes/1/crdb_internal.node_queries.txt... done +[node 1] retrieving SQL data for crdb_internal.node_runtime_info... writing output: debug/nodes/1/crdb_internal.node_runtime_info.txt... done +[node 1] retrieving SQL data for crdb_internal.node_sessions... writing output: debug/nodes/1/crdb_internal.node_sessions.txt... done +[node 1] retrieving SQL data for crdb_internal.node_statement_statistics... writing output: debug/nodes/1/crdb_internal.node_statement_statistics.txt... done +[node 1] retrieving SQL data for crdb_internal.node_tenant_capabilities_cache... writing output: debug/nodes/1/crdb_internal.node_tenant_capabilities_cache.txt... done +[node 1] retrieving SQL data for crdb_internal.node_transaction_statistics... writing output: debug/nodes/1/crdb_internal.node_transaction_statistics.txt... done +[node 1] retrieving SQL data for crdb_internal.node_transactions... writing output: debug/nodes/1/crdb_internal.node_transactions.txt... done +[node 1] retrieving SQL data for crdb_internal.node_txn_execution_insights... writing output: debug/nodes/1/crdb_internal.node_txn_execution_insights.txt... done +[node 1] retrieving SQL data for crdb_internal.node_txn_stats... writing output: debug/nodes/1/crdb_internal.node_txn_stats.txt... done +[node 1] requesting data for debug/nodes/1/details... received response... writing JSON output: debug/nodes/1/details.json... done +[node 1] requesting data for debug/nodes/1/gossip... received response... writing JSON output: debug/nodes/1/gossip.json... done +[node 1] requesting stacks... received response... writing binary output: debug/nodes/1/stacks.txt... done +[node 1] requesting stacks with labels... received response... writing binary output: debug/nodes/1/stacks_with_labels.txt... done +[node 1] requesting heap profile... received response... writing binary output: debug/nodes/1/heap.pprof... done +[node 1] requesting engine stats... received response... writing binary output: debug/nodes/1/lsm.txt... done +[node 1] requesting heap profile list... received response... done +[node ?] ? heap profiles found +[node 1] requesting goroutine dump list... received response... done +[node ?] ? goroutine dumps found +[node 1] requesting cpu profile list... received response... done +[node ?] ? cpu profiles found +[node 1] requesting log files list... received response... done +[node ?] ? log files found +[node 1] requesting ranges... received response... done +[node 1] writing ranges... writing JSON output: debug/nodes/1/ranges.json... done +[cluster] pprof summary script... writing binary output: debug/pprof-summary.sh... done +[cluster] hot range summary script... 
writing binary output: debug/hot-ranges.sh... done +[cluster] tenant hot range summary script... writing binary output: debug/hot-ranges-tenant.sh... done +---- +debug zip --concurrency=1 --cpu-profile-duration=1s /dev/null +[cluster] discovering virtual clusters... done +[cluster] creating output file /dev/null... done +[cluster] establishing RPC connection to ... +[cluster] using SQL address: ... +[cluster] requesting data for debug/events... received response... writing JSON output: debug/events.json... done +[cluster] requesting data for debug/rangelog... received response... writing JSON output: debug/rangelog.json... done +[cluster] requesting data for debug/settings... received response... writing JSON output: debug/settings.json... done +[cluster] requesting data for debug/reports/problemranges... received response... writing JSON output: debug/reports/problemranges.json... done +[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_stream_checkpoints... writing output: debug/crdb_internal.cluster_replication_node_stream_checkpoints.txt... done +[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_stream_spans... writing output: debug/crdb_internal.cluster_replication_node_stream_spans.txt... done +[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_node_streams... writing output: debug/crdb_internal.cluster_replication_node_streams.txt... done +[cluster] retrieving SQL data for "".crdb_internal.cluster_replication_spans... writing output: debug/crdb_internal.cluster_replication_spans.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_function_statements... writing output: debug/crdb_internal.create_function_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_procedure_statements... writing output: debug/crdb_internal.create_procedure_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_schema_statements... writing output: debug/crdb_internal.create_schema_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_statements... writing output: debug/crdb_internal.create_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.create_type_statements... writing output: debug/crdb_internal.create_type_statements.txt... done +[cluster] retrieving SQL data for "".crdb_internal.logical_replication_node_processors... writing output: debug/crdb_internal.logical_replication_node_processors.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_contention_events... writing output: debug/crdb_internal.cluster_contention_events.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_database_privileges... writing output: debug/crdb_internal.cluster_database_privileges.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_distsql_flows... writing output: debug/crdb_internal.cluster_distsql_flows.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_locks... writing output: debug/crdb_internal.cluster_locks.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_queries... writing output: debug/crdb_internal.cluster_queries.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_sessions... writing output: debug/crdb_internal.cluster_sessions.txt... done +[cluster] retrieving SQL data for crdb_internal.cluster_settings... writing output: debug/crdb_internal.cluster_settings.txt... 
done +[cluster] retrieving SQL data for crdb_internal.cluster_transactions... writing output: debug/crdb_internal.cluster_transactions.txt... done +[cluster] retrieving SQL data for crdb_internal.default_privileges... writing output: debug/crdb_internal.default_privileges.txt... done +[cluster] retrieving SQL data for crdb_internal.index_usage_statistics... writing output: debug/crdb_internal.index_usage_statistics.txt... done +[cluster] retrieving SQL data for crdb_internal.invalid_objects... writing output: debug/crdb_internal.invalid_objects.txt... done +[cluster] retrieving SQL data for crdb_internal.jobs... writing output: debug/crdb_internal.jobs.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_node_liveness... writing output: debug/crdb_internal.kv_node_liveness.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_node_status... writing output: debug/crdb_internal.kv_node_status.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_protected_ts_records... writing output: debug/crdb_internal.kv_protected_ts_records.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_store_status... writing output: debug/crdb_internal.kv_store_status.txt... done +[cluster] retrieving SQL data for crdb_internal.kv_system_privileges... writing output: debug/crdb_internal.kv_system_privileges.txt... done +[cluster] retrieving SQL data for crdb_internal.partitions... writing output: debug/crdb_internal.partitions.txt... done +[cluster] retrieving SQL data for crdb_internal.probe_ranges_1s_read_limit_100... writing output: debug/crdb_internal.probe_ranges_1s_read_limit_100.txt... done +[cluster] retrieving SQL data for crdb_internal.regions... writing output: debug/crdb_internal.regions.txt... done +[cluster] retrieving SQL data for crdb_internal.schema_changes... writing output: debug/crdb_internal.schema_changes.txt... done +[cluster] retrieving SQL data for crdb_internal.super_regions... writing output: debug/crdb_internal.super_regions.txt... done +[cluster] retrieving SQL data for crdb_internal.system_jobs... writing output: debug/crdb_internal.system_jobs.txt... done +[cluster] retrieving SQL data for crdb_internal.table_indexes... writing output: debug/crdb_internal.table_indexes.txt... done +[cluster] retrieving SQL data for crdb_internal.transaction_contention_events... writing output: debug/crdb_internal.transaction_contention_events.txt... +[cluster] retrieving SQL data for crdb_internal.transaction_contention_events: last request failed: ERROR: column "fail" does not exist (SQLSTATE 42703) +[cluster] retrieving SQL data for crdb_internal.transaction_contention_events: creating error output: debug/crdb_internal.transaction_contention_events.txt.err.txt... done +[cluster] retrieving SQL data for crdb_internal.transaction_contention_events (fallback)... writing output: debug/crdb_internal.transaction_contention_events.fallback.txt... done +[cluster] retrieving SQL data for crdb_internal.zones... writing output: debug/crdb_internal.zones.txt... done +[cluster] retrieving SQL data for system.database_role_settings... writing output: debug/system.database_role_settings.txt... done +[cluster] retrieving SQL data for system.descriptor... writing output: debug/system.descriptor.txt... done +[cluster] retrieving SQL data for system.eventlog... writing output: debug/system.eventlog.txt... done +[cluster] retrieving SQL data for system.external_connections... writing output: debug/system.external_connections.txt... 
done +[cluster] retrieving SQL data for system.job_info... writing output: debug/system.job_info.txt... done +[cluster] retrieving SQL data for system.jobs... writing output: debug/system.jobs.txt... done +[cluster] retrieving SQL data for system.lease... writing output: debug/system.lease.txt... done +[cluster] retrieving SQL data for system.locations... writing output: debug/system.locations.txt... done +[cluster] retrieving SQL data for system.migrations... writing output: debug/system.migrations.txt... done +[cluster] retrieving SQL data for system.namespace... writing output: debug/system.namespace.txt... done +[cluster] retrieving SQL data for system.privileges... writing output: debug/system.privileges.txt... done +[cluster] retrieving SQL data for system.protected_ts_meta... writing output: debug/system.protected_ts_meta.txt... done +[cluster] retrieving SQL data for system.protected_ts_records... writing output: debug/system.protected_ts_records.txt... done +[cluster] retrieving SQL data for system.rangelog... writing output: debug/system.rangelog.txt... done +[cluster] retrieving SQL data for system.replication_constraint_stats... writing output: debug/system.replication_constraint_stats.txt... done +[cluster] retrieving SQL data for system.replication_critical_localities... writing output: debug/system.replication_critical_localities.txt... done +[cluster] retrieving SQL data for system.replication_stats... writing output: debug/system.replication_stats.txt... done +[cluster] retrieving SQL data for system.reports_meta... writing output: debug/system.reports_meta.txt... done +[cluster] retrieving SQL data for system.role_id_seq... writing output: debug/system.role_id_seq.txt... done +[cluster] retrieving SQL data for system.role_members... writing output: debug/system.role_members.txt... done +[cluster] retrieving SQL data for system.role_options... writing output: debug/system.role_options.txt... done +[cluster] retrieving SQL data for system.scheduled_jobs... writing output: debug/system.scheduled_jobs.txt... done +[cluster] retrieving SQL data for system.settings... writing output: debug/system.settings.txt... done +[cluster] retrieving SQL data for system.span_configurations... writing output: debug/system.span_configurations.txt... done +[cluster] retrieving SQL data for system.sql_instances... writing output: debug/system.sql_instances.txt... done +[cluster] retrieving SQL data for system.sql_stats_cardinality... writing output: debug/system.sql_stats_cardinality.txt... done +[cluster] retrieving SQL data for system.sqlliveness... writing output: debug/system.sqlliveness.txt... done +[cluster] retrieving SQL data for system.statement_diagnostics... writing output: debug/system.statement_diagnostics.txt... done +[cluster] retrieving SQL data for system.statement_diagnostics_requests... writing output: debug/system.statement_diagnostics_requests.txt... done +[cluster] retrieving SQL data for system.statement_statistics_limit_5000... writing output: debug/system.statement_statistics_limit_5000.txt... done +[cluster] retrieving SQL data for system.table_statistics... writing output: debug/system.table_statistics.txt... done +[cluster] retrieving SQL data for system.task_payloads... writing output: debug/system.task_payloads.txt... done +[cluster] retrieving SQL data for system.tenant_settings... writing output: debug/system.tenant_settings.txt... done +[cluster] retrieving SQL data for system.tenant_tasks... writing output: debug/system.tenant_tasks.txt... 
done +[cluster] retrieving SQL data for system.tenant_usage... writing output: debug/system.tenant_usage.txt... done +[cluster] retrieving SQL data for system.tenants... writing output: debug/system.tenants.txt... done +[cluster] requesting nodes... received response... writing JSON output: debug/nodes.json... done +[cluster] requesting liveness... received response... writing JSON output: debug/liveness.json... done +[cluster] requesting tenant ranges... received response... +[cluster] requesting tenant ranges: last request failed: rpc error: ... +[cluster] requesting tenant ranges: creating error output: debug/tenant_ranges.err.txt... done +[cluster] collecting the inflight traces for jobs... received response... done +[cluster] requesting CPU profiles +[cluster] profiles generated +[cluster] profile for node 1... writing binary output: debug/nodes/1/cpu.pprof... done +[node 1] node status... writing JSON output: debug/nodes/1/status.json... done +[node 1] using SQL connection URL: postgresql://... +[node 1] retrieving SQL data for crdb_internal.active_range_feeds... writing output: debug/nodes/1/crdb_internal.active_range_feeds.txt... done +[node 1] retrieving SQL data for crdb_internal.feature_usage... writing output: debug/nodes/1/crdb_internal.feature_usage.txt... done +[node 1] retrieving SQL data for crdb_internal.gossip_alerts... writing output: debug/nodes/1/crdb_internal.gossip_alerts.txt... done +[node 1] retrieving SQL data for crdb_internal.gossip_liveness... writing output: debug/nodes/1/crdb_internal.gossip_liveness.txt... done +[node 1] retrieving SQL data for crdb_internal.gossip_nodes... writing output: debug/nodes/1/crdb_internal.gossip_nodes.txt... done +[node 1] retrieving SQL data for crdb_internal.kv_session_based_leases... writing output: debug/nodes/1/crdb_internal.kv_session_based_leases.txt... done +[node 1] retrieving SQL data for crdb_internal.leases... writing output: debug/nodes/1/crdb_internal.leases.txt... done +[node 1] retrieving SQL data for crdb_internal.node_build_info... writing output: debug/nodes/1/crdb_internal.node_build_info.txt... done +[node 1] retrieving SQL data for crdb_internal.node_contention_events... writing output: debug/nodes/1/crdb_internal.node_contention_events.txt... done +[node 1] retrieving SQL data for crdb_internal.node_distsql_flows... writing output: debug/nodes/1/crdb_internal.node_distsql_flows.txt... done +[node 1] retrieving SQL data for crdb_internal.node_execution_insights... writing output: debug/nodes/1/crdb_internal.node_execution_insights.txt... done +[node 1] retrieving SQL data for crdb_internal.node_inflight_trace_spans... writing output: debug/nodes/1/crdb_internal.node_inflight_trace_spans.txt... done +[node 1] retrieving SQL data for crdb_internal.node_memory_monitors... writing output: debug/nodes/1/crdb_internal.node_memory_monitors.txt... done +[node 1] retrieving SQL data for crdb_internal.node_metrics... writing output: debug/nodes/1/crdb_internal.node_metrics.txt... done +[node 1] retrieving SQL data for crdb_internal.node_queries... writing output: debug/nodes/1/crdb_internal.node_queries.txt... done +[node 1] retrieving SQL data for crdb_internal.node_runtime_info... writing output: debug/nodes/1/crdb_internal.node_runtime_info.txt... done +[node 1] retrieving SQL data for crdb_internal.node_sessions... writing output: debug/nodes/1/crdb_internal.node_sessions.txt... done +[node 1] retrieving SQL data for crdb_internal.node_statement_statistics... 
writing output: debug/nodes/1/crdb_internal.node_statement_statistics.txt... done +[node 1] retrieving SQL data for crdb_internal.node_tenant_capabilities_cache... writing output: debug/nodes/1/crdb_internal.node_tenant_capabilities_cache.txt... done +[node 1] retrieving SQL data for crdb_internal.node_transaction_statistics... writing output: debug/nodes/1/crdb_internal.node_transaction_statistics.txt... done +[node 1] retrieving SQL data for crdb_internal.node_transactions... writing output: debug/nodes/1/crdb_internal.node_transactions.txt... done +[node 1] retrieving SQL data for crdb_internal.node_txn_execution_insights... writing output: debug/nodes/1/crdb_internal.node_txn_execution_insights.txt... done +[node 1] retrieving SQL data for crdb_internal.node_txn_stats... writing output: debug/nodes/1/crdb_internal.node_txn_stats.txt... done +[node 1] requesting data for debug/nodes/1/details... received response... writing JSON output: debug/nodes/1/details.json... done +[node 1] requesting data for debug/nodes/1/gossip... received response... writing JSON output: debug/nodes/1/gossip.json... done +[node 1] requesting stacks... received response... writing binary output: debug/nodes/1/stacks.txt... done +[node 1] requesting stacks with labels... received response... writing binary output: debug/nodes/1/stacks_with_labels.txt... done +[node 1] requesting heap profile... received response... writing binary output: debug/nodes/1/heap.pprof... done +[node 1] requesting engine stats... received response... writing binary output: debug/nodes/1/lsm.txt... done +[node 1] requesting heap profile list... received response... done +[node ?] ? heap profiles found +[node 1] requesting goroutine dump list... received response... done +[node ?] ? goroutine dumps found +[node 1] requesting cpu profile list... received response... done +[node ?] ? cpu profiles found +[node 1] requesting log files list... received response... done +[node ?] ? log files found +[node 1] requesting ranges... received response... done +[node 1] writing ranges... writing JSON output: debug/nodes/1/ranges.json... done +[cluster] pprof summary script... writing binary output: debug/pprof-summary.sh... done +[cluster] hot range summary script... writing binary output: debug/hot-ranges.sh... done +[cluster] tenant hot range summary script... writing binary output: debug/hot-ranges-tenant.sh... done diff --git a/pkg/cli/zip.go b/pkg/cli/zip.go index eb5eaa6ccab6..e2c2044b503f 100644 --- a/pkg/cli/zip.go +++ b/pkg/cli/zip.go @@ -490,7 +490,7 @@ INNER JOIN latestprogress ON j.id = latestprogress.job_id;`, // An error is returned by this function if it is unable to write to // the output file or some other unrecoverable error is encountered. func (zc *debugZipContext) dumpTableDataForZip( - zr *zipReporter, conn clisqlclient.Conn, base, table, query string, + zr *zipReporter, conn clisqlclient.Conn, base, table string, tableQuery TableQuery, ) error { ctx := context.Background() baseName := base + "/" + sanitizeFilename(table) @@ -498,6 +498,10 @@ func (zc *debugZipContext) dumpTableDataForZip( s := zr.start("retrieving SQL data for %s", table) const maxRetries = 5 suffix := "" + + query := tableQuery.query + fallback := tableQuery.fallback != "" + for numRetries := 1; numRetries <= maxRetries; numRetries++ { name := baseName + suffix + "." 
+ zc.clusterPrinter.sqlOutputFilenameExtension s.progress("writing output: %s", name) @@ -545,7 +549,18 @@ func (zc *debugZipContext) dumpTableDataForZip( break } if pgcode.MakeCode(pgErr.Code) != pgcode.SerializationFailure { - // A non-retry error. We've printed the error, and + // A non-retry error. If we have a fallback, try with that. + if fallback { + fallback = false + + query = tableQuery.fallback + numRetries = 1 // Reset counter since this is a different query. + baseName = baseName + ".fallback" + s = zr.start("retrieving SQL data for %s (fallback)", table) + + continue + } + // A non-retry error, no fallback. We've printed the error, and // there's nothing to retry. Stop here. break } diff --git a/pkg/cli/zip_table_registry.go b/pkg/cli/zip_table_registry.go index 395adfd5d2d9..c8d1913f62d9 100644 --- a/pkg/cli/zip_table_registry.go +++ b/pkg/cli/zip_table_registry.go @@ -58,6 +58,15 @@ type TableRegistryConfig struct { // customQueryRedacted should NOT be a `SELECT * FROM table` type query, as // this could leak newly added sensitive columns into the output. customQueryRedacted string + + // customQueryUnredactedFallback is an alternative query that will be + // attempted if `customQueryUnredacted` does not return within the + // timeout or fails. If empty it will be ignored. + customQueryUnredactedFallback string + // customQueryRedactedFallback is an alternative query that will be + // attempted if `customQueryRedacted` does not return within the + // timeout or fails. If empty it will be ignored. + customQueryRedactedFallback string } // DebugZipTableRegistry is a registry of `crdb_internal` and `system` tables @@ -74,26 +83,35 @@ type TableRegistryConfig struct { // may be a way to avoid having to completely omit entire columns. type DebugZipTableRegistry map[string]TableRegistryConfig +// TableQuery holds two sql query strings together that are used to +// dump tables when generating a debug zip. `query` is the primary +// query to run, and `fallback` is one to try if the primary fails or +// times out. +type TableQuery struct { + query string + fallback string +} + // QueryForTable produces the appropriate query for `debug zip` for the given // table to use, taking redaction into account. If the provided tableName does // not exist in the registry, or no redacted config exists in the registry for // the tableName, an error is returned. 
-func (r DebugZipTableRegistry) QueryForTable(tableName string, redact bool) (string, error) { +func (r DebugZipTableRegistry) QueryForTable(tableName string, redact bool) (TableQuery, error) { tableConfig, ok := r[tableName] if !ok { - return "", errors.Newf("no entry found in table registry for: %s", tableName) + return TableQuery{}, errors.Newf("no entry found in table registry for: %s", tableName) } if !redact { if tableConfig.customQueryUnredacted != "" { - return tableConfig.customQueryUnredacted, nil + return TableQuery{tableConfig.customQueryUnredacted, tableConfig.customQueryUnredactedFallback}, nil } - return fmt.Sprintf("TABLE %s", tableName), nil + return TableQuery{fmt.Sprintf("TABLE %s", tableName), ""}, nil } if tableConfig.customQueryRedacted != "" { - return tableConfig.customQueryRedacted, nil + return TableQuery{tableConfig.customQueryRedacted, tableConfig.customQueryRedactedFallback}, nil } if len(tableConfig.nonSensitiveCols) == 0 { - return "", errors.Newf("requested redacted query for table %s, but no non-sensitive columns defined", tableName) + return TableQuery{}, errors.Newf("requested redacted query for table %s, but no non-sensitive columns defined", tableName) } var colsString strings.Builder for i, colName := range tableConfig.nonSensitiveCols { @@ -103,7 +121,7 @@ func (r DebugZipTableRegistry) QueryForTable(tableName string, redact bool) (str colsString.WriteString(", ") } } - return fmt.Sprintf("SELECT %s FROM %s", colsString.String(), tableName), nil + return TableQuery{fmt.Sprintf("SELECT %s FROM %s", colsString.String(), tableName), ""}, nil } // GetTables returns all the table names within the registry. Useful for @@ -611,6 +629,20 @@ WHERE ss.transaction_fingerprint_id != '\x0000000000000000' AND s.fingerprint_id GROUP BY collection_ts, contention_duration, waiting_txn_id, waiting_txn_fingerprint_id, blocking_txn_id, blocking_txn_fingerprint_id, waiting_stmt_fingerprint_id, contending_pretty_key, s.metadata ->> 'query', index_name, table_name, database_name +`, + customQueryUnredactedFallback: ` +SELECT collection_ts, + contention_duration, + waiting_txn_id, + waiting_txn_fingerprint_id, + waiting_stmt_fingerprint_id, + blocking_txn_id, + blocking_txn_fingerprint_id, + contending_pretty_key, + index_name, + table_name, + database_name +FROM crdb_internal.transaction_contention_events `, // `contending_key` column contains the contended key, which may // contain sensitive row-level data. 
So, we will only fetch if the diff --git a/pkg/cli/zip_table_registry_test.go b/pkg/cli/zip_table_registry_test.go index 643624825708..1f88be28746a 100644 --- a/pkg/cli/zip_table_registry_test.go +++ b/pkg/cli/zip_table_registry_test.go @@ -42,6 +42,11 @@ func TestQueryForTable(t *testing.T) { nonSensitiveCols: NonSensitiveColumns{"x", "crdb_internal.pretty_key(y, 0) as y", "z"}, customQueryUnredacted: "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query", }, + "table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback": { + nonSensitiveCols: NonSensitiveColumns{"x", "crdb_internal.pretty_key(y, 0) as y", "z"}, + customQueryUnredacted: "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback", + customQueryUnredactedFallback: "SELECT x FROM table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback", + }, } t.Run("errors if no table config present in registry", func(t *testing.T) { @@ -53,7 +58,7 @@ func TestQueryForTable(t *testing.T) { t.Run("produces `TABLE` query when unredacted with no custom query", func(t *testing.T) { table := "table_with_sensitive_cols" - expected := "TABLE table_with_sensitive_cols" + expected := TableQuery{query: "TABLE table_with_sensitive_cols"} actual, err := reg.QueryForTable(table, false /* redact */) assert.NoError(t, err) assert.Equal(t, expected, actual) @@ -61,7 +66,7 @@ func TestQueryForTable(t *testing.T) { t.Run("produces custom query when unredacted and custom query supplied", func(t *testing.T) { table := "table_with_custom_queries" - expected := "SELECT * FROM table_with_custom_queries" + expected := TableQuery{query: "SELECT * FROM table_with_custom_queries"} actual, err := reg.QueryForTable(table, false /* redact */) assert.NoError(t, err) assert.Equal(t, expected, actual) @@ -69,7 +74,7 @@ func TestQueryForTable(t *testing.T) { t.Run("produces query with only non-sensitive columns when redacted and no custom query", func(t *testing.T) { table := "table_with_sensitive_cols" - expected := `SELECT x, y, z FROM table_with_sensitive_cols` + expected := TableQuery{query: `SELECT x, y, z FROM table_with_sensitive_cols`} actual, err := reg.QueryForTable(table, true /* redact */) assert.NoError(t, err) assert.Equal(t, expected, actual) @@ -77,7 +82,7 @@ func TestQueryForTable(t *testing.T) { t.Run("produces custom when redacted and custom query supplied", func(t *testing.T) { table := "table_with_custom_queries" - expected := "SELECT a, b, c FROM table_with_custom_queries" + expected := TableQuery{query: "SELECT a, b, c FROM table_with_custom_queries"} actual, err := reg.QueryForTable(table, true /* redact */) assert.NoError(t, err) assert.Equal(t, expected, actual) @@ -93,7 +98,7 @@ func TestQueryForTable(t *testing.T) { t.Run("produces query when a combination of nonSensitiveCols and customQueryUnredacted is supplied", func(t *testing.T) { table := "table_with_non_sensitive_cols_and_custom_unredacted_query" - expected := "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query" + expected := TableQuery{query: "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query"} t.Run("with redact flag", func(t *testing.T) { actual, err := reg.QueryForTable(table, true /* redact */) @@ -107,6 +112,17 @@ func TestQueryForTable(t *testing.T) { assert.Equal(t, expected, actual) }) }) + + t.Run("with 
fallback query", func(t *testing.T) { + table := "table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback" + expected := TableQuery{ + query: "SELECT x, crdb_internal.pretty_key(y, 0) as y, z FROM table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback", + fallback: "SELECT x FROM table_with_non_sensitive_cols_and_custom_unredacted_query_with_fallback", + } + actual, err := reg.QueryForTable(table, false /* redact */) + assert.NoError(t, err) + assert.Equal(t, expected, actual) + }) } func TestNoForbiddenSystemTablesInDebugZip(t *testing.T) { @@ -125,8 +141,8 @@ func TestNoForbiddenSystemTablesInDebugZip(t *testing.T) { "system.transaction_activity", } for _, forbiddenTable := range forbiddenSysTables { - query, err := zipSystemTables.QueryForTable(forbiddenTable, false /* redact */) - assert.Equal(t, "", query) + tableQuery, err := zipSystemTables.QueryForTable(forbiddenTable, false /* redact */) + assert.Equal(t, "", tableQuery.query) assert.Error(t, err) assert.Equal(t, fmt.Sprintf("no entry found in table registry for: %s", forbiddenTable), err.Error()) } diff --git a/pkg/cli/zip_test.go b/pkg/cli/zip_test.go index ea718146c39d..7dc13c3fc193 100644 --- a/pkg/cli/zip_test.go +++ b/pkg/cli/zip_test.go @@ -177,6 +177,46 @@ func TestZip(t *testing.T) { }) } +// This tests the operation of zip over secure clusters. +func TestZipQueryFallback(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t, "test too slow under race") + + existing := zipInternalTablesPerCluster["crdb_internal.transaction_contention_events"] + zipInternalTablesPerCluster["crdb_internal.transaction_contention_events"] = TableRegistryConfig{ + nonSensitiveCols: existing.nonSensitiveCols, + // We want this to fail to trigger the fallback. + customQueryUnredacted: "SELECT FAIL;", + customQueryUnredactedFallback: existing.customQueryUnredactedFallback, + } + + dir, cleanupFn := testutils.TempDir(t) + defer cleanupFn() + + c := NewCLITest(TestCLIParams{ + StoreSpecs: []base.StoreSpec{{ + Path: dir, + }}, + }) + defer c.Cleanup() + + out, err := c.RunWithCapture("debug zip --concurrency=1 --cpu-profile-duration=1s " + os.DevNull) + if err != nil { + t.Fatal(err) + } + + // Strip any non-deterministic messages. + out = eraseNonDeterministicZipOutput(out) + + // We use datadriven simply to read the golden output file; we don't actually + // run any commands. Using datadriven allows TESTFLAGS=-rewrite. + datadriven.RunTest(t, datapathutils.TestDataPath(t, "zip", "testzip_fallback"), func(t *testing.T, td *datadriven.TestData) string { + return out + }) +} + // This tests the operation of redacted zip over secure clusters. func TestZipRedacted(t *testing.T) { defer leaktest.AfterTest(t)() @@ -801,7 +841,7 @@ func TestZipRetries(t *testing.T) { sqlConn, "test", `generate_series(1,15000) as t(x)`, - `select if(x<11000,x,crdb_internal.force_retry('1h')) from generate_series(1,15000) as t(x)`, + TableQuery{query: `select if(x<11000,x,crdb_internal.force_retry('1h')) from generate_series(1,15000) as t(x)`}, ); err != nil { t.Fatal(err) } @@ -830,6 +870,95 @@ test/generate_series(1,15000) as t(x).4.json.err.txt assert.Equal(t, expected, fileList.String()) } +// This checks that SQL retry errors are properly handled. 
+func TestZipFallback(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s := serverutils.StartServerOnly(t, base.TestServerArgs{Insecure: true}) + defer s.Stopper().Stop(context.Background()) + + dir, cleanupFn := testutils.TempDir(t) + defer cleanupFn() + + zipName := filepath.Join(dir, "test.zip") + + func() { + out, err := os.Create(zipName) + if err != nil { + t.Fatal(err) + } + z := newZipper(out) + defer func() { + if err := z.close(); err != nil { + t.Fatal(err) + } + }() + + // Open a SQL connection to the test server so the table dump + // queries below can be issued against it. + sqlURL := url.URL{ + Scheme: "postgres", + User: url.User(username.RootUser), + Host: s.AdvSQLAddr(), + } + sqlConn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, sqlURL.String()) + defer func() { + if err := sqlConn.Close(); err != nil { + t.Fatal(err) + } + }() + + zr := zipCtx.newZipReporter("test") + zr.sqlOutputFilenameExtension = "json" + zc := debugZipContext{ + z: z, + clusterPrinter: zr, + timeout: 3 * time.Second, + } + if err := zc.dumpTableDataForZip( + zr, + sqlConn, + "test", + `test_table_fail`, + TableQuery{ + query: `SELECT blah`, + }, + ); err != nil { + t.Fatal(err) + } + if err := zc.dumpTableDataForZip( + zr, + sqlConn, + "test", + `test_table_succeed`, + TableQuery{ + query: `SELECT blah`, + fallback: `SELECT 1`, + }, + ); err != nil { + t.Fatal(err) + } + }() + + r, err := zip.OpenReader(zipName) + if err != nil { + t.Fatal(err) + } + defer func() { _ = r.Close() }() + var fileList bytes.Buffer + for _, f := range r.File { + fmt.Fprintln(&fileList, f.Name) + } + const expected = `test/test_table_fail.json +test/test_table_fail.json.err.txt +test/test_table_succeed.json +test/test_table_succeed.json.err.txt +test/test_table_succeed.fallback.json +` + assert.Equal(t, expected, fileList.String()) +} + // This test the operation of zip over secure clusters. func TestToHex(t *testing.T) { defer leaktest.AfterTest(t)() From 84cf77d5c9d3082dc89fa533bc534f4e86f2693d Mon Sep 17 00:00:00 2001 From: David Hartunian Date: Thu, 27 Jun 2024 18:05:55 -0400 Subject: [PATCH 2/9] ui: alter role events render correctly Previously, ALTER ROLE events without role options would render with an "undefined" option in the event log on the DB Console. This change amends the rendering logic to correctly render events without any options. Resolves #124871 Epic: None Release note (bug fix,ui change): ALTER ROLE events in the DB Console event log now render correctly when the event does not contain any role options. --- .../db-console/src/util/events.spec.ts | 41 ++++++++++++++++++- .../workspaces/db-console/src/util/events.ts | 6 ++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/pkg/ui/workspaces/db-console/src/util/events.spec.ts b/pkg/ui/workspaces/db-console/src/util/events.spec.ts index 1aa9576860a3..5c4fbb12fe28 100644 --- a/pkg/ui/workspaces/db-console/src/util/events.spec.ts +++ b/pkg/ui/workspaces/db-console/src/util/events.spec.ts @@ -8,7 +8,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt.
-import { EventInfo, getDroppedObjectsText } from "src/util/events"; +import { api as clusterUiApi } from "@cockroachlabs/cluster-ui"; + +import {EventInfo, getDroppedObjectsText, getEventDescription} from "src/util/events"; describe("getDroppedObjectsText", function () { // The key indicating which objects were dropped in a DROP_DATABASE event has been @@ -42,3 +44,40 @@ describe("getDroppedObjectsText", function () { }); }); }); + +describe("getEventDescription", function () { + it("ignores the options field when empty for role changes", function () { + interface TestCase { + event: Partial<clusterUiApi.EventColumns>; + expected: string; + } + const tcs: TestCase[] = [ + { + event: { + eventType: "alter_role", + info: '{"User": "abc", "RoleName": "123"}', + }, + expected: "Role Altered: User abc altered role 123", + }, + { + event: { + eventType: "alter_role", + info: '{"User": "abc", "RoleName": "123", "Options": []}', + }, + expected: "Role Altered: User abc altered role 123", + }, + { + event: { + eventType: "alter_role", + info: '{"User": "abc", "RoleName": "123", "Options": ["o1", "o2"]}', + }, + expected: "Role Altered: User abc altered role 123 with options o1,o2", + }, + ]; + tcs.forEach(tc => { + expect(getEventDescription(tc.event as clusterUiApi.EventColumns)).toEqual( + tc.expected, + ); + }); + }); +}); diff --git a/pkg/ui/workspaces/db-console/src/util/events.ts b/pkg/ui/workspaces/db-console/src/util/events.ts index 00ac42bff2a3..1bcd1c9a0222 100644 --- a/pkg/ui/workspaces/db-console/src/util/events.ts +++ b/pkg/ui/workspaces/db-console/src/util/events.ts @@ -179,7 +179,11 @@ export function getEventDescription(e: clusterUiApi.EventColumns): string { case eventTypes.DROP_ROLE: return `Role Dropped: User ${info.User} dropped role ${info.RoleName}`; case eventTypes.ALTER_ROLE: - return `Role Altered: User ${info.User} altered role ${info.RoleName} with options ${info.Options}`; + if (info.Options && info.Options.length > 0) { + return `Role Altered: User ${info.User} altered role ${info.RoleName} with options ${info.Options}`; + } else { + return `Role Altered: User ${info.User} altered role ${info.RoleName}`; + } case eventTypes.IMPORT: return `Import Job: User ${info.User} has a job ${info.JobID} running with status ${info.Status}`; case eventTypes.RESTORE: From 0c15a91c1014b977c675fe2905857318aea12ecf Mon Sep 17 00:00:00 2001 From: David Hartunian Date: Mon, 1 Jul 2024 17:07:43 -0400 Subject: [PATCH 3/9] sql: unskip Insights test This test has been flaky for a while because of the async tagging of the TransactionID to the insight that sometimes takes too long to complete. This change removes that check and unskips the test so that we can catch regressions for this feature. In the future we may want to write a separate test to verify the async transactionID tagging. Resolves: #125771 Resolves: #121986 Epic: None Release note: None --- pkg/sql/sqlstats/insights/integration/insights_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pkg/sql/sqlstats/insights/integration/insights_test.go b/pkg/sql/sqlstats/insights/integration/insights_test.go index 7053728ff870..4cdccd39433a 100644 --- a/pkg/sql/sqlstats/insights/integration/insights_test.go +++ b/pkg/sql/sqlstats/insights/integration/insights_test.go @@ -696,8 +696,6 @@ func TestInsightsIntegrationForContention(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 121986) - // Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.)
ctx := context.Background() settings := cluster.MakeTestingClusterSettings() @@ -766,13 +764,11 @@ func TestInsightsIntegrationForContention(t *testing.T) { // lookup this id will result in the resolver potentially missing the event. txnIDCache := tc.ApplicationLayer(0).SQLServer().(*sql.Server).GetTxnIDCache() txnIDCache.DrainWriteBuffer() - var expectedWaitingTxnFingerprintID appstatspb.TransactionFingerprintID testutils.SucceedsSoon(t, func() error { waitingTxnFingerprintID, ok := txnIDCache.Lookup(waitingTxnID) if !ok || waitingTxnFingerprintID == appstatspb.InvalidTransactionFingerprintID { return fmt.Errorf("waiting txn fingerprint not found in cache") } - expectedWaitingTxnFingerprintID = waitingTxnFingerprintID return nil }) @@ -849,11 +845,6 @@ func TestInsightsIntegrationForContention(t *testing.T) { continue } - if waitingTxnFingerprintID == "0000000000000000" || waitingTxnFingerprintID == "" { - lastErr = fmt.Errorf("expected waitingTxnFingerprintID to be %d, but got %s. \nScanned row: \n%s", expectedWaitingTxnFingerprintID, waitingTxnFingerprintID, prettyPrintRow) - continue - } - foundRow = true break } From 7d4d517ebe2162e33566a2efff29fc37878a4b83 Mon Sep 17 00:00:00 2001 From: David Hartunian Date: Mon, 1 Jul 2024 11:20:35 -0400 Subject: [PATCH 4/9] status: fix TestTenantStatusAPI test Previously, this test would use a single connection, cancel it, and then use the connection to verify the cancellation. The test is adjusted here to use two separate sessions, one to cancel for testing, and another to observe the cancellation. Resolves: #125404 Epic: None Release note: None --- .../serverccl/statusccl/tenant_status_test.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index 2b8f857e7746..641353a1884c 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -1102,8 +1102,16 @@ func selectClusterSessionIDs(t *testing.T, conn *sqlutils.SQLRunner) []string { func testTenantStatusCancelSession(t *testing.T, helper serverccl.TenantTestHelper) { // Open a SQL session on tenant SQL pod 0. - sqlPod0 := helper.TestCluster().TenantConn(0) - sqlPod0.Exec(t, "SELECT 1") + ctx := context.Background() + // Open two different SQL sessions on tenant SQL pod 0. + sqlPod0 := helper.TestCluster().TenantDB(0) + sqlPod0SessionToCancel, err := sqlPod0.Conn(ctx) + require.NoError(t, err) + sqlPod0SessionForIntrospection, err := sqlPod0.Conn(ctx) + require.NoError(t, err) + _, err = sqlPod0SessionToCancel.ExecContext(ctx, "SELECT 1") + require.NoError(t, err) + introspectionRunner := sqlutils.MakeSQLRunner(sqlPod0SessionForIntrospection) // See the session over HTTP on tenant SQL pod 1. httpPod1 := helper.TestCluster().TenantAdminHTTPClient(t, 1) @@ -1122,7 +1130,7 @@ func testTenantStatusCancelSession(t *testing.T, helper serverccl.TenantTestHelp // See the session over SQL on tenant SQL pod 0. sessionID := hex.EncodeToString(session.ID) require.Eventually(t, func() bool { - return strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID) + return strings.Contains(strings.Join(selectClusterSessionIDs(t, introspectionRunner), ","), sessionID) }, 5*time.Second, 100*time.Millisecond) // Cancel the session over HTTP from tenant SQL pod 1. 
@@ -1134,7 +1142,7 @@ func testTenantStatusCancelSession(t *testing.T, helper serverccl.TenantTestHelp // No longer see the session over SQL from tenant SQL pod 0. // (The SQL client maintains an internal connection pool and automatically reconnects.) require.Eventually(t, func() bool { - return !strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID) + return !strings.Contains(strings.Join(selectClusterSessionIDs(t, introspectionRunner), ","), sessionID) }, 5*time.Second, 100*time.Millisecond) // Attempt to cancel the session again over HTTP from tenant SQL pod 1, so that we can see the error message. From 96aea875e2d41a92fef68b0477240fdf502fccad Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Mon, 1 Jul 2024 22:09:57 -0400 Subject: [PATCH 5/9] kvserver: wrap kvpb.RangeFeedEventSink in Stream Previously, we declared the same interface signature twice: once in kvpb.RangeFeedEventSink and again in rangefeed.Stream. This patch embeds kvpb.RangeFeedEventSink inside rangefeed.Stream, making rangefeed.Stream a superset of kvpb.RangeFeedEventSink. This approach makes sense, as each rangefeed server stream should be a rangefeed event sink, capable of making thread-safe rangefeed event sends. Epic: none Release note: none --- pkg/kv/kvpb/api.go | 4 ++++ pkg/kv/kvserver/rangefeed/registry.go | 6 +----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 9180a09107d6..6dc08c93fcd2 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -2518,7 +2518,11 @@ func (s *ScanStats) String() string { // RangeFeedEventSink is an interface for sending a single rangefeed event. type RangeFeedEventSink interface { + // Context returns the context for this stream. Context() context.Context + // Send blocks until it sends the RangeFeedEvent, the stream is done, or the + // stream breaks. Send must be safe to call on the same stream in different + // goroutines. Send(*RangeFeedEvent) error } diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index a65cbf36a3fe..532485616962 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -30,11 +30,7 @@ import ( // Stream is a object capable of transmitting RangeFeedEvents. type Stream interface { - // Context returns the context for this stream. - Context() context.Context - // Send blocks until it sends m, the stream is done, or the stream breaks. - // Send must be safe to call on the same stream in different goroutines. - Send(*kvpb.RangeFeedEvent) error + kvpb.RangeFeedEventSink } // Shared event is an entry stored in registration channel. Each entry is From 2df6a9c2d863424884eaaf873ab902cf7bcb8c4c Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Fri, 28 Jun 2024 20:30:48 -0400 Subject: [PATCH 6/9] kvserver/rangefeed: remove lockedRangefeedStream Previously, we created separate locked rangefeed streams for each individual rangefeed stream to ensure Send can be called concurrently as the underlying grpc stream is not thread safe. However, since the introduction of the mux rangefeed support, we already have a dedicated lock for the underlying mux stream, making the Send method on each rangefeed stream thread safe already. This patch removes the redundant locks from each individual rangefeed stream. 
Epic: none Release note: none --- .../rangefeed/rangefeed_external_test.go | 2 ++ pkg/kv/kvpb/api.go | 3 +++ .../client_replica_circuit_breaker_test.go | 2 ++ pkg/kv/kvserver/rangefeed/bench_test.go | 4 ++++ pkg/kv/kvserver/rangefeed/processor_test.go | 2 ++ pkg/kv/kvserver/rangefeed/registry_test.go | 2 ++ pkg/kv/kvserver/replica_rangefeed.go | 23 +------------------ pkg/kv/kvserver/replica_rangefeed_test.go | 2 ++ pkg/rpc/context_test.go | 5 ++++ pkg/server/node.go | 16 ++++++++----- pkg/sql/flowinfra/flow_registry_test.go | 2 ++ 11 files changed, 35 insertions(+), 28 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index e5ba37ce8890..5faedb453121 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -1638,6 +1638,8 @@ func (c *channelSink) Context() context.Context { return c.ctx } +func (c *channelSink) SendIsThreadSafe() {} + func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error { select { case c.ch <- e: diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 6dc08c93fcd2..0507872f9575 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -2524,6 +2524,9 @@ type RangeFeedEventSink interface { // stream breaks. Send must be safe to call on the same stream in different // goroutines. Send(*RangeFeedEvent) error + // SendIsThreadSafe is a no-op declaration method. It is a contract that the + // interface has a thread-safe Send method. + SendIsThreadSafe() } // RangeFeedEventProducer is an adapter for receiving rangefeed events with either diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 44abe30267e1..a4cb5c810eba 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -456,6 +456,8 @@ func (s *dummyStream) Context() context.Context { return s.ctx } +func (s *dummyStream) SendIsThreadSafe() {} + func (s *dummyStream) Send(ev *kvpb.RangeFeedEvent) error { if ev.Val == nil && ev.Error == nil { return nil diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go index 50a2c39cdfec..af59ca71bf5e 100644 --- a/pkg/kv/kvserver/rangefeed/bench_test.go +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -207,3 +207,7 @@ func (s *noopStream) Send(*kvpb.RangeFeedEvent) error { s.events++ return nil } + +// Note that Send itself is not thread-safe, but it is written to be used only +// in a single threaded environment in this test, ensuring thread-safety. 
+func (s *noopStream) SendIsThreadSafe() {} diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 954b179ebcb2..b0e30c67c112 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -1811,6 +1811,8 @@ func newConsumer(blockAfter int) *consumer { } } +func (c *consumer) SendIsThreadSafe() {} + func (c *consumer) Send(e *kvpb.RangeFeedEvent) error { if e.Val != nil { v := int(atomic.AddInt32(&c.sentValues, 1)) diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index f3e43fc36a89..4b3e76dede3a 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -65,6 +65,8 @@ func (s *testStream) Cancel() { s.ctxDone() } +func (s *testStream) SendIsThreadSafe() {} + func (s *testStream) Send(e *kvpb.RangeFeedEvent) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e11ca36b9a9f..6e6f5475a505 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -37,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -122,24 +121,6 @@ const defaultEventChanCap = 4096 var defaultEventChanTimeout = envutil.EnvOrDefaultDuration( "COCKROACH_RANGEFEED_SEND_TIMEOUT", 50*time.Millisecond) -// lockedRangefeedStream is an implementation of rangefeed.Stream which provides -// support for concurrent calls to Send. Note that the default implementation of -// grpc.Stream is not safe for concurrent calls to Send. -type lockedRangefeedStream struct { - wrapped kvpb.RangeFeedEventSink - sendMu syncutil.Mutex -} - -func (s *lockedRangefeedStream) Context() context.Context { - return s.wrapped.Context() -} - -func (s *lockedRangefeedStream) Send(e *kvpb.RangeFeedEvent) error { - s.sendMu.Lock() - defer s.sendMu.Unlock() - return s.wrapped.Send(e) -} - // rangefeedTxnPusher is a shim around intentResolver that implements the // rangefeed.TxnPusher interface. type rangefeedTxnPusher struct { @@ -294,8 +275,6 @@ func (r *Replica) RangeFeed( checkTS = r.Clock().Now() } - lockedStream := &lockedRangefeedStream{wrapped: stream} - // If we will be using a catch-up iterator, wait for the limiter here before // locking raftMu. 
usingCatchUpIter := false @@ -352,7 +331,7 @@ func (r *Replica) RangeFeed( var done future.ErrorFuture p := r.registerWithRangefeedRaftMuLocked( - ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, lockedStream, &done, + ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, omitRemote, stream, &done, ) r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 08d188b65849..2ba4716d6d71 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -79,6 +79,8 @@ func (s *testStream) Cancel() { s.cancel() } +func (s *testStream) SendIsThreadSafe() {} + func (s *testStream) Send(e *kvpb.RangeFeedEvent) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index f16f56c683e5..958cb29dc738 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -288,6 +288,11 @@ func (s *rangefeedEventSink) Context() context.Context { return s.ctx } +// Note that Send itself is not thread-safe (grpc stream is not thread-safe), +// but tests were written in a way that sends sequentially, ensuring +// thread-safety for Send. +func (s *rangefeedEventSink) SendIsThreadSafe() {} + func (s *rangefeedEventSink) Send(event *kvpb.RangeFeedEvent) error { return s.stream.Send(&kvpb.MuxRangeFeedEvent{RangeFeedEvent: *event}) } diff --git a/pkg/server/node.go b/pkg/server/node.go index 7021625a552b..c6e587f05857 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1832,10 +1832,10 @@ func (n *Node) RangeLookup( return resp, nil } -// setRangeIDEventSink annotates each response with range and stream IDs. -// This is used by MuxRangeFeed. -// TODO: This code can be removed in 22.2 once MuxRangeFeed is the default, and -// the old style RangeFeed deprecated. +// setRangeIDEventSink is an implementation of rangefeed.Stream which annotates +// each response with rangeID and streamID. It is used by MuxRangeFeed. Note +// that the wrapped stream is a locked mux stream, ensuring safe concurrent Send +// calls. type setRangeIDEventSink struct { ctx context.Context cancel context.CancelFunc @@ -1857,10 +1857,14 @@ func (s *setRangeIDEventSink) Send(event *kvpb.RangeFeedEvent) error { return s.wrapped.Send(response) } +// Wrapped stream is a locked mux stream, ensuring safe concurrent Send. +func (s *setRangeIDEventSink) SendIsThreadSafe() {} + var _ kvpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil) -// lockedMuxStream provides support for concurrent calls to Send. -// The underlying MuxRangeFeedServer is not safe for concurrent calls to Send. +// lockedMuxStream provides support for concurrent calls to Send. The underlying +// MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to +// Send. 
type lockedMuxStream struct { wrapped kvpb.Internal_MuxRangeFeedServer sendMu syncutil.Mutex diff --git a/pkg/sql/flowinfra/flow_registry_test.go b/pkg/sql/flowinfra/flow_registry_test.go index 877158806323..e48a1197814b 100644 --- a/pkg/sql/flowinfra/flow_registry_test.go +++ b/pkg/sql/flowinfra/flow_registry_test.go @@ -717,6 +717,8 @@ type delayedErrorServerStream struct { err error } +func (s *delayedErrorServerStream) SendIsThreadSafe() {} + func (s *delayedErrorServerStream) Send(*execinfrapb.ConsumerSignal) error { s.rpcCalledCh <- struct{}{} <-s.delayCh From 9d963ace140fb44e33b377dc965459454decc240 Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Fri, 28 Jun 2024 20:31:00 -0400 Subject: [PATCH 7/9] kvserver/rangefeed: remove non-mux rangefeed metrics Previously, we removed non-mux rangefeed code in https://github.com/cockroachdb/cockroach/pull/125610. However, that patch forgot to remove non-mux rangefeed metrics. This patch removes these metrics as they are no longer needed. Epic: none Release note: none --- docs/generated/metrics/metrics.html | 2 -- pkg/server/node.go | 16 ---------------- 2 files changed, 18 deletions(-) diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index 6539bf87c290..518a4dfafa35 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -629,8 +629,6 @@ STORAGErpc.method.writebatch.recvNumber of WriteBatch requests processedRPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGErpc.streams.mux_rangefeed.activeNumber of currently running MuxRangeFeed streamsStreamsGAUGECOUNTAVGNONE STORAGErpc.streams.mux_rangefeed.recvTotal number of MuxRangeFeed streamsStreamsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE -STORAGErpc.streams.rangefeed.activeNumber of currently running RangeFeed streamsStreamsGAUGECOUNTAVGNONE -STORAGErpc.streams.rangefeed.recvTotal number of RangeFeed streamsStreamsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGEspanconfig.kvsubscriber.oldest_protected_record_nanosDifference between the current time and the oldest protected timestamp (sudden drops indicate a record being released; an ever increasing number indicates that the oldest record is around and preventing GC if > configured GC TTL)NanosecondsGAUGENANOSECONDSAVGNONE STORAGEspanconfig.kvsubscriber.protected_record_countNumber of protected timestamp records, as seen by KVRecordsGAUGECOUNTAVGNONE STORAGEspanconfig.kvsubscriber.update_behind_nanosDifference between the current time and when the KVSubscriber received its last update (an ever increasing number indicates that we're no longer receiving updates)NanosecondsGAUGENANOSECONDSAVGNONE diff --git a/pkg/server/node.go b/pkg/server/node.go index 7021625a552b..3466d7c90516 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -191,18 +191,6 @@ This metric is thus not an indicator of KV health.`, Measurement: "Bytes", Unit: metric.Unit_BYTES, } - metaActiveRangeFeed = metric.Metadata{ - Name: "rpc.streams.rangefeed.active", - Help: `Number of currently running RangeFeed streams`, - Measurement: "Streams", - Unit: metric.Unit_COUNT, - } - metaTotalRangeFeed = metric.Metadata{ - Name: "rpc.streams.rangefeed.recv", - Help: `Total number of RangeFeed streams`, - Measurement: "Streams", - Unit: metric.Unit_COUNT, - } metaActiveMuxRangeFeed = metric.Metadata{ Name: "rpc.streams.mux_rangefeed.active", Help: `Number of currently running MuxRangeFeed streams`, @@ -270,8 +258,6 @@ type nodeMetrics struct { CrossRegionBatchResponseBytes *metric.Counter CrossZoneBatchRequestBytes 
*metric.Counter CrossZoneBatchResponseBytes *metric.Counter - NumRangeFeed *metric.Counter - ActiveRangeFeed *metric.Gauge NumMuxRangeFeed *metric.Counter ActiveMuxRangeFeed *metric.Gauge } @@ -293,8 +279,6 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMe CrossRegionBatchResponseBytes: metric.NewCounter(metaCrossRegionBatchResponse), CrossZoneBatchRequestBytes: metric.NewCounter(metaCrossZoneBatchRequest), CrossZoneBatchResponseBytes: metric.NewCounter(metaCrossZoneBatchResponse), - ActiveRangeFeed: metric.NewGauge(metaActiveRangeFeed), - NumRangeFeed: metric.NewCounter(metaTotalRangeFeed), ActiveMuxRangeFeed: metric.NewGauge(metaActiveMuxRangeFeed), NumMuxRangeFeed: metric.NewCounter(metaTotalMuxRangeFeed), } From 41c2a01fc5485b7ced208b630f5a0bce42ea7acc Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 2 Jul 2024 00:38:32 -0400 Subject: [PATCH 8/9] roachpb: add Leader lease type definition Fixes #125225. This commit adds a new Term field to the Lease struct. This field defines the term of the raft leader that a leader lease is associated with. The lease is valid for as long as the raft leader has a guarantee from store liveness that it remains the leader under this term. The lease is invalid if the raft leader loses leadership (i.e. changes its term). The field is not yet used. Release note: None --- pkg/roachpb/data.go | 79 +++++++++++++++++++++++++++++++++------- pkg/roachpb/data.proto | 25 ++++++++++--- pkg/roachpb/data_test.go | 54 ++++++++++++++++++++++++++- 3 files changed, 137 insertions(+), 21 deletions(-) diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index f9ec1f5baca9..6f7044b8af0d 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -1853,10 +1853,15 @@ func (l Lease) SafeFormat(w redact.SafePrinter, _ rune) { return } w.Printf("repl=%s seq=%d start=%s", l.Replica, l.Sequence, l.Start) - if l.Type() == LeaseExpiration { + switch l.Type() { + case LeaseExpiration: w.Printf(" exp=%s", l.Expiration) - } else { + case LeaseEpoch: w.Printf(" epo=%d min-exp=%s", l.Epoch, l.MinExpiration) + case LeaseLeader: + w.Printf(" term=%d min-exp=%s", l.Term, l.MinExpiration) + default: + panic("unexpected lease type") } w.Printf(" pro=%s", l.ProposedTS) } @@ -1883,14 +1888,23 @@ const ( // LeaseEpoch allows range operations while the node liveness epoch // is equal to the lease epoch. LeaseEpoch + // LeaseLeader allows range operations while the replica is guaranteed + // to be the range's raft leader. + LeaseLeader ) // Type returns the lease type. func (l Lease) Type() LeaseType { - if l.Epoch == 0 { - return LeaseExpiration + if l.Epoch != 0 && l.Term != 0 { + panic("lease cannot have both epoch and term") + } + if l.Epoch != 0 { + return LeaseEpoch } - return LeaseEpoch + if l.Term != 0 { + return LeaseLeader + } + return LeaseExpiration } // Speculative returns true if this lease instance doesn't correspond to a @@ -1914,7 +1928,9 @@ func (l Lease) Speculative() bool { // expToEpochEquiv indicates whether an expiration-based lease // can be considered equivalent to an epoch-based lease during // a promotion from expiration-based to epoch-based. It is used -// for mixed-version compatibility. +// for mixed-version compatibility. No such flag is needed for +// expiration-based to leader lease promotion, because there is +// no need for mixed-version compatibility. // // NB: Lease.Equivalent is NOT symmetric. 
For expiration-based // leases, a lease is equivalent to another with an equal or @@ -1935,14 +1951,14 @@ func (l Lease) Speculative() bool { // times are the same, the leases could turn out to be non-equivalent -- in // that case they will share a start time but not the sequence. // -// NB: we do not allow transitions from epoch-based or leader leases (not -// yet implemented) to expiration-based leases to be equivalent. This was -// because both of the former lease types don't have an expiration in the -// lease, while the latter does. We can introduce safety violations by -// shortening the lease expiration if we allow this transition, since the -// new lease may not apply at the leaseholder until much after it applies at -// some other replica, so the leaseholder may continue acting as one based -// on an old lease, while the other replica has stepped up as leaseholder. +// NB: we do not allow transitions from epoch-based or leader leases to +// expiration-based leases to be equivalent. This was because both of the +// former lease types don't have an expiration in the lease, while the +// latter does. We can introduce safety violations by shortening the lease +// expiration if we allow this transition, since the new lease may not apply +// at the leaseholder until much after it applies at some other replica, so +// the leaseholder may continue acting as one based on an old lease, while +// the other replica has stepped up as leaseholder. func (l Lease) Equivalent(newL Lease, expToEpochEquiv bool) bool { // Ignore proposed timestamp & deprecated start stasis. l.ProposedTS, newL.ProposedTS = hlc.ClockTimestamp{}, hlc.ClockTimestamp{} @@ -1977,6 +1993,17 @@ func (l Lease) Equivalent(newL Lease, expToEpochEquiv bool) bool { if l.MinExpiration.LessEq(newL.MinExpiration) { l.MinExpiration, newL.MinExpiration = hlc.Timestamp{}, hlc.Timestamp{} } + + case LeaseLeader: + if l.Term == newL.Term { + l.Term, newL.Term = 0, 0 + } + // For leader leases, extensions to the minimum expiration are considered + // equivalent. + if l.MinExpiration.LessEq(newL.MinExpiration) { + l.MinExpiration, newL.MinExpiration = hlc.Timestamp{}, hlc.Timestamp{} + } + case LeaseExpiration: switch newL.Type() { case LeaseEpoch: @@ -1999,6 +2026,27 @@ func (l Lease) Equivalent(newL Lease, expToEpochEquiv bool) bool { newL.MinExpiration = hlc.Timestamp{} } + case LeaseLeader: + // An expiration-based lease being promoted to a leader lease. This + // transition occurs after a successful lease transfer if the setting + // kv.transfer_expiration_leases_first.enabled is enabled and leader + // leases are in use. + // + // Expiration-based leases carry a local expiration timestamp. Leader + // leases extend their expiration indirectly through the leadership + // fortification protocol and associated Store Liveness heartbeats. We + // assume that this promotion is only proposed if the leader support + // expiration (and associated min expiration) is equal to or later than + // previous expiration carried by the expiration-based lease. This is a + // case where Equivalent is not commutative, as the reverse transition + // (from leader lease to expiration-based) requires a sequence increment. + // + // Ignore expiration, term, and min expiration. The remaining fields + // which are compared are Replica and Start. + l.Expiration = nil + newL.Term = 0 + newL.MinExpiration = hlc.Timestamp{} + case LeaseExpiration: // See the comment above, though this field's nullability wasn't // changed. We nil it out for completeness only. 
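The Equivalent logic above follows a normalize-then-compare shape: fields that are allowed to differ between the old and the new lease are zeroed out on both copies, and whatever survives must match exactly. A reduced sketch of that shape for the leader-lease arm, using simplified stand-in types rather than roachpb.Lease:

    package sketch

    type lease struct {
        replica string
        start   int64
        term    uint64 // raft leader term; leader leases only
        minExp  int64  // minimum expiration
    }

    // equivalent reports whether newL can replace l without a sequence bump,
    // for the leader-lease case: the term must not change, and the minimum
    // expiration may only move forward.
    func equivalent(l, newL lease) bool {
        if l.term == newL.term {
            l.term, newL.term = 0, 0 // same term: ignore the field
        }
        if l.minExp <= newL.minExp {
            l.minExp, newL.minExp = 0, 0 // extensions of min expiration are fine
        }
        // Anything still set (replica, start, a changed term, a regressed
        // minimum expiration) must be identical.
        return l == newL
    }
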
@@ -2090,6 +2138,9 @@ func (l *Lease) Equal(that interface{}) bool { if !l.MinExpiration.Equal(&that1.MinExpiration) { return false } + if l.Term != that1.Term { + return false + } return true } diff --git a/pkg/roachpb/data.proto b/pkg/roachpb/data.proto index 718ab027f73d..b3b5217c9f8a 100644 --- a/pkg/roachpb/data.proto +++ b/pkg/roachpb/data.proto @@ -704,9 +704,13 @@ message Lease { (gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"]; - // The epoch of the lease holder's node liveness entry. This field is only set - // for epoch-based leases. If this value is non-zero, the expiration field and - // the term field (not yet implemented) are ignored. + // The epoch of the lease holder's node liveness record. The lease inherits + // the expiration of the node liveness record for as long as the node liveness + // record retains this epoch. The lease is invalid if the node liveness record + // is updated with a different epoch. + // + // This field is only set for epoch-based leases. If this value is non-zero, + // the expiration field and the term field must not be set. int64 epoch = 6; // A zero-indexed sequence number which is incremented during the acquisition @@ -726,13 +730,22 @@ message Lease { // The minimum expiration at which the lease expires, independent of any other // expiry condition. This field can be used to place a floor on the expiration - // for epoch-based leases and leader leases (not yet implemented) to prevent - // expiration regressions when upgrading from an expiration-based lease. It is - // not supported for expiration-based leases. + // for epoch-based leases and leader leases to prevent expiration regressions + // when upgrading from an expiration-based lease. It is not supported for + // expiration-based leases. // // Like expiration above, this is an exclusive value, i.e. the lease is valid // in [start, max(min_expiration, )). util.hlc.Timestamp min_expiration = 9 [(gogoproto.nullable) = false]; + + // The term of the raft leader that a leader lease is associated with. The + // lease is valid for as long as the raft leader has a guarantee from store + // liveness that it remains the leader under this term. The lease is invalid + // if the raft leader loses leadership (i.e. changes its term). + // + // This field is only set for leader leases. If non-zero, the expiration field + // and the epoch field must not be set. + uint64 term = 10; } // AbortSpanEntry contains information about a transaction which has diff --git a/pkg/roachpb/data_test.go b/pkg/roachpb/data_test.go index ff594f6f1196..101a7c34fd72 100644 --- a/pkg/roachpb/data_test.go +++ b/pkg/roachpb/data_test.go @@ -1121,6 +1121,22 @@ func TestLeaseStringAndSafeFormat(t *testing.T) { }, exp: "repl=(n1,s1):1 seq=3 start=0.000000001,1 epo=4 min-exp=0.000000002,1 pro=0.000000001,0", }, + { + name: "leader", + lease: Lease{ + Replica: ReplicaDescriptor{ + NodeID: 1, + StoreID: 1, + ReplicaID: 1, + }, + Start: makeClockTS(1, 1), + ProposedTS: makeClockTS(1, 0), + Sequence: 3, + MinExpiration: makeTS(2, 1), + Term: 5, + }, + exp: "repl=(n1,s1):1 seq=3 start=0.000000001,1 term=5 min-exp=0.000000002,1 pro=0.000000001,0", + }, } { t.Run(tc.name, func(t *testing.T) { // String. 
@@ -1133,6 +1149,13 @@ func TestLeaseStringAndSafeFormat(t *testing.T) { } } +func TestLeaseType(t *testing.T) { + require.Equal(t, LeaseExpiration, Lease{}.Type()) + require.Equal(t, LeaseEpoch, Lease{Epoch: 1}.Type()) + require.Equal(t, LeaseLeader, Lease{Term: 1}.Type()) + require.Panics(t, func() { Lease{Epoch: 1, Term: 1}.Type() }) +} + func TestLeaseEquivalence(t *testing.T) { r1 := ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1} r2 := ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2} @@ -1150,6 +1173,11 @@ func TestLeaseEquivalence(t *testing.T) { expire1TS2 := Lease{Replica: r1, Start: ts2, Expiration: ts2.ToTimestamp().Clone()} expire2 := Lease{Replica: r1, Start: ts1, Expiration: ts3.ToTimestamp().Clone()} expire2R2TS2 := Lease{Replica: r2, Start: ts2, Expiration: ts3.ToTimestamp().Clone()} + leader1 := Lease{Replica: r1, Start: ts1, Term: 1} + leader1R2 := Lease{Replica: r2, Start: ts1, Term: 1} + leader1TS2 := Lease{Replica: r1, Start: ts2, Term: 1} + leader2 := Lease{Replica: r1, Start: ts1, Term: 2} + leader2R2TS2 := Lease{Replica: r2, Start: ts2, Term: 2} proposed1 := Lease{Replica: r1, Start: ts1, Epoch: 1, ProposedTS: ts1} proposed2 := Lease{Replica: r1, Start: ts1, Epoch: 2, ProposedTS: ts1} @@ -1167,6 +1195,9 @@ func TestLeaseEquivalence(t *testing.T) { epoch1MinExp2 := Lease{Replica: r1, Start: ts1, Epoch: 1, MinExpiration: ts2.ToTimestamp()} epoch1MinExp3 := Lease{Replica: r1, Start: ts1, Epoch: 1, MinExpiration: ts3.ToTimestamp()} epoch2MinExp2 := Lease{Replica: r1, Start: ts1, Epoch: 2, MinExpiration: ts2.ToTimestamp()} + leader1MinExp2 := Lease{Replica: r1, Start: ts1, Term: 1, MinExpiration: ts2.ToTimestamp()} + leader1MinExp3 := Lease{Replica: r1, Start: ts1, Term: 1, MinExpiration: ts3.ToTimestamp()} + leader2MinExp2 := Lease{Replica: r1, Start: ts1, Term: 2, MinExpiration: ts2.ToTimestamp()} testCases := []struct { l, ol Lease @@ -1174,6 +1205,7 @@ func TestLeaseEquivalence(t *testing.T) { }{ {epoch1, epoch1, true}, // same epoch lease {expire1, expire1, true}, // same expiration lease + {leader1, leader1, true}, // same leader lease {epoch1, epoch1R2, false}, // different epoch leases {epoch1, epoch1TS2, false}, // different epoch leases {epoch1, epoch2, false}, // different epoch leases @@ -1183,12 +1215,20 @@ func TestLeaseEquivalence(t *testing.T) { {expire1, expire2R2TS2, false}, // different expiration leases {expire1, expire2, true}, // same expiration lease, extended {expire2, expire1, false}, // same expiration lease, extended but backwards + {leader1, leader1R2, false}, // different leader leases + {leader1, leader1TS2, false}, // different leader leases + {leader1, leader2, false}, // different leader leases + {leader1, leader2R2TS2, false}, // different leader leases {epoch1, expire1, false}, // epoch and expiration leases, same replica and start time {epoch1, expire1R2, false}, // epoch and expiration leases, different replica {epoch1, expire1TS2, false}, // epoch and expiration leases, different start time {expire1, epoch1, true}, // expiration and epoch leases, same replica and start time {expire1, epoch1R2, false}, // expiration and epoch leases, different replica {expire1, epoch1TS2, false}, // expiration and epoch leases, different start time + {epoch1, leader1, false}, // epoch and leader leases, same replica and start time + {leader1, epoch1, false}, // leader and epoch leases, same replica and start time + {expire1, leader1, true}, // expiration and leader leases, same replica and start time + {leader1, expire1, false}, // leader 
and expiration leases, same replica and start time {proposed1, proposed1, true}, // exact leases with identical timestamps {proposed1, proposed2, false}, // same proposed timestamps, but diff epochs {proposed1, proposed3, true}, // different proposed timestamps, same lease @@ -1196,7 +1236,7 @@ func TestLeaseEquivalence(t *testing.T) { {epoch1, epoch1Voter, true}, // same epoch lease, different replica type {epoch1, epoch1Learner, true}, // same epoch lease, different replica type {epoch1Voter, epoch1Learner, true}, // same epoch lease, different replica type - // Test minimum expiration. + // Test minimum expiration with epoch leases. {epoch1, epoch1MinExp2, true}, // different epoch leases, newer min expiration {epoch1, epoch1MinExp3, true}, // different epoch leases, newer min expiration {epoch1MinExp2, epoch1, false}, // different epoch leases, older min expiration @@ -1206,6 +1246,16 @@ func TestLeaseEquivalence(t *testing.T) { {epoch1MinExp3, epoch1MinExp2, false}, // different epoch leases, older min expiration {epoch1MinExp3, epoch1MinExp3, true}, // same epoch leases, same min expiration {epoch1MinExp2, epoch2MinExp2, false}, // different epoch leases + // Test minimum expiration with leader leases. + {leader1, leader1MinExp2, true}, // different leader leases, newer min expiration + {leader1, leader1MinExp3, true}, // different leader leases, newer min expiration + {leader1MinExp2, leader1, false}, // different leader leases, older min expiration + {leader1MinExp2, leader1MinExp2, true}, // same leader leases, same min expiration + {leader1MinExp2, leader1MinExp3, true}, // different leader leases, newer min expiration + {leader1MinExp3, leader1, false}, // different leader leases, older min expiration + {leader1MinExp3, leader1MinExp2, false}, // different leader leases, older min expiration + {leader1MinExp3, leader1MinExp3, true}, // same leader leases, same min expiration + {leader1MinExp2, leader2MinExp2, false}, // different leader leases } for i, tc := range testCases { @@ -1257,6 +1307,7 @@ func TestLeaseEqual(t *testing.T) { Sequence LeaseSequence AcquisitionType LeaseAcquisitionType MinExpiration hlc.Timestamp + Term uint64 } // Verify that the lease structure does not change unexpectedly. If a compile // error occurs on the following line of code, update the expectedLease @@ -1312,6 +1363,7 @@ func TestLeaseEqual(t *testing.T) { {Sequence: 1}, {AcquisitionType: 1}, {MinExpiration: ts}, + {Term: 1}, } for _, c := range testCases { t.Run("", func(t *testing.T) { From 393d1ee380260133fa3b860ebdc973ff51093df5 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 1 Jul 2024 23:36:35 -0400 Subject: [PATCH 9/9] kv: hook Raft StoreLiveness into storeliveness package Fixes #125242. This commit adds a `replicaRLockedStoreLiveness` adapter type to hook the raft store liveness into the storeliveness package. This is currently unused. 
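The adapter relies on Go's defined-type conversion: declaring a new type over an existing struct yields a distinct type with the same layout, so a *Replica can be converted to the adapter pointer for free while carrying a separate method set. A minimal sketch with illustrative names (adapter, supportChecker, asSupportChecker), none of which appear in the patch:

    package sketch

    // replica is a stand-in for kvserver.Replica.
    type replica struct {
        stores map[uint64]string // replicaID -> store ident, per the range descriptor
    }

    // supportChecker is a stand-in for raftstoreliveness.StoreLiveness.
    type supportChecker interface {
        SupportFor(replicaID uint64) (epoch int64, ok bool)
    }

    // adapter shares replica's layout but has its own method set.
    type adapter replica

    func (a *adapter) SupportFor(replicaID uint64) (int64, bool) {
        if _, ok := a.stores[replicaID]; !ok {
            return 0, false // unknown replica: no store ident to ask about
        }
        // A real implementation would delegate to the store-level liveness
        // fabric using the resolved store ident.
        return 1, true
    }

    var _ supportChecker = (*adapter)(nil)

    func asSupportChecker(r *replica) supportChecker {
        return (*adapter)(r) // pointer conversion, no copy
    }
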
Release note: None --- pkg/kv/kvserver/BUILD.bazel | 4 ++ pkg/kv/kvserver/replica_init.go | 3 +- pkg/kv/kvserver/replica_store_liveness.go | 75 ++++++++++++++++++++ pkg/kv/kvserver/store.go | 8 +++ pkg/raft/raftstoreliveness/store_liveness.go | 8 ++- 5 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 pkg/kv/kvserver/replica_store_liveness.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 30788d39ca78..12c45129b105 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -74,6 +74,7 @@ go_library( "replica_send.go", "replica_split_load.go", "replica_sst_snapshot_storage.go", + "replica_store_liveness.go", "replica_tscache.go", "replica_write.go", "replicate_queue.go", @@ -166,6 +167,8 @@ go_library( "//pkg/kv/kvserver/spanset", "//pkg/kv/kvserver/split", "//pkg/kv/kvserver/stateloader", + "//pkg/kv/kvserver/storeliveness", + "//pkg/kv/kvserver/storeliveness/storelivenesspb", "//pkg/kv/kvserver/tenantrate", "//pkg/kv/kvserver/tscache", "//pkg/kv/kvserver/txnrecovery", @@ -176,6 +179,7 @@ go_library( "//pkg/multitenant/tenantcostmodel", "//pkg/raft", "//pkg/raft/raftpb", + "//pkg/raft/raftstoreliveness", "//pkg/raft/tracker", "//pkg/roachpb", "//pkg/rpc", diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 1ab02c394c19..a8c72337179c 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -294,11 +294,12 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error { ctx := r.AnnotateCtx(context.Background()) rg, err := raft.NewRawNode(newRaftConfig( ctx, - raft.Storage((*replicaRaftStorage)(r)), + (*replicaRaftStorage)(r), raftpb.PeerID(r.replicaID), r.mu.state.RaftAppliedIndex, r.store.cfg, &raftLogger{ctx: ctx}, + (*replicaRLockedStoreLiveness)(r), )) if err != nil { return err diff --git a/pkg/kv/kvserver/replica_store_liveness.go b/pkg/kv/kvserver/replica_store_liveness.go new file mode 100644 index 000000000000..d6d4c4aa4291 --- /dev/null +++ b/pkg/kv/kvserver/replica_store_liveness.go @@ -0,0 +1,75 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb" + "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// replicaRLockedStoreLiveness implements the raftstoreliveness.StoreLiveness +// interface. The interface methods assume that Replica.mu is held in read mode +// by their callers. +type replicaRLockedStoreLiveness Replica + +var _ raftstoreliveness.StoreLiveness = (*replicaRLockedStoreLiveness)(nil) + +func (r *replicaRLockedStoreLiveness) getStoreIdent(replicaID uint64) (slpb.StoreIdent, bool) { + r.mu.AssertRHeld() + desc, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(replicaID)) + if !ok { + return slpb.StoreIdent{}, false + } + return slpb.StoreIdent{NodeID: desc.NodeID, StoreID: desc.StoreID}, true +} + +// SupportFor implements the raftstoreliveness.StoreLiveness interface. 
+func (r *replicaRLockedStoreLiveness) SupportFor( + replicaID uint64, +) (raftstoreliveness.StoreLivenessEpoch, bool) { + storeID, ok := r.getStoreIdent(replicaID) + if !ok { + return 0, false + } + epoch, ok := r.store.storeLiveness.SupportFor(storeID) + if !ok { + return 0, false + } + return raftstoreliveness.StoreLivenessEpoch(epoch), true +} + +// SupportFrom implements the raftstoreliveness.StoreLiveness interface. +func (r *replicaRLockedStoreLiveness) SupportFrom( + replicaID uint64, +) (raftstoreliveness.StoreLivenessEpoch, hlc.Timestamp, bool) { + storeID, ok := r.getStoreIdent(replicaID) + if !ok { + return 0, hlc.Timestamp{}, false + } + epoch, exp, ok := r.store.storeLiveness.SupportFrom(storeID) + if !ok { + return 0, hlc.Timestamp{}, false + } + return raftstoreliveness.StoreLivenessEpoch(epoch), exp, true +} + +// SupportFromEnabled implements the raftstoreliveness.StoreLiveness interface. +func (r *replicaRLockedStoreLiveness) SupportFromEnabled() bool { + // TODO(nvanbenschoten): hook this up to a version check and cluster setting. + return false +} + +// SupportExpired implements the raftstoreliveness.StoreLiveness interface. +func (r *replicaRLockedStoreLiveness) SupportExpired(ts hlc.Timestamp) bool { + return ts.Less(r.store.Clock().Now()) +} diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index aff99ad05f1f..c7300abaea80 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -54,6 +54,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnrecovery" @@ -61,6 +62,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -387,6 +389,7 @@ func newRaftConfig( appliedIndex kvpb.RaftIndex, storeCfg StoreConfig, logger raft.Logger, + storeLiveness raftstoreliveness.StoreLiveness, ) *raft.Config { return &raft.Config{ ID: id, @@ -402,6 +405,7 @@ func newRaftConfig( MaxInflightBytes: storeCfg.RaftMaxInflightBytes, Storage: strg, Logger: logger, + StoreLiveness: storeLiveness, // We only set this on replica initialization, so replicas without // StepDownOnRemoval may remain on 23.2 nodes until they restart. That's @@ -893,6 +897,7 @@ type Store struct { metrics *StoreMetrics intentResolver *intentresolver.IntentResolver recoveryMgr txnrecovery.Manager + storeLiveness storeliveness.Fabric syncWaiter *logstore.SyncWaiterLoop raftEntryCache *raftentry.Cache limiters batcheval.Limiters @@ -2154,6 +2159,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { ) s.metrics.registry.AddMetricStruct(s.recoveryMgr.Metrics()) + // TODO(mira): create the store liveness support manager here. + // s.storeLiveness = ... 
+ s.rangeIDAlloc = idAlloc now := s.cfg.Clock.Now() diff --git a/pkg/raft/raftstoreliveness/store_liveness.go b/pkg/raft/raftstoreliveness/store_liveness.go index fc37bfb913de..758101ba7070 100644 --- a/pkg/raft/raftstoreliveness/store_liveness.go +++ b/pkg/raft/raftstoreliveness/store_liveness.go @@ -64,7 +64,9 @@ type StoreLiveness interface { // active or not, which is what prompts the "SupportFrom" prefix. SupportFromEnabled() bool - // SupportInPast returns whether the supplied timestamp is before the current - // time. - SupportInPast(ts hlc.Timestamp) bool + // SupportExpired returns whether the supplied expiration timestamp is before + // the present time and has therefore expired. If the method returns false, + // the timestamp is still in the future and still provides support up to that + // point in time. + SupportExpired(ts hlc.Timestamp) bool }
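
The renamed SupportExpired method keeps the same comparison as before, with a clearer contract: an expiration strictly before the current clock reading means support has lapsed, while a reading at or after now still provides support. A sketch of that contract, with time.Time standing in for hlc.Timestamp:

    package sketch

    import "time"

    // clock stands in for the store's clock.
    type clock interface {
        Now() time.Time
    }

    // supportExpired mirrors the contract above: strictly before now means the
    // support has expired; now or later means support is still being provided.
    func supportExpired(c clock, expiration time.Time) bool {
        return expiration.Before(c.Now())
    }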