From d11d509a8d06d2a810e2979f09d6aa2515ca701f Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Wed, 5 Apr 2023 17:39:25 -0400 Subject: [PATCH] kv: Add stats for delegate snapshots Fixes: #98243 This PR adds two new stats for delegate snapshots to track failure of sending snapshots. There are failures either before data is transferred or after the snapshot is received. Both stats are useful. Epic: none Release note: None --- pkg/kv/kvserver/metrics.go | 16 ++ pkg/kv/kvserver/replica_command.go | 2 +- pkg/kv/kvserver/replica_learner_test.go | 268 ++++++++++++++++++++++-- pkg/kv/kvserver/replica_raft.go | 9 + pkg/kv/kvserver/store_raft.go | 5 +- pkg/kv/kvserver/store_snapshot.go | 35 ++-- pkg/kv/kvserver/testing_knobs.go | 2 +- pkg/ts/catalog/chart_catalog.go | 2 + 8 files changed, 304 insertions(+), 35 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 496f9ccce673..fad54c8ba2bb 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -863,6 +863,18 @@ var ( Measurement: "Bytes", Unit: metric.Unit_BYTES, } + metaRangeSnapshotRecvFailed = metric.Metadata{ + Name: "range.snapshots.recv-failed", + Help: "Number of range snapshot initialization messages that errored out on the recipient, typically before any data is transferred", + Measurement: "Snapshots", + Unit: metric.Unit_COUNT, + } + metaRangeSnapshotRecvUnusable = metric.Metadata{ + Name: "range.snapshots.send-failed", + Help: "Number of range snapshot that were fully transmitted but determined to be unusable", + Measurement: "Snapshots", + Unit: metric.Unit_COUNT, + } metaRangeSnapshotSendQueueLength = metric.Metadata{ Name: "range.snapshots.send-queue", Help: "Number of snapshots queued to send", @@ -2038,6 +2050,8 @@ type StoreMetrics struct { RangeSnapshotRecoverySentBytes *metric.Counter RangeSnapshotRebalancingRcvdBytes *metric.Counter RangeSnapshotRebalancingSentBytes *metric.Counter + RangeSnapshotRecvFailed *metric.Counter + RangeSnapshotRecvUnusable *metric.Counter // Range snapshot queue metrics. RangeSnapshotSendQueueLength *metric.Gauge @@ -2639,6 +2653,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RangeSnapshotRecoverySentBytes: metric.NewCounter(metaRangeSnapshotRecoverySentBytes), RangeSnapshotRebalancingRcvdBytes: metric.NewCounter(metaRangeSnapshotRebalancingRcvdBytes), RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes), + RangeSnapshotRecvFailed: metric.NewCounter(metaRangeSnapshotRecvFailed), + RangeSnapshotRecvUnusable: metric.NewCounter(metaRangeSnapshotRecvUnusable), RangeSnapshotSendQueueLength: metric.NewGauge(metaRangeSnapshotSendQueueLength), RangeSnapshotRecvQueueLength: metric.NewGauge(metaRangeSnapshotRecvQueueLength), RangeSnapshotSendInProgress: metric.NewGauge(metaRangeSnapshotSendInProgress), diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 12f7082c0650..38033df840ce 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2839,7 +2839,7 @@ func (r *Replica) sendSnapshotUsingDelegate( ctx, 2, "delegating snapshot transmission attempt %v for %v to %v", n+1, recipient, sender, ) - selfDelegate := n == len(senders)-1 + selfDelegate := sender.StoreID == r.StoreID() // On the last attempt, always queue on the delegate to time out naturally. if selfDelegate { diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index cda2a6fa913b..f05550d06e92 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -351,6 +351,42 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) { require.NoError(t, g.Wait()) } +type expectedMetric struct { + DelegateSnapshotSuccesses int64 + DelegateSnapshotFailures int64 + RangeSnapshotRecvFailed int64 + RangeSnapshotRecvUnusable int64 +} + +func verifySnapshotMetrics( + t *testing.T, tc *testcluster.TestCluster, expected map[int]expectedMetric, +) { + for id, metrics := range expected { + server := tc.Server(id) + store, _ := server.GetStores().(*kvserver.Stores).GetStore(server.GetFirstStoreID()) + weakEqualf(t, metrics.DelegateSnapshotSuccesses, store.Metrics().DelegateSnapshotSuccesses.Count(), "metric successes, %d", id) + weakEqualf(t, metrics.DelegateSnapshotFailures, store.Metrics().DelegateSnapshotFailures.Count(), "metric failures, %d", id) + weakEqualf(t, metrics.RangeSnapshotRecvFailed, store.Metrics().RangeSnapshotRecvFailed.Count(), "metric recv failed, %d", id) + weakEqualf(t, metrics.RangeSnapshotRecvUnusable, store.Metrics().RangeSnapshotRecvUnusable.Count(), "metric recv unusable, %d", id) + } +} + +const issuesFixed = false + +// TODO(baptist): This should be require.Equalf, but this is not consistent +// enough. There are two reasons this is inconsistent. +// 1) Raft snapshots still sometimes sneak in. (#96841) +// 2) THe delegate hasn't since the updated descriptor in time. +func weakEqualf(t *testing.T, successes int64, count int64, s string, id int) { + if issuesFixed { + require.Equalf(t, successes, count, s, id) + } else { + if successes != count { + log.Warningf(context.Background(), "Not equal, expected %d, got %d for %s %d", successes, count, s, id) + } + } +} + // TestDelegateSnapshot verifies that the correct delegate is chosen when // sending snapshots to stores. func TestDelegateSnapshot(t *testing.T) { @@ -385,6 +421,12 @@ func TestDelegateSnapshot(t *testing.T) { ReplicationMode: base.ReplicationManual, }, ) + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) scratchKey := tc.ScratchRange(t) defer tc.Stopper().Stop(ctx) @@ -395,6 +437,13 @@ func TestDelegateSnapshot(t *testing.T) { require.Equalf(t, request.DelegatedSender.StoreID, roachpb.StoreID(1), "Wrong sender for request %+v", request) } + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + // Node 4 (loc B) should get the delegated snapshot from node 3 which is the // same locality. { @@ -410,6 +459,7 @@ func TestDelegateSnapshot(t *testing.T) { } return nil }) + request := <-requestChannel require.Equalf(t, request.DelegatedSender.StoreID, roachpb.StoreID(3), "Wrong type of request %+v", request) // TODO(abaptist): Remove this loop. Sometimes the delegated request fails @@ -421,6 +471,13 @@ func TestDelegateSnapshot(t *testing.T) { for len(requestChannel) > 0 { <-requestChannel } + + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {1, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) } // Node 2 (loc A) should get the snapshot from node 1 as they have the same locality. @@ -429,6 +486,12 @@ func TestDelegateSnapshot(t *testing.T) { request := <-requestChannel require.Equalf(t, request.DelegatedSender.StoreID, roachpb.StoreID(1), "Wrong type of request %+v", request) } + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {1, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) } // TestDelegateSnapshotFails is a test that ensure we fail fast when the @@ -443,7 +506,11 @@ func TestDelegateSnapshotFails(t *testing.T) { desc []roachpb.ReplicaDescriptor } - setupFn := func(t *testing.T) ( + setupFn := func(t *testing.T, + receiveFunc func(*kvserverpb.SnapshotRequest_Header) error, + sendFunc func(*kvserverpb.DelegateSendSnapshotRequest), + processRaft func(roachpb.StoreID) bool, + ) ( *testcluster.TestCluster, roachpb.Key, ) { @@ -457,6 +524,9 @@ func TestDelegateSnapshotFails(t *testing.T) { defer senders.mu.Unlock() return senders.desc } + ltk.storeKnobs.ReceiveSnapshot = receiveFunc + ltk.storeKnobs.SendSnapshot = sendFunc + ltk.storeKnobs.DisableProcessRaft = processRaft tc := testcluster.StartTestCluster( t, 4, base.TestClusterArgs{ @@ -473,7 +543,7 @@ func TestDelegateSnapshotFails(t *testing.T) { // the learner is on. Assert that the failure is detected and change replicas // fails fast. t.Run("receiver", func(t *testing.T) { - tc, scratchKey := setupFn(t) + tc, scratchKey := setupFn(t, nil, nil, nil) defer tc.Stopper().Stop(ctx) desc, err := tc.LookupRange(scratchKey) @@ -487,13 +557,19 @@ func TestDelegateSnapshotFails(t *testing.T) { ) require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) }) // Add a follower replica to act as the snapshot sender, and kill the server // the sender is on. Assert that the failure is detected and change replicas // fails fast. t.Run("sender_no_fallback", func(t *testing.T) { - tc, scratchKey := setupFn(t) + tc, scratchKey := setupFn(t, nil, nil, nil) defer tc.Stopper().Stop(ctx) // Add a replica that will be the delegated sender, and another so we have @@ -514,27 +590,30 @@ func TestDelegateSnapshotFails(t *testing.T) { _, err = tc.Servers[0].DB().AdminChangeReplicas( ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), ) - log.Infof(ctx, "Err=%v", err) - require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + require.ErrorContains(t, err, "partitioned") + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) }) // Identical setup as the previous test, but allow a fallback to the leaseholder. t.Run("sender_with_fallback", func(t *testing.T) { - tc, scratchKey := setupFn(t) + tc, scratchKey := setupFn(t, nil, nil, nil) defer tc.Stopper().Stop(ctx) // Add a replica that will be the delegated sender, and another so we have // quorum with this node down desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) - replicaDesc, ok := desc.GetReplicaDescriptor(3) - require.True(t, ok) - leaseholderDesc, ok := desc.GetReplicaDescriptor(1) - require.True(t, ok) + store3, _ := desc.GetReplicaDescriptor(3) + store1, _ := desc.GetReplicaDescriptor(1) // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. senders.mu.Lock() - senders.desc = append(senders.desc, replicaDesc) - senders.desc = append(senders.desc, leaseholderDesc) + senders.desc = append(senders.desc, store3) + senders.desc = append(senders.desc, store1) senders.mu.Unlock() // Now stop accepting traffic to node 3 (index 2). @@ -545,6 +624,171 @@ func TestDelegateSnapshotFails(t *testing.T) { ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), ) require.NoError(t, err) + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + }) + t.Run("receiver_rejects", func(t *testing.T) { + var block atomic.Int32 + tc, scratchKey := setupFn( + t, + func(h *kvserverpb.SnapshotRequest_Header) error { + // TODO(abaptist): Remove this check once #96841 is fixed. + if h.SenderQueueName == kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE { + return nil + } + if val := block.Load(); val > 0 { + block.Add(-1) + return errors.Newf("BAM: receive error %d", val) + } + return nil + }, + nil, + nil, + ) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + store3, _ := desc.GetReplicaDescriptor(3) + store1, _ := desc.GetReplicaDescriptor(1) + // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. + senders.mu.Lock() + senders.desc = append(senders.desc, store3) + senders.desc = append(senders.desc, store1) + senders.mu.Unlock() + + block.Store(2) + _, err := tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.ErrorContains(t, err, "BAM: receive error") + + // There will be two attempts to send this, both fail. + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 2, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + }) + // Test that the delegate that doesn't have + t.Run("delegate_missing_range", func(t *testing.T) { + tc, scratchKey := setupFn(t, nil, nil, nil) + defer tc.Stopper().Stop(ctx) + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2)...) + + // Choose the store that doesn't have the range first. + store4, _ := desc.GetReplicaDescriptor(4) + store1, _ := desc.GetReplicaDescriptor(1) + // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. + senders.mu.Lock() + senders.desc = append(senders.desc, store4) + senders.desc = append(senders.desc, store1) + senders.mu.Unlock() + + _, err := tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.NoError(t, err) + + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + }) + + // NB: This test indicates a potential problem with delegated snapshots as + // they currently work. The generation changes on a change replica request, + // however if the delegation request races ahead of the Raft level update, + // then the delegate will not be used. In testing we don't see this often, + // however it is something to watch out for especially on overloaded servers. + t.Run("delegate_raft_slow", func(t *testing.T) { + var block atomic.Bool + tc, scratchKey := setupFn(t, nil, nil, + func(id roachpb.StoreID) bool { + return id == 4 && block.Load() + }) + defer tc.Stopper().Stop(ctx) + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + // Choose the store which we are about to block. + store4, _ := desc.GetReplicaDescriptor(4) + senders.mu.Lock() + senders.desc = append(senders.desc, store4) + senders.mu.Unlock() + + // Don't allow store 4 to see the new descriptor through Raft. + block.Store(true) + _, err := tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.ErrorContains(t, err, "generation has changed") + + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {0, 1, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) + }) + + // This test ensures that the leader doesn't truncate while the delegate + // snapshot is in flight. Right after the leader sends the delegate request, + // but before the snapshot has been created, truncate the log. This test is + // not "as good" as it could be since the new node is in the probe state so + // the log truncation constraints aren't really necessary since we don't + // truncate anything in that state. This test will be better if it could be + // tested on Raft snapshots. + t.Run("truncate_during_send", func(t *testing.T) { + var blockRaft atomic.Bool + var truncateLog func() + + tc, scratchKey := setupFn(t, nil, + func(*kvserverpb.DelegateSendSnapshotRequest) { + if blockRaft.Load() { + truncateLog() + } + }, nil) + defer tc.Stopper().Stop(ctx) + // This will truncate the log on the first store. + truncateLog = func() { + server := tc.Servers[0] + store, _ := server.GetStores().(*kvserver.Stores).GetStore(server.GetFirstStoreID()) + store.MustForceRaftLogScanAndProcess() + } + + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + // Chose a delegate to block. + store4, _ := desc.GetReplicaDescriptor(4) + store1, _ := desc.GetReplicaDescriptor(1) + senders.mu.Lock() + senders.desc = append(senders.desc, store4) + senders.desc = append(senders.desc, store1) + senders.mu.Unlock() + // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. + + // Don't allow the new store to see Raft updates. + blockRaft.Store(true) + _, err := tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.NoError(t, err) + + verifySnapshotMetrics(t, tc, map[int]expectedMetric{ + 0: {1, 0, 0, 0}, + 1: {0, 0, 0, 0}, + 2: {0, 0, 0, 0}, + 3: {0, 0, 0, 0}, + }) }) } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 763f04f7363e..2877442b01e4 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -704,6 +704,15 @@ var noSnap IncomingSnapshot func (r *Replica) handleRaftReady( ctx context.Context, inSnap IncomingSnapshot, ) (handleRaftReadyStats, error) { + + // Don't process anything if this fn returns false. + if fn := r.store.cfg.TestingKnobs.DisableProcessRaft; fn != nil && fn(r.store.StoreID()) { + return handleRaftReadyStats{ + tBegin: timeutil.Now(), + tEnd: timeutil.Now(), + }, nil + } + r.raftMu.Lock() defer r.raftMu.Unlock() return r.handleRaftReadyRaftMuLocked(ctx, inSnap) diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index dc99302c14ae..89f6b7043dea 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -470,6 +470,7 @@ func (s *Store) processRaftSnapshotRequest( // multiple snapshots raced (as is possible when raft leadership changes // and both the old and new leaders send snapshots). log.Infof(ctx, "ignored stale snapshot at index %d", snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Index) + s.metrics.RangeSnapshotRecvUnusable.Inc(1) } return nil }) @@ -710,10 +711,6 @@ func (s *Store) nodeIsLiveCallback(l livenesspb.Liveness) { } func (s *Store) processRaft(ctx context.Context) { - if s.cfg.TestingKnobs.DisableProcessRaft { - return - } - s.scheduler.Start(s.stopper) // Wait for the scheduler worker goroutines to finish. if err := s.stopper.RunAsyncTask(ctx, "sched-wait", s.scheduler.Wait); err != nil { diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 72f68241c430..8b1331b0184a 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -105,6 +105,7 @@ type snapshotStrategy interface { // constructs an IncomingSnapshot. Receive( context.Context, + *Store, incomingSnapshotStream, kvserverpb.SnapshotRequest_Header, snapshotRecordMetrics, @@ -367,11 +368,14 @@ func (tag *snapshotTimingTag) Render() []attribute.KeyValue { // key space across to the next key span. func (kvSS *kvBatchSnapshotStrategy) Receive( ctx context.Context, + s *Store, stream incomingSnapshotStream, header kvserverpb.SnapshotRequest_Header, recordBytesReceived snapshotRecordMetrics, ) (IncomingSnapshot, error) { assertStrategy(ctx, header, kvserverpb.SnapshotRequest_KV_BATCH) + sp := tracing.SpanFromContext(ctx) + rec := sp.GetConfiguredRecording() // These stopwatches allow us to time the various components of Receive(). // - totalTime Stopwatch measures the total time spent within this function. @@ -410,7 +414,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } if req.Header != nil { err := errors.New("client error: provided a header mid-stream") - return noSnap, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(s, stream, err, rec) } if req.KVBatch != nil { @@ -476,7 +480,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "client error: invalid snapshot") - return noSnap, sendSnapshotError(stream, err) + return noSnap, sendSnapshotError(s, stream, err, rec) } inSnap := IncomingSnapshot{ @@ -951,6 +955,7 @@ func (s *Store) receiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, ) error { sp := tracing.SpanFromContext(ctx) + rec := sp.GetConfiguredRecording() // Draining nodes will generally not be rebalanced to (see the filtering that // happens in getStoreListFromIDsLocked()), but in case they are, they should @@ -966,7 +971,7 @@ func (s *Store) receiveSnapshot( // getStoreListFromIDsLocked(). Is that sound? Don't we want to // upreplicate to draining nodes if there are no other candidates? case kvserverpb.SnapshotRequest_REBALANCE: - return sendSnapshotError(stream, errors.New(storeDrainingMsg)) + return sendSnapshotError(s, stream, errors.New(storeDrainingMsg), rec) default: // If this a new snapshot type that this cockroach version does not know // about, we let it through. @@ -977,7 +982,7 @@ func (s *Store) receiveSnapshot( if err := fn(header); err != nil { // NB: we intentionally don't mark this error as errMarkSnapshotError so // that we don't end up retrying injected errors in tests. - return sendSnapshotError(stream, err) + return sendSnapshotError(s, stream, err, rec) } } @@ -1017,7 +1022,7 @@ func (s *Store) receiveSnapshot( return nil }); pErr != nil { log.Infof(ctx, "cannot accept snapshot: %s", pErr) - return sendSnapshotError(stream, pErr.GoError()) + return sendSnapshotError(s, stream, pErr.GoError(), rec) } defer func() { @@ -1039,7 +1044,7 @@ func (s *Store) receiveSnapshot( snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "invalid snapshot") - return sendSnapshotError(stream, err) + return sendSnapshotError(s, stream, err, rec) } ss = &kvBatchSnapshotStrategy{ @@ -1049,9 +1054,10 @@ func (s *Store) receiveSnapshot( } defer ss.Close(ctx) default: - return sendSnapshotError(stream, + return sendSnapshotError(s, stream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", s, header.State.Desc.RangeID, header.Strategy), + rec, ) } @@ -1078,14 +1084,12 @@ func (s *Store) receiveSnapshot( } ctx, rSp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data") defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors - inSnap, err := ss.Receive(ctx, stream, *header, recordBytesReceived) + inSnap, err := ss.Receive(ctx, s, stream, *header, recordBytesReceived) if err != nil { return err } inSnap.placeholder = placeholder - rec := sp.GetConfiguredRecording() - // Use a background context for applying the snapshot, as handleRaftReady is // not prepared to deal with arbitrary context cancellation. Also, we've // already received the entire snapshot here, so there's no point in @@ -1097,7 +1101,7 @@ func (s *Store) receiveSnapshot( // sender as this being a retriable error, see isSnapshotError(). err = errors.Mark(err, errMarkSnapshotError) err = errors.Wrap(err, "failed to apply snapshot") - return sendSnapshotErrorWithTrace(stream, err, rec) + return sendSnapshotError(s, stream, err, rec) } return stream.Send(&kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, @@ -1105,13 +1109,10 @@ func (s *Store) receiveSnapshot( }) } -func sendSnapshotError(stream incomingSnapshotStream, err error) error { - return sendSnapshotErrorWithTrace(stream, err, nil /* trace */) -} - -func sendSnapshotErrorWithTrace( - stream incomingSnapshotStream, err error, trace tracingpb.Recording, +func sendSnapshotError( + s *Store, stream incomingSnapshotStream, err error, trace tracingpb.Recording, ) error { + s.metrics.RangeSnapshotRecvFailed.Inc(1) resp := snapRespErr(err) resp.CollectedSpans = trace return stream.Send(resp) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index cf9a78b15653..46e3806fe837 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -241,7 +241,7 @@ type StoreTestingKnobs struct { // of Raft group ticks. RefreshReasonTicksPeriod int // DisableProcessRaft disables the process raft loop. - DisableProcessRaft bool + DisableProcessRaft func(roachpb.StoreID) bool // DisableLastProcessedCheck disables checking on replica queue last processed times. DisableLastProcessedCheck bool // ReplicateQueueAcceptsUnsplit allows the replication queue to diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 342df09ab129..0533a9a2e321 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -638,6 +638,8 @@ var charts = []sectionDescription{ "range.snapshots.applied-non-voter", "range.snapshots.delegate.successes", "range.snapshots.delegate.failures", + "range.snapshots.recv-failed", + "range.snapshots.send-failed", }, }, {