kvcoord: Rework error propagation in mux rangefeed #100649

Merged
merged 1 commit on Apr 13, 2023

Conversation

miretskiy
Contributor

Prior to this change, there were cases where the future used to wait for a single rangefeed completion could be completed multiple times, or where a message about rangefeed termination could be sent multiple times on a single mux rangefeed stream.

One of those cases was the check for ensureClosedTimestampStarted. If this method returned an error, we would immediately send the error on the RPC stream, and then complete the future with a nil error.

Another instance was when the registry would DisconnectWithErr -- in that case, we would first complete the future in this method, and then complete it again later.

It appears that completing a future multiple times should be okay; however, it is still a bit worrisome. The deadlocks observed were all in the local RPC bypass (rpc/context.go), and it's not a stretch to imagine that as soon as the first error (e.g. from ensureClosedTimestampStarted) is returned, the goroutine reading these messages terminates, causing the subsequent attempt to send the error to deadlock.

Another hypothetical issue is how the mux rangefeed sent the error when the future completed. Prior to this change, this happened inline (via the WhenReady closure). This is dangerous since this closure may run while important locks (such as raftMu) are held. What could happen is that the mux rangefeed encounters a retryable error. The future is prepared with the error value, which causes an error to be sent to the client. This happens with some lock being held. The client notices this error and attempts to restart the rangefeed -- against the same server -- and that could block, at least in theory. Regardless, performing IO while locks could potentially be held is not a good idea. This PR fixes the problem by shunting logical rangefeed completion notifications to a dedicated goroutine.
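
To illustrate, here is a minimal sketch of that approach (hypothetical names -- completionCh, muxStream, notifyCompletion -- rather than the actual code in this PR): completion events are only enqueued by the future callback, and a dedicated goroutine is the only place that performs the potentially blocking Send.

	completionCh := make(chan *kvpb.MuxRangeFeedEvent, 16)

	// Dedicated goroutine: the only code path that touches the stream.
	go func() {
		for {
			select {
			case ev := <-completionCh:
				if err := muxStream.Send(ev); err != nil {
					return // the client is gone; nothing left to forward
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	// Invoked from the future's WhenReady closure, possibly with locks held:
	// it only hands the event off to the buffered channel instead of calling
	// Send directly.
	notifyCompletion := func(ev *kvpb.MuxRangeFeedEvent) {
		select {
		case completionCh <- ev:
		case <-ctx.Done():
		}
	}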

Informs #99560
Informs #99640
Informs #99214
Informs #98925
Informs #99092
Informs #99212
Informs #99910

Release note: None

@miretskiy miretskiy requested review from erikgrinaker, pav-kv and a team April 4, 2023 21:01
@miretskiy miretskiy requested a review from a team as a code owner April 4, 2023 21:01
@blathers-crl

blathers-crl bot commented Apr 4, 2023

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Member

This change is Reviewable

@miretskiy miretskiy force-pushed the muxrf branch 2 times, most recently from 10c7cce to 617aa0a on April 4, 2023 23:10
Comment on lines 1567 to 1624
if err == nil {
	// RangeFeed shouldn't finish without an error. However, if it does,
	// treat it as if this is an EOF error -- that is, treat it as a normal
	// stream termination so that the caller restarts.
	err = io.EOF
}
Contributor

It certainly can, although maybe not that often in practice -- see e.g.:

func (p *Processor) Stop() {
	p.StopWithErr(nil)
}

A nil error needs to be treated like a server shutdown. We should document this in the API contract.

I feel like we should have a separate mux event type for stream closing, but I suppose it's too late in the game for an API change like that now.

io.EOF seems ok, but we need to make sure it's properly encoded/decoded when crossing RPC boundaries (if it isn't already then we need to add an error encoder/decoder), and we should write a test to ensure that it is.
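
For example, a round-trip test along these lines (a sketch using cockroachdb/errors' generic encode/decode helpers plus testify; the actual rangefeed RPC plumbing may differ) would catch a regression here:

	func TestEOFSurvivesErrorEncoding(t *testing.T) {
		ctx := context.Background()
		// Simulate the error crossing an RPC boundary.
		encoded := errors.EncodeError(ctx, io.EOF)
		decoded := errors.DecodeError(ctx, encoded)
		require.True(t, errors.Is(decoded, io.EOF))
	}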

Comment on lines 1576 to 1654
// NB: even though calling sink.Send() to send notification might seem
// correct, it is also unsafe. This future may be completed at any point,
// including during critical section when some important lock (such as
// raftMu in processor) may be held. Issuing potentially blocking IO
Contributor

I was worried about this. Futures make it very hard to reason about which goroutine we run on, and which locks are held while doing so. It would probably be better if all the work was done on the receiver's goroutine, but I suppose the whole point was to avoid that. It's worth reconsidering this design.

muxDoneCh := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
Contributor

I thought we had a linter for this. Server-side goroutines need to be tracked by e.g. RunAsyncTask, so they're registered with the stopper. We also need to handle stopper quiescence in the main loop here.
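
For reference, the stopper-tracked version would look roughly like this (a sketch; the task name and surrounding plumbing are made up, not the PR's actual code):

	if err := stopper.RunAsyncTask(ctx, "mux-rangefeed-completion", func(ctx context.Context) {
		for {
			select {
			case ev := <-rangefeedDoneCh:
				if err := stream.Send(ev); err != nil {
					return // stream broken; client will reconnect
				}
			case <-ctx.Done():
				return
			case <-stopper.ShouldQuiesce():
				return // server shutting down
			}
		}
	}); err != nil {
		// Server is draining: fail the RPC rather than leak an untracked goroutine.
		return err
	}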

// When a single rangefeed completes, a notification is sent
// to this channel, which then forwards it to the muxStream.
rangefeedDoneCh := make(chan *kvpb.MuxRangeFeedEvent, 16)
muxDoneCh := make(chan struct{})
Contributor

nit: Should we combine this with streamCtx so that we only have a single cancellation mechanism? I.e. streamCtx, cancel := context.WithCancel(stream.Context()).


// When a single rangefeed completes, a notification is sent
// to this channel, which then forwards it to the muxStream.
rangefeedDoneCh := make(chan *kvpb.MuxRangeFeedEvent, 16)
Contributor

A buffered channel feels insufficient here. We're pulling from this in a single goroutine, and we could easily end up with 16 ranges terminating while the goroutine is blocked on IO, which puts us right back in this situation again.

@@ -474,6 +474,11 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
active.setLastError(reason)
active.release()

if reason == io.EOF {
Contributor

Should we move this into handleRangefeedError?

Collaborator

@pav-kv pav-kv left a comment

drive-by nits

Comment on lines 435 to 438
if err := stream.Send(&event); err != nil {
	return err
}
return nil
Collaborator

Suggested change
if err := stream.Send(&event); err != nil {
return err
}
return nil
return stream.Send(&event)

reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) {
	r.done.Set(err)
	return true, pErr
Collaborator

Suggested change
return true, pErr
return true /* disconnect */, pErr

@miretskiy miretskiy requested review from erikgrinaker and pav-kv April 7, 2023 14:49
Contributor Author

@miretskiy miretskiy left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker and @pavelkalinnikov)


pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go line 477 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Should we move this into handleRangefeedError?

I was hoping you'd ask that :)
Done.


pkg/kv/kvserver/rangefeed/registry.go line 507 at r1 (raw file):

Previously, pavelkalinnikov (Pavel Kalinnikov) wrote…
		return true /* disconnect */, pErr

Done.


pkg/server/node.go line 1525 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

A buffered channel feels insufficient here. We're pulling from this in a single goroutine, and we could easily end up with 16 ranges terminating while the goroutine is blocked on IO, which puts us right back in this situation again.

Yeah... Well, the solution is quite a bit more complex. I started writing some helper libraries for making this kind of solution generic (basically, a producer-consumer queue fronting one or more workers)... but for now, it's all inline, albeit more complex. Let me know what you think.


pkg/server/node.go line 1526 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

nit: Should we combine this with streamCtx so that we only have a single cancellation mechanism? I.e. streamCtx, cancel := context.WithCancel(stream.Context()).

Sure; the muxDone channel has been removed. The problem (perhaps with the future library) is that the stream may terminate before all futures complete (that's what muxDone was used for, but ctx.Done works equally well).


pkg/server/node.go line 1529 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

I thought we had a linter for this. Server-side goroutines need to be tracked by e.g. RunAsyncTask, so they're registered with the stopper. We also need to handle stopper quiescence in the main loop here.

Okay. But clearly, there's no linter for this.


pkg/server/node.go line 1572 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

It certainly can, although maybe not that often in practice -- see e.g.:

func (p *Processor) Stop() {
p.StopWithErr(nil)
}

A nil error needs to be treated like a server shutdown. We should document this in the API contract.

I feel like we should have a separate mux event type for stream closing, but I suppose it's too late in the game for an API change like that now.

io.EOF seems ok, but we need to make sure it's properly encoded/decoded when crossing RPC boundaries (if it isn't already then we need to add an error encoder/decoder), and we should write a test to ensure that it is.

Yeah; it is too late to change that. io.EOF is exactly how regular stream termination is treated, so I feel it's as close to that as I could get. I've rephrased the comment a bit.

I think, at least on the surface, it should be safe-ish to add a new RangeFeedRetryError_Reason.
Since the functionality is disabled by default, we are roughly okay. What's probably not okay is that
mixed-version tests might break if the mux rangefeed metamorphic constant is set.

I decided to use the REPLICA_REMOVED error reason instead of io.EOF. Its handling on the client
side is identical. I left a TODO to add a new retry reason.
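
Roughly (a sketch of the substitution, assuming the kvpb.NewRangeFeedRetryError constructor; the exact call site in the PR may differ):

	if err == nil {
		// Until a dedicated retry reason exists (see TODO), surface normal
		// stream termination as REPLICA_REMOVED, which the client already
		// treats as "restart this rangefeed".
		err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED)
	}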

Thoughts?


pkg/server/node.go line 1579 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

I was worried about this. Futures make it very hard to reason about which goroutine we run on, and which locks are held while doing so. It would probably be better if all the work was done on the receiver's goroutine, but I suppose the whole point was to avoid that. It's worth reconsidering this design.

Yeah... I had quite a few more knobs in the future library to control that... but they were all removed during reviews. Now, it's all on the caller to ensure.


pkg/kv/kvserver/client_replica_circuit_breaker_test.go line 438 at r1 (raw file):

Previously, pavelkalinnikov (Pavel Kalinnikov) wrote…
	return stream.Send(&event)

Doh.

@miretskiy
Contributor Author

@pavelkalinnikov @erikgrinaker @aliher1911
I just published #100924
This PR implements some of the primitives I used in newMuxRangeFeedCompletionWatcher (namely, a queue plus a bounded executor). Let me know if you think I should include that PR here instead of inlining the code.
It does have unit tests, which is a plus.
Also, and somewhat shockingly, the NewWorkQueue executor is faster when spinning up many goroutines.
Anyway, take a look -- let me know what you think.

@pavelkalinnikov @aliher1911 -- the above PR is a bit of a rework of the code used in the "scheduler" PR I sent before (removal of goroutines in RF).

@miretskiy
Contributor Author

#101024 contains this PR built on top of the queue/executor...

Contributor

@erikgrinaker erikgrinaker left a comment

Reviewed 1 of 5 files at r1, 4 of 5 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @pavelkalinnikov)


pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go line 512 at r2 (raw file):

	}

	if err == io.EOF {

This should be errors.Is(), to handle error wrapping. Let's also move all conditions into the switch below.
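
I.e. something along these lines (a sketch; the surrounding handler logic and other cases are placeholders):

	switch {
	case errors.Is(err, io.EOF):
		// Regular stream termination; restart the rangefeed.
	case errors.Is(err, context.Canceled):
		// Client went away; stop.
	}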


pkg/server/node.go line 1525 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Yeah... Well, solution is quite a bit more complex. I started writing some helper libraries for making this kind of
solution generic (basically, a producer consumer queue fronting 1 or more workers)... but for now, it's all inline; albeit more complex.
Let me know what you think.

I think we can simplify this a fair bit by using a plain old channel for notification, which also gets rid of a goroutine. Consider something like the following (which can be optimized a bit, but I don't think it matters much since this is only called during shutdown):

	var fin = struct {
		syncutil.Mutex
		completed []*kvpb.MuxRangeFeedEvent
		signalC   chan struct{}
	}{
		signalC: make(chan struct{}, 1),
	}

	forwardCompletion := func(ctx context.Context) {
		for {
			select {
			case <-fin.signalC:
				var toSend []*kvpb.MuxRangeFeedEvent
				fin.Lock()
				toSend, fin.completed = fin.completed, nil
				fin.Unlock()
				for _, e := range toSend {
					if err := sender.Send(e); err != nil {
						return
					}
				}
			case <-ctx.Done():
				return
			case <-stopper.ShouldQuiesce():
				return // We should probably close the stream here?
			}
		}
	}

	addCompleted := func(event *kvpb.MuxRangeFeedEvent) {
		fin.Lock()
		fin.completed = append(fin.completed, event)
		fin.Unlock()
		select {
		case fin.signalC <- struct{}{}:
		default:
		}
	}

pkg/server/node.go line 1572 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Yeah; it is too late to change that. io.EOF is exactly how regular stream termination is treated, so I feel it's as close to that as I could get. Comment rephrased a bit.

I think, at least on the surface, it should be safe-ish to add a new RangeFeedRetryError_Reason.
Since functionality is disabled by default, we are roughly okay. What's probably not okay is that
mx-version tests might become broken if mux rangefeed metamorphic constant is set.

I decided to use REPLICA_REMOVED error reason instead of io.EOF. Its handling on the client
side is identical. I left a TODO to add a new retry reason.

Thoughts?

I don't love overloading REPLICA_REMOVED for this, but I think it's the most pragmatic solution given where we are. Let's write up a follow-up issue to add a new version-gated reason for 23.2.


pkg/server/node.go line 1579 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Yeah... I had quite a bit more knobs in future library to control that... but that was all removed during reviews. Now, it's all on the caller to ensure.

Let's at least call this out on the relevant rangefeed future -- that it may be completed while holding locks and as such can't block or do work for any significant amount of time.

@miretskiy
Contributor Author

addCompleted := func(event *kvpb.MuxRangeFeedEvent) {
		fin.Lock()
		fin.completed = append(fin.completed, event)
		fin.Unlock()
		select {
		case fin.signalC <- struct{}{}:
		default:
		}
	}

This is definitely simpler, but it could also block -- I think it's the same issue that you pointed
out before -- we can't know what the size of the buffer should be if we want to guarantee that
nothing blocks...

@erikgrinaker
Contributor

erikgrinaker commented Apr 11, 2023

This is definitely simpler, but also could block -- I think it's the same issue that you pointed
out before -- we can't know what the size of the buffer should be if you want to guarantee that
nothing blocks...

No, it's a non-blocking send -- note the default clause at the bottom. The signal channel has a buffer size of 1, which means that while the worker is busy processing the queue, the first addCompleted() call will buffer a signal to the channel, but a second (or later) addCompleted() call will see the already-queued signal and just return. The worker just needs to see a single signal to know that it has queued items to process.

@miretskiy
Contributor Author

No, it's a non-blocking send -- note the default clause at the bottom. The signal channel has a buffer size of 1, which means that while the worker is busy processing the queue, the first addCompleted() call will buffer a signal to the channel, but a second (or later) addCompleted() call will see the already-queued signal and just return.

Ahh... missed the default; there might not be a second addCompleted though.
Or the next one might arrive a long time after.

@erikgrinaker
Contributor

erikgrinaker commented Apr 11, 2023

Ahh... missed the default; there might not be a second addCompleted though.
Or the next one might arrive long time after.

That doesn't matter. All that matters is that the worker is signalled whenever there's queued work, and that always happens here. This is a pretty standard way of doing single-worker notification.
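
For posterity, the pattern in isolation (a self-contained sketch rather than the PR's code; the worker is started on its own goroutine and done is closed on shutdown):

	type notifyQueue struct {
		mu      syncutil.Mutex
		pending []*kvpb.MuxRangeFeedEvent
		signalC chan struct{} // must be created with capacity 1
	}

	// add never blocks: it appends under the lock and then does a
	// non-blocking send on signalC. If a signal is already buffered, the
	// default branch is taken, and the worker will still see this item
	// because it drains the entire pending slice per signal.
	func (q *notifyQueue) add(ev *kvpb.MuxRangeFeedEvent) {
		q.mu.Lock()
		q.pending = append(q.pending, ev)
		q.mu.Unlock()
		select {
		case q.signalC <- struct{}{}:
		default:
		}
	}

	// worker processes everything queued each time it is signalled.
	func (q *notifyQueue) worker(send func(*kvpb.MuxRangeFeedEvent) error, done <-chan struct{}) {
		for {
			select {
			case <-q.signalC:
				q.mu.Lock()
				batch := q.pending
				q.pending = nil
				q.mu.Unlock()
				for _, ev := range batch {
					if err := send(ev); err != nil {
						return
					}
				}
			case <-done:
				return
			}
		}
	}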

Contributor Author

@miretskiy miretskiy left a comment

Oy.... Need a nice comment on that buffer size of 1 on the channel.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @pavelkalinnikov)

Contributor Author

@miretskiy miretskiy left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker and @pavelkalinnikov)


pkg/server/node.go line 1525 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

I think we can simplify this a fair bit by using a plain old channel for notification, which also gets rid of a goroutine. Consider something like the following (which can be optimized a bit, but I don't think it matters much since this is only called during shutdown):

	var fin = struct {
		syncutil.Mutex
		completed []*kvpb.MuxRangeFeedEvent
		signalC   chan struct{}
	}{
		signalC: make(chan struct{}, 1),
	}

	forwardCompletion := func(ctx context.Context) {
		for {
			select {
			case <-fin.signalC:
				var toSend []*kvpb.MuxRangeFeedEvent
				fin.Lock()
				toSend, fin.completed = fin.completed, nil
				fin.Unlock()
				for _, e := range toSend {
					if err := sender.Send(e); err != nil {
						return
					}
				}
			case <-ctx.Done():
				return
			case <-stopper.ShouldQuiesce():
				return // We should probably close the stream here?
			}
		}
	}

	addCompleted := func(event *kvpb.MuxRangeFeedEvent) {
		fin.Lock()
		fin.completed = append(fin.completed, event)
		fin.Unlock()
		select {
		case fin.signalC <- struct{}{}:
		default:
		}
	}

Thank you; much simpler.


pkg/server/node.go line 1572 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

I don't love overloading REPLICA_REMOVED for this, but I think it's the most pragmatic solution given where we are. Let's write up a follow-up issue to add a new version-gated reason for 23.2.

Done. Comment added; #101330


pkg/server/node.go line 1579 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Let's a least call this out on the relevant rangefeed future that it may be called while holding locks and as such can't block or do work for any significant amount of time.

I moved the comment right before the rangefeedCompleted call; I also have a comment on the
newMuxRangeFeedCompletionWatcher method; do you think more is needed?

Contributor

@erikgrinaker erikgrinaker left a comment

Reviewed 2 of 2 files at r3, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @pavelkalinnikov)


pkg/server/node.go line 1579 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I moved the comment right before rangefeedCompleted call; I also have comment on the
newMuxRangeFeedCompletionWatcher method; do you think more is needed?

I was thinking of the RangeFeed() method -- a mention that callers must not block when the returned future is ready. Not worth another CI roundtrip though.


pkg/server/node.go line 1560 at r3 (raw file):

				// There is nothing we can do here; stream cancellation is usually
				// triggered by the client.  We don't have access to stream cancellation
				// function; so, just let things proceed until the server shuts down.

Actually, I think this is fine. I think the gRPC server will disconnect the client when we return from the MuxRangeFeed method (right?). Otherwise, there would be a risk that the client is stuck while the server is shutting down, since long-running tasks can prevent server shutdown for several minutes.

@miretskiy
Contributor Author

Actually, I think this is fine. I think the gRPC server will disconnect the client when we return from the MuxRangeFeed method (right?). Otherwise, there would be a risk that the client is stuck while the server is shutting down, since long-running tasks can prevent server shutdown for several minutes.

It is fine; and we will return nil from the stream (because the client handles the disconnect); I just wanted to make
sure that the reason why exiting here is correct is documented.

@miretskiy
Contributor Author

bors r+

@miretskiy
Contributor Author

bors r+

@miretskiy miretskiy added backport-23.1.x Flags PRs that need to be backported to 23.1 backport-23.1.0 labels Apr 13, 2023
@craig
Contributor

craig bot commented Apr 13, 2023

Build succeeded:
