diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 37dfcd072d7f..40ee3c481a31 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -36,19 +36,9 @@ feature.restore.enabled boolean true set to true to enable restore, false to dis feature.schema_change.enabled boolean true set to true to enable schema changes, false to disable; default is true feature.stats.enabled boolean true set to true to enable CREATE STATISTICS/ANALYZE, false to disable; default is true jobs.retention_time duration 336h0m0s the amount of time for which records for completed jobs are retained -kv.bulk_io_write.max_rate byte size 1.0 TiB the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops -kv.bulk_sst.max_allowed_overage byte size 64 MiB if positive, allowed size in excess of target size for SSTs from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory -kv.bulk_sst.target_size byte size 16 MiB target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory -kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to serve consistent historical reads based on closed timestamp information -kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records -kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated -kv.range_split.load_cpu_threshold duration 500ms the CPU use per second over which, the range becomes a candidate for load based splitting -kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence) -kv.replica_stats.addsst_request_size_factor integer 50000 the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1 -kv.replication_reports.interval duration 1m0s the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed @@ -82,7 +72,6 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically 
purged server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) -server.shutdown.lease_transfer_wait duration 5s the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM @@ -260,6 +249,7 @@ sql.multiple_modifications_of_table.enabled boolean false if true, allow stateme sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region sql.notices.enabled boolean true enable notices in the server/client protocol being sent sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability +sql.pgwire.multiple_active_portals.enabled boolean false if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan sql.schema.telemetry.recurrence string @weekly cron-tab recurrence for SQL schema telemetry job sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index cee8b19fe8b1..48d06500dc6f 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -200,6 +200,7 @@
sql.multiregion.drop_primary_region.enabled
boolean true allows dropping the PRIMARY REGION of a database if it is the last region
sql.notices.enabled
boolean true enable notices in the server/client protocol being sent
sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled
boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability +
sql.pgwire.multiple_active_portals.enabled
boolean false if true, portals with read-only SELECT query without sub/post queries can be executed in interleaving manner, but with local execution plan
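The setting above defaults to false, so the interleaved-portal path stays off until an operator opts in. Below is a minimal sketch of flipping it from Go via database/sql; the connection string and the pgx stdlib driver are assumptions for illustration, not part of this change.

package main

import (
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v5/stdlib" // assumed driver; any PostgreSQL-wire driver works
)

func main() {
	// Placeholder DSN: point this at a node reachable with a role that is
	// allowed to change cluster settings.
	db, err := sql.Open("pgx", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Opt in to the preview feature introduced by this change. Only read-only
	// SELECT portals without sub/post-queries become pausable; other statements
	// fall back to the errors described elsewhere in this diff.
	if _, err := db.Exec(
		"SET CLUSTER SETTING sql.pgwire.multiple_active_portals.enabled = true",
	); err != nil {
		log.Fatal(err)
	}
}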
sql.schema.telemetry.recurrence
string @weekly cron-tab recurrence for SQL schema telemetry job
sql.spatial.experimental_box2d_comparison_operators.enabled
boolean false enables the use of certain experimental box2d comparison operators
sql.stats.automatic_collection.enabled
booleantrueautomatic statistics collection mode diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index e90a5ee710b6..d6ba4881ea47 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -47,7 +47,7 @@ const ( // replicate queue will not consider stores which have failed a reservation a // viable target. var FailedReservationsTimeout = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "server.failed_reservation_timeout", "the amount of time to consider the store throttled for up-replication after a failed reservation call", 5*time.Second, @@ -59,7 +59,7 @@ const timeAfterStoreSuspectSettingName = "server.time_after_store_suspect" // TimeAfterStoreSuspect measures how long we consider a store suspect since // it's last failure. var TimeAfterStoreSuspect = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, timeAfterStoreSuspectSettingName, "the amount of time we consider a store suspect for after it fails a node liveness heartbeat."+ " A suspect node would not receive any new replicas or lease transfers, but will keep the replicas it has.", diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index cf6cbfbb985a..25713c2f7076 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -37,7 +37,7 @@ const SSTTargetSizeSetting = "kv.bulk_sst.target_size" // ExportRequestTargetFileSize controls the target file size for SSTs created // during backups. var ExportRequestTargetFileSize = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, SSTTargetSizeSetting, fmt.Sprintf("target size for SSTs emitted from export requests; "+ "export requests (i.e. BACKUP) may buffer up to the sum of %s and %s in memory", @@ -55,7 +55,7 @@ const MaxExportOverageSetting = "kv.bulk_sst.max_allowed_overage" // and an SST would exceed this size (due to large rows or large numbers of // versions), then the export will fail. var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, MaxExportOverageSetting, fmt.Sprintf("if positive, allowed size in excess of target size for SSTs from export requests; "+ "export requests (i.e. BACKUP) may buffer up to the sum of %s and %s in memory", diff --git a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go index c844b32feb0a..943906906ad4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go @@ -30,7 +30,7 @@ import ( // QueryResolvedTimestampIntentCleanupAge configures the minimum intent age that // QueryResolvedTimestamp requests will consider for async intent cleanup. 
var QueryResolvedTimestampIntentCleanupAge = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.query_resolved_timestamp.intent_cleanup_age", "minimum intent age that QueryResolvedTimestamp requests will consider for async intent cleanup", 10*time.Second, diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index bd19d50f4c3f..3a399b84dbd7 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -51,6 +51,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/ts" @@ -2027,6 +2028,11 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // The aggressive Raft timeouts in this test prevents it from maintaining + // quorum and making progress when stressing under race or deadlock detection. + skip.UnderRace(t) + skip.UnderDeadlock(t) + currentTrigger := make(chan *roachpb.SplitTrigger, 1) var seen struct { syncutil.Mutex diff --git a/pkg/kv/kvserver/closedts/setting.go b/pkg/kv/kvserver/closedts/setting.go index 91230241af37..4e1c9936f12e 100644 --- a/pkg/kv/kvserver/closedts/setting.go +++ b/pkg/kv/kvserver/closedts/setting.go @@ -40,7 +40,7 @@ var SideTransportCloseInterval = settings.RegisterDurationSetting( // (see TargetForPolicy), if it is set to a non-zero value. Meant as an escape // hatch. var LeadForGlobalReadsOverride = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.closed_timestamp.lead_for_global_reads_override", "if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps", 0, diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index c8d60ed0cbcb..ec90fb971340 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -60,7 +60,7 @@ import ( // utilization and runaway queuing for misbehaving clients, a role it is well // positioned to serve. var MaxLockWaitQueueLength = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.lock_table.maximum_lock_wait_queue_length", "the maximum length of a lock wait-queue that read-write requests are willing "+ "to enter and wait in. The setting can be used to ensure some level of quality-of-service "+ @@ -93,7 +93,7 @@ var MaxLockWaitQueueLength = settings.RegisterIntSetting( // discoveredCount > 100,000, caused by stats collection, where we definitely // want to avoid adding these locks to the lock table, if possible. 
var DiscoveredLocksThresholdToConsultFinalizedTxnCache = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.lock_table.discovered_locks_threshold_for_consulting_finalized_txn_cache", "the maximum number of discovered locks by a waiter, above which the finalized txn cache"+ "is consulted and resolvable locks are not added to the lock table -- this should be a small"+ diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index c520f419752e..07d23a0c2ede 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -39,7 +39,7 @@ import ( // LockTableLivenessPushDelay sets the delay before pushing in order to detect // coordinator failures of conflicting transactions. var LockTableLivenessPushDelay = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.lock_table.coordinator_liveness_push_delay", "the delay before pushing in order to detect coordinator failures of conflicting transactions", // This is set to a short duration to ensure that we quickly detect failed @@ -71,7 +71,7 @@ var LockTableLivenessPushDelay = settings.RegisterDurationSetting( // LockTableDeadlockDetectionPushDelay sets the delay before pushing in order to // detect dependency cycles between transactions. var LockTableDeadlockDetectionPushDelay = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.lock_table.deadlock_detection_push_delay", "the delay before pushing in order to detect dependency cycles between transactions", // This is set to a medium duration to ensure that deadlock caused by diff --git a/pkg/kv/kvserver/gc/gc.go b/pkg/kv/kvserver/gc/gc.go index 9768e4719ae7..78589ef3d3c3 100644 --- a/pkg/kv/kvserver/gc/gc.go +++ b/pkg/kv/kvserver/gc/gc.go @@ -67,7 +67,7 @@ const ( // IntentAgeThreshold is the threshold after which an extant intent // will be resolved. var IntentAgeThreshold = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.gc.intent_age_threshold", "intents older than this threshold will be resolved when encountered by the MVCC GC queue", 2*time.Hour, @@ -106,7 +106,7 @@ var TxnCleanupThreshold = settings.RegisterDurationSetting( // of writing. This value is subject to tuning in real environment as we have // more data available. var MaxIntentsPerCleanupBatch = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.gc.intent_cleanup_batch_size", "if non zero, gc will split found intents into batches of this size when trying to resolve them", 5000, @@ -125,7 +125,7 @@ var MaxIntentsPerCleanupBatch = settings.RegisterIntSetting( // The default value is a conservative limit to prevent pending intent key sizes // from ballooning. var MaxIntentKeyBytesPerCleanupBatch = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.gc.intent_cleanup_batch_byte_size", "if non zero, gc will split found intents into batches of this size when trying to resolve them", 1e6, diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index b9569e56037f..07911950a868 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -231,7 +231,7 @@ func IntersectSpan( // SplitByLoadMergeDelay wraps "kv.range_split.by_load_merge_delay". 
var SplitByLoadMergeDelay = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range_split.by_load_merge_delay", "the delay that range splits created due to load will wait before considering being merged away", 5*time.Minute, diff --git a/pkg/kv/kvserver/kvserverbase/syncing_write.go b/pkg/kv/kvserver/kvserverbase/syncing_write.go index 0f6078126cb3..d44e865394b9 100644 --- a/pkg/kv/kvserver/kvserverbase/syncing_write.go +++ b/pkg/kv/kvserver/kvserverbase/syncing_write.go @@ -65,7 +65,7 @@ func LimitBulkIOWrite(ctx context.Context, limiter *rate.Limiter, cost int) erro // sstWriteSyncRate wraps "kv.bulk_sst.sync_size". 0 disables syncing. var sstWriteSyncRate = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_sst.sync_size", "threshold after which non-Rocks SST writes must fsync (0 disables)", BulkIOWriteBurst, diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index e6243bd236ff..a8986cf826f1 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -38,7 +38,7 @@ import ( ) var disableSyncRaftLog = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.raft_log.disable_synchronization_unsafe", "set to true to disable synchronization on Raft log writes to persistent storage. "+ "Setting to true risks data loss or data corruption on server crashes. "+ @@ -47,7 +47,7 @@ var disableSyncRaftLog = settings.RegisterBoolSetting( ) var enableNonBlockingRaftLogSync = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.raft_log.non_blocking_synchronization.enabled", "set to true to enable non-blocking synchronization on Raft log writes to "+ "persistent storage. Setting to true does not risk data loss or data corruption "+ diff --git a/pkg/kv/kvserver/protectedts/settings.go b/pkg/kv/kvserver/protectedts/settings.go index 4add96509a48..97eec60ce01f 100644 --- a/pkg/kv/kvserver/protectedts/settings.go +++ b/pkg/kv/kvserver/protectedts/settings.go @@ -32,7 +32,7 @@ var MaxBytes = settings.RegisterIntSetting( // MaxSpans controls the maximum number of spans which can be protected // by all protected timestamp records. var MaxSpans = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.protectedts.max_spans", "if non-zero the limit of the number of spans which can be protected", 32768, diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 265ea7ceea69..9fc321ff86a5 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -59,7 +59,7 @@ const ( // targetRaftOutgoingBatchSize wraps "kv.raft.command.target_batch_size". var targetRaftOutgoingBatchSize = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.raft.command.target_batch_size", "size of a batch of raft commands after which it will be sent without further batching", 64<<20, // 64 MB diff --git a/pkg/kv/kvserver/replica_backpressure.go b/pkg/kv/kvserver/replica_backpressure.go index 2fa57104e018..fe68b3f7209b 100644 --- a/pkg/kv/kvserver/replica_backpressure.go +++ b/pkg/kv/kvserver/replica_backpressure.go @@ -28,7 +28,7 @@ var backpressureLogLimiter = log.Every(500 * time.Millisecond) // range's size must grow to before backpressure will be applied on writes. Set // to 0 to disable backpressure altogether. 
var backpressureRangeSizeMultiplier = settings.RegisterFloatSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range.backpressure_range_size_multiplier", "multiple of range_max_bytes that a range is allowed to grow to without "+ "splitting before writes to that range are blocked, or 0 to disable", @@ -66,7 +66,7 @@ var backpressureRangeSizeMultiplier = settings.RegisterFloatSetting( // currently backpressuring than ranges which are larger but are not // applying backpressure. var backpressureByteTolerance = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range.backpressure_byte_tolerance", "defines the number of bytes above the product of "+ "backpressure_range_size_multiplier and the range_max_size at which "+ diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 93e4600b30d0..9120a2422aee 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -28,7 +28,7 @@ import ( // information is collected and passed around, regardless of the value of this // setting. var FollowerReadsEnabled = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.closed_timestamp.follower_reads_enabled", "allow (all) replicas to serve consistent historical reads based on closed timestamp information", true, diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 3df568e64fec..494128048e77 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -53,7 +53,7 @@ var RangefeedEnabled = settings.RegisterBoolSetting( // RangeFeedRefreshInterval controls the frequency with which we deliver closed // timestamp updates to rangefeeds. var RangeFeedRefreshInterval = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.rangefeed.closed_timestamp_refresh_interval", "the interval at which closed-timestamp updates"+ "are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval", diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 3d36d1ff81ec..5840d0ac18c6 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -38,7 +38,7 @@ import ( ) var optimisticEvalLimitedScans = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.concurrency.optimistic_eval_limited_scans.enabled", "when true, limited scans are optimistically evaluated in the sense of not checking for "+ "conflicting latches or locks up front for the full key range of the scan, and instead "+ diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go index 03780af53a2e..1f806c6f2259 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -27,7 +27,7 @@ import ( // SplitByLoadEnabled wraps "kv.range_split.by_load_enabled". var SplitByLoadEnabled = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range_split.by_load_enabled", "allow automatic splits of ranges based on where load is concentrated", true, @@ -35,7 +35,7 @@ var SplitByLoadEnabled = settings.RegisterBoolSetting( // SplitByLoadQPSThreshold wraps "kv.range_split.load_qps_threshold". 
var SplitByLoadQPSThreshold = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range_split.load_qps_threshold", "the QPS over which, the range becomes a candidate for load based splitting", 2500, // 2500 req/s @@ -53,7 +53,7 @@ var SplitByLoadQPSThreshold = settings.RegisterIntSetting( // measured as max ops/s for kv and resource balance for allocbench. See #96869 // for more details. var SplitByLoadCPUThreshold = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.range_split.load_cpu_threshold", "the CPU use per second over which, the range becomes a candidate for load based splitting", 500*time.Millisecond, diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 2b3841db472f..94be47931caf 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -43,7 +43,7 @@ import ( // TODO(erikgrinaker): this, and the timeout handling, should be moved into a // migration helper that manages checkpointing and retries as well. var migrateApplicationTimeout = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.migration.migrate_application.timeout", "timeout for a Migrate request to be applied across all replicas of a range", 1*time.Minute, diff --git a/pkg/kv/kvserver/replicastats/replica_stats.go b/pkg/kv/kvserver/replicastats/replica_stats.go index 905e35135635..a47e2eb780f9 100644 --- a/pkg/kv/kvserver/replicastats/replica_stats.go +++ b/pkg/kv/kvserver/replicastats/replica_stats.go @@ -38,7 +38,7 @@ const ( // SSTable data, divided by this factor. Thereby, the magnitude of this factor // is inversely related to QPS sensitivity to AddSSTableRequests. var AddSSTableRequestSizeFactor = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.replica_stats.addsst_request_size_factor", "the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1", // The default value of 50,000 was chosen as the default divisor, following manual testing that diff --git a/pkg/kv/kvserver/reports/reporter.go b/pkg/kv/kvserver/reports/reporter.go index 5d5b1ee43542..7c56db693fc3 100644 --- a/pkg/kv/kvserver/reports/reporter.go +++ b/pkg/kv/kvserver/reports/reporter.go @@ -46,7 +46,7 @@ import ( // ReporterInterval is the interval between two generations of the reports. // When set to zero - disables the report generation. var ReporterInterval = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.replication_reports.interval", "the frequency for generating the replication_constraint_stats, replication_stats_report and "+ "replication_critical_localities reports (set to 0 to disable)", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 45ae1e241d3b..338b476dca86 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -135,7 +135,7 @@ var logStoreTelemetryTicks = envutil.EnvOrDefaultInt( // bulkIOWriteLimit is defined here because it is used by BulkIOWriteLimiter. var bulkIOWriteLimit = settings.RegisterByteSizeSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_io_write.max_rate", "the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops", 1<<40, @@ -143,7 +143,7 @@ var bulkIOWriteLimit = settings.RegisterByteSizeSetting( // addSSTableRequestLimit limits concurrent AddSSTable requests. 
var addSSTableRequestLimit = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_io_write.concurrent_addsstable_requests", "number of concurrent AddSSTable requests per store before queueing", 1, @@ -156,7 +156,7 @@ var addSSTableRequestLimit = settings.RegisterIntSetting( // disk, so we can allow a greater amount of concurrency than regular AddSSTable // requests. Applied independently of concurrent_addsstable_requests. var addSSTableAsWritesRequestLimit = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_io_write.concurrent_addsstable_as_writes_requests", "number of concurrent AddSSTable requests ingested as writes per store before queueing", 10, @@ -165,7 +165,7 @@ var addSSTableAsWritesRequestLimit = settings.RegisterIntSetting( // concurrentRangefeedItersLimit limits concurrent rangefeed catchup iterators. var concurrentRangefeedItersLimit = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.rangefeed.concurrent_catchup_iterators", "number of rangefeeds catchup iterators a store will allow concurrently before queueing", 16, @@ -175,7 +175,7 @@ var concurrentRangefeedItersLimit = settings.RegisterIntSetting( // Minimum time interval between system config updates which will lead to // enqueuing replicas. var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.store.system_config_update.queue_add_rate", "the rate (per second) at which the store will add, all replicas to the split and merge queue due to system config gossip", .5, @@ -186,7 +186,7 @@ var queueAdditionOnSystemConfigUpdateRate = settings.RegisterFloatSetting( // enqueuing replicas. The default is relatively high to deal with startup // scenarios. var queueAdditionOnSystemConfigUpdateBurst = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.store.system_config_update.queue_add_burst", "the burst rate at which the store will add all replicas to the split and merge queue due to system config gossip", 32, @@ -196,7 +196,7 @@ var queueAdditionOnSystemConfigUpdateBurst = settings.RegisterIntSetting( // leaseTransferWait is the timeout for a single iteration of draining range leases. var leaseTransferWait = func() *settings.DurationSetting { s := settings.RegisterDurationSetting( - settings.TenantWritable, + settings.SystemOnly, leaseTransferWaitSettingName, "the timeout for a single iteration of the range lease transfer phase of draining "+ "(note that the --drain-wait parameter for cockroach node drain may need adjustment "+ @@ -224,7 +224,7 @@ const leaseTransferWaitSettingName = "server.shutdown.lease_transfer_wait" // here since we check it in the caller to limit generated requests as well // to prevent excessive queuing. var ExportRequestsLimit = settings.RegisterIntSetting( - settings.TenantWritable, + settings.SystemOnly, "kv.bulk_io_write.concurrent_export_requests", "number of export requests a store will handle concurrently before queuing", 3, @@ -1129,7 +1129,7 @@ type StoreConfig struct { // Decommissioning events are not controlled by this setting. 
var logRangeAndNodeEventsEnabled = func() *settings.BoolSetting { s := settings.RegisterBoolSetting( - settings.TenantReadOnly, + settings.SystemOnly, "kv.log_range_and_node_events.enabled", "set to true to transactionally log range events"+ " (e.g., split, merge, add/remove voter/non-voter) into system.rangelog"+ diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 9facd8a25f35..3a6ffbbc5d04 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1975,6 +1975,7 @@ func (ex *connExecutor) execCmd() (retErr error) { 0, /* limit */ "", /* portalName */ ex.implicitTxn(), + PortalPausabilityDisabled, /* portalPausability */ ) res = stmtRes @@ -2057,6 +2058,7 @@ func (ex *connExecutor) execCmd() (retErr error) { tcmd.Limit, portalName, ex.implicitTxn(), + portal.portalPausablity, ) res = stmtRes diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index cd4e426d8ed6..6c202eaf1f1e 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -660,6 +660,7 @@ type ClientComm interface { limit int, portalName string, implicitTxn bool, + portalPausability PortalPausablity, ) CommandResult // CreatePrepareResult creates a result for a PrepareStmt command. CreatePrepareResult(pos CmdPos) ParseResult diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 3253634a50a9..4f45787bf977 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -1347,6 +1347,8 @@ func (r *DistSQLReceiver) handleCommErr(commErr error) { } else if errors.Is(commErr, errIEResultChannelClosed) { log.VEvent(r.ctx, 1, "encountered errIEResultChannelClosed (transitioning to draining)") r.status = execinfra.DrainRequested + } else if errors.Is(commErr, ErrPortalLimitHasBeenReached) { + r.status = execinfra.SwitchToAnotherPortal } else { // Set the error on the resultWriter to notify the consumer about // it. Most clients don't care to differentiate between @@ -1365,7 +1367,7 @@ func (r *DistSQLReceiver) handleCommErr(commErr error) { // sql/pgwire.limitedCommandResult.moreResultsNeeded). Instead of // changing the signature of AddRow, we have a sentinel error that // is handled specially here. - if !errors.Is(commErr, ErrLimitedResultNotSupported) { + if !errors.Is(commErr, ErrLimitedResultNotSupported) && !errors.Is(commErr, ErrStmtNotSupportedForPausablePortal) { r.commErr = commErr } } @@ -1505,11 +1507,27 @@ func (r *DistSQLReceiver) PushBatch( var ( // ErrLimitedResultNotSupported is an error produced by pgwire - // indicating an unsupported feature of row count limits was attempted. - ErrLimitedResultNotSupported = unimplemented.NewWithIssue(40195, "multiple active portals not supported") + // indicating the user attempted to have multiple active portals but + // either without setting sql.pgwire.multiple_active_portals.enabled to + // true or the underlying query does not satisfy the restriction. + ErrLimitedResultNotSupported = unimplemented.NewWithIssue( + 40195, + "multiple active portals not supported, "+ + "please set sql.pgwire.multiple_active_portals.enabled to true. "+ + "Note: this feature is in preview", + ) + // ErrStmtNotSupportedForPausablePortal is returned when the user have set + // sql.pgwire.multiple_active_portals.enabled to true but set an unsupported + // statement for a portal. 
+ ErrStmtNotSupportedForPausablePortal = unimplemented.NewWithIssue( + 98911, + "the statement for a pausable portal must be a read-only SELECT query"+ + " with no sub-queries or post-queries", + ) // ErrLimitedResultClosed is a sentinel error produced by pgwire // indicating the portal should be closed without error. - ErrLimitedResultClosed = errors.New("row count limit closed") + ErrLimitedResultClosed = errors.New("row count limit closed") + ErrPortalLimitHasBeenReached = errors.New("limit has been reached") ) // ProducerDone is part of the execinfra.RowReceiver interface. diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index ada8dd8d38a7..6d9ed2a25ed6 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -733,6 +733,14 @@ var overrideAlterPrimaryRegionInSuperRegion = settings.RegisterBoolSetting( false, ).WithPublic() +var enableMultipleActivePortals = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.pgwire.multiple_active_portals.enabled", + "if true, portals with read-only SELECT query without sub/post queries "+ + "can be executed in interleaving manner, but with local execution plan", + false, +).WithPublic() + var errNoTransactionInProgress = errors.New("there is no transaction in progress") var errTransactionInProgress = errors.New("there is already a transaction in progress") diff --git a/pkg/sql/execinfra/base.go b/pkg/sql/execinfra/base.go index de7a51fe6db9..496e6bce51fa 100644 --- a/pkg/sql/execinfra/base.go +++ b/pkg/sql/execinfra/base.go @@ -41,6 +41,11 @@ type ConsumerStatus uint32 const ( // NeedMoreRows indicates that the consumer is still expecting more rows. NeedMoreRows ConsumerStatus = iota + // SwitchToAnotherPortal indicates that the we received exec command for + // a different portal, and may come back to continue executing the current + // portal later. If the cluster setting sql.pgwire.multiple_active_portals.enabled + // is set to be true, we do nothing and return the control to the connExecutor. + SwitchToAnotherPortal // DrainRequested indicates that the consumer will not process any more data // rows, but will accept trailing metadata from the producer. DrainRequested @@ -189,6 +194,10 @@ func Run(ctx context.Context, src RowSource, dst RowReceiver) { switch dst.Push(row, meta) { case NeedMoreRows: continue + case SwitchToAnotherPortal: + // Do nothing here and return the control to the connExecutor to execute + // the other portal, i.e. we leave the current portal open. + return case DrainRequested: DrainAndForwardMetadata(ctx, src, dst) dst.ProducerDone() @@ -236,6 +245,8 @@ func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver src.ConsumerClosed() return case NeedMoreRows: + case SwitchToAnotherPortal: + panic("current consumer is drained, cannot be paused and switched to another portal") case DrainRequested: } } @@ -457,6 +468,12 @@ func (rc *RowChannel) Push( switch consumerStatus { case NeedMoreRows: rc.dataChan <- RowChannelMsg{Row: row, Meta: meta} + case SwitchToAnotherPortal: + // We currently don't expect this status, so we propagate an assertion + // failure as metadata. + m := execinfrapb.GetProducerMeta() + m.Err = errors.AssertionFailedf("multiple active portals are not expected with the row channel") + rc.dataChan <- RowChannelMsg{Meta: m} case DrainRequested: // If we're draining, only forward metadata. 
if meta != nil { diff --git a/pkg/sql/execinfra/consumerstatus_string.go b/pkg/sql/execinfra/consumerstatus_string.go index 25ba15bfc228..52aa5cb90d7c 100644 --- a/pkg/sql/execinfra/consumerstatus_string.go +++ b/pkg/sql/execinfra/consumerstatus_string.go @@ -9,13 +9,14 @@ func _() { // Re-run the stringer command to generate them again. var x [1]struct{} _ = x[NeedMoreRows-0] - _ = x[DrainRequested-1] - _ = x[ConsumerClosed-2] + _ = x[SwitchToAnotherPortal-1] + _ = x[DrainRequested-2] + _ = x[ConsumerClosed-3] } -const _ConsumerStatus_name = "NeedMoreRowsDrainRequestedConsumerClosed" +const _ConsumerStatus_name = "NeedMoreRowsSwitchToAnotherPortalDrainRequestedConsumerClosed" -var _ConsumerStatus_index = [...]uint8{0, 12, 26, 40} +var _ConsumerStatus_index = [...]uint8{0, 12, 33, 47, 61} func (i ConsumerStatus) String() string { if i >= ConsumerStatus(len(_ConsumerStatus_index)-1) { diff --git a/pkg/sql/flowinfra/inbound.go b/pkg/sql/flowinfra/inbound.go index dbc891efa847..be39127db694 100644 --- a/pkg/sql/flowinfra/inbound.go +++ b/pkg/sql/flowinfra/inbound.go @@ -238,6 +238,8 @@ func processProducerMessage( switch dst.Push(row, meta) { case execinfra.NeedMoreRows: continue + case execinfra.SwitchToAnotherPortal: + return processMessageResult{err: errors.AssertionFailedf("not allowed to switch to another portal")} case execinfra.DrainRequested: // The rest of rows are not needed by the consumer. We'll send a drain // signal to the producer and expect it to quickly send trailing diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 0ed78b839daf..07df10c387d9 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -1113,6 +1113,7 @@ func (icc *internalClientComm) CreateStatementResult( _ int, _ string, _ bool, + _ PortalPausablity, ) CommandResult { return icc.createRes(pos, nil /* onClose */) } diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go index 764c70a19648..521dbf098cbb 100644 --- a/pkg/sql/pgwire/command_result.go +++ b/pkg/sql/pgwire/command_result.go @@ -383,6 +383,7 @@ func (c *conn) newCommandResult( limit int, portalName string, implicitTxn bool, + portalPausability sql.PortalPausablity, ) sql.CommandResult { r := c.allocCommandResult() *r = commandResult{ @@ -401,10 +402,11 @@ func (c *conn) newCommandResult( } telemetry.Inc(sqltelemetry.PortalWithLimitRequestCounter) return &limitedCommandResult{ - limit: limit, - portalName: portalName, - implicitTxn: implicitTxn, - commandResult: r, + limit: limit, + portalName: portalName, + implicitTxn: implicitTxn, + commandResult: r, + portalPausablity: portalPausability, } } @@ -445,7 +447,9 @@ type limitedCommandResult struct { seenTuples int // If set, an error will be sent to the client if more rows are produced than // this limit. - limit int + limit int + reachedLimit bool + portalPausablity sql.PortalPausablity } // AddRow is part of the sql.RestrictedCommandResult interface. @@ -462,9 +466,17 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro if err := r.conn.Flush(r.pos); err != nil { return err } - r.seenTuples = 0 - - return r.moreResultsNeeded(ctx) + if r.portalPausablity == sql.PausablePortal { + r.reachedLimit = true + return sql.ErrPortalLimitHasBeenReached + } else { + // TODO(janexing): we keep this part as for general portals, we would like + // to keep the execution logic to avoid bring too many bugs. Eventually + // we should remove them and use the "return the control to connExecutor" + // logic for all portals. 
+ r.seenTuples = 0 + return r.moreResultsNeeded(ctx) + } } return nil } @@ -479,6 +491,17 @@ func (r *limitedCommandResult) SupportsAddBatch() bool { // requests for rows from the active portal, during the "execute portal" flow // when a limit has been specified. func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error { + errBasedOnPausability := func(pausablity sql.PortalPausablity) error { + switch pausablity { + case sql.PortalPausabilityDisabled: + return sql.ErrLimitedResultNotSupported + case sql.NotPausablePortalForUnsupportedStmt: + return sql.ErrStmtNotSupportedForPausablePortal + default: + return errors.AssertionFailedf("unsupported pausability type for a portal") + } + } + // Keep track of the previous CmdPos so we can rewind if needed. prevPos := r.conn.stmtBuf.AdvanceOne() for { @@ -493,7 +516,7 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error { // next message is a delete portal. if c.Type != pgwirebase.PreparePortal || c.Name != r.portalName { telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter) - return errors.WithDetail(sql.ErrLimitedResultNotSupported, + return errors.WithDetail(errBasedOnPausability(r.portalPausablity), "cannot close a portal while a different one is open") } return r.rewindAndClosePortal(ctx, prevPos) @@ -501,7 +524,7 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error { // The happy case: the client wants more rows from the portal. if c.Name != r.portalName { telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter) - return errors.WithDetail(sql.ErrLimitedResultNotSupported, + return errors.WithDetail(errBasedOnPausability(r.portalPausablity), "cannot execute a portal while a different one is open") } r.limit = c.Limit @@ -544,7 +567,7 @@ func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error { } // We got some other message, but we only support executing to completion. telemetry.Inc(sqltelemetry.InterleavedPortalRequestCounter) - return errors.WithDetail(sql.ErrLimitedResultNotSupported, + return errors.WithDetail(errBasedOnPausability(r.portalPausablity), fmt.Sprintf("cannot perform operation %T while a different portal is open", c)) } prevPos = curPos @@ -625,6 +648,13 @@ func (r *limitedCommandResult) rewindAndClosePortal( return sql.ErrLimitedResultClosed } +func (r *limitedCommandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator) { + if r.reachedLimit { + r.commandResult.typ = noCompletionMsg + } + r.commandResult.Close(ctx, t) +} + // Get the column index for job id based on the result header defined in // jobs.BulkJobExecutionResultHeader and jobs.DetachedJobExecutionResultHeader. func init() { diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 853ea614616c..96b0550e0c84 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -1796,8 +1796,9 @@ func (c *conn) CreateStatementResult( limit int, portalName string, implicitTxn bool, + portalPausability sql.PortalPausablity, ) sql.CommandResult { - return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, limit, portalName, implicitTxn) + return c.newCommandResult(descOpt, pos, stmt, formatCodes, conv, location, limit, portalName, implicitTxn, portalPausability) } // CreateSyncResult is part of the sql.ClientComm interface. 
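To make the control flow around the new consumer status easier to follow, here is a condensed, illustrative consumer loop in the spirit of the execinfra.Run change earlier in this diff. Only the execinfra identifiers are real; the function name is made up for the sketch. The key point is that SwitchToAnotherPortal returns without draining or closing the source, so the connExecutor can resume the same portal later.

package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
)

// consumePausable is an illustrative consumer loop, not code from this patch.
// It mirrors how execinfra.Run now reacts to each ConsumerStatus value.
func consumePausable(ctx context.Context, src execinfra.RowSource, dst execinfra.RowReceiver) {
	for {
		row, meta := src.Next()
		if row == nil && meta == nil {
			// Producer is exhausted; tell the consumer we are done.
			dst.ProducerDone()
			return
		}
		switch dst.Push(row, meta) {
		case execinfra.NeedMoreRows:
			continue
		case execinfra.SwitchToAnotherPortal:
			// Leave the current portal open: no draining, no ProducerDone.
			// Control goes back to the connExecutor, which may resume later.
			return
		case execinfra.DrainRequested:
			// Only trailing metadata is wanted from here on.
			execinfra.DrainAndForwardMetadata(ctx, src, dst)
			dst.ProducerDone()
			return
		case execinfra.ConsumerClosed:
			// The consumer is gone; release the source and stop.
			src.ConsumerClosed()
			return
		}
	}
}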
diff --git a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs index fe1182d427af..1afeb7f1788f 100644 --- a/pkg/sql/pgwire/testdata/pgtest/portals_crbugs +++ b/pkg/sql/pgwire/testdata/pgtest/portals_crbugs @@ -29,7 +29,7 @@ ReadyForQuery {"Type":"BindComplete"} {"Type":"DataRow","Values":[{"text":"1"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set sql.pgwire.multiple_active_portals.enabled to true. Note: this feature is in preview"} {"Type":"ReadyForQuery","TxStatus":"E"} {"Type":"ReadyForQuery","TxStatus":"E"} @@ -70,7 +70,7 @@ ReadyForQuery {"Type":"BindComplete"} {"Type":"DataRow","Values":[{"text":"1"}]} {"Type":"PortalSuspended"} -{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported"} +{"Type":"ErrorResponse","Code":"0A000","Message":"unimplemented: multiple active portals not supported, please set sql.pgwire.multiple_active_portals.enabled to true. Note: this feature is in preview"} {"Type":"ReadyForQuery","TxStatus":"E"} send diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index ecc45f610089..5ad4bbd872c6 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -118,9 +118,28 @@ type preparedStatementsAccessor interface { DeleteAll(ctx context.Context) } +// PortalPausablity mark if the portal is pausable and the reason. This is +// needed to give the correct error for usage of multiple active portals. +type PortalPausablity int64 + +const ( + // PortalPausabilityDisabled is the default status of a portal when + // sql.pgwire.multiple_active_portals.enabled is false. + PortalPausabilityDisabled PortalPausablity = iota + // PausablePortal is set when sql.pgwire.multiple_active_portals.enabled is + // set to true and the underlying statement is a read-only SELECT query with + // no sub-queries or post-queries. + PausablePortal + // NotPausablePortalForUnsupportedStmt is used when the cluster setting + // sql.pgwire.multiple_active_portals.enabled is set to true, while we don't + // support underlying statement. + NotPausablePortalForUnsupportedStmt +) + // PreparedPortal is a PreparedStatement that has been bound with query // arguments. type PreparedPortal struct { + Name string Stmt *PreparedStatement Qargs tree.QueryArguments @@ -131,6 +150,11 @@ type PreparedPortal struct { // meaning that any additional attempts to execute it should return no // rows. exhausted bool + + // portalPausablity is used to log the correct error message when user pause + // a portal. + // See comments for PortalPausablity for more details. + portalPausablity PortalPausablity } // makePreparedPortal creates a new PreparedPortal. @@ -144,16 +168,21 @@ func (ex *connExecutor) makePreparedPortal( outFormats []pgwirebase.FormatCode, ) (PreparedPortal, error) { portal := PreparedPortal{ + Name: name, Stmt: stmt, Qargs: qargs, OutFormats: outFormats, } + // TODO(janexing): we added this line to avoid the unused lint error. + // Will remove it once the whole functionality of multple active portals + // is merged. + _ = enableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) } // accountForCopy updates the state to account for the copy of the // PreparedPortal (p is the copy). 
-func (p PreparedPortal) accountForCopy( +func (p *PreparedPortal) accountForCopy( ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, portalName string, ) error { if err := prepStmtsNamespaceMemAcc.Grow(ctx, p.size(portalName)); err != nil { @@ -165,13 +194,13 @@ func (p PreparedPortal) accountForCopy( } // close closes this portal. -func (p PreparedPortal) close( +func (p *PreparedPortal) close( ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, portalName string, ) { prepStmtsNamespaceMemAcc.Shrink(ctx, p.size(portalName)) p.Stmt.decRef(ctx) } -func (p PreparedPortal) size(portalName string) int64 { +func (p *PreparedPortal) size(portalName string) int64 { return int64(uintptr(len(portalName)) + unsafe.Sizeof(p)) } diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index 8feca4cf4c90..e9e390bd6452 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -43,7 +43,8 @@ import ( // metadata (e.g. tracing information and txn updates, if applicable). // // Returns true if more rows are needed, false otherwise. If false is returned -// both the inputs and the output have been properly closed. +// both the inputs and the output have been properly closed, or there is an +// error encountered. func emitHelper( ctx context.Context, output execinfra.RowReceiver, @@ -78,6 +79,12 @@ func emitHelper( switch consumerStatus { case execinfra.NeedMoreRows: return true + case execinfra.SwitchToAnotherPortal: + output.Push(nil /* row */, &execinfrapb.ProducerMetadata{ + Err: errors.AssertionFailedf("not allowed to pause and switch to another portal"), + }) + log.Fatalf(ctx, "not allowed to pause and switch to another portal") + return false case execinfra.DrainRequested: log.VEventf(ctx, 1, "no more rows required. drain requested.") execinfra.DrainAndClose(ctx, output, nil /* cause */, pushTrailingMeta, inputs...) diff --git a/pkg/sql/trace_test.go b/pkg/sql/trace_test.go index 1cd577f82561..4d0585cdd2e8 100644 --- a/pkg/sql/trace_test.go +++ b/pkg/sql/trace_test.go @@ -45,8 +45,8 @@ func TestTrace(t *testing.T) { // These are always appended, even without the test specifying it. alwaysOptionalSpans := []string{ "drain", - "storage.pendingLeaseRequest: requesting lease", - "storage.Store: gossip on capacity change", + "pendingLeaseRequest: requesting lease", + "gossip on capacity change", "outbox", "request range lease", "range lookup", @@ -364,17 +364,22 @@ func TestTrace(t *testing.T) { } defer rows.Close() - ignoreSpans := make(map[string]bool) - for _, s := range test.optionalSpans { - ignoreSpans[s] = true + ignoreSpan := func(op string) bool { + for _, s := range test.optionalSpans { + if strings.Contains(op, s) { + return true + } + } + return false } + r := 0 for rows.Next() { var op string if err := rows.Scan(&op); err != nil { t.Fatal(err) } - if ignoreSpans[op] { + if ignoreSpan(op) { continue } @@ -391,7 +396,7 @@ func TestTrace(t *testing.T) { if err := rows.Scan(&op); err != nil { t.Fatal(err) } - if ignoreSpans[op] { + if ignoreSpan(op) { continue } t.Errorf("remaining span: %q", op)
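As a closing illustration of the PortalPausablity contract added in prepared_stmt.go, here is a hypothetical helper, not part of this patch, that restates the classification rule in executable form; the boolean parameters stand in for the analysis the connExecutor performs on the planned statement.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql"
)

// classifyPortal is illustrative only. The real decision is made against the
// planned statement, not against pre-computed booleans like these.
func classifyPortal(settingEnabled, readOnlySelect, hasSubOrPostQueries bool) sql.PortalPausablity {
	if !settingEnabled {
		// Default: hitting the row limit with another portal open keeps
		// returning ErrLimitedResultNotSupported.
		return sql.PortalPausabilityDisabled
	}
	if readOnlySelect && !hasSubOrPostQueries {
		// AddRow returns ErrPortalLimitHasBeenReached at the limit and the
		// portal can be picked up again later.
		return sql.PausablePortal
	}
	// Setting is on, but the statement shape is unsupported, so the user gets
	// ErrStmtNotSupportedForPausablePortal instead.
	return sql.NotPausablePortalForUnsupportedStmt
}

func main() {
	fmt.Println(classifyPortal(true, true, false) == sql.PausablePortal) // true
}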