Skip to content

Commit

Permalink
kvserver: move raft.proto to kvserverpb
Browse files Browse the repository at this point in the history
I'm starting to type code for #75729 and it's clear immediately
that we shouldn't have these protos in the `kvserver` package
as this will make dependency cycles hard to avoid.

We've long introduced the `kvserverpb` package, but simply
didn't pull all the protos into it.

This commit moves `raft.proto` to `kvserverpb`.

There's still a bit of protobuf left in `api.proto`, but
that can be handled separately.

Release note: None
  • Loading branch information
tbg committed Feb 5, 2022
1 parent 19c7255 commit 9fec14b
Show file tree
Hide file tree
Showing 34 changed files with 315 additions and 278 deletions.
37 changes: 19 additions & 18 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
Expand Down Expand Up @@ -467,7 +468,7 @@ func mergeCheckingTimestampCaches(
var snapshotFilter func(kvserver.IncomingSnapshot)
beforeSnapshotSSTIngestion := func(
inSnap kvserver.IncomingSnapshot,
snapType kvserver.SnapshotRequest_Type,
snapType kvserverpb.SnapshotRequest_Type,
_ []string,
) error {
filterMu.Lock()
Expand Down Expand Up @@ -631,28 +632,28 @@ func mergeCheckingTimestampCaches(

// Applied to the leaseholder's raft transport during the partition.
partitionedLeaseholderFuncs := noopRaftHandlerFuncs()
partitionedLeaseholderFuncs.dropReq = func(*kvserver.RaftMessageRequest) bool {
partitionedLeaseholderFuncs.dropReq = func(*kvserverpb.RaftMessageRequest) bool {
// Ignore everything from new leader.
return true
}

// Applied to the leader and other follower's raft transport during the
// partition.
partitionedLeaderFuncs := noopRaftHandlerFuncs()
partitionedLeaderFuncs.dropReq = func(req *kvserver.RaftMessageRequest) bool {
partitionedLeaderFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
// Ignore everything from leaseholder, except forwarded proposals.
return req.FromReplica.StoreID == lhsStore.StoreID() &&
req.Message.Type != raftpb.MsgProp
}
partitionedLeaderFuncs.dropHB = func(hb *kvserver.RaftHeartbeat) bool {
partitionedLeaderFuncs.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
// Ignore heartbeats from leaseholder, results in campaign.
return hb.FromReplicaID == roachpb.ReplicaID(lhsRepls[0].RaftStatus().ID)
}

// Applied to leaseholder after the partition heals.
var truncIndex uint64
restoredLeaseholderFuncs := noopRaftHandlerFuncs()
restoredLeaseholderFuncs.dropReq = func(req *kvserver.RaftMessageRequest) bool {
restoredLeaseholderFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just
// truncated can make it through. The Raft transport is asynchronous
// so this is necessary to make the test pass reliably - otherwise
Expand Down Expand Up @@ -2494,10 +2495,10 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
) {
// Try several times, as the message may be dropped (see #18355).
for i := 0; i < 5; i++ {
if sent := transport.SendAsync(&kvserver.RaftMessageRequest{
if sent := transport.SendAsync(&kvserverpb.RaftMessageRequest{
FromReplica: fromReplDesc,
ToReplica: toReplDesc,
Heartbeats: []kvserver.RaftHeartbeat{
Heartbeats: []kvserverpb.RaftHeartbeat{
{
RangeID: rangeID,
FromReplicaID: fromReplDesc.ReplicaID,
Expand Down Expand Up @@ -2744,7 +2745,7 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) {
rangeID: lhsDesc.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(req *kvserver.RaftMessageRequest) bool {
dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
return true
},
},
Expand Down Expand Up @@ -3025,7 +3026,7 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi
rangeID: lhsDesc.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(*kvserver.RaftMessageRequest) bool {
dropReq: func(*kvserverpb.RaftMessageRequest) bool {
return true
},
},
Expand Down Expand Up @@ -3232,7 +3233,7 @@ func (h *slowSnapRaftHandler) unblock() {

func (h *slowSnapRaftHandler) HandleSnapshot(
ctx context.Context,
header *kvserver.SnapshotRequest_Header,
header *kvserverpb.SnapshotRequest_Header,
respStream kvserver.SnapshotResponseStream,
) error {
if header.RaftMessageRequest.RangeID == h.rangeID {
Expand Down Expand Up @@ -3327,7 +3328,7 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) {
rangeID: desc.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(request *kvserver.RaftMessageRequest) bool {
dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
return true
},
},
Expand Down Expand Up @@ -3725,7 +3726,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
rangeIds := make(map[string]roachpb.RangeID, 4)
beforeSnapshotSSTIngestion := func(
inSnap kvserver.IncomingSnapshot,
snapType kvserver.SnapshotRequest_Type,
snapType kvserverpb.SnapshotRequest_Type,
sstNames []string,
) error {
// Only verify snapshots of type VIA_SNAPSHOT_QUEUE and on the range under
Expand All @@ -3734,7 +3735,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// there are too many keys and the other replicated keys are verified later
// on in the test. This function verifies that the subsumed replicas have
// been handled properly.
if snapType != kvserver.SnapshotRequest_VIA_SNAPSHOT_QUEUE ||
if snapType != kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE ||
inSnap.Desc.RangeID != rangeIds[string(keyA)] {
return nil
}
Expand Down Expand Up @@ -3943,7 +3944,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
rangeID: aRepl0.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(request *kvserver.RaftMessageRequest) bool {
dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
return true
},
},
Expand Down Expand Up @@ -3988,7 +3989,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
rangeID: aRepl0.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(req *kvserver.RaftMessageRequest) bool {
dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just
// truncated can make it through. The Raft transport is asynchronous
// so this is necessary to make the test pass reliably - otherwise
Expand All @@ -4000,8 +4001,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
return req.Message.Type == raftpb.MsgApp && req.Message.Index < index
},
// Don't drop heartbeats or responses.
dropHB: func(*kvserver.RaftHeartbeat) bool { return false },
dropResp: func(*kvserver.RaftMessageResponse) bool { return false },
dropHB: func(*kvserverpb.RaftHeartbeat) bool { return false },
dropResp: func(*kvserverpb.RaftMessageResponse) bool { return false },
},
})

Expand Down Expand Up @@ -4687,7 +4688,7 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) {
1: {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(header *kvserver.SnapshotRequest_Header) error {
ReceiveSnapshot: func(header *kvserverpb.SnapshotRequest_Header) error {
val := delaySnapshotTrap.Load()
if val != nil {
fn := val.(func() error)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/client_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -137,7 +138,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
blockSnapshotsCh := make(chan struct{})
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.DisableRaftSnapshotQueue = true // we'll control it ourselves
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error {
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
// We'll want a signal for when the snapshot was received by the sender.
once.Do(func() { close(blockUntilSnapshotCh) })

Expand Down
39 changes: 20 additions & 19 deletions pkg/kv/kvserver/client_raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -27,22 +28,22 @@ type unreliableRaftHandlerFuncs struct {
// If non-nil, can return false to avoid dropping the msg to
// unreliableRaftHandler.rangeID. If nil, all messages pertaining to the
// respective range are dropped.
dropReq func(*kvserver.RaftMessageRequest) bool
dropHB func(*kvserver.RaftHeartbeat) bool
dropResp func(*kvserver.RaftMessageResponse) bool
dropReq func(*kvserverpb.RaftMessageRequest) bool
dropHB func(*kvserverpb.RaftHeartbeat) bool
dropResp func(*kvserverpb.RaftMessageResponse) bool
// snapErr defaults to returning nil.
snapErr func(*kvserver.SnapshotRequest_Header) error
snapErr func(*kvserverpb.SnapshotRequest_Header) error
}

func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs {
return unreliableRaftHandlerFuncs{
dropResp: func(*kvserver.RaftMessageResponse) bool {
dropResp: func(*kvserverpb.RaftMessageResponse) bool {
return false
},
dropReq: func(*kvserver.RaftMessageRequest) bool {
dropReq: func(*kvserverpb.RaftMessageRequest) bool {
return false
},
dropHB: func(*kvserver.RaftHeartbeat) bool {
dropHB: func(*kvserverpb.RaftHeartbeat) bool {
return false
},
}
Expand All @@ -59,7 +60,7 @@ type unreliableRaftHandler struct {

func (h *unreliableRaftHandler) HandleRaftRequest(
ctx context.Context,
req *kvserver.RaftMessageRequest,
req *kvserverpb.RaftMessageRequest,
respStream kvserver.RaftMessageResponseStream,
) *roachpb.Error {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
Expand Down Expand Up @@ -94,12 +95,12 @@ func (h *unreliableRaftHandler) HandleRaftRequest(
}

func (h *unreliableRaftHandler) filterHeartbeats(
hbs []kvserver.RaftHeartbeat,
) []kvserver.RaftHeartbeat {
hbs []kvserverpb.RaftHeartbeat,
) []kvserverpb.RaftHeartbeat {
if len(hbs) == 0 {
return hbs
}
var cpy []kvserver.RaftHeartbeat
var cpy []kvserverpb.RaftHeartbeat
for i := range hbs {
hb := &hbs[i]
if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) {
Expand All @@ -110,7 +111,7 @@ func (h *unreliableRaftHandler) filterHeartbeats(
}

func (h *unreliableRaftHandler) HandleRaftResponse(
ctx context.Context, resp *kvserver.RaftMessageResponse,
ctx context.Context, resp *kvserverpb.RaftMessageResponse,
) error {
if resp.RangeID == h.rangeID {
if h.dropResp == nil || h.dropResp(resp) {
Expand All @@ -122,7 +123,7 @@ func (h *unreliableRaftHandler) HandleRaftResponse(

func (h *unreliableRaftHandler) HandleSnapshot(
ctx context.Context,
header *kvserver.SnapshotRequest_Header,
header *kvserverpb.SnapshotRequest_Header,
respStream kvserver.SnapshotResponseStream,
) error {
if header.RaftMessageRequest.RangeID == h.rangeID && h.snapErr != nil {
Expand All @@ -147,7 +148,7 @@ func (h *testClusterStoreRaftMessageHandler) getStore() (*kvserver.Store, error)

func (h *testClusterStoreRaftMessageHandler) HandleRaftRequest(
ctx context.Context,
req *kvserver.RaftMessageRequest,
req *kvserverpb.RaftMessageRequest,
respStream kvserver.RaftMessageResponseStream,
) *roachpb.Error {
store, err := h.getStore()
Expand All @@ -158,7 +159,7 @@ func (h *testClusterStoreRaftMessageHandler) HandleRaftRequest(
}

func (h *testClusterStoreRaftMessageHandler) HandleRaftResponse(
ctx context.Context, resp *kvserver.RaftMessageResponse,
ctx context.Context, resp *kvserverpb.RaftMessageResponse,
) error {
store, err := h.getStore()
if err != nil {
Expand All @@ -169,7 +170,7 @@ func (h *testClusterStoreRaftMessageHandler) HandleRaftResponse(

func (h *testClusterStoreRaftMessageHandler) HandleSnapshot(
ctx context.Context,
header *kvserver.SnapshotRequest_Header,
header *kvserverpb.SnapshotRequest_Header,
respStream kvserver.SnapshotResponseStream,
) error {
store, err := h.getStore()
Expand Down Expand Up @@ -277,7 +278,7 @@ func setupPartitionedRangeWithHandlers(
// Only filter messages from the partitioned store on the other
// two stores.
if h.dropReq == nil {
h.dropReq = func(req *kvserver.RaftMessageRequest) bool {
h.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
pr.mu.RLock()
defer pr.mu.RUnlock()
return pr.mu.partitioned &&
Expand All @@ -286,7 +287,7 @@ func setupPartitionedRangeWithHandlers(
}
}
if h.dropHB == nil {
h.dropHB = func(hb *kvserver.RaftHeartbeat) bool {
h.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
pr.mu.RLock()
defer pr.mu.RUnlock()
if !pr.mu.partitioned {
Expand All @@ -299,7 +300,7 @@ func setupPartitionedRangeWithHandlers(
}
}
if h.snapErr == nil {
h.snapErr = func(header *kvserver.SnapshotRequest_Header) error {
h.snapErr = func(header *kvserverpb.SnapshotRequest_Header) error {
pr.mu.RLock()
defer pr.mu.RUnlock()
if !pr.mu.partitioned {
Expand Down
Loading

0 comments on commit 9fec14b

Please sign in to comment.