admission: epoch based LIFO to prevent throughput collapse
The epoch-LIFO scheme monitors the queueing delay for each (tenant, priority)
pair and switches between FIFO and LIFO queueing based on the maximum
observed delay. Lower percentile latency can be reduced under LIFO, at
the expense of increasing higher percentile latency. This behavior can
help when it is important to finish some transactions in a timely manner,
as in scenarios with external deadlines. Under FIFO, such deadlines
combined with an open-loop workload can cause throughput collapse, since
by the time the first work item for a transaction reaches the front of
the queue, the transaction is already close to exceeding its deadline.
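
For illustration, here is a minimal Go sketch of that per-(tenant, priority)
decision. The names and the 105ms/10ms thresholds are assumptions chosen to
match the behavior exercised by the priority_states testdata added below;
this is not the exact code introduced by the commit.

package main

import (
    "fmt"
    "time"
)

const (
    // Illustrative thresholds: switch to LIFO when the max observed queueing
    // delay exceeds 105ms; switch back to FIFO once it drops to 10ms or less.
    maxDelayForFIFO      = 105 * time.Millisecond
    maxDelayToReturnFIFO = 10 * time.Millisecond
)

// priorityState tracks one (tenant, priority) pair.
type priorityState struct {
    usingLIFO     bool
    maxQueueDelay time.Duration // max queueing delay observed this interval
}

// update picks the queueing mode for the next interval from the maximum
// queueing delay observed during the current one.
func (p *priorityState) update() {
    if !p.usingLIFO && p.maxQueueDelay > maxDelayForFIFO {
        p.usingLIFO = true
    } else if p.usingLIFO && p.maxQueueDelay <= maxDelayToReturnFIFO {
        p.usingLIFO = false
    }
    p.maxQueueDelay = 0 // reset for the next observation interval
}

func main() {
    s := priorityState{maxQueueDelay: 106 * time.Millisecond}
    s.update()
    fmt.Println("use LIFO:", s.usingLIFO) // true: 106ms exceeds the switch threshold
}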

The epoch aspect of this scheme relies on clock synchronization (which
we have in CockroachDB deployments) and the expectation that
transaction/query deadlines will be significantly higher than execution
time under low load. A standard LIFO scheme suffers from a severe problem
when a single user transaction can result in multiple units of lower-level
work that get distributed to many nodes, and work execution can result in
new work being submitted for admission: the later work for a transaction
may no longer be the latest seen by the system (since "latest" is defined
based on transaction start time), so will not be preferred. This means
LIFO would do some work items from each transaction and starve the
remaining work, so nothing would complete. This can be as bad as or worse
than FIFO, which at least prefers the same transactions until they are
complete (both FIFO and LIFO here order by the transaction start time, and
not by the individual work arrival time).
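
A minimal sketch of the ordering point above: both FIFO and LIFO compare the
transaction's start (create) time carried with the work item, never the time
the item arrived in this node's queue. The type and field names below are
assumptions for illustration, not the ones in this commit.

package main

import "fmt"

type waitingWork struct {
    txnCreateTimeNanos int64 // transaction start time, propagated with the work
}

// less reports whether a should be admitted before b.
func less(a, b waitingWork, lifo bool) bool {
    if lifo {
        // LIFO: prefer the transaction that started most recently.
        return a.txnCreateTimeNanos > b.txnCreateTimeNanos
    }
    // FIFO: prefer the transaction that started earliest.
    return a.txnCreateTimeNanos < b.txnCreateTimeNanos
}

func main() {
    older := waitingWork{txnCreateTimeNanos: 100}
    newer := waitingWork{txnCreateTimeNanos: 200}
    fmt.Println(less(older, newer, false)) // true: FIFO admits the older txn first
    fmt.Println(less(older, newer, true))  // false: LIFO admits the newer txn first
}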

Consider a case where transaction deadlines are 1s (note this may not
necessarily be an actual deadline, and could be a time duration after which
the user impact is extremely negative), and typical transaction execution
times (under low load) are 10ms. A 100ms epoch will increase transaction
latency to at most 100ms + 5ms + 10ms, since execution will not start until
the epoch of the transaction's start time is closed (5ms is the grace
period before we "close" an epoch). At that time, due to clock
synchronization, all nodes will start executing that epoch and will
implicitly have the same set of competing transactions, which are ordered
in the same manner. This set of competing transactions will stay unchanged
until the next epoch close. And by the time the next epoch closes and
the current epoch's transactions are deprioritized, 100ms will have
elapsed, which is enough time for most of these transactions that got
admitted to have finished all their work. The clock synchronization
expected here is stronger than the default 500ms value of --max-offset,
but that value is deliberately set to be extremely conservative to avoid
stale reads, while the use here has no effect on correctness.
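
The epoch arithmetic in this example can be sketched as follows; the function
and constant names are illustrative assumptions rather than the identifiers
introduced by this commit.

package main

import (
    "fmt"
    "time"
)

const (
    epochLength = 100 * time.Millisecond
    epochGrace  = 5 * time.Millisecond // grace period before an epoch is "closed"
)

// epochForTime maps a transaction's create time to its epoch number.
func epochForTime(createTimeNanos int64) int64 {
    return createTimeNanos / int64(epochLength)
}

// epochIsClosed reports whether, at wall-clock time now, the epoch is closed
// and its work can start executing. With synchronized clocks, all nodes reach
// this conclusion at (nearly) the same time.
func epochIsClosed(epoch int64, nowNanos int64) bool {
    epochEnd := (epoch + 1) * int64(epochLength)
    return nowNanos >= epochEnd+int64(epochGrace)
}

func main() {
    start := time.Now().UnixNano()
    e := epochForTime(start)
    // Worst case added latency for a transaction created at the start of an
    // epoch: up to 100ms waiting for the epoch to close, plus the 5ms grace
    // period, plus the ~10ms execution time.
    fmt.Println("epoch", e, "closed now?", epochIsClosed(e, time.Now().UnixNano()))
}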

Note that LIFO queueing will only happen at bottleneck nodes, and is decided
on a (tenant, priority) basis. So if there is even a single bottleneck node
for a (tenant, priority), the above delay will occur. When the epoch closes
at the bottleneck node, the creation time for this transaction will be
sufficiently in the past, so the non-bottleneck nodes (using FIFO) will
prioritize it over recent transactions. There is a queue ordering
inversion in that the non-bottleneck nodes are ordering in the opposite
way for such closed epochs, but since they are not bottlenecked, the
queueing delay should be minimal.

Preliminary experiments with kv50/enc=false/nodes=1/conc=8192 are
promising in reducing p50 and p75 latency.

Release note (ops change): The admission.epoch_lifo.enabled cluster
setting, disabled by default, enables the use of epoch-LIFO adaptive
queueing behavior in admission control.
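
For reference, a minimal Go sketch of how an operator might turn the setting
on, using database/sql with the lib/pq driver; the connection string is a
placeholder.

package main

import (
    "database/sql"
    "log"

    _ "github.com/lib/pq" // PostgreSQL-wire driver, works with CockroachDB
)

func main() {
    db, err := sql.Open("postgres",
        "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
    // Enable epoch-LIFO adaptive queueing (it is disabled by default).
    if _, err := db.Exec(
        "SET CLUSTER SETTING admission.epoch_lifo.enabled = true"); err != nil {
        log.Fatal(err)
    }
}
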
sumeerbhola authored and RajivTS committed Mar 6, 2022
1 parent 0906205 commit db6fa2a
Showing 9 changed files with 1,263 additions and 123 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -1,5 +1,5 @@
Setting Type Default Description
admission.kv.enabled boolean true when true, work performed by the KV layer is subject to admission control
admission.epoch_lifo.enabled boolean false when true, epoch-LIFO behavior is enabled when there is significant delay in admission
admission.sql_kv_response.enabled boolean true when true, work performed by the SQL layer when receiving a KV response is subject to admission control
admission.sql_sql_response.enabled boolean true when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control
bulkio.backup.file_size byte size 128 MiB target size for individual data files produced during BACKUP
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -1,6 +1,7 @@
<table>
<thead><tr><th>Setting</th><th>Type</th><th>Default</th><th>Description</th></tr></thead>
<tbody>
<tr><td><code>admission.epoch_lifo.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, epoch-LIFO behavior is enabled when there is significant delay in admission</td></tr>
<tr><td><code>admission.kv.enabled</code></td><td>boolean</td><td><code>true</code></td><td>when true, work performed by the KV layer is subject to admission control</td></tr>
<tr><td><code>admission.sql_kv_response.enabled</code></td><td>boolean</td><td><code>true</code></td><td>when true, work performed by the SQL layer when receiving a KV response is subject to admission control</td></tr>
<tr><td><code>admission.sql_sql_response.enabled</code></td><td>boolean</td><td><code>true</code></td><td>when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control</td></tr>
2 changes: 2 additions & 0 deletions pkg/util/admission/BUILD.bazel
@@ -39,6 +39,8 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_stretchr_testify//require",
23 changes: 12 additions & 11 deletions pkg/util/admission/granter.go
@@ -620,7 +620,8 @@ type Options struct {
}

type makeRequesterFunc func(
workKind WorkKind, granter granter, settings *cluster.Settings, opts workQueueOptions) requester
_ log.AmbientContext, workKind WorkKind, granter granter, settings *cluster.Settings,
opts workQueueOptions) requester

// NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a
// regular cluster node. Caller is responsible for hooking up
@@ -668,7 +669,7 @@ func NewGrantCoordinators(
usedSlotsMetric: metrics.KVUsedSlots,
}
kvSlotAdjuster.granter = kvg
coord.queues[KVWork] = makeRequester(KVWork, kvg, st, makeWorkQueueOptions(KVWork))
coord.queues[KVWork] = makeRequester(ambientCtx, KVWork, kvg, st, makeWorkQueueOptions(KVWork))
kvg.requester = coord.queues[KVWork]
coord.granters[KVWork] = kvg

@@ -680,7 +681,7 @@
cpuOverload: kvSlotAdjuster,
}
coord.queues[SQLKVResponseWork] = makeRequester(
SQLKVResponseWork, tg, st, makeWorkQueueOptions(SQLKVResponseWork))
ambientCtx, SQLKVResponseWork, tg, st, makeWorkQueueOptions(SQLKVResponseWork))
tg.requester = coord.queues[SQLKVResponseWork]
coord.granters[SQLKVResponseWork] = tg

@@ -691,7 +692,7 @@
maxBurstTokens: opts.SQLSQLResponseBurstTokens,
cpuOverload: kvSlotAdjuster,
}
coord.queues[SQLSQLResponseWork] = makeRequester(
coord.queues[SQLSQLResponseWork] = makeRequester(ambientCtx,
SQLSQLResponseWork, tg, st, makeWorkQueueOptions(SQLSQLResponseWork))
tg.requester = coord.queues[SQLSQLResponseWork]
coord.granters[SQLSQLResponseWork] = tg
@@ -703,7 +704,7 @@
cpuOverload: kvSlotAdjuster,
usedSlotsMetric: metrics.SQLLeafStartUsedSlots,
}
coord.queues[SQLStatementLeafStartWork] = makeRequester(
coord.queues[SQLStatementLeafStartWork] = makeRequester(ambientCtx,
SQLStatementLeafStartWork, sg, st, makeWorkQueueOptions(SQLStatementLeafStartWork))
sg.requester = coord.queues[SQLStatementLeafStartWork]
coord.granters[SQLStatementLeafStartWork] = sg
@@ -715,7 +716,7 @@
cpuOverload: kvSlotAdjuster,
usedSlotsMetric: metrics.SQLRootStartUsedSlots,
}
coord.queues[SQLStatementRootStartWork] = makeRequester(
coord.queues[SQLStatementRootStartWork] = makeRequester(ambientCtx,
SQLStatementRootStartWork, sg, st, makeWorkQueueOptions(SQLStatementRootStartWork))
sg.requester = coord.queues[SQLStatementRootStartWork]
coord.granters[SQLStatementRootStartWork] = sg
@@ -766,7 +767,7 @@ func NewGrantCoordinatorSQL(
maxBurstTokens: opts.SQLKVResponseBurstTokens,
cpuOverload: sqlNodeCPU,
}
coord.queues[SQLKVResponseWork] = makeRequester(
coord.queues[SQLKVResponseWork] = makeRequester(ambientCtx,
SQLKVResponseWork, tg, st, makeWorkQueueOptions(SQLKVResponseWork))
tg.requester = coord.queues[SQLKVResponseWork]
coord.granters[SQLKVResponseWork] = tg
@@ -778,7 +779,7 @@
maxBurstTokens: opts.SQLSQLResponseBurstTokens,
cpuOverload: sqlNodeCPU,
}
coord.queues[SQLSQLResponseWork] = makeRequester(
coord.queues[SQLSQLResponseWork] = makeRequester(ambientCtx,
SQLSQLResponseWork, tg, st, makeWorkQueueOptions(SQLSQLResponseWork))
tg.requester = coord.queues[SQLSQLResponseWork]
coord.granters[SQLSQLResponseWork] = tg
@@ -790,7 +791,7 @@
cpuOverload: sqlNodeCPU,
usedSlotsMetric: metrics.SQLLeafStartUsedSlots,
}
coord.queues[SQLStatementLeafStartWork] = makeRequester(
coord.queues[SQLStatementLeafStartWork] = makeRequester(ambientCtx,
SQLStatementLeafStartWork, sg, st, makeWorkQueueOptions(SQLStatementLeafStartWork))
sg.requester = coord.queues[SQLStatementLeafStartWork]
coord.granters[SQLStatementLeafStartWork] = sg
@@ -802,7 +803,7 @@
cpuOverload: sqlNodeCPU,
usedSlotsMetric: metrics.SQLRootStartUsedSlots,
}
coord.queues[SQLStatementRootStartWork] = makeRequester(
coord.queues[SQLStatementRootStartWork] = makeRequester(ambientCtx,
SQLStatementRootStartWork, sg, st, makeWorkQueueOptions(SQLStatementRootStartWork))
sg.requester = coord.queues[SQLStatementRootStartWork]
coord.granters[SQLStatementRootStartWork] = sg
@@ -1247,7 +1248,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoordinator {
// Share the WorkQueue metrics across all stores.
// TODO(sumeer): add per-store WorkQueue state for debug.zip and db console.
opts.metrics = &sgc.workQueueMetrics
coord.queues[KVWork] = sgc.makeRequesterFunc(KVWork, kvg, sgc.settings, opts)
coord.queues[KVWork] = sgc.makeRequesterFunc(sgc.ambientCtx, KVWork, kvg, sgc.settings, opts)
kvg.requester = coord.queues[KVWork]
coord.granters[KVWork] = kvg
coord.ioLoadListener = &ioLoadListener{
6 changes: 4 additions & 2 deletions pkg/util/admission/granter_test.go
@@ -117,7 +117,8 @@ func TestGranterBasic(t *testing.T) {
d.ScanArgs(t, "sql-leaf", &opts.SQLStatementLeafStartWorkSlots)
d.ScanArgs(t, "sql-root", &opts.SQLStatementRootStartWorkSlots)
opts.makeRequesterFunc = func(
workKind WorkKind, granter granter, _ *cluster.Settings, opts workQueueOptions) requester {
_ log.AmbientContext, workKind WorkKind, granter granter, _ *cluster.Settings,
opts workQueueOptions) requester {
req := &testRequester{
workKind: workKind,
granter: granter,
@@ -243,7 +244,7 @@ func TestStoreCoordinators(t *testing.T) {
opts := Options{
Settings: settings,
makeRequesterFunc: func(
workKind WorkKind, granter granter, _ *cluster.Settings, opts workQueueOptions) requester {
_ log.AmbientContext, workKind WorkKind, granter granter, _ *cluster.Settings,
opts workQueueOptions) requester {
req := &testRequester{
workKind: workKind,
granter: granter,
231 changes: 231 additions & 0 deletions pkg/util/admission/testdata/priority_states
@@ -0,0 +1,231 @@
init
----

# One request at priority=-128 sees high latency. Requests at priority 0, 127
# do not see high latency. So FIFO priority is set >= -127.
request-received priority=127
----
lowest-priority: 127

update priority=127 delay-millis=10
----
lowest-priority: 127 (pri: 127, delay-millis: 10, admitted: 1)

request-received priority=-128
----
lowest-priority: -128 (pri: 127, delay-millis: 10, admitted: 1)

update priority=-128 delay-millis=106
----
lowest-priority: -128 (pri: -128, delay-millis: 106, admitted: 1) (pri: 127, delay-millis: 10, admitted: 1)

request-received priority=0
----
lowest-priority: -128 (pri: -128, delay-millis: 106, admitted: 1) (pri: 127, delay-millis: 10, admitted: 1)

update priority=0 delay-millis=20
----
lowest-priority: -128 (pri: -128, delay-millis: 106, admitted: 1) (pri: 0, delay-millis: 20, admitted: 1) (pri: 127, delay-millis: 10, admitted: 1)

get-threshold
----
threshold: -127

# The latency seen by priority=-128 decreases but not below the threshold
# needed to return to FIFO. So FIFO priority continues to be >= -127.
request-received priority=-128
----
lowest-priority: -128

update priority=-128 delay-millis=11
----
lowest-priority: -128 (pri: -128, delay-millis: 11, admitted: 1)

get-threshold
----
threshold: -127

# The latency seen by priority=-128 is low enough to return to FIFO.
request-received priority=-128
----
lowest-priority: -128

update priority=-128 delay-millis=10
----
lowest-priority: -128 (pri: -128, delay-millis: 10, admitted: 1)

get-threshold
----
threshold: -128

# Priority=127 sees high latency. FIFO priority is now >= 128.
request-received priority=127
----
lowest-priority: 127

update priority=127 delay-millis=106
----
lowest-priority: 127 (pri: 127, delay-millis: 106, admitted: 1)

get-threshold
----
threshold: 128

# Both priority 24 and 127 see high latency. FIFO priority stays at >=128.
request-received priority=127
----
lowest-priority: 127

update priority=127 delay-millis=106
----
lowest-priority: 127 (pri: 127, delay-millis: 106, admitted: 1)

request-received priority=24
----
lowest-priority: 24 (pri: 127, delay-millis: 106, admitted: 1)

update priority=24 delay-millis=107
----
lowest-priority: 24 (pri: 24, delay-millis: 107, admitted: 1) (pri: 127, delay-millis: 106, admitted: 1)

get-threshold
----
threshold: 128

# Priority -5 and 20 see high latency. There are no requests at any other
# priority. The FIFO priority threshold reduces to >= 21.
request-received priority=20
----
lowest-priority: 20

update priority=20 delay-millis=111
----
lowest-priority: 20 (pri: 20, delay-millis: 111, admitted: 1)

request-received priority=-5
----
lowest-priority: -5 (pri: 20, delay-millis: 111, admitted: 1)

update priority=-5 delay-millis=110
----
lowest-priority: -5 (pri: -5, delay-millis: 110, admitted: 1) (pri: 20, delay-millis: 111, admitted: 1)

get-threshold
----
threshold: 21

# Priority 0 is LIFO and sees latency that is not low enough to return it to
# FIFO. The FIFO priority threshold reduces to >= 1.
request-received priority=0
----
lowest-priority: 0

update priority=0 delay-millis=11
----
lowest-priority: 0 (pri: 0, delay-millis: 11, admitted: 1)

get-threshold
----
threshold: 1

# Priority -128 is LIFO and sees latency that is not low enough to return it
# to FIFO. The FIFO priority threshold reduces to >= -127.
request-received priority=-128
----
lowest-priority: -128

update priority=-128 delay-millis=11
----
lowest-priority: -128 (pri: -128, delay-millis: 11, admitted: 1)

get-threshold
----
threshold: -127

# Priority -128 is LIFO and sees very low latency and switches back to FIFO.
request-received priority=-128
----
lowest-priority: -128

update priority=-128 delay-millis=9
----
lowest-priority: -128 (pri: -128, delay-millis: 9, admitted: 1)

get-threshold
----
threshold: -128

# Priority 0 is FIFO and sees a canceled request that does not meet the
# latency threshold to switch to LIFO. It stays as FIFO.
request-received priority=0
----
lowest-priority: 0

update priority=0 delay-millis=20 canceled=true
----
lowest-priority: 0 (pri: 0, delay-millis: 20, admitted: 0)

get-threshold
----
threshold: -128

# Priority 0 is FIFO and sees a canceled request with very high latency, so
# switched to LIFO.
request-received priority=0
----
lowest-priority: 0

update priority=0 delay-millis=120 canceled=true
----
lowest-priority: 0 (pri: 0, delay-millis: 120, admitted: 0)

get-threshold
----
threshold: 1

# Priority 0 receives a request, but nothing exits admission control, so it
# stays as LIFO.
request-received priority=0
----
lowest-priority: 0

get-threshold
----
threshold: 1

# Priority 10 sees a request with low latency. Priority 0 has a request that
# does not exit admission control. Priority 0 stays as LIFO.
request-received priority=10
----
lowest-priority: 10

update priority=10 delay-millis=5
----
lowest-priority: 10 (pri: 10, delay-millis: 5, admitted: 1)

request-received priority=0
----
lowest-priority: 0 (pri: 10, delay-millis: 5, admitted: 1)

get-threshold
----
threshold: 1

# Priority -10 sees a request with low enough latency to switch back to FIFO.
# Priority 0 has a request that does not exit admission control. Because of
# the observation at priority=-10 we switch everything back to FIFO.
request-received priority=-10
----
lowest-priority: -10

update priority=-10 delay-millis=5
----
lowest-priority: -10 (pri: -10, delay-millis: 5, admitted: 1)

request-received priority=0
----
lowest-priority: -10 (pri: -10, delay-millis: 5, admitted: 1)

get-threshold
----
threshold: -128
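
The behavior exercised above can be summarized by the following sketch of the
threshold computation: after each interval, the FIFO priority threshold becomes
one more than the highest priority that still needs LIFO, and it falls back to
-128 (everything FIFO) when no priority needs LIFO. The names and the
105ms/10ms thresholds are assumptions consistent with this testdata, not the
code added by the commit.

package main

import "fmt"

const (
    minPriority             = -128
    switchToLIFODelayMillis = 105 // above this, a FIFO priority switches to LIFO
    returnToFIFODelayMillis = 10  // at or below this, a LIFO priority returns to FIFO
)

// priorityObservation is the per-priority summary for one interval.
type priorityObservation struct {
    priority    int
    delayMillis int  // max queueing delay observed for this priority
    wasLIFO     bool // was this priority below the FIFO threshold last interval?
}

// fifoThreshold returns the lowest priority that will use FIFO next interval.
func fifoThreshold(obs []priorityObservation) int {
    threshold := minPriority // default: everything is FIFO
    for _, o := range obs {
        needsLIFO := o.delayMillis > switchToLIFODelayMillis ||
            (o.wasLIFO && o.delayMillis > returnToFIFODelayMillis)
        if needsLIFO && o.priority+1 > threshold {
            threshold = o.priority + 1
        }
    }
    return threshold
}

func main() {
    // Mirrors the first block of the testdata: only priority -128 saw high
    // latency, so the threshold becomes -127.
    fmt.Println(fifoThreshold([]priorityObservation{
        {priority: 127, delayMillis: 10},
        {priority: -128, delayMillis: 106},
        {priority: 0, delayMillis: 20},
    }))
}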