kvserver/rangefeed: introduce stream muxer
This patch introduces a new StreamMuxer struct to manage server rangefeed
streams. For now, it only takes over the responsibilities of the existing
rangefeed completion watcher without changing any existing behaviour. A future
commit will move the rangefeed cleanup process to StreamMuxer as well.

Part of: cockroachdb#126561
Release note: none
wenyihu6 committed Jul 10, 2024
1 parent 7adfcb1 commit 0ed32cc
Showing 3 changed files with 211 additions and 71 deletions.
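
Before the diffs, a rough sketch of how StreamMuxer is meant to be used on the server side, mirroring the new wiring in pkg/server/node.go below. This is illustrative only; muxStream, ctx, stopper, and event stand in for values the caller already has (the lockedMuxStream wrapping the gRPC stream, the RPC context, the node's stopper, and a *kvpb.MuxRangeFeedEvent carrying a kvpb.RangeFeedError):

    // Illustrative sketch; not part of the diff.
    streamMuxer := rangefeed.NewStreamMuxer(muxStream)
    if err := streamMuxer.Start(ctx, stopper); err != nil {
        return err
    }
    defer streamMuxer.Stop()

    // Later, when an individual rangefeed completes, hand off the completion
    // event. AppendMuxError never blocks on IO; the muxer's background
    // goroutine forwards the event to the client.
    streamMuxer.AppendMuxError(event)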
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
"resolved_timestamp.go",
"scheduled_processor.go",
"scheduler.go",
"stream_muxer.go",
"task.go",
"testutil.go",
],
203 changes: 203 additions & 0 deletions pkg/kv/kvserver/rangefeed/stream_muxer.go
@@ -0,0 +1,203 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// ServerStreamSender forwards MuxRangefeedEvents from StreamMuxer to the
// underlying stream.
type ServerStreamSender interface {
// Send must be safe for concurrent use by multiple goroutines.
Send(*kvpb.MuxRangeFeedEvent) error
// SendIsThreadSafe is a no-op marker method. Its presence declares the
// contract that the implementation's Send method is thread-safe.
SendIsThreadSafe()
}

// StreamMuxer is responsible for managing a set of active rangefeed streams and
// forwarding rangefeed completion errors to the client.
//
// ┌───────────────────────────┐
// │ DistSender.RangefeedSpans │ rangefeedMuxer
// └───────────────────────────┘
// │ divideAllSpansOnRangeBoundaries
// ┌───────────────────────────┬───────────────────────────┐
// ▼ ▼ ▼
// ┌────────────────────┐ ┌────────────────────┐ ┌────────────────────┐
// │ rangefeedMuxer │ │ rangefeedMuxer │ │ rangefeedMuxer │
// │startSingleRangefeed│ │startSingleRangefeed│ │startSingleRangefeed│
// └─────────┬──────────┘ └──────────┬─────────┘ └──────────┬─────────┘
// ▼ ▼ ▼
// new streamID new streamID new streamID
// ┌────────────────┐ ┌────────────────┐ ┌────────────────┐
// │RangefeedRequest│ │RangefeedRequest│ │RangefeedRequest│
// └────────────────┘ └────────────────┘ └────────────────┘
// rangefeedMuxer rangefeedMuxer rangefeedMuxer
// establishMuxConnection establishMuxConnection establishMuxConnection
// │ │ │
// ▼ ▼ ▼
// rangefeedMuxer.startNodeMuxRangefeed rangefeedMuxer.startNodeMuxRangefeed
// rangefeedMuxer.receiveEventsFromNode rangefeedMuxer.receiveEventsFromNode
// ┌─────────────────────────────────────────┐ ┌─────────────────────────────────────────┐
// │rpc.RestrictedInternalClient.MuxRangeFeed│ │rpc.RestrictedInternalClient.MuxRangeFeed│
// └─────────────┬────────────▲──────────────┘ └─────────────────────────────────────────┘
// kvpb.RangefeedRequest │ │ kvpb.MuxRangefeedEvent
// ┌─────────────▼────────────┴──────────────┐
// │ Node.MuxRangeFeed │◄───────────────── MuxRangefeedEvent with kvpb.RangeFeedError
// └─────────────────┬───▲───────────────────┘ (client: rangefeedMuxer.restartActiveRangeFeed)
// StreamMuxer.AddStream │ │LockedMuxStream.Send(*kvpb.MuxRangefeedEvent) │
// ┌────▼───┴────┐ │
// │ StreamMuxer ├────────────────────────────────────┬─────────────────────────────┐
// └──────┬──────┘ │ │
// │ │ │
// ┌────────▼─────────┐ │ │
// │ Stores.Rangefeed │ │ │
// └────────┬─────────┘ │ │
// │ │ │
// ┌───────▼─────────┐ StreamMuxer StreamMuxer
// │ Store.Rangefeed │ RegisterRangefeedCleanUp DisconnectStreamWithError
// └───────┬─────────┘ ▲ ▲
// │ │ │
// ┌────────▼──────────┐ │ │
// │ Replica.Rangefeed │ │ │
// └────────┬──────────┘ │ │
// │ │ │
// ┌──────▼───────┐ │ │
// │ Registration ├───────────────────────────────────┘ │
// └──────┬───────┘ ScheduledProcessor.Register │
// │ │
// └─────────────────────────────────────────────────────────────────────────┘
// registration.disconnect
type StreamMuxer struct {
// taskCancel is a function to cancel StreamMuxer.run spawned in the
// background. It is called by StreamMuxer.Stop. It is expected to be called
// after StreamMuxer.Start.
taskCancel context.CancelFunc

// wg is used to coordinate async tasks spawned by StreamMuxer. Currently,
// there is only one task spawned by StreamMuxer.Start (StreamMuxer.run).
wg sync.WaitGroup

// Note that lockedMuxStream wraps the underlying grpc server stream, ensuring
// thread safety.
sender ServerStreamSender

// notifyMuxError is a buffered channel of size 1 used to signal the presence
// of muxErrors. Additional signals are dropped if the channel is already full
// so that it's non-blocking.
notifyMuxError chan struct{}

mu struct {
syncutil.Mutex
// muxErrors is a slice of mux rangefeed completion errors to be sent back
// to the client. Upon receiving an error, the client restarts the rangefeed
// when possible.
muxErrors []*kvpb.MuxRangeFeedEvent
}
}

// NewStreamMuxer creates a new StreamMuxer. There should be only one per
// incoming node.MuxRangeFeed RPC stream.
func NewStreamMuxer(sender ServerStreamSender) *StreamMuxer {
return &StreamMuxer{
sender: sender,
notifyMuxError: make(chan struct{}, 1),
}
}

// AppendMuxError appends a mux rangefeed completion error to be sent back to
// the client. Note that this method cannot block on IO. If the underlying
// stream is broken, the error will be dropped.
func (sm *StreamMuxer) AppendMuxError(e *kvpb.MuxRangeFeedEvent) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.mu.muxErrors = append(sm.mu.muxErrors, e)
// Note that notify is non-blocking.
select {
case sm.notifyMuxError <- struct{}{}:
default:
}
}

// detachMuxErrors returns muxErrors and clears the slice. Caller must ensure
// the returned errors are sent back to the client.
func (sm *StreamMuxer) detachMuxErrors() []*kvpb.MuxRangeFeedEvent {
sm.mu.Lock()
defer sm.mu.Unlock()
toSend := sm.mu.muxErrors
sm.mu.muxErrors = nil
return toSend
}

// run forwards rangefeed completion errors back to the client. run is expected
// to be called in a goroutine and will block until the context is done or the
// stopper is quiesced. StreamMuxer will stop forwarding rangefeed completion
// errors after run completes, and the caller is responsible for handling shutdown.
func (sm *StreamMuxer) run(ctx context.Context, stopper *stop.Stopper) {
for {
select {
case <-sm.notifyMuxError:
for _, clientErr := range sm.detachMuxErrors() {
if err := sm.sender.Send(clientErr); err != nil {
log.Errorf(ctx,
"failed to send rangefeed completion error back to client due to broken stream: %v", err)
return
}
}
case <-ctx.Done():
return
case <-stopper.ShouldQuiesce():
return
}
}
}

// Stop cancels the StreamMuxer.run task and waits for it to complete. It does
// nothing if StreamMuxer.run has already finished. It is expected to be called
// after StreamMuxer.Start. Note that the caller is responsible for cleaning up
// any active streams.
func (sm *StreamMuxer) Stop() {
sm.taskCancel()
sm.wg.Wait()
}

// Start launches StreamMuxer.run in the background if no error is returned.
// StreamMuxer.run continues running until it errors or StreamMuxer.Stop is
// called. The caller is responsible for calling StreamMuxer.Stop and for
// cleaning up any active streams. Note that it is not valid to call Start
// multiple times or to restart after Stop. Example usage:
//
// if err := streamMuxer.Start(ctx, stopper); err != nil {
// return err
// }
//
// defer streamMuxer.Stop()
func (sm *StreamMuxer) Start(ctx context.Context, stopper *stop.Stopper) error {
ctx, sm.taskCancel = context.WithCancel(ctx)
sm.wg.Add(1)
if err := stopper.RunAsyncTask(ctx, "test-stream-muxer", func(ctx context.Context) {
defer sm.wg.Done()
sm.run(ctx, stopper)
}); err != nil {
sm.taskCancel()
sm.wg.Done()
return err
}
return nil
}
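
Not part of this commit, but to illustrate the ServerStreamSender contract defined above, here is a minimal channel-backed sender one might use in a test. The name channelSender and its constructor are hypothetical; the type is thread-safe simply because channel sends are safe for concurrent use:

    // channelSender is a hypothetical, test-only ServerStreamSender.
    type channelSender struct {
        events chan *kvpb.MuxRangeFeedEvent
    }

    func newChannelSender(size int) *channelSender {
        return &channelSender{events: make(chan *kvpb.MuxRangeFeedEvent, size)}
    }

    // Send places the event on a buffered channel; it may block only if the
    // buffer is full, never on network IO.
    func (s *channelSender) Send(e *kvpb.MuxRangeFeedEvent) error {
        s.events <- e
        return nil
    }

    // SendIsThreadSafe is the no-op marker method required by the interface.
    func (s *channelSender) SendIsThreadSafe() {}

A test could then build NewStreamMuxer(newChannelSender(16)), Start it with a stopper, call AppendMuxError, and read the forwarded event back from the channel.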
78 changes: 7 additions & 71 deletions pkg/server/node.go
@@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher"
@@ -1854,79 +1855,14 @@ type lockedMuxStream struct {
sendMu syncutil.Mutex
}

func (s *lockedMuxStream) SendIsThreadSafe() {}

func (s *lockedMuxStream) Send(e *kvpb.MuxRangeFeedEvent) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
return s.wrapped.Send(e)
}

// newMuxRangeFeedCompletionWatcher returns 2 functions: one to forward mux
// rangefeed completion events to the sender, and a cleanup function. Mux
// rangefeed completion events can be triggered at any point, and we would like
// to avoid blocking on IO (sender.Send) during potentially critical areas.
// Thus, the forwarding should happen on a dedicated goroutine.
func newMuxRangeFeedCompletionWatcher(
ctx context.Context, stopper *stop.Stopper, send func(e *kvpb.MuxRangeFeedEvent) error,
) (doneFn func(event *kvpb.MuxRangeFeedEvent), cleanup func(), _ error) {
// structure to help coordinate event forwarding and shutdown.
var fin = struct {
syncutil.Mutex
completed []*kvpb.MuxRangeFeedEvent
signalC chan struct{}
}{
// NB: a buffer of 1 ensures we can always send a signal when rangefeed completes.
signalC: make(chan struct{}, 1),
}

// forwardCompletion listens to completion notifications and forwards
// them to the sender.
forwardCompletion := func(ctx context.Context) {
for {
select {
case <-fin.signalC:
var toSend []*kvpb.MuxRangeFeedEvent
fin.Lock()
toSend, fin.completed = fin.completed, nil
fin.Unlock()
for _, e := range toSend {
if err := send(e); err != nil {
// If we failed to send, there is nothing else we can do.
// The stream is broken anyway.
return
}
}
case <-ctx.Done():
return
case <-stopper.ShouldQuiesce():
// There is nothing we can do here; stream cancellation is usually
// triggered by the client. We don't have access to stream cancellation
// function; so, just let things proceed until the server shuts down.
return
}
}
}

var wg sync.WaitGroup
wg.Add(1)
if err := stopper.RunAsyncTask(ctx, "mux-term-forwarder", func(ctx context.Context) {
defer wg.Done()
forwardCompletion(ctx)
}); err != nil {
return nil, nil, err
}

addCompleted := func(event *kvpb.MuxRangeFeedEvent) {
fin.Lock()
fin.completed = append(fin.completed, event)
fin.Unlock()
select {
case fin.signalC <- struct{}{}:
default:
}
}
return addCompleted, wg.Wait, nil
}

// MuxRangeFeed implements the roachpb.InternalServer interface.
func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
muxStream := &lockedMuxStream{wrapped: stream}
@@ -1936,11 +1872,11 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
ctx, cancel := context.WithCancel(n.AnnotateCtx(stream.Context()))
defer cancel()

rangefeedCompleted, cleanup, err := newMuxRangeFeedCompletionWatcher(ctx, n.stopper, muxStream.Send)
if err != nil {
streamMuxer := rangefeed.NewStreamMuxer(muxStream)
if err := streamMuxer.Start(ctx, n.stopper); err != nil {
return err
}
defer cleanup()
defer streamMuxer.Stop()

n.metrics.NumMuxRangeFeed.Inc(1)
n.metrics.ActiveMuxRangeFeed.Inc(1)
@@ -2022,7 +1958,7 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
// raftMu in processor) may be held. Issuing potentially blocking IO
// during that time is not a good idea. Thus, we shunt the notification to
// a dedicated goroutine.
rangefeedCompleted(e)
streamMuxer.AppendMuxError(e)
})
}
}
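
The comment above captures the constraint that both the old completion watcher and the new StreamMuxer are built around: the completion callback may fire while latches or the processor's raftMu are held, so it must never block on IO. Stripped of the surrounding types, the hand-off is a mutex-guarded slice plus a size-1 signal channel. A standalone sketch with hypothetical names (completionQueue, enqueue), not code from this patch:

    // completionQueue illustrates the non-blocking hand-off pattern.
    type completionQueue struct {
        mu      syncutil.Mutex
        pending []*kvpb.MuxRangeFeedEvent
        notifyC chan struct{} // buffer of 1: one queued wakeup is always enough
    }

    // enqueue is safe to call from critical sections; it never blocks on IO.
    func (q *completionQueue) enqueue(e *kvpb.MuxRangeFeedEvent) {
        q.mu.Lock()
        q.pending = append(q.pending, e)
        q.mu.Unlock()
        select {
        case q.notifyC <- struct{}{}: // wake the draining goroutine
        default: // a wakeup is already queued; extra signals coalesce
        }
    }

The draining goroutine (StreamMuxer.run above) wakes on notifyC, swaps the slice out under the lock, and only then performs the potentially blocking Send calls.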
