From 23e51e7d8652a57ff9cccf34819c20feb4872cc3 Mon Sep 17 00:00:00 2001
From: Ibrahim Kettaneh
Date: Tue, 9 Jul 2024 10:04:44 -0400
Subject: [PATCH] loqrecovery: make make-plan CLI print updates

Right now, the loss-of-quorum CLI tool only prints the final recovery
plan after it finishes running. For large clusters, however, collection
can take a long time, and it was unclear whether the tool was still
making progress or was stuck.

This commit changes that by making the tool print progress updates from
the server. In particular, the CLI tool now prints the node from which
the server is currently streaming replica info.

Fixes: #122640

Release note: None
---
 pkg/cli/debug_recover_loss_of_quorum.go      |  6 +++--
 pkg/cli/debug_recover_loss_of_quorum_test.go | 23 +++++++++++++++++++
 pkg/kv/kvserver/loqrecovery/collect.go       | 14 ++++++++++-
 .../loqrecovery/collect_raft_log_test.go     |  3 ++-
 .../loqrecovery/server_integration_test.go   | 12 ++++++----
 5 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go
index aa2fc0e40713..22ac92e473cd 100644
--- a/pkg/cli/debug_recover_loss_of_quorum.go
+++ b/pkg/cli/debug_recover_loss_of_quorum.go
@@ -318,7 +318,8 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
 			return errors.Wrapf(err, "failed to get admin connection to cluster")
 		}
 		defer finish()
-		replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverCollectInfoOpts.maxConcurrency)
+		replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c,
+			debugRecoverCollectInfoOpts.maxConcurrency, stderr /* logOutput */)
 		if err != nil {
 			return errors.WithHint(errors.Wrap(err,
 				"failed to retrieve replica info from cluster"),
@@ -435,7 +436,8 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
 			return errors.Wrapf(err, "failed to get admin connection to cluster")
 		}
 		defer finish()
-		replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverPlanOpts.maxConcurrency)
+		replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c,
+			debugRecoverPlanOpts.maxConcurrency, stderr /* logOutput */)
 		if err != nil {
 			return errors.Wrapf(err, "failed to retrieve replica info from cluster")
 		}
diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go
index 4d6da545af4e..e80ba39098f8 100644
--- a/pkg/cli/debug_recover_loss_of_quorum_test.go
+++ b/pkg/cli/debug_recover_loss_of_quorum_test.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -469,6 +470,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
 	// TODO(oleg): Make test run with 7 nodes to exercise cases where multiple
 	// replicas survive. Current startup and allocator behaviour would make
 	// this test flaky.
+	var failCount atomic.Int64
 	sa := make(map[int]base.TestServerArgs)
 	for i := 0; i < 3; i++ {
 		sa[i] = base.TestServerArgs{
@@ -476,6 +478,23 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
 				Server: &server.TestingKnobs{
 					StickyVFSRegistry: fs.NewStickyRegistry(),
 				},
+				LOQRecovery: &loqrecovery.TestingKnobs{
+					MetadataScanTimeout: 15 * time.Second,
+					ForwardReplicaFilter: func(
+						response *serverpb.RecoveryCollectLocalReplicaInfoResponse,
+					) error {
+						// Artificially inject an error to make the server retry
+						// streaming the replica info for node 1. Note that the error is
+						// only injected after the node's first replica info is returned.
+						// This helps verify that the partial replica info gets
+						// discarded and the node is revisited (based on the logs).
+						if response != nil && response.ReplicaInfo.NodeID == 1 &&
+							failCount.Add(1) < 3 && failCount.Load() > 1 {
+							return errors.New("rpc stream stopped")
+						}
+						return nil
+					},
+				},
 			},
 			StoreSpecs: []base.StoreSpec{
 				{
@@ -532,6 +551,10 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
 		"--plan=" + planFile,
 	})
 	require.NoError(t, err, "failed to run make-plan")
+	require.Contains(t, out, "Started getting replica info for node_id:1",
+		"planner didn't log the visited nodes properly")
+	require.Contains(t, out, "Discarding replica info for node_id:1",
+		"planner didn't log the discarded nodes properly")
 	require.Contains(t, out, fmt.Sprintf("- node n%d", node1ID),
 		"planner didn't provide correct apply instructions")
 	require.FileExists(t, planFile, "generated plan file")
diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go
index 718946e89854..0772e363a393 100644
--- a/pkg/kv/kvserver/loqrecovery/collect.go
+++ b/pkg/kv/kvserver/loqrecovery/collect.go
@@ -13,6 +13,7 @@ package loqrecovery
 import (
 	"cmp"
 	"context"
+	"fmt"
 	"io"
 	"math"
 	"slices"
@@ -46,8 +47,10 @@ type CollectionStats struct {
 // maxConcurrency is the maximum parallelism that will be used when fanning out
 // RPCs to nodes in the cluster. A value of 0 disables concurrency. A negative
 // value configures no limit for concurrency.
+// If logOutput is not nil, the function writes a log line when it starts
+// receiving replica info from a node and when a node needs to be revisited.
 func CollectRemoteReplicaInfo(
-	ctx context.Context, c serverpb.AdminClient, maxConcurrency int,
+	ctx context.Context, c serverpb.AdminClient, maxConcurrency int, logOutput io.Writer,
 ) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) {
 	cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{
 		MaxConcurrency: int32(maxConcurrency),
@@ -71,6 +74,10 @@ func CollectRemoteReplicaInfo(
 		if r := info.GetReplicaInfo(); r != nil {
 			stores[r.StoreID] = struct{}{}
 			nodes[r.NodeID] = struct{}{}
+
+			if _, ok := replInfoMap[r.NodeID]; !ok && logOutput != nil {
+				_, _ = fmt.Fprintf(logOutput, "Started getting replica info for node_id:%d.\n", r.NodeID)
+			}
 			replInfoMap[r.NodeID] = append(replInfoMap[r.NodeID], *r)
 		} else if d := info.GetRangeDescriptor(); d != nil {
 			descriptors = append(descriptors, *d)
@@ -78,6 +85,11 @@ func CollectRemoteReplicaInfo(
 			// If server had to restart a fan-out work because of error and retried,
 			// then we discard partial data for the node.
 			delete(replInfoMap, s.NodeID)
+			if logOutput != nil {
+				_, _ = fmt.Fprintf(logOutput, "Discarding replica info for node_id:%d. "+
+					"The node will be revisited.\n", s.NodeID)
+			}
+
 		} else if m := info.GetMetadata(); m != nil {
 			metadata = *m
 		} else {
diff --git a/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go b/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
index ef39ac658748..d3f1d3360482 100644
--- a/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
+++ b/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
@@ -255,7 +255,8 @@ func TestCollectLeaseholderStatus(t *testing.T) {
 	// Note: we need to retry because replica collection is not atomic and
 	// leaseholder could move around so we could see none or more than one.
 	testutils.SucceedsSoon(t, func() error {
-		replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
+		replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
+			-1 /* maxConcurrency */, nil /* logOutput */)
 		require.NoError(t, err, "failed to collect replica info")
 
 		foundLeaseholders := 0
diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go
index d37c8e0c1946..abc721129728 100644
--- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go
+++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go
@@ -89,7 +89,8 @@ func TestReplicaCollection(t *testing.T) {
 
 	var replicas loqrecoverypb.ClusterReplicaInfo
 	var stats loqrecovery.CollectionStats
-	replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
+	replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
+		-1 /* maxConcurrency */, nil /* logOutput */)
 	require.NoError(t, err, "failed to retrieve replica info")
 
 	// Check counters on retrieved replica info.
@@ -161,7 +162,8 @@ func TestStreamRestart(t *testing.T) {
 
 	var replicas loqrecoverypb.ClusterReplicaInfo
 	var stats loqrecovery.CollectionStats
-	replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
+	replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
+		-1 /* maxConcurrency */, nil /* logOutput */)
 	require.NoError(t, err, "failed to retrieve replica info")
 
 	// Check counters on retrieved replica info.
@@ -613,7 +615,8 @@ func TestRetrieveApplyStatus(t *testing.T) {
 	var replicas loqrecoverypb.ClusterReplicaInfo
 	testutils.SucceedsSoon(t, func() error {
 		var err error
-		replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
+		replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
+			-1 /* maxConcurrency */, nil /* logOutput */)
 		return err
 	})
 	plan, planDetails, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator)
@@ -695,7 +698,8 @@ func TestRejectBadVersionApplication(t *testing.T) {
 	var replicas loqrecoverypb.ClusterReplicaInfo
 	testutils.SucceedsSoon(t, func() error {
 		var err error
-		replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
+		replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
+			-1 /* maxConcurrency */, nil /* logOutput */)
 		return err
 	})
 	plan, _, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator)
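
Illustrative usage (not part of the patch): the sketch below shows how a caller
outside the CLI might wire up the new logOutput parameter. It is a minimal
sketch only; collectWithProgress and the loqexample package are hypothetical,
and the Nodes/Stores counters on CollectionStats are assumed from the
collect.go hunk above. Passing nil for logOutput keeps the pre-patch, silent
behavior, which is what the updated tests do.

package loqexample

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// collectWithProgress is a hypothetical wrapper around the updated
// CollectRemoteReplicaInfo signature. With a non-nil logOutput, the collector
// prints "Started getting replica info for node_id:N." when it first receives
// a node's replica info, and "Discarding replica info for node_id:N. The node
// will be revisited." when a restarted server-side stream forces a retry.
func collectWithProgress(
	ctx context.Context, adm serverpb.AdminClient, logOutput io.Writer,
) (loqrecoverypb.ClusterReplicaInfo, error) {
	replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(
		ctx, adm, -1 /* maxConcurrency */, logOutput)
	if err != nil {
		return loqrecoverypb.ClusterReplicaInfo{}, err
	}
	// Summarize what was gathered; Nodes and Stores are assumed fields of
	// CollectionStats.
	fmt.Fprintf(os.Stderr, "collected replica info from %d nodes, %d stores\n",
		stats.Nodes, stats.Stores)
	return replicas, nil
}

A CLI caller would pass stderr (as make-plan and collect-info now do) to
surface progress while the stream is running, or nil to keep collection quiet.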