kvserver: rename RaftMessageHandler to IncomingRaftMessageHandler
The commit renames `RaftMessageHandler`, `Listen`, and `Stop` to
`IncomingRaftMessageHandler`, `ListenIncomingRaftMessages`, and
`StopIncomingRaftMessages`. A follow-up PR introduces a new interface,
`OutgoingRaftMessageHandler`, dedicated to managing sent messages. The main
purpose of this PR is to keep that future PR cleaner by handling the renaming
separately. Note that this commit does not change any existing functionality.

Part of: cockroachdb#103983

Related: cockroachdb#105122

Release Note: None
wenyihu6 committed Jun 25, 2023
1 parent 71228e5 commit 4da323c
Showing 12 changed files with 171 additions and 167 deletions.
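
Before the file-by-file diff, a brief orientation. The test helpers below wrap a store in `unreliableRaftHandler`, which embeds the handler interface and overrides only the methods it wants to filter, and they register the wrapper with the renamed `ListenIncomingRaftMessages`. The following is a minimal, self-contained sketch of that embedding-and-registration pattern using stand-in types; `IncomingHandler`, `transport`, and `store` are illustrative only and are not the actual `kvserver` API, whose real interface and signatures live in `pkg/kv/kvserver`:

```go
package main

import "fmt"

// IncomingHandler stands in for kvserver.IncomingRaftMessageHandler (formerly
// RaftMessageHandler): the callbacks a store exposes for inbound Raft traffic.
// Method names echo the real interface; parameters are simplified toys.
type IncomingHandler interface {
	HandleRaftRequest(msg string) error
	HandleSnapshot(name string) error
}

// transport stands in for RaftTransport's registration side, using the method
// names introduced by this commit.
type transport struct {
	handlers map[int]IncomingHandler
}

func (t *transport) ListenIncomingRaftMessages(storeID int, h IncomingHandler) {
	t.handlers[storeID] = h
}

func (t *transport) StopIncomingRaftMessages(storeID int) {
	delete(t.handlers, storeID)
}

// store is a trivial handler implementation.
type store struct{}

func (store) HandleRaftRequest(msg string) error { fmt.Println("request:", msg); return nil }
func (store) HandleSnapshot(name string) error   { fmt.Println("snapshot:", name); return nil }

// unreliableHandler mirrors the structure of the unreliableRaftHandler test
// helper below: it embeds the handler interface, so it satisfies the interface
// automatically and only overrides the methods it wants to filter.
type unreliableHandler struct {
	IncomingHandler
	dropReq func(msg string) bool
}

func (h *unreliableHandler) HandleRaftRequest(msg string) error {
	if h.dropReq != nil && h.dropReq(msg) {
		return nil // drop silently, as the tests do to simulate partitions
	}
	return h.IncomingHandler.HandleRaftRequest(msg)
}

func main() {
	tr := &transport{handlers: map[int]IncomingHandler{}}
	tr.ListenIncomingRaftMessages(1, &unreliableHandler{
		IncomingHandler: store{},
		dropReq:         func(msg string) bool { return msg == "drop-me" },
	})
	h := tr.handlers[1]
	_ = h.HandleRaftRequest("drop-me") // filtered by the wrapper
	_ = h.HandleRaftRequest("deliver") // delegated to the store
	_ = h.HandleSnapshot("snap-1")     // not overridden: promoted from the embedded interface
	tr.StopIncomingRaftMessages(1)
}
```

The embedded interface field is what most hunks below touch: the field `RaftMessageHandler` becomes `IncomingRaftMessageHandler`, and every delegating call is renamed to match.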
56 changes: 28 additions & 28 deletions pkg/kv/kvserver/client_merge_test.go
@@ -699,9 +699,9 @@ func mergeCheckingTimestampCaches(
 		} else {
 			funcs = partitionedLeaderFuncs
 		}
-		tc.Servers[i].RaftTransport().Listen(s.StoreID(), &unreliableRaftHandler{
+		tc.Servers[i].RaftTransport().ListenIncomingRaftMessages(s.StoreID(), &unreliableRaftHandler{
 			rangeID: lhsDesc.GetRangeID(),
-			RaftMessageHandler: s,
+			IncomingRaftMessageHandler: s,
 			unreliableRaftHandlerFuncs: funcs,
 		})
 	}
@@ -801,17 +801,17 @@ func mergeCheckingTimestampCaches(
 	// Remove the partition. A snapshot to the leaseholder should follow.
 	// This snapshot will inform the leaseholder about the range merge.
 	for i, s := range lhsStores {
-		var h kvserver.RaftMessageHandler
+		var h kvserver.IncomingRaftMessageHandler
 		if i == 0 {
 			h = &unreliableRaftHandler{
 				rangeID: lhsDesc.GetRangeID(),
-				RaftMessageHandler: s,
+				IncomingRaftMessageHandler: s,
 				unreliableRaftHandlerFuncs: restoredLeaseholderFuncs,
 			}
 		} else {
 			h = s
 		}
-		tc.Servers[i].RaftTransport().Listen(s.StoreID(), h)
+		tc.Servers[i].RaftTransport().ListenIncomingRaftMessages(s.StoreID(), h)
 	}
 	close(filterMu.blockHBAndGCs)
 	filterMu.Lock()
@@ -2481,8 +2481,8 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
 		nil, /* knobs */
 	)
 	errChan := errorChannelTestHandler(make(chan *kvpb.Error, 1))
-	transport.Listen(store0.StoreID(), errChan)
-	transport.Listen(store1.StoreID(), errChan)
+	transport.ListenIncomingRaftMessages(store0.StoreID(), errChan)
+	transport.ListenIncomingRaftMessages(store1.StoreID(), errChan)

 	sendHeartbeat := func(
 		rangeID roachpb.RangeID,
@@ -2736,9 +2736,9 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) {

 	// Start dropping all Raft traffic to the LHS on store2 so that it won't be
 	// aware that there is a merge in progress.
-	tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{
-		rangeID:            lhsDesc.RangeID,
-		RaftMessageHandler: store2,
+	tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{
+		rangeID:                    lhsDesc.RangeID,
+		IncomingRaftMessageHandler: store2,
 		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
 			dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
 				return true
@@ -3017,9 +3017,9 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi

 	// Start dropping all Raft traffic to the LHS replica on store2 so that it
 	// won't be aware that there is a merge in progress.
-	tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{
-		rangeID:            lhsDesc.RangeID,
-		RaftMessageHandler: store2,
+	tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{
+		rangeID:                    lhsDesc.RangeID,
+		IncomingRaftMessageHandler: store2,
 		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
 			dropReq: func(*kvserverpb.RaftMessageRequest) bool {
 				return true
@@ -3219,7 +3219,7 @@ func TestStoreRangeReadoptedLHSFollower(t *testing.T) {
 type slowSnapRaftHandler struct {
 	rangeID roachpb.RangeID
 	waitCh  chan struct{}
-	kvserver.RaftMessageHandler
+	kvserver.IncomingRaftMessageHandler
 	syncutil.Mutex
 }

@@ -3245,7 +3245,7 @@ func (h *slowSnapRaftHandler) HandleSnapshot(
 			<-waitCh
 		}
 	}
-	return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream)
+	return h.IncomingRaftMessageHandler.HandleSnapshot(ctx, header, respStream)
 }

 // TestStoreRangeMergeUninitializedLHSFollower reproduces a rare bug in which a
@@ -3326,15 +3326,15 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) {
 	// of range 1 never processes the split trigger, which would create an
 	// initialized replica of A.
 	unreliableHandler := &unreliableRaftHandler{
-		rangeID:            desc.RangeID,
-		RaftMessageHandler: store2,
+		rangeID:                    desc.RangeID,
+		IncomingRaftMessageHandler: store2,
 		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
 			dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
 				return true
 			},
 		},
 	}
-	tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, unreliableHandler)
+	tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, unreliableHandler)

 	// Perform the split of A, now that store2 won't be able to initialize its
 	// replica of A.
@@ -3343,12 +3343,12 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) {
 	// Wedge a Raft snapshot that's destined for A. This allows us to capture a
 	// pre-merge Raft snapshot, which we'll let loose after the merge commits.
 	slowSnapHandler := &slowSnapRaftHandler{
-		rangeID:            aRangeID,
-		waitCh:             make(chan struct{}),
-		RaftMessageHandler: unreliableHandler,
+		rangeID:                    aRangeID,
+		waitCh:                     make(chan struct{}),
+		IncomingRaftMessageHandler: unreliableHandler,
 	}
 	defer slowSnapHandler.unblock()
-	tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, slowSnapHandler)
+	tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, slowSnapHandler)

 	// Remove the replica of range 1 on store2. If we were to leave it in place,
 	// store2 would refuse to GC its replica of C after the merge commits, because
@@ -4007,9 +4007,9 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 	aRepl0 := store0.LookupReplica(roachpb.RKey(keyA))

 	// Start dropping all Raft traffic to the first range on store2.
-	tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{
-		rangeID:            aRepl0.RangeID,
-		RaftMessageHandler: store2,
+	tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{
+		rangeID:                    aRepl0.RangeID,
+		IncomingRaftMessageHandler: store2,
 		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
 			dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
 				return true
@@ -4050,9 +4050,9 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {

 	// Restore Raft traffic to the LHS on store2.
 	log.Infof(ctx, "restored traffic to store 2")
-	tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{
-		rangeID:            aRepl0.RangeID,
-		RaftMessageHandler: store2,
+	tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{
+		rangeID:                    aRepl0.RangeID,
+		IncomingRaftMessageHandler: store2,
 		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
 			dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
 				// Make sure that even going forward no MsgApp for what we just
28 changes: 14 additions & 14 deletions pkg/kv/kvserver/client_raft_helpers_test.go
@@ -60,7 +60,7 @@ func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs {
 type unreliableRaftHandler struct {
 	name    string
 	rangeID roachpb.RangeID
-	kvserver.RaftMessageHandler
+	kvserver.IncomingRaftMessageHandler
 	unreliableRaftHandlerFuncs
 }

@@ -115,7 +115,7 @@ func (h *unreliableRaftHandler) HandleRaftRequest(
 			)
 		}
 	}
-	return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
+	return h.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
 }

 func (h *unreliableRaftHandler) filterHeartbeats(
@@ -142,7 +142,7 @@ func (h *unreliableRaftHandler) HandleRaftResponse(
 			return nil
 		}
 	}
-	return h.RaftMessageHandler.HandleRaftResponse(ctx, resp)
+	return h.IncomingRaftMessageHandler.HandleRaftResponse(ctx, resp)
 }

 func (h *unreliableRaftHandler) HandleSnapshot(
@@ -155,7 +155,7 @@ func (h *unreliableRaftHandler) HandleSnapshot(
 			return err
 		}
 	}
-	return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream)
+	return h.IncomingRaftMessageHandler.HandleSnapshot(ctx, header, respStream)
 }

 func (h *unreliableRaftHandler) HandleDelegatedSnapshot(
@@ -169,7 +169,7 @@ func (h *unreliableRaftHandler) HandleDelegatedSnapshot(
 			}
 		}
 	}
-	return h.RaftMessageHandler.HandleDelegatedSnapshot(ctx, req)
+	return h.IncomingRaftMessageHandler.HandleDelegatedSnapshot(ctx, req)
 }

 // testClusterStoreRaftMessageHandler exists to allows a store to be stopped and
@@ -241,7 +241,7 @@ type testClusterPartitionedRange struct {
 		partitioned         bool
 		partitionedReplicas map[roachpb.ReplicaID]bool
 	}
-	handlers []kvserver.RaftMessageHandler
+	handlers []kvserver.IncomingRaftMessageHandler
 }

 // setupPartitionedRange sets up an testClusterPartitionedRange for the provided
@@ -275,7 +275,7 @@ func setupPartitionedRange(
 	activated bool,
 	funcs unreliableRaftHandlerFuncs,
 ) (*testClusterPartitionedRange, error) {
-	handlers := make([]kvserver.RaftMessageHandler, 0, len(tc.Servers))
+	handlers := make([]kvserver.IncomingRaftMessageHandler, 0, len(tc.Servers))
 	for i := range tc.Servers {
 		handlers = append(handlers, &testClusterStoreRaftMessageHandler{
 			tc: tc,
@@ -291,12 +291,12 @@ func setupPartitionedRangeWithHandlers(
 	replicaID roachpb.ReplicaID,
 	partitionedNodeIdx int,
 	activated bool,
-	handlers []kvserver.RaftMessageHandler,
+	handlers []kvserver.IncomingRaftMessageHandler,
 	funcs unreliableRaftHandlerFuncs,
 ) (*testClusterPartitionedRange, error) {
 	pr := &testClusterPartitionedRange{
 		rangeID: rangeID,
-		handlers: make([]kvserver.RaftMessageHandler, 0, len(handlers)),
+		handlers: make([]kvserver.IncomingRaftMessageHandler, 0, len(handlers)),
 	}
 	pr.mu.partitioned = activated
 	pr.mu.partitionedNodeIdx = partitionedNodeIdx
@@ -323,7 +323,7 @@ func setupPartitionedRangeWithHandlers(
 		s := i
 		h := &unreliableRaftHandler{
 			rangeID: rangeID,
-			RaftMessageHandler: handlers[s],
+			IncomingRaftMessageHandler: handlers[s],
 			unreliableRaftHandlerFuncs: funcs,
 		}
 		// Only filter messages from the partitioned store on the other
@@ -383,7 +383,7 @@ func setupPartitionedRangeWithHandlers(
 			}
 		}
 		pr.handlers = append(pr.handlers, h)
-		tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h)
+		tc.Servers[s].RaftTransport().ListenIncomingRaftMessages(tc.Target(s).StoreID, h)
 	}
 	return pr, nil
 }
@@ -438,9 +438,9 @@ func dropRaftMessagesFrom(

 	store, err := srv.Stores().GetStore(srv.GetFirstStoreID())
 	require.NoError(t, err)
-	srv.RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
-		rangeID:            rangeID,
-		RaftMessageHandler: store,
+	srv.RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{
+		rangeID:                    rangeID,
+		IncomingRaftMessageHandler: store,
 		unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
 			dropHB: func(hb *kvserverpb.RaftHeartbeat) bool {
 				return shouldDrop(hb.RangeID, hb.FromReplicaID)