Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvflowcontrol: introduce v2 flow control core interfaces #128195

Merged
merged 4 commits into from
Aug 5, 2024

Conversation

kvoli
Copy link
Collaborator

@kvoli kvoli commented Aug 2, 2024

Introduce the TokenCounter interface, underneath a new package, rac2.
The TokenCounter will be used by replication flow control v2 in a
similar, but distinct manner from the token buckets in v1. Notably, the
TokenCounter supports returning an encapsulated handle that may be
used in waiting for available tokens, TokenWaitingHandle.

Resolves: #128014
Release note: None


Introduce the StreamTokenCounterProvider interface, which will be used
by replication flow control v2 to access the TokenCounter for a given
tenant, store stream.

Note the original interface in the v2 prototype included two additional
methods for metrics gathering which are omitted here.

Resolves: #128010
Release note: None


Introduce the StreamTokenWatcher interface, which will be used to
watch and be notified when there are tokens available.

Resolves: #128011
Release note: None


Introduce the RangeController interface, which provides flow control
for replication traffic in KV, for a range.

Resolves: #128021
Release note: None

Copy link

blathers-crl bot commented Aug 2, 2024

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Copy link
Member

This change is Reviewable

@kvoli kvoli changed the title kvflowcontrol: introduce token counter interface kvflowcontrol: introduce v2 flow control core interfaces Aug 2, 2024
@kvoli kvoli self-assigned this Aug 2, 2024
@kvoli kvoli force-pushed the 240802.rac-interfaces branch 3 times, most recently from a3c43dd to 62a5d2e Compare August 2, 2024 18:00
@kvoli kvoli marked this pull request as ready for review August 2, 2024 18:03
@kvoli kvoli requested a review from a team as a code owner August 2, 2024 18:03
@kvoli kvoli requested review from pav-kv and sumeerbhola August 2, 2024 18:03
Copy link
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 3 files at r1, 1 of 1 files at r3, 2 of 2 files at r4, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @kvoli and @pav-kv)


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 20 at r4 (raw file):

)

// RangeController provides flow control for replication traffic in KV, for a

should this add:
... range, at the leader.


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 21 at r4 (raw file):

// RangeController provides flow control for replication traffic in KV, for a
// range.

Do we need to document any invariants about raftMu or replica mu being held before calling these?

I'm ok with postponing this to later. Once we have RangeControllerOptions, we will need to document what are the consistency invariants wrt what these methods inform it, and what it can grab from methods it can call, say on RaftInterface.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 51 at r4 (raw file):

	// removing it from watching the existing work class, if the work class is
	// different.
	UpdateHandle(StreamWatcherHandleID, admissionpb.WorkClass)

UpdateHandle goes away since the send-queue is always RaftLowPri.
We also don't need the WorkClass parameter in NotifyWhenAvailable, which reduces the surface area to test.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 26 at r4 (raw file):

