-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
admission: move CreateTime-sequencing below-raft #102819
admission: move CreateTime-sequencing below-raft #102819
Conversation
This was added recently, in cockroachdb#94778, and contributes to log spam of the following sort: I230404 15:00:33.826337 2400 rpc/context.go:2249 [T1,n1,rnode=2,raddr=127.0.0.1:55941,class=default,rpc] 268 connection heartbeat loop ended with err: <nil> I230404 15:00:33.826338 3986 rpc/context.go:2249 [T1,n2,rnode=3,raddr=127.0.0.1:55955,class=system,rpc] 269 connection heartbeat loop ended with err: <nil> I230404 15:00:33.826367 3455 rpc/context.go:2249 [T1,n2,rnode=3,raddr=127.0.0.1:55955,class=default,rpc] 270 connection heartbeat loop ended with err: <nil> I230404 15:00:33.826394 3354 rpc/context.go:2249 [T1,n2,rnode=2,raddr=127.0.0.1:55941,class=default,rpc] 271 connection heartbeat loop ended with err: <nil> Release note: None
44f347c
to
68f332f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r1, 16 of 17 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif)
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 106 at r2 (raw file):
DeductTokensFor( context.Context, admissionpb.WorkPriority, time.Time, kvflowcontrolpb.RaftLogPosition, Tokens,
I don't quite understand the reasoning stated in the commit. We are still passing RaftLogPosition
as a parameter, and it eventually goes into a Tracker
that has a log.Errorf
if this is not monotonic. So then why can't we assign CreateTime
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @sumeerbhola)
pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
line 106 at r2 (raw file):
Previously, sumeerbhola wrote…
I don't quite understand the reasoning stated in the commit. We are still passing
RaftLogPosition
as a parameter, and it eventually goes into aTracker
that has alog.Errorf
if this is not monotonic. So then why can't we assignCreateTime
here?
Got it -- the raft entry has been encoded earlier than the point at which we know the RaftLogPosition. Can you add a paragraph or two of code comments about this -- just repeat what you said in the loom. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 17 files at r2, 15 of 15 files at r3, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @irfansharif)
-- commits
line 98 at r3:
I think we've skipped something that was in the design:
To make this work we have to make a change in how tokens are managed in an admission control granter and ioLoadListener. There are logical-tokens and physical-tokens. Physical-tokens are subtracted from when a request proceeds to writing to the raft log (and includes the later cost of application). But since requests don’t physically wait, these tokens can become significantly negative. They exist so that we can properly model how much work was actually done and appropriately fit the models in ioLoadListener and friends. Then there are logical-tokens which represent what will be consumed when the logically queued raft entry for a replica is admitted. This is what WorkQueue consumes. Both sets of tokens are added to by ioLoadListener, by the same amount.
We don't have the physical tokens in this implementation. Which means the model accounting happens when the request is logically admitted which can arbitrarily lag when it was physically performed. We may see some instability in the model if the rate of physical admission and logical admission don't match up.
Can you add a code TODO to remind us of this.
pkg/util/admission/admission.go
line 301 at r3 (raw file):
storeWriteDone(originalTokens int64, doneInfo StoreWorkDoneInfo) (additionalTokens int64) // storeReplicatedWorkAdmittedLocked is used by below-raft admission control // to inform granters of work being admitted in order for them. It's invoked
the "in order for them" part doesn't seem relevant to the granter.
We move kvflowsequencer.Sequencer and its use in kvflowhandle.Handle (above-raft) to admission.sequencer, now used by admission.StoreWorkQueue (below-raft). This variant appeared in an earlier revision of cockroachdb#97599 where we first introduced monotonically increasing CreateTimes for a given raft group. In a subsequent commit, when integrating kvflowcontrol into the critical path for replication traffic, we'll observe that it's quite difficult to create sequencing CreateTimes[^1] above raft. This is because these sequence numbers are encoded as part of the raft proposal[^2], and at encode-time, we don't actually know what log position the proposal is going to end up in. It's hard to explicitly guarantee that a proposal with log-position P1 will get encoded before another with log position P2, where P1 < P2. Naively sequencing CreateTimes at proposal-encode-time could result in over-admission. This is because of how we return flow tokens -- up to some log index[^3], and how use these sequence numbers in below-raft WorkQueues. If P2 ends up with a lower sequence number/CreateTime, it would get admitted first, and when returning flow tokens by log position, in specifying up-to-P2, we'll early return P1's flow tokens despite it not being admitted. So we'd over-admit at the sender. This is all within a <tenant,priority> pair. [^1]: We use CreateTimes as "sequence numbers" in replication admission control. We want to assign each AC-queued work below-raft a "sequence number" for FIFO ordering within a <tenant,priority>. We ensure these timestamps are roughly monotonic with respect to log positions of replicated work by sequencing work in log position order. [^2]: In kvflowcontrolpb.RaftAdmissionMeta. [^3]: See kvflowcontrolpb.AdmittedRaftLogEntries. Release note: None
In a subsequent commit, when integrating kvflowcontrol into the critical path for replication traffic, we'll set up the return of flow tokens from the receiver node back to the sender once log entries get (asynchronously) admitted[^1]. So we need to intercept the exact points at which the virtually enqueued work items get admitted, since it all happens asynchronously[^2]. To that end we introduce the following interface: // OnLogEntryAdmitted is used to observe the specific entries // (identified by rangeID + log position) that were admitted. Since // admission control for log entries is asynchronous/non-blocking, // this allows callers to do requisite post-admission // bookkeeping. type OnLogEntryAdmitted interface { AdmittedLogEntry( origin roachpb.NodeID, /* node where the entry originated */ pri admissionpb.WorkPriority, /* admission priority of the entry */ storeID roachpb.StoreID, /* store on which the entry was admitted */ rangeID roachpb.RangeID, /* identifying range for the log entry */ pos LogPosition, /* log position of the entry that was admitted*/ ) } For now we pass in a no-op implementation in production code, but this will change shortly. Seeing as how the asynchronous admit interface is going to be the primary once once we enable replication admission control by default, for IO control, we no longer need the storeWriteDone interfaces and corresponding types. It's being used by our current (and soon-to-be legacy) above-raft IO admission control to inform granters of when the write was actually done, post-admission. For above-raft IO control, at admit-time we do not have sizing info for the writes, so by intercepting these writes at write-done time we're able to make any outstanding token adjustments in the granter. To reflect this new world, we: - Rename setAdmittedDoneModels to setLinearModels. - Introduce a storeReplicatedWorkAdmittedInfo[^3]. It provides information about the size of replicated work once it's admitted (which happens asynchronously from the work itself). This lets us use the underlying linear models for L0 {writes,ingests} to deduct an appropriate number of tokens from the granter, for the admitted work size[^4]. - Rename the granterWithStoreWriteDone interface to granterWithStoreReplicatedWorkAdmitted. We'll still intercept the actual point of admission for some token adjustments, through the the storeReplicatedWorkAdmittedLocked API shown below. There are two callstacks through which this API gets invoked, one where the coord.mu is already held, and one where it isn't. We plumb this information through so the lock is acquired if not already held. The locking structure is unfortunate, but this was a minimally invasive diff. storeReplicatedWorkAdmittedLocked( originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, ) (additionalTokens int64) While here, we also export an admission.TestingReverseWorkPriorityDict. There are at least three tests that have re-invented the wheel. [^1]: This will happen through the kvflowcontrol.Dispatch interface introduced back in cockroachdb#97766, after integrating it with the RaftTransport layer. [^2]: Introduced in cockroachdb#97599, for replicated write work. [^3]: Identical to the previous StoreWorkDoneInfo. [^4]: There's a peculiarity here in that at enqueuing-time we actually know the size of the write, so we could have deducted the right number of tokens upfront and avoid this post-admit granter token adjustment. We inherit this structure from earlier, and just leave a TODO for now. Release note: None
68f332f
to
05c6ae3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a paragraph or two of code comments about this -- just repeat what you said in the loom.
Done, copied some text over from the commit message into actual code comments.
bors r+
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @sumeerbhola)
-- commits
line 98 at r3:
I didn't completely follow the above. I didn't think I was missing anything, just implemented differently, but I've confused myself before. Notes:
We have this code here, which updates the store stats at the point where the physical admission happens (even if logical admission is deferred). So it's done in a timely manner.
cockroach/pkg/util/admission/work_queue.go
Lines 1851 to 1855 in 7a0fb5b
// Update store admission stats, because the write is happening ~this | |
// point. These statistics are used to maintain the underlying linear | |
// models (modeling relation between physical log writes and total L0 | |
// growth, which includes the state machine application). | |
q.updateStoreStatsAfterWorkDone(1, storeWorkDoneInfo, false) |
The model(s) you're referring to, they're the linear models mapping:
- physical/accounted for {write,ingest} bytes => observed L0 growth (which factors in state machine application), and
- physical/accounted for ingest bytes => observed LSM growth.
Right? These models don't factor in logical admission at all, i.e. the number of IO tokens consumed, nor should they of course. So the models are accurate and timely given the stats updates above. The only other "constant model" we have, in ioLoadListener, is the upfront storePerWorkTokenEstimator.atAdmissionWorkTokens
. Which we don't need nor use it anymore after replication admission control since AC is informed of the write when it's being physically done, so we know its size at admit time. I assume you're not talking about this model.
cockroach/pkg/util/admission/work_queue.go
Lines 1783 to 1786 in 68f332f
// stats are used to maintain L0 {write,ingest} linear models, modeling | |
// the relation between accounted for "physical" {write,ingest} bytes | |
// and observed L0 growth (which factors in state machine application). | |
stats storeAdmissionStats |
cockroach/pkg/util/admission/store_token_estimation.go
Lines 172 to 201 in f04439c
intL0WriteBytes := int64(l0Metrics.BytesFlushed) - int64(e.cumL0WriteBytes) | |
intL0IngestedBytes := int64(l0Metrics.BytesIngested) - int64(e.cumL0IngestedBytes) | |
intL0IgnoredIngestedBytes := int64(admissionStats.statsToIgnore.ApproxIngestedIntoL0Bytes) - | |
int64(e.cumStoreAdmissionStats.statsToIgnore.ApproxIngestedIntoL0Bytes) | |
adjustedIntL0IngestedBytes := intL0IngestedBytes - intL0IgnoredIngestedBytes | |
if adjustedIntL0IngestedBytes < 0 { | |
adjustedIntL0IngestedBytes = 0 | |
} | |
intWorkCount := int64(admissionStats.workCount) - | |
int64(e.cumStoreAdmissionStats.workCount) | |
intL0WriteAccountedBytes := | |
int64(admissionStats.writeAccountedBytes) - int64(e.cumStoreAdmissionStats.writeAccountedBytes) | |
// Note that these are not L0 ingested bytes, since we don't know how | |
// many did go to L0. | |
intIngestedAccountedBytes := int64(admissionStats.ingestedAccountedBytes) - | |
int64(e.cumStoreAdmissionStats.ingestedAccountedBytes) | |
e.atDoneL0WriteTokensLinearModel.updateModelUsingIntervalStats( | |
intL0WriteAccountedBytes, intL0WriteBytes, intWorkCount) | |
e.atDoneL0IngestTokensLinearModel.updateModelUsingIntervalStats( | |
intIngestedAccountedBytes, adjustedIntL0IngestedBytes, intWorkCount) | |
// Ingest across all levels model. | |
intLSMIngestedBytes := int64(cumLSMIngestedBytes) - int64(e.cumLSMIngestedBytes) | |
intIgnoredIngestedBytes := | |
int64(admissionStats.statsToIgnore.Bytes) - int64(e.cumStoreAdmissionStats.statsToIgnore.Bytes) | |
adjustedIntLSMIngestedBytes := intLSMIngestedBytes - intIgnoredIngestedBytes | |
if adjustedIntLSMIngestedBytes < 0 { | |
adjustedIntLSMIngestedBytes = 0 | |
} | |
e.atDoneIngestTokensLinearModel.updateModelUsingIntervalStats( | |
intIngestedAccountedBytes, adjustedIntLSMIngestedBytes, intWorkCount) |
Which means the model accounting happens when the request is logically admitted which can arbitrarily lag when it was physically performed.
If logical admission is deferred, all we're actually deferring is when the right number of "logical/granter" tokens are deducted, like we want. But it doesn't affect the "physical" linear models which are used in the granter the logical tokens at the right rate to protect the LSM. So I'm not entirely seeing the model instability thing. Help? I'll bors this but I'll incorporate your explanation (could also talk in our next pod) in a comment/fix later. This (internal thread](https://cockroachlabs.slack.com/archives/C03V96V2S4C/p1671728960799009) is the last time I confused myself.
pkg/util/admission/admission.go
line 301 at r3 (raw file):
Previously, sumeerbhola wrote…
the "in order for them" part doesn't seem relevant to the granter.
Oops, it was an incomplete sentence. I meant to say "in order for them to make any outstanding token adjustments".
Build succeeded: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 16 files at r4, 3 of 16 files at r5.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale)
Previously, irfansharif (irfan sharif) wrote…
I didn't completely follow the above. I didn't think I was missing anything, just implemented differently, but I've confused myself before. Notes:
We have this code here, which updates the store stats at the point where the physical admission happens (even if logical admission is deferred). So it's done in a timely manner.
cockroach/pkg/util/admission/work_queue.go
Lines 1851 to 1855 in 7a0fb5b
// Update store admission stats, because the write is happening ~this // point. These statistics are used to maintain the underlying linear // models (modeling relation between physical log writes and total L0 // growth, which includes the state machine application). q.updateStoreStatsAfterWorkDone(1, storeWorkDoneInfo, false) The model(s) you're referring to, they're the linear models mapping:
- physical/accounted for {write,ingest} bytes => observed L0 growth (which factors in state machine application), and
- physical/accounted for ingest bytes => observed LSM growth.
Right? These models don't factor in logical admission at all, i.e. the number of IO tokens consumed, nor should they of course. So the models are accurate and timely given the stats updates above. The only other "constant model" we have, in ioLoadListener, is the upfront
storePerWorkTokenEstimator.atAdmissionWorkTokens
. Which we don't need nor use it anymore after replication admission control since AC is informed of the write when it's being physically done, so we know its size at admit time. I assume you're not talking about this model.cockroach/pkg/util/admission/work_queue.go
Lines 1783 to 1786 in 68f332f
// stats are used to maintain L0 {write,ingest} linear models, modeling // the relation between accounted for "physical" {write,ingest} bytes // and observed L0 growth (which factors in state machine application). stats storeAdmissionStats cockroach/pkg/util/admission/store_token_estimation.go
Lines 172 to 201 in f04439c
intL0WriteBytes := int64(l0Metrics.BytesFlushed) - int64(e.cumL0WriteBytes) intL0IngestedBytes := int64(l0Metrics.BytesIngested) - int64(e.cumL0IngestedBytes) intL0IgnoredIngestedBytes := int64(admissionStats.statsToIgnore.ApproxIngestedIntoL0Bytes) - int64(e.cumStoreAdmissionStats.statsToIgnore.ApproxIngestedIntoL0Bytes) adjustedIntL0IngestedBytes := intL0IngestedBytes - intL0IgnoredIngestedBytes if adjustedIntL0IngestedBytes < 0 { adjustedIntL0IngestedBytes = 0 } intWorkCount := int64(admissionStats.workCount) - int64(e.cumStoreAdmissionStats.workCount) intL0WriteAccountedBytes := int64(admissionStats.writeAccountedBytes) - int64(e.cumStoreAdmissionStats.writeAccountedBytes) // Note that these are not L0 ingested bytes, since we don't know how // many did go to L0. intIngestedAccountedBytes := int64(admissionStats.ingestedAccountedBytes) - int64(e.cumStoreAdmissionStats.ingestedAccountedBytes) e.atDoneL0WriteTokensLinearModel.updateModelUsingIntervalStats( intL0WriteAccountedBytes, intL0WriteBytes, intWorkCount) e.atDoneL0IngestTokensLinearModel.updateModelUsingIntervalStats( intIngestedAccountedBytes, adjustedIntL0IngestedBytes, intWorkCount) // Ingest across all levels model. intLSMIngestedBytes := int64(cumLSMIngestedBytes) - int64(e.cumLSMIngestedBytes) intIgnoredIngestedBytes := int64(admissionStats.statsToIgnore.Bytes) - int64(e.cumStoreAdmissionStats.statsToIgnore.Bytes) adjustedIntLSMIngestedBytes := intLSMIngestedBytes - intIgnoredIngestedBytes if adjustedIntLSMIngestedBytes < 0 { adjustedIntLSMIngestedBytes = 0 } e.atDoneIngestTokensLinearModel.updateModelUsingIntervalStats( intIngestedAccountedBytes, adjustedIntLSMIngestedBytes, intWorkCount) Which means the model accounting happens when the request is logically admitted which can arbitrarily lag when it was physically performed.
If logical admission is deferred, all we're actually deferring is when the right number of "logical/granter" tokens are deducted, like we want. But it doesn't affect the "physical" linear models which are used in the granter the logical tokens at the right rate to protect the LSM. So I'm not entirely seeing the model instability thing. Help? I'll bors this but I'll incorporate your explanation (could also talk in our next pod) in a comment/fix later. This (internal thread](https://cockroachlabs.slack.com/archives/C03V96V2S4C/p1671728960799009) is the last time I confused myself.
You are right. I had looked at this before and realized this was equivalent, but had completely forgotten.
pkg/util/admission/store_token_estimation.go
line 118 at r5 (raw file):
// TODO(irfansharif): The linear model fitters below are actually not used // for upfront per-work token estimation. They're used in the granter to // figure out the rate of tokens to produce. This code organization is
They are not used to figure out the "rate of tokens to produce". The production is based on what we observing the LSM: flushes, compactions, ingests. They are used to figure out how many tokens to consume for a work item (once its claimed size is known).
These are already reviewed commits from #98308. Part of #95563.
admission: move CreateTime-sequencing below-raft
We move kvflowsequencer.Sequencer and its use in kvflowhandle.Handle (above-raft) to admission.sequencer, now used by admission.StoreWorkQueue (below-raft). This variant appeared in an earlier revision of #97599 where we first introduced monotonically increasing CreateTimes for a given raft group.
In a subsequent commit, when integrating kvflowcontrol into the critical path for replication traffic, we'll observe that it's quite difficult to create sequencing CreateTimes1 above raft. This is because these sequence numbers are encoded as part of the raft proposal2, and at encode-time, we don't actually know what log position the proposal is going to end up in. It's hard to explicitly guarantee that a proposal with log-position P1 will get encoded before another with log position P2, where P1 < P2.
Naively sequencing CreateTimes at proposal-encode-time could result in over-admission. This is because of how we return flow tokens -- up to some log index3, and how use these sequence numbers in below-raft WorkQueues. If P2 ends up with a lower sequence number/CreateTime, it would get admitted first, and when returning flow tokens by log position, in specifying up-to-P2, we'll early return P1's flow tokens despite it not being admitted. So we'd over-admit at the sender. This is all within a <tenant,priority> pair.
admission: add intercept points for when replicated work gets admitted
In a subsequent commit, when integrating kvflowcontrol into the critical path for replication traffic, we'll set up the return of flow tokens from the receiver node back to the sender once log entries get (asynchronously) admitted4. So we need to intercept the exact points at which the virtually enqueued work items get admitted, since it all happens asynchronously5. To that end we introduce the following interface:
For now we pass in a no-op implementation in production code, but this will change shortly.
Seeing as how the asynchronous admit interface is going to be the primary once once we enable replication admission control by default, for IO control, we no longer need the storeWriteDone interfaces and corresponding types. It's being used by our current (and soon-to-be legacy) above-raft IO admission control to inform granters of when the write was actually done, post-admission. For above-raft IO control, at admit-time we do not have sizing info for the writes, so by intercepting these writes at write-done time we're able to make any outstanding token adjustments in the granter.
To reflect this new world, we:
While here, we also export an admission.TestingReverseWorkPriorityDict. There are at least three tests that have re-invented the wheel.
Footnotes
We use CreateTimes as "sequence numbers" in replication admission control. We want to assign each AC-queued work below-raft a "sequence number" for FIFO ordering within a <tenant,priority>. We ensure these timestamps are roughly monotonic with respect to log positions of replicated work by sequencing work in log position order. ↩
In kvflowcontrolpb.RaftAdmissionMeta. ↩
See kvflowcontrolpb.AdmittedRaftLogEntries. ↩
This will happen through the kvflowcontrol.Dispatch interface introduced back in kvflowcontrol: implement kvflowcontrol.Dispatch #97766, after integrating it with the RaftTransport layer. ↩
Introduced in admission: support non-blocking {Store,}WorkQueue.Admit() #97599, for replicated write work. ↩
Identical to the previous StoreWorkDoneInfo. ↩
There's a peculiarity here in that at enqueuing-time we actually know the size of the write, so we could have deducted the right number of tokens upfront and avoid this post-admit granter token adjustment. We inherit this structure from earlier, and just leave a TODO for now. ↩