Skip to content

Commit

Permalink
admission: support non-blocking {Store,}WorkQueue.Admit()
Browse files Browse the repository at this point in the history
Part of cockroachdb#95563. For end-to-end flow control of replicated writes, we
want to enable below-raft admission control through the following API on
kvadmission.Controller:

  // AdmitRaftEntry informs admission control of a raft log entry being
  // written to storage (for the given tenant, the specific range, and
  // on the named store).
  AdmitRaftEntry(
    context.Context, roachpb.TenantID,
    roachpb.StoreID, roachpb.RangeID, raftpb.Entry.
  )

This serves as the integration point for log entries received below
raft right as they're being written to stable storage. It's a
non-blocking interface since we're below-raft and in the raft.Ready()
loop. What it effectively does is enqueues a "virtual" work item in the
underlying StoreWorkQueue mediating all store IO. This virtual work item
is what later gets dequeued once the IO granter informs the work queue
of newly available IO tokens. When enqueueing the virtual work item, we
still update the StoreWorkQueue's physically-accounted-for bytes since
the actual write is not deferred, and timely statistic updates improves
accuracy for the underlying linear models (that map between
accounted-for writes and observed L0 growth, using it to inform IO token
generation rates).

For each of the arguments above:
- The roachpb.TenantID is plumbed to find the right tenant heap to queue
  it under (for inter-tenant isolation).
- The roachpb.StoreID to find the right store work queue on multi-store
  nodes. We'll also use the StoreID when informing the origin node of
  log entries being admitted[^1].
- We pass in the roachpb.RangeID on behalf of which work is being
  admitted. This, along side the raftpb.Entry.{Term,Index} for the
  replicated write uniquely identifies where the write is to end up.
  We use these identifiers to:
  - Return flow tokens on the origin node[^1][^2].
  - In WorkQueue ordering -- for replicated writes below-raft, we ignore
    CreateTime/epoch-LIFO, and instead sort by priority and within a
    priority, sort by log position.
- For standard work queue ordering, our work item needs to include the
  CreateTime and AdmissionPriority, details that are passed down using
  dedicated raft log entry encodings[^3][^4] as part of the raftpb.Entry
  parameter above.
  - Since the raftpb.Entry encodes within it its origin node[^4], it
    will be used post-admission to dispatch flow tokens to the right
    node. This integration is left to future PRs.

We use the above to populate the following fields on a per-(replicated
write)work basis:

    // ReplicatedWorkInfo groups everything needed to admit replicated
    // writes, done so asynchronously below-raft as part of replication
    // admission control.
    type ReplicatedWorkInfo struct {
      RangeID roachpb.RangeID
      Origin roachpb.NodeID
      LogPosition LogPosition
      Ingested bool
    }

Since admission is happening below-raft where the size of the write is
known, we no longer need per-work estimates for upfront IO token
deductions. Since admission is asynchronous, we also don't use
the AdmittedWorkDone interface which was to make token adjustments
(without blocking) given the upfront estimates. We still want to
intercept the exact point when some write work gets admitted in order to
inform the origin node so it can release flow tokens. We do so through
the following interface satisfied by the StoreWorkQueue:

  // onAdmittedReplicatedWork is used to intercept the
  // point-of-admission for replicated writes.
  type onAdmittedReplicatedWork interface {
    admittedReplicatedWork(
      tenantID roachpb.TenantID,
      pri admissionpb.WorkPriority,
      rwi ReplicatedWorkInfo,
      requestedTokens int64,
    )
  }

[^1]: See kvflowcontrolpb.AdmittedRaftLogEntries introduced in cockroachdb#95637.
[^2]: See kvflowcontrol.Handle.{ReturnTokensUpto,DeductTokensFor}
      introduced in cockroachdb#95637. Token deductions and returns are tied to
      raft log positions.
[^3]: See raftlog.EntryEncoding{Standard,Sideloaded}WithAC introduced in
      cockroachdb#95748.
[^4]: See kvflowcontrolpb.RaftAdmissionMeta introduced in cockroachdb#95637.
        message RaftAdmissionMeta {
          int32 admission_priority = ...;
          int64 admission_create_time = ...;
          int32 admission_origin_node = ...;
        }

Release note: None
  • Loading branch information
irfansharif committed Feb 25, 2023
1 parent fca69b0 commit 05ec485
Show file tree
Hide file tree
Showing 27 changed files with 1,260 additions and 156 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/raftlog",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down
60 changes: 56 additions & 4 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -120,7 +121,7 @@ type Controller interface {
FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes)
// AdmitRaftEntry informs admission control of a raft log entry being
// written to storage.
AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
AdmitRaftEntry(context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
}

