Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ReplicationFailedException instead of OpensearchException in ReplicationTarget #5955

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Dependencies
- Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704))
### Changed
- Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725))
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
### Deprecated
### Removed
### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -88,6 +87,7 @@
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
Expand Down Expand Up @@ -829,7 +829,11 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@

package org.opensearch.indices.recovery;

import org.opensearch.OpenSearchException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationFailedException;

import java.io.IOException;

Expand All @@ -45,7 +45,7 @@
*
* @opensearch.internal
*/
public class RecoveryFailedException extends OpenSearchException {
public class RecoveryFailedException extends ReplicationFailedException {

public RecoveryFailedException(StartRecoveryRequest request, Throwable cause) {
this(request, null, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

package org.opensearch.indices.recovery;

import org.opensearch.OpenSearchException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;

Expand Down Expand Up @@ -49,7 +49,7 @@ public void onDone(ReplicationState state) {
}

@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.opensearch.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -56,10 +55,11 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTarget;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationCollection;

import java.io.IOException;
import java.nio.channels.FileChannel;
Expand Down Expand Up @@ -135,7 +135,7 @@ public String description() {
}

@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) {
listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.common.UUIDs;
Expand Down Expand Up @@ -105,16 +104,14 @@ public String description() {
}

@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) {
// Cancellations still are passed to our SegmentReplicationListner as failures, if we have failed because of cancellation
// update the stage.
final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class);
if (cancelledException != null) {
state.setStage(SegmentReplicationState.Stage.CANCELLED);
listener.onFailure(state(), (CancellableThreads.ExecutionCancelledException) cancelledException, sendShardFailure);
} else {
listener.onFailure(state(), e, sendShardFailure);
}
listener.onFailure(state(), e, sendShardFailure);
}

@Override
Expand Down Expand Up @@ -150,7 +147,7 @@ public void startReplication(ActionListener<Void> listener) {
// SegmentReplicationSource does not share CancellableThreads.
final CancellableThreads.ExecutionCancelledException executionCancelledException =
new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]");
notifyListener(executionCancelledException, false);
notifyListener(new ReplicationFailedException("Segment replication failed", executionCancelledException), false);
throw executionCancelledException;
});
state.setStage(SegmentReplicationState.Stage.REPLICATING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
Expand All @@ -27,6 +26,7 @@
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -196,7 +196,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
Expand Down Expand Up @@ -249,13 +249,13 @@ default void onDone(ReplicationState state) {
}

@Override
default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
default void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
onReplicationFailure((SegmentReplicationState) state, e, sendShardFailure);
}

void onReplicationDone(SegmentReplicationState state);

void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure);
void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
}

/**
Expand Down Expand Up @@ -293,13 +293,14 @@ public void onFailure(Exception e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof CancellableThreads.ExecutionCancelledException) {
if (onGoingReplications.getTarget(replicationId) != null) {
IndexShard indexShard = onGoingReplications.getTarget(replicationId).indexShard();
// if the target still exists in our collection, the primary initiated the cancellation, fail the replication
// but do not fail the shard. Cancellations initiated by this node from Index events will be removed with
// onGoingReplications.cancel and not appear in the collection when this listener resolves.
onGoingReplications.fail(replicationId, (CancellableThreads.ExecutionCancelledException) cause, false);
onGoingReplications.fail(replicationId, new ReplicationFailedException(indexShard, cause), false);
}
} else {
onGoingReplications.fail(replicationId, new OpenSearchException("Segment Replication failed", e), true);
onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), true);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.common.concurrent.AutoCloseableRefCounted;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -134,7 +132,7 @@ public T reset(final long id, final TimeValue activityTimeout) {
} catch (Exception e) {
// fail shard to be safe
assert oldTarget != null;
oldTarget.notifyListener(new OpenSearchException("Unable to reset target", e), true);
oldTarget.notifyListener(new ReplicationFailedException("Unable to reset target", e), true);
return null;
}
}
Expand Down Expand Up @@ -187,7 +185,7 @@ public boolean cancel(long id, String reason) {
* @param e exception with reason for the failure
* @param sendShardFailure true a shard failed message should be sent to the master
*/
public void fail(long id, OpenSearchException e, boolean sendShardFailure) {
public void fail(long id, ReplicationFailedException e, boolean sendShardFailure) {
T removed = onGoingTargetEvents.remove(id);
if (removed != null) {
logger.trace("failing {}. Send shard failure: [{}]", removed.description(), sendShardFailure);
Expand Down Expand Up @@ -299,7 +297,7 @@ protected void doRun() throws Exception {
String message = "no activity after [" + checkInterval + "]";
fail(
id,
new OpenSearchTimeoutException(message),
new ReplicationFailedException(message),
true // to be safe, we don't know what go stuck
);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,16 @@ public ReplicationFailedException(ShardId shardId, @Nullable String extraInfo, T
public ReplicationFailedException(StreamInput in) throws IOException {
super(in);
}

public ReplicationFailedException(Exception e) {
super(e);
}

public ReplicationFailedException(String msg) {
super(msg);
}

public ReplicationFailedException(String msg, Throwable cause) {
super(msg, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.indices.replication.common;

import org.opensearch.OpenSearchException;

/**
* Interface for listeners that run when there's a change in {@link ReplicationState}
*
Expand All @@ -19,5 +17,5 @@ public interface ReplicationListener {

void onDone(ReplicationState state);

void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure);
void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -78,7 +77,7 @@ public CancellableThreads cancellableThreads() {
return cancellableThreads;
}

public abstract void notifyListener(OpenSearchException e, boolean sendShardFailure);
public abstract void notifyListener(ReplicationFailedException e, boolean sendShardFailure);

public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) {
super(name);
Expand Down Expand Up @@ -170,7 +169,7 @@ public void cancel(String reason) {
* @param e exception that encapsulates the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
*/
public void fail(OpenSearchException e, boolean sendShardFailure) {
public void fail(ReplicationFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
notifyListener(e, sendShardFailure);
Expand All @@ -187,7 +186,7 @@ public void fail(OpenSearchException e, boolean sendShardFailure) {

protected void ensureRefCount() {
if (refCount() <= 0) {
throw new OpenSearchException(
throw new ReplicationFailedException(
"ReplicationTarget is used but it's refcount is 0. Probably a mismatch between incRef/decRef calls"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.junit.Assert;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
Expand Down Expand Up @@ -45,6 +44,7 @@
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -790,8 +790,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
assertTrue(e instanceof CancellableThreads.ExecutionCancelledException);
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
assertFalse(sendShardFailure);
assertEquals(SegmentReplicationState.Stage.CANCELLED, state.getStage());
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.bulk.BulkShardRequest;
Expand Down Expand Up @@ -70,6 +69,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -471,7 +471,7 @@ public void onDone(ReplicationState state) {
}

@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated"));
}
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
Expand Down Expand Up @@ -104,7 +105,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.error("Unexpected error", e);
Assert.fail("Test should succeed");
}
Expand Down Expand Up @@ -149,7 +150,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
// failures leave state object in last entered stage.
assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage());
assertEquals(expectedError, e.getCause());
Expand Down
Loading