type TokenCounter interface {
	// TokensAvailable returns true if tokens are available. If false, it returns
	// a handle that may be uses for waiting for tokens to become available.

nit: used


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 58 at r4 (raw file):

	// unsatisfied with the return values, it can resume waiting using
	// WaitChannel.
	TryDeductAndUnblockNextWaiter(tokens kvflowcontrol.Tokens) (granted kvflowcontrol.Tokens, haveTokens bool)

I think we are always calling this with a parameter value of 0.

Which makes sense in that (a) the eval case does not want to deduct tokens, (b) the storeStreamSendTokensWatcher (send tokens) case we delegate the deduction to TokensAvailableNotification since the watcher does not want to be kept up-to-date on how many tokens a replicaSendStream needs. In (b) we also only have at most one waiter, so there isn't any harm done by the fact that we unblock the "next waiter" before the replicaSendStream has deducted tokens.

We also wouldn't need this separation of WaitChannel and TryDeductAndUnblockNextWaiter if case (a) did not demand a channel, to do a select.

Overall, I think this is still the right-ish interface, but how about we change this method to
ConfirmHaveTokensAndUnblockNextWaiter() bool.

Copy link
Collaborator

@andrewbaptist andrewbaptist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 3 of 3 files at r1, 2 of 2 files at r2, 1 of 1 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @kvoli, @pav-kv, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 29 at r4 (raw file):

	// HandleRaftEvent handles the provided raft event for the range. This should
	// be called while holding raftMu.
	HandleRaftEvent(ctx context.Context, e RaftEvent) error

nit: Consider adding RaftMuLocked to this and other methods. This is the typical pattern as makes auditing callers easier.


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 36 at r4 (raw file):

	SetReplicas(ctx context.Context, replicas roachpb.ReplicaSet) error
	// SetLeaseholder sets the leaseholder of the range.
	SetLeaseholder(ctx context.Context, replica roachpb.ReplicaID)

nit: consider splitting the SetReplicas and SetLeaseholder methods out to a different interface. This will simplify testing.

Also I'm assuming the reason that the implementation needs these at all is to handle changes that occur. However I'm a little concerned about the async nature of these updates and whether the "cache" in the implementation will be stale at different points in time. At a minimum define when these are called. For example for the SetLeaseholder is it when the proposal for the lease is received, when it is appended to the raft log, when the state is updated or when the new leaseholder applies the log.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 58 at r4 (raw file):

Previously, sumeerbhola wrote…

I think we are always calling this with a parameter value of 0.

Which makes sense in that (a) the eval case does not want to deduct tokens, (b) the storeStreamSendTokensWatcher (send tokens) case we delegate the deduction to TokensAvailableNotification since the watcher does not want to be kept up-to-date on how many tokens a replicaSendStream needs. In (b) we also only have at most one waiter, so there isn't any harm done by the fact that we unblock the "next waiter" before the replicaSendStream has deducted tokens.

We also wouldn't need this separation of WaitChannel and TryDeductAndUnblockNextWaiter if case (a) did not demand a channel, to do a select.

Overall, I think this is still the right-ish interface, but how about we change this method to
ConfirmHaveTokensAndUnblockNextWaiter() bool.

I agree this interface isn't ideal. It seems much nicer to have it look something like (WaitChannel(tokens kvflowcontrol.Tokens) <- chan kvflowcontrol.Tokens) Is there a difference between the TryDeduct here and the one in TokenCounter?

@kvoli kvoli force-pushed the 240802.rac-interfaces branch from 62a5d2e to 0149f92 Compare August 2, 2024 19:47
Copy link
Collaborator Author

@kvoli kvoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @pav-kv, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 20 at r4 (raw file):

Previously, sumeerbhola wrote…

should this add:
... range, at the leader.

Updated.


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 21 at r4 (raw file):

Previously, sumeerbhola wrote…

Do we need to document any invariants about raftMu or replica mu being held before calling these?

I'm ok with postponing this to later. Once we have RangeControllerOptions, we will need to document what are the consistency invariants wrt what these methods inform it, and what it can grab from methods it can call, say on RaftInterface.

I was going to postpone until later, however given the rename of methods to include locked, I added it for each now.


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 29 at r4 (raw file):

Previously, andrewbaptist (Andrew Baptist) wrote…

nit: Consider adding RaftMuLocked to this and other methods. This is the typical pattern as makes auditing callers easier.

Good idea, updated.


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 36 at r4 (raw file):

Also I'm assuming the reason that the implementation needs these at all is to handle changes that occur

Naturally, the leaseholder and replica set can change.

However I'm a little concerned about the async nature of these updates and whether the "cache" in the implementation will be stale at different points in time.

These aren't async, they are synchronous w.r.t the caller. The leaseholder is fine to be eventually consistent, the replica set won't be, see below.

At a minimum define when these are called. For example for the SetLeaseholder is it when the proposal for the lease is received, when it is appended to the raft log, when the state is updated or when the new leaseholder applies the log.

I'd prefer to leave that to the caller implementation to specify, given these are interfaces. The replica set change follows a well established pattern so I won't comment on that. For the leaseholder, its harmless for it to be eventually consistent and is handled under raft ready, prior to calling the controller with entries.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 51 at r4 (raw file):

Previously, sumeerbhola wrote…

UpdateHandle goes away since the send-queue is always RaftLowPri.
We also don't need the WorkClass parameter in NotifyWhenAvailable, which reduces the surface area to test.

Ack, I was thinking this was the case. Updated.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 26 at r4 (raw file):

Previously, sumeerbhola wrote…

nit: used

Updated.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 58 at r4 (raw file):
We are always calling with a parameter of 0 in the prototype. Updated to ConfirmHaveTokensAndUnblockNextWaiter() bool.

I agree this interface isn't ideal. It seems much nicer to have it look something like (WaitChannel(tokens kvflowcontrol.Tokens) <- chan kvflowcontrol.Tokens) Is there a difference between the TryDeduct here and the one in TokenCounter?

They are different, this method is signaling the next caller waiting on tokens, trying to deduct tokens and returning whether there were any tokens. TryDeduct is simply deducting tokens. Since we removed the deduction part, this method is for checking positive tokens and signaling.

@kvoli kvoli requested a review from sumeerbhola August 2, 2024 19:47
Copy link
Collaborator

@andrewbaptist andrewbaptist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @kvoli, @pav-kv, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 36 at r4 (raw file):

Previously, kvoli (Austen) wrote…

Also I'm assuming the reason that the implementation needs these at all is to handle changes that occur

Naturally, the leaseholder and replica set can change.

However I'm a little concerned about the async nature of these updates and whether the "cache" in the implementation will be stale at different points in time.

These aren't async, they are synchronous w.r.t the caller. The leaseholder is fine to be eventually consistent, the replica set won't be, see below.

At a minimum define when these are called. For example for the SetLeaseholder is it when the proposal for the lease is received, when it is appended to the raft log, when the state is updated or when the new leaseholder applies the log.

I'd prefer to leave that to the caller implementation to specify, given these are interfaces. The replica set change follows a well established pattern so I won't comment on that. For the leaseholder, its harmless for it to be eventually consistent and is handled under raft ready, prior to calling the controller with entries.

That's reasonable - thanks for the clarification!


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 28 at r5 (raw file):

	//
	// No mutexes should be held.
	WaitForEval(ctx context.Context, pri admissionpb.WorkPriority) error

nit: The comment say "until there are enough tokens", but there is no token count passed to this. Is it waiting until the token count is positive?


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 41 at r5 (raw file):

	//
	// Requires replica.raftMu and replica.mu to be held.
	SetReplicasLocked(ctx context.Context, replicas roachpb.ReplicaSet) error

nit: Update the name of this method as well.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 58 at r4 (raw file):

Previously, kvoli (Austen) wrote…

We are always calling with a parameter of 0 in the prototype. Updated to ConfirmHaveTokensAndUnblockNextWaiter() bool.

I agree this interface isn't ideal. It seems much nicer to have it look something like (WaitChannel(tokens kvflowcontrol.Tokens) <- chan kvflowcontrol.Tokens) Is there a difference between the TryDeduct here and the one in TokenCounter?

They are different, this method is signaling the next caller waiting on tokens, trying to deduct tokens and returning whether there were any tokens. TryDeduct is simply deducting tokens. Since we removed the deduction part, this method is for checking positive tokens and signaling.

Can you clarify in the comment what the "possibly available" means. Are there spurious wake-ups or is it because you are racing with other waiters who might grab the tokens before you call Confirm.

Also the interface is still a little clunky. I'm probably missing some context, however it seems like you could have it be:

WaitChannel() <-chan bool with the bool signifying whether there were tokens available.

My understanding of how its used is something like this:

for {
	select {
	case <-handle.WaitChannel():
		if handle.ConfirmHaveTokensAndUnblockNextWaiter() {
			// Do something
		}
	}
}

Wouldn't this be easier if it returned a bool and you could just write

for {
	select {
	case ready := <-handle.WaitChannel():
		if ready {
			// Do something
		}
	}
}

@kvoli kvoli force-pushed the 240802.rac-interfaces branch from 0149f92 to 0de9b39 Compare August 2, 2024 20:52
Copy link
Collaborator Author

@kvoli kvoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @pav-kv, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 28 at r5 (raw file):

Previously, andrewbaptist (Andrew Baptist) wrote…

nit: The comment say "until there are enough tokens", but there is no token count passed to this. Is it waiting until the token count is positive?

It is waiting for positive, nice catch. I've updated the comment to make this clear.


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 41 at r5 (raw file):

Previously, andrewbaptist (Andrew Baptist) wrote…

nit: Update the name of this method as well.

This requires both raft mu and replica mu, so I went with Locked, as opposed to RaftMuLockedReplicaLocked or similar.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 58 at r4 (raw file):

Can you clarify in the comment what the "possibly available" means. Are there spurious wake-ups or is it because you are racing with other waiters who might grab the tokens before you call Confirm.

We hold the mutex when signaling the the channel but that mutex needs to be dropped before the signaler returns. Another request can race to deduct tokens in-between signaling and the signaled goroutine being awoken, as the channel is buffered and non-blocking.

See this comment in the prototype for more details, along with usages of the signalCh.

Wouldn't this be easier if it returned a bool and you could just write

The channel is only ever being signaled when there are positive tokens, the second example is a subset of the first.

Copy link
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good. Only some remaining questions about locking.

Reviewed 2 of 3 files at r5, 1 of 1 files at r6, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @pav-kv)


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 26 at r6 (raw file):

	// This blocks until there are positive tokens available for the request to
	// be admitted for evaluation. Note the number of tokens required by the
	// request is not considered, only the priority of the request.

