Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver/rangefeed: remove future package #126490

Merged
merged 1 commit into from
Jul 17, 2024
Merged

Conversation

wenyihu6
Copy link
Contributor

@wenyihu6 wenyihu6 commented Jul 1, 2024

Previously, to reduce O(ranges) goroutines and avoid blocking on rangefeed
completion, we introduced the future package to manage rangefeed completion by
invoking a future callback when an error occurred. The future package makes it
difficult to justify which goroutine the callback runs on and which locks are
being held. This patch introduces a new approach, replacing the future usage.
Each registration now uses the embedded StreamMuxer.DisconnectStreamWithError
method to signal rangefeed completion. StreamMuxer will manage the shutdown
logic and additional stream cleanup.

Part of: #126561
Release note: none

Copy link

blathers-crl bot commented Jul 1, 2024

Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Copy link
Member

This change is Reviewable

@wenyihu6 wenyihu6 marked this pull request as ready for review July 1, 2024 14:07
@wenyihu6 wenyihu6 requested a review from a team July 1, 2024 14:07
@wenyihu6 wenyihu6 requested review from a team as code owners July 1, 2024 14:07
@wenyihu6 wenyihu6 requested a review from nvanbenschoten July 1, 2024 14:10
Copy link
Contributor Author

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten)


pkg/kv/kvserver/replica_rangefeed.go line 516 at r1 (raw file):

		scanner, err := rangefeed.NewSeparatedIntentScanner(ctx, r.store.TODOEngine(), desc.RSpan())
		if err != nil {
			stream.Disconnect(kvpb.NewError(err))

I attempted to change this to construct the IntentScanner earlier during registerWithRangefeedRaftMuLocked to avoid using stream.Disconnect. But I got the same test failure as seen in #52844 which is why the callback was initially implemented.


pkg/kv/kvclient/rangefeed/rangefeed_external_test.go line 1630 at r1 (raw file):

	ctx  context.Context
	ch   chan<- *kvpb.RangeFeedEvent
	done chan *kvpb.Error

Instead of using a channel for all sink types, we could embed the real StreamMuxer and check for disconnect errors in StreamMuxer.ServerStreamSender. I chose this approach because it aligns more closely with our approach prior to #96756 and also seems much easier. And I plan to change tests in processor_test.go to use a real StreamMuxer in the next PR to expand our test coverage on StreamMuxer. What’s your preference?

@wenyihu6 wenyihu6 force-pushed the addmuxer3 branch 7 times, most recently from d3ecdc3 to a818e9b Compare July 11, 2024 21:40
Copy link
Member

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 16 of 16 files at r2, 16 of 16 files at r3, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @wenyihu6)


pkg/kv/kvserver/replica_rangefeed.go line 516 at r1 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

I attempted to change this to construct the IntentScanner earlier during registerWithRangefeedRaftMuLocked to avoid using stream.Disconnect. But I got the same test failure as seen in #52844 which is why the callback was initially implemented.

It's kind of strange that a processor-wide (multiple rangefeeds can attach to the same rangefeed processor) dependency is in charge of sending an error signal to a single stream. What would it look like to add an error return value to IntentScannerConstructor and handle the error case inside of the processor?


pkg/server/node.go line 1867 at r3 (raw file):

// Disconnect implements the rangefeed.Stream interface. It disconnects the
// stream from the StreamMuxer. The StreamMuxer is then responsible for
// disconnecting the stream and cleaning up.

This comment says that the function " disconnects the stream from the StreamMuxer" and that "The StreamMuxer is then responsible for disconnecting the stream". These two sound contradictory. Are both true? Can the responsibility of each be clarified?


pkg/server/node.go line 1938 at r3 (raw file):

			streamMuxer.AddStream(req.StreamID, cancel)

			if err := n.stores.RangeFeed(req, streamSink); err != nil {

Can we add a comment here that this call registers a RangeFeed and returns an error if registration fails, but that it returns without blocking and without an error if the registration succeeds?


pkg/kv/kvclient/rangefeed/rangefeed_external_test.go line 1630 at r1 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

Instead of using a channel for all sink types, we could embed the real StreamMuxer and check for disconnect errors in StreamMuxer.ServerStreamSender. I chose this approach because it aligns more closely with our approach prior to #96756. And I plan to change tests in processor_test.go to use a real StreamMuxer in the next PR to expand our test coverage on StreamMuxer. What’s your preference?

This approach seems fine to me. It's nice to avoid the dependency on StreamMuxer.


pkg/kv/kvserver/rangefeed/processor_test.go line 1812 at r3 (raw file):

	case err := <-c.done:
		return err.GoError()
	case <-time.After(30 * time.Second):

Let's replace all of these 30 * time.Second with testutils.DefaultSucceedsSoonDuration.

Copy link
Contributor Author

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @nvanbenschoten)


pkg/kv/kvserver/replica_rangefeed.go line 516 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

It's kind of strange that a processor-wide (multiple rangefeeds can attach to the same rangefeed processor) dependency is in charge of sending an error signal to a single stream. What would it look like to add an error return value to IntentScannerConstructor and handle the error case inside of the processor?

