Skip to content

Commit

Permalink
kvserver, kvclient: allow Rangefeeds to run over non-voting replicas
Browse files Browse the repository at this point in the history
Prior to this commit, RangeFeeds only worked over voting replicas. This
included followers as well as the leaseholder. This commit allows
rangefeeds to run over non-voting replicas as well.

Release justification: low risk high reward change to be able to fuel
CDC streams using non-voting replicas

Release note: None
  • Loading branch information
aayushshah15 committed Mar 3, 2021
1 parent 9bf97ac commit 839de54
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 10 deletions.
5 changes: 2 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ func (ds *DistSender) singleRangeFeed(
if ds.rpcContext != nil {
latencyFn = ds.rpcContext.RemoteClocks.Latency
}
// TODO(aayush): We should enable creating RangeFeeds on non-voting replicas.
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, OnlyPotentialLeaseholders)
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas)
if err != nil {
return args.Timestamp, err
}
Expand All @@ -256,7 +255,7 @@ func (ds *DistSender) singleRangeFeed(
log.VErrEventf(ctx, 2, "RPC error: %s", err)
continue
}

log.VEventf(ctx, 3, "attempting to create a RangeFeed over replica %s", args.Replica)
stream, err := client.RangeFeed(clientCtx, &args)
if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)
Expand Down
60 changes: 60 additions & 0 deletions pkg/kv/kvserver/client_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver_test
import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -182,3 +184,61 @@ func TestMergeOfRangeEventTableWhileRunningRangefeed(t *testing.T) {
cancel()
require.Regexp(t, context.Canceled.Error(), <-rangefeedErrChan)
}

func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
clusterArgs := aggressiveResolvedTimestampClusterArgs
// We want to manually add a non-voter to a range in this test, so disable
// the replicateQueue to prevent it from disrupting the test.
clusterArgs.ReplicationMode = base.ReplicationManual
// NB: setupClusterForClosedTSTesting sets a low closed timestamp target
// duration.
tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration,
testingCloseFraction, clusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1))

db := tc.Server(1).DB()
ds := tc.Server(1).DistSenderI().(*kvcoord.DistSender)
_, err := tc.ServerConn(1).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
require.NoError(t, err)

startTS := db.Clock().Now()
rangefeedCtx, rangefeedCancel := context.WithCancel(ctx)
rangefeedCtx, getRec, cancel := tracing.ContextWithRecordingSpan(rangefeedCtx,
tracing.NewTracer(),
"rangefeed over non-voter")
defer cancel()

// Do a read on the range to make sure that the dist sender learns about the
// latest state of the range (with the new non-voter).
_, err = db.Get(ctx, desc.StartKey.AsRawKey())
require.NoError(t, err)

rangefeedErrChan := make(chan error, 1)
eventCh := make(chan *roachpb.RangeFeedEvent, 1000)
go func() {
rangefeedErrChan <- ds.RangeFeed(
rangefeedCtx,
desc.RSpan().AsRawSpanWithNoLocals(),
startTS,
false, /* withDiff */
eventCh,
)
}()

// Wait for an event to ensure that the rangefeed is set up.
select {
case <-eventCh:
case err := <-rangefeedErrChan:
t.Fatalf("rangefeed failed with %s", err)
case <-time.After(60 * time.Second):
t.Fatalf("rangefeed initialization took too long")
}
rangefeedCancel()
require.Regexp(t, "context canceled", <-rangefeedErrChan)
require.Regexp(t, "attempting to create a RangeFeed over replica.*2NON_VOTER", getRec().String())
}
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestReplicaRangefeed(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
const numNodes = 5
args := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: make(map[int]base.TestServerArgs, numNodes),
Expand Down Expand Up @@ -116,6 +116,7 @@ func TestReplicaRangefeed(t *testing.T) {
startKey := []byte("a")
tc.SplitRangeOrFatal(t, startKey)
tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2))
tc.AddNonVotersOrFatal(t, startKey, tc.Target(3), tc.Target(4))
if pErr := tc.WaitForVoters(startKey, tc.Target(1), tc.Target(2)); pErr != nil {
t.Fatalf("Unexpected error waiting for replication: %v", pErr)
}
Expand All @@ -128,13 +129,12 @@ func TestReplicaRangefeed(t *testing.T) {
if _, pErr := kv.SendWrappedWith(ctx, db, roachpb.Header{Timestamp: ts1}, incArgs); pErr != nil {
t.Fatal(pErr)
}
tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9})
tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9, 9, 9})

replNum := 3
streams := make([]*testStream, replNum)
streamErrC := make(chan *roachpb.Error, replNum)
streams := make([]*testStream, numNodes)
streamErrC := make(chan *roachpb.Error, numNodes)
rangefeedSpan := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}
for i := 0; i < replNum; i++ {
for i := 0; i < numNodes; i++ {
stream := newTestStream()
streams[i] = stream
ts := tc.Servers[i]
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestReplicaRangefeed(t *testing.T) {
}

testutils.SucceedsSoon(t, func() error {
for i := 0; i < replNum; i++ {
for i := 0; i < numNodes; i++ {
ts := tc.Servers[i]
store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
if pErr != nil {
Expand Down

0 comments on commit 839de54

Please sign in to comment.