Skip to content

Commit

Permalink
Add batch async shard fetch transport action for replica #8218 (#8356)
Browse files Browse the repository at this point in the history
* Add batch async shard fetch transport action for replica

Signed-off-by: sudarshan baliga <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
sudarshan-baliga authored Mar 14, 2024
1 parent b265215 commit 12115d1
Show file tree
Hide file tree
Showing 8 changed files with 728 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@

package org.opensearch.gateway;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
Expand All @@ -46,6 +48,7 @@
import org.opensearch.cluster.coordination.ElectionSchedulerFactory;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
Expand All @@ -63,6 +66,8 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.store.ShardAttributes;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.InternalTestCluster.RestartCallback;
Expand All @@ -82,8 +87,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
Expand Down Expand Up @@ -817,6 +825,131 @@ public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
assertTrue(nodeGatewayStartedShards.primary());
}

public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException {
String indexName = "test";
DiscoveryNode[] nodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
new String[] { indexName },
nodes
);
Index index = resolveIndex(indexName);
ShardId shardId = new ShardId(index, 0);
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
.get(nodes[0].getId())
.getNodeStoreFilesMetadataBatch()
.get(shardId);
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
}

public void testShardStoreFetchMultiNodeMultiIndexesUsingBatchAction() throws Exception {
internalCluster().startNodes(2);
String indexName1 = "test1";
String indexName2 = "test2";
DiscoveryNode[] nodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
new String[] { indexName1, indexName2 },
nodes
);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
ShardId shardId = clusterSearchShardsGroup.getShardId();
ShardRouting[] shardRoutings = clusterSearchShardsGroup.getShards();
assertEquals(2, shardRoutings.length);
for (ShardRouting shardRouting : shardRoutings) {
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
.get(shardRouting.currentNodeId())
.getNodeStoreFilesMetadataBatch()
.get(shardId);
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
}
}
}

public void testShardStoreFetchNodeNotConnectedUsingBatchAction() {
DiscoveryNode nonExistingNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
String indexName = "test";
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
new String[] { indexName },
new DiscoveryNode[] { nonExistingNode }
);
assertTrue(response.hasFailures());
assertEquals(1, response.failures().size());
assertEquals(nonExistingNode.getId(), response.failures().get(0).nodeId());
}

public void testShardStoreFetchCorruptedIndexUsingBatchAction() throws Exception {
internalCluster().startNodes(2);
String index1Name = "test1";
String index2Name = "test2";
prepareIndices(new String[] { index1Name, index2Name }, 1, 1);
Map<ShardId, ShardAttributes> shardAttributesMap = prepareRequestMap(new String[] { index1Name, index2Name }, 1);
Index index1 = resolveIndex(index1Name);
ShardId shardId1 = new ShardId(index1, 0);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(index1Name).get();
assertEquals(2, searchShardsResponse.getNodes().length);

// corrupt test1 index shards
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId1);
corruptShard(searchShardsResponse.getNodes()[1].getName(), shardId1);
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(false).get();
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
new TransportNodesListShardStoreMetadataBatch.Request(shardAttributesMap, discoveryNodes)
);
Map<ShardId, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> nodeStoreFilesMetadata = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeStoreFilesMetadataBatch();
// We don't store exception in case of corrupt index, rather just return an empty response
assertNull(nodeStoreFilesMetadata.get(shardId1).getStoreFileFetchException());
assertEquals(shardId1, nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().shardId());
assertTrue(nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().isEmpty());

Index index2 = resolveIndex(index2Name);
ShardId shardId2 = new ShardId(index2, 0);
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata.get(shardId2), shardId2);
}

private void prepareIndices(String[] indices, int numberOfPrimaryShards, int numberOfReplicaShards) {
for (String index : indices) {
createIndex(
index,
Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards)
.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicaShards)
.build()
);
index(index, "type", "1", Collections.emptyMap());
flush(index);
}
}

private TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch prepareAndSendRequest(
String[] indices,
DiscoveryNode[] nodes
) {
Map<ShardId, ShardAttributes> shardAttributesMap = null;
prepareIndices(indices, 1, 1);
shardAttributesMap = prepareRequestMap(indices, 1);
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
return ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
new TransportNodesListShardStoreMetadataBatch.Request(shardAttributesMap, nodes)
);
}

private void assertNodeStoreFilesMetadataSuccessCase(
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata,
ShardId shardId
) {
assertNull(nodeStoreFilesMetadata.getStoreFileFetchException());
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFileMetadata = nodeStoreFilesMetadata.storeFilesMetadata();
assertFalse(storeFileMetadata.isEmpty());
assertEquals(shardId, storeFileMetadata.shardId());
assertNotNull(storeFileMetadata.peerRecoveryRetentionLeases());
}

private void assertNodeGatewayStartedShardsHappyCase(
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -106,7 +106,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
Expand Down Expand Up @@ -223,7 +223,7 @@ public AllocateUnassignedDecision makeAllocationDecision(
}
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
Expand Down Expand Up @@ -357,10 +357,7 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private static TransportNodesListShardStoreMetadata.StoreFilesMetadata findStore(
DiscoveryNode node,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data
) {
private static StoreFilesMetadata findStore(DiscoveryNode node, AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data) {
NodeStoreFilesMetadata nodeFilesStore = data.getData().get(node);
if (nodeFilesStore == null) {
return null;
Expand All @@ -373,7 +370,7 @@ private MatchingNodes findMatchingNodes(
RoutingAllocation allocation,
boolean noMatchFailedNodes,
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data,
boolean explain
) {
Expand All @@ -386,7 +383,7 @@ private MatchingNodes findMatchingNodes(
&& shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
continue;
}
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
// we don't have any files at all, it is an empty index
if (storeFilesMetadata.isEmpty()) {
continue;
Expand Down Expand Up @@ -441,10 +438,7 @@ private MatchingNodes findMatchingNodes(
return new MatchingNodes(matchingNodes, nodeDecisions);
}

private static long computeMatchingBytes(
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata
) {
private static long computeMatchingBytes(StoreFilesMetadata primaryStore, StoreFilesMetadata storeFilesMetadata) {
long sizeMatched = 0;
for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) {
String metadataFileName = storeFileMetadata.name();
Expand All @@ -455,19 +449,16 @@ private static long computeMatchingBytes(
return sizeMatched;
}

private static boolean hasMatchingSyncId(
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
) {
private static boolean hasMatchingSyncId(StoreFilesMetadata primaryStore, StoreFilesMetadata replicaStore) {
String primarySyncId = primaryStore.syncId();
return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
}

private static MatchingNode computeMatchingNode(
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
StoreFilesMetadata primaryStore,
DiscoveryNode replicaNode,
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
StoreFilesMetadata replicaStore
) {
final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode);
final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode);
Expand All @@ -478,7 +469,7 @@ private static MatchingNode computeMatchingNode(
}

private static boolean canPerformOperationBasedRecovery(
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores,
DiscoveryNode targetNode
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
import org.opensearch.plugins.MapperPlugin;

import java.util.ArrayList;
Expand Down Expand Up @@ -281,6 +282,7 @@ protected void configure() {
bind(IndicesStore.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetadata.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetadataBatch.class).asEagerSingleton();
bind(GlobalCheckpointSyncAction.class).asEagerSingleton();
bind(TransportResyncReplicationAction.class).asEagerSingleton();
bind(PrimaryReplicaSyncer.class).asEagerSingleton();
Expand Down
Loading

0 comments on commit 12115d1

Please sign in to comment.