Skip to content

Commit

Permalink
Cleanup legacy logic in CombinedDeletionPolicy (#43484)
Browse files Browse the repository at this point in the history
This change removes the support for pre-v6 index commits which 
do not have sequence numbers.
  • Loading branch information
dnhatn authored Jun 23, 2019
1 parent 45733fd commit 70839bf
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ synchronized boolean releaseCommit(final IndexCommit snapshotCommit) {
/**
* Find a safe commit point from a list of existing commits based on the supplied global checkpoint.
* The max sequence number of a safe commit point should be at most the global checkpoint.
* If an index was created before v6.2, and we haven't retained a safe commit yet, this method will return the oldest commit.
* If an index was created before 6.2 or recovered from remote, we might not have a safe commit.
* In this case, this method will return the oldest index commit.
*
* @param commits a list of existing commit points
* @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path, String)}
Expand Down Expand Up @@ -172,22 +173,13 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
return i + 1;
}
// 5.x commits do not contain MAX_SEQ_NO, we should not keep it and the older commits.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
return Math.min(commits.size() - 1, i + 1);
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= globalCheckpoint) {
return i;
}
}
/*
* We may reach to this point in these cases:
* 1. In the previous 6.x, we keep only the last commit - which is likely not a safe commit if writes are in progress.
* Thus, after upgrading, we may not find a safe commit until we can reserve one.
* 2. In peer-recovery, if the file-based happens, a replica will be received the latest commit from a primary.
* However, that commit may not be a safe commit if writes are in progress in the primary.
*/
// If an index was created before 6.2 or recovered from remote, we might not have a safe commit.
// In this case, we return the oldest index commit instead.
return 0;
}

Expand All @@ -204,11 +196,9 @@ synchronized boolean hasSnapshottedCommits() {
boolean hasUnreferencedCommits() throws IOException {
final IndexCommit lastCommit = this.lastCommit;
if (safeCommit != lastCommit) { // Race condition can happen but harmless
if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) {
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.singletonList;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -180,41 +178,6 @@ public void testAcquireIndexCommit() throws Exception {
Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))));
}

public void testLegacyIndex() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
final UUID translogUUID = UUID.randomUUID();

TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);

long legacyTranslogGen = randomNonNegativeLong();
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
assertThat(CombinedDeletionPolicy.findSafeCommitPoint(singletonList(legacyCommit), globalCheckpoint.get()),
equalTo(legacyCommit));

long safeTranslogGen = randomLongBetween(legacyTranslogGen, Long.MAX_VALUE);
long maxSeqNo = randomLongBetween(1, Long.MAX_VALUE);
final IndexCommit freshCommit = mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, translogUUID, safeTranslogGen);

globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(1)).delete(); // Do not keep the legacy commit once we have a new commit.
verify(freshCommit, times(0)).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));

// Make the fresh commit safe.
resetDeletion(legacyCommit);
globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(2)).delete();
verify(freshCommit, times(0)).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(getLocalCheckpoint(freshCommit) + 1));
}

public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
Expand Down Expand Up @@ -317,15 +280,4 @@ void resetDeletion(IndexCommit commit) {
private long getLocalCheckpoint(IndexCommit commit) throws IOException {
return Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
}

IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
final IndexCommit commit = mock(IndexCommit.class);
when(commit.getUserData()).thenReturn(userData);
resetDeletion(commit);
return commit;
}

}

0 comments on commit 70839bf

Please sign in to comment.