Skip to content

Commit

Permalink
Use local checkpoint to calculate min translog gen for recovery (#51905)
Browse files Browse the repository at this point in the history
Today we use the translog_generation of the safe commit as the minimum
required translog generation for recovery. This approach has a
limitation, where we won't be able to clean up translog unless we flush.
Reopening an already recovered engine will create a new empty translog,
and we leave it there until we force flush.

This commit removes the translog_generation commit tag and uses the
local checkpoint of the safe commit to calculate the minimum required
translog generation for recovery instead.

Closes #49970
dnhatn committed Feb 26, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent b4179a8 commit db6b9c2
Showing 24 changed files with 341 additions and 550 deletions.
Original file line number Diff line number Diff line change
@@ -88,6 +88,13 @@
cluster.health:
wait_for_no_initializing_shards: true
wait_for_events: languid
# Before 8.0, an empty shard has two empty translog files as we used the translog_generation commit tag as the minimum required
# translog generation for recovery. Here we force-flush to have a consistent translog stats for both old and new indices.
- do:
indices.flush:
index: test
force: true
wait_if_ongoing: true
- do:
indices.stats:
metric: [ translog ]
@@ -115,10 +122,9 @@
- do:
indices.stats:
metric: [ translog ]
# after flushing we have one empty translog file while an empty index before flushing has two empty translog files.
- lt: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.operations: 0 }
- lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

---
Original file line number Diff line number Diff line change
@@ -121,16 +121,10 @@ private void updateRetentionPolicy() throws IOException {
assert Thread.holdsLock(this);
logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);

softDeletesPolicy.setLocalCheckpointOfSafeCommit(
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
}

protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
Original file line number Diff line number Diff line change
@@ -400,7 +400,7 @@ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecove
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) {
return translogRecoveryRunner.run(this, snapshot);
}
}
@@ -473,23 +473,24 @@ public void skipTranslogRecovery() {
}

private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
final long localCheckpoint = getProcessedLocalCheckpoint();
if (localCheckpoint < recoverUpToSeqNo) {
try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
} else {
opsRecovered = 0;
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null :
translogGeneration.translogFileGeneration, translog.currentFileGeneration());
logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
opsRecovered, translog.currentFileGeneration());
commitIndexWriter(indexWriter, translog, null);
refreshLastCommittedSegmentInfos();
refresh("translog_recovery");
@@ -501,7 +502,8 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException {

final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier(), persistedSequenceNumberConsumer);
@@ -549,7 +551,7 @@ public Translog.Snapshot readHistoryOperations(String reason, HistorySource hist
ensureSoftDeletesEnabled();
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE);
}
}

@@ -598,18 +600,6 @@ public long getWritingBytes() {
return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
}

/**
* Reads the current stored translog ID from the last commit data.
*/
@Nullable
private String loadTranslogUUIDFromLastCommit() throws IOException {
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().getUserData();
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}

/**
* Reads the current stored history ID from the IW commit data.
*/
@@ -1688,8 +1678,9 @@ final boolean tryRenewSyncCommit() {
ensureOpen();
ensureCanFlush();
String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) {
long localCheckpointOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
if (syncId != null && indexWriter.hasUncommittedChanges() &&
translog.estimateTotalOperationsFromMinSeq(localCheckpointOfLastCommit + 1) == 0) {
logger.trace("start renewing sync commit [{}]", syncId);
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
@@ -1714,8 +1705,10 @@ public boolean shouldPeriodicallyFlush() {
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
final long localCheckpointOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final long translogGenerationOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration;
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
return false;
@@ -2423,11 +2416,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpointValue = Long.toString(localCheckpoint);

writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
@@ -2438,10 +2426,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(8);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
@@ -2657,7 +2644,7 @@ public boolean hasCompleteOperationHistory(String reason, HistorySource historyS
return true;
}
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
31 changes: 12 additions & 19 deletions server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@@ -137,31 +138,23 @@ public void trimUnreferencedTranslogFiles() {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
if (commits.size() == 1) {
if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) {
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid);

if (minTranslogGeneration < lastCommitGeneration) {
// a translog deletion policy that retains nothing but the last translog generation from safe commit
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);

try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
}
final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen " + translog.currentFileGeneration() + " != min gen " + translog.getMinFileGeneration();
}
}
} catch (final Exception e) {
Original file line number Diff line number Diff line change
@@ -223,15 +223,14 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis(),
config.getIndexSettings().getTranslogRetentionTotalFiles()
);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);

final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
) {
8 changes: 3 additions & 5 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
@@ -1453,10 +1453,7 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx
if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
}
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
updateCommitData(writer, map);
updateCommitData(writer, Collections.singletonMap(Translog.TRANSLOG_UUID_KEY, translogUUID));
} finally {
metadataLock.writeLock().unlock();
}
@@ -1517,7 +1514,8 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
if (indexVersionCreated.before(org.elasticsearch.Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
final String translogGeneration = commit.getUserData().get("translog_generation");
if (translogGeneration == null || minRetainedTranslogGen <= Long.parseLong(translogGeneration)) {
recoverableCommits.add(commit);
}
}
Loading

0 comments on commit db6b9c2

Please sign in to comment.