Skip to content

Commit

Permalink
kvserver: rename RaftMessageHandler to IncomingRaftMessageHandler
Browse files Browse the repository at this point in the history
The commit renames `RaftMessageHandler` to `IncomingRaftMessageHandler`. Another
PR is introducing a new interface `OutgoingRaftMessageHandler`, dedicated to
managing messages sent. The main purpose of this PR is to make the future PR
cleaner by handling the renaming process. Note that this commit does not change
any existing functionality.

Part of: cockroachdb#103983
Related: cockroachdb#105122
Release Note: None
  • Loading branch information
wenyihu6 committed Jun 22, 2023
1 parent 71228e5 commit 0cca776
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 111 deletions.
36 changes: 18 additions & 18 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func mergeCheckingTimestampCaches(
}
tc.Servers[i].RaftTransport().Listen(s.StoreID(), &unreliableRaftHandler{
rangeID: lhsDesc.GetRangeID(),
RaftMessageHandler: s,
IncomingRaftMessageHandler: s,
unreliableRaftHandlerFuncs: funcs,
})
}
Expand Down Expand Up @@ -801,11 +801,11 @@ func mergeCheckingTimestampCaches(
// Remove the partition. A snapshot to the leaseholder should follow.
// This snapshot will inform the leaseholder about the range merge.
for i, s := range lhsStores {
var h kvserver.RaftMessageHandler
var h kvserver.IncomingRaftMessageHandler
if i == 0 {
h = &unreliableRaftHandler{
rangeID: lhsDesc.GetRangeID(),
RaftMessageHandler: s,
IncomingRaftMessageHandler: s,
unreliableRaftHandlerFuncs: restoredLeaseholderFuncs,
}
} else {
Expand Down Expand Up @@ -2737,8 +2737,8 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) {
// Start dropping all Raft traffic to the LHS on store2 so that it won't be
// aware that there is a merge in progress.
tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{
rangeID: lhsDesc.RangeID,
RaftMessageHandler: store2,
rangeID: lhsDesc.RangeID,
IncomingRaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
return true
Expand Down Expand Up @@ -3018,8 +3018,8 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi
// Start dropping all Raft traffic to the LHS replica on store2 so that it
// won't be aware that there is a merge in progress.
tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{
rangeID: lhsDesc.RangeID,
RaftMessageHandler: store2,
rangeID: lhsDesc.RangeID,
IncomingRaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(*kvserverpb.RaftMessageRequest) bool {
return true
Expand Down Expand Up @@ -3219,7 +3219,7 @@ func TestStoreRangeReadoptedLHSFollower(t *testing.T) {
type slowSnapRaftHandler struct {
rangeID roachpb.RangeID
waitCh chan struct{}
kvserver.RaftMessageHandler
kvserver.IncomingRaftMessageHandler
syncutil.Mutex
}

Expand All @@ -3245,7 +3245,7 @@ func (h *slowSnapRaftHandler) HandleSnapshot(
<-waitCh
}
}
return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream)
return h.IncomingRaftMessageHandler.HandleSnapshot(ctx, header, respStream)
}

// TestStoreRangeMergeUninitializedLHSFollower reproduces a rare bug in which a
Expand Down Expand Up @@ -3326,8 +3326,8 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) {
// of range 1 never processes the split trigger, which would create an
// initialized replica of A.
unreliableHandler := &unreliableRaftHandler{
rangeID: desc.RangeID,
RaftMessageHandler: store2,
rangeID: desc.RangeID,
IncomingRaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
return true
Expand All @@ -3343,9 +3343,9 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) {
// Wedge a Raft snapshot that's destined for A. This allows us to capture a
// pre-merge Raft snapshot, which we'll let loose after the merge commits.
slowSnapHandler := &slowSnapRaftHandler{
rangeID: aRangeID,
waitCh: make(chan struct{}),
RaftMessageHandler: unreliableHandler,
rangeID: aRangeID,
waitCh: make(chan struct{}),
IncomingRaftMessageHandler: unreliableHandler,
}
defer slowSnapHandler.unblock()
tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, slowSnapHandler)
Expand Down Expand Up @@ -4008,8 +4008,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {

// Start dropping all Raft traffic to the first range on store2.
tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{
rangeID: aRepl0.RangeID,
RaftMessageHandler: store2,
rangeID: aRepl0.RangeID,
IncomingRaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
return true
Expand Down Expand Up @@ -4051,8 +4051,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// Restore Raft traffic to the LHS on store2.
log.Infof(ctx, "restored traffic to store 2")
tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{
rangeID: aRepl0.RangeID,
RaftMessageHandler: store2,
rangeID: aRepl0.RangeID,
IncomingRaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just
Expand Down
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/client_raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs {
type unreliableRaftHandler struct {
name string
rangeID roachpb.RangeID
kvserver.RaftMessageHandler
kvserver.IncomingRaftMessageHandler
unreliableRaftHandlerFuncs
}

Expand Down Expand Up @@ -115,7 +115,7 @@ func (h *unreliableRaftHandler) HandleRaftRequest(
)
}
}
return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
return h.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}

func (h *unreliableRaftHandler) filterHeartbeats(
Expand All @@ -142,7 +142,7 @@ func (h *unreliableRaftHandler) HandleRaftResponse(
return nil
}
}
return h.RaftMessageHandler.HandleRaftResponse(ctx, resp)
return h.IncomingRaftMessageHandler.HandleRaftResponse(ctx, resp)
}

func (h *unreliableRaftHandler) HandleSnapshot(
Expand All @@ -155,7 +155,7 @@ func (h *unreliableRaftHandler) HandleSnapshot(
return err
}
}
return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream)
return h.IncomingRaftMessageHandler.HandleSnapshot(ctx, header, respStream)
}

func (h *unreliableRaftHandler) HandleDelegatedSnapshot(
Expand All @@ -169,7 +169,7 @@ func (h *unreliableRaftHandler) HandleDelegatedSnapshot(
}
}
}
return h.RaftMessageHandler.HandleDelegatedSnapshot(ctx, req)
return h.IncomingRaftMessageHandler.HandleDelegatedSnapshot(ctx, req)
}

