Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset replica engine to global checkpoint on promotion #33473

Merged
merged 14 commits into from
Sep 12, 2018
13 changes: 6 additions & 7 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -678,12 +678,6 @@ public CommitStats commitStats() {
*/
public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;

/**
* Reset the local checkpoint in the tracker to the given local checkpoint
* @param localCheckpoint the new checkpoint to be set
*/
public abstract void resetLocalCheckpoint(long localCheckpoint);

/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/
Expand Down Expand Up @@ -1163,11 +1157,16 @@ public enum Origin {
PRIMARY,
REPLICA,
PEER_RECOVERY,
LOCAL_TRANSLOG_RECOVERY;
LOCAL_TRANSLOG_RECOVERY,
LOCAL_RESET;

/**
 * Checks whether this origin is one of the recovery origins.
 *
 * @return {@code true} for {@code PEER_RECOVERY} and {@code LOCAL_TRANSLOG_RECOVERY},
 *         {@code false} for every other origin
 */
public boolean isRecovery() {
    switch (this) {
        case PEER_RECOVERY:
        case LOCAL_TRANSLOG_RECOVERY:
            return true;
        default:
            return false;
    }
}

/**
 * Checks whether this origin is one of the origins treated as coming from the translog.
 * The index, delete and no-op paths shown in this change only add an operation to the
 * translog when this returns {@code false}.
 *
 * @return {@code true} for {@code LOCAL_TRANSLOG_RECOVERY} and {@code LOCAL_RESET},
 *         {@code false} for every other origin
 */
boolean isFromTranslog() {
    switch (this) {
        case LOCAL_TRANSLOG_RECOVERY:
        case LOCAL_RESET:
            return true;
        default:
            return false;
    }
}
}

public Origin origin() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ private boolean canOptimizeAddDocument(Index index) {
: "version: " + index.version() + " type: " + index.versionType();
return true;
case LOCAL_TRANSLOG_RECOVERY:
case LOCAL_RESET:
assert index.isRetry();
return true; // allow to optimize in order to update the max safe time stamp
default:
Expand Down Expand Up @@ -833,7 +834,7 @@ public IndexResult index(Index index) throws IOException {
indexResult = new IndexResult(
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
Expand Down Expand Up @@ -1173,7 +1174,7 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult = new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
}
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (delete.origin().isFromTranslog() == false) {
final Translog.Location location;
if (deleteResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Delete(delete, deleteResult));
Expand Down Expand Up @@ -1411,7 +1412,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
}
}
final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo());
if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
if (noOp.origin().isFromTranslog() == false) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
}
Expand Down Expand Up @@ -2340,11 +2341,6 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

@Override
public void resetLocalCheckpoint(long localCheckpoint) {
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
* @param checkpoint the local checkpoint to reset this tracker to
*/
public synchronized void resetCheckpoint(final long checkpoint) {
// TODO: remove this method after we restore the local history on promotion.
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
Expand Down
83 changes: 55 additions & 28 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@
import java.util.stream.StreamSupport;

import static org.elasticsearch.index.mapper.SourceToParse.source;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
Expand Down Expand Up @@ -1273,16 +1272,18 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
return result;
}

// package-private for testing
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
/**
* Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}.
* The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully.
*/
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some Javadoc explaining what onOperationRecovered means?

Runnable onOperationRecovered) throws IOException {
int opsRecovered = 0;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY);
Engine.Result result = applyTranslogOperation(operation, origin);
switch (result.getResultType()) {
case FAILURE:
throw result.getFailure();
Expand All @@ -1295,7 +1296,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
}

opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
onOperationRecovered.run();
} catch (Exception e) {
if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
Expand All @@ -1313,8 +1314,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations());
return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE);
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -1352,11 +1360,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");

assertMaxUnsafeAutoIdInCommit();

final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
trimUnsafeCommits();

createNewEngine(config);
verifyNotClosed();
Expand All @@ -1367,6 +1371,14 @@ private void innerOpenEngineAndTranslog() throws IOException {
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}

