Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup legacy logic in CombinedDeletionPolicy #43484

Merged
merged 1 commit into from
Jun 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ synchronized boolean releaseCommit(final IndexCommit snapshotCommit) {
/**
* Find a safe commit point from a list of existing commits based on the supplied global checkpoint.
* The max sequence number of a safe commit point should be at most the global checkpoint.
* If an index was created before v6.2, and we haven't retained a safe commit yet, this method will return the oldest commit.
* If an index was created before 6.2 or recovered from remote, we might not have a safe commit.
* In this case, this method will return the oldest index commit.
*
* @param commits a list of existing commit points
* @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path, String)}
Expand Down Expand Up @@ -172,22 +173,13 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
return i + 1;
}
// 5.x commits do not contain MAX_SEQ_NO, we should not keep it and the older commits.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
return Math.min(commits.size() - 1, i + 1);
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= globalCheckpoint) {
return i;
}
}
/*
* We may reach to this point in these cases:
* 1. In the previous 6.x, we keep only the last commit - which is likely not a safe commit if writes are in progress.
* Thus, after upgrading, we may not find a safe commit until we can reserve one.
* 2. In peer-recovery, if the file-based happens, a replica will be received the latest commit from a primary.
* However, that commit may not be a safe commit if writes are in progress in the primary.
*/
// If an index was created before 6.2 or recovered from remote, we might not have a safe commit.
// In this case, we return the oldest index commit instead.
return 0;
}

Expand All @@ -204,11 +196,9 @@ synchronized boolean hasSnapshottedCommits() {
boolean hasUnreferencedCommits() throws IOException {
final IndexCommit lastCommit = this.lastCommit;
if (safeCommit != lastCommit) { // Race condition can happen but harmless
if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.singletonList;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -180,41 +178,6 @@ public void testAcquireIndexCommit() throws Exception {
Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))));
}

public void testLegacyIndex() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
final UUID translogUUID = UUID.randomUUID();

TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);

long legacyTranslogGen = randomNonNegativeLong();
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
assertThat(CombinedDeletionPolicy.findSafeCommitPoint(singletonList(legacyCommit), globalCheckpoint.get()),
equalTo(legacyCommit));

long safeTranslogGen = randomLongBetween(legacyTranslogGen, Long.MAX_VALUE);
long maxSeqNo = randomLongBetween(1, Long.MAX_VALUE);
final IndexCommit freshCommit = mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, translogUUID, safeTranslogGen);

globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(1)).delete(); // Do not keep the legacy commit once we have a new commit.
verify(freshCommit, times(0)).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));

// Make the fresh commit safe.
resetDeletion(legacyCommit);
globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(2)).delete();
verify(freshCommit, times(0)).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(getLocalCheckpoint(freshCommit) + 1));
}

public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
Expand Down Expand Up @@ -317,15 +280,4 @@ void resetDeletion(IndexCommit commit) {
private long getLocalCheckpoint(IndexCommit commit) throws IOException {
return Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
}

IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
final IndexCommit commit = mock(IndexCommit.class);
when(commit.getUserData()).thenReturn(userData);
resetDeletion(commit);
return commit;
}

}