diff --git a/CHANGELOG.md b/CHANGELOG.md index ffc7228fe63c0..406afd015b3f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introducing Default and Best Compression codecs as their algorithm name ([#9123](https://github.com/opensearch-project/OpenSearch/pull/9123)) - Make SearchTemplateRequest implement IndicesRequest.Replaceable ([#9122](https://github.com/opensearch-project/OpenSearch/pull/9122)) - [BWC and API enforcement] Define the initial set of annotations, their meaning and relations between them ([#9223](https://github.com/opensearch-project/OpenSearch/pull/9223)) +- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212)) - Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index c7bff082f7dd4..69cdd80bb5085 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -21,9 +21,13 @@ import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.get.MultiGetRequest; +import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; @@ -40,6 +44,7 @@ import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; @@ -101,6 +106,8 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationIT extends SegmentReplicationBaseIT { @@ -110,6 +117,10 @@ private void setup() { internalCluster().startClusterManagerOnlyNode(); } + private static String indexOrAlias() { + return randomBoolean() ? INDEX_NAME : "alias"; + } + public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); @@ -1440,4 +1451,174 @@ public void testRestartPrimary_NoReplicas() throws Exception { ensureYellow(INDEX_NAME); assertDocCounts(1, primary); } + + /** + * Tests whether segment replication supports realtime get requests and reads and parses source from the translog to serve strong reads. + */ + public void testRealtimeGetRequestsSuccessful() { + final String primary = internalCluster().startDataOnlyNode(); + // refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime get + assertAcked( + prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings())) + .addAlias(new Alias("alias")) + ); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + final String id = routingKeyForShard(INDEX_NAME, 0); + + GetResponse response = client(replica).prepareGet(indexOrAlias(), "1").get(); + assertFalse(response.isExists()); + + // index doc 1 + client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").get(); + + // non realtime get 1 + response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); + assertFalse(response.isExists()); + + // realtime get 1 + response = client(replica).prepareGet(indexOrAlias(), "1").get(); + assertTrue(response.isExists()); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar")); + + // index doc 2 + client().prepareIndex(indexOrAlias()).setId("2").setSource("foo2", "bar2").setRouting(id).get(); + + // realtime get 2 (with routing) + response = client(replica).prepareGet(indexOrAlias(), "2").setRouting(id).get(); + assertTrue(response.isExists()); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + assertThat(response.getSourceAsMap().get("foo2").toString(), equalTo("bar2")); + } + + public void testRealtimeGetRequestsUnsuccessful() { + final String primary = internalCluster().startDataOnlyNode(); + assertAcked( + prepareCreate(INDEX_NAME).setSettings( + Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + ).addAlias(new Alias("alias")) + ); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + final String id = routingKeyForShard(INDEX_NAME, 0); + final String routingOtherShard = routingKeyForShard(INDEX_NAME, 1); + + // index doc 1 + client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").setRouting(id).get(); + + // non realtime get 1 + GetResponse response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); + assertFalse(response.isExists()); + + // realtime get 1 (preference = _replica) + response = client(replica).prepareGet(indexOrAlias(), "1").setPreference(Preference.REPLICA.type()).get(); + assertFalse(response.isExists()); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + + // realtime get 1 (with routing set) + response = client(replica).prepareGet(INDEX_NAME, "1").setRouting(routingOtherShard).get(); + assertFalse(response.isExists()); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + } + + /** + * Tests whether segment replication supports realtime MultiGet requests and reads and parses source from the translog to serve strong reads. + */ + public void testRealtimeMultiGetRequestsSuccessful() { + final String primary = internalCluster().startDataOnlyNode(); + // refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime multi get + assertAcked( + prepareCreate(INDEX_NAME).setSettings( + Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + ).addAlias(new Alias("alias")) + ); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + final String id = routingKeyForShard(INDEX_NAME, 0); + + // index doc 1 + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + + // index doc 2 + client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").setRouting(id).get(); + + // multi get non realtime 1 + MultiGetResponse mgetResponse = client().prepareMultiGet() + .add(new MultiGetRequest.Item(INDEX_NAME, "1")) + .add(new MultiGetRequest.Item("nonExistingIndex", "1")) + .setRealtime(false) + .get(); + assertThat(mgetResponse.getResponses().length, is(2)); + + assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); + assertFalse(mgetResponse.getResponses()[0].isFailed()); + assertFalse(mgetResponse.getResponses()[0].getResponse().isExists()); + + // multi get realtime 1 + mgetResponse = client(replica).prepareMultiGet() + .add(new MultiGetRequest.Item(INDEX_NAME, "1")) + .add(new MultiGetRequest.Item(INDEX_NAME, "2").routing(id)) + .add(new MultiGetRequest.Item("nonExistingIndex", "1")) + .get(); + + assertThat(mgetResponse.getResponses().length, is(3)); + assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); + assertFalse(mgetResponse.getResponses()[0].isFailed()); + assertThat(mgetResponse.getResponses()[0].getResponse().getSourceAsMap().get("foo").toString(), equalTo("bar")); + + assertThat(mgetResponse.getResponses()[1].getIndex(), is(INDEX_NAME)); + assertFalse(mgetResponse.getResponses()[1].isFailed()); + assertThat(mgetResponse.getResponses()[1].getResponse().getSourceAsMap().get("foo2").toString(), equalTo("bar2")); + + assertThat(mgetResponse.getResponses()[2].getIndex(), is("nonExistingIndex")); + assertTrue(mgetResponse.getResponses()[2].isFailed()); + assertThat(mgetResponse.getResponses()[2].getFailure().getMessage(), is("no such index [nonExistingIndex]")); + } + + public void testRealtimeMultiGetRequestsUnsuccessful() { + final String primary = internalCluster().startDataOnlyNode(); + assertAcked( + prepareCreate(INDEX_NAME).setSettings( + Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + ).addAlias(new Alias("alias")) + ); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + final String id = routingKeyForShard(INDEX_NAME, 0); + final String routingOtherShard = routingKeyForShard(INDEX_NAME, 1); + + // index doc 1 + client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").setRouting(id).get(); + + // realtime multi get 1 (preference = _replica) + MultiGetResponse mgetResponse = client(replica).prepareMultiGet() + .add(new MultiGetRequest.Item(INDEX_NAME, "1")) + .setPreference(Preference.REPLICA.type()) + .add(new MultiGetRequest.Item("nonExistingIndex", "1")) + .get(); + assertThat(mgetResponse.getResponses().length, is(2)); + assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); + assertFalse(mgetResponse.getResponses()[0].getResponse().isExists()); + + assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex")); + assertTrue(mgetResponse.getResponses()[1].isFailed()); + + // realtime multi get 1 (routing set) + mgetResponse = client(replica).prepareMultiGet() + .add(new MultiGetRequest.Item(INDEX_NAME, "1").routing(routingOtherShard)) + .add(new MultiGetRequest.Item("nonExistingIndex", "1")) + .get(); + assertThat(mgetResponse.getResponses().length, is(2)); + assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME)); + // expecting failure since we explicitly route request to a shard on which it doesn't exist + assertFalse(mgetResponse.getResponses()[0].getResponse().isExists()); + assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex")); + assertTrue(mgetResponse.getResponses()[1].isFailed()); + + } } diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index c7eac009ed49a..583815b91ae68 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -36,7 +36,9 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.single.shard.TransportSingleShardAction; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -47,10 +49,12 @@ import org.opensearch.index.get.GetResult; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Optional; /** * Performs the get operation. @@ -88,16 +92,34 @@ protected boolean resolveIndex(GetRequest request) { return true; } + static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) { + return Optional.ofNullable(state.getMetadata().index(indexName)) + .map( + indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) + .equals(ReplicationType.SEGMENT) + ) + .orElse(false); + } + + /** + * Returns true if GET request should be routed to primary shards, else false. + */ + protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) { + return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null; + } + @Override protected ShardIterator shards(ClusterState state, InternalRequest request) { + final String preference; + // route realtime GET requests when segment replication is enabled to primary shards, + // iff there are no other preferences/routings enabled for routing to a specific shard + if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) { + preference = Preference.PRIMARY.type(); + } else { + preference = request.request().preference(); + } return clusterService.operationRouting() - .getShards( - clusterService.state(), - request.concreteIndex(), - request.request().id(), - request.request().routing(), - request.request().preference() - ); + .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), preference); } @Override diff --git a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java index 37ce45dc3d372..a1a74208dc725 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.AtomicArray; @@ -50,6 +51,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting; + /** * Perform the multi get action. * @@ -109,6 +112,9 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL MultiGetShardRequest shardRequest = shardRequests.get(shardId); if (shardRequest == null) { + if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) { + request.preference(Preference.PRIMARY.type()); + } shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId()); shardRequests.put(shardId, shardRequest); } diff --git a/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java new file mode 100644 index 0000000000000..2eca49fb3032f --- /dev/null +++ b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.get; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.Preference; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchTestCase; + +import static org.opensearch.common.UUIDs.randomBase64UUID; + +public class TransportGetActionTests extends OpenSearchTestCase { + + private static ClusterState clusterState(ReplicationType replicationType) { + final Index index1 = new Index("index1", randomBase64UUID()); + return ClusterState.builder(new ClusterName(TransportGetActionTests.class.getSimpleName())) + .metadata( + new Metadata.Builder().put( + new IndexMetadata.Builder(index1.getName()).settings( + Settings.builder() + .put("index.version.created", Version.CURRENT) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType) + ) + ) + ) + .build(); + } + + public void testShouldForcePrimaryRouting() { + + ClusterState clusterState = clusterState(ReplicationType.SEGMENT); + + // should return false since preference is set for request + assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, Preference.REPLICA.type(), "index1")); + + // should return false since request is not realtime + assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, false, null, "index1")); + + // should return true since segment replication is enabled + assertTrue(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1")); + + // should return false since index doesn't exist + assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index3")); + + clusterState = clusterState(ReplicationType.DOCUMENT); + + // should fail since document replication enabled + assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1")); + + } + +}