From db6b9c21c714411caf443ba23786818a15f5b6d7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 10 Feb 2020 08:26:01 -0500 Subject: [PATCH] Use local checkpoint to calculate min translog gen for recovery (#51905) Today we use the translog_generation of the safe commit as the minimum required translog generation for recovery. This approach has a limitation, where we won't be able to clean up translog unless we flush. Reopening an already recovered engine will create a new empty translog, and we leave it there until we force flush. This commit removes the translog_generation commit tag and uses the local checkpoint of the safe commit to calculate the minimum required translog generation for recovery instead. Closes #49970 --- .../test/indices.stats/20_translog.yml | 12 +- .../index/engine/CombinedDeletionPolicy.java | 12 +- .../index/engine/InternalEngine.java | 63 ++-- .../index/engine/NoOpEngine.java | 31 +- .../index/engine/ReadOnlyEngine.java | 5 +- .../org/elasticsearch/index/store/Store.java | 8 +- .../index/translog/Translog.java | 87 ++--- .../translog/TranslogDeletionPolicy.java | 47 +-- .../translog/TruncateTranslogAction.java | 17 +- .../indices/stats/IndicesStatsTests.java | 1 - .../index/IndexServiceTests.java | 16 +- .../engine/CombinedDeletionPolicyTests.java | 40 +-- .../index/engine/InternalEngineTests.java | 76 ++--- .../index/engine/NoOpEngineTests.java | 52 +-- .../RecoveryDuringReplicationTests.java | 2 +- .../index/shard/IndexShardIT.java | 14 +- .../index/shard/IndexShardTests.java | 15 +- .../shard/PrimaryReplicaSyncerTests.java | 9 +- .../elasticsearch/index/store/StoreTests.java | 4 - .../index/translog/TestTranslog.java | 19 +- .../translog/TranslogDeletionPolicyTests.java | 31 +- .../index/translog/TranslogTests.java | 317 ++++++++---------- .../PeerRecoveryTargetServiceTests.java | 9 +- .../indices/recovery/RecoveryTests.java | 4 - 24 files changed, 341 insertions(+), 550 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml index 20f9f1f4ef336..b6ed05610e41c 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml @@ -88,6 +88,13 @@ cluster.health: wait_for_no_initializing_shards: true wait_for_events: languid + # Before 8.0, an empty shard has two empty translog files as we used the translog_generation commit tag as the minimum required + # translog generation for recovery. Here we force-flush to have a consistent translog stats for both old and new indices. + - do: + indices.flush: + index: test + force: true + wait_if_ongoing: true - do: indices.stats: metric: [ translog ] @@ -115,10 +122,9 @@ - do: indices.stats: metric: [ translog ] - # after flushing we have one empty translog file while an empty index before flushing has two empty translog files. - - lt: { indices.test.primaries.translog.size_in_bytes: $creation_size } + - match: { indices.test.primaries.translog.size_in_bytes: $creation_size } - match: { indices.test.primaries.translog.operations: 0 } - - lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size } + - match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size } - match: { indices.test.primaries.translog.uncommitted_operations: 0 } --- diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 70507fd18e7a3..5fbf9e69b2400 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -121,16 +121,10 @@ private void updateRetentionPolicy() throws IOException { assert Thread.holdsLock(this); logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit)); assert safeCommit.isDeleted() == false : "The safe commit must not be deleted"; - final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); assert lastCommit.isDeleted() == false : "The last commit must not be deleted"; - final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - - assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen"; - translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); - - softDeletesPolicy.setLocalCheckpointOfSafeCommit( - Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))); + final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); } protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 365980f7e0885..b76a23c014154 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -400,7 +400,7 @@ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecove try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) { + try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) { return translogRecoveryRunner.run(this, snapshot); } } @@ -473,23 +473,24 @@ public void skipTranslogRecovery() { } private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { - Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; - final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) { - opsRecovered = translogRecoveryRunner.run(this, snapshot); - } catch (Exception e) { - throw new EngineException(shardId, "failed to recover from translog", e); + final long localCheckpoint = getProcessedLocalCheckpoint(); + if (localCheckpoint < recoverUpToSeqNo) { + try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { + opsRecovered = translogRecoveryRunner.run(this, snapshot); + } catch (Exception e) { + throw new EngineException(shardId, "failed to recover from translog", e); + } + } else { + opsRecovered = 0; } // flush if we recovered something or if we have references to older translogs // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length. assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; pendingTranslogRecovery.set(false); // we are good - now we can commit if (opsRecovered > 0) { - logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]", - opsRecovered, translogGeneration == null ? null : - translogGeneration.translogFileGeneration, translog.currentFileGeneration()); + logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]", + opsRecovered, translog.currentFileGeneration()); commitIndexWriter(indexWriter, translog, null); refreshLastCommittedSegmentInfos(); refresh("translog_recovery"); @@ -501,7 +502,8 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException { final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final String translogUUID = loadTranslogUUIDFromLastCommit(); + final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); + final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier(), persistedSequenceNumberConsumer); @@ -549,7 +551,7 @@ public Translog.Snapshot readHistoryOperations(String reason, HistorySource hist ensureSoftDeletesEnabled(); return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); } else { - return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); + return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE); } } @@ -598,18 +600,6 @@ public long getWritingBytes() { return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes(); } - /** - * Reads the current stored translog ID from the last commit data. - */ - @Nullable - private String loadTranslogUUIDFromLastCommit() throws IOException { - final Map commitUserData = store.readLastCommittedSegmentsInfo().getUserData(); - if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { - throw new IllegalStateException("commit doesn't contain translog generation id"); - } - return commitUserData.get(Translog.TRANSLOG_UUID_KEY); - } - /** * Reads the current stored history ID from the IW commit data. */ @@ -1688,8 +1678,9 @@ final boolean tryRenewSyncCommit() { ensureOpen(); ensureCanFlush(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); - if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) { + long localCheckpointOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + if (syncId != null && indexWriter.hasUncommittedChanges() && + translog.estimateTotalOperationsFromMinSeq(localCheckpointOfLastCommit + 1) == 0) { logger.trace("start renewing sync commit [{}]", syncId); commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. sync id [{}].", syncId); @@ -1714,8 +1705,10 @@ public boolean shouldPeriodicallyFlush() { if (shouldPeriodicallyFlushAfterBigMerge.get()) { return true; } + final long localCheckpointOfLastCommit = + Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final long translogGenerationOfLastCommit = - Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration; final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; @@ -2423,11 +2416,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl ensureCanFlush(); try { final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); - final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); - final String translogUUID = translogGeneration.translogUUID; - final String localCheckpointValue = Long.toString(localCheckpoint); - writer.setLiveCommitData(() -> { /* * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes @@ -2438,10 +2426,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). */ - final Map commitData = new HashMap<>(8); - commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration); - commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue); + final Map commitData = new HashMap<>(7); + commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID()); + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); if (syncId != null) { commitData.put(Engine.SYNC_COMMIT_ID, syncId); } @@ -2657,7 +2644,7 @@ public boolean hasCompleteOperationHistory(String reason, HistorySource historyS return true; } final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { + try (Translog.Snapshot snapshot = getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 06520d3036c31..78f5a885def41 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -28,6 +28,7 @@ import org.apache.lucene.store.Directory; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; @@ -137,31 +138,23 @@ public void trimUnreferencedTranslogFiles() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); final List commits = DirectoryReader.listCommits(store.directory()); - if (commits.size() == 1) { + if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) { final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); if (translogUuid == null) { throw new IllegalStateException("commit doesn't contain translog unique id"); } - if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { - throw new IllegalStateException("commit doesn't contain translog generation id"); - } - final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid); - - if (minTranslogGeneration < lastCommitGeneration) { - // a translog deletion policy that retains nothing but the last translog generation from safe commit - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); - translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); - - try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, - engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) { - translog.trimUnreferencedReaders(); - // refresh the translog stats - this.translogStats = translog.stats(); - } + final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, + engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) { + translog.trimUnreferencedReaders(); + // refresh the translog stats + this.translogStats = translog.stats(); + assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed " + + " current gen " + translog.currentFileGeneration() + " != min gen " + translog.getMinFileGeneration(); } } } catch (final Exception e) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 2ffdcb71425eb..06cb780431386 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -223,15 +223,14 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm if (translogUuid == null) { throw new IllegalStateException("commit doesn't contain translog unique id"); } - final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); final TranslogConfig translogConfig = config.getTranslogConfig(); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( config.getIndexSettings().getTranslogRetentionSize().getBytes(), config.getIndexSettings().getTranslogRetentionAge().getMillis(), config.getIndexSettings().getTranslogRetentionTotalFiles() ); - translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit); - + final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), seqNo -> {}) ) { diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 16c5e636edb06..a43adaa3c49b5 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1453,10 +1453,7 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) { throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]"); } - final Map map = new HashMap<>(); - map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); - map.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - updateCommitData(writer, map); + updateCommitData(writer, Collections.singletonMap(Translog.TRANSLOG_UUID_KEY, translogUUID)); } finally { metadataLock.writeLock().unlock(); } @@ -1517,7 +1514,8 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long if (indexVersionCreated.before(org.elasticsearch.Version.V_6_2_0)) { final List recoverableCommits = new ArrayList<>(); for (IndexCommit commit : existingCommits) { - if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { + final String translogGeneration = commit.getUserData().get("translog_generation"); + if (translogGeneration == null || minRetainedTranslogGen <= Long.parseLong(translogGeneration)) { recoverableCommits.add(commit); } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index dabd6829f5401..87d228352164f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -73,9 +73,7 @@ /** * A Translog is a per index shard component that records all non-committed index operations in a durable manner. - * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. The engine - * records the current translog generation {@link Translog#getGeneration()} in it's commit metadata using {@link #TRANSLOG_GENERATION_KEY} - * to reference the generation that contains all operations that have not yet successfully been committed to the engines lucene index. + * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. * Additionally, since Elasticsearch 2.0 the engine also records a {@link #TRANSLOG_UUID_KEY} with each commit to ensure a strong * association between the lucene index an the transaction log file. This UUID is used to prevent accidental recovery from a transaction * log that belongs to a @@ -106,7 +104,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * - we need to page align the last write before we sync, we can take advantage of ensureSynced for this since we might have already * fsynced far enough */ - public static final String TRANSLOG_GENERATION_KEY = "translog_generation"; public static final String TRANSLOG_UUID_KEY = "translog_uuid"; public static final String TRANSLOG_FILE_PREFIX = "translog-"; public static final String TRANSLOG_FILE_SUFFIX = ".tlog"; @@ -222,16 +219,7 @@ private ArrayList recoverFromFiles(Checkpoint checkpoint) throws ArrayList foundTranslogs = new ArrayList<>(); try (ReleasableLock ignored = writeLock.acquire()) { logger.debug("open uncommitted translog checkpoint {}", checkpoint); - - final long minGenerationToRecoverFrom; - if (checkpoint.minTranslogGeneration < 0) { - final Version indexVersionCreated = indexSettings().getIndexVersionCreated(); - assert indexVersionCreated.before(Version.V_6_0_0_beta1) : - "no minTranslogGeneration in checkpoint, but index was created with version [" + indexVersionCreated + "]"; - minGenerationToRecoverFrom = deletionPolicy.getMinTranslogGenerationForRecovery(); - } else { - minGenerationToRecoverFrom = checkpoint.minTranslogGeneration; - } + final long minGenerationToRecoverFrom = checkpoint.minTranslogGeneration; // we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on // the generation id we found in the lucene commit. This gives for better error messages if the wrong @@ -608,33 +596,28 @@ final Checkpoint getLastSyncedCheckpoint() { } } - /** - * Snapshots the current transaction log allowing to safely iterate over the snapshot. - * Snapshots are fixed in time and will not be updated with future operations. - */ + // for testing public Snapshot newSnapshot() throws IOException { - try (ReleasableLock ignored = readLock.acquire()) { - return newSnapshotFromGen(new TranslogGeneration(translogUUID, getMinFileGeneration()), Long.MAX_VALUE); - } + return newSnapshot(0, Long.MAX_VALUE); } - public Snapshot newSnapshotFromGen(TranslogGeneration fromGeneration, long upToSeqNo) throws IOException { + /** + * Creates a new translog snapshot containing operations from the given range. + * + * @param fromSeqNo the lower bound of the range (inclusive) + * @param toSeqNo the upper bound of the range (inclusive) + * @return the new snapshot + */ + public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException { + assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo; + assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - final long fromFileGen = fromGeneration.translogFileGeneration; - if (fromFileGen < getMinFileGeneration()) { - throw new IllegalArgumentException("requested snapshot generation [" + fromFileGen + "] is not available. " + - "Min referenced generation is [" + getMinFileGeneration() + "]"); - } TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current)) - .filter(reader -> reader.getGeneration() >= fromFileGen && reader.getCheckpoint().minSeqNo <= upToSeqNo) + .filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) .map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new); final Snapshot snapshot = newMultiSnapshot(snapshots); - if (upToSeqNo == Long.MAX_VALUE) { - return snapshot; - } else { - return new SeqNoFilterSnapshot(snapshot, Long.MIN_VALUE, upToSeqNo); - } + return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo); } } @@ -668,15 +651,6 @@ public Operation readOperation(Location location) throws IOException { return null; } - public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException { - try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen(); - TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot) - .toArray(TranslogSnapshot[]::new); - return newMultiSnapshot(snapshots); - } - } - private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOException { final Closeable onClose; if (snapshots.length == 0) { @@ -866,7 +840,7 @@ protected void closeOnTragicEvent(final Exception ex) { public TranslogStats stats() { // acquire lock to make the two numbers roughly consistent (no file change half way) try (ReleasableLock lock = readLock.acquire()) { - final long uncommittedGen = deletionPolicy.getTranslogGenerationOfLastCommit(); + long uncommittedGen = getMinGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1).translogFileGeneration; return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge()); } @@ -966,7 +940,7 @@ default int skippedOperations() { * shares the same underlying resources with the {@code delegate} snapshot, therefore we should not * use the {@code delegate} after passing it to this filtered snapshot. */ - static final class SeqNoFilterSnapshot implements Snapshot { + private static final class SeqNoFilterSnapshot implements Snapshot { private final Snapshot delegate; private int filteredOpsCount; private final long fromSeqNo; // inclusive @@ -1626,20 +1600,18 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl */ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { try (ReleasableLock ignored = readLock.acquire()) { - /* - * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the - * local checkpoint. Immediately after flushing, there will be no such generation, so this minimum generation in this case will - * be the current translog generation as we do not need any prior generations to have a complete history up to the current local - * checkpoint. - */ - long minTranslogFileGeneration = this.currentFileGeneration(); - for (final TranslogReader reader : readers) { - if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { - minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration()); - } + return new TranslogGeneration(translogUUID, minGenerationForSeqNo(seqNo, current, readers)); + } + } + + private static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List readers) { + long minGen = writer.generation; + for (final TranslogReader reader : readers) { + if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { + minGen = Math.min(minGen, reader.getGeneration()); } - return new TranslogGeneration(translogUUID, minTranslogFileGeneration); } + return minGen; } /** @@ -1681,7 +1653,8 @@ public void trimUnreferencedReaders() throws IOException { // we're shutdown potentially on some tragic event, don't delete anything return; } - long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current); + long minReferencedGen = Math.min(deletionPolicy.minTranslogGenRequired(readers, current), + minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, current, readers)); assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 8a553aad326b7..a26b2dc15e9ad 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -22,6 +22,7 @@ import org.apache.lucene.util.Counter; import org.elasticsearch.Assertions; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.IOException; import java.util.HashMap; @@ -47,17 +48,7 @@ public void assertNoOpenTranslogRefs() { * translog generation */ private final Map translogRefCounts = new HashMap<>(); - - /** - * the translog generation that is requires to properly recover from the oldest non deleted - * {@link org.apache.lucene.index.IndexCommit}. - */ - private long minTranslogGenerationForRecovery = 1; - - /** - * This translog generation is used to calculate the number of uncommitted operations since the last index commit. - */ - private long translogGenerationOfLastCommit = 1; + private long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; private long retentionSizeInBytes; @@ -76,23 +67,12 @@ public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMill } } - public synchronized void setMinTranslogGenerationForRecovery(long newGen) { - if (newGen < minTranslogGenerationForRecovery || newGen > translogGenerationOfLastCommit) { - throw new IllegalArgumentException("Invalid minTranslogGenerationForRecovery can't go backwards; new [" + newGen + "]," + - "current [" + minTranslogGenerationForRecovery + "], lastGen [" + translogGenerationOfLastCommit + "]"); + public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) { + if (newCheckpoint < this.localCheckpointOfSafeCommit) { + throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " + + "current [" + this.localCheckpointOfSafeCommit + "] new [" + newCheckpoint + "]"); } - minTranslogGenerationForRecovery = newGen; - } - - /** - * Sets the translog generation of the last index commit. - */ - public synchronized void setTranslogGenerationOfLastCommit(long lastGen) { - if (lastGen < translogGenerationOfLastCommit || lastGen < minTranslogGenerationForRecovery) { - throw new IllegalArgumentException("Invalid translogGenerationOfLastCommit; new [" + lastGen + "]," + - "current [" + translogGenerationOfLastCommit + "], minRequiredGen [" + minTranslogGenerationForRecovery + "]"); - } - translogGenerationOfLastCommit = lastGen; + this.localCheckpointOfSafeCommit = newCheckpoint; } public synchronized void setRetentionSizeInBytes(long bytes) { @@ -172,7 +152,7 @@ synchronized long minTranslogGenRequired(List readers, TranslogW minByAgeAndSize = Math.max(minByAge, minBySize); } long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); - return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery)); + return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks); } static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { @@ -222,16 +202,11 @@ private long getMinTranslogGenRequiredByLocks() { return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); } - /** returns the translog generation that will be used as a basis of a future store/peer recovery */ - public synchronized long getMinTranslogGenerationForRecovery() { - return minTranslogGenerationForRecovery; - } - /** - * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit. + * Returns the local checkpoint of the safe commit. This value is used to calculate the min required generation for recovery. */ - public synchronized long getTranslogGenerationOfLastCommit() { - return translogGenerationOfLastCommit; + public synchronized long getLocalCheckpointOfSafeCommit() { + return localCheckpointOfSafeCommit; } synchronized long getTranslogRefCount(long gen) { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java index 9480ee3c1e1f3..42082e0629ac6 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java @@ -129,30 +129,25 @@ public void execute(Terminal terminal, ShardPath shardPath, Directory indexDirec // Retrieve the generation and UUID from the existing data commitData = commits.get(commits.size() - 1).getUserData(); - final String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); final String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); - if (translogGeneration == null || translogUUID == null) { - throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]", - translogGeneration, translogUUID); + if (translogUUID == null) { + throw new ElasticsearchException("shard must have a valid translog UUID"); } final long globalCheckpoint = commitData.containsKey(SequenceNumbers.MAX_SEQ_NO) ? Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO)) : SequenceNumbers.UNASSIGNED_SEQ_NO; - terminal.println("Translog Generation: " + translogGeneration); terminal.println("Translog UUID : " + translogUUID); terminal.println("History UUID : " + historyUUID); Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME); Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME); - Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); - Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); + final long gen = 1; + Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + gen + Translog.TRANSLOG_FILE_SUFFIX); + Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + gen + Translog.TRANSLOG_FILE_SUFFIX); // Write empty checkpoint and translog to empty files - long gen = Long.parseLong(translogGeneration); int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint); @@ -192,7 +187,7 @@ long minTranslogGenRequired(List readers, TranslogWriter writer) }; try (Translog translog = new Translog(translogConfig, translogUUID, retainAllTranslogPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm, seqNo -> {}); - Translog.Snapshot snapshot = translog.newSnapshot()) { + Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)) { //noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot while (snapshot.next() != null) { } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java index c7abf1172b0a5..10cdf95015aa2 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java @@ -123,7 +123,6 @@ public void testCommitStats() throws Exception { assertNotNull(commitStats); assertThat(commitStats.getGeneration(), greaterThan(0L)); assertThat(commitStats.getId(), notNullValue()); - assertThat(commitStats.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(commitStats.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); } } diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 0bec748072251..a032e4b4d519c 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -43,10 +43,8 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.nio.file.Path; import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -393,8 +391,6 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { .build()); Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); - final Path translogPath = translog.getConfig().getTranslogPath(); - final String translogUuid = translog.getTranslogUUID(); int translogOps = 0; final int numDocs = scaledRandomIntBetween(10, 100); @@ -415,15 +411,9 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); assertTrue(indexService.getTrimTranslogTask().mustReschedule()); - final long lastCommitedTranslogGeneration; - try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) { - Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); - lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); - } - assertBusy(() -> { - long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid); - assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration)); - }); + final Engine readOnlyEngine = getEngine(indexService.getShard(0)); + assertBusy(() -> + assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES))); assertAcked(client().admin().indices().prepareOpen("test").setWaitForActiveShards(ActiveShardCount.DEFAULT)); diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index deb224f5b1895..a2508ee3ec8f0 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -58,20 +58,16 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final LongArrayList maxSeqNoList = new LongArrayList(); - final LongArrayList translogGenList = new LongArrayList(); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = 0; long lastCheckpoint = lastMaxSeqNo; - long lastTranslogGen = 0; final UUID translogUUID = UUID.randomUUID(); for (int i = 0; i < totalCommits; i++) { lastMaxSeqNo += between(1, 10000); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - lastTranslogGen += between(1, 100); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID)); maxSeqNoList.add(lastMaxSeqNo); - translogGenList.add(lastTranslogGen); } int keptIndex = randomInt(commitList.size() - 1); @@ -88,8 +84,7 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { verify(commitList.get(i), never()).delete(); } } - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGenList.get(keptIndex))); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(keptIndex)))); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(commitList.get(keptIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -105,7 +100,6 @@ public void testAcquireIndexCommit() throws Exception { CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); - long lastTranslogGen = between(1, 20); int safeIndex = 0; List commitList = new ArrayList<>(); List snapshottingCommits = new ArrayList<>(); @@ -115,8 +109,7 @@ public void testAcquireIndexCommit() throws Exception { for (int n = 0; n < newCommits; n++) { lastMaxSeqNo += between(1, 1000); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - lastTranslogGen += between(1, 20); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID)); } // Advance the global checkpoint to between [safeIndex, safeIndex + 1) safeIndex = randomIntBetween(safeIndex, commitList.size() - 1); @@ -155,10 +148,7 @@ public void testAcquireIndexCommit() throws Exception { // Snapshotting commits must not be deleted. snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false))); // We don't need to retain translog for snapshotting commits. - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), - equalTo(Long.parseLong(commitList.get(safeIndex).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), - equalTo(Long.parseLong(commitList.get(commitList.size() - 1).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(safeIndex)))); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo( Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(commitList.get(safeIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -171,8 +161,7 @@ public void testAcquireIndexCommit() throws Exception { assertThat(commitList.get(i).isDeleted(), equalTo(true)); } assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false)); - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo( Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -188,19 +177,17 @@ public void testDeleteInvalidCommits() throws Exception { final List commitList = new ArrayList<>(); for (int i = 0; i < invalidCommits; i++) { long maxSeqNo = randomNonNegativeLong(); - commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID(), randomNonNegativeLong())); + commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID())); } final UUID expectedTranslogUUID = UUID.randomUUID(); - long lastTranslogGen = 0; final int validCommits = between(1, 10); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); for (int i = 0; i < validCommits; i++) { - lastTranslogGen += between(1, 1000); lastMaxSeqNo += between(1, 1000); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID)); } // We should never keep invalid commits regardless of the value of the global checkpoint. @@ -222,12 +209,10 @@ public void testCheckUnreferencedCommits() throws Exception { int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); - long lastTranslogGen = between(1, 50); for (int i = 0; i < totalCommits; i++) { lastMaxSeqNo += between(1, 10000); - lastTranslogGen += between(1, 100); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID)); } int safeCommitIndex = randomIntBetween(0, commitList.size() - 1); globalCheckpoint.set(Long.parseLong(commitList.get(safeCommitIndex).getUserData().get(SequenceNumbers.MAX_SEQ_NO))); @@ -236,8 +221,7 @@ public void testCheckUnreferencedCommits() throws Exception { if (safeCommitIndex == commitList.size() - 1) { // Safe commit is the last commit - no need to clean up - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); } else { // Advanced but not enough for any commit after the safe commit becomes safe @@ -254,8 +238,7 @@ public void testCheckUnreferencedCommits() throws Exception { commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); // Safe commit is the last commit - no need to clean up - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); } } @@ -271,12 +254,11 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) { }; } - IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { + IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID) throws IOException { final Map userData = new HashMap<>(); userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); - userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); final IndexCommit commit = mock(IndexCommit.class); final Directory directory = mock(Directory.class); when(commit.getUserData()).thenReturn(userData); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 335629928396d..5e21db432cea0 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -98,6 +98,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -176,6 +177,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.IntSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.function.ToLongBiFunction; @@ -697,7 +699,6 @@ public long getProcessedCheckpoint() { CommitStats stats1 = engine.commitStats(); assertThat(stats1.getGeneration(), greaterThan(0L)); assertThat(stats1.getId(), notNullValue()); - assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(stats1.getUserData(), hasKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); assertThat( Long.parseLong(stats1.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), @@ -722,11 +723,7 @@ public long getProcessedCheckpoint() { assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration())); assertThat(stats2.getId(), notNullValue()); assertThat(stats2.getId(), not(equalTo(stats1.getId()))); - assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); - assertThat( - stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), - not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get())); @@ -1157,6 +1154,7 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get(); engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier)); + engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, randomNonNegativeLong()); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); boolean inSync = randomBoolean(); @@ -1167,24 +1165,20 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(true, true); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 1L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 4L : 2L)); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(true, true); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(5L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(5L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(5L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L)); } public void testSyncedFlush() throws IOException { @@ -2817,7 +2811,7 @@ public void testSettings() { assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); } - public void testCurrentTranslogIDisCommitted() throws IOException { + public void testCurrentTranslogUUIIDIsCommitted() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, @@ -2842,7 +2836,6 @@ public void testCurrentTranslogIDisCommitted() throws IOException { globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE)); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } @@ -2852,18 +2845,9 @@ public void testCurrentTranslogIDisCommitted() throws IOException { try (InternalEngine engine = new InternalEngine(config)) { expectThrows(IllegalStateException.class, engine::ensureCanFlush); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - if (i == 0) { - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } else { - // creating an empty index will create the first translog gen and commit it - // opening the empty index will make the second translog file but not commit it - // opening the engine again (i=0) will make the third translog file, which then be committed - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } @@ -2876,7 +2860,6 @@ public void testCurrentTranslogIDisCommitted() throws IOException { store.associateIndexWithNewTranslog(translogUUID); try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(2, engine.getTranslog().currentFileGeneration()); @@ -2889,12 +2872,9 @@ public void testCurrentTranslogIDisCommitted() throws IOException { for (int i = 0; i < 2; i++) { try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("no changes - nothing to commit", "1", - userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } @@ -3010,8 +2990,9 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s globalCheckpointSupplier))) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, 1); - final long committedGen = Long.valueOf( - engine.getLastCommittedSegmentInfos().getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + final long localCheckpoint = Long.parseLong( + engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final long committedGen = engine.getTranslog().getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; for (int gen = 1; gen < committedGen; gen++) { final Path genFile = translogPath.resolve(Translog.getFilename(gen)); assertFalse(genFile + " wasn't cleaned up", Files.exists(genFile)); @@ -4518,7 +4499,6 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti for (final Map.Entry entry : threads.entrySet()) { final Map userData = finalActualEngine.commitStats().getUserData(); assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i))); - assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation))); entry.getValue().countDown(); entry.getKey().join(); finalActualEngine.flush(); @@ -4579,6 +4559,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { final EngineConfig engineConfig; final SeqNoStats prevSeqNoStats; final List prevDocs; + final List existingTranslog; try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { engineConfig = engine.config(); for (final long seqNo : seqNos) { @@ -4597,6 +4578,9 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { engine.syncTranslog(); prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); prevDocs = getDocIds(engine, true); + try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { + existingTranslog = TestTranslog.drainSnapshot(snapshot, false); + } } try (InternalEngine engine = new InternalEngine(engineConfig)) { final Translog.TranslogGeneration currrentTranslogGeneration = new Translog.TranslogGeneration( @@ -4607,8 +4591,10 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint())); assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo())); - try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshotFromGen(currrentTranslogGeneration, Long.MAX_VALUE)) { - assertThat("restore from local translog must not add operations to translog", snapshot, SnapshotMatchers.size(0)); + try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { + assertThat("restore from local translog must not add operations to translog", + snapshot.totalOperations(), equalTo(existingTranslog.size())); + assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(existingTranslog)); } } assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); @@ -5077,6 +5063,10 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. final Translog translog = engine.getTranslog(); + final IntSupplier uncommittedTranslogOperationsSinceLastCommit = () -> { + long localCheckpoint = Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + return translog.totalOperationsByMinGen(translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); + }; final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); @@ -5098,7 +5088,7 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = @@ -5107,11 +5097,11 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); // If the new index commit still points to the same translog generation as the current index commit, // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. generateNewSeqNo(engine); // create a gap here @@ -6039,11 +6029,21 @@ public void testRecoverFromLocalTranslog() throws Exception { engine.forceMerge(randomBoolean(), 1, false, false, false); } } + if (randomBoolean()) { + // engine is flushed properly before shutting down. + engine.syncTranslog(); + globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); + engine.flush(); + } docs = getDocIds(engine, true); } try (InternalEngine engine = new InternalEngine(config)) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); + if (engine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo() == globalCheckpoint.get()) { + assertThat("engine should trim all unreferenced translog after recovery", + engine.getTranslog().getMinFileGeneration(), equalTo(engine.getTranslog().currentFileGeneration())); + } } } } @@ -6098,12 +6098,12 @@ public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws E engine.rollTranslogGeneration(); engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(operations.size())); + assertThat(snapshot.totalOperations(), equalTo(0)); assertNull(snapshot.next()); } applyOperations(engine, operations); try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(operations.size() * 2)); + assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), equalTo(seqNos)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java index 623bbe0ec50db..08783dc4a1762 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -28,6 +28,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.ParsedDocument; @@ -36,14 +38,12 @@ import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.test.IndexSettingsModule; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; import java.util.Collections; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.equalTo; @@ -55,7 +55,6 @@ public class NoOpEngineTests extends EngineTestCase { public void testNoopEngine() throws IOException { engine.close(); final NoOpEngine engine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir)); - expectThrows(UnsupportedOperationException.class, () -> engine.syncFlush(null, null)); assertThat(engine.refreshNeeded(), equalTo(false)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); engine.close(); @@ -123,7 +122,7 @@ public void testNoOpEngineStats() throws Exception { for (int i = 0; i < numDocs; i++) { if (randomBoolean()) { String delId = Integer.toString(i); - Engine.DeleteResult result = engine.delete(new Engine.Delete("test", delId, newUid(delId), primaryTerm.get())); + Engine.DeleteResult result = engine.delete(new Engine.Delete("_doc", delId, newUid(delId), primaryTerm.get())); assertTrue(result.isFound()); engine.syncTranslog(); // advance persisted local checkpoint globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); @@ -131,7 +130,7 @@ public void testNoOpEngineStats() throws Exception { } } engine.getLocalCheckpointTracker().waitForProcessedOpsToComplete(numDocs + deletions - 1); - flushAndTrimTranslog(engine); + engine.flush(true, true); } final DocsStats expectedDocStats; @@ -168,52 +167,33 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build(); tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table); tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); - - boolean softDeleteEnabled = engine.config().getIndexSettings().isSoftDeleteEnabled(); + engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, randomNonNegativeLong()); final int numDocs = scaledRandomIntBetween(10, 3000); + int totalTranslogOps = 0; for (int i = 0; i < numDocs; i++) { + totalTranslogOps++; engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); tracker.updateLocalCheckpoint(allocationId.getId(), i); if (rarely()) { + totalTranslogOps = 0; engine.flush(); } + if (randomBoolean()) { + engine.rollTranslogGeneration(); + } } + // prevent translog from trimming so we can test trimUnreferencedFiles in NoOpEngine. + final Translog.Snapshot snapshot = engine.getTranslog().newSnapshot(); engine.flush(true, true); - - final String translogUuid = engine.getTranslog().getTranslogUUID(); - final long minFileGeneration = engine.getTranslog().getMinFileGeneration(); - final long currentFileGeneration = engine.getTranslog().currentFileGeneration(); engine.close(); final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); - final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath(); - - final long lastCommitedTranslogGeneration; - try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) { - Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); - lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration)); - } - - assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration)); - assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeleteEnabled ? 0 : numDocs)); - assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - + assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps)); noOpEngine.trimUnreferencedTranslogFiles(); - - assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommitedTranslogGeneration)); assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(0)); assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - + assertThat(noOpEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long)Translog.DEFAULT_HEADER_SIZE_IN_BYTES)); + snapshot.close(); noOpEngine.close(); } - - private void flushAndTrimTranslog(final InternalEngine engine) { - engine.flush(true, true); - final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy(); - deletionPolicy.setRetentionSizeInBytes(-1); - deletionPolicy.setRetentionAgeInMillis(-1); - deletionPolicy.setMinTranslogGenerationForRecovery(engine.getTranslog().getGeneration().translogFileGeneration); - engine.flush(true, true); - } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 8df9046a4c685..43e76efee998e 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -803,7 +803,7 @@ public void testRollbackOnPromotion() throws Exception { shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback); done.set(true); thread.join(); - + shards.syncGlobalCheckpoint(); for (IndexShard shard : shards) { shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 9c70accf7ef03..35b7af704dc47 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -352,17 +352,17 @@ public void testMaybeFlush() throws Exception { assertFalse(shard.shouldPeriodicallyFlush()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), - new ByteSizeValue(190 /* size of the operation + two generations header&footer*/, ByteSizeUnit.BYTES)).build()).get(); - client().prepareIndex("test", "test", "0") + new ByteSizeValue(135 /* size of the operation + one generation header&footer*/, ByteSizeUnit.BYTES)).build()).get(); + client().prepareIndex("test", "_doc").setId("0") .setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertFalse(shard.shouldPeriodicallyFlush()); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, - new SourceToParse("test", "test", "1", new BytesArray("{}"), XContentType.JSON), + new SourceToParse("test", "_doc", "1", new BytesArray("{}"), XContentType.JSON), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = getTranslog(shard); assertEquals(2, translog.stats().getUncommittedOperations()); - client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) + client().prepareIndex("test", "_doc", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); @@ -376,7 +376,7 @@ public void testMaybeFlush() throws Exception { client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); - client().prepareDelete("test", "test", "2").get(); + client().prepareDelete("test", "_doc", "2").get(); logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertBusy(() -> { // this is async @@ -434,8 +434,8 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { final boolean flush = randomBoolean(); final Settings settings; if (flush) { - // size of the operation plus two generations of overhead. - settings = Settings.builder().put("index.translog.flush_threshold_size", "180b").build(); + // size of the operation plus the overhead of one generation. + settings = Settings.builder().put("index.translog.flush_threshold_size", "125b").build(); } else { // size of the operation plus header and footer settings = Settings.builder().put("index.translog.generation_threshold_size", "117b").build(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 64c8c2ef4a7e1..f34d0cbc10444 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -117,7 +117,6 @@ import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; -import org.elasticsearch.index.translog.TranslogTests; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -1405,7 +1404,7 @@ public void run() { latch.await(); for (int i = 0; i < 10000; i++) { semaphore.acquire(); - shard.sync(TranslogTests.randomTranslogLocation(), (ex) -> semaphore.release()); + shard.sync(new Translog.Location(randomLong(), randomLong(), randomInt()), (ex) -> semaphore.release()); } } catch (Exception ex) { throw new RuntimeException(ex); @@ -2033,17 +2032,20 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { shard.sync(); // advance local checkpoint final int translogOps; + final int replayedOps; if (randomBoolean()) { // Advance the global checkpoint to remove the 1st commit; this shard will recover the 2nd commit. shard.updateGlobalCheckpointOnReplica(3, "test"); logger.info("--> flushing shard"); shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); translogOps = 4; // delete #1 won't be replayed. - } else if (randomBoolean()) { - shard.getEngine().rollTranslogGeneration(); - translogOps = 5; + replayedOps = 3; } else { + if (randomBoolean()) { + shard.getEngine().rollTranslogGeneration(); + } translogOps = 5; + replayedOps = 5; } final ShardRouting replicaRouting = shard.routingEntry(); @@ -2053,10 +2055,9 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(recoverFromStore(newShard)); - assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(replayedOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); - assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry())); assertDocCount(newShard, 3); closeShards(newShard); diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 8b0210113d825..ef51c066f7b87 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -126,15 +126,12 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp())); } if (syncNeeded && globalCheckPoint < numDocs - 1) { + assertThat(resyncTask.getSkippedOperations(), equalTo(0)); + assertThat(resyncTask.getResyncedOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint))); if (shard.indexSettings.isSoftDeleteEnabled()) { - assertThat(resyncTask.getSkippedOperations(), equalTo(0)); - assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations())); assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint))); } else { - int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included - assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); - assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); - assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); + assertThat(resyncTask.getTotalOperations(), equalTo(numDocs)); } } else { assertThat(resyncTask.getSkippedOperations(), equalTo(0)); diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 59feabb699d72..082c562257418 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -67,7 +67,6 @@ import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; @@ -845,9 +844,7 @@ public void testUserDataRead() throws IOException { writer.addDocument(doc); Map commitData = new HashMap<>(2); String syncId = "a sync id"; - String translogId = "a translog id"; commitData.put(Engine.SYNC_COMMIT_ID, syncId); - commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogId); writer.setLiveCommitData(commitData.entrySet()); writer.commit(); writer.close(); @@ -856,7 +853,6 @@ public void testUserDataRead() throws IOException { assertFalse(metadata.asMap().isEmpty()); // do not check for correct files, we have enough tests for that above assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId)); - assertThat(metadata.getCommitUserData().get(Translog.TRANSLOG_GENERATION_KEY), equalTo(translogId)); TestUtil.checkIndex(store.directory()); assertDeleteContent(store, store.directory()); IOUtils.close(store); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index d78a731674258..56de723ac50cf 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -22,12 +22,8 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.index.engine.CombinedDeletionPolicy; import java.io.IOException; import java.nio.ByteBuffer; @@ -69,7 +65,7 @@ public class TestTranslog { * See {@link TestTranslog#corruptFile(Logger, Random, Path, boolean)} for details of the corruption applied. */ public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir) throws IOException { - corruptRandomTranslogFile(logger, random, translogDir, minTranslogGenUsedInRecovery(translogDir)); + corruptRandomTranslogFile(logger, random, translogDir, Translog.readCheckpoint(translogDir).minTranslogGeneration); } /** @@ -188,19 +184,6 @@ static void corruptFile(Logger logger, Random random, Path fileToCorrupt, boolea } } - /** - * Lists all existing commits in a given index path, then read the minimum translog generation that will be used in recoverFromTranslog. - */ - private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException { - try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) { - List commits = DirectoryReader.listCommits(directory); - final String translogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); - long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID); - IndexCommit recoveringCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint); - return Long.parseLong(recoveringCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - } - } - /** * Returns the primary term associated with the current translog writer of the given translog. */ diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 0d296af5f0c8d..873a210df0aef 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -36,6 +36,7 @@ import java.util.ArrayList; import java.util.List; +import static java.lang.Math.min; import static org.hamcrest.Matchers.equalTo; @@ -48,12 +49,7 @@ public void testNoRetention() throws IOException { allGens.add(readersAndWriter.v2()); try { TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0, 0); - assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); - final int committedReader = randomIntBetween(0, allGens.size() - 1); - final long committedGen = allGens.get(committedReader).generation; - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); - assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, allGens.get(allGens.size() - 1).generation); } finally { IOUtils.close(readersAndWriter.v1()); IOUtils.close(readersAndWriter.v2()); @@ -127,8 +123,6 @@ public void testRetentionHierarchy() throws IOException { allGens.add(readersAndWriter.v2()); try { TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE); - deletionPolicy.setTranslogGenerationOfLastCommit(Long.MAX_VALUE); - deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); int selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGenerationByAge = allGens.get(selectedReader).generation; long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); @@ -145,31 +139,28 @@ public void testRetentionHierarchy() throws IOException { max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)); // make a new policy as committed gen can't go backwards (for now) deletionPolicy = new MockDeletionPolicy(now, size, maxAge, totalFiles); - long committedGen = randomFrom(allGens).generation; - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, - max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); + assertMinGenRequired(deletionPolicy, readersAndWriter, + max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)); long viewGen = randomFrom(allGens).generation; try (Releasable ignored = deletionPolicy.acquireTranslogGen(viewGen)) { assertMinGenRequired(deletionPolicy, readersAndWriter, - min3(committedGen, viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); + min(viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); // disable age deletionPolicy.setRetentionAgeInMillis(-1); assertMinGenRequired(deletionPolicy, readersAndWriter, - min3(committedGen, viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles))); + min(viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles))); // disable size deletionPolicy.setRetentionAgeInMillis(maxAge); deletionPolicy.setRetentionSizeInBytes(-1); assertMinGenRequired(deletionPolicy, readersAndWriter, - min3(committedGen, viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles))); + min(viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles))); // disable age and zie deletionPolicy.setRetentionAgeInMillis(-1); deletionPolicy.setRetentionSizeInBytes(-1); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + assertMinGenRequired(deletionPolicy, readersAndWriter, viewGen); // disable total files deletionPolicy.setRetentionTotalFiles(0); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + assertMinGenRequired(deletionPolicy, readersAndWriter, viewGen); } } finally { IOUtils.close(readersAndWriter.v1()); @@ -232,8 +223,4 @@ protected long currentTime() { private static long max3(long x1, long x2, long x3) { return Math.max(Math.max(x1, x2), x3); } - - private static long min3(long x1, long x2, long x3) { - return Math.min(Math.min(x1, x2), x3); - } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 616254a16f96d..9a64d09b3cef9 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -166,7 +166,7 @@ protected void afterIfSuccessful() throws Exception { if (translog.isOpen()) { if (translog.currentFileGeneration() > 1) { - markCurrentGenAsCommitted(translog); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(Long.MAX_VALUE); translog.trimUnreferencedReaders(); assertFileDeleted(translog, translog.currentFileGeneration() - 1); } @@ -201,28 +201,6 @@ protected Translog openTranslog(TranslogConfig config, String translogUUID) thro } - private void markCurrentGenAsCommitted(Translog translog) throws IOException { - long genToCommit = translog.currentFileGeneration(); - long genToRetain = randomLongBetween(translog.getDeletionPolicy().getMinTranslogGenerationForRecovery(), genToCommit); - commit(translog, genToRetain, genToCommit); - } - - private void rollAndCommit(Translog translog) throws IOException { - translog.rollGeneration(); - markCurrentGenAsCommitted(translog); - } - - private long commit(Translog translog, long genToRetain, long genToCommit) throws IOException { - final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit); - deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain); - long minGenRequired = deletionPolicy.minTranslogGenRequired(translog.getReaders(), translog.getCurrent()); - translog.trimUnreferencedReaders(); - assertThat(minGenRequired, equalTo(translog.getMinFileGeneration())); - assertFilePresences(translog); - return minGenRequired; - } - @Override @Before public void setUp() throws Exception { @@ -356,7 +334,7 @@ public void testSimpleOperations() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - final long seqNo = randomNonNegativeLong(); + final long seqNo = randomLongBetween(0, Integer.MAX_VALUE); final String reason = randomAlphaOfLength(16); final long noopTerm = randomLongBetween(1, primaryTerm.get()); addToTranslogAndList(translog, ops, new Translog.NoOp(seqNo, noopTerm, reason)); @@ -389,9 +367,7 @@ public void testSimpleOperations() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - markCurrentGenAsCommitted(translog); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), firstId + 1), randomNonNegativeLong())) { + try (Translog.Snapshot snapshot = translog.newSnapshot(seqNo + 1, randomLongBetween(seqNo + 1, Long.MAX_VALUE))) { assertThat(snapshot, SnapshotMatchers.size(0)); assertThat(snapshot.totalOperations(), equalTo(0)); } @@ -450,8 +426,8 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(1)); assertThat(stats.getTranslogSizeInBytes(), equalTo(162L)); assertThat(stats.getUncommittedOperations(), equalTo(1)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(162L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(107L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } translog.add(new Translog.Delete("test", "2", 1, primaryTerm.get(), newUid("2"))); @@ -460,8 +436,8 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(2)); assertThat(stats.getTranslogSizeInBytes(), equalTo(210L)); assertThat(stats.getUncommittedOperations(), equalTo(2)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(210L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(155L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } translog.add(new Translog.Delete("test", "3", 2, primaryTerm.get(), newUid("3"))); @@ -470,8 +446,8 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(3)); assertThat(stats.getTranslogSizeInBytes(), equalTo(258L)); assertThat(stats.getUncommittedOperations(), equalTo(3)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(258L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(203L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16))); @@ -480,19 +456,18 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); assertThat(stats.getTranslogSizeInBytes(), equalTo(300L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(300L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(245L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } - final long expectedSizeInBytes = 355L; translog.rollGeneration(); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(355L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(expectedSizeInBytes)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(300L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } { @@ -501,26 +476,26 @@ public void testStats() throws IOException { stats.writeTo(out); final TranslogStats copy = new TranslogStats(out.bytes().streamInput()); assertThat(copy.estimatedNumberOfOperations(), equalTo(4)); - assertThat(copy.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(copy.getTranslogSizeInBytes(), equalTo(355L)); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { builder.startObject(); copy.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.endObject(); - assertThat(Strings.toString(builder), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + expectedSizeInBytes - + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + expectedSizeInBytes + assertThat(Strings.toString(builder), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + 355 + + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + 300 + ",\"earliest_last_modified_age\":" + stats.getEarliestLastModifiedAge() + "}}")); } } - - markCurrentGenAsCommitted(translog); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(3, Long.MAX_VALUE)); + translog.trimUnreferencedReaders(); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(355L)); assertThat(stats.getUncommittedOperations(), equalTo(0)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } } @@ -542,7 +517,7 @@ public void testUncommittedOperations() throws Exception { } assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps)); if (frequently()) { - markCurrentGenAsCommitted(translog); + deletionPolicy.setLocalCheckpointOfSafeCommit(i); assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } @@ -603,7 +578,7 @@ public void testOldestEntryInSeconds() { assertThat(e, hasToString(containsString("earliestLastModifiedAge must be >= 0"))); } - public void testSnapshot() throws IOException { + public void testBasicSnapshot() throws IOException { ArrayList ops = new ArrayList<>(); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.size(0)); @@ -611,13 +586,13 @@ public void testSnapshot() throws IOException { addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1})); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)) { assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(1)); } - try (Translog.Snapshot snapshot = translog.newSnapshot(); - Translog.Snapshot snapshot1 = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = translog.newSnapshot(0, randomIntBetween(0, 10)); + Translog.Snapshot snapshot1 = translog.newSnapshot(0, randomIntBetween(0, 10))) { assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(1)); @@ -660,7 +635,7 @@ public void testSnapshotWithNewTranslog() throws IOException { Translog.Snapshot snapshot2 = translog.newSnapshot(); toClose.add(snapshot2); - markCurrentGenAsCommitted(translog); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(2); assertThat(snapshot2, containsOperationsInAnyOrder(ops)); assertThat(snapshot2.totalOperations(), equalTo(ops.size())); } finally { @@ -676,78 +651,62 @@ public void testSnapshotOnClosedTranslog() throws IOException { assertEquals(ex.getMessage(), "translog is already closed"); } - public void testSnapshotFromMinGen() throws Exception { - Map> operationsByGen = new HashMap<>(); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), 1), randomNonNegativeLong())) { - assertThat(snapshot, SnapshotMatchers.size(0)); - } - int iters = between(1, 10); - for (int i = 0; i < iters; i++) { - long currentGeneration = translog.currentFileGeneration(); - operationsByGen.putIfAbsent(currentGeneration, new ArrayList<>()); - int numOps = between(0, 20); - for (int op = 0; op < numOps; op++) { - long seqNo = randomLongBetween(0, 1000); - addToTranslogAndList(translog, operationsByGen.get(currentGeneration), new Translog.Index("test", - Long.toString(seqNo), seqNo, primaryTerm.get(), new byte[]{1})); - } - long minGen = randomLongBetween(translog.getMinFileGeneration(), translog.currentFileGeneration()); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), Long.MAX_VALUE)) { - List expectedOps = operationsByGen.entrySet().stream() - .filter(e -> e.getKey() >= minGen) - .flatMap(e -> e.getValue().stream()) - .collect(Collectors.toList()); - assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps)); - } - long upToSeqNo = randomLongBetween(0, 2000); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), upToSeqNo)) { - List expectedOps = operationsByGen.entrySet().stream() - .filter(e -> e.getKey() >= minGen) - .flatMap(e -> e.getValue().stream().filter(op -> op.seqNo() <= upToSeqNo)) - .collect(Collectors.toList()); - assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps)); - } - translog.rollGeneration(); - } - } - - public void testSeqNoFilterSnapshot() throws Exception { + public void testRangeSnapshot() throws Exception { + long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; final int generations = between(2, 20); + Map> operationsByGen = new HashMap<>(); for (int gen = 0; gen < generations; gen++) { - List batch = LongStream.rangeClosed(0, between(0, 100)).boxed().collect(Collectors.toList()); - Randomness.shuffle(batch); - for (long seqNo : batch) { - Translog.Index op = - new Translog.Index("doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{1}); + Set seqNos = new HashSet<>(); + int numOps = randomIntBetween(1, 100); + for (int i = 0; i < numOps; i++) { + final long seqNo = randomValueOtherThanMany(seqNos::contains, () -> randomLongBetween(0, 1000)); + minSeqNo = SequenceNumbers.min(minSeqNo, seqNo); + maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo); + seqNos.add(seqNo); + } + List ops = new ArrayList<>(seqNos.size()); + for (long seqNo : seqNos) { + Translog.Index op = new Translog.Index("_doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{randomByte()}); translog.add(op); + ops.add(op); } + operationsByGen.put(translog.currentFileGeneration(), ops); translog.rollGeneration(); + if (rarely()) { + translog.rollGeneration(); // empty generation + } } - List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - Translog.Operation op; - while ((op = snapshot.next()) != null) { - operations.add(op); + + if (minSeqNo > 0) { + long fromSeqNo = randomLongBetween(0, minSeqNo - 1); + long toSeqNo = randomLongBetween(fromSeqNo, minSeqNo - 1); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + assertThat(snapshot.totalOperations(), equalTo(0)); + assertNull(snapshot.next()); } } - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, between(200, 300), between(300, 400)); // out range - assertThat(filter, SnapshotMatchers.size(0)); - assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations())); - assertThat(filter.skippedOperations(), equalTo(snapshot.totalOperations())); + + long fromSeqNo = randomLongBetween(maxSeqNo + 1, Long.MAX_VALUE); + long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + assertThat(snapshot.totalOperations(), equalTo(0)); + assertNull(snapshot.next()); } - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - int fromSeqNo = between(-2, 500); - int toSeqNo = between(fromSeqNo, 500); - List selectedOps = operations.stream() - .filter(op -> fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo).collect(Collectors.toList()); - Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo); - assertThat(filter, SnapshotMatchers.containsOperationsInAnyOrder(selectedOps)); - assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations())); - assertThat(filter.skippedOperations(), equalTo(snapshot.skippedOperations() + operations.size() - selectedOps.size())); + + fromSeqNo = randomLongBetween(0, 2000); + toSeqNo = randomLongBetween(fromSeqNo, 2000); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + Set seenSeqNos = new HashSet<>(); + List expectedOps = new ArrayList<>(); + for (long gen = translog.currentFileGeneration(); gen > 0; gen--) { + for (Translog.Operation op : operationsByGen.getOrDefault(gen, Collections.emptyList())) { + if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && seenSeqNos.add(op.seqNo())) { + expectedOps.add(op); + } + } + } + assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(expectedOps)); } } @@ -769,6 +728,7 @@ private void assertFilePresences(Translog translog) { for (long gen = 1; gen < translog.getMinFileGeneration(); gen++) { assertFileDeleted(translog, gen); } + } static class LocationOperation implements Comparable { @@ -950,8 +910,8 @@ public void testVerifyTranslogIsNotDeleted() throws IOException { assertThat(snapshot.totalOperations(), equalTo(1)); } translog.close(); - - assertFileIsPresent(translog, 1); + assertFileDeleted(translog, 1); + assertFileIsPresent(translog, 2); } /** @@ -1023,9 +983,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep translog.rollGeneration(); // expose the new checkpoint (simulating a commit), before we trim the translog lastCommittedLocalCheckpoint.set(localCheckpoint); - deletionPolicy.setTranslogGenerationOfLastCommit(translog.currentFileGeneration()); - deletionPolicy.setMinTranslogGenerationForRecovery( - translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); translog.trimUnreferencedReaders(); } } @@ -1094,7 +1052,7 @@ protected void doRun() throws Exception { // these are what we expect the snapshot to return (and potentially some more). Set expectedOps = new HashSet<>(writtenOps.keySet()); expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView); - try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) { + try (Translog.Snapshot snapshot = translog.newSnapshot(committedLocalCheckpointAtView + 1L, Long.MAX_VALUE)) { Translog.Operation op; while ((op = snapshot.next()) != null) { expectedOps.remove(op); @@ -1172,7 +1130,7 @@ public void testSyncUpTo() throws IOException { assertTrue("we only synced a previous operation yet", translog.syncNeeded()); } if (rarely()) { - rollAndCommit(translog); + translog.rollGeneration(); assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); } @@ -1192,7 +1150,7 @@ public void testSyncUpToStream() throws IOException { ArrayList locations = new ArrayList<>(); for (int op = 0; op < translogOperations; op++) { if (rarely()) { - rollAndCommit(translog); // do this first so that there is at least one pending tlog entry + translog.rollGeneration(); } final Translog.Location location = translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), @@ -1206,7 +1164,7 @@ public void testSyncUpToStream() throws IOException { // we are the last location so everything should be synced assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); } else if (rarely()) { - rollAndCommit(translog); + translog.rollGeneration(); // not syncing now assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); @@ -1229,7 +1187,7 @@ public void testLocationComparison() throws IOException { translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); if (rarely() && translogOperations > op + 1) { - rollAndCommit(translog); + translog.rollGeneration(); } } Collections.shuffle(locations, random()); @@ -1412,7 +1370,7 @@ public void testBasicRecovery() throws IOException { Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); final boolean commit = commitOften ? frequently() : rarely(); if (commit && op < translogOperations - 1) { - rollAndCommit(translog); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op); minUncommittedOp = op + 1; translogGeneration = translog.getGeneration(); } @@ -1435,7 +1393,7 @@ public void testBasicRecovery() throws IOException { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) { + try (Translog.Snapshot snapshot = translog.newSnapshot(minUncommittedOp, Long.MAX_VALUE)) { for (int i = minUncommittedOp; i < translogOperations; i++) { assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation); @@ -1866,7 +1824,9 @@ public void testOpenForeignTranslog() throws IOException { locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (randomBoolean()) { - rollAndCommit(translog); + translog.rollGeneration(); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op); + translog.trimUnreferencedReaders(); firstUncommitted = op + 1; } } @@ -1887,7 +1847,7 @@ public void testOpenForeignTranslog() throws IOException { } this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); - try (Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) { + try (Translog.Snapshot snapshot = this.translog.newSnapshot(randomLongBetween(0, firstUncommitted), Long.MAX_VALUE)) { for (int i = firstUncommitted; i < translogOperations; i++) { Translog.Operation next = snapshot.next(); assertNotNull("" + i, next); @@ -2068,7 +2028,7 @@ public void testFailFlush() throws IOException { } try { - rollAndCommit(translog); + translog.rollGeneration(); fail("already closed"); } catch (AlreadyClosedException ex) { assertNotNull(ex.getCause()); @@ -2246,28 +2206,29 @@ protected void afterAdd() throws IOException { */ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { int translogOperations = randomIntBetween(10, 100); - for (int op = 0; op < translogOperations / 2; op++) { - translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), + int op = 0; + for (; op < translogOperations / 2; op++) { + translog.add(new Translog.Index("_doc", Integer.toString(op), op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } translog.rollGeneration(); - long comittedGeneration = randomLongBetween(2, translog.currentFileGeneration()); - for (int op = translogOperations / 2; op < translogOperations; op++) { - translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), + long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, op); + for (op = translogOperations / 2; op < translogOperations; op++) { + translog.add(new Translog.Index("test", Integer.toString(op), op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } + long minRetainedGen = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -2276,7 +2237,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { assertFileIsPresent(translog, gen); } translog.trimUnreferencedReaders(); - for (long gen = 1; gen < comittedGeneration; gen++) { + for (long gen = 1; gen < minRetainedGen; gen++) { assertFileDeleted(translog, gen); } } @@ -2290,8 +2251,9 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { final FailSwitch fail = new FailSwitch(); fail.failNever(); final TranslogConfig config = getTranslogConfig(tempDir); - final long comittedGeneration; + final long localCheckpoint; final String translogUUID; + long minGenForRecovery = 1L; try (Translog translog = getFailableTranslog(fail, config)) { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); // disable retention so we trim things @@ -2299,24 +2261,25 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { deletionPolicy.setRetentionAgeInMillis(-1); translogUUID = translog.getTranslogUUID(); int translogOperations = randomIntBetween(10, 100); - for (int op = 0; op < translogOperations / 2; op++) { - translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), + int op = 0; + for (; op < translogOperations / 2; op++) { + translog.add(new Translog.Index("test", Integer.toString(op), op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } translog.rollGeneration(); - comittedGeneration = randomLongBetween(2, translog.currentFileGeneration()); - for (int op = translogOperations / 2; op < translogOperations; op++) { - translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), + localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, op); + for (op = translogOperations / 2; op < translogOperations; op++) { + translog.add(new Translog.Index("test", Integer.toString(op), op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, translog.currentFileGeneration())); - deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; fail.failRandomly(); try { translog.trimUnreferencedReaders(); @@ -2325,16 +2288,16 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { } } final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {})) { // we don't know when things broke exactly assertThat(translog.getMinFileGeneration(), greaterThanOrEqualTo(1L)); - assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(comittedGeneration)); + assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(minGenForRecovery)); assertFilePresences(translog); + minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; translog.trimUnreferencedReaders(); - assertThat(translog.getMinFileGeneration(), equalTo(comittedGeneration)); + assertThat(translog.getMinFileGeneration(), equalTo(minGenForRecovery)); assertFilePresences(translog); } } @@ -2642,7 +2605,7 @@ public void testWithRandomException() throws IOException { fail.failRandomly(); TranslogConfig config = getTranslogConfig(tempDir); final int numOps = randomIntBetween(100, 200); - long minGenForRecovery = 1; + long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; List syncedDocs = new ArrayList<>(); List unsynced = new ArrayList<>(); if (randomBoolean()) { @@ -2672,8 +2635,8 @@ public void testWithRandomException() throws IOException { unsynced.clear(); failableTLog.rollGeneration(); committing = true; - failableTLog.getDeletionPolicy().setTranslogGenerationOfLastCommit(failableTLog.currentFileGeneration()); - failableTLog.getDeletionPolicy().setMinTranslogGenerationForRecovery(failableTLog.currentFileGeneration()); + failableTLog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(opsAdded); + syncedDocs.clear(); failableTLog.trimUnreferencedReaders(); committing = false; syncedDocs.clear(); @@ -2705,7 +2668,7 @@ public void testWithRandomException() throws IOException { assertThat(unsynced, empty()); } generationUUID = failableTLog.getTranslogUUID(); - minGenForRecovery = failableTLog.getDeletionPolicy().getMinTranslogGenerationForRecovery(); + localCheckpointOfSafeCommit = failableTLog.getDeletionPolicy().getLocalCheckpointOfSafeCommit(); IOUtils.closeWhileHandlingException(failableTLog); } } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { @@ -2717,8 +2680,7 @@ public void testWithRandomException() throws IOException { if (randomBoolean()) { try { TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery); - deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { // failed - that's ok, we didn't even create it @@ -2729,8 +2691,7 @@ public void testWithRandomException() throws IOException { fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery); - deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); if (generationUUID == null) { // we never managed to successfully create a translog, make it generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), @@ -2738,8 +2699,7 @@ public void testWithRandomException() throws IOException { } try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); - Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(generationUUID, minGenForRecovery), Long.MAX_VALUE)) { + Translog.Snapshot snapshot = translog.newSnapshot(localCheckpointOfSafeCommit + 1, Long.MAX_VALUE)) { assertEquals(syncedDocs.size(), snapshot.totalOperations()); for (int i = 0; i < syncedDocs.size(); i++) { Translog.Operation next = snapshot.next(); @@ -2922,10 +2882,20 @@ public void testRollGeneration() throws Exception { .map(t -> t.getPrimaryTerm()).collect(Collectors.toList()); assertThat(storedPrimaryTerms, equalTo(primaryTerms)); } - long minGenForRecovery = randomLongBetween(generation, generation + rolls); - commit(translog, minGenForRecovery, generation + rolls); + + final BaseTranslogReader minRetainedReader = randomFrom( + Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent())) + .filter(r -> r.getCheckpoint().minSeqNo >= 0) + .collect(Collectors.toList())); + int retainedOps = Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent())) + .filter(r -> r.getCheckpoint().generation >= minRetainedReader.generation) + .mapToInt(r -> r.getCheckpoint().numOps) + .sum(); + deletionPolicy.setLocalCheckpointOfSafeCommit( + randomLongBetween(minRetainedReader.getCheckpoint().minSeqNo, minRetainedReader.getCheckpoint().maxSeqNo) - 1); + translog.trimUnreferencedReaders(); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(retainedOps)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); @@ -2933,17 +2903,17 @@ public void testRollGeneration() throws Exception { deletionPolicy.setRetentionAgeInMillis(randomBoolean() ? 100 : -1); assertBusy(() -> { translog.trimUnreferencedReaders(); - for (long i = 0; i < minGenForRecovery; i++) { + for (long i = 0; i < minRetainedReader.generation; i++) { assertFileDeleted(translog, i); } }); } else { // immediate cleanup - for (long i = 0; i < minGenForRecovery; i++) { + for (long i = 0; i < minRetainedReader.generation; i++) { assertFileDeleted(translog, i); } } - for (long i = minGenForRecovery; i < generation + rolls; i++) { + for (long i = minRetainedReader.generation; i < generation + rolls; i++) { assertFileIsPresent(translog, i); } } @@ -3002,7 +2972,7 @@ public void testMinSeqNoBasedAPI() throws IOException { } assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps)); int readFromSnapshot = 0; - try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) { + try (Translog.Snapshot snapshot = translog.newSnapshot(seqNo, Long.MAX_VALUE)) { assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps)); Translog.Operation op; while ((op = snapshot.next()) != null) { @@ -3031,8 +3001,7 @@ public void testSimpleCommit() throws IOException { translog.rollGeneration(); } } - long lastGen = randomLongBetween(1, translog.currentFileGeneration()); - commit(translog, randomLongBetween(1, lastGen), lastGen); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(0, operations)); } public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { @@ -3044,9 +3013,8 @@ public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { translog.rollGeneration(); } if (rarely()) { - long lastGen = randomLongBetween(deletionPolicy.getTranslogGenerationOfLastCommit(), translog.currentFileGeneration()); - long minGen = randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), lastGen); - commit(translog, minGen, lastGen); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit( + randomLongBetween(deletionPolicy.getLocalCheckpointOfSafeCommit(), i)); } if (frequently()) { long minGen; @@ -3069,7 +3037,7 @@ public void testReadGlobalCheckpoint() throws Exception { translog.rollGeneration(); } } - rollAndCommit(translog); + translog.rollGeneration(); translog.close(); assertThat(Translog.readGlobalCheckpoint(translogDir, translogUUID), equalTo(globalCheckpoint.get())); expectThrows(TranslogCorruptedException.class, () -> Translog.readGlobalCheckpoint(translogDir, UUIDs.randomBase64UUID())); @@ -3201,9 +3169,8 @@ public void testMaxSeqNo() throws Exception { translog.sync(); assertThat(translog.getMaxSeqNo(), equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values()))); - long minRetainedGen = commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration()); long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() - .filter(e -> e.getKey() >= minRetainedGen).mapToLong(e -> e.getValue()) + .filter(e -> e.getKey() >= translog.getMinFileGeneration()).mapToLong(e -> e.getValue()) .max().orElse(SequenceNumbers.NO_OPS_PERFORMED); assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo)); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index d641448736e10..0db222c14ea41 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SeqNoStats; @@ -177,14 +176,8 @@ public void testPrepareIndexForPeerRecovery() throws Exception { long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint(); Optional safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint); assertTrue(safeCommit.isPresent()); - final Translog.TranslogGeneration recoveringTranslogGeneration; - try (Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit()) { - recoveringTranslogGeneration = new Translog.TranslogGeneration( - commitRef.getIndexCommit().getUserData().get(Translog.TRANSLOG_UUID_KEY), - Long.parseLong(commitRef.getIndexCommit().getUserData().get(Translog.TRANSLOG_GENERATION_KEY))); - } int expectedTotalLocal = 0; - try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshotFromGen(recoveringTranslogGeneration, globalCheckpoint)) { + try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot(safeCommit.get().localCheckpoint + 1, globalCheckpoint)) { Translog.Operation op; while ((op = snapshot.next()) != null) { if (op.seqNo() <= globalCheckpoint) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index f18ee83069f11..049bc15d8d733 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -276,21 +276,17 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { .setOpenMode(IndexWriterConfig.OpenMode.APPEND); Map userData = new HashMap<>(replica.store().readLastCommittedSegmentsInfo().getUserData()); final String translogUUIDtoUse; - final long translogGenToUse; final String historyUUIDtoUse = UUIDs.randomBase64UUID(random()); if (randomBoolean()) { // create a new translog translogUUIDtoUse = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs, replica.shardId(), replica.getPendingPrimaryTerm()); - translogGenToUse = 1; } else { translogUUIDtoUse = translogGeneration.translogUUID; - translogGenToUse = translogGeneration.translogFileGeneration; } try (IndexWriter writer = new IndexWriter(replica.store().directory(), iwc)) { userData.put(Engine.HISTORY_UUID_KEY, historyUUIDtoUse); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUIDtoUse); - userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGenToUse)); writer.setLiveCommitData(userData.entrySet()); writer.commit(); }