diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java new file mode 100644 index 0000000000000..ccb437edde2b0 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java @@ -0,0 +1,2 @@ +package org.opensearch.gateway.remote;public class RemoteRoutingTableServiceIT { +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 5eb80b96f8c2d..d3841c18c9109 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -13,6 +13,7 @@ import org.opensearch.cluster.Diff; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.BlobPath; @@ -21,8 +22,10 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.routingtable.IndexRoutingTableDiff; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -61,6 +64,22 @@ public void write(IndexRoutingTable value, StreamOutput out) throws IOException public Diff readDiff(StreamInput in, String key) throws IOException { return IndexRoutingTable.readDiffFrom(in); } + + @Override + public Diff diff(IndexRoutingTable currentState, IndexRoutingTable previousState) { + List diffs = new ArrayList<>(); + for (Map.Entry entry : currentState.getShards().entrySet()) { + Integer index = entry.getKey(); + IndexShardRoutingTable currentShardRoutingTable = entry.getValue(); + IndexShardRoutingTable previousShardRoutingTable = previousState.shard(index); + if (previousShardRoutingTable == null) { + diffs.add(currentShardRoutingTable); + } else if (!previousShardRoutingTable.equals(currentShardRoutingTable)) { + diffs.add(currentShardRoutingTable); + } + } + return new IndexRoutingTableDiff(diffs); + } }; List getIndicesRouting(RoutingTable routingTable); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 41309d5e6cf39..b8b2563d72861 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -678,7 +678,7 @@ UploadedMetadataResults writeMetadataInParallel( }); indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX, remoteRoutingTableService.getIndexRoutingAsyncAction( clusterState, indexRoutingTable, diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableDiff.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableDiff.java new file mode 100644 index 0000000000000..24ffc1bbb6d46 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/IndexRoutingTableDiff.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.opensearch.cluster.Diff; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * A Service which provides APIs to upload and download routing table from remote store. + * + * @opensearch.internal + */ + + +public class IndexRoutingTableDiff implements Diff { + private final List indexShardRoutingTables; + + public IndexRoutingTableDiff(List indexShardRoutingTables) { + this.indexShardRoutingTables = indexShardRoutingTables; + } + + @Override + public IndexRoutingTable apply(IndexRoutingTable part) { + IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex()); + for (IndexShardRoutingTable shardRoutingTable : part) { + builder.addIndexShard(shardRoutingTable); // Add existing shard to builder + } + + // Apply the diff: update or add the new shard routing tables + for (IndexShardRoutingTable diffShard : indexShardRoutingTables) { + builder.addIndexShard(diffShard); + } + return builder.build(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(indexShardRoutingTables.size()); + for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) { + IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out); + } + } + + public static IndexRoutingTableDiff readFrom(StreamInput in) throws IOException { + int size = in.readVInt(); + List indexShardRoutingTables = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in); + indexShardRoutingTables.add(shardRoutingTable); + } + return new IndexRoutingTableDiff(indexShardRoutingTables); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiff.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiff.java index fbbf9ca724e04..e897a425ffd7a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiff.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiff.java @@ -13,6 +13,7 @@ import org.apache.lucene.store.InputStreamDataInput; import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput; import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.InputStreamStreamInput; @@ -22,8 +23,10 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.List; public class RemoteIndexRoutingTableDiff implements Diff, Writeable { @@ -52,9 +55,22 @@ public RemoteIndexRoutingTableDiff(InputStream inputStream) throws IOException { CodecUtil.checkHeader(new InputStreamDataInput(inputStream), codec, VERSION, VERSION); int size = in.readVInt(); Map> diffs = new HashMap<>(); + for (int i = 0; i < size; i++) { String key = in.readString(); - Diff diff = IndexRoutingTable.readDiffFrom(in); + List shardRoutingTables = new ArrayList<>(); + + // Read each IndexShardRoutingTable from the stream + int numShards = in.readVInt(); + for (int j = 0; j < numShards; j++) { + IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in); + shardRoutingTables.add(shardRoutingTable); + } + + // Create a diff object for the index + Diff diff = new IndexRoutingTableDiff(shardRoutingTables); + + // Put the diff into the map with the key diffs.put(key, diff); } verifyCheckSum(in);