Skip to content

Commit

Permalink
Use local checkpoint of safe commit to calculate committed stats
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Feb 7, 2020
1 parent 6572ed4 commit c4bccea
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ private void updateRetentionPolicy() throws IOException {
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final long localCheckpointOfLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
translogDeletionPolicy.setLocalCheckpointOfLastCommit(localCheckpointOfLastCommit);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ public void trimUnreferencedTranslogFiles() {
final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
translogDeletionPolicy.setLocalCheckpointOfLastCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfLastCommit(localCheckpoint);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ protected void closeOnTragicEvent(final Exception ex) {
public TranslogStats stats() {
// acquire lock to make the two numbers roughly consistent (no file change half way)
try (ReleasableLock lock = readLock.acquire()) {
final long uncommittedGen = minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfLastCommit() + 1, current, readers);
final long uncommittedGen = minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, current, readers);
return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen),
sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public void assertNoOpenTranslogRefs() {
*/
private final Map<Long, Counter> translogRefCounts = new HashMap<>();
private long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
private long localCheckpointOfLastCommit = SequenceNumbers.NO_OPS_PERFORMED;


public TranslogDeletionPolicy() {
Expand All @@ -66,14 +65,6 @@ public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
this.localCheckpointOfSafeCommit = newCheckpoint;
}

public synchronized void setLocalCheckpointOfLastCommit(long newCheckpoint) {
if (newCheckpoint < this.localCheckpointOfLastCommit) {
throw new IllegalArgumentException("local checkpoint of the last commit can't go backwards: " +
"current [" + this.localCheckpointOfLastCommit + "] new [" + newCheckpoint + "]");
}
this.localCheckpointOfLastCommit = newCheckpoint;
}

/**
* acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation
* will not be deleted until the returned {@link Releasable} is closed.
Expand Down Expand Up @@ -134,12 +125,6 @@ public synchronized long getLocalCheckpointOfSafeCommit() {
return localCheckpointOfSafeCommit;
}

/**
* Returns the local checkpoint of the last commit. This value is used to calculate the translog stats.
*/
public synchronized long getLocalCheckpointOfLastCommit() {
return localCheckpointOfLastCommit;
}

synchronized long getTranslogRefCount(long gen) {
final Counter counter = translogRefCounts.get(gen);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
}
}
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(keptIndex))));
assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(),
equalTo(Math.max(NO_OPS_PERFORMED,
Math.min(getLocalCheckpoint(commitList.get(keptIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))));
Expand Down Expand Up @@ -149,7 +148,6 @@ public void testAcquireIndexCommit() throws Exception {
snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false)));
// We don't need to retain translog for snapshotting commits.
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(safeIndex))));
assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(
Math.max(NO_OPS_PERFORMED,
Math.min(getLocalCheckpoint(commitList.get(safeIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))));
Expand All @@ -163,7 +161,6 @@ public void testAcquireIndexCommit() throws Exception {
}
assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false));
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(
Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))));
Expand Down Expand Up @@ -224,7 +221,6 @@ public void testCheckUnreferencedCommits() throws Exception {
if (safeCommitIndex == commitList.size() - 1) {
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
} else {
// Advanced but not enough for any commit after the safe commit becomes safe
Expand All @@ -242,7 +238,6 @@ public void testCheckUnreferencedCommits() throws Exception {
indexPolicy.onCommit(commitList);
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ protected void afterIfSuccessful() throws Exception {

if (translog.isOpen()) {
if (translog.currentFileGeneration() > 1) {
translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(Long.MAX_VALUE);
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(Long.MAX_VALUE);
translog.trimUnreferencedReaders();
assertFileDeleted(translog, translog.currentFileGeneration() - 1);
Expand Down Expand Up @@ -479,7 +478,6 @@ public void testStats() throws IOException {
}
}
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(3, Long.MAX_VALUE));
translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(randomLongBetween(3, Long.MAX_VALUE));
translog.trimUnreferencedReaders();
{
final TranslogStats stats = stats();
Expand All @@ -506,8 +504,7 @@ public void testUncommittedOperations() throws Exception {
}
assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps));
if (frequently()) {
deletionPolicy.setLocalCheckpointOfSafeCommit(randomLongBetween(deletionPolicy.getLocalCheckpointOfSafeCommit(), i));
deletionPolicy.setLocalCheckpointOfLastCommit(i);
deletionPolicy.setLocalCheckpointOfSafeCommit(i);
assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen));
uncommittedOps = operationsInLastGen;
}
Expand Down Expand Up @@ -625,7 +622,7 @@ public void testSnapshotWithNewTranslog() throws IOException {

Translog.Snapshot snapshot2 = translog.newSnapshot();
toClose.add(snapshot2);
translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(2);
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(2);
assertThat(snapshot2, containsOperationsInAnyOrder(ops));
assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
} finally {
Expand Down Expand Up @@ -972,7 +969,6 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep
translog.rollGeneration();
// expose the new checkpoint (simulating a commit), before we trim the translog
lastCommittedLocalCheckpoint.set(localCheckpoint);
deletionPolicy.setLocalCheckpointOfLastCommit(localCheckpoint);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
translog.trimUnreferencedReaders();
}
Expand Down Expand Up @@ -1360,7 +1356,7 @@ public void testBasicRecovery() throws IOException {
Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
final boolean commit = commitOften ? frequently() : rarely();
if (commit && op < translogOperations - 1) {
translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(op);
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op);
minUncommittedOp = op + 1;
translogGeneration = translog.getGeneration();
}
Expand Down Expand Up @@ -1816,7 +1812,6 @@ public void testOpenForeignTranslog() throws IOException {
if (randomBoolean()) {
translog.rollGeneration();
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op);
translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(op);
translog.trimUnreferencedReaders();
firstUncommitted = op + 1;
}
Expand Down Expand Up @@ -2219,7 +2214,6 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException {
TranslogConfig config = translog.getConfig();
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
deletionPolicy.setLocalCheckpointOfLastCommit(randomLongBetween(localCheckpoint, Long.MAX_VALUE));
translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy,
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {});
assertThat(translog.getMinFileGeneration(), equalTo(1L));
Expand Down Expand Up @@ -2268,7 +2262,6 @@ public void testRecoveryFromFailureOnTrimming() throws IOException {
}
}
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
deletionPolicy.setLocalCheckpointOfLastCommit(randomLongBetween(localCheckpoint, op));
minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
fail.failRandomly();
try {
Expand All @@ -2278,7 +2271,6 @@ public void testRecoveryFromFailureOnTrimming() throws IOException {
}
}
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
deletionPolicy.setLocalCheckpointOfLastCommit(randomLongBetween(localCheckpoint, Long.MAX_VALUE));
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {})) {
Expand Down Expand Up @@ -2626,7 +2618,6 @@ public void testWithRandomException() throws IOException {
unsynced.clear();
failableTLog.rollGeneration();
committing = true;
failableTLog.getDeletionPolicy().setLocalCheckpointOfLastCommit(opsAdded);
failableTLog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(opsAdded);
syncedDocs.clear();
failableTLog.trimUnreferencedReaders();
Expand Down Expand Up @@ -2671,7 +2662,6 @@ public void testWithRandomException() throws IOException {
if (randomBoolean()) {
try {
TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
deletionPolicy.setLocalCheckpointOfLastCommit(localCheckpointOfSafeCommit);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy));
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
Expand All @@ -2684,7 +2674,6 @@ public void testWithRandomException() throws IOException {
fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file
TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
deletionPolicy.setLocalCheckpointOfLastCommit(localCheckpointOfSafeCommit);
if (generationUUID == null) {
// we never managed to successfully create a translog, make it
generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(),
Expand Down Expand Up @@ -2868,12 +2857,15 @@ public void testRollGeneration() throws Exception {
Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent()))
.filter(r -> r.getCheckpoint().minSeqNo >= 0)
.collect(Collectors.toList()));
int retainedOps = Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent()))
.filter(r -> r.getCheckpoint().generation >= minRetainedReader.generation)
.mapToInt(r -> r.getCheckpoint().numOps)
.sum();
deletionPolicy.setLocalCheckpointOfSafeCommit(
randomLongBetween(minRetainedReader.getCheckpoint().minSeqNo, minRetainedReader.getCheckpoint().maxSeqNo) - 1);
deletionPolicy.setLocalCheckpointOfLastCommit(seqNo);
translog.trimUnreferencedReaders();
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
assertThat(translog.stats().getUncommittedOperations(), equalTo(0));
assertThat(translog.stats().getUncommittedOperations(), equalTo(retainedOps));
// immediate cleanup
for (long i = 0; i < minRetainedReader.generation; i++) {
assertFileDeleted(translog, i);
Expand Down Expand Up @@ -2967,7 +2959,6 @@ public void testSimpleCommit() throws IOException {
}
}
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(0, operations));
translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(randomLongBetween(operations, Long.MAX_VALUE));
}

public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException {
Expand All @@ -2981,8 +2972,6 @@ public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException {
if (rarely()) {
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(
randomLongBetween(deletionPolicy.getLocalCheckpointOfSafeCommit(), i));
translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(
randomLongBetween(deletionPolicy.getLocalCheckpointOfLastCommit(), Long.MAX_VALUE));
}
if (frequently()) {
long minGen;
Expand Down Expand Up @@ -3136,9 +3125,6 @@ public void testMaxSeqNo() throws Exception {
translog.sync();
assertThat(translog.getMaxSeqNo(),
equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values())));
if (randomBoolean()) {
translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(randomFrom(maxSeqNoPerGeneration.values()));
}
long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream()
.filter(e -> e.getKey() >= translog.getMinFileGeneration()).mapToLong(e -> e.getValue())
.max().orElse(SequenceNumbers.NO_OPS_PERFORMED);
Expand Down

0 comments on commit c4bccea

Please sign in to comment.