Skip to content

Commit

Permalink
kvserver/rangefeed: move metrics updates to stream muxer
Browse files Browse the repository at this point in the history
This patch moves the existing rangefeed metrics management to StreamMuxer
without changing any existing behaviour. The main purpose is to make future
commits cleaner.

Part of: cockroachdb#126561
Epic: none
Release note: none
  • Loading branch information
wenyihu6 committed Jul 2, 2024
1 parent 00d0810 commit 2f33ec8
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 16 deletions.
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/rangefeed/stream_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// Implemented by nodeMetrics.
type rangefeedMetricsRecorder interface {
IncrementRangefeedCounter()
DecrementRangefeedCounter()
}

// Note that Send must be thread safe to be called concurrently.
type severStreamSender interface {
Send(*kvpb.MuxRangeFeedEvent) error
Expand All @@ -32,7 +38,8 @@ type severStreamSender interface {
type StreamMuxer struct {
// Note that lockedMuxStream wraps the underlying grpc server stream, ensuring
// thread safety.
sender severStreamSender
sender severStreamSender
metrics rangefeedMetricsRecorder

// streamID -> streamInfo for active rangefeeds
activeStreams sync.Map
Expand All @@ -48,9 +55,10 @@ type StreamMuxer struct {
}
}

func NewStreamMuxer(sender severStreamSender) *StreamMuxer {
func NewStreamMuxer(sender severStreamSender, metrics rangefeedMetricsRecorder) *StreamMuxer {
return &StreamMuxer{
sender: sender,
metrics: metrics,
notifyMuxError: make(chan struct{}, 1),
}
}
Expand All @@ -63,7 +71,7 @@ func (sm *StreamMuxer) AddStream(streamID int64, cancel context.CancelFunc) {
if _, loaded := sm.activeStreams.LoadOrStore(streamID, cancel); loaded {
log.Fatalf(context.Background(), "stream %d already exists", streamID)
}

sm.metrics.IncrementRangefeedCounter()
}

// transformRangefeedErrToClientError converts a rangefeed error to a client
Expand Down Expand Up @@ -114,6 +122,7 @@ func (sm *StreamMuxer) DisconnectRangefeedWithError(
Error: *clientErrorEvent,
})
sm.appendMuxError(ev)
sm.metrics.DecrementRangefeedCounter()
}
}

Expand Down
17 changes: 14 additions & 3 deletions pkg/kv/kvserver/rangefeed/stream_muxer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ func TestStreamMuxerOnContextCancel(t *testing.T) {
stopper := stop.NewStopper()

testServerStream := newTestServerStream()
muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream)
testRangefeedCounter := newTestRangefeedCounter()
muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream, testRangefeedCounter)
defer cleanUp()
defer stopper.Stop(ctx)

cancel()
time.Sleep(10 * time.Millisecond)
expectedErrEvent := &kvpb.MuxRangeFeedEvent{
StreamID: 0,
RangeID: 1,
Expand All @@ -62,7 +64,8 @@ func TestStreamMuxer(t *testing.T) {
stopper := stop.NewStopper()

testServerStream := newTestServerStream()
muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream)
testRangefeedCounter := newTestRangefeedCounter()
muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream, testRangefeedCounter)
defer cleanUp()

// Note that this also tests that the StreamMuxer stops when the stopper is
Expand All @@ -75,7 +78,9 @@ func TestStreamMuxer(t *testing.T) {
streamCtx, cancel := context.WithCancel(context.Background())
muxer.AddStream(0, cancel)
// Note that kvpb.NewError(nil) == nil.
require.Equal(t, testRangefeedCounter.get(), int32(1))
muxer.DisconnectRangefeedWithError(streamID, rangeID, kvpb.NewError(nil))
require.Equal(t, testRangefeedCounter.get(), int32(0))
require.Equal(t, context.Canceled, streamCtx.Err())
expectedErrEvent := &kvpb.MuxRangeFeedEvent{
StreamID: streamID,
Expand Down Expand Up @@ -106,10 +111,14 @@ func TestStreamMuxer(t *testing.T) {
{2, 2, &kvpb.NodeUnavailableError{}},
}

require.Equal(t, testRangefeedCounter.get(), int32(0))

for _, muxError := range testRangefeedCompletionErrors {
muxer.AddStream(muxError.streamID, func() {})
}

require.Equal(t, testRangefeedCounter.get(), int32(3))

var wg sync.WaitGroup
for _, muxError := range testRangefeedCompletionErrors {
wg.Add(1)
Expand All @@ -135,6 +144,7 @@ func TestStreamMuxer(t *testing.T) {
return errors.Newf("expected error %v not found", muxError)
})
}
require.Equal(t, testRangefeedCounter.get(), int32(0))
})
}

