-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reset replica engine to global checkpoint on promotion #33473
Changes from all commits
5935e4f
18851c1
5ab9d12
571fc99
a7902f3
19129cd
b307415
b5ec957
7800e65
06f3c02
3ca021e
50cd92f
6e3d985
e4ad8bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -163,7 +163,6 @@ | |
import java.util.stream.StreamSupport; | ||
|
||
import static org.elasticsearch.index.mapper.SourceToParse.source; | ||
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; | ||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
|
||
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { | ||
|
@@ -1273,16 +1272,18 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine | |
return result; | ||
} | ||
|
||
// package-private for testing | ||
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { | ||
recoveryState.getTranslog().totalOperations(snapshot.totalOperations()); | ||
recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations()); | ||
/** | ||
* Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}. | ||
* The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully. | ||
*/ | ||
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, | ||
Runnable onOperationRecovered) throws IOException { | ||
int opsRecovered = 0; | ||
Translog.Operation operation; | ||
while ((operation = snapshot.next()) != null) { | ||
try { | ||
logger.trace("[translog] recover op {}", operation); | ||
Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY); | ||
Engine.Result result = applyTranslogOperation(operation, origin); | ||
switch (result.getResultType()) { | ||
case FAILURE: | ||
throw result.getFailure(); | ||
|
@@ -1295,7 +1296,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce | |
} | ||
|
||
opsRecovered++; | ||
recoveryState.getTranslog().incrementRecoveredOperations(); | ||
onOperationRecovered.run(); | ||
} catch (Exception e) { | ||
if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { | ||
// mainly for MapperParsingException and Failure to detect xcontent | ||
|
@@ -1313,8 +1314,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce | |
* Operations from the translog will be replayed to bring lucene up to date. | ||
**/ | ||
public void openEngineAndRecoverFromTranslog() throws IOException { | ||
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); | ||
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { | ||
translogRecoveryStats.totalOperations(snapshot.totalOperations()); | ||
translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations()); | ||
return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, | ||
translogRecoveryStats::incrementRecoveredOperations); | ||
}; | ||
innerOpenEngineAndTranslog(); | ||
getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE); | ||
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); | ||
} | ||
|
||
/** | ||
|
@@ -1352,11 +1360,7 @@ private void innerOpenEngineAndTranslog() throws IOException { | |
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); | ||
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); | ||
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); | ||
|
||
assertMaxUnsafeAutoIdInCommit(); | ||
|
||
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); | ||
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); | ||
trimUnsafeCommits(); | ||
|
||
createNewEngine(config); | ||
verifyNotClosed(); | ||
|
@@ -1367,6 +1371,15 @@ private void innerOpenEngineAndTranslog() throws IOException { | |
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); | ||
} | ||
|
||
// Trims unsafe commits from the store, using the global checkpoint and the minimum retained translog | ||
// generation that are read back from the translog directory. Must be called while no engine is set. | ||
private void trimUnsafeCommits() throws IOException { | ||
// guard: the store is only modified while no engine is referenced | ||
assert currentEngineReference.get() == null : "engine is running"; | ||
// the translog UUID is taken from the user data of the last committed segments info | ||
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); | ||
// both values are read from the translog path for that UUID | ||
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); | ||
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); | ||
// NOTE(review): presumably validates the auto-id timestamp stored in the commit before trimming - confirm | ||
assertMaxUnsafeAutoIdInCommit(); | ||
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, indexSettings.getIndexVersionCreated()); | ||
} | ||
|
||
private boolean assertSequenceNumbersInCommit() throws IOException { | ||
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); | ||
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint"; | ||
|
@@ -1463,7 +1476,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn | |
if (origin == Engine.Operation.Origin.PRIMARY) { | ||
assert assertPrimaryMode(); | ||
} else { | ||
assert origin == Engine.Operation.Origin.REPLICA; | ||
assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET; | ||
assert assertReplicationTarget(); | ||
} | ||
if (writeAllowedStates.contains(state) == false) { | ||
|
@@ -2166,9 +2179,7 @@ public void onFailedEngine(String reason, @Nullable Exception failure) { | |
|
||
private Engine createNewEngine(EngineConfig config) { | ||
synchronized (mutex) { | ||
if (state == IndexShardState.CLOSED) { | ||
throw new AlreadyClosedException(shardId + " can't create engine - shard is closed"); | ||
} | ||
verifyNotClosed(); | ||
assert this.currentEngineReference.get() == null; | ||
Engine engine = newEngine(config); | ||
onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen | ||
|
@@ -2314,19 +2325,14 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g | |
bumpPrimaryTerm(opPrimaryTerm, () -> { | ||
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); | ||
final long currentGlobalCheckpoint = getGlobalCheckpoint(); | ||
final long localCheckpoint; | ||
if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) { | ||
localCheckpoint = NO_OPS_PERFORMED; | ||
final long maxSeqNo = seqNoStats().getMaxSeqNo(); | ||
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", | ||
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); | ||
if (currentGlobalCheckpoint < maxSeqNo) { | ||
resetEngineToGlobalCheckpoint(); | ||
} else { | ||
localCheckpoint = currentGlobalCheckpoint; | ||
getEngine().rollTranslogGeneration(); | ||
} | ||
logger.trace( | ||
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", | ||
opPrimaryTerm, | ||
getLocalCheckpoint(), | ||
localCheckpoint); | ||
getEngine().resetLocalCheckpoint(localCheckpoint); | ||
getEngine().rollTranslogGeneration(); | ||
}); | ||
} | ||
} | ||
|
@@ -2687,4 +2693,26 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { | |
} | ||
}; | ||
} | ||
|
||
/** | ||
* Rolls back the current engine to the safe commit, then replays the local translog up to the global checkpoint. | ||
* <p> | ||
* The method asserts that no operations are active, syncs, then closes the current engine, trims unsafe commits | ||
* and creates a replacement engine while holding {@code mutex}. Afterwards it replays translog operations into | ||
* the new engine with origin {@code LOCAL_RESET}. | ||
* | ||
* @throws IOException NOTE(review): presumably from closing the engine, trimming commits or replaying the translog - confirm | ||
*/ | ||
void resetEngineToGlobalCheckpoint() throws IOException { | ||
// no operations may be in flight while the engine is swapped | ||
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; | ||
sync(); // persist the global checkpoint to disk | ||
// captured once here and passed to recoverFromTranslog below as the replay bound | ||
final long globalCheckpoint = getGlobalCheckpoint(); | ||
final Engine newEngine; | ||
synchronized (mutex) { | ||
verifyNotClosed(); | ||
// close the old engine and clear the reference before trimming commits and creating the new engine | ||
IOUtils.close(currentEngineReference.getAndSet(null)); | ||
trimUnsafeCommits(); | ||
newEngine = createNewEngine(newEngineConfig()); | ||
// NOTE(review): presumably marks the shard as active again for the new engine - confirm | ||
active.set(true); | ||
} | ||
// replay runs outside the mutex; the per-operation callback is intentionally a no-op for now | ||
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( | ||
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { | ||
// TODO: add dedicated recovery stats for the reset translog | ||
}); | ||
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I looked at the implications of exposing an engine that isn't fully recovered yet and it's OK, with the exception of syncFlush(). Depending how this ends up being, we may need to make sure that runs under a permit. |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -519,18 +519,14 @@ public void testSeqNoCollision() throws Exception { | |
shards.promoteReplicaToPrimary(replica2).get(); | ||
logger.info("--> Recover replica3 from replica2"); | ||
recoverReplica(replica3, replica2, true); | ||
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) { | ||
try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) { | ||
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); | ||
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations); | ||
expectedOps.add(op2); | ||
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps)); | ||
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0)); | ||
} | ||
// TODO: We should assert the content of shards in the ReplicationGroup. | ||
// Without rollback replicas(current implementation), we don't have the same content across shards: | ||
// - replica1 has {doc1} | ||
// - replica2 has {doc1, doc2} | ||
// - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery | ||
shards.assertAllEqual(initDocs + 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. w00t |
||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wondering: would it make sense to do the trimUnsafeCommits as part of the new engine creation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was before, but we prefer not to modify the Store implicitly (#33473 (comment)).