diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 9cb725f11484..3cb80096fc09 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -1,5 +1,5 @@
Setting Type Default Description
-admission.kv.enabled boolean true when true, work performed by the KV layer is subject to admission control
+admission.epoch_lifo.enabled boolean false when true, epoch-LIFO behavior is enabled when there is significant delay in admission
admission.sql_kv_response.enabled boolean true when true, work performed by the SQL layer when receiving a KV response is subject to admission control
admission.sql_sql_response.enabled boolean true when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control
bulkio.backup.file_size byte size 128 MiB target size for individual data files produced during BACKUP
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index d3c68cc227d2..2c38db0a9e83 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -1,6 +1,7 @@
Setting | Type | Default | Description |
+admission.epoch_lifo.enabled | boolean | false | when true, epoch-LIFO behavior is enabled when there is significant delay in admission |
admission.kv.enabled | boolean | true | when true, work performed by the KV layer is subject to admission control |
admission.sql_kv_response.enabled | boolean | true | when true, work performed by the SQL layer when receiving a KV response is subject to admission control |
admission.sql_sql_response.enabled | boolean | true | when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control |
diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel
index b635b97273d4..e83de867d102 100644
--- a/pkg/util/admission/BUILD.bazel
+++ b/pkg/util/admission/BUILD.bazel
@@ -39,6 +39,8 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/syncutil",
+ "//pkg/util/timeutil",
+ "//pkg/util/tracing",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_stretchr_testify//require",
diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go
index 8d2b3fea08a2..dc2b7e33829e 100644
--- a/pkg/util/admission/granter.go
+++ b/pkg/util/admission/granter.go
@@ -620,7 +620,8 @@ type Options struct {
}
type makeRequesterFunc func(
- workKind WorkKind, granter granter, settings *cluster.Settings, opts workQueueOptions) requester
+ _ log.AmbientContext, workKind WorkKind, granter granter, settings *cluster.Settings,
+ opts workQueueOptions) requester
// NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a
// regular cluster node. Caller is responsible for hooking up
@@ -668,7 +669,7 @@ func NewGrantCoordinators(
usedSlotsMetric: metrics.KVUsedSlots,
}
kvSlotAdjuster.granter = kvg
- coord.queues[KVWork] = makeRequester(KVWork, kvg, st, makeWorkQueueOptions(KVWork))
+ coord.queues[KVWork] = makeRequester(ambientCtx, KVWork, kvg, st, makeWorkQueueOptions(KVWork))
kvg.requester = coord.queues[KVWork]
coord.granters[KVWork] = kvg
@@ -680,7 +681,7 @@ func NewGrantCoordinators(
cpuOverload: kvSlotAdjuster,
}
coord.queues[SQLKVResponseWork] = makeRequester(
- SQLKVResponseWork, tg, st, makeWorkQueueOptions(SQLKVResponseWork))
+ ambientCtx, SQLKVResponseWork, tg, st, makeWorkQueueOptions(SQLKVResponseWork))
tg.requester = coord.queues[SQLKVResponseWork]
coord.granters[SQLKVResponseWork] = tg
@@ -691,7 +692,7 @@ func NewGrantCoordinators(
maxBurstTokens: opts.SQLSQLResponseBurstTokens,
cpuOverload: kvSlotAdjuster,
}
- coord.queues[SQLSQLResponseWork] = makeRequester(
+ coord.queues[SQLSQLResponseWork] = makeRequester(ambientCtx,
SQLSQLResponseWork, tg, st, makeWorkQueueOptions(SQLSQLResponseWork))
tg.requester = coord.queues[SQLSQLResponseWork]
coord.granters[SQLSQLResponseWork] = tg
@@ -703,7 +704,7 @@ func NewGrantCoordinators(
cpuOverload: kvSlotAdjuster,
usedSlotsMetric: metrics.SQLLeafStartUsedSlots,
}
- coord.queues[SQLStatementLeafStartWork] = makeRequester(
+ coord.queues[SQLStatementLeafStartWork] = makeRequester(ambientCtx,
SQLStatementLeafStartWork, sg, st, makeWorkQueueOptions(SQLStatementLeafStartWork))
sg.requester = coord.queues[SQLStatementLeafStartWork]
coord.granters[SQLStatementLeafStartWork] = sg
@@ -715,7 +716,7 @@ func NewGrantCoordinators(
cpuOverload: kvSlotAdjuster,
usedSlotsMetric: metrics.SQLRootStartUsedSlots,
}
- coord.queues[SQLStatementRootStartWork] = makeRequester(
+ coord.queues[SQLStatementRootStartWork] = makeRequester(ambientCtx,
SQLStatementRootStartWork, sg, st, makeWorkQueueOptions(SQLStatementRootStartWork))
sg.requester = coord.queues[SQLStatementRootStartWork]
coord.granters[SQLStatementRootStartWork] = sg
@@ -766,7 +767,7 @@ func NewGrantCoordinatorSQL(
maxBurstTokens: opts.SQLKVResponseBurstTokens,
cpuOverload: sqlNodeCPU,
}
- coord.queues[SQLKVResponseWork] = makeRequester(
+ coord.queues[SQLKVResponseWork] = makeRequester(ambientCtx,
SQLKVResponseWork, tg, st, makeWorkQueueOptions(SQLKVResponseWork))
tg.requester = coord.queues[SQLKVResponseWork]
coord.granters[SQLKVResponseWork] = tg
@@ -778,7 +779,7 @@ func NewGrantCoordinatorSQL(
maxBurstTokens: opts.SQLSQLResponseBurstTokens,
cpuOverload: sqlNodeCPU,
}
- coord.queues[SQLSQLResponseWork] = makeRequester(
+ coord.queues[SQLSQLResponseWork] = makeRequester(ambientCtx,
SQLSQLResponseWork, tg, st, makeWorkQueueOptions(SQLSQLResponseWork))
tg.requester = coord.queues[SQLSQLResponseWork]
coord.granters[SQLSQLResponseWork] = tg
@@ -790,7 +791,7 @@ func NewGrantCoordinatorSQL(
cpuOverload: sqlNodeCPU,
usedSlotsMetric: metrics.SQLLeafStartUsedSlots,
}
- coord.queues[SQLStatementLeafStartWork] = makeRequester(
+ coord.queues[SQLStatementLeafStartWork] = makeRequester(ambientCtx,
SQLStatementLeafStartWork, sg, st, makeWorkQueueOptions(SQLStatementLeafStartWork))
sg.requester = coord.queues[SQLStatementLeafStartWork]
coord.granters[SQLStatementLeafStartWork] = sg
@@ -802,7 +803,7 @@ func NewGrantCoordinatorSQL(
cpuOverload: sqlNodeCPU,
usedSlotsMetric: metrics.SQLRootStartUsedSlots,
}
- coord.queues[SQLStatementRootStartWork] = makeRequester(
+ coord.queues[SQLStatementRootStartWork] = makeRequester(ambientCtx,
SQLStatementRootStartWork, sg, st, makeWorkQueueOptions(SQLStatementRootStartWork))
sg.requester = coord.queues[SQLStatementRootStartWork]
coord.granters[SQLStatementRootStartWork] = sg
@@ -1247,7 +1248,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo
// Share the WorkQueue metrics across all stores.
// TODO(sumeer): add per-store WorkQueue state for debug.zip and db console.
opts.metrics = &sgc.workQueueMetrics
- coord.queues[KVWork] = sgc.makeRequesterFunc(KVWork, kvg, sgc.settings, opts)
+ coord.queues[KVWork] = sgc.makeRequesterFunc(sgc.ambientCtx, KVWork, kvg, sgc.settings, opts)
kvg.requester = coord.queues[KVWork]
coord.granters[KVWork] = kvg
coord.ioLoadListener = &ioLoadListener{
diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go
index e445618dd2e2..fd533a013363 100644
--- a/pkg/util/admission/granter_test.go
+++ b/pkg/util/admission/granter_test.go
@@ -117,7 +117,8 @@ func TestGranterBasic(t *testing.T) {
d.ScanArgs(t, "sql-leaf", &opts.SQLStatementLeafStartWorkSlots)
d.ScanArgs(t, "sql-root", &opts.SQLStatementRootStartWorkSlots)
opts.makeRequesterFunc = func(
- workKind WorkKind, granter granter, _ *cluster.Settings, opts workQueueOptions) requester {
+ _ log.AmbientContext, workKind WorkKind, granter granter, _ *cluster.Settings,
+ opts workQueueOptions) requester {
req := &testRequester{
workKind: workKind,
granter: granter,
@@ -243,7 +244,8 @@ func TestStoreCoordinators(t *testing.T) {
opts := Options{
Settings: settings,
makeRequesterFunc: func(
- workKind WorkKind, granter granter, _ *cluster.Settings, opts workQueueOptions) requester {
+ _ log.AmbientContext, workKind WorkKind, granter granter, _ *cluster.Settings,
+ opts workQueueOptions) requester {
req := &testRequester{
workKind: workKind,
granter: granter,
diff --git a/pkg/util/admission/testdata/priority_states b/pkg/util/admission/testdata/priority_states
new file mode 100644
index 000000000000..3c48786e9340
--- /dev/null
+++ b/pkg/util/admission/testdata/priority_states
@@ -0,0 +1,231 @@
+init
+----
+
+# One request at priority=-128 sees high latency. Requests at priority 0, 127
+# do not see high latency. So FIFO priority is set >= -127.
+request-received priority=127
+----
+lowest-priority: 127
+
+update priority=127 delay-millis=10
+----
+lowest-priority: 127 (pri: 127, delay-millis: 10, admitted: 1)
+
+request-received priority=-128
+----
+lowest-priority: -128 (pri: 127, delay-millis: 10, admitted: 1)
+
+update priority=-128 delay-millis=106
+----
+lowest-priority: -128 (pri: -128, delay-millis: 106, admitted: 1) (pri: 127, delay-millis: 10, admitted: 1)
+
+request-received priority=0
+----
+lowest-priority: -128 (pri: -128, delay-millis: 106, admitted: 1) (pri: 127, delay-millis: 10, admitted: 1)
+
+update priority=0 delay-millis=20
+----
+lowest-priority: -128 (pri: -128, delay-millis: 106, admitted: 1) (pri: 0, delay-millis: 20, admitted: 1) (pri: 127, delay-millis: 10, admitted: 1)
+
+get-threshold
+----
+threshold: -127
+
+# The latency seen by priority=-128 decreases but not below the threshold
+# needed to return to FIFO. So FIFO priority continues to be >= -127.
+request-received priority=-128
+----
+lowest-priority: -128
+
+update priority=-128 delay-millis=11
+----
+lowest-priority: -128 (pri: -128, delay-millis: 11, admitted: 1)
+
+get-threshold
+----
+threshold: -127
+
+# The latency seen by priority=-128 is low enough to return to FIFO.
+request-received priority=-128
+----
+lowest-priority: -128
+
+update priority=-128 delay-millis=10
+----
+lowest-priority: -128 (pri: -128, delay-millis: 10, admitted: 1)
+
+get-threshold
+----
+threshold: -128
+
+# Priority=127 sees high latency. FIFO priority is now >= 128.
+request-received priority=127
+----
+lowest-priority: 127
+
+update priority=127 delay-millis=106
+----
+lowest-priority: 127 (pri: 127, delay-millis: 106, admitted: 1)
+
+get-threshold
+----
+threshold: 128
+
+# Both priorities 24 and 127 see high latency. FIFO priority stays at >= 128.
+request-received priority=127
+----
+lowest-priority: 127
+
+update priority=127 delay-millis=106
+----
+lowest-priority: 127 (pri: 127, delay-millis: 106, admitted: 1)
+
+request-received priority=24
+----
+lowest-priority: 24 (pri: 127, delay-millis: 106, admitted: 1)
+
+update priority=24 delay-millis=107
+----
+lowest-priority: 24 (pri: 24, delay-millis: 107, admitted: 1) (pri: 127, delay-millis: 106, admitted: 1)
+
+get-threshold
+----
+threshold: 128
+
+# Priorities -5 and 20 see high latency. There are no requests at any other
+# priority. The FIFO priority threshold reduces to >= 21.
+request-received priority=20
+----
+lowest-priority: 20
+
+update priority=20 delay-millis=111
+----
+lowest-priority: 20 (pri: 20, delay-millis: 111, admitted: 1)
+
+request-received priority=-5
+----
+lowest-priority: -5 (pri: 20, delay-millis: 111, admitted: 1)
+
+update priority=-5 delay-millis=110
+----
+lowest-priority: -5 (pri: -5, delay-millis: 110, admitted: 1) (pri: 20, delay-millis: 111, admitted: 1)
+
+get-threshold
+----
+threshold: 21
+
+# Priority 0 is LIFO and sees latency that is not low enough to return it to
+# FIFO. The FIFO priority threshold reduces to >= 1.
+request-received priority=0
+----
+lowest-priority: 0
+
+update priority=0 delay-millis=11
+----
+lowest-priority: 0 (pri: 0, delay-millis: 11, admitted: 1)
+
+get-threshold
+----
+threshold: 1
+
+# Priority -128 is LIFO and sees latency that is not low enough to return it
+# to FIFO. The FIFO priority threshold reduces to >= -127.
+request-received priority=-128
+----
+lowest-priority: -128
+
+update priority=-128 delay-millis=11
+----
+lowest-priority: -128 (pri: -128, delay-millis: 11, admitted: 1)
+
+get-threshold
+----
+threshold: -127
+
+# Priority -128 is LIFO and sees very low latency and switches back to FIFO.
+request-received priority=-128
+----
+lowest-priority: -128
+
+update priority=-128 delay-millis=9
+----
+lowest-priority: -128 (pri: -128, delay-millis: 9, admitted: 1)
+
+get-threshold
+----
+threshold: -128
+
+# Priority 0 is FIFO and sees a canceled request that does not meet the
+# latency threshold to switch to LIFO. It stays as FIFO.
+request-received priority=0
+----
+lowest-priority: 0
+
+update priority=0 delay-millis=20 canceled=true
+----
+lowest-priority: 0 (pri: 0, delay-millis: 20, admitted: 0)
+
+get-threshold
+----
+threshold: -128
+
+# Priority 0 is FIFO and sees a canceled request with very high latency, so
+# it switches to LIFO.
+request-received priority=0
+----
+lowest-priority: 0
+
+update priority=0 delay-millis=120 canceled=true
+----
+lowest-priority: 0 (pri: 0, delay-millis: 120, admitted: 0)
+
+get-threshold
+----
+threshold: 1
+
+# Priority 0 receives a request, but nothing exits admission control, so it
+# stays as LIFO.
+request-received priority=0
+----
+lowest-priority: 0
+
+get-threshold
+----
+threshold: 1
+
+# Priority 10 sees a request with low latency. Priority 0 has a request that
+# does not exit admission control. Priority 0 stays as LIFO.
+request-received priority=10
+----
+lowest-priority: 10
+
+update priority=10 delay-millis=5
+----
+lowest-priority: 10 (pri: 10, delay-millis: 5, admitted: 1)
+
+request-received priority=0
+----
+lowest-priority: 0 (pri: 10, delay-millis: 5, admitted: 1)
+
+get-threshold
+----
+threshold: 1
+
+# Priority -10 sees a request with low enough latency to switch back to FIFO.
+# Priority 0 has a request that does not exit admission control. Because of
+# the observation at priority=-10 we switch everything back to FIFO.
+request-received priority=-10
+----
+lowest-priority: -10
+
+update priority=-10 delay-millis=5
+----
+lowest-priority: -10 (pri: -10, delay-millis: 5, admitted: 1)
+
+request-received priority=0
+----
+lowest-priority: -10 (pri: -10, delay-millis: 5, admitted: 1)
+
+get-threshold
+----
+threshold: -128
diff --git a/pkg/util/admission/testdata/work_queue b/pkg/util/admission/testdata/work_queue
index d7acaf2e0fc9..6be21dd0b31b 100644
--- a/pkg/util/admission/testdata/work_queue
+++ b/pkg/util/admission/testdata/work_queue
@@ -4,55 +4,55 @@ init
set-try-get-return-value v=true
----
-admit id=1 tenant=53 priority=0 create-time=1 bypass=false
+admit id=1 tenant=53 priority=0 create-time-millis=1 bypass=false
----
tryGet: returning true
id 1: admit succeeded
print
----
-tenantHeap len: 0
- tenant-id: 53 used: 1
+closed epoch: 0 tenantHeap len: 0
+ tenant-id: 53 used: 1, fifo: -128
# tryGet will return false, so work will queue up.
set-try-get-return-value v=false
----
# bypass=true is ignored since not system tenant.
-admit id=2 tenant=53 priority=0 create-time=3 bypass=true
+admit id=2 tenant=53 priority=0 create-time-millis=3 bypass=true
----
tryGet: returning false
print
----
-tenantHeap len: 1 top tenant: 53
- tenant-id: 53 used: 1 heap: 0: pri: 0, ct: 3
+closed epoch: 0 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100]
-admit id=3 tenant=53 priority=0 create-time=2 bypass=false
+admit id=3 tenant=53 priority=0 create-time-millis=2 bypass=false
----
# Tenant 53 has two waiting requests. The one that arrived second is earlier
-# in the heap because of a smaller create-time.
+# in the heap because of a smaller create-time-millis.
print
----
-tenantHeap len: 1 top tenant: 53
- tenant-id: 53 used: 1 heap: 0: pri: 0, ct: 2 1: pri: 0, ct: 3
+closed epoch: 0 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 2, epoch: 0, qt: 100] [1: pri: 0, ct: 3, epoch: 0, qt: 100]
# Request from tenant 71.
-admit id=4 tenant=71 priority=-128 create-time=4 bypass=false
+admit id=4 tenant=71 priority=-128 create-time-millis=4 bypass=false
----
# Another request from tenant 71. This one has higher priority so will be
-# earlier in the heap, even though it has higher create-time.
-admit id=5 tenant=71 priority=0 create-time=5 bypass=false
+# earlier in the heap, even though it has higher create-time-millis.
+admit id=5 tenant=71 priority=0 create-time-millis=5 bypass=false
----
# Tenant 71 is the top of the heap since not using any slots.
print
----
-tenantHeap len: 2 top tenant: 71
- tenant-id: 53 used: 1 heap: 0: pri: 0, ct: 2 1: pri: 0, ct: 3
- tenant-id: 71 used: 0 heap: 0: pri: 0, ct: 5 1: pri: -128, ct: 4
+closed epoch: 0 tenantHeap len: 2 top tenant: 71
+ tenant-id: 53 used: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 2, epoch: 0, qt: 100] [1: pri: 0, ct: 3, epoch: 0, qt: 100]
+ tenant-id: 71 used: 0, fifo: -128 waiting work heap: [0: pri: 0, ct: 5, epoch: 0, qt: 100] [1: pri: -128, ct: 4, epoch: 0, qt: 100]
granted chain-id=5
----
@@ -64,9 +64,9 @@ granted: returned true
# tenant 71.
print
----
-tenantHeap len: 2 top tenant: 71
- tenant-id: 53 used: 1 heap: 0: pri: 0, ct: 2 1: pri: 0, ct: 3
- tenant-id: 71 used: 1 heap: 0: pri: -128, ct: 4
+closed epoch: 0 tenantHeap len: 2 top tenant: 71
+ tenant-id: 53 used: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 2, epoch: 0, qt: 100] [1: pri: 0, ct: 3, epoch: 0, qt: 100]
+ tenant-id: 71 used: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100]
# Cancel a request from tenant 53.
cancel-work id=3
@@ -75,9 +75,9 @@ id 3: admit failed
print
----
-tenantHeap len: 2 top tenant: 71
- tenant-id: 53 used: 1 heap: 0: pri: 0, ct: 3
- tenant-id: 71 used: 1 heap: 0: pri: -128, ct: 4
+closed epoch: 0 tenantHeap len: 2 top tenant: 71
+ tenant-id: 53 used: 1, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100]
+ tenant-id: 71 used: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100]
# The work admitted for tenant 53 is done.
work-done id=1
@@ -87,23 +87,23 @@ returnGrant
# Tenant 53 now using fewer slots so it becomes the top of the heap.
print
----
-tenantHeap len: 2 top tenant: 53
- tenant-id: 53 used: 0 heap: 0: pri: 0, ct: 3
- tenant-id: 71 used: 1 heap: 0: pri: -128, ct: 4
+closed epoch: 0 tenantHeap len: 2 top tenant: 53
+ tenant-id: 53 used: 0, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100]
+ tenant-id: 71 used: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100]
# A request from the system tenant bypasses admission control, but is
# reflected in the WorkQueue state.
-admit id=6 tenant=1 priority=0 create-time=6 bypass=true
+admit id=6 tenant=1 priority=0 create-time-millis=6 bypass=true
----
tookWithoutPermission
id 6: admit succeeded
print
----
-tenantHeap len: 2 top tenant: 53
- tenant-id: 1 used: 1
- tenant-id: 53 used: 0 heap: 0: pri: 0, ct: 3
- tenant-id: 71 used: 1 heap: 0: pri: -128, ct: 4
+closed epoch: 0 tenantHeap len: 2 top tenant: 53
+ tenant-id: 1 used: 1, fifo: -128
+ tenant-id: 53 used: 0, fifo: -128 waiting work heap: [0: pri: 0, ct: 3, epoch: 0, qt: 100]
+ tenant-id: 71 used: 1, fifo: -128 waiting work heap: [0: pri: -128, ct: 4, epoch: 0, qt: 100]
granted chain-id=7
----
@@ -120,10 +120,10 @@ granted: returned true
# No more waiting requests.
print
----
-tenantHeap len: 0
- tenant-id: 1 used: 1
- tenant-id: 53 used: 1
- tenant-id: 71 used: 2
+closed epoch: 0 tenantHeap len: 0
+ tenant-id: 1 used: 1, fifo: -128
+ tenant-id: 53 used: 1, fifo: -128
+ tenant-id: 71 used: 2, fifo: -128
# Granted returns false.
granted chain-id=10
@@ -132,7 +132,296 @@ granted: returned false
print
----
-tenantHeap len: 0
- tenant-id: 1 used: 1
- tenant-id: 53 used: 1
- tenant-id: 71 used: 2
+closed epoch: 0 tenantHeap len: 0
+ tenant-id: 1 used: 1, fifo: -128
+ tenant-id: 53 used: 1, fifo: -128
+ tenant-id: 71 used: 2, fifo: -128
+
+init
+----
+
+set-try-get-return-value v=false
+----
+
+admit id=1 tenant=53 priority=0 create-time-millis=1 bypass=false
+----
+tryGet: returning false
+
+# Make the request wait long enough that we switch to LIFO.
+advance-time millis=205
+----
+closed epoch: 2 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 0, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100]
+
+print
+----
+closed epoch: 2 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 0, fifo: -128 waiting work heap: [0: pri: 0, ct: 1, epoch: 0, qt: 100]
+
+granted chain-id=5
+----
+continueGrantChain 5
+id 1: admit succeeded
+granted: returned true
+
+print
+----
+closed epoch: 2 tenantHeap len: 0
+ tenant-id: 53 used: 1, fifo: -128
+
+# Switch to LIFO since the request waited for 205ms.
+advance-time millis=100
+----
+closed epoch: 3 tenantHeap len: 0
+ tenant-id: 53 used: 1, fifo: 1
+
+admit id=2 tenant=53 priority=0 create-time-millis=50 bypass=false
+----
+tryGet: returning false
+
+admit id=3 tenant=53 priority=0 create-time-millis=399 bypass=false
+----
+
+admit id=4 tenant=53 priority=0 create-time-millis=400 bypass=false
+----
+
+# Two requests are in closed epochs and one is in an open epoch.
+print
+----
+closed epoch: 3 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 1, fifo: 1 waiting work heap: [0: pri: 0, ct: 399, epoch: 3, qt: 405, lifo-ordering] [1: pri: 0, ct: 50, epoch: 0, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405]
+
+# Latest request in closed epoch is granted.
+granted chain-id=6
+----
+continueGrantChain 6
+id 3: admit succeeded
+granted: returned true
+
+# Older request in closed epoch is granted.
+granted chain-id=7
+----
+continueGrantChain 7
+id 2: admit succeeded
+granted: returned true
+
+# Only request is in open epoch.
+print
+----
+closed epoch: 3 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 3, fifo: 1 open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405]
+
+# Add request to closed epoch.
+admit id=5 tenant=53 priority=0 create-time-millis=300 bypass=false
+----
+
+# Add a request in open epoch 5, which differs from the existing open epoch
+# request that has epoch 4.
+admit id=6 tenant=53 priority=0 create-time-millis=500 bypass=false
+----
+
+# Open epochs heap is ordered in rough FIFO.
+print
+----
+closed epoch: 3 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 3, fifo: 1 waiting work heap: [0: pri: 0, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405]
+
+# Add high priority request in open epoch 5.
+admit id=7 tenant=53 priority=127 create-time-millis=550 bypass=false
+----
+
+# The high priority request goes into the waiting work heap since it is >= the
+# FIFO threshold, and so still uses FIFO ordering.
+print
+----
+closed epoch: 3 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 3, fifo: 1 waiting work heap: [0: pri: 127, ct: 550, epoch: 5, qt: 405] [1: pri: 0, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405]
+
+# Make the request wait for 60ms so we don't switch back to fifo.
+advance-time millis=60
+----
+closed epoch: 3 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 3, fifo: 1 waiting work heap: [0: pri: 127, ct: 550, epoch: 5, qt: 405] [1: pri: 0, ct: 300, epoch: 3, qt: 405, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405]
+
+granted chain-id=8
+----
+continueGrantChain 8
+id 7: admit succeeded
+granted: returned true
+
+granted chain-id=9
+----
+continueGrantChain 9
+id 5: admit succeeded
+granted: returned true
+
+# Add another request to a closed epoch; it is subject to LIFO ordering.
+admit id=8 tenant=53 priority=0 create-time-millis=350 bypass=false
+----
+
+print
+----
+closed epoch: 3 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 5, fifo: 1 waiting work heap: [0: pri: 0, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405] [1: pri: 0, ct: 500, epoch: 5, qt: 405]
+
+# One request moved from the open epochs heap to the waiting work heap since
+# its epoch is now closed.
+advance-time millis=40
+----
+closed epoch: 4 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 5, fifo: 1 waiting work heap: [0: pri: 0, ct: 400, epoch: 4, qt: 405, lifo-ordering] [1: pri: 0, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: 0, ct: 500, epoch: 5, qt: 405]
+
+granted chain-id=10
+----
+continueGrantChain 10
+id 4: admit succeeded
+granted: returned true
+
+print
+----
+closed epoch: 4 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 6, fifo: 1 waiting work heap: [0: pri: 0, ct: 350, epoch: 3, qt: 465, lifo-ordering] open epochs heap: [0: pri: 0, ct: 500, epoch: 5, qt: 405]
+
+granted chain-id=11
+----
+continueGrantChain 11
+id 8: admit succeeded
+granted: returned true
+
+print
+----
+closed epoch: 4 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 7, fifo: 1 open epochs heap: [0: pri: 0, ct: 500, epoch: 5, qt: 405]
+
+# Can dequeue from the open epochs heap if nothing else is remaining.
+granted chain-id=12
+----
+continueGrantChain 12
+id 6: admit succeeded
+granted: returned true
+
+print
+----
+closed epoch: 4 tenantHeap len: 0
+ tenant-id: 53 used: 8, fifo: 1
+
+# Add a request for an already closed epoch.
+admit id=9 tenant=53 priority=0 create-time-millis=380 bypass=false
+----
+tryGet: returning false
+
+print
+----
+closed epoch: 4 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 8, fifo: 1 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering]
+
+# This time advance means the previous request will see significant queueing.
+advance-time millis=100
+----
+closed epoch: 5 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 8, fifo: 1 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering]
+
+# This request in an already closed epoch gets ahead because of higher
+# create-time-millis.
+admit id=10 tenant=53 priority=0 create-time-millis=390 bypass=false
+----
+
+print
+----
+closed epoch: 5 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 8, fifo: 1 waiting work heap: [0: pri: 0, ct: 390, epoch: 3, qt: 605, lifo-ordering] [1: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering]
+
+granted chain-id=12
+----
+continueGrantChain 12
+id 10: admit succeeded
+granted: returned true
+
+print
+----
+closed epoch: 5 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 9, fifo: 1 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering]
+
+# This advance will switch all priorities back to FIFO.
+advance-time millis=100
+----
+closed epoch: 6 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 9, fifo: -128 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering]
+
+admit id=11 tenant=53 priority=0 create-time-millis=610 bypass=false
+----
+
+admit id=12 tenant=53 priority=-128 create-time-millis=615 bypass=false
+----
+
+# When comparing work with LIFO and FIFO markings at the same priority, we
+# use LIFO ordering. The lower priority request is ordered at the end even
+# though it has the highest create time.
+print
+----
+closed epoch: 6 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 9, fifo: -128 waiting work heap: [0: pri: 0, ct: 610, epoch: 6, qt: 705] [1: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] [2: pri: -128, ct: 615, epoch: 6, qt: 705]
+
+granted chain-id=13
+----
+continueGrantChain 13
+id 11: admit succeeded
+granted: returned true
+
+# With the remaining two items, the priority is different, so higher priority
+# is preferred.
+print
+----
+closed epoch: 6 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 10, fifo: -128 waiting work heap: [0: pri: 0, ct: 380, epoch: 3, qt: 505, lifo-ordering] [1: pri: -128, ct: 615, epoch: 6, qt: 705]
+
+granted chain-id=14
+----
+continueGrantChain 14
+id 9: admit succeeded
+granted: returned true
+
+granted chain-id=15
+----
+continueGrantChain 15
+id 12: admit succeeded
+granted: returned true
+
+# Advance time again. Since one of the priority=0 requests experienced high
+# latency, switch that back to LIFO.
+advance-time millis=100
+----
+closed epoch: 7 tenantHeap len: 0
+ tenant-id: 53 used: 12, fifo: 1
+
+# Add a request whose epoch is not closed.
+admit id=13 tenant=53 priority=0 create-time-millis=810 bypass=false
+----
+tryGet: returning false
+
+print
+----
+closed epoch: 7 tenantHeap len: 1 top tenant: 53
+ tenant-id: 53 used: 12, fifo: 1 open epochs heap: [0: pri: 0, ct: 810, epoch: 8, qt: 805]
+
+# Cancel that request.
+cancel-work id=13
+----
+id 13: admit failed
+
+print
+----
+closed epoch: 7 tenantHeap len: 0
+ tenant-id: 53 used: 12, fifo: 1
+
+# Closed epoch advances. The FIFO threshold is not changed since the only
+# request was canceled.
+advance-time millis=100
+----
+closed epoch: 8 tenantHeap len: 0
+ tenant-id: 53 used: 12, fifo: 1
+
+# Closed epoch advances. All priorities are now subject to FIFO.
+advance-time millis=100
+----
+closed epoch: 9 tenantHeap len: 0
+ tenant-id: 53 used: 12, fifo: -128
diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go
index 25052dcb7563..96de2b41cd77 100644
--- a/pkg/util/admission/work_queue.go
+++ b/pkg/util/admission/work_queue.go
@@ -31,10 +31,20 @@ import (
"github.com/cockroachdb/redact"
)
+// Use of the admission control package spans the SQL and KV layers. When
+// running in a multi-tenant setting, we have per-tenant SQL-only servers and
+// multi-tenant storage servers. These multi-tenant storage servers contain
+// the multi-tenant KV layer, and the SQL layer for the system tenant. Most of
+// the following settings are relevant to both kinds of servers (except for
+// KVAdmissionControlEnabled). Only the system tenant can modify these
+// settings in the storage servers, while a regular tenant can modify these
+// settings for their SQL-only servers, which is why these settings are
+// typically TenantWritable.
+
// KVAdmissionControlEnabled controls whether KV server-side admission control
// is enabled.
var KVAdmissionControlEnabled = settings.RegisterBoolSetting(
- settings.TenantWritable,
+ settings.SystemOnly,
"admission.kv.enabled",
"when true, work performed by the KV layer is subject to admission control",
true).WithPublic()
@@ -63,6 +73,19 @@ var admissionControlEnabledSettings = [numWorkKinds]*settings.BoolSetting{
SQLSQLResponseWork: SQLSQLResponseAdmissionControlEnabled,
}
+// EpochLIFOEnabled controls whether the adaptive epoch-LIFO scheme is enabled
+// for admission control. It is only relevant when the above admission control
+// settings are also set to true. Unlike those settings, which are granular
+// for each kind of admission queue, this setting applies to all the queues.
+// This is because we recommend that all those settings be enabled or none be
+// enabled, and we don't want to carry forward unnecessarily granular
+// settings.
+var EpochLIFOEnabled = settings.RegisterBoolSetting(
+ settings.TenantWritable,
+ "admission.epoch_lifo.enabled",
+ "when true, epoch-LIFO behavior is enabled when there is significant delay in admission",
+ false).WithPublic()
+
// WorkPriority represents the priority of work. In an WorkQueue, it is only
// used for ordering within a tenant. High priority work can starve lower
// priority work.
@@ -74,7 +97,8 @@ const (
// NormalPri is normal priority work.
NormalPri WorkPriority = 0
// HighPri is high priority work.
- HighPri WorkPriority = math.MaxInt8
+ HighPri WorkPriority = math.MaxInt8
+ oneAboveHighPri int = int(HighPri) + 1
)
// Prevent the linter from emitting unused warnings.
@@ -147,6 +171,7 @@ type WorkInfo struct {
// kvQueue.AdmittedWorkDone(tid)
// }
type WorkQueue struct {
+ ambientCtx context.Context
workKind WorkKind
granter granter
usesTokens bool
@@ -164,10 +189,15 @@ type WorkQueue struct {
tenantHeap tenantHeap
// All tenants, including those without waiting work. Periodically cleaned.
tenants map[uint64]*tenantInfo
+ // The highest epoch that is closed.
+ closedEpochThreshold int64
}
+ logThreshold log.EveryN
metrics WorkQueueMetrics
admittedCount uint64
- gcStopCh chan struct{}
+ stopCh chan struct{}
+
+ timeSource timeutil.TimeSource
}
var _ requester = &WorkQueue{}
@@ -178,6 +208,12 @@ type workQueueOptions struct {
// If non-nil, the WorkQueue should use the supplied metrics instead of
// creating its own.
metrics *WorkQueueMetrics
+
+ // timeSource can be set to non-nil for tests. If nil,
+ // the timeutil.DefaultTimeSource will be used.
+ timeSource timeutil.TimeSource
+ // The epoch closing goroutine can be disabled for tests.
+ disableEpochClosingGoroutine bool
}
func makeWorkQueueOptions(workKind WorkKind) workQueueOptions {
@@ -194,41 +230,168 @@ func makeWorkQueueOptions(workKind WorkKind) workQueueOptions {
}
func makeWorkQueue(
- workKind WorkKind, granter granter, settings *cluster.Settings, opts workQueueOptions,
+ ambientCtx log.AmbientContext,
+ workKind WorkKind,
+ granter granter,
+ settings *cluster.Settings,
+ opts workQueueOptions,
) requester {
- gcStopCh := make(chan struct{})
+ stopCh := make(chan struct{})
var metrics WorkQueueMetrics
if opts.metrics == nil {
metrics = makeWorkQueueMetrics(string(workKindString(workKind)))
} else {
metrics = *opts.metrics
}
+ if opts.timeSource == nil {
+ opts.timeSource = timeutil.DefaultTimeSource{}
+ }
q := &WorkQueue{
- workKind: workKind,
- granter: granter,
- usesTokens: opts.usesTokens,
- tiedToRange: opts.tiedToRange,
- settings: settings,
- metrics: metrics,
- gcStopCh: gcStopCh,
+ ambientCtx: ambientCtx.AnnotateCtx(context.Background()),
+ workKind: workKind,
+ granter: granter,
+ usesTokens: opts.usesTokens,
+ tiedToRange: opts.tiedToRange,
+ settings: settings,
+ logThreshold: log.Every(5 * time.Minute),
+ metrics: metrics,
+ stopCh: stopCh,
+ timeSource: opts.timeSource,
}
q.mu.tenants = make(map[uint64]*tenantInfo)
go func() {
ticker := time.NewTicker(time.Second)
- done := false
- for !done {
+ for {
select {
case <-ticker.C:
q.gcTenantsAndResetTokens()
- case <-gcStopCh:
- done = true
+ case <-stopCh:
+ // Channel closed.
+ return
}
}
- close(gcStopCh)
}()
+ q.tryCloseEpoch()
+ if !opts.disableEpochClosingGoroutine {
+ q.startClosingEpochs()
+ }
return q
}
+func isInTenantHeap(tenant *tenantInfo) bool {
+ // If there is some waiting work, this tenant is in tenantHeap.
+ return len(tenant.waitingWorkHeap) > 0 || len(tenant.openEpochsHeap) > 0
+}
+
+func (q *WorkQueue) timeNow() time.Time {
+ return q.timeSource.Now()
+}
+
+func (q *WorkQueue) epochLIFOEnabled() bool {
+ return q.settings != nil && EpochLIFOEnabled.Get(&q.settings.SV)
+}
+
+func (q *WorkQueue) startClosingEpochs() {
+ go func() {
+ // We try to run the ticker with duration equal to the epoch length
+ // whenever possible. If the error in closing the epoch grows too large,
+ // we switch to a 1ms ticker. One would expect the overhead of always
+ // running with a 1ms ticker to be negligible, but we have observed
+ // 5-10% of cpu utilization on CockroachDB nodes that are doing no other
+ // work. The cause may be a poor interaction with processor idle state
+ // https://github.com/golang/go/issues/30740#issuecomment-471634471.
+ // Note that one of the cases where error in closing is likely to grow
+ // large is when cpu utilization is close to 100% -- in that case we
+ // will be doing 1ms ticks, which is fine since there are no idle
+ // processors.
+ tickerDurShort := time.Millisecond
+ acceptableErrorNanos := int64(2 * tickerDurShort)
+ currentTickerDur := tickerDurShort
+ if !q.epochLIFOEnabled() {
+ currentTickerDur = time.Duration(epochLengthNanos)
+ }
+ // TODO(sumeer): try using a Timer instead.
+ ticker := time.NewTicker(currentTickerDur)
+ for {
+ select {
+ case <-ticker.C:
+ closedEpoch, closingErrorNanos := q.tryCloseEpoch()
+ if closedEpoch {
+ epochLIFOEnabled := q.epochLIFOEnabled()
+ if currentTickerDur == tickerDurShort {
+ if closingErrorNanos < acceptableErrorNanos || !epochLIFOEnabled {
+ // Switch to long duration ticking.
+ currentTickerDur = time.Duration(epochLengthNanos)
+ ticker.Reset(currentTickerDur)
+ }
+ // Else continue ticking at 1ms.
+ } else if closingErrorNanos >= acceptableErrorNanos && epochLIFOEnabled {
+ // Ticker was using a long duration and the error became too
+ // high. Switch to 1ms ticks.
+ currentTickerDur = tickerDurShort
+ ticker.Reset(currentTickerDur)
+ }
+ }
+ case <-q.stopCh:
+ // Channel closed.
+ return
+ }
+ }
+ }()
+}
+
+func (q *WorkQueue) tryCloseEpoch() (closedEpoch bool, closingErrorNanos int64) {
+ epochLIFOEnabled := q.epochLIFOEnabled()
+ timeNow := q.timeNow()
+ epochClosingTimeNanos := timeNow.UnixNano() - epochLengthNanos - epochClosingDeltaNanos
+ epoch := epochForTimeNanos(epochClosingTimeNanos)
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ if epoch <= q.mu.closedEpochThreshold {
+ return
+ }
+ q.mu.closedEpochThreshold = epoch
+ closedEpoch = true
+ closingErrorNanos = epochClosingTimeNanos - (epoch * epochLengthNanos)
+ doLog := q.logThreshold.ShouldLog()
+ for _, tenant := range q.mu.tenants {
+ prevThreshold := tenant.fifoPriorityThreshold
+ tenant.fifoPriorityThreshold =
+ tenant.priorityStates.getFIFOPriorityThresholdAndReset(tenant.fifoPriorityThreshold)
+ if !epochLIFOEnabled {
+ tenant.fifoPriorityThreshold = int(LowPri)
+ }
+ if tenant.fifoPriorityThreshold != prevThreshold || doLog {
+ logVerb := "is"
+ if tenant.fifoPriorityThreshold != prevThreshold {
+ logVerb = "changed to"
+ }
+ // TODO(sumeer): export this as a per-tenant metric somehow. We could
+ // start with this being a per-WorkQueue metric for only the system
+ // tenant. However, currently we share metrics across WorkQueues --
+ // specifically all the store WorkQueues share the same metric. We
+ // should eliminate that sharing and make those per store metrics.
+ log.Infof(q.ambientCtx, "%s: FIFO threshold for tenant %d %s %d",
+ string(workKindString(q.workKind)), tenant.id, logVerb, tenant.fifoPriorityThreshold)
+ }
+ // Note that we are ignoring the new priority threshold and only
+ // dequeueing the ones that are in the closed epoch. It is possible to
+ // have work items that are not in the closed epoch and whose priority
+ // makes them no longer subject to LIFO, but they will need to wait here
+ // until their epochs close. This is considered acceptable since the
+ // priority threshold should not fluctuate rapidly.
+ for len(tenant.openEpochsHeap) > 0 {
+ work := tenant.openEpochsHeap[0]
+ if work.epoch > epoch {
+ break
+ }
+ heap.Pop(&tenant.openEpochsHeap)
+ heap.Push(&tenant.waitingWorkHeap, work)
+ }
+ }
+ return closedEpoch, closingErrorNanos
+}
+
// Admit is called when requesting admission for some work. If err!=nil, the
// request was not admitted, potentially due to the deadline being exceeded.
// The enabled return value is relevant when err=nil, and represents whether
@@ -255,7 +418,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
}
if info.BypassAdmission && roachpb.IsSystemTenantID(tenantID) && q.workKind == KVWork {
tenant.used++
- if len(tenant.waitingWorkHeap) > 0 {
+ if isInTenantHeap(tenant) {
q.mu.tenantHeap.fix(tenant)
}
q.mu.Unlock()
@@ -265,6 +428,12 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
atomic.AddUint64(&q.admittedCount, 1)
return true, nil
}
+ // Work is subject to admission control.
+
+ // Tell priorityStates about this received work. We don't tell it about work
+ // that has bypassed admission control, since priorityStates is deciding the
+ // threshold for LIFO queueing based on observed admission latency.
+ tenant.priorityStates.requestAtPriority(info.Priority)
if len(q.mu.tenantHeap) == 0 {
// Fast-path. Try to grab token/slot.
@@ -319,7 +488,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
}
}
// Check for cancellation.
- startTime := timeutil.Now()
+ startTime := q.timeNow()
doneCh := ctx.Done()
if doneCh != nil {
select {
@@ -339,9 +508,18 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
}
}
// Push onto heap(s).
- work := newWaitingWork(info.Priority, info.CreateTime)
- heap.Push(&tenant.waitingWorkHeap, work)
- if len(tenant.waitingWorkHeap) == 1 {
+ ordering := fifoWorkOrdering
+ if int(info.Priority) < tenant.fifoPriorityThreshold {
+ ordering = lifoWorkOrdering
+ }
+ work := newWaitingWork(info.Priority, ordering, info.CreateTime, startTime)
+ inTenantHeap := isInTenantHeap(tenant)
+ if work.epoch <= q.mu.closedEpochThreshold || ordering == fifoWorkOrdering {
+ heap.Push(&tenant.waitingWorkHeap, work)
+ } else {
+ heap.Push(&tenant.openEpochsHeap, work)
+ }
+ if !inTenantHeap {
heap.Push(&q.mu.tenantHeap, tenant)
}
// Else already in tenantHeap.
@@ -354,7 +532,15 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
defer releaseWaitingWork(work)
select {
case <-doneCh:
+ waitDur := q.timeNow().Sub(startTime)
q.mu.Lock()
+ // The work was cancelled, so waitDur is less than the wait time this work
+ // would have encountered if it actually waited until admission. However,
+ // this lower bound is still useful for calculating the FIFO=>LIFO switch
+ // since it is possible that all work at this priority is exceeding the
+ // deadline and being cancelled. The risk here is that if the deadlines
+ // are too short, we could underestimate the actual wait time.
+ tenant.priorityStates.updateDelayLocked(work.priority, waitDur, true /* canceled */)
if work.heapIndex == -1 {
// No longer in heap. Raced with token/slot grant.
if !q.usesTokens {
@@ -374,14 +560,17 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
chainID := <-work.ch
q.granter.continueGrantChain(chainID)
} else {
- tenant.waitingWorkHeap.remove(work)
- if len(tenant.waitingWorkHeap) == 0 {
+ if work.inWaitingWorkHeap {
+ tenant.waitingWorkHeap.remove(work)
+ } else {
+ tenant.openEpochsHeap.remove(work)
+ }
+ if !isInTenantHeap(tenant) {
q.mu.tenantHeap.remove(tenant)
}
q.mu.Unlock()
}
q.metrics.Errored.Inc(1)
- waitDur := timeutil.Since(startTime)
q.metrics.WaitDurationSum.Inc(waitDur.Microseconds())
q.metrics.WaitDurations.RecordValue(waitDur.Nanoseconds())
q.metrics.WaitQueueLength.Dec(1)
@@ -397,7 +586,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
}
q.metrics.Admitted.Inc(1)
atomic.AddUint64(&q.admittedCount, 1)
- waitDur := timeutil.Since(startTime)
+ waitDur := q.timeNow().Sub(startTime)
q.metrics.WaitDurationSum.Inc(waitDur.Microseconds())
q.metrics.WaitDurations.RecordValue(waitDur.Nanoseconds())
q.metrics.WaitQueueLength.Dec(1)
@@ -424,7 +613,7 @@ func (q *WorkQueue) AdmittedWorkDone(tenantID roachpb.TenantID) {
panic(errors.AssertionFailedf("tenant not found"))
}
tenant.used--
- if len(tenant.waitingWorkHeap) > 0 {
+ if isInTenantHeap(tenant) {
q.mu.tenantHeap.fix(tenant)
}
q.mu.Unlock()
@@ -443,17 +632,23 @@ func (q *WorkQueue) hasWaitingRequests() bool {
func (q *WorkQueue) granted(grantChainID grantChainID) bool {
// Reduce critical section by getting time before mutex acquisition.
- now := timeutil.Now()
+ now := q.timeNow()
q.mu.Lock()
if len(q.mu.tenantHeap) == 0 {
q.mu.Unlock()
return false
}
tenant := q.mu.tenantHeap[0]
- item := heap.Pop(&tenant.waitingWorkHeap).(*waitingWork)
- item.grantTime = now
- tenant.used++
+ var item *waitingWork
if len(tenant.waitingWorkHeap) > 0 {
+ item = heap.Pop(&tenant.waitingWorkHeap).(*waitingWork)
+ } else {
+ item = heap.Pop(&tenant.openEpochsHeap).(*waitingWork)
+ }
+ waitDur := now.Sub(item.enqueueingTime)
+ tenant.priorityStates.updateDelayLocked(item.priority, waitDur, false /* canceled */)
+ tenant.used++
+ if isInTenantHeap(tenant) {
q.mu.tenantHeap.fix(tenant)
} else {
q.mu.tenantHeap.remove(tenant)
@@ -471,7 +666,7 @@ func (q *WorkQueue) gcTenantsAndResetTokens() {
// longer than desired. We could break this iteration into smaller parts if
// needed.
for id, info := range q.mu.tenants {
- if info.used == 0 && len(info.waitingWorkHeap) == 0 {
+ if info.used == 0 && !isInTenantHeap(info) {
delete(q.mu.tenants, id)
releaseTenantInfo(info)
} else if q.usesTokens {
@@ -490,6 +685,7 @@ func (q *WorkQueue) String() string {
func (q *WorkQueue) SafeFormat(s redact.SafePrinter, verb rune) {
q.mu.Lock()
defer q.mu.Unlock()
+ s.Printf("closed epoch: %d ", q.mu.closedEpochThreshold)
s.Printf("tenantHeap len: %d", len(q.mu.tenantHeap))
if len(q.mu.tenantHeap) > 0 {
s.Printf(" top tenant: %d", q.mu.tenantHeap[0].id)
@@ -501,12 +697,30 @@ func (q *WorkQueue) SafeFormat(s redact.SafePrinter, verb rune) {
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
for _, id := range ids {
tenant := q.mu.tenants[id]
- s.Printf("\n tenant-id: %d used: %d", tenant.id, tenant.used)
+ s.Printf("\n tenant-id: %d used: %d, fifo: %d", tenant.id, tenant.used,
+ tenant.fifoPriorityThreshold)
if len(tenant.waitingWorkHeap) > 0 {
- s.Printf(" heap:")
+ s.Printf(" waiting work heap:")
for i := range tenant.waitingWorkHeap {
- s.Printf(" %d: pri: %d, ct: %d", i, tenant.waitingWorkHeap[i].priority,
- tenant.waitingWorkHeap[i].createTime)
+ var workOrdering string
+ if tenant.waitingWorkHeap[i].arrivalTimeWorkOrdering == lifoWorkOrdering {
+ workOrdering = ", lifo-ordering"
+ }
+ s.Printf(" [%d: pri: %d, ct: %d, epoch: %d, qt: %d%s]", i,
+ tenant.waitingWorkHeap[i].priority,
+ tenant.waitingWorkHeap[i].createTime/int64(time.Millisecond),
+ tenant.waitingWorkHeap[i].epoch,
+ tenant.waitingWorkHeap[i].enqueueingTime.UnixNano()/int64(time.Millisecond), workOrdering)
+ }
+ }
+ if len(tenant.openEpochsHeap) > 0 {
+ s.Printf(" open epochs heap:")
+ for i := range tenant.openEpochsHeap {
+ s.Printf(" [%d: pri: %d, ct: %d, epoch: %d, qt: %d]", i,
+ tenant.openEpochsHeap[i].priority,
+ tenant.openEpochsHeap[i].createTime/int64(time.Millisecond),
+ tenant.openEpochsHeap[i].epoch,
+ tenant.openEpochsHeap[i].enqueueingTime.UnixNano()/int64(time.Millisecond))
}
}
}
@@ -514,7 +728,146 @@ func (q *WorkQueue) SafeFormat(s redact.SafePrinter, verb rune) {
// close tells the gc goroutine to stop.
func (q *WorkQueue) close() {
- q.gcStopCh <- struct{}{}
+ close(q.stopCh)
+}
+
+type workOrderingKind int8
+
+const (
+ fifoWorkOrdering workOrderingKind = iota
+ lifoWorkOrdering
+)
+
+type priorityState struct {
+ priority WorkPriority
+ // maxQueueDelay includes the delay of both successfully admitted and
+ // canceled requests.
+ //
+ // NB: The maxQueueDelay value is an incomplete picture of delay since it
+ // does not have visibility into work that is still waiting in the queue.
+ // However, since we use the maxQueueDelay across a collection of priorities
+ // to set a priority threshold, we expect that usually there will be some
+ // work just below the priority threshold that does dequeue (with high
+ // latency) -- if not, it is likely that the next high priority is actually
+ // the one experiencing some instances of high latency. That is, it is very
+ // unlikely to be the case that a certain priority sees admission with no
+ // high latency while the next lower priority never gets work dequeued
+ // because of resource saturation.
+ maxQueueDelay time.Duration
+ // Count of requests that were successfully admitted (not canceled). This is
+ // used in concert with lowestPriorityWithRequests to detect priorities
+ // where work was queued but nothing was successfully admitted.
+ admittedCount int
+}
+
+// priorityStates tracks information about admission requests and admission
+// grants at various priorities. It is used to set a priority threshold for
+// LIFO queuing. There is one priorityStates per tenant, since it is embedded
+// in a tenantInfo.
+type priorityStates struct {
+ // In increasing order of priority. Expected to not have more than 10
+ // elements, so a linear search is fast. The slice is emptied after each
+ // epoch is closed.
+ ps []priorityState
+ lowestPriorityWithRequests int
+}
+
+// makePriorityStates returns an empty priorityStates that reuses the
+// ps slice.
+func makePriorityStates(ps []priorityState) priorityStates {
+ return priorityStates{ps: ps[:0], lowestPriorityWithRequests: oneAboveHighPri}
+}
+
+// requestAtPriority is called when a request is received at the given
+// priority.
+func (ps *priorityStates) requestAtPriority(priority WorkPriority) {
+ if int(priority) < ps.lowestPriorityWithRequests {
+ ps.lowestPriorityWithRequests = int(priority)
+ }
+}
+
+// updateDelayLocked is called with the delay experienced by work at the given
+// priority. This is used to compute priorityState.maxQueueDelay. Canceled
+// indicates whether the request was canceled while waiting in the queue, or
+// successfully admitted.
+func (ps *priorityStates) updateDelayLocked(
+ priority WorkPriority, delay time.Duration, canceled bool,
+) {
+ i := 0
+ n := len(ps.ps)
+ for ; i < n; i++ {
+ pri := ps.ps[i].priority
+ if pri == priority {
+ if !canceled {
+ ps.ps[i].admittedCount++
+ }
+ if ps.ps[i].maxQueueDelay < delay {
+ ps.ps[i].maxQueueDelay = delay
+ }
+ return
+ }
+ if pri > priority {
+ break
+ }
+ }
+ admittedCount := 1
+ if canceled {
+ admittedCount = 0
+ }
+ state := priorityState{priority: priority, maxQueueDelay: delay, admittedCount: admittedCount}
+ if i == n {
+ ps.ps = append(ps.ps, state)
+ } else {
+ ps.ps = append(ps.ps[:i+1], ps.ps[i:]...)
+ ps.ps[i] = state
+ }
+}
+
+func (ps *priorityStates) getFIFOPriorityThresholdAndReset(curPriorityThreshold int) int {
+ // priority is monotonically increasing in the calculation below.
+ priority := int(LowPri)
+ foundLowestPriority := false
+ handlePriorityState := func(p priorityState) {
+ if p.maxQueueDelay > maxQueueDelayToSwitchToLifo {
+ // LIFO.
+ priority = int(p.priority) + 1
+ } else if int(p.priority) < curPriorityThreshold {
+ // Currently LIFO. If the delay is above some fraction of the threshold,
+ // we continue as LIFO. If the delay is below that fraction, we could
+ // have a situation where requests were made at this priority but
+ // nothing was admitted -- we continue with LIFO in that case too.
+ if p.maxQueueDelay > time.Duration(epochLengthNanos)/10 ||
+ (p.admittedCount == 0 && int(p.priority) >= ps.lowestPriorityWithRequests) {
+ priority = int(p.priority) + 1
+ }
+ // Else, can switch to FIFO, at least based on queue delay at this
+ // priority. But we examine the other higher priorities too, since it is
+ // possible that few things were received for this priority and it got
+ // lucky in getting them admitted.
+ }
+ }
+ for i := range ps.ps {
+ p := ps.ps[i]
+ if int(p.priority) == ps.lowestPriorityWithRequests {
+ foundLowestPriority = true
+ }
+ handlePriorityState(p)
+ }
+ if !foundLowestPriority && ps.lowestPriorityWithRequests != oneAboveHighPri &&
+ priority <= ps.lowestPriorityWithRequests {
+ // The new threshold will cause lowestPriorityWithRequests to be FIFO, and
+ // we know nothing exited admission control for this lowest priority.
+ // Since !foundLowestPriority, we know we haven't explicitly considered
+ // this priority in the above loop. So we consider it now.
+ handlePriorityState(priorityState{
+ priority: WorkPriority(ps.lowestPriorityWithRequests),
+ maxQueueDelay: 0,
+ admittedCount: 0,
+ })
+ }
+ ps.ps = ps.ps[:0]
+ ps.lowestPriorityWithRequests = oneAboveHighPri
+ return priority
}
// tenantInfo is the per-tenant information in the tenantHeap.
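
To make the threshold rule concrete, here is a minimal sketch (a hypothetical package-internal Go example, not the shipped datadriven test; the name Example_fifoThreshold and the literal delays are illustrative) that mirrors the first case in testdata/priority_states: only priority -128 observes a queueing delay above maxQueueDelayToSwitchToLifo (100ms + 5ms), so the FIFO threshold moves to -127 and only work below that priority switches to LIFO ordering.

```go
package admission

import (
	"fmt"
	"time"
)

// Example_fifoThreshold is a hypothetical illustration; it assumes it lives
// in this package so that it can reach the unexported priorityStates type.
func Example_fifoThreshold() {
	ps := makePriorityStates(nil)

	// Requests arrive at three priorities within one epoch.
	ps.requestAtPriority(HighPri)   // 127
	ps.requestAtPriority(LowPri)    // -128
	ps.requestAtPriority(NormalPri) // 0

	// Delays observed when work exits the queue. Only LowPri's 106ms exceeds
	// maxQueueDelayToSwitchToLifo (epoch length 100ms + closing delta 5ms).
	ps.updateDelayLocked(HighPri, 10*time.Millisecond, false /* canceled */)
	ps.updateDelayLocked(LowPri, 106*time.Millisecond, false /* canceled */)
	ps.updateDelayLocked(NormalPri, 20*time.Millisecond, false /* canceled */)

	// Starting from an all-FIFO threshold (LowPri), the new threshold is
	// -127, so only priority -128 uses LIFO ordering in the next epoch.
	fmt.Println(ps.getFIFOPriorityThresholdAndReset(int(LowPri)))
	// Output: -127
}
```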
@@ -547,6 +900,12 @@ type tenantInfo struct {
// needed.
used uint64
waitingWorkHeap waitingWorkHeap
+ openEpochsHeap openEpochsHeap
+
+ priorityStates priorityStates
+ // priority >= fifoPriorityThreshold is FIFO. This uses a larger sized type
+ // than WorkPriority since the threshold can be > MaxPri.
+ fifoPriorityThreshold int
// The heapIndex is maintained by the heap.Interface methods, and represents
// the heapIndex of the item in the heap.
@@ -568,25 +927,33 @@ var tenantInfoPool = sync.Pool{
func newTenantInfo(id uint64) *tenantInfo {
ti := tenantInfoPool.Get().(*tenantInfo)
*ti = tenantInfo{
- id: id,
- waitingWorkHeap: ti.waitingWorkHeap,
- heapIndex: -1,
+ id: id,
+ waitingWorkHeap: ti.waitingWorkHeap,
+ openEpochsHeap: ti.openEpochsHeap,
+ priorityStates: makePriorityStates(ti.priorityStates.ps),
+ fifoPriorityThreshold: int(LowPri),
+ heapIndex: -1,
}
return ti
}
func releaseTenantInfo(ti *tenantInfo) {
- if len(ti.waitingWorkHeap) != 0 {
+ if isInTenantHeap(ti) {
panic("tenantInfo has non-empty heap")
}
- // NB: waitingWorkHeap.Pop nils the slice elements when removing, so we are
- // not inadvertently holding any references.
+ // NB: {waitingWorkHeap,openEpochsHeap}.Pop nil the slice elements when
+ // removing, so we are not inadvertently holding any references.
if cap(ti.waitingWorkHeap) > 100 {
ti.waitingWorkHeap = nil
}
- wwh := ti.waitingWorkHeap
+ if cap(ti.openEpochsHeap) > 100 {
+ ti.openEpochsHeap = nil
+ }
+
*ti = tenantInfo{
- waitingWorkHeap: wwh,
+ waitingWorkHeap: ti.waitingWorkHeap,
+ openEpochsHeap: ti.openEpochsHeap,
+ priorityStates: makePriorityStates(ti.priorityStates.ps),
}
tenantInfoPool.Put(ti)
}
@@ -632,41 +999,108 @@ func (th *tenantHeap) Pop() interface{} {
// waitingWork is the per-work information in the waitingWorkHeap.
type waitingWork struct {
- priority WorkPriority
- createTime int64
+ priority WorkPriority
+ // The workOrderingKind for this priority when this work was queued.
+ arrivalTimeWorkOrdering workOrderingKind
+ createTime int64
+ // epoch is a function of the createTime.
+ epoch int64
+
// ch is used to communicate a grant to the waiting goroutine. The
// grantChainID is used by the waiting goroutine to call continueGrantChain.
ch chan grantChainID
// The heapIndex is maintained by the heap.Interface methods, and represents
- // the heapIndex of the item in the heap. -1 when not in the heap.
+ // the heapIndex of the item in the heap. -1 when not in the heap. The same
+ // heapIndex is used by the waitingWorkHeap and the openEpochsHeap since a
+ // waitingWork is only in one of them.
heapIndex int
- grantTime time.Time
+ // Set to true when added to waitingWorkHeap. Only used to disambiguate
+ // which heap the waitingWork is in, when we know it is in one of the heaps.
+ // The only state transition is from false => true, and never restored back
+ // to false.
+ inWaitingWorkHeap bool
+ enqueueingTime time.Time
}
-// waitingWorkHeap is a heap of waiting work within a tenant. It is ordered in
-// decreasing order of priority, and within the same priority in increasing
-// order of createTime (to prefer older work).
-type waitingWorkHeap []*waitingWork
-
-var _ heap.Interface = (*waitingWorkHeap)(nil)
-
var waitingWorkPool = sync.Pool{
New: func() interface{} {
return &waitingWork{}
},
}
-func newWaitingWork(priority WorkPriority, createTime int64) *waitingWork {
+// The epoch length for doing epoch-LIFO. The epoch-LIFO scheme relies on
+// clock synchronization and the expectation that transaction/query deadlines
+// will be significantly higher than execution time under low load. A standard
+// LIFO scheme suffers from a severe problem when a single user transaction
+// can result in many lower-level work items that get distributed to many
+// nodes, and previous work execution can result in new work being submitted
+// for admission: the later work for a transaction may no longer be the latest
+// seen by the system, so will not be preferred. This means LIFO would do some
+// work items from each transaction and starve the remaining work, so nothing
+// would complete. This is even worse than FIFO which at least prefers the
+// same transactions until they are complete (FIFO and LIFO are using the
+// transaction CreateTime, and not the work arrival time).
+//
+// Consider a case where transaction deadlines are 1s (note this may not
+// necessarily be an actual deadline, and could be a time duration after which
+// the user impact is extremely negative), and typical transaction execution
+// times (under low load) of 10ms. A 100ms epoch will increase transaction
+// latency to at most 100ms + 5ms + 10ms, since execution will not start until
+// the epoch of the transaction's CreateTime is closed. At that time, due to
+// clock synchronization, all nodes will start executing that epoch and will
+// implicitly have the same set of competing transactions. By the time the
+// next epoch closes and the current epoch's transactions are deprioritized,
+// 100ms will have elapsed, which is enough time for most of these
+// transactions that get admitted to have finished all their work.
+//
+// Note that LIFO queueing will only happen at bottleneck nodes, and is decided
+// on a (tenant, priority) basis. So if there is even a single bottleneck node
+// for a (tenant, priority), the above delay will occur. When the epoch closes
+// at the bottleneck node, the creation time for this transaction will be
+// sufficiently in the past, so the non-bottleneck nodes (using FIFO) will
+// prioritize it over recent transactions. Note that there is an inversion in
+// that the non-bottleneck nodes are ordering in the opposite way for such
+// closed epochs, but since they are not bottlenecked, the queueing delay
+// should be minimal.
+//
+// TODO(sumeer): make these configurable via cluster settings. Increasing
+// epochLengthNanos will cause the epoch number to move backwards, which will
+// cause some temporary confusion in ordering relative to work that was
+// previously queued with a higher epoch number. We accept that temporary
+// confusion. We do not try to maintain a monotonic epoch based on the epoch
+// number already in place before the change, since different nodes will see
+// the cluster setting change at different times.
+const epochLengthNanos = int64(time.Millisecond * 100)
+const epochClosingDeltaNanos = int64(time.Millisecond * 5)
+
+// Latency threshold for switching to LIFO queueing. Once we switch to LIFO,
+// the minimum latency will be epochLengthNanos+epochClosingDeltaNanos, so it
+// makes sense not to switch until the observed latency is around the same.
+const maxQueueDelayToSwitchToLifo = time.Duration(epochLengthNanos + epochClosingDeltaNanos)
+
+func epochForTimeNanos(t int64) int64 {
+ return t / epochLengthNanos
+}
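+
+// Illustrative example (the specific values are not significant): with
+// epochLengthNanos = 100ms, a work item whose CreateTime is 250ms after the
+// Unix epoch maps to epoch 2, i.e., the interval [200ms, 300ms). It becomes
+// eligible for LIFO ordering only once that epoch is considered closed,
+// which is expected to be around 300ms + epochClosingDeltaNanos = 305ms. In
+// the worst case, a transaction created at the very start of an epoch waits
+// ~100ms + 5ms for its epoch to close, plus ~10ms of execution, matching
+// the "at most 100ms + 5ms + 10ms" latency mentioned above.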
+
+func newWaitingWork(
+ priority WorkPriority,
+ arrivalTimeWorkOrdering workOrderingKind,
+ createTime int64,
+ enqueueingTime time.Time,
+) *waitingWork {
ww := waitingWorkPool.Get().(*waitingWork)
ch := ww.ch
if ch == nil {
ch = make(chan grantChainID, 1)
}
*ww = waitingWork{
- priority: priority,
- createTime: createTime,
- ch: ch,
- heapIndex: -1,
+ priority: priority,
+ arrivalTimeWorkOrdering: arrivalTimeWorkOrdering,
+ createTime: createTime,
+ epoch: epochForTimeNanos(createTime),
+ ch: ch,
+ heapIndex: -1,
+ enqueueingTime: enqueueingTime,
}
return ww
}
@@ -685,14 +1119,48 @@ func releaseWaitingWork(ww *waitingWork) {
waitingWorkPool.Put(ww)
}
+// waitingWorkHeap is a heap of waiting work within a tenant. It is ordered in
+// decreasing order of priority, and within the same priority in increasing
+// order of createTime (to prefer older work) for FIFO, and in decreasing
+// order of createTime for LIFO. In the LIFO case the heap only contains work
+// whose epoch is closed.
+type waitingWorkHeap []*waitingWork
+
+var _ heap.Interface = (*waitingWorkHeap)(nil)
+
func (wwh *waitingWorkHeap) remove(item *waitingWork) {
heap.Remove(wwh, item.heapIndex)
}
func (wwh *waitingWorkHeap) Len() int { return len(*wwh) }
+// Less does LIFO or FIFO ordering among work with the same priority. The
+// ordering to use is specified by the arrivalTimeWorkOrdering. When
+// transitioning from LIFO => FIFO or FIFO => LIFO, we can have work with
+// different arrivalTimeWorkOrderings in the heap (for the same priority). In
+// this case we err towards LIFO since this indicates a new or recent overload
+// situation. If it was a recent overload that no longer exists, we will soon
+// be able to drain these LIFO work items from the queue since they will get
+// admitted. Erring towards FIFO has the danger that if we are transitioning
+// to LIFO we will need to wait for those old queued items to be serviced
+// first, which will delay the transition.
+//
+// Less is not a strict weak ordering since transitivity is not satisfied in
+// the presence of elements that have different values of
+// arrivalTimeWorkOrdering. This is acceptable for heap maintenance.
+// Example: Three work items with the same epoch where t1 < t2 < t3
+// w3: (fifo, create: t3, epoch: e)
+// w2: (lifo, create: t2, epoch: e)
+// w1: (fifo, create: t1, epoch: e)
+// w1 < w3, w3 < w2, w2 < w1, which is a cycle.
func (wwh *waitingWorkHeap) Less(i, j int) bool {
if (*wwh)[i].priority == (*wwh)[j].priority {
+ if (*wwh)[i].arrivalTimeWorkOrdering == lifoWorkOrdering ||
+ (*wwh)[i].arrivalTimeWorkOrdering != (*wwh)[j].arrivalTimeWorkOrdering {
+ // LIFO, and the epoch is closed, so can simply use createTime.
+ return (*wwh)[i].createTime > (*wwh)[j].createTime
+ }
+ // FIFO.
return (*wwh)[i].createTime < (*wwh)[j].createTime
}
return (*wwh)[i].priority > (*wwh)[j].priority
@@ -708,6 +1176,7 @@ func (wwh *waitingWorkHeap) Push(x interface{}) {
n := len(*wwh)
item := x.(*waitingWork)
item.heapIndex = n
+ item.inWaitingWorkHeap = true
*wwh = append(*wwh, item)
}
@@ -721,6 +1190,66 @@ func (wwh *waitingWorkHeap) Pop() interface{} {
return item
}
+// openEpochsHeap is a heap of waiting work within a tenant that will be
+// subject to LIFO ordering (when transferred to the waitingWorkHeap) and
+// whose epoch is not yet closed. See the Less method for the ordering applied
+// here.
+type openEpochsHeap []*waitingWork
+
+var _ heap.Interface = (*openEpochsHeap)(nil)
+
+func (oeh *openEpochsHeap) remove(item *waitingWork) {
+ heap.Remove(oeh, item.heapIndex)
+}
+
+func (oeh *openEpochsHeap) Len() int { return len(*oeh) }
+
+// Less orders in increasing order of epoch, within the same epoch in
+// decreasing order of priority, and within the same priority in increasing
+// order of createTime.
+// It is not typically dequeued from to admit work, but if it is, it will
+// behave close to the FIFO ordering in the waitingWorkHeap (not exactly FIFO
+// because work items with higher priority can be later than those with lower
+// priority if they have a higher epoch -- but epochs are coarse enough that
+// this should not be a factor). This close-to-FIFO is preferable since
+// dequeuing from this queue may be an indicator that the overload is going
+// away. There is also a risk with this close-to-FIFO behavior if we rapidly
+// fluctuate between overload and normal: doing FIFO here could cause
+// transaction work to start but not finish because the rest of the work may
+// be done using LIFO ordering. When an epoch closes, a prefix of this heap
+// will be dequeued and added to the waitingWorkHeap.
+func (oeh *openEpochsHeap) Less(i, j int) bool {
+ if (*oeh)[i].epoch == (*oeh)[j].epoch {
+ if (*oeh)[i].priority == (*oeh)[j].priority {
+ return (*oeh)[i].createTime < (*oeh)[j].createTime
+ }
+ return (*oeh)[i].priority > (*oeh)[j].priority
+ }
+ return (*oeh)[i].epoch < (*oeh)[j].epoch
+}
+
+func (oeh *openEpochsHeap) Swap(i, j int) {
+ (*oeh)[i], (*oeh)[j] = (*oeh)[j], (*oeh)[i]
+ (*oeh)[i].heapIndex = i
+ (*oeh)[j].heapIndex = j
+}
+
+func (oeh *openEpochsHeap) Push(x interface{}) {
+ n := len(*oeh)
+ item := x.(*waitingWork)
+ item.heapIndex = n
+ *oeh = append(*oeh, item)
+}
+
+func (oeh *openEpochsHeap) Pop() interface{} {
+ old := *oeh
+ n := len(old)
+ item := old[n-1]
+ old[n-1] = nil
+ item.heapIndex = -1
+ *oeh = old[0 : n-1]
+ return item
+}
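+
+// Illustrative sketch (not the actual implementation): when an epoch closes,
+// the closed-epoch prefix of the openEpochsHeap is moved to the
+// waitingWorkHeap, roughly:
+//
+//   for oeh.Len() > 0 && (*oeh)[0].epoch <= closedEpoch {
+//       ww := heap.Pop(oeh).(*waitingWork)
+//       heap.Push(wwh, ww)
+//   }
+//
+// where oeh, wwh, and closedEpoch are hypothetical locals; the actual
+// transfer is performed by the WorkQueue's epoch-closing logic.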
+
var (
requestedMeta = metric.Metadata{
Name: "admission.requested.",
diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go
index ab1062c62bf7..caae670df3ae 100644
--- a/pkg/util/admission/work_queue_test.go
+++ b/pkg/util/admission/work_queue_test.go
@@ -18,10 +18,13 @@ import (
"time"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)
@@ -93,6 +96,12 @@ type workMap struct {
workMap map[int]*testWork
}
+func (m *workMap) resetMap() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.workMap = make(map[int]*testWork)
+}
+
func (m *workMap) set(id int, w *testWork) {
m.mu.Lock()
defer m.mu.Unlock()
@@ -124,11 +133,12 @@ func (m *workMap) get(id int) (work testWork, ok bool) {
/*
TestWorkQueueBasic is a datadriven test with the following commands:
init
-admit id= tenant= priority= create-time= bypass=
+admit id= tenant= priority= create-time-millis= bypass=
set-try-get-return-value v=
granted chain-id=
cancel-work id=
work-done id=
+advance-time millis=
print
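+
+For example, a plausible sequence of commands (illustrative values only, not
+taken from an actual testdata file):
+  admit id=1 tenant=53 priority=0 create-time-millis=10 bypass=false
+  advance-time millis=105
+  print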
*/
func TestWorkQueueBasic(t *testing.T) {
@@ -136,17 +146,34 @@ func TestWorkQueueBasic(t *testing.T) {
defer log.Scope(t).Close(t)
var q *WorkQueue
+ closeFn := func() {
+ if q != nil {
+ q.close()
+ }
+ }
+ defer closeFn()
var tg *testGranter
var wrkMap workMap
var buf builderWithMu
+ // 100ms after the Unix epoch.
+ initialTime := timeutil.FromUnixMicros(int64(100) * int64(time.Millisecond/time.Microsecond))
+ var timeSource *timeutil.ManualTime
+ var st *cluster.Settings
datadriven.RunTest(t, testutils.TestDataPath(t, "work_queue"),
func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
+ closeFn()
tg = &testGranter{buf: &buf}
- q = makeWorkQueue(KVWork, tg, nil, makeWorkQueueOptions(KVWork)).(*WorkQueue)
+ opts := makeWorkQueueOptions(KVWork)
+ timeSource = timeutil.NewManualTime(initialTime)
+ opts.timeSource = timeSource
+ opts.disableEpochClosingGoroutine = true
+ st = cluster.MakeTestingClusterSettings()
+ q = makeWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()),
+ KVWork, tg, st, opts).(*WorkQueue)
tg.r = q
- wrkMap.workMap = make(map[int]*testWork)
+ wrkMap.resetMap()
return ""
case "admit":
@@ -158,7 +185,7 @@ func TestWorkQueueBasic(t *testing.T) {
tenant := scanTenantID(t, d)
var priority, createTime int
d.ScanArgs(t, "priority", &priority)
- d.ScanArgs(t, "create-time", &createTime)
+ d.ScanArgs(t, "create-time-millis", &createTime)
var bypass bool
d.ScanArgs(t, "bypass", &bypass)
ctx, cancel := context.WithCancel(context.Background())
@@ -166,7 +193,7 @@ func TestWorkQueueBasic(t *testing.T) {
workInfo := WorkInfo{
TenantID: tenant,
Priority: WorkPriority(priority),
- CreateTime: int64(createTime),
+ CreateTime: int64(createTime) * int64(time.Millisecond),
BypassAdmission: bypass,
}
go func(ctx context.Context, info WorkInfo, id int) {
@@ -198,6 +225,7 @@ func TestWorkQueueBasic(t *testing.T) {
return buf.stringAndReset()
case "cancel-work":
+ // TODO: test cancellation of work that is in the openEpochsHeap.
var id int
d.ScanArgs(t, "id", &id)
work, ok := wrkMap.get(id)
@@ -230,13 +258,18 @@ func TestWorkQueueBasic(t *testing.T) {
case "print":
return q.String()
+ case "advance-time":
+ var millis int
+ d.ScanArgs(t, "millis", &millis)
+ timeSource.Advance(time.Duration(millis) * time.Millisecond)
+ EpochLIFOEnabled.Override(context.Background(), &st.SV, true)
+ q.tryCloseEpoch()
+ return q.String()
+
default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
- if q != nil {
- q.close()
- }
}
func scanTenantID(t *testing.T, d *datadriven.TestData) roachpb.TenantID {
@@ -256,8 +289,8 @@ func TestWorkQueueTokenResetRace(t *testing.T) {
var buf builderWithMu
tg := &testGranter{buf: &buf}
- q := makeWorkQueue(SQLKVResponseWork, tg, nil,
- makeWorkQueueOptions(SQLKVResponseWork)).(*WorkQueue)
+ q := makeWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), SQLKVResponseWork, tg,
+ nil, makeWorkQueueOptions(SQLKVResponseWork)).(*WorkQueue)
tg.r = q
createTime := int64(0)
stopCh := make(chan struct{})
@@ -324,6 +357,58 @@ func TestWorkQueueTokenResetRace(t *testing.T) {
mu.Unlock()
}
+func TestPriorityStates(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ var ps priorityStates
+ curThreshold := int(LowPri)
+ printFunc := func() string {
+ var b strings.Builder
+ fmt.Fprintf(&b, "lowest-priority: %d", ps.lowestPriorityWithRequests)
+ for _, state := range ps.ps {
+ fmt.Fprintf(&b, " (pri: %d, delay-millis: %d, admitted: %d)",
+ state.priority, state.maxQueueDelay/time.Millisecond, state.admittedCount)
+ }
+ return b.String()
+ }
+ datadriven.RunTest(t, "testdata/priority_states",
+ func(t *testing.T, d *datadriven.TestData) string {
+ switch d.Cmd {
+ case "init":
+ ps = priorityStates{
+ lowestPriorityWithRequests: oneAboveHighPri,
+ }
+ return ""
+
+ case "request-received":
+ var priority int
+ d.ScanArgs(t, "priority", &priority)
+ ps.requestAtPriority(WorkPriority(priority))
+ return printFunc()
+
+ case "update":
+ var priority, delayMillis int
+ d.ScanArgs(t, "priority", &priority)
+ d.ScanArgs(t, "delay-millis", &delayMillis)
+ canceled := false
+ if d.HasArg("canceled") {
+ d.ScanArgs(t, "canceled", &canceled)
+ }
+ ps.updateDelayLocked(WorkPriority(priority), time.Duration(delayMillis)*time.Millisecond,
+ canceled)
+ return printFunc()
+
+ case "get-threshold":
+ curThreshold = ps.getFIFOPriorityThresholdAndReset(curThreshold)
+ return fmt.Sprintf("threshold: %d", curThreshold)
+
+ default:
+ return fmt.Sprintf("unknown command: %s", d.Cmd)
+ }
+ })
+}
+
// TODO(sumeer):
// - Test metrics
// - Test race between grant and cancellation