Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add SegmentReplicationTargetService to orchestrate replication events. #3439

Merged
merged 3 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.SnapshotState;
Expand Down Expand Up @@ -397,7 +397,7 @@ public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionExc
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
FileChunkRequest req = (FileChunkRequest) request;
byte[] array = BytesRef.deepCopyOf(req.content().toBytesRef()).bytes;
int i = randomIntBetween(0, req.content().length() - 1);
array[i] = (byte) ~array[i]; // flip one byte in the content
Expand Down Expand Up @@ -474,11 +474,11 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
FileChunkRequest req = (FileChunkRequest) request;
if (truncate && req.length() > 1) {
BytesRef bytesRef = req.content().toBytesRef();
BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1);
request = new RecoveryFileChunkRequest(
request = new FileChunkRequest(
req.recoveryId(),
req.requestSeqNo(),
req.shardId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
Expand Down Expand Up @@ -809,7 +809,7 @@ public void sendRequest(
TransportRequestOptions options
) throws IOException {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
FileChunkRequest chunkRequest = (FileChunkRequest) request;
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
// corrupting the segments_N files in order to make sure future recovery re-send files
logger.debug("corrupting [{}] to {}. file name: [{}]", action, connection.getNode(), chunkRequest.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFileChunkRequest;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -146,7 +146,7 @@ public void testCancelRecoveryAndResume() throws Exception {
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
FileChunkRequest req = (FileChunkRequest) request;
logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
import java.io.IOException;

/**
* Request for a recovery file chunk
* Request containing a file chunk.
*
* @opensearch.internal
*/
public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {
public final class FileChunkRequest extends RecoveryTransportRequest {
private final boolean lastChunk;
private final long recoveryId;
private final ShardId shardId;
Expand All @@ -58,7 +58,7 @@ public final class RecoveryFileChunkRequest extends RecoveryTransportRequest {

private final int totalTranslogOps;

public RecoveryFileChunkRequest(StreamInput in) throws IOException {
public FileChunkRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
Expand All @@ -75,7 +75,7 @@ public RecoveryFileChunkRequest(StreamInput in) throws IOException {
sourceThrottleTimeInNanos = in.readLong();
}

public RecoveryFileChunkRequest(
public FileChunkRequest(
long recoveryId,
final long requestSeqNo,
ShardId shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,17 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.ExceptionsHelper;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
Expand All @@ -60,7 +57,6 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IllegalIndexShardStateException;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
Expand All @@ -71,7 +67,6 @@
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -148,7 +143,7 @@ public PeerRecoveryTargetService(
transportService.registerRequestHandler(
Actions.FILE_CHUNK,
ThreadPool.Names.GENERIC,
RecoveryFileChunkRequest::new,
FileChunkRequest::new,
new FileChunkTransportRequestHandler()
);
transportService.registerRequestHandler(
Expand Down Expand Up @@ -354,12 +349,13 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.PREPARE_TRANSLOG, request);
if (listener == null) {
return;
}

recoveryRef.get().prepareForTranslogOperations(request.totalTranslogOps(), listener);
recoveryTarget.prepareForTranslogOperations(request.totalTranslogOps(), listener);
}
}
}
Expand All @@ -369,12 +365,13 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FINALIZE, request);
if (listener == null) {
return;
}

recoveryRef.get().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener);
recoveryTarget.finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener);
}
}
}
Expand All @@ -399,8 +396,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin
throws IOException {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(
recoveryRef,
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(
channel,
Actions.TRANSLOG_OPS,
request,
Expand Down Expand Up @@ -484,20 +480,20 @@ class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesIn
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILES_INFO, request);
if (listener == null) {
return;
}

recoveryRef.get()
.receiveFileInfo(
request.phase1FileNames,
request.phase1FileSizes,
request.phase1ExistingFileNames,
request.phase1ExistingFileSizes,
request.totalTranslogOps,
listener
);
recoveryTarget.receiveFileInfo(
request.phase1FileNames,
request.phase1FileSizes,
request.phase1ExistingFileNames,
request.phase1ExistingFileSizes,
request.totalTranslogOps,
listener
);
}
}
}
Expand All @@ -507,90 +503,37 @@ class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanF
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request);
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.CLEAN_FILES, request);
if (listener == null) {
return;
}

recoveryRef.get()
.cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(), listener);
recoveryTarget.cleanFiles(
request.totalTranslogOps(),
request.getGlobalCheckpoint(),
request.sourceMetaSnapshot(),
listener
);
}
}
}

class FileChunkTransportRequestHandler implements TransportRequestHandler<RecoveryFileChunkRequest> {
class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {

// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();

@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request);
if (listener == null) {
return;
}

final ReplicationLuceneIndex indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}

RateLimiter rateLimiter = recoverySettings.rateLimiter();
if (rateLimiter != null) {
long bytes = bytesSinceLastPause.addAndGet(request.content().length());
if (bytes > rateLimiter.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
long throttleTimeInNanos = rateLimiter.pause(bytes);
indexState.addTargetThrottling(throttleTimeInNanos);
recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
recoveryTarget.writeFileChunk(
request.metadata(),
request.position(),
request.content(),
request.lastChunk(),
request.totalTranslogOps(),
listener
);
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
}
}
}

private ActionListener<Void> createOrFinishListener(
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request
) {
return createOrFinishListener(recoveryRef, channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE);
}

private ActionListener<Void> createOrFinishListener(
final ReplicationRef<RecoveryTarget> recoveryRef,
final TransportChannel channel,
final String action,
final RecoveryTransportRequest request,
final CheckedFunction<Void, TransportResponse, Exception> responseFn
) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request);
final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn);

final long requestSeqNo = request.requestSeqNo();
final ActionListener<Void> listener;
if (requestSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
listener = recoveryTarget.markRequestReceivedAndCreateListener(requestSeqNo, voidListener);
} else {
listener = voidListener;
}

return listener;
}

class RecoveryRunner extends AbstractRunnable {

final long recoveryId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public Translog getTranslog() {
return translog;
}

/**
 * Returns the {@link ReplicationTimer} held in this object's {@code timer} field.
 * <p>
 * The {@code @Override} annotation means this accessor satisfies a method declared by a
 * supertype; which supertype that is cannot be seen from this hunk.
 *
 * @return the value of the {@code timer} field
 */
@Override
public ReplicationTimer getTimer() {
return timer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.opensearch.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -141,7 +142,7 @@ public String description() {
}

@Override
public void notifyListener(Exception e, boolean sendShardFailure) {
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure);
}

Expand Down
Loading