consider adding a parenthetical remark:
(since it is it not known until after eval)


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 33 at r6 (raw file):

	//
	// Requires replica.raftMu to be held.
	HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error

There is a pathway from handleReadyState to newReplicaSendStream to NextUnstableIndexRLocked. So is this also holding replica mu, or is there a bug in the prototype?
Remembering the discussion from https://cockroachlabs.slack.com/archives/C06UFBJ743F/p1721137761404479, I think we needed to ensure replica mu was held for methods called on Raft that mutated raft state, since metrics etc. could call read-only methods while holding replica mu in read mode. So it would make sense that calling RawNode.NextUnstableIndex without holding replica mu is harmless (since it does not mutate state). Which makes me wonder why we even have raftInterfaceImpl.NextUnstableIndex which acquires replica mu. Am I missing something?

Similar question about StableIndexRLocked and GetAdmittedRLocked.

Are SetAdmittedLocked and MakeMsgApp the only Raft state mutators we use in RACv2? The former is not even needed by the core code, since only the integration calls it. Then we only need to be careful in the code code in ensuring that we are only calling MakeMsgApp along paths that hold raftMu but do not hold replica mu. Does that sound correct-ish?

@kvoli kvoli force-pushed the 240802.rac-interfaces branch from 0de9b39 to 44b3f1a Compare August 5, 2024 13:53
Copy link
Collaborator Author

