Merge #76951
76951: admission: add cluster settings for epoch-LIFO queueing r=ajwerner a=sumeerbhola

The epoch length, the additional delta duration to wait before
closing the epoch, and the queueing delay threshold to switch
to epoch-LIFO are all configurable.

Additionally, the polling for closing an epoch, which previously used a
time.Ticker, is replaced by a time.Timer, to simplify the code and to
increase the accuracy of epoch closing.

Release note (ops change): admission.epoch_lifo.epoch_duration,
admission.epoch_lifo.epoch_closing_delta_duration,
admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo
are new cluster settings for configuring epoch-LIFO queueing.
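
For illustration, a minimal sketch of overriding the new settings programmatically
from a test inside pkg/util/admission. The helper name is hypothetical, and it
assumes the duration settings expose the same Override method that
work_queue_test.go already uses for EpochLIFOEnabled:

    // Hypothetical test helper; not part of this commit. Override is assumed
    // to exist on the duration settings with the same shape as on the bool
    // setting shown in work_queue_test.go.
    func overrideEpochLIFOSettingsForTest(ctx context.Context, st *cluster.Settings) {
        EpochLIFOEnabled.Override(ctx, &st.SV, true)
        epochLIFOEpochDuration.Override(ctx, &st.SV, 50*time.Millisecond)
        epochLIFOEpochClosingDeltaDuration.Override(ctx, &st.SV, 2*time.Millisecond)
        epochLIFOQueueDelayThresholdToSwitchToLIFO.Override(ctx, &st.SV, 52*time.Millisecond)
    }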

Co-authored-by: sumeerbhola <[email protected]>
craig[bot] and sumeerbhola committed Feb 27, 2022
2 parents 6ff86bb + 0078cf5 commit 9b45e99
Showing 4 changed files with 151 additions and 81 deletions.
3 changes: 3 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -1,5 +1,8 @@
Setting Type Default Description
admission.epoch_lifo.enabled boolean false when true, epoch-LIFO behavior is enabled when there is significant delay in admission
admission.epoch_lifo.epoch_closing_delta_duration duration 5ms the delta duration before closing an epoch, for epoch-LIFO admission control ordering
admission.epoch_lifo.epoch_duration duration 100ms the duration of an epoch, for epoch-LIFO admission control ordering
admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo duration 105ms the queue delay encountered by a (tenant,priority) for switching to epoch-LIFO ordering
admission.sql_kv_response.enabled boolean true when true, work performed by the SQL layer when receiving a KV response is subject to admission control
admission.sql_sql_response.enabled boolean true when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control
bulkio.backup.file_size byte size 128 MiB target size for individual data files produced during BACKUP
3 changes: 3 additions & 0 deletions docs/generated/settings/settings.html
@@ -2,6 +2,9 @@
<thead><tr><th>Setting</th><th>Type</th><th>Default</th><th>Description</th></tr></thead>
<tbody>
<tr><td><code>admission.epoch_lifo.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, epoch-LIFO behavior is enabled when there is significant delay in admission</td></tr>
<tr><td><code>admission.epoch_lifo.epoch_closing_delta_duration</code></td><td>duration</td><td><code>5ms</code></td><td>the delta duration before closing an epoch, for epoch-LIFO admission control ordering</td></tr>
<tr><td><code>admission.epoch_lifo.epoch_duration</code></td><td>duration</td><td><code>100ms</code></td><td>the duration of an epoch, for epoch-LIFO admission control ordering</td></tr>
<tr><td><code>admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo</code></td><td>duration</td><td><code>105ms</code></td><td>the queue delay encountered by a (tenant,priority) for switching to epoch-LIFO ordering</td></tr>
<tr><td><code>admission.kv.enabled</code></td><td>boolean</td><td><code>true</code></td><td>when true, work performed by the KV layer is subject to admission control</td></tr>
<tr><td><code>admission.sql_kv_response.enabled</code></td><td>boolean</td><td><code>true</code></td><td>when true, work performed by the SQL layer when receiving a KV response is subject to admission control</td></tr>
<tr><td><code>admission.sql_sql_response.enabled</code></td><td>boolean</td><td><code>true</code></td><td>when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control</td></tr>
217 changes: 140 additions & 77 deletions pkg/util/admission/work_queue.go
@@ -86,6 +86,42 @@ var EpochLIFOEnabled = settings.RegisterBoolSetting(
"when true, epoch-LIFO behavior is enabled when there is significant delay in admission",
false).WithPublic()

var epochLIFOEpochDuration = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.epoch_lifo.epoch_duration",
"the duration of an epoch, for epoch-LIFO admission control ordering",
epochLength,
func(v time.Duration) error {
if v < time.Millisecond {
return errors.Errorf("epoch-LIFO: epoch duration is too small")
}
return nil
}).WithPublic()

var epochLIFOEpochClosingDeltaDuration = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.epoch_lifo.epoch_closing_delta_duration",
"the delta duration before closing an epoch, for epoch-LIFO admission control ordering",
epochClosingDelta,
func(v time.Duration) error {
if v < time.Millisecond {
return errors.Errorf("epoch-LIFO: epoch closing delta is too small")
}
return nil
}).WithPublic()

var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSetting(
settings.TenantWritable,
"admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo",
"the queue delay encountered by a (tenant,priority) for switching to epoch-LIFO ordering",
maxQueueDelayToSwitchToLifo,
func(v time.Duration) error {
if v < time.Millisecond {
return errors.Errorf("epoch-LIFO: queue delay threshold is too small")
}
return nil
}).WithPublic()

// WorkPriority represents the priority of work. In a WorkQueue, it is only
// used for ordering within a tenant. High priority work can starve lower
// priority work.
@@ -203,6 +239,10 @@ type WorkQueue struct {
tenants map[uint64]*tenantInfo
// The highest epoch that is closed.
closedEpochThreshold int64
// Following values are copied from the cluster settings.
epochLengthNanos int64
epochClosingDeltaNanos int64
maxQueueDelayToSwitchToLifo time.Duration
}
logThreshold log.EveryN
metrics WorkQueueMetrics
@@ -270,7 +310,12 @@ func makeWorkQueue(
stopCh: stopCh,
timeSource: opts.timeSource,
}
q.mu.tenants = make(map[uint64]*tenantInfo)
func() {
q.mu.Lock()
defer q.mu.Unlock()
q.mu.tenants = make(map[uint64]*tenantInfo)
q.sampleEpochLIFOSettingsLocked()
}()
go func() {
ticker := time.NewTicker(time.Second)
for {
@@ -283,7 +328,7 @@
}
}
}()
q.tryCloseEpoch()
q.tryCloseEpoch(q.timeNow())
if !opts.disableEpochClosingGoroutine {
q.startClosingEpochs()
}
@@ -300,76 +345,90 @@ func (q *WorkQueue) timeNow() time.Time {
}

func (q *WorkQueue) epochLIFOEnabled() bool {
return q.settings != nil && EpochLIFOEnabled.Get(&q.settings.SV)
return EpochLIFOEnabled.Get(&q.settings.SV)
}

// Samples the latest cluster settings for epoch-LIFO.
func (q *WorkQueue) sampleEpochLIFOSettingsLocked() {
epochLengthNanos := int64(epochLIFOEpochDuration.Get(&q.settings.SV))
if epochLengthNanos != q.mu.epochLengthNanos {
// Reset what is closed. A proper closed value will be calculated when the
// next epoch closes. This ensures that if we are increasing the epoch
// length, we will regress what epoch number is closed. Meanwhile, all
// work subject to LIFO queueing will get queued in the openEpochsHeap,
// which is fine (we admit from there too).
q.mu.closedEpochThreshold = 0
}
q.mu.epochLengthNanos = epochLengthNanos
q.mu.epochClosingDeltaNanos = int64(epochLIFOEpochClosingDeltaDuration.Get(&q.settings.SV))
q.mu.maxQueueDelayToSwitchToLifo = epochLIFOQueueDelayThresholdToSwitchToLIFO.Get(&q.settings.SV)
}
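
A worked example of why the threshold reset above matters (illustrative numbers
only, not part of the diff): an epoch number is just a CreateTime divided by the
epoch length, so lengthening the epoch shrinks the epoch number computed for the
same wall-clock time.

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        t := int64(time.Second)                     // a CreateTime of 1s
        oldEpoch := t / int64(100*time.Millisecond) // 10 with a 100ms epoch
        newEpoch := t / int64(200*time.Millisecond) // 5 after doubling the epoch length
        // Without resetting closedEpochThreshold (still about 10 from the old
        // length), new work at epoch 5 would compare as already closed even
        // though its epoch has not actually ended.
        fmt.Println(oldEpoch, newEpoch)
    }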

func (q *WorkQueue) startClosingEpochs() {
go func() {
// We try to run the ticker with duration equal to the epoch length
// whenever possible. If the error in closing the epoch grows too large,
// we switch to a 1ms ticker. One would expect the overhead of always
// running with a 1ms ticker to be negligible, but we have observed
// 5-10% of cpu utilization on CockroachDB nodes that are doing no other
// work. The cause may be a poor interaction with processor idle state
// https://github.com/golang/go/issues/30740#issuecomment-471634471.
// Note that one of the cases where error in closing is likely to grow
// large is when cpu utilization is close to 100% -- in that case we
// will be doing 1ms ticks, which is fine since there are no idle
// processors.
tickerDurShort := time.Millisecond
acceptableErrorNanos := int64(2 * tickerDurShort)
currentTickerDur := tickerDurShort
if !q.epochLIFOEnabled() {
currentTickerDur = time.Duration(epochLengthNanos)
}
// TODO(sumeer): try using a Timer instead.
ticker := time.NewTicker(currentTickerDur)
// If someone sets the epoch length to a huge value by mistake, we will
// still sample every second, so that we can adjust when they fix their
// mistake.
const maxTimerDur = time.Second
// This is the min duration we set the timer for, to avoid setting smaller
// and smaller timers, in case the timer fires slightly early.
const minTimerDur = time.Millisecond
var timer *time.Timer
for {
select {
case <-ticker.C:
closedEpoch, closingErrorNanos := q.tryCloseEpoch()
if closedEpoch {
epochLIFOEnabled := q.epochLIFOEnabled()
if currentTickerDur == tickerDurShort {
if closingErrorNanos < acceptableErrorNanos || !epochLIFOEnabled {
// Switch to long duration ticking.
currentTickerDur = time.Duration(epochLengthNanos)
ticker.Reset(currentTickerDur)
}
// Else continue ticking at 1ms.
} else if closingErrorNanos >= acceptableErrorNanos && epochLIFOEnabled {
// Ticker was using a long duration and the error became too
// high. Switch to 1ms ticks.
currentTickerDur = tickerDurShort
ticker.Reset(currentTickerDur)
}
q.mu.Lock()
q.sampleEpochLIFOSettingsLocked()
nextCloseTime := q.nextEpochCloseTimeLocked()
q.mu.Unlock()
timeNow := q.timeNow()
timerDur := nextCloseTime.Sub(timeNow)
if timerDur > 0 {
if timerDur > maxTimerDur {
timerDur = maxTimerDur
} else if timerDur < minTimerDur {
timerDur = minTimerDur
}
case <-q.stopCh:
// Channel closed.
return
if timer == nil {
timer = time.NewTimer(timerDur)
} else {
timer.Reset(timerDur)
}
select {
case <-timer.C:
case <-q.stopCh:
// Channel closed.
return
}
} else {
q.tryCloseEpoch(timeNow)
}
}
}()
}
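
The clamping in the loop above keeps the timer duration between 1ms and 1s. A
hypothetical standalone restatement of that clamp (this function does not exist
in the package, and only the timerDur > 0 branch of the loop is restated):

    package admissionsketch // hypothetical; not part of the commit

    import "time"

    // clampTimerDur bounds a computed wait so the timer neither fires in
    // ever-shorter bursts (when it wakes slightly early) nor sleeps so long
    // that a mis-set epoch length goes unnoticed.
    func clampTimerDur(d, minDur, maxDur time.Duration) time.Duration {
        if d < minDur {
            return minDur
        }
        if d > maxDur {
            return maxDur
        }
        return d
    }

With the defaults, a computed wait of 0.3ms becomes 1ms, and a wait of an hour
(say, from a mistakenly huge epoch duration setting) becomes 1s, so the settings
are re-sampled every second until the mistake is fixed.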

func (q *WorkQueue) tryCloseEpoch() (closedEpoch bool, closingErrorNanos int64) {
func (q *WorkQueue) nextEpochCloseTimeLocked() time.Time {
// +2 since we need to advance the threshold by 1, and another 1 since the
// epoch closes at its end time.
timeUnixNanos :=
(q.mu.closedEpochThreshold+2)*q.mu.epochLengthNanos + q.mu.epochClosingDeltaNanos
return timeutil.Unix(0, timeUnixNanos)
}

func (q *WorkQueue) tryCloseEpoch(timeNow time.Time) {
epochLIFOEnabled := q.epochLIFOEnabled()
timeNow := q.timeNow()
epochClosingTimeNanos := timeNow.UnixNano() - epochLengthNanos - epochClosingDeltaNanos
epoch := epochForTimeNanos(epochClosingTimeNanos)
q.mu.Lock()
defer q.mu.Unlock()
epochClosingTimeNanos := timeNow.UnixNano() - q.mu.epochLengthNanos - q.mu.epochClosingDeltaNanos
epoch := epochForTimeNanos(epochClosingTimeNanos, q.mu.epochLengthNanos)
if epoch <= q.mu.closedEpochThreshold {
return
}
q.mu.closedEpochThreshold = epoch
closedEpoch = true
closingErrorNanos = epochClosingTimeNanos - (epoch * epochLengthNanos)
doLog := q.logThreshold.ShouldLog()
for _, tenant := range q.mu.tenants {
prevThreshold := tenant.fifoPriorityThreshold
tenant.fifoPriorityThreshold =
tenant.priorityStates.getFIFOPriorityThresholdAndReset(tenant.fifoPriorityThreshold)
tenant.priorityStates.getFIFOPriorityThresholdAndReset(
tenant.fifoPriorityThreshold, q.mu.epochLengthNanos, q.mu.maxQueueDelayToSwitchToLifo)
if !epochLIFOEnabled {
tenant.fifoPriorityThreshold = int(LowPri)
}
@@ -401,7 +460,6 @@ func (q *WorkQueue) tryCloseEpoch() (closedEpoch bool, closingErrorNanos int64)
heap.Push(&tenant.waitingWorkHeap, work)
}
}
return closedEpoch, closingErrorNanos
}

// Admit is called when requesting admission for some work. If err!=nil, the
@@ -411,7 +469,7 @@ func (q *WorkQueue) tryCloseEpoch() (closedEpoch bool, closingErrorNanos int64)
// enabled=true && err!=nil, and the WorkKind for this queue uses slots.
func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err error) {
enabledSetting := admissionControlEnabledSettings[q.workKind]
if q.settings != nil && enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) {
if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) {
return false, nil
}
q.metrics.Requested.Inc(1)
@@ -524,7 +582,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
if int(info.Priority) < tenant.fifoPriorityThreshold {
ordering = lifoWorkOrdering
}
work := newWaitingWork(info.Priority, ordering, info.CreateTime, startTime)
work := newWaitingWork(info.Priority, ordering, info.CreateTime, startTime, q.mu.epochLengthNanos)
inTenantHeap := isInTenantHeap(tenant)
if work.epoch <= q.mu.closedEpochThreshold || ordering == fifoWorkOrdering {
heap.Push(&tenant.waitingWorkHeap, work)
@@ -835,7 +893,9 @@ func (ps *priorityStates) updateDelayLocked(
}
}

func (ps *priorityStates) getFIFOPriorityThresholdAndReset(curPriorityThreshold int) int {
func (ps *priorityStates) getFIFOPriorityThresholdAndReset(
curPriorityThreshold int, epochLengthNanos int64, maxQueueDelayToSwitchToLifo time.Duration,
) int {
// priority is monotonically increasing in the calculation below.
priority := int(LowPri)
foundLowestPriority := false
@@ -1040,18 +1100,18 @@ var waitingWorkPool = sync.Pool{
},
}

// The epoch length for doing epoch-LIFO. The epoch-LIFO scheme relies on
// clock synchronization and the expectation that transaction/query deadlines
// will be significantly higher than execution time under low load. A standard
// LIFO scheme suffers from a severe problem when a single user transaction
// can result in many lower-level work that get distributed to many nodes, and
// previous work execution can result in new work being submitted for
// admission: the later work for a transaction may no longer be the latest
// seen by the system, so will not be preferred. This means LIFO would do some
// work items from each transaction and starve the remaining work, so nothing
// would complete. This is even worse than FIFO which at least prefers the
// same transactions until they are complete (FIFO and LIFO are using the
// transaction CreateTime, and not the work arrival time).
// The default epoch length for doing epoch-LIFO. The epoch-LIFO scheme relies
// on clock synchronization and the expectation that transaction/query
// deadlines will be significantly higher than execution time under low load.
// A standard LIFO scheme suffers from a severe problem when a single user
// transaction can result in many lower-level work items that get distributed to
// many nodes, and previous work execution can result in new work being
// submitted for admission: the later work for a transaction may no longer be
// the latest seen by the system, so will not be preferred. This means LIFO
// would do some work items from each transaction and starve the remaining
// work, so nothing would complete. This is even worse than FIFO which at
// least prefers the same transactions until they are complete (FIFO and LIFO
// are using the transaction CreateTime, and not the work arrival time).
//
// Consider a case where transaction deadlines are 1s (note this may not
// necessarily be an actual deadline, and could be a time duration after which
@@ -1075,22 +1135,24 @@ var waitingWorkPool = sync.Pool{
// closed epochs, but since they are not bottlenecked, the queueing delay
// should be minimal.
//
// TODO(sumeer): make these configurable via a cluster setting. Increasing
// this value will cause the epoch number to move backwards. This will cause
// These are defaults and can be overridden using cluster settings. Increasing
// the epoch length will cause the epoch number to decrease. This will cause
// some confusion in the ordering between work that was previously queued with
// a higher epoch number. We accept that temporary confusion. We do not try to
// maintain a monotonic epoch based on the epoch number already in place
// before the change since different nodes will see the cluster setting change
// at different times.
const epochLengthNanos = int64(time.Millisecond * 100)
const epochClosingDeltaNanos = int64(time.Millisecond * 5)
// a higher epoch number. We accept that temporary confusion (it will clear
// once old queued work is admitted or canceled). We do not try to maintain a
// monotonic epoch, based on the epoch number already in place before the
// change, since different nodes will see the cluster setting change at
// different times.

const epochLength = time.Millisecond * 100
const epochClosingDelta = time.Millisecond * 5

// Latency threshold for switching to LIFO queuing. Once we switch to LIFO,
// the minimum latency will be epochLengthNanos+epochClosingDeltaNanos, so it
// makes sense not to switch until the observed latency is around the same.
const maxQueueDelayToSwitchToLifo = time.Duration(epochLengthNanos + epochClosingDeltaNanos)
const maxQueueDelayToSwitchToLifo = epochLength + epochClosingDelta

func epochForTimeNanos(t int64) int64 {
func epochForTimeNanos(t int64, epochLengthNanos int64) int64 {
return t / epochLengthNanos
}
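
For concreteness, a small sketch of how a work item's CreateTime maps to an
epoch under the default constants (illustrative only):

    package main

    import (
        "fmt"
        "time"
    )

    func epochForTimeNanos(t int64, epochLengthNanos int64) int64 {
        return t / epochLengthNanos
    }

    func main() {
        epochLengthNanos := int64(100 * time.Millisecond)
        createTime := int64(345 * time.Millisecond)
        // CreateTime 345ms falls in epoch 3, which spans [300ms, 400ms) and
        // closes 5ms after its end, i.e. at 405ms.
        fmt.Println(epochForTimeNanos(createTime, epochLengthNanos)) // 3
    }

This also shows why maxQueueDelayToSwitchToLifo defaults to epochLength +
epochClosingDelta (105ms): once LIFO ordering applies, the minimum queueing
latency is roughly that sum, so switching before the observed delay reaches
that range would not reduce latency.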

@@ -1099,6 +1161,7 @@ func newWaitingWork(
arrivalTimeWorkOrdering workOrderingKind,
createTime int64,
enqueueingTime time.Time,
epochLengthNanos int64,
) *waitingWork {
ww := waitingWorkPool.Get().(*waitingWork)
ch := ww.ch
Expand All @@ -1109,7 +1172,7 @@ func newWaitingWork(
priority: priority,
arrivalTimeWorkOrdering: arrivalTimeWorkOrdering,
createTime: createTime,
epoch: epochForTimeNanos(createTime),
epoch: epochForTimeNanos(createTime, epochLengthNanos),
ch: ch,
heapIndex: -1,
enqueueingTime: enqueueingTime,
9 changes: 5 additions & 4 deletions pkg/util/admission/work_queue_test.go
@@ -225,7 +225,6 @@ func TestWorkQueueBasic(t *testing.T) {
return buf.stringAndReset()

case "cancel-work":
// TODO: test cancellation of something in openepochheap
var id int
d.ScanArgs(t, "id", &id)
work, ok := wrkMap.get(id)
@@ -263,7 +262,7 @@ func TestWorkQueueBasic(t *testing.T) {
d.ScanArgs(t, "millis", &millis)
timeSource.Advance(time.Duration(millis) * time.Millisecond)
EpochLIFOEnabled.Override(context.Background(), &st.SV, true)
q.tryCloseEpoch()
q.tryCloseEpoch(timeSource.Now())
return q.String()

default:
@@ -289,8 +288,9 @@ func TestWorkQueueTokenResetRace(t *testing.T) {

var buf builderWithMu
tg := &testGranter{buf: &buf}
st := cluster.MakeTestingClusterSettings()
q := makeWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), SQLKVResponseWork, tg,
nil, makeWorkQueueOptions(SQLKVResponseWork)).(*WorkQueue)
st, makeWorkQueueOptions(SQLKVResponseWork)).(*WorkQueue)
tg.r = q
createTime := int64(0)
stopCh := make(chan struct{})
@@ -400,7 +400,8 @@ func TestPriorityStates(t *testing.T) {
return printFunc()

case "get-threshold":
curThreshold = ps.getFIFOPriorityThresholdAndReset(curThreshold)
curThreshold = ps.getFIFOPriorityThresholdAndReset(
curThreshold, int64(epochLength), maxQueueDelayToSwitchToLifo)
return fmt.Sprintf("threshold: %d", curThreshold)

default:
