From 64ba572b89282de042d04bce666c14bfb81acc87 Mon Sep 17 00:00:00 2001
From: Aayush Shah
Date: Wed, 27 Jan 2021 23:13:34 -0500
Subject: [PATCH] kvserver, kvclient: allow Rangefeeds to run over non-voting
 replicas

Previously, DistSender would only route RangeFeed requests to voting
replicas: singleRangeFeed built its replica slice with the
OnlyPotentialLeaseholders filter. This commit switches that call to
AllExtantReplicas, resolving the TODO and allowing rangefeeds to be
served by non-voting replicas. TestReplicaRangefeed is extended to run
both with and without non-voters.

Release note: None
---
 .../kvclient/kvcoord/dist_sender_rangefeed.go |   3 +-
 pkg/kv/kvserver/replica_rangefeed_test.go     | 420 +++++++++---------
 2 files changed, 215 insertions(+), 208 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index db8acb309da8..7bf6a89efb80 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -229,8 +229,7 @@ func (ds *DistSender) singleRangeFeed(
 	if ds.rpcContext != nil {
 		latencyFn = ds.rpcContext.RemoteClocks.Latency
 	}
-	// TODO(aayush): We should enable creating RangeFeeds on non-voting replicas.
-	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, OnlyPotentialLeaseholders)
+	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas)
 	if err != nil {
 		return args.Timestamp, err
 	}
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index 0940d83491fa..7f78ef5708aa 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -101,235 +101,243 @@ func TestReplicaRangefeed(t *testing.T) {
 		kvserver.RangefeedEnabled.Override(&settings.SV, true)
 		args.ServerArgsPerNode[i] = base.TestServerArgs{Settings: settings}
 	}
-	tc := testcluster.StartTestCluster(t, numNodes, args)
-	defer tc.Stopper().Stop(ctx)
 
-	ts := tc.Servers[0]
-	firstStore, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
+	testutils.RunTrueAndFalse(t, "with non voters", func(t *testing.T, withNonVoters bool) {
+		tc := testcluster.StartTestCluster(t, numNodes, args)
+		defer tc.Stopper().Stop(ctx)
 
-	db := firstStore.DB().NonTransactionalSender()
+		ts := tc.Servers[0]
+		firstStore, pErr := ts.Stores().GetStore(ts.GetFirstStoreID())
+		if pErr != nil {
+			t.Fatal(pErr)
+		}
 
-	// Split the range so that the RHS uses epoch-based leases.
-	startKey := []byte("a")
-	tc.SplitRangeOrFatal(t, startKey)
-	tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2))
-	if pErr := tc.WaitForVoters(startKey, tc.Target(1), tc.Target(2)); pErr != nil {
-		t.Fatalf("Unexpected error waiting for replication: %v", pErr)
-	}
-	rangeID := firstStore.LookupReplica(startKey).RangeID
-
-	// Insert a key before starting the rangefeeds.
-	initTime := ts.Clock().Now()
-	ts1 := initTime.Add(0, 1)
-	incArgs := incrementArgs(roachpb.Key("b"), 9)
-	if _, pErr := kv.SendWrappedWith(ctx, db, roachpb.Header{Timestamp: ts1}, incArgs); pErr != nil {
-		t.Fatal(pErr)
-	}
-	tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9})
+		db := firstStore.DB().NonTransactionalSender()
 
-	replNum := 3
-	streams := make([]*testStream, replNum)
-	streamErrC := make(chan *roachpb.Error, replNum)
-	rangefeedSpan := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}
-	for i := 0; i < replNum; i++ {
-		stream := newTestStream()
-		streams[i] = stream
-		ts := tc.Servers[i]
-		store, err := ts.Stores().GetStore(ts.GetFirstStoreID())
-		if err != nil {
-			t.Fatal(err)
+		// Split the range so that the RHS uses epoch-based leases.
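+		// Depending on the subtest, the range is then replicated to the other
+		// two nodes either as non-voters or as voters; rangefeeds are expected
+		// to work over both kinds of replicas.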
+ startKey := []byte("a") + tc.SplitRangeOrFatal(t, startKey) + + if withNonVoters { + tc.AddNonVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2)) + } else { + tc.AddVotersOrFatal(t, startKey, tc.Target(1), tc.Target(2)) + if pErr := tc.WaitForVoters(startKey, tc.Target(1), tc.Target(2)); pErr != nil { + t.Fatalf("Unexpected error waiting for replication: %v", pErr) + } } - go func(i int) { - req := roachpb.RangeFeedRequest{ - Header: roachpb.Header{ - Timestamp: initTime, - RangeID: rangeID, - }, - Span: rangefeedSpan, - WithDiff: true, + rangeID := firstStore.LookupReplica(startKey).RangeID + + // Insert a key before starting the rangefeeds. + initTime := ts.Clock().Now() + ts1 := initTime.Add(0, 1) + incArgs := incrementArgs(roachpb.Key("b"), 9) + if _, pErr := kv.SendWrappedWith(ctx, db, roachpb.Header{Timestamp: ts1}, incArgs); pErr != nil { + t.Fatal(pErr) + } + tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9}) + + replNum := 3 + streams := make([]*testStream, replNum) + streamErrC := make(chan *roachpb.Error, replNum) + rangefeedSpan := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")} + for i := 0; i < replNum; i++ { + stream := newTestStream() + streams[i] = stream + ts := tc.Servers[i] + store, err := ts.Stores().GetStore(ts.GetFirstStoreID()) + if err != nil { + t.Fatal(err) } - pErr := store.RangeFeed(&req, stream) - streamErrC <- pErr - }(i) - } + go func(i int) { + req := roachpb.RangeFeedRequest{ + Header: roachpb.Header{ + Timestamp: initTime, + RangeID: rangeID, + }, + Span: rangefeedSpan, + WithDiff: true, + } + pErr := store.RangeFeed(&req, stream) + streamErrC <- pErr + }(i) + } + + checkForExpEvents := func(expEvents []*roachpb.RangeFeedEvent) { + t.Helper() + for i, stream := range streams { + var events []*roachpb.RangeFeedEvent + testutils.SucceedsSoon(t, func() error { + if len(streamErrC) > 0 { + // Break if the error channel is already populated. + return nil + } - checkForExpEvents := func(expEvents []*roachpb.RangeFeedEvent) { - t.Helper() - for i, stream := range streams { - var events []*roachpb.RangeFeedEvent - testutils.SucceedsSoon(t, func() error { - if len(streamErrC) > 0 { - // Break if the error channel is already populated. + events = stream.Events() + if len(events) < len(expEvents) { + return errors.Errorf("too few events: %v", events) + } return nil - } + }) - events = stream.Events() - if len(events) < len(expEvents) { - return errors.Errorf("too few events: %v", events) + if len(streamErrC) > 0 { + t.Fatalf("unexpected error from stream: %v", <-streamErrC) + } + if !reflect.DeepEqual(events, expEvents) { + t.Fatalf("incorrect events on stream %d, found %v, want %v", i, events, expEvents) } - return nil - }) - - if len(streamErrC) > 0 { - t.Fatalf("unexpected error from stream: %v", <-streamErrC) - } - if !reflect.DeepEqual(events, expEvents) { - t.Fatalf("incorrect events on stream %d, found %v, want %v", i, events, expEvents) } } - } - - // Wait for all streams to observe the catch-up related events. - expVal1 := roachpb.Value{Timestamp: ts1} - expVal1.SetInt(9) - expVal1.InitChecksum(roachpb.Key("b")) - expEvents := []*roachpb.RangeFeedEvent{ - {Val: &roachpb.RangeFeedValue{ - Key: roachpb.Key("b"), Value: expVal1, - }}, - {Checkpoint: &roachpb.RangeFeedCheckpoint{ - Span: rangefeedSpan, - ResolvedTS: hlc.Timestamp{}, - }}, - } - checkForExpEvents(expEvents) - - // Insert a key non-transactionally. 
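+		// The writes below are issued at strictly increasing timestamps (ts2
+		// through ts5), so every stream should observe the corresponding
+		// events in this exact order.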
- ts2 := initTime.Add(0, 2) - pArgs := putArgs(roachpb.Key("c"), []byte("val2")) - _, err := kv.SendWrappedWith(ctx, db, roachpb.Header{Timestamp: ts2}, pArgs) - if err != nil { - t.Fatal(err) - } - server1 := tc.Servers[1] - store1, pErr := server1.Stores().GetStore(server1.GetFirstStoreID()) - if pErr != nil { - t.Fatal(pErr) - } - // Insert a second key transactionally. - ts3 := initTime.Add(0, 3) - if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts3) - return txn.Put(ctx, roachpb.Key("m"), []byte("val3")) - }); err != nil { - t.Fatal(err) - } - // Read to force intent resolution. - if _, err := store1.DB().Get(ctx, roachpb.Key("m")); err != nil { - t.Fatal(err) - } + // Wait for all streams to observe the catch-up related events. + expVal1 := roachpb.Value{Timestamp: ts1} + expVal1.SetInt(9) + expVal1.InitChecksum(roachpb.Key("b")) + expEvents := []*roachpb.RangeFeedEvent{ + {Val: &roachpb.RangeFeedValue{ + Key: roachpb.Key("b"), Value: expVal1, + }}, + {Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: rangefeedSpan, + ResolvedTS: hlc.Timestamp{}, + }}, + } + checkForExpEvents(expEvents) - // Update the originally incremented key non-transactionally. - ts4 := initTime.Add(0, 4) - _, err = kv.SendWrappedWith(ctx, db, roachpb.Header{Timestamp: ts4}, incArgs) - if err != nil { - t.Fatal(err) - } + // Insert a key non-transactionally. + ts2 := initTime.Add(0, 2) + pArgs := putArgs(roachpb.Key("c"), []byte("val2")) + _, err := kv.SendWrappedWith(ctx, db, roachpb.Header{Timestamp: ts2}, pArgs) + if err != nil { + t.Fatal(err) + } - // Update the originally incremented key transactionally. - ts5 := initTime.Add(0, 5) - if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - txn.SetFixedTimestamp(ctx, ts5) - _, err := txn.Inc(ctx, incArgs.Key, 7) - return err - }); err != nil { - t.Fatal(err) - } - // Read to force intent resolution. - if _, err := store1.DB().Get(ctx, roachpb.Key("b")); err != nil { - t.Fatal(err) - } + server1 := tc.Servers[1] + store1, pErr := server1.Stores().GetStore(server1.GetFirstStoreID()) + if pErr != nil { + t.Fatal(pErr) + } + // Insert a second key transactionally. + ts3 := initTime.Add(0, 3) + if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetFixedTimestamp(ctx, ts3) + return txn.Put(ctx, roachpb.Key("m"), []byte("val3")) + }); err != nil { + t.Fatal(err) + } + // Read to force intent resolution. + if _, err := store1.DB().Get(ctx, roachpb.Key("m")); err != nil { + t.Fatal(err) + } - // Wait for all streams to observe the expected events. 
- expVal2 := roachpb.MakeValueFromBytesAndTimestamp([]byte("val2"), ts2) - expVal3 := roachpb.MakeValueFromBytesAndTimestamp([]byte("val3"), ts3) - expVal3.InitChecksum([]byte("m")) // client.Txn sets value checksum - expVal4 := roachpb.Value{Timestamp: ts4} - expVal4.SetInt(18) - expVal4.InitChecksum(roachpb.Key("b")) - expVal5 := roachpb.Value{Timestamp: ts5} - expVal5.SetInt(25) - expVal5.InitChecksum(roachpb.Key("b")) - expVal1NoTS, expVal4NoTS := expVal1, expVal4 - expVal1NoTS.Timestamp, expVal4NoTS.Timestamp = hlc.Timestamp{}, hlc.Timestamp{} - expEvents = append(expEvents, []*roachpb.RangeFeedEvent{ - {Val: &roachpb.RangeFeedValue{ - Key: roachpb.Key("c"), Value: expVal2, - }}, - {Val: &roachpb.RangeFeedValue{ - Key: roachpb.Key("m"), Value: expVal3, - }}, - {Val: &roachpb.RangeFeedValue{ - Key: roachpb.Key("b"), Value: expVal4, PrevValue: expVal1NoTS, - }}, - {Val: &roachpb.RangeFeedValue{ - Key: roachpb.Key("b"), Value: expVal5, PrevValue: expVal4NoTS, - }}, - }...) - checkForExpEvents(expEvents) - - // Cancel each of the rangefeed streams. - for _, stream := range streams { - stream.Cancel() + // Update the originally incremented key non-transactionally. + ts4 := initTime.Add(0, 4) + _, err = kv.SendWrappedWith(ctx, db, roachpb.Header{Timestamp: ts4}, incArgs) + if err != nil { + t.Fatal(err) + } - pErr := <-streamErrC - if !testutils.IsPError(pErr, "context canceled") { - t.Fatalf("got error for RangeFeed: %v", pErr) + // Update the originally incremented key transactionally. + ts5 := initTime.Add(0, 5) + if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetFixedTimestamp(ctx, ts5) + _, err := txn.Inc(ctx, incArgs.Key, 7) + return err + }); err != nil { + t.Fatal(err) + } + // Read to force intent resolution. + if _, err := store1.DB().Get(ctx, roachpb.Key("b")); err != nil { + t.Fatal(err) } - } - // Bump the GC threshold and assert that RangeFeed below the timestamp will - // catch an error. - gcReq := &roachpb.GCRequest{ - Threshold: initTime.Add(0, 1), - } - gcReq.Key = startKey - gcReq.EndKey = firstStore.LookupReplica(startKey).Desc().EndKey.AsRawKey() - var ba roachpb.BatchRequest - ba.RangeID = rangeID - ba.Add(gcReq) - if _, pErr := firstStore.Send(ctx, ba); pErr != nil { - t.Fatal(pErr) - } + // Wait for all streams to observe the expected events. + expVal2 := roachpb.MakeValueFromBytesAndTimestamp([]byte("val2"), ts2) + expVal3 := roachpb.MakeValueFromBytesAndTimestamp([]byte("val3"), ts3) + expVal3.InitChecksum([]byte("m")) // client.Txn sets value checksum + expVal4 := roachpb.Value{Timestamp: ts4} + expVal4.SetInt(18) + expVal4.InitChecksum(roachpb.Key("b")) + expVal5 := roachpb.Value{Timestamp: ts5} + expVal5.SetInt(25) + expVal5.InitChecksum(roachpb.Key("b")) + expVal1NoTS, expVal4NoTS := expVal1, expVal4 + expVal1NoTS.Timestamp, expVal4NoTS.Timestamp = hlc.Timestamp{}, hlc.Timestamp{} + expEvents = append(expEvents, []*roachpb.RangeFeedEvent{ + {Val: &roachpb.RangeFeedValue{ + Key: roachpb.Key("c"), Value: expVal2, + }}, + {Val: &roachpb.RangeFeedValue{ + Key: roachpb.Key("m"), Value: expVal3, + }}, + {Val: &roachpb.RangeFeedValue{ + Key: roachpb.Key("b"), Value: expVal4, PrevValue: expVal1NoTS, + }}, + {Val: &roachpb.RangeFeedValue{ + Key: roachpb.Key("b"), Value: expVal5, PrevValue: expVal4NoTS, + }}, + }...) + checkForExpEvents(expEvents) + + // Cancel each of the rangefeed streams. 
+ for _, stream := range streams { + stream.Cancel() + + pErr := <-streamErrC + if !testutils.IsPError(pErr, "context canceled") { + t.Fatalf("got error for RangeFeed: %v", pErr) + } + } - req := roachpb.RangeFeedRequest{ - Header: roachpb.Header{ - Timestamp: initTime, - RangeID: rangeID, - }, - Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, - } + // Bump the GC threshold and assert that RangeFeed below the timestamp will + // catch an error. + gcReq := &roachpb.GCRequest{ + Threshold: initTime.Add(0, 1), + } + gcReq.Key = startKey + gcReq.EndKey = firstStore.LookupReplica(startKey).Desc().EndKey.AsRawKey() + var ba roachpb.BatchRequest + ba.RangeID = rangeID + ba.Add(gcReq) + if _, pErr := firstStore.Send(ctx, ba); pErr != nil { + t.Fatal(pErr) + } - testutils.SucceedsSoon(t, func() error { - for i := 0; i < replNum; i++ { - ts := tc.Servers[i] - store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID()) - if pErr != nil { - t.Fatal(pErr) - } - repl := store.LookupReplica(startKey) - if repl == nil { - return errors.Errorf("replica not found on node #%d", i+1) - } - if cur := repl.GetGCThreshold(); cur.Less(gcReq.Threshold) { - return errors.Errorf("%s has GCThreshold %s < %s; hasn't applied the bump yet", repl, cur, gcReq.Threshold) - } - stream := newTestStream() - timer := time.AfterFunc(10*time.Second, stream.Cancel) - defer timer.Stop() - defer stream.Cancel() + req := roachpb.RangeFeedRequest{ + Header: roachpb.Header{ + Timestamp: initTime, + RangeID: rangeID, + }, + Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, + } - if pErr := store.RangeFeed(&req, stream); !testutils.IsPError( - pErr, `must be after replica GC threshold`, - ) { - return pErr.GoError() + testutils.SucceedsSoon(t, func() error { + for i := 0; i < replNum; i++ { + ts := tc.Servers[i] + store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID()) + if pErr != nil { + t.Fatal(pErr) + } + repl := store.LookupReplica(startKey) + if repl == nil { + return errors.Errorf("replica not found on node #%d", i+1) + } + if cur := repl.GetGCThreshold(); cur.Less(gcReq.Threshold) { + return errors.Errorf("%s has GCThreshold %s < %s; hasn't applied the bump yet", repl, cur, gcReq.Threshold) + } + stream := newTestStream() + timer := time.AfterFunc(10*time.Second, stream.Cancel) + defer timer.Stop() + defer stream.Cancel() + + if pErr := store.RangeFeed(&req, stream); !testutils.IsPError( + pErr, `must be after replica GC threshold`, + ) { + return pErr.GoError() + } } - } - return nil + return nil + }) }) }
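
A note for reviewers, appended after the diff rather than inside it: the sketch below is a self-contained toy model of the replica filtering that the dist_sender_rangefeed.go change relaxes. It is not CockroachDB code. The Replica and FilterMode types and the filterReplicas helper are invented for illustration; only the two mode names mirror the real arguments to NewReplicaSlice.

// Toy model, not CockroachDB code: Replica, FilterMode and
// filterReplicas are invented for illustration. Only the two mode
// names mirror the NewReplicaSlice arguments in the diff above.
package main

import "fmt"

type ReplicaType int

const (
	Voter ReplicaType = iota
	NonVoter
)

type Replica struct {
	NodeID int
	Type   ReplicaType
}

type FilterMode int

const (
	// OnlyPotentialLeaseholders keeps only replicas that could acquire
	// the lease, i.e. the voters. This was the filter used for
	// rangefeeds before this patch.
	OnlyPotentialLeaseholders FilterMode = iota
	// AllExtantReplicas keeps every replica in the descriptor,
	// including non-voters.
	AllExtantReplicas
)

// filterReplicas models the role the filter argument plays in the
// NewReplicaSlice call changed by this patch.
func filterReplicas(replicas []Replica, mode FilterMode) []Replica {
	var out []Replica
	for _, r := range replicas {
		if mode == OnlyPotentialLeaseholders && r.Type != Voter {
			// Non-voters can never become leaseholders, so the strict
			// mode drops them.
			continue
		}
		out = append(out, r)
	}
	return out
}

func main() {
	// One voter and two non-voters, as in the test's non-voter subtest.
	desc := []Replica{
		{NodeID: 1, Type: Voter},
		{NodeID: 2, Type: NonVoter},
		{NodeID: 3, Type: NonVoter},
	}
	fmt.Println(len(filterReplicas(desc, OnlyPotentialLeaseholders))) // 1
	fmt.Println(len(filterReplicas(desc, AllExtantReplicas)))         // 3
}

With one voter and two non-voters, the old mode leaves DistSender a single eligible rangefeed target, while the new mode makes all three replicas candidates. The stricter filter matters for operations that need a potential leaseholder, which a rangefeed read does not; that is the rationale behind resolving the TODO removed above.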