kvcoord: Implement CloseStream for MuxRangeFeed
Extend the MuxRangeFeed protocol to support an explicit,
caller-initiated CloseStream operation.

The caller may decide to stop receiving events
for a particular stream that is part of a MuxRangeFeed,
and may issue a request to the MuxRangeFeed server
to close the stream.  The server will cancel the underlying
rangefeed and return a `RangeFeedRetryError_REASON_RANGEFEED_CLOSED`
error as the response.

Note: the current mux rangefeed client does not use this request.
The code to support cancellation is added preemptively, in case
this functionality is required in the future to support
restarts due to stuck rangefeeds.

Epic: CRDB-26372
Release note: None
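
To make the intended use concrete, here is a minimal sketch of the client-side flow, built on the NewCloseStreamRequest helper added in this commit. The closeMuxStream wrapper and its send parameter are hypothetical; the commit itself only exposes a request sender through a testing knob.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// closeMuxStream builds a CloseStream request for the given stream and sends
// it on an already-established mux rangefeed connection.
func closeMuxStream(
	ctx context.Context,
	st *cluster.Settings,
	streamID int64,
	send func(*kvpb.RangeFeedRequest) error, // hypothetical: per-node mux request sender
) error {
	// NewCloseStreamRequest refuses to build the request unless the cluster
	// version is at least 23.2 (matching the gate in the helper below).
	req, err := kvcoord.NewCloseStreamRequest(ctx, st, streamID)
	if err != nil {
		return err
	}
	// The server cancels the underlying rangefeed and responds on this stream
	// with a RangeFeedRetryError_REASON_RANGEFEED_CLOSED error.
	return send(req)
}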
Yevgeniy Miretskiy committed Aug 8, 2023
1 parent 782a301 commit 8df23f2
Showing 7 changed files with 233 additions and 3,823 deletions.
427 changes: 0 additions & 427 deletions go.mod

Large diffs are not rendered by default.

3,378 changes: 0 additions & 3,378 deletions go.sum

Large diffs are not rendered by default.

28 changes: 27 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -17,10 +17,12 @@ import (
"sync/atomic"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
@@ -308,6 +310,17 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
}
continue
}

if m.cfg.knobs.captureMuxRangeFeedRequestSender != nil {
m.cfg.knobs.captureMuxRangeFeedRequestSender(
args.Replica.NodeID,
func(req *kvpb.RangeFeedRequest) error {
conn.mu.Lock()
defer conn.mu.Unlock()
return conn.mu.sender.Send(req)
})
}

return nil
}

@@ -441,7 +454,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
}

if m.cfg.knobs.onRangefeedEvent != nil {
skip, err := m.cfg.knobs.onRangefeedEvent(ctx, active.Span, &event.RangeFeedEvent)
skip, err := m.cfg.knobs.onRangefeedEvent(ctx, active.Span, event.StreamID, &event.RangeFeedEvent)
if err != nil {
return err
}
@@ -604,3 +617,16 @@ func (c *muxStream) close() []*activeMuxRangeFeed {

return toRestart
}

// NewCloseStreamRequest returns a mux rangefeed request to close the specified stream.
func NewCloseStreamRequest(
ctx context.Context, st *cluster.Settings, streamID int64,
) (*kvpb.RangeFeedRequest, error) {
if !st.Version.IsActive(ctx, clusterversion.V23_2) {
return nil, errors.Newf("CloseStream request requires cluster version 23.2 or above, found %s", st.Version)
}
return &kvpb.RangeFeedRequest{
StreamID: streamID,
CloseStream: true,
}, nil
}
20 changes: 17 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -97,9 +97,13 @@ type rangeFeedConfig struct {
// onRangefeedEvent is invoked on each rangefeed event. It returns a boolean
// indicating whether the event should be skipped, or an error indicating
// that the rangefeed should terminate.
onRangefeedEvent func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error)
// muxStreamID is set only for mux rangefeeds.
onRangefeedEvent func(ctx context.Context, s roachpb.Span, muxStreamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error)
// metrics overrides rangefeed metrics to use.
metrics *DistSenderRangeFeedMetrics
// captureMuxRangeFeedRequestSender is a callback invoked when the mux
// rangefeed establishes a connection to a node.
captureMuxRangeFeedRequestSender func(nodeID roachpb.NodeID, sender func(req *kvpb.RangeFeedRequest) error)
}
}

@@ -795,7 +799,7 @@ func (ds *DistSender) singleRangeFeed(
}

if cfg.knobs.onRangefeedEvent != nil {
skip, err := cfg.knobs.onRangefeedEvent(ctx, span, event)
skip, err := cfg.knobs.onRangefeedEvent(ctx, span, 0 /*streamID */, event)
if err != nil {
return args.Timestamp, err
}
@@ -871,7 +875,7 @@ var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity")

// TestingWithOnRangefeedEvent returns a test-only option to modify rangefeed events.
func TestingWithOnRangefeedEvent(
fn func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error),
fn func(ctx context.Context, s roachpb.Span, streamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error),
) RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.knobs.onRangefeedEvent = fn
@@ -886,5 +890,15 @@ func TestingWithRangeFeedMetrics(m *DistSenderRangeFeedMetrics) RangeFeedOption
})
}

// TestingWithMuxRangeFeedRequestSenderCapture returns a test-only option to specify a callback
// that will be invoked when the mux rangefeed establishes a connection to a node.
func TestingWithMuxRangeFeedRequestSenderCapture(
fn func(nodeID roachpb.NodeID, capture func(request *kvpb.RangeFeedRequest) error),
) RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.knobs.captureMuxRangeFeedRequestSender = fn
})
}

// TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics
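
Distilled from the test in the next file, a sketch of how the two knobs added above compose: the capture knob hands a test a per-node request sender (used later to inject CloseStream requests), while the stream ID passed to onRangefeedEvent lets the test attribute events to individual mux streams. The rangeFeed harness and remaining test plumbing are assumed.

package example

import (
	"context"
	"sync/atomic"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// testingOpts is a condensed, hypothetical sketch of a test's option wiring.
func testingOpts(sender *atomic.Value) []kvcoord.RangeFeedOption {
	return []kvcoord.RangeFeedOption{
		kvcoord.TestingWithMuxRangeFeedRequestSenderCapture(
			func(_ roachpb.NodeID, capture func(*kvpb.RangeFeedRequest) error) {
				// Keep the per-node sender so the test can inject requests later.
				sender.Store(capture)
			}),
		kvcoord.TestingWithOnRangefeedEvent(
			func(_ context.Context, _ roachpb.Span, streamID int64, _ *kvpb.RangeFeedEvent,
			) (skip bool, _ error) {
				// streamID identifies the mux stream; it is 0 for non-mux rangefeeds.
				return false, nil
			}),
	}
}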
138 changes: 136 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -12,6 +12,8 @@ package kvcoord_test

import (
"context"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
@@ -629,7 +631,7 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
enoughErrors := make(chan struct{})
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, noValuesExpected, true,
kvcoord.TestingWithOnRangefeedEvent(
func(_ context.Context, _ roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
func(_ context.Context, _ roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
*event = transientErrEvent
if numErrors.Add(1) == numErrsToReturn {
close(enoughErrors)
@@ -727,7 +729,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, ignoreValues, useMux,
kvcoord.TestingWithRangeFeedMetrics(&metrics),
kvcoord.TestingWithOnRangefeedEvent(
func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
switch t := event.GetValue().(type) {
case *kvpb.RangeFeedValue:
// If we previously arranged for the range to be skipped (stuck catchup scan),
@@ -802,3 +804,135 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
require.EqualValues(t, numCatchupToBlock, metrics.RangefeedCatchupRanges.Value())
})
}

// TestMuxRangeFeedCanCloseStream verifies stream termination functionality in mux rangefeed.
func TestMuxRangeFeedCanCloseStream(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

ts := tc.Server(0)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

// Insert 1000 rows, and split them into 10 ranges.
sqlDB.ExecMultiple(t,
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
`SET CLUSTER SETTING kv.closed_timestamp.target_duration='100ms'`,
`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
`CREATE TABLE foo (key INT PRIMARY KEY)`,
`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))`,
)

fooDesc := desctestutils.TestingGetPublicTableDescriptor(
ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo")
fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)

frontier, err := span.MakeFrontier(fooSpan)
require.NoError(t, err)

expectFrontierAdvance := func() {
t.Helper()
// The closed timestamp for a range advances every 100ms. We'll require the
// frontier to advance a bit more than that.
threshold := frontier.Frontier().AddDuration(250 * time.Millisecond)
testutils.SucceedsWithin(t, func() error {
if frontier.Frontier().Less(threshold) {
return errors.Newf("waiting for frontier advance to at least %s", threshold)
}
return nil
}, 10*time.Second)
}

var observedStreams sync.Map
var capturedSender atomic.Value

ignoreValues := func(event kvcoord.RangeFeedMessage) {}
var numRestartStreams atomic.Int32

closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, true,
kvcoord.WithMuxRangeFeed(),
kvcoord.TestingWithMuxRangeFeedRequestSenderCapture(
// We expect a single mux sender since we have 1 node in this test.
func(nodeID roachpb.NodeID, capture func(request *kvpb.RangeFeedRequest) error) {
capturedSender.Store(capture)
},
),
kvcoord.TestingWithOnRangefeedEvent(
func(ctx context.Context, s roachpb.Span, streamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
switch t := event.GetValue().(type) {
case *kvpb.RangeFeedCheckpoint:
observedStreams.Store(streamID, nil)
_, err := frontier.Forward(t.Span, t.ResolvedTS)
if err != nil {
return true, err
}
case *kvpb.RangeFeedError:
// Keep track of mux errors due to RangeFeedRetryError_REASON_RANGEFEED_CLOSED.
// Those result when we issue CloseStream requests.
err := t.Error.GoError()
log.Infof(ctx, "Got err: %v", err)
var retryErr *kvpb.RangeFeedRetryError
if ok := errors.As(err, &retryErr); ok && retryErr.Reason == kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED {
numRestartStreams.Add(1)
}
}

return false, nil
}),
)
defer closeFeed()

// Wait until we capture the mux rangefeed request sender. There should only be 1.
var muxRangeFeedRequestSender func(req *kvpb.RangeFeedRequest) error
testutils.SucceedsWithin(t, func() error {
v, ok := capturedSender.Load().(func(request *kvpb.RangeFeedRequest) error)
if ok {
muxRangeFeedRequestSender = v
return nil
}
return errors.New("waiting to capture mux rangefeed request sender.")
}, 10*time.Second)

cancelledStreams := make(map[int64]struct{})
for i := 0; i < 5; i++ {
// Wait for the test frontier to advance. Once it advances,
// we know the rangefeed has started and all ranges are running.
expectFrontierAdvance()

// Pick some number of streams to close. Since sync.Map iteration order is non-deterministic,
// we'll pick a few random streams.
initialClosed := numRestartStreams.Load()
numToCancel := 1 + rand.Int31n(3)
var numCancelled int32 = 0
observedStreams.Range(func(key any, _ any) bool {
streamID := key.(int64)
if _, wasCancelled := cancelledStreams[streamID]; wasCancelled {
return true // try another stream.
}
numCancelled++
cancelledStreams[streamID] = struct{}{}
req, err := kvcoord.NewCloseStreamRequest(ctx, ts.ClusterSettings(), streamID)
require.NoError(t, err)
require.NoError(t, muxRangeFeedRequestSender(req))
return numCancelled < numToCancel
})

// Observe numToCancel errors.
testutils.SucceedsWithin(t, func() error {
numRestarted := numRestartStreams.Load()
if numRestarted == initialClosed+numCancelled {
return nil
}
return errors.Newf("waiting for %d streams to be closed (%d so far)", numCancelled, numRestarted-initialClosed)
}, 10*time.Second)

// When we close the stream(s), the rangefeed server responds with a retryable error.
// Mux rangefeed should retry, and thus we expect frontier to keep advancing.
}
}
5 changes: 5 additions & 0 deletions pkg/kv/kvpb/api.proto
@@ -2986,6 +2986,11 @@ message RangeFeedRequest {

// StreamID is set by the client issuing MuxRangeFeed requests.
int64 stream_id = 5 [(gogoproto.customname) = "StreamID"];
// CloseStream is set by the mux RangeFeed client to indicate that
// the server should close the stream with the specified stream_id.
// When this bit is set, the server should attempt, on a best-effort basis,
// to quickly terminate the rangefeed for this stream.
bool close_stream = 6;
}

// RangeFeedValue is a variant of RangeFeedEvent that represents an update to
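
The server-side handling of close_stream is not part of this diff. As a rough, hedged sketch under that caveat, the terminal event the server sends back might be shaped as follows; the helper name and plumbing are hypothetical, and only the error reason and message fields come from this commit and the client test above.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// closedStreamResponse is a hypothetical helper showing the terminal event a
// server could send back after cancelling the rangefeed behind a closed stream.
func closedStreamResponse(rangeID roachpb.RangeID, streamID int64) *kvpb.MuxRangeFeedEvent {
	ev := &kvpb.MuxRangeFeedEvent{
		RangeID:  rangeID,
		StreamID: streamID,
	}
	ev.MustSetValue(&kvpb.RangeFeedError{
		// The client surfaces this as a RangeFeedRetryError with reason
		// REASON_RANGEFEED_CLOSED (see the test above).
		Error: *kvpb.NewError(kvpb.NewRangeFeedRetryError(
			kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)),
	})
	return ev
}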