@kvoli kvoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @pav-kv, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 26 at r6 (raw file):

Previously, sumeerbhola wrote…

consider adding a parenthetical remark:
(since it is it not known until after eval)

Updated.


pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go line 33 at r6 (raw file):

Which makes me wonder why we even have raftInterfaceImpl.NextUnstableIndex which acquires replica mu. Am I missing something? Similar question about StableIndexRLocked and GetAdmittedRLocked.

FollowerState() at least requires a read lock on the replica, as there are some mutation methods, such as appending entries when flushing the proposal buffer, which don't acquire raftMu but do acquire an exclusive lock on replica.mu.

Are SetAdmittedLocked and MakeMsgApp the only Raft state mutators we use in RACv2?

Yes, these are the only methods which mutate raft state.

@kvoli kvoli requested a review from sumeerbhola August 5, 2024 14:33
Copy link
Collaborator

@pav-kv pav-kv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 21 at r7 (raw file):

// StreamTokenCounterProvider is the interface for retrieving token counters
// for a given stream.
type StreamTokenCounterProvider interface {

Code structure question: why is this hierarchy of interfaces needed, instead of a collection of concrete types? Is this code going to be extensively mocked?

This has a Java interface-heavy feel, and I'm just wondering how much this is needed. It's not obvious from structuring the work like this - interfaces first. Maybe having implementations first, and adding interfaces when needed (e.g. the moment you need to mock them) would make this need more obvious.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 24 at r7 (raw file):

	// EvalTokenCounterForStream returns the evaluation token counter for the
	// given stream.
	EvalTokenCounterForStream(kvflowcontrol.Stream) TokenCounter

Go naming minimalizm nit: the TokenCounter return type eliminates the need to have "TokenCounter" in the method name. "ForStream" kinda as well, given the parameter is a Stream as well as the interface name has it, and there are no other methods taking something other than a stream. So the pair of methods could be just Eval(Stream) TokenCounter, Send(Stream) TokenCounter.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 35 at r7 (raw file):

// StreamTokenWatcher is the interface for watching and waiting on available
// elastic tokens. The watcher registers a notification, which will be called

Why only elastic? Is this to do with the fact that send-queue always waits/uses elastic tokens? Could be worth noting it in the comments.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 54 at r7 (raw file):

// TokenGrantNotification is an interface for notifying when tokens are
// available.
type TokenGrantNotification interface {

How about a func(context.Context) callback in the interface above, to avoid the interface boilerplate?


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 58 at r4 (raw file):

Previously, kvoli (Austen) wrote…

Can you clarify in the comment what the "possibly available" means. Are there spurious wake-ups or is it because you are racing with other waiters who might grab the tokens before you call Confirm.

We hold the mutex when signaling the the channel but that mutex needs to be dropped before the signaler returns. Another request can race to deduct tokens in-between signaling and the signaled goroutine being awoken, as the channel is buffered and non-blocking.

See this comment in the prototype for more details, along with usages of the signalCh.

Wouldn't this be easier if it returned a bool and you could just write

The channel is only ever being signaled when there are positive tokens, the second example is a subset of the first.

Are we basically implementing a cond-var loop here? And the reason we're using a channel is make the waits cancelable?
Or there is more to this particular structuring?


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 24 at r7 (raw file):

// tokens to become available, and to check if tokens are available without
// blocking.
type TokenCounter interface {

Why do we need this interface? Could we use its implementation directly, or we need a second implementation (e.g. for mocking in tests)? Could we introduce the interface when it's needed for the second implementation if there is one?


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 27 at r7 (raw file):

	// TokensAvailable returns true if tokens are available. If false, it returns
	// a handle that may be used for waiting for tokens to become available.
	TokensAvailable(admissionpb.WorkClass) (available bool, tokenWaitingHandle TokenWaitingHandle)

Any reason to have WorkClass part of the methods, instead of having a couple of TokenCounters (one per priority)? Does the implementation have any cross-work-class logic (such as deduct lower-pri tokens while also deducting high-pri tokens), or is simply a union of per-class logic and the cross-class logic sits above the interface?


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 27 at r7 (raw file):

	// TokensAvailable returns true if tokens are available. If false, it returns
	// a handle that may be used for waiting for tokens to become available.
	TokensAvailable(admissionpb.WorkClass) (available bool, tokenWaitingHandle TokenWaitingHandle)

Any reason not to have 2 separate methods: TokensAvailable() bool (or TokensAvailable() kvflowcontrol.Tokens), and GetHandle()? Optimization purposes?

@sumeerbhola sumeerbhola requested a review from pav-kv August 5, 2024 17:12
Copy link
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @pav-kv)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 21 at r7 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Code structure question: why is this hierarchy of interfaces needed, instead of a collection of concrete types? Is this code going to be extensively mocked?

This has a Java interface-heavy feel, and I'm just wondering how much this is needed. It's not obvious from structuring the work like this - interfaces first. Maybe having implementations first, and adding interfaces when needed (e.g. the moment you need to mock them) would make this need more obvious.

Would you be satisfied with a TODO to remove interfaces if/when they are determined not necessary for unit testing.

I would like to avoid getting bogged down in a debate at this stage -- some of the interface-heaviness in the prototype was to allow concurrency of implementation, to have a negotiated contract, and we do want to retain some of that concurrency in this phase of the project.

@kvoli kvoli force-pushed the 240802.rac-interfaces branch from 44b3f1a to 81c7116 Compare August 5, 2024 17:20
Copy link
Collaborator Author

@kvoli kvoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @pav-kv, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 21 at r7 (raw file):
This is pretty much verbatim copied from the prototype. I agree this could just be a concrete type, these didn't get heavily mocked in the earlier testing we had.

Maybe having implementations first, and adding interfaces when needed (e.g. the moment you need to mock them) would make this need more obvious.

That said, there is some benefit to defining ifaces for certain components first, to enable work parallelism.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 24 at r7 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Go naming minimalizm nit: the TokenCounter return type eliminates the need to have "TokenCounter" in the method name. "ForStream" kinda as well, given the parameter is a Stream as well as the interface name has it, and there are no other methods taking something other than a stream. So the pair of methods could be just Eval(Stream) TokenCounter, Send(Stream) TokenCounter.

Good idea, updated to just eval/send.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 35 at r7 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Why only elastic? Is this to do with the fact that send-queue always waits/uses elastic tokens? Could be worth noting it in the comments.

Yes, however the structure is slightly more general to allow for any stream (send/eval), just deducting elastic specifically. I've added a comment.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 54 at r7 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

How about a func(context.Context) callback in the interface above, to avoid the interface boilerplate?

Sure, either is fine, the boilerplate isn't that egregious. I've updated to a func.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 58 at r4 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Are we basically implementing a cond-var loop here? And the reason we're using a channel is make the waits cancelable?
Or there is more to this particular structuring?

Fairly similar to a cond-var loop. The channel is returned because we may wish to wait on multiple such channels (for eval), instead of blocking return here. See the link above for details on usage in the prototype.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 24 at r7 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Why do we need this interface? Could we use its implementation directly, or we need a second implementation (e.g. for mocking in tests)? Could we introduce the interface when it's needed for the second implementation if there is one?

This one is for testing, we could introduce it when testing, later but materially we use the iface below here, so it makes sense to include both at once.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 27 at r7 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Any reason to have WorkClass part of the methods, instead of having a couple of TokenCounters (one per priority)? Does the implementation have any cross-work-class logic (such as deduct lower-pri tokens while also deducting high-pri tokens), or is simply a union of per-class logic and the cross-class logic sits above the interface?

They do have cross-work logic. Regular tokens deduct from both elastic and regular.


pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go line 27 at r7 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Any reason not to have 2 separate methods: TokensAvailable() bool (or TokensAvailable() kvflowcontrol.Tokens), and GetHandle()? Optimization purposes?

TryDeduct is used for that purpose mostly. We could update this method to just return a handle or nil but the current approach is fairly clear.

@pav-kv pav-kv requested a review from sumeerbhola August 5, 2024 18:01
Copy link
Collaborator

@pav-kv pav-kv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 43 at r8 (raw file):

Previously, kvoli (Austen) wrote…

StreamTokenWatcher is one per node. We could nest the provider inside if we wish, in order to pull out the elastic token counter for the relevant stream.

Right, so now the StreamTokenWatcher sort of multiplexes across all per-stream TokenCounters, it's 1-to-many. I wonder though if there is benefit in making the watcher per-stream, so that it's 1-to-1.

In the prototype, most of the logic is implemented in the tokenWatcher sub-type, and storeStreamSendTokensWatcher is basically just a container for many of them. So my question is: should this interface mirror the tokenWatcher instead of storeStreamSendTokensWatcher? And then the Stream would have a StreamTokenWatcher returned by some stream.GetSendWatcher(). This would make the map searches on every watch call unnecessary.

Are there some benefits of making the StreamTokenWatcher a singleton?

Copy link
Collaborator

@pav-kv pav-kv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 43 at r8 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Right, so now the StreamTokenWatcher sort of multiplexes across all per-stream TokenCounters, it's 1-to-many. I wonder though if there is benefit in making the watcher per-stream, so that it's 1-to-1.

In the prototype, most of the logic is implemented in the tokenWatcher sub-type, and storeStreamSendTokensWatcher is basically just a container for many of them. So my question is: should this interface mirror the tokenWatcher instead of storeStreamSendTokensWatcher? And then the Stream would have a StreamTokenWatcher returned by some stream.GetSendWatcher(). This would make the map searches on every watch call unnecessary.

Are there some benefits of making the StreamTokenWatcher a singleton?

I guess the benefit is the watchTokens goroutine? Having one per stream would be an overkill?

@kvoli kvoli force-pushed the 240802.rac-interfaces branch from ec73a10 to 658b444 Compare August 5, 2024 20:16
Copy link
Collaborator Author

@kvoli kvoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @pav-kv, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 43 at r8 (raw file):

So my question is: should this interface mirror the tokenWatcher instead of storeStreamSendTokensWatcher? And then the Stream would have a StreamTokenWatcher returned by some stream.GetSendWatcher(). This would make the map searches on every watch call unnecessary.

It could, that'd be a reasonable change to make here, we would include a SendTokenWatcher provider in the range controller init options and use it when initializing a replica send stream. I've updated the code to reflect this.

I guess the benefit is the watchTokens goroutine? Having one per stream would be an overkill?

The prototype used 1 goroutine per-stream (and work class, but now its all elastic). Having one per-stream is still probably the direction we should go, assuming these are shared across all ranges on the node.

@kvoli kvoli requested a review from pav-kv August 5, 2024 20:16
Copy link
Collaborator

@pav-kv pav-kv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 43 at r8 (raw file):

Previously, kvoli (Austen) wrote…

So my question is: should this interface mirror the tokenWatcher instead of storeStreamSendTokensWatcher? And then the Stream would have a StreamTokenWatcher returned by some stream.GetSendWatcher(). This would make the map searches on every watch call unnecessary.

It could, that'd be a reasonable change to make here, we would include a SendTokenWatcher provider in the range controller init options and use it when initializing a replica send stream. I've updated the code to reflect this.

I guess the benefit is the watchTokens goroutine? Having one per stream would be an overkill?

The prototype used 1 goroutine per-stream (and work class, but now its all elastic). Having one per-stream is still probably the direction we should go, assuming these are shared across all ranges on the node.

Although, the watchTokens goroutine is per-stream in the prototype - is this the intention to keep it like this? Dunno if it could become problematic in large clusters (hundreds of runnable gs potentially?). But if it's intended and is fine, then there seems to be no much benefit in making the watcher a singleton? It logically sounds like a per-stream concept.

@sumeerbhola sumeerbhola requested a review from pav-kv August 5, 2024 20:26
Copy link
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist and @pav-kv)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 42 at r8 (raw file):

Previously, kvoli (Austen) wrote…

See the other comment, I've updated to pass in a stream instead of the counter. The elastic distinction for send queueing is somewhat clearer now, so I don't think its necessary to also include it in the method naming, if the iface references Send.

I prefer to change this back to accepting a TokenCounter. The caller already has a TokenCounter and should not have to pass a Stream that the callee then needs to use to lookup the TokenCounter via StreamTokenCounterProvider. RACv1 had a bunch of such indirection, and I'd like to avoid such unnecessary lookups and locking.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 43 at r8 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Although, the watchTokens goroutine is per-stream in the prototype - is this the intention to keep it like this? Dunno if it could become problematic in large clusters (hundreds of runnable gs potentially?). But if it's intended and is fine, then there seems to be no much benefit in making the watcher a singleton? It logically sounds like a per-stream concept.

We need a container for the per-stream watcher, even if we expose the per-stream watcher as its own type. If we ever get concerned about the number of streams, we need to be able to garbage collect these per-stream watchers, which means the container also needs to know when to remove them from its map. Having all methods flow via the container makes that viable.

@pav-kv pav-kv requested a review from sumeerbhola August 5, 2024 20:32
Copy link
Collaborator

@pav-kv pav-kv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 39 at r11 (raw file):

//
// TODO(kvoli): Consider de-interfacing if not necessary for testing.
type SendTokenWatcherProvider interface {

Does this need to exist outside the lifetime of the send stream? Same for StreamTokenCounterProvider.

I.e. do we just need a "send stream" interface that, once created, provides both the token counters and the token watcher (and maybe some other supporting things)? Or these objects somehow need to outlive the stream in some cases?

Probably ok to have these stubs in place now, and it'll become clearer when some meaty implementations are added. This is more for my understanding of the semantics.

Copy link
Collaborator

@pav-kv pav-kv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 43 at r8 (raw file):

Previously, sumeerbhola wrote…

We need a container for the per-stream watcher, even if we expose the per-stream watcher as its own type. If we ever get concerned about the number of streams, we need to be able to garbage collect these per-stream watchers, which means the container also needs to know when to remove them from its map. Having all methods flow via the container makes that viable.

Yeah, SGTM to decouple the per-stream interface vs the physical container / goroutines. In the extreme, all things could be run by a singleton object / pool, yet the watcher "interface" per stream can use it at the "backend".

@sumeerbhola sumeerbhola requested a review from pav-kv August 5, 2024 20:36
Copy link
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 1 files at r7.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @pav-kv)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 64 at r11 (raw file):
assumed this changed due to

How about a func(context.Context) callback in the interface above, to avoid the interface boilerplate?

I believe this will always allocate when NotifyWhenAvailable is called, due to the need to capture the state for the closure (a word for the replicaSendStream pointer, and another for its function pointer). In comparison, with an interface, the two words needed for the interface are already included.

@pav-kv pav-kv requested a review from sumeerbhola August 5, 2024 20:40
Copy link
Collaborator

@pav-kv pav-kv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 42 at r8 (raw file):

Previously, sumeerbhola wrote…

I prefer to change this back to accepting a TokenCounter. The caller already has a TokenCounter and should not have to pass a Stream that the callee then needs to use to lookup the TokenCounter via StreamTokenCounterProvider. RACv1 had a bunch of such indirection, and I'd like to avoid such unnecessary lookups and locking.

Isn't the Stream supposed to contain a TokenCounter inited pretty much at creation, which can be returned without locking and lookups? Or there is some decoupling/indirection there?

@sumeerbhola sumeerbhola requested a review from pav-kv August 5, 2024 20:41
Copy link
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @pav-kv)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 43 at r8 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Yeah, SGTM to decouple the per-stream interface vs the physical container / goroutines. In the extreme, all things could be run by a singleton object / pool, yet the watcher "interface" per stream can use it at the "backend".

