From 933933cd90e6415f6ce4bb8e96506516244725ae Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 30 May 2024 19:09:28 +0530 Subject: [PATCH] Read remote index routing Signed-off-by: Arpit Bandejiya --- .../remote/RemoteRoutingTableService.java | 223 ++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java new file mode 100644 index 0000000000000..517af479c6c24 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -0,0 +1,223 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IndexInput; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; + +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.node.Node; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.IOException; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +/** + * A Service which provides APIs to upload and download routing table from remote store. + * + * @opensearch.internal + */ +public class RemoteRoutingTableService implements Closeable { + + /** + * Cluster setting to specify if routing table should be published to remote store + */ + public static final Setting REMOTE_ROUTING_TABLE_ENABLED_SETTING = Setting.boolSetting( + "cluster.remote_store.routing.enabled", + true, + Setting.Property.NodeScope, + Setting.Property.Final + ); + public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; + public static final String ROUTING_TABLE = "routing-table"; + public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; + public static final String DELIMITER = "__"; + public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; + private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class); + private final Settings settings; + private final Supplier repositoriesService; + private BlobStoreRepository blobStoreRepository; + private final ThreadPool threadPool; + + public RemoteRoutingTableService(Supplier repositoriesService, + Settings settings, + ThreadPool threadPool) { + assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; + this.repositoriesService = repositoriesService; + this.settings = settings; + this.threadPool = threadPool; + } + + public List getChangedIndicesRouting( ClusterState previousClusterState, + ClusterState clusterState) { + Map previousIndexRoutingTable = previousClusterState.getRoutingTable().getIndicesRouting(); + List changedIndicesRouting = new ArrayList<>(); + for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) { + if (!(previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName())))) { + changedIndicesRouting.add(indexRouting); + logger.info("changedIndicesRouting {}", indexRouting.prettyPrint()); + } + } + + return changedIndicesRouting; + } + + private String getIndexRoutingFileName() { + return String.join( + DELIMITER, + INDEX_ROUTING_FILE_PREFIX, + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); + + } + + public CheckedRunnable getAsyncIndexMetadataReadAction( + String uploadedFilename, + Index index, + LatchedActionListener latchedActionListener) { + int idx = uploadedFilename.lastIndexOf("/"); + String blobFileName = uploadedFilename.substring(idx+1); + BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx))); + + return () -> readAsync( + blobContainer, + blobFileName, + threadPool.executor(ThreadPool.Names.GENERIC), + ActionListener.wrap(response -> latchedActionListener.onResponse(new RemoteIndexRoutingResult(index.getName(), response.readIndexRoutingTable(index))), latchedActionListener::onFailure) + ); + } + + public void readAsync(BlobContainer blobContainer, String name, ExecutorService executorService, ActionListener listener) throws IOException { + executorService.execute(() -> { + try { + listener.onResponse(read(blobContainer, name)); + } catch (Exception e) { + logger.error("routing table download failed : ", e); + listener.onFailure(e); + } + }); + } + + public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, String path) { + try { + return new IndexRoutingTableInputStreamReader(blobContainer.readBlob(path)); + } catch (IOException e) { + logger.info("RoutingTable read failed with error: {}", e.toString()); + } + return null; + } + + public List getUpdatedIndexRoutingTableMetadata(List updatedIndicesRouting, List allIndicesRouting) { + return updatedIndicesRouting.stream().map(idx -> { + Optional uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst(); + assert uploadedIndexMetadataOptional.isPresent() == true; + return uploadedIndexMetadataOptional.get(); + }).collect(Collectors.toList()); + } + + public static List getIndicesRoutingDeleted(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) { + List deletedIndicesRouting = new ArrayList<>(); + for(IndexRoutingTable previousIndexRouting: previousRoutingTable.getIndicesRouting().values()) { + if(!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) { + // Latest Routing Table does not have entry for the index which means the index is deleted + deletedIndicesRouting.add(previousIndexRouting.getIndex().getName()); + } + } + return deletedIndicesRouting; + } + + public static List getIndicesRoutingUpdated(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) { + List updatedIndicesRouting = new ArrayList<>(); + for(IndexRoutingTable currentIndicesRouting: currentRoutingTable.getIndicesRouting().values()) { + if(!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) { + // Latest Routing Table does not have entry for the index which means the index is created + updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName()); + } else { + if(previousRoutingTable.getIndicesRouting().get(currentIndicesRouting.getIndex().getName()).equals(currentIndicesRouting)) { + // if the latest routing table has the same routing table as the previous routing table, then the index is not updated + continue; + } + updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName()); + } + } + return updatedIndicesRouting; + } + + @Override + public void close() throws IOException { + if (blobStoreRepository != null) { + IOUtils.close(blobStoreRepository); + } + } + + public void start() { + assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled"; + final String remoteStoreRepo = settings.get( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + assert remoteStoreRepo != null : "Remote routing table repository is not configured"; + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; + blobStoreRepository = (BlobStoreRepository) repository; + } + + public static class RemoteIndexRoutingResult { + String indexName; + IndexRoutingTable indexRoutingTable; + + public RemoteIndexRoutingResult(String indexName, IndexRoutingTable indexRoutingTable) { + this.indexName = indexName; + this.indexRoutingTable = indexRoutingTable; + } + + public String getIndexName() { + return indexName; + } + + public IndexRoutingTable getIndexRoutingTable() { + return indexRoutingTable; + } + } + +}