Add replication orchestration for a single shard #3533

Merged 6 commits on Jun 22, 2022
@@ -67,6 +67,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName;
@@ -1594,6 +1595,12 @@ private enum OpenSearchExceptionHandle {
            org.opensearch.transport.NoSeedNodeLeftException::new,
            160,
            LegacyESVersion.V_7_10_0
        ),
        REPLICATION_FAILED_EXCEPTION(
            org.opensearch.indices.replication.common.ReplicationFailedException.class,
            org.opensearch.indices.replication.common.ReplicationFailedException::new,
            161,
            V_2_1_0
        );

final Class<? extends OpenSearchException> exceptionClass;
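The hunk above registers the new ReplicationFailedException in OpenSearchExceptionHandle under wire id 161, bound to the version that introduced it (V_2_1_0). For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of such an id-to-constructor registry; the types and names below are illustrative stand-ins, not the actual OpenSearch classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy sketch of an id -> constructor registry, loosely modeled on the
// OpenSearchExceptionHandle enum above. Names are illustrative stand-ins.
enum Handle {
    NO_SEED_NODE_LEFT(160, IllegalStateException::new),
    REPLICATION_FAILED(161, RuntimeException::new);

    final int id;
    final Supplier<? extends RuntimeException> constructor;

    Handle(int id, Supplier<? extends RuntimeException> constructor) {
        this.id = id;
        this.constructor = constructor;
    }

    private static final Map<Integer, Handle> BY_ID = new HashMap<>();

    static {
        for (Handle h : values()) {
            // wire ids must be unique so each id deserializes to exactly one type
            if (BY_ID.put(h.id, h) != null) {
                throw new AssertionError("duplicate wire id " + h.id);
            }
        }
    }

    static RuntimeException fromId(int id) {
        return BY_ID.get(id).constructor.get();
    }
}

public class HandleRegistrySketch {
    public static void main(String[] args) {
        // id 161 maps to the newly registered exception type
        System.out.println(Handle.fromId(161).getClass().getSimpleName()); // prints RuntimeException
    }
}
```

Binding each entry to a version, as the real enum does, lets nodes refuse to send an exception type to peers that predate it.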
@@ -109,6 +109,7 @@
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.ReadOnlyEngine;
import org.opensearch.index.engine.RefreshFailedEngineException;
import org.opensearch.index.engine.SafeCommitInfo;
@@ -161,8 +162,8 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
@@ -1363,6 +1364,20 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
}
}

    private Optional<NRTReplicationEngine> getReplicationEngine() {
        if (getEngine() instanceof NRTReplicationEngine) {
            return Optional.of((NRTReplicationEngine) getEngine());
        } else {
            return Optional.empty();
        }
    }

    public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException {
Collaborator: It seems like we are adding a method that can be called by everyone but is only supported for NRTReplicationEngine. Maybe we could open up getEngine(), and SegmentReplicationTarget could make the decision whether updateSegments should be called?

Contributor Author: For now I'm still concerned about opening up access to the engine beyond IndexShard.

@reta (Collaborator, Jun 9, 2022): Fair enough, I think we can still do better than that:

public interface ReplicationEngine {
    void updateSegments(final SegmentInfos infos, long seqNo) throws IOException;
}

Then IndexShard can do:

public Optional<ReplicationEngine> getReplicationEngine() {
    if (getEngine() instanceof ReplicationEngine) {
        return Optional.of((ReplicationEngine) getEngine());
    } else {
        return Optional.empty();
    }
}

The main idea here: if something is not implemented, it should not be called. In this case, if the engine does not support replication, getReplicationEngine() returns empty and the caller won't be able to make the call.

Collaborator: Wondering if we really need a new Engine interface. Does the current Engine interface not already perform anything that has to do with replication?

@reta (Collaborator, Jun 14, 2022): @Bukhtawar the Engine is not exposed directly but performs everything needed; the goal here is to expose only the small piece of engine functionality (ReplicationEngine) when the engine supports replication.

Collaborator: @reta I understand what the interface intends to do. What I have a concern about is that the interface looks incomplete to me: I am not quite sure that void updateSegments(final SegmentInfos infos, long seqNo) throws IOException; is the only thing a ReplicationEngine should do.

Collaborator: @Bukhtawar 👍

@mch2 (Member, Jun 14, 2022): ReplicationEngine is a regular Engine implementation, and right now the only additional functionality is updating and refreshing on an externally provided SegmentInfos in NRTReplicationEngine. @Bukhtawar, what additional functionality do you think is missing? We could add a default implementation in Engine that throws an UnsupportedOperationException and only implement it in NRTReplicationEngine, but I like the isolated interface.

Collaborator: I would revisit the Engine interface, see what can be extracted into a separate ReplicationEngine interface, and decide on the split. Based on what you mention, @mch2, we can also create a functional interface and call it a SegmentUpdateService instead, so that it appears less overloaded and performs only the limited responsibility it specialises in:

@FunctionalInterface
public interface SegmentUpdateService {
    void updateSegments(final SegmentInfos infos, long seqNo) throws IOException;
}

@kartg (Member, Jun 20, 2022): EDIT: @Poojita-Raj pointed me to #3533 (comment) for more context on this change. Given that the only caller of getReplicationEngine is the finalizeReplication method, could we mitigate this by simply making getReplicationEngine private, and then using NRTReplicationEngine directly as described below?

I agree that the current ReplicationEngine interface seems out of place and incomplete. Why not use NRTReplicationEngine directly?

public Optional<NRTReplicationEngine> getReplicationEngine() {
    if (getEngine() instanceof NRTReplicationEngine) {
        return Optional.of((NRTReplicationEngine) getEngine());
    } else {
        return Optional.empty();
    }
}

        if (getReplicationEngine().isPresent()) {
            getReplicationEngine().get().updateSegments(infos, seqNo);
        }
    }

/**
* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
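The discussion above converges on gating access behind an Optional-returning accessor, so callers simply cannot invoke updateSegments on an engine that does not support it. A minimal, self-contained sketch of that pattern, using toy stand-ins for Engine, NRTReplicationEngine, and IndexShard rather than the real OpenSearch classes:

```java
import java.util.Optional;

// Toy stand-ins (not the actual OpenSearch classes) for the pattern discussed
// above: replication capability is only reachable when the engine supports it.
class Engine {}

class NRTReplicationEngine extends Engine {
    long lastSeqNo = -1;

    void updateSegments(long seqNo) {
        // the real engine would also swap in the externally provided SegmentInfos
        lastSeqNo = seqNo;
    }
}

class IndexShard {
    private final Engine engine;

    IndexShard(Engine engine) {
        this.engine = engine;
    }

    // empty unless the engine actually supports replication
    private Optional<NRTReplicationEngine> getReplicationEngine() {
        return engine instanceof NRTReplicationEngine
            ? Optional.of((NRTReplicationEngine) engine)
            : Optional.empty();
    }

    void finalizeReplication(long seqNo) {
        getReplicationEngine().ifPresent(e -> e.updateSegments(seqNo));
    }

    long lastAppliedSeqNo() {
        return getReplicationEngine().map(e -> e.lastSeqNo).orElse(-1L);
    }
}

public class ReplicationEngineSketch {
    public static void main(String[] args) {
        IndexShard replica = new IndexShard(new NRTReplicationEngine());
        replica.finalizeReplication(42);
        System.out.println(replica.lastAppliedSeqNo()); // prints 42

        IndexShard primary = new IndexShard(new Engine());
        primary.finalizeReplication(42); // plain Engine: silently a no-op
        System.out.println(primary.lastAppliedSeqNo()); // prints -1
    }
}
```

Because the accessor is private, the unsupported path is unrepresentable outside IndexShard, which is the property the reviewers were after.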
46 changes: 46 additions & 0 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -64,6 +64,7 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.Nullable;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.Streams;
@@ -706,6 +707,51 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}

    /**
     * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
     * This method is used for segment replication when the in-memory SegmentInfos can be ahead of the on-disk segment files.
     * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
     * @param reason the reason for this cleanup operation logged for each deleted file
     * @param localSnapshot The local snapshot from in-memory SegmentInfos.
     * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
     */
    public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException {
        // fetch a snapshot from the latest on-disk segments_N file. This can be behind
        // the passed-in local in-memory snapshot, so we want to ensure files it references are not removed.
        metadataLock.writeLock().lock();
        try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
            cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo()));
        } finally {
            metadataLock.writeLock().unlock();
        }
    }

    private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
        throws IOException {
        assert metadataLock.isWriteLockedByCurrentThread();
        for (String existingFile : directory.listAll()) {
            if (Store.isAutogenerated(existingFile)
                || localSnapshot.contains(existingFile)
                || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
                // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
                // checksum)
                continue;
            }
            try {
                directory.deleteFile(reason, existingFile);
            } catch (IOException ex) {
                if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
                    // TODO do we need to also fail this if we can't delete the pending commit file?
                    // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
                    // point around?
                    throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
                }
                logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
                // ignore, we don't really care, will get deleted later on
            }
        }
    }

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
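At its core, cleanupFiles above computes a set difference: a file on disk survives only if it is autogenerated or referenced by the local in-memory snapshot or the last committed snapshot. A simplified sketch over plain string sets (not the actual Store/Directory APIs, whose names are only echoed here for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified model of the cleanup rule: a file is preserved if it is
// autogenerated (e.g. the write lock) or referenced by either snapshot.
public class CleanupSketch {

    static boolean isAutogenerated(String file) {
        return file.equals("write.lock"); // stand-in for Store.isAutogenerated
    }

    static List<String> filesToDelete(List<String> onDisk, Set<String> localSnapshot, Set<String> committedSnapshot) {
        List<String> toDelete = new ArrayList<>();
        for (String f : onDisk) {
            if (isAutogenerated(f) || localSnapshot.contains(f) || committedSnapshot.contains(f)) {
                continue; // preserved: referenced by one of the snapshots
            }
            toDelete.add(f);
        }
        return toDelete;
    }

    public static void main(String[] args) {
        List<String> deleted = filesToDelete(
            List.of("_0.cfs", "_1.cfs", "_2.cfs", "write.lock"),
            Set.of("_2.cfs"),   // in-memory SegmentInfos is ahead of the commit
            Set.of("_0.cfs")    // last commit point on disk
        );
        System.out.println(deleted); // prints [_1.cfs]
    }
}
```

Preserving the union of both snapshots is what makes the method safe when the in-memory SegmentInfos is ahead of the last segments_N on disk.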
@@ -77,9 +77,7 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH
private static final String RECOVERY_PREFIX = "recovery.";

    private final DiscoveryNode sourceNode;
-   private final CancellableThreads cancellableThreads;
    protected final MultiFileWriter multiFileWriter;
-   protected final Store store;

// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);
@@ -93,13 +91,10 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH
*/
    public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) {
        super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
-       this.cancellableThreads = new CancellableThreads();
        this.sourceNode = sourceNode;
        indexShard.recoveryStats().incCurrentAsTarget();
-       this.store = indexShard.store();
        final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
        this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount);
-       store.incRef();
    }

/**
@@ -132,11 +127,6 @@ public CancellableThreads cancellableThreads() {
return cancellableThreads;
}

-   public Store store() {
-       ensureRefCount();
-       return store;
-   }

public String description() {
return "recovery from " + source();
}
@@ -258,14 +248,6 @@ protected void onDone() {
indexShard.postRecovery("peer recovery done");
}

-   /**
-    * if {@link #cancellableThreads()} was used, the threads will be interrupted.
-    */
-   @Override
-   protected void onCancel(String reason) {
-       cancellableThreads.cancel(reason);
-   }

/*** Implementation of {@link RecoveryTargetHandler } */

@Override
@@ -27,7 +27,9 @@ public class SegmentReplicationState implements ReplicationState {
    public enum Stage {
        DONE((byte) 0),

-       INIT((byte) 1);
+       INIT((byte) 1),
+
+       REPLICATING((byte) 2);

private static final Stage[] STAGES = new Stage[Stage.values().length];

@@ -56,29 +58,58 @@ public static Stage fromId(byte id) {
}
}

-   public SegmentReplicationState() {
-       this.stage = Stage.INIT;
-   }
-
    private Stage stage;
+   private final ReplicationLuceneIndex index;
+   private final ReplicationTimer timer;

+   public SegmentReplicationState(ReplicationLuceneIndex index) {
+       stage = Stage.INIT;
+       this.index = index;
+       timer = new ReplicationTimer();
+       timer.start();
+   }

    @Override
    public ReplicationLuceneIndex getIndex() {
-       // TODO
-       return null;
+       return index;
    }

    @Override
    public ReplicationTimer getTimer() {
-       // TODO
-       return null;
+       return timer;
    }

    public Stage getStage() {
        return stage;
    }

    protected void validateAndSetStage(Stage expected, Stage next) {
        if (stage != expected) {
            assert false : "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])";
            throw new IllegalStateException(
                "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
            );
        }
        stage = next;
    }

    public void setStage(Stage stage) {
-       this.stage = stage;
+       switch (stage) {
+           case INIT:
+               this.stage = Stage.INIT;
+               getIndex().reset();
+               break;
+           case REPLICATING:
+               validateAndSetStage(Stage.INIT, stage);
+               getIndex().start();
+               break;
+           case DONE:
+               validateAndSetStage(Stage.REPLICATING, stage);
+               getIndex().stop();
+               getTimer().stop();
+               break;
+           default:
+               throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
+       }
    }
}
Comment on lines 96 to 115:

Collaborator: Can we have multiple threads invoking this? In that case we would need it to be synchronized.

Contributor Author: Nope, it's only called by a single thread.
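The setStage switch shown earlier enforces a linear INIT, then REPLICATING, then DONE progression, rejecting any transition whose current stage is not the expected predecessor. A stripped-down sketch of the same validation, using a toy class rather than the actual SegmentReplicationState (the index/timer bookkeeping is omitted):

```java
// Toy version of SegmentReplicationState's stage validation: each transition
// is only legal from one expected predecessor stage.
public class StageSketch {

    enum Stage { INIT, REPLICATING, DONE }

    private Stage stage = Stage.INIT;

    Stage getStage() {
        return stage;
    }

    private void validateAndSetStage(Stage expected, Stage next) {
        if (stage != expected) {
            throw new IllegalStateException(
                "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
            );
        }
        stage = next;
    }

    void setStage(Stage next) {
        switch (next) {
            case INIT:
                stage = Stage.INIT; // resetting to INIT is always allowed
                break;
            case REPLICATING:
                validateAndSetStage(Stage.INIT, next);
                break;
            case DONE:
                validateAndSetStage(Stage.REPLICATING, next);
                break;
            default:
                throw new IllegalArgumentException("unknown stage [" + next + "]");
        }
    }

    public static void main(String[] args) {
        StageSketch s = new StageSketch();
        s.setStage(Stage.REPLICATING);
        s.setStage(Stage.DONE);
        System.out.println(s.getStage()); // prints DONE

        StageSketch t = new StageSketch();
        try {
            t.setStage(Stage.DONE); // illegal: INIT cannot jump straight to DONE
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

As the thread above notes, this state machine is driven by a single thread, which is why the real class gets away without synchronization.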