I would prefer to go back to the singleton with a NotifyWhenAvailable(TokenCounter ...) interface. It ensures everything flows through the "container" whether the container is dumb-ish, like now, or becomes more sophisticated. This enables more sophisticated garbage collection of state. It also removes unnecessary abstractions we are having to introduce like SendTokenWatcherProvider.

Copy link
Collaborator Author

@kvoli kvoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @pav-kv, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 42 at r8 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Isn't the Stream supposed to contain a TokenCounter inited pretty much at creation, which can be returned without locking and lookups? Or there is some decoupling/indirection there?

Which Stream? The replicaSendStream does get init'd with the counter on creation.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 43 at r8 (raw file):

Although, the watchTokens goroutine is per-stream in the prototype - is this the intention to keep it like this?

Intention is keeping it per-stream. There weren't any issues with this approach in the prototype, when running a large number of stores, so lets keep it unless we run into problems, which we can tackle then.

We need a container for the per-stream watcher, even if we expose the per-stream watcher as its own type. If we ever get concerned about the number of streams, we need to be able to garbage collect these per-stream watchers, which means the container also needs to know when to remove them from its map. Having all methods flow via the container makes that viable.

The per-stream watchers wouldn't be GC'd from a map until the newer map type is added in golang anyway. I'm going to revert back to the older interface and when we implement the watcher, we can discuss further how these may be refined.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 39 at r11 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

