Skip to content

Commit

Permalink
[Segment Replication] Support realtime reads for GET requests (opense…
Browse files Browse the repository at this point in the history
…arch-project#9212)

* route to pri shards

Signed-off-by: Poojita Raj <[email protected]>

* Add changelog entry

Signed-off-by: Poojita Raj <[email protected]>

* refactor + add multiGet support

Signed-off-by: Poojita Raj <[email protected]>

* address changes

Signed-off-by: Poojita Raj <[email protected]>

* add comment and rebase

Signed-off-by: Poojita Raj <[email protected]>

* address review comments

Signed-off-by: Poojita Raj <[email protected]>

---------

Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj authored Aug 17, 2023
1 parent 6a5b464 commit 914544b
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introducing Default and Best Compression codecs as their algorithm name ([#9123]()https://github.com/opensearch-project/OpenSearch/pull/9123)
- Make SearchTemplateRequest implement IndicesRequest.Replaceable ([#9122]()https://github.com/opensearch-project/OpenSearch/pull/9122)
- [BWC and API enforcement] Define the initial set of annotations, their meaning and relations between them ([#9223](https://github.com/opensearch-project/OpenSearch/pull/9223))
- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
Expand All @@ -40,6 +44,7 @@
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
Expand Down Expand Up @@ -101,6 +106,8 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends SegmentReplicationBaseIT {
Expand All @@ -110,6 +117,10 @@ private void setup() {
internalCluster().startClusterManagerOnlyNode();
}

private static String indexOrAlias() {
return randomBoolean() ? INDEX_NAME : "alias";
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -1440,4 +1451,174 @@ public void testRestartPrimary_NoReplicas() throws Exception {
ensureYellow(INDEX_NAME);
assertDocCounts(1, primary);
}

/**
* Tests whether segment replication supports realtime get requests and reads and parses source from the translog to serve strong reads.
*/
public void testRealtimeGetRequestsSuccessful() {
final String primary = internalCluster().startDataOnlyNode();
// refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime get
assertAcked(
prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings()))
.addAlias(new Alias("alias"))
);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final String id = routingKeyForShard(INDEX_NAME, 0);

GetResponse response = client(replica).prepareGet(indexOrAlias(), "1").get();
assertFalse(response.isExists());

// index doc 1
client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").get();

// non realtime get 1
response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get();
assertFalse(response.isExists());

// realtime get 1
response = client(replica).prepareGet(indexOrAlias(), "1").get();
assertTrue(response.isExists());
assertThat(response.getIndex(), equalTo(INDEX_NAME));
assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar"));

// index doc 2
client().prepareIndex(indexOrAlias()).setId("2").setSource("foo2", "bar2").setRouting(id).get();

// realtime get 2 (with routing)
response = client(replica).prepareGet(indexOrAlias(), "2").setRouting(id).get();
assertTrue(response.isExists());
assertThat(response.getIndex(), equalTo(INDEX_NAME));
assertThat(response.getSourceAsMap().get("foo2").toString(), equalTo("bar2"));
}

public void testRealtimeGetRequestsUnsuccessful() {
final String primary = internalCluster().startDataOnlyNode();
assertAcked(
prepareCreate(INDEX_NAME).setSettings(
Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
).addAlias(new Alias("alias"))
);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final String id = routingKeyForShard(INDEX_NAME, 0);
final String routingOtherShard = routingKeyForShard(INDEX_NAME, 1);

// index doc 1
client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").setRouting(id).get();

// non realtime get 1
GetResponse response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get();
assertFalse(response.isExists());

// realtime get 1 (preference = _replica)
response = client(replica).prepareGet(indexOrAlias(), "1").setPreference(Preference.REPLICA.type()).get();
assertFalse(response.isExists());
assertThat(response.getIndex(), equalTo(INDEX_NAME));

// realtime get 1 (with routing set)
response = client(replica).prepareGet(INDEX_NAME, "1").setRouting(routingOtherShard).get();
assertFalse(response.isExists());
assertThat(response.getIndex(), equalTo(INDEX_NAME));
}

/**
* Tests whether segment replication supports realtime MultiGet requests and reads and parses source from the translog to serve strong reads.
*/
public void testRealtimeMultiGetRequestsSuccessful() {
final String primary = internalCluster().startDataOnlyNode();
// refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime multi get
assertAcked(
prepareCreate(INDEX_NAME).setSettings(
Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
).addAlias(new Alias("alias"))
);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final String id = routingKeyForShard(INDEX_NAME, 0);

// index doc 1
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();

// index doc 2
client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").setRouting(id).get();

// multi get non realtime 1
MultiGetResponse mgetResponse = client().prepareMultiGet()
.add(new MultiGetRequest.Item(INDEX_NAME, "1"))
.add(new MultiGetRequest.Item("nonExistingIndex", "1"))
.setRealtime(false)
.get();
assertThat(mgetResponse.getResponses().length, is(2));

assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME));
assertFalse(mgetResponse.getResponses()[0].isFailed());
assertFalse(mgetResponse.getResponses()[0].getResponse().isExists());

// multi get realtime 1
mgetResponse = client(replica).prepareMultiGet()
.add(new MultiGetRequest.Item(INDEX_NAME, "1"))
.add(new MultiGetRequest.Item(INDEX_NAME, "2").routing(id))
.add(new MultiGetRequest.Item("nonExistingIndex", "1"))
.get();

assertThat(mgetResponse.getResponses().length, is(3));
assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME));
assertFalse(mgetResponse.getResponses()[0].isFailed());
assertThat(mgetResponse.getResponses()[0].getResponse().getSourceAsMap().get("foo").toString(), equalTo("bar"));

assertThat(mgetResponse.getResponses()[1].getIndex(), is(INDEX_NAME));
assertFalse(mgetResponse.getResponses()[1].isFailed());
assertThat(mgetResponse.getResponses()[1].getResponse().getSourceAsMap().get("foo2").toString(), equalTo("bar2"));

assertThat(mgetResponse.getResponses()[2].getIndex(), is("nonExistingIndex"));
assertTrue(mgetResponse.getResponses()[2].isFailed());
assertThat(mgetResponse.getResponses()[2].getFailure().getMessage(), is("no such index [nonExistingIndex]"));
}

public void testRealtimeMultiGetRequestsUnsuccessful() {
final String primary = internalCluster().startDataOnlyNode();
assertAcked(
prepareCreate(INDEX_NAME).setSettings(
Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
).addAlias(new Alias("alias"))
);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final String id = routingKeyForShard(INDEX_NAME, 0);
final String routingOtherShard = routingKeyForShard(INDEX_NAME, 1);

// index doc 1
client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").setRouting(id).get();

// realtime multi get 1 (preference = _replica)
MultiGetResponse mgetResponse = client(replica).prepareMultiGet()
.add(new MultiGetRequest.Item(INDEX_NAME, "1"))
.setPreference(Preference.REPLICA.type())
.add(new MultiGetRequest.Item("nonExistingIndex", "1"))
.get();
assertThat(mgetResponse.getResponses().length, is(2));
assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME));
assertFalse(mgetResponse.getResponses()[0].getResponse().isExists());

assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex"));
assertTrue(mgetResponse.getResponses()[1].isFailed());

// realtime multi get 1 (routing set)
mgetResponse = client(replica).prepareMultiGet()
.add(new MultiGetRequest.Item(INDEX_NAME, "1").routing(routingOtherShard))
.add(new MultiGetRequest.Item("nonExistingIndex", "1"))
.get();
assertThat(mgetResponse.getResponses().length, is(2));
assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME));
// expecting failure since we explicitly route request to a shard on which it doesn't exist
assertFalse(mgetResponse.getResponses()[0].getResponse().isExists());
assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex"));
assertTrue(mgetResponse.getResponses()[1].isFailed());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.single.shard.TransportSingleShardAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -47,10 +49,12 @@
import org.opensearch.index.get.GetResult;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Optional;

/**
* Performs the get operation.
Expand Down Expand Up @@ -88,16 +92,34 @@ protected boolean resolveIndex(GetRequest request) {
return true;
}

static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) {
return Optional.ofNullable(state.getMetadata().index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
.equals(ReplicationType.SEGMENT)
)
.orElse(false);
}

/**
* Returns true if GET request should be routed to primary shards, else false.
*/
protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) {
return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null;
}

@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
final String preference;
// route realtime GET requests when segment replication is enabled to primary shards,
// iff there are no other preferences/routings enabled for routing to a specific shard
if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) {
preference = Preference.PRIMARY.type();
} else {
preference = request.request().preference();
}
return clusterService.operationRouting()
.getShards(
clusterService.state(),
request.concreteIndex(),
request.request().id(),
request.request().routing(),
request.request().preference()
);
.getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), preference);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.AtomicArray;
Expand All @@ -50,6 +51,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting;

/**
* Perform the multi get action.
*
Expand Down Expand Up @@ -109,6 +112,9 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL

MultiGetShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) {
request.preference(Preference.PRIMARY.type());
}
shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId());
shardRequests.put(shardId, shardRequest);
}
Expand Down
Loading

0 comments on commit 914544b

Please sign in to comment.