Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver/rangefeed: remove lockedRangefeedStream #126486

Merged
merged 2 commits into from
Jul 2, 2024

Conversation

wenyihu6
Copy link
Contributor

@wenyihu6 wenyihu6 commented Jul 1, 2024

kvserver: wrap kvpb.RangeFeedEventSink in Stream

Previously, we declared the same interface signature twice: once in
kvpb.RangeFeedEventSink and again in rangefeed.Stream. This patch embeds
kvpb.RangeFeedEventSink inside rangefeed.Stream, making rangefeed.Stream a
superset of kvpb.RangeFeedEventSink. This approach makes sense, as each
rangefeed server stream should be a rangefeed event sink, capable of making
thread-safe rangefeed event sends.

Epic: none
Release note: none


kvserver/rangefeed: remove lockedRangefeedStream

Previously, we created separate locked rangefeed streams for each individual
rangefeed stream to ensure Send can be called concurrently as the underlying
grpc stream is not thread safe. However, since the introduction of the mux
rangefeed support, we already have a dedicated lock for the underlying mux
stream, making the Send method on each rangefeed stream thread safe already.
This patch removes the redundant locks from each individual rangefeed stream.

Epic: none
Release note: none

Copy link

blathers-crl bot commented Jul 1, 2024

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Copy link
Member

This change is Reviewable

@wenyihu6
Copy link
Contributor Author

wenyihu6 commented Jul 1, 2024

I’m not sure the performance implication of such changes. The original way has a more fine grained mutex wrapped under every setRangeIDEventSink, but they would need to access the same underlying muxstream lock anyways. I did a small benchmarks for comparison, and it seems that having one mutex is better (less contentions and also faster). I'm not sure how accurate the benchmark is tho since I'm getting different results with code refactoring.

Benchmark
Benchmark/fine_grained_mutexes
Benchmark/fine_grained_mutexes-12         	 1465849	       863.0 ns/op
Benchmark/only_one_mutex
Benchmark/only_one_mutex-12               	 1804202	       644.2 ns/op
PASS
type SharedCounter struct {
	mu    sync.Mutex
	count int
}

func (s *SharedCounter) Increment() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.count++
}

type Counter struct {
	shared *SharedCounter
	mu     sync.Mutex
}

func NewCounter(shared *SharedCounter) *Counter {
	return &Counter{
		shared: shared,
	}
}

func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.shared.Increment()
}

type OneCounter struct {
	shared *SharedCounter
}

func NewOne(shared *SharedCounter) *OneCounter {
	return &OneCounter{
		shared: shared,
	}
}

func (c *OneCounter) Increment() {
	c.shared.Increment()
}

func BenchmarkSeparateMutexes(b *testing.B) {
	sharedCounter := &SharedCounter{}
	counter1 := NewCounter(sharedCounter)
	counter2 := NewCounter(sharedCounter)

	b.ResetTimer()
	var wg sync.WaitGroup
	for i := 0; i < b.N; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter1.Increment()
		}()
	}

	for i := 0; i < b.N; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter2.Increment()
		}()
	}

	wg.Wait()
	b.StopTimer()
}

func BenchmarkOneMutexes(b *testing.B) {
	sharedCounter := &SharedCounter{}
	counter1 := NewOne(sharedCounter)
	counter2 := NewOne(sharedCounter)

	b.ResetTimer()
	var wg sync.WaitGroup
	for i := 0; i < b.N; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter1.Increment()
		}()
	}

	for i := 0; i < b.N; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter2.Increment()
		}()
	}

	wg.Wait()
	b.StopTimer()
}

func Benchmark(b *testing.B) {
	b.ReportAllocs()
	b.Run("fine grained mutexes", func(b *testing.B) {
		BenchmarkSeparateMutexes(b)
	})

	b.Run("only one mutex", func(b *testing.B) {
		BenchmarkOneMutexes(b)
	})
}

@wenyihu6 wenyihu6 marked this pull request as ready for review July 1, 2024 14:07
@wenyihu6 wenyihu6 requested a review from a team July 1, 2024 14:07
@wenyihu6 wenyihu6 requested review from a team as code owners July 1, 2024 14:07
@wenyihu6 wenyihu6 requested a review from nvanbenschoten July 1, 2024 14:10
Copy link
Member

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 2 files at r1, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)


-- commits line 8 at r1:
This assumption feels brittle. What do you think about adding an "annotation method" to RangeFeedEventSink called SendIsThreadSafe so that implementors need to actively opt-in to implementing the contract?