Does this need to exist outside the lifetime of the send stream? Same for StreamTokenCounterProvider.

I.e. do we just need a "send stream" interface that, once created, provides both the token counters and the token watcher (and maybe some other supporting things)? Or these objects somehow need to outlive the stream in some cases?

Probably ok to have these stubs in place now, and it'll become clearer when some meaty implementations are added. This is more for my understanding of the semantics.

They may need to outlive the lifetime, returning tokens after close etc.


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 64 at r11 (raw file):

Previously, sumeerbhola wrote…

assumed this changed due to

How about a func(context.Context) callback in the interface above, to avoid the interface boilerplate?

I believe this will always allocate when NotifyWhenAvailable is called, due to the need to capture the state for the closure (a word for the replicaSendStream pointer, and another for its function pointer). In comparison, with an interface, the two words needed for the interface are already included.

I'll revert.

kvoli added 2 commits August 5, 2024 16:48
Introduce the `SendTokenWatcher` interface, which will be used to watch
and be notified when there are elastic send tokens available for a
stream. It is intended to be used when a send queue develops and the
caller wishes to be notified of available elastic send tokens, in order
to dequeue and send entries.

Resolves: cockroachdb#128011
Release note: None
Introduce the `RangeController` interface, which provides flow control
for replication traffic in KV, for a range.