private void trimUnsafeCommits() throws IOException {
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assert that currentEngineReference is null here?

final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
assertMaxUnsafeAutoIdInCommit();
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, indexSettings.getIndexVersionCreated());
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
Expand Down Expand Up @@ -1463,7 +1475,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
if (origin == Engine.Operation.Origin.PRIMARY) {
assert assertPrimaryMode();
} else {
assert origin == Engine.Operation.Origin.REPLICA;
assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET;
assert assertReplicationTarget();
}
if (writeAllowedStates.contains(state) == false) {
Expand Down Expand Up @@ -2166,9 +2178,7 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {

private Engine createNewEngine(EngineConfig config) {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new AlreadyClosedException(shardId + " can't create engine - shard is closed");
}
verifyNotClosed();
assert this.currentEngineReference.get() == null;
Engine engine = newEngine(config);
onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen
Expand Down Expand Up @@ -2314,19 +2324,14 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) {
localCheckpoint = NO_OPS_PERFORMED;
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
localCheckpoint = currentGlobalCheckpoint;
getEngine().rollTranslogGeneration();
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
opPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
});
}
}
Expand Down Expand Up @@ -2687,4 +2692,26 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
}
};
}

/**
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
void resetEngineToGlobalCheckpoint() throws IOException {
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
sync(); // persist the global checkpoint to disk
final long globalCheckpoint = getGlobalCheckpoint();
final Engine newEngine;
synchronized (mutex) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(null));
trimUnsafeCommits();
newEngine = createNewEngine(newEngineConfig());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just wondering. Would it make sense to do the trimUnsafeCommits as part of the new engine creation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was before, but we prefer not to modify the Store implicitly (#33473 (comment)).

active.set(true);
}
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
// TODO: add dedicated recovery stats for the reset translog
});
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at the implications of exposing an engine that isn't fully recovered yet and it's OK, with the exception of syncFlush(). Depending on how this ends up, we may need to make sure that it runs under a permit.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
assertSeqNos();
assertSameDocIdsOnShards();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4087,7 +4087,7 @@ public void markSeqNoAsCompleted(long seqNo) {
final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint();
final long resetLocalCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
actualEngine.resetLocalCheckpoint(resetLocalCheckpoint);
actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint);
completedSeqNos.clear();
actualEngine.restoreLocalCheckpointFromTranslog();
final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,18 +519,14 @@ public void testSeqNoCollision() throws Exception {
shards.promoteReplicaToPrimary(replica2).get();
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2, true);
try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
expectedOps.add(op2);
assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
}
// TODO: We should assert the content of shards in the ReplicationGroup.
// Without rollback replicas(current implementation), we don't have the same content across shards:
// - replica1 has {doc1}
// - replica2 has {doc1, doc2}
// - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery
shards.assertAllEqual(initDocs + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

w00t

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -306,14 +304,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
}

// roll back the extra ops in the replica
shards.removeReplica(replica);
replica.close("resync", false);
replica.store().close();
newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
shards.recoverReplica(newReplica);
shards.assertAllEqual(totalDocs);
// Make sure that flushing on a recovering shard is ok.
shards.flush();
shards.assertAllEqual(totalDocs);
Expand Down Expand Up @@ -406,31 +396,14 @@ public void testResyncAfterPrimaryPromotion() throws Exception {
indexOnReplica(bulkShardRequest, shards, justReplica);
}

logger.info("--> seqNo primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats());

logger.info("--> resyncing replicas");
logger.info("--> resyncing replicas seqno_stats primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats());
PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get();
if (syncedGlobalCheckPoint) {
assertEquals(extraDocs, task.getResyncedOperations());
} else {
assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs));
}
List<IndexShard> replicas = shards.getReplicas();

// check all docs on primary are available on replica
Set<String> primaryIds = getShardDocUIDs(newPrimary);
assertThat(primaryIds.size(), equalTo(initialDocs + extraDocs));
for (IndexShard replica : replicas) {
Set<String> replicaIds = getShardDocUIDs(replica);
Set<String> temp = new HashSet<>(primaryIds);
temp.removeAll(replicaIds);
assertThat(replica.routingEntry() + " is missing docs", temp, empty());
temp = new HashSet<>(replicaIds);
temp.removeAll(primaryIds);
// yeah, replica has more docs as there is no Lucene roll back on it
assertThat(replica.routingEntry() + " has to have extra docs", temp,
extraDocsToBeTrimmed > 0 ? not(empty()) : empty());
}
shards.assertAllEqual(initialDocs + extraDocs);

// check translog on replica is trimmed
int translogOperations = 0;
Expand Down
Loading