Skip to content

Commit

Permalink
Support for translog pruning based on retention leases
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Kumar <[email protected]>
  • Loading branch information
saikaranam-amazon committed Sep 2, 2021
1 parent b7cf1fa commit 49e7627
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
Expand Down
23 changes: 23 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ public final class IndexSettings {
settings -> Boolean.toString(IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(LegacyESVersion.V_7_0_0)),
Property.IndexScope, Property.Final);

/**
* Specifies if the index translog should prune based on retention leases.
*/
public static final Setting<Boolean> INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING =
Setting.boolSetting("index.translog.retention_lease.pruning.enabled", false,
Property.IndexScope, Property.Dynamic);

/**
* Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted
* documents increases the chance of operation-based recoveries and allows querying a longer history of documents.
Expand Down Expand Up @@ -389,6 +396,7 @@ public final class IndexSettings {
private final IndexScopedSettings scopedSettings;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private final boolean softDeleteEnabled;
private volatile boolean translogPruningByRetentionLease;
private volatile long softDeleteRetentionOperations;

private volatile long retentionLeaseMillis;
Expand Down Expand Up @@ -525,6 +533,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = version.onOrAfter(LegacyESVersion.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
translogPruningByRetentionLease = version.onOrAfter(LegacyESVersion.V_6_5_0) &&
scopedSettings.get(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING);
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
retentionLeaseMillis = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING).millis();
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
Expand Down Expand Up @@ -593,6 +603,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING,
this::setTranslogPruningByRetentionLease);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset);
Expand Down Expand Up @@ -623,6 +635,10 @@ private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) {
this.flushAfterMergeThresholdSize = byteSizeValue;
}

private void setTranslogPruningByRetentionLease(boolean enabled) {
this.translogPruningByRetentionLease = enabled;
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
Expand Down Expand Up @@ -1071,6 +1087,13 @@ public void setRequiredPipeline(final String requiredPipeline) {
this.requiredPipeline = requiredPipeline;
}

/**
* Returns <code>true</code> if translog ops should be pruned based on retention lease
*/
public boolean shouldPruneTranslogByRetentionLease() {
return translogPruningByRetentionLease;
}

/**
* Returns <code>true</code> if soft-delete is enabled.
*/
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,11 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran

}

public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize,
long softDeletesRetentionOps, boolean translogPruningByRetentionLease) {

}

/**
* Returns the timestamp of the last write in nanoseconds.
* Note: this time might not be absolutely accurate since the {@link Operation#startTime()} is used which might be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ public InternalEngine(EngineConfig engineConfig) {
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles(),
engineConfig.retentionLeasesSupplier()
);
store.incRef();
IndexWriter writer = null;
Expand Down Expand Up @@ -2572,7 +2573,8 @@ final void ensureCanFlush() {
}

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize,
long softDeletesRetentionOps, boolean translogPruningByRetentionLease) {
mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletes();
Expand All @@ -2585,6 +2587,7 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
translogDeletionPolicy.shouldPruneTranslogByRetentionLease(translogPruningByRetentionLease);
softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps);
}

Expand Down Expand Up @@ -2702,6 +2705,9 @@ private void ensureSoftDeletesEnabled() {
@Override
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
if (source.equals(HistorySource.TRANSLOG.name())) {
return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange);
}
ensureSoftDeletesEnabled();
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*/
public final class MissingHistoryOperationsException extends IllegalStateException {

MissingHistoryOperationsException(String message) {
public MissingHistoryOperationsException(String message) {
super(message);
}
}
13 changes: 12 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1962,7 +1962,8 @@ public void onSettingsChanged() {
engineOrNull.onSettingsChanged(
disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
indexSettings.getSoftDeleteRetentionOperations(),
indexSettings.shouldPruneTranslogByRetentionLease()
);
}
}
Expand Down Expand Up @@ -2009,6 +2010,16 @@ public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySourc
return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo);
}

/**
*
* Creates a new history snapshot for reading operations since
* the provided starting seqno (inclusive) and ending seqno (inclusive)
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source,
long startingSeqNo, long endSeqNo) throws IOException {
return getEngine().newChangesSnapshot(source.name(), mapperService, startingSeqNo, endSeqNo, true);
}
/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)}
Expand Down
23 changes: 18 additions & 5 deletions server/src/main/java/org/opensearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.MissingHistoryOperationsException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.AbstractIndexShardComponent;
import org.opensearch.index.shard.IndexShardComponent;
Expand Down Expand Up @@ -614,7 +615,11 @@ final Checkpoint getLastSyncedCheckpoint() {

// for testing
public Snapshot newSnapshot() throws IOException {
return newSnapshot(0, Long.MAX_VALUE);
return newSnapshot(0, Long.MAX_VALUE, false);
}

public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
return newSnapshot(fromSeqNo, toSeqNo, false);
}

/**
Expand All @@ -624,7 +629,7 @@ public Snapshot newSnapshot() throws IOException {
* @param toSeqNo the upper bound of the range (inclusive)
* @return the new snapshot
*/
public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
public Snapshot newSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo;
assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo;
try (ReleasableLock ignored = readLock.acquire()) {
Expand All @@ -633,7 +638,7 @@ public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
.filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo())
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
final Snapshot snapshot = newMultiSnapshot(snapshots);
return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo);
return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo, requiredFullRange);
}
}

