diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 6a0d139de122..62825f595d8a 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -61,7 +61,8 @@
kv.replica_circuit_breaker.slow_replication_threshold | duration | 1m0s | duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers) |
kv.replica_stats.addsst_request_size_factor | integer | 50000 | the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1 |
kv.replication_reports.interval | duration | 1m0s | the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) |
-kv.snapshot_delegation.enabled | boolean | false | set to true to allow snapshots from follower replicas |
+kv.snapshot_delegation.num_follower | integer | 1 | the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder |
+kv.snapshot_delegation.num_requests | integer | 3 | how many queued requests are allowed on a delegate before the request is rejected |
kv.snapshot_rebalance.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for rebalance and upreplication snapshots |
kv.snapshot_recovery.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for recovery snapshots |
kv.transaction.max_intents_bytes | integer | 4194304 | maximum number of bytes used to track locks in transactions |
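
The two new settings above back the NumDelegateLimit and MaxQueueOnDelegateLimit values read in pkg/kv/kvserver/replica_command.go later in this patch. A rough sketch of how such cluster settings are typically declared (the actual declarations are not included in this diff, so the setting class and variable placement here are assumptions):

    package kvserver

    import "github.com/cockroachdb/cockroach/pkg/settings"

    // NumDelegateLimit corresponds to kv.snapshot_delegation.num_follower.
    // Sketch only; the real declaration is not part of this excerpt.
    var NumDelegateLimit = settings.RegisterIntSetting(
    	settings.SystemOnly,
    	"kv.snapshot_delegation.num_follower",
    	"the number of delegates to try when sending snapshots, "+
    		"before falling back to sending from the leaseholder",
    	1,
    )

    // MaxQueueOnDelegateLimit corresponds to kv.snapshot_delegation.num_requests.
    var MaxQueueOnDelegateLimit = settings.RegisterIntSetting(
    	settings.SystemOnly,
    	"kv.snapshot_delegation.num_requests",
    	"how many queued requests are allowed on a delegate before the request is rejected",
    	3,
    )

Setting num_follower to 0 effectively disables delegation, since getSenderReplicas then returns only the coordinator.
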
diff --git a/pkg/cmd/roachtest/tests/decommissionbench.go b/pkg/cmd/roachtest/tests/decommissionbench.go
index 35c0c030146a..0c7ca78704af 100644
--- a/pkg/cmd/roachtest/tests/decommissionbench.go
+++ b/pkg/cmd/roachtest/tests/decommissionbench.go
@@ -61,13 +61,15 @@ const (
type decommissionBenchSpec struct {
nodes int
- cpus int
warehouses int
- load bool
+ noLoad bool
multistore bool
snapshotRate int
duration time.Duration
+ // Whether the test cluster nodes are in multiple regions.
+ multiregion bool
+
// When true, the test will attempt to stop the node prior to decommission.
whileDown bool
@@ -85,6 +87,10 @@ type decommissionBenchSpec struct {
// An override for the default timeout, if needed.
timeout time.Duration
+ // An override for the node to decommission, so the test targets a
+ // predictable node instead of a random one.
+ decommissionNode int
+
skip string
}
@@ -95,29 +101,21 @@ func registerDecommissionBench(r registry.Registry) {
// Basic benchmark configurations, to be run nightly.
{
nodes: 4,
- cpus: 16,
warehouses: 1000,
- load: true,
},
{
nodes: 4,
- cpus: 16,
warehouses: 1000,
- load: true,
duration: 1 * time.Hour,
},
{
nodes: 4,
- cpus: 16,
warehouses: 1000,
- load: true,
whileDown: true,
},
{
nodes: 8,
- cpus: 16,
warehouses: 3000,
- load: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
timeout: 4 * time.Hour,
@@ -126,9 +124,7 @@ func registerDecommissionBench(r registry.Registry) {
{
// Add a new node during decommission (no drain).
nodes: 8,
- cpus: 16,
warehouses: 3000,
- load: true,
whileUpreplicating: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
@@ -138,9 +134,7 @@ func registerDecommissionBench(r registry.Registry) {
{
// Drain before decommission, without adding a new node.
nodes: 8,
- cpus: 16,
warehouses: 3000,
- load: true,
drainFirst: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
@@ -150,9 +144,7 @@ func registerDecommissionBench(r registry.Registry) {
{
// Drain before decommission, and add a new node.
nodes: 8,
- cpus: 16,
warehouses: 3000,
- load: true,
whileUpreplicating: true,
drainFirst: true,
// This test can take nearly an hour to import and achieve balance, so
@@ -162,25 +154,19 @@ func registerDecommissionBench(r registry.Registry) {
},
{
nodes: 4,
- cpus: 16,
warehouses: 1000,
- load: true,
drainFirst: true,
skip: manualBenchmarkingOnly,
},
{
nodes: 4,
- cpus: 16,
warehouses: 1000,
- load: true,
slowWrites: true,
skip: manualBenchmarkingOnly,
},
{
nodes: 8,
- cpus: 16,
warehouses: 3000,
- load: true,
slowWrites: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
@@ -189,9 +175,7 @@ func registerDecommissionBench(r registry.Registry) {
},
{
nodes: 12,
- cpus: 16,
warehouses: 3000,
- load: true,
multistore: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
@@ -201,14 +185,30 @@ func registerDecommissionBench(r registry.Registry) {
{
// Test to compare 12 4-store nodes vs 48 single-store nodes
nodes: 48,
- cpus: 16,
warehouses: 3000,
- load: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
timeout: 3 * time.Hour,
skip: manualBenchmarkingOnly,
},
+ {
+ // Multiregion decommission, and add a new node in the same region.
+ nodes: 6,
+ warehouses: 1000,
+ whileUpreplicating: true,
+ drainFirst: true,
+ multiregion: true,
+ decommissionNode: 2,
+ },
+ {
+ // Multiregion decommission, and add a new node in a different region.
+ nodes: 6,
+ warehouses: 1000,
+ whileUpreplicating: true,
+ drainFirst: true,
+ multiregion: true,
+ decommissionNode: 3,
+ },
} {
registerDecommissionBenchSpec(r, benchSpec)
}
@@ -223,7 +223,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
}
extraNameParts := []string{""}
addlNodeCount := 0
- specOptions := []spec.Option{spec.CPU(benchSpec.cpus)}
+ specOptions := []spec.Option{spec.CPU(16)}
if benchSpec.snapshotRate != 0 {
extraNameParts = append(extraNameParts,
@@ -251,7 +251,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
extraNameParts = append(extraNameParts, "while-upreplicating")
}
- if !benchSpec.load {
+ if benchSpec.noLoad {
extraNameParts = append(extraNameParts, "no-load")
}
@@ -259,11 +259,23 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
extraNameParts = append(extraNameParts, "hi-read-amp")
}
+ if benchSpec.decommissionNode != 0 {
+ extraNameParts = append(extraNameParts,
+ fmt.Sprintf("target=%d", benchSpec.decommissionNode))
+ }
+
if benchSpec.duration > 0 {
timeout = benchSpec.duration * 3
extraNameParts = append(extraNameParts, fmt.Sprintf("duration=%s", benchSpec.duration))
}
+ if benchSpec.multiregion {
+ geoZones := []string{regionUsEast, regionUsWest, regionUsCentral}
+ specOptions = append(specOptions, spec.Zones(strings.Join(geoZones, ",")))
+ specOptions = append(specOptions, spec.Geo())
+ extraNameParts = append(extraNameParts, "multi-region")
+ }
+
// If run with ROACHTEST_DECOMMISSION_NOSKIP=1, roachtest will enable all specs.
noSkipFlag := os.Getenv(envDecommissionNoSkipFlag)
if noSkipFlag != "" {
@@ -273,8 +285,8 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
extraName := strings.Join(extraNameParts, "/")
r.Add(registry.TestSpec{
- Name: fmt.Sprintf("decommissionBench/nodes=%d/cpu=%d/warehouses=%d%s",
- benchSpec.nodes, benchSpec.cpus, benchSpec.warehouses, extraName),
+ Name: fmt.Sprintf("decommissionBench/nodes=%d/warehouses=%d%s",
+ benchSpec.nodes, benchSpec.warehouses, extraName),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(
benchSpec.nodes+addlNodeCount+1,
@@ -471,7 +483,7 @@ func uploadPerfArtifacts(
// Get the workload perf artifacts and move them to the pinned node, so that
// they can be used to display the workload operation rates during decommission.
- if benchSpec.load {
+ if !benchSpec.noLoad {
workloadStatsSrc := filepath.Join(t.PerfArtifactsDir(), "stats.json")
localWorkloadStatsPath := filepath.Join(t.ArtifactsDir(), "workload_stats.json")
workloadStatsDest := filepath.Join(t.PerfArtifactsDir(), "workload_stats.json")
@@ -595,7 +607,7 @@ func runDecommissionBench(
workloadCtx, workloadCancel := context.WithCancel(ctx)
m := c.NewMonitor(workloadCtx, crdbNodes)
- if benchSpec.load {
+ if !benchSpec.noLoad {
m.Go(
func(ctx context.Context) error {
close(rampStarted)
@@ -642,7 +654,7 @@ func runDecommissionBench(
// If we are running a workload, wait until it has started and completed its
// ramp time before initiating a decommission.
- if benchSpec.load {
+ if !benchSpec.noLoad {
<-rampStarted
t.Status("Waiting for workload to ramp up...")
select {
@@ -666,7 +678,7 @@ func runDecommissionBench(
m.ExpectDeath()
defer m.ResetDeaths()
- err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate,
+ err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating,
true /* estimateDuration */, benchSpec.slowWrites, tickByName,
)
@@ -729,7 +741,7 @@ func runDecommissionBenchLong(
workloadCtx, workloadCancel := context.WithCancel(ctx)
m := c.NewMonitor(workloadCtx, crdbNodes)
- if benchSpec.load {
+ if !benchSpec.noLoad {
m.Go(
func(ctx context.Context) error {
close(rampStarted)
@@ -773,7 +785,7 @@ func runDecommissionBenchLong(
// If we are running a workload, wait until it has started and completed its
// ramp time before initiating a decommission.
- if benchSpec.load {
+ if !benchSpec.noLoad {
<-rampStarted
t.Status("Waiting for workload to ramp up...")
select {
@@ -786,7 +798,7 @@ func runDecommissionBenchLong(
for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; {
m.ExpectDeath()
- err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate,
+ err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating,
true /* estimateDuration */, benchSpec.slowWrites, tickByName,
)
@@ -827,12 +839,15 @@ func runSingleDecommission(
ctx context.Context,
h *decommTestHelper,
pinnedNode int,
+ target int,
targetLogicalNodeAtomic *uint32,
snapshotRateMb int,
stopFirst, drainFirst, reuse, noBalanceWait, estimateDuration, slowWrites bool,
tickByName func(name string),
) error {
- target := h.getRandNodeOtherThan(pinnedNode)
+ if target == 0 {
+ target = h.getRandNodeOtherThan(pinnedNode)
+ }
targetLogicalNodeID, err := h.getLogicalNodeID(ctx, target)
if err != nil {
return err
diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go
index 55beabc549ff..2f1e60b3413c 100644
--- a/pkg/kv/kvserver/allocator/storepool/store_pool.go
+++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -826,6 +826,23 @@ func (sp *StorePool) IsLive(storeID roachpb.StoreID) (bool, error) {
return status == storeStatusAvailable, nil
}
+// IsStoreHealthy returns whether we believe this store can serve requests
+// reliably. A healthy store can be used for follower snapshot transmission or
+// follower reads. A healthy store does not imply that replicas can be moved to
+// this store.
+func (sp *StorePool) IsStoreHealthy(storeID roachpb.StoreID) bool {
+ status, err := sp.storeStatus(storeID, sp.NodeLivenessFn)
+ if err != nil {
+ return false
+ }
+ switch status {
+ case storeStatusAvailable, storeStatusDecommissioning, storeStatusDraining:
+ return true
+ default:
+ return false
+ }
+}
+
func (sp *StorePool) storeStatus(
storeID roachpb.StoreID, nl NodeLivenessFunc,
) (storeStatus, error) {
@@ -1238,6 +1255,24 @@ func (sp *StorePool) GetLocalitiesByNode(
return localities
}
+// GetLocalitiesPerReplica computes the localities for the provided replicas.
+// It returns a map from the ReplicaDescriptor to the Locality of the Node.
+func (sp *StorePool) GetLocalitiesPerReplica(
+ replicas ...roachpb.ReplicaDescriptor,
+) map[roachpb.ReplicaID]roachpb.Locality {
+ sp.localitiesMu.RLock()
+ defer sp.localitiesMu.RUnlock()
+ localities := make(map[roachpb.ReplicaID]roachpb.Locality)
+ for _, replica := range replicas {
+ if locality, ok := sp.localitiesMu.nodeLocalities[replica.NodeID]; ok {
+ localities[replica.ReplicaID] = locality.locality
+ } else {
+ localities[replica.ReplicaID] = roachpb.Locality{}
+ }
+ }
+ return localities
+}
+
// GetNodeLocalityString returns the locality information for the given node
// in its string format.
func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string {
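
getSenderReplicas in replica_command.go (later in this patch) combines these two helpers to pick a delegate whose locality is closest to the snapshot recipient while skipping unhealthy stores. A simplified sketch of that selection, using only the APIs added above (the package and function names here are illustrative, not part of the patch):

    package storepoolexample // illustrative only

    import (
    	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    )

    // closestHealthyDelegate returns the candidate on a healthy store whose
    // locality is most similar to the recipient's (lowest DiversityScore).
    func closestHealthyDelegate(
    	sp *storepool.StorePool,
    	recipient roachpb.ReplicaDescriptor,
    	candidates []roachpb.ReplicaDescriptor,
    ) (roachpb.ReplicaDescriptor, bool) {
    	recipientLoc := sp.GetLocalitiesPerReplica(recipient)[recipient.ReplicaID]
    	localities := sp.GetLocalitiesPerReplica(candidates...)
    	var best roachpb.ReplicaDescriptor
    	bestScore, found := roachpb.MaxDiversityScore, false
    	for _, cand := range candidates {
    		if !sp.IsStoreHealthy(cand.StoreID) {
    			continue
    		}
    		// A lower diversity score means the localities share more tiers,
    		// i.e. the candidate is "closer" to the recipient.
    		if score := recipientLoc.DiversityScore(localities[cand.ReplicaID]); !found || score < bestScore {
    			best, bestScore, found = cand, score, true
    		}
    	}
    	return best, found
    }

The real code additionally breaks ties pseudo-randomly and always appends the coordinator as the sender of last resort.
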
diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go
index 84a1fff66b2d..f966f936e7df 100644
--- a/pkg/kv/kvserver/client_raft_helpers_test.go
+++ b/pkg/kv/kvserver/client_raft_helpers_test.go
@@ -31,8 +31,9 @@ type unreliableRaftHandlerFuncs struct {
dropReq func(*kvserverpb.RaftMessageRequest) bool
dropHB func(*kvserverpb.RaftHeartbeat) bool
dropResp func(*kvserverpb.RaftMessageResponse) bool
- // snapErr defaults to returning nil.
- snapErr func(*kvserverpb.SnapshotRequest_Header) error
+ // snapErr and delegateErr default to returning nil.
+ snapErr func(*kvserverpb.SnapshotRequest_Header) error
+ delegateErr func(request *kvserverpb.DelegateSendSnapshotRequest) error
}
func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs {
@@ -134,6 +135,20 @@ func (h *unreliableRaftHandler) HandleSnapshot(
return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream)
}
+func (h *unreliableRaftHandler) HandleDelegatedSnapshot(
+ ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
+) *kvserverpb.DelegateSnapshotResponse {
+ if req.RangeID == h.rangeID && h.delegateErr != nil {
+ if err := h.delegateErr(req); err != nil {
+ return &kvserverpb.DelegateSnapshotResponse{
+ Status: kvserverpb.DelegateSnapshotResponse_ERROR,
+ EncodedError: errors.EncodeError(context.Background(), err),
+ }
+ }
+ }
+ return h.RaftMessageHandler.HandleDelegatedSnapshot(ctx, req)
+}
+
// testClusterStoreRaftMessageHandler exists to allows a store to be stopped and
// restarted while maintaining a partition using an unreliableRaftHandler.
type testClusterStoreRaftMessageHandler struct {
@@ -181,15 +196,16 @@ func (h *testClusterStoreRaftMessageHandler) HandleSnapshot(
}
func (h *testClusterStoreRaftMessageHandler) HandleDelegatedSnapshot(
- ctx context.Context,
- req *kvserverpb.DelegateSnapshotRequest,
- stream kvserver.DelegateSnapshotResponseStream,
-) error {
+ ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
+) *kvserverpb.DelegateSnapshotResponse {
store, err := h.getStore()
if err != nil {
- return err
+ return &kvserverpb.DelegateSnapshotResponse{
+ Status: kvserverpb.DelegateSnapshotResponse_ERROR,
+ EncodedError: errors.EncodeError(context.Background(), err),
+ }
}
- return store.HandleDelegatedSnapshot(ctx, req, stream)
+ return store.HandleDelegatedSnapshot(ctx, req)
}
// testClusterPartitionedRange is a convenient abstraction to create a range on a node
@@ -198,7 +214,7 @@ type testClusterPartitionedRange struct {
rangeID roachpb.RangeID
mu struct {
syncutil.RWMutex
- partitionedNode int
+ partitionedNodeIdx int
partitioned bool
partitionedReplicas map[roachpb.ReplicaID]bool
}
@@ -232,7 +248,7 @@ func setupPartitionedRange(
tc *testcluster.TestCluster,
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
- partitionedNode int,
+ partitionedNodeIdx int,
activated bool,
funcs unreliableRaftHandlerFuncs,
) (*testClusterPartitionedRange, error) {
@@ -243,14 +259,14 @@ func setupPartitionedRange(
storeIdx: i,
})
}
- return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNode, activated, handlers, funcs)
+ return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNodeIdx, activated, handlers, funcs)
}
func setupPartitionedRangeWithHandlers(
tc *testcluster.TestCluster,
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
- partitionedNode int,
+ partitionedNodeIdx int,
activated bool,
handlers []kvserver.RaftMessageHandler,
funcs unreliableRaftHandlerFuncs,
@@ -260,9 +276,9 @@ func setupPartitionedRangeWithHandlers(
handlers: make([]kvserver.RaftMessageHandler, 0, len(handlers)),
}
pr.mu.partitioned = activated
- pr.mu.partitionedNode = partitionedNode
+ pr.mu.partitionedNodeIdx = partitionedNodeIdx
if replicaID == 0 {
- ts := tc.Servers[partitionedNode]
+ ts := tc.Servers[partitionedNodeIdx]
store, err := ts.Stores().GetStore(ts.GetFirstStoreID())
if err != nil {
return nil, err
@@ -294,8 +310,8 @@ func setupPartitionedRangeWithHandlers(
pr.mu.RLock()
defer pr.mu.RUnlock()
return pr.mu.partitioned &&
- (s == pr.mu.partitionedNode ||
- req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNode)+1)
+ (s == pr.mu.partitionedNodeIdx ||
+ req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1)
}
}
if h.dropHB == nil {
@@ -305,12 +321,21 @@ func setupPartitionedRangeWithHandlers(
if !pr.mu.partitioned {
return false
}
- if s == partitionedNode {
+ if s == partitionedNodeIdx {
return true
}
return pr.mu.partitionedReplicas[hb.FromReplicaID]
}
}
+ if h.dropResp == nil {
+ h.dropResp = func(resp *kvserverpb.RaftMessageResponse) bool {
+ pr.mu.RLock()
+ defer pr.mu.RUnlock()
+ return pr.mu.partitioned &&
+ (s == pr.mu.partitionedNodeIdx ||
+ resp.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1)
+ }
+ }
if h.snapErr == nil {
h.snapErr = func(header *kvserverpb.SnapshotRequest_Header) error {
pr.mu.RLock()
@@ -324,6 +349,16 @@ func setupPartitionedRangeWithHandlers(
return nil
}
}
+ if h.delegateErr == nil {
+ h.delegateErr = func(resp *kvserverpb.DelegateSendSnapshotRequest) error {
+ pr.mu.RLock()
+ defer pr.mu.RUnlock()
+ if pr.mu.partitionedReplicas[resp.DelegatedSender.ReplicaID] {
+ return errors.New("partitioned")
+ }
+ return nil
+ }
+ }
pr.handlers = append(pr.handlers, h)
tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h)
}
diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go
index 2a92de9b116f..35bbf48f3c61 100644
--- a/pkg/kv/kvserver/client_raft_test.go
+++ b/pkg/kv/kvserver/client_raft_test.go
@@ -3573,10 +3573,8 @@ func (errorChannelTestHandler) HandleSnapshot(
}
func (errorChannelTestHandler) HandleDelegatedSnapshot(
- ctx context.Context,
- req *kvserverpb.DelegateSnapshotRequest,
- stream kvserver.DelegateSnapshotResponseStream,
-) error {
+ _ context.Context, _ *kvserverpb.DelegateSendSnapshotRequest,
+) *kvserverpb.DelegateSnapshotResponse {
panic("unimplemented")
}
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index 00564d86034a..243c8daf9d70 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -206,7 +206,7 @@ func (s *Store) ManualRaftSnapshot(repl *Replica, target roachpb.ReplicaID) erro
// ReservationCount counts the number of outstanding reservations that are not
// running.
func (s *Store) ReservationCount() int {
- return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.Len()
+ return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.AvailableLen()
}
// RaftSchedulerPriorityID returns the Raft scheduler's prioritized range.
diff --git a/pkg/kv/kvserver/kvserverpb/raft.go b/pkg/kv/kvserver/kvserverpb/raft.go
index 9612ce2e9d45..34785385fc36 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.go
+++ b/pkg/kv/kvserver/kvserverpb/raft.go
@@ -23,8 +23,11 @@ func (SnapshotRequest_Type) SafeValue() {}
-//
-// The bool indicates whether this message uses the deprecated behavior of
-// encoding an error as a string.
-func (m *DelegateSnapshotResponse) Error() (deprecated bool, _ error) {
- return m.SnapResponse.Error()
+func (m *DelegateSnapshotResponse) Error() error {
+ if m.Status != DelegateSnapshotResponse_ERROR {
+ return nil
+ }
+ return errors.DecodeError(context.Background(), m.EncodedError)
}
// Error returns the error contained in the snapshot response, if any.
diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto
index 229af24b873d..878f0290a133 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.proto
+++ b/pkg/kv/kvserver/kvserverpb/raft.proto
@@ -14,7 +14,6 @@ option go_package = "kvserverpb";
import "errorspb/errors.proto";
import "roachpb/errors.proto";
-import "roachpb/internal_raft.proto";
import "roachpb/metadata.proto";
import "kv/kvserver/liveness/livenesspb/liveness.proto";
import "kv/kvserver/kvserverpb/state.proto";
@@ -226,7 +225,7 @@ message SnapshotRequest {
Header header = 1;
- // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages.
+ // A BatchRepr. Multiple kv_batches may be sent across multiple request messages.
bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"];
bool final = 4;
@@ -261,8 +260,15 @@ message SnapshotResponse {
errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false];
}
-// DelegateSnapshotRequest is the request used to delegate send snapshot requests.
+// DelegateSnapshotRequest is the wrapper request used to delegate snapshot
+// requests to a follower. Currently it only carries a send request.
+// TODO(baptist): Extend this if necessary to separate out the request for the throttle.
message DelegateSnapshotRequest {
+ oneof value {
+ DelegateSendSnapshotRequest send = 1;
+ }
+}
+
+// DelegateSendSnapshotRequest is the request used to delegate send snapshot requests.
+message DelegateSendSnapshotRequest {
uint64 range_id = 1 [(gogoproto.customname) = "RangeID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"];
@@ -277,30 +283,50 @@ message DelegateSnapshotRequest {
roachpb.ReplicaDescriptor delegated_sender = 4 [(gogoproto.nullable) = false];
// The priority of the snapshot.
+ // TODO(abaptist): Remove this field for v23.1.
SnapshotRequest.Priority priority = 5;
+ // The type of the snapshot.
+ // TODO(abaptist): Remove this field for v23.1.
+ SnapshotRequest.Type type = 6;
+
+ // The Raft term of the coordinator (in most cases the leaseholder) replica.
+ // The term is used during snapshot receiving to reject messages from an older term.
+ uint64 term = 7;
+
+ // The first index of the Raft log on the coordinator replica.
+ uint64 first_index = 8;
+
// The sending queue's name.
SnapshotRequest.QueueName sender_queue_name = 9;
// The sending queue's priority.
double sender_queue_priority = 10;
- // The type of the snapshot.
- SnapshotRequest.Type type = 6;
+ // The generation of the leaseholder's descriptor.
+ uint64 descriptor_generation = 11 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeGeneration"];
- // The Raft term of the coordinator (in most cases the leaseholder) replica.
- // The term is used during snapshot receiving to reject messages from an older term.
- uint64 term = 7;
+ // Max queue length on the delegate before this request is rejected.
+ int64 queue_on_delegate_len = 12;
- // The truncated state of the Raft log on the coordinator replica.
- roachpb.RaftTruncatedState truncated_state = 8;
+ // ID of this snapshot, which is maintained from the coordinator to the receiver.
+ bytes snap_id = 13 [
+ (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
+ (gogoproto.nullable) = false];
}
message DelegateSnapshotResponse {
- SnapshotResponse snapResponse = 1;
+ enum Status {
+ ERROR = 0;
+ APPLIED = 1;
+ }
+
+ Status status = 1;
+ errorspb.EncodedError encoded_error = 2 [(gogoproto.nullable) = false];
+
// collected_spans stores trace spans recorded during the execution of this
// request.
- repeated util.tracing.tracingpb.RecordedSpan collected_spans = 2 [(gogoproto.nullable) = false];
+ repeated util.tracing.tracingpb.RecordedSpan collected_spans = 3 [(gogoproto.nullable) = false];
}
// ConfChangeContext is encoded in the raftpb.ConfChange.Context field.
diff --git a/pkg/kv/kvserver/multiqueue/BUILD.bazel b/pkg/kv/kvserver/multiqueue/BUILD.bazel
index 5c7a1642b937..14dbae432c80 100644
--- a/pkg/kv/kvserver/multiqueue/BUILD.bazel
+++ b/pkg/kv/kvserver/multiqueue/BUILD.bazel
@@ -8,14 +8,16 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/util/syncutil",
+ "@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)
go_test(
name = "multiqueue_test",
+ size = "small",
srcs = ["multi_queue_test.go"],
- args = ["-test.timeout=295s"],
+ args = ["-test.timeout=55s"],
embed = [":multiqueue"],
deps = [
"//pkg/testutils",
diff --git a/pkg/kv/kvserver/multiqueue/multi_queue.go b/pkg/kv/kvserver/multiqueue/multi_queue.go
index 588c4556de46..8848612fc727 100644
--- a/pkg/kv/kvserver/multiqueue/multi_queue.go
+++ b/pkg/kv/kvserver/multiqueue/multi_queue.go
@@ -14,6 +14,7 @@ import (
"container/heap"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
@@ -145,10 +146,20 @@ func (m *MultiQueue) tryRunNextLocked() {
// Add returns a Task that must be closed (calling m.Release(..)) to
// release the Permit. The number of types is expected to
// be relatively small and not be changing over time.
-func (m *MultiQueue) Add(queueType int, priority float64) *Task {
+func (m *MultiQueue) Add(queueType int, priority float64, maxQueueLength int64) (*Task, error) {
m.mu.Lock()
defer m.mu.Unlock()
+ // If there are remainingRuns we can run immediately; otherwise compute the
+ // queue length once we are added. If the queue is too long, return an error
+ // immediately so the caller doesn't have to wait.
+ if m.remainingRuns == 0 && maxQueueLength >= 0 {
+ currentLen := int64(m.queueLenLocked())
+ if currentLen > maxQueueLength {
+ return nil, errors.Newf("queue is too long %d > %d", currentLen, maxQueueLength)
+ }
+ }
+
// The mutex starts locked, unlock it when we are ready to run.
pos, ok := m.mapping[queueType]
if !ok {
@@ -169,7 +180,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task {
// Once we are done adding a task, attempt to signal the next waiting task.
m.tryRunNextLocked()
- return &newTask
+ return &newTask, nil
}
// Cancel will cancel a Task that may not have started yet. This is useful if it
@@ -221,11 +232,32 @@ func (m *MultiQueue) releaseLocked(permit *Permit) {
m.tryRunNextLocked()
}
-// Len returns the number of additional tasks that can be added without
-// queueing. This will return 0 if there is anything queued. This method should
-// only be used for testing.
-func (m *MultiQueue) Len() int {
+// AvailableLen returns the number of additional tasks that can be added without
+// queueing. This will return 0 if there is anything queued.
+func (m *MultiQueue) AvailableLen() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.remainingRuns
}
+
+// QueueLen returns the length of the queue if one more task is added. If this
+// returns 0 then a task can be added and run without queueing.
+// NB: The value returned is not a guarantee that queueing will not occur when
+// the request is submitted. Multiple calls to QueueLen could race.
+func (m *MultiQueue) QueueLen() int {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.queueLenLocked()
+}
+
+func (m *MultiQueue) queueLenLocked() int {
+ if m.remainingRuns > 0 {
+ return 0
+ }
+ // Start counting from 1 since the next task added would be the first in the queue.
+ count := 1
+ for i := 0; i < len(m.outstanding); i++ {
+ count += len(m.outstanding[i])
+ }
+ return count - m.remainingRuns
+}
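
The extra maxQueueLength argument lets callers bound how much queueing they will tolerate: -1 keeps the old always-queue behavior, 0 rejects whenever the task would have to wait, and a positive value allows that many queued requests (this is what QueueOnDelegateLen in the delegate snapshot request feeds into). A minimal usage sketch built only on the exported API above; the wrapper function is illustrative:

    package multiqueueexample // illustrative only

    import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"

    // trySend acquires a permit, rejecting immediately if any task is already
    // queued ahead of it, then releases the permit once the work is done.
    func trySend(queue *multiqueue.MultiQueue, queueType int, priority float64, work func()) error {
    	task, err := queue.Add(queueType, priority, 0 /* maxQueueLength */)
    	if err != nil {
    		return err // queue too long; the caller can pick another delegate
    	}
    	permit := <-task.GetWaitChan() // block until a permit is available
    	defer queue.Release(permit)    // a permit must be released exactly once
    	work()
    	return nil
    }

If the caller gives up before the permit arrives, it should call Cancel on the task instead of Release.
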
diff --git a/pkg/kv/kvserver/multiqueue/multi_queue_test.go b/pkg/kv/kvserver/multiqueue/multi_queue_test.go
index 8e693a1e210f..4c50f4ac2753 100644
--- a/pkg/kv/kvserver/multiqueue/multi_queue_test.go
+++ b/pkg/kv/kvserver/multiqueue/multi_queue_test.go
@@ -37,10 +37,10 @@ func TestMultiQueueAddTwiceSameQueue(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
queue := NewMultiQueue(1)
- blocker := queue.Add(0, 0)
+ blocker, _ := queue.Add(0, 0, -1)
- chan1 := queue.Add(7, 1.0)
- chan2 := queue.Add(7, 2.0)
+ chan1, _ := queue.Add(7, 1.0, -1)
+ chan2, _ := queue.Add(7, 2.0, -1)
permit := <-blocker.GetWaitChan()
queue.Release(permit)
@@ -56,13 +56,13 @@ func TestMultiQueueTwoQueues(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
queue := NewMultiQueue(1)
- blocker := queue.Add(0, 0)
+ blocker, _ := queue.Add(0, 0, -1)
- a1 := queue.Add(5, 4.0)
- a2 := queue.Add(5, 5.0)
+ a1, _ := queue.Add(5, 4.0, -1)
+ a2, _ := queue.Add(5, 5.0, -1)
- b1 := queue.Add(6, 1.0)
- b2 := queue.Add(6, 2.0)
+ b1, _ := queue.Add(6, 1.0, -1)
+ b2, _ := queue.Add(6, 2.0, -1)
permit := <-blocker.GetWaitChan()
queue.Release(permit)
@@ -79,15 +79,15 @@ func TestMultiQueueComplex(t *testing.T) {
defer log.Scope(t).Close(t)
queue := NewMultiQueue(1)
- blocker := queue.Add(0, 0)
+ blocker, _ := queue.Add(0, 0, -1)
- a2 := queue.Add(1, 4.0)
- b1 := queue.Add(2, 1.1)
- b2 := queue.Add(2, 2.1)
- c2 := queue.Add(3, 1.2)
- c3 := queue.Add(3, 2.2)
- a3 := queue.Add(1, 5.0)
- b3 := queue.Add(2, 6.1)
+ a2, _ := queue.Add(1, 4.0, -1)
+ b1, _ := queue.Add(2, 1.1, -1)
+ b2, _ := queue.Add(2, 2.1, -1)
+ c2, _ := queue.Add(3, 1.2, -1)
+ c3, _ := queue.Add(3, 2.2, -1)
+ a3, _ := queue.Add(1, 5.0, -1)
+ b3, _ := queue.Add(2, 6.1, -1)
permit := <-blocker.GetWaitChan()
queue.Release(permit)
@@ -99,15 +99,15 @@ func TestMultiQueueRemove(t *testing.T) {
defer log.Scope(t).Close(t)
queue := NewMultiQueue(1)
- blocker := queue.Add(0, 0)
+ blocker, _ := queue.Add(0, 0, -1)
- a2 := queue.Add(1, 4.0)
- b1 := queue.Add(2, 1.1)
- b2 := queue.Add(2, 2.1)
- c2 := queue.Add(3, 1.2)
- c3 := queue.Add(3, 2.2)
- a3 := queue.Add(1, 5.0)
- b3 := queue.Add(2, 6.1)
+ a2, _ := queue.Add(1, 4.0, -1)
+ b1, _ := queue.Add(2, 1.1, -1)
+ b2, _ := queue.Add(2, 2.1, -1)
+ c2, _ := queue.Add(3, 1.2, -1)
+ c3, _ := queue.Add(3, 2.2, -1)
+ a3, _ := queue.Add(1, 5.0, -1)
+ b3, _ := queue.Add(2, 6.1, -1)
fmt.Println("Beginning cancel")
@@ -125,7 +125,7 @@ func TestMultiQueueCancelOne(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
queue := NewMultiQueue(1)
- task := queue.Add(1, 1)
+ task, _ := queue.Add(1, 1, -1)
queue.Cancel(task)
}
@@ -139,12 +139,12 @@ func TestMultiQueueCancelInProgress(t *testing.T) {
const b = 2
const c = 3
- a3 := queue.Add(a, 5.0)
- a2 := queue.Add(a, 4.0)
- b1 := queue.Add(b, 1.1)
- b2 := queue.Add(b, 2.1)
- c3 := queue.Add(c, 2.2)
- b3 := queue.Add(b, 6.1)
+ a3, _ := queue.Add(a, 5.0, -1)
+ a2, _ := queue.Add(a, 4.0, -1)
+ b1, _ := queue.Add(b, 1.1, -1)
+ b2, _ := queue.Add(b, 2.1, -1)
+ c3, _ := queue.Add(c, 2.2, -1)
+ b3, _ := queue.Add(b, 6.1, -1)
queue.Cancel(b2)
queue.Cancel(b1)
@@ -237,7 +237,7 @@ func TestMultiQueueStress(t *testing.T) {
for i := 0; i < numThreads; i++ {
go func(name int) {
for j := 0; j < numRequests; j++ {
- curTask := queue.Add(name, float64(j))
+ curTask, _ := queue.Add(name, float64(j), -1)
if alsoCancel && j%99 == 0 {
queue.Cancel(curTask)
} else {
@@ -271,7 +271,7 @@ func TestMultiQueueReleaseTwice(t *testing.T) {
queue := NewMultiQueue(1)
- task := queue.Add(1, 1)
+ task, _ := queue.Add(1, 1, -1)
p := <-task.GetWaitChan()
queue.Release(p)
require.Panics(t, func() { queue.Release(p) })
@@ -283,7 +283,7 @@ func TestMultiQueueReleaseAfterCancel(t *testing.T) {
queue := NewMultiQueue(1)
- task := queue.Add(1, 1)
+ task, _ := queue.Add(1, 1, -1)
p := <-task.GetWaitChan()
queue.Cancel(task)
queue.Release(p)
@@ -294,31 +294,84 @@ func TestMultiQueueLen(t *testing.T) {
defer log.Scope(t).Close(t)
queue := NewMultiQueue(2)
- require.Equal(t, 2, queue.Len())
-
- task1 := queue.Add(1, 1)
- require.Equal(t, 1, queue.Len())
- task2 := queue.Add(1, 1)
- require.Equal(t, 0, queue.Len())
- task3 := queue.Add(1, 1)
- require.Equal(t, 0, queue.Len())
+ require.Equal(t, 2, queue.AvailableLen())
+ require.Equal(t, 0, queue.QueueLen())
+
+ task1, _ := queue.Add(1, 1, -1)
+ require.Equal(t, 1, queue.AvailableLen())
+ require.Equal(t, 0, queue.QueueLen())
+ task2, _ := queue.Add(1, 1, -1)
+ require.Equal(t, 0, queue.AvailableLen())
+ require.Equal(t, 1, queue.QueueLen())
+ task3, _ := queue.Add(1, 1, -1)
+ require.Equal(t, 0, queue.AvailableLen())
+ require.Equal(t, 2, queue.QueueLen())
queue.Cancel(task1)
// Finish task 1, but immediately start task3.
- require.Equal(t, 0, queue.Len())
+ require.Equal(t, 0, queue.AvailableLen())
+ require.Equal(t, 1, queue.QueueLen())
p := <-task2.GetWaitChan()
queue.Release(p)
- require.Equal(t, 1, queue.Len())
+ require.Equal(t, 1, queue.AvailableLen())
+ require.Equal(t, 0, queue.QueueLen())
queue.Cancel(task3)
- require.Equal(t, 2, queue.Len())
+ require.Equal(t, 2, queue.AvailableLen())
+ require.Equal(t, 0, queue.QueueLen())
+}
+
+func TestMultiQueueFull(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ queue := NewMultiQueue(2)
+ require.Equal(t, 2, queue.AvailableLen())
+ require.Equal(t, 0, queue.QueueLen())
+
+ // Task 1 starts immediately since there is no queue.
+ task1, err := queue.Add(1, 1, 0)
+ require.NoError(t, err)
+ require.Equal(t, 1, queue.AvailableLen())
+ require.Equal(t, 0, queue.QueueLen())
+ // Task 2 also starts immediately as the queue supports 2 concurrent tasks.
+ task2, err := queue.Add(1, 1, 0)
+ require.NoError(t, err)
+ require.Equal(t, 0, queue.AvailableLen())
+ require.Equal(t, 1, queue.QueueLen())
+ // Task 3 would be queued so should not be added.
+ task3, err := queue.Add(1, 1, 0)
+ require.Error(t, err)
+ require.Nil(t, task3)
+ require.Equal(t, 0, queue.AvailableLen())
+ require.Equal(t, 1, queue.QueueLen())
+ // Task 4 uses a longer max queue length so should be added.
+ task4, err := queue.Add(1, 1, 1)
+ require.NoError(t, err)
+ require.Equal(t, 0, queue.AvailableLen())
+ require.Equal(t, 2, queue.QueueLen())
+
+ queue.Cancel(task1)
+ queue.Cancel(task2)
+ require.Equal(t, 1, queue.AvailableLen())
+ require.Equal(t, 0, queue.QueueLen())
+ // After these tasks are done, make sure we can add another one.
+ task5, err := queue.Add(1, 1, 0)
+ require.NoError(t, err)
+ require.Equal(t, 0, queue.AvailableLen())
+ require.Equal(t, 1, queue.QueueLen())
+
+ // Cancel all the remaining tasks.
+ queue.Cancel(task4)
+ queue.Cancel(task5)
}
// verifyOrder makes sure that the chans are called in the specified order.
func verifyOrder(t *testing.T, queue *MultiQueue, tasks ...*Task) {
// each time, verify that the only available channel is the "next" one in order
for i, task := range tasks {
+ if task == nil {
+ require.Fail(t, "Task is nil", "%d", task)
+ }
+
var found *Permit
for j, t2 := range tasks[i+1:] {
select {
diff --git a/pkg/kv/kvserver/raft_log_queue_test.go b/pkg/kv/kvserver/raft_log_queue_test.go
index c4d9d0beb968..db127557bab7 100644
--- a/pkg/kv/kvserver/raft_log_queue_test.go
+++ b/pkg/kv/kvserver/raft_log_queue_test.go
@@ -645,17 +645,19 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
index2 = 60
)
+ r.mu.state.RaftAppliedIndex = index1
// Add first constraint.
- r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index1, storeID)
+ _, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}}
// Make sure it registered.
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)
+ r.mu.state.RaftAppliedIndex = index2
// Add another constraint with the same id. Extremely unlikely in practice
// but we want to make sure it doesn't blow anything up. Collisions are
// handled by ignoring the colliding update.
- r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index2, storeID)
+ _, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)
// Helper that grabs the min constraint index (which can trigger GC as a
@@ -672,19 +674,22 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
// colliding update at index2 is not represented.
assertMin(index1, time.Time{})
+ r.mu.state.RaftAppliedIndex = index2
// Add another, higher, index. We're not going to notice it's around
// until the lower one disappears.
- r.addSnapshotLogTruncationConstraintLocked(ctx, id2, index2, storeID)
+ _, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, storeID)
now := timeutil.Now()
// The colliding snapshot comes back. Or the original, we can't tell.
- r.completeSnapshotLogTruncationConstraint(id1)
+ cleanup1()
+ // This won't do anything since we had a collision, but make sure it's ok.
+ cleanup2()
// The index should show up when its deadline isn't hit.
assertMin(index2, now)
assertMin(index2, now.Add(1))
assertMin(index2, time.Time{})
- r.completeSnapshotLogTruncationConstraint(id2)
+ cleanup3()
assertMin(0, now)
assertMin(0, now.Add(2))
diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go
index 0ba36ab6c487..f8dcdeac6549 100644
--- a/pkg/kv/kvserver/raft_snapshot_queue.go
+++ b/pkg/kv/kvserver/raft_snapshot_queue.go
@@ -142,7 +142,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
}
}
- err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority)
+ err := repl.sendSnapshotUsingDelegate(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority)
// NB: if the snapshot fails because of an overlapping replica on the
// recipient which is also waiting for a snapshot, the "smart" thing is to
diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go
index 283643e551c1..4d299d90b8d0 100644
--- a/pkg/kv/kvserver/raft_transport.go
+++ b/pkg/kv/kvserver/raft_transport.go
@@ -98,13 +98,6 @@ type SnapshotResponseStream interface {
Recv() (*kvserverpb.SnapshotRequest, error)
}
-// DelegateSnapshotResponseStream is the subset of the
-// MultiRaft_RaftSnapshotServer interface that is needed for sending delegated responses.
-type DelegateSnapshotResponseStream interface {
- Send(request *kvserverpb.DelegateSnapshotResponse) error
- Recv() (*kvserverpb.DelegateSnapshotRequest, error)
-}
-
// RaftMessageHandler is the interface that must be implemented by
// arguments to RaftTransport.Listen.
type RaftMessageHandler interface {
@@ -132,9 +125,8 @@ type RaftMessageHandler interface {
// request.
HandleDelegatedSnapshot(
ctx context.Context,
- req *kvserverpb.DelegateSnapshotRequest,
- stream DelegateSnapshotResponseStream,
- ) error
+ req *kvserverpb.DelegateSendSnapshotRequest,
+ ) *kvserverpb.DelegateSnapshotResponse
}
// RaftTransport handles the rpc messages for raft.
@@ -327,14 +319,24 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh
if err != nil {
return err
}
- // Check to ensure the header is valid.
+ resp := t.InternalDelegateRaftSnapshot(ctx, req.GetSend())
+ err = stream.Send(resp)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// InternalDelegateRaftSnapshot processes a DelegateSendSnapshotRequest in a request/response fashion.
+func (t *RaftTransport) InternalDelegateRaftSnapshot(
+ ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
+) *kvserverpb.DelegateSnapshotResponse {
if req == nil {
err := errors.New("client error: no message in first delegated snapshot request")
- return stream.Send(
- &kvserverpb.DelegateSnapshotResponse{
- SnapResponse: snapRespErr(err),
- },
- )
+ return &kvserverpb.DelegateSnapshotResponse{
+ Status: kvserverpb.DelegateSnapshotResponse_ERROR,
+ EncodedError: errors.EncodeError(context.Background(), err),
+ }
}
// Get the handler of the sender store.
handler, ok := t.getHandler(req.DelegatedSender.StoreID)
@@ -346,11 +348,15 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh
req.CoordinatorReplica.StoreID,
req.DelegatedSender.StoreID,
)
- return roachpb.NewStoreNotFoundError(req.DelegatedSender.StoreID)
+ err := errors.New("unable to accept Raft message: no handler registered for the sender store")
+ return &kvserverpb.DelegateSnapshotResponse{
+ Status: kvserverpb.DelegateSnapshotResponse_ERROR,
+ EncodedError: errors.EncodeError(context.Background(), err),
+ }
}
// Pass off the snapshot request to the sender store.
- return handler.HandleDelegatedSnapshot(ctx, req, stream)
+ return handler.HandleDelegatedSnapshot(ctx, req)
}
// RaftSnapshot handles incoming streaming snapshot requests.
@@ -640,10 +646,10 @@ func (t *RaftTransport) SendSnapshot(
return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newBatch, sent, recordBytesSent)
}
-// DelegateSnapshot creates a rpc stream between the leaseholder and the
-// new designated sender for delegated snapshot requests.
+// DelegateSnapshot sends a DelegateSnapshotRequest to a remote store
+// and determines if it encountered any errors when sending the snapshot.
func (t *RaftTransport) DelegateSnapshot(
- ctx context.Context, req *kvserverpb.DelegateSnapshotRequest,
+ ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
) error {
nodeID := req.DelegatedSender.NodeID
conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
@@ -659,8 +665,41 @@ func (t *RaftTransport) DelegateSnapshot(
}
defer func() {
if err := stream.CloseSend(); err != nil {
- log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
+ log.Warningf(ctx, "failed to close delegate snapshot stream: %+v", err)
}
}()
- return delegateSnapshot(ctx, stream, req)
+
+ // Send the request.
+ wrappedRequest := &kvserverpb.DelegateSnapshotRequest{Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req}}
+ if err := stream.Send(wrappedRequest); err != nil {
+ return errors.Mark(err, errMarkSnapshotError)
+ }
+ // Wait for response to see if the receiver successfully applied the snapshot.
+ resp, err := stream.Recv()
+ if err != nil {
+ return errors.Mark(
+ errors.Wrapf(err, "%v: remote failed to send snapshot", req), errMarkSnapshotError,
+ )
+ }
+
+ if len(resp.CollectedSpans) != 0 {
+ span := tracing.SpanFromContext(ctx)
+ if span == nil {
+ log.Warningf(ctx, "trying to ingest remote spans but there is no recording span set up")
+ } else {
+ span.ImportRemoteRecording(resp.CollectedSpans)
+ }
+ }
+
+ switch resp.Status {
+ case kvserverpb.DelegateSnapshotResponse_ERROR:
+ return errors.Mark(
+ errors.Wrapf(resp.Error(), "delegate sender couldn't accept %v", req), errMarkSnapshotError)
+ case kvserverpb.DelegateSnapshotResponse_APPLIED:
+ // This is the response we're expecting. Snapshot successfully applied.
+ log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", resp)
+ return nil
+ default:
+ return errors.Newf("unexpected delegate snapshot response status %v", resp.Status)
+ }
}
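
With the stream removed from the handler interface, a delegate reports its outcome purely through the returned DelegateSnapshotResponse, which DelegateSnapshot above unwraps. A sketch of how an implementation can package success or failure into that response (the real logic lives in Store.HandleDelegatedSnapshot, which is not shown in this diff):

    package kvserverexample // illustrative only

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
    	"github.com/cockroachdb/errors"
    )

    // delegateResponse converts the outcome of sending a delegated snapshot into
    // the response message consumed by RaftTransport.DelegateSnapshot.
    func delegateResponse(ctx context.Context, err error) *kvserverpb.DelegateSnapshotResponse {
    	if err != nil {
    		return &kvserverpb.DelegateSnapshotResponse{
    			Status:       kvserverpb.DelegateSnapshotResponse_ERROR,
    			EncodedError: errors.EncodeError(ctx, err),
    		}
    	}
    	return &kvserverpb.DelegateSnapshotResponse{
    		Status: kvserverpb.DelegateSnapshotResponse_APPLIED,
    	}
    }
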
diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go
index 5d10bd7fa5e6..ed8d64380a60 100644
--- a/pkg/kv/kvserver/raft_transport_test.go
+++ b/pkg/kv/kvserver/raft_transport_test.go
@@ -97,11 +97,9 @@ func (s channelServer) HandleSnapshot(
}
func (s channelServer) HandleDelegatedSnapshot(
- ctx context.Context,
- req *kvserverpb.DelegateSnapshotRequest,
- stream kvserver.DelegateSnapshotResponseStream,
-) error {
- panic("unimplemented")
+ ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
+) *kvserverpb.DelegateSnapshotResponse {
+ panic("unexpected HandleDelegatedSnapshot")
}
// raftTransportTestContext contains objects needed to test RaftTransport.
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 7ef521c1993f..e6129048b450 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -1693,6 +1693,7 @@ func (r *Replica) initializeRaftLearners(
default:
log.Fatalf(ctx, "unexpected replicaType %s", replicaType)
}
+
// Lock learner snapshots even before we run the ConfChange txn to add them
// to prevent a race with the raft snapshot queue trying to send it first.
//
@@ -1824,7 +1825,7 @@ func (r *Replica) initializeRaftLearners(
// orphaned learner. Second, this tickled some bugs in etcd/raft around
// switching between StateSnapshot and StateProbe. Even if we worked through
// these, it would be susceptible to future similar issues.
- if err := r.sendSnapshot(
+ if err := r.sendSnapshotUsingDelegate(
ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority,
); err != nil {
return nil, err
@@ -1835,9 +1836,9 @@ func (r *Replica) initializeRaftLearners(
// lockLearnerSnapshot stops the raft snapshot queue from sending snapshots to
// the soon-to-be added learner replicas to prevent duplicate snapshots from
-// being sent. This lock is best effort because it times out and it is a node
-// local lock while the raft snapshot queue might be running on a different
-// node. An idempotent unlock function is returned.
+// being sent. This lock is a node local lock while the raft snapshot queue
+// might be running on a different node. An idempotent unlock function is
+// returned.
func (r *Replica) lockLearnerSnapshot(
ctx context.Context, additions []roachpb.ReplicationTarget,
) (unlock func()) {
@@ -1845,15 +1846,15 @@ func (r *Replica) lockLearnerSnapshot(
// in 19.2 to work around a commit in etcd/raft that made this race more
// likely. It'd be nice if all learner snapshots could be sent from a single
// place.
- var lockUUIDs []uuid.UUID
+ var cleanups []func()
for _, addition := range additions {
lockUUID := uuid.MakeV4()
- lockUUIDs = append(lockUUIDs, lockUUID)
- r.addSnapshotLogTruncationConstraint(ctx, lockUUID, 1, addition.StoreID)
+ _, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, addition.StoreID)
+ cleanups = append(cleanups, cleanup)
}
return func() {
- for _, lockUUID := range lockUUIDs {
- r.completeSnapshotLogTruncationConstraint(lockUUID)
+ for _, cleanup := range cleanups {
+ cleanup()
}
}
}
@@ -2517,34 +2518,140 @@ func recordRangeEventsInLog(
return nil
}
-// getSenderReplica returns a replica descriptor for a follower replica to act as
-// the sender for snapshots.
-// TODO(amy): select a follower based on locality matching.
-func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescriptor, error) {
- log.Fatal(ctx, "follower snapshots not implemented")
- return r.GetReplicaDescriptor()
+// getSenderReplicas returns an ordered list of replica descriptors for
+// followers that can act as the sender for delegated snapshots. The replicas
+// should be tried in order, and typically the coordinator is the last entry on
+// the list.
+func (r *Replica) getSenderReplicas(
+ ctx context.Context, recipient roachpb.ReplicaDescriptor,
+) ([]roachpb.ReplicaDescriptor, error) {
+
+ coordinator, err := r.GetReplicaDescriptor()
+ if err != nil {
+ // If there is no local replica descriptor, return an empty list.
+ return nil, err
+ }
+
+ // Unless all nodes are on V23.1, don't delegate. This prevents sending to a
+ // node that doesn't understand the request.
+ if !r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V23_1) {
+ return []roachpb.ReplicaDescriptor{coordinator}, nil
+ }
+
+ // Check the follower snapshot delegation limit; if zero, just self-delegate.
+ numFollowers := int(NumDelegateLimit.Get(&r.ClusterSettings().SV))
+ if numFollowers == 0 {
+ return []roachpb.ReplicaDescriptor{coordinator}, nil
+ }
+
+ // Get range descriptor and store pool.
+ storePool := r.store.cfg.StorePool
+ rangeDesc := r.Desc()
+
+ if fn := r.store.cfg.TestingKnobs.SelectDelegateSnapshotSender; fn != nil {
+ sender := fn(rangeDesc)
+ // If a TestingKnob is specified, use whatever sender list it returns.
+ if sender != nil {
+ return sender, nil
+ }
+ }
+
+ // Include voter and non-voter replicas on healthy stores as candidates.
+ nonRecipientReplicas := rangeDesc.Replicas().Filter(
+ func(rDesc roachpb.ReplicaDescriptor) bool {
+ return rDesc.ReplicaID != recipient.ReplicaID && storePool.IsStoreHealthy(rDesc.StoreID)
+ },
+ )
+ candidates := nonRecipientReplicas.VoterAndNonVoterDescriptors()
+ if len(candidates) == 0 {
+ // It isn't clear when the coordinator itself would be considered dead, but
+ // if that happens, just return the coordinator.
+ return []roachpb.ReplicaDescriptor{coordinator}, nil
+ }
+
+ // Get the localities of the candidate replicas, including the original sender.
+ localities := storePool.GetLocalitiesPerReplica(candidates...)
+ recipientLocality := storePool.GetLocalitiesPerReplica(recipient)[recipient.ReplicaID]
+
+ // Construct a map from replica to its diversity score compared to the
+ // recipient. Also track the best score we see.
+ replicaDistance := make(map[roachpb.ReplicaID]float64, len(localities))
+ closestStore := roachpb.MaxDiversityScore
+ for replID, locality := range localities {
+ score := recipientLocality.DiversityScore(locality)
+ if score < closestStore {
+ closestStore = score
+ }
+ replicaDistance[replID] = score
+ }
+
+ // Find all replicas that tie as the most optimal sender other than the
+ // coordinator. The coordinator will always be added to the end of the list
+ // regardless of score.
+ var tiedReplicas []roachpb.ReplicaID
+ for replID, score := range replicaDistance {
+ if score == closestStore {
+ // If the coordinator is tied for closest, always use it.
+ // TODO(baptist): Consider using other replicas at the same distance once
+ // this is integrated with admission control.
+ if replID == coordinator.ReplicaID {
+ return []roachpb.ReplicaDescriptor{coordinator}, nil
+ }
+ tiedReplicas = append(tiedReplicas, replID)
+ }
+ }
+
+ // Use a pseudo-random source that is consistent across runs of this method
+ // for the same coordinator. Shuffle the replicas to prevent always choosing
+ // them in the same order.
+ pRand := rand.New(rand.NewSource(int64(coordinator.ReplicaID)))
+ pRand.Shuffle(len(tiedReplicas), func(i, j int) { tiedReplicas[i], tiedReplicas[j] = tiedReplicas[j], tiedReplicas[i] })
+
+ // Only keep the top numFollowers replicas.
+ if len(tiedReplicas) > numFollowers {
+ tiedReplicas = tiedReplicas[:numFollowers]
+ }
+
+ // Convert to replica descriptors before returning. The list of tiedReplicas
+ // is typically only one element.
+ replicaList := make([]roachpb.ReplicaDescriptor, len(tiedReplicas)+1)
+ for n, replicaID := range tiedReplicas {
+ replDesc, found := rangeDesc.Replicas().GetReplicaDescriptorByID(replicaID)
+ if !found {
+ return nil, errors.Errorf("unable to find replica for replica ID %d", replicaID)
+ }
+ replicaList[n] = replDesc
+ }
+ // Set the last replica to be the coordinator.
+ replicaList[len(replicaList)-1] = coordinator
+ return replicaList, nil
}
-// TODO(amy): update description when patch for follower snapshots are completed.
-// sendSnapshot sends a snapshot of the replica state to the specified replica.
-// Currently only invoked from replicateQueue and raftSnapshotQueue. Be careful
-// about adding additional calls as generating a snapshot is moderately
-// expensive.
+// sendSnapshotUsingDelegate sends a snapshot of the replica state to the specified
+// replica through a delegate. Currently, only invoked from replicateQueue and
+// raftSnapshotQueue. Be careful about adding additional calls as generating a
+// snapshot is moderately expensive.
//
// A snapshot is a bulk transfer of all data in a range. It consists of a
// consistent view of all the state needed to run some replica of a range as of
-// some applied index (not as of some mvcc-time). Snapshots are used by Raft
-// when a follower is far enough behind the leader that it can no longer be
-// caught up using incremental diffs (because the leader has already garbage
-// collected the diffs, in this case because it truncated the Raft log past
-// where the follower is).
+// some applied index (not as of some mvcc-time). There are two primary cases
+// when a Snapshot is used.
+//
+// The first case is use by Raft when a voter or non-voter follower is far
+// enough behind the leader that it can no longer be caught up using incremental
+// diffs. This occurs because the leader has already garbage collected diffs
+// past where the follower is. The quota pool is responsible for keeping a
+// leader from getting too far ahead of any of the followers, so normally
+// followers don't need a snapshot, however there are a number of cases where
+// this can happen (restarts, paused followers, ...).
//
-// We also proactively send a snapshot when adding a new replica to bootstrap it
+// The second case is adding a new replica to a replica set, to bootstrap it
// (this is called a "learner" snapshot and is a special case of a Raft
// snapshot, we just speed the process along). It's called a learner snapshot
-// because it's sent to what Raft terms a learner replica. As of 19.2, when we
-// add a new replica, it's first added as a learner using a Raft ConfChange,
-// which means it accepts Raft traffic but doesn't vote or affect quorum. Then
+// because it's sent to what Raft terms a learner replica. When we
+// add a new replica, it's first added as a learner using a Raft ConfChange.
+// A learner accepts Raft traffic but doesn't vote or affect quorum. Then
// we immediately send it a snapshot to catch it up. After the snapshot
// successfully applies, we turn it into a normal voting replica using another
// ConfChange. It then uses the normal mechanisms to catch up with whatever got
@@ -2552,13 +2659,20 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript
// the voting replica directly, this avoids a period of fragility when the
// replica would be a full member, but very far behind.
//
-// Snapshots are expensive and mostly unexpected (except learner snapshots
-// during rebalancing). The quota pool is responsible for keeping a leader from
-// getting too far ahead of any of the followers, so ideally they'd never be far
-// enough behind to need a snapshot.
+// The snapshot process itself is broken into 4 parts: delegating the request,
+// generating the snapshot, transmitting it, and applying it.
//
-// The snapshot process itself is broken into 3 parts: generating the snapshot,
-// transmitting it, and applying it.
+// Delegating the request: Since a snapshot is expensive to transfer from a
+// network, CPU, and IO perspective, the coordinator attempts to delegate the
+// request to a healthy delegate that is closer to the final destination.
+// This is done by sending a DelegateSnapshotRequest to a replica. The replica
+// can either reject the delegation request or process it. It will reject if it
+// is too far behind, unhealthy, or has too long of a queue of snapshots to
+// send. If the delegate accepts the delegation request, then the remaining
+// three steps occur on that delegate. If the delegate does not decide to
+// process the request, it sends an error back to the coordinator and the
+// coordinator either chooses a different delegate or itself as the "delegate of
+// last resort".
//
// Generating the snapshot: The data contained in a snapshot is a full copy of
// the replicated data plus everything the replica needs to be a healthy member
@@ -2566,7 +2680,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript
// instead of keeping it all in memory at once. The `(Replica).GetSnapshot`
// method does the necessary locking and gathers the various Raft state needed
// to run a replica. It also creates an iterator for the range's data as it
-// looked under those locks (this is powered by a RocksDB snapshot, which is a
+// looked under those locks (this is powered by a Pebble snapshot, which is a
// different thing but a similar idea). Notably, GetSnapshot does not do the
// data iteration.
//
@@ -2589,7 +2703,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript
// returns true, this is communicated back to the sender, which then proceeds to
// call `kvBatchSnapshotStrategy.Send`. This uses the iterator captured earlier
// to send the data in chunks, each chunk a streaming grpc message. The sender
-// then sends a final message with an indicaton that it's done and blocks again,
+// then sends a final message with an indication that it's done and blocks again,
// waiting for a second and final response from the recipient which indicates if
// the snapshot was a success.
//
@@ -2601,7 +2715,6 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript
// RocksDB. Each of the SSTs also has a range deletion tombstone to delete the
// existing data in the range.
//
-
// Applying the snapshot: After the recipient has received the message
// indicating it has all the data, it hands it all to
// `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks
@@ -2637,7 +2750,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript
// callers of `shouldAcceptSnapshotData` return an error so that we no longer
// have to worry about racing with a second snapshot. See the comment on
// ReplicaPlaceholder for details.
-func (r *Replica) sendSnapshot(
+func (r *Replica) sendSnapshotUsingDelegate(
ctx context.Context,
recipient roachpb.ReplicaDescriptor,
snapType kvserverpb.SnapshotRequest_Type,
@@ -2671,12 +2784,12 @@ func (r *Replica) sendSnapshot(
)
}
- // Check follower snapshots cluster setting.
- if followerSnapshotsEnabled.Get(&r.ClusterSettings().SV) {
- sender, err = r.getSenderReplica(ctx)
- if err != nil {
- return err
- }
+ status := r.RaftStatus()
+ if status == nil {
+ // This code path is sometimes hit during scatter for replicas that
+ // haven't woken up yet.
+ retErr = &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")}
+ return
}
// Don't send a queue name or priority if the receiver may not understand
@@ -2688,52 +2801,203 @@ func (r *Replica) sendSnapshot(
senderQueueName = 0
senderQueuePriority = 0
}
+ snapUUID := uuid.MakeV4()
+ appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, recipient.StoreID)
+ // The cleanup function needs to be called regardless of success or failure of
+ // sending to release the log truncation constraint.
+ defer cleanup()
- log.VEventf(
- ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender,
- )
- status := r.RaftStatus()
+	// Create the delegate snapshot request. The DelegatedSender field is
+	// overwritten below for each delegate attempted.
+ // NB: The leader sets its log truncation constraint at its current applied
+ // index to prevent GCing past this index. This log truncation constraint is
+ // held until the snapshot has been delivered to the end recipient, or fails
+ // trying. The delegate is required to send a snapshot with a FirstIndex equal
+ // or greater than this applied index to ensure the recipient can catch up
+ // using normal Raft processing.
+ delegateRequest := &kvserverpb.DelegateSendSnapshotRequest{
+ RangeID: r.RangeID,
+ CoordinatorReplica: sender,
+ RecipientReplica: recipient,
+ Priority: priority,
+ SenderQueueName: senderQueueName,
+ SenderQueuePriority: senderQueuePriority,
+ Type: snapType,
+ Term: status.Term,
+ DelegatedSender: sender,
+ FirstIndex: appliedIndex,
+ DescriptorGeneration: r.Desc().Generation,
+ QueueOnDelegateLen: MaxQueueOnDelegateLimit.Get(&r.ClusterSettings().SV),
+ SnapId: snapUUID,
+ }
+
+ // Get the list of senders in order.
+ senders, err := r.getSenderReplicas(ctx, recipient)
+ if err != nil {
+ return err
+ }
+
+ if len(senders) == 0 {
+ return errors.Errorf("no sender found to send a snapshot from for %v", r)
+ }
+
+ for n, sender := range senders {
+ delegateRequest.DelegatedSender = sender
+ log.VEventf(
+ ctx, 2, "delegating snapshot transmission attempt %v for %v to %v", n+1, recipient, sender,
+ )
+
+ // On the last attempt, always queue on the delegate to time out naturally.
+ if n == len(senders)-1 {
+ delegateRequest.QueueOnDelegateLen = -1
+ }
+
+ retErr = contextutil.RunWithTimeout(
+ ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
+ // Sending snapshot
+ return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
+ },
+ )
+		// Return once we have success.
+		if retErr == nil {
+			return
+		}
+		log.Warningf(ctx, "attempt %d: delegate snapshot %+v request failed: %v", n+1, delegateRequest, retErr)
+ }
+ return
+}
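
The fallback behavior of sendSnapshotUsingDelegate can be summarized in a short sketch. This is illustrative only, under assumed names (sendViaDelegates, trySend); the real code goes through Transport.DelegateSnapshot with a per-attempt timeout, as shown above.

```go
package sketch

import (
	"context"
	"errors"
	"log"
)

// sendViaDelegates tries each delegate in order and returns on the first
// success. Callers put the coordinator itself last, as the "delegate of last
// resort". trySend stands in for Transport.DelegateSnapshot.
func sendViaDelegates(
	ctx context.Context,
	delegates []string,
	trySend func(ctx context.Context, delegate string) error,
) error {
	var lastErr error
	for i, d := range delegates {
		if err := trySend(ctx, d); err != nil {
			lastErr = err
			log.Printf("attempt %d via %s failed: %v", i+1, d, err)
			continue
		}
		return nil // success on this delegate
	}
	if lastErr == nil {
		lastErr = errors.New("no delegates available")
	}
	return lastErr
}
```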
+
+// validateSnapshotDelegationRequest validates that this replica can send the
+// snapshot that the coordinator requested. The main reasons a request can't be
+// served are that the delegate's Generation or Term doesn't match the
+// coordinator's request, or that the delegate's applied index is behind the
+// coordinator's log truncation constraint. Note that the request is validated
+// twice, once before "queueing" and once after. This reduces the chance of
+// false positives (snapshots which are sent but can't be used), but they can't
+// be eliminated entirely: between the coordinator sending the original request
+// and the delegate processing it, the range's log could be truncated, the
+// lease could move, or the range could split or merge.
+func (r *Replica) validateSnapshotDelegationRequest(
+ ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
+) error {
+ desc := r.Desc()
+ // If the generation has changed, this snapshot may be useless, so don't
+ // attempt to send it.
+ // NB: This is an overly strict check. If other delegates are added to this
+ // snapshot, we don't necessarily need to reject sending the snapshot, however
+ // if there are merges or splits, it is safer to reject.
+ if desc.Generation != req.DescriptorGeneration {
+ log.VEventf(ctx, 2,
+ "%s: generation has changed since snapshot was generated %s != %s",
+ r, req.DescriptorGeneration, desc.Generation,
+ )
+ return errors.Errorf(
+ "%s: generation has changed since snapshot was generated %s != %s",
+ r, req.DescriptorGeneration, desc.Generation,
+ )
+ }
+
+ // Check that the snapshot we generated has a descriptor that includes the
+ // recipient. If it doesn't, the recipient will reject it, so it's better to
+ // not send it in the first place. It's possible to hit this case if we're not
+ // the leaseholder, and we haven't yet applied the configuration change that's
+ // adding the recipient to the range, or we are the leaseholder but have
+ // removed the recipient between starting to send the snapshot and this point.
+ if _, ok := desc.GetReplicaDescriptorByID(req.RecipientReplica.ReplicaID); !ok {
+ // Recipient replica not found in the current range descriptor.
+ // The sender replica's descriptor may be lagging behind the coordinator's.
+ log.VEventf(ctx, 2,
+ "%s: couldn't find receiver replica %s in sender descriptor %s",
+			r, req.RecipientReplica, r.Desc(),
+ )
+ return errors.Errorf(
+ "%s: couldn't find receiver replica %s in sender descriptor %s",
+ r, req.RecipientReplica, r.Desc(),
+ )
+ }
+
+	// Check the raft applied state index and term to determine whether this
+	// replica is too far behind the leaseholder. If the delegate is so far
+	// behind that it also needs a snapshot, any snapshot it sends will be useless.
+ r.mu.RLock()
+ replIdx := r.mu.state.RaftAppliedIndex + 1
+
+ status := r.raftStatusRLocked()
if status == nil {
// This code path is sometimes hit during scatter for replicas that
// haven't woken up yet.
- return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")}
- }
-
- // Create new delegate snapshot request with only required metadata.
- delegateRequest := &kvserverpb.DelegateSnapshotRequest{
- RangeID: r.RangeID,
- CoordinatorReplica: sender,
- RecipientReplica: recipient,
- Priority: priority,
- SenderQueueName: senderQueueName,
- SenderQueuePriority: senderQueuePriority,
- Type: snapType,
- Term: status.Term,
- DelegatedSender: sender,
- }
- err = contextutil.RunWithTimeout(
- ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
- return r.store.cfg.Transport.DelegateSnapshot(
- ctx,
- delegateRequest,
- )
- },
- )
- // Only mark explicitly as snapshot error (which is retriable) if we timed out.
- // Otherwise, it's up to the remote to add this mark where appropriate.
- if errors.HasType(err, (*contextutil.TimeoutError)(nil)) {
- err = errors.Mark(err, errMarkSnapshotError)
+ return errors.Errorf("raft status not initialized")
+ }
+ replTerm := status.Term
+ r.mu.RUnlock()
+
+	// The delegate has a different term than the coordinator. This typically
+	// means the lease has been transferred, and we should not process this
+	// request. There is a potential race where the term changes after the
+	// coordinator sends the delegate request and after this check runs; in
+	// that case the change is not caught here and the snapshot will still be
+	// sent.
+ if replTerm != req.Term {
+ log.Infof(
+ ctx,
+ "sender: %v is not fit to send snapshot for %v; sender term: %v coordinator term: %v",
+ req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term,
+ )
+ return errors.Errorf(
+ "sender: %v is not fit to send snapshot for %v; sender term: %v, coordinator term: %v",
+ req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term,
+ )
}
- return err
+
+	// The sender replica's snapshot will be rejected if its raft applied index
+	// is lower than or equal to the log truncation constraint on the
+	// coordinator, as such a snapshot would be wasted. Note that the check could
+	// be made strictly less-than if etcd did not require previous raft log
+	// entries for appending.
+ if replIdx <= req.FirstIndex {
+ log.Infof(
+ ctx, "sender: %v is not fit to send snapshot;"+
+ " sender first index: %v, "+
+ "coordinator log truncation constraint: %v", req.DelegatedSender, replIdx, req.FirstIndex,
+ )
+ return errors.Mark(errors.Errorf(
+ "sender: %v is not fit to send snapshot;"+
+ " sender first index: %v, "+
+ "coordinator log truncation constraint: %v", req.DelegatedSender, replIdx, req.FirstIndex,
+ ), errMarkSnapshotError)
+ }
+ return nil
}
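
The validate-twice flow mentioned above (once before queueing on the send semaphore and once after) is the pattern followerSendSnapshot implements below; a generic sketch, with hypothetical helper names, looks like this:

```go
// sendWithRevalidation is a hypothetical illustration of the control flow:
// check cheaply before taking a queue slot, wait for a send permit, then
// re-check because the range may have split or the lease may have moved.
func sendWithRevalidation(
	validate func() error,
	acquire func() (release func(), err error),
	send func() error,
) error {
	if err := validate(); err != nil {
		return err // reject before queueing
	}
	release, err := acquire() // may block behind other snapshot sends
	if err != nil {
		return err
	}
	defer release()
	if err := validate(); err != nil {
		return err // the world may have changed while we waited
	}
	return send()
}
```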
-// followerSnapshotsEnabled is used to enable or disable follower snapshots.
-var followerSnapshotsEnabled = func() *settings.BoolSetting {
- s := settings.RegisterBoolSetting(
+// NumDelegateLimit is used to control the number of delegate followers to use
+// for snapshots. To disable follower snapshots, set this to 0. If enabled, the
+// leaseholder / leader will attempt to find a closer delegate than itself to
+// send the snapshot through. This can save network bandwidth, at the cost of
+// increased snapshot send latency in some cases.
+var NumDelegateLimit = func() *settings.IntSetting {
+ s := settings.RegisterIntSetting(
+ settings.SystemOnly,
+ "kv.snapshot_delegation.num_follower",
+ "the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder",
+ 1,
+ )
+ s.SetVisibility(settings.Public)
+ return s
+}()
+
+// MaxQueueOnDelegateLimit is used to control how long the outgoing snapshot
+// queue can be before we reject delegation requests. Setting to -1 allows
+// unlimited requests. The purpose of this setting is to prevent a long snapshot
+// queue from delaying a delegated snapshot from being sent. Once the queue
+// length exceeds the configured value, additional delegation requests will be
+// rejected with an error.
+var MaxQueueOnDelegateLimit = func() *settings.IntSetting {
+ s := settings.RegisterIntSetting(
settings.SystemOnly,
- "kv.snapshot_delegation.enabled",
- "set to true to allow snapshots from follower replicas",
- false,
+ "kv.snapshot_delegation.num_requests",
+ "how many queued requests are allowed on a delegate before the request is rejected",
+ 3,
)
s.SetVisibility(settings.Public)
return s
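
As a usage note, a test can tune these settings with Override, mirroring the call added to TestAddReplicaWithReceiverThrottling later in this patch. The helper below is hypothetical and assumes the usual kvserver and cluster imports.

```go
// disableSnapshotDelegation is a hypothetical test helper, not part of this
// patch. Setting kv.snapshot_delegation.num_follower to 0 makes the
// leaseholder always send its own snapshots.
func disableSnapshotDelegation(ctx context.Context, st *cluster.Settings) {
	kvserver.NumDelegateLimit.Override(ctx, &st.SV, 0)
	// When delegation is enabled, cap the per-delegate queue at 10 requests.
	kvserver.MaxQueueOnDelegateLimit.Override(ctx, &st.SV, 10)
}
```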
@@ -2754,9 +3018,8 @@ var traceSnapshotThreshold = settings.RegisterDurationSetting(
func (r *Replica) followerSendSnapshot(
ctx context.Context,
recipient roachpb.ReplicaDescriptor,
- req *kvserverpb.DelegateSnapshotRequest,
- stream DelegateSnapshotResponseStream,
-) (retErr error) {
+ req *kvserverpb.DelegateSendSnapshotRequest,
+) error {
ctx = r.AnnotateCtx(ctx)
sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV)
if sendThreshold > 0 {
@@ -2776,55 +3039,46 @@ func (r *Replica) followerSendSnapshot(
}()
}
- // TODO(amy): when delegating to different senders, check raft applied state
- // to determine if this follower replica is fit to send.
- // Acknowledge that the request has been accepted.
- if err := stream.Send(
- &kvserverpb.DelegateSnapshotResponse{
- SnapResponse: &kvserverpb.SnapshotResponse{
- Status: kvserverpb.SnapshotResponse_ACCEPTED,
- },
- },
- ); err != nil {
+ // Check the validity conditions twice, once before and once after we obtain
+ // the send semaphore. We check after to make sure the snapshot request still
+ // makes sense (e.g the range hasn't split under us). The check is
+ // lightweight. Even if the second check succeeds, the snapshot we send might
+ // still not be usable due to the leaseholder making a change that we don't
+ // know about, but we try to minimize that possibility since snapshots are
+ // expensive to send.
+ err := r.validateSnapshotDelegationRequest(ctx, req)
+ if err != nil {
return err
}
- // Throttle snapshot sending.
+ // Throttle snapshot sending. Obtain the send semaphore and determine the rate limit.
rangeSize := r.GetMVCCStats().Total()
cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize)
if err != nil {
- return err
+		return errors.Wrap(err, "unable to reserve space for sending this snapshot")
}
defer cleanup()
+	// Check validity again; it is possible that the pending request should no
+	// longer be sent after we are done waiting.
+ err = r.validateSnapshotDelegationRequest(ctx, req)
+ if err != nil {
+ return err
+ }
+
snapType := req.Type
- snap, err := r.GetSnapshot(ctx, snapType, recipient.StoreID)
+ snap, err := r.GetSnapshot(ctx, snapType, req.SnapId)
if err != nil {
- err = errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
- return errors.Mark(err, errMarkSnapshotError)
+ return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
}
defer snap.Close()
log.Event(ctx, "generated snapshot")
- // Check that the snapshot we generated has a descriptor that includes the
- // recipient. If it doesn't, the recipient will reject it, so it's better to
- // not send it in the first place. It's possible to hit this case if we're not
- // the leaseholder and we haven't yet applied the configuration change that's
- // adding the recipient to the range.
- if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok {
- return errors.Wrapf(
- errMarkSnapshotError,
- "attempting to send snapshot that does not contain the recipient as a replica; "+
- "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc,
- )
- }
-
- // We avoid shipping over the past Raft log in the snapshot by changing
- // the truncated state (we're allowed to -- it's an unreplicated key and not
- // subject to mapping across replicas). The actual sending happens here:
- _ = (*kvBatchSnapshotStrategy)(nil).Send
- // and results in no log entries being sent at all. Note that
- // Metadata.Index is really the applied index of the replica.
+ // We avoid shipping over the past Raft log in the snapshot by changing the
+ // truncated state (we're allowed to -- it's an unreplicated key and not
+ // subject to mapping across replicas). The actual sending happens in
+ // kvBatchSnapshotStrategy.Send and results in no log entries being sent at
+ // all. Note that Metadata.Index is really the applied index of the replica.
snap.State.TruncatedState = &roachpb.RaftTruncatedState{
Index: snap.RaftSnap.Metadata.Index,
Term: snap.RaftSnap.Metadata.Term,
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index c4844c389032..6b1507833e7a 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -40,8 +41,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
@@ -271,6 +274,13 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
)
defer tc.Stopper().Stop(ctx)
+
+ // Disable delegating snapshots to different senders, which would otherwise
+ // fail this test as snapshots could queue on different stores.
+ settings := cluster.MakeTestingClusterSettings()
+ sv := &settings.SV
+ kvserver.NumDelegateLimit.Override(ctx, sv, 0)
+
scratch := tc.ScratchRange(t)
replicationChange := make(chan error, 2)
g := ctxgroup.WithContext(ctx)
@@ -338,6 +348,195 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
require.NoError(t, g.Wait())
}
+// TestDelegateSnapshot verifies that the correct delegate is chosen when
+// sending snapshots to stores.
+// TODO: This test is skipped under stress because raft snapshots are sometimes
+// required and do not go through snapshot delegation, which makes the results
+// unpredictable.
+func TestDelegateSnapshot(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ skip.UnderStress(t, "Occasionally fails until 87553 is resolved")
+
+ ctx := context.Background()
+
+ // Synchronize on the moment before the snapshot gets sent to measure the
+ // state at that time.
+ requestChannel := make(chan *kvserverpb.DelegateSendSnapshotRequest, 10)
+
+ setupFn := func(t *testing.T) (
+ *testcluster.TestCluster,
+ roachpb.Key,
+ ) {
+ knobs, ltk := makeReplicationTestKnobs()
+ ltk.storeKnobs.DisableRaftSnapshotQueue = true
+
+ ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) {
+ requestChannel <- request
+ }
+
+ localityA := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "a"}}}
+ localityB := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "b"}}}
+
+ localityServerArgs := make(map[int]base.TestServerArgs)
+ localityServerArgs[0] = base.TestServerArgs{Knobs: knobs, Locality: localityA}
+ localityServerArgs[1] = base.TestServerArgs{Knobs: knobs, Locality: localityA}
+ localityServerArgs[2] = base.TestServerArgs{Knobs: knobs, Locality: localityB}
+ localityServerArgs[3] = base.TestServerArgs{Knobs: knobs, Locality: localityB}
+
+ tc := testcluster.StartTestCluster(
+ t, 4, base.TestClusterArgs{
+ ServerArgsPerNode: localityServerArgs,
+ ReplicationMode: base.ReplicationManual,
+ },
+ )
+ scratchKey := tc.ScratchRange(t)
+ return tc, scratchKey
+ }
+
+ tc, scratchKey := setupFn(t)
+ defer tc.Stopper().Stop(ctx)
+
+	// Node 3 (loc B) can only get the data from node 1 as it's the only one that has it.
+ _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2)...)
+ request := <-requestChannel
+ require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1))
+ // Drain the channel. Unfortunately there are occasionally spurious raft snapshots sent.
+ for len(requestChannel) > 0 {
+ <-requestChannel
+ }
+
+	// Node 4 (loc B) should get the snapshot from node 3 as it's in the same locality.
+ _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(3)...)
+ request = <-requestChannel
+ require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(3))
+ for len(requestChannel) > 0 {
+ <-requestChannel
+ }
+
+	// Node 2 (loc A) should get the snapshot from node 1 as it is in the same locality.
+ _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1)...)
+ request = <-requestChannel
+ require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1))
+}
+
+// TestDelegateSnapshotFails is a test that ensures we fail fast when the
+// sender or receiver store crashes during delegated snapshot sending.
+func TestDelegateSnapshotFails(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ var senders struct {
+ mu syncutil.Mutex
+ desc []roachpb.ReplicaDescriptor
+ }
+
+ setupFn := func(t *testing.T) (
+ *testcluster.TestCluster,
+ roachpb.Key,
+ ) {
+ senders.desc = nil
+ knobs, ltk := makeReplicationTestKnobs()
+ ltk.storeKnobs.ThrottleEmptySnapshots = true
+
+ ltk.storeKnobs.SelectDelegateSnapshotSender =
+ func(descriptor *roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor {
+ senders.mu.Lock()
+ defer senders.mu.Unlock()
+ return senders.desc
+ }
+
+ tc := testcluster.StartTestCluster(
+ t, 4, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ },
+ )
+
+ scratchKey := tc.ScratchRange(t)
+ return tc, scratchKey
+ }
+
+	// Add a learner replica that will need a snapshot, and kill the server
+	// the learner is on. Assert that the failure is detected and change replicas
+ // fails fast.
+ t.Run("receiver", func(t *testing.T) {
+ tc, scratchKey := setupFn(t)
+ defer tc.Stopper().Stop(ctx)
+
+ desc, err := tc.LookupRange(scratchKey)
+ require.NoError(t, err, "Unable to lookup the range")
+
+ _, err = setupPartitionedRange(tc, desc.RangeID, 0, 0, true, unreliableRaftHandlerFuncs{})
+ require.NoError(t, err)
+
+ _, err = tc.Servers[0].DB().AdminChangeReplicas(
+ ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)),
+ )
+
+ require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err)
+ })
+
+ // Add a follower replica to act as the snapshot sender, and kill the server
+ // the sender is on. Assert that the failure is detected and change replicas
+ // fails fast.
+ t.Run("sender_no_fallback", func(t *testing.T) {
+ tc, scratchKey := setupFn(t)
+ defer tc.Stopper().Stop(ctx)
+
+ // Add a replica that will be the delegated sender, and another so we have
+ // quorum with this node down
+ desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...)
+
+ replicaDesc, ok := desc.GetReplicaDescriptor(3)
+ require.True(t, ok)
+ // Always use node 3 (index 2) as the only delegate.
+ senders.mu.Lock()
+ senders.desc = append(senders.desc, replicaDesc)
+ senders.mu.Unlock()
+
+ // Now stop accepting traffic to node 3 (index 2).
+ _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{})
+ require.NoError(t, err)
+
+ _, err = tc.Servers[0].DB().AdminChangeReplicas(
+ ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)),
+ )
+ log.Infof(ctx, "Err=%v", err)
+ require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err)
+ })
+
+ // Identical setup as the previous test, but allow a fallback to the leaseholder.
+ t.Run("sender_with_fallback", func(t *testing.T) {
+ tc, scratchKey := setupFn(t)
+ defer tc.Stopper().Stop(ctx)
+
+ // Add a replica that will be the delegated sender, and another so we have
+ // quorum with this node down
+ desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...)
+
+ replicaDesc, ok := desc.GetReplicaDescriptor(3)
+ require.True(t, ok)
+ leaseholderDesc, ok := desc.GetReplicaDescriptor(1)
+ require.True(t, ok)
+ // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure.
+ senders.mu.Lock()
+ senders.desc = append(senders.desc, replicaDesc)
+ senders.desc = append(senders.desc, leaseholderDesc)
+ senders.mu.Unlock()
+
+ // Now stop accepting traffic to node 3 (index 2).
+ _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{})
+ require.NoError(t, err)
+
+ _, err = tc.Servers[0].DB().AdminChangeReplicas(
+ ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)),
+ )
+ require.NoError(t, err)
+ })
+}
+
func TestLearnerRaftConfState(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -502,7 +701,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
// the state at that time & gather metrics.
blockUntilSnapshotSendCh := make(chan struct{})
blockSnapshotSendCh := make(chan struct{})
- ltk.storeKnobs.SendSnapshot = func() {
+ ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) {
close(blockUntilSnapshotSendCh)
select {
case <-blockSnapshotSendCh:
@@ -592,7 +791,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
// snapshot being sent.
<-blockUntilSnapshotSendCh
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
- snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE, tc.Server(1).GetFirstStoreID())
+ snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE)
require.NoError(t, err)
close(blockSnapshotSendCh)
@@ -1766,9 +1965,8 @@ func getExpectedSnapshotSizeBytes(
originStore *kvserver.Store,
originRepl *kvserver.Replica,
snapType kvserverpb.SnapshotRequest_Type,
- recipientStoreID roachpb.StoreID,
) (int64, error) {
- snap, err := originRepl.GetSnapshot(ctx, snapType, recipientStoreID)
+ snap, err := originRepl.GetSnapshot(ctx, snapType, uuid.MakeV4())
if err != nil {
return 0, err
}
@@ -1831,7 +2029,7 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
// the state at that time.
blockUntilSnapshotSendCh := make(chan struct{})
blockSnapshotSendCh := make(chan struct{})
- ltk.storeKnobs.SendSnapshot = func() {
+ ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) {
close(blockUntilSnapshotSendCh)
select {
case <-blockSnapshotSendCh:
@@ -1865,7 +2063,7 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
// snapshot being sent.
<-blockUntilSnapshotSendCh
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
- snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_INITIAL, tc.Server(1).GetFirstStoreID())
+ snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_INITIAL)
require.NoError(t, err)
close(blockSnapshotSendCh)
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index f09bdf33cdac..e30791208e34 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -1619,17 +1619,26 @@ type snapTruncationInfo struct {
recipientStore roachpb.StoreID
}
+// addSnapshotLogTruncationConstraint creates a log truncation record which
+// will prevent the raft log from being truncated past this point until the
+// cleanup function is called. It returns the index at which the truncation
+// constraint is set and a cleanup function to remove the constraint. We will
+// fetch the applied index again in GetSnapshot and that is likely a different
+// but higher index. The appliedIndex fetched here is narrowly used for adding
+// a log truncation constraint to prevent log entries > appliedIndex from being
+// removed. Note that the appliedIndex maintained in Replica actually lags the
+// one in the engine, since replicaAppBatch.ApplyToStateMachine commits the
+// engine batch and then acquires mu to update the RaftAppliedIndex. The use of
+// a possibly stale value here is harmless since the value increases
+// monotonically; using a lower index than the actual snapshot index simply
+// preserves more of the log from a truncation perspective.
func (r *Replica) addSnapshotLogTruncationConstraint(
- ctx context.Context, snapUUID uuid.UUID, index uint64, recipientStore roachpb.StoreID,
-) {
+ ctx context.Context, snapUUID uuid.UUID, recipientStore roachpb.StoreID,
+) (uint64, func()) {
r.mu.Lock()
defer r.mu.Unlock()
- r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, index, recipientStore)
-}
-
-func (r *Replica) addSnapshotLogTruncationConstraintLocked(
- ctx context.Context, snapUUID uuid.UUID, index uint64, recipientStore roachpb.StoreID,
-) {
+ appliedIndex := r.mu.state.RaftAppliedIndex
+	// The constraint is removed by the returned cleanup function.
if r.mu.snapshotLogTruncationConstraints == nil {
r.mu.snapshotLogTruncationConstraints = make(map[uuid.UUID]snapTruncationInfo)
}
@@ -1639,32 +1648,30 @@ func (r *Replica) addSnapshotLogTruncationConstraintLocked(
// fed into this method twice) or a UUID collision. We discard the update
// (which is benign) but log it loudly. If the index is the same, it's
// likely the former, otherwise the latter.
- log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, index)
- return
+ log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, appliedIndex)
+ return appliedIndex, func() {}
}
r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{
- index: index,
+ index: appliedIndex,
recipientStore: recipientStore,
}
-}
-// completeSnapshotLogTruncationConstraint marks the given snapshot as finished,
-// releasing the lock on raft log truncation after a grace period.
-func (r *Replica) completeSnapshotLogTruncationConstraint(snapUUID uuid.UUID) {
- r.mu.Lock()
- defer r.mu.Unlock()
+ return appliedIndex, func() {
+ r.mu.Lock()
+ defer r.mu.Unlock()
- _, ok := r.mu.snapshotLogTruncationConstraints[snapUUID]
- if !ok {
- // UUID collision while adding the snapshot in originally. Nothing
- // else to do.
- return
- }
- delete(r.mu.snapshotLogTruncationConstraints, snapUUID)
- if len(r.mu.snapshotLogTruncationConstraints) == 0 {
- // Save a little bit of memory.
- r.mu.snapshotLogTruncationConstraints = nil
+ _, ok := r.mu.snapshotLogTruncationConstraints[snapUUID]
+ if !ok {
+ // UUID collision while adding the snapshot in originally. Nothing
+ // else to do.
+ return
+ }
+ delete(r.mu.snapshotLogTruncationConstraints, snapUUID)
+ if len(r.mu.snapshotLogTruncationConstraints) == 0 {
+ // Save a little bit of memory.
+ r.mu.snapshotLogTruncationConstraints = nil
+ }
}
}
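
The shape of this change (record a floor index under the mutex and hand back a closure that removes it) can be modeled in isolation. The following is a toy sketch, not CockroachDB code:

```go
package sketch

import "sync"

// truncationGuard models the pattern above: add() records a minimum index to
// keep and returns it together with a cleanup closure that removes the record.
type truncationGuard struct {
	mu          sync.Mutex
	constraints map[string]uint64 // snapshot ID -> minimum index to keep
}

func (g *truncationGuard) add(id string, appliedIndex uint64) (uint64, func()) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.constraints == nil {
		g.constraints = make(map[string]uint64)
	}
	g.constraints[id] = appliedIndex
	return appliedIndex, func() {
		g.mu.Lock()
		defer g.mu.Unlock()
		delete(g.constraints, id)
		if len(g.constraints) == 0 {
			g.constraints = nil // save a little memory, as in the patch
		}
	}
}
```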
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index ae01168ee7a4..00cc06e19c5e 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -220,44 +220,17 @@ func (r *Replica) raftSnapshotLocked() (raftpb.Snapshot, error) {
// replica. If this method returns without error, callers must eventually call
// OutgoingSnapshot.Close.
func (r *Replica) GetSnapshot(
- ctx context.Context, snapType kvserverpb.SnapshotRequest_Type, recipientStore roachpb.StoreID,
+ ctx context.Context, snapType kvserverpb.SnapshotRequest_Type, snapUUID uuid.UUID,
) (_ *OutgoingSnapshot, err error) {
- snapUUID := uuid.MakeV4()
// Get a snapshot while holding raftMu to make sure we're not seeing "half
// an AddSSTable" (i.e. a state in which an SSTable has been linked in, but
// the corresponding Raft command not applied yet).
r.raftMu.Lock()
snap := r.store.engine.NewSnapshot()
- {
- r.mu.Lock()
- // We will fetch the applied index later again, from snap. The
- // appliedIndex fetched here is narrowly used for adding a log truncation
- // constraint to prevent log entries > appliedIndex from being removed.
- // Note that the appliedIndex maintained in Replica actually lags the one
- // in the engine, since replicaAppBatch.ApplyToStateMachine commits the
- // engine batch and then acquires Replica.mu to update
- // Replica.mu.state.RaftAppliedIndex. The use of a possibly stale value
- // here is harmless since using a lower index in this constraint, than the
- // actual snapshot index, preserves more from a log truncation
- // perspective.
- //
- // TODO(sumeer): despite the above justification, this is unnecessarily
- // complicated. Consider loading the RaftAppliedIndex from the snap for
- // this use case.
- appliedIndex := r.mu.state.RaftAppliedIndex
- // Cleared when OutgoingSnapshot closes.
- r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore)
- r.mu.Unlock()
- }
r.raftMu.Unlock()
- release := func() {
- r.completeSnapshotLogTruncationConstraint(snapUUID)
- }
-
defer func() {
if err != nil {
- release()
snap.Close()
}
}()
@@ -284,7 +257,6 @@ func (r *Replica) GetSnapshot(
log.Errorf(ctx, "error generating snapshot: %+v", err)
return nil, err
}
- snapData.onClose = release
return &snapData, nil
}
diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto
index a876b768dbeb..63c7dc2dc4ff 100644
--- a/pkg/kv/kvserver/storage_services.proto
+++ b/pkg/kv/kvserver/storage_services.proto
@@ -27,15 +27,11 @@ service MultiRaft {
// ERROR, including any collected traces from processing.
rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {}
// DelegateRaftSnapshot asks the server to send a range snapshot to a target
- // (so the client delegates the sending of the snapshot to the server). The
- // server responds in two phases.
- //
- // TODO(nvanbenschoten): This RPC is bi-directional streaming (as opposed to
- // only server-streaming) because of future aspirations; at the moment the
- // request is unary. In the future, we wanted to pause all log truncation,
- // then handshake with the delegated sender, then weaken log truncation
- // protection to just below the index that the sender was sending the
- // snapshot at.
+ // (so the client delegates the sending of the snapshot to the server). This
+ // is a "single-shot" stream today that sends a single request and returns a
+  // single response; however, the expectation is that in the future the
+ // throttling/permit reservation will be separated out from the actual
+ // sending.
rpc DelegateRaftSnapshot(stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotResponse) {}
}
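
Client-side, the "single-shot" exchange described above amounts to one Send followed by one Recv. A rough sketch, assuming the generated MultiRaft gRPC client and the DelegateSnapshotResponse fields introduced in this patch (Status, EncodedError); error handling and tracing are omitted:

```go
// delegateOnce is illustrative only; it is not the code added in this patch.
func delegateOnce(
	ctx context.Context,
	client MultiRaftClient, // generated gRPC client for the MultiRaft service (assumed)
	req *kvserverpb.DelegateSnapshotRequest,
) error {
	stream, err := client.DelegateRaftSnapshot(ctx)
	if err != nil {
		return err
	}
	if err := stream.Send(req); err != nil {
		return err
	}
	resp, err := stream.Recv() // single response: APPLIED or ERROR
	if err != nil {
		return err
	}
	if resp.Status == kvserverpb.DelegateSnapshotResponse_ERROR {
		return errors.DecodeError(ctx, resp.EncodedError)
	}
	return nil
}
```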
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 60d543d71685..fa3fed233cd4 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -160,41 +160,41 @@ func (qs *raftReceiveQueues) Delete(rangeID roachpb.RangeID) {
// HandleDelegatedSnapshot reads the incoming delegated snapshot message and
// throttles sending snapshots before passing the request to the sender replica.
func (s *Store) HandleDelegatedSnapshot(
- ctx context.Context,
- req *kvserverpb.DelegateSnapshotRequest,
- stream DelegateSnapshotResponseStream,
-) error {
+ ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
+) *kvserverpb.DelegateSnapshotResponse {
ctx = s.AnnotateCtx(ctx)
if fn := s.cfg.TestingKnobs.SendSnapshot; fn != nil {
- fn()
+ fn(req)
}
+ sp := tracing.SpanFromContext(ctx)
+
+ // This can happen if the delegate doesn't know about the range yet. Return an
+ // error immediately.
sender, err := s.GetReplica(req.RangeID)
if err != nil {
- return err
+ return &kvserverpb.DelegateSnapshotResponse{
+ Status: kvserverpb.DelegateSnapshotResponse_ERROR,
+ EncodedError: errors.EncodeError(context.Background(), err),
+ CollectedSpans: sp.GetConfiguredRecording(),
+ }
}
- sp := tracing.SpanFromContext(ctx)
// Pass the request to the sender replica.
- if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req, stream); err != nil {
- return stream.Send(
- &kvserverpb.DelegateSnapshotResponse{
- SnapResponse: snapRespErr(err),
- CollectedSpans: sp.GetConfiguredRecording(),
- },
- )
+ if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil {
+ // If an error occurred during snapshot sending, send an error response.
+ return &kvserverpb.DelegateSnapshotResponse{
+ Status: kvserverpb.DelegateSnapshotResponse_ERROR,
+ EncodedError: errors.EncodeError(context.Background(), err),
+ CollectedSpans: sp.GetConfiguredRecording(),
+ }
}
- resp := &kvserverpb.DelegateSnapshotResponse{
- SnapResponse: &kvserverpb.SnapshotResponse{
- Status: kvserverpb.SnapshotResponse_APPLIED,
- DeprecatedMessage: "Snapshot successfully applied by recipient",
- },
+ return &kvserverpb.DelegateSnapshotResponse{
+ Status: kvserverpb.DelegateSnapshotResponse_APPLIED,
CollectedSpans: sp.GetConfiguredRecording(),
}
- // Send a final response that snapshot sending is completed.
- return stream.Send(resp)
}
// HandleSnapshot reads an incoming streaming snapshot and applies it if
diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go
index 830faf132846..6768b2efcded 100644
--- a/pkg/kv/kvserver/store_snapshot.go
+++ b/pkg/kv/kvserver/store_snapshot.go
@@ -91,13 +91,6 @@ type outgoingSnapshotStream interface {
Recv() (*kvserverpb.SnapshotResponse, error)
}
-// outgoingSnapshotStream is the minimal interface on a GRPC stream required
-// to send a snapshot over the network.
-type outgoingDelegatedStream interface {
- Send(*kvserverpb.DelegateSnapshotRequest) error
- Recv() (*kvserverpb.DelegateSnapshotResponse, error)
-}
-
// snapshotRecordMetrics is a wrapper function that increments a set of metrics
// related to the number of snapshot bytes sent/received. The definer of the
// function specifies which metrics are incremented.
@@ -674,7 +667,11 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) {
}
}
-// reserveReceiveSnapshot throttles incoming snapshots.
+// reserveReceiveSnapshot reserves space for this snapshot, which helps prevent
+// overload of system resources while the snapshot is being received.
+// Snapshots are often sent in bulk (due to operations like store decommission)
+// so it is necessary to prevent snapshot transfers from overly impacting
+// foreground traffic.
func (s *Store) reserveReceiveSnapshot(
ctx context.Context, header *kvserverpb.SnapshotRequest_Header,
) (_cleanup func(), _err error) {
@@ -683,8 +680,9 @@ func (s *Store) reserveReceiveSnapshot(
return s.throttleSnapshot(ctx, s.snapshotApplyQueue,
int(header.SenderQueueName), header.SenderQueuePriority,
+ -1,
header.RangeSize,
- header.RaftMessageRequest.RangeID, header.RaftMessageRequest.ToReplica.ReplicaID,
+ header.RaftMessageRequest.RangeID,
s.metrics.RangeSnapshotRecvQueueLength,
s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress,
)
@@ -692,7 +690,7 @@ func (s *Store) reserveReceiveSnapshot(
// reserveSendSnapshot throttles outgoing snapshots.
func (s *Store) reserveSendSnapshot(
- ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, rangeSize int64,
+ ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, rangeSize int64,
) (_cleanup func(), _err error) {
ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveSendSnapshot")
defer sp.Finish()
@@ -701,9 +699,11 @@ func (s *Store) reserveSendSnapshot(
}
return s.throttleSnapshot(ctx, s.snapshotSendQueue,
- int(req.SenderQueueName), req.SenderQueuePriority,
+ int(req.SenderQueueName),
+ req.SenderQueuePriority,
+ req.QueueOnDelegateLen,
rangeSize,
- req.RangeID, req.DelegatedSender.ReplicaID,
+ req.RangeID,
s.metrics.RangeSnapshotSendQueueLength,
s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress,
)
@@ -717,11 +717,12 @@ func (s *Store) throttleSnapshot(
snapshotQueue *multiqueue.MultiQueue,
requestSource int,
requestPriority float64,
+ maxQueueLength int64,
rangeSize int64,
rangeID roachpb.RangeID,
- replicaID roachpb.ReplicaID,
waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge,
-) (cleanup func(), err error) {
+) (cleanup func(), funcErr error) {
+
tBegin := timeutil.Now()
var permit *multiqueue.Permit
// Empty snapshots are exempt from rate limits because they're so cheap to
@@ -729,9 +730,14 @@ func (s *Store) throttleSnapshot(
// RESTORE or manual SPLIT AT, since it prevents these empty snapshots from
// getting stuck behind large snapshots managed by the replicate queue.
if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
- task := snapshotQueue.Add(requestSource, requestPriority)
+ task, err := snapshotQueue.Add(requestSource, requestPriority, maxQueueLength)
+ if err != nil {
+ return nil, err
+ }
+ // After this point, the task is on the queue, so any future errors need to
+ // be handled by cancelling the task to release the permit.
defer func() {
- if err != nil {
+ if funcErr != nil {
snapshotQueue.Cancel(task)
}
}()
@@ -787,10 +793,9 @@ func (s *Store) throttleSnapshot(
if elapsed > snapshotReservationWaitWarnThreshold && !buildutil.CrdbTestBuild {
log.Infof(
ctx,
- "waited for %.1fs to acquire snapshot reservation to r%d/%d",
+ "waited for %.1fs to acquire snapshot reservation to r%d",
elapsed.Seconds(),
rangeID,
- replicaID,
)
}
@@ -1127,7 +1132,7 @@ func maybeHandleDeprecatedSnapErr(deprecated bool, err error) error {
return errors.Mark(err, errMarkSnapshotError)
}
-// SnapshotStorePool narrows StorePool to make sendSnapshot easier to test.
+// SnapshotStorePool narrows StorePool to make sendSnapshotUsingDelegate easier to test.
type SnapshotStorePool interface {
Throttle(reason storepool.ThrottleReason, why string, toStoreID roachpb.StoreID)
}
@@ -1501,7 +1506,7 @@ func sendSnapshot(
recordBytesSent snapshotRecordMetrics,
) error {
if recordBytesSent == nil {
- // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshot`
+ // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshotUsingDelegate`
// with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't
// hooked up to anything.
recordBytesSent = func(inc int64) {}
@@ -1626,76 +1631,3 @@ func sendSnapshot(
)
}
}
-
-// delegateSnapshot sends an outgoing delegated snapshot request via a
-// pre-opened GRPC stream. It sends the delegated snapshot request to the
-// sender and waits for confirmation that the snapshot has been applied.
-func delegateSnapshot(
- ctx context.Context, stream outgoingDelegatedStream, req *kvserverpb.DelegateSnapshotRequest,
-) error {
-
- delegatedSender := req.DelegatedSender
- if err := stream.Send(req); err != nil {
- return err
- }
- // Wait for a response from the sender.
- resp, err := stream.Recv()
- if err != nil {
- return err
- }
- switch resp.SnapResponse.Status {
- case kvserverpb.SnapshotResponse_ERROR:
- return errors.Wrapf(
- maybeHandleDeprecatedSnapErr(resp.Error()),
- "%s: sender couldn't accept %s", delegatedSender, req)
- case kvserverpb.SnapshotResponse_ACCEPTED:
- // The sender accepted the request, it will continue with sending.
- log.VEventf(
- ctx, 2, "sender %s accepted snapshot request %s", delegatedSender,
- req,
- )
- default:
- err := errors.Errorf(
- "%s: server sent an invalid status while negotiating %s: %s",
- delegatedSender, req, resp.SnapResponse.Status,
- )
- return err
- }
-
- // Wait for response to see if the receiver successfully applied the snapshot.
- resp, err = stream.Recv()
- if err != nil {
- return errors.Mark(
- errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender), errMarkSnapshotError,
- )
- }
- // Wait for EOF to ensure server side processing is complete.
- if unexpectedResp, err := stream.Recv(); err != io.EOF {
- if err != nil {
- return errors.Mark(errors.Wrapf(
- err, "%s: expected EOF, got resp=%v with error",
- delegatedSender.StoreID, unexpectedResp), errMarkSnapshotError)
- }
- return errors.Mark(errors.Newf(
- "%s: expected EOF, got resp=%v", delegatedSender.StoreID,
- unexpectedResp), errMarkSnapshotError)
- }
- sp := tracing.SpanFromContext(ctx)
- if sp != nil {
- sp.ImportRemoteRecording(resp.CollectedSpans)
- }
- switch resp.SnapResponse.Status {
- case kvserverpb.SnapshotResponse_ERROR:
- return maybeHandleDeprecatedSnapErr(resp.Error())
- case kvserverpb.SnapshotResponse_APPLIED:
- // This is the response we're expecting. Snapshot successfully applied.
- log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender)
- return nil
- default:
- return errors.Errorf(
- "%s: server sent an invalid status during finalization: %s",
- delegatedSender, resp.SnapResponse.Status,
- )
- }
-
-}
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index 075409d49560..bebeebef138f 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -2995,7 +2995,7 @@ func (sp *fakeStorePool) Throttle(
}
// TestSendSnapshotThrottling tests the store pool throttling behavior of
-// store.sendSnapshot, ensuring that it properly updates the StorePool on
+// store.sendSnapshotUsingDelegate, ensuring that it properly updates the StorePool on
// various exceptional conditions and new capacity estimates.
func TestSendSnapshotThrottling(t *testing.T) {
defer leaktest.AfterTest(t)()
@@ -3066,60 +3066,82 @@ func TestSendSnapshotConcurrency(t *testing.T) {
s := tc.store
// Checking this now makes sure that if the defaults change this test will also.
- require.Equal(t, 2, s.snapshotSendQueue.Len())
- cleanup1, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
+ require.Equal(t, 2, s.snapshotSendQueue.AvailableLen())
+ require.Equal(t, 0, s.snapshotSendQueue.QueueLen())
+ cleanup1, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{
SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
SenderQueuePriority: 1,
}, 1)
- require.Nil(t, err)
- require.Equal(t, 1, s.snapshotSendQueue.Len())
- cleanup2, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
+ require.NoError(t, err)
+ cleanup2, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{
SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
SenderQueuePriority: 1,
}, 1)
- require.Nil(t, err)
- require.Equal(t, 0, s.snapshotSendQueue.Len())
+ require.NoError(t, err)
+ require.Equal(t, 0, s.snapshotSendQueue.AvailableLen())
+ require.Equal(t, 1, s.snapshotSendQueue.QueueLen())
+
// At this point both the first two tasks will be holding reservations and
- // waiting for cleanup, a third task will block.
+	// waiting for cleanup; a third task will block or fail. First send one with
+ // the queue length set to 0 - this will fail since the first tasks are still
+ // running.
+ _, err = s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{
+ SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
+ SenderQueuePriority: 1,
+ QueueOnDelegateLen: 0,
+ }, 1)
+ require.Error(t, err)
+ require.Equal(t, 0, s.snapshotSendQueue.AvailableLen())
+ require.Equal(t, 1, s.snapshotSendQueue.QueueLen())
+
+ // Now add a task that will wait indefinitely for another task to finish.
var wg sync.WaitGroup
wg.Add(2)
go func() {
before := timeutil.Now()
- cleanup3, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
+ cleanup3, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSendSnapshotRequest{
SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
SenderQueuePriority: 1,
+ QueueOnDelegateLen: -1,
}, 1)
after := timeutil.Now()
+ require.NoError(t, err)
+ require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond)
cleanup3()
wg.Done()
- require.Nil(t, err)
- require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond)
}()
- // This task will not block for more than a few MS, but we want to wait for
- // it to complete to make sure it frees the permit.
+ // Now add one that will queue up and wait for another task to finish. This
+	// task will give up after its 20ms deadline, but we want to wait for it to
+ // complete to make sure it frees the permit.
go func() {
- deadlineCtx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
+ deadlineCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
defer cancel()
// This will time out since the deadline is set artificially low. Make sure
// the permit is released.
- _, err := s.reserveSendSnapshot(deadlineCtx, &kvserverpb.DelegateSnapshotRequest{
+ _, err := s.reserveSendSnapshot(deadlineCtx, &kvserverpb.DelegateSendSnapshotRequest{
SenderQueueName: kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
SenderQueuePriority: 1,
+ QueueOnDelegateLen: -1,
}, 1)
+ require.Error(t, err)
wg.Done()
- require.NotNil(t, err)
}()
	// Wait a little while before signaling the first two as complete.
time.Sleep(100 * time.Millisecond)
+ require.Equal(t, 0, s.snapshotSendQueue.AvailableLen())
+	// Two tasks remain queued at this point.
+ require.Equal(t, 2, s.snapshotSendQueue.QueueLen())
+
cleanup1()
cleanup2()
// Wait until all cleanup run before checking the number of permits.
wg.Wait()
- require.Equal(t, 2, s.snapshotSendQueue.Len())
+ require.Equal(t, 2, s.snapshotSendQueue.AvailableLen())
+ require.Equal(t, 0, s.snapshotSendQueue.QueueLen())
}
func TestReserveSnapshotThrottling(t *testing.T) {
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index ab998562d9af..e5ebda32f678 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -280,7 +280,7 @@ type StoreTestingKnobs struct {
// SendSnapshot is run after receiving a DelegateRaftSnapshot request but
// before any throttling or sending logic.
- SendSnapshot func()
+ SendSnapshot func(*kvserverpb.DelegateSendSnapshotRequest)
// ReceiveSnapshot is run after receiving a snapshot header but before
// acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an
// error is returned from the hook, it's sent as an ERROR SnapshotResponse.
@@ -423,6 +423,9 @@ type StoreTestingKnobs struct {
// AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the
// send snapshot semaphore.
AfterSendSnapshotThrottle func()
+	// SelectDelegateSnapshotSender returns an ordered list of replicas which
+	// will be used as delegates for sending a snapshot.
+ SelectDelegateSnapshotSender func(*roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor
// EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`.
EnqueueReplicaInterceptor func(queueName string, replica *Replica)