diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index db8acb309da8..8e4e4ced9e5e 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -229,8 +229,7 @@ func (ds *DistSender) singleRangeFeed(
 	if ds.rpcContext != nil {
 		latencyFn = ds.rpcContext.RemoteClocks.Latency
 	}
-	// TODO(aayush): We should enable creating RangeFeeds on non-voting replicas.
-	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, OnlyPotentialLeaseholders)
+	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas)
 	if err != nil {
 		return args.Timestamp, err
 	}
@@ -256,7 +255,7 @@ func (ds *DistSender) singleRangeFeed(
 			log.VErrEventf(ctx, 2, "RPC error: %s", err)
 			continue
 		}
-
+		log.VEventf(ctx, 3, "attempting to create a RangeFeed over replica %s", args.Replica)
 		stream, err := client.RangeFeed(clientCtx, &args)
 		if err != nil {
 			log.VErrEventf(ctx, 2, "RPC error: %s", err)
diff --git a/pkg/kv/kvserver/client_rangefeed_test.go b/pkg/kv/kvserver/client_rangefeed_test.go
index 4741baa11b23..b8e1f2e8dbe0 100644
--- a/pkg/kv/kvserver/client_rangefeed_test.go
+++ b/pkg/kv/kvserver/client_rangefeed_test.go
@@ -13,6 +13,7 @@ package kvserver_test
 import (
 	"context"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -27,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/stretchr/testify/require"
 )
 
@@ -182,3 +184,61 @@ func TestMergeOfRangeEventTableWhileRunningRangefeed(t *testing.T) {
 	cancel()
 	require.Regexp(t, context.Canceled.Error(), <-rangefeedErrChan)
 }
+
+func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	clusterArgs := aggressiveResolvedTimestampClusterArgs
+	// We want to manually add a non-voter to a range in this test, so disable
+	// the replicateQueue to prevent it from disrupting the test.
+	clusterArgs.ReplicationMode = base.ReplicationManual
+	// NB: setupClusterForClosedTSTesting sets a low closed timestamp target
+	// duration.
+	tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration,
+		testingCloseFraction, clusterArgs, "cttest", "kv")
+	defer tc.Stopper().Stop(ctx)
+	tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1))
+
+	db := tc.Server(1).DB()
+	ds := tc.Server(1).DistSenderI().(*kvcoord.DistSender)
+	_, err := tc.ServerConn(1).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
+	require.NoError(t, err)
+
+	startTS := db.Clock().Now()
+	rangefeedCtx, rangefeedCancel := context.WithCancel(ctx)
+	rangefeedCtx, getRec, cancel := tracing.ContextWithRecordingSpan(rangefeedCtx,
+		tracing.NewTracer(),
+		"rangefeed over non-voter")
+	defer cancel()
+
+	// Do a read on the range to make sure that the dist sender learns about the
+	// latest state of the range (with the new non-voter).
+	_, err = db.Get(ctx, desc.StartKey.AsRawKey())
+	require.NoError(t, err)
+
+	rangefeedErrChan := make(chan error, 1)
+	eventCh := make(chan *roachpb.RangeFeedEvent, 1000)
+	go func() {
+		rangefeedErrChan <- ds.RangeFeed(
+			rangefeedCtx,
+			desc.RSpan().AsRawSpanWithNoLocals(),
+			startTS,
+			false, /* withDiff */
+			eventCh,
+		)
+	}()
+
+	// Wait for an event to ensure that the rangefeed is set up.
+	select {
+	case <-eventCh:
+	case err := <-rangefeedErrChan:
+		t.Fatalf("rangefeed failed with %s", err)
+	case <-time.After(60 * time.Second):
+		t.Fatalf("rangefeed initialization took too long")
+	}
+	rangefeedCancel()
+	require.Regexp(t, "context canceled", <-rangefeedErrChan)
+	require.Regexp(t, "attempting to create a RangeFeed over replica.*2NON_VOTER", getRec().String())
+}
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index de435bd8712b..b51af9d1b9a9 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -88,7 +88,7 @@ func TestReplicaRangefeed(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
-	const numNodes = 3
+	const numNodes = 5
 	args := base.TestClusterArgs{
 		ReplicationMode:   base.ReplicationManual,
 		ServerArgsPerNode: make(map[int]base.TestServerArgs, numNodes),
 	}
@@ -116,6 +116,7 @@ func TestReplicaRangefeed(t *testing.T) {
 	startKey := []byte("a")
 	tc.SplitRangeOrFatal(t, startKey)
 	tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2))
+	tc.AddNonVotersOrFatal(t, startKey, tc.Target(3), tc.Target(4))
 	if pErr := tc.WaitForVoters(startKey, tc.Target(1), tc.Target(2)); pErr != nil {
 		t.Fatalf("Unexpected error waiting for replication: %v", pErr)
 	}
@@ -128,13 +129,12 @@ func TestReplicaRangefeed(t *testing.T) {
 	if _, pErr := kv.SendWrappedWith(ctx, db, roachpb.Header{Timestamp: ts1}, incArgs); pErr != nil {
 		t.Fatal(pErr)
 	}
-	tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9})
+	tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9, 9, 9})
 
-	replNum := 3
-	streams := make([]*testStream, replNum)
-	streamErrC := make(chan *roachpb.Error, replNum)
+	streams := make([]*testStream, numNodes)
+	streamErrC := make(chan *roachpb.Error, numNodes)
 	rangefeedSpan := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}
-	for i := 0; i < replNum; i++ {
+	for i := 0; i < numNodes; i++ {
 		stream := newTestStream()
 		streams[i] = stream
 		ts := tc.Servers[i]
@@ -308,7 +308,7 @@ func TestReplicaRangefeed(t *testing.T) {
 	}
 
 	testutils.SucceedsSoon(t, func() error {
-		for i := 0; i < replNum; i++ {
+		for i := 0; i < numNodes; i++ {
 			ts := tc.Servers[i]
 			store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
 			if pErr != nil {