-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reset replica engine to global checkpoint on promotion #33473
Changes from 8 commits
5935e4f
18851c1
5ab9d12
571fc99
a7902f3
19129cd
b307415
b5ec957
7800e65
06f3c02
3ca021e
50cd92f
6e3d985
e4ad8bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,6 +109,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) { | |
* @param checkpoint the local checkpoint to reset this tracker to | ||
*/ | ||
public synchronized void resetCheckpoint(final long checkpoint) { | ||
// TODO: remove this method as we no longer need it. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. what are we waiting on? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We have tests which verify that we restore the local checkpoint after resetting it to the global checkpoint. I decided to leave the removal of this method out of this PR to minimize the changes. I will remove this method in the next PR. |
||
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
assert checkpoint <= this.checkpoint; | ||
processedSeqNo.clear(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1275,14 +1275,21 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine | |
|
||
// package-private for testing | ||
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { | ||
recoveryState.getTranslog().totalOperations(snapshot.totalOperations()); | ||
recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations()); | ||
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); | ||
translogRecoveryStats.totalOperations(snapshot.totalOperations()); | ||
translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations()); | ||
return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, | ||
translogRecoveryStats::incrementRecoveredOperations); | ||
} | ||
|
||
private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, | ||
Runnable onPerOperationRecovered) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just call it |
||
int opsRecovered = 0; | ||
Translog.Operation operation; | ||
while ((operation = snapshot.next()) != null) { | ||
try { | ||
logger.trace("[translog] recover op {}", operation); | ||
Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY); | ||
Engine.Result result = applyTranslogOperation(operation, origin); | ||
switch (result.getResultType()) { | ||
case FAILURE: | ||
throw result.getFailure(); | ||
|
@@ -1293,9 +1300,8 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce | |
default: | ||
throw new AssertionError("Unknown result type [" + result.getResultType() + "]"); | ||
} | ||
|
||
opsRecovered++; | ||
recoveryState.getTranslog().incrementRecoveredOperations(); | ||
onPerOperationRecovered.run(); | ||
} catch (Exception e) { | ||
if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { | ||
// mainly for MapperParsingException and Failure to detect xcontent | ||
|
@@ -1340,30 +1346,22 @@ private void innerOpenEngineAndTranslog() throws IOException { | |
} | ||
} | ||
recoveryState.setStage(RecoveryState.Stage.TRANSLOG); | ||
|
||
final EngineConfig config = newEngineConfig(); | ||
|
||
// we disable deletes since we allow for operations to be executed against the shard while recovering | ||
// but we need to make sure we don't loose deletes until we are done recovering | ||
config.setEnableGcDeletes(false); | ||
// we have to set it before we open an engine and recover from the translog because | ||
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in, | ||
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in. | ||
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); | ||
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); | ||
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); | ||
|
||
assertMaxUnsafeAutoIdInCommit(); | ||
|
||
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); | ||
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); | ||
|
||
createNewEngine(config); | ||
verifyNotClosed(); | ||
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, | ||
// we still give sync'd flush a chance to run: | ||
active.set(true); | ||
assertSequenceNumbersInCommit(); | ||
final EngineConfig config = newEngineConfig(); | ||
// we disable deletes since we allow for operations to be executed against the shard while recovering | ||
// but we need to make sure we don't loose deletes until we are done recovering | ||
config.setEnableGcDeletes(false); | ||
synchronized (mutex) { | ||
assert currentEngineReference.get() == null : "engine is initialized already"; | ||
currentEngineReference.set(createNewEngine(config)); | ||
} | ||
assert assertSequenceNumbersInCommit(); | ||
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); | ||
} | ||
|
||
|
@@ -1463,7 +1461,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn | |
if (origin == Engine.Operation.Origin.PRIMARY) { | ||
assert assertPrimaryMode(); | ||
} else { | ||
assert origin == Engine.Operation.Origin.REPLICA; | ||
assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET; | ||
assert assertReplicationTarget(); | ||
} | ||
if (writeAllowedStates.contains(state) == false) { | ||
|
@@ -2164,33 +2162,21 @@ public void onFailedEngine(String reason, @Nullable Exception failure) { | |
} | ||
} | ||
|
||
private Engine createNewEngine(EngineConfig config) { | ||
synchronized (mutex) { | ||
if (state == IndexShardState.CLOSED) { | ||
throw new AlreadyClosedException(shardId + " can't create engine - shard is closed"); | ||
} | ||
assert this.currentEngineReference.get() == null; | ||
Engine engine = newEngine(config); | ||
onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen | ||
// inside the callback are not visible. This one enforces happens-before | ||
this.currentEngineReference.set(engine); | ||
} | ||
|
||
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which | ||
// settings changes could possibly have happened, so here we forcefully push any config changes to the new engine: | ||
Engine engine = getEngineOrNull(); | ||
|
||
// engine could perhaps be null if we were e.g. concurrently closed: | ||
if (engine != null) { | ||
engine.onSettingsChanged(); | ||
} | ||
private Engine createNewEngine(EngineConfig config) throws IOException { | ||
assert Thread.holdsLock(mutex); | ||
verifyNotClosed(); | ||
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); | ||
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); | ||
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); | ||
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it feels weird to do these things here - this method now only creates an engine but doesn't change the IndexShard fields - imo it shouldn't touch the store (because it doesn't know anything about any current engine running it) |
||
assertMaxUnsafeAutoIdInCommit(); | ||
final Engine engine = engineFactory.newReadWriteEngine(config); | ||
onNewEngine(engine); | ||
engine.onSettingsChanged(); | ||
active.set(true); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. same comment - it's weird that this changes the IndexShard active state without actually exposing the engine. |
||
return engine; | ||
} | ||
|
||
protected Engine newEngine(EngineConfig config) { | ||
return engineFactory.newReadWriteEngine(config); | ||
} | ||
|
||
private static void persistMetadata( | ||
final ShardPath shardPath, | ||
final IndexSettings indexSettings, | ||
|
@@ -2314,19 +2300,14 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g | |
bumpPrimaryTerm(opPrimaryTerm, () -> { | ||
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); | ||
final long currentGlobalCheckpoint = getGlobalCheckpoint(); | ||
final long localCheckpoint; | ||
if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) { | ||
localCheckpoint = NO_OPS_PERFORMED; | ||
final long maxSeqNo = seqNoStats().getMaxSeqNo(); | ||
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", | ||
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); | ||
if (currentGlobalCheckpoint < maxSeqNo) { | ||
resetEngineToGlobalCheckpoint(); | ||
} else { | ||
localCheckpoint = currentGlobalCheckpoint; | ||
getEngine().rollTranslogGeneration(); | ||
} | ||
logger.trace( | ||
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", | ||
opPrimaryTerm, | ||
getLocalCheckpoint(), | ||
localCheckpoint); | ||
getEngine().resetLocalCheckpoint(localCheckpoint); | ||
getEngine().rollTranslogGeneration(); | ||
}); | ||
} | ||
} | ||
|
@@ -2687,4 +2668,24 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { | |
} | ||
}; | ||
} | ||
|
||
/** | ||
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. | ||
*/ | ||
void resetEngineToGlobalCheckpoint() throws IOException { | ||
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; | ||
sync(); // persist the global checkpoint to disk | ||
final long globalCheckpoint = getGlobalCheckpoint(); | ||
final long maxSeqNo = seqNoStats().getMaxSeqNo(); | ||
logger.info("resetting replica engine from max_seq_no [{}] to global checkpoint [{}]", maxSeqNo, globalCheckpoint); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. can this be trace? we already have an info log message before. |
||
final Engine resettingEngine; | ||
synchronized (mutex) { | ||
IOUtils.close(currentEngineReference.getAndSet(null)); | ||
resettingEngine = createNewEngine(newEngineConfig()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. if we do this, why did we need to change how createNewEngine behaved (i.e., update currentEngineReference etc.)? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ok, I copied this change from the previous PR where we have a transition to a read-only engine. I'll revert this change and re-introduce it later. |
||
currentEngineReference.set(resettingEngine); | ||
} | ||
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. it will be good to have some kind of progress logs here (like log every 10k ops or something) under debug. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We intend to have dedicated recovery stats for the reset. |
||
runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}); | ||
resettingEngine.recoverFromTranslog(translogRunner, globalCheckpoint); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,6 +49,7 @@ | |
import org.elasticsearch.index.shard.IndexShardTests; | ||
import org.elasticsearch.index.store.Store; | ||
import org.elasticsearch.index.translog.SnapshotMatchers; | ||
import org.elasticsearch.index.translog.TestTranslog; | ||
import org.elasticsearch.index.translog.Translog; | ||
import org.elasticsearch.indices.recovery.RecoveryTarget; | ||
import org.elasticsearch.threadpool.TestThreadPool; | ||
|
@@ -74,6 +75,7 @@ | |
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.greaterThan; | ||
import static org.hamcrest.Matchers.instanceOf; | ||
import static org.hamcrest.Matchers.isIn; | ||
import static org.hamcrest.Matchers.notNullValue; | ||
import static org.hamcrest.Matchers.nullValue; | ||
import static org.hamcrest.core.Is.is; | ||
|
@@ -520,17 +522,11 @@ public void testSeqNoCollision() throws Exception { | |
logger.info("--> Recover replica3 from replica2"); | ||
recoverReplica(replica3, replica2, true); | ||
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) { | ||
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); | ||
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations); | ||
expectedOps.add(op2); | ||
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps)); | ||
List<Translog.Operation> operations = TestTranslog.drainAll(snapshot); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we lost the check that initOperations are also part of the snapshot? |
||
assertThat(op2, isIn(operations)); | ||
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0)); | ||
} | ||
// TODO: We should assert the content of shards in the ReplicationGroup. | ||
// Without rollback replicas(current implementation), we don't have the same content across shards: | ||
// - replica1 has {doc1} | ||
// - replica2 has {doc1, doc2} | ||
// - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery | ||
shards.assertAllEqual(initDocs + 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. w00t |
||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe rename this to
isTranslog
? then it will tie directly to what's happening in this code.