Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Support realtime reads for GET requests #9212

Merged
merged 6 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introducing Default and Best Compression codecs as their algorithm name ([#9123]()https://github.com/opensearch-project/OpenSearch/pull/9123)
- Make SearchTemplateRequest implement IndicesRequest.Replaceable ([#9122]()https://github.com/opensearch-project/OpenSearch/pull/9122)
- [BWC and API enforcement] Define the initial set of annotations, their meaning and relations between them ([#9223](https://github.com/opensearch-project/OpenSearch/pull/9223))
- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
Expand All @@ -40,6 +44,7 @@
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
Expand Down Expand Up @@ -101,6 +106,8 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends SegmentReplicationBaseIT {
Expand All @@ -110,6 +117,10 @@ private void setup() {
internalCluster().startClusterManagerOnlyNode();
}

private static String indexOrAlias() {
return randomBoolean() ? INDEX_NAME : "alias";
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -1440,4 +1451,174 @@ public void testRestartPrimary_NoReplicas() throws Exception {
ensureYellow(INDEX_NAME);
assertDocCounts(1, primary);
}

/**
* Tests whether segment replication supports realtime get requests and reads and parses source from the translog to serve strong reads.
*/
public void testRealtimeGetRequestsSuccessful() {
final String primary = internalCluster().startDataOnlyNode();
// refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime get
assertAcked(
prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings()))
.addAlias(new Alias("alias"))
);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final String id = routingKeyForShard(INDEX_NAME, 0);

GetResponse response = client(replica).prepareGet(indexOrAlias(), "1").get();
assertFalse(response.isExists());

// index doc 1
client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").get();

// non realtime get 1
response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get();
Poojita-Raj marked this conversation as resolved.
Show resolved Hide resolved
assertFalse(response.isExists());

// realtime get 1
response = client(replica).prepareGet(indexOrAlias(), "1").get();
assertTrue(response.isExists());
assertThat(response.getIndex(), equalTo(INDEX_NAME));
assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar"));

// index doc 2
client().prepareIndex(indexOrAlias()).setId("2").setSource("foo2", "bar2").setRouting(id).get();

// realtime get 2 (with routing)
response = client(replica).prepareGet(indexOrAlias(), "2").setRouting(id).get();
assertTrue(response.isExists());
assertThat(response.getIndex(), equalTo(INDEX_NAME));
assertThat(response.getSourceAsMap().get("foo2").toString(), equalTo("bar2"));
}

public void testRealtimeGetRequestsUnsuccessful() {
final String primary = internalCluster().startDataOnlyNode();
assertAcked(
Poojita-Raj marked this conversation as resolved.
Show resolved Hide resolved
prepareCreate(INDEX_NAME).setSettings(
Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
Poojita-Raj marked this conversation as resolved.
Show resolved Hide resolved
).addAlias(new Alias("alias"))
);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final String id = routingKeyForShard(INDEX_NAME, 0);
final String routingOtherShard = routingKeyForShard(INDEX_NAME, 1);

// index doc 1
client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").setRouting(id).get();

// non realtime get 1
GetResponse response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get();
assertFalse(response.isExists());

// realtime get 1 (preference = _replica)
response = client(replica).prepareGet(indexOrAlias(), "1").setPreference(Preference.REPLICA.type()).get();
assertFalse(response.isExists());
assertThat(response.getIndex(), equalTo(INDEX_NAME));

// realtime get 1 (with routing set)
response = client(replica).prepareGet(INDEX_NAME, "1").setRouting(routingOtherShard).get();
assertFalse(response.isExists());
assertThat(response.getIndex(), equalTo(INDEX_NAME));
}

/**
* Tests whether segment replication supports realtime MultiGet requests and reads and parses source from the translog to serve strong reads.
*/
public void testRealtimeMultiGetRequestsSuccessful() {
final String primary = internalCluster().startDataOnlyNode();
// refresh interval disabled to ensure refresh rate of index (when data is ready for search) doesn't affect realtime multi get
assertAcked(
prepareCreate(INDEX_NAME).setSettings(
Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
).addAlias(new Alias("alias"))
);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final String id = routingKeyForShard(INDEX_NAME, 0);

// index doc 1
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();

// index doc 2
client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").setRouting(id).get();

// multi get non realtime 1
MultiGetResponse mgetResponse = client().prepareMultiGet()
.add(new MultiGetRequest.Item(INDEX_NAME, "1"))
.add(new MultiGetRequest.Item("nonExistingIndex", "1"))
.setRealtime(false)
.get();
assertThat(mgetResponse.getResponses().length, is(2));

assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME));
assertFalse(mgetResponse.getResponses()[0].isFailed());
assertFalse(mgetResponse.getResponses()[0].getResponse().isExists());

// multi get realtime 1
mgetResponse = client(replica).prepareMultiGet()
.add(new MultiGetRequest.Item(INDEX_NAME, "1"))
.add(new MultiGetRequest.Item(INDEX_NAME, "2").routing(id))
.add(new MultiGetRequest.Item("nonExistingIndex", "1"))
.get();

assertThat(mgetResponse.getResponses().length, is(3));
assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME));
assertFalse(mgetResponse.getResponses()[0].isFailed());
assertThat(mgetResponse.getResponses()[0].getResponse().getSourceAsMap().get("foo").toString(), equalTo("bar"));

assertThat(mgetResponse.getResponses()[1].getIndex(), is(INDEX_NAME));
assertFalse(mgetResponse.getResponses()[1].isFailed());
assertThat(mgetResponse.getResponses()[1].getResponse().getSourceAsMap().get("foo2").toString(), equalTo("bar2"));

assertThat(mgetResponse.getResponses()[2].getIndex(), is("nonExistingIndex"));
assertTrue(mgetResponse.getResponses()[2].isFailed());
assertThat(mgetResponse.getResponses()[2].getFailure().getMessage(), is("no such index [nonExistingIndex]"));
}

public void testRealtimeMultiGetRequestsUnsuccessful() {
final String primary = internalCluster().startDataOnlyNode();
assertAcked(
prepareCreate(INDEX_NAME).setSettings(
Settings.builder().put("index.refresh_interval", -1).put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
).addAlias(new Alias("alias"))
);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final String id = routingKeyForShard(INDEX_NAME, 0);
final String routingOtherShard = routingKeyForShard(INDEX_NAME, 1);

// index doc 1
client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").setRouting(id).get();

// realtime multi get 1 (preference = _replica)
MultiGetResponse mgetResponse = client(replica).prepareMultiGet()
.add(new MultiGetRequest.Item(INDEX_NAME, "1"))
.setPreference(Preference.REPLICA.type())
.add(new MultiGetRequest.Item("nonExistingIndex", "1"))
.get();
assertThat(mgetResponse.getResponses().length, is(2));
assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME));
assertFalse(mgetResponse.getResponses()[0].getResponse().isExists());

assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex"));
assertTrue(mgetResponse.getResponses()[1].isFailed());

// realtime multi get 1 (routing set)
mgetResponse = client(replica).prepareMultiGet()
.add(new MultiGetRequest.Item(INDEX_NAME, "1").routing(routingOtherShard))
.add(new MultiGetRequest.Item("nonExistingIndex", "1"))
.get();
assertThat(mgetResponse.getResponses().length, is(2));
assertThat(mgetResponse.getResponses()[0].getIndex(), is(INDEX_NAME));
// expecting failure since we explicitly route request to a shard on which it doesn't exist
assertFalse(mgetResponse.getResponses()[0].getResponse().isExists());
Poojita-Raj marked this conversation as resolved.
Show resolved Hide resolved
assertThat(mgetResponse.getResponses()[1].getIndex(), is("nonExistingIndex"));
assertTrue(mgetResponse.getResponses()[1].isFailed());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.single.shard.TransportSingleShardAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -47,10 +49,12 @@
import org.opensearch.index.get.GetResult;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Optional;

/**
* Performs the get operation.
Expand Down Expand Up @@ -88,16 +92,34 @@ protected boolean resolveIndex(GetRequest request) {
return true;
}

static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) {
return Optional.ofNullable(state.getMetadata().index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
.equals(ReplicationType.SEGMENT)
)
.orElse(false);
}

/**
* Returns true if GET request should be routed to primary shards, else false.
*/
protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) {
return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null;
}

@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
final String preference;
// route realtime GET requests when segment replication is enabled to primary shards,
// iff there are no other preferences/routings enabled for routing to a specific shard
if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) {
preference = Preference.PRIMARY.type();
} else {
preference = request.request().preference();
}
return clusterService.operationRouting()
.getShards(
clusterService.state(),
request.concreteIndex(),
request.request().id(),
request.request().routing(),
request.request().preference()
);
.getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), preference);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.AtomicArray;
Expand All @@ -50,6 +51,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting;

/**
* Perform the multi get action.
*
Expand Down Expand Up @@ -109,6 +112,9 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL

MultiGetShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) {
request.preference(Preference.PRIMARY.type());
}
shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId());
shardRequests.put(shardId, shardRequest);
}
Expand Down
Loading
Loading