Skip to content

Commit

Permalink
Tidy up Store#trimUnsafeCommits (#47062)
Browse files Browse the repository at this point in the history
This change removes unused parameters and simplifies the logic.
  • Loading branch information
dnhatn authored Sep 25, 2019
1 parent b52d1da commit 65374c9
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
Expand All @@ -98,7 +97,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -203,7 +201,7 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
trimUnsafeCommits(engineConfig);
store.trimUnsafeCommits(config().getTranslogConfig().getTranslogPath());
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(),
seqNo -> {
final LocalCheckpointTracker tracker = getLocalCheckpointTracker();
Expand Down Expand Up @@ -2806,15 +2804,6 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
return true;
}

private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOException {
final Store store = engineConfig.getStore();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final Path translogPath = engineConfig.getTranslogConfig().getTranslogPath();
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated());
}

/**
* Restores the live version map and local checkpoint of this engine using documents (including soft-deleted)
* after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker
Expand Down
28 changes: 7 additions & 21 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -1501,31 +1501,17 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx
* commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery
* translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1
* while the local checkpoint of c2 is 2.
* <p>
* 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
* the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog.
*/
public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long minRetainedTranslogGen,
final org.elasticsearch.Version indexVersionCreated) throws IOException {
public void trimUnsafeCommits(final Path translogPath) throws IOException {
metadataLock.writeLock().lock();
try {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(directory);
if (existingCommits.isEmpty()) {
throw new IllegalArgumentException("No index found to trim");
}
final IndexCommit lastIndexCommitCommit = existingCommits.get(existingCommits.size() - 1);
final String translogUUID = lastIndexCommitCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
final IndexCommit startingIndexCommit;
// TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);

if (translogUUID.equals(startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY)) == false) {
throw new IllegalStateException("starting commit translog uuid ["
+ startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid ["
+ translogUUID + "]");
}
if (startingIndexCommit.equals(lastIndexCommitCommit) == false) {
assert existingCommits.isEmpty() == false;
final IndexCommit lastIndexCommit = existingCommits.get(existingCommits.size() - 1);
final String translogUUID = lastIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
final IndexCommit startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
if (startingIndexCommit.equals(lastIndexCommit) == false) {
try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
// this achieves two things:
// - by committing a new commit based on the starting commit, it make sure the starting commit will be opened
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5260,7 +5260,7 @@ public void testTrimUnsafeCommits() throws Exception {
minTranslogGen = engine.getTranslog().getMinFileGeneration();
}

store.trimUnsafeCommits(globalCheckpoint.get(), minTranslogGen,config.getIndexSettings().getIndexVersionCreated());
store.trimUnsafeCommits(config.getTranslogConfig().getTranslogPath());
long safeMaxSeqNo =
commitMaxSeqNo.stream().filter(s -> s <= globalCheckpoint.get())
.reduce((s1, s2) -> s2) // get the last one.
Expand Down

0 comments on commit 65374c9

Please sign in to comment.