pkg/kv/kvserver/replica_rangefeed.go line 326 at r1 (raw file):

	var done future.ErrorFuture
	p := r.registerWithRangefeedRaftMuLocked(
		ctx, rSpan, args.Timestamp, catchUpIter, args.WithDiff, args.WithFiltering, stream, &done,

Do you understand why we have kvpb.RangeFeedEventSink and rangefeed.Stream? Should we replace rangefeed.Stream with kvpb.RangeFeedEventSink?

@wenyihu6 wenyihu6 requested a review from a team as a code owner July 1, 2024 18:37
@wenyihu6 wenyihu6 requested review from mgartner and removed request for a team July 1, 2024 18:37
Copy link
Contributor Author

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @mgartner and @nvanbenschoten)


-- commits line 8 at r1:

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This assumption feels brittle. What do you think about adding an "annotation method" to RangeFeedEventSink called SendIsThreadSafe so that implementors need to actively opt-in to implementing the contract?

I like the idea! Made the changes accordingly. Lmkif you had something else in mind.


pkg/kv/kvserver/replica_rangefeed.go line 326 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Do you understand why we have kvpb.RangeFeedEventSink and rangefeed.Stream? Should we replace rangefeed.Stream with kvpb.RangeFeedEventSink?

I agree that the current implementation feels redundant. In the future PR, as we add more methods to the rangefeed.Stream interface, such as Disconnect and RegisterRangefeedCleanup, their purposes start diverging. The kvpb.RangeFeedEventSink is a node level concept, and we have other RPC context tests that implement it directly for testing. It doesn’t need to implement functions like Disconnect. On the other hand, rangefeed.Stream is more specific to the rangefeed level and include functions like Disconnect. Wdyt?

@wenyihu6 wenyihu6 removed the request for review from mgartner July 1, 2024 18:49
@wenyihu6 wenyihu6 force-pushed the removelocked branch 3 times, most recently from 574e7ca to 52f86e7 Compare July 1, 2024 19:13
Copy link
Member

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 1 files at r2, 11 of 11 files at r3, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)


-- commits line 8 at r1:

Previously, wenyihu6 (Wenyi Hu) wrote…

I like the idea! Made the changes accordingly. Lmkif you had something else in mind.

I'd actually advise we get rid of the bool return value, as that makes the property look optional. The role of the method is to indicate that Send must be thread-safe in every implementation of RangeFeedEventSink. Implementors aren't allowed to return false.


pkg/kv/kvserver/replica_rangefeed.go line 326 at r1 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

I agree that the current implementation feels redundant. In the future PR, as we add more methods to the rangefeed.Stream interface, such as Disconnect and RegisterRangefeedCleanup, their purposes start diverging. The kvpb.RangeFeedEventSink is a node level concept, and we have other RPC context tests that implement it directly for testing. It doesn’t need to implement functions like Disconnect. On the other hand, rangefeed.Stream is more specific to the rangefeed level and include functions like Disconnect. Wdyt?

That makes sense to me, thanks for explaining. Maybe we should have the rangefeed.Stream interface embed the kvpb.RangeFeedEventSink interface so that it's a clear superset.

Copy link
Contributor Author

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten)


-- commits line 8 at r1:

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I'd actually advise we get rid of the bool return value, as that makes the property look optional. The role of the method is to indicate that Send must be thread-safe in every implementation of RangeFeedEventSink. Implementors aren't allowed to return false.

I see what you mean. It's a bit tricky here since we actually do have a few test streams that have thread unsafe Send method. For example,

func (s *noopStream) Send(*kvpb.RangeFeedEvent) error {
and
func (s *rangefeedEventSink) Send(event *kvpb.RangeFeedEvent) error {
return s.stream.Send(&kvpb.MuxRangeFeedEvent{RangeFeedEvent: *event})
}
. And they need to conform to the RangeFeedEventSink interface. Do you see a clean way to handle this? I could think of

  • define another kvpb.TestStream struct that implements the SendIsThreadSafe() interface. Other test streams will embed the *kvpb.TestStream struct to opt in.
  • change each test stream to become thread safe

pkg/kv/kvserver/replica_rangefeed.go line 326 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

That makes sense to me, thanks for explaining. Maybe we should have the rangefeed.Stream interface embed the kvpb.RangeFeedEventSink interface so that it's a clear superset.

Aha, great idea. I will make the changes in the future pr above.

Copy link
Contributor Author

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten)


-- commits line 8 at r1:

Previously, wenyihu6 (Wenyi Hu) wrote…

I see what you mean. It's a bit tricky here since we actually do have a few test streams that have thread unsafe Send method. For example,

func (s *noopStream) Send(*kvpb.RangeFeedEvent) error {
and
func (s *rangefeedEventSink) SendIsThreadSafe() bool { return false }
. And they need to conform to the RangeFeedEventSink interface. Do you see a clean way to handle this? I could think of

  • define another kvpb.TestStream struct that implements the SendIsThreadSafe() interface. Other test streams will embed the *kvpb.TestStream struct to opt in.
  • change each test stream to become thread safe

I went with the first proposed idea above. Lmk if you have a better idea in mind.

Copy link
Member

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 12 of 12 files at r4, 11 of 11 files at r5, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)


