diff --git a/docs/generated/redact_safe.md b/docs/generated/redact_safe.md index a4116f54802d..ccc3e6228990 100644 --- a/docs/generated/redact_safe.md +++ b/docs/generated/redact_safe.md @@ -11,7 +11,7 @@ pkg/jobs/jobspb/wrap.go | `Type` pkg/kv/kvserver/closedts/ctpb/service.go | `LAI` pkg/kv/kvserver/closedts/ctpb/service.go | `SeqNum` pkg/kv/kvserver/concurrency/lock/locking.go | `WaitPolicy` -pkg/kv/kvserver/raft.go | `SnapshotRequest_Type` +pkg/kv/kvserver/kvserverpb/raft.go | `SnapshotRequest_Type` pkg/roachpb/data.go | `LeaseSequence` pkg/roachpb/data.go | `ReplicaChangeType` pkg/roachpb/data.go | `TransactionStatus` diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index dcf7e33270e5..07dd384a65fe 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -436,18 +436,15 @@ proto_library( name = "kvserver_proto", srcs = [ "api.proto", - "raft.proto", "storage_services.proto", ], strip_import_prefix = "/pkg", visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvserver/kvserverpb:kvserverpb_proto", - "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_proto", "//pkg/roachpb:roachpb_proto", "//pkg/storage/enginepb:enginepb_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", - "@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto", ], ) @@ -459,11 +456,9 @@ go_proto_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvserver/kvserverpb", - "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb:with-mocks", "//pkg/storage/enginepb", "@com_github_gogo_protobuf//gogoproto", - "@io_etcd_go_etcd_raft_v3//raftpb", ], ) diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index fa49a05475cb..36d09cd42d2d 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" @@ -467,7 +468,7 @@ func mergeCheckingTimestampCaches( var snapshotFilter func(kvserver.IncomingSnapshot) beforeSnapshotSSTIngestion := func( inSnap kvserver.IncomingSnapshot, - snapType kvserver.SnapshotRequest_Type, + snapType kvserverpb.SnapshotRequest_Type, _ []string, ) error { filterMu.Lock() @@ -631,7 +632,7 @@ func mergeCheckingTimestampCaches( // Applied to the leaseholder's raft transport during the partition. partitionedLeaseholderFuncs := noopRaftHandlerFuncs() - partitionedLeaseholderFuncs.dropReq = func(*kvserver.RaftMessageRequest) bool { + partitionedLeaseholderFuncs.dropReq = func(*kvserverpb.RaftMessageRequest) bool { // Ignore everything from new leader. return true } @@ -639,12 +640,12 @@ func mergeCheckingTimestampCaches( // Applied to the leader and other follower's raft transport during the // partition. partitionedLeaderFuncs := noopRaftHandlerFuncs() - partitionedLeaderFuncs.dropReq = func(req *kvserver.RaftMessageRequest) bool { + partitionedLeaderFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool { // Ignore everything from leaseholder, except forwarded proposals. 
return req.FromReplica.StoreID == lhsStore.StoreID() && req.Message.Type != raftpb.MsgProp } - partitionedLeaderFuncs.dropHB = func(hb *kvserver.RaftHeartbeat) bool { + partitionedLeaderFuncs.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool { // Ignore heartbeats from leaseholder, results in campaign. return hb.FromReplicaID == roachpb.ReplicaID(lhsRepls[0].RaftStatus().ID) } @@ -652,7 +653,7 @@ func mergeCheckingTimestampCaches( // Applied to leaseholder after the partition heals. var truncIndex uint64 restoredLeaseholderFuncs := noopRaftHandlerFuncs() - restoredLeaseholderFuncs.dropReq = func(req *kvserver.RaftMessageRequest) bool { + restoredLeaseholderFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool { // Make sure that even going forward no MsgApp for what we just // truncated can make it through. The Raft transport is asynchronous // so this is necessary to make the test pass reliably - otherwise @@ -2494,10 +2495,10 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) { ) { // Try several times, as the message may be dropped (see #18355). for i := 0; i < 5; i++ { - if sent := transport.SendAsync(&kvserver.RaftMessageRequest{ + if sent := transport.SendAsync(&kvserverpb.RaftMessageRequest{ FromReplica: fromReplDesc, ToReplica: toReplDesc, - Heartbeats: []kvserver.RaftHeartbeat{ + Heartbeats: []kvserverpb.RaftHeartbeat{ { RangeID: rangeID, FromReplicaID: fromReplDesc.ReplicaID, @@ -2744,7 +2745,7 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) { rangeID: lhsDesc.RangeID, RaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropReq: func(req *kvserver.RaftMessageRequest) bool { + dropReq: func(req *kvserverpb.RaftMessageRequest) bool { return true }, }, @@ -3025,7 +3026,7 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi rangeID: lhsDesc.RangeID, RaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropReq: func(*kvserver.RaftMessageRequest) bool { + dropReq: func(*kvserverpb.RaftMessageRequest) bool { return true }, }, @@ -3232,7 +3233,7 @@ func (h *slowSnapRaftHandler) unblock() { func (h *slowSnapRaftHandler) HandleSnapshot( ctx context.Context, - header *kvserver.SnapshotRequest_Header, + header *kvserverpb.SnapshotRequest_Header, respStream kvserver.SnapshotResponseStream, ) error { if header.RaftMessageRequest.RangeID == h.rangeID { @@ -3327,7 +3328,7 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) { rangeID: desc.RangeID, RaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropReq: func(request *kvserver.RaftMessageRequest) bool { + dropReq: func(request *kvserverpb.RaftMessageRequest) bool { return true }, }, @@ -3725,7 +3726,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { rangeIds := make(map[string]roachpb.RangeID, 4) beforeSnapshotSSTIngestion := func( inSnap kvserver.IncomingSnapshot, - snapType kvserver.SnapshotRequest_Type, + snapType kvserverpb.SnapshotRequest_Type, sstNames []string, ) error { // Only verify snapshots of type VIA_SNAPSHOT_QUEUE and on the range under @@ -3734,7 +3735,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { // there are too many keys and the other replicated keys are verified later // on in the test. This function verifies that the subsumed replicas have // been handled properly. 
- if snapType != kvserver.SnapshotRequest_VIA_SNAPSHOT_QUEUE || + if snapType != kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE || inSnap.Desc.RangeID != rangeIds[string(keyA)] { return nil } @@ -3943,7 +3944,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { rangeID: aRepl0.RangeID, RaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropReq: func(request *kvserver.RaftMessageRequest) bool { + dropReq: func(request *kvserverpb.RaftMessageRequest) bool { return true }, }, @@ -3988,7 +3989,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { rangeID: aRepl0.RangeID, RaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropReq: func(req *kvserver.RaftMessageRequest) bool { + dropReq: func(req *kvserverpb.RaftMessageRequest) bool { // Make sure that even going forward no MsgApp for what we just // truncated can make it through. The Raft transport is asynchronous // so this is necessary to make the test pass reliably - otherwise @@ -4000,8 +4001,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { return req.Message.Type == raftpb.MsgApp && req.Message.Index < index }, // Don't drop heartbeats or responses. - dropHB: func(*kvserver.RaftHeartbeat) bool { return false }, - dropResp: func(*kvserver.RaftMessageResponse) bool { return false }, + dropHB: func(*kvserverpb.RaftHeartbeat) bool { return false }, + dropResp: func(*kvserverpb.RaftMessageResponse) bool { return false }, }, }) @@ -4687,7 +4688,7 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) { 1: { Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - ReceiveSnapshot: func(header *kvserver.SnapshotRequest_Header) error { + ReceiveSnapshot: func(header *kvserverpb.SnapshotRequest_Header) error { val := delaySnapshotTrap.Load() if val != nil { fn := val.(func() error) diff --git a/pkg/kv/kvserver/client_migration_test.go b/pkg/kv/kvserver/client_migration_test.go index 714ef34cf8c7..74801038fe1b 100644 --- a/pkg/kv/kvserver/client_migration_test.go +++ b/pkg/kv/kvserver/client_migration_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -137,7 +138,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) { blockSnapshotsCh := make(chan struct{}) knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.DisableRaftSnapshotQueue = true // we'll control it ourselves - ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error { + ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error { // We'll want a signal for when the snapshot was received by the sender. 
once.Do(func() { close(blockUntilSnapshotCh) }) diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index d2263014fbab..33000a87dcea 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -15,6 +15,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -27,22 +28,22 @@ type unreliableRaftHandlerFuncs struct { // If non-nil, can return false to avoid dropping the msg to // unreliableRaftHandler.rangeID. If nil, all messages pertaining to the // respective range are dropped. - dropReq func(*kvserver.RaftMessageRequest) bool - dropHB func(*kvserver.RaftHeartbeat) bool - dropResp func(*kvserver.RaftMessageResponse) bool + dropReq func(*kvserverpb.RaftMessageRequest) bool + dropHB func(*kvserverpb.RaftHeartbeat) bool + dropResp func(*kvserverpb.RaftMessageResponse) bool // snapErr defaults to returning nil. - snapErr func(*kvserver.SnapshotRequest_Header) error + snapErr func(*kvserverpb.SnapshotRequest_Header) error } func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs { return unreliableRaftHandlerFuncs{ - dropResp: func(*kvserver.RaftMessageResponse) bool { + dropResp: func(*kvserverpb.RaftMessageResponse) bool { return false }, - dropReq: func(*kvserver.RaftMessageRequest) bool { + dropReq: func(*kvserverpb.RaftMessageRequest) bool { return false }, - dropHB: func(*kvserver.RaftHeartbeat) bool { + dropHB: func(*kvserverpb.RaftHeartbeat) bool { return false }, } @@ -59,7 +60,7 @@ type unreliableRaftHandler struct { func (h *unreliableRaftHandler) HandleRaftRequest( ctx context.Context, - req *kvserver.RaftMessageRequest, + req *kvserverpb.RaftMessageRequest, respStream kvserver.RaftMessageResponseStream, ) *roachpb.Error { if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 { @@ -94,12 +95,12 @@ func (h *unreliableRaftHandler) HandleRaftRequest( } func (h *unreliableRaftHandler) filterHeartbeats( - hbs []kvserver.RaftHeartbeat, -) []kvserver.RaftHeartbeat { + hbs []kvserverpb.RaftHeartbeat, +) []kvserverpb.RaftHeartbeat { if len(hbs) == 0 { return hbs } - var cpy []kvserver.RaftHeartbeat + var cpy []kvserverpb.RaftHeartbeat for i := range hbs { hb := &hbs[i] if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) { @@ -110,7 +111,7 @@ func (h *unreliableRaftHandler) filterHeartbeats( } func (h *unreliableRaftHandler) HandleRaftResponse( - ctx context.Context, resp *kvserver.RaftMessageResponse, + ctx context.Context, resp *kvserverpb.RaftMessageResponse, ) error { if resp.RangeID == h.rangeID { if h.dropResp == nil || h.dropResp(resp) { @@ -122,7 +123,7 @@ func (h *unreliableRaftHandler) HandleRaftResponse( func (h *unreliableRaftHandler) HandleSnapshot( ctx context.Context, - header *kvserver.SnapshotRequest_Header, + header *kvserverpb.SnapshotRequest_Header, respStream kvserver.SnapshotResponseStream, ) error { if header.RaftMessageRequest.RangeID == h.rangeID && h.snapErr != nil { @@ -147,7 +148,7 @@ func (h *testClusterStoreRaftMessageHandler) getStore() (*kvserver.Store, error) func (h *testClusterStoreRaftMessageHandler) HandleRaftRequest( ctx context.Context, - req *kvserver.RaftMessageRequest, + req *kvserverpb.RaftMessageRequest, respStream kvserver.RaftMessageResponseStream, ) *roachpb.Error { store, err := h.getStore() @@ -158,7 
+159,7 @@ func (h *testClusterStoreRaftMessageHandler) HandleRaftRequest( } func (h *testClusterStoreRaftMessageHandler) HandleRaftResponse( - ctx context.Context, resp *kvserver.RaftMessageResponse, + ctx context.Context, resp *kvserverpb.RaftMessageResponse, ) error { store, err := h.getStore() if err != nil { @@ -169,7 +170,7 @@ func (h *testClusterStoreRaftMessageHandler) HandleRaftResponse( func (h *testClusterStoreRaftMessageHandler) HandleSnapshot( ctx context.Context, - header *kvserver.SnapshotRequest_Header, + header *kvserverpb.SnapshotRequest_Header, respStream kvserver.SnapshotResponseStream, ) error { store, err := h.getStore() @@ -277,7 +278,7 @@ func setupPartitionedRangeWithHandlers( // Only filter messages from the partitioned store on the other // two stores. if h.dropReq == nil { - h.dropReq = func(req *kvserver.RaftMessageRequest) bool { + h.dropReq = func(req *kvserverpb.RaftMessageRequest) bool { pr.mu.RLock() defer pr.mu.RUnlock() return pr.mu.partitioned && @@ -286,7 +287,7 @@ func setupPartitionedRangeWithHandlers( } } if h.dropHB == nil { - h.dropHB = func(hb *kvserver.RaftHeartbeat) bool { + h.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool { pr.mu.RLock() defer pr.mu.RUnlock() if !pr.mu.partitioned { @@ -299,7 +300,7 @@ func setupPartitionedRangeWithHandlers( } } if h.snapErr == nil { - h.snapErr = func(header *kvserver.SnapshotRequest_Header) error { + h.snapErr = func(header *kvserverpb.SnapshotRequest_Header) error { pr.mu.RLock() defer pr.mu.RUnlock() if !pr.mu.partitioned { diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index be74c295c4a7..1956bc869711 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -911,10 +911,10 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { if s != partStore { // Only filter messages from the partitioned store on the other // two stores. - h.dropReq = func(req *kvserver.RaftMessageRequest) bool { + h.dropReq = func(req *kvserverpb.RaftMessageRequest) bool { return req.FromReplica.StoreID == partRepl.StoreID() } - h.dropHB = func(hb *kvserver.RaftHeartbeat) bool { + h.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool { return hb.FromReplicaID == partReplDesc.ReplicaID } } @@ -1020,7 +1020,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { rangeID: partRepl.RangeID, RaftMessageHandler: tc.GetFirstStoreFromServer(t, s), unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropReq: func(req *kvserver.RaftMessageRequest) bool { + dropReq: func(req *kvserverpb.RaftMessageRequest) bool { // Make sure that even going forward no MsgApp for what we just truncated can // make it through. The Raft transport is asynchronous so this is necessary // to make the test pass reliably. @@ -1028,8 +1028,8 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // entries in the MsgApp, so filter where msg.Index < index, not <= index. return req.Message.Type == raftpb.MsgApp && req.Message.Index < index }, - dropHB: func(*kvserver.RaftHeartbeat) bool { return false }, - dropResp: func(*kvserver.RaftMessageResponse) bool { return false }, + dropHB: func(*kvserverpb.RaftHeartbeat) bool { return false }, + dropResp: func(*kvserverpb.RaftMessageResponse) bool { return false }, }, }) } @@ -1144,10 +1144,10 @@ func TestRequestsOnLaggingReplica(t *testing.T) { if i != partitionNodeIdx { // Only filter messages from the partitioned store on the other two // stores. 
- h.dropReq = func(req *kvserver.RaftMessageRequest) bool { + h.dropReq = func(req *kvserverpb.RaftMessageRequest) bool { return req.FromReplica.StoreID == partRepl.StoreID() } - h.dropHB = func(hb *kvserver.RaftHeartbeat) bool { + h.dropHB = func(hb *kvserverpb.RaftHeartbeat) bool { return hb.FromReplicaID == partReplDesc.ReplicaID } } @@ -1283,17 +1283,17 @@ func TestRequestsOnLaggingReplica(t *testing.T) { } type fakeSnapshotStream struct { - nextReq *kvserver.SnapshotRequest + nextReq *kvserverpb.SnapshotRequest nextErr error } // Recv implements the SnapshotResponseStream interface. -func (c fakeSnapshotStream) Recv() (*kvserver.SnapshotRequest, error) { +func (c fakeSnapshotStream) Recv() (*kvserverpb.SnapshotRequest, error) { return c.nextReq, c.nextErr } // Send implements the SnapshotResponseStream interface. -func (c fakeSnapshotStream) Send(request *kvserver.SnapshotResponse) error { +func (c fakeSnapshotStream) Send(request *kvserverpb.SnapshotResponse) error { return nil } @@ -1322,10 +1322,10 @@ func TestFailedSnapshotFillsReservation(t *testing.T) { desc.AddReplica(2, 2, roachpb.LEARNER) rep2Desc, found := desc.GetReplicaDescriptor(2) require.True(t, found) - header := kvserver.SnapshotRequest_Header{ + header := kvserverpb.SnapshotRequest_Header{ RangeSize: 100, State: kvserverpb.ReplicaState{Desc: desc}, - RaftMessageRequest: kvserver.RaftMessageRequest{ + RaftMessageRequest: kvserverpb.RaftMessageRequest{ RangeID: rep.RangeID, FromReplica: repDesc, ToReplica: rep2Desc, @@ -1787,7 +1787,7 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) { addReplica := func(storeNum int, desc *roachpb.RangeDescriptor) error { chgs := roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(storeNum)) - _, err := repl.ChangeReplicas(ctx, desc, kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs) + _, err := repl.ChangeReplicas(ctx, desc, kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs) return err } @@ -2774,10 +2774,10 @@ func TestRaftAfterRemoveRange(t *testing.T) { StoreID: target2.StoreID, } - tc.Servers[2].RaftTransport().SendAsync(&kvserver.RaftMessageRequest{ + tc.Servers[2].RaftTransport().SendAsync(&kvserverpb.RaftMessageRequest{ ToReplica: replica1, FromReplica: replica2, - Heartbeats: []kvserver.RaftHeartbeat{ + Heartbeats: []kvserverpb.RaftHeartbeat{ { RangeID: desc.RangeID, FromReplicaID: replica2.ReplicaID, @@ -2867,7 +2867,7 @@ func TestRemovePlaceholderRace(t *testing.T) { for _, action := range []roachpb.ReplicaChangeType{roachpb.REMOVE_VOTER, roachpb.ADD_VOTER} { for { chgs := roachpb.MakeReplicationChanges(action, tc.Target(1)) - if _, err := repl.ChangeReplicas(ctx, repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonUnknown, "", chgs); err != nil { + if _, err := repl.ChangeReplicas(ctx, repl.Desc(), kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonUnknown, "", chgs); err != nil { if kvserver.IsRetriableReplicationChangeError(err) { continue } else { @@ -2887,7 +2887,7 @@ type noConfChangeTestHandler struct { func (ncc *noConfChangeTestHandler) HandleRaftRequest( ctx context.Context, - req *kvserver.RaftMessageRequest, + req *kvserverpb.RaftMessageRequest, respStream kvserver.RaftMessageResponseStream, ) *roachpb.Error { for i, e := range req.Message.Entries { @@ -2896,7 +2896,7 @@ func (ncc *noConfChangeTestHandler) HandleRaftRequest( if err := protoutil.Unmarshal(e.Data, &cc); err != nil { panic(err) } - var ccCtx kvserver.ConfChangeContext + var ccCtx 
kvserverpb.ConfChangeContext if err := protoutil.Unmarshal(cc.Context, &ccCtx); err != nil { panic(err) } @@ -2917,7 +2917,7 @@ func (ncc *noConfChangeTestHandler) HandleRaftRequest( } func (ncc *noConfChangeTestHandler) HandleRaftResponse( - ctx context.Context, resp *kvserver.RaftMessageResponse, + ctx context.Context, resp *kvserverpb.RaftMessageResponse, ) error { switch val := resp.Union.GetValue().(type) { case *roachpb.Error: @@ -2969,7 +2969,7 @@ func TestReplicaGCRace(t *testing.T) { NodeID: toStore.Ident.NodeID, StoreID: toStore.Ident.StoreID, }) - if _, err := repl.ChangeReplicas(ctx, repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs); err != nil { + if _, err := repl.ChangeReplicas(ctx, repl.Desc(), kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs); err != nil { t.Fatal(err) } @@ -2985,10 +2985,10 @@ func TestReplicaGCRace(t *testing.T) { t.Fatalf("expected %s to have a replica on %s", rangeDesc, toStore) } - hbReq := kvserver.RaftMessageRequest{ + hbReq := kvserverpb.RaftMessageRequest{ FromReplica: fromReplicaDesc, ToReplica: toReplicaDesc, - Heartbeats: []kvserver.RaftHeartbeat{ + Heartbeats: []kvserverpb.RaftHeartbeat{ { RangeID: desc.RangeID, FromReplicaID: fromReplicaDesc.ReplicaID, @@ -3018,7 +3018,7 @@ func TestReplicaGCRace(t *testing.T) { // Remove the victim replica and manually GC it. chgs[0].ChangeType = roachpb.REMOVE_VOTER - if _, err := repl.ChangeReplicas(ctx, repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeOverReplicated, "", chgs); err != nil { + if _, err := repl.ChangeReplicas(ctx, repl.Desc(), kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeOverReplicated, "", chgs); err != nil { t.Fatal(err) } @@ -3414,13 +3414,13 @@ func TestReplicateRogueRemovedNode(t *testing.T) { type errorChannelTestHandler chan *roachpb.Error func (errorChannelTestHandler) HandleRaftRequest( - _ context.Context, _ *kvserver.RaftMessageRequest, _ kvserver.RaftMessageResponseStream, + _ context.Context, _ *kvserverpb.RaftMessageRequest, _ kvserver.RaftMessageResponseStream, ) *roachpb.Error { panic("unimplemented") } func (d errorChannelTestHandler) HandleRaftResponse( - ctx context.Context, resp *kvserver.RaftMessageResponse, + ctx context.Context, resp *kvserverpb.RaftMessageResponse, ) error { switch val := resp.Union.GetValue().(type) { case *roachpb.Error: @@ -3432,7 +3432,7 @@ func (d errorChannelTestHandler) HandleRaftResponse( } func (errorChannelTestHandler) HandleSnapshot( - _ context.Context, _ *kvserver.SnapshotRequest_Header, _ kvserver.SnapshotResponseStream, + _ context.Context, _ *kvserverpb.SnapshotRequest_Header, _ kvserver.SnapshotResponseStream, ) error { panic("unimplemented") } @@ -3539,7 +3539,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) { // Simulate the removed node asking to trigger an election. Try and try again // until we're reasonably sure the message was sent. - for !transport0.SendAsync(&kvserver.RaftMessageRequest{ + for !transport0.SendAsync(&kvserverpb.RaftMessageRequest{ RangeID: desc.RangeID, ToReplica: replica1, FromReplica: replica0, @@ -4123,7 +4123,7 @@ func TestUninitializedReplicaRemainsQuiesced(t *testing.T) { // Block incoming snapshots on s2 until channel is signaled. 
blockSnapshot := make(chan struct{}) handlerFuncs := noopRaftHandlerFuncs() - handlerFuncs.snapErr = func(header *kvserver.SnapshotRequest_Header) error { + handlerFuncs.snapErr = func(header *kvserverpb.SnapshotRequest_Header) error { select { case <-blockSnapshot: case <-tc.Stopper().ShouldQuiesce(): @@ -4561,12 +4561,12 @@ func TestStoreWaitForReplicaInit(t *testing.T) { var repl44 *kvserver.Replica testutils.SucceedsSoon(t, func() (err error) { // Try several times, as the message may be dropped (see #18355). - tc.Servers[0].RaftTransport().SendAsync(&kvserver.RaftMessageRequest{ + tc.Servers[0].RaftTransport().SendAsync(&kvserverpb.RaftMessageRequest{ ToReplica: roachpb.ReplicaDescriptor{ NodeID: store.Ident.NodeID, StoreID: store.Ident.StoreID, }, - Heartbeats: []kvserver.RaftHeartbeat{{RangeID: 44, ToReplicaID: 1}}, + Heartbeats: []kvserverpb.RaftHeartbeat{{RangeID: 44, ToReplicaID: 1}}, }, rpc.DefaultClass) repl44, err = store.GetReplica(44) return err @@ -4621,7 +4621,7 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) { rangeID: ri.Desc.RangeID, RaftMessageHandler: tc.GetFirstStoreFromServer(t, i), unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropReq: func(req *kvserver.RaftMessageRequest) bool { + dropReq: func(req *kvserverpb.RaftMessageRequest) bool { return rand.Intn(2) == 0 }, }, @@ -5494,7 +5494,7 @@ func TestReplicaRemovalClosesProposalQuota(t *testing.T) { require.True(t, found) newReplDesc := replDesc newReplDesc.ReplicaID = desc.NextReplicaID - require.Nil(t, store.HandleRaftRequest(ctx, &kvserver.RaftMessageRequest{ + require.Nil(t, store.HandleRaftRequest(ctx, &kvserverpb.RaftMessageRequest{ RangeID: desc.RangeID, RangeStartKey: desc.StartKey, FromReplica: fromReplDesc, @@ -5510,7 +5510,7 @@ func TestReplicaRemovalClosesProposalQuota(t *testing.T) { type noopRaftMessageResponseStream struct{} -func (n noopRaftMessageResponseStream) Send(*kvserver.RaftMessageResponse) error { +func (n noopRaftMessageResponseStream) Send(*kvserverpb.RaftMessageResponse) error { return nil } diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index f52a0e42c05a..e7c75a38313e 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2015,7 +2015,7 @@ func TestDrainRangeRejection(t *testing.T) { drainingIdx := 1 tc.GetFirstStoreFromServer(t, 1).SetDraining(true, nil /* reporter */, false /* verbose */) chgs := roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(drainingIdx)) - if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs); !testutils.IsError(err, "store is draining") { + if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs); !testutils.IsError(err, "store is draining") { t.Fatalf("unexpected error: %+v", err) } } @@ -2035,7 +2035,7 @@ func TestChangeReplicasGeneration(t *testing.T) { oldGeneration := repl.Desc().Generation chgs := roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)) - if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs); err != nil { + if _, err := repl.ChangeReplicas(context.Background(), repl.Desc(), kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs); err != nil { t.Fatalf("unexpected error: %v", err) } 
assert.EqualValues(t, repl.Desc().Generation, oldGeneration+2) @@ -2043,7 +2043,7 @@ func TestChangeReplicasGeneration(t *testing.T) { oldGeneration = repl.Desc().Generation oldDesc := repl.Desc() chgs[0].ChangeType = roachpb.REMOVE_VOTER - newDesc, err := repl.ChangeReplicas(context.Background(), oldDesc, kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeOverReplicated, "", chgs) + newDesc, err := repl.ChangeReplicas(context.Background(), oldDesc, kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeOverReplicated, "", chgs) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -2311,7 +2311,7 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { // ensures that that when node 2 comes back up it will require a snapshot // from Raft. funcs := noopRaftHandlerFuncs() - funcs.dropReq = func(*kvserver.RaftMessageRequest) bool { + funcs.dropReq = func(*kvserverpb.RaftMessageRequest) bool { return true } tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{ @@ -2592,7 +2592,7 @@ func TestReplicaTombstone(t *testing.T) { // it rather than receiving a ReplicaTooOldError. store, _ := getFirstStoreReplica(t, tc.Server(1), key) funcs := noopRaftHandlerFuncs() - funcs.dropResp = func(*kvserver.RaftMessageResponse) bool { + funcs.dropResp = func(*kvserverpb.RaftMessageResponse) bool { return true } tc.Servers[1].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ @@ -2639,7 +2639,7 @@ func TestReplicaTombstone(t *testing.T) { // ReplicaTooOldError. sawTooOld := make(chan struct{}, 1) raftFuncs := noopRaftHandlerFuncs() - raftFuncs.dropResp = func(resp *kvserver.RaftMessageResponse) bool { + raftFuncs.dropResp = func(resp *kvserverpb.RaftMessageResponse) bool { if pErr, ok := resp.Union.GetValue().(*roachpb.Error); ok { if _, isTooOld := pErr.GetDetail().(*roachpb.ReplicaTooOldError); isTooOld { select { @@ -2650,7 +2650,7 @@ func TestReplicaTombstone(t *testing.T) { } return false } - raftFuncs.dropReq = func(req *kvserver.RaftMessageRequest) bool { + raftFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool { return req.ToReplica.StoreID == store.StoreID() } tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ @@ -2823,17 +2823,17 @@ func TestReplicaTombstone(t *testing.T) { rangeID: desc.RangeID, RaftMessageHandler: store, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropResp: func(*kvserver.RaftMessageResponse) bool { + dropResp: func(*kvserverpb.RaftMessageResponse) bool { return true }, - dropReq: func(*kvserver.RaftMessageRequest) bool { + dropReq: func(*kvserverpb.RaftMessageRequest) bool { return true }, - dropHB: func(hb *kvserver.RaftHeartbeat) bool { + dropHB: func(hb *kvserverpb.RaftHeartbeat) bool { recordHeartbeat(hb.ToReplicaID) return false }, - snapErr: func(*kvserver.SnapshotRequest_Header) error { + snapErr: func(*kvserverpb.SnapshotRequest_Header) error { waitForSnapshot() return errors.New("boom") }, @@ -2925,7 +2925,7 @@ func TestReplicaTombstone(t *testing.T) { var partActive atomic.Value partActive.Store(false) raftFuncs := noopRaftHandlerFuncs() - raftFuncs.dropReq = func(req *kvserver.RaftMessageRequest) bool { + raftFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool { return partActive.Load().(bool) && req.Message.Type == raftpb.MsgApp } tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ @@ -3059,7 +3059,7 @@ func TestAdminRelocateRangeSafety(t *testing.T) { change := func() { <-seenAdd chgs := 
roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, makeReplicationTargets(2)...) - changedDesc, changeErr = r1.ChangeReplicas(ctx, &expDescAfterAdd, kvserver.SnapshotRequest_REBALANCE, "replicate", "testing", chgs) + changedDesc, changeErr = r1.ChangeReplicas(ctx, &expDescAfterAdd, kvserverpb.SnapshotRequest_REBALANCE, "replicate", "testing", chgs) } relocate := func() { relocateErr = db.AdminRelocateRange( @@ -3879,7 +3879,7 @@ func TestTenantID(t *testing.T) { Store: &kvserver.StoreTestingKnobs{ BeforeSnapshotSSTIngestion: func( snapshot kvserver.IncomingSnapshot, - request_type kvserver.SnapshotRequest_Type, + request_type kvserverpb.SnapshotRequest_Type, strings []string, ) error { if snapshot.Desc.RangeID == repl.RangeID { @@ -3958,7 +3958,7 @@ func TestUninitializedMetric(t *testing.T) { Store: &kvserver.StoreTestingKnobs{ BeforeSnapshotSSTIngestion: func( snapshot kvserver.IncomingSnapshot, - _ kvserver.SnapshotRequest_Type, + _ kvserverpb.SnapshotRequest_Type, _ []string, ) error { if snapshot.Desc.RangeID == repl.RangeID { diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 0b1eb04c90b1..5ffcc503089d 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -800,12 +801,12 @@ func TestStoreRangeSplitStats(t *testing.T) { // header before delegating to the underlying HandleSnapshot method. type RaftMessageHandlerInterceptor struct { kvserver.RaftMessageHandler - handleSnapshotFilter func(header *kvserver.SnapshotRequest_Header) + handleSnapshotFilter func(header *kvserverpb.SnapshotRequest_Header) } func (mh RaftMessageHandlerInterceptor) HandleSnapshot( ctx context.Context, - header *kvserver.SnapshotRequest_Header, + header *kvserverpb.SnapshotRequest_Header, respStream kvserver.SnapshotResponseStream, ) error { mh.handleSnapshotFilter(header) @@ -846,11 +847,11 @@ func TestStoreEmptyRangeSnapshotSize(t *testing.T) { // snapshot request headers. messageRecorder := struct { syncutil.Mutex - headers []*kvserver.SnapshotRequest_Header + headers []*kvserverpb.SnapshotRequest_Header }{} messageHandler := RaftMessageHandlerInterceptor{ RaftMessageHandler: tc.GetFirstStoreFromServer(t, 1), - handleSnapshotFilter: func(header *kvserver.SnapshotRequest_Header) { + handleSnapshotFilter: func(header *kvserverpb.SnapshotRequest_Header) { // Each snapshot request is handled in a new goroutine, so we need // synchronization. messageRecorder.Lock() @@ -2143,7 +2144,7 @@ func TestStoreRangeSplitRaceUninitializedRHS(t *testing.T) { // side in the split trigger was racing with the uninitialized // version for the same group, resulting in clobbered HardState). 
for term := uint64(1); ; term++ { - if sent := tc.Servers[1].RaftTransport().SendAsync(&kvserver.RaftMessageRequest{ + if sent := tc.Servers[1].RaftTransport().SendAsync(&kvserverpb.RaftMessageRequest{ RangeID: trigger.RightDesc.RangeID, ToReplica: replicas[0], FromReplica: replicas[1], diff --git a/pkg/kv/kvserver/debug_print.go b/pkg/kv/kvserver/debug_print.go index 31a5aeee2f36..6ae330eeaf1a 100644 --- a/pkg/kv/kvserver/debug_print.go +++ b/pkg/kv/kvserver/debug_print.go @@ -236,7 +236,7 @@ func tryRaftLogEntry(kv storage.MVCCKeyValue) (string, error) { c = cc } - var ctx ConfChangeContext + var ctx kvserverpb.ConfChangeContext if err := protoutil.Unmarshal(c.AsV2().Context, &ctx); err != nil { return "", err } diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel index 581d767e3363..8e5eac9e1d0b 100644 --- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "lease_status.go", "log.go", "proposer_kv.go", + "raft.go", ], embed = [":kvserverpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb", @@ -22,8 +23,9 @@ proto_library( name = "kvserverpb_proto", srcs = [ "lease_status.proto", - "log.proto", "proposer_kv.proto", + "raft.proto", + "range_log.proto", "state.proto", ], strip_import_prefix = "/pkg", @@ -36,6 +38,7 @@ proto_library( "//pkg/util/hlc:hlc_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@com_google_protobuf//:timestamp_proto", + "@io_etcd_go_etcd_raft_v3//raftpb:raftpb_proto", ], ) @@ -54,5 +57,6 @@ go_proto_library( "//pkg/util/hlc", "//pkg/util/uuid", # keep "@com_github_gogo_protobuf//gogoproto", + "@io_etcd_go_etcd_raft_v3//raftpb", ], ) diff --git a/pkg/kv/kvserver/kvserverpb/raft.go b/pkg/kv/kvserver/kvserverpb/raft.go new file mode 100644 index 000000000000..3940a7a1c8ca --- /dev/null +++ b/pkg/kv/kvserver/kvserverpb/raft.go @@ -0,0 +1,14 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserverpb + +// SafeValue implements the redact.SafeValue interface. +func (SnapshotRequest_Type) SafeValue() {} diff --git a/pkg/kv/kvserver/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto similarity index 99% rename from pkg/kv/kvserver/raft.proto rename to pkg/kv/kvserver/kvserverpb/raft.proto index a11ac8fda205..2fa84622e03f 100644 --- a/pkg/kv/kvserver/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -9,8 +9,8 @@ // licenses/APL.txt. syntax = "proto3"; -package cockroach.kv.kvserver; -option go_package = "kvserver"; +package cockroach.kv.kvserver.kvserverpb; +option go_package = "kvserverpb"; import "roachpb/errors.proto"; import "roachpb/metadata.proto"; @@ -222,4 +222,3 @@ message ConfChangeContext { // kvserverpb.RaftCommand). 
bytes payload = 2; } - diff --git a/pkg/kv/kvserver/kvserverpb/log.proto b/pkg/kv/kvserver/kvserverpb/range_log.proto similarity index 100% rename from pkg/kv/kvserver/kvserverpb/log.proto rename to pkg/kv/kvserver/kvserverpb/range_log.proto diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index efe22095bb8e..d3d134c2b830 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -100,7 +100,7 @@ func GetDescriptorChangesFromRaftLog( var ent raftpb.Entry decodeRaftChange := func(ccI raftpb.ConfChangeI) ([]byte, error) { - var ccC kvserver.ConfChangeContext + var ccC kvserverpb.ConfChangeContext if err := protoutil.Unmarshal(ccI.AsV2().Context, &ccC); err != nil { return nil, errors.Wrap(err, "while unmarshaling CCContext") } diff --git a/pkg/kv/kvserver/raft.go b/pkg/kv/kvserver/raft.go index 8705d3048520..c0de7979ba30 100644 --- a/pkg/kv/kvserver/raft.go +++ b/pkg/kv/kvserver/raft.go @@ -17,6 +17,7 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/util/log" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" @@ -222,16 +223,16 @@ func raftEntryFormatter(data []byte) string { var raftMessageRequestPool = sync.Pool{ New: func() interface{} { - return &RaftMessageRequest{} + return &kvserverpb.RaftMessageRequest{} }, } -func newRaftMessageRequest() *RaftMessageRequest { - return raftMessageRequestPool.Get().(*RaftMessageRequest) +func newRaftMessageRequest() *kvserverpb.RaftMessageRequest { + return raftMessageRequestPool.Get().(*kvserverpb.RaftMessageRequest) } -func (m *RaftMessageRequest) release() { - *m = RaftMessageRequest{} +func releaseRaftMessageRequest(m *kvserverpb.RaftMessageRequest) { + *m = kvserverpb.RaftMessageRequest{} raftMessageRequestPool.Put(m) } @@ -285,6 +286,3 @@ func traceProposals(r *Replica, ids []kvserverbase.CmdIDKey, event string) { log.Eventf(ctx, "%v", event) } } - -// SafeValue implements the redact.SafeValue interface. 
-func (SnapshotRequest_Type) SafeValue() {} diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 7f28faa2b2a7..82dc076a0098 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -107,7 +108,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( if !ok { return errors.Errorf("%s: replica %d not present in %v", repl, id, desc.Replicas()) } - snapType := SnapshotRequest_VIA_SNAPSHOT_QUEUE + snapType := kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE skipSnapLogLimiter := log.Every(10 * time.Second) if typ := repDesc.GetType(); typ == roachpb.LEARNER || typ == roachpb.NON_VOTER { @@ -146,7 +147,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( } } - err := repl.sendSnapshot(ctx, repDesc, snapType, SnapshotRequest_RECOVERY) + err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY) // NB: if the snapshot fails because of an overlapping replica on the // recipient which is also waiting for a snapshot, the "smart" thing is to diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 44a6ebe6f41a..af4a306cfaea 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -20,6 +20,7 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -73,7 +74,7 @@ var targetRaftOutgoingBatchSize = settings.RegisterByteSizeSetting( // RaftMessageResponseStream is the subset of the // MultiRaft_RaftMessageServer interface that is needed for sending responses. type RaftMessageResponseStream interface { - Send(*RaftMessageResponse) error + Send(*kvserverpb.RaftMessageResponse) error } // lockedRaftMessageResponseStream is an implementation of @@ -85,13 +86,13 @@ type lockedRaftMessageResponseStream struct { sendMu syncutil.Mutex } -func (s *lockedRaftMessageResponseStream) Send(resp *RaftMessageResponse) error { +func (s *lockedRaftMessageResponseStream) Send(resp *kvserverpb.RaftMessageResponse) error { s.sendMu.Lock() defer s.sendMu.Unlock() return s.wrapped.Send(resp) } -func (s *lockedRaftMessageResponseStream) Recv() (*RaftMessageRequestBatch, error) { +func (s *lockedRaftMessageResponseStream) Recv() (*kvserverpb.RaftMessageRequestBatch, error) { // No need for lock. gRPC.Stream.RecvMsg is safe for concurrent use. return s.wrapped.Recv() } @@ -99,8 +100,8 @@ func (s *lockedRaftMessageResponseStream) Recv() (*RaftMessageRequestBatch, erro // SnapshotResponseStream is the subset of the // MultiRaft_RaftSnapshotServer interface that is needed for sending responses. type SnapshotResponseStream interface { - Send(*SnapshotResponse) error - Recv() (*SnapshotRequest, error) + Send(*kvserverpb.SnapshotResponse) error + Recv() (*kvserverpb.SnapshotRequest, error) } // RaftMessageHandler is the interface that must be implemented by @@ -110,17 +111,17 @@ type RaftMessageHandler interface { // always processed asynchronously and the response is sent over respStream. // If an error is encountered during asynchronous processing, it will be // streamed back to the sender of the message as a RaftMessageResponse. 
- HandleRaftRequest(ctx context.Context, req *RaftMessageRequest, + HandleRaftRequest(ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream) *roachpb.Error // HandleRaftResponse is called for each raft response. Note that // not all messages receive a response. An error is returned if and only if // the underlying Raft connection should be closed. - HandleRaftResponse(context.Context, *RaftMessageResponse) error + HandleRaftResponse(context.Context, *kvserverpb.RaftMessageResponse) error // HandleSnapshot is called for each new incoming snapshot stream, after // parsing the initial SnapshotRequest_Header on the stream. - HandleSnapshot(ctx context.Context, header *SnapshotRequest_Header, respStream SnapshotResponseStream) error + HandleSnapshot(ctx context.Context, header *kvserverpb.SnapshotRequest_Header, respStream SnapshotResponseStream) error } type raftTransportStats struct { @@ -222,7 +223,7 @@ func NewRaftTransport( return true } setQueueLength := func(k int64, v unsafe.Pointer) bool { - ch := *(*chan *RaftMessageRequest)(v) + ch := *(*chan *kvserverpb.RaftMessageRequest)(v) if s, ok := statsMap[roachpb.NodeID(k)]; ok { s.queue += len(ch) } @@ -279,7 +280,7 @@ func NewRaftTransport( func (t *RaftTransport) queuedMessageCount() int64 { var n int64 addLength := func(k int64, v unsafe.Pointer) bool { - ch := *(*chan *RaftMessageRequest)(v) + ch := *(*chan *kvserverpb.RaftMessageRequest)(v) n += int64(len(ch)) return true } @@ -298,7 +299,7 @@ func (t *RaftTransport) getHandler(storeID roachpb.StoreID) (RaftMessageHandler, // handleRaftRequest proxies a request to the listening server interface. func (t *RaftTransport) handleRaftRequest( - ctx context.Context, req *RaftMessageRequest, respStream RaftMessageResponseStream, + ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream, ) *roachpb.Error { handler, ok := t.getHandler(req.ToReplica.StoreID) if !ok { @@ -312,8 +313,10 @@ func (t *RaftTransport) handleRaftRequest( // newRaftMessageResponse constructs a RaftMessageResponse from the // given request and error. -func newRaftMessageResponse(req *RaftMessageRequest, pErr *roachpb.Error) *RaftMessageResponse { - resp := &RaftMessageResponse{ +func newRaftMessageResponse( + req *kvserverpb.RaftMessageRequest, pErr *roachpb.Error, +) *kvserverpb.RaftMessageResponse { + resp := &kvserverpb.RaftMessageResponse{ RangeID: req.RangeID, // From and To are reversed in the response. ToReplica: req.FromReplica, @@ -420,8 +423,8 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error return err } if req.Header == nil { - return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_ERROR, + return stream.Send(&kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ERROR, Message: "client error: no header in first snapshot request message"}) } rmr := req.Header.RaftMessageRequest @@ -461,7 +464,7 @@ func (t *RaftTransport) Stop(storeID roachpb.StoreID) { // to be sent. 
func (t *RaftTransport) processQueue( nodeID roachpb.NodeID, - ch chan *RaftMessageRequest, + ch chan *kvserverpb.RaftMessageRequest, stats *raftTransportStats, stream MultiRaft_RaftMessageBatchClient, class rpc.ConnectionClass, @@ -497,7 +500,7 @@ func (t *RaftTransport) processQueue( var raftIdleTimer timeutil.Timer defer raftIdleTimer.Stop() - batch := &RaftMessageRequestBatch{} + batch := &kvserverpb.RaftMessageRequestBatch{} for { raftIdleTimer.Reset(raftIdleTimeout) select { @@ -511,14 +514,14 @@ func (t *RaftTransport) processQueue( case req := <-ch: budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - int64(req.Size()) batch.Requests = append(batch.Requests, *req) - req.release() + releaseRaftMessageRequest(req) // Pull off as many queued requests as possible, within reason. for budget > 0 { select { case req = <-ch: budget -= int64(req.Size()) batch.Requests = append(batch.Requests, *req) - req.release() + releaseRaftMessageRequest(req) default: budget = -1 } @@ -532,7 +535,7 @@ func (t *RaftTransport) processQueue( // Reuse the Requests slice, but zero out the contents to avoid delaying // GC of memory referenced from within. for i := range batch.Requests { - batch.Requests[i] = RaftMessageRequest{} + batch.Requests[i] = kvserverpb.RaftMessageRequest{} } batch.Requests = batch.Requests[:0] @@ -545,14 +548,14 @@ func (t *RaftTransport) processQueue( // indicating whether the queue already exists (true) or was created (false). func (t *RaftTransport) getQueue( nodeID roachpb.NodeID, class rpc.ConnectionClass, -) (chan *RaftMessageRequest, bool) { +) (chan *kvserverpb.RaftMessageRequest, bool) { queuesMap := &t.queues[class] value, ok := queuesMap.Load(int64(nodeID)) if !ok { - ch := make(chan *RaftMessageRequest, raftSendBufferSize) + ch := make(chan *kvserverpb.RaftMessageRequest, raftSendBufferSize) value, ok = queuesMap.LoadOrStore(int64(nodeID), unsafe.Pointer(&ch)) } - return *(*chan *RaftMessageRequest)(value), ok + return *(*chan *kvserverpb.RaftMessageRequest)(value), ok } // SendAsync sends a message to the recipient specified in the request. It @@ -560,7 +563,9 @@ func (t *RaftTransport) getQueue( // positive but will never be a false negative; if sent is true the message may // or may not actually be sent but if it's false the message definitely was not // sent. It is not safe to continue using the reference to the provided request. -func (t *RaftTransport) SendAsync(req *RaftMessageRequest, class rpc.ConnectionClass) (sent bool) { +func (t *RaftTransport) SendAsync( + req *kvserverpb.RaftMessageRequest, class rpc.ConnectionClass, +) (sent bool) { toNodeID := req.ToReplica.NodeID stats := t.getStats(toNodeID, class) defer func() { @@ -599,7 +604,7 @@ func (t *RaftTransport) SendAsync(req *RaftMessageRequest, class rpc.ConnectionC } return true default: - req.release() + releaseRaftMessageRequest(req) return false } } @@ -619,7 +624,7 @@ func (t *RaftTransport) startProcessNewQueue( class rpc.ConnectionClass, stats *raftTransportStats, ) (started bool) { - cleanup := func(ch chan *RaftMessageRequest) { + cleanup := func(ch chan *kvserverpb.RaftMessageRequest) { // Account for the remainder of `ch` which was never sent. 
// NB: we deleted the queue above, so within a short amount // of time nobody should be writing into the channel any @@ -678,7 +683,7 @@ func (t *RaftTransport) startProcessNewQueue( func (t *RaftTransport) SendSnapshot( ctx context.Context, storePool *StorePool, - header SnapshotRequest_Header, + header kvserverpb.SnapshotRequest_Header, snap *OutgoingSnapshot, newBatch func() storage.Batch, sent func(), diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go index c149346e42db..6b6ca7c440a7 100644 --- a/pkg/kv/kvserver/raft_transport_test.go +++ b/pkg/kv/kvserver/raft_transport_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -43,7 +44,7 @@ import ( const channelServerBrokenRangeMessage = "channelServer broken range" type channelServer struct { - ch chan *kvserver.RaftMessageRequest + ch chan *kvserverpb.RaftMessageRequest maxSleep time.Duration // If non-zero, all messages to this range will return errors @@ -52,13 +53,13 @@ type channelServer struct { func newChannelServer(bufSize int, maxSleep time.Duration) channelServer { return channelServer{ - ch: make(chan *kvserver.RaftMessageRequest, bufSize), + ch: make(chan *kvserverpb.RaftMessageRequest, bufSize), maxSleep: maxSleep, } } func (s channelServer) HandleRaftRequest( - ctx context.Context, req *kvserver.RaftMessageRequest, _ kvserver.RaftMessageResponseStream, + ctx context.Context, req *kvserverpb.RaftMessageRequest, _ kvserver.RaftMessageResponseStream, ) *roachpb.Error { if s.maxSleep != 0 { // maxSleep simulates goroutine scheduling delays that could @@ -74,7 +75,7 @@ func (s channelServer) HandleRaftRequest( } func (s channelServer) HandleRaftResponse( - ctx context.Context, resp *kvserver.RaftMessageResponse, + ctx context.Context, resp *kvserverpb.RaftMessageResponse, ) error { // Mimic the logic in (*Store).HandleRaftResponse without requiring an // entire Store object to be pulled into these tests. 
@@ -89,7 +90,7 @@ func (s channelServer) HandleRaftResponse( func (s channelServer) HandleSnapshot( _ context.Context, - header *kvserver.SnapshotRequest_Header, + header *kvserverpb.SnapshotRequest_Header, stream kvserver.SnapshotResponseStream, ) error { panic("unexpected HandleSnapshot") @@ -204,7 +205,7 @@ func (rttc *raftTransportTestContext) Send( ) bool { msg.To = uint64(to.ReplicaID) msg.From = uint64(from.ReplicaID) - req := &kvserver.RaftMessageRequest{ + req := &kvserverpb.RaftMessageRequest{ RangeID: rangeID, Message: msg, ToReplica: to, @@ -273,7 +274,7 @@ func TestSendAndReceive(t *testing.T) { } for fromStoreID, fromNodeID := range storeNodes { - baseReq := kvserver.RaftMessageRequest{ + baseReq := kvserverpb.RaftMessageRequest{ RangeID: 1, Message: raftpb.Message{ From: uint64(fromStoreID), @@ -343,7 +344,7 @@ func TestSendAndReceive(t *testing.T) { // Send a message from replica 2 (on store 3, node 2) to replica 1 (on store 5, node 3) fromStoreID := roachpb.StoreID(3) toStoreID := roachpb.StoreID(5) - expReq := &kvserver.RaftMessageRequest{ + expReq := &kvserverpb.RaftMessageRequest{ RangeID: 1, Message: raftpb.Message{ Type: raftpb.MsgApp, @@ -639,7 +640,7 @@ func TestSendFailureToConnectDoesNotHangRaft(t *testing.T) { rttc.GossipNode(to, ln.Addr()) // Try to send a message, make sure we don't block waiting to set up the // connection. - transport.SendAsync(&kvserver.RaftMessageRequest{ + transport.SendAsync(&kvserverpb.RaftMessageRequest{ RangeID: rangeID, ToReplica: roachpb.ReplicaDescriptor{ StoreID: to, diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 2e7f7747fc6f..23b42d1dbe34 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1092,7 +1092,7 @@ func (r *Replica) mergeInProgressRLocked() bool { // setLastReplicaDescriptors sets the most recently seen replica // descriptors to those contained in the *RaftMessageRequest. -func (r *Replica) setLastReplicaDescriptorsRaftMuLocked(req *RaftMessageRequest) { +func (r *Replica) setLastReplicaDescriptorsRaftMuLocked(req *kvserverpb.RaftMessageRequest) { r.raftMu.AssertHeld() r.raftMu.lastFromReplica = req.FromReplica r.raftMu.lastToReplica = req.ToReplica diff --git a/pkg/kv/kvserver/replica_application_cmd.go b/pkg/kv/kvserver/replica_application_cmd.go index af86c8a89ea9..aefca7549f5a 100644 --- a/pkg/kv/kvserver/replica_application_cmd.go +++ b/pkg/kv/kvserver/replica_application_cmd.go @@ -90,7 +90,7 @@ type decodedRaftEntry struct { // decodedConfChange represents the fields of a config change raft command. type decodedConfChange struct { raftpb.ConfChangeI - ConfChangeContext + kvserverpb.ConfChangeContext } // decode decodes the entry e into the replicatedCmd. 
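The decodedConfChange change above is representative of the whole move: the type itself is unchanged, only its home package is. A minimal sketch of the decode step as it reads after the move, assuming the imports shown; decodeConfChangeContext is a hypothetical helper, not part of this diff, mirroring the pattern in debug_print.go and loqrecovery/collect.go above:

```go
import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/cockroachdb/errors"
	"go.etcd.io/etcd/raft/v3/raftpb"
)

// decodeConfChangeContext unmarshals the payload carried by a raft conf
// change into the relocated kvserverpb.ConfChangeContext. Hypothetical
// helper for illustration only.
func decodeConfChangeContext(ccI raftpb.ConfChangeI) (kvserverpb.ConfChangeContext, error) {
	var ccCtx kvserverpb.ConfChangeContext
	if err := protoutil.Unmarshal(ccI.AsV2().Context, &ccCtx); err != nil {
		return kvserverpb.ConfChangeContext{}, errors.Wrap(err, "while unmarshaling ConfChangeContext")
	}
	return ccCtx, nil
}
```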
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 8732d155c18a..4d2f54ddd95f 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -957,7 +957,7 @@ func waitForReplicasInit( func (r *Replica) ChangeReplicas( ctx context.Context, desc *roachpb.RangeDescriptor, - priority SnapshotRequest_Priority, + priority kvserverpb.SnapshotRequest_Priority, reason kvserverpb.RangeLogEventReason, details string, chgs roachpb.ReplicationChanges, @@ -987,7 +987,7 @@ func (r *Replica) ChangeReplicas( func (r *Replica) changeReplicasImpl( ctx context.Context, desc *roachpb.RangeDescriptor, - priority SnapshotRequest_Priority, + priority kvserverpb.SnapshotRequest_Priority, reason kvserverpb.RangeLogEventReason, details string, chgs roachpb.ReplicationChanges, @@ -1538,7 +1538,7 @@ func getChangesByNodeID(chgs roachpb.ReplicationChanges) changesByNodeID { func (r *Replica) initializeRaftLearners( ctx context.Context, desc *roachpb.RangeDescriptor, - priority SnapshotRequest_Priority, + priority kvserverpb.SnapshotRequest_Priority, reason kvserverpb.RangeLogEventReason, details string, targets []roachpb.ReplicationTarget, @@ -1684,7 +1684,7 @@ func (r *Replica) initializeRaftLearners( // orphaned learner. Second, this tickled some bugs in etcd/raft around // switching between StateSnapshot and StateProbe. Even if we worked through // these, it would be susceptible to future similar issues. - if err := r.sendSnapshot(ctx, rDesc, SnapshotRequest_INITIAL, priority); err != nil { + if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority); err != nil { return nil, err } } @@ -2426,8 +2426,8 @@ func recordRangeEventsInLog( func (r *Replica) sendSnapshot( ctx context.Context, recipient roachpb.ReplicaDescriptor, - snapType SnapshotRequest_Type, - priority SnapshotRequest_Priority, + snapType kvserverpb.SnapshotRequest_Type, + priority kvserverpb.SnapshotRequest_Priority, ) (retErr error) { defer func() { // Report the snapshot status to Raft, which expects us to do this once we @@ -2481,10 +2481,10 @@ func (r *Replica) sendSnapshot( // explicitly for snapshots going out to followers. 
snap.State.DeprecatedUsingAppliedStateKey = true - req := SnapshotRequest_Header{ + req := kvserverpb.SnapshotRequest_Header{ State: snap.State, DeprecatedUnreplicatedTruncatedState: true, - RaftMessageRequest: RaftMessageRequest{ + RaftMessageRequest: kvserverpb.RaftMessageRequest{ RangeID: r.RangeID, FromReplica: sender, ToReplica: recipient, @@ -2498,7 +2498,7 @@ func (r *Replica) sendSnapshot( }, RangeSize: r.GetMVCCStats().Total(), Priority: priority, - Strategy: SnapshotRequest_KV_BATCH, + Strategy: kvserverpb.SnapshotRequest_KV_BATCH, Type: snapType, } newBatchFn := func() storage.Batch { diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 3bd3f10214b8..83c5e8dbca2a 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -144,7 +145,7 @@ func TestAddReplicaViaLearner(t *testing.T) { blockUntilSnapshotCh := make(chan struct{}) blockSnapshotsCh := make(chan struct{}) knobs, ltk := makeReplicationTestKnobs() - ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error { + ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error { close(blockUntilSnapshotCh) select { case <-blockSnapshotsCh: @@ -315,7 +316,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { runTest := func(t *testing.T, replicaType roachpb.ReplicaType) { var rejectSnapshots int64 knobs, ltk := makeReplicationTestKnobs() - ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error { + ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error { if atomic.LoadInt64(&rejectSnapshots) > 0 { return errors.New(`nope`) } @@ -607,7 +608,7 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) { blockSnapshotsCh := make(chan struct{}) knobs, ltk := makeReplicationTestKnobs() ltk.storeKnobs.DisableRaftSnapshotQueue = true - ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error { + ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error { select { case <-blockSnapshotsCh: case <-time.After(10 * time.Second): @@ -669,7 +670,7 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) { blockUntilSnapshotCh := make(chan struct{}, 2) blockSnapshotsCh := make(chan struct{}) knobs, ltk := makeReplicationTestKnobs() - ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error { + ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error { blockUntilSnapshotCh <- struct{}{} <-blockSnapshotsCh return nil @@ -744,7 +745,7 @@ func TestLearnerReplicateQueueRace(t *testing.T) { // In this case we'll get a snapshot error from the replicate queue which // will retry the up-replication with a new descriptor and succeed. 
ltk.storeKnobs.DisableEagerReplicaRemoval = true - ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error { + ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error { if atomic.LoadInt64(&skipReceiveSnapshotKnobAtomic) > 0 { return nil } diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 52187cd53827..c6b33ea7c459 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -537,7 +537,7 @@ func (b *propBuf) FlushLockedWithRaftGroup( } ents = ents[len(ents):] - confChangeCtx := ConfChangeContext{ + confChangeCtx := kvserverpb.ConfChangeContext{ CommandID: string(p.idKey), Payload: p.encodedCommand, } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index f2680ff5b043..96865f9b9b7d 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -464,7 +464,7 @@ var errRemoved = errors.New("replica removed") // stepRaftGroup calls Step on the replica's RawNode with the provided request's // message. Before doing so, it assures that the replica is unquiesced and ready // to handle the request. -func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error { +func (r *Replica) stepRaftGroup(req *kvserverpb.RaftMessageRequest) error { // We're processing an incoming raft message (from a batch that may // include MsgVotes), so don't campaign if we wake up our raft // group. @@ -1239,7 +1239,7 @@ func (r *Replica) maybeCoalesceHeartbeat( quiesce bool, lagging laggingReplicaSet, ) bool { - var hbMap map[roachpb.StoreIdent][]RaftHeartbeat + var hbMap map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat switch msg.Type { case raftpb.MsgHeartbeat: r.store.coalescedMu.Lock() @@ -1250,7 +1250,7 @@ func (r *Replica) maybeCoalesceHeartbeat( default: return false } - beat := RaftHeartbeat{ + beat := kvserverpb.RaftHeartbeat{ RangeID: r.RangeID, ToReplicaID: toReplica.ReplicaID, FromReplicaID: fromReplica.ReplicaID, @@ -1395,7 +1395,7 @@ func (r *Replica) sendRaftMessageRaftMuLocked(ctx context.Context, msg raftpb.Me } req := newRaftMessageRequest() - *req = RaftMessageRequest{ + *req = kvserverpb.RaftMessageRequest{ RangeID: r.RangeID, ToReplica: toReplica, FromReplica: fromReplica, @@ -1427,7 +1427,9 @@ func (r *Replica) addUnreachableRemoteReplica(remoteReplica roachpb.ReplicaID) { // sendRaftMessageRequest sends a raft message, returning false if the message // was dropped. It is the caller's responsibility to call ReportUnreachable on // the Raft group. -func (r *Replica) sendRaftMessageRequest(ctx context.Context, req *RaftMessageRequest) bool { +func (r *Replica) sendRaftMessageRequest( + ctx context.Context, req *kvserverpb.RaftMessageRequest, +) bool { if log.V(4) { log.Infof(ctx, "sending raft request %+v", req) } diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 0c7b8605ecfc..3634783867f4 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -419,7 +419,7 @@ func (r *Replica) raftSnapshotLocked() (raftpb.Snapshot, error) { // replica. If this method returns without error, callers must eventually call // OutgoingSnapshot.Close. 
func (r *Replica) GetSnapshot( - ctx context.Context, snapType SnapshotRequest_Type, recipientStore roachpb.StoreID, + ctx context.Context, snapType kvserverpb.SnapshotRequest_Type, recipientStore roachpb.StoreID, ) (_ *OutgoingSnapshot, err error) { snapUUID := uuid.MakeV4() // Get a snapshot while holding raftMu to make sure we're not seeing "half @@ -514,7 +514,7 @@ type OutgoingSnapshot struct { // sideloaded storage in the meantime. WithSideloaded func(func(SideloadStorage) error) error RaftEntryCache *raftentry.Cache - snapType SnapshotRequest_Type + snapType kvserverpb.SnapshotRequest_Type onClose func() } @@ -546,7 +546,7 @@ type IncomingSnapshot struct { // The descriptor in the snapshot, never nil. Desc *roachpb.RangeDescriptor DataSize int64 - snapType SnapshotRequest_Type + snapType kvserverpb.SnapshotRequest_Type placeholder *ReplicaPlaceholder raftAppliedIndex uint64 // logging only } @@ -567,7 +567,7 @@ func snapshot( ctx context.Context, snapUUID uuid.UUID, rsl stateloader.StateLoader, - snapType SnapshotRequest_Type, + snapType kvserverpb.SnapshotRequest_Type, snap storage.Reader, rangeID roachpb.RangeID, eCache *raftentry.Cache, diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index ae042f253534..fb5029151a4a 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" @@ -858,7 +859,7 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { rangeID: rangeID, RaftMessageHandler: partitionStore, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ - dropReq: func(req *kvserver.RaftMessageRequest) bool { + dropReq: func(req *kvserverpb.RaftMessageRequest) bool { // Make sure that even going forward no MsgApp for what we just truncated can // make it through. The Raft transport is asynchronous so this is necessary // to make the test pass reliably. @@ -866,8 +867,8 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { // entries in the MsgApp, so filter where msg.Index < index, not <= index. 
return req.Message.Type == raftpb.MsgApp && req.Message.Index < index }, - dropHB: func(*kvserver.RaftHeartbeat) bool { return false }, - dropResp: func(*kvserver.RaftMessageResponse) bool { return false }, + dropHB: func(*kvserverpb.RaftHeartbeat) bool { return false }, + dropResp: func(*kvserverpb.RaftMessageResponse) bool { return false }, }, }) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index c09c251d2b5e..17b2ab99d476 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -821,7 +821,7 @@ func (r *Replica) executeAdminBatch( case *roachpb.AdminChangeReplicasRequest: chgs := tArgs.Changes() - desc, err := r.ChangeReplicas(ctx, &tArgs.ExpDesc, SnapshotRequest_REBALANCE, kvserverpb.ReasonAdminRequest, "", chgs) + desc, err := r.ChangeReplicas(ctx, &tArgs.ExpDesc, kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonAdminRequest, "", chgs) pErr = roachpb.NewError(err) if pErr != nil { resp = &roachpb.AdminChangeReplicasResponse{} diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 887d3f3fca52..b3a2fefe918c 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -6553,7 +6553,7 @@ func TestChangeReplicasDuplicateError(t *testing.T) { if _, err := tc.repl.ChangeReplicas( context.Background(), tc.repl.Desc(), - SnapshotRequest_REBALANCE, + kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRebalance, "", chgs, @@ -9926,7 +9926,7 @@ func TestFollowerQuiesceOnNotify(t *testing.T) { test := func( expected bool, - transform func(*testQuiescer, RaftMessageRequest) (*testQuiescer, RaftMessageRequest), + transform func(*testQuiescer, kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest), ) { t.Run("", func(t *testing.T) { q := &testQuiescer{ @@ -9948,7 +9948,7 @@ func TestFollowerQuiesceOnNotify(t *testing.T) { 3: {IsLive: true}, }, } - req := RaftMessageRequest{ + req := kvserverpb.RaftMessageRequest{ Message: raftpb.Message{ Type: raftpb.MsgHeartbeat, From: 1, @@ -9971,23 +9971,23 @@ func TestFollowerQuiesceOnNotify(t *testing.T) { }) } - test(true, func(q *testQuiescer, req RaftMessageRequest) (*testQuiescer, RaftMessageRequest) { + test(true, func(q *testQuiescer, req kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest) { return q, req }) - test(false, func(q *testQuiescer, req RaftMessageRequest) (*testQuiescer, RaftMessageRequest) { + test(false, func(q *testQuiescer, req kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest) { req.Message.Term = 4 return q, req }) - test(false, func(q *testQuiescer, req RaftMessageRequest) (*testQuiescer, RaftMessageRequest) { + test(false, func(q *testQuiescer, req kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest) { req.Message.Commit = 9 return q, req }) - test(false, func(q *testQuiescer, req RaftMessageRequest) (*testQuiescer, RaftMessageRequest) { + test(false, func(q *testQuiescer, req kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest) { q.numProposals = 1 return q, req }) // Lagging replica with same liveness information. 
- test(true, func(q *testQuiescer, req RaftMessageRequest) (*testQuiescer, RaftMessageRequest) { + test(true, func(q *testQuiescer, req kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest) { l := livenesspb.Liveness{ NodeID: 3, Epoch: 7, @@ -10001,7 +10001,7 @@ func TestFollowerQuiesceOnNotify(t *testing.T) { return q, req }) // Lagging replica with older liveness information. - test(false, func(q *testQuiescer, req RaftMessageRequest) (*testQuiescer, RaftMessageRequest) { + test(false, func(q *testQuiescer, req kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest) { l := livenesspb.Liveness{ NodeID: 3, Epoch: 7, @@ -10016,7 +10016,7 @@ func TestFollowerQuiesceOnNotify(t *testing.T) { req.LaggingFollowersOnQuiesce = []livenesspb.Liveness{lOld} return q, req }) - test(false, func(q *testQuiescer, req RaftMessageRequest) (*testQuiescer, RaftMessageRequest) { + test(false, func(q *testQuiescer, req kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest) { l := livenesspb.Liveness{ NodeID: 3, Epoch: 7, @@ -10032,7 +10032,7 @@ func TestFollowerQuiesceOnNotify(t *testing.T) { return q, req }) // Lagging replica with newer liveness information. - test(true, func(q *testQuiescer, req RaftMessageRequest) (*testQuiescer, RaftMessageRequest) { + test(true, func(q *testQuiescer, req kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest) { l := livenesspb.Liveness{ NodeID: 3, Epoch: 7, @@ -10047,7 +10047,7 @@ func TestFollowerQuiesceOnNotify(t *testing.T) { req.LaggingFollowersOnQuiesce = []livenesspb.Liveness{lNew} return q, req }) - test(true, func(q *testQuiescer, req RaftMessageRequest) (*testQuiescer, RaftMessageRequest) { + test(true, func(q *testQuiescer, req kvserverpb.RaftMessageRequest) (*testQuiescer, kvserverpb.RaftMessageRequest) { l := livenesspb.Liveness{ NodeID: 3, Epoch: 7, diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 391fad611f96..0c6f79445d8a 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -644,7 +644,7 @@ func (rq *replicateQueue) addOrReplaceVoters( repl, ops, desc, - SnapshotRequest_RECOVERY, + kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.ReasonRangeUnderReplicated, details, dryRun, @@ -697,7 +697,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters( repl, ops, desc, - SnapshotRequest_RECOVERY, + kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.ReasonRangeUnderReplicated, details, dryRun, @@ -878,7 +878,7 @@ func (rq *replicateQueue) removeVoter( repl, roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, target), desc, - SnapshotRequest_UNKNOWN, // unused + kvserverpb.SnapshotRequest_UNKNOWN, // unused kvserverpb.ReasonRangeOverReplicated, details, dryRun, @@ -921,7 +921,7 @@ func (rq *replicateQueue) removeNonVoter( repl, roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, target), desc, - SnapshotRequest_UNKNOWN, + kvserverpb.SnapshotRequest_UNKNOWN, kvserverpb.ReasonRangeOverReplicated, details, dryRun, @@ -978,7 +978,7 @@ func (rq *replicateQueue) removeDecommissioning( repl, roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target), desc, - SnapshotRequest_UNKNOWN, // unused + kvserverpb.SnapshotRequest_UNKNOWN, // unused kvserverpb.ReasonStoreDecommissioning, "", dryRun, ); err != nil { return false, err @@ -1022,7 +1022,7 @@ func (rq *replicateQueue) removeDead( repl, roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target), desc, - SnapshotRequest_UNKNOWN, // 
unused + kvserverpb.SnapshotRequest_UNKNOWN, // unused kvserverpb.ReasonStoreDead, "", dryRun, @@ -1057,7 +1057,7 @@ func (rq *replicateQueue) removeLearner( repl, roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, target), desc, - SnapshotRequest_UNKNOWN, + kvserverpb.SnapshotRequest_UNKNOWN, kvserverpb.ReasonAbandonedLearner, "", dryRun, @@ -1140,7 +1140,7 @@ func (rq *replicateQueue) considerRebalance( repl, chgs, desc, - SnapshotRequest_REBALANCE, + kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRebalance, details, dryRun, @@ -1380,7 +1380,7 @@ func (rq *replicateQueue) changeReplicas( repl *Replica, chgs roachpb.ReplicationChanges, desc *roachpb.RangeDescriptor, - priority SnapshotRequest_Priority, + priority kvserverpb.SnapshotRequest_Priority, reason kvserverpb.RangeLogEventReason, details string, dryRun bool, diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 7bd497b73f7c..5bb943a6e62e 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -1127,7 +1127,7 @@ const ( func (h delayingRaftMessageHandler) HandleRaftRequest( ctx context.Context, - req *kvserver.RaftMessageRequest, + req *kvserverpb.RaftMessageRequest, respStream kvserver.RaftMessageResponseStream, ) *roachpb.Error { if h.rangeID != req.RangeID { diff --git a/pkg/kv/kvserver/split_trigger_helper_test.go b/pkg/kv/kvserver/split_trigger_helper_test.go index 46fcf5411e1f..90fb71cc6c8d 100644 --- a/pkg/kv/kvserver/split_trigger_helper_test.go +++ b/pkg/kv/kvserver/split_trigger_helper_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -92,13 +93,13 @@ func TestProtoZeroNilSlice(t *testing.T) { defer log.Scope(t).Close(t) testutils.RunTrueAndFalse(t, "isNil", func(t *testing.T, isNil bool) { - msg := &RaftMessageRequest{} + msg := &kvserverpb.RaftMessageRequest{} if !isNil { msg.RangeStartKey = roachpb.RKey("foo") } b, err := protoutil.Marshal(msg) assert.NoError(t, err) - out := &RaftMessageRequest{} + out := &kvserverpb.RaftMessageRequest{} assert.NoError(t, protoutil.Unmarshal(b, out)) assert.Equal(t, isNil, out.RangeStartKey == nil) }) diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index 0a7ccb86de26..61b762ebce25 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -12,13 +12,13 @@ syntax = "proto3"; package cockroach.storage; // HACK option go_package = "kvserver"; -import "kv/kvserver/raft.proto"; +import "kv/kvserver/kvserverpb/raft.proto"; import "kv/kvserver/api.proto"; import "gogoproto/gogo.proto"; service MultiRaft { - rpc RaftMessageBatch (stream cockroach.kv.kvserver.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.RaftMessageResponse) {} - rpc RaftSnapshot (stream cockroach.kv.kvserver.SnapshotRequest) returns (stream cockroach.kv.kvserver.SnapshotResponse) {} + rpc RaftMessageBatch (stream cockroach.kv.kvserver.kvserverpb.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.kvserverpb.RaftMessageResponse) {} + rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} } service PerReplica { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e727704211a7..b54462d5a31a 
100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -741,8 +741,8 @@ type Store struct { coalescedMu struct { syncutil.Mutex - heartbeats map[roachpb.StoreIdent][]RaftHeartbeat - heartbeatResponses map[roachpb.StoreIdent][]RaftHeartbeat + heartbeats map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat + heartbeatResponses map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat } // 1 if the store was started, 0 if it wasn't. To be accessed using atomic // ops. @@ -1158,8 +1158,8 @@ func NewStore( s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics()) s.coalescedMu.Lock() - s.coalescedMu.heartbeats = map[roachpb.StoreIdent][]RaftHeartbeat{} - s.coalescedMu.heartbeatResponses = map[roachpb.StoreIdent][]RaftHeartbeat{} + s.coalescedMu.heartbeats = map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat{} + s.coalescedMu.heartbeatResponses = map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat{} s.coalescedMu.Unlock() s.mu.Lock() diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index e7cc24dba5f1..9a64ab97170f 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -15,6 +15,7 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -28,7 +29,7 @@ import ( ) type raftRequestInfo struct { - req *RaftMessageRequest + req *kvserverpb.RaftMessageRequest respStream RaftMessageResponseStream } @@ -65,7 +66,7 @@ func (q *raftRequestQueue) recycle(processed []raftRequestInfo) { // HandleSnapshot reads an incoming streaming snapshot and applies it if // possible. func (s *Store) HandleSnapshot( - ctx context.Context, header *SnapshotRequest_Header, stream SnapshotResponseStream, + ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream SnapshotResponseStream, ) error { ctx = s.AnnotateCtx(ctx) const name = "storage.Store: handle snapshot" @@ -73,8 +74,8 @@ func (s *Store) HandleSnapshot( s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1) if s.IsDraining() { - return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_ERROR, + return stream.Send(&kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ERROR, Message: storeDrainingMsg, }) } @@ -85,7 +86,7 @@ func (s *Store) HandleSnapshot( func (s *Store) uncoalesceBeats( ctx context.Context, - beats []RaftHeartbeat, + beats []kvserverpb.RaftHeartbeat, fromReplica, toReplica roachpb.ReplicaDescriptor, msgT raftpb.MessageType, respStream RaftMessageResponseStream, @@ -96,7 +97,7 @@ func (s *Store) uncoalesceBeats( if log.V(4) { log.Infof(ctx, "uncoalescing %d beats of type %v: %+v", len(beats), msgT, beats) } - beatReqs := make([]RaftMessageRequest, len(beats)) + beatReqs := make([]kvserverpb.RaftMessageRequest, len(beats)) var toEnqueue []roachpb.RangeID for i, beat := range beats { msg := raftpb.Message{ @@ -106,7 +107,7 @@ func (s *Store) uncoalesceBeats( Term: beat.Term, Commit: beat.Commit, } - beatReqs[i] = RaftMessageRequest{ + beatReqs[i] = kvserverpb.RaftMessageRequest{ RangeID: beat.RangeID, FromReplica: roachpb.ReplicaDescriptor{ NodeID: fromReplica.NodeID, @@ -137,7 +138,7 @@ func (s *Store) uncoalesceBeats( // HandleRaftRequest dispatches a raft message to the appropriate Replica. It // requires that s.mu is not held. 
func (s *Store) HandleRaftRequest( - ctx context.Context, req *RaftMessageRequest, respStream RaftMessageResponseStream, + ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream, ) *roachpb.Error { // NB: unlike the other two RaftMessageHandler methods implemented by Store, // this one doesn't need to directly run through a Stopper task because it @@ -162,7 +163,7 @@ func (s *Store) HandleRaftRequest( // Replica. The method returns whether the Range needs to be enqueued in the // Raft scheduler. It requires that s.mu is not held. func (s *Store) HandleRaftUncoalescedRequest( - ctx context.Context, req *RaftMessageRequest, respStream RaftMessageResponseStream, + ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream, ) (enqueue bool) { if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 { log.Fatalf(ctx, "HandleRaftUncoalescedRequest cannot be given coalesced heartbeats or heartbeat responses, received %s", req) @@ -200,7 +201,9 @@ func (s *Store) HandleRaftUncoalescedRequest( // initialized) Replica specified in the request. The replica passed to // the function will have its Replica.raftMu locked. func (s *Store) withReplicaForRequest( - ctx context.Context, req *RaftMessageRequest, f func(context.Context, *Replica) *roachpb.Error, + ctx context.Context, + req *kvserverpb.RaftMessageRequest, + f func(context.Context, *Replica) *roachpb.Error, ) *roachpb.Error { // Lazily create the replica. r, _, err := s.getOrCreateReplica( @@ -221,7 +224,7 @@ func (s *Store) withReplicaForRequest( // the specified replica. Notably, it does not handle updates to the Raft Ready // state; callers will probably want to handle this themselves at some point. func (s *Store) processRaftRequestWithReplica( - ctx context.Context, r *Replica, req *RaftMessageRequest, + ctx context.Context, r *Replica, req *kvserverpb.RaftMessageRequest, ) *roachpb.Error { if verboseRaftLoggingEnabled() { log.Infof(ctx, "incoming raft message:\n%s", raftDescribeMessage(req.Message, raftEntryFormatter)) @@ -271,7 +274,7 @@ func (s *Store) processRaftRequestWithReplica( // If (and only if) no error is returned, the placeholder (if any) in inSnap // will have been removed. func (s *Store) processRaftSnapshotRequest( - ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot, + ctx context.Context, snapHeader *kvserverpb.SnapshotRequest_Header, inSnap IncomingSnapshot, ) *roachpb.Error { return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func( ctx context.Context, r *Replica, @@ -352,7 +355,9 @@ func (s *Store) processRaftSnapshotRequest( // interface specification, an error is returned if and only if the underlying // Raft connection should be closed. // It requires that s.mu is not held. -func (s *Store) HandleRaftResponse(ctx context.Context, resp *RaftMessageResponse) error { +func (s *Store) HandleRaftResponse( + ctx context.Context, resp *kvserverpb.RaftMessageResponse, +) error { ctx = s.AnnotateCtx(ctx) const name = "storage.Store: handle raft response" return s.stopper.RunTaskWithErr(ctx, name, func(ctx context.Context) error { @@ -683,7 +688,7 @@ func (s *Store) coalescedHeartbeatsLoop(ctx context.Context) { // sendQueuedHeartbeatsToNode requires that the s.coalescedMu lock is held. It // returns the number of heartbeats that were sent. 
func (s *Store) sendQueuedHeartbeatsToNode( - ctx context.Context, beats, resps []RaftHeartbeat, to roachpb.StoreIdent, + ctx context.Context, beats, resps []kvserverpb.RaftHeartbeat, to roachpb.StoreIdent, ) int { var msgType raftpb.MessageType @@ -698,7 +703,7 @@ func (s *Store) sendQueuedHeartbeatsToNode( } chReq := newRaftMessageRequest() - *chReq = RaftMessageRequest{ + *chReq = kvserverpb.RaftMessageRequest{ RangeID: 0, ToReplica: roachpb.ReplicaDescriptor{ NodeID: to.NodeID, @@ -740,8 +745,8 @@ func (s *Store) sendQueuedHeartbeats(ctx context.Context) { s.coalescedMu.Lock() heartbeats := s.coalescedMu.heartbeats heartbeatResponses := s.coalescedMu.heartbeatResponses - s.coalescedMu.heartbeats = map[roachpb.StoreIdent][]RaftHeartbeat{} - s.coalescedMu.heartbeatResponses = map[roachpb.StoreIdent][]RaftHeartbeat{} + s.coalescedMu.heartbeats = map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat{} + s.coalescedMu.heartbeatResponses = map[roachpb.StoreIdent][]kvserverpb.RaftHeartbeat{} s.coalescedMu.Unlock() var beatsSent int diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 51d2f23b6142..693cfb732144 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -49,15 +50,15 @@ const ( // incomingSnapshotStream is the minimal interface on a GRPC stream required // to receive a snapshot over the network. type incomingSnapshotStream interface { - Send(*SnapshotResponse) error - Recv() (*SnapshotRequest, error) + Send(*kvserverpb.SnapshotResponse) error + Recv() (*kvserverpb.SnapshotRequest, error) } // outgoingSnapshotStream is the minimal interface on a GRPC stream required // to send a snapshot over the network. type outgoingSnapshotStream interface { - Send(*SnapshotRequest) error - Recv() (*SnapshotResponse, error) + Send(*kvserverpb.SnapshotRequest) error + Recv() (*kvserverpb.SnapshotResponse, error) } // snapshotStrategy is an approach to sending and receiving Range snapshots. @@ -67,11 +68,11 @@ type outgoingSnapshotStream interface { type snapshotStrategy interface { // Receive streams SnapshotRequests in from the provided stream and // constructs an IncomingSnapshot. - Receive(context.Context, incomingSnapshotStream, SnapshotRequest_Header) (IncomingSnapshot, error) + Receive(context.Context, incomingSnapshotStream, kvserverpb.SnapshotRequest_Header) (IncomingSnapshot, error) // Send streams SnapshotRequests created from the OutgoingSnapshot in to the // provided stream. On nil error, the number of bytes sent is returned. - Send(context.Context, outgoingSnapshotStream, SnapshotRequest_Header, *OutgoingSnapshot) (int64, error) + Send(context.Context, outgoingSnapshotStream, kvserverpb.SnapshotRequest_Header, *OutgoingSnapshot) (int64, error) // Status provides a status report on the work performed during the // snapshot. Only valid if the strategy succeeded. 
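The coalesced-heartbeat plumbing a few hunks up (the `coalescedMu` maps in `store.go` plus `sendQueuedHeartbeats`/`sendQueuedHeartbeatsToNode` here) buffers per-range heartbeats keyed by destination store, then periodically swaps the maps out under the lock and sends one batched request per store. A simplified, self-contained model of that flow, with toy `StoreIdent` and `RaftHeartbeat` types in place of the `roachpb`/`kvserverpb` ones:

```go
package main

import (
	"fmt"
	"sync"
)

// StoreIdent and RaftHeartbeat are toy stand-ins; this is a sketch of the
// coalescing scheme, not the real implementation.
type StoreIdent struct{ NodeID, StoreID int32 }
type RaftHeartbeat struct{ RangeID int64 }

type Store struct {
	mu         sync.Mutex
	heartbeats map[StoreIdent][]RaftHeartbeat
}

// enqueue buffers a heartbeat for a destination store instead of sending
// it immediately, as maybeCoalesceHeartbeat does.
func (s *Store) enqueue(to StoreIdent, beat RaftHeartbeat) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.heartbeats[to] = append(s.heartbeats[to], beat)
}

// flush swaps the buffered map out under the lock, then sends one batched
// message per destination store, mirroring sendQueuedHeartbeats (the real
// code packs the beats into a single RaftMessageRequest with RangeID 0).
func (s *Store) flush() int {
	s.mu.Lock()
	pending := s.heartbeats
	s.heartbeats = map[StoreIdent][]RaftHeartbeat{}
	s.mu.Unlock()

	sent := 0
	for to, beats := range pending {
		fmt.Printf("one batched request to %v carrying %d beats\n", to, len(beats))
		sent += len(beats)
	}
	return sent
}

func main() {
	s := &Store{heartbeats: map[StoreIdent][]RaftHeartbeat{}}
	dest := StoreIdent{NodeID: 2, StoreID: 2}
	for r := int64(1); r <= 3; r++ {
		s.enqueue(dest, RaftHeartbeat{RangeID: r})
	}
	fmt.Println("sent:", s.flush()) // one request, 3 beats
}
```

Swapping the map under the lock (rather than iterating it while locked) keeps the critical section short, which is the same design choice visible in `sendQueuedHeartbeats` above.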
@@ -82,7 +83,9 @@ type snapshotStrategy interface { } func assertStrategy( - ctx context.Context, header SnapshotRequest_Header, expect SnapshotRequest_Strategy, + ctx context.Context, + header kvserverpb.SnapshotRequest_Header, + expect kvserverpb.SnapshotRequest_Strategy, ) { if header.Strategy != expect { log.Fatalf(ctx, "expected strategy %s, found strategy %s", expect, header.Strategy) @@ -232,9 +235,9 @@ func (msstw *multiSSTWriter) Close() { // 3. Two lock-table key ranges (optional) // 4. User key range func (kvSS *kvBatchSnapshotStrategy) Receive( - ctx context.Context, stream incomingSnapshotStream, header SnapshotRequest_Header, + ctx context.Context, stream incomingSnapshotStream, header kvserverpb.SnapshotRequest_Header, ) (IncomingSnapshot, error) { - assertStrategy(ctx, header, SnapshotRequest_KV_BATCH) + assertStrategy(ctx, header, kvserverpb.SnapshotRequest_KV_BATCH) // At the moment we'll write at most five SSTs. // TODO(jeffreyxiao): Re-evaluate as the default range size grows. @@ -315,10 +318,10 @@ var errMalformedSnapshot = errors.New("malformed snapshot generated") func (kvSS *kvBatchSnapshotStrategy) Send( ctx context.Context, stream outgoingSnapshotStream, - header SnapshotRequest_Header, + header kvserverpb.SnapshotRequest_Header, snap *OutgoingSnapshot, ) (int64, error) { - assertStrategy(ctx, header, SnapshotRequest_KV_BATCH) + assertStrategy(ctx, header, kvserverpb.SnapshotRequest_KV_BATCH) // bytesSent is updated as key-value batches are sent with sendBatch. It // does not reflect the log entries sent (which are never sent in newer @@ -376,7 +379,7 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( if err := kvSS.limiter.WaitN(ctx, 1); err != nil { return err } - return stream.Send(&SnapshotRequest{KVBatch: batch.Repr()}) + return stream.Send(&kvserverpb.SnapshotRequest{KVBatch: batch.Repr()}) } // Status implements the snapshotStrategy interface. @@ -399,7 +402,7 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { // reserveSnapshot throttles incoming snapshots. The returned closure is used // to cleanup the reservation and release its resources. func (s *Store) reserveSnapshot( - ctx context.Context, header *SnapshotRequest_Header, + ctx context.Context, header *kvserverpb.SnapshotRequest_Header, ) (_cleanup func(), _err error) { tBegin := timeutil.Now() @@ -468,7 +471,7 @@ func (s *Store) reserveSnapshot( // Both the store mu and the raft mu for the existing replica (which must exist) // must be held. func (s *Store) canAcceptSnapshotLocked( - ctx context.Context, snapHeader *SnapshotRequest_Header, + ctx context.Context, snapHeader *kvserverpb.SnapshotRequest_Header, ) (*ReplicaPlaceholder, error) { // TODO(tbg): see the comment on desc.Generation for what seems to be a much // saner way to handle overlap via generational semantics. @@ -530,7 +533,7 @@ func (s *Store) canAcceptSnapshotLocked( // chance of being abandoned, so they're proactively handed to the replica GC // queue. func (s *Store) checkSnapshotOverlapLocked( - ctx context.Context, snapHeader *SnapshotRequest_Header, + ctx context.Context, snapHeader *kvserverpb.SnapshotRequest_Header, ) error { desc := *snapHeader.State.Desc @@ -589,7 +592,7 @@ func (s *Store) checkSnapshotOverlapLocked( // receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream. 
func (s *Store) receiveSnapshot( - ctx context.Context, header *SnapshotRequest_Header, stream incomingSnapshotStream, + ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, ) error { if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil { if err := fn(header); err != nil { @@ -651,7 +654,7 @@ func (s *Store) receiveSnapshot( // an error. var ss snapshotStrategy switch header.Strategy { - case SnapshotRequest_KV_BATCH: + case kvserverpb.SnapshotRequest_KV_BATCH: snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "invalid snapshot") @@ -670,7 +673,7 @@ func (s *Store) receiveSnapshot( ) } - if err := stream.Send(&SnapshotResponse{Status: SnapshotResponse_ACCEPTED}); err != nil { + if err := stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil { return err } if log.V(2) { @@ -691,12 +694,12 @@ func (s *Store) receiveSnapshot( if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil { return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot")) } - return stream.Send(&SnapshotResponse{Status: SnapshotResponse_APPLIED}) + return stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_APPLIED}) } func sendSnapshotError(stream incomingSnapshotStream, err error) error { - return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_ERROR, + return stream.Send(&kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ERROR, Message: err.Error(), }) } @@ -862,12 +865,12 @@ var snapshotSSTWriteSyncRate = settings.RegisterByteSizeSetting( ) func snapshotRateLimit( - st *cluster.Settings, priority SnapshotRequest_Priority, + st *cluster.Settings, priority kvserverpb.SnapshotRequest_Priority, ) (rate.Limit, error) { switch priority { - case SnapshotRequest_RECOVERY: + case kvserverpb.SnapshotRequest_RECOVERY: return rate.Limit(recoverySnapshotRate.Get(&st.SV)), nil - case SnapshotRequest_REBALANCE: + case kvserverpb.SnapshotRequest_REBALANCE: return rate.Limit(rebalanceSnapshotRate.Get(&st.SV)), nil default: return 0, errors.Errorf("unknown snapshot priority: %s", priority) @@ -960,7 +963,7 @@ func SendEmptySnapshot( // so they cannot be declined. We don't want our operation to be held // up behind a long running snapshot. We want this to go through // quickly. - SnapshotRequest_VIA_SNAPSHOT_QUEUE, + kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE, eng, desc.RangeID, raftentry.NewCache(1), // cache is not used @@ -977,7 +980,7 @@ func SendEmptySnapshot( // Sending it from the current replica ensures that. Otherwise, // it would be a malformed request if it came from a non-member. 
from := to - req := RaftMessageRequest{ + req := kvserverpb.RaftMessageRequest{ RangeID: desc.RangeID, FromReplica: from, ToReplica: to, @@ -990,13 +993,13 @@ func SendEmptySnapshot( }, } - header := SnapshotRequest_Header{ + header := kvserverpb.SnapshotRequest_Header{ State: state, RaftMessageRequest: req, RangeSize: ms.Total(), - Priority: SnapshotRequest_RECOVERY, - Strategy: SnapshotRequest_KV_BATCH, - Type: SnapshotRequest_VIA_SNAPSHOT_QUEUE, + Priority: kvserverpb.SnapshotRequest_RECOVERY, + Strategy: kvserverpb.SnapshotRequest_KV_BATCH, + Type: kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE, DeprecatedUnreplicatedTruncatedState: true, } @@ -1034,14 +1037,14 @@ func sendSnapshot( st *cluster.Settings, stream outgoingSnapshotStream, storePool SnapshotStorePool, - header SnapshotRequest_Header, + header kvserverpb.SnapshotRequest_Header, snap *OutgoingSnapshot, newBatch func() storage.Batch, sent func(), ) error { start := timeutil.Now() to := header.RaftMessageRequest.ToReplica - if err := stream.Send(&SnapshotRequest{Header: &header}); err != nil { + if err := stream.Send(&kvserverpb.SnapshotRequest{Header: &header}); err != nil { return err } // Wait until we get a response from the server. The recipient may queue us @@ -1054,11 +1057,11 @@ func sendSnapshot( return err } switch resp.Status { - case SnapshotResponse_ERROR: + case kvserverpb.SnapshotResponse_ERROR: storePool.throttle(throttleFailed, resp.Message, to.StoreID) return errors.Errorf("%s: remote couldn't accept %s with error: %s", to, snap, resp.Message) - case SnapshotResponse_ACCEPTED: + case kvserverpb.SnapshotResponse_ACCEPTED: // This is the response we're expecting. Continue with snapshot sending. default: err := errors.Errorf("%s: server sent an invalid status while negotiating %s: %s", @@ -1089,7 +1092,7 @@ func sendSnapshot( // Create a snapshotStrategy based on the desired snapshot strategy. var ss snapshotStrategy switch header.Strategy { - case SnapshotRequest_KV_BATCH: + case kvserverpb.SnapshotRequest_KV_BATCH: ss = &kvBatchSnapshotStrategy{ batchSize: batchSize, limiter: limiter, @@ -1109,7 +1112,7 @@ func sendSnapshot( // the snapshots generated metric gets incremented before the snapshot is // applied. sent() - if err := stream.Send(&SnapshotRequest{Final: true}); err != nil { + if err := stream.Send(&kvserverpb.SnapshotRequest{Final: true}); err != nil { return err } log.Infof( @@ -1139,9 +1142,9 @@ func sendSnapshot( return errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp) } switch resp.Status { - case SnapshotResponse_ERROR: + case kvserverpb.SnapshotResponse_ERROR: return errors.Errorf("%s: remote failed to apply snapshot for reason %s", to, resp.Message) - case SnapshotResponse_APPLIED: + case kvserverpb.SnapshotResponse_APPLIED: return nil default: return errors.Errorf("%s: server sent an invalid status during finalization: %s", diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 03a4b1721772..70c8a9672997 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2655,9 +2655,9 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { // Wrap the snapshot in a minimal header. The request will be dropped // because the Raft log index and term are less than the hard state written // above. 
- req := &SnapshotRequest_Header{ + req := &kvserverpb.SnapshotRequest_Header{ State: kvserverpb.ReplicaState{Desc: repl1.Desc()}, - RaftMessageRequest: RaftMessageRequest{ + RaftMessageRequest: kvserverpb.RaftMessageRequest{ RangeID: 1, ToReplica: roachpb.ReplicaDescriptor{ NodeID: 1, @@ -2720,15 +2720,15 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { } type fakeSnapshotStream struct { - nextResp *SnapshotResponse + nextResp *kvserverpb.SnapshotResponse nextErr error } -func (c fakeSnapshotStream) Recv() (*SnapshotResponse, error) { +func (c fakeSnapshotStream) Recv() (*kvserverpb.SnapshotResponse, error) { return c.nextResp, c.nextErr } -func (c fakeSnapshotStream) Send(request *SnapshotRequest) error { +func (c fakeSnapshotStream) Send(request *kvserverpb.SnapshotRequest) error { return nil } @@ -2757,7 +2757,7 @@ func TestSendSnapshotThrottling(t *testing.T) { ctx := context.Background() st := cluster.MakeTestingClusterSettings() - header := SnapshotRequest_Header{ + header := kvserverpb.SnapshotRequest_Header{ State: kvserverpb.ReplicaState{ Desc: &roachpb.RangeDescriptor{RangeID: 1}, }, @@ -2781,8 +2781,8 @@ func TestSendSnapshotThrottling(t *testing.T) { // Test that an errored snapshot causes a fail throttle. { sp := &fakeStorePool{} - resp := &SnapshotResponse{ - Status: SnapshotResponse_ERROR, + resp := &kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_ERROR, } c := fakeSnapshotStream{resp, nil} err := sendSnapshot(ctx, st, c, sp, header, nil, newBatch, nil) @@ -2806,7 +2806,7 @@ func TestReserveSnapshotThrottling(t *testing.T) { tc.Start(ctx, t, stopper) s := tc.store - cleanupNonEmpty1, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ + cleanupNonEmpty1, err := s.reserveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{ RangeSize: 1, }) if err != nil { @@ -2817,7 +2817,7 @@ func TestReserveSnapshotThrottling(t *testing.T) { } // Ensure we allow a concurrent empty snapshot. - cleanupEmpty, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{}) + cleanupEmpty, err := s.reserveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{}) if err != nil { t.Fatal(err) } @@ -2843,7 +2843,7 @@ func TestReserveSnapshotThrottling(t *testing.T) { } }() - cleanupNonEmpty3, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ + cleanupNonEmpty3, err := s.reserveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{ RangeSize: 1, }) if err != nil { @@ -2886,7 +2886,7 @@ func TestReserveSnapshotFullnessLimit(t *testing.T) { } // A snapshot should be allowed. 
- cleanupAccepted, err := s.reserveSnapshot(ctx, &SnapshotRequest_Header{ + cleanupAccepted, err := s.reserveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{ RangeSize: 1, }) if err != nil { @@ -2960,7 +2960,7 @@ func TestReserveSnapshotQueueTimeoutAvoidsStarvation(t *testing.T) { if err := func() error { snapCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - cleanup, err := s.reserveSnapshot(snapCtx, &SnapshotRequest_Header{RangeSize: 1}) + cleanup, err := s.reserveSnapshot(snapCtx, &kvserverpb.SnapshotRequest_Header{RangeSize: 1}) if err != nil { if errors.Is(err, context.DeadlineExceeded) { return nil @@ -3003,13 +3003,13 @@ func TestSnapshotRateLimit(t *testing.T) { defer log.Scope(t).Close(t) testCases := []struct { - priority SnapshotRequest_Priority + priority kvserverpb.SnapshotRequest_Priority expectedLimit rate.Limit expectedErr string }{ - {SnapshotRequest_UNKNOWN, 0, "unknown snapshot priority"}, - {SnapshotRequest_RECOVERY, 32 << 20, ""}, - {SnapshotRequest_REBALANCE, 32 << 20, ""}, + {kvserverpb.SnapshotRequest_UNKNOWN, 0, "unknown snapshot priority"}, + {kvserverpb.SnapshotRequest_RECOVERY, 32 << 20, ""}, + {kvserverpb.SnapshotRequest_REBALANCE, 32 << 20, ""}, } for _, c := range testCases { t.Run(c.priority.String(), func(t *testing.T) { diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 5d5c7b5e39f5..77798a352a8d 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -267,7 +268,7 @@ type StoreTestingKnobs struct { // ReceiveSnapshot is run after receiving a snapshot header but before // acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an // error is returned from the hook, it's sent as an ERROR SnapshotResponse. - ReceiveSnapshot func(*SnapshotRequest_Header) error + ReceiveSnapshot func(*kvserverpb.SnapshotRequest_Header) error // ReplicaAddSkipLearnerRollback causes replica addition to skip the learner // rollback that happens when either the initial snapshot or the promotion of // a learner to a voter fails. @@ -304,7 +305,7 @@ type StoreTestingKnobs struct { ReplicationAlwaysUseJointConfig func() bool // BeforeSnapshotSSTIngestion is run just before the SSTs are ingested when // applying a snapshot. - BeforeSnapshotSSTIngestion func(IncomingSnapshot, SnapshotRequest_Type, []string) error + BeforeSnapshotSSTIngestion func(IncomingSnapshot, kvserverpb.SnapshotRequest_Type, []string) error // OnRelocatedOne intercepts the return values of s.relocateOne after they // have successfully been put into effect. 
OnRelocatedOne func(_ []roachpb.ReplicationChange, leaseTarget *roachpb.ReplicationTarget) diff --git a/pkg/server/authentication_test.go b/pkg/server/authentication_test.go index 3cf65c616b48..65183ac4b76c 100644 --- a/pkg/server/authentication_test.go +++ b/pkg/server/authentication_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/security" @@ -756,7 +757,7 @@ func TestGRPCAuthentication(t *testing.T) { if err != nil { return err } - _ = stream.Send(&kvserver.RaftMessageRequestBatch{}) + _ = stream.Send(&kvserverpb.RaftMessageRequestBatch{}) _, err = stream.Recv() return err }}, diff --git a/pkg/server/serverpb/admin.proto b/pkg/server/serverpb/admin.proto index d8a663729f69..aeb4f11215e8 100644 --- a/pkg/server/serverpb/admin.proto +++ b/pkg/server/serverpb/admin.proto @@ -17,7 +17,7 @@ import "jobs/jobspb/jobs.proto"; import "server/serverpb/status.proto"; import "storage/enginepb/mvcc.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; -import "kv/kvserver/kvserverpb/log.proto"; +import "kv/kvserver/kvserverpb/range_log.proto"; import "roachpb/api.proto"; import "ts/catalog/chart_catalog.proto"; import "util/metric/metric.proto";
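Taken together, the `sendSnapshot`/`receiveSnapshot` hunks above describe a two-phase handshake: the sender streams a header and waits for ACCEPTED, streams KV batches, marks the stream final, then waits for APPLIED or ERROR. A minimal in-memory model of that protocol, with toy request/response/status types standing in for the `kvserverpb` messages and a fake stream in the spirit of `fakeSnapshotStream` from `store_test.go` (all names here are illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

// Toy stand-ins for the kvserverpb snapshot messages.
type Status int

const (
	ERROR Status = iota
	ACCEPTED
	APPLIED
)

type SnapshotRequest struct {
	Header  string // stands in for *SnapshotRequest_Header
	KVBatch []byte
	Final   bool
}

type SnapshotResponse struct {
	Status  Status
	Message string
}

// stream is the minimal send/recv surface the sender needs, like
// outgoingSnapshotStream above.
type stream interface {
	Send(*SnapshotRequest) error
	Recv() (*SnapshotResponse, error)
}

func sendSnapshot(s stream, batches [][]byte) error {
	// Phase 1: negotiate. The recipient may queue us or decline.
	if err := s.Send(&SnapshotRequest{Header: "header"}); err != nil {
		return err
	}
	resp, err := s.Recv()
	if err != nil {
		return err
	}
	if resp.Status != ACCEPTED {
		return errors.New("remote declined: " + resp.Message)
	}
	// Phase 2: stream the data, then mark the stream final.
	for _, b := range batches {
		if err := s.Send(&SnapshotRequest{KVBatch: b}); err != nil {
			return err
		}
	}
	if err := s.Send(&SnapshotRequest{Final: true}); err != nil {
		return err
	}
	// Wait for the recipient to apply the snapshot.
	resp, err = s.Recv()
	if err != nil {
		return err
	}
	if resp.Status != APPLIED {
		return errors.New("remote failed to apply: " + resp.Message)
	}
	return nil
}

// fakeStream plays the recipient's side in-memory by replaying canned
// responses, as the throttling tests above do.
type fakeStream struct{ responses []*SnapshotResponse }

func (f *fakeStream) Send(*SnapshotRequest) error { return nil }
func (f *fakeStream) Recv() (*SnapshotResponse, error) {
	r := f.responses[0]
	f.responses = f.responses[1:]
	return r, nil
}

func main() {
	ok := &fakeStream{responses: []*SnapshotResponse{{Status: ACCEPTED}, {Status: APPLIED}}}
	fmt.Println(sendSnapshot(ok, [][]byte{[]byte("kv")})) // <nil>
}
```

Defining the sender against a two-method interface rather than the concrete gRPC stream is what lets `TestSendSnapshotThrottling` above exercise the ERROR and ACCEPTED paths without a network.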