Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment Replication - Release incorrectly retained commits on primary shards #6660

Merged
merged 3 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
Expand Down Expand Up @@ -682,4 +692,55 @@ public void testDropPrimaryDuringReplication() throws Exception {
verifyStoreContent();
}
}

/**
 * Verifies that a replica whose on-disk segment files have diverged from the primary is
 * failed and re-allocated rather than left with a mixed set of files.
 *
 * The test writes extra segments directly into the replica's store (bypassing the engine),
 * force-feeds them via {@code finalizeReplication}, then indexes through the primary to
 * trigger a round of segment replication. The replica is expected to be failed and recover
 * as a new shard, which is asserted via a changed allocation ID.
 */
public void testReplicaHasDiffFilesThanPrimary() throws Exception {
    internalCluster().startClusterManagerOnlyNode();
    final String primaryNode = internalCluster().startNode();
    createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
    ensureYellow(INDEX_NAME);
    final String replicaNode = internalCluster().startNode();
    ensureGreen(INDEX_NAME);

    final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
    // APPEND mode so the writer adds to the replica's existing commit instead of replacing it.
    IndexWriterConfig iwc = newIndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.APPEND);

    // create a doc to index
    int numDocs = 2 + random().nextInt(100);

    List<Document> docs = new ArrayList<>();
    for (int i = 0; i < numDocs; i++) {
        Document doc = new Document();
        doc.add(new StringField("id", "" + i, random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
        doc.add(
            new TextField(
                "body",
                TestUtil.randomRealisticUnicodeString(random()),
                random().nextBoolean() ? Field.Store.YES : Field.Store.NO
            )
        );
        doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random()))));
        docs.add(doc);
    }
    // create some segments on the replica before copy.
    try (IndexWriter writer = new IndexWriter(replicaShard.store().directory(), iwc)) {
        for (Document d : docs) {
            writer.addDocument(d);
        }
        writer.flush();
        writer.commit();
    }

    // Make the replica's engine aware of the divergent commit written above.
    final SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(replicaShard.store().directory());
    replicaShard.finalizeReplication(segmentInfos);

    final int docCount = scaledRandomIntBetween(10, 200);
    for (int i = 0; i < docCount; i++) {
        client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
        refresh(INDEX_NAME);
    }
    // Refresh, this should trigger round of segment replication
    assertBusy(() -> { assertDocCounts(docCount, replicaNode); });
    // Matching doc counts alone would pass even without a shard failure (see PR review);
    // assert the replica was actually failed and re-allocated by comparing allocation IDs.
    final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
    assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.indices.replication;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -26,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -91,6 +95,8 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro
}
}

private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class);

/**
* Start sending files to the replica.
*
Expand Down Expand Up @@ -163,8 +169,8 @@ synchronized void cancel(IndexShard shard, String reason) {
/**
* Cancel all Replication events for the given allocation ID, intended to be called when a primary is shutting down.
*
* @param allocationId {@link String} - Allocation ID.
* @param reason {@link String} - Reason for the cancel
* @param allocationId {@link String} - Allocation ID.
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(String allocationId, String reason) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
Expand Down Expand Up @@ -195,6 +201,11 @@ int size() {
return allocationIdToHandlers.size();
}

// Visible for tests. Returns the live internal map of allocation ID -> replication
// handler (not a defensive copy) — callers must treat it as read-only.
Map<String, SegmentReplicationSourceHandler> getHandlers() {
    return allocationIdToHandlers;
}

// Visible for tests. Number of entries currently held in the copy-state cache.
int cachedCopyStateSize() {
    return copyStateMap.size();
}
Expand Down Expand Up @@ -254,8 +265,22 @@ private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> p
.filter(predicate)
.map(SegmentReplicationSourceHandler::getAllocationId)
.collect(Collectors.toList());
logger.warn(() -> new ParameterizedMessage("Cancelling replications for allocationIds {}", allocationIds));
for (String allocationId : allocationIds) {
cancel(allocationId, reason);
}
}

/**
 * Clear copy state and cancel ongoing replication handlers for any allocation ID on the
 * given shard that is no longer in-sync, releasing commits the primary would otherwise
 * keep retained for those replicas.
 *
 * @param shardId {@link ShardId} - Shard whose ongoing replications are evaluated.
 * @param inSyncAllocationIds {@link Set} of in-sync allocation IDs; handlers for any
 *                            allocation ID absent from this set are cancelled.
 */
public void clearOutOfSyncIds(ShardId shardId, Set<String> inSyncAllocationIds) {
    cancelHandlers(
        (handler) -> handler.getCopyState().getShard().shardId().equals(shardId)
            && inSyncAllocationIds.contains(handler.getAllocationId()) == false,
        "Shard is no longer in-sync with the primary"
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
Expand All @@ -35,6 +37,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -162,6 +165,19 @@ public void clusterChanged(ClusterChangedEvent event) {
ongoingSegmentReplications.cancelReplication(removedNode);
}
}
// if a replica for one of the primary shards on this node has closed,
// we need to ensure its state has cleared up in ongoing replications.
if (event.routingTableChanged()) {
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry().primary()) {
final IndexMetadata indexMetadata = indexService.getIndexSettings().getIndexMetadata();
final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(indexShard.shardId().id());
ongoingSegmentReplications.clearOutOfSyncIds(indexShard.shardId(), inSyncAllocationIds);
}
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -359,4 +358,45 @@ public void testCancelAllReplicationsForShard() throws IOException {
assertEquals(0, replications.cachedCopyStateSize());
closeShards(replica_2);
}

/**
 * Verifies {@code clearOutOfSyncIds} cancels only handlers whose allocation IDs are
 * missing from the in-sync set, and releases the shared {@link CopyState} reference
 * for each cancelled handler.
 */
public void testCancelForMissingIds() throws IOException {
    // Primary with two ongoing replications: the existing replica plus a second one.
    IndexShard secondReplica = newShard(primary.shardId(), false);
    recoverReplica(secondReplica, primary, true);

    OngoingSegmentReplications ongoingReplications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);

    // Register the first replica and capture the shared copy state it pins.
    final String firstAllocationId = replica.routingEntry().allocationId().getId();
    final CheckpointInfoRequest firstRequest = new CheckpointInfoRequest(1L, firstAllocationId, primaryDiscoveryNode, testCheckpoint);
    final CopyState sharedCopyState = ongoingReplications.prepareForReplication(firstRequest, mock(FileChunkWriter.class));
    assertEquals(1, sharedCopyState.refCount());

    // Register the second replica against the same checkpoint.
    final String secondAllocationId = secondReplica.routingEntry().allocationId().getId();
    final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
        1L,
        secondAllocationId,
        replicaDiscoveryNode,
        testCheckpoint
    );
    ongoingReplications.prepareForReplication(secondRequest, mock(FileChunkWriter.class));

    // Both handlers share one cached copy state with two references.
    assertEquals(2, sharedCopyState.refCount());
    assertEquals(2, ongoingReplications.size());
    assertTrue(ongoingReplications.getHandlers().containsKey(firstAllocationId));
    assertTrue(ongoingReplications.getHandlers().containsKey(secondAllocationId));
    assertEquals(1, ongoingReplications.cachedCopyStateSize());

    // Only the second replica is in-sync: the first handler must be cancelled and its ref released.
    ongoingReplications.clearOutOfSyncIds(primary.shardId(), Set.of(secondAllocationId));
    assertEquals(1, sharedCopyState.refCount());
    assertEquals(1, ongoingReplications.size());
    assertTrue(ongoingReplications.getHandlers().containsKey(secondAllocationId));
    assertEquals(1, ongoingReplications.cachedCopyStateSize());

    // An empty in-sync set cancels everything and empties the cache.
    ongoingReplications.clearOutOfSyncIds(primary.shardId(), Collections.emptySet());
    assertEquals(0, sharedCopyState.refCount());
    assertEquals(0, ongoingReplications.size());
    assertEquals(0, ongoingReplications.cachedCopyStateSize());
    closeShards(secondReplica);
}
}