[Segment Replication] Wire up segment replication with peer recovery and add ITs. #3743

Merged · 6 commits · Jul 22, 2022

@@ -0,0 +1,306 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.apache.lucene.index.SegmentInfos;
import org.junit.BeforeClass;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {

    private static final String INDEX_NAME = "test-idx-1";
    private static final int SHARD_COUNT = 1;
    private static final int REPLICA_COUNT = 1;

    @BeforeClass
    public static void assumeFeatureFlag() {
        assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
    }

    @Override
    public Settings indexSettings() {
        return Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .build();
    }

    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
        final String nodeA = internalCluster().startNode();
        final String nodeB = internalCluster().startNode();
        createIndex(INDEX_NAME);
        ensureGreen(INDEX_NAME);

        final int initialDocCount = scaledRandomIntBetween(0, 200);
        try (
            BackgroundIndexer indexer = new BackgroundIndexer(
                INDEX_NAME,
                "_doc",
                client(),
                -1,
                RandomizedTest.scaledRandomIntBetween(2, 5),
                false,
                random()
            )
        ) {
            indexer.start(initialDocCount);
            waitForDocs(initialDocCount, indexer);
            refresh(INDEX_NAME);
            waitForReplicaUpdate();

            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

            final int additionalDocCount = scaledRandomIntBetween(0, 200);
            final int expectedHitCount = initialDocCount + additionalDocCount;
            indexer.start(additionalDocCount);
            waitForDocs(expectedHitCount, indexer);

            flushAndRefresh(INDEX_NAME);
            waitForReplicaUpdate();
            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

            ensureGreen(INDEX_NAME);
            assertSegmentStats(REPLICA_COUNT);
        }
    }

    public void testReplicationAfterForceMerge() throws Exception {
        final String nodeA = internalCluster().startNode();
        final String nodeB = internalCluster().startNode();
        createIndex(INDEX_NAME);
        ensureGreen(INDEX_NAME);

        final int initialDocCount = scaledRandomIntBetween(0, 200);
        final int additionalDocCount = scaledRandomIntBetween(0, 200);
        final int expectedHitCount = initialDocCount + additionalDocCount;
        try (
            BackgroundIndexer indexer = new BackgroundIndexer(
                INDEX_NAME,
                "_doc",
                client(),
                -1,
                RandomizedTest.scaledRandomIntBetween(2, 5),
                false,
                random()
            )
        ) {
            indexer.start(initialDocCount);
            waitForDocs(initialDocCount, indexer);

            flush(INDEX_NAME);
            waitForReplicaUpdate();
            // wait a short amount of time to give replication a chance to complete.
            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

            // Index a second set of docs so we can merge into one segment.
            indexer.start(additionalDocCount);
            waitForDocs(expectedHitCount, indexer);

            // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
            client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
            refresh(INDEX_NAME);
            waitForReplicaUpdate();
            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

            ensureGreen(INDEX_NAME);
            assertSegmentStats(REPLICA_COUNT);
        }
    }

    public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
        final String primaryNode = internalCluster().startNode();
        createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
        ensureGreen(INDEX_NAME);

        // Index a doc to create the first set of segments. _s1.si
        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
        // Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si)
        flushAndRefresh(INDEX_NAME);
        assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

        // Index to create another segment
        client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").get();

        // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
        refresh(INDEX_NAME);

        assertAcked(
            client().admin()
                .indices()
                .prepareUpdateSettings(INDEX_NAME)
                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
        );
        final String replicaNode = internalCluster().startNode();
        ensureGreen(INDEX_NAME);

        client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();

        waitForReplicaUpdate();
        assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
        assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

        final Index index = resolveIndex(INDEX_NAME);
        IndexShard primaryShard = getIndexShard(index, primaryNode);
        IndexShard replicaShard = getIndexShard(index, replicaNode);
        assertEquals(
            primaryShard.translogStats().estimatedNumberOfOperations(),
            replicaShard.translogStats().estimatedNumberOfOperations()
        );
        assertSegmentStats(REPLICA_COUNT);
    }

    private void assertSegmentStats(int numberOfReplicas) throws IOException {
        final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();

        List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);

        // There will be an entry in the list for each index.
        for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {

            // Separate the primary and replica ShardSegments.
            final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
            final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
            final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

            assertEquals("There should only be one primary in the replicationGroup", 1, primaryShardSegmentsList.size());
            final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
            final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);

            assertEquals(
                "There should be a ShardSegment entry for each replica in the replicationGroup",
                numberOfReplicas,
                replicaShardSegments.size()
            );

            for (ShardSegments shardSegment : replicaShardSegments) {
                final Map<String, Segment> latestReplicaSegments = getLatestSegments(shardSegment);
                for (Segment replicaSegment : latestReplicaSegments.values()) {
                    final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName());
                    assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration());
                    assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs());
                    assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs());
                    assertEquals(replicaSegment.getSize(), primarySegment.getSize());
                }

                // Fetch the IndexShard for this replica and try to build its SegmentInfos from the previous commit point.
                // This ensures the previous commit point is not wiped.
                final ShardRouting replicaShardRouting = shardSegment.getShardRouting();
                ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
                final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
                final Index index = resolveIndex(INDEX_NAME);
                IndexShard indexShard = getIndexShard(index, replicaNode.getName());
                final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
                // Calls to readCommit will fail if a valid commit point and all its segments are not in the store.
                SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
            }
        }
    }

    /**
     * Waits until all replicas are caught up to the latest primary segments generation.
     * @throws Exception if the replicas do not catch up before the assertBusy timeout
     */
    private void waitForReplicaUpdate() throws Exception {
        // wait until the replica has the latest segment generation.
        assertBusy(() -> {
            final IndicesSegmentResponse indicesSegmentResponse = client().admin()
                .indices()
                .segments(new IndicesSegmentsRequest())
                .actionGet();
            List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
            for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
                final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
                final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
                final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

                final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
                final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
                final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
                for (ShardSegments shardSegments : replicaShardSegments) {
                    final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
                        .stream()
                        .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
                    assertTrue(isReplicaCaughtUpToPrimary);
                }
            }
        });
    }

    private IndexShard getIndexShard(Index index, String node) {
        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
        IndexService indexService = indicesService.indexServiceSafe(index);
        final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
        return indexService.getShard(shardId.get());
    }

    private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSegmentResponse) {
        return indicesSegmentResponse.getIndices()
            .values()
            .stream() // get list of IndexSegments
            .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
            .map(IndexShardSegments::getShards) // get list of segments across replication group
            .collect(Collectors.toList());
    }

    private Map<String, Segment> getLatestSegments(ShardSegments segments) {
        final Long latestPrimaryGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get();
        return segments.getSegments()
            .stream()
            .filter(s -> s.getGeneration() == latestPrimaryGen)
            .collect(Collectors.toMap(Segment::getName, Function.identity()));
    }

    private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
        return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
    }
}
server/src/main/java/org/opensearch/index/shard/IndexShard.java (16 changes: 6 additions & 10 deletions)
@@ -1396,9 +1396,13 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException
     }
 
     /**
-     * Returns the lastest Replication Checkpoint that shard received
+     * Returns the latest Replication Checkpoint that shard received. Shards will return an EMPTY checkpoint before
+     * the engine is opened.
      */
     public ReplicationCheckpoint getLatestReplicationCheckpoint() {
+        if (getEngineOrNull() == null) {
+            return ReplicationCheckpoint.empty(shardId);
+        }
         try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
             return Optional.ofNullable(snapshot.get())
                 .map(
@@ -1410,15 +1414,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
                         segmentInfos.getVersion()
                     )
                 )
-                .orElse(
-                    new ReplicationCheckpoint(
-                        shardId,
-                        getOperationPrimaryTerm(),
-                        SequenceNumbers.NO_OPS_PERFORMED,
-                        getProcessedLocalCheckpoint(),
-                        SequenceNumbers.NO_OPS_PERFORMED
-                    )
-                );
+                .orElse(ReplicationCheckpoint.empty(shardId));
         } catch (IOException ex) {
             throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
         }
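
This change returns an empty checkpoint early whenever the engine is not yet open (as during peer recovery) and collapses the old hand-built fallback into the same factory. A minimal sketch of what such a factory could look like follows; the field values are assumptions inferred from the constructor call the diff removes, not the merged implementation:

    // Sketch only: assumed shape of ReplicationCheckpoint.empty(ShardId).
    // An empty checkpoint carries only sentinel state, so a checkpoint published
    // by an active primary should always compare as ahead of it.
    public static ReplicationCheckpoint empty(ShardId shardId) {
        return new ReplicationCheckpoint(
            shardId,
            SequenceNumbers.UNASSIGNED_PRIMARY_TERM, // assumed: no primary term known before the engine opens
            SequenceNumbers.NO_OPS_PERFORMED, // segments generation sentinel
            SequenceNumbers.NO_OPS_PERFORMED, // processed local checkpoint sentinel
            SequenceNumbers.NO_OPS_PERFORMED // segment infos version sentinel
        );
    }

Centralizing the sentinel values in one factory also keeps the two call sites above from drifting apart.
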
@@ -1003,7 +1003,12 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory
                 // version is written since 3.1+: we should have already hit IndexFormatTooOld.
                 throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
             }
-            if (version.onOrAfter(maxVersion)) {
+            // With segment replication enabled, we compute metadata snapshots from the latest in-memory infos.
+            // In this case we will have SegmentInfos objects fetched from the primary's reader
+            // where the minSegmentLuceneVersion can be null even though there are segments.
+            // This is because the SegmentInfos object is not read from a commit/IndexInput, which sets
+            // minSegmentLuceneVersion.
+            if (maxVersion == null || version.onOrAfter(maxVersion)) {
                 maxVersion = version;
             }
             for (String file : info.files()) {
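
To see why the null guard in the loadMetadata hunk matters: per the comment in the diff, SegmentInfos built from the primary's in-memory reader rather than read from a commit can leave the running maxVersion unseeded, and Lucene's Version.onOrAfter would then throw on the first segment. A self-contained sketch of the same null-tolerant pattern (hypothetical helper, not code from this PR):

    import org.apache.lucene.util.Version;

    final class MaxVersionExample {
        // Hypothetical helper mirroring the fixed condition above: track the
        // maximum segment version while tolerating an unseeded (null) start.
        static Version maxSegmentVersion(Iterable<Version> segmentVersions) {
            Version maxVersion = null; // stays null only if there are no segments
            for (Version version : segmentVersions) {
                // Without the null check, version.onOrAfter(null) would throw a
                // NullPointerException whenever maxVersion was not seeded from a
                // commit's minSegmentLuceneVersion.
                if (maxVersion == null || version.onOrAfter(maxVersion)) {
                    maxVersion = version;
                }
            }
            return maxVersion;
        }
    }
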