From 8292a2950ef8a572354ce6df756ef7b6509b63f4 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Wed, 5 Apr 2023 17:39:25 -0400 Subject: [PATCH] kv: Add stats for delegate snapshots Fixes: #98243 This PR adds two new stats for delegate snapshots to track failure of sending snapshots. There are failures either before data is transferred or after the snapshot is received. Epic: none Release note: This commit adds two new stats which are useful for tracking the efficiency of snapshot transfers. Some snapshots will always fail due to system level "races", but the goal is to keep it as low as possible. range.snapshots.recv-failed - The number of snapshot send attempts that are initiated but not accepted by the recipient. range.snapshots.recv-unusable - The number of snapshots that were fully transmitted but not used. --- pkg/kv/kvserver/metrics.go | 16 ++ pkg/kv/kvserver/replica_command.go | 2 +- pkg/kv/kvserver/replica_learner_test.go | 267 ++++++++++++++++++++++-- pkg/kv/kvserver/replica_raft.go | 9 + pkg/kv/kvserver/store_raft.go | 5 +- pkg/kv/kvserver/store_snapshot.go | 42 ++-- pkg/kv/kvserver/testing_knobs.go | 2 +- pkg/ts/catalog/chart_catalog.go | 2 + 8 files changed, 301 insertions(+), 44 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 4c05133b9ad6..b410891bf691 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -842,6 +842,18 @@ var ( Measurement: "Bytes", Unit: metric.Unit_BYTES, } + metaRangeSnapshotRecvFailed = metric.Metadata{ + Name: "range.snapshots.recv-failed", + Help: "Number of range snapshot initialization messages that errored out on the recipient, typically before any data is transferred", + Measurement: "Snapshots", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRecvUnusable = metric.Metadata{ + Name: "range.snapshots.recv-unusable", + Help: "Number of range snapshot that were fully transmitted but determined to be unnecessary or unusable", + Measurement: "Snapshots", + Unit: metric.Unit_COUNT, + } metaRangeSnapshotSendQueueLength = metric.Metadata{ Name: "range.snapshots.send-queue", Help: "Number of snapshots queued to send", @@ -2017,6 +2029,8 @@ type StoreMetrics struct { RangeSnapshotRecoverySentBytes *metric.Counter RangeSnapshotRebalancingRcvdBytes *metric.Counter RangeSnapshotRebalancingSentBytes *metric.Counter + RangeSnapshotRecvFailed *metric.Counter + RangeSnapshotRecvUnusable *metric.Counter // Range snapshot queue metrics. RangeSnapshotSendQueueLength *metric.Gauge @@ -2574,6 +2588,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RangeSnapshotRecoverySentBytes: metric.NewCounter(metaRangeSnapshotRecoverySentBytes), RangeSnapshotRebalancingRcvdBytes: metric.NewCounter(metaRangeSnapshotRebalancingRcvdBytes), RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes), + RangeSnapshotRecvFailed: metric.NewCounter(metaRangeSnapshotRecvFailed), + RangeSnapshotRecvUnusable: metric.NewCounter(metaRangeSnapshotRecvUnusable), RangeSnapshotSendQueueLength: metric.NewGauge(metaRangeSnapshotSendQueueLength), RangeSnapshotRecvQueueLength: metric.NewGauge(metaRangeSnapshotRecvQueueLength), RangeSnapshotSendInProgress: metric.NewGauge(metaRangeSnapshotSendInProgress), diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 3f6817fd465b..5c17b8221b4a 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2838,7 +2838,7 @@ func (r *Replica) sendSnapshotUsingDelegate( ctx, 2, "delegating snapshot transmission attempt %v for %v to %v", n+1, recipient, sender, ) - selfDelegate := n == len(senders)-1 + selfDelegate := sender.StoreID == r.StoreID() // On the last attempt, always queue on the delegate to time out naturally. if selfDelegate { diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index a3e728bf74f1..65be73a53804 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -351,6 +351,42 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) { require.NoError(t, g.Wait()) } +type expectedMetric struct { + DelegateSnapshotSuccesses int64 + DelegateSnapshotFailures int64 + RangeSnapshotRecvFailed int64 + RangeSnapshotRecvUnusable int64 +} + +func verifySnapshotMetrics( + t *testing.T, tc *testcluster.TestCluster, expected map[int]expectedMetric, +) { + for id, metrics := range expected { + server := tc.Server(id) + store, _ := server.GetStores().(*kvserver.Stores).GetStore(server.GetFirstStoreID()) + weakEqualf(t, metrics.DelegateSnapshotSuccesses, store.Metrics().DelegateSnapshotSuccesses.Count(), "metric successes, %d", id) + weakEqualf(t, metrics.DelegateSnapshotFailures, store.Metrics().DelegateSnapshotFailures.Count(), "metric failures, %d", id) + weakEqualf(t, metrics.RangeSnapshotRecvFailed, store.Metrics().RangeSnapshotRecvFailed.Count(), "metric recv failed, %d", id) + weakEqualf(t, metrics.RangeSnapshotRecvUnusable, store.Metrics().RangeSnapshotRecvUnusable.Count(), "metric recv unusable, %d", id) + } +} + +const issuesFixed = false + +// TODO(baptist): This should be require.Equalf, but this is not consistent +// enough. There are two reasons this is inconsistent. +// 1) Raft snapshots still sometimes sneak in. (#96841) +// 2) The delegate hasn't seen the updated descriptor in time. +func weakEqualf(t *testing.T, successes int64, count int64, s string, id int) { + if issuesFixed { + require.Equalf(t, successes, count, s, id) + } else { + if successes != count { + log.Warningf(context.Background(), "Not equal, expected %d, got %d for %s %d", successes, count, s, id) + } + } +} + // TestDelegateSnapshot verifies that the correct delegate is chosen when // sending snapshots to stores. func TestDelegateSnapshot(t *testing.T) { @@ -385,6 +421,12 @@ func TestDelegateSnapshot(t *testing.T) { ReplicationMode: base.ReplicationManual, }, ) + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) scratchKey := tc.ScratchRange(t) defer tc.Stopper().Stop(ctx) @@ -395,6 +437,13 @@ func TestDelegateSnapshot(t *testing.T) { require.Equalf(t, request.DelegatedSender.StoreID, roachpb.StoreID(1), "Wrong sender for request %+v", request) } + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + // Node 4 (loc B) should get the delegated snapshot from node 3 which is the // same locality. { @@ -410,6 +459,7 @@ func TestDelegateSnapshot(t *testing.T) { } return nil }) + request := <-requestChannel require.Equalf(t, request.DelegatedSender.StoreID, roachpb.StoreID(3), "Wrong type of request %+v", request) // TODO(abaptist): Remove this loop. Sometimes the delegated request fails @@ -421,6 +471,13 @@ func TestDelegateSnapshot(t *testing.T) { for len(requestChannel) > 0 { <-requestChannel } + + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {1, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) } // Node 2 (loc A) should get the snapshot from node 1 as they have the same locality. @@ -429,6 +486,12 @@ func TestDelegateSnapshot(t *testing.T) { request := <-requestChannel require.Equalf(t, request.DelegatedSender.StoreID, roachpb.StoreID(1), "Wrong type of request %+v", request) } + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {1, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) } // TestDelegateSnapshotFails is a test that ensure we fail fast when the @@ -443,7 +506,11 @@ func TestDelegateSnapshotFails(t *testing.T) { desc []roachpb.ReplicaDescriptor } - setupFn := func(t *testing.T) ( + setupFn := func(t *testing.T, + receiveFunc func(*kvserverpb.SnapshotRequest_Header) error, + sendFunc func(*kvserverpb.DelegateSendSnapshotRequest), + processRaft func(roachpb.StoreID) bool, + ) ( *testcluster.TestCluster, roachpb.Key, ) { @@ -457,6 +524,9 @@ func TestDelegateSnapshotFails(t *testing.T) { defer senders.mu.Unlock() return senders.desc } + ltk.storeKnobs.ReceiveSnapshot = receiveFunc + ltk.storeKnobs.SendSnapshot = sendFunc + ltk.storeKnobs.DisableProcessRaft = processRaft tc := testcluster.StartTestCluster( t, 4, base.TestClusterArgs{ @@ -473,7 +543,7 @@ func TestDelegateSnapshotFails(t *testing.T) { // the learner is on. Assert that the failure is detected and change replicas // fails fast. t.Run("receiver", func(t *testing.T) { - tc, scratchKey := setupFn(t) + tc, scratchKey := setupFn(t, nil, nil, nil) defer tc.Stopper().Stop(ctx) desc, err := tc.LookupRange(scratchKey) @@ -487,24 +557,28 @@ func TestDelegateSnapshotFails(t *testing.T) { ) require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) }) // Add a follower replica to act as the snapshot sender, and kill the server // the sender is on. Assert that the failure is detected and change replicas // fails fast. t.Run("sender_no_fallback", func(t *testing.T) { - tc, scratchKey := setupFn(t) + tc, scratchKey := setupFn(t, nil, nil, nil) defer tc.Stopper().Stop(ctx) // Add a replica that will be the delegated sender, and another so we have - // quorum with this node down + // quorum with this node down. desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) - replicaDesc, ok := desc.GetReplicaDescriptor(3) - require.True(t, ok) // Always use node 3 (index 2) as the only delegate. senders.mu.Lock() - senders.desc = append(senders.desc, replicaDesc) + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3}) senders.mu.Unlock() // Now stop accepting traffic to node 3 (index 2). @@ -514,27 +588,30 @@ func TestDelegateSnapshotFails(t *testing.T) { _, err = tc.Servers[0].DB().AdminChangeReplicas( ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), ) - log.Infof(ctx, "Err=%v", err) - require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + // The delegate can not send this request since it does not have the latest + // generation descriptor. + require.ErrorContains(t, err, "generation has changed") + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) }) // Identical setup as the previous test, but allow a fallback to the leaseholder. t.Run("sender_with_fallback", func(t *testing.T) { - tc, scratchKey := setupFn(t) + tc, scratchKey := setupFn(t, nil, nil, nil) defer tc.Stopper().Stop(ctx) // Add a replica that will be the delegated sender, and another so we have // quorum with this node down desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) - replicaDesc, ok := desc.GetReplicaDescriptor(3) - require.True(t, ok) - leaseholderDesc, ok := desc.GetReplicaDescriptor(1) - require.True(t, ok) // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. senders.mu.Lock() - senders.desc = append(senders.desc, replicaDesc) - senders.desc = append(senders.desc, leaseholderDesc) + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3}) + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1}) senders.mu.Unlock() // Now stop accepting traffic to node 3 (index 2). @@ -545,6 +622,164 @@ func TestDelegateSnapshotFails(t *testing.T) { ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), ) require.NoError(t, err) + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + }) + t.Run("receiver_rejects", func(t *testing.T) { + var block atomic.Int32 + tc, scratchKey := setupFn( + t, + func(h *kvserverpb.SnapshotRequest_Header) error { + // TODO(abaptist): Remove this check once #96841 is fixed. + if h.SenderQueueName == kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE { + return nil + } + if val := block.Load(); val > 0 { + block.Add(-1) + return errors.Newf("BAM: receive error %d", val) + } + return nil + }, + nil, + nil, + ) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. + senders.mu.Lock() + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3}) + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1}) + senders.mu.Unlock() + + block.Store(2) + _, err := tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.ErrorContains(t, err, "BAM: receive error") + + // There will be two attempts to send this, both fail. + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 2, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + }) + // Test that the delegate that doesn't have the snapshot that we fall back to + // the leaseholder and correctly increment the stats. + t.Run("delegate_missing_range", func(t *testing.T) { + tc, scratchKey := setupFn(t, nil, nil, nil) + defer tc.Stopper().Stop(ctx) + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2)...) + + // First try to use node 4 (index 3) as the delegate, but fall back to the leaseholder on failure. + senders.mu.Lock() + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4}) + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1}) + senders.mu.Unlock() + + _, err := tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.NoError(t, err) + + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + }) + + // NB: This test indicates a potential problem with delegated snapshots as + // they currently work. The generation changes on a change replica request, + // however if the delegation request races ahead of the Raft level update, + // then the delegate will not be used. In testing we don't see this often, + // however it is something to watch out for especially on overloaded servers. + t.Run("delegate_raft_slow", func(t *testing.T) { + var block atomic.Bool + tc, scratchKey := setupFn(t, nil, nil, + func(id roachpb.StoreID) bool { + return id == 4 && block.Load() + }) + defer tc.Stopper().Stop(ctx) + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + // Choose the store which we are about to block. + senders.mu.Lock() + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4}) + senders.mu.Unlock() + + // Don't allow store 4 to see the new descriptor through Raft. + block.Store(true) + _, err := tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.ErrorContains(t, err, "generation has changed") + + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + }) + + // This test ensures that the leader doesn't truncate while the delegate + // snapshot is in flight. Right after the leader sends the delegate request, + // but before the snapshot has been created, truncate the log. This test is + // not "as good" as it could be since the new node is in the probe state so + // the log truncation constraints aren't really necessary since we don't + // truncate anything in that state. This test will be better if it could be + // tested on Raft snapshots. + t.Run("truncate_during_send", func(t *testing.T) { + var blockRaft atomic.Bool + var truncateLog func() + + tc, scratchKey := setupFn(t, nil, + func(*kvserverpb.DelegateSendSnapshotRequest) { + if blockRaft.Load() { + truncateLog() + } + }, nil) + defer tc.Stopper().Stop(ctx) + // This will truncate the log on the first store. + truncateLog = func() { + server := tc.Servers[0] + store, _ := server.GetStores().(*kvserver.Stores).GetStore(server.GetFirstStoreID()) + store.MustForceRaftLogScanAndProcess() + } + + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + // Chose a delegate to block. + senders.mu.Lock() + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4}) + senders.desc = append(senders.desc, roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1}) + senders.mu.Unlock() + // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. + + // Don't allow the new store to see Raft updates. + blockRaft.Store(true) + _, err := tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.NoError(t, err) + + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {1, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) }) } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 6c3134f92e08..54c400c88b4c 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -704,6 +704,15 @@ var noSnap IncomingSnapshot func (r *Replica) handleRaftReady( ctx context.Context, inSnap IncomingSnapshot, ) (handleRaftReadyStats, error) { + + // Don't process anything if this fn returns false. + if fn := r.store.cfg.TestingKnobs.DisableProcessRaft; fn != nil && fn(r.store.StoreID()) { + return handleRaftReadyStats{ + tBegin: timeutil.Now(), + tEnd: timeutil.Now(), + }, nil + } + r.raftMu.Lock() defer r.raftMu.Unlock() return r.handleRaftReadyRaftMuLocked(ctx, inSnap) diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index dc99302c14ae..89f6b7043dea 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -470,6 +470,7 @@ func (s *Store) processRaftSnapshotRequest( // multiple snapshots raced (as is possible when raft leadership changes // and both the old and new leaders send snapshots). log.Infof(ctx, "ignored stale snapshot at index %d", snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Index) + s.metrics.RangeSnapshotRecvUnusable.Inc(1) } return nil }) @@ -710,10 +711,6 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { } func (s *Store) processRaft(ctx context.Context) { - if s.cfg.TestingKnobs.DisableProcessRaft { - return - } - s.scheduler.Start(s.stopper) // Wait for the scheduler worker goroutines to finish. if err := s.stopper.RunAsyncTask(ctx, "sched-wait", s.scheduler.Wait); err != nil { diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 72f68241c430..66c5c66633c7 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -36,7 +36,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" - "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -105,6 +104,7 @@ type snapshotStrategy interface { // constructs an IncomingSnapshot. Receive( context.Context, + *Store, incomingSnapshotStream, kvserverpb.SnapshotRequest_Header, snapshotRecordMetrics, @@ -367,6 +367,7 @@ func (tag *snapshotTimingTag) Render() []attribute.KeyValue { // key space across to the next key span. func (kvSS *kvBatchSnapshotStrategy) Receive( ctx context.Context, + s *Store, stream incomingSnapshotStream, header kvserverpb.SnapshotRequest_Header, recordBytesReceived snapshotRecordMetrics, @@ -410,7 +411,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } if req.Header != nil { err := errors.New("client error: provided a header mid-stream") - return noSnap, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(ctx, s, stream, err) } if req.KVBatch != nil { @@ -476,7 +477,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "client error: invalid snapshot") - return noSnap, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(ctx, s, stream, err) } inSnap := IncomingSnapshot{ @@ -950,8 +951,6 @@ func (s *Store) checkSnapshotOverlapLocked( func (s *Store) receiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, ) error { - sp := tracing.SpanFromContext(ctx) - // Draining nodes will generally not be rebalanced to (see the filtering that // happens in getStoreListFromIDsLocked()), but in case they are, they should // reject the incoming rebalancing snapshots. @@ -966,7 +965,7 @@ func (s *Store) receiveSnapshot( // getStoreListFromIDsLocked(). Is that sound? Don't we want to // upreplicate to draining nodes if there are no other candidates? case kvserverpb.SnapshotRequest_REBALANCE: - return sendSnapshotError(stream, errors.New(storeDrainingMsg)) + return sendSnapshotError(ctx, s, stream, errors.New(storeDrainingMsg)) default: // If this a new snapshot type that this cockroach version does not know // about, we let it through. @@ -977,7 +976,7 @@ func (s *Store) receiveSnapshot( if err := fn(header); err != nil { // NB: we intentionally don't mark this error as errMarkSnapshotError so // that we don't end up retrying injected errors in tests. - return sendSnapshotError(stream, err) + return sendSnapshotError(ctx, s, stream, err) } } @@ -1017,7 +1016,7 @@ func (s *Store) receiveSnapshot( return nil }); pErr != nil { log.Infof(ctx, "cannot accept snapshot: %s", pErr) - return sendSnapshotError(stream, pErr.GoError()) + return sendSnapshotError(ctx, s, stream, pErr.GoError()) } defer func() { @@ -1039,7 +1038,7 @@ func (s *Store) receiveSnapshot( snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "invalid snapshot") - return sendSnapshotError(stream, err) + return sendSnapshotError(ctx, s, stream, err) } ss = &kvBatchSnapshotStrategy{ @@ -1049,7 +1048,7 @@ func (s *Store) receiveSnapshot( } defer ss.Close(ctx) default: - return sendSnapshotError(stream, + return sendSnapshotError(ctx, s, stream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", s, header.State.Desc.RangeID, header.Strategy), ) @@ -1078,14 +1077,12 @@ func (s *Store) receiveSnapshot( } ctx, rSp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data") defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors - inSnap, err := ss.Receive(ctx, stream, *header, recordBytesReceived) + inSnap, err := ss.Receive(ctx, s, stream, *header, recordBytesReceived) if err != nil { return err } inSnap.placeholder = placeholder - rec := sp.GetConfiguredRecording() - // Use a background context for applying the snapshot, as handleRaftReady is // not prepared to deal with arbitrary context cancellation. Also, we've // already received the entire snapshot here, so there's no point in @@ -1097,23 +1094,24 @@ func (s *Store) receiveSnapshot( // sender as this being a retriable error, see isSnapshotError(). err = errors.Mark(err, errMarkSnapshotError) err = errors.Wrap(err, "failed to apply snapshot") - return sendSnapshotErrorWithTrace(stream, err, rec) + return sendSnapshotError(ctx, s, stream, err) } return stream.Send(&kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, - CollectedSpans: rec, + CollectedSpans: tracing.SpanFromContext(ctx).GetConfiguredRecording(), }) } -func sendSnapshotError(stream incomingSnapshotStream, err error) error { - return sendSnapshotErrorWithTrace(stream, err, nil /* trace */) -} - -func sendSnapshotErrorWithTrace( - stream incomingSnapshotStream, err error, trace tracingpb.Recording, +// sendSnapshotError sends an error response back to the sender of this snapshot +// to signify that it can not accept this snapshot. Internally it increments the +// statistic tracking how many invalid snapshots it received. +func sendSnapshotError( + ctx context.Context, s *Store, stream incomingSnapshotStream, err error, ) error { + s.metrics.RangeSnapshotRecvFailed.Inc(1) resp := snapRespErr(err) - resp.CollectedSpans = trace + resp.CollectedSpans = tracing.SpanFromContext(ctx).GetConfiguredRecording() + return stream.Send(resp) } diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index cf9a78b15653..46e3806fe837 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -241,7 +241,7 @@ type StoreTestingKnobs struct { // of Raft group ticks. RefreshReasonTicksPeriod int // DisableProcessRaft disables the process raft loop. - DisableProcessRaft bool + DisableProcessRaft func(roachpb.StoreID) bool // DisableLastProcessedCheck disables checking on replica queue last processed times. DisableLastProcessedCheck bool // ReplicateQueueAcceptsUnsplit allows the replication queue to diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index aa30036f9134..f932b369d61d 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -632,6 +632,8 @@ var charts = []sectionDescription{ "range.snapshots.applied-non-voter", "range.snapshots.delegate.successes", "range.snapshots.delegate.failures", + "range.snapshots.recv-failed", + "range.snapshots.recv-unusable", }, }, {