kvcoord: Implement CloseStream for MuxRangeFeed #108335

Merged 1 commit on Aug 8, 2023
28 changes: 27 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -17,10 +17,12 @@ import (
"sync/atomic"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
@@ -308,6 +310,17 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error
}
continue
}

if m.cfg.knobs.captureMuxRangeFeedRequestSender != nil {
m.cfg.knobs.captureMuxRangeFeedRequestSender(
args.Replica.NodeID,
func(req *kvpb.RangeFeedRequest) error {
conn.mu.Lock()
defer conn.mu.Unlock()
return conn.mu.sender.Send(req)
})
}

return nil
}

@@ -441,7 +454,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
}

if m.cfg.knobs.onRangefeedEvent != nil {
- skip, err := m.cfg.knobs.onRangefeedEvent(ctx, active.Span, &event.RangeFeedEvent)
+ skip, err := m.cfg.knobs.onRangefeedEvent(ctx, active.Span, event.StreamID, &event.RangeFeedEvent)
if err != nil {
return err
}
@@ -604,3 +617,16 @@ func (c *muxStream) close() []*activeMuxRangeFeed {

return toRestart
}

// NewCloseStreamRequest returns a mux rangefeed request to close the specified stream.
func NewCloseStreamRequest(
ctx context.Context, st *cluster.Settings, streamID int64,
) (*kvpb.RangeFeedRequest, error) {
if !st.Version.IsActive(ctx, clusterversion.V23_2) {
return nil, errors.Newf("CloseStream request requires cluster version 23.2 or above, found %s", st.Version)
}
return &kvpb.RangeFeedRequest{
StreamID: streamID,
CloseStream: true,
}, nil
}
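As a usage sketch (not part of this diff): once a test has captured the per-node request sender via the knob added below in dist_sender_rangefeed.go, closing a stream amounts to building the request and sending it. The `sender`, `settings`, and `streamID` parameters are hypothetical stand-ins for values a test would already hold.

```go
// Minimal sketch, assuming `sender` was captured via
// TestingWithMuxRangeFeedRequestSenderCapture and `streamID` names an
// active mux stream; both are hypothetical placeholders.
func closeOneStream(
	ctx context.Context,
	settings *cluster.Settings,
	sender func(*kvpb.RangeFeedRequest) error,
	streamID int64,
) error {
	req, err := kvcoord.NewCloseStreamRequest(ctx, settings, streamID)
	if err != nil {
		return err // e.g. the cluster version is below 23.2.
	}
	// The server closes the stream best-effort; the client then observes a
	// RangeFeedRetryError with REASON_RANGEFEED_CLOSED for that stream.
	return sender(req)
}
```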
20 changes: 17 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -97,9 +97,13 @@ type rangeFeedConfig struct {
// onRangefeedEvent invoked on each rangefeed event.
// Returns boolean indicating if event should be skipped or an error
// indicating if rangefeed should terminate.
- onRangefeedEvent func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error)
+ // muxStreamID is set only for mux rangefeed.
+ onRangefeedEvent func(ctx context.Context, s roachpb.Span, muxStreamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error)
// metrics overrides rangefeed metrics to use.
metrics *DistSenderRangeFeedMetrics
// captureMuxRangeFeedRequestSender is a callback invoked when mux
// rangefeed establishes a connection to a node.
captureMuxRangeFeedRequestSender func(nodeID roachpb.NodeID, sender func(req *kvpb.RangeFeedRequest) error)
}
}

@@ -795,7 +799,7 @@ func (ds *DistSender) singleRangeFeed(
}

if cfg.knobs.onRangefeedEvent != nil {
- skip, err := cfg.knobs.onRangefeedEvent(ctx, span, event)
+ skip, err := cfg.knobs.onRangefeedEvent(ctx, span, 0 /* streamID */, event)
if err != nil {
return args.Timestamp, err
}
@@ -871,7 +875,7 @@ var errRestartStuckRange = errors.New("rangefeed restarting due to inactivity")

// TestingWithOnRangefeedEvent returns a test only option to modify rangefeed event.
func TestingWithOnRangefeedEvent(
- fn func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error),
+ fn func(ctx context.Context, s roachpb.Span, streamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error),
) RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.knobs.onRangefeedEvent = fn
@@ -886,5 +890,15 @@ func TestingWithRangeFeedMetrics(m *DistSenderRangeFeedMetrics) RangeFeedOption
})
}

// TestingWithMuxRangeFeedRequestSenderCapture returns a test-only option to specify a callback
// that will be invoked when mux rangefeed establishes a connection to a node.
func TestingWithMuxRangeFeedRequestSenderCapture(
fn func(nodeID roachpb.NodeID, capture func(request *kvpb.RangeFeedRequest) error),
) RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.knobs.captureMuxRangeFeedRequestSender = fn
})
}

// TestingMakeRangeFeedMetrics exposes makeDistSenderRangeFeedMetrics for test use.
var TestingMakeRangeFeedMetrics = makeDistSenderRangeFeedMetrics
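Taken together, the two knobs above let a test drive stream closure end to end: capture the per-node sender when the mux connection is established, observe stream IDs from events, then send close requests. A condensed sketch of that wiring follows; the callback bodies are illustrative placeholders, and the full test in the next file does the same with real assertions.

```go
// Condensed sketch: wire both test-only options into a rangefeed.
var capturedSender atomic.Value // holds func(*kvpb.RangeFeedRequest) error

opts := []kvcoord.RangeFeedOption{
	kvcoord.WithMuxRangeFeed(),
	kvcoord.TestingWithMuxRangeFeedRequestSenderCapture(
		func(nodeID roachpb.NodeID, send func(*kvpb.RangeFeedRequest) error) {
			capturedSender.Store(send) // one sender per node connection
		}),
	kvcoord.TestingWithOnRangefeedEvent(
		func(ctx context.Context, s roachpb.Span, streamID int64, ev *kvpb.RangeFeedEvent) (bool, error) {
			// Record streamID from checkpoints for later CloseStream requests.
			return false, nil
		}),
}
_ = opts // passed to the rangefeed under test
```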
138 changes: 136 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -12,6 +12,8 @@ package kvcoord_test

import (
"context"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
@@ -629,7 +631,7 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
enoughErrors := make(chan struct{})
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, noValuesExpected, true,
kvcoord.TestingWithOnRangefeedEvent(
- func(_ context.Context, _ roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+ func(_ context.Context, _ roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
*event = transientErrEvent
if numErrors.Add(1) == numErrsToReturn {
close(enoughErrors)
@@ -727,7 +729,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, ignoreValues, useMux,
kvcoord.TestingWithRangeFeedMetrics(&metrics),
kvcoord.TestingWithOnRangefeedEvent(
- func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+ func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
switch t := event.GetValue().(type) {
case *kvpb.RangeFeedValue:
// If we previously arranged for the range to be skipped (stuck catchup scan),
@@ -802,3 +804,135 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
require.EqualValues(t, numCatchupToBlock, metrics.RangefeedCatchupRanges.Value())
})
}

// TestMuxRangeFeedCanCloseStream verifies stream termination functionality in mux rangefeed.
func TestMuxRangeFeedCanCloseStream(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

ts := tc.Server(0)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

// Insert 1000 rows, and split them into 10 ranges.
sqlDB.ExecMultiple(t,
`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
`SET CLUSTER SETTING kv.closed_timestamp.target_duration='100ms'`,
`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
`CREATE TABLE foo (key INT PRIMARY KEY)`,
`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))`,
)

fooDesc := desctestutils.TestingGetPublicTableDescriptor(
ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo")
fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)

frontier, err := span.MakeFrontier(fooSpan)
require.NoError(t, err)

expectFrontierAdvance := func() {
t.Helper()
// Closed timestamp for the range advances every 100ms. We'll require the
// frontier to advance a bit more than that.
threshold := frontier.Frontier().AddDuration(250 * time.Millisecond)
testutils.SucceedsWithin(t, func() error {
if frontier.Frontier().Less(threshold) {
return errors.Newf("waiting for frontier advance to at least %s", threshold)
}
return nil
}, 10*time.Second)
}

var observedStreams sync.Map
var capturedSender atomic.Value

ignoreValues := func(event kvcoord.RangeFeedMessage) {}
var numRestartStreams atomic.Int32

closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, true,
kvcoord.WithMuxRangeFeed(),
kvcoord.TestingWithMuxRangeFeedRequestSenderCapture(
// We expect a single mux sender since we have 1 node in this test.
func(nodeID roachpb.NodeID, capture func(request *kvpb.RangeFeedRequest) error) {
capturedSender.Store(capture)
},
),
kvcoord.TestingWithOnRangefeedEvent(
func(ctx context.Context, s roachpb.Span, streamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
switch t := event.GetValue().(type) {
case *kvpb.RangeFeedCheckpoint:
observedStreams.Store(streamID, nil)
_, err := frontier.Forward(t.Span, t.ResolvedTS)
if err != nil {
return true, err
}
case *kvpb.RangeFeedError:
// Keep track of mux errors due to RangeFeedRetryError_REASON_RANGEFEED_CLOSED.
// These occur when we issue a CloseStream request.
err := t.Error.GoError()
log.Infof(ctx, "Got err: %v", err)
var retryErr *kvpb.RangeFeedRetryError
if ok := errors.As(err, &retryErr); ok && retryErr.Reason == kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED {
numRestartStreams.Add(1)
}
}

return false, nil
}),
)
defer closeFeed()

// Wait until we capture mux rangefeed request sender. There should only be 1.
var muxRangeFeedRequestSender func(req *kvpb.RangeFeedRequest) error
testutils.SucceedsWithin(t, func() error {
v, ok := capturedSender.Load().(func(request *kvpb.RangeFeedRequest) error)
if ok {
muxRangeFeedRequestSender = v
return nil
}
return errors.New("waiting to capture mux rangefeed request sender.")
}, 10*time.Second)

cancelledStreams := make(map[int64]struct{})
for i := 0; i < 5; i++ {
// Wait for the test frontier to advance. Once it advances,
// we know the rangefeed has started and all ranges are running.
expectFrontierAdvance()

// Pick some number of streams to close. Since sync.Map iteration order is non-deterministic,
// we'll pick a few random streams.
initialClosed := numRestartStreams.Load()
numToCancel := 1 + rand.Int31n(3)
var numCancelled int32 = 0
observedStreams.Range(func(key any, _ any) bool {
streamID := key.(int64)
if _, wasCancelled := cancelledStreams[streamID]; wasCancelled {
return true // try another stream.
}
numCancelled++
cancelledStreams[streamID] = struct{}{}
req, err := kvcoord.NewCloseStreamRequest(ctx, ts.ClusterSettings(), streamID)
require.NoError(t, err)
require.NoError(t, muxRangeFeedRequestSender(req))
return numCancelled < numToCancel
})

// Observe numToCancel errors.
testutils.SucceedsWithin(t, func() error {
numRestarted := numRestartStreams.Load()
if numRestarted == initialClosed+numCancelled {
return nil
}
return errors.Newf("waiting for %d streams to be closed (%d so far)", numCancelled, numRestarted-initialClosed)
}, 10*time.Second)

// When we close the stream(s), the rangefeed server responds with a retryable error.
// Mux rangefeed should retry, and thus we expect frontier to keep advancing.
}
}
7 changes: 7 additions & 0 deletions pkg/kv/kvpb/api.proto
@@ -2986,6 +2986,13 @@ message RangeFeedRequest {

// StreamID is set by the client issuing MuxRangeFeed requests.
int64 stream_id = 5 [(gogoproto.customname) = "StreamID"];
// CloseStream is set by the mux RangeFeed client to indicate that
// the server should close the stream with the specified stream_id.
// When this bit is set, the server makes a best-effort attempt to
// quickly terminate the rangefeed for this stream.
// When CloseStream is set, only StreamID needs to be set;
// other fields (such as Span) are ignored.
bool close_stream = 6;
}

// RangeFeedValue is a variant of RangeFeedEvent that represents an update to
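To illustrate the wire contract described in the proto comment above, a close request populates only the stream ID and the close bit, mirroring what `NewCloseStreamRequest` returns; the stream ID below is a made-up value.

```go
// Hand-built equivalent of what NewCloseStreamRequest returns: only
// StreamID and CloseStream are populated; Span, Timestamp, and the other
// RangeFeedRequest fields are ignored by the server for close requests.
closeReq := &kvpb.RangeFeedRequest{
	StreamID:    42, // hypothetical ID of the stream to terminate
	CloseStream: true,
}
_ = closeReq
```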