Skip to content

Commit

Permalink
[Segment Replication] Wire up segment replication with peer recovery …
Browse files Browse the repository at this point in the history
…and add ITs. (opensearch-project#3743)

* Add null check when computing max segment version.

With segment replication enabled it is possible Lucene does not set the SegmentInfos
min segment version, leaving the default value as null.

Signed-off-by: Marc Handalian <[email protected]>

* Update peer recovery to set the translogUUID of replicas to the UUID generated on the primary.

This change updates the UUID when the translog is created to the value stored in the passed segment userdata.
This is to ensure during failover scenarios that the replica can be promoted and not have a uuid mismatch with the value stored in user data.

Signed-off-by: Marc Handalian <[email protected]>

* Wire up Segment Replication under the feature flag.

This PR wires up segment replication and adds some initial integration tests.

Signed-off-by: Marc Handalian <[email protected]>

* Add test to ensure replicas use primary translog uuid with segrep.

Signed-off-by: Marc Handalian <[email protected]>

* Update SegmentReplicationIT to assert previous commit points are valid and SegmentInfos can be built.
Fix nitpicks in PR feedback.

Signed-off-by: Marc Handalian <[email protected]>

* Fix test with Assert.fail to include a message.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored and Rishikesh1159 committed Aug 9, 2022
1 parent a7ad735 commit f8bc770
Show file tree
Hide file tree
Showing 17 changed files with 484 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.apache.lucene.index.SegmentInfos;
import org.junit.BeforeClass;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {

private static final String INDEX_NAME = "test-idx-1";
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);
assertSegmentStats(REPLICA_COUNT);
}
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);

flush(INDEX_NAME);
waitForReplicaUpdate();
// wait a short amount of time to give replication a chance to complete.
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

// Index a second set of docs so we can merge into one segment.
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
refresh(INDEX_NAME);
waitForReplicaUpdate();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);
assertSegmentStats(REPLICA_COUNT);
}
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

// Index a doc to create the first set of segments. _s1.si
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
// Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si)
flushAndRefresh(INDEX_NAME);
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

// Index to create another segment
client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").get();

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
refresh(INDEX_NAME);

assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();

waitForReplicaUpdate();
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

final Index index = resolveIndex(INDEX_NAME);
IndexShard primaryShard = getIndexShard(index, primaryNode);
IndexShard replicaShard = getIndexShard(index, replicaNode);
assertEquals(
primaryShard.translogStats().estimatedNumberOfOperations(),
replicaShard.translogStats().estimatedNumberOfOperations()
);
assertSegmentStats(REPLICA_COUNT);
}

private void assertSegmentStats(int numberOfReplicas) throws IOException {
final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();

List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);

// There will be an entry in the list for each index.
for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {

// Separate Primary & replica shards ShardSegments.
final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1);
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);

assertEquals(
"There should be a ShardSegment entry for each replica in the replicationGroup",
numberOfReplicas,
replicaShardSegments.size()
);

for (ShardSegments shardSegment : replicaShardSegments) {
final Map<String, Segment> latestReplicaSegments = getLatestSegments(shardSegment);
for (Segment replicaSegment : latestReplicaSegments.values()) {
final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName());
assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration());
assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs());
assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs());
assertEquals(replicaSegment.getSize(), primarySegment.getSize());
}

// Fetch the IndexShard for this replica and try and build its SegmentInfos from the previous commit point.
// This ensures the previous commit point is not wiped.
final ShardRouting replicaShardRouting = shardSegment.getShardRouting();
ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
final Index index = resolveIndex(INDEX_NAME);
IndexShard indexShard = getIndexShard(index, replicaNode.getName());
final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
}
}
}

/**
* Waits until the replica is caught up to the latest primary segments gen.
* @throws Exception
*/
private void waitForReplicaUpdate() throws Exception {
// wait until the replica has the latest segment generation.
assertBusy(() -> {
final IndicesSegmentResponse indicesSegmentResponse = client().admin()
.indices()
.segments(new IndicesSegmentsRequest())
.actionGet();
List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
assertTrue(isReplicaCaughtUpToPrimary);
}
}
});
}

private IndexShard getIndexShard(Index index, String node) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return indexService.getShard(shardId.get());
}

private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSegmentResponse) {
return indicesSegmentResponse.getIndices()
.values()
.stream() // get list of IndexSegments
.flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
.map(IndexShardSegments::getShards) // get list of segments across replication group
.collect(Collectors.toList());
}

private Map<String, Segment> getLatestSegments(ShardSegments segments) {
final Long latestPrimaryGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get();
return segments.getSegments()
.stream()
.filter(s -> s.getGeneration() == latestPrimaryGen)
.collect(Collectors.toMap(Segment::getName, Function.identity()));
}

private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
}
}
16 changes: 6 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1382,9 +1382,13 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}

/**
* Returns the lastest Replication Checkpoint that shard received
* Returns the lastest Replication Checkpoint that shard received. Shards will return an EMPTY checkpoint before
* the engine is opened.
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
if (getEngineOrNull() == null) {
return ReplicationCheckpoint.empty(shardId);
}
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return Optional.ofNullable(snapshot.get())
.map(
Expand All @@ -1396,15 +1400,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
segmentInfos.getVersion()
)
)
.orElse(
new ReplicationCheckpoint(
shardId,
getOperationPrimaryTerm(),
SequenceNumbers.NO_OPS_PERFORMED,
getProcessedLocalCheckpoint(),
SequenceNumbers.NO_OPS_PERFORMED
)
);
.orElse(ReplicationCheckpoint.empty(shardId));
} catch (IOException ex) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
}
Expand Down
7 changes: 6 additions & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,12 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
// With segment replication enabled, we compute metadata snapshots from the latest in memory infos.
// In this case we will have SegmentInfos objects fetched from the primary's reader
// where the minSegmentLuceneVersion can be null even though there are segments.
// This is because the SegmentInfos object is not read from a commit/IndexInput, which sets
// minSegmentLuceneVersion.
if (maxVersion == null || version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
Expand Down
Loading

0 comments on commit f8bc770

Please sign in to comment.