From 76fc53336cc8798ac68fbb37be32d7808c791b57 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Tue, 6 Dec 2022 20:29:15 +0000 Subject: [PATCH 1/2] loqrecovery: add replica info collection to admin server This commit adds replica info and range descriptor collection from partially available cluster that lost quorum on some ranges. Collection is done using AdminServer calls. Cluster wide call performs fan-out using node info from gossip. Local replica info collection on nodes is done by scanning storages. Release note (cli change): File format used for transient loss of quorum recovery files has changed. It is not possible to use replica info files generated by earlier versions to be used with current and future versions. --- docs/generated/http/full.md | 19 ++ pkg/BUILD.bazel | 2 + pkg/base/testing_knobs.go | 1 + pkg/cli/cliflags/flags.go | 3 +- pkg/cli/debug_recover_loss_of_quorum.go | 113 ++++++-- pkg/cli/debug_recover_loss_of_quorum_test.go | 126 ++++++-- pkg/cli/flags.go | 1 + pkg/kv/kvserver/loqrecovery/BUILD.bazel | 14 + pkg/kv/kvserver/loqrecovery/collect.go | 180 +++++++++--- .../loqrecovery/loqrecoverypb/BUILD.bazel | 14 +- .../loqrecovery/loqrecoverypb/recovery.go | 47 +++ .../loqrecovery/loqrecoverypb/recovery.proto | 21 +- .../loqrecoverypb/recovery_test.go | 172 +++++++++++ .../kvserver/loqrecovery/recovery_env_test.go | 18 +- pkg/kv/kvserver/loqrecovery/server.go | 273 ++++++++++++++++++ pkg/kv/kvserver/loqrecovery/server_test.go | 182 ++++++++++++ pkg/kv/kvserver/loqrecovery/testing_knobs.go | 30 ++ pkg/server/admin.go | 80 +++-- pkg/server/server.go | 13 +- pkg/server/serverpb/admin.proto | 13 + pkg/util/grpcutil/grpc_util.go | 9 + 21 files changed, 1181 insertions(+), 150 deletions(-) create mode 100644 pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go create mode 100644 pkg/kv/kvserver/loqrecovery/server.go create mode 100644 pkg/kv/kvserver/loqrecovery/server_test.go create mode 100644 pkg/kv/kvserver/loqrecovery/testing_knobs.go diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 4c62e1db9484..8ecfc470491a 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -7239,12 +7239,31 @@ Support status: [reserved](#support-status) | ----- | ---- | ----- | ----------- | -------------- | | range_descriptor | [cockroach.roachpb.RangeDescriptor](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.roachpb.RangeDescriptor) | | | [reserved](#support-status) | | replica_info | [cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaInfo](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaInfo) | | | [reserved](#support-status) | +| node_stream_restarted | [RecoveryCollectReplicaRestartNodeStream](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-cockroach.server.serverpb.RecoveryCollectReplicaRestartNodeStream) | | | [reserved](#support-status) | + +#### RecoveryCollectReplicaRestartNodeStream + +RecoveryCollectReplicaRestartNodeStream is sent by collector node to client +if it experiences a transient failure collecting data from one of the nodes. +This message instructs client to drop any data that it collected locally +for specified node as streaming for this node would be restarted. +This mechanism is needed to avoid restarting the whole collection procedure +in large cluster if one of the nodes fails transiently. 
+ +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| node_id | [int32](#cockroach.server.serverpb.RecoveryCollectReplicaInfoResponse-int32) | | | [reserved](#support-status) | + + + + + ## RecoveryCollectLocalReplicaInfo diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 4668bd99d87d..9b408a6fbd60 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -203,6 +203,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/intentresolver:intentresolver_test", "//pkg/kv/kvserver/liveness:liveness_test", "//pkg/kv/kvserver/logstore:logstore_test", + "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_test", "//pkg/kv/kvserver/loqrecovery:loqrecovery_test", "//pkg/kv/kvserver/multiqueue:multiqueue_test", "//pkg/kv/kvserver/protectedts/ptcache:ptcache_test", @@ -1200,6 +1201,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/logstore:logstore", "//pkg/kv/kvserver/logstore:logstore_test", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb", + "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_test", "//pkg/kv/kvserver/loqrecovery:loqrecovery", "//pkg/kv/kvserver/loqrecovery:loqrecovery_test", "//pkg/kv/kvserver/multiqueue:multiqueue", diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index ec3fc5dd9cef..ebafe1792d38 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -53,4 +53,5 @@ type TestingKnobs struct { ExternalConnection ModuleTestingKnobs EventExporter ModuleTestingKnobs EventLog ModuleTestingKnobs + LOQRecovery ModuleTestingKnobs } diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go index a84bac95810f..779a1e95090b 100644 --- a/pkg/cli/cliflags/flags.go +++ b/pkg/cli/cliflags/flags.go @@ -1822,8 +1822,7 @@ See start --help for more flag details and examples. } ConfirmActions = FlagInfo{ - Name: "confirm", - Shorthand: "p", + Name: "confirm", Description: ` Confirm action:
diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go
index f627ca144ad1..85812d9abc24 100644
--- a/pkg/cli/debug_recover_loss_of_quorum.go
+++ b/pkg/cli/debug_recover_loss_of_quorum.go
@@ -165,6 +165,13 @@ Now the cluster could be started again.
 	RunE: UsageAndErr,
 }
 
+var recoverCommands = []*cobra.Command{
+	debugRecoverCollectInfoCmd,
+	debugRecoverPlanCmd,
+	//debugRecoverStagePlan,
+	//debugRecoverVerify,
+}
+
 func init() {
 	debugRecoverCmd.AddCommand(
 		debugRecoverCollectInfoCmd,
@@ -174,18 +181,28 @@ func init() {
 
 var debugRecoverCollectInfoCmd = &cobra.Command{
 	Use:   "collect-info [destination-file]",
-	Short: "collect replica information from the given stores",
+	Short: "collect replica information from a cluster",
 	Long: `
-Collect information about replicas by reading data from underlying stores. Store
-locations must be provided using --store flags.
+Collect information about replicas in the cluster.
+
+The command can collect data from an online or an offline cluster.
+
+In the first case, the address of a single healthy cluster node must be provided
+using the --host flag. This designated node will handle collection of data from
+all surviving nodes.
+
+In the second case, data is read directly from local stores on each node.
+CockroachDB must not be running on any node. The location of each store must be
+provided using the --store flag. The command must be executed for all surviving
+stores.
+
+Multiple store locations can be provided to the command to collect all info
+from all stores on a node at once. It is also possible to call it per store; in
+that case, all resulting files should be fed to the plan subcommand.
 
 Collected information is written to a destination file if file name is provided,
 or to stdout.
 
-Multiple store locations could be provided to the command to collect all info from
-node at once. It is also possible to call it per store, in that case all resulting
-files should be fed to plan subcommand.
-
 See debug recover command help for more details on how to use this command.
 `,
 	Args: cobra.MaximumNArgs(1),
@@ -197,28 +214,49 @@ var debugRecoverCollectInfoOpts struct {
 }
 
 func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
+	// We must have a cancellable context here to obtain a grpc client connection.
+	ctx, cancel := context.WithCancel(cmd.Context())
+	defer cancel()
 	stopper := stop.NewStopper()
-	defer stopper.Stop(cmd.Context())
+	defer stopper.Stop(ctx)
+
+	var replicaInfo loqrecoverypb.ClusterReplicaInfo
+	var stats loqrecovery.CollectionStats
 
-	var stores []storage.Engine
-	for _, storeSpec := range debugRecoverCollectInfoOpts.Stores.Specs {
-		db, err := OpenEngine(storeSpec.Path, stopper, storage.MustExist, storage.ReadOnly)
+	if len(debugRecoverCollectInfoOpts.Stores.Specs) == 0 {
+		c, finish, err := getAdminClient(ctx, serverCfg)
 		if err != nil {
-			return errors.Wrapf(err, "failed to open store at path %q, ensure that store path is "+
-				"correct and that it is not used by another process", storeSpec.Path)
+			return errors.Wrapf(err, "failed to get admin connection to cluster")
+		}
+		defer finish()
+		replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c)
+		if err != nil {
+			return errors.WithHint(errors.Wrap(err,
+				"failed to retrieve replica info from cluster"),
+				"Check cluster health and retry the operation.")
+		}
+	} else {
+		var stores []storage.Engine
+		for _, storeSpec := range debugRecoverCollectInfoOpts.Stores.Specs {
+			db, err := OpenEngine(storeSpec.Path, stopper, storage.MustExist, storage.ReadOnly)
+			if err != nil {
+				return errors.WithHint(errors.Wrapf(err,
+					"failed to open store at path %q", storeSpec.Path),
+					"Ensure that store path is correct and that it is not used by another process.")
+			}
+			stores = append(stores, db)
+		}
+		var err error
+		replicaInfo, stats, err = loqrecovery.CollectStoresReplicaInfo(ctx, stores)
+		if err != nil {
+			return errors.Wrapf(err, "failed to collect replica info from local stores")
 		}
-		stores = append(stores, db)
-	}
-
-	replicaInfo, err := loqrecovery.CollectReplicaInfo(cmd.Context(), stores)
-	if err != nil {
-		return err
 	}
 
 	var writer io.Writer = os.Stdout
 	if len(args) > 0 {
 		filename := args[0]
-		if _, err = os.Stat(filename); err == nil {
+		if _, err := os.Stat(filename); err == nil {
 			return errors.Newf("file %q already exists", filename)
 		}
 
@@ -230,14 +268,20 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
 		writer = outFile
 	}
 	jsonpb := protoutil.JSONPb{Indent: "  "}
-	var out []byte
-	if out, err = jsonpb.Marshal(replicaInfo); err != nil {
+	out, err := jsonpb.Marshal(&replicaInfo)
+	if err != nil {
 		return errors.Wrap(err, "failed to marshal collected replica info")
 	}
-	if _, err = writer.Write(out); err != nil {
+	if _, err := writer.Write(out); err != nil {
 		return errors.Wrap(err, "failed to write collected replica info")
 	}
-	_, _ = fmt.Fprintf(stderr, "Collected info about %d replicas.\n", len(replicaInfo.Replicas))
+	_, _ = fmt.Fprintf(stderr, `Collected recovery info from:
+nodes             %d
+stores            %d
+Collected info:
+replicas          %d
+range descriptors %d
+`, stats.Nodes, stats.Stores, replicaInfo.ReplicaCount(), stats.Descriptors)
 	return nil
 }
 
@@ -278,7 +322,7 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
 		deadStoreIDs = append(deadStoreIDs, roachpb.StoreID(id))
 	}
 
-	plan, report, err := loqrecovery.PlanReplicas(cmd.Context(), replicas, deadStoreIDs)
+	plan, report, err := loqrecovery.PlanReplicas(cmd.Context(), replicas.LocalInfo, deadStoreIDs)
 	if err != nil {
 		return err
 	}
@@ -377,7 +421,7 @@ Discarded live replicas: %d
 
 	jsonpb := protoutil.JSONPb{Indent: "  "}
 	var out []byte
-	if out, err = jsonpb.Marshal(plan); err != nil {
+	if out, err = jsonpb.Marshal(&plan); err != nil {
 		return errors.Wrap(err, "failed to marshal recovery plan")
 	}
 	if _, err = writer.Write(out); err != nil {
@@ -393,20 +437,25 @@ Discarded live replicas: %d
 	return nil
 }
 
-func readReplicaInfoData(fileNames []string) ([]loqrecoverypb.NodeReplicaInfo, error) {
-	var replicas []loqrecoverypb.NodeReplicaInfo
+func readReplicaInfoData(fileNames []string) (loqrecoverypb.ClusterReplicaInfo, error) {
+	var replicas loqrecoverypb.ClusterReplicaInfo
 	for _, filename := range fileNames {
 		data, err := os.ReadFile(filename)
 		if err != nil {
-			return nil, errors.Wrapf(err, "failed to read replica info file %q", filename)
+			return loqrecoverypb.ClusterReplicaInfo{}, errors.Wrapf(err, "failed to read replica info file %q", filename)
 		}
 
-		var nodeReplicas loqrecoverypb.NodeReplicaInfo
+		var nodeReplicas loqrecoverypb.ClusterReplicaInfo
 		jsonpb := protoutil.JSONPb{}
 		if err = jsonpb.Unmarshal(data, &nodeReplicas); err != nil {
-			return nil, errors.Wrapf(err, "failed to unmarshal replica info from file %q", filename)
+			return loqrecoverypb.ClusterReplicaInfo{}, errors.WithHint(errors.Wrapf(err,
+				"failed to unmarshal replica info from file %q", filename),
+				"Ensure that replica info file is generated with the same binary version and file is not corrupted.")
+		}
+		if err = replicas.Merge(nodeReplicas); err != nil {
+			return loqrecoverypb.ClusterReplicaInfo{}, errors.Wrapf(err,
+				"failed to merge replica info from file %q", filename)
 		}
-		replicas = append(replicas, nodeReplicas)
 	}
 	return replicas, nil
 }
diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go
index d69040b3f61c..3e32fe099fde 100644
--- a/pkg/cli/debug_recover_loss_of_quorum_test.go
+++ b/pkg/cli/debug_recover_loss_of_quorum_test.go
@@ -78,12 +78,73 @@ func TestCollectInfoFromMultipleStores(t *testing.T) {
 	replicas, err := readReplicaInfoData([]string{replicaInfoFileName})
 	require.NoError(t, err, "failed to read generated replica info")
 	stores := map[roachpb.StoreID]interface{}{}
-	for _, r := range replicas[0].Replicas {
+	for _, r := range replicas.LocalInfo[0].Replicas {
 		stores[r.StoreID] = struct{}{}
 	}
 	require.Equal(t, 2, len(stores), "collected replicas from stores")
 }
 
+// TestCollectInfoFromOnlineCluster verifies that given a test cluster with
+// one stopped node, we can collect replica info and metadata from remaining
+// nodes using an admin recovery call.
+func TestCollectInfoFromOnlineCluster(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	dir, cleanupFn := testutils.TempDir(t)
+	defer cleanupFn()
+
+	c := NewCLITest(TestCLIParams{
+		NoServer: true,
+	})
+	defer c.Cleanup()
+
+	tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			StoreSpecs: []base.StoreSpec{{InMemory: true}},
+			Insecure:   true,
+		},
+	})
+	tc.Start(t)
+	defer tc.Stopper().Stop(ctx)
+	require.NoError(t, tc.WaitForFullReplication())
+	tc.ToggleReplicateQueues(false)
+
+	r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases")
+	var totalRanges int
+	require.NoError(t, r.Scan(&totalRanges), "failed to query range count")
+
+	tc.StopServer(0)
+	replicaInfoFileName := dir + "/all-nodes.json"
+
+	c.RunWithArgs([]string{
+		"debug",
+		"recover",
+		"collect-info",
+		"--insecure",
+		"--host",
+		tc.Server(2).ServingRPCAddr(),
+		replicaInfoFileName,
+	})
+
+	replicas, err := readReplicaInfoData([]string{replicaInfoFileName})
+	require.NoError(t, err, "failed to read generated replica info")
+	stores := map[roachpb.StoreID]interface{}{}
+	totalReplicas := 0
+	for _, li := range replicas.LocalInfo {
+		for _, r := range li.Replicas {
+			stores[r.StoreID] = struct{}{}
+		}
+		totalReplicas += len(li.Replicas)
+	}
+	require.Equal(t, 2, len(stores), "collected replicas from stores")
+	require.Equal(t, 2, len(replicas.LocalInfo), "collected info is not split by node")
+	require.Equal(t, totalRanges*2, totalReplicas, "number of collected replicas")
+	require.Equal(t, totalRanges, len(replicas.Descriptors),
+		"number of collected descriptors from metadata")
+}
+
 // TestLossOfQuorumRecovery performs a sanity check on end to end recovery workflow.
 // This test doesn't try to validate all possible test cases, but instead check that
 // artifacts are correctly produced and overall cluster recovery could be performed
@@ -273,43 +334,48 @@ func createIntentOnRangeDescriptor(
 func TestJsonSerialization(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
-	nr := loqrecoverypb.NodeReplicaInfo{
-		Replicas: []loqrecoverypb.ReplicaInfo{
+	nr := loqrecoverypb.ClusterReplicaInfo{
+		ClusterID: "id1",
+		LocalInfo: []loqrecoverypb.NodeReplicaInfo{
 			{
-				NodeID:  1,
-				StoreID: 2,
-				Desc: roachpb.RangeDescriptor{
-					RangeID:  3,
-					StartKey: roachpb.RKey(keys.MetaMin),
-					EndKey:   roachpb.RKey(keys.MetaMax),
-					InternalReplicas: []roachpb.ReplicaDescriptor{
-						{
-							NodeID:    1,
-							StoreID:   2,
-							ReplicaID: 3,
-							Type:      roachpb.VOTER_INCOMING,
-						},
-					},
-					NextReplicaID: 4,
-					Generation:    7,
-				},
-				RaftAppliedIndex:   13,
-				RaftCommittedIndex: 19,
-				RaftLogDescriptorChanges: []loqrecoverypb.DescriptorChangeInfo{
+				Replicas: []loqrecoverypb.ReplicaInfo{
 					{
-						ChangeType: 1,
-						Desc:       &roachpb.RangeDescriptor{},
-						OtherDesc:  &roachpb.RangeDescriptor{},
+						NodeID:  1,
+						StoreID: 2,
+						Desc: roachpb.RangeDescriptor{
+							RangeID:  3,
+							StartKey: roachpb.RKey(keys.MetaMin),
+							EndKey:   roachpb.RKey(keys.MetaMax),
+							InternalReplicas: []roachpb.ReplicaDescriptor{
+								{
+									NodeID:    1,
+									StoreID:   2,
+									ReplicaID: 3,
+									Type:      roachpb.VOTER_INCOMING,
+								},
+							},
+							NextReplicaID: 4,
+							Generation:    7,
+						},
+						RaftAppliedIndex:   13,
+						RaftCommittedIndex: 19,
+						RaftLogDescriptorChanges: []loqrecoverypb.DescriptorChangeInfo{
+							{
+								ChangeType: 1,
+								Desc:       &roachpb.RangeDescriptor{},
+								OtherDesc:  &roachpb.RangeDescriptor{},
+							},
+						},
 					},
 				},
 			},
 		},
 	}
 	jsonpb := protoutil.JSONPb{Indent: "  "}
-	data, err := jsonpb.Marshal(nr)
+	data, err := jsonpb.Marshal(&nr)
 	require.NoError(t, err)
 
-	var nrFromJSON loqrecoverypb.NodeReplicaInfo
-	require.NoError(t, jsonpb.Unmarshal(data, &nrFromJSON))
-	require.Equal(t, nr, nrFromJSON, "objects before and after serialization")
+	var crFromJSON loqrecoverypb.ClusterReplicaInfo
+	require.NoError(t, jsonpb.Unmarshal(data, &crFromJSON))
+	require.Equal(t, nr, crFromJSON, "objects before and after serialization")
 }
diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go
index b242d2095a01..04cc68c36c38 100644
--- a/pkg/cli/flags.go
+++ b/pkg/cli/flags.go
@@ -618,6 +618,7 @@ func init() {
 	clientCmds = append(clientCmds, userFileCmds...)
 	clientCmds = append(clientCmds, stmtDiagCmds...)
 	clientCmds = append(clientCmds, debugResetQuorumCmd)
+	clientCmds = append(clientCmds, recoverCommands...)
 	for _, cmd := range clientCmds {
 		clientflags.AddBaseFlags(cmd, &cliCtx.clientOpts, &baseCfg.Insecure, &baseCfg.SSLCertsDir)
 
diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel
index efae010fa4e2..ffe773651e41 100644
--- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel
+++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel
@@ -8,22 +8,33 @@ go_library(
         "collect.go",
         "plan.go",
         "record.go",
+        "server.go",
+        "testing_knobs.go",
         "utils.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/base",
+        "//pkg/gossip",
         "//pkg/keys",
+        "//pkg/kv",
+        "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/kv/kvserver/kvstorage",
         "//pkg/kv/kvserver/loqrecovery/loqrecoverypb",
         "//pkg/kv/kvserver/raftlog",
         "//pkg/kv/kvserver/stateloader",
         "//pkg/roachpb",
+        "//pkg/rpc",
+        "//pkg/server/serverpb",
         "//pkg/storage",
+        "//pkg/util/contextutil",
+        "//pkg/util/grpcutil",
         "//pkg/util/hlc",
         "//pkg/util/log",
         "//pkg/util/protoutil",
+        "//pkg/util/retry",
         "//pkg/util/timeutil",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_errors//:errors",
@@ -39,6 +50,7 @@ go_test(
         "record_test.go",
         "recovery_env_test.go",
         "recovery_test.go",
+        "server_test.go",
     ],
     args = ["-test.timeout=295s"],
     data = glob(["testdata/**"]),
@@ -57,6 +69,7 @@ go_test(
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
         "//pkg/server",
+        "//pkg/server/serverpb",
         "//pkg/storage",
         "//pkg/storage/enginepb",
         "//pkg/testutils",
@@ -73,6 +86,7 @@ go_test(
         "//pkg/util/uuid",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_cockroachdb_errors//:errors",
+        "@com_github_google_uuid//:uuid",
         "@com_github_stretchr_testify//require",
         "@in_gopkg_yaml_v2//:yaml_v2",
         "@io_etcd_go_raft_v3//raftpb",
diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go
index 4ba2a87b6618..6bac3c4e5146 100644
--- a/pkg/kv/kvserver/loqrecovery/collect.go
+++ b/pkg/kv/kvserver/loqrecovery/collect.go
@@ -12,6 +12,7 @@ package loqrecovery
 
 import (
 	"context"
+	"io"
 	"math"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
@@ -19,61 +20,158 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"go.etcd.io/raft/v3/raftpb"
 )
 
-// CollectReplicaInfo captures states of all replicas in all stores for the sake of quorum recovery.
-func CollectReplicaInfo(
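+// CollectionStats summarizes a replica info collection run: the number of
+// nodes and stores that contributed data and the number of range descriptors
+// collected from meta ranges.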
+type CollectionStats struct {
+	Nodes       int
+	Stores      int
+	Descriptors int
+}
+
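+// CollectRemoteReplicaInfo retrieves replica info and range descriptors from
+// a live cluster using the admin client's recovery collection streams. Replica
+// infos are grouped by node, and partially received data for a node is
+// discarded whenever the server signals that its stream for that node was
+// restarted.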
+func CollectRemoteReplicaInfo(
+	ctx context.Context, c serverpb.AdminClient,
+) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) {
+	cInfo, err := c.Cluster(ctx, &serverpb.ClusterRequest{})
+	if err != nil {
+		return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
+	}
+	cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{})
+	if err != nil {
+		return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
+	}
+	stores := make(map[roachpb.StoreID]struct{})
+	nodes := make(map[roachpb.NodeID]struct{})
+	var descriptors []roachpb.RangeDescriptor
+	var clusterReplInfo []loqrecoverypb.NodeReplicaInfo
+	var nodeReplicas []loqrecoverypb.ReplicaInfo
+	var currentNode roachpb.NodeID
+	for {
+		info, err := cc.Recv()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				if len(nodeReplicas) > 0 {
+					clusterReplInfo = append(clusterReplInfo, loqrecoverypb.NodeReplicaInfo{Replicas: nodeReplicas})
+				}
+				break
+			}
+			return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
+		}
+		if r := info.GetReplicaInfo(); r != nil {
+			if currentNode != r.NodeID {
+				currentNode = r.NodeID
+				if len(nodeReplicas) > 0 {
+					clusterReplInfo = append(clusterReplInfo, loqrecoverypb.NodeReplicaInfo{Replicas: nodeReplicas})
+					nodeReplicas = nil
+				}
+			}
+			nodeReplicas = append(nodeReplicas, *r)
+			stores[r.StoreID] = struct{}{}
+			nodes[r.NodeID] = struct{}{}
+		} else if d := info.GetRangeDescriptor(); d != nil {
+			descriptors = append(descriptors, *d)
+		} else if s := info.GetNodeStreamRestarted(); s != nil {
+			// If the server had to restart fan-out work because of an error and
+			// retried, we discard any partial data collected for that node.
+			if s.NodeID == currentNode {
+				nodeReplicas = nil
+			}
+		}
+	}
+	return loqrecoverypb.ClusterReplicaInfo{
+			ClusterID:   cInfo.ClusterID,
+			Descriptors: descriptors,
+			LocalInfo:   clusterReplInfo,
+		}, CollectionStats{
+			Nodes:       len(nodes),
+			Stores:      len(stores),
+			Descriptors: len(descriptors),
+		}, nil
+}
+
+// CollectStoresReplicaInfo captures states of all replicas in all stores for the sake of quorum recovery.
+func CollectStoresReplicaInfo(
 	ctx context.Context, stores []storage.Engine,
-) (loqrecoverypb.NodeReplicaInfo, error) {
+) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) {
 	if len(stores) == 0 {
-		return loqrecoverypb.NodeReplicaInfo{}, errors.New("no stores were provided for info collection")
+		return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.New("no stores were provided for info collection")
 	}
-
+	var clusterUUID uuid.UUID
+	nodes := make(map[roachpb.NodeID]struct{})
 	var replicas []loqrecoverypb.ReplicaInfo
-	for _, reader := range stores {
-		storeIdent, err := kvstorage.ReadStoreIdent(ctx, reader)
+	for i, reader := range stores {
+		ident, err := kvstorage.ReadStoreIdent(ctx, reader)
 		if err != nil {
-			return loqrecoverypb.NodeReplicaInfo{}, err
+			return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
 		}
-		if err = kvstorage.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error {
-			rsl := stateloader.Make(desc.RangeID)
-			rstate, err := rsl.Load(ctx, reader, &desc)
-			if err != nil {
-				return err
-			}
-			hstate, err := rsl.LoadHardState(ctx, reader)
-			if err != nil {
-				return err
-			}
-			// Check raft log for un-applied range descriptor changes. We start from
-			// applied+1 (inclusive) and read until the end of the log. We also look
-			// at potentially uncommitted entries as we have no way to determine their
-			// outcome, and they will become committed as soon as the replica is
-			// designated as a survivor.
-			rangeUpdates, err := GetDescriptorChangesFromRaftLog(desc.RangeID,
-				rstate.RaftAppliedIndex+1, math.MaxInt64, reader)
-			if err != nil {
-				return err
-			}
+		if i == 0 {
+			clusterUUID = ident.ClusterID
+		}
+		if !ident.ClusterID.Equal(clusterUUID) {
+			return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, errors.New("can't collect info from stored that belong to different clusters")
+		}
+		nodes[ident.NodeID] = struct{}{}
+		if err := visitStoreReplicas(ctx, reader, ident.StoreID, ident.NodeID,
+			func(info loqrecoverypb.ReplicaInfo) error {
+				replicas = append(replicas, info)
+				return nil
+			}); err != nil {
+			return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
+		}
+	}
+	return loqrecoverypb.ClusterReplicaInfo{
+			ClusterID: clusterUUID.String(),
+			LocalInfo: []loqrecoverypb.NodeReplicaInfo{{Replicas: replicas}},
+		}, CollectionStats{
+			Nodes:  len(nodes),
+			Stores: len(stores),
+		}, nil
+}
 
-			replicaData := loqrecoverypb.ReplicaInfo{
-				StoreID:                  storeIdent.StoreID,
-				NodeID:                   storeIdent.NodeID,
-				Desc:                     desc,
-				RaftAppliedIndex:         rstate.RaftAppliedIndex,
-				RaftCommittedIndex:       hstate.Commit,
-				RaftLogDescriptorChanges: rangeUpdates,
-			}
-			replicas = append(replicas, replicaData)
-			return nil
-		}); err != nil {
-			return loqrecoverypb.NodeReplicaInfo{}, err
+func visitStoreReplicas(
+	ctx context.Context,
+	reader storage.Reader,
+	storeID roachpb.StoreID,
+	nodeID roachpb.NodeID,
+	send func(info loqrecoverypb.ReplicaInfo) error,
+) error {
+	if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, reader, func(desc roachpb.RangeDescriptor) error {
+		rsl := stateloader.Make(desc.RangeID)
+		rstate, err := rsl.Load(ctx, reader, &desc)
+		if err != nil {
+			return err
 		}
+		hstate, err := rsl.LoadHardState(ctx, reader)
+		if err != nil {
+			return err
+		}
+		// Check raft log for un-applied range descriptor changes. We start from
+		// applied+1 (inclusive) and read until the end of the log. We also look
+		// at potentially uncommitted entries as we have no way to determine their
+		// outcome, and they will become committed as soon as the replica is
+		// designated as a survivor.
+		rangeUpdates, err := GetDescriptorChangesFromRaftLog(desc.RangeID,
+			rstate.RaftAppliedIndex+1, math.MaxInt64, reader)
+		if err != nil {
+			return err
+		}
+
+		return send(loqrecoverypb.ReplicaInfo{
+			StoreID:                  storeID,
+			NodeID:                   nodeID,
+			Desc:                     desc,
+			RaftAppliedIndex:         rstate.RaftAppliedIndex,
+			RaftCommittedIndex:       hstate.Commit,
+			RaftLogDescriptorChanges: rangeUpdates,
+		})
+	}); err != nil {
+		return err
 	}
-	return loqrecoverypb.NodeReplicaInfo{Replicas: replicas}, nil
+	return nil
 }
 
 // GetDescriptorChangesFromRaftLog iterates over raft log between indices
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/BUILD.bazel
index c75c3d4fff71..1aa1667ac2d6 100644
--- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/BUILD.bazel
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/BUILD.bazel
@@ -1,5 +1,5 @@
 load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 # gazelle:go_grpc_compilers //pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_grpc_compiler,  @com_github_grpc_ecosystem_grpc_gateway//protoc-gen-grpc-gateway:go_gen_grpc_gateway
 
@@ -49,4 +49,16 @@ go_library(
     ],
 )
 
+go_test(
+    name = "loqrecoverypb_test",
+    srcs = ["recovery_test.go"],
+    args = ["-test.timeout=295s"],
+    embed = [":loqrecoverypb"],
+    deps = [
+        "//pkg/roachpb",
+        "//pkg/util/uuid",
+        "@com_github_stretchr_testify//require",
+    ],
+)
+
 get_x_data(name = "get_x_data")
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
index 670ede19992a..fff394377321 100644
--- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.go
@@ -93,3 +93,50 @@ func (m *ReplicaRecoveryRecord) AsStructuredLog() eventpb.DebugRecoverReplica {
 		EndKey:            m.EndKey.AsRKey().String(),
 	}
 }
+
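+// Merge combines replica info collected from another source into m. It fails
+// if the infos come from different clusters, if both contain range
+// descriptors, or if both contain replicas from the same store.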
+func (m *ClusterReplicaInfo) Merge(o ClusterReplicaInfo) error {
+	// When making a cluster id check, make sure that we can create empty
+	// cluster info and merge everything into it. i.e. merging into empty
+	// struct should not trip check failure.
+	if len(m.LocalInfo) > 0 || len(m.Descriptors) > 0 {
+		if m.ClusterID != o.ClusterID {
+			return errors.Newf("can't merge cluster info from different cluster: %s != %s", m.ClusterID,
+				o.ClusterID)
+		}
+	} else {
+		m.ClusterID = o.ClusterID
+	}
+	if len(o.Descriptors) > 0 {
+		if len(m.Descriptors) > 0 {
+			return errors.New("only single cluster replica info could contain descriptors")
+		}
+		m.Descriptors = append(m.Descriptors, o.Descriptors...)
+	}
+	type nsk struct {
+		n roachpb.NodeID
+		s roachpb.StoreID
+	}
+	existing := make(map[nsk]struct{})
+	for _, n := range m.LocalInfo {
+		for _, r := range n.Replicas {
+			existing[nsk{n: r.NodeID, s: r.StoreID}] = struct{}{}
+		}
+	}
+	for _, n := range o.LocalInfo {
+		for _, r := range n.Replicas {
+			if _, ok := existing[nsk{n: r.NodeID, s: r.StoreID}]; ok {
+				return errors.Newf("failed to merge cluster info, replicas from n%d/s%d are already present",
+					r.NodeID, r.StoreID)
+			}
+		}
+	}
+	m.LocalInfo = append(m.LocalInfo, o.LocalInfo...)
+	return nil
+}
+
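+// ReplicaCount returns the total number of replicas across all local info
+// entries.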
+func (m *ClusterReplicaInfo) ReplicaCount() (size int) {
+	for _, i := range m.LocalInfo {
+		size += len(i.Replicas)
+	}
+	return size
+}
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
index 460640ed2adb..292d2de65596 100644
--- a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery.proto
@@ -51,11 +51,30 @@ message ReplicaInfo {
     (gogoproto.jsontag) = "raft_log_descriptor_changes,omitempty"];
 }
 
-// Collection of replica information gathered from a collect-info run on a single node.
+// Collection of replica information gathered in a collect-info run.
+// ReplicaInfo in Replicas does not have to be constrained to a single node,
+// but in practice info collected from a remote cluster will contain info per
+// node. In case of offline collection, replicas will belong to all stores
+// provided to the command regardless of the owning node.
 message NodeReplicaInfo {
   repeated ReplicaInfo replicas = 1 [(gogoproto.nullable) = false];
 }
 
+// Replica info collected from one or more nodes of a cluster.
+message ClusterReplicaInfo {
+  // ClusterID contains id of the cluster from which info was collected.
+  string cluster_id = 1 [(gogoproto.customname) = "ClusterID"];
+  // Descriptors contains range descriptors collected from meta ranges.
+  // Descriptors are optional and only present in collected info if system
+  // ranges didn't lose quorum. They could also be partial in some cases.
+  repeated roachpb.RangeDescriptor descriptors = 2 [(gogoproto.nullable) = false,
+    (gogoproto.jsontag) = "descriptors,omitempty"];
+  // LocalInfo contains one or more NodeReplicaInfo structs each containing a
+  // subset of the full info. They are not guaranteed to be split by node; each
+  // element should contain a disjoint subset of replica infos.
+  repeated NodeReplicaInfo local_info = 3 [(gogoproto.nullable) = false];
+}
+
 // ReplicaUpdate contains information that needs to be updated on replica on the node
 // to make it a designated survivor so that replica could act as a source of truth when
 // doing loss of quorum recovery.
diff --git a/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go
new file mode 100644
index 000000000000..2db4c86c7238
--- /dev/null
+++ b/pkg/kv/kvserver/loqrecovery/loqrecoverypb/recovery_test.go
@@ -0,0 +1,172 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package loqrecoverypb
+
+import (
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+func TestClusterInfoMergeChecksOverlapping(t *testing.T) {
+	info123 := ClusterReplicaInfo{
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+	info1 := ClusterReplicaInfo{
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info12 := ClusterReplicaInfo{
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info3 := ClusterReplicaInfo{
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+
+	require.Error(t, info123.Merge(info1))
+	require.Error(t, info123.Merge(info12))
+	require.Error(t, info123.Merge(info3))
+
+	_ = info1.Merge(info3)
+	require.EqualValues(t, info1, info123)
+}
+
+func TestClusterInfoMergeChecksDescriptor(t *testing.T) {
+	info1 := ClusterReplicaInfo{
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info3 := ClusterReplicaInfo{
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 2}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+	require.Error(t, info1.Merge(info3))
+}
+
+func TestClusterInfoMergeSameClusterID(t *testing.T) {
+	uuid1 := uuid.FastMakeV4()
+	info1 := ClusterReplicaInfo{
+		ClusterID:   uuid1.String(),
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info3 := ClusterReplicaInfo{
+		ClusterID: uuid1.String(),
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+	require.NoError(t, info1.Merge(info3),
+		"should be able to merge partial info with equal cluster ids")
+}
+
+func TestClusterInfoMergeRejectDifferentClusterIDs(t *testing.T) {
+	uuid1 := uuid.FastMakeV4()
+	uuid2 := uuid.FastMakeV4()
+	info1 := ClusterReplicaInfo{
+		ClusterID:   uuid1.String(),
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	info3 := ClusterReplicaInfo{
+		ClusterID: uuid2.String(),
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 3, NodeID: 2},
+				},
+			},
+		},
+	}
+	require.Error(t, info1.Merge(info3), "reject merging of info from different clusters")
+}
+
+func TestClusterInfoInitializeByMerge(t *testing.T) {
+	uuid1 := uuid.FastMakeV4().String()
+	info := ClusterReplicaInfo{
+		ClusterID:   uuid1,
+		Descriptors: []roachpb.RangeDescriptor{{RangeID: 1}},
+		LocalInfo: []NodeReplicaInfo{
+			{
+				Replicas: []ReplicaInfo{
+					{StoreID: 1, NodeID: 1},
+					{StoreID: 2, NodeID: 1},
+				},
+			},
+		},
+	}
+	empty := ClusterReplicaInfo{}
+	require.NoError(t, empty.Merge(info), "should be able to merge into empty struct")
+	require.Equal(t, empty.ClusterID, uuid1, "merge should update empty info fields")
+}
diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go
index 99a56e54a6a7..c26cd3a57369 100644
--- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go
+++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go
@@ -135,10 +135,11 @@ type wrappedStore struct {
 
 type quorumRecoveryEnv struct {
 	// Stores with data
-	stores map[roachpb.StoreID]wrappedStore
+	clusterID uuid.UUID
+	stores    map[roachpb.StoreID]wrappedStore
 
 	// Collected info from nodes
-	replicas []loqrecoverypb.NodeReplicaInfo
+	replicas loqrecoverypb.ClusterReplicaInfo
 
 	// plan to update replicas
 	plan loqrecoverypb.ReplicaUpdatePlan
@@ -190,6 +191,7 @@ func (e *quorumRecoveryEnv) handleReplicationData(t *testing.T, d datadriven.Tes
 	// Close existing stores in case we have multiple use cases within a data file.
 	e.cleanupStores()
 	e.stores = make(map[roachpb.StoreID]wrappedStore)
+	e.clusterID = uuid.MakeV4()
 
 	// Load yaml from data into local range info.
 	var replicaData []testReplicaInfo
@@ -390,7 +392,7 @@ func parsePrettyKey(t *testing.T, pretty string) roachpb.RKey {
 
 func (e *quorumRecoveryEnv) handleMakePlan(t *testing.T, d datadriven.TestData) (string, error) {
 	stores := e.parseStoresArg(t, d, false /* defaultToAll */)
-	plan, report, err := PlanReplicas(context.Background(), e.replicas, stores)
+	plan, report, err := PlanReplicas(context.Background(), e.replicas.LocalInfo, stores)
 	if err != nil {
 		return "", err
 	}
@@ -424,7 +426,7 @@ func (e *quorumRecoveryEnv) getOrCreateStore(
 			t.Fatalf("failed to crate in mem store: %v", err)
 		}
 		sIdent := roachpb.StoreIdent{
-			ClusterID: uuid.MakeV4(),
+			ClusterID: e.clusterID,
 			NodeID:    nodeID,
 			StoreID:   storeID,
 		}
@@ -447,13 +449,15 @@ func (e *quorumRecoveryEnv) handleCollectReplicas(
 	stores := e.parseStoresArg(t, d, true /* defaultToAll */)
 	nodes := e.groupStoresByNode(t, stores)
 	// save collected results into environment
-	e.replicas = nil
+	e.replicas = loqrecoverypb.ClusterReplicaInfo{}
 	for _, nodeStores := range nodes {
-		info, err := CollectReplicaInfo(ctx, nodeStores)
+		info, _, err := CollectStoresReplicaInfo(ctx, nodeStores)
 		if err != nil {
 			return "", err
 		}
-		e.replicas = append(e.replicas, info)
+		if err = e.replicas.Merge(info); err != nil {
+			return "", err
+		}
 	}
 	return "ok", nil
 }
diff --git a/pkg/kv/kvserver/loqrecovery/server.go b/pkg/kv/kvserver/loqrecovery/server.go
new file mode 100644
index 000000000000..e9e2a026c87d
--- /dev/null
+++ b/pkg/kv/kvserver/loqrecovery/server.go
@@ -0,0 +1,273 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package loqrecovery
+
+import (
+	"context"
+	"io"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/gossip"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/rpc"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
+	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/errors"
+)
+
+const rangeMetadataScanChunkSize = 100
+
+var replicaInfoStreamRetryOptions = retry.Options{
+	MaxRetries:     3,
+	InitialBackoff: time.Second,
+	Multiplier:     1,
+}
+
+var errMarkRetry = errors.New("retryable")
+
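+// IsRetryableError returns true if err was marked as a transient fan-out
+// failure that the collector may retry.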
+func IsRetryableError(err error) bool {
+	return errors.Is(err, errMarkRetry)
+}
+
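+// visitNodesFn visits available cluster nodes, invoking visitor with an admin
+// client for each node and retrying retryable visitor failures using retryOpts.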
+type visitNodesFn func(ctx context.Context, retryOpts retry.Options,
+	visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error,
+) error
+
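+// Server implements the replica info collection endpoints backing the admin
+// loss of quorum recovery RPCs: local store scanning and cluster-wide fan-out
+// collection.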
+type Server struct {
+	stores               *kvserver.Stores
+	visitNodes           visitNodesFn
+	metadataQueryTimeout time.Duration
+	forwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error
+}
+
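+// NewServer creates a recovery Server that reads replicas from the provided
+// stores and discovers the nodes to fan out to through gossip.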
+func NewServer(
+	stores *kvserver.Stores,
+	g *gossip.Gossip,
+	loc roachpb.Locality,
+	rpcCtx *rpc.Context,
+	knobs base.ModuleTestingKnobs,
+) *Server {
+	// Server-side timeouts are necessary in the recovery collector since we do
+	// best-effort operations: cluster info collection succeeds as an operation
+	// even if some parts of it time out.
+	metadataQueryTimeout := 1 * time.Minute
+	var forwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error
+	if rk, ok := knobs.(*TestingKnobs); ok {
+		if rk.MetadataScanTimeout > 0 {
+			metadataQueryTimeout = rk.MetadataScanTimeout
+		}
+		forwardReplicaFilter = rk.ForwardReplicaFilter
+	}
+	return &Server{
+		stores:               stores,
+		visitNodes:           makeVisitAvailableNodes(g, loc, rpcCtx),
+		metadataQueryTimeout: metadataQueryTimeout,
+		forwardReplicaFilter: forwardReplicaFilter,
+	}
+}
+
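+// ServeLocalReplicas streams info for all replicas found in this node's local
+// stores.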
+func (s Server) ServeLocalReplicas(
+	ctx context.Context,
+	_ *serverpb.RecoveryCollectLocalReplicaInfoRequest,
+	stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
+) error {
+	return s.stores.VisitStores(func(s *kvserver.Store) error {
+		reader := s.Engine().NewSnapshot()
+		defer reader.Close()
+		return visitStoreReplicas(ctx, reader, s.StoreID(), s.NodeID(),
+			func(info loqrecoverypb.ReplicaInfo) error {
+				return stream.Send(&serverpb.RecoveryCollectLocalReplicaInfoResponse{ReplicaInfo: &info})
+			})
+	})
+}
+
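+// ServeClusterReplicas streams range descriptors scanned from meta ranges,
+// followed by replica info collected from all reachable nodes. Descriptor
+// collection is best effort; if streaming from a node fails mid-way, the
+// client is notified with a NodeStreamRestarted message and the node is
+// retried.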
+func (s Server) ServeClusterReplicas(
+	ctx context.Context,
+	_ *serverpb.RecoveryCollectReplicaInfoRequest,
+	outStream serverpb.Admin_RecoveryCollectReplicaInfoServer,
+	kvDB *kv.DB,
+) (err error) {
+	var (
+		descriptors, nodes, replicas int
+	)
+	defer func() {
+		if err == nil {
+			log.Infof(ctx, "streamed info: range descriptors %d, nodes %d, replica infos %d", descriptors,
+				nodes, replicas)
+		}
+	}()
+
+	err = contextutil.RunWithTimeout(ctx, "scan-range-descriptors", s.metadataQueryTimeout,
+		func(txnCtx context.Context) error {
+			txn := kvDB.NewTxn(txnCtx, "scan-range-descriptors")
+			if err := txn.SetFixedTimestamp(txnCtx, kvDB.Clock().Now()); err != nil {
+				return err
+			}
+			defer func() { _ = txn.Rollback(txnCtx) }()
+			log.Infof(txnCtx, "serving recovery range descriptors for all ranges")
+			return txn.Iterate(txnCtx, keys.Meta2Prefix, keys.MetaMax, rangeMetadataScanChunkSize,
+				func(kvs []kv.KeyValue) error {
+					for _, rangeDescKV := range kvs {
+						var rangeDesc roachpb.RangeDescriptor
+						if err := rangeDescKV.ValueProto(&rangeDesc); err != nil {
+							return err
+						}
+						if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
+							Info: &serverpb.RecoveryCollectReplicaInfoResponse_RangeDescriptor{
+								RangeDescriptor: &rangeDesc,
+							},
+						}); err != nil {
+							return err
+						}
+						descriptors++
+					}
+					return nil
+				})
+		})
+	if err != nil {
+		// The error is either a kv transaction error or a stream send error.
+		// We don't care about transaction errors because the cluster might be in a
+		// crippled state, but we don't want to continue if the client stream is
+		// closed.
+		if outStream.Context().Err() != nil {
+			return err
+		}
+		log.Infof(ctx, "failed to iterate all descriptors: %s", err)
+	}
+
+	// Stream local replica info from all nodes wrapping them in response stream.
+	return s.visitNodes(ctx,
+		replicaInfoStreamRetryOptions,
+		func(nodeID roachpb.NodeID, client serverpb.AdminClient) error {
+			log.Infof(ctx, "trying to get info from node n%d", nodeID)
+			nodeReplicas := 0
+			inStream, err := client.RecoveryCollectLocalReplicaInfo(ctx,
+				&serverpb.RecoveryCollectLocalReplicaInfoRequest{})
+			if err != nil {
+				return errors.Mark(errors.Wrapf(err,
+					"failed retrieving replicas from node n%d during fan-out", nodeID), errMarkRetry)
+			}
+			for {
+				r, err := inStream.Recv()
+				if err == io.EOF {
+					break
+				}
+				if s.forwardReplicaFilter != nil {
+					err = s.forwardReplicaFilter(r)
+				}
+				if err != nil {
+					// Some replicas were already sent back, so we need to notify the client
+					// of the stream restart.
+					if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
+						Info: &serverpb.RecoveryCollectReplicaInfoResponse_NodeStreamRestarted{
+							NodeStreamRestarted: &serverpb.RecoveryCollectReplicaRestartNodeStream{
+								NodeID: nodeID,
+							},
+						},
+					}); err != nil {
+						return err
+					}
+					return errors.Mark(errors.Wrapf(err,
+						"failed retrieving replicas from node n%d during fan-out",
+						nodeID), errMarkRetry)
+				}
+				if err := outStream.Send(&serverpb.RecoveryCollectReplicaInfoResponse{
+					Info: &serverpb.RecoveryCollectReplicaInfoResponse_ReplicaInfo{
+						ReplicaInfo: r.ReplicaInfo,
+					},
+				}); err != nil {
+					return err
+				}
+				nodeReplicas++
+			}
+
+			replicas += nodeReplicas
+			nodes++
+			return nil
+		})
+}
+
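+// makeVisitAvailableNodes returns a visitNodesFn that iterates over node
+// descriptors found in gossip, skipping unreachable nodes and retrying
+// transient visitor failures.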
+func makeVisitAvailableNodes(
+	g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context,
+) visitNodesFn {
+	return func(ctx context.Context, retryOpts retry.Options,
+		visitor func(nodeID roachpb.NodeID, client serverpb.AdminClient) error,
+	) error {
+		collectNodeWithRetry := func(node roachpb.NodeDescriptor) error {
+			var err error
+			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+				log.Infof(ctx, "visiting node n%d, attempt %d", node.NodeID, r.CurrentAttempt())
+				addr := node.AddressForLocality(loc)
+				conn, err := rpcCtx.GRPCDialNode(addr.String(), node.NodeID, rpc.DefaultClass).Connect(ctx)
+				client := serverpb.NewAdminClient(conn)
+				// The node list may contain dead nodes that we don't need to visit. We
+				// can skip them and let the caller handle incomplete info.
+				if err != nil {
+					if grpcutil.IsConnectionUnavailable(err) {
+						return nil
+					}
+					// This was an initial heartbeat-type error; we must retry as the node
+					// seems live.
+					continue
+				}
+				err = visitor(node.NodeID, client)
+				if err == nil {
+					return nil
+				}
+				log.Infof(ctx, "failed calling a visitor for node n%d: %s", node.NodeID, err)
+				if !IsRetryableError(err) {
+					// For non retryable errors abort immediately.
+					return err
+				}
+			}
+			return err
+		}
+
+		var nodes []roachpb.NodeDescriptor
+		if err := g.IterateInfos(gossip.KeyNodeDescPrefix, func(key string, i gossip.Info) error {
+			b, err := i.Value.GetBytes()
+			if err != nil {
+				return errors.Wrapf(err, "failed to get node gossip info for key %s", key)
+			}
+
+			var d roachpb.NodeDescriptor
+			if err := protoutil.Unmarshal(b, &d); err != nil {
+				return errors.Wrapf(err, "failed to unmarshal node gossip info for key %s", key)
+			}
+
+			// Don't use node descriptors with NodeID 0, because that's meant to
+			// indicate that the node has been removed from the cluster.
+			if d.NodeID != 0 {
+				nodes = append(nodes, d)
+			}
+
+			return nil
+		}); err != nil {
+			return err
+		}
+
+		for _, node := range nodes {
+			if err := collectNodeWithRetry(node); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+}
diff --git a/pkg/kv/kvserver/loqrecovery/server_test.go b/pkg/kv/kvserver/loqrecovery/server_test.go
new file mode 100644
index 000000000000..3e8a6c7a22fe
--- /dev/null
+++ b/pkg/kv/kvserver/loqrecovery/server_test.go
@@ -0,0 +1,182 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package loqrecovery_test
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+type clusterInfoCounters struct {
+	nodes, stores, replicas, descriptors int
+}
+
+func TestReplicaCollection(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			StoreSpecs: []base.StoreSpec{{InMemory: true}},
+			Insecure:   true,
+			Knobs: base.TestingKnobs{
+				LOQRecovery: &loqrecovery.TestingKnobs{
+					MetadataScanTimeout: 15 * time.Second,
+				},
+			},
+		},
+	})
+	tc.Start(t)
+	defer tc.Stopper().Stop(ctx)
+	require.NoError(t, tc.WaitForFullReplication())
+	tc.ToggleReplicateQueues(false)
+
+	r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases")
+	var totalRanges int
+	require.NoError(t, r.Scan(&totalRanges), "failed to query range count")
+	adm, err := tc.GetAdminClient(ctx, t, 2)
+	require.NoError(t, err, "failed to get admin client")
+
+	// Collect and assert replica metadata. In the expectMeta case we sometimes
+	// have meta and sometimes don't, depending on which node holds the lease.
+	// We just ignore descriptor counts if we are not expecting meta.
+	assertReplicas := func(liveNodes int, expectMeta bool) {
+		var replicas loqrecoverypb.ClusterReplicaInfo
+		var stats loqrecovery.CollectionStats
+
+		replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
+		require.NoError(t, err, "failed to retrieve replica info")
+
+		// Check counters on retrieved replica info.
+		cnt := getInfoCounters(replicas)
+		require.Equal(t, liveNodes, cnt.stores, "collected replicas from stores")
+		require.Equal(t, liveNodes, cnt.nodes, "collected replicas from nodes")
+		if expectMeta {
+			require.Equal(t, totalRanges, cnt.descriptors,
+				"number of collected descriptors from metadata")
+		}
+		require.Equal(t, totalRanges*liveNodes, cnt.replicas, "number of collected replicas")
+		// Check stats counters as well.
+		require.Equal(t, liveNodes, stats.Nodes, "node counter stats")
+		require.Equal(t, liveNodes, stats.Stores, "store counter stats")
+		if expectMeta {
+			require.Equal(t, totalRanges, stats.Descriptors, "range descriptor counter stats")
+		}
+		require.NotEqual(t, replicas.ClusterID, uuid.UUID{}.String(), "cluster UUID must not be empty")
+	}
+
+	tc.StopServer(0)
+	assertReplicas(2, true)
+	tc.StopServer(1)
+	assertReplicas(1, false)
+
+	tc.Stopper().Stop(ctx)
+}
+
+// TestStreamRestart verifies that if the connection is dropped midway through
+// the replica stream, it is handled correctly with a stream restart that
+// allows the caller to discard partial replica data and receive a consistent
+// stream of replica infos.
+func TestStreamRestart(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	var failCount atomic.Int64
+	tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			StoreSpecs: []base.StoreSpec{{InMemory: true}},
+			Insecure:   true,
+			Knobs: base.TestingKnobs{
+				LOQRecovery: &loqrecovery.TestingKnobs{
+					MetadataScanTimeout: 15 * time.Second,
+					ForwardReplicaFilter: func(response *serverpb.RecoveryCollectLocalReplicaInfoResponse) error {
+						if response.ReplicaInfo.NodeID == 2 && response.ReplicaInfo.Desc.RangeID == 14 && failCount.Add(1) < 3 {
+							return errors.New("rpc stream stopped")
+						}
+						return nil
+					},
+				},
+			},
+		},
+	})
+	tc.Start(t)
+	defer tc.Stopper().Stop(ctx)
+	require.NoError(t, tc.WaitForFullReplication())
+	tc.ToggleReplicateQueues(false)
+
+	r := tc.ServerConn(0).QueryRow("select count(*) from crdb_internal.ranges_no_leases")
+	var totalRanges int
+	require.NoError(t, r.Scan(&totalRanges), "failed to query range count")
+	adm, err := tc.GetAdminClient(ctx, t, 2)
+	require.NoError(t, err, "failed to get admin client")
+
+	assertReplicas := func(liveNodes int) {
+		var replicas loqrecoverypb.ClusterReplicaInfo
+		var stats loqrecovery.CollectionStats
+
+		replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
+		require.NoError(t, err, "failed to retrieve replica info")
+
+		// Check counters on retrieved replica info.
+		cnt := getInfoCounters(replicas)
+		require.Equal(t, liveNodes, cnt.stores, "collected replicas from stores")
+		require.Equal(t, liveNodes, cnt.nodes, "collected replicas from nodes")
+		require.Equal(t, totalRanges, cnt.descriptors,
+			"number of collected descriptors from metadata")
+		require.Equal(t, totalRanges*liveNodes, cnt.replicas,
+			"number of collected replicas")
+		// Check stats counters as well.
+		require.Equal(t, liveNodes, stats.Nodes, "node counter stats")
+		require.Equal(t, liveNodes, stats.Stores, "store counter stats")
+		require.Equal(t, totalRanges, stats.Descriptors, "range descriptor counter stats")
+	}
+
+	assertReplicas(3)
+
+	tc.Stopper().Stop(ctx)
+}
+
+func getInfoCounters(info loqrecoverypb.ClusterReplicaInfo) clusterInfoCounters {
+	stores := map[roachpb.StoreID]interface{}{}
+	nodes := map[roachpb.NodeID]interface{}{}
+	totalReplicas := 0
+	for _, nr := range info.LocalInfo {
+		for _, r := range nr.Replicas {
+			stores[r.StoreID] = struct{}{}
+			nodes[r.NodeID] = struct{}{}
+		}
+		totalReplicas += len(nr.Replicas)
+	}
+	return clusterInfoCounters{
+		nodes:       len(nodes),
+		stores:      len(stores),
+		replicas:    totalReplicas,
+		descriptors: len(info.Descriptors),
+	}
+}
diff --git a/pkg/kv/kvserver/loqrecovery/testing_knobs.go b/pkg/kv/kvserver/loqrecovery/testing_knobs.go
new file mode 100644
index 000000000000..34bca894132d
--- /dev/null
+++ b/pkg/kv/kvserver/loqrecovery/testing_knobs.go
@@ -0,0 +1,30 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package loqrecovery
+
+import (
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+)
+
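+// TestingKnobs provides hooks to alter loss of quorum recovery collection
+// behavior in tests.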
+// TestingKnobs provides hooks that tests use to adjust the behavior of the
+// loss of quorum recovery service.
+type TestingKnobs struct {
+	// MetadataScanTimeout controls how long the loss of quorum recovery service
+	// will wait for range metadata. Useful for speeding up tests that verify
+	// collection behavior when meta ranges are unavailable.
+	MetadataScanTimeout time.Duration
+
+	// ForwardReplicaFilter is invoked on every replica info response forwarded
+	// during fan-out collection; returning an error injects a failure into that
+	// node's stream.
+	ForwardReplicaFilter func(*serverpb.RecoveryCollectLocalReplicaInfoResponse) error
+}
+
+// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
+func (*TestingKnobs) ModuleTestingKnobs() {}
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index fed62e9dac92..d80f1d701702 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -3219,6 +3219,54 @@ func (s *systemAdminServer) SendKVBatch(
 	return br, nil
 }
 
+func (s *systemAdminServer) RecoveryCollectReplicaInfo(
+	request *serverpb.RecoveryCollectReplicaInfoRequest,
+	stream serverpb.Admin_RecoveryCollectReplicaInfoServer,
+) error {
+	ctx := stream.Context()
+	ctx = s.server.AnnotateCtx(ctx)
+	_, err := s.requireAdminUser(ctx)
+	if err != nil {
+		return err
+	}
+	log.Ops.Info(ctx, "streaming cluster replica recovery info")
+
+	return s.server.recoveryServer.ServeClusterReplicas(ctx, request, stream, s.server.db)
+}
+
+func (s *systemAdminServer) RecoveryCollectLocalReplicaInfo(
+	request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
+	stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
+) error {
+	ctx := stream.Context()
+	ctx = s.server.AnnotateCtx(ctx)
+	_, err := s.requireAdminUser(ctx)
+	if err != nil {
+		return err
+	}
+
+	log.Ops.Info(ctx, "streaming local replica recovery info")
+	return s.server.recoveryServer.ServeLocalReplicas(ctx, request, stream)
+}
+
+func (s *systemAdminServer) RecoveryStagePlan(
+	ctx context.Context, request *serverpb.RecoveryStagePlanRequest,
+) (*serverpb.RecoveryStagePlanResponse, error) {
+	return nil, errors.AssertionFailedf("To be implemented by #93044")
+}
+
+func (s *systemAdminServer) RecoveryNodeStatus(
+	ctx context.Context, request *serverpb.RecoveryNodeStatusRequest,
+) (*serverpb.RecoveryNodeStatusResponse, error) {
+	return nil, errors.AssertionFailedf("To be implemented by #93043")
+}
+
+func (s *systemAdminServer) RecoveryVerify(
+	ctx context.Context, request *serverpb.RecoveryVerifyRequest,
+) (*serverpb.RecoveryVerifyResponse, error) {
+	return nil, errors.AssertionFailedf("To be implemented by #93043")
+}
+
 // sqlQuery allows you to incrementally build a SQL query that uses
 // placeholders. Instead of specific placeholders like $1, you instead use the
 // temporary placeholder $.
@@ -4061,35 +4109,3 @@ func (s *adminServer) SetTraceRecordingType(
 	})
 	return &serverpb.SetTraceRecordingTypeResponse{}, nil
 }
-
-func (s *adminServer) RecoveryCollectReplicaInfo(
-	request *serverpb.RecoveryCollectReplicaInfoRequest,
-	server serverpb.Admin_RecoveryCollectReplicaInfoServer,
-) error {
-	return errors.AssertionFailedf("To be implemented by #93040")
-}
-
-func (s *adminServer) RecoveryCollectLocalReplicaInfo(
-	request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
-	server serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
-) error {
-	return errors.AssertionFailedf("To be implemented by #93040")
-}
-
-func (s *adminServer) RecoveryStagePlan(
-	ctx context.Context, request *serverpb.RecoveryStagePlanRequest,
-) (*serverpb.RecoveryStagePlanResponse, error) {
-	return nil, errors.AssertionFailedf("To be implemented by #93044")
-}
-
-func (s *adminServer) RecoveryNodeStatus(
-	ctx context.Context, request *serverpb.RecoveryNodeStatusRequest,
-) (*serverpb.RecoveryNodeStatusResponse, error) {
-	return nil, errors.AssertionFailedf("To be implemented by #93043")
-}
-
-func (s *adminServer) RecoveryVerify(
-	ctx context.Context, request *serverpb.RecoveryVerifyRequest,
-) (*serverpb.RecoveryVerifyResponse, error) {
-	return nil, errors.AssertionFailedf("To be implemented by #93043")
-}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index ecc512bd9893..b1291197059f 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -39,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile"
@@ -145,10 +146,11 @@ type Server struct {
 	tsServer        *ts.Server
 	// The Observability Server, used by the Observability Service to subscribe to
 	// CRDB data.
-	eventsServer  *obs.EventsServer
-	raftTransport *kvserver.RaftTransport
-	stopper       *stop.Stopper
-	stopTrigger   *stopTrigger
+	eventsServer   *obs.EventsServer
+	recoveryServer *loqrecovery.Server
+	raftTransport  *kvserver.RaftTransport
+	stopper        *stop.Stopper
+	stopTrigger    *stopTrigger
 
 	debug    *debug.Server
 	kvProber *kvprober.Prober
@@ -1017,6 +1019,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		},
 	)
 
+	recoveryServer := loqrecovery.NewServer(stores, g, cfg.Locality, rpcContext, cfg.TestingKnobs.LOQRecovery)
+
 	*lateBoundServer = Server{
 		nodeIDContainer:        nodeIDContainer,
 		cfg:                    cfg,
@@ -1050,6 +1054,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		tsDB:                   tsDB,
 		tsServer:               &sTS,
 		eventsServer:           eventsServer,
+		recoveryServer:         recoveryServer,
 		raftTransport:          raftTransport,
 		stopper:                stopper,
 		stopTrigger:            stopTrigger,
diff --git a/pkg/server/serverpb/admin.proto b/pkg/server/serverpb/admin.proto
index 457ebd603ff3..b6c7f77ade7c 100644
--- a/pkg/server/serverpb/admin.proto
+++ b/pkg/server/serverpb/admin.proto
@@ -837,10 +837,23 @@ message CertBundleResponse {
 
 message RecoveryCollectReplicaInfoRequest {}
 
+// RecoveryCollectReplicaRestartNodeStream is sent by the collector node to the
+// client if it experiences a transient failure while collecting data from one
+// of the nodes. It instructs the client to drop any data collected locally for
+// the specified node, since streaming for that node will be restarted. This
+// avoids restarting the whole collection procedure in a large cluster when a
+// single node fails transiently.
+message RecoveryCollectReplicaRestartNodeStream {
+  int32 node_id = 1 [
+    (gogoproto.customname) = "NodeID",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"];
+}
+
 message RecoveryCollectReplicaInfoResponse {
   oneof info {
     roachpb.RangeDescriptor range_descriptor = 1;
     cockroach.kv.kvserver.loqrecovery.loqrecoverypb.ReplicaInfo replica_info = 2;
+    RecoveryCollectReplicaRestartNodeStream node_stream_restarted = 3;
   }
 }
 
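Editor's note: the node_stream_restarted variant only helps if the stream consumer reacts
to it. Below is a minimal Go sketch of the client-side receive loop this message enables;
the real logic presumably lives in loqrecovery.CollectRemoteReplicaInfo (modified by this
patch but not shown here). The function name, package line, and buffering layout are
illustrative assumptions; only the serverpb/loqrecoverypb types come from this patch.

package loqrecoveryexample // hypothetical package for this sketch

import (
	"io"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// receiveReplicaInfo drains a RecoveryCollectReplicaInfo stream, discarding
// any replica infos buffered for a node whenever the server announces that
// streaming for that node was restarted.
func receiveReplicaInfo(
	stream serverpb.Admin_RecoveryCollectReplicaInfoClient,
) ([]roachpb.RangeDescriptor, map[roachpb.NodeID][]loqrecoverypb.ReplicaInfo, error) {
	var descriptors []roachpb.RangeDescriptor
	replicas := make(map[roachpb.NodeID][]loqrecoverypb.ReplicaInfo)
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			// Server closed the stream; the collected data is complete.
			return descriptors, replicas, nil
		}
		if err != nil {
			return nil, nil, err
		}
		switch {
		case resp.GetRangeDescriptor() != nil:
			descriptors = append(descriptors, *resp.GetRangeDescriptor())
		case resp.GetReplicaInfo() != nil:
			info := resp.GetReplicaInfo()
			replicas[info.NodeID] = append(replicas[info.NodeID], *info)
		case resp.GetNodeStreamRestarted() != nil:
			// The collector will resend this node's replicas from scratch, so
			// drop whatever was buffered for it to avoid duplicates.
			replicas[resp.GetNodeStreamRestarted().NodeID] = nil
		}
	}
}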
diff --git a/pkg/util/grpcutil/grpc_util.go b/pkg/util/grpcutil/grpc_util.go
index 1dadd424c5a9..a81b1f884ce9 100644
--- a/pkg/util/grpcutil/grpc_util.go
+++ b/pkg/util/grpcutil/grpc_util.go
@@ -54,6 +54,15 @@ func IsTimeout(err error) bool {
 	return false
 }
 
+// IsConnectionUnavailable checks if the gRPC status code is codes.Unavailable,
+// which is set when we are not able to establish a connection to the remote node.
+func IsConnectionUnavailable(err error) bool {
+	if s, ok := status.FromError(errors.UnwrapAll(err)); ok {
+		return s.Code() == codes.Unavailable
+	}
+	return false
+}
+
 // IsContextCanceled returns true if err's Cause is an error produced by gRPC
 // on context cancellation.
 func IsContextCanceled(err error) bool {

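Editor's note: IsConnectionUnavailable lets the fan-out collector distinguish a node that
is simply unreachable (report it as missing and keep going) from a transient stream error
(restart only that node's stream, which is what node_stream_restarted signals to the
client). The sketch below illustrates one possible classification policy; the outcome type
and function name are assumptions, not the actual loqrecovery server code.

package loqrecoveryexample // hypothetical package for this sketch

import "github.com/cockroachdb/cockroach/pkg/util/grpcutil"

// nodeVisitOutcome is an illustrative classification of per-node collection
// errors; the real policy lives in the loqrecovery server.
type nodeVisitOutcome int

const (
	nodeDone    nodeVisitOutcome = iota // stream finished successfully
	nodeSkipped                         // node unreachable; leave it out of the result
	nodeRetried                         // transient failure; restart this node's stream
)

func classifyNodeError(err error) nodeVisitOutcome {
	switch {
	case err == nil:
		return nodeDone
	case grpcutil.IsConnectionUnavailable(err):
		// We could not establish a connection at all; retrying is pointless,
		// so the node is reported as missing rather than failing collection.
		return nodeSkipped
	default:
		// Any other error is treated as transient: restart streaming from this
		// node and tell the client to drop its partial data via
		// node_stream_restarted.
		return nodeRetried
	}
}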
From 02015bc4247f567060094132df45220d04f2a78e Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss 
Date: Mon, 16 Jan 2023 15:41:22 +0100
Subject: [PATCH 2/2] sql: populate `pg_proc.prokind`

Release note (bug fix): The `prokind` column of `pg_catalog.pg_proc`
is now populated properly.
---
 pkg/sql/logictest/testdata/logic_test/pg_catalog |  9 +++++++++
 pkg/sql/logictest/testdata/logic_test/udf        | 10 +++++-----
 pkg/sql/pg_catalog.go                            | 14 ++++++++++++--
 3 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog
index d1146a7033c0..5a90ef16b276 100644
--- a/pkg/sql/logictest/testdata/logic_test/pg_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -2367,6 +2367,15 @@ json_extract_path  25           2         3802        3802 25      {i,v}
 
 user root
 
+query TT rowsort
+SELECT DISTINCT proname, prokind FROM pg_catalog.pg_proc
+WHERE proname IN ('lag', 'abs', 'max')
+----
+abs  f
+lag  w
+max  a
+
+
 ## pg_catalog.pg_range
 query IIIIII colnames
 SELECT * from pg_catalog.pg_range
diff --git a/pkg/sql/logictest/testdata/logic_test/udf b/pkg/sql/logictest/testdata/logic_test/udf
index 2e142682d463..eca6514c9b34 100644
--- a/pkg/sql/logictest/testdata/logic_test/udf
+++ b/pkg/sql/logictest/testdata/logic_test/udf
@@ -166,13 +166,13 @@ CREATE SCHEMA sc;
 statement
 CREATE FUNCTION sc.proc_f_2(STRING) RETURNS STRING LANGUAGE SQL AS $$ SELECT 'hello' $$;
 
-query TTTTTBBBTITTTTT
-SELECT oid, proname, pronamespace, proowner, prolang, proleakproof, proisstrict, proretset, provolatile, pronargs, prorettype, proargtypes, proargmodes, proargnames, prosrc
+query TTTTTBBBTITTTTTT
+SELECT oid, proname, pronamespace, proowner, prolang, proleakproof, proisstrict, proretset, provolatile, pronargs, prorettype, proargtypes, proargmodes, proargnames, prokind, prosrc
 FROM pg_catalog.pg_proc WHERE proname IN ('proc_f', 'proc_f_2');
 ----
-100118  proc_f    105  1546506610  14  false  false  false  v  1  20  20     {i}    NULL    SELECT 1;
-100119  proc_f    105  1546506610  14  true   true   false  i  2  25  25 20  {i,i}  {"",b}  SELECT 'hello';
-100121  proc_f_2  120  1546506610  14  false  false  false  v  1  25  25     {i}    NULL    SELECT 'hello';
+100118  proc_f    105  1546506610  14  false  false  false  v  1  20  20     {i}    NULL    f  SELECT 1;
+100119  proc_f    105  1546506610  14  true   true   false  i  2  25  25 20  {i,i}  {"",b}  f  SELECT 'hello';
+100121  proc_f_2  120  1546506610  14  false  false  false  v  1  25  25     {i}    NULL    f  SELECT 'hello';
 
 # Ensure that the pg_proc virtual index works properly.
 
diff --git a/pkg/sql/pg_catalog.go b/pkg/sql/pg_catalog.go
index ed03aac739ad..6a3dbb31c14b 100644
--- a/pkg/sql/pg_catalog.go
+++ b/pkg/sql/pg_catalog.go
@@ -2339,6 +2339,16 @@ func addPgProcBuiltinRow(name string, addRow func(...tree.Datum) error) error {
 		name = name[len(crdbInternal):]
 	}
 
+	var kind tree.Datum
+	switch {
+	case isAggregate:
+		kind = tree.NewDString("a")
+	case isWindow:
+		kind = tree.NewDString("w")
+	default:
+		kind = tree.NewDString("f")
+	}
+
 	for _, builtin := range overloads {
 		dName := tree.NewDName(name)
 		dSrc := tree.NewDString(name)
@@ -2435,8 +2445,8 @@ func addPgProcBuiltinRow(name string, addRow func(...tree.Datum) error) error {
 			tree.DNull,                                      // probin
 			tree.DNull,                                      // proconfig
 			tree.DNull,                                      // proacl
+			kind,                                            // prokind
 			// These columns were automatically created by pg_catalog_test's missing column generator.
-			tree.DNull, // prokind
 			tree.DNull, // prosupport
 		)
 		if err != nil {
@@ -2511,8 +2521,8 @@ func addPgProcUDFRow(
 		tree.DNull,                                       // probin
 		tree.DNull,                                       // proconfig
 		tree.DNull,                                       // proacl
+		tree.NewDString("f"),                             // prokind
 		// These columns were automatically created by pg_catalog_test's missing column generator.
-		tree.DNull, // prokind
 		tree.DNull, // prosupport
 	)
 }