Skip to content

Commit

Permalink
Segment Replication - Release incorrectly retained commits on primary…
Browse files Browse the repository at this point in the history
… shards (#6660)

* Segment Replication - Release incorrectly retained index commits on primary shards

This change ensures that primary shards clean up any state when a replica is marked
out of sync. This can happen when replicas fail due to store corruption or mismatching segments
during file copy.

Signed-off-by: Marc Handalian <[email protected]>

* PR feedback.

Signed-off-by: Marc Handalian <[email protected]>

* spotless fix.

Signed-off-by: Marc Handalian <[email protected]>

---------

Signed-off-by: Marc Handalian <[email protected]>
(cherry picked from commit 73a2279)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Mar 14, 2023
1 parent 4d018f4 commit b88d3d2
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
Expand Down Expand Up @@ -683,4 +693,55 @@ public void testDropPrimaryDuringReplication() throws Exception {
verifyStoreContent();
}
}

public void testReplicaHasDiffFilesThanPrimary() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
IndexWriterConfig iwc = newIndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.APPEND);

// create a doc to index
int numDocs = 2 + random().nextInt(100);

List<Document> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
Document doc = new Document();
doc.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
doc.add(
new TextField(
"body",
TestUtil.randomRealisticUnicodeString(random()),
random().nextBoolean() ? Field.Store.YES : Field.Store.NO
)
);
doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random()))));
docs.add(doc);
}
// create some segments on the replica before copy.
try (IndexWriter writer = new IndexWriter(replicaShard.store().directory(), iwc)) {
for (Document d : docs) {
writer.addDocument(d);
}
writer.flush();
writer.commit();
}

final SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(replicaShard.store().directory());
replicaShard.finalizeReplication(segmentInfos);

final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
refresh(INDEX_NAME);
}
// Refresh, this should trigger round of segment replication
assertBusy(() -> { assertDocCounts(docCount, replicaNode); });
final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.indices.replication;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -26,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -91,6 +95,8 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro
}
}

private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class);

/**
* Start sending files to the replica.
*
Expand Down Expand Up @@ -163,8 +169,8 @@ synchronized void cancel(IndexShard shard, String reason) {
/**
* Cancel all Replication events for the given allocation ID, intended to be called when a primary is shutting down.
*
* @param allocationId {@link String} - Allocation ID.
* @param reason {@link String} - Reason for the cancel
* @param allocationId {@link String} - Allocation ID.
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(String allocationId, String reason) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
Expand Down Expand Up @@ -195,6 +201,11 @@ int size() {
return allocationIdToHandlers.size();
}

// Visible for tests.
Map<String, SegmentReplicationSourceHandler> getHandlers() {
return allocationIdToHandlers;
}

int cachedCopyStateSize() {
return copyStateMap.size();
}
Expand Down Expand Up @@ -254,8 +265,22 @@ private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> p
.filter(predicate)
.map(SegmentReplicationSourceHandler::getAllocationId)
.collect(Collectors.toList());
logger.warn(() -> new ParameterizedMessage("Cancelling replications for allocationIds {}", allocationIds));
for (String allocationId : allocationIds) {
cancel(allocationId, reason);
}
}

/**
* Clear copystate and target handlers for any non insync allocationIds.
* @param shardId {@link ShardId}
* @param inSyncAllocationIds {@link List} of in-sync allocation Ids.
*/
public void clearOutOfSyncIds(ShardId shardId, Set<String> inSyncAllocationIds) {
cancelHandlers(
(handler) -> handler.getCopyState().getShard().shardId().equals(shardId)
&& inSyncAllocationIds.contains(handler.getAllocationId()) == false,
"Shard is no longer in-sync with the primary"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
Expand All @@ -35,6 +37,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -162,6 +165,19 @@ public void clusterChanged(ClusterChangedEvent event) {
ongoingSegmentReplications.cancelReplication(removedNode);
}
}
// if a replica for one of the primary shards on this node has closed,
// we need to ensure its state has cleared up in ongoing replications.
if (event.routingTableChanged()) {
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry().primary()) {
final IndexMetadata indexMetadata = indexService.getIndexSettings().getIndexMetadata();
final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(indexShard.shardId().id());
ongoingSegmentReplications.clearOutOfSyncIds(indexShard.shardId(), inSyncAllocationIds);
}
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -359,4 +358,45 @@ public void testCancelAllReplicationsForShard() throws IOException {
assertEquals(0, replications.cachedCopyStateSize());
closeShards(replica_2);
}

public void testCancelForMissingIds() throws IOException {
// This tests when primary has multiple ongoing replications.
IndexShard replica_2 = newShard(primary.shardId(), false);
recoverReplica(replica_2, primary, true);

OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
final String replicaAllocationId = replica.routingEntry().allocationId().getId();
final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint);

final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class));
assertEquals(1, copyState.refCount());

final String replica_2AllocationId = replica_2.routingEntry().allocationId().getId();
final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
1L,
replica_2AllocationId,
replicaDiscoveryNode,
testCheckpoint
);
replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class));

assertEquals(2, copyState.refCount());
assertEquals(2, replications.size());
assertTrue(replications.getHandlers().containsKey(replicaAllocationId));
assertTrue(replications.getHandlers().containsKey(replica_2AllocationId));
assertEquals(1, replications.cachedCopyStateSize());

replications.clearOutOfSyncIds(primary.shardId(), Set.of(replica_2AllocationId));
assertEquals(1, copyState.refCount());
assertEquals(1, replications.size());
assertTrue(replications.getHandlers().containsKey(replica_2AllocationId));
assertEquals(1, replications.cachedCopyStateSize());

// cancel the primary's ongoing replications.
replications.clearOutOfSyncIds(primary.shardId(), Collections.emptySet());
assertEquals(0, copyState.refCount());
assertEquals(0, replications.size());
assertEquals(0, replications.cachedCopyStateSize());
closeShards(replica_2);
}
}

0 comments on commit b88d3d2

Please sign in to comment.