Resolves: cockroachdb#128021
Release note: None
@kvoli kvoli force-pushed the 240802.rac-interfaces branch from 658b444 to b0f296c Compare August 5, 2024 20:49
@pav-kv pav-kv requested a review from sumeerbhola August 5, 2024 20:52
Copy link
Collaborator

@pav-kv pav-kv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 64 at r11 (raw file):

Previously, kvoli (Austen) wrote…

I'll revert.

TIL

Copy link
Collaborator

@pav-kv pav-kv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @kvoli, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 42 at r8 (raw file):

Previously, kvoli (Austen) wrote…

Which Stream? The replicaSendStream does get init'd with the counter on creation.

I mean replicaSendStream, yeah. It references the parent *replicaState which contains the token counters. Aren't they always initialized throughout the send stream lifetime?

Copy link
Collaborator Author

@kvoli kvoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrewbaptist, @pav-kv, and @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go line 42 at r8 (raw file):

Previously, pav-kv (Pavel Kalinnikov) wrote…

I mean replicaSendStream, yeah. It references the parent *replicaState which contains the token counters. Aren't they always initialized throughout the send stream lifetime?

Yeah, they should always be initialized.

@sumeerbhola sumeerbhola requested a review from pav-kv August 5, 2024 21:15
Copy link
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 1 of 2 files at r9, 1 of 1 files at r12, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @andrewbaptist and @pav-kv)

@kvoli
Copy link
Collaborator Author

kvoli commented Aug 5, 2024

TYFTR

bors r=sumeerbhola

@craig craig bot merged commit be9b35d into cockroachdb:master Aug 5, 2024
21 of 22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants