Segment Replication - Implement segment replication event cancellation. #4225

Merged: 2 commits, merged on Aug 31, 2022
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Do not fail replica shard due to primary closure ([#4133](https://github.com/opensearch-project/OpenSearch/pull/4133))
- Add timeout on Mockito.verify to reduce flakyness in testReplicationOnDone test([#4314](https://github.com/opensearch-project/OpenSearch/pull/4314))
- Commit workflow for dependabot changelog helper ([#4331](https://github.com/opensearch-project/OpenSearch/pull/4331))
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))

### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))
@@ -81,6 +81,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationState;
@@ -152,6 +153,7 @@ public IndicesClusterStateService(
final ThreadPool threadPool,
final PeerRecoveryTargetService recoveryTargetService,
final SegmentReplicationTargetService segmentReplicationTargetService,
final SegmentReplicationSourceService segmentReplicationSourceService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
@@ -170,6 +172,7 @@ public IndicesClusterStateService(
threadPool,
checkpointPublisher,
segmentReplicationTargetService,
segmentReplicationSourceService,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
@@ -191,6 +194,7 @@ public IndicesClusterStateService(
final ThreadPool threadPool,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final SegmentReplicationTargetService segmentReplicationTargetService,
final SegmentReplicationSourceService segmentReplicationSourceService,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
@@ -211,6 +215,7 @@
// if segrep feature flag is not enabled, don't wire the target service as an IndexEventListener.
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
indexEventListeners.add(segmentReplicationTargetService);
indexEventListeners.add(segmentReplicationSourceService);
}
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
this.indicesService = indicesService;
@@ -37,7 +37,6 @@
* @opensearch.internal
*/
class OngoingSegmentReplications {

private final RecoverySettings recoverySettings;
private final IndicesService indicesService;
private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
@@ -161,14 +160,27 @@ synchronized void cancel(IndexShard shard, String reason) {
cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
}

/**
* Cancel all Replication events for the given allocation ID, intended to be called when a primary is shutting down.
*
* @param allocationId {@link String} - Allocation ID.
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(String allocationId, String reason) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
if (handler != null) {
handler.cancel(reason);
removeCopyState(handler.getCopyState());
}
}

/**
* Cancel any ongoing replications for a given {@link DiscoveryNode}
*
* @param node {@link DiscoveryNode} node for which to cancel replication events.
*/
void cancelReplication(DiscoveryNode node) {
cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left");

}

/**
@@ -243,11 +255,7 @@ private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> p
.map(SegmentReplicationSourceHandler::getAllocationId)
.collect(Collectors.toList());
for (String allocationId : allocationIds) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
if (handler != null) {
handler.cancel(reason);
removeCopyState(handler.getCopyState());
}
cancel(allocationId, reason);
}
}
}
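For readers skimming the diff: the refactor above funnels the predicate-based cancelHandlers path through the new single cancel(allocationId, reason) method, so handler removal and copy-state cleanup live in one place. A minimal generic sketch of that shape, using hypothetical types rather than the OpenSearch classes:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class CancellationSketch {
    interface Handler {
        String allocationId();
        void cancel(String reason);
    }

    private final Map<String, Handler> handlers = new ConcurrentHashMap<>();

    // single cancellation path: remove the handler, cancel it, release its shared state
    synchronized void cancel(String allocationId, String reason) {
        final Handler handler = handlers.remove(allocationId);
        if (handler != null) {
            handler.cancel(reason);
            // the real code also releases the shared CopyState here (removeCopyState)
        }
    }

    // bulk cancellation delegates to the single path instead of duplicating cleanup logic
    synchronized void cancelHandlers(Predicate<Handler> predicate, String reason) {
        final List<String> ids = handlers.values().stream()
            .filter(predicate)
            .map(Handler::allocationId)
            .collect(Collectors.toList());
        for (String id : ids) {
            cancel(id, reason);
        }
    }
}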
@@ -87,4 +87,10 @@ public void getSegmentFiles(
);
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader);
}

@Override
public void cancel() {
transportClient.cancel();
}

}
@@ -9,6 +9,7 @@
package org.opensearch.indices.replication;

import org.opensearch.action.ActionListener;
import org.opensearch.common.util.CancellableThreads.ExecutionCancelledException;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
@@ -47,4 +48,9 @@ void getSegmentFiles(
Store store,
ActionListener<GetSegmentFilesResponse> listener
);

/**
* Cancel any ongoing requests; any pending listeners should be resolved via onFailure with an {@link ExecutionCancelledException}.
*/
default void cancel() {}
}
@@ -113,6 +113,16 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener
final Closeable releaseResources = () -> IOUtils.close(resources);
try {
timer.start();
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
final RuntimeException e = new CancellableThreads.ExecutionCancelledException(
"replication was canceled reason [" + reason + "]"
);
if (beforeCancelEx != null) {
e.addSuppressed(beforeCancelEx);
}
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
throw e;
});
final Consumer<Exception> onFailure = e -> {
assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]");
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
@@ -153,6 +163,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = segmentFileTransferHandler
.createTransfer(shard.store(), storeFileMetadata, () -> 0, sendFileStep);
resources.add(transfer);
cancellableThreads.checkForCancel();
transfer.start();

sendFileStep.whenComplete(r -> {
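The setOnCancel / checkForCancel / cancel trio used in SegmentReplicationSourceHandler above (and again in the replication target code further down) follows one protocol: register a hook that fails outstanding work, check for cancellation between expensive steps, and let lifecycle events trigger cancel(reason) from another thread. A standalone sketch under those assumptions; the sleeps stand in for the real replication steps and the class itself is not part of this PR:

import org.opensearch.common.util.CancellableThreads;

public class CancellationProtocolSketch {
    public static void main(String[] args) throws InterruptedException {
        final CancellableThreads cancellableThreads = new CancellableThreads();

        // hook runs when a cancel is observed; the real handler also releases resources
        // and fails the pending listener before rethrowing
        cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
            final RuntimeException e = new CancellableThreads.ExecutionCancelledException(
                "replication was canceled reason [" + reason + "]"
            );
            if (beforeCancelEx != null) {
                e.addSuppressed(beforeCancelEx);
            }
            throw e;
        });

        // another thread (e.g. an IndexEventListener hook) requests cancellation
        new Thread(() -> cancellableThreads.cancel("shard is closed")).start();

        try {
            Thread.sleep(200);                    // placeholder for "copy checkpoint metadata"
            cancellableThreads.checkForCancel();  // observes the cancel and throws via the hook
            Thread.sleep(200);                    // placeholder for "send segment files"
            cancellableThreads.checkForCancel();
        } catch (CancellableThreads.ExecutionCancelledException e) {
            System.out.println("replication stopped: " + e.getMessage());
        }
    }
}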
@@ -15,6 +15,7 @@
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.component.AbstractLifecycleComponent;
@@ -42,7 +43,25 @@
*
* @opensearch.internal
*/
public final class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
public class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {

// Empty Implementation, only required while Segment Replication is under feature flag.
public static final SegmentReplicationSourceService NO_OP = new SegmentReplicationSourceService() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
// NoOp;
}

@Override
public void beforeIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
// NoOp;
}

@Override
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
// NoOp;
}
};

private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class);
private final RecoverySettings recoverySettings;
@@ -62,6 +81,14 @@ public static class Actions {

private final OngoingSegmentReplications ongoingSegmentReplications;

// Used only for empty implementation.
private SegmentReplicationSourceService() {
recoverySettings = null;
ongoingSegmentReplications = null;
transportService = null;
indicesService = null;
}

public SegmentReplicationSourceService(
IndicesService indicesService,
TransportService transportService,
@@ -163,10 +190,25 @@ protected void doClose() throws IOException {

}

/**
*
* Cancels any replications on this node to a replica shard that is about to be closed.
*/
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null) {
ongoingSegmentReplications.cancel(indexShard, "shard is closed");
}
}

/**
* Cancels any replications on this node to a replica that has been promoted as primary.
*/
@Override
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
if (indexShard != null && oldRouting.primary() == false && newRouting.primary()) {
ongoingSegmentReplications.cancel(indexShard.routingEntry().allocationId().getId(), "Relocating primary shard.");
}
}

}
@@ -35,7 +35,8 @@ public enum Stage {
GET_CHECKPOINT_INFO((byte) 3),
FILE_DIFF((byte) 4),
GET_FILES((byte) 5),
FINALIZE_REPLICATION((byte) 6);
FINALIZE_REPLICATION((byte) 6),
CANCELLED((byte) 7);

private static final Stage[] STAGES = new Stage[Stage.values().length];

@@ -118,6 +119,10 @@ protected void validateAndSetStage(Stage expected, Stage next) {
"can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
);
}
stopTimersAndSetStage(next);
}

private void stopTimersAndSetStage(Stage next) {
// save the timing data for the current step
stageTimer.stop();
timingData.add(new Tuple<>(stage.name(), stageTimer.time()));
@@ -155,6 +160,14 @@ public void setStage(Stage stage) {
overallTimer.stop();
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
break;
case CANCELLED:
if (this.stage == Stage.DONE) {
throw new IllegalStateException("can't move replication to Cancelled state from Done.");
}
stopTimersAndSetStage(Stage.CANCELLED);
overallTimer.stop();
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
break;
default:
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
}
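The new CANCELLED handling above amounts to a terminal stage reachable from every stage except DONE, and entering it records the overall timing just as DONE does. A tiny self-contained model of that rule (not the real SegmentReplicationState class):

class StageMachineSketch {
    enum Stage { INIT, REPLICATING, GET_CHECKPOINT_INFO, FILE_DIFF, GET_FILES, FINALIZE_REPLICATION, DONE, CANCELLED }

    private Stage stage = Stage.INIT;

    void setCancelled() {
        if (stage == Stage.DONE) {
            // a finished replication can no longer be cancelled
            throw new IllegalStateException("can't move replication to Cancelled state from Done.");
        }
        // the real implementation also stops the stage and overall timers here
        stage = Stage.CANCELLED;
    }

    Stage getStage() {
        return stage;
    }
}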
@@ -17,6 +17,7 @@
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
@@ -103,7 +104,15 @@ public String description() {

@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
listener.onFailure(state(), e, sendShardFailure);
// Cancellations are still passed to our SegmentReplicationListener as failures; if we failed because of cancellation,
// update the stage.
final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class);
if (cancelledException != null) {
state.setStage(SegmentReplicationState.Stage.CANCELLED);
listener.onFailure(state(), (CancellableThreads.ExecutionCancelledException) cancelledException, sendShardFailure);
} else {
listener.onFailure(state(), e, sendShardFailure);
}
}

@Override
@@ -134,13 +143,22 @@ public void writeFileChunk(
* @param listener {@link ActionListener} listener.
*/
public void startReplication(ActionListener<Void> listener) {
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
// This method only executes when cancellation is triggered by this node and caught by a call to checkForCancel;
// the SegmentReplicationSource does not share CancellableThreads.
final CancellableThreads.ExecutionCancelledException executionCancelledException =
new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]");
notifyListener(executionCancelledException, false);
throw executionCancelledException;
});
state.setStage(SegmentReplicationState.Stage.REPLICATING);
final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
final StepListener<Void> finalizeListener = new StepListener<>();

logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
// Get list of files to copy from this checkpoint.
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

@@ -154,6 +172,7 @@ public void startReplication(ActionListener<Void> listener) {

private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
throws IOException {
cancellableThreads.checkForCancel();
Review comment (Member):
How is the case of a partial file copy handled, i.e. the replica copied some but not all files?
Can this be problematic when this replica is promoted as the new primary with only a few files from the previous primary?

Reply (Member, Author):
If files are partially copied they remain temp files with a prefix, e.g. replication_0.cfe. The rename occurs in finalize, once all files have been copied. So in this case the replication would be cancelled and the partial files discarded in the commit made before the shard's engine is re-opened.
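To make the reply above concrete, here is a self-contained sketch of the temp-prefix-then-rename idea. The prefix value and method names are illustrative; the real behaviour lives in the MultiFileWriter used by renameAllTempFiles() in finalizeReplication below:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

class PartialCopySketch {
    // hypothetical prefix, mirroring the replication_0.cfe style mentioned in the reply
    private static final String TEMP_PREFIX = "replication_0.";

    // chunks are written under temp names, so a cancelled copy never leaves a half-written real segment file
    static void writeTemp(Path dir, String fileName, byte[] chunk) throws IOException {
        Files.write(dir.resolve(TEMP_PREFIX + fileName), chunk);
    }

    // only once every file has arrived are the temp files renamed to their real names (the finalize step)
    static void renameAllTempFiles(Path dir, List<String> fileNames) throws IOException {
        for (String name : fileNames) {
            Files.move(dir.resolve(TEMP_PREFIX + name), dir.resolve(name));
        }
    }

    // on cancellation before finalize, the temp files are simply discarded
    static void discardTempFiles(Path dir, List<String> fileNames) throws IOException {
        for (String name : fileNames) {
            Files.deleteIfExists(dir.resolve(TEMP_PREFIX + name));
        }
    }
}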

state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
@@ -188,12 +207,14 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse>
}
// always send a req even if not fetching files so the primary can clear the copyState for this shard.
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
Review comment (Member):
Why is this check added again in this method? Shouldn't the earlier checkForCancel handle this as well?
Is this added just as a safeguard against state changes between the start of this method and the point where the files are actually requested from the primary?

Reply (Member, Author):
Correct, I wanted to add the check in both the GET_FILES and FILE_DIFF steps.

source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener);
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
ActionListener.completeWith(listener, () -> {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
multiFileWriter.renameAllTempFiles();
final Store store = store();
store.incRef();
@@ -261,4 +282,10 @@ Store.MetadataSnapshot getMetadataSnapshot() throws IOException {
}
return store.getMetadata(indexShard.getSegmentInfosSnapshot().get());
}

@Override
protected void onCancel(String reason) {
cancellableThreads.cancel(reason);
source.cancel();
}
}