// TenantWeightProvider can be periodically asked to provide the tenant
Expand Down Expand Up @@ -244,7 +245,7 @@ func (n *controllerImpl) AdmitKVWork(
if err != nil {
return Handle{}, err
}
admissionEnabled = storeWorkHandle.AdmissionEnabled()
admissionEnabled = storeWorkHandle.UseAdmittedWorkDone()
if admissionEnabled {
defer func() {
if retErr != nil {
Expand Down Expand Up @@ -409,9 +410,60 @@ func (n *controllerImpl) FollowerStoreWriteBytes(

// AdmitRaftEntry implements the Controller interface.
func (n *controllerImpl) AdmitRaftEntry(
roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry,
ctx context.Context,
tenantID roachpb.TenantID,
storeID roachpb.StoreID,
rangeID roachpb.RangeID,
entry raftpb.Entry,
) {
panic("unimplemented")
typ, err := raftlog.EncodingOf(entry)
if err != nil {
log.Errorf(ctx, "unable to determine raft command encoding: %v", err)
return
}
if !typ.UsesAdmissionControl() {
return // nothing to do
}
meta, err := raftlog.DecodeRaftAdmissionMeta(entry.Data)
if err != nil {
log.Errorf(ctx, "unable to decode raft command admission data: %v", err)
return
}

storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID))
if storeAdmissionQ == nil {
log.Errorf(ctx, "unable to find queue for store: %s", storeID)
return // nothing to do
}

wi := admission.WorkInfo{
TenantID: tenantID,
Priority: admissionpb.WorkPriority(meta.AdmissionPriority),
CreateTime: meta.AdmissionCreateTime,
BypassAdmission: false,
RequestedCount: int64(entry.Size()),
}
wi.ReplicatedWorkInfo = admission.ReplicatedWorkInfo{
Enabled: true,
RangeID: rangeID,
Origin: meta.AdmissionOriginNode,
LogPosition: admission.LogPosition{
Term: entry.Term,
Index: entry.Index,
},
Ingested: typ.IsSideloaded(),
}

handle, err := storeAdmissionQ.Admit(ctx, admission.StoreWriteWorkInfo{
WorkInfo: wi,
})
if err != nil {
log.Errorf(ctx, "error while admitting to store admission queue: %v", err)
return
}
if handle.UseAdmittedWorkDone() {
log.Fatalf(ctx, "unexpected handle.UseAdmittedWorkDone")
}
}

// FollowerStoreWriteBytes captures stats about writes done to a store by a
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
diskStats = s
}
metrics = append(metrics, admission.StoreMetrics{
StoreID: int32(store.StoreID()),
StoreID: store.StoreID(),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
DiskStats: diskStats})
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/admission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"scheduler_latency_listener.go",
"sql_cpu_overload_indicator.go",
"store_token_estimation.go",
"testing_knobs.go",
"tokens_linear_model.go",
"work_queue.go",
],
Expand Down Expand Up @@ -53,6 +54,7 @@ go_test(
"elastic_cpu_work_queue_test.go",
"granter_test.go",
"io_load_listener_test.go",
"replicated_write_admission_test.go",
"scheduler_latency_listener_test.go",
"store_token_estimation_test.go",
"tokens_linear_model_test.go",
Expand All @@ -67,6 +69,7 @@ go_test(
"//pkg/testutils/datapathutils",
"//pkg/testutils/echotest",
"//pkg/util/admission/admissionpb",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
Expand Down
7 changes: 4 additions & 3 deletions pkg/util/admission/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,9 @@ func workKindString(workKind WorkKind) string {
// all of these when StoreWorkQueue.AdmittedWorkDone is called, so that these
// cumulative values are mutually consistent.
type storeAdmissionStats struct {
// Total requests that called AdmittedWorkDone or BypassedWorkDone.
admittedCount uint64
// Total requests that called {Admitted,Bypassed}WorkDone, or in the case of
// replicated writes, the requests that called Admit.
workCount uint64
// Sum of StoreWorkDoneInfo.WriteBytes.
//
// TODO(sumeer): writeAccountedBytes and ingestedAccountedBytes are not
Expand All @@ -548,7 +549,7 @@ type storeAdmissionStats struct {
// (e.g. for logging).
aux struct {
// These bypassed numbers are already included in the corresponding
// {admittedCount, writeAccountedBytes, ingestedAccountedBytes}.
// {workCount, writeAccountedBytes, ingestedAccountedBytes}.
bypassedCount uint64
writeBypassedAccountedBytes uint64
ingestedBypassedAccountedBytes uint64
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/admission/elastic_cpu_work_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (e *ElasticCPUWorkQueue) Admit(
if duration > MaxElasticCPUDuration {
duration = MaxElasticCPUDuration
}
info.requestedCount = duration.Nanoseconds()
info.RequestedCount = duration.Nanoseconds()
enabled, err := e.workQueue.Admit(ctx, info)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/admission/elastic_cpu_work_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (t *testElasticCPUInternalWorkQueue) Admit(
_ context.Context, info WorkInfo,
) (enabled bool, err error) {
if !t.disabled {
t.buf.WriteString(fmt.Sprintf("admitted=%s ", time.Duration(info.requestedCount)))
t.buf.WriteString(fmt.Sprintf("admitted=%s ", time.Duration(info.RequestedCount)))
}
return !t.disabled, nil
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
if unsafeGc, ok := sgc.gcMap.Load(int64(m.StoreID)); ok {
gc := (*GrantCoordinator)(unsafeGc)
gc.pebbleMetricsTick(ctx, m)
iotc.UpdateIOThreshold(roachpb.StoreID(m.StoreID), gc.ioLoadListener.ioThreshold)
iotc.UpdateIOThreshold(m.StoreID, gc.ioLoadListener.ioThreshold)
} else {
log.Warningf(ctx,
"seeing metrics for unknown storeID %d", m.StoreID)
Expand All @@ -135,7 +135,7 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
}()
}

func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoordinator {
func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) *GrantCoordinator {
coord := &GrantCoordinator{
settings: sgc.settings,
useGrantChains: false,
Expand Down Expand Up @@ -168,7 +168,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo
},
}

storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, granters, sgc.settings, sgc.workQueueMetrics, opts)
storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil)
coord.queues[KVWork] = storeReq
requesters := storeReq.getRequesters()
kvg.regularRequester = requesters[admissionpb.RegularWorkClass]
Expand Down Expand Up @@ -336,8 +336,9 @@ type makeRequesterFunc func(
metrics *WorkQueueMetrics, opts workQueueOptions) requester

type makeStoreRequesterFunc func(
_ log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester
_ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs,
) storeRequester

// NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a
// regular cluster node. Caller is responsible for:
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ type IOThresholdConsumer interface {

// StoreMetrics are the metrics for a store.
type StoreMetrics struct {
StoreID int32
StoreID roachpb.StoreID
*pebble.Metrics
WriteStallCount int64
// Optional.
Expand Down
11 changes: 6 additions & 5 deletions pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ func TestGranterBasic(t *testing.T) {
storeCoordinators := &StoreGrantCoordinators{
settings: settings,
makeStoreRequesterFunc: func(
ambientCtx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester {
ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs,
) storeRequester {
makeTestRequester := func(wc admissionpb.WorkClass) *testRequester {
req := &testRequester{
workKind: KVWork,
Expand Down Expand Up @@ -273,8 +274,8 @@ func TestStoreCoordinators(t *testing.T) {
opts := Options{
makeRequesterFunc: makeRequesterFunc,
makeStoreRequesterFunc: func(
ctx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester {
ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs) storeRequester {
reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts)
reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts)
str := &storeTestRequester{}
Expand Down Expand Up @@ -444,7 +445,7 @@ func (m *testMetricsProvider) setMetricsForStores(stores []int32, metrics pebble
m.metrics = m.metrics[:0]
for _, s := range stores {
m.metrics = append(m.metrics, StoreMetrics{
StoreID: s,
StoreID: roachpb.StoreID(s),
Metrics: &metrics,
})
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/admission/io_load_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
Expand Down Expand Up @@ -151,7 +152,7 @@ const l0SubLevelCountOverloadThreshold = 20
// storeWriteDone), the tokens are adjusted differently for the
// flush/compaction L0 tokens and for the "disk bandwidth" tokens.
type ioLoadListener struct {
storeID int32
storeID roachpb.StoreID
settings *cluster.Settings
kvRequester storeRequester
kvGranter granterWithIOTokens
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/admission/io_load_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ func TestIOLoadListener(t *testing.T) {

case "prep-admission-stats":
req.stats = storeAdmissionStats{
admittedCount: 0,
workCount: 0,
writeAccountedBytes: 0,
ingestedAccountedBytes: 0,
}
d.ScanArgs(t, "admitted", &req.stats.admittedCount)
d.ScanArgs(t, "admitted", &req.stats.workCount)
if d.HasArg("write-bytes") {
d.ScanArgs(t, "write-bytes", &req.stats.writeAccountedBytes)
}
Expand Down Expand Up @@ -290,7 +290,7 @@ func TestBadIOLoadListenerStats(t *testing.T) {
d.BytesRead = rand.Uint64()
d.BytesWritten = rand.Uint64()
d.ProvisionedBandwidth = 1 << 20
req.stats.admittedCount = rand.Uint64()
req.stats.workCount = rand.Uint64()
req.stats.writeAccountedBytes = rand.Uint64()
req.stats.ingestedAccountedBytes = rand.Uint64()
req.stats.statsToIgnore.Bytes = rand.Uint64()
Expand Down
Loading

0 comments on commit 05ec485

Please sign in to comment.