Skip to content

Commit

Permalink
Only skip routing table update in case of remote recovery
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Sep 11, 2023
1 parent 9b7c098 commit e893146
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,11 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.ClusterSettings;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

Expand Down Expand Up @@ -126,39 +121,17 @@ static ClusterState updateRoutingTable(final ClusterState state) {
// initialize all index routing tables as empty
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(state.routingTable());
for (final IndexMetadata cursor : state.metadata().indices().values()) {
Optional<ShardRouting> shardRoutingOp = Optional.empty();
if (cursor.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
// get recovery source if possible from existing shards
// this is the case of auto restore.
IndexRoutingTable indexRoutingTable = state.routingTable().indicesRouting().get(cursor.getIndex().getName());
int totalPrimariesWithRemoteRecovery = indexRoutingTable.shardsMatchingPredicateCount(
shardRouting -> shardRouting.primary()
&& shardRouting.recoverySource() instanceof RecoverySource.RemoteStoreRecoverySource
);

// only if all primaries are set up for remote recovery, we would initialize the routing table with RemoteStoreRecoverySource
if (cursor.getNumberOfShards() == totalPrimariesWithRemoteRecovery) {
shardRoutingOp = state.routingTable()
.indicesRouting()
.get(cursor.getIndex().getName())
.shards()
.values()
.stream()
.map(IndexShardRoutingTable::primaryShard)
.filter(shardrouting -> shardrouting.recoverySource() instanceof RecoverySource.RemoteStoreRecoverySource)
.findAny();
}
}

if (shardRoutingOp.isPresent()) {
// add as recovery source
routingTableBuilder.addAsRemoteStoreRestore(
cursor,
(RecoverySource.RemoteStoreRecoverySource) shardRoutingOp.get().recoverySource(),
new HashMap<>(),
true
);
} else {
// Whether IndexMetadata is recovered from local disk or from remote does not matter to us at this point.
// We are only concerned about index data recovery here, which is why we only check for remote store enabled and not for remote
// cluster state enabled.
// Even if only one of the primaries has remote as its recovery source, all other primaries would also be remote backed.
// It cannot be the case that some primaries are remote backed and some are not.
if (cursor.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false) == false
|| state.routingTable()
.shardsMatchingPredicateCount(
shardRouting -> shardRouting.primary()
&& shardRouting.recoverySource() instanceof RecoverySource.RemoteStoreRecoverySource
) > 0) {
routingTableBuilder.addAsRecovery(cursor);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public GatewayService(
if (discovery instanceof Coordinator) {
recoveryRunnable = () -> clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
} else {
// confirm if it is just for tests
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
recoveryRunnable = () -> gateway.performStateRecovery(new GatewayRecoveryListener());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,15 @@ private RemoteRestoreResult executeRestore(
final String restoreUUID = UUIDs.randomBase64UUID();
List<String> indicesToBeRestored = new ArrayList<>();
int totalShards = 0;
boolean metadataFromRemoteStore = false;
ClusterState.Builder builder = ClusterState.builder(currentState);
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
for (Map.Entry<String, Tuple<Boolean, IndexMetadata>> indexMetadataEntry : indexMetadataMap.entrySet()) {
String indexName = indexMetadataEntry.getKey();
IndexMetadata indexMetadata = indexMetadataEntry.getValue().v2();
boolean metadataFromRemoteStore = indexMetadataEntry.getValue().v1();
metadataFromRemoteStore = indexMetadataEntry.getValue().v1();
IndexMetadata updatedIndexMetadata = indexMetadata;
if (restoreAllShards || metadataFromRemoteStore) {
updatedIndexMetadata = IndexMetadata.builder(indexMetadata)
Expand All @@ -199,27 +200,22 @@ private RemoteRestoreResult executeRestore(

IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID());

Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = new HashMap<>();
// Skip creation of routing table if this is for full cluster state restore
if (metadataFromRemoteStore == false) {
indexShardRoutingTableMap = currentState.routingTable()
Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = currentState.routingTable()
.index(indexName)
.shards()
.values()
.stream()
.collect(Collectors.toMap(IndexShardRoutingTable::shardId, Function.identity()));
}

RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);
rtBuilder.addAsRemoteStoreRestore(
updatedIndexMetadata,
recoverySource,
indexShardRoutingTableMap,
restoreAllShards || metadataFromRemoteStore
);
RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);
rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, indexShardRoutingTableMap, restoreAllShards);
}
blocks.updateBlocks(updatedIndexMetadata);
mdBuilder.put(updatedIndexMetadata, true);
indicesToBeRestored.add(indexName);
Expand All @@ -230,7 +226,11 @@ private RemoteRestoreResult executeRestore(

RoutingTable rt = rtBuilder.build();
ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
return RemoteRestoreResult.build(restoreUUID, restoreInfo, allocationService.reroute(updatedState, "restored from remote store"));
return RemoteRestoreResult.build(
restoreUUID,
restoreInfo,
metadataFromRemoteStore ? updatedState : allocationService.reroute(updatedState, "restored from remote store")
);
}

/**
Expand Down

0 comments on commit e893146

Please sign in to comment.