Merge #127001
127001: loqrecovery: make make-plan CLI print updates r=iskettaneh a=iskettaneh

loqrecovery: make make-plan CLI print updates

Right now, the loss-of-quorum CLI tool only prints the final recovery
plan after it finishes running. However, for large clusters, it can
take a long time to finish, which made it unclear whether the tool was
still making progress or was stuck.

This PR changes that by making the tool print some of the server's
updates. In particular, the CLI tool now prints the node from which the
server is currently streaming replica info.

Fixes: #122640

Release note: None
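
To illustrate the behavior described above, here is a minimal, standalone Go sketch of the pattern the change introduces: an optional io.Writer that, when non-nil, receives a line the first time a node's replica info arrives and a discard notice when a node's stream restarts. The types and function names here are hypothetical stand-ins; only the log wording and the nil-writer convention come from the diff below.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// nodeUpdate is a hypothetical stand-in for the per-node replica payload
// that the real tool streams from the server.
type nodeUpdate struct {
	nodeID int
}

// consumeReplicaStream mirrors the pattern introduced by this PR: if logOutput
// is non-nil, a message is written the first time a node is visited, and a
// discard notice is written when a node's stream restarts (its partial data
// is dropped so the node can be revisited).
func consumeReplicaStream(updates []nodeUpdate, restartedNodes []int, logOutput io.Writer) map[int][]nodeUpdate {
	perNode := make(map[int][]nodeUpdate)
	for _, u := range updates {
		if _, seen := perNode[u.nodeID]; !seen && logOutput != nil {
			fmt.Fprintf(logOutput, "Started getting replica info for node_id:%d.\n", u.nodeID)
		}
		perNode[u.nodeID] = append(perNode[u.nodeID], u)
	}
	for _, nodeID := range restartedNodes {
		// A restarted stream invalidates any partial data collected for that node.
		delete(perNode, nodeID)
		if logOutput != nil {
			fmt.Fprintf(logOutput, "Discarding replica info for node_id:%d. The node will be revisited.\n", nodeID)
		}
	}
	return perNode
}

func main() {
	updates := []nodeUpdate{{nodeID: 1}, {nodeID: 1}, {nodeID: 2}}
	// Passing os.Stderr mirrors how the CLI wires stderr in as logOutput;
	// passing nil keeps the collection silent, as the tests do.
	consumeReplicaStream(updates, []int{1}, os.Stderr)
}
```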

Co-authored-by: Ibrahim Kettaneh <[email protected]>
craig[bot] and iskettaneh committed Jul 16, 2024
2 parents b140278 + 23e51e7 commit 8f2a27b
Showing 5 changed files with 50 additions and 8 deletions.
6 changes: 4 additions & 2 deletions pkg/cli/debug_recover_loss_of_quorum.go
@@ -318,7 +318,8 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
return errors.Wrapf(err, "failed to get admin connection to cluster")
}
defer finish()
replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverCollectInfoOpts.maxConcurrency)
replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c,
debugRecoverCollectInfoOpts.maxConcurrency, stderr /* logOutput */)
if err != nil {
return errors.WithHint(errors.Wrap(err,
"failed to retrieve replica info from cluster"),
@@ -435,7 +436,8 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
return errors.Wrapf(err, "failed to get admin connection to cluster")
}
defer finish()
replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverPlanOpts.maxConcurrency)
replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c,
debugRecoverPlanOpts.maxConcurrency, stderr /* logOutput */)
if err != nil {
return errors.Wrapf(err, "failed to retrieve replica info from cluster")
}
23 changes: 23 additions & 0 deletions pkg/cli/debug_recover_loss_of_quorum_test.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"
"os"
"sync/atomic"
"testing"
"time"

@@ -469,13 +470,31 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
// TODO(oleg): Make test run with 7 nodes to exercise cases where multiple
// replicas survive. Current startup and allocator behaviour would make
// this test flaky.
var failCount atomic.Int64
sa := make(map[int]base.TestServerArgs)
for i := 0; i < 3; i++ {
sa[i] = base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
StickyVFSRegistry: fs.NewStickyRegistry(),
},
LOQRecovery: &loqrecovery.TestingKnobs{
MetadataScanTimeout: 15 * time.Second,
ForwardReplicaFilter: func(
response *serverpb.RecoveryCollectLocalReplicaInfoResponse,
) error {
// Artificially add an error that would cause the server to retry
// the replica info for node 1. Note that we only add an error after
// we return the first replica info for that node.
// This helps verify that the replica info gets discarded and
// the node is revisited (based on the logs).
if response != nil && response.ReplicaInfo.NodeID == 1 &&
failCount.Add(1) < 3 && failCount.Load() > 1 {
return errors.New("rpc stream stopped")
}
return nil
},
},
},
StoreSpecs: []base.StoreSpec{
{
@@ -532,6 +551,10 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
"--plan=" + planFile,
})
require.NoError(t, err, "failed to run make-plan")
require.Contains(t, out, "Started getting replica info for node_id:1",
"planner didn't log the visited nodes properly")
require.Contains(t, out, "Discarding replica info for node_id:1",
"planner didn't log the discarded nodes properly")
require.Contains(t, out, fmt.Sprintf("- node n%d", node1ID),
"planner didn't provide correct apply instructions")
require.FileExists(t, planFile, "generated plan file")
14 changes: 13 additions & 1 deletion pkg/kv/kvserver/loqrecovery/collect.go
@@ -13,6 +13,7 @@ package loqrecovery
import (
"cmp"
"context"
"fmt"
"io"
"math"
"slices"
@@ -46,8 +47,10 @@ type CollectionStats struct {
// maxConcurrency is the maximum parallelism that will be used when fanning out
// RPCs to nodes in the cluster. A value of 0 disables concurrency. A negative
// value configures no limit for concurrency.
// If logOutput is not nil, this function writes a message when a node is first
// visited and when a node needs to be revisited.
func CollectRemoteReplicaInfo(
ctx context.Context, c serverpb.AdminClient, maxConcurrency int,
ctx context.Context, c serverpb.AdminClient, maxConcurrency int, logOutput io.Writer,
) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) {
cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{
MaxConcurrency: int32(maxConcurrency),
@@ -71,13 +74,22 @@ func CollectRemoteReplicaInfo(
if r := info.GetReplicaInfo(); r != nil {
stores[r.StoreID] = struct{}{}
nodes[r.NodeID] = struct{}{}

if _, ok := replInfoMap[r.NodeID]; !ok && logOutput != nil {
_, _ = fmt.Fprintf(logOutput, "Started getting replica info for node_id:%d.\n", r.NodeID)
}
replInfoMap[r.NodeID] = append(replInfoMap[r.NodeID], *r)
} else if d := info.GetRangeDescriptor(); d != nil {
descriptors = append(descriptors, *d)
} else if s := info.GetNodeStreamRestarted(); s != nil {
// If server had to restart a fan-out work because of error and retried,
// then we discard partial data for the node.
delete(replInfoMap, s.NodeID)
if logOutput != nil {
_, _ = fmt.Fprintf(logOutput, "Discarding replica info for node_id:%d."+
"The node will be revisted.\n", s.NodeID)
}

} else if m := info.GetMetadata(); m != nil {
metadata = *m
} else {
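
As a usage note on the updated signature in this file's diff, the following hedged sketch shows how a caller with an already-established admin connection might pass a progress writer. The wrapper function and package name are hypothetical; the CollectRemoteReplicaInfo signature, the -1 /* maxConcurrency */ convention, and the option of passing nil to silence updates are taken from the diffs above.

```go
package loqrecoveryexample

import (
	"context"
	"io"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// collectWithProgress is a hypothetical wrapper; obtaining the AdminClient
// (the CLI does this through its admin connection) is out of scope here.
func collectWithProgress(
	ctx context.Context, adm serverpb.AdminClient, progress io.Writer,
) (loqrecoverypb.ClusterReplicaInfo, loqrecovery.CollectionStats, error) {
	// A negative maxConcurrency places no limit on the RPC fan-out; passing
	// nil instead of progress suppresses the per-node updates, as the tests do.
	return loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */, progress)
}
```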
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
@@ -255,7 +255,8 @@ func TestCollectLeaseholderStatus(t *testing.T) {
// Note: we need to retry because replica collection is not atomic and
// leaseholder could move around so we could see none or more than one.
testutils.SucceedsSoon(t, func() error {
replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
-1 /* maxConcurrency */, nil /* logOutput */)
require.NoError(t, err, "failed to collect replica info")

foundLeaseholders := 0
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/loqrecovery/server_integration_test.go
@@ -89,7 +89,8 @@ func TestReplicaCollection(t *testing.T) {
var replicas loqrecoverypb.ClusterReplicaInfo
var stats loqrecovery.CollectionStats

replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
-1 /* maxConcurrency */, nil /* logOutput */)
require.NoError(t, err, "failed to retrieve replica info")

// Check counters on retrieved replica info.
@@ -161,7 +162,8 @@ func TestStreamRestart(t *testing.T) {
var replicas loqrecoverypb.ClusterReplicaInfo
var stats loqrecovery.CollectionStats

replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
-1 /* maxConcurrency */, nil /* logOutput */)
require.NoError(t, err, "failed to retrieve replica info")

// Check counters on retrieved replica info.
@@ -613,7 +615,8 @@ func TestRetrieveApplyStatus(t *testing.T) {
var replicas loqrecoverypb.ClusterReplicaInfo
testutils.SucceedsSoon(t, func() error {
var err error
replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
-1 /* maxConcurrency */, nil /* logOutput */)
return err
})
plan, planDetails, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator)
@@ -695,7 +698,8 @@ func TestRejectBadVersionApplication(t *testing.T) {
var replicas loqrecoverypb.ClusterReplicaInfo
testutils.SucceedsSoon(t, func() error {
var err error
replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm,
-1 /* maxConcurrency */, nil /* logOutput */)
return err
})
plan, _, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator)
