Skip to content

Commit

Permalink
Read remote index routing
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed May 30, 2024
1 parent 2e49743 commit 933933c
Showing 1 changed file with 223 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;

import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* A Service which provides APIs to upload and download routing table from remote store.
*
* @opensearch.internal
*/
public class RemoteRoutingTableService implements Closeable {

/**
* Cluster setting to specify if routing table should be published to remote store
*/
public static final Setting<Boolean> REMOTE_ROUTING_TABLE_ENABLED_SETTING = Setting.boolSetting(
"cluster.remote_store.routing.enabled",
true,
Setting.Property.NodeScope,
Setting.Property.Final
);
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String ROUTING_TABLE = "routing-table";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService,
Settings settings,
ThreadPool threadPool) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadPool;
}

public List<IndexRoutingTable> getChangedIndicesRouting( ClusterState previousClusterState,
ClusterState clusterState) {
Map<String, IndexRoutingTable> previousIndexRoutingTable = previousClusterState.getRoutingTable().getIndicesRouting();
List<IndexRoutingTable> changedIndicesRouting = new ArrayList<>();
for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) {
if (!(previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName())))) {
changedIndicesRouting.add(indexRouting);
logger.info("changedIndicesRouting {}", indexRouting.prettyPrint());
}
}

return changedIndicesRouting;
}

private String getIndexRoutingFileName() {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);

}

public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<RemoteIndexRoutingResult> latchedActionListener) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx+1);
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx)));

return () -> readAsync(
blobContainer,
blobFileName,
threadPool.executor(ThreadPool.Names.GENERIC),
ActionListener.wrap(response -> latchedActionListener.onResponse(new RemoteIndexRoutingResult(index.getName(), response.readIndexRoutingTable(index))), latchedActionListener::onFailure)
);
}

public void readAsync(BlobContainer blobContainer, String name, ExecutorService executorService, ActionListener<IndexRoutingTableInputStreamReader> listener) throws IOException {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name));
} catch (Exception e) {
logger.error("routing table download failed : ", e);
listener.onFailure(e);
}
});
}

public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, String path) {
try {
return new IndexRoutingTableInputStreamReader(blobContainer.readBlob(path));
} catch (IOException e) {
logger.info("RoutingTable read failed with error: {}", e.toString());
}
return null;
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(List<String> updatedIndicesRouting, List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting) {
return updatedIndicesRouting.stream().map(idx -> {
Optional<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());
}

public static List<String> getIndicesRoutingDeleted(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) {
List<String> deletedIndicesRouting = new ArrayList<>();
for(IndexRoutingTable previousIndexRouting: previousRoutingTable.getIndicesRouting().values()) {
if(!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) {
// Latest Routing Table does not have entry for the index which means the index is deleted
deletedIndicesRouting.add(previousIndexRouting.getIndex().getName());
}
}
return deletedIndicesRouting;
}

public static List<String> getIndicesRoutingUpdated(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) {
List<String> updatedIndicesRouting = new ArrayList<>();
for(IndexRoutingTable currentIndicesRouting: currentRoutingTable.getIndicesRouting().values()) {
if(!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) {
// Latest Routing Table does not have entry for the index which means the index is created
updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName());
} else {
if(previousRoutingTable.getIndicesRouting().get(currentIndicesRouting.getIndex().getName()).equals(currentIndicesRouting)) {
// if the latest routing table has the same routing table as the previous routing table, then the index is not updated
continue;
}
updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName());
}
}
return updatedIndicesRouting;
}

@Override
public void close() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
}

public void start() {
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
}

public static class RemoteIndexRoutingResult {
String indexName;
IndexRoutingTable indexRoutingTable;

public RemoteIndexRoutingResult(String indexName, IndexRoutingTable indexRoutingTable) {
this.indexName = indexName;
this.indexRoutingTable = indexRoutingTable;
}

public String getIndexName() {
return indexName;
}

public IndexRoutingTable getIndexRoutingTable() {
return indexRoutingTable;
}
}

}

0 comments on commit 933933c

Please sign in to comment.