From 0e21a0c785a9fb10c3e5436f000eaebf15b50e70 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 31 Jul 2023 12:25:20 -0700 Subject: [PATCH 1/6] route to pri shards Signed-off-by: Poojita Raj --- .../replication/SegmentReplicationIT.java | 41 ++++++++++++++++++- .../action/get/TransportGetAction.java | 24 +++++++---- 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index c7bff082f7dd4..a3625ca70d220 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -21,9 +21,13 @@ import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.junit.Before; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; @@ -43,7 +47,6 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; -import org.opensearch.common.action.ActionFuture; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; @@ -74,7 +77,6 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; -import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -87,6 +89,7 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.action.search.PitTestsUtil.assertSegments; import static org.opensearch.action.search.SearchContextId.decode; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; @@ -110,6 +113,10 @@ private void setup() { internalCluster().startClusterManagerOnlyNode(); } + static String indexOrAlias() { + return randomBoolean() ? INDEX_NAME : "alias"; + } + public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); @@ -1440,4 +1447,34 @@ public void testRestartPrimary_NoReplicas() throws Exception { ensureYellow(INDEX_NAME); assertDocCounts(1, primary); } + + /** + * Tests whether segment replication supports realtime get requests and reads and parses source from the translog to serve strong reads. + */ + public void testRealtimeGetRequests() { + final String primary = internalCluster().startDataOnlyNode(); + final String replica = internalCluster().startDataOnlyNode(); + + assertAcked( + prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings())) + .addAlias(new Alias("alias")) + ); + ensureGreen(INDEX_NAME); + + GetResponse response = client().prepareGet(indexOrAlias(), "1").get(); + assertThat(response.isExists(), equalTo(false)); + + logger.info("--> index doc 1"); + client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").get(); + + logger.info("--> non realtime get 1"); + response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); + assertThat(response.isExists(), equalTo(false)); + + logger.info("--> realtime get 1"); + response = client().prepareGet(indexOrAlias(), "1").get(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar")); + } } diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index c7eac009ed49a..374daaa5ccffb 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -36,7 +36,9 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.single.shard.TransportSingleShardAction; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -47,6 +49,7 @@ import org.opensearch.index.get.GetResult; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -90,14 +93,21 @@ protected boolean resolveIndex(GetRequest request) { @Override protected ShardIterator shards(ClusterState state, InternalRequest request) { + String preference = request.request().preference(); + // route realtime GET requests when segment replication is enabled to primary shards, + // iff there are no other preferences/routings enabled for routing to a specific shard + if (state.getMetadata() + .index(request.concreteIndex()) + .getSettings() + .get(IndexMetadata.SETTING_REPLICATION_TYPE) + .equals(ReplicationType.SEGMENT.toString()) + && request.request().realtime() + && request.request().routing() == null + && request.request().preference() == null) { + preference = Preference.PRIMARY.type(); + } return clusterService.operationRouting() - .getShards( - clusterService.state(), - request.concreteIndex(), - request.request().id(), - request.request().routing(), - request.request().preference() - ); + .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), preference); } @Override From f05e2000792e459299b44fb79da0e9fe9e87c666 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Wed, 9 Aug 2023 16:36:15 -0700 Subject: [PATCH 2/6] Add changelog entry Signed-off-by: Poojita Raj --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7af3c171e8c6a..557daa2a492ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introducing Default and Best Compression codecs as their algorithm name ([#9123]()https://github.com/opensearch-project/OpenSearch/pull/9123) - Make SearchTemplateRequest implement IndicesRequest.Replaceable ([#9122]()https://github.com/opensearch-project/OpenSearch/pull/9122) - [BWC and API enforcement] Define the initial set of annotations, their meaning and relations between them ([#9223](https://github.com/opensearch-project/OpenSearch/pull/9223)) +- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) From e9b6e709bef8bdfe8acce1da26f74269c73ce812 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Thu, 10 Aug 2023 17:20:13 -0700 Subject: [PATCH 3/6] refactor + add multiGet support Signed-off-by: Poojita Raj --- .../replication/SegmentReplicationIT.java | 146 +++++++++++++++++- .../action/get/TransportGetAction.java | 30 +++- .../action/get/TransportMultiGetAction.java | 28 ++++ 3 files changed, 189 insertions(+), 15 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index a3625ca70d220..55b05d78bc7e0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -23,11 +23,14 @@ import org.apache.lucene.util.BytesRef; import org.junit.Before; import org.opensearch.common.action.ActionFuture; +import org.opensearch.OpenSearchException; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.get.GetResponse; +import org.opensearch.action.get.MultiGetRequest; +import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; @@ -44,6 +47,7 @@ import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; @@ -90,6 +94,7 @@ import static java.util.Arrays.asList; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.opensearch.action.search.PitTestsUtil.assertSegments; import static org.opensearch.action.search.SearchContextId.decode; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; @@ -1451,7 +1456,7 @@ public void testRestartPrimary_NoReplicas() throws Exception { /** * Tests whether segment replication supports realtime get requests and reads and parses source from the translog to serve strong reads. */ - public void testRealtimeGetRequests() { + public void testRealtimeGetRequestsSuccessful() { final String primary = internalCluster().startDataOnlyNode(); final String replica = internalCluster().startDataOnlyNode(); @@ -1462,19 +1467,146 @@ public void testRealtimeGetRequests() { ensureGreen(INDEX_NAME); GetResponse response = client().prepareGet(indexOrAlias(), "1").get(); - assertThat(response.isExists(), equalTo(false)); + assertFalse(response.isExists()); - logger.info("--> index doc 1"); + // index doc 1 client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").get(); - logger.info("--> non realtime get 1"); + // non realtime get 1 response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); - assertThat(response.isExists(), equalTo(false)); + assertFalse(response.isExists()); - logger.info("--> realtime get 1"); + // non realtime get 1 (on replica shard only) + response = client(replica).prepareGet(indexOrAlias(), "1").setPreference("_only_local").setRealtime(false).get(); + assertFalse(response.isExists()); + + // realtime get 1 response = client().prepareGet(indexOrAlias(), "1").get(); - assertThat(response.isExists(), equalTo(true)); + assertTrue(response.isExists()); assertThat(response.getIndex(), equalTo(INDEX_NAME)); assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar")); } + + public void testRealtimeGetRequestsUnsuccessful() { + final String primary = internalCluster().startDataOnlyNode(); + final String replica = internalCluster().startDataOnlyNode(); + + assertAcked( + prepareCreate(INDEX_NAME).setSettings( + Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + ).addAlias(new Alias("alias")) + ); + ensureGreen(INDEX_NAME); + + final String id = routingKeyForShard(INDEX_NAME, 0); + final String routingOtherShard = routingKeyForShard(INDEX_NAME, 1); + + // index doc 1 + client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").setRouting(id).get(); + + // non realtime get 1 + GetResponse response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); + assertFalse(response.isExists()); + + // realtime get 1 (preference = _replica) + response = client().prepareGet(indexOrAlias(), "1").setPreference(Preference.REPLICA.type()).get(); + assertFalse(response.isExists()); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + + // realtime get 1 (with routing set) + response = client().prepareGet(INDEX_NAME, "1").setRouting(routingOtherShard).get(); + assertFalse(response.isExists()); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + } + + /** + * Tests whether segment replication supports realtime MultiGet requests and reads and parses source from the translog to serve strong reads. + */ + public void testRealtimeMultiGetRequestsSuccessful() { + final String primary = internalCluster().startDataOnlyNode(); + final String replica = internalCluster().startDataOnlyNode(); + + assertAcked( + prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings())) + .addAlias(new Alias("alias")) + ); + ensureGreen(INDEX_NAME); + + // index doc 1 + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + + // multi get non realtime 1 + MultiGetResponse mgetResponse = client().prepareMultiGet() + .add(new MultiGetRequest.Item(INDEX_NAME, "1")) + .add(new MultiGetRequest.Item("nonExistingIndex", "1")) + .setRealtime(false) + .get(); + assertThat(mgetResponse.getResponses().length, is(2)); + + assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); + assertFalse(mgetResponse.getResponses()[0].isFailed()); + assertFalse(mgetResponse.getResponses()[0].getResponse().isExists()); + + // multi get realtime 1 + mgetResponse = client().prepareMultiGet() + .add(new MultiGetRequest.Item(INDEX_NAME, "1")) + .add(new MultiGetRequest.Item("nonExistingIndex", "1")) + .get(); + + assertThat(mgetResponse.getResponses().length, is(2)); + assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); + assertFalse(mgetResponse.getResponses()[0].isFailed()); + assertThat(mgetResponse.getResponses()[0].getResponse().getSourceAsMap().get("foo").toString(), equalTo("bar")); + + assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex")); + assertTrue(mgetResponse.getResponses()[1].isFailed()); + assertThat(mgetResponse.getResponses()[1].getFailure().getMessage(), is("no such index [nonExistingIndex]")); + assertThat( + ((OpenSearchException) mgetResponse.getResponses()[1].getFailure().getFailure()).getIndex().getName(), + is("nonExistingIndex") + ); + } + + public void testRealtimeMultiGetRequestsUnsuccessful() { + final String primary = internalCluster().startDataOnlyNode(); + final String replica = internalCluster().startDataOnlyNode(); + + assertAcked( + prepareCreate(INDEX_NAME).setSettings( + Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + ).addAlias(new Alias("alias")) + ); + ensureGreen(INDEX_NAME); + + final String id = routingKeyForShard(INDEX_NAME, 0); + final String routingOtherShard = routingKeyForShard(INDEX_NAME, 1); + + // index doc 1 + client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").setRouting(id).get(); + + // realtime multi get 1 (preference = _replica) + MultiGetResponse mgetResponse = client().prepareMultiGet() + .add(new MultiGetRequest.Item(INDEX_NAME, "1")) + .setPreference(Preference.REPLICA.type()) + .add(new MultiGetRequest.Item("nonExistingIndex", "1")) + .get(); + assertThat(mgetResponse.getResponses().length, is(2)); + assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); + assertFalse(mgetResponse.getResponses()[0].getResponse().isExists()); + + assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex")); + assertTrue(mgetResponse.getResponses()[1].isFailed()); + + // realtime multi get 1 (routing set) + mgetResponse = client().prepareMultiGet() + .add(new MultiGetRequest.Item(INDEX_NAME, "1").routing(routingOtherShard)) + .add(new MultiGetRequest.Item("nonExistingIndex", "1")) + .get(); + assertThat(mgetResponse.getResponses().length, is(2)); + assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); + assertFalse(mgetResponse.getResponses()[0].getResponse().isExists()); + assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex")); + assertTrue(mgetResponse.getResponses()[1].isFailed()); + + } } diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 374daaa5ccffb..5c5cfa468d0b7 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -91,19 +91,33 @@ protected boolean resolveIndex(GetRequest request) { return true; } + /** + * Returns true if GET request should be routed to primary shards, else false. + */ + protected boolean isPrimaryBasedRouting(ClusterState state, InternalRequest request) { + try { + if (state.getMetadata() + .index(request.concreteIndex()) + .getSettings() + .get(IndexMetadata.SETTING_REPLICATION_TYPE) + .equals(ReplicationType.SEGMENT.toString()) + && request.request().realtime() + && request.request().routing() == null + && request.request().preference() == null) { + return true; + } + } catch (NullPointerException e) { + return false; + } + return false; + } + @Override protected ShardIterator shards(ClusterState state, InternalRequest request) { String preference = request.request().preference(); // route realtime GET requests when segment replication is enabled to primary shards, // iff there are no other preferences/routings enabled for routing to a specific shard - if (state.getMetadata() - .index(request.concreteIndex()) - .getSettings() - .get(IndexMetadata.SETTING_REPLICATION_TYPE) - .equals(ReplicationType.SEGMENT.toString()) - && request.request().realtime() - && request.request().routing() == null - && request.request().preference() == null) { + if (isPrimaryBasedRouting(state, request)) { preference = Preference.PRIMARY.type(); } return clusterService.operationRouting() diff --git a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java index 37ce45dc3d372..f0bcbf91821fc 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java @@ -37,12 +37,15 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -75,6 +78,28 @@ public TransportMultiGetAction( this.indexNameExpressionResolver = resolver; } + /** + * Returns true if MultiGet request should be routed to primary shards, else false. + */ + boolean isPrimaryBasedRouting(MultiGetRequest request, MultiGetRequest.Item item) { + try { + if (request.preference == null + && item.routing() == null + && request.realtime + && clusterService.state() + .getMetadata() + .index(item.index()) + .getSettings() + .get(IndexMetadata.SETTING_REPLICATION_TYPE) + .equals(ReplicationType.SEGMENT.toString())) { + return true; + } + } catch (NullPointerException e) { + return false; + } + return false; + } + @Override protected void doExecute(Task task, final MultiGetRequest request, final ActionListener listener) { ClusterState clusterState = clusterService.state(); @@ -109,6 +134,9 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL MultiGetShardRequest shardRequest = shardRequests.get(shardId); if (shardRequest == null) { + if (isPrimaryBasedRouting(request, item)) { + request.preference(Preference.PRIMARY.type()); + } shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId()); shardRequests.put(shardId, shardRequest); } From ad6bb6ff487fe186f866627edd4757fea447045b Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 15 Aug 2023 13:32:46 -0700 Subject: [PATCH 4/6] address changes Signed-off-by: Poojita Raj --- .../replication/SegmentReplicationIT.java | 34 +++++++------------ .../action/get/TransportGetAction.java | 21 ++++-------- .../action/get/TransportMultiGetAction.java | 22 ++++-------- 3 files changed, 25 insertions(+), 52 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 55b05d78bc7e0..b969f7c4c72e4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -118,7 +118,7 @@ private void setup() { internalCluster().startClusterManagerOnlyNode(); } - static String indexOrAlias() { + private static String indexOrAlias() { return randomBoolean() ? INDEX_NAME : "alias"; } @@ -1458,15 +1458,14 @@ public void testRestartPrimary_NoReplicas() throws Exception { */ public void testRealtimeGetRequestsSuccessful() { final String primary = internalCluster().startDataOnlyNode(); - final String replica = internalCluster().startDataOnlyNode(); - assertAcked( prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings())) .addAlias(new Alias("alias")) ); + final String replica = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); - GetResponse response = client().prepareGet(indexOrAlias(), "1").get(); + GetResponse response = client(replica).prepareGet(indexOrAlias(), "1").get(); assertFalse(response.isExists()); // index doc 1 @@ -1476,12 +1475,8 @@ public void testRealtimeGetRequestsSuccessful() { response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); assertFalse(response.isExists()); - // non realtime get 1 (on replica shard only) - response = client(replica).prepareGet(indexOrAlias(), "1").setPreference("_only_local").setRealtime(false).get(); - assertFalse(response.isExists()); - - // realtime get 1 - response = client().prepareGet(indexOrAlias(), "1").get(); + // realtime get 1 (on replica shard only) + response = client(replica).prepareGet(indexOrAlias(), "1").get(); assertTrue(response.isExists()); assertThat(response.getIndex(), equalTo(INDEX_NAME)); assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar")); @@ -1489,13 +1484,12 @@ public void testRealtimeGetRequestsSuccessful() { public void testRealtimeGetRequestsUnsuccessful() { final String primary = internalCluster().startDataOnlyNode(); - final String replica = internalCluster().startDataOnlyNode(); - assertAcked( prepareCreate(INDEX_NAME).setSettings( Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) ).addAlias(new Alias("alias")) ); + final String replica = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); final String id = routingKeyForShard(INDEX_NAME, 0); @@ -1509,12 +1503,12 @@ public void testRealtimeGetRequestsUnsuccessful() { assertFalse(response.isExists()); // realtime get 1 (preference = _replica) - response = client().prepareGet(indexOrAlias(), "1").setPreference(Preference.REPLICA.type()).get(); + response = client(replica).prepareGet(indexOrAlias(), "1").setPreference(Preference.REPLICA.type()).get(); assertFalse(response.isExists()); assertThat(response.getIndex(), equalTo(INDEX_NAME)); // realtime get 1 (with routing set) - response = client().prepareGet(INDEX_NAME, "1").setRouting(routingOtherShard).get(); + response = client(replica).prepareGet(INDEX_NAME, "1").setRouting(routingOtherShard).get(); assertFalse(response.isExists()); assertThat(response.getIndex(), equalTo(INDEX_NAME)); } @@ -1524,12 +1518,11 @@ public void testRealtimeGetRequestsUnsuccessful() { */ public void testRealtimeMultiGetRequestsSuccessful() { final String primary = internalCluster().startDataOnlyNode(); - final String replica = internalCluster().startDataOnlyNode(); - assertAcked( prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings())) .addAlias(new Alias("alias")) ); + final String replica = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); // index doc 1 @@ -1548,7 +1541,7 @@ public void testRealtimeMultiGetRequestsSuccessful() { assertFalse(mgetResponse.getResponses()[0].getResponse().isExists()); // multi get realtime 1 - mgetResponse = client().prepareMultiGet() + mgetResponse = client(replica).prepareMultiGet() .add(new MultiGetRequest.Item(INDEX_NAME, "1")) .add(new MultiGetRequest.Item("nonExistingIndex", "1")) .get(); @@ -1569,13 +1562,12 @@ public void testRealtimeMultiGetRequestsSuccessful() { public void testRealtimeMultiGetRequestsUnsuccessful() { final String primary = internalCluster().startDataOnlyNode(); - final String replica = internalCluster().startDataOnlyNode(); - assertAcked( prepareCreate(INDEX_NAME).setSettings( Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) ).addAlias(new Alias("alias")) ); + final String replica = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); final String id = routingKeyForShard(INDEX_NAME, 0); @@ -1585,7 +1577,7 @@ public void testRealtimeMultiGetRequestsUnsuccessful() { client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").setRouting(id).get(); // realtime multi get 1 (preference = _replica) - MultiGetResponse mgetResponse = client().prepareMultiGet() + MultiGetResponse mgetResponse = client(replica).prepareMultiGet() .add(new MultiGetRequest.Item(INDEX_NAME, "1")) .setPreference(Preference.REPLICA.type()) .add(new MultiGetRequest.Item("nonExistingIndex", "1")) @@ -1598,7 +1590,7 @@ public void testRealtimeMultiGetRequestsUnsuccessful() { assertTrue(mgetResponse.getResponses()[1].isFailed()); // realtime multi get 1 (routing set) - mgetResponse = client().prepareMultiGet() + mgetResponse = client(replica).prepareMultiGet() .add(new MultiGetRequest.Item(INDEX_NAME, "1").routing(routingOtherShard)) .add(new MultiGetRequest.Item("nonExistingIndex", "1")) .get(); diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 5c5cfa468d0b7..ecb7f94b8dc3d 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -95,21 +95,12 @@ protected boolean resolveIndex(GetRequest request) { * Returns true if GET request should be routed to primary shards, else false. */ protected boolean isPrimaryBasedRouting(ClusterState state, InternalRequest request) { - try { - if (state.getMetadata() - .index(request.concreteIndex()) - .getSettings() - .get(IndexMetadata.SETTING_REPLICATION_TYPE) - .equals(ReplicationType.SEGMENT.toString()) - && request.request().realtime() - && request.request().routing() == null - && request.request().preference() == null) { - return true; - } - } catch (NullPointerException e) { - return false; - } - return false; + IndexMetadata indexMetadata = state.getMetadata().index(request.concreteIndex()); + return indexMetadata != null + && indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE).equals(ReplicationType.SEGMENT.toString()) + && request.request().realtime() + && request.request().routing() == null + && request.request().preference() == null; } @Override diff --git a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java index f0bcbf91821fc..8beea167e8837 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java @@ -82,22 +82,12 @@ public TransportMultiGetAction( * Returns true if MultiGet request should be routed to primary shards, else false. */ boolean isPrimaryBasedRouting(MultiGetRequest request, MultiGetRequest.Item item) { - try { - if (request.preference == null - && item.routing() == null - && request.realtime - && clusterService.state() - .getMetadata() - .index(item.index()) - .getSettings() - .get(IndexMetadata.SETTING_REPLICATION_TYPE) - .equals(ReplicationType.SEGMENT.toString())) { - return true; - } - } catch (NullPointerException e) { - return false; - } - return false; + IndexMetadata indexMetadata = clusterService.state().getMetadata().index(item.index()); + return indexMetadata != null + && indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE).equals(ReplicationType.SEGMENT.toString()) + && request.preference == null + && item.routing() == null + && request.realtime; } @Override From 9fece7c2002bf3d5b6aa7806949b18d694ac2a44 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 15 Aug 2023 13:46:05 -0700 Subject: [PATCH 5/6] add comment and rebase Signed-off-by: Poojita Raj --- .../replication/SegmentReplicationIT.java | 52 ++++++--- .../action/get/TransportGetAction.java | 21 ++-- .../action/get/TransportMultiGetAction.java | 18 +-- .../action/get/TransportGetActionTests.java | 108 ++++++++++++++++++ 4 files changed, 160 insertions(+), 39 deletions(-) create mode 100644 server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index b969f7c4c72e4..69cdd80bb5085 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -21,9 +21,6 @@ import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; -import org.junit.Before; -import org.opensearch.common.action.ActionFuture; -import org.opensearch.OpenSearchException; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; @@ -51,6 +48,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; +import org.opensearch.common.action.ActionFuture; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; @@ -81,6 +79,7 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; +import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -93,8 +92,6 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; import static org.opensearch.action.search.PitTestsUtil.assertSegments; import static org.opensearch.action.search.SearchContextId.decode; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; @@ -109,6 +106,8 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationIT extends SegmentReplicationBaseIT { @@ -1458,6 +1457,7 @@ public void testRestartPrimary_NoReplicas() throws Exception { */ public void testRealtimeGetRequestsSuccessful() { final String primary = internalCluster().startDataOnlyNode(); + // refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime get assertAcked( prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings())) .addAlias(new Alias("alias")) @@ -1465,6 +1465,8 @@ public void testRealtimeGetRequestsSuccessful() { final String replica = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); + final String id = routingKeyForShard(INDEX_NAME, 0); + GetResponse response = client(replica).prepareGet(indexOrAlias(), "1").get(); assertFalse(response.isExists()); @@ -1475,11 +1477,20 @@ public void testRealtimeGetRequestsSuccessful() { response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); assertFalse(response.isExists()); - // realtime get 1 (on replica shard only) + // realtime get 1 response = client(replica).prepareGet(indexOrAlias(), "1").get(); assertTrue(response.isExists()); assertThat(response.getIndex(), equalTo(INDEX_NAME)); assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar")); + + // index doc 2 + client().prepareIndex(indexOrAlias()).setId("2").setSource("foo2", "bar2").setRouting(id).get(); + + // realtime get 2 (with routing) + response = client(replica).prepareGet(indexOrAlias(), "2").setRouting(id).get(); + assertTrue(response.isExists()); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + assertThat(response.getSourceAsMap().get("foo2").toString(), equalTo("bar2")); } public void testRealtimeGetRequestsUnsuccessful() { @@ -1518,16 +1529,23 @@ public void testRealtimeGetRequestsUnsuccessful() { */ public void testRealtimeMultiGetRequestsSuccessful() { final String primary = internalCluster().startDataOnlyNode(); + // refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime multi get assertAcked( - prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings())) - .addAlias(new Alias("alias")) + prepareCreate(INDEX_NAME).setSettings( + Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + ).addAlias(new Alias("alias")) ); final String replica = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); + final String id = routingKeyForShard(INDEX_NAME, 0); + // index doc 1 client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + // index doc 2 + client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").setRouting(id).get(); + // multi get non realtime 1 MultiGetResponse mgetResponse = client().prepareMultiGet() .add(new MultiGetRequest.Item(INDEX_NAME, "1")) @@ -1543,21 +1561,22 @@ public void testRealtimeMultiGetRequestsSuccessful() { // multi get realtime 1 mgetResponse = client(replica).prepareMultiGet() .add(new MultiGetRequest.Item(INDEX_NAME, "1")) + .add(new MultiGetRequest.Item(INDEX_NAME, "2").routing(id)) .add(new MultiGetRequest.Item("nonExistingIndex", "1")) .get(); - assertThat(mgetResponse.getResponses().length, is(2)); + assertThat(mgetResponse.getResponses().length, is(3)); assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); assertFalse(mgetResponse.getResponses()[0].isFailed()); assertThat(mgetResponse.getResponses()[0].getResponse().getSourceAsMap().get("foo").toString(), equalTo("bar")); - assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex")); - assertTrue(mgetResponse.getResponses()[1].isFailed()); - assertThat(mgetResponse.getResponses()[1].getFailure().getMessage(), is("no such index [nonExistingIndex]")); - assertThat( - ((OpenSearchException) mgetResponse.getResponses()[1].getFailure().getFailure()).getIndex().getName(), - is("nonExistingIndex") - ); + assertThat(mgetResponse.getResponses()[1].getIndex(), is(INDEX_NAME)); + assertFalse(mgetResponse.getResponses()[1].isFailed()); + assertThat(mgetResponse.getResponses()[1].getResponse().getSourceAsMap().get("foo2").toString(), equalTo("bar2")); + + assertThat(mgetResponse.getResponses()[2].getIndex(), is("nonExistingIndex")); + assertTrue(mgetResponse.getResponses()[2].isFailed()); + assertThat(mgetResponse.getResponses()[2].getFailure().getMessage(), is("no such index [nonExistingIndex]")); } public void testRealtimeMultiGetRequestsUnsuccessful() { @@ -1596,6 +1615,7 @@ public void testRealtimeMultiGetRequestsUnsuccessful() { .get(); assertThat(mgetResponse.getResponses().length, is(2)); assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); + // expecting failure since we explicitly route request to a shard on which it doesn't exist assertFalse(mgetResponse.getResponses()[0].getResponse().isExists()); assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex")); assertTrue(mgetResponse.getResponses()[1].isFailed()); diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index ecb7f94b8dc3d..309e2e16decad 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -54,6 +54,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Optional; /** * Performs the get operation. @@ -91,16 +92,20 @@ protected boolean resolveIndex(GetRequest request) { return true; } + static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) { + return Optional.ofNullable(state.getMetadata().index(indexName)) + .map( + indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) + .equals(ReplicationType.SEGMENT) + ) + .orElse(false); + } + /** * Returns true if GET request should be routed to primary shards, else false. */ - protected boolean isPrimaryBasedRouting(ClusterState state, InternalRequest request) { - IndexMetadata indexMetadata = state.getMetadata().index(request.concreteIndex()); - return indexMetadata != null - && indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE).equals(ReplicationType.SEGMENT.toString()) - && request.request().realtime() - && request.request().routing() == null - && request.request().preference() == null; + protected static boolean isPrimaryBasedRouting(ClusterState state, boolean realtime, String preference, String indexName) { + return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null; } @Override @@ -108,7 +113,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) { String preference = request.request().preference(); // route realtime GET requests when segment replication is enabled to primary shards, // iff there are no other preferences/routings enabled for routing to a specific shard - if (isPrimaryBasedRouting(state, request)) { + if (isPrimaryBasedRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) { preference = Preference.PRIMARY.type(); } return clusterService.operationRouting() diff --git a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java index 8beea167e8837..ec570b7556055 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java @@ -37,7 +37,6 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockLevel; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.service.ClusterService; @@ -45,7 +44,6 @@ import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -53,6 +51,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import static org.opensearch.action.get.TransportGetAction.isPrimaryBasedRouting; + /** * Perform the multi get action. * @@ -78,18 +78,6 @@ public TransportMultiGetAction( this.indexNameExpressionResolver = resolver; } - /** - * Returns true if MultiGet request should be routed to primary shards, else false. - */ - boolean isPrimaryBasedRouting(MultiGetRequest request, MultiGetRequest.Item item) { - IndexMetadata indexMetadata = clusterService.state().getMetadata().index(item.index()); - return indexMetadata != null - && indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE).equals(ReplicationType.SEGMENT.toString()) - && request.preference == null - && item.routing() == null - && request.realtime; - } - @Override protected void doExecute(Task task, final MultiGetRequest request, final ActionListener listener) { ClusterState clusterState = clusterService.state(); @@ -124,7 +112,7 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL MultiGetShardRequest shardRequest = shardRequests.get(shardId); if (shardRequest == null) { - if (isPrimaryBasedRouting(request, item)) { + if (isPrimaryBasedRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) { request.preference(Preference.PRIMARY.type()); } shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId()); diff --git a/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java new file mode 100644 index 0000000000000..24b3b683431ce --- /dev/null +++ b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.get; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.Preference; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.BeforeClass; + +import static org.opensearch.common.UUIDs.randomBase64UUID; + +public class TransportGetActionTests extends OpenSearchTestCase { + + private static ClusterState clusterState; + private static ClusterState clusterState2; + + @BeforeClass + public static void beforeClass() throws Exception { + + final Index index1 = new Index("index1", randomBase64UUID()); + final Index index2 = new Index("index2", randomBase64UUID()); + clusterState = ClusterState.builder(new ClusterName(TransportGetActionTests.class.getSimpleName())) + .metadata( + new Metadata.Builder().put( + new IndexMetadata.Builder(index1.getName()).settings( + Settings.builder() + .put("index.version.created", Version.CURRENT) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + ) + ) + ) + .build(); + + clusterState2 = ClusterState.builder(new ClusterName(TransportGetActionTests.class.getSimpleName())) + .metadata( + new Metadata.Builder().put( + new IndexMetadata.Builder(index2.getName()).settings( + Settings.builder() + .put("index.version.created", Version.CURRENT) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(IndexMetadata.SETTING_INDEX_UUID, index2.getUUID()) + ) + ) + ) + .build(); + } + + public void testIsPrimaryBasedRouting() { + + // should return false since preference is set for request + assertFalse(TransportGetAction.isPrimaryBasedRouting(clusterState, true, Preference.REPLICA.type(), "index1")); + + // should return false since request is not realtime + assertFalse(TransportGetAction.isPrimaryBasedRouting(clusterState, false, null, "index1")); + + // should return true since segment replication is enabled + assertTrue(TransportGetAction.isPrimaryBasedRouting(clusterState, true, null, "index1")); + + // should return false since index doesn't exist + assertFalse(TransportGetAction.isPrimaryBasedRouting(clusterState, true, null, "index3")); + + // should fail since document replication enabled + assertFalse(TransportGetAction.isPrimaryBasedRouting(clusterState2, true, null, "index2")); + + } + +} From 49d6ebfae0d6ef13f155b9b78341c4ccf29a0030 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Thu, 17 Aug 2023 10:05:06 -0700 Subject: [PATCH 6/6] address review comments Signed-off-by: Poojita Raj --- .../action/get/TransportGetAction.java | 8 ++-- .../action/get/TransportMultiGetAction.java | 4 +- .../action/get/TransportGetActionTests.java | 43 ++++++------------- 3 files changed, 20 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 309e2e16decad..583815b91ae68 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -104,17 +104,19 @@ static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) /** * Returns true if GET request should be routed to primary shards, else false. */ - protected static boolean isPrimaryBasedRouting(ClusterState state, boolean realtime, String preference, String indexName) { + protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) { return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null; } @Override protected ShardIterator shards(ClusterState state, InternalRequest request) { - String preference = request.request().preference(); + final String preference; // route realtime GET requests when segment replication is enabled to primary shards, // iff there are no other preferences/routings enabled for routing to a specific shard - if (isPrimaryBasedRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) { + if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) { preference = Preference.PRIMARY.type(); + } else { + preference = request.request().preference(); } return clusterService.operationRouting() .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), preference); diff --git a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java index ec570b7556055..a1a74208dc725 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java @@ -51,7 +51,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.opensearch.action.get.TransportGetAction.isPrimaryBasedRouting; +import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting; /** * Perform the multi get action. @@ -112,7 +112,7 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL MultiGetShardRequest shardRequest = shardRequests.get(shardId); if (shardRequest == null) { - if (isPrimaryBasedRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) { + if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) { request.preference(Preference.PRIMARY.type()); } shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId()); diff --git a/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java index 24b3b683431ce..2eca49fb3032f 100644 --- a/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java +++ b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java @@ -42,21 +42,14 @@ import org.opensearch.core.index.Index; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchTestCase; -import org.junit.BeforeClass; import static org.opensearch.common.UUIDs.randomBase64UUID; public class TransportGetActionTests extends OpenSearchTestCase { - private static ClusterState clusterState; - private static ClusterState clusterState2; - - @BeforeClass - public static void beforeClass() throws Exception { - + private static ClusterState clusterState(ReplicationType replicationType) { final Index index1 = new Index("index1", randomBase64UUID()); - final Index index2 = new Index("index2", randomBase64UUID()); - clusterState = ClusterState.builder(new ClusterName(TransportGetActionTests.class.getSimpleName())) + return ClusterState.builder(new ClusterName(TransportGetActionTests.class.getSimpleName())) .metadata( new Metadata.Builder().put( new IndexMetadata.Builder(index1.getName()).settings( @@ -65,43 +58,33 @@ public static void beforeClass() throws Exception { .put("index.number_of_shards", 1) .put("index.number_of_replicas", 1) .put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID()) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - ) - ) - ) - .build(); - - clusterState2 = ClusterState.builder(new ClusterName(TransportGetActionTests.class.getSimpleName())) - .metadata( - new Metadata.Builder().put( - new IndexMetadata.Builder(index2.getName()).settings( - Settings.builder() - .put("index.version.created", Version.CURRENT) - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 1) - .put(IndexMetadata.SETTING_INDEX_UUID, index2.getUUID()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType) ) ) ) .build(); } - public void testIsPrimaryBasedRouting() { + public void testShouldForcePrimaryRouting() { + + ClusterState clusterState = clusterState(ReplicationType.SEGMENT); // should return false since preference is set for request - assertFalse(TransportGetAction.isPrimaryBasedRouting(clusterState, true, Preference.REPLICA.type(), "index1")); + assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, Preference.REPLICA.type(), "index1")); // should return false since request is not realtime - assertFalse(TransportGetAction.isPrimaryBasedRouting(clusterState, false, null, "index1")); + assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, false, null, "index1")); // should return true since segment replication is enabled - assertTrue(TransportGetAction.isPrimaryBasedRouting(clusterState, true, null, "index1")); + assertTrue(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1")); // should return false since index doesn't exist - assertFalse(TransportGetAction.isPrimaryBasedRouting(clusterState, true, null, "index3")); + assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index3")); + + clusterState = clusterState(ReplicationType.DOCUMENT); // should fail since document replication enabled - assertFalse(TransportGetAction.isPrimaryBasedRouting(clusterState2, true, null, "index2")); + assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1")); }