Skip to content

Commit

Permalink
Minor Refactoring and Spotless fixes
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Aug 7, 2023
1 parent 23a15db commit 207060b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,17 @@ protected void clusterManagerOperation(
final ClusterState state,
final ActionListener<RestoreRemoteStoreResponse> listener
) {
restoreService.restore(
request,
ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
RestoreRemoteStoreResponse::new
);
} else {
delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo()));
}
})
);
restoreService.restore(request, ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
RestoreRemoteStoreResponse::new
);
} else {
delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo()));
}
}));
}
}
7 changes: 6 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.plugins.ExtensionAwarePlugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.snapshots.*;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.RestoreService;
import org.opensearch.snapshots.RemoteStoreRestoreService;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.snapshots.SnapshotShardsService;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.telemetry.tracing.NoopTracerFactory;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.TracerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -46,7 +47,14 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.*;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;

public class RemoteStoreRestoreService implements ClusterStateApplier {

Expand Down Expand Up @@ -103,77 +111,42 @@ private IndexMetadata getRemoteIndexMetadata() {
// Dummy data for initial testing
try {
return IndexMetadata.builder("my-index-01")
.settings(Settings.builder()
.put(SETTING_INDEX_UUID, "TLHafcwfTAazM5hFSFidyA")
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "my-fs-repository")
.put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "my-fs-repository")
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_VERSION_CREATED, "137217827")
.settings(
Settings.builder()
.put(SETTING_INDEX_UUID, "TLHafcwfTAazM5hFSFidyA")
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "my-fs-repository")
.put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "my-fs-repository")
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_VERSION_CREATED, "137217827")
)
.primaryTerm(0, 2)
.putMapping("{\"_doc\":{\"properties\":{\"settings\":{\"properties\":{\"index\":{\"properties\":{\"number_of_replicas\":{\"type\":\"long\"},\"number_of_shards\":{\"type\":\"long\"},\"remote_store\":{\"properties\":{\"enabled\":{\"type\":\"boolean\"},\"repository\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},\"translog\":{\"properties\":{\"buffer_interval\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},\"enabled\":{\"type\":\"boolean\"},\"repository\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}}},\"replication\":{\"properties\":{\"type\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}}}}}}}}")
.putMapping(
"{\"_doc\":{\"properties\":{\"settings\":{\"properties\":{\"index\":{\"properties\":{\"number_of_replicas\":{\"type\":\"long\"},\"number_of_shards\":{\"type\":\"long\"},\"remote_store\":{\"properties\":{\"enabled\":{\"type\":\"boolean\"},\"repository\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},\"translog\":{\"properties\":{\"buffer_interval\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},\"enabled\":{\"type\":\"boolean\"},\"repository\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}}},\"replication\":{\"properties\":{\"type\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}}}}}}}}"
)
.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private boolean isIndexMetadataFromRemoteStore(IndexMetadata indexMetadata) {
// might move it somewhere else.
// but we need a way to distinguish if we are restore IndexMetadata from restore.
return true;
}

private void validate(ClusterState currentState, Map<String, IndexMetadata> indexMetadataMap,
boolean allowPartial, boolean restoreAllShards) {
for (Map.Entry<String, IndexMetadata> indexMetadataSet: indexMetadataMap.entrySet()) {
String indexName = indexMetadataSet.getKey();
IndexMetadata indexMetadata = indexMetadataSet.getValue();
if (indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {
if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState())) {
String errorMsg = "cannot restore index ["
+ indexName
+ "] because an open index "
+ "with same name already exists in the cluster. Close the existing index";
if (allowPartial) {
throw new IllegalStateException(errorMsg);
} else {
logger.warn(errorMsg);
}
}

if (isIndexMetadataFromRemoteStore(indexMetadata)) {
Version minIndexCompatibilityVersion = currentState.getNodes()
.getMaxNodeVersion()
.minimumIndexCompatibilityVersion();
metadataIndexUpgradeService.upgradeIndexMetadata(indexMetadata, minIndexCompatibilityVersion);
boolean isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.getSettings());
createIndexService.validateIndexName(indexName, currentState);
createIndexService.validateDotIndex(indexName, isHidden);
createIndexService.validateIndexSettings(indexName, indexMetadata.getSettings(), false);
}
// TODO other validation will come here. still figuring out what else we need to validate
} else {
logger.warn("Remote store is not enabled for index: {}", indexName);
}

}
}