Expand Down Expand Up @@ -959,14 +964,17 @@ default int skippedOperations() {
private static final class SeqNoFilterSnapshot implements Snapshot {
private final Snapshot delegate;
private int filteredOpsCount;
private int opsCount;
private boolean requiredFullRange;
private final long fromSeqNo; // inclusive
private final long toSeqNo; // inclusive

SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) {
SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]";
this.delegate = delegate;
this.fromSeqNo = fromSeqNo;
this.toSeqNo = toSeqNo;
this.requiredFullRange = requiredFullRange;
}

@Override
Expand All @@ -980,15 +988,20 @@ public int skippedOperations() {
}

@Override
public Operation next() throws IOException {
public Operation next() throws IOException, MissingHistoryOperationsException {
Translog.Operation op;
while ((op = delegate.next()) != null) {
if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) {
opsCount++;
return op;
} else {
filteredOpsCount++;
}
}
if(requiredFullRange && (toSeqNo - fromSeqNo +1) != opsCount) {
throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
"and to_seqno [" + toSeqNo + "] found");
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
import org.apache.lucene.util.Counter;
import org.opensearch.Assertions;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.seqno.SequenceNumbers;

import java.io.IOException;
Expand All @@ -43,10 +47,13 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class TranslogDeletionPolicy {

private final Map<Object, RuntimeException> openTranslogRef;
private Supplier<RetentionLeases> retentionLeasesSupplier;
private static long LEASE_DEFAULT_RETENTION_SIZE = 512;

public void assertNoOpenTranslogRefs() {
if (openTranslogRef.isEmpty() == false) {
Expand All @@ -69,6 +76,8 @@ public void assertNoOpenTranslogRefs() {

private int retentionTotalFiles;

private boolean shouldPruneTranslogByRetentionLease;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
Expand All @@ -80,6 +89,12 @@ public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMill
}
}

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles,
Supplier<RetentionLeases> retentionLeasesSupplier) {
this(retentionSizeInBytes, retentionAgeInMillis, retentionTotalFiles);
this.retentionLeasesSupplier = retentionLeasesSupplier;
}

public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
if (newCheckpoint < this.localCheckpointOfSafeCommit) {
throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " +
Expand All @@ -100,6 +115,11 @@ synchronized void setRetentionTotalFiles(int retentionTotalFiles) {
this.retentionTotalFiles = retentionTotalFiles;
}

public synchronized void shouldPruneTranslogByRetentionLease(boolean translogPruneByRetentionLease) {
this.shouldPruneTranslogByRetentionLease = translogPruneByRetentionLease;
this.retentionSizeInBytes = Math.max(retentionSizeInBytes, new ByteSizeValue(LEASE_DEFAULT_RETENTION_SIZE, ByteSizeUnit.MB).getBytes());
}

/**
* acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation
* will not be deleted until the returned {@link Releasable} is closed.
Expand Down Expand Up @@ -157,6 +177,12 @@ synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogW
long minByLocks = getMinTranslogGenRequiredByLocks();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
long minByRetentionLeasesAndSize = Long.MAX_VALUE;
if(shouldPruneTranslogByRetentionLease) {
// If retention size is specified, size takes precedence.
long minByRetentionLeases = getMinTranslogGenByRetentionLease(readers, writer, retentionLeasesSupplier);
minByRetentionLeasesAndSize = Math.max(minBySize, minByRetentionLeases);
}
final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled;
Expand All @@ -165,7 +191,28 @@ synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogW
minByAgeAndSize = Math.max(minByAge, minBySize);
}
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
long minByTranslogGenSettings = Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
return Math.min(minByTranslogGenSettings, minByRetentionLeasesAndSize);
}

static long getMinTranslogGenByRetentionLease(List<TranslogReader> readers, TranslogWriter writer,
Supplier<RetentionLeases> retentionLeasesSupplier) {
long minGen = writer.getGeneration();
final long minimumRetainingSequenceNumber = retentionLeasesSupplier.get()
.leases()
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
.orElse(Long.MAX_VALUE);

for (int i = readers.size() - 1; i >= 0; i--) {
final TranslogReader reader = readers.get(i);
if(reader.getCheckpoint().minSeqNo <= minimumRetainingSequenceNumber &&
reader.getCheckpoint().maxSeqNo >= minimumRetainingSequenceNumber) {
minGen = Math.min(minGen, reader.getGeneration());
}
}
return minGen;
}

static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,23 @@ && isTargetSameHistory()
// advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
// always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
// down.
startingSeqNo = softDeletesEnabled
? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L
: 0;
if(softDeletesEnabled) {
final long minimumRetainingSequenceNumber = shard.getRetentionLeases()
.leases()
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
.orElse(Long.MAX_VALUE);
final long safeCommitSeqNo = Long.parseLong(safeCommitRef.getIndexCommit()
.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L;
if(shard.indexSettings().shouldPruneTranslogByRetentionLease()) {
startingSeqNo = SequenceNumbers.min(safeCommitSeqNo, minimumRetainingSequenceNumber);
} else {
startingSeqNo = safeCommitSeqNo;
}
} else {
startingSeqNo = 0;
}
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

try {
Expand Down

0 comments on commit 49e7627

Please sign in to comment.