Skip to content

Commit

Permalink
Add serializer to get IndexShardRoutingTable diffs from IndexRoutingT…
Browse files Browse the repository at this point in the history
…able.

Signed-off-by: Shailendra Singh <[email protected]>
  • Loading branch information
Shailendra Singh committed Jul 10, 2024
1 parent c0265a2 commit 3589857
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
package org.opensearch.gateway.remote;public class RemoteRoutingTableServiceIT {
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -21,8 +22,10 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableDiff;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -61,6 +64,22 @@ public void write(IndexRoutingTable value, StreamOutput out) throws IOException
public Diff<IndexRoutingTable> readDiff(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readDiffFrom(in);
}

@Override
public Diff<IndexRoutingTable> diff(IndexRoutingTable currentState, IndexRoutingTable previousState) {
List<IndexShardRoutingTable> diffs = new ArrayList<>();
for (Map.Entry<Integer, IndexShardRoutingTable> entry : currentState.getShards().entrySet()) {
Integer index = entry.getKey();
IndexShardRoutingTable currentShardRoutingTable = entry.getValue();
IndexShardRoutingTable previousShardRoutingTable = previousState.shard(index);
if (previousShardRoutingTable == null) {
diffs.add(currentShardRoutingTable);
} else if (!previousShardRoutingTable.equals(currentShardRoutingTable)) {
diffs.add(currentShardRoutingTable);
}
}
return new IndexRoutingTableDiff(diffs);
}
};

List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ UploadedMetadataResults writeMetadataInParallel(
});
indicesRoutingToUpload.forEach(indexRoutingTable -> {
uploadTasks.put(
InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(),
InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX,
remoteRoutingTableService.getIndexRoutingAsyncAction(
clusterState,
indexRoutingTable,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.routingtable;

import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A Service which provides APIs to upload and download routing table from remote store.
*
* @opensearch.internal
*/


public class IndexRoutingTableDiff implements Diff<IndexRoutingTable> {
private final List<IndexShardRoutingTable> indexShardRoutingTables;

public IndexRoutingTableDiff(List<IndexShardRoutingTable> indexShardRoutingTables) {
this.indexShardRoutingTables = indexShardRoutingTables;
}

@Override
public IndexRoutingTable apply(IndexRoutingTable part) {
IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex());
for (IndexShardRoutingTable shardRoutingTable : part) {
builder.addIndexShard(shardRoutingTable); // Add existing shard to builder
}

// Apply the diff: update or add the new shard routing tables
for (IndexShardRoutingTable diffShard : indexShardRoutingTables) {
builder.addIndexShard(diffShard);
}
return builder.build();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indexShardRoutingTables.size());
for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) {
IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out);
}
}

public static IndexRoutingTableDiff readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
List<IndexShardRoutingTable> indexShardRoutingTables = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
indexShardRoutingTables.add(shardRoutingTable);
}
return new IndexRoutingTableDiff(indexShardRoutingTables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.store.InputStreamDataInput;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.core.common.io.stream.BufferedChecksumStreamInput;
import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
Expand All @@ -22,8 +23,10 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.List;

public class RemoteIndexRoutingTableDiff implements Diff<IndexRoutingTable>, Writeable {

Expand Down Expand Up @@ -52,9 +55,22 @@ public RemoteIndexRoutingTableDiff(InputStream inputStream) throws IOException {
CodecUtil.checkHeader(new InputStreamDataInput(inputStream), codec, VERSION, VERSION);
int size = in.readVInt();
Map<String, Diff<IndexRoutingTable>> diffs = new HashMap<>();

for (int i = 0; i < size; i++) {
String key = in.readString();
Diff<IndexRoutingTable> diff = IndexRoutingTable.readDiffFrom(in);
List<IndexShardRoutingTable> shardRoutingTables = new ArrayList<>();

// Read each IndexShardRoutingTable from the stream
int numShards = in.readVInt();
for (int j = 0; j < numShards; j++) {
IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
shardRoutingTables.add(shardRoutingTable);
}

// Create a diff object for the index
Diff<IndexRoutingTable> diff = new IndexRoutingTableDiff(shardRoutingTables);

// Put the diff into the map with the key
diffs.put(key, diff);
}
verifyCheckSum(in);
Expand Down

0 comments on commit 3589857

Please sign in to comment.