diff --git a/pkg/kv/kvserver/asim/state/config_loader.go b/pkg/kv/kvserver/asim/state/config_loader.go
index 6835609deaac..8c4f24a46283 100644
--- a/pkg/kv/kvserver/asim/state/config_loader.go
+++ b/pkg/kv/kvserver/asim/state/config_loader.go
@@ -36,9 +36,9 @@ var SingleRegionConfig = ClusterInfo{
 		{
 			Name: "US",
 			Zones: []Zone{
-				{Name: "US_1", NodeCount: 5},
-				{Name: "US_2", NodeCount: 5},
-				{Name: "US_3", NodeCount: 5},
+				NewZoneWithSingleStore("US_1", 5),
+				NewZoneWithSingleStore("US_2", 5),
+				NewZoneWithSingleStore("US_3", 5),
 			},
 		},
 	},
@@ -52,9 +52,9 @@ var SingleRegionMultiStoreConfig = ClusterInfo{
 		{
 			Name: "US",
 			Zones: []Zone{
-				{Name: "US_1", NodeCount: 1, StoresPerNode: 5},
-				{Name: "US_2", NodeCount: 1, StoresPerNode: 5},
-				{Name: "US_3", NodeCount: 1, StoresPerNode: 5},
+				NewZone("US_1", 1, 5),
+				NewZone("US_2", 1, 5),
+				NewZone("US_3", 1, 5),
 			},
 		},
 	},
@@ -67,25 +67,25 @@ var MultiRegionConfig = ClusterInfo{
 		{
 			Name: "US_East",
 			Zones: []Zone{
-				{Name: "US_East_1", NodeCount: 4},
-				{Name: "US_East_2", NodeCount: 4},
-				{Name: "US_East_3", NodeCount: 4},
+				NewZoneWithSingleStore("US_East_1", 4),
+				NewZoneWithSingleStore("US_East_2", 4),
+				NewZoneWithSingleStore("US_East_3", 4),
 			},
 		},
 		{
 			Name: "US_West",
 			Zones: []Zone{
-				{Name: "US_West_1", NodeCount: 4},
-				{Name: "US_West_2", NodeCount: 4},
-				{Name: "US_West_3", NodeCount: 4},
+				NewZoneWithSingleStore("US_West_1", 4),
+				NewZoneWithSingleStore("US_West_2", 4),
+				NewZoneWithSingleStore("US_West_3", 4),
 			},
 		},
 		{
 			Name: "EU",
 			Zones: []Zone{
-				{Name: "EU_1", NodeCount: 4},
-				{Name: "EU_2", NodeCount: 4},
-				{Name: "EU_3", NodeCount: 4},
+				NewZoneWithSingleStore("EU_1", 4),
+				NewZoneWithSingleStore("EU_2", 4),
+				NewZoneWithSingleStore("EU_3", 4),
 			},
 		},
 	},
@@ -98,24 +98,24 @@ var ComplexConfig = ClusterInfo{
 		{
 			Name: "US_East",
 			Zones: []Zone{
-				{Name: "US_East_1", NodeCount: 1},
-				{Name: "US_East_2", NodeCount: 2},
-				{Name: "US_East_3", NodeCount: 3},
-				{Name: "US_East_3", NodeCount: 10},
+				NewZoneWithSingleStore("US_East_1", 1),
+				NewZoneWithSingleStore("US_East_2", 2),
+				NewZoneWithSingleStore("US_East_3", 3),
+				NewZoneWithSingleStore("US_East_3", 10),
 			},
 		},
 		{
 			Name: "US_West",
 			Zones: []Zone{
-				{Name: "US_West_1", NodeCount: 2},
+				NewZoneWithSingleStore("US_West_1", 2),
 			},
 		},
 		{
 			Name: "EU",
 			Zones: []Zone{
-				{Name: "EU_1", NodeCount: 3},
-				{Name: "EU_2", NodeCount: 3},
-				{Name: "EU_3", NodeCount: 4},
+				NewZoneWithSingleStore("EU_1", 3),
+				NewZoneWithSingleStore("EU_2", 3),
+				NewZoneWithSingleStore("EU_3", 4),
 			},
 		},
 	},
@@ -253,6 +253,25 @@ type Zone struct {
 	StoresPerNode int
 }
 
+// NewZoneWithSingleStore is a constructor for a simulated availability zone,
+// taking a zone name and node count, and defaulting to one store per node.
+func NewZoneWithSingleStore(name string, nodeCount int) Zone {
+	return NewZone(name, nodeCount, 1)
+}
+
+// NewZone is a constructor for a simulated availability zone, taking a zone
+// name, node count, and a custom stores-per-node count.
+func NewZone(name string, nodeCount int, storesPerNode int) Zone {
+	if storesPerNode < 1 {
+		panic(fmt.Sprintf("storesPerNode cannot be less than one but found %v", storesPerNode))
+	}
+	return Zone{
+		Name:          name,
+		NodeCount:     nodeCount,
+		StoresPerNode: storesPerNode,
+	}
+}
+
 // Region is a simulated region which contains one or more zones.
 type Region struct {
 	Name string
@@ -328,7 +347,7 @@ func LoadClusterInfo(c ClusterInfo, settings *config.SimulationSettings) State {
 		s.SetNodeLocality(node.NodeID(), locality)
 		storesRequired := z.StoresPerNode
 		if storesRequired < 1 {
-			storesRequired = 1
+			panic(fmt.Sprintf("storesPerNode cannot be less than one but found %v", storesRequired))
 		}
 		for store := 0; store < storesRequired; store++ {
 			if newStore, ok := s.AddStore(node.NodeID()); !ok {
diff --git a/pkg/kv/kvserver/asim/state/new_state.go b/pkg/kv/kvserver/asim/state/new_state.go
index 4a9d18bb3d4f..21fcc9185dcc 100644
--- a/pkg/kv/kvserver/asim/state/new_state.go
+++ b/pkg/kv/kvserver/asim/state/new_state.go
@@ -244,7 +244,7 @@ func ClusterInfoWithDistribution(
 		availableNodes -= allocatedNodes
 		ret.Regions[i] = Region{
 			Name:  name,
-			Zones: []Zone{{Name: name + "_1", NodeCount: allocatedNodes, StoresPerNode: storesPerNode}},
+			Zones: []Zone{NewZone(name+"_1", allocatedNodes, storesPerNode)},
 		}
 	}
 
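A quick illustration of the constructor semantics introduced above, useful when skimming the rest of the patch: NewZone now rejects a non-positive stores-per-node up front, where LoadClusterInfo previously papered over it by silently defaulting to 1. This is a self-contained sketch, not simulator code — the Zone struct is re-declared in trimmed form and the zone names are illustrative.

package main

import "fmt"

// Zone mirrors the simulator's Zone struct, trimmed to the fields used here.
type Zone struct {
	Name          string
	NodeCount     int
	StoresPerNode int
}

// NewZone validates stores-per-node eagerly instead of the old silent
// "default to 1" fixup at cluster-load time.
func NewZone(name string, nodeCount, storesPerNode int) Zone {
	if storesPerNode < 1 {
		panic(fmt.Sprintf("storesPerNode cannot be less than one but found %v", storesPerNode))
	}
	return Zone{Name: name, NodeCount: nodeCount, StoresPerNode: storesPerNode}
}

// NewZoneWithSingleStore is the common case: one store per node.
func NewZoneWithSingleStore(name string, nodeCount int) Zone {
	return NewZone(name, nodeCount, 1)
}

func main() {
	zones := []Zone{
		NewZoneWithSingleStore("US_1", 5), // 5 nodes, 1 store each
		NewZone("US_2", 1, 5),             // 1 node, 5 stores
	}
	fmt.Printf("%+v\n", zones)
	// NewZone("US_3", 1, 0) would panic instead of silently creating a
	// zero-store zone.
}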
diff --git a/pkg/kv/kvserver/asim/tests/testdata/rand/rand_cluster b/pkg/kv/kvserver/asim/tests/testdata/rand/rand_cluster
index baffd99a3259..a342dde1ba60 100644
--- a/pkg/kv/kvserver/asim/tests/testdata/rand/rand_cluster
+++ b/pkg/kv/kvserver/asim/tests/testdata/rand/rand_cluster
@@ -21,7 +21,7 @@ sample1: pass
 sample2: start running
 	configurations generated using seed 1926012586526624009
 	loaded cluster with
-		region:US [zone=US_1(nodes=5,stores=0), zone=US_2(nodes=5,stores=0), zone=US_3(nodes=5,stores=0)]
+		region:US [zone=US_1(nodes=5,stores=1), zone=US_2(nodes=5,stores=1), zone=US_3(nodes=5,stores=1)]
 	basic ranges with placement_type=even, ranges=10, key_space=200000, replication_factor=3, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
@@ -60,9 +60,9 @@ generating settings configurations using static option
 sample1: start running
 	configurations generated using seed 608747136543856411
 	loaded cluster with
-		region:US_East [zone=US_East_1(nodes=1,stores=0), zone=US_East_2(nodes=2,stores=0), zone=US_East_3(nodes=3,stores=0), zone=US_East_3(nodes=10,stores=0)]
-		region:US_West [zone=US_West_1(nodes=2,stores=0)]
-		region:EU [zone=EU_1(nodes=3,stores=0), zone=EU_2(nodes=3,stores=0), zone=EU_3(nodes=4,stores=0)]
+		region:US_East [zone=US_East_1(nodes=1,stores=1), zone=US_East_2(nodes=2,stores=1), zone=US_East_3(nodes=3,stores=1), zone=US_East_3(nodes=10,stores=1)]
+		region:US_West [zone=US_West_1(nodes=2,stores=1)]
+		region:EU [zone=EU_1(nodes=3,stores=1), zone=EU_2(nodes=3,stores=1), zone=EU_3(nodes=4,stores=1)]
 	basic ranges with placement_type=even, ranges=10, key_space=200000, replication_factor=3, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
@@ -92,9 +92,9 @@ sample1: pass
 sample2: start running
 	configurations generated using seed 1926012586526624009
 	loaded cluster with
-		region:US_East [zone=US_East_1(nodes=4,stores=0), zone=US_East_2(nodes=4,stores=0), zone=US_East_3(nodes=4,stores=0)]
-		region:US_West [zone=US_West_1(nodes=4,stores=0), zone=US_West_2(nodes=4,stores=0), zone=US_West_3(nodes=4,stores=0)]
-		region:EU [zone=EU_1(nodes=4,stores=0), zone=EU_2(nodes=4,stores=0), zone=EU_3(nodes=4,stores=0)]
+		region:US_East [zone=US_East_1(nodes=4,stores=1), zone=US_East_2(nodes=4,stores=1), zone=US_East_3(nodes=4,stores=1)]
+		region:US_West [zone=US_West_1(nodes=4,stores=1), zone=US_West_2(nodes=4,stores=1), zone=US_West_3(nodes=4,stores=1)]
+		region:EU [zone=EU_1(nodes=4,stores=1), zone=EU_2(nodes=4,stores=1), zone=EU_3(nodes=4,stores=1)]
 	basic ranges with placement_type=even, ranges=10, key_space=200000, replication_factor=3, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
@@ -128,9 +128,9 @@ sample2: pass
 sample3: start running
 	configurations generated using seed 3534334367214237261
 	loaded cluster with
-		region:US_East [zone=US_East_1(nodes=1,stores=0), zone=US_East_2(nodes=2,stores=0), zone=US_East_3(nodes=3,stores=0), zone=US_East_3(nodes=10,stores=0)]
-		region:US_West [zone=US_West_1(nodes=2,stores=0)]
-		region:EU [zone=EU_1(nodes=3,stores=0), zone=EU_2(nodes=3,stores=0), zone=EU_3(nodes=4,stores=0)]
+		region:US_East [zone=US_East_1(nodes=1,stores=1), zone=US_East_2(nodes=2,stores=1), zone=US_East_3(nodes=3,stores=1), zone=US_East_3(nodes=10,stores=1)]
+		region:US_West [zone=US_West_1(nodes=2,stores=1)]
+		region:EU [zone=EU_1(nodes=3,stores=1), zone=EU_2(nodes=3,stores=1), zone=EU_3(nodes=4,stores=1)]
 	basic ranges with placement_type=even, ranges=10, key_space=200000, replication_factor=3, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
@@ -179,7 +179,7 @@ sample1: pass
 sample2: start running
 	configurations generated using seed 1926012586526624009
 	loaded cluster with
-		region:US [zone=US_1(nodes=5,stores=0), zone=US_2(nodes=5,stores=0), zone=US_3(nodes=5,stores=0)]
+		region:US [zone=US_1(nodes=5,stores=1), zone=US_2(nodes=5,stores=1), zone=US_3(nodes=5,stores=1)]
 	basic ranges with placement_type=even, ranges=10, key_space=200000, replication_factor=3, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
@@ -188,9 +188,9 @@ sample2: pass
 sample3: start running
 	configurations generated using seed 3534334367214237261
 	loaded cluster with
-		region:US_East [zone=US_East_1(nodes=1,stores=0), zone=US_East_2(nodes=2,stores=0), zone=US_East_3(nodes=3,stores=0), zone=US_East_3(nodes=10,stores=0)]
-		region:US_West [zone=US_West_1(nodes=2,stores=0)]
-		region:EU [zone=EU_1(nodes=3,stores=0), zone=EU_2(nodes=3,stores=0), zone=EU_3(nodes=4,stores=0)]
+		region:US_East [zone=US_East_1(nodes=1,stores=1), zone=US_East_2(nodes=2,stores=1), zone=US_East_3(nodes=3,stores=1), zone=US_East_3(nodes=10,stores=1)]
+		region:US_West [zone=US_West_1(nodes=2,stores=1)]
+		region:EU [zone=EU_1(nodes=3,stores=1), zone=EU_2(nodes=3,stores=1), zone=EU_3(nodes=4,stores=1)]
 	basic ranges with placement_type=even, ranges=10, key_space=200000, replication_factor=3, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
diff --git a/pkg/kv/kvserver/asim/tests/testdata/rand/rand_event b/pkg/kv/kvserver/asim/tests/testdata/rand/rand_event
index 9ef61e4ccdd4..c05943774988 100644
--- a/pkg/kv/kvserver/asim/tests/testdata/rand/rand_event
+++ b/pkg/kv/kvserver/asim/tests/testdata/rand/rand_event
@@ -25,9 +25,9 @@ generating settings configurations using static option
 sample1: start running
 	configurations generated using seed 7894140303635748408
 	loaded cluster with
-		region:US_East [zone=US_East_1(nodes=1,stores=0), zone=US_East_2(nodes=2,stores=0), zone=US_East_3(nodes=3,stores=0), zone=US_East_3(nodes=10,stores=0)]
-		region:US_West [zone=US_West_1(nodes=2,stores=0)]
-		region:EU [zone=EU_1(nodes=3,stores=0), zone=EU_2(nodes=3,stores=0), zone=EU_3(nodes=4,stores=0)]
+		region:US_East [zone=US_East_1(nodes=1,stores=1), zone=US_East_2(nodes=2,stores=1), zone=US_East_3(nodes=3,stores=1), zone=US_East_3(nodes=10,stores=1)]
+		region:US_West [zone=US_West_1(nodes=2,stores=1)]
+		region:EU [zone=EU_1(nodes=3,stores=1), zone=EU_2(nodes=3,stores=1), zone=EU_3(nodes=4,stores=1)]
 	basic ranges with placement_type=even, ranges=1, key_space=200000, replication_factor=3, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=12, number of assertion events=12
diff --git a/pkg/kv/kvserver/asim/tests/testdata/rand/rand_ranges b/pkg/kv/kvserver/asim/tests/testdata/rand/rand_ranges
index 5477afe44a1d..783671ee4bf1 100644
--- a/pkg/kv/kvserver/asim/tests/testdata/rand/rand_ranges
+++ b/pkg/kv/kvserver/asim/tests/testdata/rand/rand_ranges
@@ -49,7 +49,7 @@ sample1: pass
 sample2: start running
 	configurations generated using seed 2643318057788968173
 	loaded cluster with
-		region:US [zone=US_1(nodes=5,stores=0), zone=US_2(nodes=5,stores=0), zone=US_3(nodes=5,stores=0)]
+		region:US [zone=US_1(nodes=5,stores=1), zone=US_2(nodes=5,stores=1), zone=US_3(nodes=5,stores=1)]
 	randomized ranges with placement_type=random, ranges=944, key_space=1357, replication_factor=3, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
@@ -58,7 +58,7 @@ sample2: pass
 sample3: start running
 	configurations generated using seed 6972490225919430754
 	loaded cluster with
-		region:US [zone=US_1(nodes=5,stores=0), zone=US_2(nodes=5,stores=0), zone=US_3(nodes=5,stores=0)]
+		region:US [zone=US_1(nodes=5,stores=1), zone=US_2(nodes=5,stores=1), zone=US_3(nodes=5,stores=1)]
 	randomized ranges with placement_type=random, ranges=479, key_space=1003, replication_factor=3, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
@@ -173,7 +173,7 @@ sample1: pass
 sample2: start running
 	configurations generated using seed 2643318057788968173
 	loaded cluster with
-		region:US [zone=US_1(nodes=5,stores=0), zone=US_2(nodes=5,stores=0), zone=US_3(nodes=5,stores=0)]
+		region:US [zone=US_1(nodes=5,stores=1), zone=US_2(nodes=5,stores=1), zone=US_3(nodes=5,stores=1)]
 	randomized ranges with placement_type=random, ranges=944, key_space=150098, replication_factor=1, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
@@ -205,7 +205,7 @@ over replicated:
 sample3: start running
 	configurations generated using seed 6972490225919430754
 	loaded cluster with
-		region:US [zone=US_1(nodes=5,stores=0), zone=US_2(nodes=5,stores=0), zone=US_3(nodes=5,stores=0)]
+		region:US [zone=US_1(nodes=5,stores=1), zone=US_2(nodes=5,stores=1), zone=US_3(nodes=5,stores=1)]
 	randomized ranges with placement_type=random, ranges=479, key_space=199954, replication_factor=1, bytes=0
 	basic load with rw_ratio=0.00, rate=0.00, skewed_access=false, min_block_size=1, max_block_size=1, min_key=1, max_key=200000
 	number of mutation events=0, number of assertion events=0
diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index 9ae4a88a10a5..90c4ac8e190b 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -473,7 +473,6 @@ func mergeCheckingTimestampCaches(
 	var snapshotFilter func(kvserver.IncomingSnapshot)
 	beforeSnapshotSSTIngestion := func(
 		inSnap kvserver.IncomingSnapshot,
-		snapType kvserverpb.SnapshotRequest_Type,
 		_ []string,
 	) error {
 		filterMu.Lock()
@@ -3726,7 +3725,6 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 	rangeIds := make(map[string]roachpb.RangeID, 4)
 	beforeSnapshotSSTIngestion := func(
 		inSnap kvserver.IncomingSnapshot,
-		snapType kvserverpb.SnapshotRequest_Type,
 		sstNames []string,
 	) error {
 		// Only verify snapshots of type VIA_SNAPSHOT_QUEUE and on the range under
@@ -3735,8 +3733,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 		// there are too many keys and the other replicated keys are verified later
 		// on in the test. This function verifies that the subsumed replicas have
 		// been handled properly.
-		if snapType != kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE ||
-			inSnap.Desc.RangeID != rangeIds[string(keyA)] {
+		if inSnap.Desc.RangeID != rangeIds[string(keyA)] {
 			return nil
 		}
diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go
index 5911155898e0..89a082a74d30 100644
--- a/pkg/kv/kvserver/client_replica_test.go
+++ b/pkg/kv/kvserver/client_replica_test.go
@@ -4736,7 +4736,6 @@ func TestTenantID(t *testing.T) {
 				Store: &kvserver.StoreTestingKnobs{
 					BeforeSnapshotSSTIngestion: func(
 						snapshot kvserver.IncomingSnapshot,
-						requestType kvserverpb.SnapshotRequest_Type,
 						strings []string,
 					) error {
 						if snapshot.Desc.RangeID == repl.RangeID {
@@ -4815,7 +4814,6 @@ func TestUninitializedMetric(t *testing.T) {
 				Store: &kvserver.StoreTestingKnobs{
 					BeforeSnapshotSSTIngestion: func(
 						snapshot kvserver.IncomingSnapshot,
-						_ kvserverpb.SnapshotRequest_Type,
 						_ []string,
 					) error {
 						if snapshot.Desc.RangeID == repl.RangeID {
diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto
index fa69180cd8fe..a719841ec60a 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.proto
+++ b/pkg/kv/kvserver/kvserverpb/raft.proto
@@ -214,16 +214,17 @@ message SnapshotRequest {
 
   // The priority of the snapshot.
   // Deprecated, prefer sender_queue_priority.
-  // TODO(abaptist): Remove this field for v23.1.
-  Priority priority = 6;
+  // TODO(abaptist): Remove this field when v23.1 compatibility is dropped.
+  Priority deprecated_priority = 6;
 
   // The strategy of the snapshot.
-  Strategy strategy = 7;
+  // TODO(abaptist): Remove this field when v23.1 compatibility is dropped.
+  Strategy deprecated_strategy = 7;
 
   // The type of the snapshot.
   // Deprecated, prefer sender_queue_name.
-  // TODO(abaptist): Remove this field for v23.1.
-  Type type = 9;
+  // TODO(abaptist): Remove this field when v23.1 compatibility is dropped.
+  Type deprecated_type = 9;
 
   // Whether the snapshot uses the unreplicated RaftTruncatedState or not.
   // This is always true for snapshots generated in v21.1+ clusters. In v20.2
@@ -378,12 +379,12 @@ message DelegateSendSnapshotRequest {
   roachpb.ReplicaDescriptor delegated_sender = 4 [(gogoproto.nullable) = false];
 
   // The priority of the snapshot.
-  // TODO(abaptist): Remove this field for v23.1.
-  SnapshotRequest.Priority priority = 5;
+  // TODO(abaptist): Remove this field when v23.1 compatibility is dropped.
+  SnapshotRequest.Priority deprecated_priority = 5;
 
   // The type of the snapshot.
-  // TODO(abaptist): Remove this field for v23.1.
-  SnapshotRequest.Type type = 6;
+  // TODO(abaptist): Remove this field when v23.1 compatibility is dropped.
+  SnapshotRequest.Type deprecated_type = 6;
 
   // The Raft term of the coordinator (in most cases the leaseholder) replica.
   // The term is used during snapshot receiving to reject messages from an older term.
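A note on the proto renames above: protobuf identifies fields by tag number rather than by name, so renaming priority/strategy/type to deprecated_* while keeping tags 6, 7, and 9 leaves the wire format untouched — which is what lets v23.1 peers interoperate until the fields are finally dropped. A standalone demonstration of that property using Go's protowire package (not part of this patch; the encoded value is made up):

package main

import (
	"fmt"

	"google.golang.org/protobuf/encoding/protowire"
)

func main() {
	// Encode field 6 as a varint, exactly as an old sender encoding
	// `priority` would.
	var buf []byte
	buf = protowire.AppendTag(buf, 6, protowire.VarintType)
	buf = protowire.AppendVarint(buf, 2) // e.g. some Priority enum value

	// A new reader that calls field 6 `deprecated_priority` consumes the
	// same bytes; the field name never appears on the wire.
	num, typ, n := protowire.ConsumeTag(buf)
	val, _ := protowire.ConsumeVarint(buf[n:])
	fmt.Println(num, typ, val) // prints: 6 0 2
}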
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index c45c41f1bf90..f488e085a355 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -2814,7 +2814,7 @@ func (r *Replica) sendSnapshotUsingDelegate(
 	}
 
 	snapUUID := uuid.MakeV4()
-	appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, snapType == kvserverpb.SnapshotRequest_INITIAL, recipient.StoreID)
+	appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, senderQueueName != kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, recipient.StoreID)
 	// The cleanup function needs to be called regardless of success or failure of
 	// sending to release the log truncation constraint.
 	defer cleanup()
@@ -2831,10 +2831,10 @@ func (r *Replica) sendSnapshotUsingDelegate(
 		RangeID:             r.RangeID,
 		CoordinatorReplica:  sender,
 		RecipientReplica:    recipient,
-		Priority:            priority,
+		DeprecatedPriority:  priority,
 		SenderQueueName:     senderQueueName,
 		SenderQueuePriority: senderQueuePriority,
-		Type:                snapType,
+		DeprecatedType:      snapType,
 		Term:                kvpb.RaftTerm(status.Term),
 		DelegatedSender:     sender,
 		FirstIndex:          appliedIndex,
@@ -3114,10 +3114,9 @@ func (r *Replica) followerSendSnapshot(
 		return nil, err
 	}
 
-	snapType := req.Type
-	snap, err := r.GetSnapshot(ctx, snapType, req.SnapId)
+	snap, err := r.GetSnapshot(ctx, req.SnapId)
 	if err != nil {
-		return nil, errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
+		return nil, errors.Wrapf(err, "%s: failed to generate snapshot", r)
 	}
 	defer snap.Close()
 	log.Event(ctx, "generated snapshot")
@@ -3159,11 +3158,11 @@ func (r *Replica) followerSendSnapshot(
 			},
 		},
 		RangeSize:           rangeSize,
-		Priority:            req.Priority,
+		DeprecatedPriority:  req.DeprecatedPriority,
 		SenderQueueName:     req.SenderQueueName,
 		SenderQueuePriority: req.SenderQueuePriority,
-		Strategy:            kvserverpb.SnapshotRequest_KV_BATCH,
-		Type:                req.Type,
+		DeprecatedStrategy:  kvserverpb.SnapshotRequest_KV_BATCH,
+		DeprecatedType:      req.DeprecatedType,
 		SharedReplicate:     sharedReplicate,
 	}
 	newBatchFn := func() storage.WriteBatch {
@@ -3183,15 +3182,25 @@ func (r *Replica) followerSendSnapshot(
 		r.store.metrics.RangeSnapshotSentBytes.Inc(inc)
 		r.store.metrics.updateCrossLocalityMetricsOnSnapshotSent(comparisonResult, inc)
 
-		switch header.Priority {
-		case kvserverpb.SnapshotRequest_RECOVERY:
+		// This computation is a little ugly; it preserves backward
+		// compatibility of the stats, but in the future it should be cleaned up.
+		if header.SenderQueueName == kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE {
 			r.store.metrics.RangeSnapshotRecoverySentBytes.Inc(inc)
-		case kvserverpb.SnapshotRequest_REBALANCE:
+		} else if header.SenderQueueName == kvserverpb.SnapshotRequest_OTHER {
+			// OTHER snapshots are sent by Replica.ChangeReplicas and are not used
+			// for recovery, but they do have various uses (user, pre-merge, store
+			// rebalancer). They are all bucketed under Rebalance.
 			r.store.metrics.RangeSnapshotRebalancingSentBytes.Inc(inc)
-		default:
-			// If a snapshot is not a RECOVERY or REBALANCE snapshot, it must be of
-			// type UNKNOWN.
-			r.store.metrics.RangeSnapshotUnknownSentBytes.Inc(inc)
+		} else {
+			// SnapshotRequest_REPLICATE_QUEUE sends both recovery and rebalance
+			// snapshots. Split based on whether the priority is set: priority 0
+			// means the snapshot is used for rebalance.
+			// See AllocatorAction.Priority.
+			if header.SenderQueuePriority > 0 {
+				r.store.metrics.RangeSnapshotRecoverySentBytes.Inc(inc)
+			} else {
+				r.store.metrics.RangeSnapshotRebalancingSentBytes.Inc(inc)
+			}
 		}
 	}
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index 3c625a73e4db..808ced2922ac 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -886,8 +886,8 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
 		log.Fatalf(ctx, "unexpected replicaType: %s", replicaType)
 	}
 
-	if !testutils.IsError(err, `remote couldn't accept INITIAL snapshot`) {
-		t.Fatalf(`expected "remote couldn't accept INITIAL snapshot" error got: %+v`, err)
+	if !testutils.IsError(err, `remote couldn't accept snapshot`) {
+		t.Fatalf(`expected "remote couldn't accept snapshot" error got: %+v`, err)
 	}
 	// Make sure we cleaned up after ourselves (by removing the learner/non-voter).
 	desc := tc.LookupRangeOrFatal(t, scratchStartKey)
@@ -1005,7 +1005,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
 		if err != nil {
 			return err
 		}
-		matched, err := regexp.MatchString("streamed VIA_SNAPSHOT_QUEUE snapshot.*to.*NON_VOTER", recording.String())
+		matched, err := regexp.MatchString("streamed snapshot.*to.*NON_VOTER", recording.String())
 		if err != nil {
 			return err
 		}
@@ -2223,7 +2223,7 @@ func getExpectedSnapshotSizeBytes(
 	originRepl *kvserver.Replica,
 	snapType kvserverpb.SnapshotRequest_Type,
 ) (int64, error) {
-	snap, err := originRepl.GetSnapshot(ctx, snapType, uuid.MakeV4())
+	snap, err := originRepl.GetSnapshot(ctx, uuid.MakeV4())
 	if err != nil {
 		return 0, err
 	}
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index f9c813a9a001..4db097d459c4 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -232,7 +232,7 @@ func (r *Replica) raftSnapshotLocked() (raftpb.Snapshot, error) {
 // replica. If this method returns without error, callers must eventually call
 // OutgoingSnapshot.Close.
 func (r *Replica) GetSnapshot(
-	ctx context.Context, snapType kvserverpb.SnapshotRequest_Type, snapUUID uuid.UUID,
+	ctx context.Context, snapUUID uuid.UUID,
 ) (_ *OutgoingSnapshot, err error) {
 	// Get a snapshot while holding raftMu to make sure we're not seeing "half
 	// an AddSSTable" (i.e. a state in which an SSTable has been linked in, but
@@ -286,7 +286,7 @@ func (r *Replica) GetSnapshot(
 	// NB: We have Replica.mu read-locked, but we need it write-locked in order
 	// to use Replica.mu.stateLoader. This call is not performance sensitive, so
 	// create a new state loader.
-	snapData, err := snapshot(ctx, snapUUID, stateloader.Make(rangeID), snapType, snap, startKey)
+	snapData, err := snapshot(ctx, snapUUID, stateloader.Make(rangeID), snap, startKey)
 	if err != nil {
 		log.Errorf(ctx, "error generating snapshot: %+v", err)
 		return nil, err
@@ -305,7 +305,6 @@ type OutgoingSnapshot struct {
 	EngineSnap storage.Reader
 	// The replica state within the snapshot.
 	State          kvserverpb.ReplicaState
-	snapType       kvserverpb.SnapshotRequest_Type
 	sharedBackings []objstorage.RemoteObjectBackingHandle
 	onClose        func()
 }
@@ -316,8 +315,8 @@ func (s OutgoingSnapshot) String() string {
 
 // SafeFormat implements the redact.SafeFormatter interface.
 func (s OutgoingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) {
-	w.Printf("%s snapshot %s at applied index %d",
-		s.snapType, redact.Safe(s.SnapUUID.Short()), s.State.RaftAppliedIndex)
+	w.Printf("snapshot %s at applied index %d",
+		redact.Safe(s.SnapUUID.Short()), s.State.RaftAppliedIndex)
 }
 
 // Close releases the resources associated with the snapshot.
@@ -341,7 +340,6 @@ type IncomingSnapshot struct {
 	Desc             *roachpb.RangeDescriptor
 	DataSize         int64
 	SharedSize       int64
-	snapType         kvserverpb.SnapshotRequest_Type
 	placeholder      *ReplicaPlaceholder
 	raftAppliedIndex kvpb.RaftIndex       // logging only
 	msgAppRespCh     chan raftpb.Message  // receives MsgAppResp if/when snap is applied
@@ -355,8 +353,8 @@ func (s IncomingSnapshot) String() string {
 
 // SafeFormat implements the redact.SafeFormatter interface.
 func (s IncomingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) {
-	w.Printf("%s snapshot %s from %s at applied index %d",
-		s.snapType, redact.Safe(s.SnapUUID.Short()), s.FromReplica, s.raftAppliedIndex)
+	w.Printf("snapshot %s from %s at applied index %d",
+		redact.Safe(s.SnapUUID.Short()), s.FromReplica, s.raftAppliedIndex)
 }
 
 // snapshot creates an OutgoingSnapshot containing a pebble snapshot for the
@@ -365,7 +363,6 @@ func snapshot(
 	ctx context.Context,
 	snapUUID uuid.UUID,
 	rsl stateloader.StateLoader,
-	snapType kvserverpb.SnapshotRequest_Type,
 	snap storage.Reader,
 	startKey roachpb.RKey,
 ) (OutgoingSnapshot, error) {
@@ -400,7 +397,6 @@ func snapshot(
 				ConfState: desc.Replicas().ConfState(),
 			},
 		},
-		snapType: snapType,
 	}, nil
 }
@@ -611,7 +607,7 @@ func (r *Replica) applySnapshot(
 	// Ingest all SSTs atomically.
 	if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
-		if err := fn(inSnap, inSnap.snapType, inSnap.SSTStorageScratch.SSTs()); err != nil {
+		if err := fn(inSnap, inSnap.SSTStorageScratch.SSTs()); err != nil {
 			return err
 		}
 	}
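The sender-side metrics bucketing above reappears nearly verbatim on the receive path in store_snapshot.go below ("This logic for metrics should match what is in replica_command"). If it were ever consolidated, the shared rule might look like the following hypothetical helper — it is not part of this patch, and the queue enum is re-declared so the sketch compiles on its own:

package main

import "fmt"

type queueName int

const (
	otherQueue queueName = iota
	raftSnapshotQueue
	replicateQueue
)

// isRecoverySnapshot mirrors the bucketing rule in the patch: everything from
// the raft snapshot queue counts as recovery; OTHER counts as rebalance; the
// replicate queue sends both, distinguished by priority (0 means rebalance,
// see AllocatorAction.Priority).
func isRecoverySnapshot(q queueName, senderQueuePriority float64) bool {
	if q == raftSnapshotQueue {
		return true
	}
	if q == otherQueue {
		return false
	}
	return senderQueuePriority > 0
}

func main() {
	fmt.Println(isRecoverySnapshot(raftSnapshotQueue, 0)) // true
	fmt.Println(isRecoverySnapshot(otherQueue, 500))      // false
	fmt.Println(isRecoverySnapshot(replicateQueue, 0))    // false (rebalance)
	fmt.Println(isRecoverySnapshot(replicateQueue, 3000)) // true (recovery)
}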
diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go
index 34e41b49cfbf..07d070aca828 100644
--- a/pkg/kv/kvserver/store_snapshot.go
+++ b/pkg/kv/kvserver/store_snapshot.go
@@ -108,49 +108,6 @@ type outgoingSnapshotStream interface {
 // function specifies which metrics are incremented.
 type snapshotRecordMetrics func(inc int64)
 
-// snapshotStrategy is an approach to sending and receiving Range snapshots.
-// Each implementation corresponds to a SnapshotRequest_Strategy, and it is
-// expected that the implementation that matches the Strategy specified in the
-// snapshot header will always be used.
-type snapshotStrategy interface {
-	// Receive streams SnapshotRequests in from the provided stream and
-	// constructs an IncomingSnapshot.
-	Receive(
-		context.Context,
-		*Store,
-		incomingSnapshotStream,
-		kvserverpb.SnapshotRequest_Header,
-		snapshotRecordMetrics,
-	) (IncomingSnapshot, error)
-
-	// Send streams SnapshotRequests created from the OutgoingSnapshot in to the
-	// provided stream. On nil error, the number of bytes sent is returned.
-	Send(
-		context.Context,
-		outgoingSnapshotStream,
-		kvserverpb.SnapshotRequest_Header,
-		*OutgoingSnapshot,
-		snapshotRecordMetrics,
-	) (int64, error)
-
-	// Status provides a status report on the work performed during the
-	// snapshot. Only valid if the strategy succeeded.
-	Status() redact.RedactableString
-
-	// Close cleans up any resources associated with the snapshot strategy.
-	Close(context.Context)
-}
-
-func assertStrategy(
-	ctx context.Context,
-	header kvserverpb.SnapshotRequest_Header,
-	expect kvserverpb.SnapshotRequest_Strategy,
-) {
-	if header.Strategy != expect {
-		log.Fatalf(ctx, "expected strategy %s, found strategy %s", expect, header.Strategy)
-	}
-}
-
 // kvBatchSnapshotStrategy is an implementation of snapshotStrategy that streams
 // batches of KV pairs in the BatchRepr format.
 type kvBatchSnapshotStrategy struct {
@@ -514,7 +471,6 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 	header kvserverpb.SnapshotRequest_Header,
 	recordBytesReceived snapshotRecordMetrics,
 ) (IncomingSnapshot, error) {
-	assertStrategy(ctx, header, kvserverpb.SnapshotRequest_KV_BATCH)
 	if fn := s.cfg.TestingKnobs.BeforeRecvAcceptedSnapshot; fn != nil {
 		fn()
 	}
@@ -720,7 +676,6 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 		Desc:             header.State.Desc,
 		DataSize:         dataSize,
 		SharedSize:       sharedSize,
-		snapType:         header.Type,
 		raftAppliedIndex: header.State.RaftAppliedIndex,
 		msgAppRespCh:     make(chan raftpb.Message, 1),
 		sharedSSTs:       sharedSSTs,
@@ -743,7 +698,6 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
 	snap *OutgoingSnapshot,
 	recordBytesSent snapshotRecordMetrics,
 ) (int64, error) {
-	assertStrategy(ctx, header, kvserverpb.SnapshotRequest_KV_BATCH)
 	// bytesSent is updated as key-value batches are sent with sendBatch. It does
 	// not reflect the log entries sent (which are never sent in newer versions of
 	// CRDB, as of VersionUnreplicatedTruncatedState).
@@ -1327,16 +1281,17 @@ func (s *Store) receiveSnapshot(
 	// happens in getStoreListFromIDsLocked()), but in case they are, they should
 	// reject the incoming rebalancing snapshots.
 	if s.IsDraining() {
-		switch t := header.Priority; t {
-		case kvserverpb.SnapshotRequest_RECOVERY:
+		switch t := header.SenderQueueName; t {
+		case kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE:
 			// We can not reject Raft snapshots because draining nodes may have
 			// replicas in `StateSnapshot` that need to catch up.
-			//
-			// TODO(aayush): We also do not reject snapshots sent to replace dead
-			// replicas here, but draining stores are still filtered out in
-			// getStoreListFromIDsLocked(). Is that sound? Don't we want to
-			// upreplicate to draining nodes if there are no other candidates?
-		case kvserverpb.SnapshotRequest_REBALANCE:
+		case kvserverpb.SnapshotRequest_REPLICATE_QUEUE:
+			// Only reject if these are "rebalance" snapshots, not "recovery"
+			// snapshots. We use priority 0 to differentiate the two.
+			if header.SenderQueuePriority == 0 {
+				return sendSnapshotError(ctx, s, stream, errors.New(storeDrainingMsg))
+			}
+		case kvserverpb.SnapshotRequest_OTHER:
 			return sendSnapshotError(ctx, s, stream, errors.New(storeDrainingMsg))
 		default:
 			// If this a new snapshot type that this cockroach version does not know
@@ -1356,8 +1311,8 @@ func (s *Store) receiveSnapshot(
 	storeID := s.StoreID()
 	if _, ok := header.State.Desc.GetReplicaDescriptor(storeID); !ok {
 		return errors.AssertionFailedf(
-			`snapshot of type %s was sent to s%d which did not contain it as a replica: %s`,
-			header.Type, storeID, header.State.Desc.Replicas())
+			`snapshot from queue %s was sent to s%d which did not contain it as a replica: %s`,
+			header.SenderQueueName, storeID, header.State.Desc.Replicas())
 	}
 
 	cleanup, err := s.reserveReceiveSnapshot(ctx, header)
@@ -1401,30 +1356,18 @@ func (s *Store) receiveSnapshot(
 		}
 	}()
 
-	// Determine which snapshot strategy the sender is using to send this
-	// snapshot. If we don't know how to handle the specified strategy, return
-	// an error.
-	var ss snapshotStrategy
-	switch header.Strategy {
-	case kvserverpb.SnapshotRequest_KV_BATCH:
-		snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
-		if err != nil {
-			err = errors.Wrap(err, "invalid snapshot")
-			return sendSnapshotError(ctx, s, stream, err)
-		}
+	snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
+	if err != nil {
+		err = errors.Wrap(err, "invalid snapshot")
+		return sendSnapshotError(ctx, s, stream, err)
+	}
 
-		ss = &kvBatchSnapshotStrategy{
-			scratch:      s.sstSnapshotStorage.NewScratchSpace(header.State.Desc.RangeID, snapUUID),
-			sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV),
-			st:           s.ClusterSettings(),
-		}
-		defer ss.Close(ctx)
-	default:
-		return sendSnapshotError(ctx, s, stream,
-			errors.Errorf("%s,r%d: unknown snapshot strategy: %s",
-				s, header.State.Desc.RangeID, header.Strategy),
-		)
+	ss := &kvBatchSnapshotStrategy{
+		scratch:      s.sstSnapshotStorage.NewScratchSpace(header.State.Desc.RangeID, snapUUID),
+		sstChunkSize: snapshotSSTWriteSyncRate.Get(&s.cfg.Settings.SV),
+		st:           s.ClusterSettings(),
 	}
+	defer ss.Close(ctx)
 
 	if err := stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil {
 		return err
@@ -1440,15 +1383,24 @@ func (s *Store) receiveSnapshot(
 		s.metrics.RangeSnapshotRcvdBytes.Inc(inc)
 		s.metrics.updateCrossLocalityMetricsOnSnapshotRcvd(comparisonResult, inc)
 
-		switch header.Priority {
-		case kvserverpb.SnapshotRequest_RECOVERY:
+		// This logic for metrics should match what is in replica_command.
+		if header.SenderQueueName == kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE {
 			s.metrics.RangeSnapshotRecoveryRcvdBytes.Inc(inc)
-		case kvserverpb.SnapshotRequest_REBALANCE:
+		} else if header.SenderQueueName == kvserverpb.SnapshotRequest_OTHER {
+			// OTHER snapshots are sent by Replica.ChangeReplicas and are not used
+			// for recovery, but they do have various uses (user, pre-merge, store
+			// rebalancer). They are all bucketed under Rebalance.
 			s.metrics.RangeSnapshotRebalancingRcvdBytes.Inc(inc)
-		default:
-			// If a snapshot is not a RECOVERY or REBALANCE snapshot, it must be of
-			// type UNKNOWN.
-			s.metrics.RangeSnapshotUnknownRcvdBytes.Inc(inc)
+		} else {
+			// SnapshotRequest_REPLICATE_QUEUE sends both recovery and rebalance
+			// snapshots. Split based on whether the priority is set: priority 0
+			// means the snapshot is used for rebalance.
+			// See AllocatorAction.Priority.
+			if header.SenderQueuePriority > 0 {
+				s.metrics.RangeSnapshotRecoveryRcvdBytes.Inc(inc)
+			} else {
+				s.metrics.RangeSnapshotRebalancingRcvdBytes.Inc(inc)
+			}
 		}
 	}
 	inSnap, err := ss.Receive(ctx, s, stream, *header, recordBytesReceived)
@@ -1742,12 +1694,6 @@ func SendEmptySnapshot(
 		ctx,
 		snapUUID,
 		sl,
-		// TODO(tbg): We may want a separate SnapshotRequest type
-		// for recovery that always goes through by bypassing all throttling
-		// so they cannot be declined. We don't want our operation to be held
-		// up behind a long running snapshot. We want this to go through
-		// quickly.
-		kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE,
 		engSnapshot,
 		desc.StartKey,
 	)
@@ -1778,13 +1724,9 @@ func SendEmptySnapshot(
 	}
 
 	header := kvserverpb.SnapshotRequest_Header{
-		State:                                state,
-		RaftMessageRequest:                   req,
-		RangeSize:                            ms.Total(),
-		Priority:                             kvserverpb.SnapshotRequest_RECOVERY,
-		Strategy:                             kvserverpb.SnapshotRequest_KV_BATCH,
-		Type:                                 kvserverpb.SnapshotRequest_VIA_SNAPSHOT_QUEUE,
-		DeprecatedUnreplicatedTruncatedState: true,
+		State:              state,
+		RaftMessageRequest: req,
+		RangeSize:          ms.Total(),
 	}
 
 	stream, err := NewMultiRaftClient(cc).RaftSnapshot(ctx)
@@ -1888,18 +1830,11 @@ func sendSnapshot(
 	// nice to figure this out, but the batches/sec rate limit works for now.
 	limiter := rate.NewLimiter(targetRate/rate.Limit(batchSize), 1 /* burst size */)
 
-	// Create a snapshotStrategy based on the desired snapshot strategy.
-	var ss snapshotStrategy
-	switch header.Strategy {
-	case kvserverpb.SnapshotRequest_KV_BATCH:
-		ss = &kvBatchSnapshotStrategy{
-			batchSize:     batchSize,
-			limiter:       limiter,
-			newWriteBatch: newWriteBatch,
-			st:            st,
-		}
-	default:
-		log.Fatalf(ctx, "unknown snapshot strategy: %s", header.Strategy)
+	ss := &kvBatchSnapshotStrategy{
+		batchSize:     batchSize,
+		limiter:       limiter,
+		newWriteBatch: newWriteBatch,
+		st:            st,
 	}
 
 	// Record timings for snapshot send if kv.trace.snapshot.enable_threshold is enabled
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index e735de1f61a0..c099d8283421 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -341,7 +341,7 @@ type StoreTestingKnobs struct {
 	BeforeRemovingDemotedLearner func()
 	// BeforeSnapshotSSTIngestion is run just before the SSTs are ingested when
 	// applying a snapshot.
-	BeforeSnapshotSSTIngestion func(IncomingSnapshot, kvserverpb.SnapshotRequest_Type, []string) error
+	BeforeSnapshotSSTIngestion func(IncomingSnapshot, []string) error
 	// OnRelocatedOne intercepts the return values of s.relocateOne after they
 	// have successfully been put into effect.
 	OnRelocatedOne func(_ []kvpb.ReplicationChange, leaseTarget *roachpb.ReplicationTarget)
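The two-argument knob signature above is what the earlier test changes (client_merge_test.go, client_replica_test.go) conform to: hooks now filter on the IncomingSnapshot itself rather than on the removed snapshot type. A self-contained sketch of that pattern, with stand-in types re-declared for illustration (the real knob lives on kvserver.StoreTestingKnobs):

package main

import "fmt"

// Stand-ins for the kvserver types touched by the signature change.
type IncomingSnapshot struct{ RangeID int64 }

type StoreTestingKnobs struct {
	// Two arguments now: the snapshot-type parameter is gone.
	BeforeSnapshotSSTIngestion func(IncomingSnapshot, []string) error
}

func main() {
	const watchedRange = int64(42) // illustrative range ID
	knobs := StoreTestingKnobs{
		BeforeSnapshotSSTIngestion: func(inSnap IncomingSnapshot, ssts []string) error {
			// Tests filter on the snapshot's descriptor (range ID here)
			// instead of the removed SnapshotRequest_Type.
			if inSnap.RangeID != watchedRange {
				return nil
			}
			return fmt.Errorf("intercepted snapshot with %d SSTs", len(ssts))
		},
	}
	fmt.Println(knobs.BeforeSnapshotSSTIngestion(IncomingSnapshot{RangeID: 42}, []string{"a.sst"}))
}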
diff --git a/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts b/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts
index 31c11e99c77c..d6c51d1f6caa 100644
--- a/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts
@@ -118,6 +118,40 @@ export function executeInternalSql(
   return executeSql(req);
 }
 
+/**
+ * executeInternalSqlHelper is an extension on top of executeInternalSql
+ * that handles max size errors by re-issuing the request.
+ *
+ * @param req execution request details
+ */
+export async function executeInternalSqlHelper<RowType>(
+  req: () => SqlExecutionRequest,
+  resultSetCallback: (response: SqlTxnResult<RowType>[]) => void,
+): Promise<SqlExecutionErrorMessage> {
+  let lastResponse: SqlExecutionResponse<RowType> = undefined;
+  do {
+    const sqlRequest = req();
+    lastResponse = await executeSqlAndAddToResponse<RowType>(
+      sqlRequest,
+      resultSetCallback,
+    );
+  } while (lastResponse && isMaxSizeError(lastResponse.error?.message));
+
+  return lastResponse.error;
+}
+
+async function executeSqlAndAddToResponse<RowType>(
+  req: SqlExecutionRequest,
+  resultSetCallback: (response: SqlTxnResult<RowType>[]) => void,
+): Promise<SqlExecutionResponse<RowType>> {
+  const response = await executeInternalSql<RowType>(req);
+  if (!sqlResultsAreEmpty(response)) {
+    resultSetCallback(response.execution.txn_results);
+  }
+
+  return response;
+}
+
 /**
  * sqlResultsAreEmpty returns true if the provided result
  * does not contain any rows.
@@ -250,5 +284,14 @@ export function combineQueryErrors(
 }
 
 export function txnResultIsEmpty(txn_result: SqlTxnResult<unknown>): boolean {
-  return !txn_result.rows || txn_result.rows?.length === 0;
+  return !txn_result || !txn_result.rows || txn_result.rows?.length === 0;
+}
+
+export function txnResultSetIsEmpty(
+  txn_results: SqlTxnResult<unknown>[],
+): boolean {
+  if (!txn_results || txn_results.length === 0) {
+    return true;
+  }
+  return txn_results.every(x => txnResultIsEmpty(x));
 }
diff --git a/pkg/ui/workspaces/cluster-ui/src/api/statementDiagnosticsApi.ts b/pkg/ui/workspaces/cluster-ui/src/api/statementDiagnosticsApi.ts
index e9d36e87c4e2..97c0e7412ff2 100644
--- a/pkg/ui/workspaces/cluster-ui/src/api/statementDiagnosticsApi.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/api/statementDiagnosticsApi.ts
@@ -11,10 +11,12 @@
 import moment from "moment-timezone";
 
 import { Duration } from "src/util/format";
 
 import {
+  createSqlExecutionRequest,
   executeInternalSql,
-  LARGE_RESULT_SIZE,
+  executeInternalSqlHelper,
   SqlExecutionRequest,
-  sqlResultsAreEmpty,
+  SqlTxnResult,
+  txnResultSetIsEmpty,
 } from "src/api";
@@ -29,11 +31,20 @@ export type StatementDiagnosticsReport = {
 
 export type StatementDiagnosticsResponse = StatementDiagnosticsReport[];
 
-export function getStatementDiagnosticsReports(): Promise<StatementDiagnosticsResponse> {
-  const req: SqlExecutionRequest = {
-    statements: [
-      {
-        sql: `SELECT
+export async function getStatementDiagnosticsReports(): Promise<StatementDiagnosticsResponse> {
+  let result: StatementDiagnosticsResponse = [];
+
+  const createReq = () => {
+    let offset = "";
+    const args = [];
+    if (result.length > 0) {
+      // Using the id is more performant and reliable than offset.
+      // Schema is PRIMARY KEY (id) with INT8 DEFAULT unique_rowid() NOT NULL.
+      offset = " AND (id::STRING < $1) ";
+      const last = result[result.length - 1];
+      args.push(last.id);
+    }
+    const query = `SELECT
       id::STRING,
       statement_fingerprint,
       completed,
@@ -42,27 +53,43 @@ export function getStatementDiagnosticsReports(): Promise<StatementDiagnosticsResponse> {
       requested_at,
      min_execution_latency,
      expires_at
    FROM
      system.statement_diagnostics_requests
    WHERE
-      expires_at > now() OR expires_at IS NULL OR completed = true`,
+      (expires_at > now() OR expires_at IS NULL OR completed = true) ${offset}
+      order by id desc`;
+
+    return createSqlExecutionRequest(undefined, [
+      {
+        sql: query,
+        arguments: args,
       },
-    ],
-    execute: true,
-    max_result_size: LARGE_RESULT_SIZE,
+    ]);
   };
-
-  return executeInternalSql(req).then(res => {
-    // If request succeeded but query failed, throw error (caught by saga/cacheDataReducer).
-    if (res.error) {
-      throw res.error;
-    }
+
+  const err = await executeInternalSqlHelper(
+    createReq,
+    (response: SqlTxnResult<StatementDiagnosticsReport>[]) => {
+      if (!response) {
+        return;
+      }
 
-    if (sqlResultsAreEmpty(res)) {
-      return [];
-    }
+      if (txnResultSetIsEmpty(response)) {
+        return;
+      }
 
-    return res.execution.txn_results[0].rows;
-  });
+      response.forEach(x => {
+        if (x.rows && x.rows.length > 0) {
+          result = result.concat(x.rows);
+        }
+      });
+    },
+  );
+
+  if (err) {
+    throw err;
+  }
+
+  return result;
 }
 
 type CheckPendingStmtDiagnosticRow = {
@@ -146,9 +173,9 @@ function checkExistingDiagRequest(stmtFingerprint: string): Promise<void> {
   const checkPendingStmtDiag = {
     sql: `SELECT count(1) FROM system.statement_diagnostics_requests
           WHERE
-      completed = false AND
-      statement_fingerprint = $1 AND
-      (expires_at IS NULL OR expires_at > now())`,
+            completed = false AND
+            statement_fingerprint = $1 AND
+            (expires_at IS NULL OR expires_at > now())`,
     arguments: [stmtFingerprint],
   };