Expand All @@ -146,7 +156,8 @@ func TestStreamMuxerOnBlockingIO(t *testing.T) {
stopper := stop.NewStopper()

testServerStream := newTestServerStream()
muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream)
testRangefeedCounter := newTestRangefeedCounter()
muxer, cleanUp := NewTestStreamMuxer(t, ctx, stopper, testServerStream, testRangefeedCounter)
defer cleanUp()
defer stopper.Stop(ctx)

Expand Down
31 changes: 29 additions & 2 deletions pkg/kv/kvserver/rangefeed/stream_muxer_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,35 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// testRangefeedCounter mocks nodeMetrics for testing.
type testRangefeedCounter struct {
count atomic.Int32
}

func newTestRangefeedCounter() *testRangefeedCounter {
return &testRangefeedCounter{}
}

func (c *testRangefeedCounter) IncrementRangefeedCounter() {
c.count.Add(1)
}

func (c *testRangefeedCounter) DecrementRangefeedCounter() {
c.count.Add(-1)
}

func (c *testRangefeedCounter) get() int32 {
return c.count.Load()
}

// testServerStream mocks grpc server stream for testing.
type testServerStream struct {
syncutil.Mutex
Expand Down Expand Up @@ -96,9 +119,13 @@ func (s *testServerStream) BlockSend() func() {
// defer cleanUp()
// defer stopper.Stop(ctx) // or defer cancel() - important to stop the muxer before cleanUp()
func NewTestStreamMuxer(
t *testing.T, ctx context.Context, stopper *stop.Stopper, sender severStreamSender,
t *testing.T,
ctx context.Context,
stopper *stop.Stopper,
sender severStreamSender,
metrics rangefeedMetricsRecorder,
) (muxer *StreamMuxer, cleanUp func()) {
muxer = NewStreamMuxer(sender)
muxer = NewStreamMuxer(sender, metrics)
var wg sync.WaitGroup
wg.Add(1)
if err := stopper.RunAsyncTask(ctx, "mux-term-forwarder", func(ctx context.Context) {
Expand Down
18 changes: 10 additions & 8 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,15 @@ func (nm nodeMetrics) updateCrossLocalityMetricsOnBatchResponse(
}
}

func (nm nodeMetrics) IncrementRangefeedCounter() {
nm.NumMuxRangeFeed.Inc(1)
nm.ActiveMuxRangeFeed.Inc(1)
}

func (nm nodeMetrics) DecrementRangefeedCounter() {
nm.ActiveMuxRangeFeed.Dec(1)
}

// A Node manages a map of stores (by store ID) for which it serves
// traffic. A node is the top-level data structure. There is one node
// instance per process. A node accepts incoming RPCs and services
Expand Down Expand Up @@ -1875,7 +1884,7 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
ctx, cancel := context.WithCancel(n.AnnotateCtx(stream.Context()))
defer cancel()

streamMuxer := rangefeed.NewStreamMuxer(muxStream)
streamMuxer := rangefeed.NewStreamMuxer(muxStream, n.metrics)
wg.Add(1)
if err := n.stopper.RunAsyncTask(ctx, "mux-term-forwarder", func(ctx context.Context) {
defer wg.Done()
Expand All @@ -1885,10 +1894,6 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
return err
}

n.metrics.NumMuxRangeFeed.Inc(1)
n.metrics.ActiveMuxRangeFeed.Inc(1)
defer n.metrics.ActiveMuxRangeFeed.Inc(-1)

for {
req, err := stream.Recv()
if err != nil {
Expand Down Expand Up @@ -1917,11 +1922,8 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
}
streamMuxer.AddStream(req.StreamID, cancel)

n.metrics.NumMuxRangeFeed.Inc(1)
n.metrics.ActiveMuxRangeFeed.Inc(1)
f := n.stores.RangeFeed(req, streamSink)
f.WhenReady(func(err error) {
n.metrics.ActiveMuxRangeFeed.Inc(-1)
streamMuxer.DisconnectRangefeedWithError(req.StreamID, req.RangeID, kvpb.NewError(err))
})
}
Expand Down

0 comments on commit 2f33ec8

Please sign in to comment.