diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 9e73d6fdd9c4f..559d0fc62935b 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -156,6 +156,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 10aaaf821cc15..1a980e6731467 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -260,6 +260,13 @@ public final class IndexSettings { settings -> Boolean.toString(IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(LegacyESVersion.V_7_0_0)), Property.IndexScope, Property.Final); + /** + * Specifies if the index translog should prune based on retention leases. + */ + public static final Setting INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING = + Setting.boolSetting("index.translog.retention_lease.pruning.enabled", false, + Property.IndexScope, Property.Dynamic); + /** * Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted * documents increases the chance of operation-based recoveries and allows querying a longer history of documents. @@ -389,6 +396,7 @@ public final class IndexSettings { private final IndexScopedSettings scopedSettings; private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); private final boolean softDeleteEnabled; + private volatile boolean translogPruningByRetentionLease; private volatile long softDeleteRetentionOperations; private volatile long retentionLeaseMillis; @@ -525,6 +533,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); softDeleteEnabled = version.onOrAfter(LegacyESVersion.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING); + translogPruningByRetentionLease = version.onOrAfter(LegacyESVersion.V_6_5_0) && + scopedSettings.get(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING); softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING); retentionLeaseMillis = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING).millis(); warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); @@ -593,6 +603,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this::setGenerationThresholdSize); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING, + this::setTranslogPruningByRetentionLease); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset); @@ -623,6 +635,10 @@ private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) { this.flushAfterMergeThresholdSize = byteSizeValue; } + private void setTranslogPruningByRetentionLease(boolean enabled) { + this.translogPruningByRetentionLease = enabled; + } + private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) { // ignore the translog retention settings if soft-deletes enabled @@ -1071,6 +1087,13 @@ public void setRequiredPipeline(final String requiredPipeline) { this.requiredPipeline = requiredPipeline; } + /** + * Returns true if translog ops should be pruned based on retention lease + */ + public boolean shouldPruneTranslogByRetentionLease() { + return translogPruningByRetentionLease; + } + /** * Returns true if soft-delete is enabled. */ diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index c4be05b779d42..55a8c549188f1 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -1852,6 +1852,11 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran } + public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, + long softDeletesRetentionOps, boolean translogPruningByRetentionLease) { + + } + /** * Returns the timestamp of the last write in nanoseconds. * Note: this time might not be absolutely accurate since the {@link Operation#startTime()} is used which might be diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 4a6b8e21715ab..ff215ba46b5ae 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -227,7 +227,8 @@ public InternalEngine(EngineConfig engineConfig) { final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), - engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() + engineConfig.getIndexSettings().getTranslogRetentionTotalFiles(), + engineConfig.retentionLeasesSupplier() ); store.incRef(); IndexWriter writer = null; @@ -2572,7 +2573,8 @@ final void ensureCanFlush() { } @Override - public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { + public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, + long softDeletesRetentionOps, boolean translogPruningByRetentionLease) { mergeScheduler.refreshConfig(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: maybePruneDeletes(); @@ -2585,6 +2587,7 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); + translogDeletionPolicy.shouldPruneTranslogByRetentionLease(translogPruningByRetentionLease); softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps); } @@ -2702,6 +2705,9 @@ private void ensureSoftDeletesEnabled() { @Override public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { + if (source.equals(HistorySource.TRANSLOG.name())) { + return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange); + } ensureSoftDeletesEnabled(); ensureOpen(); refreshIfNeeded(source, toSeqNo); diff --git a/server/src/main/java/org/opensearch/index/engine/MissingHistoryOperationsException.java b/server/src/main/java/org/opensearch/index/engine/MissingHistoryOperationsException.java index 39defce5b1e59..7c7a32a57bf9a 100644 --- a/server/src/main/java/org/opensearch/index/engine/MissingHistoryOperationsException.java +++ b/server/src/main/java/org/opensearch/index/engine/MissingHistoryOperationsException.java @@ -38,7 +38,7 @@ */ public final class MissingHistoryOperationsException extends IllegalStateException { - MissingHistoryOperationsException(String message) { + public MissingHistoryOperationsException(String message) { super(message); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 75cf919185c13..efa1f20d6fa2c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1962,7 +1962,8 @@ public void onSettingsChanged() { engineOrNull.onSettingsChanged( disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations() + indexSettings.getSoftDeleteRetentionOperations(), + indexSettings.shouldPruneTranslogByRetentionLease() ); } } @@ -2009,6 +2010,16 @@ public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySourc return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo); } + /** + * + * Creates a new history snapshot for reading operations since + * the provided starting seqno (inclusive) and ending seqno (inclusive) + * The returned snapshot can be retrieved from either Lucene index or translog files. + */ + public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, + long startingSeqNo, long endSeqNo) throws IOException { + return getEngine().newChangesSnapshot(source.name(), mapperService, startingSeqNo, endSeqNo, true); + } /** * Checks if we have a completed history of operations since the given starting seqno (inclusive). * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)} diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 15104a4fc32be..3151e36785342 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -53,6 +53,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.MissingHistoryOperationsException; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.AbstractIndexShardComponent; import org.opensearch.index.shard.IndexShardComponent; @@ -614,7 +615,11 @@ final Checkpoint getLastSyncedCheckpoint() { // for testing public Snapshot newSnapshot() throws IOException { - return newSnapshot(0, Long.MAX_VALUE); + return newSnapshot(0, Long.MAX_VALUE, false); + } + + public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException { + return newSnapshot(fromSeqNo, toSeqNo, false); } /** @@ -624,7 +629,7 @@ public Snapshot newSnapshot() throws IOException { * @param toSeqNo the upper bound of the range (inclusive) * @return the new snapshot */ - public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException { + public Snapshot newSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo; assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo; try (ReleasableLock ignored = readLock.acquire()) { @@ -633,7 +638,7 @@ public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException { .filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) .map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new); final Snapshot snapshot = newMultiSnapshot(snapshots); - return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo); + return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo, requiredFullRange); } } @@ -959,14 +964,17 @@ default int skippedOperations() { private static final class SeqNoFilterSnapshot implements Snapshot { private final Snapshot delegate; private int filteredOpsCount; + private int opsCount; + private boolean requiredFullRange; private final long fromSeqNo; // inclusive private final long toSeqNo; // inclusive - SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) { + SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo, boolean requiredFullRange) { assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]"; this.delegate = delegate; this.fromSeqNo = fromSeqNo; this.toSeqNo = toSeqNo; + this.requiredFullRange = requiredFullRange; } @Override @@ -980,15 +988,20 @@ public int skippedOperations() { } @Override - public Operation next() throws IOException { + public Operation next() throws IOException, MissingHistoryOperationsException { Translog.Operation op; while ((op = delegate.next()) != null) { if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) { + opsCount++; return op; } else { filteredOpsCount++; } } + if(requiredFullRange && (toSeqNo - fromSeqNo +1) != opsCount) { + throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " + + "and to_seqno [" + toSeqNo + "] found"); + } return null; } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/opensearch/index/translog/TranslogDeletionPolicy.java index 42f3893fd98ad..0740330eb3c32 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogDeletionPolicy.java @@ -35,6 +35,10 @@ import org.apache.lucene.util.Counter; import org.opensearch.Assertions; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.index.seqno.RetentionLease; +import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.seqno.SequenceNumbers; import java.io.IOException; @@ -43,10 +47,13 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; public class TranslogDeletionPolicy { private final Map openTranslogRef; + private Supplier retentionLeasesSupplier; + private static long LEASE_DEFAULT_RETENTION_SIZE = 512; public void assertNoOpenTranslogRefs() { if (openTranslogRef.isEmpty() == false) { @@ -69,6 +76,8 @@ public void assertNoOpenTranslogRefs() { private int retentionTotalFiles; + private boolean shouldPruneTranslogByRetentionLease; + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) { this.retentionSizeInBytes = retentionSizeInBytes; this.retentionAgeInMillis = retentionAgeInMillis; @@ -80,6 +89,12 @@ public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMill } } + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles, + Supplier retentionLeasesSupplier) { + this(retentionSizeInBytes, retentionAgeInMillis, retentionTotalFiles); + this.retentionLeasesSupplier = retentionLeasesSupplier; + } + public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) { if (newCheckpoint < this.localCheckpointOfSafeCommit) { throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " + @@ -100,6 +115,11 @@ synchronized void setRetentionTotalFiles(int retentionTotalFiles) { this.retentionTotalFiles = retentionTotalFiles; } + public synchronized void shouldPruneTranslogByRetentionLease(boolean translogPruneByRetentionLease) { + this.shouldPruneTranslogByRetentionLease = translogPruneByRetentionLease; + this.retentionSizeInBytes = Math.max(retentionSizeInBytes, new ByteSizeValue(LEASE_DEFAULT_RETENTION_SIZE, ByteSizeUnit.MB).getBytes()); + } + /** * acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation * will not be deleted until the returned {@link Releasable} is closed. @@ -157,6 +177,12 @@ synchronized long minTranslogGenRequired(List readers, TranslogW long minByLocks = getMinTranslogGenRequiredByLocks(); long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); + long minByRetentionLeasesAndSize = Long.MAX_VALUE; + if(shouldPruneTranslogByRetentionLease) { + // If retention size is specified, size takes precedence. + long minByRetentionLeases = getMinTranslogGenByRetentionLease(readers, writer, retentionLeasesSupplier); + minByRetentionLeasesAndSize = Math.max(minBySize, minByRetentionLeases); + } final long minByAgeAndSize; if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) { // both size and age are disabled; @@ -165,7 +191,28 @@ synchronized long minTranslogGenRequired(List readers, TranslogW minByAgeAndSize = Math.max(minByAge, minBySize); } long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); - return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks); + long minByTranslogGenSettings = Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks); + return Math.min(minByTranslogGenSettings, minByRetentionLeasesAndSize); + } + + static long getMinTranslogGenByRetentionLease(List readers, TranslogWriter writer, + Supplier retentionLeasesSupplier) { + long minGen = writer.getGeneration(); + final long minimumRetainingSequenceNumber = retentionLeasesSupplier.get() + .leases() + .stream() + .mapToLong(RetentionLease::retainingSequenceNumber) + .min() + .orElse(Long.MAX_VALUE); + + for (int i = readers.size() - 1; i >= 0; i--) { + final TranslogReader reader = readers.get(i); + if(reader.getCheckpoint().minSeqNo <= minimumRetainingSequenceNumber && + reader.getCheckpoint().maxSeqNo >= minimumRetainingSequenceNumber) { + minGen = Math.min(minGen, reader.getGeneration()); + } + } + return minGen; } static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 2025126cdbd78..f276982b8eb8e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -261,9 +261,23 @@ && isTargetSameHistory() // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled // down. - startingSeqNo = softDeletesEnabled - ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L - : 0; + if(softDeletesEnabled) { + final long minimumRetainingSequenceNumber = shard.getRetentionLeases() + .leases() + .stream() + .mapToLong(RetentionLease::retainingSequenceNumber) + .min() + .orElse(Long.MAX_VALUE); + final long safeCommitSeqNo = Long.parseLong(safeCommitRef.getIndexCommit() + .getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L; + if(shard.indexSettings().shouldPruneTranslogByRetentionLease()) { + startingSeqNo = SequenceNumbers.min(safeCommitSeqNo, minimumRetainingSequenceNumber); + } else { + startingSeqNo = safeCommitSeqNo; + } + } else { + startingSeqNo = 0; + } logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); try {