private ClusterState executeRestore(ClusterState currentState, Map<String, IndexMetadata> indexMetadataMap,
boolean restoreAllShards) {
private ClusterState executeRestore(
ClusterState currentState,
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap,
boolean restoreAllShards
) {
List<String> indicesToBeRestored = new ArrayList<>();
int totalShards = 0;
ClusterState.Builder builder = ClusterState.builder(currentState);
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
for (Map.Entry<String, IndexMetadata> indexMetadataSet : indexMetadataMap.entrySet()) {
String indexName = indexMetadataSet.getKey();
IndexMetadata indexMetadata = indexMetadataSet.getValue();
for (Map.Entry<String, Tuple<Boolean, IndexMetadata>> indexMetadataEntry : indexMetadataMap.entrySet()) {
String indexName = indexMetadataEntry.getKey();
IndexMetadata indexMetadata = indexMetadataEntry.getValue().v2();
boolean fromRemoteStore = indexMetadataEntry.getValue().v1();
IndexMetadata updatedIndexMetadata = indexMetadata;
Map<ShardId, ShardRouting> activeInitializingShards = new HashMap<>();
if (restoreAllShards) {
Expand All @@ -185,7 +158,7 @@ private ClusterState executeRestore(ClusterState currentState, Map<String, Index
.settingsVersion(1 + indexMetadata.getSettingsVersion())
.aliasesVersion(1 + indexMetadata.getAliasesVersion())
.build();
} else if (isIndexMetadataFromRemoteStore(indexMetadata) == false) {
} else if (fromRemoteStore == false) {
activeInitializingShards = currentState.routingTable()
.index(indexName)
.shards()
Expand Down Expand Up @@ -219,8 +192,20 @@ private ClusterState executeRestore(ClusterState currentState, Map<String, Index

@Override
public ClusterState execute(ClusterState currentState) {
Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
indexMetadataMap.put("my-index-01", getRemoteIndexMetadata());
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap = new HashMap<>();
boolean isFullClusterRestore = false; // TODO will be controlled using request query param to be introduced in future
if (isFullClusterRestore) {
indexMetadataMap.put("my-index-01", new Tuple<>(true, getRemoteIndexMetadata()));
} else {
for (String indexName : request.indices()) {
IndexMetadata indexMetadata = currentState.metadata().index(indexName);
if (indexMetadata == null) {
logger.warn("Index restore is not supported for non-existent index. Skipping: {}", indexName);
} else {
indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata));
}
}
}
validate(currentState, indexMetadataMap, true, request.restoreAllShards());
return executeRestore(currentState, indexMetadataMap, request.restoreAllShards());
}
Expand All @@ -243,4 +228,43 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});

}

private void validate(
ClusterState currentState,
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap,
boolean allowPartial,
boolean restoreAllShards
) {
for (Map.Entry<String, Tuple<Boolean, IndexMetadata>> indexMetadataEntry : indexMetadataMap.entrySet()) {
String indexName = indexMetadataEntry.getKey();
IndexMetadata indexMetadata = indexMetadataEntry.getValue().v2();
boolean fromRemoteStore = indexMetadataEntry.getValue().v1();
if (indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {
if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState())) {
String errorMsg = "cannot restore index ["
+ indexName
+ "] because an open index "
+ "with same name already exists in the cluster. Close the existing index";
if (allowPartial) {
throw new IllegalStateException(errorMsg);
} else {
logger.warn(errorMsg);
}
}

if (fromRemoteStore) {
Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion();
metadataIndexUpgradeService.upgradeIndexMetadata(indexMetadata, minIndexCompatibilityVersion);
boolean isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.getSettings());
createIndexService.validateIndexName(indexName, currentState);
createIndexService.validateDotIndex(indexName, isHidden);
createIndexService.validateIndexSettings(indexName, indexMetadata.getSettings(), false);
}
// TODO other validation will come here. still figuring out what else we need to validate
} else {
logger.warn("Remote store is not enabled for index: {}", indexName);
}

}
}
}

0 comments on commit 207060b

Please sign in to comment.