Skip to content

Commit

Permalink
kvcoord: Eliminate 1 Go routine from MuxRangeFeed
Browse files Browse the repository at this point in the history
Prior to this PR, the server side `MuxRangeFeed`
implementation spawned a separate Go routine executing
single RangeFeed for each incoming request.

This is wasteful and unnecessary.
Instead of blocking, and waiting for a single RangeFeed to complete,
have rangefeed related functions return a promise to return
a `*kvpb.Error` once rangefeed completes (`future.Future[*kvpb.Error]`).

Prior to this change MuxRangeFeed would spin up 4 Go routines
per range.  With this PR, the number is down to 3.
This improvement is particularly important when executing
rangefeed against large tables (10s-100s of thousands of ranges).

Informs #96395
Epic: None

Release note (enterprise change): Changefeeds running with
`changefeed.mux_rangefeed.enabled` setting set to true are
more efficient, particularly when executing against large tables.
  • Loading branch information
Yevgeniy Miretskiy committed Mar 14, 2023
1 parent 62a7258 commit c3bac9d
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 198 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ go_library(
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/errorutil",
"//pkg/util/future",
"//pkg/util/grpcutil",
"//pkg/util/grunning",
"//pkg/util/hlc",
Expand Down Expand Up @@ -432,6 +433,7 @@ go_test(
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/errorutil",
"//pkg/util/future",
"//pkg/util/grunning",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
Expand Down
28 changes: 19 additions & 9 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -397,10 +398,6 @@ func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) {

type dummyStream struct {
name string
t interface {
Helper()
Logf(string, ...interface{})
}
ctx context.Context
recv chan *kvpb.RangeFeedEvent
}
Expand All @@ -411,9 +408,9 @@ func (s *dummyStream) Context() context.Context {

func (s *dummyStream) Send(ev *kvpb.RangeFeedEvent) error {
if ev.Val == nil && ev.Error == nil {
s.t.Logf("%s: ignoring event: %v", s.name, ev)
return nil
}

select {
case <-s.ctx.Done():
return s.ctx.Err()
Expand All @@ -422,6 +419,19 @@ func (s *dummyStream) Send(ev *kvpb.RangeFeedEvent) error {
}
}

func waitReplicaRangeFeed(
ctx context.Context,
r *kvserver.Replica,
req *kvpb.RangeFeedRequest,
stream kvpb.RangeFeedEventSink,
) error {
rfErr, ctxErr := future.Wait(ctx, r.RangeFeed(req, stream, nil /* pacer */))
if ctxErr != nil {
return ctxErr
}
return rfErr
}

// This test verifies that RangeFeed bypasses the circuit breaker. When the
// breaker is tripped, existing RangeFeeds remain in place and new ones can be
// started. When the breaker untrips, these feeds can make progress. The test
Expand All @@ -441,9 +451,9 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *kvpb.RangeFeedEvent)}
stream1 := &dummyStream{ctx: ctx, name: "rangefeed1", recv: make(chan *kvpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(args, stream1, nil /* pacer */).GoError()
err := waitReplicaRangeFeed(ctx, tc.repls[0].Replica, args, stream1)
if ctx.Err() != nil {
return // main goroutine stopping
}
Expand Down Expand Up @@ -495,9 +505,9 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {

// Start another stream during the "outage" to make sure it isn't rejected by
// the breaker.
stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *kvpb.RangeFeedEvent)}
stream2 := &dummyStream{ctx: ctx, name: "rangefeed2", recv: make(chan *kvpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(args, stream2, nil /* pacer */).GoError()
err := waitReplicaRangeFeed(ctx, tc.repls[0].Replica, args, stream2)
if ctx.Err() != nil {
return // main goroutine stopping
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/util/admission",
"//pkg/util/bufalloc",
"//pkg/util/envutil",
"//pkg/util/future",
"//pkg/util/hlc",
"//pkg/util/interval",
"//pkg/util/log",
Expand Down Expand Up @@ -67,6 +68,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/util",
"//pkg/util/encoding",
"//pkg/util/future",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -319,6 +320,9 @@ func (p *Processor) run(
r.runOutputLoop(ctx, p.RangeID)
select {
case p.unregC <- &r:
if r.unreg != nil {
r.unreg()
}
case <-p.stoppedC:
}
}
Expand Down Expand Up @@ -474,7 +478,8 @@ func (p *Processor) Register(
catchUpIterConstructor CatchUpIteratorConstructor,
withDiff bool,
stream Stream,
errC chan<- *kvpb.Error,
disconnectFn func(),
done *future.ErrorFuture,
) (bool, *Filter) {
// Synchronize the event channel so that this registration doesn't see any
// events that were consumed before this registration was called. Instead,
Expand All @@ -483,7 +488,7 @@ func (p *Processor) Register(

r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
p.Config.EventChanCap, p.Metrics, stream, errC,
p.Config.EventChanCap, p.Metrics, stream, disconnectFn, done,
)
select {
case p.regC <- r:
Expand Down
Loading

0 comments on commit c3bac9d

Please sign in to comment.