Thanks for raising this! I looked into this more. There are two things that I'm not quite understanding.

This comment says

// If the iterator is nil then no initialization scan will be performed and
// the resolved timestamp will immediately be considered initialized.
.

However, the nil check is performed on the intent scanner constructor but not on the intent scanner itself

. It seems that it could lead to panic here
func (s *SeparatedIntentScanner) ConsumeIntents(
.

I'm trying to distinguish when the intent scanner constructor returns nil without an error V.S. returns nil with an error. The first case seems only possible in tests. Based on the code here

func NewSeparatedIntentScanner(
, the constructor should never return a nil intent scanner without an error. Is that correct?

  1. Additionally, we only disconnect the first registration here during IntentScannerConstructor . I think this is currently true because we make sure to call the intent scanner constructor under the same lock as the first registration (It just happens that we need this to avoid missing events). No other p.Register could happen before IntentScannerConstructor is called but it feels like a brittle assumption. Should we call p.StopProcessor() when IntentScannerConstructor returns an error?

I'm trying to fix these in #127204 if the concerns look right to you.


pkg/server/node.go line 1867 at r3 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This comment says that the function " disconnects the stream from the StreamMuxer" and that "The StreamMuxer is then responsible for disconnecting the stream". These two sound contradictory. Are both true? Can the responsibility of each be clarified?

Done.


pkg/server/node.go line 1938 at r3 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Can we add a comment here that this call registers a RangeFeed and returns an error if registration fails, but that it returns without blocking and without an error if the registration succeeds?

Done.


pkg/kv/kvclient/rangefeed/rangefeed_external_test.go line 1630 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This approach seems fine to me. It's nice to avoid the dependency on StreamMuxer.

Ack, cool.


pkg/kv/kvserver/rangefeed/processor_test.go line 1812 at r3 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Let's replace all of these 30 * time.Second with testutils.DefaultSucceedsSoonDuration.

Done.

Copy link
Contributor Author

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delayed turnaround. I was looking into a support issue.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @nvanbenschoten)

Copy link
Member

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 4 of 6 files at r4, 3 of 3 files at r5, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @wenyihu6)


pkg/server/node.go line 1961 at r5 (raw file):

// StreamMuxer to detach the stream. The StreamMuxer is then responsible for
// handling the actual disconnection and additional cleanup. Note that Caller
// should not rely on immediate disconnection as cleanup take place async.

"takes"

Previously, to reduce O(ranges) goroutines and avoid blocking on rangefeed
completion, we introduced the future package to manage rangefeed completion by
invoking a future callback when an error occurred. The future package makes it
difficult to justify which goroutine the callback runs on and which locks are
being held. This patch introduces a new approach, replacing the future usage.
Each registration now uses the embedded StreamMuxer.DisconnectStreamWithError
method to signal rangefeed completion. StreamMuxer will manage the shutdown
logic and additional stream cleanup.

Part of: cockroachdb#126561
Release note: none
@wenyihu6
Copy link
Contributor Author

TFTR!

bors r=nvanbenschoten

@craig craig bot merged commit a488caf into cockroachdb:master Jul 17, 2024
22 checks passed
wenyihu6 added a commit to wenyihu6/cockroach that referenced this pull request Jul 18, 2024
Previously, when `IntentScannerConstructor` returned an error, only the first
rangefeed registration on the replica was disconnected as part of the error
handling. This worked because `IntentScannerConstructor` was called before the
first `Register` function finished to avoid missing events. But this feels like
a brittle assumption.

Additionally, we should not continue running `initialSan.Run` when the
constructor returns an error as this could lead to nil pointer panics.

Comments also incorrectly stated that the resolved timestamp is considered
immediately initialized if the provided iterator is nil. We actually check for
`IntentScannerConstructor`, not the iterator itself. Passing a nil
`IntentScannerConstructor` should only be possible in tests, and it shouldn't be
possible for `IntentScannerConstructor` to return a nil `IntentScanner` without
an error.

This patch fixes these issues by updating the comments and stopping the entire
processor when the constructor returns an error.

Related: cockroachdb#126490
Release note: none
Epic: none
@wenyihu6 wenyihu6 deleted the addmuxer3 branch July 18, 2024 18:38
wenyihu6 added a commit to wenyihu6/cockroach that referenced this pull request Oct 24, 2024
Previously, when `IntentScannerConstructor` returned an error, only the first
rangefeed registration on the replica was disconnected as part of the error
handling. This worked because `IntentScannerConstructor` was called before the
first `Register` function finished to avoid missing events. But this feels like
a brittle assumption.

Additionally, we should not continue running `initialSan.Run` when the
constructor returns an error as this could lead to nil pointer panics.

Comments also incorrectly stated that the resolved timestamp is considered
immediately initialized if the provided iterator is nil. We actually check for
`IntentScannerConstructor`, not the iterator itself. Passing a nil
`IntentScannerConstructor` should only be possible in tests, and it shouldn't be
possible for `IntentScannerConstructor` to return a nil `IntentScanner` without
an error.

This patch fixes these issues by updating the comments and stopping the entire
processor when the constructor returns an error.

Related: cockroachdb#126490
Release note: none
Epic: none
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants