From 7503b1a109e8951a7172657adea48dd5ee855cf9 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 9 Mar 2023 22:28:27 +0000 Subject: [PATCH 1/5] kvserver: make some cluster settings system only Update cluster settings in the `kv/kvserver` pkg to be `SystemOnly`. Previously, there were many cluster settings which which were `TenantWritable` or `TenantReadOnly`. These settings, even if altered by a tenant have no effect. There are settings which are not updated, due due to tests relying on modifying the setting value using a non-system tenant. We ignore these in this commit and defer to #98723 for handling these. These settings are updated to be `SystemOnly`: ``` kv.bulk_io_write.max_rate kv.bulk_sst.max_allowed_overage kv.bulk_sst.target_size kv.closed_timestamp.follower_reads_enabled kv.log_range_and_node_events.enabled kv.range_split.by_load_enabled kv.range_split.load_cpu_threshold kv.range_split.load_qps_threshold kv.replica_stats.addsst_request_size_factor kv.replication_reports.interval server.shutdown.lease_transfer_wait ``` Resolves: #98347 Release note (ops change): Some KV server cluster settings are now system only. These settings could previously be written or read by tenants, however writing to these settings had no effect. --- .../settings/settings-for-tenants.txt | 11 ----------- .../kvserver/allocator/storepool/store_pool.go | 4 ++-- pkg/kv/kvserver/batcheval/cmd_export.go | 4 ++-- .../batcheval/cmd_query_resolved_timestamp.go | 2 +- pkg/kv/kvserver/closedts/setting.go | 2 +- .../concurrency/concurrency_manager.go | 4 ++-- .../kvserver/concurrency/lock_table_waiter.go | 4 ++-- pkg/kv/kvserver/gc/gc.go | 6 +++--- pkg/kv/kvserver/kvserverbase/base.go | 2 +- pkg/kv/kvserver/kvserverbase/syncing_write.go | 2 +- pkg/kv/kvserver/logstore/logstore.go | 4 ++-- pkg/kv/kvserver/protectedts/settings.go | 2 +- pkg/kv/kvserver/raft_transport.go | 2 +- pkg/kv/kvserver/replica_backpressure.go | 4 ++-- pkg/kv/kvserver/replica_follower_read.go | 2 +- pkg/kv/kvserver/replica_rangefeed.go | 2 +- pkg/kv/kvserver/replica_send.go | 2 +- pkg/kv/kvserver/replica_split_load.go | 6 +++--- pkg/kv/kvserver/replica_write.go | 2 +- pkg/kv/kvserver/replicastats/replica_stats.go | 2 +- pkg/kv/kvserver/reports/reporter.go | 2 +- pkg/kv/kvserver/store.go | 18 +++++++++--------- 22 files changed, 39 insertions(+), 50 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 37dfcd072d7f..2d1f1e132d94 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -36,19 +36,9 @@ feature.restore.enabled boolean true set to true to enable restore, false to dis feature.schema_change.enabled boolean true set to true to enable schema changes, false to disable; default is true feature.stats.enabled boolean true set to true to enable CREATE STATISTICS/ANALYZE, false to disable; default is true jobs.retention_time duration 336h0m0s the amount of time for which records for completed jobs are retained -kv.bulk_io_write.max_rate byte size 1.0 TiB the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops -kv.bulk_sst.max_allowed_overage byte size 64 MiB if positive, allowed size in excess of target size for SSTs from export requests; export requests (i.e. 
BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory -kv.bulk_sst.target_size byte size 16 MiB target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory -kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to serve consistent historical reads based on closed timestamp information -kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records -kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated -kv.range_split.load_cpu_threshold duration 500ms the CPU use per second over which, the range becomes a candidate for load based splitting -kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence) -kv.replica_stats.addsst_request_size_factor integer 50000 the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1 -kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed @@ -82,7 +72,6 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically purged server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) 
-server.shutdown.lease_transfer_wait duration 5s the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 1618b22e3042..6f59e8758989 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -47,7 +47,7 @@ const ( // replicate queue will not consider stores which have failed a reservation a // viable target. var FailedReservationsTimeout = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "server.failed_reservation_timeout", "the amount of time to consider the store throttled for up-replication after a failed reservation call", 5*time.Second, @@ -59,7 +59,7 @@ const timeAfterStoreSuspectSettingName = "server.time_after_store_suspect" // TimeAfterStoreSuspect measures how long we consider a store suspect since // it's last failure. var TimeAfterStoreSuspect = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, timeAfterStoreSuspectSettingName, "the amount of time we consider a store suspect for after it fails a node liveness heartbeat."+ " A suspect node would not receive any new replicas or lease transfers, but will keep the replicas it has.", diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index cf6cbfbb985a..25713c2f7076 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -37,7 +37,7 @@ const SSTTargetSizeSetting = "kv.bulk_sst.target_size" // ExportRequestTargetFileSize controls the target file size for SSTs created // during backups. var ExportRequestTargetFileSize = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, SSTTargetSizeSetting, fmt.Sprintf("target size for SSTs emitted from export requests; "+ "export requests (i.e. BACKUP) may buffer up to the sum of %s and %s in memory", @@ -55,7 +55,7 @@ const MaxExportOverageSetting = "kv.bulk_sst.max_allowed_overage" // and an SST would exceed this size (due to large rows or large numbers of // versions), then the export will fail. var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, MaxExportOverageSetting, fmt.Sprintf("if positive, allowed size in excess of target size for SSTs from export requests; "+ "export requests (i.e. 
BACKUP) may buffer up to the sum of %s and %s in memory", diff --git a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go index c844b32feb0a..943906906ad4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go @@ -30,7 +30,7 @@ import ( // QueryResolvedTimestampIntentCleanupAge configures the minimum intent age that // QueryResolvedTimestamp requests will consider for async intent cleanup. var QueryResolvedTimestampIntentCleanupAge = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.query_resolved_timestamp.intent_cleanup_age", "minimum intent age that QueryResolvedTimestamp requests will consider for async intent cleanup", 10*time.Second, diff --git a/pkg/kv/kvserver/closedts/setting.go b/pkg/kv/kvserver/closedts/setting.go index 91230241af37..4e1c9936f12e 100644 --- a/pkg/kv/kvserver/closedts/setting.go +++ b/pkg/kv/kvserver/closedts/setting.go @@ -40,7 +40,7 @@ var SideTransportCloseInterval = settings.RegisterDurationSetting( // (see TargetForPolicy), if it is set to a non-zero value. Meant as an escape // hatch. var LeadForGlobalReadsOverride = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.closed_timestamp.lead_for_global_reads_override", "if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps", 0, diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index c8d60ed0cbcb..ec90fb971340 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -60,7 +60,7 @@ import ( // utilization and runaway queuing for misbehaving clients, a role it is well // positioned to serve. var MaxLockWaitQueueLength = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.lock_table.maximum_lock_wait_queue_length", "the maximum length of a lock wait-queue that read-write requests are willing "+ "to enter and wait in. The setting can be used to ensure some level of quality-of-service "+ @@ -93,7 +93,7 @@ var MaxLockWaitQueueLength = settings.RegisterIntSetting( // discoveredCount > 100,000, caused by stats collection, where we definitely // want to avoid adding these locks to the lock table, if possible. var DiscoveredLocksThresholdToConsultFinalizedTxnCache = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.lock_table.discovered_locks_threshold_for_consulting_finalized_txn_cache", "the maximum number of discovered locks by a waiter, above which the finalized txn cache"+ "is consulted and resolvable locks are not added to the lock table -- this should be a small"+ diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index c520f419752e..07d23a0c2ede 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -39,7 +39,7 @@ import ( // LockTableLivenessPushDelay sets the delay before pushing in order to detect // coordinator failures of conflicting transactions. 
var LockTableLivenessPushDelay = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.lock_table.coordinator_liveness_push_delay", "the delay before pushing in order to detect coordinator failures of conflicting transactions", // This is set to a short duration to ensure that we quickly detect failed @@ -71,7 +71,7 @@ var LockTableLivenessPushDelay = settings.RegisterDurationSetting( // LockTableDeadlockDetectionPushDelay sets the delay before pushing in order to // detect dependency cycles between transactions. var LockTableDeadlockDetectionPushDelay = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.lock_table.deadlock_detection_push_delay", "the delay before pushing in order to detect dependency cycles between transactions", // This is set to a medium duration to ensure that deadlock caused by diff --git a/pkg/kv/kvserver/gc/gc.go b/pkg/kv/kvserver/gc/gc.go index 9768e4719ae7..78589ef3d3c3 100644 --- a/pkg/kv/kvserver/gc/gc.go +++ b/pkg/kv/kvserver/gc/gc.go @@ -67,7 +67,7 @@ const ( // IntentAgeThreshold is the threshold after which an extant intent // will be resolved. var IntentAgeThreshold = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.gc.intent_age_threshold", "intents older than this threshold will be resolved when encountered by the MVCC GC queue", 2*time.Hour, @@ -106,7 +106,7 @@ var TxnCleanupThreshold = settings.RegisterDurationSetting( // of writing. This value is subject to tuning in real environment as we have // more data available. var MaxIntentsPerCleanupBatch = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.gc.intent_cleanup_batch_size", "if non zero, gc will split found intents into batches of this size when trying to resolve them", 5000, @@ -125,7 +125,7 @@ var MaxIntentsPerCleanupBatch = settings.RegisterIntSetting( // The default value is a conservative limit to prevent pending intent key sizes // from ballooning. var MaxIntentKeyBytesPerCleanupBatch = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.gc.intent_cleanup_batch_byte_size", "if non zero, gc will split found intents into batches of this size when trying to resolve them", 1e6, diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index b9569e56037f..07911950a868 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -231,7 +231,7 @@ func IntersectSpan( // SplitByLoadMergeDelay wraps "kv.range_split.by_load_merge_delay". var SplitByLoadMergeDelay = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range_split.by_load_merge_delay", "the delay that range splits created due to load will wait before considering being merged away", 5*time.Minute, diff --git a/pkg/kv/kvserver/kvserverbase/syncing_write.go b/pkg/kv/kvserver/kvserverbase/syncing_write.go index 0f6078126cb3..d44e865394b9 100644 --- a/pkg/kv/kvserver/kvserverbase/syncing_write.go +++ b/pkg/kv/kvserver/kvserverbase/syncing_write.go @@ -65,7 +65,7 @@ func LimitBulkIOWrite(ctx context.Context, limiter *rate.Limiter, cost int) erro // sstWriteSyncRate wraps "kv.bulk_sst.sync_size". 0 disables syncing. 
var sstWriteSyncRate = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_sst.sync_size", "threshold after which non-Rocks SST writes must fsync (0 disables)", BulkIOWriteBurst, diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index e6243bd236ff..a8986cf826f1 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -38,7 +38,7 @@ import ( ) var disableSyncRaftLog = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.raft_log.disable_synchronization_unsafe", "set to true to disable synchronization on Raft log writes to persistent storage. "+ "Setting to true risks data loss or data corruption on server crashes. "+ @@ -47,7 +47,7 @@ var disableSyncRaftLog = settings.RegisterBoolSetting( ) var enableNonBlockingRaftLogSync = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.raft_log.non_blocking_synchronization.enabled", "set to true to enable non-blocking synchronization on Raft log writes to "+ "persistent storage. Setting to true does not risk data loss or data corruption "+ diff --git a/pkg/kv/kvserver/protectedts/settings.go b/pkg/kv/kvserver/protectedts/settings.go index 4add96509a48..97eec60ce01f 100644 --- a/pkg/kv/kvserver/protectedts/settings.go +++ b/pkg/kv/kvserver/protectedts/settings.go @@ -32,7 +32,7 @@ var MaxBytes = settings.RegisterIntSetting( // MaxSpans controls the maximum number of spans which can be protected // by all protected timestamp records. var MaxSpans = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.protectedts.max_spans", "if non-zero the limit of the number of spans which can be protected", 32768, diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 265ea7ceea69..9fc321ff86a5 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -59,7 +59,7 @@ const ( // targetRaftOutgoingBatchSize wraps "kv.raft.command.target_batch_size". var targetRaftOutgoingBatchSize = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.raft.command.target_batch_size", "size of a batch of raft commands after which it will be sent without further batching", 64<<20, // 64 MB diff --git a/pkg/kv/kvserver/replica_backpressure.go b/pkg/kv/kvserver/replica_backpressure.go index 2fa57104e018..fe68b3f7209b 100644 --- a/pkg/kv/kvserver/replica_backpressure.go +++ b/pkg/kv/kvserver/replica_backpressure.go @@ -28,7 +28,7 @@ var backpressureLogLimiter = log.Every(500 * time.Millisecond) // range's size must grow to before backpressure will be applied on writes. Set // to 0 to disable backpressure altogether. var backpressureRangeSizeMultiplier = settings.RegisterFloatSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range.backpressure_range_size_multiplier", "multiple of range_max_bytes that a range is allowed to grow to without "+ "splitting before writes to that range are blocked, or 0 to disable", @@ -66,7 +66,7 @@ var backpressureRangeSizeMultiplier = settings.RegisterFloatSetting( // currently backpressuring than ranges which are larger but are not // applying backpressure. 
var backpressureByteTolerance = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range.backpressure_byte_tolerance", "defines the number of bytes above the product of "+ "backpressure_range_size_multiplier and the range_max_size at which "+ diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 93e4600b30d0..9120a2422aee 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -28,7 +28,7 @@ import ( // information is collected and passed around, regardless of the value of this // setting. var FollowerReadsEnabled = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.closed_timestamp.follower_reads_enabled", "allow (all) replicas to serve consistent historical reads based on closed timestamp information", true, diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 3df568e64fec..494128048e77 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -53,7 +53,7 @@ var RangefeedEnabled = settings.RegisterBoolSetting( // RangeFeedRefreshInterval controls the frequency with which we deliver closed // timestamp updates to rangefeeds. var RangeFeedRefreshInterval = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.rangefeed.closed_timestamp_refresh_interval", "the interval at which closed-timestamp updates"+ "are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval", diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 3d36d1ff81ec..5840d0ac18c6 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -38,7 +38,7 @@ import ( ) var optimisticEvalLimitedScans = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.concurrency.optimistic_eval_limited_scans.enabled", "when true, limited scans are optimistically evaluated in the sense of not checking for "+ "conflicting latches or locks up front for the full key range of the scan, and instead "+ diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go index 03780af53a2e..1f806c6f2259 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -27,7 +27,7 @@ import ( // SplitByLoadEnabled wraps "kv.range_split.by_load_enabled". var SplitByLoadEnabled = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range_split.by_load_enabled", "allow automatic splits of ranges based on where load is concentrated", true, @@ -35,7 +35,7 @@ var SplitByLoadEnabled = settings.RegisterBoolSetting( // SplitByLoadQPSThreshold wraps "kv.range_split.load_qps_threshold". var SplitByLoadQPSThreshold = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range_split.load_qps_threshold", "the QPS over which, the range becomes a candidate for load based splitting", 2500, // 2500 req/s @@ -53,7 +53,7 @@ var SplitByLoadQPSThreshold = settings.RegisterIntSetting( // measured as max ops/s for kv and resource balance for allocbench. See #96869 // for more details. 
var SplitByLoadCPUThreshold = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range_split.load_cpu_threshold", "the CPU use per second over which, the range becomes a candidate for load based splitting", 500*time.Millisecond, diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 2b3841db472f..94be47931caf 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -43,7 +43,7 @@ import ( // TODO(erikgrinaker): this, and the timeout handling, should be moved into a // migration helper that manages checkpointing and retries as well. var migrateApplicationTimeout = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.migration.migrate_application.timeout", "timeout for a Migrate request to be applied across all replicas of a range", 1*time.Minute, diff --git a/pkg/kv/kvserver/replicastats/replica_stats.go b/pkg/kv/kvserver/replicastats/replica_stats.go index 905e35135635..a47e2eb780f9 100644 --- a/pkg/kv/kvserver/replicastats/replica_stats.go +++ b/pkg/kv/kvserver/replicastats/replica_stats.go @@ -38,7 +38,7 @@ const ( // SSTable data, divided by this factor. Thereby, the magnitude of this factor // is inversely related to QPS sensitivity to AddSSTableRequests. var AddSSTableRequestSizeFactor = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.replica_stats.addsst_request_size_factor", "the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1", // The default value of 50,000 was chosen as the default divisor, following manual testing that diff --git a/pkg/kv/kvserver/reports/reporter.go b/pkg/kv/kvserver/reports/reporter.go index 5d5b1ee43542..7c56db693fc3 100644 --- a/pkg/kv/kvserver/reports/reporter.go +++ b/pkg/kv/kvserver/reports/reporter.go @@ -46,7 +46,7 @@ import ( // ReporterInterval is the interval between two generations of the reports. // When set to zero - disables the report generation. var ReporterInterval = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.replication_reports.interval", "the frequency for generating the replication_constraint_stats, replication_stats_report and "+ "replication_critical_localities reports (set to 0 to disable)", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 45ae1e241d3b..338b476dca86 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -135,7 +135,7 @@ var logStoreTelemetryTicks = envutil.EnvOrDefaultInt( // bulkIOWriteLimit is defined here because it is used by BulkIOWriteLimiter. var bulkIOWriteLimit = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_io_write.max_rate", "the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops", 1<<40, @@ -143,7 +143,7 @@ var bulkIOWriteLimit = settings.RegisterByteSizeSetting( // addSSTableRequestLimit limits concurrent AddSSTable requests. var addSSTableRequestLimit = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_io_write.concurrent_addsstable_requests", "number of concurrent AddSSTable requests per store before queueing", 1, @@ -156,7 +156,7 @@ var addSSTableRequestLimit = settings.RegisterIntSetting( // disk, so we can allow a greater amount of concurrency than regular AddSSTable // requests. Applied independently of concurrent_addsstable_requests. 
var addSSTableAsWritesRequestLimit = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_io_write.concurrent_addsstable_as_writes_requests", "number of concurrent AddSSTable requests ingested as writes per store before queueing", 10, @@ -165,7 +165,7 @@ var addSSTableAsWritesRequestLimit = settings.RegisterIntSetting( // concurrentRangefeedItersLimit limits concurrent rangefeed catchup iterators. var concurrentRangefeedItersLimit = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.rangefeed.concurrent_catchup_iterators", "number of rangefeeds catchup iterators a store will allow concurrently before queueing", 16, @@ -175,7 +175,7 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting( // Minimum time interval between system config updates which will lead to // enqueuing replicas. var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.store.system_config_update.queue_add_rate", "the rate (per second) at which the store will add, all replicas to the split and merge queue due to system config gossip", .5, @@ -186,7 +186,7 @@ var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting( // enqueuing replicas. The default is relatively high to deal with startup // scenarios. var queueAdditionOnSystemConfigUpdateBurst = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.store.system_config_update.queue_add_burst", "the burst rate at which the store will add all replicas to the split and merge queue due to system config gossip", 32, @@ -196,7 +196,7 @@ var queueAdditionOnSystemConfigUpdateBurst = settings.RegisterIntSetting( // leaseTransferWait is the timeout for a single iteration of draining range leases. var leaseTransferWait = func() *settings.DurationSetting { s := settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, leaseTransferWaitSettingName, "the timeout for a single iteration of the range lease transfer phase of draining "+ "(note that the --drain-wait parameter for cockroach node drain may need adjustment "+ @@ -224,7 +224,7 @@ const leaseTransferWaitSettingName = "server.shutdown.lease_transfer_wait" // here since we check it in the caller to limit generated requests as well // to prevent excessive queuing. var ExportRequestsLimit = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_io_write.concurrent_export_requests", "number of export requests a store will handle concurrently before queuing", 3, @@ -1129,7 +1129,7 @@ type StoreConfig struct { // Decommissioning events are not controlled by this setting. var logRangeAndNodeEventsEnabled = func() *settings.BoolSetting { s := settings.RegisterBoolSetting( - settings.TenantReadOnly, + settings.SystemOnly, "kv.log_range_and_node_events.enabled", "set to true to transactionally log range events"+ " (e.g., split, merge, add/remove voter/non-voter) into system.rangelog"+ From a491e10eac8f75023f0bdce07d08df637554aa50 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 20 Mar 2023 16:03:14 +0000 Subject: [PATCH 2/5] kvserver: skip `TestStoreRangeSplitRaceUninitializedRHS` under race/deadlock The Raft groups are unable to maintain quorum when stressed under race/deadlock. 
Epic: none Release note: None --- pkg/kv/kvserver/client_split_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index bd19d50f4c3f..3a399b84dbd7 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -51,6 +51,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/ts" @@ -2027,6 +2028,11 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // The aggressive Raft timeouts in this test prevents it from maintaining + // quorum and making progress when stressing under race or deadlock detection. + skip.UnderRace(t) + skip.UnderDeadlock(t) + currentTrigger := make(chan *roachpb.SplitTrigger, 1) var seen struct { syncutil.Mutex From c7641af41e3b9e80a29ece6f11c5362c28102f48 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 20 Mar 2023 20:38:28 +0000 Subject: [PATCH 3/5] sql: deflake `TestTrace` This has been seen to flake in CI: ``` === RUN TestTrace/ShowTraceForVectorized/TracingOff/node-1 trace_test.go:386: expected span: "session recording", got: "pendingLeaseRequest: requesting lease" trace_test.go:397: remaining span: "session recording" trace_test.go:397: remaining span: "sql query" trace_test.go:397: remaining span: "sql txn" trace_test.go:397: remaining span: "txn coordinator send" --- FAIL: TestTrace/ShowTraceForVectorized/TracingOff/node-1 (0.02s) ``` There was already an exception for this span, but with a `storage.` prefix. This patch removes the prefix, and makes it match on substrings. This flake has possibly been made worse with the introduction of a metamorphic setting to only use expiration-based leases in ecc931becde. Epic: none Release note: None --- pkg/sql/trace_test.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/sql/trace_test.go b/pkg/sql/trace_test.go index 1cd577f82561..4d0585cdd2e8 100644 --- a/pkg/sql/trace_test.go +++ b/pkg/sql/trace_test.go @@ -45,8 +45,8 @@ func TestTrace(t *testing.T) { // These are always appended, even without the test specifying it. 
alwaysOptionalSpans := []string{ "drain", - "storage.pendingLeaseRequest: requesting lease", - "storage.Store: gossip on capacity change", + "pendingLeaseRequest: requesting lease", + "gossip on capacity change", "outbox", "request range lease", "range lookup", @@ -364,17 +364,22 @@ func TestTrace(t *testing.T) { } defer rows.Close() - ignoreSpans := make(map[string]bool) - for _, s := range test.optionalSpans { - ignoreSpans[s] = true + ignoreSpan := func(op string) bool { + for _, s := range test.optionalSpans { + if strings.Contains(op, s) { + return true + } + } + return false } + r := 0 for rows.Next() { var op string if err := rows.Scan(&op); err != nil { t.Fatal(err) } - if ignoreSpans[op] { + if ignoreSpan(op) { continue } @@ -391,7 +396,7 @@ func TestTrace(t *testing.T) { if err := rows.Scan(&op); err != nil { t.Fatal(err) } - if ignoreSpans[op] { + if ignoreSpan(op) { continue } t.Errorf("remaining span: %q", op) From 87776cbb1f1b11eb3debb4485bbf55d84a419448 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Sun, 12 Mar 2023 21:03:39 -0400 Subject: [PATCH 4/5] sql/settings: add `sql.pgwire.multiple_active_portals.enabled` cluster setting Release note (sql change): added a `sql.pgwire.multiple_active_portals.enabled` setting. This setting is only for a PREVIEWABLE feature. With it set true, all non-internal portals with read-only SELECT queries without sub/post queries can be paused and resumed in an interleaving manner, but are executed with local plan. --- docs/generated/settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/sql/distsql_running.go | 11 +++++++++-- pkg/sql/exec_util.go | 8 ++++++++ pkg/sql/pgwire/testdata/pgtest/portals_crbugs | 4 ++-- pkg/sql/prepared_stmt.go | 4 ++++ 6 files changed, 25 insertions(+), 4 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 37dfcd072d7f..57cc47780aba 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -260,6 +260,7 @@ sql.multiple_modifications_of_table.enabled boolean false if true, allow stateme sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region sql.notices.enabled boolean true enable notices in the server/client protocol being sent sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability +sql.pgwire.multiple_active_portals.enabled boolean false if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index cee8b19fe8b1..48d06500dc6f 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -200,6 +200,7 @@
sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region
sql.notices.enabled boolean true enable notices in the server/client protocol being sent
sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability
+sql.pgwire.multiple_active_portals.enabled boolean false if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan
sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job
sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators
sql.stats.automatic_collection.enabled
booleantrueautomatic statistics collection mode diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 3253634a50a9..c3b49a5c3872 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -1505,8 +1505,15 @@ func (r *DistSQLReceiver) PushBatch( var ( // ErrLimitedResultNotSupported is an error produced by pgwire - // indicating an unsupported feature of row count limits was attempted. - ErrLimitedResultNotSupported = unimplemented.NewWithIssue(40195, "multiple active portals not supported") + // indicating the user attempted to have multiple active portals but + // either without setting sql.pgwire.multiple_active_portals.enabled to + // true or the underlying query does not satisfy the restriction. + ErrLimitedResultNotSupported = unimplemented.NewWithIssue( + 40195, + "multiple active portals not supported, "+ + "please set sql.pgwire.multiple_active_portals.enabled to true. "+ + "Note: this feature is in preview", + ) // ErrLimitedResultClosed is a sentinel error produced by pgwire // indicating the portal should be closed without error. ErrLimitedResultClosed = errors.New("row count limit closed") diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index ada8dd8d38a7..6d9ed2a25ed6 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -733,6 +733,14 @@ var overrideAlterPrimaryRegionInSuperRegion = settings.RegisterBoolSetting( false, ).WithPublic() +var enableMultipleActivePortals = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.pgwire.multiple_active_portals.enabled", + "if true, portals with read-only SELECT query without sub/post queries "+ + "can be executed in interleaving manner, but with local execution plan", + false, +).WithPublic() + var errNoTransactionInProgress = errors.New("there is no transaction in progress") var errTransactionInProgress = errors.New("there is already a transaction in progress") diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index fe1182d427af..1afeb7f1788f 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -29,7 +29,7 @@ ReadyForQuery {"Type":"BindComplete"} {"Type":"DataRow","Values":[{"text":"1"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set sql.pgwire.multiple_active_portals.enabled to true. Note: this feature is in preview"} {"Type":"ReadyForQuery","TxStatus":"E"} {"Type":"ReadyForQuery","TxStatus":"E"} @@ -70,7 +70,7 @@ ReadyForQuery {"Type":"BindComplete"} {"Type":"DataRow","Values":[{"text":"1"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set sql.pgwire.multiple_active_portals.enabled to true. Note: this feature is in preview"} {"Type":"ReadyForQuery","TxStatus":"E"} send diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index ecc45f610089..d354e6f0c01e 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -148,6 +148,10 @@ func (ex *connExecutor) makePreparedPortal( Qargs: qargs, OutFormats: outFormats, } + // TODO(janexing): we added this line to avoid the unused lint error. 
+ // Will remove it once the whole functionality of multiple active portals + // is merged. + _ = enableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) } From cdaff507229920ba92540ee98f6fb0d6b9fe276d Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Sun, 12 Mar 2023 20:48:39 -0400 Subject: [PATCH 5/5] sql: add `switchToAnotherPortal` signal for result consumer This is part of the implementation of multiple active portals. Previously, after pushing the result to the consumer, we only accepted the following actions as the next step: 1. Pushing more data from the same source to the same consumer; 2. Drain or close the pipeline. This commit adds another option: pause the current execution, and switch to the execution of another portal. I.e., when receiving an `ExecPortal` command for a different portal, we do nothing and return control to the connExecutor. This allows us to execute different portals in an interleaved manner. Release note: None --- pkg/sql/conn_executor.go | 2 + pkg/sql/conn_io.go | 1 + pkg/sql/distsql_running.go | 15 ++++++- pkg/sql/execinfra/base.go | 17 +++++++ pkg/sql/execinfra/consumerstatus_string.go | 9 ++-- pkg/sql/flowinfra/inbound.go | 2 + pkg/sql/internal.go | 1 + pkg/sql/pgwire/command_result.go | 52 +++++++++++++++++----- pkg/sql/pgwire/conn.go | 3 +- pkg/sql/prepared_stmt.go | 31 +++++++++++-- pkg/sql/rowexec/processors.go | 9 +++- 11 files changed, 120 insertions(+), 22 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 9facd8a25f35..3a6ffbbc5d04 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1975,6 +1975,7 @@ func (ex *connExecutor) execCmd() (retErr error) { 0, /* limit */ "", /* portalName */ ex.implicitTxn(), + PortalPausabilityDisabled, /* portalPausability */ ) res = stmtRes @@ -2057,6 +2058,7 @@ func (ex *connExecutor) execCmd() (retErr error) { tcmd.Limit, portalName, ex.implicitTxn(), + portal.portalPausablity, ) res = stmtRes diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index cd4e426d8ed6..6c202eaf1f1e 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -660,6 +660,7 @@ type ClientComm interface { limit int, portalName string, implicitTxn bool, + portalPausability PortalPausablity, ) CommandResult // CreatePrepareResult creates a result for a PrepareStmt command. CreatePrepareResult(pos CmdPos) ParseResult diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index c3b49a5c3872..4f45787bf977 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -1347,6 +1347,8 @@ func (r *DistSQLReceiver) handleCommErr(commErr error) { } else if errors.Is(commErr, errIEResultChannelClosed) { log.VEvent(r.ctx, 1, "encountered errIEResultChannelClosed (transitioning to draining)") r.status = execinfra.DrainRequested + } else if errors.Is(commErr, ErrPortalLimitHasBeenReached) { + r.status = execinfra.SwitchToAnotherPortal } else { // Set the error on the resultWriter to notify the consumer about // it. Most clients don't care to differentiate between @@ -1365,7 +1367,7 @@ func (r *DistSQLReceiver) handleCommErr(commErr error) { // sql/pgwire.limitedCommandResult.moreResultsNeeded). Instead of // changing the signature of AddRow, we have a sentinel error that // is handled specially here. 
- if !errors.Is(commErr, ErrLimitedResultNotSupported) { + if !errors.Is(commErr, ErrLimitedResultNotSupported) && !errors.Is(commErr, ErrStmtNotSupportedForPausablePortal) { r.commErr = commErr } } @@ -1514,9 +1516,18 @@ var ( "please set sql.pgwire.multiple_active_portals.enabled to true. "+ "Note: this feature is in preview", ) + // ErrStmtNotSupportedForPausablePortal is returned when the user have set + // sql.pgwire.multiple_active_portals.enabled to true but set an unsupported + // statement for a portal. + ErrStmtNotSupportedForPausablePortal = unimplemented.NewWithIssue( + 98911, + "the statement for a pausable portal must be a read-only SELECT query"+ + " with no sub-queries or post-queries", + ) // ErrLimitedResultClosed is a sentinel error produced by pgwire // indicating the portal should be closed without error. - ErrLimitedResultClosed = errors.New("row count limit closed") + ErrLimitedResultClosed = errors.New("row count limit closed") + ErrPortalLimitHasBeenReached = errors.New("limit has been reached") ) // ProducerDone is part of the execinfra.RowReceiver interface. diff --git a/pkg/sql/execinfra/base.go b/pkg/sql/execinfra/base.go index de7a51fe6db9..496e6bce51fa 100644 --- a/pkg/sql/execinfra/base.go +++ b/pkg/sql/execinfra/base.go @@ -41,6 +41,11 @@ type ConsumerStatus uint32 const ( // NeedMoreRows indicates that the consumer is still expecting more rows. NeedMoreRows ConsumerStatus = iota + // SwitchToAnotherPortal indicates that the we received exec command for + // a different portal, and may come back to continue executing the current + // portal later. If the cluster setting sql.pgwire.multiple_active_portals.enabled + // is set to be true, we do nothing and return the control to the connExecutor. + SwitchToAnotherPortal // DrainRequested indicates that the consumer will not process any more data // rows, but will accept trailing metadata from the producer. DrainRequested @@ -189,6 +194,10 @@ func Run(ctx context.Context, src RowSource, dst RowReceiver) { switch dst.Push(row, meta) { case NeedMoreRows: continue + case SwitchToAnotherPortal: + // Do nothing here and return the control to the connExecutor to execute + // the other portal, i.e. we leave the current portal open. + return case DrainRequested: DrainAndForwardMetadata(ctx, src, dst) dst.ProducerDone() @@ -236,6 +245,8 @@ func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver src.ConsumerClosed() return case NeedMoreRows: + case SwitchToAnotherPortal: + panic("current consumer is drained, cannot be paused and switched to another portal") case DrainRequested: } } @@ -457,6 +468,12 @@ func (rc *RowChannel) Push( switch consumerStatus { case NeedMoreRows: rc.dataChan <- RowChannelMsg{Row: row, Meta: meta} + case SwitchToAnotherPortal: + // We currently don't expect this status, so we propagate an assertion + // failure as metadata. + m := execinfrapb.GetProducerMeta() + m.Err = errors.AssertionFailedf("multiple active portals are not expected with the row channel") + rc.dataChan <- RowChannelMsg{Meta: m} case DrainRequested: // If we're draining, only forward metadata. if meta != nil { diff --git a/pkg/sql/execinfra/consumerstatus_string.go b/pkg/sql/execinfra/consumerstatus_string.go index 25ba15bfc228..52aa5cb90d7c 100644 --- a/pkg/sql/execinfra/consumerstatus_string.go +++ b/pkg/sql/execinfra/consumerstatus_string.go @@ -9,13 +9,14 @@ func _() { // Re-run the stringer command to generate them again. 
var x [1]struct{} _ = x[NeedMoreRows-0] - _ = x[DrainRequested-1] - _ = x[ConsumerClosed-2] + _ = x[SwitchToAnotherPortal-1] + _ = x[DrainRequested-2] + _ = x[ConsumerClosed-3] } -const _ConsumerStatus_name = "NeedMoreRowsDrainRequestedConsumerClosed" +const _ConsumerStatus_name = "NeedMoreRowsSwitchToAnotherPortalDrainRequestedConsumerClosed" -var _ConsumerStatus_index = [...]uint8{0, 12, 26, 40} +var _ConsumerStatus_index = [...]uint8{0, 12, 33, 47, 61} func (i ConsumerStatus) String() string { if i >= ConsumerStatus(len(_ConsumerStatus_index)-1) { diff --git a/pkg/sql/flowinfra/inbound.go b/pkg/sql/flowinfra/inbound.go index dbc891efa847..be39127db694 100644 --- a/pkg/sql/flowinfra/inbound.go +++ b/pkg/sql/flowinfra/inbound.go @@ -238,6 +238,8 @@ func processProducerMessage( switch dst.Push(row, meta) { case execinfra.NeedMoreRows: continue + case execinfra.SwitchToAnotherPortal: + return processMessageResult{err: errors.AssertionFailedf("not allowed to switch to another portal")} case execinfra.DrainRequested: // The rest of rows are not needed by the consumer. We'll send a drain // signal to the producer and expect it to quickly send trailing diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 0ed78b839daf..07df10c387d9 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -1113,6 +1113,7 @@ func (icc *internalClientComm) CreateStatementResult( _ int, _ string, _ bool, + _ PortalPausablity, ) CommandResult { return icc.createRes(pos, nil /* onClose */) } diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go index 764c70a19648..521dbf098cbb 100644 --- a/pkg/sql/pgwire/command_result.go +++ b/pkg/sql/pgwire/command_result.go @@ -383,6 +383,7 @@ func (c *conn) newCommandResult( limit int, portalName string, implicitTxn bool, + portalPausability sql.PortalPausablity, ) sql.CommandResult { r := c.allocCommandResult() *r = commandResult{ @@ -401,10 +402,11 @@ func (c *conn) newCommandResult( } telemetry.Inc(sqltelemetry.PortalWithLimitRequestCounter) return &limitedCommandResult{ - limit: limit, - portalName: portalName, - implicitTxn: implicitTxn, - commandResult: r, + limit: limit, + portalName: portalName, + implicitTxn: implicitTxn, + commandResult: r, + portalPausablity: portalPausability, } } @@ -445,7 +447,9 @@ type limitedCommandResult struct { seenTuples int // If set, an error will be sent to the client if more rows are produced than // this limit. - limit int + limit int + reachedLimit bool + portalPausablity sql.PortalPausablity } // AddRow is part of the sql.RestrictedCommandResult interface. @@ -462,9 +466,17 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro if err := r.conn.Flush(r.pos); err != nil { return err } - r.seenTuples = 0 - - return r.moreResultsNeeded(ctx) + if r.portalPausablity == sql.PausablePortal { + r.reachedLimit = true + return sql.ErrPortalLimitHasBeenReached + } else { + // TODO(janexing): we keep this part as for general portals, we would like + // to keep the execution logic to avoid bring too many bugs. Eventually + // we should remove them and use the "return the control to connExecutor" + // logic for all portals. + r.seenTuples = 0 + return r.moreResultsNeeded(ctx) + } } return nil } @@ -479,6 +491,17 @@ func (r *limitedCommandResult) SupportsAddBatch() bool { // requests for rows from the active portal, during the "execute portal" flow // when a limit has been specified. 
func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error { + errBasedOnPausability := func(pausablity sql.PortalPausablity) error { + switch pausablity { + case sql.PortalPausabilityDisabled: + return sql.ErrLimitedResultNotSupported + case sql.NotPausablePortalForUnsupportedStmt: + return sql.ErrStmtNotSupportedForPausablePortal + default: + return errors.AssertionFailedf("unsupported pausability type for a portal") + } + } + // Keep track of the previous CmdPos so we can rewind if needed. prevPos := r.conn.stmtBuf.AdvanceOne() for { @@ -493,7 +516,7 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error { // next message is a delete portal. if c.Type != pgwirebase.PreparePortal || c.Name != r.portalName { telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter) - return errors.WithDetail(sql.ErrLimitedResultNotSupported, + return errors.WithDetail(errBasedOnPausability(r.portalPausablity), "cannot close a portal while a different one is open") } return r.rewindAndClosePortal(ctx, prevPos) @@ -501,7 +524,7 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error { // The happy case: the client wants more rows from the portal. if c.Name != r.portalName { telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter) - return errors.WithDetail(sql.ErrLimitedResultNotSupported, + return errors.WithDetail(errBasedOnPausability(r.portalPausablity), "cannot execute a portal while a different one is open") } r.limit = c.Limit @@ -544,7 +567,7 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error { } // We got some other message, but we only support executing to completion. telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter) - return errors.WithDetail(sql.ErrLimitedResultNotSupported, + return errors.WithDetail(errBasedOnPausability(r.portalPausablity), fmt.Sprintf("cannot perform operation %T while a different portal is open", c)) } prevPos = curPos @@ -625,6 +648,13 @@ func (r *limitedCommandResult) rewindAndClosePortal( return sql.ErrLimitedResultClosed } +func (r *limitedCommandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator) { + if r.reachedLimit { + r.commandResult.typ = noCompletionMsg + } + r.commandResult.Close(ctx, t) +} + // Get the column index for job id based on the result header defined in // jobs.BulkJobExecutionResultHeader and jobs.DetachedJobExecutionResultHeader. func init() { diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 853ea614616c..96b0550e0c84 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -1796,8 +1796,9 @@ func (c *conn) CreateStatementResult( limit int, portalName string, implicitTxn bool, + portalPausability sql.PortalPausablity, ) sql.CommandResult { - return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, limit, portalName, implicitTxn) + return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, limit, portalName, implicitTxn, portalPausability) } // CreateSyncResult is part of the sql.ClientComm interface. diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index d354e6f0c01e..5ad4bbd872c6 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -118,9 +118,28 @@ type preparedStatementsAccessor interface { DeleteAll(ctx context.Context) } +// PortalPausablity mark if the portal is pausable and the reason. This is +// needed to give the correct error for usage of multiple active portals. 
+type PortalPausablity int64 + +const ( + // PortalPausabilityDisabled is the default status of a portal when + // sql.pgwire.multiple_active_portals.enabled is false. + PortalPausabilityDisabled PortalPausablity = iota + // PausablePortal is set when sql.pgwire.multiple_active_portals.enabled is + // set to true and the underlying statement is a read-only SELECT query with + // no sub-queries or post-queries. + PausablePortal + // NotPausablePortalForUnsupportedStmt is used when the cluster setting + // sql.pgwire.multiple_active_portals.enabled is set to true, while we don't + // support underlying statement. + NotPausablePortalForUnsupportedStmt +) + // PreparedPortal is a PreparedStatement that has been bound with query // arguments. type PreparedPortal struct { + Name string Stmt *PreparedStatement Qargs tree.QueryArguments @@ -131,6 +150,11 @@ type PreparedPortal struct { // meaning that any additional attempts to execute it should return no // rows. exhausted bool + + // portalPausablity is used to log the correct error message when user pause + // a portal. + // See comments for PortalPausablity for more details. + portalPausablity PortalPausablity } // makePreparedPortal creates a new PreparedPortal. @@ -144,6 +168,7 @@ func (ex *connExecutor) makePreparedPortal( outFormats []pgwirebase.FormatCode, ) (PreparedPortal, error) { portal := PreparedPortal{ + Name: name, Stmt: stmt, Qargs: qargs, OutFormats: outFormats, @@ -157,7 +182,7 @@ func (ex *connExecutor) makePreparedPortal( // accountForCopy updates the state to account for the copy of the // PreparedPortal (p is the copy). -func (p PreparedPortal) accountForCopy( +func (p *PreparedPortal) accountForCopy( ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, portalName string, ) error { if err := prepStmtsNamespaceMemAcc.Grow(ctx, p.size(portalName)); err != nil { @@ -169,13 +194,13 @@ func (p PreparedPortal) accountForCopy( } // close closes this portal. -func (p PreparedPortal) close( +func (p *PreparedPortal) close( ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, portalName string, ) { prepStmtsNamespaceMemAcc.Shrink(ctx, p.size(portalName)) p.Stmt.decRef(ctx) } -func (p PreparedPortal) size(portalName string) int64 { +func (p *PreparedPortal) size(portalName string) int64 { return int64(uintptr(len(portalName)) + unsafe.Sizeof(p)) } diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index 8feca4cf4c90..e9e390bd6452 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -43,7 +43,8 @@ import ( // metadata (e.g. tracing information and txn updates, if applicable). // // Returns true if more rows are needed, false otherwise. If false is returned -// both the inputs and the output have been properly closed. +// both the inputs and the output have been properly closed, or there is an +// error encountered. func emitHelper( ctx context.Context, output execinfra.RowReceiver, @@ -78,6 +79,12 @@ func emitHelper( switch consumerStatus { case execinfra.NeedMoreRows: return true + case execinfra.SwitchToAnotherPortal: + output.Push(nil /* row */, &execinfrapb.ProducerMetadata{ + Err: errors.AssertionFailedf("not allowed to pause and switch to another portal"), + }) + log.Fatalf(ctx, "not allowed to pause and switch to another portal") + return false case execinfra.DrainRequested: log.VEventf(ctx, 1, "no more rows required. drain requested.") execinfra.DrainAndClose(ctx, output, nil /* cause */, pushTrailingMeta, inputs...)
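
As a closing illustration of the consumer-status handling introduced in PATCH 5, here is a minimal, self-contained Go sketch of a producer loop reacting to the new `SwitchToAnotherPortal` status, following the dispatch pattern the patch adds to `execinfra.Run` and `emitHelper`. The `ConsumerStatus` values mirror `pkg/sql/execinfra/base.go`; the `runProducer` and `push` helpers are hypothetical stand-ins, not CockroachDB APIs.

```
package main

import "fmt"

// ConsumerStatus mirrors the enum in pkg/sql/execinfra/base.go after this
// patch series: SwitchToAnotherPortal sits between NeedMoreRows and
// DrainRequested.
type ConsumerStatus int

const (
	NeedMoreRows ConsumerStatus = iota
	SwitchToAnotherPortal
	DrainRequested
	ConsumerClosed
)

// runProducer is a hypothetical producer loop. The key point is the
// SwitchToAnotherPortal case: the loop returns without draining or closing,
// leaving the current portal open so its execution can resume later.
func runProducer(rows []string, push func(string) ConsumerStatus) {
	for _, row := range rows {
		switch push(row) {
		case NeedMoreRows:
			continue
		case SwitchToAnotherPortal:
			// Hand control back to the caller (the connExecutor in the real
			// code); the portal is neither drained nor closed.
			return
		case DrainRequested, ConsumerClosed:
			// Stop producing rows; draining and cleanup would happen here.
			return
		}
	}
}

func main() {
	limit := 2 // e.g. a row-count limit set on the portal by an Execute message
	pushed := 0
	push := func(row string) ConsumerStatus {
		fmt.Println("pushed:", row)
		pushed++
		if pushed == limit {
			// Limit reached: pause this portal instead of erroring out.
			return SwitchToAnotherPortal
		}
		return NeedMoreRows
	}
	runProducer([]string{"row-1", "row-2", "row-3"}, push)
}
```

The important design choice is that `SwitchToAnotherPortal` neither drains nor closes anything: the producer simply returns, so the paused portal can later be resumed and interleaved with others, as described in the commit messages above.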