kv: introduce setting for maximum lock wait-queue length
Fixes #66017.

This PR introduces a new `kv.lock_table.maximum_lock_wait_queue_length`
cluster setting, which controls the maximum length of a lock wait-queue
that requests are willing to enter and wait in. The setting can be used
to ensure some level of quality-of-service under severe per-key
contention. If set to a non-zero value and an existing lock wait-queue
is already equal to or exceeding this length, requests will be rejected
eagerly instead of entering the queue and waiting.
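
As a rough sketch of the rule this introduces (illustrative only; not the
code added by this commit, and the function name is made up), the
admission check boils down to:
```
// shouldRejectWriter reports whether a write request should be rejected
// eagerly instead of joining a lock wait-queue on a key. A limit of 0
// disables the check.
func shouldRejectWriter(queueLen, maxQueueLen int) bool {
	return maxQueueLen > 0 && queueLen >= maxQueueLen
}
```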

Before this change, the lock-table's wait-queues had no limit on the
number of writers that could be queued on a given key. This could lead
to unbounded queueing and diminishing quality of service for all writers
as the queues built up. It could also lead to starvation (i.e. zero
throughput) when requests had a timeout that fired before any single
request could reach the head of the queue. This was all especially bad
with high replication latency in multi-region clusters, as locks are
held for the duration of a consensus replication round.

We can see this in the following experiment. Here, we run a multi-region
(demo) cluster in three configurations: `default`, `timeout`, and
`queue_limit`. Under the `default` configuration, we change nothing.
Under the `timeout` configuration, we set `sql.defaults.statement_timeout='250ms'`.
Under the `queue_limit` configuration, we set `kv.lock_table.maximum_lock_wait_queue_length=3`.
We then run a single-row, update-only workload (workload "U", mocked
out in this patch):
```
./cockroach demo --global --empty --nodes=9
./cockroach sql -e 'create database ycsb primary region "us-east1" regions "us-west1", "europe-west1" survive region failure'
./cockroach workload init ycsb --families=false --insert-count=1
./cockroach workload run  ycsb --families=false --insert-count=1 --workload=U --duration=30s --tolerate-errors --concurrency=?
```

This results in the following behavior:

| setting     | concurrency | qps  | errors | p50 (ms) | p95 (ms) |
|-------------|-------------|------|--------|----------|----------|
| default     | 1           | 13.7 | 0      | 67.1     | 71.3     |
| default     | 2           | 12.6 | 0      | 142.6    | 151.0    |
| default     | 4           | 12.3 | 0      | 302.0    | 385.9    |
| default     | 8           | 12.2 | 0      | 570.4    | 1610.6   |
| default     | 16          | 12.0 | 0      | 1208.0   | 2550.1   |
| default     | 32          | 8.0  | 0      | 4563.4   | 5637.1   |
| timeout     | 1           | 14.7 | 0      | 67.1     | 67.1     |
| timeout     | 2           | 12.8 | 17     | 142.6    | 142.6    |
| timeout     | 4           | 0.2  | 464    | 71.3     | 352.3    |
| timeout     | 8           | 0.2  | 913    | 67.1     | 335.5    |
| timeout     | 16          | 0    | -      | -        | -        |
| timeout     | 32          | 0    | -      | -        | -        |
| queue_limit | 1           | 14.5 | 0      | 67.1     | 71.3     |
| queue_limit | 2           | 14.2 | 0      | 134.2    | 176.2    |
| queue_limit | 4           | 13.3 | 0      | 285.2    | 369.1    |
| queue_limit | 8           | 13.0 | 1934   | 352.3    | 486.5    |
| queue_limit | 16          | 12.8 | 4290   | 352.3    | 486.5    |
| queue_limit | 32          | 11.6 | 9203   | 385.9    | 671.1    |

The first thing to note is that under the `default` config, throughput
remains relatively steady as concurrency grows, but latency grows
linearly with concurrency: each writer waits behind every other queued
writer, and each of them holds the lock for a full consensus round, so
p50 latency is roughly concurrency times the single-writer latency.
Next, note that under the `timeout` config, throughput collapses to
near zero the moment the p50 latency exceeds the `statement_timeout`.
Finally, note that under the `queue_limit` config, errors begin to
build once the queue limit is exceeded. However, throughput and latency
hold steady as concurrency grows, as desired. So some requests are
rejected, but those that are admitted receive good quality-of-service.
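
For completeness, here is a hypothetical client-side pattern for
handling the eager rejections (nothing below is part of this commit;
the table, column, and function names are invented, and a real
application would classify retryable errors more carefully):
```
package example

import (
	"context"
	"database/sql"
	"time"
)

// updateWithRetry retries a contended single-row update a few times with a
// short backoff. Requests beyond the queue limit fail fast rather than
// queueing server-side, so the client keeps control of its latency budget.
func updateWithRetry(ctx context.Context, db *sql.DB, key string) error {
	var err error
	for attempt := 1; attempt <= 5; attempt++ {
		if _, err = db.ExecContext(ctx,
			`UPDATE kv SET v = v + 1 WHERE k = $1`, key); err == nil {
			return nil
		}
		// Back off briefly before retrying the rejected statement.
		time.Sleep(time.Duration(attempt*50) * time.Millisecond)
	}
	return err
}
```
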
nvanbenschoten committed Jul 13, 2021
1 parent 832c1ed commit 0786e26
Showing 9 changed files with 581 additions and 27 deletions.
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -367,6 +367,14 @@ type Request struct {
// transactions.
WaitPolicy lock.WaitPolicy

// The maximum length of a lock wait-queue that the request is willing
// to enter and wait in. Used to provide a release valve and ensure some
// level of quality-of-service under severe per-key contention. If set
// to a non-zero value and an existing lock wait-queue is already equal
// to or exceeding this length, the request will be rejected eagerly
// instead of entering the queue and waiting.
MaxLockWaitQueueLength int

// The individual requests in the batch.
Requests []roachpb.RequestUnion

55 changes: 55 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -32,6 +32,55 @@ import (
"github.com/cockroachdb/errors"
)

// MaxLockWaitQueueLength sets the maximum length of a lock wait-queue that the
// request is willing to enter and wait in. Used to provide a release valve and
// ensure some level of quality-of-service under severe per-key contention. If
// set to a non-zero value and an existing lock wait-queue is already equal to
// or exceeding this length, the request will be rejected eagerly instead of
// entering the queue and waiting.
//
// This is a fairly blunt mechanism to place an upper bound on resource
// utilization per lock wait-queue and ensure some reasonable level of
// quality-of-service for transactions that enter a lock wait-queue. More
// sophisticated queueing alternatives exist that account for queueing time and
// detect sustained queue growth before rejecting:
// - https://queue.acm.org/detail.cfm?id=2209336
// - https://queue.acm.org/detail.cfm?id=2839461
//
// We could explore these algorithms if this setting is too coarse grained and
// not serving its purpose well enough.
//
// Alternatively, we could implement the lock_timeout session variable that
// exists in Postgres (#67513) and use that to ensure quality-of-service for
// requests that wait for locks. With that configuration, this cluster setting
// would be relegated to a guardrail that protects against unbounded resource
// utilization and runaway queuing for misbehaving clients, a role it is well
// positioned to serve.
var MaxLockWaitQueueLength = settings.RegisterIntSetting(
"kv.lock_table.maximum_lock_wait_queue_length",
"the maximum length of a lock wait-queue that requests are willing to enter "+
"and wait in. The setting can be used to ensure some level of quality-of-service "+
"under severe per-key contention. If set to a non-zero value and an existing lock "+
"wait-queue is already equal to or exceeding this length, requests will be rejected "+
"eagerly instead of entering the queue and waiting. Set to 0 to disable.",
0,
func(v int64) error {
if v < 0 {
return errors.Errorf("cannot be set to a negative value: %d", v)
}
if v == 0 {
return nil // disabled
}
// Don't let the setting be dropped below a reasonable value that we don't
// expect to impact internal transaction processing.
const minSafeMaxLength = 3
if v < minSafeMaxLength {
return errors.Errorf("cannot be set below %d: %d", minSafeMaxLength, v)
}
return nil
},
)

// DiscoveredLocksThresholdToConsultFinalizedTxnCache sets a threshold as
// mentioned in the description string. The default of 200 is somewhat
// arbitrary but should suffice for small OLTP transactions. Given the default
@@ -230,6 +279,12 @@ func (m *managerImpl) sequenceReqWithGuard(ctx context.Context, g *Guard) (Respo
return nil, nil
}

// Set the request's MaxLockWaitQueueLength based on the cluster setting, if not
// already set.
if g.Req.MaxLockWaitQueueLength == 0 {
g.Req.MaxLockWaitQueueLength = int(MaxLockWaitQueueLength.Get(&m.st.SV))
}

if g.EvalKind == OptimisticEval {
if g.ltg != nil {
panic("Optimistic locking should not have a non-nil lockTableGuard")
18 changes: 12 additions & 6 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -54,7 +54,7 @@ import (
// The input files use the following DSL:
//
// new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int> [uncertainty-limit=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>] [max-lock-wait-queue-length=<int>]
// <proto-name> [<field-name>=<field-value>...] (hint: see scanSingleRequest)
// sequence req=<req-name> [eval-kind=<pess|opt|pess-after-opt]
// finish req=<req-name>
@@ -155,6 +155,11 @@ func TestConcurrencyManagerBasic(t *testing.T) {

waitPolicy := scanWaitPolicy(t, d, false /* required */)

var maxLockWaitQueueLength int
if d.HasArg("max-lock-wait-queue-length") {
d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength)
}

// Each roachpb.Request is provided on an indented line.
reqs, reqUnions := scanRequests(t, d, c)
latchSpans, lockSpans := c.collectSpans(t, txn, ts, reqs)
@@ -163,11 +168,12 @@
Txn: txn,
Timestamp: ts,
// TODO(nvanbenschoten): test Priority
ReadConsistency: readConsistency,
WaitPolicy: waitPolicy,
Requests: reqUnions,
LatchSpans: latchSpans,
LockSpans: lockSpans,
ReadConsistency: readConsistency,
WaitPolicy: waitPolicy,
MaxLockWaitQueueLength: maxLockWaitQueueLength,
Requests: reqUnions,
LatchSpans: latchSpans,
LockSpans: lockSpans,
}
return ""

50 changes: 42 additions & 8 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -64,6 +64,12 @@
// without pushing anyone.
waitSelf

// waitQueueMaxLengthExceeded indicates that the request attempted to enter a
// lock wait-queue as a writer and found that the queue's length was already
// equal to or exceeding the request's configured maximum. As a result, the
// request was rejected.
waitQueueMaxLengthExceeded

// doneWaiting indicates that the request is done waiting on this pass
// through the lockTable and should make another call to ScanAndEnqueue.
doneWaiting
@@ -112,6 +118,9 @@ func (s waitingState) String() string {
return "wait elsewhere by proceeding to evaluation"
}
return fmt.Sprintf("wait elsewhere for txn %s @ key %s", s.txn.ID.Short(), s.key)
case waitQueueMaxLengthExceeded:
return fmt.Sprintf("wait-queue maximum length exceeded @ key %s with length %d",
s.key, s.queuedWriters)
case doneWaiting:
return "done waiting"
default:
@@ -316,6 +325,11 @@ func newLockTable(maxLocks int64) *lockTableImpl {
// transaction has a reservation. See the comment about "Reservations" in
// lockState.
//
// - The waitQueueMaxLengthExceeded state is used to indicate that the request
// was rejected because it attempted to enter a lock wait-queue as a writer
// and found that the queue's length was already equal to or exceeding the
// request's configured maximum.
//
// - The doneWaiting state is used to indicate that the request should make
// another call to ScanAndEnqueue() (that next call is more likely to return a
// lockTableGuard that returns false from StartWaiting()).
@@ -324,9 +338,10 @@ type lockTableGuardImpl struct {
lt *lockTableImpl

// Information about this request.
txn *enginepb.TxnMeta
ts hlc.Timestamp
spans *spanset.SpanSet
txn *enginepb.TxnMeta
ts hlc.Timestamp
spans *spanset.SpanSet
maxWaitQueueLength int

// Snapshots of the trees for which this request has some spans. Note that
// the lockStates in these snapshots may have been removed from
@@ -1348,7 +1363,13 @@ func (l *lockState) tryActiveWait(
}
}

waitForState := waitingState{kind: waitFor, key: l.key}
waitForState := waitingState{
kind: waitFor,
key: l.key,
queuedWriters: l.queuedWriters.Len(),
queuedReaders: l.waitingReaders.Len(),
guardAccess: sa,
}
if lockHolderTxn != nil {
waitForState.txn = lockHolderTxn
waitForState.held = true
@@ -1411,8 +1432,21 @@ func (l *lockState) tryActiveWait(
guard: g,
active: true,
}
if l.queuedWriters.Len() == 0 {
if curLen := l.queuedWriters.Len(); curLen == 0 {
l.queuedWriters.PushFront(qg)
} else if g.maxWaitQueueLength > 0 && curLen >= g.maxWaitQueueLength {
// The wait-queue is longer than the request is willing to wait for.
// Instead of entering the queue, immediately reject the request.
g.mu.startWait = true
state := waitForState
state.kind = waitQueueMaxLengthExceeded
g.mu.state = state
if notify {
g.notify()
}
// NOTE: we return wait=true not because the request is waiting, but
// because it should not continue scanning for conflicting locks.
return true, false
} else {
var e *list.Element
for e = l.queuedWriters.Back(); e != nil; e = e.Prev() {
@@ -1428,6 +1462,7 @@
}
}
g.mu.locks[l] = struct{}{}
waitForState.queuedWriters = l.queuedWriters.Len()
}
if replicatedLockFinalizedTxn != nil && l.queuedWriters.Front().Value.(*queuedGuard) == qg {
// First waiter, so should not wait. NB: this inactive waiter can be
@@ -1443,6 +1478,7 @@
} else {
l.waitingReaders.PushFront(g)
g.mu.locks[l] = struct{}{}
waitForState.queuedReaders = l.waitingReaders.Len()
}
}
if !wait {
@@ -1453,15 +1489,12 @@
// Make it an active waiter.
g.key = l.key
g.mu.startWait = true
waitForState.queuedWriters = l.queuedWriters.Len()
waitForState.queuedReaders = l.waitingReaders.Len()
if g.isSameTxnAsReservation(waitForState) {
state := waitForState
state.kind = waitSelf
g.mu.state = state
} else {
state := waitForState
state.guardAccess = sa
if l.distinguishedWaiter == nil {
l.distinguishedWaiter = g
state.kind = waitForDistinguished
@@ -2180,6 +2213,7 @@ func (t *lockTableImpl) newGuardForReq(req Request) *lockTableGuardImpl {
g.txn = req.txnMeta()
g.ts = req.Timestamp
g.spans = req.LockSpans
g.maxWaitQueueLength = req.MaxLockWaitQueueLength
g.sa = spanset.NumSpanAccess - 1
g.index = -1
return g
17 changes: 13 additions & 4 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -54,7 +54,7 @@ new-txn txn=<name> ts=<int>[,<int>] epoch=<int> [seq=<int>]
Creates a TxnMeta.
new-request r=<name> txn=<name>|none ts=<int>[,<int>] spans=r|w@<start>[,<end>]+...
new-request r=<name> txn=<name>|none ts=<int>[,<int>] spans=r|w@<start>[,<end>]+... [max-lock-wait-queue-length=<int>]
----
Creates a Request.
@@ -245,11 +245,16 @@ func TestLockTableBasic(t *testing.T) {
d.Fatalf(t, "unknown txn %s", txnName)
}
ts := scanTimestamp(t, d)
var maxLockWaitQueueLength int
if d.HasArg("max-lock-wait-queue-length") {
d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength)
}
spans := scanSpans(t, d, ts)
req := Request{
Timestamp: ts,
LatchSpans: spans,
LockSpans: spans,
Timestamp: ts,
MaxLockWaitQueueLength: maxLockWaitQueueLength,
LatchSpans: spans,
LockSpans: spans,
}
if txnMeta != nil {
// Update the transaction's timestamp, if necessary. The transaction
@@ -474,12 +479,16 @@ func TestLockTableBasic(t *testing.T) {
typeStr = "waitElsewhere"
case waitSelf:
return str + "state=waitSelf"
case waitQueueMaxLengthExceeded:
typeStr = "waitQueueMaxLengthExceeded"
case doneWaiting:
var toResolveStr string
if stateTransition {
toResolveStr = intentsToResolveToStr(g.ResolveBeforeScanning(), true)
}
return str + "state=doneWaiting" + toResolveStr
default:
d.Fatalf(t, "unexpected state: %v", state.kind)
}
id := state.txn.ID
var txnS string
30 changes: 24 additions & 6 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -174,7 +174,7 @@ func (w *lockTableWaiterImpl) WaitOn(
if state.held {
err = w.pushLockTxn(ctx, req, state)
} else {
err = newWriteIntentErr(state)
err = newWriteIntentErr(req, state)
}
if err != nil {
return err
@@ -284,6 +284,12 @@ func (w *lockTableWaiterImpl) WaitOn(
// request's transaction is sending multiple requests concurrently.
// Proceed with waiting without pushing anyone.

case waitQueueMaxLengthExceeded:
// The request attempted to wait in a lock wait-queue whose length was
// already equal to or exceeding the request's configured maximum. As a
// result, the request was rejected.
return newWriteIntentErr(req, state)

case doneWaiting:
// The request has waited for all conflicting locks to be released
// and is at the front of any lock wait-queues. It can now stop
@@ -394,7 +400,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
ctx context.Context, req Request, ws waitingState,
) *Error {
if w.disableTxnPushing {
return newWriteIntentErr(ws)
return newWriteIntentErr(req, ws)
}

// Construct the request header and determine which form of push to use.
@@ -436,7 +442,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
// If pushing with an Error WaitPolicy and the push fails, then the lock
// holder is still active. Transform the error into a WriteIntentError.
if _, ok := err.GetDetail().(*roachpb.TransactionPushError); ok && req.WaitPolicy == lock.WaitPolicy_Error {
err = newWriteIntentErr(ws)
err = newWriteIntentErr(req, ws)
}
return err
}
@@ -786,7 +792,7 @@ func (h *contentionEventHelper) emitAndInit(s waitingState) {
}
h.tBegin = timeutil.Now()
}
case waitElsewhere, doneWaiting:
case waitElsewhere, waitQueueMaxLengthExceeded, doneWaiting:
// If we have an event, emit it now and that's it - the case we're in
// does not give us a new transaction/key.
if h.ev != nil {
Expand All @@ -797,10 +803,22 @@ func (h *contentionEventHelper) emitAndInit(s waitingState) {
}
}

func newWriteIntentErr(ws waitingState) *Error {
return roachpb.NewError(&roachpb.WriteIntentError{
func newWriteIntentErr(req Request, ws waitingState) *Error {
err := roachpb.NewError(&roachpb.WriteIntentError{
Intents: []roachpb.Intent{roachpb.MakeIntent(ws.txn, ws.key)},
})
// TODO(nvanbenschoten): setting an error index can assist the KV client in
// understanding which request hit an error. This is not necessary, but can
// improve error handling, leading to better error messages and performance
// optimizations in some cases. We don't have an easy way to associate a given
// conflict with a specific request in a batch because we don't retain a
// mapping from lock span to request. However, as a best-effort optimization,
// we set the error index to 0 if this is the only request in the batch (that
// landed on this range, from the client's perspective).
if len(req.Requests) == 1 {
err.SetErrorIndex(0)
}
return err
}

func hasMinPriority(txn *enginepb.TxnMeta) bool {