diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md
index 8c9183f37cf0..2e9e67a2b1e9 100644
--- a/docs/generated/http/full.md
+++ b/docs/generated/http/full.md
@@ -7241,12 +7241,31 @@ Support status: [reserved](#support-status)
 | ----- | ---- | ----- | ----------- | -------------- |
 | range_descriptor | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.roachpb.RangeDescriptor) | | | [reserved](#support-status) |
 | replica_info | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaInfo](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaInfo) | | | [reserved](#support-status) |
+| node_stream_restarted | [RecoveryCollectReplicaRestartNodeStream](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.server.serverpb.RecoveryCollectReplicaRestartNodeStream) | | | [reserved](#support-status) |
+
+#### RecoveryCollectReplicaRestartNodeStream
+
+RecoveryCollectReplicaRestartNodeStream is sent by collector node to client
+if it experiences a transient failure collecting data from one of the nodes.
+This message instructs client to drop any data that it collected locally
+for specified node as streaming for this node would be restarted.
+This mechanism is needed to avoid restarting the whole collection procedure
+in large cluster if one of the nodes fails transiently.
+
+| Field | Type | Label | Description | Support status |
+| ----- | ---- | ----- | ----------- | -------------- |
+| node_id | [int32](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-int32) | | | [reserved](#support-status) |
+
+
+
+
+
 
 ## RecoveryCollectLocalReplicaInfo
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index 4668bd99d87d..9b408a6fbd60 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -203,6 +203,7 @@ ALL_TESTS = [
     "//pkg/kv/kvserver/intentresolver:intentresolver_test",
     "//pkg/kv/kvserver/liveness:liveness_test",
     "//pkg/kv/kvserver/logstore:logstore_test",
+    "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_test",
     "//pkg/kv/kvserver/loqrecovery:loqrecovery_test",
     "//pkg/kv/kvserver/multiqueue:multiqueue_test",
     "//pkg/kv/kvserver/protectedts/ptcache:ptcache_test",
@@ -1200,6 +1201,7 @@ GO_TARGETS = [
     "//pkg/kv/kvserver/logstore:logstore",
     "//pkg/kv/kvserver/logstore:logstore_test",
     "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb",
+    "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_test",
     "//pkg/kv/kvserver/loqrecovery:loqrecovery",
     "//pkg/kv/kvserver/loqrecovery:loqrecovery_test",
     "//pkg/kv/kvserver/multiqueue:multiqueue",
diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go
index ec3fc5dd9cef..ebafe1792d38 100644
--- a/pkg/base/testing_knobs.go
+++ b/pkg/base/testing_knobs.go
@@ -53,4 +53,5 @@ type TestingKnobs struct {
 	ExternalConnection ModuleTestingKnobs
 	EventExporter ModuleTestingKnobs
 	EventLog ModuleTestingKnobs
+	LOQRecovery ModuleTestingKnobs
 }
diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go
index a84bac95810f..779a1e95090b 100644
--- a/pkg/cli/cliflags/flags.go
+++ b/pkg/cli/cliflags/flags.go
@@ -1822,8 +1822,7 @@ See start --help for more flag details and examples.
 	}
 
 	ConfirmActions = FlagInfo{
-		Name:      "confirm",
-		Shorthand: "p",
+		Name: "confirm",
 		Description: `
Confirm action:
diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go
index f627ca144ad1..85812d9abc24 100644
--- a/pkg/cli/debug_recover_loss_of_quorum.go
+++ b/pkg/cli/debug_recover_loss_of_quorum.go
@@ -165,6 +165,13 @@ Now the cluster could be started again.
 	RunE: UsageAndErr,
 }
 
+var recoverCommands = []*cobra.Command{
+	debugRecoverCollectInfoCmd,
+	debugRecoverPlanCmd,
+	//debugRecoverStagePlan,
+	//debugRecoverVerify,
+}
+
 func init() {
 	debugRecoverCmd.AddCommand(
 		debugRecoverCollectInfoCmd,
@@ -174,18 +181,28 @@ func init() {
 
 var debugRecoverCollectInfoCmd = &cobra.Command{
 	Use:   "collect-info [destination-file]",
-	Short: "collect replica information from the given stores",
+	Short: "collect replica information from a cluster",
 	Long: `
-Collect information about replicas by reading data from underlying stores. Store
-locations must be provided using --store flags.
+Collect information about replicas in the cluster.
+
+The command can collect data from an online or an offline cluster.
+
+In the first case, the address of a single healthy cluster node must be provided
+using the --host flag. This designated node will handle collection of data from
+all surviving nodes.
+
+In the second case, data is read directly from local stores on each node.
+CockroachDB must not be running on any node. The location of each store must be
+provided using the --store flag. The command must be executed for all surviving
+stores.
+
+Multiple store locations can be provided to the command to collect all info
+from all stores on a node at once. It is also possible to run it once per
+store; in that case, all resulting files should be fed to the plan subcommand.
 
 Collected information is written to a destination file if file name is provided,
 or to stdout.
 
-Multiple store locations could be provided to the command to collect all info from
-node at once. It is also possible to call it per store, in that case all resulting
-files should be fed to plan subcommand.
-
 See debug recover command help for more details on how to use this command.
 `,
 	Args: cobra.MaximumNArgs(1),
@@ -197,28 +214,49 @@ var debugRecoverCollectInfoOpts struct {
 }
 
 func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
+	// We need a cancellable context here to obtain a gRPC client connection.
+	ctx, cancel := context.WithCancel(cmd.Context())
+	defer cancel()
 	stopper := stop.NewStopper()
-	defer stopper.Stop(cmd.Context())
+	defer stopper.Stop(ctx)
+
+	var replicaInfo loqrecoverypb.ClusterReplicaInfo
+	var stats loqrecovery.CollectionStats
 
-	var stores []storage.Engine
-	for _, storeSpec := range debugRecoverCollectInfoOpts.Stores.Specs {
-		db, err := OpenEngine(storeSpec.Path, stopper, storage.MustExist, storage.ReadOnly)
+	if len(debugRecoverCollectInfoOpts.Stores.Specs) == 0 {
+		c, finish, err := getAdminClient(ctx, serverCfg)
 		if err != nil {
-			return errors.Wrapf(err, "failed to open store at path %q, ensure that store path is "+
-				"correct and that it is not used by another process", storeSpec.Path)
+			return errors.Wrapf(err, "failed to get admin connection to cluster")
+		}
+		defer finish()
+		replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c)
+		if err != nil {
+			return errors.WithHint(errors.Wrap(err,
+				"failed to retrieve replica info from cluster"),
+				"Check cluster health and retry the operation.")
+		}
+	} else {
+		var stores []storage.Engine
+		for _, storeSpec := range debugRecoverCollectInfoOpts.Stores.Specs {
+			db, err := OpenEngine(storeSpec.Path, stopper, storage.MustExist, storage.ReadOnly)
+			if err != nil {
+				return errors.WithHint(errors.Wrapf(err,
+					"failed to open store at path %q", storeSpec.Path),
+					"Ensure that store path is correct and that it is not used by another process.")
+			}
+			stores = append(stores, db)
+		}
+		var err error
+		replicaInfo, stats, err = loqrecovery.CollectStoresReplicaInfo(ctx, stores)
+		if err != nil {
+			return errors.Wrapf(err, "failed to collect replica info from local stores")
 		}
-		stores = append(stores, db)
-	}
-
-	replicaInfo, err := loqrecovery.CollectReplicaInfo(cmd.Context(), stores)
-	if err != nil {
-		return err
 	}
 
 	var writer io.Writer = os.Stdout
 	if len(args) > 0 {
 		filename := args[0]
-		if _, err = os.Stat(filename); err == nil {
+		if _, err := os.Stat(filename); err == nil {
 			return errors.Newf("file %q already exists", filename)
 		}
 
@@ -230,14 +268,20 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
 		writer = outFile
 	}
 	jsonpb := protoutil.JSONPb{Indent: "  "}
-	var out []byte
-	if out, err = jsonpb.Marshal(replicaInfo); err != nil {
+	out, err := jsonpb.Marshal(&replicaInfo)
+	if err != nil {
 		return errors.Wrap(err, "failed to marshal collected replica info")
 	}
-	if _, err = writer.Write(out); err != nil {
+	if _, err := writer.Write(out); err != nil {
 		return errors.Wrap(err, "failed to write collected replica info")
 	}
-	_, _ = fmt.Fprintf(stderr, "Collected info about %d replicas.\n", len(replicaInfo.Replicas))
+	_, _ = fmt.Fprintf(stderr, `Collected recovery info from:
+nodes             %d
+stores            %d
+Collected info:
+replicas          %d
+range descriptors %d
+`, stats.Nodes, stats.Stores, replicaInfo.ReplicaCount(), stats.Descriptors)
 	return nil
 }
 
@@ -278,7 +322,7 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
 		deadStoreIDs = append(deadStoreIDs, roachpb.StoreID(id))
 	}
 
-	plan, report, err := loqrecovery.PlanReplicas(cmd.Context(), replicas, deadStoreIDs)
+	plan, report, err := loqrecovery.PlanReplicas(cmd.Context(), replicas.LocalInfo, deadStoreIDs)
 	if err != nil {
 		return err
 	}
@@ -377,7 +421,7 @@ Discarded live replicas: %d
 
 	jsonpb := protoutil.JSONPb{Indent: "  "}
 	var out []byte
-	if out, err = jsonpb.Marshal(plan); err != nil {
+	if out, err = jsonpb.Marshal(&plan); err != nil {
 		return errors.Wrap(err, "failed to marshal recovery plan")
 	}
 	if _, err = writer.Write(out); err != nil {
@@ -393,20 +437,25 @@ Discarded live replicas: %d
 	return nil
 }
 
-func readReplicaInfoData(fileNames []string) ([]loqrecoverypb.NodeReplicaInfo, error) {
-	var replicas []loqrecoverypb.NodeReplicaInfo
+func readReplicaInfoData(fileNames []string) (loqrecoverypb.ClusterReplicaInfo, error) {
+	var replicas loqrecoverypb.ClusterReplicaInfo
 	for _, filename := range fileNames {
 		data, err := os.ReadFile(filename)
 		if err != nil {
-			return nil, errors.Wrapf(err, "failed to read replica info file %q", filename)
+			return loqrecoverypb.ClusterReplicaInfo{}, errors.Wrapf(err, "failed to read replica info file %q", filename)
 		}
 
-		var nodeReplicas loqrecoverypb.NodeReplicaInfo
+		var nodeReplicas loqrecoverypb.ClusterReplicaInfo
 		jsonpb := protoutil.JSONPb{}
 		if err = jsonpb.Unmarshal(data, &nodeReplicas); err != nil {
-			return nil, errors.Wrapf(err, "failed to unmarshal replica info from file %q", filename)
+			return loqrecoverypb.ClusterReplicaInfo{}, errors.WithHint(errors.Wrapf(err,
+				"failed to unmarshal replica info from file %q", filename),
+				"Ensure that replica info file is generated with the same binary version and file is not corrupted.")
+		}
+		if err = replicas.Merge(nodeReplicas); err != nil {
+			return loqrecoverypb.ClusterReplicaInfo{}, errors.Wrapf(err,
+				"failed to merge replica info from file %q", filename)
 		}
-		replicas = append(replicas, nodeReplicas)
 	}
 	return replicas, nil
 }
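The file that collect-info writes and the plan subcommand reads back is simply the indented JSONPb encoding of ClusterReplicaInfo. For reference, a minimal standalone sketch of that round trip, assuming the loqrecoverypb and protoutil packages from this change (the package and helper names are illustrative, not part of the patch):

package loqexample

import (
	"os"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

// writeAndReadBack marshals a ClusterReplicaInfo to an indented JSON file and
// unmarshals it again, mirroring what collect-info and plan do on either side.
func writeAndReadBack(path string, info loqrecoverypb.ClusterReplicaInfo) (loqrecoverypb.ClusterReplicaInfo, error) {
	jsonpb := protoutil.JSONPb{Indent: "  "}
	out, err := jsonpb.Marshal(&info)
	if err != nil {
		return loqrecoverypb.ClusterReplicaInfo{}, err
	}
	if err := os.WriteFile(path, out, 0644); err != nil {
		return loqrecoverypb.ClusterReplicaInfo{}, err
	}
	data, err := os.ReadFile(path)
	if err != nil {
		return loqrecoverypb.ClusterReplicaInfo{}, err
	}
	var read loqrecoverypb.ClusterReplicaInfo
	err = jsonpb.Unmarshal(data, &read)
	return read, err
}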
diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go
index d69040b3f61c..3e32fe099fde 100644
--- a/pkg/cli/debug_recover_loss_of_quorum_test.go
+++ b/pkg/cli/debug_recover_loss_of_quorum_test.go
@@ -78,12 +78,73 @@ func TestCollectInfoFromMultipleStores(t *testing.T) {
 	replicas, err := readReplicaInfoData([]string{replicaInfoFileName})
 	require.NoError(t, err, "failed to read generated replica info")
 	stores := map[roachpb.StoreID]interface{}{}
-	for _, r := range replicas[0].Replicas {
+	for _, r := range replicas.LocalInfo[0].Replicas {
 		stores[r.StoreID] = struct{}{}
 	}
 	require.Equal(t, 2, len(stores), "collected replicas from stores")
 }
 
+// TestCollectInfoFromOnlineCluster verifies that, given a test cluster with
+// one stopped node, we can collect replica info and metadata from the
+// remaining nodes using an admin recovery call.
+func TestCollectInfoFromOnlineCluster(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	dir, cleanupFn := testutils.TempDir(t)
+	defer cleanupFn()
+
+	c := NewCLITest(TestCLIParams{
+		NoServer: true,
+	})
+	defer c.Cleanup()
+
+	tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			StoreSpecs: []base.StoreSpec{{InMemory: true}},
+			Insecure:   true,
+		},
+	})
+	tc.Start(t)
+	defer tc.Stopper().Stop(ctx)
+	require.NoError(t, tc.WaitForFullReplication())
+	tc.ToggleReplicateQueues(false)
+
+	r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases")
+	var totalRanges int
+	require.NoError(t, r.Scan(&totalRanges), "failed to query range count")
+
+	tc.StopServer(0)
+	replicaInfoFileName := dir + "/all-nodes.json"
+
+	c.RunWithArgs([]string{
+		"debug",
+		"recover",
+		"collect-info",
+		"--insecure",
+		"--host",
+		tc.Server(2).ServingRPCAddr(),
+		replicaInfoFileName,
+	})
+
+	replicas, err := readReplicaInfoData([]string{replicaInfoFileName})
+	require.NoError(t, err, "failed to read generated replica info")
+	stores := map[roachpb.StoreID]interface{}{}
+	totalReplicas := 0
+	for _, li := range replicas.LocalInfo {
+		for _, r := range li.Replicas {
+			stores[r.StoreID] = struct{}{}
+		}
+		totalReplicas += len(li.Replicas)
+	}
+	require.Equal(t, 2, len(stores), "collected replicas from stores")
+	require.Equal(t, 2, len(replicas.LocalInfo), "collected info is not split by node")
+	require.Equal(t, totalRanges*2, totalReplicas, "number of collected replicas")
+	require.Equal(t, totalRanges, len(replicas.Descriptors),
+		"number of collected descriptors from metadata")
+}
+
 // TestLossOfQuorumRecovery performs a sanity check on end to end recovery workflow.
 // This test doesn't try to validate all possible test cases, but instead check that
 // artifacts are correctly produced and overall cluster recovery could be performed
@@ -273,43 +334,48 @@ func createIntentOnRangeDescriptor(
 func TestJsonSerialization(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
-	nr := loqrecoverypb.NodeReplicaInfo{
-		Replicas: []loqrecoverypb.ReplicaInfo{
+	nr := loqrecoverypb.ClusterReplicaInfo{
+		ClusterID: "id1",
+		LocalInfo: []loqrecoverypb.NodeReplicaInfo{
 			{
-				NodeID:  1,
-				StoreID: 2,
-				Desc: roachpb.RangeDescriptor{
-					RangeID:  3,
-					StartKey: roachpb.RKey(keys.MetaMin),
-					EndKey:   roachpb.RKey(keys.MetaMax),
-					InternalReplicas: []roachpb.ReplicaDescriptor{
-						{
-							NodeID:    1,
-							StoreID:   2,
-							ReplicaID: 3,
-							Type:      roachpb.VOTER_INCOMING,
-						},
-					},
-					NextReplicaID: 4,
-					Generation:    7,
-				},
-				RaftAppliedIndex:   13,
-				RaftCommittedIndex: 19,
-				RaftLogDescriptorChanges: []loqrecoverypb.DescriptorChangeInfo{
+				Replicas: []loqrecoverypb.ReplicaInfo{
 					{
-						ChangeType: 1,
-						Desc:       &roachpb.RangeDescriptor{},
-						OtherDesc:  &roachpb.RangeDescriptor{},
+						NodeID:  1,
+						StoreID: 2,
+						Desc: roachpb.RangeDescriptor{
+							RangeID:  3,
+							StartKey: roachpb.RKey(keys.MetaMin),
+							EndKey:   roachpb.RKey(keys.MetaMax),
+							InternalReplicas: []roachpb.ReplicaDescriptor{
+								{
+									NodeID:    1,
+									StoreID:   2,
+									ReplicaID: 3,
+									Type:      roachpb.VOTER_INCOMING,
+								},
+							},
+							NextReplicaID: 4,
+							Generation:    7,
+						},
+						RaftAppliedIndex:   13,
+						RaftCommittedIndex: 19,
+						RaftLogDescriptorChanges: []loqrecoverypb.DescriptorChangeInfo{
+							{
+								ChangeType: 1,
+								Desc:       &roachpb.RangeDescriptor{},
+								OtherDesc:  &roachpb.RangeDescriptor{},
+							},
+						},
 					},
 				},
 			},
 		},
 	}
 	jsonpb := protoutil.JSONPb{Indent: "  "}
-	data, err := jsonpb.Marshal(nr)
+	data, err := jsonpb.Marshal(&nr)
 	require.NoError(t, err)
 
-	var nrFromJSON loqrecoverypb.NodeReplicaInfo
-	require.NoError(t, jsonpb.Unmarshal(data, &nrFromJSON))
-	require.Equal(t, nr, nrFromJSON, "objects before and after serialization")
+	var crFromJSON loqrecoverypb.ClusterReplicaInfo
+	require.NoError(t, jsonpb.Unmarshal(data, &crFromJSON))
+	require.Equal(t, nr, crFromJSON, "objects before and after serialization")
 }
diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go
index b242d2095a01..04cc68c36c38 100644
--- a/pkg/cli/flags.go
+++ b/pkg/cli/flags.go
@@ -618,6 +618,7 @@ func init() {
 	clientCmds = append(clientCmds, userFileCmds...)
 	clientCmds = append(clientCmds, stmtDiagCmds...)
 	clientCmds = append(clientCmds, debugResetQuorumCmd)
+	clientCmds = append(clientCmds, recoverCommands...)
 	for _, cmd := range clientCmds {
 		clientflags.AddBaseFlags(cmd, &cliCtx.clientOpts, &baseCfg.Insecure, &baseCfg.SSLCertsDir)
 
diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel
index efae010fa4e2..ffe773651e41 100644
--- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel
+++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel
@@ -8,22 +8,33 @@ go_library(
         "collect.go",
         "plan.go",
         "record.go",
+        "server.go",
+        "testing_knobs.go",
         "utils.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/base",
+        "//pkg/gossip",
         "//pkg/keys",
+        "//pkg/kv",
+        "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/kv/kvserver/kvstorage",
         "//pkg/kv/kvserver/loqrecovery/loqrecoverypb",
         "//pkg/kv/kvserver/raftlog",
         "//pkg/kv/kvserver/stateloader",
         "//pkg/roachpb",
+        "//pkg/rpc",
+        "//pkg/server/serverpb",
         "//pkg/storage",
+        "//pkg/util/contextutil",
+        "//pkg/util/grpcutil",
         "//pkg/util/hlc",
         "//pkg/util/log",
         "//pkg/util/protoutil",
+        "//pkg/util/retry",
         "//pkg/util/timeutil",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_errors//:errors",
@@ -39,6 +50,7 @@ go_test(
         "record_test.go",
         "recovery_env_test.go",
         "recovery_test.go",
+        "server_test.go",
     ],
     args = ["-test.timeout=295s"],
     data = glob(["testdata/**"]),
@@ -57,6 +69,7 @@ go_test(
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
         "//pkg/server",
+        "//pkg/server/serverpb",
         "//pkg/storage",
         "//pkg/storage/enginepb",
         "//pkg/testutils",
@@ -73,6 +86,7 @@ go_test(
         "//pkg/util/uuid",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_cockroachdb_errors//:errors",
+        "@com_github_google_uuid//:uuid",
         "@com_github_stretchr_testify//require",
         "@in_gopkg_yaml_v2//:yaml_v2",
         "@io_etcd_go_raft_v3//raftpb",
diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go
index 4ba2a87b6618..6bac3c4e5146 100644
--- a/pkg/kv/kvserver/loqrecovery/collect.go
+++ b/pkg/kv/kvserver/loqrecovery/collect.go
@@ -12,6 +12,7 @@ package loqrecovery
 
 import (
 	"context"
+	"io"
 	"math"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
@@ -19,61 +20,158 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"go.etcd.io/raft/v3/raftpb"
 )
 
-// CollectReplicaInfo captures states of all replicas in all stores for the sake of quorum recovery.
-func CollectReplicaInfo(
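+// CollectionStats summarizes how many nodes, stores, and range descriptors
+// a collection run covered.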
+type CollectionStats struct {
+	Nodes       int
+	Stores      int
+	Descriptors int
+}
+
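+// CollectRemoteReplicaInfo retrieves replica info and range descriptors from
+// a live cluster using the admin client's recovery streams.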
+func CollectRemoteReplicaInfo(
+	ctx context.Context, c serverpb.AdminClient,
+) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) {
+	cInfo, err := c.Cluster(ctx, &serverpb.ClusterRequest{})
+	if err != nil {
+		return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
+	}
+	cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{})
+	if err != nil {
+		return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
+	}
+	stores := make(map[roachpb.StoreID]struct{})
+	nodes := make(map[roachpb.NodeID]struct{})
+	var descriptors []roachpb.RangeDescriptor
+	var clusterReplInfo []loqrecoverypb.NodeReplicaInfo
+	var nodeReplicas []loqrecoverypb.ReplicaInfo
+	var currentNode roachpb.NodeID
+	for {
+		info, err := cc.Recv()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				if len(nodeReplicas) > 0 {
+					clusterReplInfo = append(clusterReplInfo, loqrecoverypb.NodeReplicaInfo{Replicas: nodeReplicas})
+				}
+				break
+			}
+			return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
+		}
+		if r := info.GetReplicaInfo(); r != nil {
+			if currentNode != r.NodeID {
+				currentNode = r.NodeID
+				if len(nodeReplicas) > 0 {
+					clusterReplInfo = append(clusterReplInfo, loqrecoverypb.NodeReplicaInfo{Replicas: nodeReplicas})
+					nodeReplicas = nil
+				}
+			}
+			nodeReplicas = append(nodeReplicas, *r)
+			stores[r.StoreID] = struct{}{}
+			nodes[r.NodeID] = struct{}{}
+		} else if d := info.GetRangeDescriptor(); d != nil {
+			descriptors = append(descriptors, *d)
+		} else if s := info.GetNodeStreamRestarted(); s != nil {
+			// If the server had to restart fan-out work for this node because of an
+			// error and retried, discard the partial data collected for it.
+			if s.NodeID == currentNode {
+				nodeReplicas = nil
+			}
+		}
+	}
+	return loqrecoverypb.ClusterReplicaInfo{
+			ClusterID:   cInfo.ClusterID,
+			Descriptors: descriptors,
+			LocalInfo:   clusterReplInfo,
+		}, CollectionStats{
+			Nodes:       len(nodes),
+			Stores:      len(stores),
+			Descriptors: len(descriptors),
+		}, nil
+}
+
+// CollectStoresReplicaInfo captures states of all replicas in all stores for the sake of quorum recovery.
+func CollectStoresReplicaInfo(
 	ctx context.Context, stores []storage.Engine,
-) (loqrecoverypb.NodeReplicaInfo, error) {
+) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) {
 	if len(stores) == 0 {
-		return loqrecoverypb.NodeReplicaInfo{}, errors.New("no stores were provided for info collection")
+		return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.New("no stores were provided for info collection")
 	}
-
+	var clusterUUID uuid.UUID
+	nodes := make(map[roachpb.NodeID]struct{})
 	var replicas []loqrecoverypb.ReplicaInfo
-	for _, reader := range stores {
-		storeIdent, err := kvstorage.ReadStoreIdent(ctx, reader)
+	for i, reader := range stores {
+		ident, err := kvstorage.ReadStoreIdent(ctx, reader)
 		if err != nil {
-			return loqrecoverypb.NodeReplicaInfo{}, err
+			return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
 		}
-		if err = kvstorage.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error {
-			rsl := stateloader.Make(desc.RangeID)
-			rstate, err := rsl.Load(ctx, reader, &desc)
-			if err != nil {
-				return err
-			}
-			hstate, err := rsl.LoadHardState(ctx, reader)
-			if err != nil {
-				return err
-			}
-			// Check raft log for un-applied range descriptor changes. We start from
-			// applied+1 (inclusive) and read until the end of the log. We also look
-			// at potentially uncommitted entries as we have no way to determine their
-			// outcome, and they will become committed as soon as the replica is
-			// designated as a survivor.
-			rangeUpdates, err := GetDescriptorChangesFromRaftLog(desc.RangeID,
-				rstate.RaftAppliedIndex+1, math.MaxInt64, reader)
-			if err != nil {
-				return err
-			}
+		if i == 0 {
+			clusterUUID = ident.ClusterID
+		}
+		if !ident.ClusterID.Equal(clusterUUID) {
+			return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.New("can't collect info from stores that belong to different clusters")
+		}
+		nodes[ident.NodeID] = struct{}{}
+		if err := visitStoreReplicas(ctx, reader, ident.StoreID, ident.NodeID,
+			func(info loqrecoverypb.ReplicaInfo) error {
+				replicas = append(replicas, info)
+				return nil
+			}); err != nil {
+			return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
+		}
+	}
+	return loqrecoverypb.ClusterReplicaInfo{
+			ClusterID: clusterUUID.String(),
+			LocalInfo: []loqrecoverypb.NodeReplicaInfo{{Replicas: replicas}},
+		}, CollectionStats{
+			Nodes:  len(nodes),
+			Stores: len(stores),
+		}, nil
+}
 
-			replicaData := loqrecoverypb.ReplicaInfo{
-				StoreID:                  storeIdent.StoreID,
-				NodeID:                   storeIdent.NodeID,
-				Desc:                     desc,
-				RaftAppliedIndex:         rstate.RaftAppliedIndex,
-				RaftCommittedIndex:       hstate.Commit,
-				RaftLogDescriptorChanges: rangeUpdates,
-			}
-			replicas = append(replicas, replicaData)
-			return nil
-		}); err != nil {
-			return loqrecoverypb.NodeReplicaInfo{}, err
+func visitStoreReplicas(
+	ctx context.Context,
+	reader storage.Reader,
+	storeID roachpb.StoreID,
+	nodeID roachpb.NodeID,
+	send func(info loqrecoverypb.ReplicaInfo) error,
+) error {
+	if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error {
+		rsl := stateloader.Make(desc.RangeID)
+		rstate, err := rsl.Load(ctx, reader, &desc)
+		if err != nil {
+			return err
 		}
+		hstate, err := rsl.LoadHardState(ctx, reader)
+		if err != nil {
+			return err
+		}
+		// Check raft log for un-applied range descriptor changes. We start from
+		// applied+1 (inclusive) and read until the end of the log. We also look
+		// at potentially uncommitted entries as we have no way to determine their
+		// outcome, and they will become committed as soon as the replica is
+		// designated as a survivor.
+		rangeUpdates, err := GetDescriptorChangesFromRaftLog(desc.RangeID,
+			rstate.RaftAppliedIndex+1, math.MaxInt64, reader)
+		if err != nil {
+			return err
+		}
+
+		return send(loqrecoverypb.ReplicaInfo{
+			StoreID:                  storeID,
+			NodeID:                   nodeID,
+			Desc:                     desc,
+			RaftAppliedIndex:         rstate.RaftAppliedIndex,
+			RaftCommittedIndex:       hstate.Commit,
+			RaftLogDescriptorChanges: rangeUpdates,
+		})
+	}); err != nil {
+		return err
 	}
-	return loqrecoverypb.NodeReplicaInfo{Replicas: replicas}, nil
+	return nil
 }
 
 // GetDescriptorChangesFromRaftLog iterates over raft log between indices
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/BUILD.bazel
index c75c3d4fff71..1aa1667ac2d6 100644
--- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/BUILD.bazel
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/BUILD.bazel
@@ -1,5 +1,5 @@
 load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 # gazelle:go_grpc_compilers //pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_grpc_compiler,  @com_github_grpc_ecosystem_grpc_gateway//protoc-gen-grpc-gateway:go_gen_grpc_gateway
 
@@ -49,4 +49,16 @@ go_library(
     ],
 )
 
+go_test(
+    name = "loqrecoverypb_test",
+    srcs = ["recovery_test.go"],
+    args = ["-test.timeout=295s"],
+    embed = [":loqrecoverypb"],
+    deps = [
+        "//pkg/roachpb",
+        "//pkg/util/uuid",
+        "@com_github_stretchr_testify//require",
+    ],
+)
+
 get_x_data(name = "get_x_data")
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
index 670ede19992a..fff394377321 100644
--- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
@@ -93,3 +93,50 @@ func (m *ReplicaRecoveryRecord) AsStructuredLog() eventpb.DebugRecoverReplica {
 		EndKey:            m.EndKey.AsRKey().String(),
 	}
 }
+
+func (m *ClusterReplicaInfo) Merge(o ClusterReplicaInfo) error {
+	// When checking the cluster id, make sure that an empty cluster info can
+	// have everything merged into it, i.e. merging into an empty struct should
+	// not trip the check.
+	if len(m.LocalInfo) > 0 || len(m.Descriptors) > 0 {
+		if m.ClusterID != o.ClusterID {
+			return errors.Newf("can't merge cluster info from different cluster: %s != %s", m.ClusterID,
+				o.ClusterID)
+		}
+	} else {
+		m.ClusterID = o.ClusterID
+	}
+	if len(o.Descriptors) > 0 {
+		if len(m.Descriptors) > 0 {
+			return errors.New("only single cluster replica info could contain descriptors")
+		}
+		m.Descriptors = append(m.Descriptors, o.Descriptors...)
+	}
+	type nsk struct {
+		n roachpb.NodeID
+		s roachpb.StoreID
+	}
+	existing := make(map[nsk]struct{})
+	for _, n := range m.LocalInfo {
+		for _, r := range n.Replicas {
+			existing[nsk{n: r.NodeID, s: r.StoreID}] = struct{}{}
+		}
+	}
+	for _, n := range o.LocalInfo {
+		for _, r := range n.Replicas {
+			if _, ok := existing[nsk{n: r.NodeID, s: r.StoreID}]; ok {
+				return errors.Newf("failed to merge cluster info, replicas from n%d/s%d are already present",
+					r.NodeID, r.StoreID)
+			}
+		}
+	}
+	m.LocalInfo = append(m.LocalInfo, o.LocalInfo...)
+	return nil
+}
+
+func (m *ClusterReplicaInfo) ReplicaCount() (size int) {
+	for _, i := range m.LocalInfo {
+		size += len(i.Replicas)
+	}
+	return size
+}
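A compact, standalone usage sketch of Merge and ReplicaCount (illustrative only; the cluster ID and node/store IDs are made up) showing how two disjoint per-store collections combine, which is the situation the per-store collect-info workflow produces:

package loqexample

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
)

// mergeExample combines two independently collected infos. Merge fails if the
// cluster IDs differ, if a node/store pair appears on both sides, or if both
// sides carry range descriptors.
func mergeExample() error {
	a := loqrecoverypb.ClusterReplicaInfo{
		ClusterID: "0e3d26b8-0000-4000-8000-000000000001", // illustrative
		LocalInfo: []loqrecoverypb.NodeReplicaInfo{
			{Replicas: []loqrecoverypb.ReplicaInfo{{NodeID: 1, StoreID: 1}}},
		},
	}
	b := loqrecoverypb.ClusterReplicaInfo{
		ClusterID: "0e3d26b8-0000-4000-8000-000000000001",
		LocalInfo: []loqrecoverypb.NodeReplicaInfo{
			{Replicas: []loqrecoverypb.ReplicaInfo{{NodeID: 2, StoreID: 3}}},
		},
	}
	if err := a.Merge(b); err != nil {
		return err
	}
	fmt.Println(a.ReplicaCount()) // 2
	return nil
}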
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
index 460640ed2adb..292d2de65596 100644
--- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
@@ -51,11 +51,30 @@ message ReplicaInfo {
     (gogoproto.jsontag) = "raft_log_descriptor_changes,omitempty"];
 }
 
-// Collection of replica information gathered from a collect-info run on a single node.
+// Collection of replica information gathered in a collect-info run.
+// The ReplicaInfo entries in Replicas do not have to be constrained to a
+// single node, but in practice info collected from a remote cluster will be
+// grouped per node. In the case of offline collection, replicas will belong to
+// all stores provided to the command regardless of the owning node.
 message NodeReplicaInfo {
   repeated ReplicaInfo replicas = 1 [(gogoproto.nullable) = false];
 }
 
+// Replica info collected from one or more nodes of a cluster.
+message ClusterReplicaInfo {
+  // ClusterID contains id of the cluster from which info was collected.
+  string cluster_id = 1 [(gogoproto.customname) = "ClusterID"];
+  // Descriptors contains range descriptors collected from meta ranges.
+  // Descriptors are optional and only present in collected info if system
+  // ranges didn't lose quorum. They could also be partial in some cases.
+  repeated roachpb.RangeDescriptor descriptors = 2 [(gogoproto.nullable) = false,
+    (gogoproto.jsontag) = "descriptors,omitempty"];
+  // LocalInfo contains one or more NodeReplicaInfo structs, each containing a
+  // subset of the full info. They are not guaranteed to be split by node, but
+  // each element should contain a disjoint subset of replica infos.
+  repeated NodeReplicaInfo local_info = 3 [(gogoproto.nullable) = false];
+}
+
 // ReplicaUpdate contains information that needs to be updated on replica on the node
 // to make it a designated survivor so that replica could act as a source of truth when
 // doing loss of quorum recovery.
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go
new file mode 100644
index 000000000000..2db4c86c7238
--- /dev/null
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go
@@ -0,0 +1,172 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package loqrecoverypb
+
+import (
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+func TestClusterInfoMergeChecksOverlapping(t *testing.T) {
+	info123 := ClusterReplicaInfo{
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+	info1 := ClusterReplicaInfo{
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info12 := ClusterReplicaInfo{
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info3 := ClusterReplicaInfo{
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+
+	require.Error(t, info123.Merge(info1))
+	require.Error(t, info123.Merge(info12))
+	require.Error(t, info123.Merge(info3))
+
+	_ = info1.Merge(info3)
+	require.EqualValues(t, info1, info123)
+}
+
+func TestClusterInfoMergeChecksDescriptor(t *testing.T) {
+	info1 := ClusterReplicaInfo{
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info3 := ClusterReplicaInfo{
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 2}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+	require.Error(t, info1.Merge(info3))
+}
+
+func TestClusterInfoMergeSameClusterID(t *testing.T) {
+	uuid1 := uuid.FastMakeV4()
+	info1 := ClusterReplicaInfo{
+		ClusterID:   uuid1.String(),
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info3 := ClusterReplicaInfo{
+		ClusterID: uuid1.String(),
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+	require.NoError(t, info1.Merge(info3),
+		"should be able to merge partial info with equal cluster ids")
+}
+
+func TestClusterInfoMergeRejectDifferentClusterIDs(t *testing.T) {
+	uuid1 := uuid.FastMakeV4()
+	uuid2 := uuid.FastMakeV4()
+	info1 := ClusterReplicaInfo{
+		ClusterID:   uuid1.String(),
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info3 := ClusterReplicaInfo{
+		ClusterID: uuid2.String(),
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+	require.Error(t, info1.Merge(info3), "reject merging of info from different clusters")
+}
+
+func TestClusterInfoInitializeByMerge(t *testing.T) {
+	uuid1 := uuid.FastMakeV4().String()
+	info := ClusterReplicaInfo{
+		ClusterID:   uuid1,
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	empty := ClusterReplicaInfo{}
+	require.NoError(t, empty.Merge(info), "should be able to merge into empty struct")
+	require.Equal(t, empty.ClusterID, uuid1, "merge should update empty info fields")
+}
diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go
index 99a56e54a6a7..c26cd3a57369 100644
--- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go
+++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go
@@ -135,10 +135,11 @@ type wrappedStore struct {
 
 type quorumRecoveryEnv struct {
 	// Stores with data
-	stores map[roachpb.StoreID]wrappedStore
+	clusterID uuid.UUID
+	stores    map[roachpb.StoreID]wrappedStore
 
 	// Collected info from nodes
-	replicas []loqrecoverypb.NodeReplicaInfo
+	replicas loqrecoverypb.ClusterReplicaInfo
 
 	// plan to update replicas
 	plan loqrecoverypb.ReplicaUpdatePlan
@@ -190,6 +191,7 @@ func (e *quorumRecoveryEnv) handleReplicationData(t *testing.T, d datadriven.Tes
 	// Close existing stores in case we have multiple use cases within a data file.
 	e.cleanupStores()
 	e.stores = make(map[roachpb.StoreID]wrappedStore)
+	e.clusterID = uuid.MakeV4()
 
 	// Load yaml from data into local range info.
 	var replicaData []testReplicaInfo
@@ -390,7 +392,7 @@ func parsePrettyKey(t *testing.T, pretty string) roachpb.RKey {
 
 func (e *quorumRecoveryEnv) handleMakePlan(t *testing.T, d datadriven.TestData) (string, error) {
 	stores := e.parseStoresArg(t, d, false /* defaultToAll */)
-	plan, report, err := PlanReplicas(context.Background(), e.replicas, stores)
+	plan, report, err := PlanReplicas(context.Background(), e.replicas.LocalInfo, stores)
 	if err != nil {
 		return "", err
 	}
@@ -424,7 +426,7 @@ func (e *quorumRecoveryEnv) getOrCreateStore(
 			t.Fatalf("failed to crate in mem store: %v", err)
 		}
 		sIdent := roachpb.StoreIdent{
-			ClusterID: uuid.MakeV4(),
+			ClusterID: e.clusterID,
 			NodeID:    nodeID,
 			StoreID:   storeID,
 		}
@@ -447,13 +449,15 @@ func (e *quorumRecoveryEnv) handleCollectReplicas(
 	stores := e.parseStoresArg(t, d, true /* defaultToAll */)
 	nodes := e.groupStoresByNode(t, stores)
 	// save collected results into environment
-	e.replicas = nil
+	e.replicas = loqrecoverypb.ClusterReplicaInfo{}
 	for _, nodeStores := range nodes {
-		info, err := CollectReplicaInfo(ctx, nodeStores)
+		info, _, err := CollectStoresReplicaInfo(ctx, nodeStores)
 		if err != nil {
 			return "", err
 		}
-		e.replicas = append(e.replicas, info)
+		if err = e.replicas.Merge(info); err != nil {
+			return "", err
+		}
 	}
 	return "ok", nil
 }
diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go
new file mode 100644
index 000000000000..e9e2a026c87d
--- /dev/null
+++ b/pkg/kv/kvserver/loqrecovery/server.go
@@ -0,0 +1,273 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package loqrecovery
+
+import (
+	"context"
+	"io"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/gossip"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/rpc"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
+	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/errors"
+)
+
+const rangeMetadataScanChunkSize = 100
+
+var replicaInfoStreamRetryOptions = retry.Options{
+	MaxRetries:     3,
+	InitialBackoff: time.Second,
+	Multiplier:     1,
+}
+
+var errMarkRetry = errors.New("retryable")
+
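+// IsRetryableError returns true if err was marked as retryable by the
+// collection fan-out, meaning the caller may retry the failed node.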
+func IsRetryableError(err error) bool {
+	return errors.Is(err, errMarkRetry)
+}
+
+type visitNodesFn func(ctx context.Context, retryOpts retry.Options,
+	visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error,
+) error
+
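+// Server implements the server-side endpoints used to collect loss of quorum
+// recovery info from a cluster.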
+type Server struct {
+	stores               *kvserver.Stores
+	visitNodes           visitNodesFn
+	metadataQueryTimeout time.Duration
+	forwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error
+}
+
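+// NewServer creates a loss of quorum recovery server backed by the node's
+// stores, gossip, and RPC context, honoring any testing knobs provided.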
+func NewServer(
+	stores *kvserver.Stores,
+	g *gossip.Gossip,
+	loc roachpb.Locality,
+	rpcCtx *rpc.Context,
+	knobs base.ModuleTestingKnobs,
+) *Server {
+	// Server-side timeouts are necessary in the recovery collector since it does
+	// best-effort work: cluster info collection as an operation succeeds even if
+	// some parts of it time out.
+	metadataQueryTimeout := 1 * time.Minute
+	var forwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error
+	if rk, ok := knobs.(*TestingKnobs); ok {
+		if rk.MetadataScanTimeout > 0 {
+			metadataQueryTimeout = rk.MetadataScanTimeout
+		}
+		forwardReplicaFilter = rk.ForwardReplicaFilter
+	}
+	return &Server{
+		stores:               stores,
+		visitNodes:           makeVisitAvailableNodes(g, loc, rpcCtx),
+		metadataQueryTimeout: metadataQueryTimeout,
+		forwardReplicaFilter: forwardReplicaFilter,
+	}
+}
+
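+// ServeLocalReplicas streams replica info for all of the node's local stores
+// back to the caller.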
+func (s Server) ServeLocalReplicas(
+	ctx context.Context,
+	_ *serverpb.RecoveryCollectLocalReplicaInfoRequest,
+	stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
+) error {
+	return s.stores.VisitStores(func(s *kvserver.Store) error {
+		reader := s.Engine().NewSnapshot()
+		defer reader.Close()
+		return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(),
+			func(info loqrecoverypb.ReplicaInfo) error {
+				return stream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info})
+			})
+	})
+}
+
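+// ServeClusterReplicas streams range descriptors read from meta ranges and
+// then fans out to all live nodes, forwarding their local replica info to the
+// caller.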
+func (s Server) ServeClusterReplicas(
+	ctx context.Context,
+	_ *serverpb.RecoveryCollectReplicaInfoRequest,
+	outStream serverpb.Admin_RecoveryCollectReplicaInfoServer,
+	kvDB *kv.DB,
+) (err error) {
+	var (
+		descriptors, nodes, replicas int
+	)
+	defer func() {
+		if err == nil {
+			log.Infof(ctx, "streamed info: range descriptors %d, nodes %d, replica infos %d", descriptors,
+				nodes, replicas)
+		}
+	}()
+
+	err = contextutil.RunWithTimeout(ctx, "scan-range-descriptors", s.metadataQueryTimeout,
+		func(txnCtx context.Context) error {
+			txn := kvDB.NewTxn(txnCtx, "scan-range-descriptors")
+			if err := txn.SetFixedTimestamp(txnCtx, kvDB.Clock().Now()); err != nil {
+				return err
+			}
+			defer func() { _ = txn.Rollback(txnCtx) }()
+			log.Infof(txnCtx, "serving recovery range descriptors for all ranges")
+			return txn.Iterate(txnCtx, keys.Meta2Prefix, keys.MetaMax, rangeMetadataScanChunkSize,
+				func(kvs []kv.KeyValue) error {
+					for _, rangeDescKV := range kvs {
+						var rangeDesc roachpb.RangeDescriptor
+						if err := rangeDescKV.ValueProto(&rangeDesc); err != nil {
+							return err
+						}
+						if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
+							Info: &serverpb.RecoveryCollectReplicaInfoResponse_RangeDescriptor{
+								RangeDescriptor: &rangeDesc,
+							},
+						}); err != nil {
+							return err
+						}
+						descriptors++
+					}
+					return nil
+				})
+		})
+	if err != nil {
+		// An error here means either a kv transaction error or a stream send error.
+		// We don't care about transaction errors because the cluster might be in a
+		// crippled state, but we don't want to continue if the client stream is
+		// closed.
+		if outStream.Context().Err() != nil {
+			return err
+		}
+		log.Infof(ctx, "failed to iterate all descriptors: %s", err)
+	}
+
+	// Stream local replica info from all nodes wrapping them in response stream.
+	return s.visitNodes(ctx,
+		replicaInfoStreamRetryOptions,
+		func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
+			log.Infof(ctx, "trying to get info from node n%d", nodeID)
+			nodeReplicas := 0
+			inStream, err := client.RecoveryCollectLocalReplicaInfo(ctx,
+				&serverpb.RecoveryCollectLocalReplicaInfoRequest{})
+			if err != nil {
+				return errors.Mark(errors.Wrapf(err,
+					"failed retrieving replicas from node n%d during fan-out", nodeID), errMarkRetry)
+			}
+			for {
+				r, err := inStream.Recv()
+				if err == io.EOF {
+					break
+				}
+				if s.forwardReplicaFilter != nil {
+					err = s.forwardReplicaFilter(r)
+				}
+				if err != nil {
+					// Some replicas were already sent back, so we need to notify the client
+					// of the stream restart.
+					if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
+						Info: &serverpb.RecoveryCollectReplicaInfoResponse_NodeStreamRestarted{
+							NodeStreamRestarted: &serverpb.RecoveryCollectReplicaRestartNodeStream{
+								NodeID: nodeID,
+							},
+						},
+					}); err != nil {
+						return err
+					}
+					return errors.Mark(errors.Wrapf(err,
+						"failed retrieving replicas from node n%d during fan-out",
+						nodeID), errMarkRetry)
+				}
+				if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
+					Info: &serverpb.RecoveryCollectReplicaInfoResponse_ReplicaInfo{
+						ReplicaInfo: r.ReplicaInfo,
+					},
+				}); err != nil {
+					return err
+				}
+				nodeReplicas++
+			}
+
+			replicas += nodeReplicas
+			nodes++
+			return nil
+		})
+}
+
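+// makeVisitAvailableNodes returns a visitNodesFn that walks node descriptors
+// found in gossip, skipping unreachable nodes and retrying transient failures.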
+func makeVisitAvailableNodes(
+	g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context,
+) visitNodesFn {
+	return func(ctx context.Context, retryOpts retry.Options,
+		visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error,
+	) error {
+		collectNodeWithRetry := func(node roachpb.NodeDescriptor) error {
+			var err error
+			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+				log.Infof(ctx, "visiting node n%d, attempt %d", node.NodeID, r.CurrentAttempt())
+				addr := node.AddressForLocality(loc)
+				conn, err := rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).Connect(ctx)
+				client := serverpb.NewAdminClient(conn)
+				// The node list may contain dead nodes that we don't need to visit. We
+				// can skip them and let the caller handle the incomplete info.
+				if err != nil {
+					if grpcutil.IsConnectionUnavailable(err) {
+						return nil
+					}
+					// This was an initial heartbeat-type error; retry, since the node
+					// seems to be live.
+					continue
+				}
+				err = visitor(node.NodeID, client)
+				if err == nil {
+					return nil
+				}
+				log.Infof(ctx, "failed calling a visitor for node n%d: %s", node.NodeID, err)
+				if !IsRetryableError(err) {
+					// For non retryable errors abort immediately.
+					return err
+				}
+			}
+			return err
+		}
+
+		var nodes []roachpb.NodeDescriptor
+		if err := g.IterateInfos(gossip.KeyNodeDescPrefix, func(key string, i gossip.Info) error {
+			b, err := i.Value.GetBytes()
+			if err != nil {
+				return errors.Wrapf(err, "failed to get node gossip info for key %s", key)
+			}
+
+			var d roachpb.NodeDescriptor
+			if err := protoutil.Unmarshal(b, &d); err != nil {
+				return errors.Wrapf(err, "failed to unmarshal node gossip info for key %s", key)
+			}
+
+			// Don't use node descriptors with NodeID 0, because that's meant to
+			// indicate that the node has been removed from the cluster.
+			if d.NodeID != 0 {
+				nodes = append(nodes, d)
+			}
+
+			return nil
+		}); err != nil {
+			return err
+		}
+
+		for _, node := range nodes {
+			if err := collectNodeWithRetry(node); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+}
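The fan-out loop above separates transient failures from hard errors by marking them with errMarkRetry; only marked errors cause the node visit to be retried. A minimal standalone sketch of that marking pattern with cockroachdb/errors (the names here are illustrative, not the patch's API):

package loqexample

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// errRetryable is a sentinel used purely as a marker. Callers never compare
// against it directly; they ask errors.Is whether an error carries the mark.
var errRetryable = errors.New("retryable")

func isRetryable(err error) bool {
	return errors.Is(err, errRetryable)
}

func classify() {
	// A transient fan-out failure is wrapped for context and marked.
	transient := errors.Mark(
		errors.Wrap(errors.New("connection reset"), "failed retrieving replicas during fan-out"),
		errRetryable)
	// A permission failure is left unmarked, so a visit loop would abort.
	hard := errors.New("permission denied")

	fmt.Println(isRetryable(transient)) // true
	fmt.Println(isRetryable(hard))      // false
}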
diff --git a/pkg/kv/kvserver/loqrecovery/server_test.go b/pkg/kv/kvserver/loqrecovery/server_test.go
new file mode 100644
index 000000000000..3e8a6c7a22fe
--- /dev/null
+++ b/pkg/kv/kvserver/loqrecovery/server_test.go
@@ -0,0 +1,182 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package loqrecovery_test
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+type clusterInfoCounters struct {
+	nodes, stores, replicas, descriptors int
+}
+
+func TestReplicaCollection(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			StoreSpecs: []base.StoreSpec{{InMemory: true}},
+			Insecure:   true,
+			Knobs: base.TestingKnobs{
+				LOQRecovery: &loqrecovery.TestingKnobs{
+					MetadataScanTimeout: 15 * time.Second,
+				},
+			},
+		},
+	})
+	tc.Start(t)
+	defer tc.Stopper().Stop(ctx)
+	require.NoError(t, tc.WaitForFullReplication())
+	tc.ToggleReplicateQueues(false)
+
+	r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases")
+	var totalRanges int
+	require.NoError(t, r.Scan(&totalRanges), "failed to query range count")
+	adm, err := tc.GetAdminClient(ctx, t, 2)
+	require.NoError(t, err, "failed to get admin client")
+
+	// Collect and assert replica metadata. For the expectMeta case we sometimes
+	// have meta and sometimes don't, depending on which node holds the lease.
+	// We just ignore descriptor counts if we are not expecting meta.
+	assertReplicas := func(liveNodes int, expectMeta bool) {
+		var replicas loqrecoverypb.ClusterReplicaInfo
+		var stats loqrecovery.CollectionStats
+
+		replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
+		require.NoError(t, err, "failed to retrieve replica info")
+
+		// Check counters on retrieved replica info.
+		cnt := getInfoCounters(replicas)
+		require.Equal(t, liveNodes, cnt.stores, "collected replicas from stores")
+		require.Equal(t, liveNodes, cnt.nodes, "collected replicas from nodes")
+		if expectMeta {
+			require.Equal(t, totalRanges, cnt.descriptors,
+				"number of collected descriptors from metadata")
+		}
+		require.Equal(t, totalRanges*liveNodes, cnt.replicas, "number of collected replicas")
+		// Check stats counters as well.
+		require.Equal(t, liveNodes, stats.Nodes, "node counter stats")
+		require.Equal(t, liveNodes, stats.Stores, "store counter stats")
+		if expectMeta {
+			require.Equal(t, totalRanges, stats.Descriptors, "range descriptor counter stats")
+		}
+		require.NotEqual(t, replicas.ClusterID, uuid.UUID{}.String(), "cluster UUID must not be empty")
+	}
+
+	tc.StopServer(0)
+	assertReplicas(2, true)
+	tc.StopServer(1)
+	assertReplicas(1, false)
+
+	tc.Stopper().Stop(ctx)
+}
+
+// TestStreamRestart verifies that if the connection is dropped midway through
+// the replica stream, it is handled correctly with a stream restart that
+// allows the caller to rewind partial replica data and receive a consistent
+// stream of replica infos.
+func TestStreamRestart(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	var failCount atomic.Int64
+	tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			StoreSpecs: []base.StoreSpec{{InMemory: true}},
+			Insecure:   true,
+			Knobs: base.TestingKnobs{
+				LOQRecovery: &loqrecovery.TestingKnobs{
+					MetadataScanTimeout: 15 * time.Second,
+					ForwardReplicaFilter: func(response *serverpb.RecoveryCollectLocalReplicaInfoResponse) error {
+						if response.ReplicaInfo.NodeID == 2 && response.ReplicaInfo.Desc.RangeID == 14 && failCount.Add(1) < 3 {
+							return errors.New("rpc stream stopped")
+						}
+						return nil
+					},
+				},
+			},
+		},
+	})
+	tc.Start(t)
+	defer tc.Stopper().Stop(ctx)
+	require.NoError(t, tc.WaitForFullReplication())
+	tc.ToggleReplicateQueues(false)
+
+	r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases")
+	var totalRanges int
+	require.NoError(t, r.Scan(&totalRanges), "failed to query range count")
+	adm, err := tc.GetAdminClient(ctx, t, 2)
+	require.NoError(t, err, "failed to get admin client")
+
+	assertReplicas := func(liveNodes int) {
+		var replicas loqrecoverypb.ClusterReplicaInfo
+		var stats loqrecovery.CollectionStats
+
+		replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
+		require.NoError(t, err, "failed to retrieve replica info")
+
+		// Check counters on retrieved replica info.
+		cnt := getInfoCounters(replicas)
+		require.Equal(t, liveNodes, cnt.stores, "collected replicas from stores")
+		require.Equal(t, liveNodes, cnt.nodes, "collected replicas from nodes")
+		require.Equal(t, totalRanges, cnt.descriptors,
+			"number of collected descriptors from metadata")
+		require.Equal(t, totalRanges*liveNodes, cnt.replicas,
+			"number of collected replicas")
+		// Check stats counters as well.
+		require.Equal(t, liveNodes, stats.Nodes, "node counter stats")
+		require.Equal(t, liveNodes, stats.Stores, "store counter stats")
+		require.Equal(t, totalRanges, stats.Descriptors, "range descriptor counter stats")
+	}
+
+	assertReplicas(3)
+
+	tc.Stopper().Stop(ctx)
+}
+
+func getInfoCounters(info loqrecoverypb.ClusterReplicaInfo) clusterInfoCounters {
+	stores := map[roachpb.StoreID]interface{}{}
+	nodes := map[roachpb.NodeID]interface{}{}
+	totalReplicas := 0
+	for _, nr := range info.LocalInfo {
+		for _, r := range nr.Replicas {
+			stores[r.StoreID] = struct{}{}
+			nodes[r.NodeID] = struct{}{}
+		}
+		totalReplicas += len(nr.Replicas)
+	}
+	return clusterInfoCounters{
+		nodes:       len(nodes),
+		stores:      len(stores),
+		replicas:    totalReplicas,
+		descriptors: len(info.Descriptors),
+	}
+}
diff --git a/pkg/kv/kvserver/loqrecovery/testing_knobs.go b/pkg/kv/kvserver/loqrecovery/testing_knobs.go
new file mode 100644
index 000000000000..34bca894132d
--- /dev/null
+++ b/pkg/kv/kvserver/loqrecovery/testing_knobs.go
@@ -0,0 +1,30 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package loqrecovery
+
+import (
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+)
+
+type TestingKnobs struct {
+	// MetadataScanTimeout controls how long the loss of quorum recovery service
+	// will wait for range metadata. Useful to speed up tests verifying collection
+	// behaviors when meta ranges are unavailable.
+	MetadataScanTimeout time.Duration
+
+	// ForwardReplicaFilter filters forwarded replica info when collecting fan-out data.
+	ForwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error
+}
+
+// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
+func (*TestingKnobs) ModuleTestingKnobs() {}
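These knobs reach the recovery server through the opaque ModuleTestingKnobs slot added to base.TestingKnobs above; NewServer recovers the concrete type with a type assertion. A simplified standalone sketch of that plumbing (the package and helper names are illustrative, not part of the patch):

package loqexample

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
)

// metadataTimeout mirrors, in simplified form, how NewServer reads the knob:
// base.TestingKnobs stores it behind the ModuleTestingKnobs interface, and the
// concrete *loqrecovery.TestingKnobs is recovered with a type assertion.
func metadataTimeout(knobs base.TestingKnobs) time.Duration {
	timeout := time.Minute // default used when no knob overrides it
	if rk, ok := knobs.LOQRecovery.(*loqrecovery.TestingKnobs); ok && rk.MetadataScanTimeout > 0 {
		timeout = rk.MetadataScanTimeout
	}
	return timeout
}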
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index fed62e9dac92..d80f1d701702 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -3219,6 +3219,54 @@ func (s *systemAdminServer) SendKVBatch(
 	return br, nil
 }
 
+func (s *systemAdminServer) RecoveryCollectReplicaInfo(
+	request *serverpb.RecoveryCollectReplicaInfoRequest,
+	stream serverpb.Admin_RecoveryCollectReplicaInfoServer,
+) error {
+	ctx := stream.Context()
+	ctx = s.server.AnnotateCtx(ctx)
+	_, err := s.requireAdminUser(ctx)
+	if err != nil {
+		return err
+	}
+	log.Ops.Info(ctx, "streaming cluster replica recovery info")
+
+	return s.server.recoveryServer.ServeClusterReplicas(ctx, request, stream, s.server.db)
+}
+
+func (s *systemAdminServer) RecoveryCollectLocalReplicaInfo(
+	request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
+	stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
+) error {
+	ctx := stream.Context()
+	ctx = s.server.AnnotateCtx(ctx)
+	_, err := s.requireAdminUser(ctx)
+	if err != nil {
+		return err
+	}
+
+	log.Ops.Info(ctx, "streaming local replica recovery info")
+	return s.server.recoveryServer.ServeLocalReplicas(ctx, request, stream)
+}
+
+func (s *systemAdminServer) RecoveryStagePlan(
+	ctx context.Context, request *serverpb.RecoveryStagePlanRequest,
+) (*serverpb.RecoveryStagePlanResponse, error) {
+	return nil, errors.AssertionFailedf("To be implemented by #93044")
+}
+
+func (s *systemAdminServer) RecoveryNodeStatus(
+	ctx context.Context, request *serverpb.RecoveryNodeStatusRequest,
+) (*serverpb.RecoveryNodeStatusResponse, error) {
+	return nil, errors.AssertionFailedf("To be implemented by #93043")
+}
+
+func (s *systemAdminServer) RecoveryVerify(
+	ctx context.Context, request *serverpb.RecoveryVerifyRequest,
+) (*serverpb.RecoveryVerifyResponse, error) {
+	return nil, errors.AssertionFailedf("To be implemented by #93043")
+}
+
 // sqlQuery allows you to incrementally build a SQL query that uses
 // placeholders. Instead of specific placeholders like $1, you instead use the
 // temporary placeholder $.
@@ -4061,35 +4109,3 @@ func (s *adminServer) SetTraceRecordingType(
 	})
 	return &serverpb.SetTraceRecordingTypeResponse{}, nil
 }
-
-func (s *adminServer) RecoveryCollectReplicaInfo(
-	request *serverpb.RecoveryCollectReplicaInfoRequest,
-	server serverpb.Admin_RecoveryCollectReplicaInfoServer,
-) error {
-	return errors.AssertionFailedf("To be implemented by #93040")
-}
-
-func (s *adminServer) RecoveryCollectLocalReplicaInfo(
-	request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
-	server serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
-) error {
-	return errors.AssertionFailedf("To be implemented by #93040")
-}
-
-func (s *adminServer) RecoveryStagePlan(
-	ctx context.Context, request *serverpb.RecoveryStagePlanRequest,
-) (*serverpb.RecoveryStagePlanResponse, error) {
-	return nil, errors.AssertionFailedf("To be implemented by #93044")
-}
-
-func (s *adminServer) RecoveryNodeStatus(
-	ctx context.Context, request *serverpb.RecoveryNodeStatusRequest,
-) (*serverpb.RecoveryNodeStatusResponse, error) {
-	return nil, errors.AssertionFailedf("To be implemented by #93043")
-}
-
-func (s *adminServer) RecoveryVerify(
-	ctx context.Context, request *serverpb.RecoveryVerifyRequest,
-) (*serverpb.RecoveryVerifyResponse, error) {
-	return nil, errors.AssertionFailedf("To be implemented by #93043")
-}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index ecc512bd9893..b1291197059f 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -39,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile"
@@ -145,10 +146,11 @@ type Server struct {
 	tsServer        *ts.Server
 	// The Observability Server, used by the Observability Service to subscribe to
 	// CRDB data.
-	eventsServer  *obs.EventsServer
-	raftTransport *kvserver.RaftTransport
-	stopper       *stop.Stopper
-	stopTrigger   *stopTrigger
+	eventsServer   *obs.EventsServer
+	recoveryServer *loqrecovery.Server
+	raftTransport  *kvserver.RaftTransport
+	stopper        *stop.Stopper
+	stopTrigger    *stopTrigger
 
 	debug    *debug.Server
 	kvProber *kvprober.Prober
@@ -1017,6 +1019,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		},
 	)
 
+	recoveryServer := loqrecovery.NewServer(stores, g, cfg.Locality, rpcContext, cfg.TestingKnobs.LOQRecovery)
+
 	*lateBoundServer = Server{
 		nodeIDContainer:        nodeIDContainer,
 		cfg:                    cfg,
@@ -1050,6 +1054,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		tsDB:                   tsDB,
 		tsServer:               &sTS,
 		eventsServer:           eventsServer,
+		recoveryServer:         recoveryServer,
 		raftTransport:          raftTransport,
 		stopper:                stopper,
 		stopTrigger:            stopTrigger,
diff --git a/pkg/server/serverpb/admin.proto b/pkg/server/serverpb/admin.proto
index 457ebd603ff3..b6c7f77ade7c 100644
--- a/pkg/server/serverpb/admin.proto
+++ b/pkg/server/serverpb/admin.proto
@@ -837,10 +837,23 @@ message CertBundleResponse {
 
 message RecoveryCollectReplicaInfoRequest {}
 
+// RecoveryCollectReplicaRestartNodeStream is sent by the collector node to the
+// client if it experiences a transient failure collecting data from one of the
+// nodes. This message instructs the client to drop any data it has collected
+// locally for the specified node, as streaming for that node will be restarted.
+// This mechanism avoids restarting the whole collection procedure in a large
+// cluster if one of the nodes fails transiently.
+message RecoveryCollectReplicaRestartNodeStream {
+  int32 node_id = 1 [
+    (gogoproto.customname) = "NodeID",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"];
+}
+
 message RecoveryCollectReplicaInfoResponse {
   oneof info {
     roachpb.RangeDescriptor range_descriptor = 1;
     cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaInfo replica_info = 2;
+    RecoveryCollectReplicaRestartNodeStream node_stream_restarted = 3;
   }
 }
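A hedged sketch of how a client consuming this stream might honor the new oneof variant, assuming the standard generated getters and stream client interface; the function and variable names are illustrative, only the serverpb/loqrecoverypb/roachpb types come from this change.

```go
package example

import (
	"io"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// collectReplicaInfo drains the RecoveryCollectReplicaInfo stream. When the
// collector reports a restarted node stream, any replica info buffered for
// that node is discarded, since the collector will resend it from scratch.
func collectReplicaInfo(
	stream serverpb.Admin_RecoveryCollectReplicaInfoClient,
) ([]roachpb.RangeDescriptor, map[roachpb.NodeID][]loqrecoverypb.ReplicaInfo, error) {
	var descriptors []roachpb.RangeDescriptor
	replicas := make(map[roachpb.NodeID][]loqrecoverypb.ReplicaInfo)
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return descriptors, replicas, nil
		}
		if err != nil {
			return nil, nil, err
		}
		switch {
		case resp.GetRangeDescriptor() != nil:
			descriptors = append(descriptors, *resp.GetRangeDescriptor())
		case resp.GetReplicaInfo() != nil:
			info := resp.GetReplicaInfo()
			replicas[info.NodeID] = append(replicas[info.NodeID], *info)
		case resp.GetNodeStreamRestarted() != nil:
			// Drop everything buffered for this node; its data will be
			// streamed again after the transient failure.
			delete(replicas, resp.GetNodeStreamRestarted().NodeID)
		}
	}
}
```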
 
diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog
index d1146a7033c0..5a90ef16b276 100644
--- a/pkg/sql/logictest/testdata/logic_test/pg_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -2367,6 +2367,15 @@ json_extract_path  25           2         3802        3802 25      {i,v}
 
 user root
 
+query TT rowsort
+SELECT DISTINCT proname, prokind FROM pg_catalog.pg_proc
+WHERE proname IN ('lag', 'abs', 'max')
+----
+abs  f
+lag  w
+max  a
+
+
 ## pg_catalog.pg_range
 query IIIIII colnames
 SELECT * from pg_catalog.pg_range
diff --git a/pkg/sql/logictest/testdata/logic_test/udf b/pkg/sql/logictest/testdata/logic_test/udf
index 2e142682d463..eca6514c9b34 100644
--- a/pkg/sql/logictest/testdata/logic_test/udf
+++ b/pkg/sql/logictest/testdata/logic_test/udf
@@ -166,13 +166,13 @@ CREATE SCHEMA sc;
 statement
 CREATE FUNCTION sc.proc_f_2(STRING) RETURNS STRING LANGUAGE SQL AS $$ SELECT 'hello' $$;
 
-query TTTTTBBBTITTTTT
-SELECT oid, proname, pronamespace, proowner, prolang, proleakproof, proisstrict, proretset, provolatile, pronargs, prorettype, proargtypes, proargmodes, proargnames, prosrc
+query TTTTTBBBTITTTTTT
+SELECT oid, proname, pronamespace, proowner, prolang, proleakproof, proisstrict, proretset, provolatile, pronargs, prorettype, proargtypes, proargmodes, proargnames, prokind, prosrc
 FROM pg_catalog.pg_proc WHERE proname IN ('proc_f', 'proc_f_2');
 ----
-100118  proc_f    105  1546506610  14  false  false  false  v  1  20  20     {i}    NULL    SELECT 1;
-100119  proc_f    105  1546506610  14  true   true   false  i  2  25  25 20  {i,i}  {"",b}  SELECT 'hello';
-100121  proc_f_2  120  1546506610  14  false  false  false  v  1  25  25     {i}    NULL    SELECT 'hello';
+100118  proc_f    105  1546506610  14  false  false  false  v  1  20  20     {i}    NULL    f  SELECT 1;
+100119  proc_f    105  1546506610  14  true   true   false  i  2  25  25 20  {i,i}  {"",b}  f  SELECT 'hello';
+100121  proc_f_2  120  1546506610  14  false  false  false  v  1  25  25     {i}    NULL    f  SELECT 'hello';
 
 # Ensure that the pg_proc virtual index works properly.
 
diff --git a/pkg/sql/pg_catalog.go b/pkg/sql/pg_catalog.go
index ed03aac739ad..6a3dbb31c14b 100644
--- a/pkg/sql/pg_catalog.go
+++ b/pkg/sql/pg_catalog.go
@@ -2339,6 +2339,16 @@ func addPgProcBuiltinRow(name string, addRow func(...tree.Datum) error) error {
 		name = name[len(crdbInternal):]
 	}
 
+	var kind tree.Datum
+	switch {
+	case isAggregate:
+		kind = tree.NewDString("a")
+	case isWindow:
+		kind = tree.NewDString("w")
+	default:
+		kind = tree.NewDString("f")
+	}
+
 	for _, builtin := range overloads {
 		dName := tree.NewDName(name)
 		dSrc := tree.NewDString(name)
@@ -2435,8 +2445,8 @@ func addPgProcBuiltinRow(name string, addRow func(...tree.Datum) error) error {
 			tree.DNull,                                      // probin
 			tree.DNull,                                      // proconfig
 			tree.DNull,                                      // proacl
+			kind,                                            // prokind
 			// These columns were automatically created by pg_catalog_test's missing column generator.
-			tree.DNull, // prokind
 			tree.DNull, // prosupport
 		)
 		if err != nil {
@@ -2511,8 +2521,8 @@ func addPgProcUDFRow(
 		tree.DNull,                                       // probin
 		tree.DNull,                                       // proconfig
 		tree.DNull,                                       // proacl
+		tree.NewDString("f"),                             // prokind
 		// These columns were automatically created by pg_catalog_test's missing column generator.
-		tree.DNull, // prokind
 		tree.DNull, // prosupport
 	)
 }
diff --git a/pkg/util/grpcutil/grpc_util.go b/pkg/util/grpcutil/grpc_util.go
index 1dadd424c5a9..a81b1f884ce9 100644
--- a/pkg/util/grpcutil/grpc_util.go
+++ b/pkg/util/grpcutil/grpc_util.go
@@ -54,6 +54,15 @@ func IsTimeout(err error) bool {
 	return false
 }
 
+// IsConnectionUnavailable checks whether the gRPC code is codes.Unavailable,
+// which is set when we are unable to establish a connection to the remote node.
+func IsConnectionUnavailable(err error) bool {
+	if s, ok := status.FromError(errors.UnwrapAll(err)); ok {
+		return s.Code() == codes.Unavailable
+	}
+	return false
+}
+
 // IsContextCanceled returns true if err's Cause is an error produced by gRPC
 // on context cancellation.
 func IsContextCanceled(err error) bool {
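A small hedged example of how a caller might use the new helper to decide whether a fan-out error is worth retrying; the retry wrapper itself is hypothetical, only grpcutil.IsConnectionUnavailable comes from this change.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
)

// withRetryOnUnavailable retries fn a few times when the failure is a
// connection-level codes.Unavailable error, and gives up immediately on
// anything else (e.g. a permission or validation error).
func withRetryOnUnavailable(
	ctx context.Context, attempts int, fn func(context.Context) error,
) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(ctx); err == nil {
			return nil
		}
		if !grpcutil.IsConnectionUnavailable(err) {
			return err
		}
	}
	return err
}
```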