Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
129227: kvserver/rangefeed: move perRangeEventSink to rangefeed pacakge r=nvanbenschoten a=wenyihu6

**kvserver/rangefeed: change p.Register to use registration interface**

Previously, we introduced a registration interface to abstract the
implementation details of buffered and unbuffered registration. This patch
updates the p.Register function to use this interface as well.

Part of: cockroachdb#126560
Release note: none

----

**kvserver/rangefeed: move perRangeEventSink to rangefeed pacakge**

Previously, perRangeEventSink was defined in pkg/server/node.go. This patch
relocates it to the rangefeed package to facilitate future commits testing
within the rangefeed package itself.

Part of: cockroachdb#126560
Release note: none

Co-authored-by: Wenyi Hu <[email protected]>
  • Loading branch information
craig[bot] and wenyihu6 committed Aug 20, 2024
2 parents ec9e227 + 2bb3625 commit ba7c94a
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 57 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"resolved_timestamp.go",
"scheduled_processor.go",
"scheduler.go",
"stream.go",
"stream_muxer.go",
"stream_muxer_test_helper.go",
"task.go",
Expand Down
10 changes: 0 additions & 10 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// Stream is a object capable of transmitting RangeFeedEvents.
type Stream interface {
kvpb.RangeFeedEventSink
// Disconnect disconnects the stream with the provided error. Note that this
// function can be called by the processor worker while holding raftMu, so it
// is important that this function doesn't block IO or try acquiring locks
// that could lead to deadlocks.
Disconnect(err *kvpb.Error)
}

// registration defines an interface for registration that can be added to a
// processor registry. Implemented by bufferedRegistration.
type registration interface {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (p *ScheduledProcessor) Register(
if p.stopping {
return nil
}
if !p.Span.AsRawSpanWithNoLocals().Contains(r.span) {
if !p.Span.AsRawSpanWithNoLocals().Contains(r.getSpan()) {
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

Expand All @@ -348,8 +348,8 @@ func (p *ScheduledProcessor) Register(
if p.unregisterClient(r) {
// unreg callback is set by replica to tear down processors that have
// zero registrations left and to update event filters.
if r.unreg != nil {
r.unreg()
if f := r.getUnreg(); f != nil {
f()
}
}
}
Expand Down
78 changes: 78 additions & 0 deletions pkg/kv/kvserver/rangefeed/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// Stream is an object capable of transmitting RangeFeedEvents from a server
// rangefeed to a client.
type Stream interface {
kvpb.RangeFeedEventSink
// Disconnect disconnects the stream with the provided error. Note that this
// function can be called by the processor worker while holding raftMu, so it
// is important that this function doesn't block IO or try acquiring locks
// that could lead to deadlocks.
Disconnect(err *kvpb.Error)
}

// PerRangeEventSink is an implementation of Stream which annotates each
// response with rangeID and streamID. It is used by MuxRangeFeed.
type PerRangeEventSink struct {
ctx context.Context
rangeID roachpb.RangeID
streamID int64
wrapped *StreamMuxer
}

func NewPerRangeEventSink(
ctx context.Context, rangeID roachpb.RangeID, streamID int64, wrapped *StreamMuxer,
) *PerRangeEventSink {
return &PerRangeEventSink{
ctx: ctx,
rangeID: rangeID,
streamID: streamID,
wrapped: wrapped,
}
}

var _ kvpb.RangeFeedEventSink = (*PerRangeEventSink)(nil)
var _ Stream = (*PerRangeEventSink)(nil)

func (s *PerRangeEventSink) Context() context.Context {
return s.ctx
}

// SendIsThreadSafe is a no-op declaration method. It is a contract that the
// Send method is thread-safe. Note that Send wraps StreamMuxer which declares
// its Send method to be thread-safe.
func (s *PerRangeEventSink) SendIsThreadSafe() {}

func (s *PerRangeEventSink) Send(event *kvpb.RangeFeedEvent) error {
response := &kvpb.MuxRangeFeedEvent{
RangeFeedEvent: *event,
RangeID: s.rangeID,
StreamID: s.streamID,
}
return s.wrapped.Send(response)
}

// Disconnect implements the Stream interface. It requests the StreamMuxer to
// detach the stream. The StreamMuxer is then responsible for handling the
// actual disconnection and additional cleanup. Note that Caller should not rely
// on immediate disconnection as cleanup takes place async.
func (s *PerRangeEventSink) Disconnect(err *kvpb.Error) {
s.wrapped.DisconnectStreamWithError(s.streamID, s.rangeID, err)
}
45 changes: 1 addition & 44 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,44 +1925,6 @@ func (n *Node) RangeLookup(
return resp, nil
}

// perRangeEventSink is an implementation of rangefeed.Stream which annotates
// each response with rangeID and streamID. It is used by MuxRangeFeed.
type perRangeEventSink struct {
ctx context.Context
rangeID roachpb.RangeID
streamID int64
wrapped *rangefeed.StreamMuxer
}

var _ kvpb.RangeFeedEventSink = (*perRangeEventSink)(nil)
var _ rangefeed.Stream = (*perRangeEventSink)(nil)

func (s *perRangeEventSink) Context() context.Context {
return s.ctx
}

// SendIsThreadSafe is a no-op declaration method. It is a contract that the
// Send method is thread-safe. Note that Send wraps rangefeed.StreamMuxer which
// declares its Send method to be thread-safe.
func (s *perRangeEventSink) SendIsThreadSafe() {}

func (s *perRangeEventSink) Send(event *kvpb.RangeFeedEvent) error {
response := &kvpb.MuxRangeFeedEvent{
RangeFeedEvent: *event,
RangeID: s.rangeID,
StreamID: s.streamID,
}
return s.wrapped.Send(response)
}

// Disconnect implements the rangefeed.Stream interface. It requests the
// StreamMuxer to detach the stream. The StreamMuxer is then responsible for
// handling the actual disconnection and additional cleanup. Note that Caller
// should not rely on immediate disconnection as cleanup takes place async.
func (s *perRangeEventSink) Disconnect(err *kvpb.Error) {
s.wrapped.DisconnectStreamWithError(s.streamID, s.rangeID, err)
}

// lockedMuxStream provides support for concurrent calls to Send. The underlying
// MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to
// Send.
Expand Down Expand Up @@ -2021,12 +1983,7 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
streamCtx = logtags.AddTag(streamCtx, "s", req.Replica.StoreID)
streamCtx = logtags.AddTag(streamCtx, "sid", req.StreamID)

streamSink := &perRangeEventSink{
ctx: streamCtx,
rangeID: req.RangeID,
streamID: req.StreamID,
wrapped: streamMuxer,
}
streamSink := rangefeed.NewPerRangeEventSink(streamCtx, req.RangeID, req.StreamID, streamMuxer)
streamMuxer.AddStream(req.StreamID, req.RangeID, cancel)

// Rangefeed attempts to register rangefeed a request over the specified
Expand Down

0 comments on commit ba7c94a

Please sign in to comment.