// testClusterStoreRaftMessageHandler exists to allows a store to be stopped and
Expand Down Expand Up @@ -241,7 +241,7 @@ type testClusterPartitionedRange struct {
partitioned bool
partitionedReplicas map[roachpb.ReplicaID]bool
}
handlers []kvserver.RaftMessageHandler
handlers []kvserver.IncomingRaftMessageHandler
}

// setupPartitionedRange sets up an testClusterPartitionedRange for the provided
Expand Down Expand Up @@ -275,7 +275,7 @@ func setupPartitionedRange(
activated bool,
funcs unreliableRaftHandlerFuncs,
) (*testClusterPartitionedRange, error) {
handlers := make([]kvserver.RaftMessageHandler, 0, len(tc.Servers))
handlers := make([]kvserver.IncomingRaftMessageHandler, 0, len(tc.Servers))
for i := range tc.Servers {
handlers = append(handlers, &testClusterStoreRaftMessageHandler{
tc: tc,
Expand All @@ -291,12 +291,12 @@ func setupPartitionedRangeWithHandlers(
replicaID roachpb.ReplicaID,
partitionedNodeIdx int,
activated bool,
handlers []kvserver.RaftMessageHandler,
handlers []kvserver.IncomingRaftMessageHandler,
funcs unreliableRaftHandlerFuncs,
) (*testClusterPartitionedRange, error) {
pr := &testClusterPartitionedRange{
rangeID: rangeID,
handlers: make([]kvserver.RaftMessageHandler, 0, len(handlers)),
handlers: make([]kvserver.IncomingRaftMessageHandler, 0, len(handlers)),
}
pr.mu.partitioned = activated
pr.mu.partitionedNodeIdx = partitionedNodeIdx
Expand All @@ -323,7 +323,7 @@ func setupPartitionedRangeWithHandlers(
s := i
h := &unreliableRaftHandler{
rangeID: rangeID,
RaftMessageHandler: handlers[s],
IncomingRaftMessageHandler: handlers[s],
unreliableRaftHandlerFuncs: funcs,
}
// Only filter messages from the partitioned store on the other
Expand Down Expand Up @@ -439,8 +439,8 @@ func dropRaftMessagesFrom(
store, err := srv.Stores().GetStore(srv.GetFirstStoreID())
require.NoError(t, err)
srv.RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
rangeID: rangeID,
RaftMessageHandler: store,
rangeID: rangeID,
IncomingRaftMessageHandler: store,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropHB: func(hb *kvserverpb.RaftHeartbeat) bool {
return shouldDrop(hb.RangeID, hb.FromReplicaID)
Expand Down
36 changes: 18 additions & 18 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,8 +919,8 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
log.Infof(ctx, "test: installing unreliable Raft transports")
for _, s := range []int{0, 1, 2} {
h := &unreliableRaftHandler{
rangeID: partRepl.RangeID,
RaftMessageHandler: tc.GetFirstStoreFromServer(t, s),
rangeID: partRepl.RangeID,
IncomingRaftMessageHandler: tc.GetFirstStoreFromServer(t, s),
}
if s != partStore {
// Only filter messages from the partitioned store on the other
Expand Down Expand Up @@ -1029,8 +1029,8 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
log.Infof(ctx, "test: removing the partition")
for _, s := range []int{0, 1, 2} {
tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, &unreliableRaftHandler{
rangeID: partRepl.RangeID,
RaftMessageHandler: tc.GetFirstStoreFromServer(t, s),
rangeID: partRepl.RangeID,
IncomingRaftMessageHandler: tc.GetFirstStoreFromServer(t, s),
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just truncated can
Expand Down Expand Up @@ -1149,9 +1149,9 @@ func TestRequestsOnLaggingReplica(t *testing.T) {
for _, i := range []int{0, 1, 2} {
store := tc.GetFirstStoreFromServer(t, i)
h := &unreliableRaftHandler{
name: fmt.Sprintf("store %d", i),
rangeID: rngDesc.RangeID,
RaftMessageHandler: store,
name: fmt.Sprintf("store %d", i),
rangeID: rngDesc.RangeID,
IncomingRaftMessageHandler: store,
}
if i != partitionNodeIdx {
// Only filter messages from the partitioned store on the other two
Expand Down Expand Up @@ -1262,9 +1262,9 @@ func TestRequestsOnLaggingReplica(t *testing.T) {
// believing that it is the leader, and lease acquisition requests block.
log.Infof(ctx, "test: removing partition")
slowSnapHandler := &slowSnapRaftHandler{
rangeID: rngDesc.RangeID,
waitCh: make(chan struct{}),
RaftMessageHandler: partitionStore,
rangeID: rngDesc.RangeID,
waitCh: make(chan struct{}),
IncomingRaftMessageHandler: partitionStore,
}
defer slowSnapHandler.unblock()
partitionStore.Transport().Listen(partitionStore.Ident.StoreID, slowSnapHandler)
Expand Down Expand Up @@ -3038,7 +3038,7 @@ func TestRemovePlaceholderRace(t *testing.T) {

type noConfChangeTestHandler struct {
rangeID roachpb.RangeID
kvserver.RaftMessageHandler
kvserver.IncomingRaftMessageHandler
}

func (ncc *noConfChangeTestHandler) HandleRaftRequest(
Expand Down Expand Up @@ -3069,7 +3069,7 @@ func (ncc *noConfChangeTestHandler) HandleRaftRequest(
}
}
}
return ncc.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
return ncc.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}

func (ncc *noConfChangeTestHandler) HandleRaftResponse(
Expand All @@ -3083,7 +3083,7 @@ func (ncc *noConfChangeTestHandler) HandleRaftResponse(
return nil
}
}
return ncc.RaftMessageHandler.HandleRaftResponse(ctx, resp)
return ncc.IncomingRaftMessageHandler.HandleRaftResponse(ctx, resp)
}

func TestReplicaGCRace(t *testing.T) {
Expand All @@ -3108,8 +3108,8 @@ func TestReplicaGCRace(t *testing.T) {
// Prevent the victim replica from processing configuration changes.
tc.Servers[2].RaftTransport().Stop(toStore.Ident.StoreID)
tc.Servers[2].RaftTransport().Listen(toStore.Ident.StoreID, &noConfChangeTestHandler{
rangeID: desc.RangeID,
RaftMessageHandler: toStore,
rangeID: desc.RangeID,
IncomingRaftMessageHandler: toStore,
})

repl, err := leaderStore.GetReplica(desc.RangeID)
Expand Down Expand Up @@ -4242,7 +4242,7 @@ func TestUninitializedReplicaRemainsQuiesced(t *testing.T) {
require.NoError(t, err)
tc.Servers[1].RaftTransport().Listen(s2.StoreID(), &unreliableRaftHandler{
rangeID: desc.RangeID,
RaftMessageHandler: s2,
IncomingRaftMessageHandler: s2,
unreliableRaftHandlerFuncs: handlerFuncs,
})

Expand Down Expand Up @@ -4732,8 +4732,8 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) {

for i := 0; i < 3; i++ {
tc.Servers[i].RaftTransport().Listen(tc.Target(i).StoreID, &unreliableRaftHandler{
rangeID: ri.Desc.RangeID,
RaftMessageHandler: tc.GetFirstStoreFromServer(t, i),
rangeID: ri.Desc.RangeID,
IncomingRaftMessageHandler: tc.GetFirstStoreFromServer(t, i),
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
return rand.Intn(2) == 0
Expand Down
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2968,7 +2968,7 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
}
tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{
rangeID: repl0.GetRangeID(),
RaftMessageHandler: store2,
IncomingRaftMessageHandler: store2,
unreliableRaftHandlerFuncs: funcs,
})

Expand Down Expand Up @@ -3103,7 +3103,7 @@ func TestLeaseTransferRejectedIfTargetNeedsSnapshot(t *testing.T) {
}
tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{
rangeID: repl0.GetRangeID(),
RaftMessageHandler: store2,
IncomingRaftMessageHandler: store2,
unreliableRaftHandlerFuncs: funcs,
})

Expand Down Expand Up @@ -3402,7 +3402,7 @@ func TestReplicaTombstone(t *testing.T) {
}
tc.Servers[1].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
rangeID: desc.RangeID,
RaftMessageHandler: store,
IncomingRaftMessageHandler: store,
unreliableRaftHandlerFuncs: funcs,
})
tc.RemoveVotersOrFatal(t, key, tc.Target(1))
Expand Down Expand Up @@ -3458,7 +3458,7 @@ func TestReplicaTombstone(t *testing.T) {
}
tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
rangeID: desc.RangeID,
RaftMessageHandler: store,
IncomingRaftMessageHandler: store,
unreliableRaftHandlerFuncs: raftFuncs,
})
tc.RemoveVotersOrFatal(t, key, tc.Target(2))
Expand Down Expand Up @@ -3501,8 +3501,8 @@ func TestReplicaTombstone(t *testing.T) {
// with a manual replica GC.
store, _ := getFirstStoreReplica(t, tc.Server(2), key)
tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
rangeID: desc.RangeID,
RaftMessageHandler: store,
rangeID: desc.RangeID,
IncomingRaftMessageHandler: store,
})
tc.RemoveVotersOrFatal(t, key, tc.Target(2))
repl, err := store.GetReplica(desc.RangeID)
Expand Down Expand Up @@ -3540,8 +3540,8 @@ func TestReplicaTombstone(t *testing.T) {
// Partition node 2 from all raft communication.
store, _ := getFirstStoreReplica(t, tc.Server(2), keyA)
tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
rangeID: desc.RangeID,
RaftMessageHandler: store,
rangeID: desc.RangeID,
IncomingRaftMessageHandler: store,
})

// We'll move the range from server 2 to 3 and merge key and keyA.
Expand Down Expand Up @@ -3621,8 +3621,8 @@ func TestReplicaTombstone(t *testing.T) {
}
setMinHeartbeat(repl.ReplicaID() + 1)
tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
rangeID: desc.RangeID,
RaftMessageHandler: store,
rangeID: desc.RangeID,
IncomingRaftMessageHandler: store,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropResp: func(*kvserverpb.RaftMessageResponse) bool {
return true
Expand Down Expand Up @@ -3730,9 +3730,9 @@ func TestReplicaTombstone(t *testing.T) {
tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{
rangeID: lhsDesc.RangeID,
unreliableRaftHandlerFuncs: raftFuncs,
RaftMessageHandler: &unreliableRaftHandler{
IncomingRaftMessageHandler: &unreliableRaftHandler{
rangeID: rhsDesc.RangeID,
RaftMessageHandler: store,
IncomingRaftMessageHandler: store,
unreliableRaftHandlerFuncs: raftFuncs,
},
})
Expand Down
Loading

0 comments on commit 0cca776

Please sign in to comment.