Skip to content

Commit

Permalink
[Segment Replication] - Update replicas to commit SegmentInfos instea…
Browse files Browse the repository at this point in the history
…d of relying on segments_N from primary shards. (#4402)

* Segment Replication - Update replicas to commit SegmentInfos instead of relying on segments_N from primary shards.

This change updates replicas to commit SegmentInfos before the shard is closed, on receiving a new commit point from a primary, and when a new primary is detected. This change also makes the public commitSegmentInfos on NRTEngine obsolete, refactoring IndexShard to simply call reset on the engine.

Signed-off-by: Marc Handalian <[email protected]>

* Remove noise & extra log statement.

Signed-off-by: Marc Handalian <[email protected]>

* PR feedback.

Signed-off-by: Marc Handalian <[email protected]>

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored Sep 7, 2022
1 parent ce6c6a8 commit 3ef0046
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 153 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class NRTReplicationEngine extends Engine {
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

private static final int SI_COUNTER_INCREMENT = 10;

public NRTReplicationEngine(EngineConfig engineConfig) {
Expand Down Expand Up @@ -120,14 +122,16 @@ public TranslogManager translogManager() {

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
// Commit and roll the xlog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

Expand All @@ -141,20 +145,16 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
*
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
public void commitSegmentInfos() throws IOException {
// TODO: This method should wait for replication events to finalize.
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translogManager.syncTranslog();
}

protected void commitSegmentInfos() throws IOException {
commitSegmentInfos(getLatestSegmentInfos());
}

@Override
public String getHistoryUUID() {
return loadHistoryUUID(lastCommittedSegmentInfos.userData);
Expand Down Expand Up @@ -354,6 +354,15 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
* @throws IOException - When Refresh fails with an IOException.
*/
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
// roll over the currentInfo's generation, this ensures the on-disk gen
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefresh();
}
Expand Down
28 changes: 4 additions & 24 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ public void updateShardState(
if (indexSettings.isSegRepEnabled()) {
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
assert newRouting.primary() && currentRouting.primary() == false;
promoteNRTReplicaToPrimary();
resetEngineToGlobalCheckpoint();
}
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
ensurePeerRecoveryRetentionLeasesExist();
Expand Down Expand Up @@ -3557,7 +3557,9 @@ private void innerAcquireReplicaOperationPermit(
currentGlobalCheckpoint,
maxSeqNo
);
if (currentGlobalCheckpoint < maxSeqNo) {
// With Segment Replication enabled, we never want to reset a replica's engine unless
// it is promoted to primary.
if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().translogManager().rollTranslogGeneration();
Expand Down Expand Up @@ -4120,26 +4122,4 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

/**
* With segment replication enabled - prepare the shard's engine to be promoted as the new primary.
*
* If this shard is currently using a replication engine, this method:
* 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point.
* InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos
* that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion.
* 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be
* any ack'd writes that were not copied to this replica before promotion.
*/
private void promoteNRTReplicaToPrimary() {
assert shardRouting.primary() && indexSettings.isSegRepEnabled();
getReplicationEngine().ifPresentOrElse(engine -> {
try {
engine.commitSegmentInfos();
resetEngineToGlobalCheckpoint();
} catch (IOException e) {
throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e);
}
}, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
+ temporaryFileName
+ "] in "
+ Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(temporaryFileName));
// With Segment Replication, we will fsync after a full commit has been received.
if (store.indexSettings().isSegRepEnabled() == false) {
store.directory().sync(Collections.singleton(temporaryFileName));
}
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
Expand Down
Loading

0 comments on commit 3ef0046

Please sign in to comment.