pkg/kv/kvpb/api.go line 2519 at r5 (raw file):

}

// Every stream used in prod code implementing the RangeFeedEventSink interface

This type doesn't feel like it belongs in the top-level kvpb package. Arguably, RangeFeedEventSink doesn't even belong here, but it's more reasonable.

I'd suggest that we don't introduce this at all. Just because a stream is used in a test, doesn't necessarily mean it doesn't need to be thread-safe. Instead, we should define a SendIsThreadSafe method on all RangeFeedEventSink structs. That gives us a place to talk about thread safety. Either we then make these implementations thread-safe or we just comment about how the test has organized for the sink to not need thread safety (so in a way, the test is providing the thread safety).


pkg/kv/kvpb/api.go line 2531 at r5 (raw file):

	// Context returns the context for this stream.
	Context() context.Context
	// Send blocks until it sends m, the stream is done, or the stream breaks.

nit: The parameter is not named here. We should either name it m, or replace "sends m" with "sends the RangeFeedEvent".


pkg/kv/kvpb/api.go line 2535 at r5 (raw file):

	Send(*RangeFeedEvent) error
	// SendIsThreadSafe is a no-op declaration method. It is a contract that the
	// struct has a thread safe Send method.

nit: s/struct/interface/

nit: s/thread safe/thread-safe/

Copy link
Contributor Author

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten)


pkg/kv/kvpb/api.go line 2519 at r5 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This type doesn't feel like it belongs in the top-level kvpb package. Arguably, RangeFeedEventSink doesn't even belong here, but it's more reasonable.

I'd suggest that we don't introduce this at all. Just because a stream is used in a test, doesn't necessarily mean it doesn't need to be thread-safe. Instead, we should define a SendIsThreadSafe method on all RangeFeedEventSink structs. That gives us a place to talk about thread safety. Either we then make these implementations thread-safe or we just comment about how the test has organized for the sink to not need thread safety (so in a way, the test is providing the thread safety).

I see. This is much cleaner. Done.


pkg/kv/kvpb/api.go line 2531 at r5 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: The parameter is not named here. We should either name it m, or replace "sends m" with "sends the RangeFeedEvent".

Ahh oops. Changed it sends the RangeFeedEvent.


pkg/kv/kvpb/api.go line 2535 at r5 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: s/struct/interface/

nit: s/thread safe/thread-safe/

Done.

wenyihu6 added 2 commits July 2, 2024 10:26
Previously, we declared the same interface signature twice: once in
kvpb.RangeFeedEventSink and again in rangefeed.Stream. This patch embeds
kvpb.RangeFeedEventSink inside rangefeed.Stream, making rangefeed.Stream a
superset of kvpb.RangeFeedEventSink. This approach makes sense, as each
rangefeed server stream should be a rangefeed event sink, capable of making
thread-safe rangefeed event sends.

Epic: none
Release note: none
Previously, we created separate locked rangefeed streams for each individual
rangefeed stream to ensure Send can be called concurrently as the underlying
grpc stream is not thread safe. However, since the introduction of the mux
rangefeed support, we already have a dedicated lock for the underlying mux
stream, making the Send method on each rangefeed stream thread safe already.
This patch removes the redundant locks from each individual rangefeed stream.

Epic: none
Release note: none
Copy link
Member

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm_strong:

Reviewed 11 of 11 files at r7, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @wenyihu6)

@wenyihu6
Copy link
Contributor Author

wenyihu6 commented Jul 2, 2024

Thanks for the thorough review!

bors r=nvanbenschoten

@craig craig bot merged commit 786cb46 into cockroachdb:master Jul 2, 2024
22 checks passed
wenyihu6 added a commit to wenyihu6/cockroach that referenced this pull request Jul 30, 2024
Previously, we added `SendIsThreadSafe` to the `delayedErrorServerStream` struct
by mistakes, which is unrelated to `RangeFeedEventSink`. This patch removes it.

Related: cockroachdb#126486
Release note: none
Epic: none
craig bot pushed a commit that referenced this pull request Aug 7, 2024
127877: flowinfra: remove SendIsThreadSafe from delayedErrorServerStream r=nvanbenschoten a=wenyihu6

Previously, we added `SendIsThreadSafe` to the `delayedErrorServerStream` struct
by mistakes, which is unrelated to `RangeFeedEventSink`. This patch removes it.

Related: #126486
Release note: none
Epic: none

Co-authored-by: Wenyi Hu <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants