diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 0293123f43c6..fd567fae0719 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -1,5 +1,8 @@
Setting Type Default Description
admission.epoch_lifo.enabled boolean false when true, epoch-LIFO behavior is enabled when there is significant delay in admission
+admission.epoch_lifo.epoch_closing_delta_duration duration 5ms the delta duration before closing an epoch, for epoch-LIFO admission control ordering
+admission.epoch_lifo.epoch_duration duration 100ms the duration of an epoch, for epoch-LIFO admission control ordering
+admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo duration 105ms the queue delay encountered by a (tenant,priority) for switching to epoch-LIFO ordering
admission.sql_kv_response.enabled boolean true when true, work performed by the SQL layer when receiving a KV response is subject to admission control
admission.sql_sql_response.enabled boolean true when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control
bulkio.backup.file_size byte size 128 MiB target size for individual data files produced during BACKUP
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 06874ffa3e1b..e1318d4eeb54 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -2,6 +2,9 @@
Setting | Type | Default | Description |
admission.epoch_lifo.enabled | boolean | false | when true, epoch-LIFO behavior is enabled when there is significant delay in admission |
+admission.epoch_lifo.epoch_closing_delta_duration | duration | 5ms | the delta duration before closing an epoch, for epoch-LIFO admission control ordering |
+admission.epoch_lifo.epoch_duration | duration | 100ms | the duration of an epoch, for epoch-LIFO admission control ordering |
+admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo | duration | 105ms | the queue delay encountered by a (tenant,priority) for switching to epoch-LIFO ordering |
admission.kv.enabled | boolean | true | when true, work performed by the KV layer is subject to admission control |
admission.sql_kv_response.enabled | boolean | true | when true, work performed by the SQL layer when receiving a KV response is subject to admission control |
admission.sql_sql_response.enabled | boolean | true | when true, work performed by the SQL layer when receiving a DistSQL response is subject to admission control |
diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go
index e784dbac64a0..27cf157ce311 100644
--- a/pkg/util/admission/work_queue.go
+++ b/pkg/util/admission/work_queue.go
@@ -86,6 +86,42 @@ var EpochLIFOEnabled = settings.RegisterBoolSetting(
"when true, epoch-LIFO behavior is enabled when there is significant delay in admission",
false).WithPublic()
+var epochLIFOEpochDuration = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "admission.epoch_lifo.epoch_duration",
+ "the duration of an epoch, for epoch-LIFO admission control ordering",
+ epochLength,
+ func(v time.Duration) error {
+ if v < time.Millisecond {
+ return errors.Errorf("epoch-LIFO: epoch duration is too small")
+ }
+ return nil
+ }).WithPublic()
+
+var epochLIFOEpochClosingDeltaDuration = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "admission.epoch_lifo.epoch_closing_delta_duration",
+ "the delta duration before closing an epoch, for epoch-LIFO admission control ordering",
+ epochClosingDelta,
+ func(v time.Duration) error {
+ if v < time.Millisecond {
+ return errors.Errorf("epoch-LIFO: epoch closing delta is too small")
+ }
+ return nil
+ }).WithPublic()
+
+var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "admission.epoch_lifo.queue_delay_threshold_to_switch_to_lifo",
+ "the queue delay encountered by a (tenant,priority) for switching to epoch-LIFO ordering",
+ maxQueueDelayToSwitchToLifo,
+ func(v time.Duration) error {
+ if v < time.Millisecond {
+ return errors.Errorf("epoch-LIFO: queue delay threshold is too small")
+ }
+ return nil
+ }).WithPublic()
+
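+// Note that the three defaults are mutually consistent: the 105ms queue-delay
+// threshold equals the 100ms epoch duration plus the 5ms closing delta (see
+// maxQueueDelayToSwitchToLifo below). A minimal sketch of overriding the new
+// settings in a test, following the Override pattern used for
+// EpochLIFOEnabled in work_queue_test.go (the values are illustrative):
+//
+//   st := cluster.MakeTestingClusterSettings()
+//   ctx := context.Background()
+//   epochLIFOEpochDuration.Override(ctx, &st.SV, 50*time.Millisecond)
+//   epochLIFOEpochClosingDeltaDuration.Override(ctx, &st.SV, 2*time.Millisecond)
+//   epochLIFOQueueDelayThresholdToSwitchToLIFO.Override(ctx, &st.SV, 52*time.Millisecond)
+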
-// WorkPriority represents the priority of work. In an WorkQueue, it is only
+// WorkPriority represents the priority of work. In a WorkQueue, it is only
// used for ordering within a tenant. High priority work can starve lower
// priority work.
@@ -203,6 +239,10 @@ type WorkQueue struct {
tenants map[uint64]*tenantInfo
// The highest epoch that is closed.
closedEpochThreshold int64
+ // The following values are copied from the cluster settings.
+ epochLengthNanos int64
+ epochClosingDeltaNanos int64
+ maxQueueDelayToSwitchToLifo time.Duration
}
logThreshold log.EveryN
metrics WorkQueueMetrics
@@ -270,7 +310,12 @@ func makeWorkQueue(
stopCh: stopCh,
timeSource: opts.timeSource,
}
- q.mu.tenants = make(map[uint64]*tenantInfo)
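+ // sampleEpochLIFOSettingsLocked requires q.mu, so initialization briefly
+ // holds the lock; the function literal scopes the critical section so the
+ // lock is released before the goroutines below are started.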
+ func() {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ q.mu.tenants = make(map[uint64]*tenantInfo)
+ q.sampleEpochLIFOSettingsLocked()
+ }()
go func() {
ticker := time.NewTicker(time.Second)
for {
@@ -283,7 +328,7 @@ func makeWorkQueue(
}
}
}()
- q.tryCloseEpoch()
+ q.tryCloseEpoch(q.timeNow())
if !opts.disableEpochClosingGoroutine {
q.startClosingEpochs()
}
@@ -300,76 +345,90 @@ func (q *WorkQueue) timeNow() time.Time {
}
func (q *WorkQueue) epochLIFOEnabled() bool {
- return q.settings != nil && EpochLIFOEnabled.Get(&q.settings.SV)
+ return EpochLIFOEnabled.Get(&q.settings.SV)
+}
+
+// sampleEpochLIFOSettingsLocked samples the latest epoch-LIFO cluster
+// settings. Requires that q.mu is held.
+func (q *WorkQueue) sampleEpochLIFOSettingsLocked() {
+ epochLengthNanos := int64(epochLIFOEpochDuration.Get(&q.settings.SV))
+ if epochLengthNanos != q.mu.epochLengthNanos {
+ // Reset the closed epoch threshold; a proper value will be recomputed the
+ // next time an epoch closes. This matters in particular when the epoch
+ // length increases, since epoch numbers move backwards and the threshold
+ // must regress with them. In the interim, work subject to LIFO queueing
+ // lands in the openEpochsHeap, which is fine (we admit from there too).
+ q.mu.closedEpochThreshold = 0
+ }
+ q.mu.epochLengthNanos = epochLengthNanos
+ q.mu.epochClosingDeltaNanos = int64(epochLIFOEpochClosingDeltaDuration.Get(&q.settings.SV))
+ q.mu.maxQueueDelayToSwitchToLifo = epochLIFOQueueDelayThresholdToSwitchToLIFO.Get(&q.settings.SV)
}
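+// startClosingEpochs starts the goroutine that closes epochs. Each iteration
+// re-samples the epoch-LIFO cluster settings, arms a single timer for the
+// next epoch close time (clamped to [minTimerDur, maxTimerDur]), and closes
+// the epoch once that time has passed, so setting changes take effect within
+// at most maxTimerDur.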
func (q *WorkQueue) startClosingEpochs() {
go func() {
- // We try to run the ticker with duration equal to the epoch length
- // whenever possible. If the error in closing the epoch grows too large,
- // we switch to a 1ms ticker. One would expect the overhead of always
- // running with a 1ms ticker to be negligible, but we have observed
- // 5-10% of cpu utilization on CockroachDB nodes that are doing no other
- // work. The cause may be a poor interaction with processor idle state
- // https://github.com/golang/go/issues/30740#issuecomment-471634471.
- // Note that one of the cases where error in closing is likely to grow
- // large is when cpu utilization is close to 100% -- in that case we
- // will be doing 1ms ticks, which is fine since there are no idle
- // processors.
- tickerDurShort := time.Millisecond
- acceptableErrorNanos := int64(2 * tickerDurShort)
- currentTickerDur := tickerDurShort
- if !q.epochLIFOEnabled() {
- currentTickerDur = time.Duration(epochLengthNanos)
- }
- // TODO(sumeer): try using a Timer instead.
- ticker := time.NewTicker(currentTickerDur)
+ // If someone sets the epoch length to a huge value by mistake, we will
+ // still sample every second, so that we can adjust when they fix their
+ // mistake.
+ const maxTimerDur = time.Second
+ // The min duration we arm the timer for; if the timer fires slightly
+ // early, this avoids re-arming it with ever smaller durations.
+ const minTimerDur = time.Millisecond
+ var timer *time.Timer
for {
- select {
- case <-ticker.C:
- closedEpoch, closingErrorNanos := q.tryCloseEpoch()
- if closedEpoch {
- epochLIFOEnabled := q.epochLIFOEnabled()
- if currentTickerDur == tickerDurShort {
- if closingErrorNanos < acceptableErrorNanos || !epochLIFOEnabled {
- // Switch to long duration ticking.
- currentTickerDur = time.Duration(epochLengthNanos)
- ticker.Reset(currentTickerDur)
- }
- // Else continue ticking at 1ms.
- } else if closingErrorNanos >= acceptableErrorNanos && epochLIFOEnabled {
- // Ticker was using a long duration and the error became too
- // high. Switch to 1ms ticks.
- currentTickerDur = tickerDurShort
- ticker.Reset(currentTickerDur)
- }
+ q.mu.Lock()
+ q.sampleEpochLIFOSettingsLocked()
+ nextCloseTime := q.nextEpochCloseTimeLocked()
+ q.mu.Unlock()
+ timeNow := q.timeNow()
+ timerDur := nextCloseTime.Sub(timeNow)
+ if timerDur > 0 {
+ if timerDur > maxTimerDur {
+ timerDur = maxTimerDur
+ } else if timerDur < minTimerDur {
+ timerDur = minTimerDur
}
- case <-q.stopCh:
- // Channel closed.
- return
+ if timer == nil {
+ timer = time.NewTimer(timerDur)
+ } else {
+ timer.Reset(timerDur)
+ }
+ select {
+ case <-timer.C:
+ case <-q.stopCh:
+ // Channel closed.
+ return
+ }
+ } else {
+ q.tryCloseEpoch(timeNow)
}
}
}()
}
-func (q *WorkQueue) tryCloseEpoch() (closedEpoch bool, closingErrorNanos int64) {
+func (q *WorkQueue) nextEpochCloseTimeLocked() time.Time {
+ // +2 since we need to advance the threshold by 1, and another 1 since the
+ // epoch closes at its end time.
+ timeUnixNanos :=
+ (q.mu.closedEpochThreshold+2)*q.mu.epochLengthNanos + q.mu.epochClosingDeltaNanos
+ return timeutil.Unix(0, timeUnixNanos)
+}
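+
+// For example, with the default 100ms epoch length and 5ms closing delta: if
+// closedEpochThreshold is 7, epoch 8 spans [800ms, 900ms) and
+// nextEpochCloseTimeLocked returns (7+2)*100ms + 5ms = 905ms. Once that time
+// has passed, tryCloseEpoch computes epochForTimeNanos(905ms-100ms-5ms) = 8
+// and advances the threshold to 8.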
+
+func (q *WorkQueue) tryCloseEpoch(timeNow time.Time) {
epochLIFOEnabled := q.epochLIFOEnabled()
- timeNow := q.timeNow()
- epochClosingTimeNanos := timeNow.UnixNano() - epochLengthNanos - epochClosingDeltaNanos
- epoch := epochForTimeNanos(epochClosingTimeNanos)
q.mu.Lock()
defer q.mu.Unlock()
+ epochClosingTimeNanos := timeNow.UnixNano() - q.mu.epochLengthNanos - q.mu.epochClosingDeltaNanos
+ epoch := epochForTimeNanos(epochClosingTimeNanos, q.mu.epochLengthNanos)
if epoch <= q.mu.closedEpochThreshold {
return
}
q.mu.closedEpochThreshold = epoch
- closedEpoch = true
- closingErrorNanos = epochClosingTimeNanos - (epoch * epochLengthNanos)
doLog := q.logThreshold.ShouldLog()
for _, tenant := range q.mu.tenants {
prevThreshold := tenant.fifoPriorityThreshold
tenant.fifoPriorityThreshold =
- tenant.priorityStates.getFIFOPriorityThresholdAndReset(tenant.fifoPriorityThreshold)
+ tenant.priorityStates.getFIFOPriorityThresholdAndReset(
+ tenant.fifoPriorityThreshold, q.mu.epochLengthNanos, q.mu.maxQueueDelayToSwitchToLifo)
if !epochLIFOEnabled {
tenant.fifoPriorityThreshold = int(LowPri)
}
@@ -401,7 +460,6 @@ func (q *WorkQueue) tryCloseEpoch() (closedEpoch bool, closingErrorNanos int64)
heap.Push(&tenant.waitingWorkHeap, work)
}
}
- return closedEpoch, closingErrorNanos
}
// Admit is called when requesting admission for some work. If err!=nil, the
@@ -411,7 +469,7 @@ func (q *WorkQueue) tryCloseEpoch() (closedEpoch bool, closingErrorNanos int64)
// enabled=true && err!=nil, and the WorkKind for this queue uses slots.
func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err error) {
enabledSetting := admissionControlEnabledSettings[q.workKind]
- if q.settings != nil && enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) {
+ if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) {
return false, nil
}
q.metrics.Requested.Inc(1)
@@ -524,7 +582,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
if int(info.Priority) < tenant.fifoPriorityThreshold {
ordering = lifoWorkOrdering
}
- work := newWaitingWork(info.Priority, ordering, info.CreateTime, startTime)
+ work := newWaitingWork(info.Priority, ordering, info.CreateTime, startTime, q.mu.epochLengthNanos)
inTenantHeap := isInTenantHeap(tenant)
if work.epoch <= q.mu.closedEpochThreshold || ordering == fifoWorkOrdering {
heap.Push(&tenant.waitingWorkHeap, work)
@@ -835,7 +893,9 @@ func (ps *priorityStates) updateDelayLocked(
}
}
-func (ps *priorityStates) getFIFOPriorityThresholdAndReset(curPriorityThreshold int) int {
+func (ps *priorityStates) getFIFOPriorityThresholdAndReset(
+ curPriorityThreshold int, epochLengthNanos int64, maxQueueDelayToSwitchToLifo time.Duration,
+) int {
// priority is monotonically increasing in the calculation below.
priority := int(LowPri)
foundLowestPriority := false
@@ -1040,18 +1100,18 @@ var waitingWorkPool = sync.Pool{
},
}
-// The epoch length for doing epoch-LIFO. The epoch-LIFO scheme relies on
-// clock synchronization and the expectation that transaction/query deadlines
-// will be significantly higher than execution time under low load. A standard
-// LIFO scheme suffers from a severe problem when a single user transaction
-// can result in many lower-level work that get distributed to many nodes, and
-// previous work execution can result in new work being submitted for
-// admission: the later work for a transaction may no longer be the latest
-// seen by the system, so will not be preferred. This means LIFO would do some
-// work items from each transaction and starve the remaining work, so nothing
-// would complete. This is even worse than FIFO which at least prefers the
-// same transactions until they are complete (FIFO and LIFO are using the
-// transaction CreateTime, and not the work arrival time).
+// The default epoch length for doing epoch-LIFO. The epoch-LIFO scheme relies
+// on clock synchronization and the expectation that transaction/query
+// deadlines will be significantly higher than execution time under low load.
+// A standard LIFO scheme suffers from a severe problem when a single user
+// transaction can result in many lower-level work items that get distributed
+// to many nodes, and previous work execution can result in new work being
+// submitted for admission: the later work for a transaction may no longer be
+// the latest seen by the system, so will not be preferred. This means LIFO
+// would do some work items from each transaction and starve the remaining
+// work, so nothing would complete. This is even worse than FIFO, which at
+// least prefers the same transactions until they are complete (both FIFO and
+// LIFO order by the transaction CreateTime, not the work arrival time).
//
// Consider a case where transaction deadlines are 1s (note this may not
// necessarily be an actual deadline, and could be a time duration after which
@@ -1075,22 +1135,24 @@ var waitingWorkPool = sync.Pool{
// closed epochs, but since they are not bottlenecked, the queueing delay
// should be minimal.
//
-// TODO(sumeer): make these configurable via a cluster setting. Increasing
-// this value will cause the epoch number to move backwards. This will cause
+// These are defaults and can be overridden using cluster settings. Increasing
+// the epoch length will cause the epoch number to decrease. This will cause
// some confusion in the ordering between work that was previously queued with
-// a higher epoch number. We accept that temporary confusion. We do not try to
-// maintain a monotonic epoch based on the epoch number already in place
-// before the change since different nodes will see the cluster setting change
-// at different times.
-const epochLengthNanos = int64(time.Millisecond * 100)
-const epochClosingDeltaNanos = int64(time.Millisecond * 5)
+// a higher epoch number. We accept that temporary confusion (it will clear
+// once old queued work is admitted or canceled). We do not try to maintain a
+// monotonic epoch, based on the epoch number already in place before the
+// change, since different nodes will see the cluster setting change at
+// different times.
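+//
+// For example, a work item's epoch is its CreateTime (in nanoseconds)
+// divided by the epoch length, so raising admission.epoch_lifo.epoch_duration
+// from 100ms to 200ms roughly halves every epoch number; that is the
+// backwards movement accepted above as temporary confusion.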
+
+const epochLength = time.Millisecond * 100
+const epochClosingDelta = time.Millisecond * 5
-// Latency threshold for switching to LIFO queuing. Once we switch to LIFO,
-// the minimum latency will be epochLenghNanos+epochClosingDeltaNanos, so it
-// makes sense not to switch until the observed latency is around the same.
+// Latency threshold for switching to LIFO queuing. Once we switch to LIFO,
+// the minimum latency will be epochLength+epochClosingDelta, so it makes
+// sense not to switch until the observed latency is around the same.
-const maxQueueDelayToSwitchToLifo = time.Duration(epochLengthNanos + epochClosingDeltaNanos)
+const maxQueueDelayToSwitchToLifo = epochLength + epochClosingDelta
-func epochForTimeNanos(t int64) int64 {
+func epochForTimeNanos(t int64, epochLengthNanos int64) int64 {
return t / epochLengthNanos
}
@@ -1099,6 +1161,7 @@ func newWaitingWork(
arrivalTimeWorkOrdering workOrderingKind,
createTime int64,
enqueueingTime time.Time,
+ epochLengthNanos int64,
) *waitingWork {
ww := waitingWorkPool.Get().(*waitingWork)
ch := ww.ch
@@ -1109,7 +1172,7 @@ func newWaitingWork(
priority: priority,
arrivalTimeWorkOrdering: arrivalTimeWorkOrdering,
createTime: createTime,
- epoch: epochForTimeNanos(createTime),
+ epoch: epochForTimeNanos(createTime, epochLengthNanos),
ch: ch,
heapIndex: -1,
enqueueingTime: enqueueingTime,
diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go
index caae670df3ae..0ca8c28f01f5 100644
--- a/pkg/util/admission/work_queue_test.go
+++ b/pkg/util/admission/work_queue_test.go
@@ -225,7 +225,6 @@ func TestWorkQueueBasic(t *testing.T) {
return buf.stringAndReset()
case "cancel-work":
- // TODO: test cancellation of something in openepochheap
var id int
d.ScanArgs(t, "id", &id)
work, ok := wrkMap.get(id)
@@ -263,7 +262,7 @@ func TestWorkQueueBasic(t *testing.T) {
d.ScanArgs(t, "millis", &millis)
timeSource.Advance(time.Duration(millis) * time.Millisecond)
EpochLIFOEnabled.Override(context.Background(), &st.SV, true)
- q.tryCloseEpoch()
+ q.tryCloseEpoch(timeSource.Now())
return q.String()
default:
@@ -289,8 +288,9 @@ func TestWorkQueueTokenResetRace(t *testing.T) {
var buf builderWithMu
tg := &testGranter{buf: &buf}
+ st := cluster.MakeTestingClusterSettings()
q := makeWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), SQLKVResponseWork, tg,
- nil, makeWorkQueueOptions(SQLKVResponseWork)).(*WorkQueue)
+ st, makeWorkQueueOptions(SQLKVResponseWork)).(*WorkQueue)
tg.r = q
createTime := int64(0)
stopCh := make(chan struct{})
@@ -400,7 +400,8 @@ func TestPriorityStates(t *testing.T) {
return printFunc()
case "get-threshold":
- curThreshold = ps.getFIFOPriorityThresholdAndReset(curThreshold)
+ curThreshold = ps.getFIFOPriorityThresholdAndReset(
+ curThreshold, int64(epochLength), maxQueueDelayToSwitchToLifo)
return fmt.Sprintf("threshold: %d", curThreshold)
default: