Skip to content

Commit

Permalink
only use RemoteStoreRecoverySource if all primaries are marked for re…
Browse files Browse the repository at this point in the history
…mote store recovery

Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Sep 11, 2023
1 parent 7f0528d commit 9b7c098
Showing 1 changed file with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -125,26 +126,35 @@ static ClusterState updateRoutingTable(final ClusterState state) {
// initialize all index routing tables as empty
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(state.routingTable());
for (final IndexMetadata cursor : state.metadata().indices().values()) {
Optional<ShardRouting> shardRouting = Optional.empty();
Optional<ShardRouting> shardRoutingOp = Optional.empty();
if (cursor.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
// get recovery source if possible from existing shards
// this is the case of auto restore.
shardRouting = state.routingTable()
.indicesRouting()
.get(cursor.getIndex().getName())
.shards()
.values()
.stream()
.map(IndexShardRoutingTable::primaryShard)
.filter(shardrouting -> shardrouting.recoverySource() instanceof RecoverySource.RemoteStoreRecoverySource)
.findFirst();
IndexRoutingTable indexRoutingTable = state.routingTable().indicesRouting().get(cursor.getIndex().getName());
int totalPrimariesWithRemoteRecovery = indexRoutingTable.shardsMatchingPredicateCount(
shardRouting -> shardRouting.primary()
&& shardRouting.recoverySource() instanceof RecoverySource.RemoteStoreRecoverySource
);

// only if all primaries are set up for remote recovery, we would initialize the routable with RemoteStoreRecoverySource
if (cursor.getNumberOfShards() == totalPrimariesWithRemoteRecovery) {
shardRoutingOp = state.routingTable()
.indicesRouting()
.get(cursor.getIndex().getName())
.shards()
.values()
.stream()
.map(IndexShardRoutingTable::primaryShard)
.filter(shardrouting -> shardrouting.recoverySource() instanceof RecoverySource.RemoteStoreRecoverySource)
.findAny();
}
}

if (shardRouting.isPresent()) {
if (shardRoutingOp.isPresent()) {
// add as recovery source
routingTableBuilder.addAsRemoteStoreRestore(
cursor,
(RecoverySource.RemoteStoreRecoverySource) shardRouting.get().recoverySource(),
(RecoverySource.RemoteStoreRecoverySource) shardRoutingOp.get().recoverySource(),
new HashMap<>(),
true
);
Expand Down

0 comments on commit 9b7c098

Please sign in to comment.