Skip to content

Commit

Permalink
[POC] try restoring an index with hardcoded metadata
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Aug 3, 2023
1 parent 91bc891 commit d192f2d
Showing 1 changed file with 107 additions and 64 deletions.
171 changes: 107 additions & 64 deletions server/src/main/java/org/opensearch/snapshots/RestoreService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand All @@ -107,15 +108,7 @@
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableSet;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_HISTORY_UUID;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_UPGRADED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.*;
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.common.util.set.Sets.newHashSet;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
Expand Down Expand Up @@ -217,68 +210,110 @@ public void restoreFromRemoteStore(RestoreRemoteStoreRequest request, final Acti
final String restoreUUID = UUIDs.randomBase64UUID();
RestoreInfo restoreInfo = null;

@Override
public ClusterState execute(ClusterState currentState) {
// Updating cluster state
ClusterState.Builder builder = ClusterState.builder(currentState);
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
private IndexMetadata getRemoteIndexMetadata() {
// Dummy data for initial testing
return IndexMetadata.builder("my-index-01")
.settings(Settings.builder()
.put(SETTING_INDEX_UUID, "TLHafcwfTAazM5hFSFidyA")
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "my-fs-repository")
.put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "my-fs-repository")
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_VERSION_CREATED, "137217827")
)
.primaryTerm(0, 2)
.build();
}

List<String> indicesToBeRestored = new ArrayList<>();
int totalShards = 0;
for (String index : request.indices()) {
IndexMetadata currentIndexMetadata = currentState.metadata().index(index);
if (currentIndexMetadata == null) {
// ToDo: Handle index metadata does not exist case. (GitHub #3457)
logger.warn("Remote store restore is not supported for non-existent index. Skipping: {}", index);
continue;
}
if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {
IndexMetadata updatedIndexMetadata = currentIndexMetadata;
Map<ShardId, ShardRouting> activeInitializingShards = new HashMap<>();
if (request.restoreAllShards()) {
if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
throw new IllegalStateException(
"cannot restore index ["
+ index
+ "] because an open index "
+ "with same name already exists in the cluster. Close the existing index"
);
private boolean isIndexMetadataFromRemoteStore(IndexMetadata indexMetadata){
// might move it somewhere else.
// but we need a way to distinguish if we are restore IndexMetadata from restore.
return true;
}

private void validate(ClusterState currentState, Map<String, IndexMetadata> indexMetadataMap,
boolean allowPartial, boolean restoreAllShards) {
for (Map.Entry<String, IndexMetadata> indexMetadataSet: indexMetadataMap.entrySet()) {
String indexName = indexMetadataSet.getKey();
IndexMetadata indexMetadata = indexMetadataSet.getValue();
if (indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {
if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState())) {
String errorMsg = "cannot restore index ["
+ indexName
+ "] because an open index "
+ "with same name already exists in the cluster. Close the existing index";
if (allowPartial) {
throw new IllegalStateException(errorMsg);
} else {
logger.warn(errorMsg);
}
updatedIndexMetadata = IndexMetadata.builder(currentIndexMetadata)
.state(IndexMetadata.State.OPEN)
.version(1 + currentIndexMetadata.getVersion())
.mappingVersion(1 + currentIndexMetadata.getMappingVersion())
.settingsVersion(1 + currentIndexMetadata.getSettingsVersion())
.aliasesVersion(1 + currentIndexMetadata.getAliasesVersion())
.build();
} else {
activeInitializingShards = currentState.routingTable()
.index(index)
.shards()
.values()
.stream()
.map(IndexShardRoutingTable::primaryShard)
.filter(shardRouting -> shardRouting.unassigned() == false)
.collect(Collectors.toMap(ShardRouting::shardId, Function.identity()));
}

IndexId indexId = new IndexId(index, updatedIndexMetadata.getIndexUUID());
if (isIndexMetadataFromRemoteStore(indexMetadata)) {
Version minIndexCompatibilityVersion = currentState.getNodes()
.getMaxNodeVersion()
.minimumIndexCompatibilityVersion();
metadataIndexUpgradeService.upgradeIndexMetadata(indexMetadata, minIndexCompatibilityVersion);
boolean isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.getSettings());
createIndexService.validateIndexName(indexName, currentState);
createIndexService.validateDotIndex(indexName, isHidden);
createIndexService.validateIndexSettings(indexName, indexMetadata.getSettings(), false);
}
// TODO other validation will come here. still figuring out what else we need to validate
} else {
logger.warn("Remote store is not enabled for index: {}", indexName);
}

RemoteStoreRecoverySource recoverySource = new RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);
rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, activeInitializingShards);
blocks.updateBlocks(updatedIndexMetadata);
mdBuilder.put(updatedIndexMetadata, true);
indicesToBeRestored.add(index);
totalShards += updatedIndexMetadata.getNumberOfShards();
}
}

private ClusterState executeRestore(ClusterState currentState, Map<String, IndexMetadata> indexMetadataMap,
boolean restoreAllShards) {
List<String> indicesToBeRestored = new ArrayList<>();
int totalShards = 0;
ClusterState.Builder builder = ClusterState.builder(currentState);
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
for (Map.Entry<String, IndexMetadata> indexMetadataSet : indexMetadataMap.entrySet()) {
String indexName = indexMetadataSet.getKey();
IndexMetadata indexMetadata = indexMetadataSet.getValue();
IndexMetadata updatedIndexMetadata = indexMetadata;
Map<ShardId, ShardRouting> activeInitializingShards = new HashMap<>();
if (restoreAllShards) {
updatedIndexMetadata = IndexMetadata.builder(indexMetadata)
.state(IndexMetadata.State.OPEN)
// do we need to increment this during restore from remote index metadata
.version(1 + indexMetadata.getVersion())
.mappingVersion(1 + indexMetadata.getMappingVersion())
.settingsVersion(1 + indexMetadata.getSettingsVersion())
.aliasesVersion(1 + indexMetadata.getAliasesVersion())
.build();
} else {
logger.warn("Remote store is not enabled for index: {}", index);
activeInitializingShards = currentState.routingTable()
.index(indexName)
.shards()
.values()
.stream()
.map(IndexShardRoutingTable::primaryShard)
.filter(shardRouting -> shardRouting.unassigned() == false)
.collect(Collectors.toMap(ShardRouting::shardId, Function.identity()));
}

IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID());

RemoteStoreRecoverySource recoverySource = new RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);
rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, activeInitializingShards);
blocks.updateBlocks(updatedIndexMetadata);
mdBuilder.put(updatedIndexMetadata, true);
indicesToBeRestored.add(indexName);
totalShards += updatedIndexMetadata.getNumberOfShards();
}

restoreInfo = new RestoreInfo("remote_store", indicesToBeRestored, totalShards, totalShards);
Expand All @@ -288,6 +323,14 @@ public ClusterState execute(ClusterState currentState) {
return allocationService.reroute(updatedState, "restored from remote store");
}

@Override
public ClusterState execute(ClusterState currentState) {
Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
indexMetadataMap.put("my-index-01", getRemoteIndexMetadata());
validate(currentState, indexMetadataMap, true, request.restoreAllShards());
return executeRestore(currentState, indexMetadataMap, request.restoreAllShards());
}

@Override
public void onFailure(String source, Exception e) {
logger.warn("failed to restore from remote store", e);
Expand Down

0 comments on commit d192f2d

Please sign in to comment.