Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: move raft.proto to kvserverpb #76113

Merged
merged 1 commit into from
Feb 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/kvserver/closedts/ctpb/service.go | `LAI`
pkg/kv/kvserver/closedts/ctpb/service.go | `SeqNum`
pkg/kv/kvserver/concurrency/lock/locking.go | `WaitPolicy`
pkg/kv/kvserver/raft.go | `SnapshotRequest_Type`
pkg/kv/kvserver/kvserverpb/raft.go | `SnapshotRequest_Type`
pkg/roachpb/data.go | `LeaseSequence`
pkg/roachpb/data.go | `ReplicaChangeType`
pkg/roachpb/data.go | `TransactionStatus`
Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -436,18 +436,15 @@ proto_library(
name = "kvserver_proto",
srcs = [
"api.proto",
"raft.proto",
"storage_services.proto",
],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvserverpb:kvserverpb_proto",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb_proto",
"//pkg/roachpb:roachpb_proto",
"//pkg/storage/enginepb:enginepb_proto",
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto",
],
)

Expand All @@ -459,11 +456,9 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb:with-mocks",
"//pkg/storage/enginepb",
"@com_github_gogo_protobuf//gogoproto",
"@io_etcd_go_etcd_raft_v3//raftpb",
],
)

Expand Down
37 changes: 19 additions & 18 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
Expand Down Expand Up @@ -467,7 +468,7 @@ func mergeCheckingTimestampCaches(
var snapshotFilter func(kvserver.IncomingSnapshot)
beforeSnapshotSSTIngestion := func(
inSnap kvserver.IncomingSnapshot,
snapType kvserver.SnapshotRequest_Type,
snapType kvserverpb.SnapshotRequest_Type,
_ []string,
) error {
filterMu.Lock()
Expand Down Expand Up @@ -631,28 +632,28 @@ func mergeCheckingTimestampCaches(

// Applied to the leaseholder's raft transport during the partition.
partitionedLeaseholderFuncs := noopRaftHandlerFuncs()
partitionedLeaseholderFuncs.dropReq = func(*kvserver.RaftMessageRequest) bool {
partitionedLeaseholderFuncs.dropReq = func(*kvserverpb.RaftMessageRequest) bool {
// Ignore everything from new leader.
return true
}

// Applied to the leader and other follower's raft transport during the
// partition.
partitionedLeaderFuncs := noopRaftHandlerFuncs()
partitionedLeaderFuncs.dropReq = func(req *kvserver.RaftMessageRequest) bool {
partitionedLeaderFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
// Ignore everything from leaseholder, except forwarded proposals.
return req.FromReplica.StoreID == lhsStore.StoreID() &&
req.Message.Type != raftpb.MsgProp
}
partitionedLeaderFuncs.dropHB = func(hb *kvserver.RaftHeartbeat) bool {
partitionedLeaderFuncs.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
// Ignore heartbeats from leaseholder, results in campaign.
return hb.FromReplicaID == roachpb.ReplicaID(lhsRepls[0].RaftStatus().ID)
}

// Applied to leaseholder after the partition heals.
var truncIndex uint64
restoredLeaseholderFuncs := noopRaftHandlerFuncs()
restoredLeaseholderFuncs.dropReq = func(req *kvserver.RaftMessageRequest) bool {
restoredLeaseholderFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just
// truncated can make it through. The Raft transport is asynchronous
// so this is necessary to make the test pass reliably - otherwise
Expand Down Expand Up @@ -2494,10 +2495,10 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
) {
// Try several times, as the message may be dropped (see #18355).
for i := 0; i < 5; i++ {
if sent := transport.SendAsync(&kvserver.RaftMessageRequest{
if sent := transport.SendAsync(&kvserverpb.RaftMessageRequest{
FromReplica: fromReplDesc,
ToReplica: toReplDesc,
Heartbeats: []kvserver.RaftHeartbeat{
Heartbeats: []kvserverpb.RaftHeartbeat{
{
RangeID: rangeID,
FromReplicaID: fromReplDesc.ReplicaID,
Expand Down Expand Up @@ -2744,7 +2745,7 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) {
rangeID: lhsDesc.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(req *kvserver.RaftMessageRequest) bool {
dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
return true
},
},
Expand Down Expand Up @@ -3025,7 +3026,7 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi
rangeID: lhsDesc.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(*kvserver.RaftMessageRequest) bool {
dropReq: func(*kvserverpb.RaftMessageRequest) bool {
return true
},
},
Expand Down Expand Up @@ -3232,7 +3233,7 @@ func (h *slowSnapRaftHandler) unblock() {

func (h *slowSnapRaftHandler) HandleSnapshot(
ctx context.Context,
header *kvserver.SnapshotRequest_Header,
header *kvserverpb.SnapshotRequest_Header,
respStream kvserver.SnapshotResponseStream,
) error {
if header.RaftMessageRequest.RangeID == h.rangeID {
Expand Down Expand Up @@ -3327,7 +3328,7 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) {
rangeID: desc.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(request *kvserver.RaftMessageRequest) bool {
dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
return true
},
},
Expand Down Expand Up @@ -3725,7 +3726,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
rangeIds := make(map[string]roachpb.RangeID, 4)
beforeSnapshotSSTIngestion := func(
inSnap kvserver.IncomingSnapshot,
snapType kvserver.SnapshotRequest_Type,
snapType kvserverpb.SnapshotRequest_Type,
sstNames []string,
) error {
// Only verify snapshots of type VIA_SNAPSHOT_QUEUE and on the range under
Expand All @@ -3734,7 +3735,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// there are too many keys and the other replicated keys are verified later
// on in the test. This function verifies that the subsumed replicas have
// been handled properly.
if snapType != kvserver.SnapshotRequest_VIA_SNAPSHOT_QUEUE ||
if snapType != kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE ||
inSnap.Desc.RangeID != rangeIds[string(keyA)] {
return nil
}
Expand Down Expand Up @@ -3943,7 +3944,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
rangeID: aRepl0.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(request *kvserver.RaftMessageRequest) bool {
dropReq: func(request *kvserverpb.RaftMessageRequest) bool {
return true
},
},
Expand Down Expand Up @@ -3988,7 +3989,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
rangeID: aRepl0.RangeID,
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
dropReq: func(req *kvserver.RaftMessageRequest) bool {
dropReq: func(req *kvserverpb.RaftMessageRequest) bool {
// Make sure that even going forward no MsgApp for what we just
// truncated can make it through. The Raft transport is asynchronous
// so this is necessary to make the test pass reliably - otherwise
Expand All @@ -4000,8 +4001,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
return req.Message.Type == raftpb.MsgApp && req.Message.Index < index
},
// Don't drop heartbeats or responses.
dropHB: func(*kvserver.RaftHeartbeat) bool { return false },
dropResp: func(*kvserver.RaftMessageResponse) bool { return false },
dropHB: func(*kvserverpb.RaftHeartbeat) bool { return false },
dropResp: func(*kvserverpb.RaftMessageResponse) bool { return false },
},
})

Expand Down Expand Up @@ -4687,7 +4688,7 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) {
1: {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(header *kvserver.SnapshotRequest_Header) error {
ReceiveSnapshot: func(header *kvserverpb.SnapshotRequest_Header) error {
val := delaySnapshotTrap.Load()
if val != nil {
fn := val.(func() error)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/client_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -137,7 +138,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
blockSnapshotsCh := make(chan struct{})
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.DisableRaftSnapshotQueue = true // we'll control it ourselves
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error {
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
// We'll want a signal for when the snapshot was received by the sender.
once.Do(func() { close(blockUntilSnapshotCh) })

Expand Down
39 changes: 20 additions & 19 deletions pkg/kv/kvserver/client_raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -27,22 +28,22 @@ type unreliableRaftHandlerFuncs struct {
// If non-nil, can return false to avoid dropping the msg to
// unreliableRaftHandler.rangeID. If nil, all messages pertaining to the
// respective range are dropped.
dropReq func(*kvserver.RaftMessageRequest) bool
dropHB func(*kvserver.RaftHeartbeat) bool
dropResp func(*kvserver.RaftMessageResponse) bool
dropReq func(*kvserverpb.RaftMessageRequest) bool
dropHB func(*kvserverpb.RaftHeartbeat) bool
dropResp func(*kvserverpb.RaftMessageResponse) bool
// snapErr defaults to returning nil.
snapErr func(*kvserver.SnapshotRequest_Header) error
snapErr func(*kvserverpb.SnapshotRequest_Header) error
}

func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs {
return unreliableRaftHandlerFuncs{
dropResp: func(*kvserver.RaftMessageResponse) bool {
dropResp: func(*kvserverpb.RaftMessageResponse) bool {
return false
},
dropReq: func(*kvserver.RaftMessageRequest) bool {
dropReq: func(*kvserverpb.RaftMessageRequest) bool {
return false
},
dropHB: func(*kvserver.RaftHeartbeat) bool {
dropHB: func(*kvserverpb.RaftHeartbeat) bool {
return false
},
}
Expand All @@ -59,7 +60,7 @@ type unreliableRaftHandler struct {

func (h *unreliableRaftHandler) HandleRaftRequest(
ctx context.Context,
req *kvserver.RaftMessageRequest,
req *kvserverpb.RaftMessageRequest,
respStream kvserver.RaftMessageResponseStream,
) *roachpb.Error {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
Expand Down Expand Up @@ -94,12 +95,12 @@ func (h *unreliableRaftHandler) HandleRaftRequest(
}

func (h *unreliableRaftHandler) filterHeartbeats(
hbs []kvserver.RaftHeartbeat,
) []kvserver.RaftHeartbeat {
hbs []kvserverpb.RaftHeartbeat,
) []kvserverpb.RaftHeartbeat {
if len(hbs) == 0 {
return hbs
}
var cpy []kvserver.RaftHeartbeat
var cpy []kvserverpb.RaftHeartbeat
for i := range hbs {
hb := &hbs[i]
if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) {
Expand All @@ -110,7 +111,7 @@ func (h *unreliableRaftHandler) filterHeartbeats(
}

func (h *unreliableRaftHandler) HandleRaftResponse(
ctx context.Context, resp *kvserver.RaftMessageResponse,
ctx context.Context, resp *kvserverpb.RaftMessageResponse,
) error {
if resp.RangeID == h.rangeID {
if h.dropResp == nil || h.dropResp(resp) {
Expand All @@ -122,7 +123,7 @@ func (h *unreliableRaftHandler) HandleRaftResponse(

func (h *unreliableRaftHandler) HandleSnapshot(
ctx context.Context,
header *kvserver.SnapshotRequest_Header,
header *kvserverpb.SnapshotRequest_Header,
respStream kvserver.SnapshotResponseStream,
) error {
if header.RaftMessageRequest.RangeID == h.rangeID && h.snapErr != nil {
Expand All @@ -147,7 +148,7 @@ func (h *testClusterStoreRaftMessageHandler) getStore() (*kvserver.Store, error)

func (h *testClusterStoreRaftMessageHandler) HandleRaftRequest(
ctx context.Context,
req *kvserver.RaftMessageRequest,
req *kvserverpb.RaftMessageRequest,
respStream kvserver.RaftMessageResponseStream,
) *roachpb.Error {
store, err := h.getStore()
Expand All @@ -158,7 +159,7 @@ func (h *testClusterStoreRaftMessageHandler) HandleRaftRequest(
}

func (h *testClusterStoreRaftMessageHandler) HandleRaftResponse(
ctx context.Context, resp *kvserver.RaftMessageResponse,
ctx context.Context, resp *kvserverpb.RaftMessageResponse,
) error {
store, err := h.getStore()
if err != nil {
Expand All @@ -169,7 +170,7 @@ func (h *testClusterStoreRaftMessageHandler) HandleRaftResponse(

func (h *testClusterStoreRaftMessageHandler) HandleSnapshot(
ctx context.Context,
header *kvserver.SnapshotRequest_Header,
header *kvserverpb.SnapshotRequest_Header,
respStream kvserver.SnapshotResponseStream,
) error {
store, err := h.getStore()
Expand Down Expand Up @@ -277,7 +278,7 @@ func setupPartitionedRangeWithHandlers(
// Only filter messages from the partitioned store on the other
// two stores.
if h.dropReq == nil {
h.dropReq = func(req *kvserver.RaftMessageRequest) bool {
h.dropReq = func(req *kvserverpb.RaftMessageRequest) bool {
pr.mu.RLock()
defer pr.mu.RUnlock()
return pr.mu.partitioned &&
Expand All @@ -286,7 +287,7 @@ func setupPartitionedRangeWithHandlers(
}
}
if h.dropHB == nil {
h.dropHB = func(hb *kvserver.RaftHeartbeat) bool {
h.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool {
pr.mu.RLock()
defer pr.mu.RUnlock()
if !pr.mu.partitioned {
Expand All @@ -299,7 +300,7 @@ func setupPartitionedRangeWithHandlers(
}
}
if h.snapErr == nil {
h.snapErr = func(header *kvserver.SnapshotRequest_Header) error {
h.snapErr = func(header *kvserverpb.SnapshotRequest_Header) error {
pr.mu.RLock()
defer pr.mu.RUnlock()
if !pr.mu.partitioned {
Expand Down
Loading