From 26cdf0ad736833106e5d819998b5396eaceb2809 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Fri, 13 Dec 2019 13:56:50 -0500
Subject: [PATCH] Migrate peer recovery from translog to retention lease (#49448)

Since 7.4, we have switched from the translog to Lucene as the source of
history for peer recoveries. However, this reduces the likelihood of
operation-based recoveries when performing a full cluster restart from
pre-7.4, because the existing copies do not have peer recovery retention
leases (PRRLs). To remedy this issue, we fall back to the translog in peer
recoveries if the recovering replica does not have a peer recovery retention
lease and the replication group has not fully migrated to PRRLs.

Relates #45136
---
 .../upgrades/FullClusterRestartIT.java        | 66 ++++++++++++++
 .../elasticsearch/upgrades/RecoveryIT.java    | 57 +++++++++++-
 .../elasticsearch/index/IndexSettings.java    | 24 +++--
 .../elasticsearch/index/engine/Engine.java    | 24 +++--
 .../index/engine/InternalEngine.java          | 88 +++++++++++--------
 .../index/engine/ReadOnlyEngine.java          | 13 +--
 .../index/seqno/ReplicationTracker.java       |  9 +-
 .../elasticsearch/index/shard/IndexShard.java | 70 ++++++++++++---
 .../index/shard/PrimaryReplicaSyncer.java     |  5 +-
 .../recovery/RecoverySourceHandler.java       | 37 ++++----
 .../indices/recovery/RecoveryTarget.java      |  4 +-
 .../index/IndexSettingsTests.java             |  2 +-
 .../index/engine/InternalEngineTests.java     | 30 ++++---
 .../IndexLevelReplicationTests.java           |  9 +-
 .../RetentionLeasesReplicationTests.java      | 22 +++++
 .../index/seqno/RetentionLeaseIT.java         |  7 +-
 .../shard/PrimaryReplicaSyncerTests.java      |  6 +-
 .../indices/recovery/RecoveryTests.java       |  2 +-
 .../test/rest/ESRestTestCase.java             | 75 ++++++++++++++++
 .../action/bulk/BulkShardOperationsTests.java |  3 +-
 20 files changed, 439 insertions(+), 114 deletions(-)

diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
index ec961534eb0af..2d367261f8895 100644
--- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
+++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
@@ -34,6 +34,7 @@
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.seqno.RetentionLeaseUtils;
 import org.elasticsearch.test.NotEqualMessageBuilder;
 import org.elasticsearch.test.rest.ESRestTestCase;
@@ -1168,6 +1169,12 @@ private void indexRandomDocuments(
         }
     }
 
+    private void indexDocument(String id) throws IOException {
+        final Request indexRequest = new Request("POST", "/" + index + "/" + "_doc/" + id);
+        indexRequest.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject()));
+        assertOK(client().performRequest(indexRequest));
+    }
+
     private int countOfIndexedRandomDocuments() throws IOException {
         return Integer.parseInt(loadInfoDocument(index + "_count"));
     }
@@ -1248,4 +1255,63 @@ public void testPeerRecoveryRetentionLeases() throws IOException {
             RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
         }
     }
+
+    /**
+     * Tests that, with or without soft-deletes, we perform an operation-based recovery if there were some,
+     * but not too many, uncommitted documents (i.e., fewer than 10% of the committed documents, or still
+     * covered by the retained translog) before we restart the cluster. This is important when we move from
+     * translog-based to retention-lease-based peer recoveries.
+     */
+    public void testOperationBasedRecovery() throws Exception {
+        if (isRunningAgainstOldCluster()) {
+            createIndex(index, Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
+                .build());
+            ensureGreen(index);
+            int committedDocs = randomIntBetween(100, 200);
+            for (int i = 0; i < committedDocs; i++) {
+                indexDocument(Integer.toString(i));
+                if (rarely()) {
+                    flush(index, randomBoolean());
+                }
+            }
+            flush(index, true);
+            ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
+            // fewer than 10% of the committed docs (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
+            int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1));
+            for (int i = 0; i < uncommittedDocs; i++) {
+                final String id = Integer.toString(randomIntBetween(1, 100));
+                indexDocument(id);
+            }
+        } else {
+            ensureGreen(index);
+            assertNoFileBasedRecovery(index, n -> true);
+        }
+    }
+
+    /**
+     * Verifies that once all shard copies are on the new version, we turn off translog retention for indices with soft-deletes.
+     */
+    public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
+        if (isRunningAgainstOldCluster()) {
+            createIndex(index, Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
+            ensureGreen(index);
+            int numDocs = randomIntBetween(10, 100);
+            for (int i = 0; i < numDocs; i++) {
+                indexDocument(Integer.toString(randomIntBetween(1, 100)));
+                if (rarely()) {
+                    flush(index, randomBoolean());
+                }
+            }
+        } else {
+            ensureGreen(index);
+            flush(index, true);
+            assertEmptyTranslog(index);
+        }
+    }
 }
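The 10% bound exercised by testOperationBasedRecovery above comes from IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING. The following is a minimal, self-contained sketch of that tradeoff in plain Java; RecoveryPlanner, its method, and its signature are illustrative stand-ins, not the Elasticsearch API:

    // Illustrative only: approximates the decision behind
    // IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING, where a recovery
    // replays operations when the number of missing ops is small relative to
    // the size of the safe commit, and copies files otherwise.
    final class RecoveryPlanner {

        enum Plan { OPERATION_BASED, FILE_BASED }

        static Plan plan(long missingOps, long docsInSafeCommit, double fileBasedThreshold) {
            // e.g. threshold = 0.1: replay ops only if fewer than 10% of the
            // committed documents would have to be replayed
            if (missingOps < fileBasedThreshold * docsInSafeCommit) {
                return Plan.OPERATION_BASED;
            }
            return Plan.FILE_BASED;
        }

        public static void main(String[] args) {
            System.out.println(plan(5, 150, 0.1));   // OPERATION_BASED
            System.out.println(plan(50, 150, 0.1));  // FILE_BASED
        }
    }

Keeping the threshold proportional to the safe-commit size means that large shards tolerate proportionally more uncommitted operations before a full file copy becomes cheaper than replaying history.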
diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
index 58bff342085ee..cd4a07aab3ec0 100644
--- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
+++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
@@ -487,10 +487,10 @@ public void testClosedIndexNoopRecovery() throws Exception {
         switch (CLUSTER_TYPE) {
             case OLD: break;
             case MIXED:
-                assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
+                assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
                 break;
             case UPGRADED:
-                assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME));
+                assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME));
                 break;
         }
     }
@@ -647,7 +647,7 @@ public void testUpdateDoc() throws Exception {
         }
     }
 
-    private void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
+    private void assertNoopRecoveries(String indexName, Predicate<String> targetNode) throws IOException {
         Map<String, Object> recoveries = entityAsMap(client()
             .performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
 
@@ -678,4 +678,55 @@ private void assertNoFileBasedRecovery(String indexName, Predicate targe
         assertTrue("must find replica", foundReplica);
     }
+
+    /**
+     * Tests that, with or without soft-deletes, we perform an operation-based recovery if there were some,
+     * but not too many, uncommitted documents (i.e., fewer than 10% of the committed documents, or still
+     * covered by the retained translog) before we upgrade each node. This is important when we move from
+     * translog-based to retention-lease-based peer recoveries.
+     */
+    public void testOperationBasedRecovery() throws Exception {
+        final String index = "test_operation_based_recovery";
+        if (CLUSTER_TYPE == ClusterType.OLD) {
+            createIndex(index, Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()).build());
+            ensureGreen(index);
+            indexDocs(index, 0, randomIntBetween(100, 200));
+            flush(index, randomBoolean());
+            ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
+            // uncommitted docs must stay under 10% of the committed docs (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
+            indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
+        } else {
+            ensureGreen(index);
+            assertNoFileBasedRecovery(index, nodeName ->
+                CLUSTER_TYPE == ClusterType.UPGRADED
+                    || nodeName.startsWith(CLUSTER_NAME + "-0")
+                    || (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false));
+            indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
+        }
+    }
+
+    /**
+     * Verifies that once all shard copies are on the new version, we turn off translog retention for indices with soft-deletes.
+     */
+    public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
+        final String index = "turn_off_translog_retention";
+        if (CLUSTER_TYPE == ClusterType.OLD) {
+            createIndex(index, Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 2))
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
+            ensureGreen(index);
+            indexDocs(index, 0, randomIntBetween(100, 200));
+            flush(index, randomBoolean());
+            indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 100));
+        }
+        if (CLUSTER_TYPE == ClusterType.UPGRADED) {
+            ensureGreen(index);
+            flush(index, true);
+            assertEmptyTranslog(index);
+        }
+    }
 }
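Both tests above drive the same server-side predicate that the next file introduces. Here is a sketch of IndexSettings#shouldDisableTranslogRetention with plain version numbers standing in for the real Version type (the actual method takes a Settings instance):

    // Illustrative sketch: translog retention settings are ignored only when
    // soft deletes are enabled AND the index was created on or after 7.4,
    // i.e. only when peer recoveries can rely on Lucene history plus
    // retention leases instead of retained translog.
    final class TranslogRetentionPredicate {
        static boolean shouldDisableTranslogRetention(boolean softDeletesEnabled, int createdMajor, int createdMinor) {
            boolean onOrAfter74 = createdMajor > 7 || (createdMajor == 7 && createdMinor >= 4);
            return softDeletesEnabled && onOrAfter74;
        }

        public static void main(String[] args) {
            System.out.println(shouldDisableTranslogRetention(true, 7, 4));  // true: retention settings ignored
            System.out.println(shouldDisableTranslogRetention(true, 7, 3));  // false: keep translog for ops-based recovery
            System.out.println(shouldDisableTranslogRetention(false, 7, 4)); // false: no soft deletes, translog is the history
        }
    }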
diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java
index a1994d65c8d73..99076f812b96e 100644
--- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java
+++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -245,21 +245,22 @@ public final class IndexSettings {
      * Controls how long translog files that are no longer needed for persistence reasons
      * will be kept around before being deleted. Keeping more files is useful to increase
      * the chance of ops based recoveries for indices with soft-deletes disabled.
-     * This setting will be ignored if soft-deletes is enabled.
+     * This setting will be ignored if soft-deletes are used in peer recoveries (the default on indices created on or after 7.4).
      **/
     public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
         Setting.timeSetting("index.translog.retention.age",
-            settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12), TimeValue.MINUS_ONE,
-            Property.Dynamic, Property.IndexScope);
+            settings -> shouldDisableTranslogRetention(settings) ?
TimeValue.MINUS_ONE : TimeValue.timeValueHours(12), + TimeValue.MINUS_ONE, Property.Dynamic, Property.IndexScope); /** * Controls how many translog files that are no longer needed for persistence reasons * will be kept around before being deleted. Keeping more files is useful to increase * the chance of ops based recoveries for indices with soft-deletes disabled. - * This setting will be ignored if soft-deletes is enabled. + * This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4). **/ public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = - Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB", + Setting.byteSizeSetting("index.translog.retention.size", + settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB", Property.Dynamic, Property.IndexScope); /** @@ -577,7 +578,7 @@ private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) { } private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { - if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) { + if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) { // ignore the translog retention settings if soft-deletes enabled this.translogRetentionSize = new ByteSizeValue(-1); } else { @@ -586,7 +587,7 @@ private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { } private void setTranslogRetentionAge(TimeValue age) { - if (softDeleteEnabled && age.millis() >= 0) { + if (shouldDisableTranslogRetention(settings) && age.millis() >= 0) { // ignore the translog retention settings if soft-deletes enabled this.translogRetentionAge = TimeValue.MINUS_ONE; } else { @@ -774,7 +775,7 @@ public TimeValue getRefreshInterval() { * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries */ public ByteSizeValue getTranslogRetentionSize() { - assert softDeleteEnabled == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize; + assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize; return translogRetentionSize; } @@ -783,7 +784,7 @@ public ByteSizeValue getTranslogRetentionSize() { * around */ public TimeValue getTranslogRetentionAge() { - assert softDeleteEnabled == false || translogRetentionAge.millis() == -1L : translogRetentionSize; + assert shouldDisableTranslogRetention(settings) == false || translogRetentionAge.millis() == -1L : translogRetentionSize; return translogRetentionAge; } @@ -795,6 +796,11 @@ public int getTranslogRetentionTotalFiles() { return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings()); } + private static boolean shouldDisableTranslogRetention(Settings settings) { + return INDEX_SOFT_DELETES_SETTING.get(settings) + && IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(Version.V_7_4_0); + } + /** * Returns the generation threshold size. 
As sequence numbers can cause multiple generations to * be preserved for rollback purposes, we want to keep the size of individual generations from diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 75ef4e0d12d77..26ecd4b29f39d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -66,6 +66,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.VersionType; @@ -729,7 +730,7 @@ public enum SearcherScope { /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public abstract Closeable acquireRetentionLock(); + public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource); /** * Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive). @@ -742,19 +743,20 @@ public abstract Translog.Snapshot newChangesSnapshot(String source, MapperServic * Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive). * The returned snapshot can be retrieved from either Lucene index or translog files. */ - public abstract Translog.Snapshot readHistoryOperations(String source, - MapperService mapperService, long startingSeqNo) throws IOException; + public abstract Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource, + MapperService mapperService, long startingSeqNo) throws IOException; /** * Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine. */ - public abstract int estimateNumberOfHistoryOperations(String source, - MapperService mapperService, long startingSeqNo) throws IOException; + public abstract int estimateNumberOfHistoryOperations(String reason, HistorySource historySource, + MapperService mapperService, long startingSeqNo) throws IOException; /** * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog) */ - public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException; + public abstract boolean hasCompleteOperationHistory(String reason, HistorySource historySource, + MapperService mapperService, long startingSeqNo) throws IOException; /** * Gets the minimum retained sequence number for this engine. @@ -1795,7 +1797,8 @@ public IndexCommit getIndexCommit() { } } - public void onSettingsChanged() { + public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { + } /** @@ -1929,4 +1932,11 @@ public interface TranslogRecoveryRunner { * to advance this marker to at least the given sequence number. 
*/ public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary); + + /** + * Whether we should read history operations from translog or Lucene index + */ + public enum HistorySource { + TRANSLOG, INDEX + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index b60c60b89b119..1e445c0b0ac85 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -68,6 +68,8 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -532,27 +534,31 @@ public void syncTranslog() throws IOException { * The returned snapshot can be retrieved from either Lucene index or translog files. */ @Override - public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { - if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { - return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); + public Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource, + MapperService mapperService, long startingSeqNo) throws IOException { + if (historySource == HistorySource.INDEX) { + ensureSoftDeletesEnabled(); + return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); + } else { + return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); } - - return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); } /** * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine. 
*/ @Override - public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { - if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { - try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), + public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource, + MapperService mapperService, long startingSeqNo) throws IOException { + if (historySource == HistorySource.INDEX) { + ensureSoftDeletesEnabled(); + try (Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false)) { return snapshot.totalOperations(); } + } else { + return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo); } - - return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo); } @Override @@ -2479,15 +2485,15 @@ final void ensureCanFlush() { } } - public void onSettingsChanged() { + @Override + public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { mergeScheduler.refreshConfig(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: maybePruneDeletes(); final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); - final IndexSettings indexSettings = engineConfig.getIndexSettings(); - translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); - translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes()); - softDeletesPolicy.setRetentionOperations(indexSettings.getSoftDeleteRetentionOperations()); + translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); + translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); + softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps); } public MergeStats getMergeStats() { @@ -2594,12 +2600,17 @@ long getNumDocUpdates() { return numDocUpdates.count(); } + private void ensureSoftDeletesEnabled() { + if (softDeleteEnabled == false) { + assert false : "index " + shardId.getIndex() + " does not have soft-deletes enabled"; + throw new IllegalStateException("index " + shardId.getIndex() + " does not have soft-deletes enabled"); + } + } + @Override public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { - if (softDeleteEnabled == false) { - throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled"); - } + ensureSoftDeletesEnabled(); ensureOpen(); refreshIfNeeded(source, toSeqNo); Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); @@ -2621,26 +2632,28 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS } @Override - public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { - if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + public boolean hasCompleteOperationHistory(String reason, HistorySource historySource, + MapperService mapperService, long startingSeqNo) throws IOException { + if (historySource == HistorySource.INDEX) { + ensureSoftDeletesEnabled(); return getMinRetainedSeqNo() <= startingSeqNo; - } - - final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - // avoid scanning translog if not necessary 
- if (startingSeqNo > currentLocalCheckpoint) { - return true; - } - final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - tracker.markSeqNoAsProcessed(operation.seqNo()); + } else { + final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); + // avoid scanning translog if not necessary + if (startingSeqNo > currentLocalCheckpoint) { + return true; + } + final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); + try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + tracker.markSeqNoAsProcessed(operation.seqNo()); + } } } + return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint; } - return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint; } /** @@ -2648,13 +2661,14 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe * Operations whose seq# are at least this value should exist in the Lucene index. */ public final long getMinRetainedSeqNo() { - assert softDeleteEnabled : Thread.currentThread().getName(); + ensureSoftDeletesEnabled(); return softDeletesPolicy.getMinRetainedSeqNo(); } @Override - public Closeable acquireRetentionLock() { - if (softDeleteEnabled) { + public Closeable acquireHistoryRetentionLock(HistorySource historySource) { + if (historySource == HistorySource.INDEX) { + ensureSoftDeletesEnabled(); return softDeletesPolicy.acquireRetentionLock(); } else { return translog.acquireRetentionLock(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index df3c8e275f3c7..768655cf1c852 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -307,7 +307,7 @@ public void syncTranslog() { } @Override - public Closeable acquireRetentionLock() { + public Closeable acquireHistoryRetentionLock(HistorySource historySource) { return () -> {}; } @@ -317,21 +317,24 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS if (engineConfig.getIndexSettings().isSoftDeleteEnabled() == false) { throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled"); } - return readHistoryOperations(source, mapperService, fromSeqNo); + return newEmptySnapshot(); } @Override - public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + public Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource, + MapperService mapperService, long startingSeqNo) { return newEmptySnapshot(); } @Override - public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource, + MapperService mapperService, long startingSeqNo) { return 0; } @Override - public boolean hasCompleteOperationHistory(String source, MapperService 
mapperService, long startingSeqNo) throws IOException { + public boolean hasCompleteOperationHistory(String reason, HistorySource historySource, + MapperService mapperService, long startingSeqNo) { // we can do operation-based recovery if we don't have to replay any operation. return startingSeqNo > seqNoStats.getMaxSeqNo(); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 1c8599f66cf31..14e78973504bc 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -895,9 +895,10 @@ public ReplicationTracker( this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; - this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) || + this.hasAllPeerRecoveryRetentionLeases = indexSettings.isSoftDeleteEnabled() && + (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) || (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && - indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN); + indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN)); this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; @@ -1348,6 +1349,10 @@ private synchronized void setHasAllPeerRecoveryRetentionLeases() { assert invariant(); } + public synchronized boolean hasAllPeerRecoveryRetentionLeases() { + return hasAllPeerRecoveryRetentionLeases; + } + /** * Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases. 
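With hasAllPeerRecoveryRetentionLeases exposed by the ReplicationTracker above, the recovery source handler below can decide per recovery which history to read. A condensed sketch of that decision; HistorySourceChooser is a hypothetical helper, while the enum mirrors Engine.HistorySource added in this change:

    // Hypothetical stand-in for illustration only; the real logic lives in
    // RecoverySourceHandler#recoverToTarget.
    enum HistorySource { TRANSLOG, INDEX }

    final class HistorySourceChooser {
        /**
         * Read history from Lucene (INDEX) only if the whole replication group
         * has migrated to peer recovery retention leases, or the recovering
         * replica already holds one; otherwise fall back to the translog.
         */
        static HistorySource choose(boolean groupHasAllPeerRecoveryRetentionLeases, boolean targetHasRetentionLease) {
            if (groupHasAllPeerRecoveryRetentionLeases || targetHasRetentionLease) {
                return HistorySource.INDEX;
            }
            return HistorySource.TRANSLOG;
        }

        public static void main(String[] args) {
            // full cluster restart from pre-7.4: no leases anywhere yet -> translog
            System.out.println(choose(false, false)); // TRANSLOG
            // replica recovering after the group migrated -> Lucene history
            System.out.println(choose(true, false));  // INDEX
        }
    }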
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8185d8fad5f16..059961e1e5897 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -67,6 +67,7 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -157,6 +158,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -268,6 +270,7 @@ Runnable getGlobalCheckpointSyncer() { private final AtomicLong lastSearcherAccess = new AtomicLong(); private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); + private volatile boolean useRetentionLeasesInPeerRecovery; public IndexShard( final ShardRouting shardRouting, @@ -364,6 +367,7 @@ public boolean shouldCache(Query query) { refreshListeners = buildRefreshListeners(); lastSearcherAccess.set(threadPool.relativeTimeInMillis()); persistMetadata(path, indexSettings, shardRouting, null, logger); + this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); } public ThreadPool getThreadPool() { @@ -600,6 +604,17 @@ public void onFailure(Exception e) { if (newRouting.equals(currentRouting) == false) { indexEventListener.shardRoutingChanged(this, currentRouting, newRouting); } + + if (indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery == false) { + final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); + final Set shardRoutings = new HashSet<>(routingTable.getShards()); + shardRoutings.addAll(routingTable.assignedShards()); // include relocation targets + if (shardRoutings.stream().allMatch( + shr -> shr.assignedToNode() && retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shr)))) { + useRetentionLeasesInPeerRecovery = true; + turnOffTranslogRetention(); + } + } } /** @@ -1877,38 +1892,63 @@ boolean shouldRollTranslogGeneration() { public void onSettingsChanged() { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null) { - engineOrNull.onSettingsChanged(); + final boolean useRetentionLeasesInPeerRecovery = this.useRetentionLeasesInPeerRecovery; + engineOrNull.onSettingsChanged( + useRetentionLeasesInPeerRecovery ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), + useRetentionLeasesInPeerRecovery ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations() + ); } } + private void turnOffTranslogRetention() { + logger.debug("turn off the translog retention for the replication group {} " + + "as it starts using retention leases exclusively in peer recoveries", shardId); + // Off to the generic threadPool as pruning the delete tombstones can be expensive. 
+ threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + if (state != IndexShardState.CLOSED) { + logger.warn("failed to turn off translog retention", e); + } + } + + @Override + protected void doRun() { + onSettingsChanged(); + trimTranslog(); + } + }); + } + /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public Closeable acquireRetentionLock() { - return getEngine().acquireRetentionLock(); + public Closeable acquireHistoryRetentionLock(Engine.HistorySource source) { + return getEngine().acquireHistoryRetentionLock(source); } /** * Returns the estimated number of history operations whose seq# at least the provided seq# in this shard. */ - public int estimateNumberOfHistoryOperations(String source, long startingSeqNo) throws IOException { - return getEngine().estimateNumberOfHistoryOperations(source, mapperService, startingSeqNo); + public int estimateNumberOfHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { + return getEngine().estimateNumberOfHistoryOperations(reason, source, mapperService, startingSeqNo); } /** * Creates a new history snapshot for reading operations since the provided starting seqno (inclusive). * The returned snapshot can be retrieved from either Lucene index or translog files. */ - public Translog.Snapshot getHistoryOperations(String source, long startingSeqNo) throws IOException { - return getEngine().readHistoryOperations(source, mapperService, startingSeqNo); + public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { + return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo); } /** * Checks if we have a completed history of operations since the given starting seqno (inclusive). - * This method should be called after acquiring the retention lock; See {@link #acquireRetentionLock()} + * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)} */ - public boolean hasCompleteHistoryOperations(String source, long startingSeqNo) throws IOException { - return getEngine().hasCompleteOperationHistory(source, mapperService, startingSeqNo); + public boolean hasCompleteHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { + return getEngine().hasCompleteOperationHistory(reason, source, mapperService, startingSeqNo); } /** @@ -2097,9 +2137,9 @@ public RetentionLease addRetentionLease( assert assertPrimaryMode(); verifyNotClosed(); ensureSoftDeletesEnabled("retention leases"); - try (Closeable ignore = acquireRetentionLock()) { + try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { final long actualRetainingSequenceNumber = - retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; + retainingSequenceNumber == RETAIN_ALL ? 
getMinRetainedSeqNo() : retainingSequenceNumber; return replicationTracker.addRetentionLease(id, actualRetainingSequenceNumber, source, listener); } catch (final IOException e) { throw new AssertionError(e); @@ -2119,7 +2159,7 @@ public RetentionLease renewRetentionLease(final String id, final long retainingS assert assertPrimaryMode(); verifyNotClosed(); ensureSoftDeletesEnabled("retention leases"); - try (Closeable ignore = acquireRetentionLock()) { + try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { final long actualRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; return replicationTracker.renewRetentionLease(id, actualRetainingSequenceNumber, source); @@ -2600,6 +2640,10 @@ public List getPeerRecoveryRetentionLeases() { return replicationTracker.getPeerRecoveryRetentionLeases(); } + public boolean useRetentionLeasesInPeerRecovery() { + return useRetentionLeasesInPeerRecovery; + } + private SafeCommitInfo getSafeCommitInfo() { final Engine engine = getEngineOrNull(); return engine == null ? SafeCommitInfo.EMPTY : engine.getSafeCommitInfo(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index f4cd1cdb8115e..710c78a2d294e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.tasks.Task; @@ -90,7 +91,9 @@ public void resync(final IndexShard indexShard, final ActionListener // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - snapshot = indexShard.getHistoryOperations("resync", startingSeqNo); + snapshot = indexShard.getHistoryOperations("resync", + indexShard.indexSettings.isSoftDeleteEnabled() ? 
Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, + startingSeqNo); final Translog.Snapshot originalSnapshot = snapshot; final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index c372cc4571a7c..285edc329be06 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -169,22 +169,28 @@ public void recoverToTarget(ActionListener listener) { ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); - final Closeable retentionLock = shard.acquireRetentionLock(); + final Engine.HistorySource historySource; + if (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null) { + historySource = Engine.HistorySource.INDEX; + } else { + historySource = Engine.HistorySource.TRANSLOG; + } + final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource); resources.add(retentionLock); final long startingSeqNo; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() - && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) - && (softDeletesEnabled == false - || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); + && shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo()) + && (historySource == Engine.HistorySource.TRANSLOG || + (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold. // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // without having a complete history. - if (isSequenceNumberBasedRecovery && softDeletesEnabled) { + if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); @@ -203,7 +209,11 @@ && isTargetSameHistory() if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. 
starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); - sendFileStep.onResponse(SendFileResult.EMPTY); + if (softDeletesEnabled && retentionLeaseRef.get() == null) { + createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY)); + } else { + sendFileStep.onResponse(SendFileResult.EMPTY); + } } else { final Engine.IndexCommitRef safeCommitRef; try { @@ -229,7 +239,7 @@ && isTargetSameHistory() logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); try { - final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); + final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo); final Releasable releaseStore = acquireStore(shard.store()); resources.add(releaseStore); sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> { @@ -282,7 +292,8 @@ && isTargetSameHistory() sendFileStep.whenComplete(r -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep); + prepareTargetForTranslog( + shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo), prepareEngineStep); }, onFailure); prepareEngineStep.whenComplete(prepareEngineTime -> { @@ -298,14 +309,10 @@ && isTargetSameHistory() final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); logger.trace("snapshot translog for recovery; current size is [{}]", - shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); - final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); + shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo)); + final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo); resources.add(phase2Snapshot); - - if (softDeletesEnabled == false || isSequenceNumberBasedRecovery == false) { - // we can release the retention lock here because the snapshot itself will retain the required operations. - retentionLock.close(); - } + retentionLock.close(); // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values // are at least as high as the corresponding values on the primary when any of these operations were executed on it. diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 7e0e5f7cf17c8..04d347d0006b1 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -316,7 +316,9 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe private boolean hasUncommittedOperations() throws IOException { long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - return indexShard.estimateNumberOfHistoryOperations("peer-recovery", localCheckpointOfCommit + 1) > 0; + return indexShard.estimateNumberOfHistoryOperations("peer-recovery", + indexShard.indexSettings().isSoftDeleteEnabled() ? 
Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, + localCheckpointOfCommit + 1) > 0; } @Override diff --git a/server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java b/server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java index e64e7f42a9a2e..1a19647ea7f78 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java @@ -549,7 +549,7 @@ public void testSoftDeletesDefaultSetting() { public void testIgnoreTranslogRetentionSettingsIfSoftDeletesEnabled() { Settings.Builder settings = Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomIndexCompatibleVersion(random())); + .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_7_4_0, Version.CURRENT)); if (randomBoolean()) { settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue()); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index dc37022c130fe..c38f426dbfb59 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -376,7 +376,8 @@ public void testSegmentsWithoutSoftDeletes() throws Exception { assertThat(segments.get(1).getDeletedDocs(), equalTo(0)); assertThat(segments.get(1).isCompound(), equalTo(true)); - engine.onSettingsChanged(); + engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations()); ParsedDocument doc4 = testParsedDocument("4", null, testDocumentWithTextField(), B_3, null); engine.index(indexForDoc(doc4)); engine.refresh("test"); @@ -1623,7 +1624,8 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { } settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); - engine.onSettingsChanged(); + engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations()); globalCheckpoint.set(localCheckpoint); engine.syncTranslog(); @@ -1714,7 +1716,8 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc } settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); - engine.onSettingsChanged(); + engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations()); // If we already merged down to 1 segment, then the next force-merge will be a noop. We need to add an extra segment to make // merges happen so we can verify that _recovery_source are pruned. See: https://github.com/elastic/elasticsearch/issues/41628. 
final int numSegments; @@ -5040,7 +5043,8 @@ public void testShouldPeriodicallyFlush() throws Exception { .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); - engine.onSettingsChanged(); + engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations()); assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); @@ -5088,7 +5092,8 @@ public void testShouldPeriodicallyFlushAfterMerge() throws Exception { .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING.getKey(), "0b")).build(); indexSettings.updateIndexMetaData(indexMetaData); - engine.onSettingsChanged(); + engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations()); assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null); @@ -5113,7 +5118,8 @@ public void testStressShouldPeriodicallyFlush() throws Exception { .put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b") .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); - engine.onSettingsChanged(); + engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations()); final int numOps = scaledRandomIntBetween(100, 10_000); for (int i = 0; i < numOps; i++) { final long localCheckPoint = engine.getProcessedLocalCheckpoint(); @@ -5141,7 +5147,8 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(1))).build(); engine.engineConfig.getIndexSettings().updateIndexMetaData(indexMetaData); - engine.onSettingsChanged(); + engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations()); ParsedDocument document = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null); final Engine.Index doc = new Engine.Index(newUid(document), document, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), @@ -5411,7 +5418,8 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (rarely()) { settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10)); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); - engine.onSettingsChanged(); + engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations()); } if (rarely()) { engine.refresh("test"); @@ -5424,7 +5432,7 @@ public void 
testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (rarely()) { engine.forceMerge(randomBoolean()); } - try (Closeable ignored = engine.acquireRetentionLock()) { + try (Closeable ignored = engine.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { long minRetainSeqNos = engine.getMinRetainedSeqNo(); assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1)); Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new); @@ -5705,9 +5713,9 @@ public void testRequireSoftDeletesWhenAccessingChangesSnapshot() throws Exceptio IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder(). put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)).build()); try (InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) { - IllegalStateException error = expectThrows(IllegalStateException.class, + AssertionError error = expectThrows(AssertionError.class, () -> engine.newChangesSnapshot("test", createMapperService(), 0, randomNonNegativeLong(), randomBoolean())); - assertThat(error.getMessage(), equalTo("accessing changes snapshot requires soft-deletes enabled")); + assertThat(error.getMessage(), containsString("does not have soft-deletes enabled")); } } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 4882b9262d98c..f565459b26eba 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -470,7 +470,8 @@ public long addDocument(Iterable doc) throws IOExcepti assertThat(snapshot.totalOperations(), equalTo(0)); } } - try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) { + try (Translog.Snapshot snapshot = shard.getHistoryOperations( + "test", shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } @@ -488,7 +489,8 @@ public long addDocument(Iterable doc) throws IOExcepti assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2))); } } - try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) { + try (Translog.Snapshot snapshot = shard.getHistoryOperations( + "test", shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } @@ -585,7 +587,8 @@ public void testSeqNoCollision() throws Exception { shards.promoteReplicaToPrimary(replica2).get(); logger.info("--> Recover replica3 from replica2"); recoverReplica(replica3, replica2, true); - try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) { + try (Translog.Snapshot snapshot = replica3.getHistoryOperations( + "test", replica3.indexSettings().isSoftDeleteEnabled() ? 
Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); final List expectedOps = new ArrayList<>(initOperations); expectedOps.add(op2); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java index 75de0bb677296..7ca6c1ef1bf7f 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.VersionUtils; import java.util.ArrayList; import java.util.List; @@ -147,6 +148,27 @@ protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, Acti } } + public void testTurnOffTranslogRetentionAfterAllShardStarted() throws Exception { + final Settings.Builder settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + if (randomBoolean()) { + settings.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomIndexCompatibleVersion(random())); + } + try (ReplicationGroup group = createGroup(between(1, 2), settings.build())) { + group.startAll(); + group.indexDocs(randomIntBetween(1, 10)); + for (IndexShard shard : group) { + shard.updateShardState(shard.routingEntry(), shard.getOperationPrimaryTerm(), null, 1L, + group.getPrimary().getReplicationGroup().getInSyncAllocationIds(), + group.getPrimary().getReplicationGroup().getRoutingTable()); + } + group.syncGlobalCheckpoint(); + group.flush(); + for (IndexShard shard : group) { + assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(0)); + } + } + } + static final class SyncRetentionLeasesResponse extends ReplicationResponse { final RetentionLeaseSyncAction.Request syncRequest; SyncRetentionLeasesResponse(RetentionLeaseSyncAction.Request syncRequest) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index a52f816ff6df2..831422f8dad86 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -110,7 +111,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = countDownLatchListener(latch); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {}; + final Closeable retentionLock = randomBoolean() ? 
primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); @@ -160,7 +161,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = countDownLatchListener(latch); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); @@ -171,7 +172,7 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final CountDownLatch latch = new CountDownLatch(1); primary.removeRetentionLease(id, countDownLatchListener(latch)); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; currentRetentionLeases.remove(id); latch.await(); retentionLock.close(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index d6cd94cd07aaa..3099795b138d9 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.TestTranslog; @@ -63,6 +64,7 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -210,7 +212,9 @@ public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception { operations.add(new Translog.Index( Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1})); } - doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong()); + Engine.HistorySource source = + shard.indexSettings.isSoftDeleteEnabled() ? 
diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
index d6cd94cd07aaa..3099795b138d9 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
@@ -40,6 +40,7 @@
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.TestTranslog;
@@ -63,6 +64,7 @@
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -210,7 +212,9 @@ public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception {
             operations.add(new Translog.Index(
                 Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1}));
         }
-        doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong());
+        Engine.HistorySource source =
+            shard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG;
+        doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), eq(source), anyLong());
         TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
         List<Translog.Operation> sentOperations = new ArrayList<>();
         PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {
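A note on the eq(source) wrapper introduced in the stubbing above: Mockito treats argument matchers as all-or-nothing per stubbing, so once anyString() and anyLong() are in play, the concrete HistorySource cannot be passed raw:

    // With any matcher present, every argument must be a matcher; the fixed
    // value is therefore wrapped in eq(source), otherwise the stubbing fails
    // with InvalidUseOfMatchersException.
    doReturn(TestTranslog.newSnapshotFromOperations(operations))
        .when(shard).getHistoryOperations(anyString(), eq(source), anyLong());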
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 35f431e84eb9b..05aebcc459a6e 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -245,7 +245,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
         IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
         shards.recoverReplica(newReplica);
         shards.assertAllEqual(3);
-        try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", 0)) {
+        try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", Engine.HistorySource.INDEX, 0)) {
             assertThat(snapshot, SnapshotMatchers.size(6));
         }
     }
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
index 965e4ae37fa97..1eb52ac6de769 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
@@ -55,6 +55,7 @@
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -1062,4 +1063,78 @@ private static boolean isXPackTemplate(String name) {
             return false;
         }
     }
+
+    public void flush(String index, boolean force) throws IOException {
+        logger.info("flushing index {} force={}", index, force);
+        final Request flushRequest = new Request("POST", "/" + index + "/_flush");
+        flushRequest.addParameter("force", Boolean.toString(force));
+        flushRequest.addParameter("wait_if_ongoing", "true");
+        assertOK(client().performRequest(flushRequest));
+    }
+
+    /**
+     * Asserts that replicas on nodes satisfying the {@code targetNode} predicate have performed operation-based recoveries.
+     */
+    public void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
+        Map<String, Object> recoveries = entityAsMap(client().performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
+        @SuppressWarnings("unchecked")
+        List<Map<String, Object>> shards = (List<Map<String, Object>>) XContentMapValues.extractValue(indexName + ".shards", recoveries);
+        assertNotNull(shards);
+        boolean foundReplica = false;
+        logger.info("index {} recovery stats {}", indexName, shards);
+        for (Map<String, Object> shard : shards) {
+            if (shard.get("primary") == Boolean.FALSE && targetNode.test((String) XContentMapValues.extractValue("target.name", shard))) {
+                List<?> details = (List<?>) XContentMapValues.extractValue("index.files.details", shard);
+                // once detailed recoveries work, remove this if.
+                if (details == null) {
+                    long totalFiles = ((Number) XContentMapValues.extractValue("index.files.total", shard)).longValue();
+                    long reusedFiles = ((Number) XContentMapValues.extractValue("index.files.reused", shard)).longValue();
+                    logger.info("total [{}] reused [{}]", totalFiles, reusedFiles);
+                    assertThat("must reuse all files, recoveries [" + recoveries + "]", totalFiles, equalTo(reusedFiles));
+                } else {
+                    assertNotNull(details);
+                    assertThat(details, Matchers.empty());
+                }
+                foundReplica = true;
+            }
+        }
+        assertTrue("must find replica", foundReplica);
+    }
+
+    /**
+     * Asserts that we do not retain any extra translog for the given index (i.e., that translog retention is turned off).
+     */
+    public void assertEmptyTranslog(String index) throws Exception {
+        Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
+        assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.uncommitted_operations", stats), equalTo(0));
+        assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.operations", stats), equalTo(0));
+    }
+
+    /**
+     * Peer recovery retention leases are renewed and synced to replicas periodically (every 30 seconds). This ensures
+     * that we have renewed every PRRL to the global checkpoint of the corresponding copy and properly synced to all copies.
+     */
+    public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) throws Exception {
+        assertBusy(() -> {
+            Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
+            @SuppressWarnings("unchecked") Map<String, List<Map<String, ?>>> shards =
+                (Map<String, List<Map<String, ?>>>) XContentMapValues.extractValue("indices." + index + ".shards", stats);
+            for (List<Map<String, ?>> shard : shards.values()) {
+                for (Map<String, ?> copy : shard) {
+                    Integer globalCheckpoint = (Integer) XContentMapValues.extractValue("seq_no.global_checkpoint", copy);
+                    assertNotNull(globalCheckpoint);
+                    @SuppressWarnings("unchecked") List<Map<String, ?>> retentionLeases =
+                        (List<Map<String, ?>>) XContentMapValues.extractValue("retention_leases.leases", copy);
+                    if (retentionLeases == null) {
+                        continue;
+                    }
+                    for (Map<String, ?> retentionLease : retentionLeases) {
+                        if (((String) retentionLease.get("id")).startsWith("peer_recovery/")) {
+                            assertThat(retentionLease.get("retaining_seq_no"), equalTo(globalCheckpoint + 1));
+                        }
+                    }
+                }
+            }
+        }, 60, TimeUnit.SECONDS);
+    }
 }
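The helper above matches leases by the "peer_recovery/" id prefix. As a hedged sketch of where that convention comes from, assuming ReplicationTracker's static id helper and `nodeId`/`globalCheckpoint` as placeholders (not code from this patch):

    // Peer recovery retention lease ids are derived from the node holding the
    // copy; once the periodic (30s) renewal has run, each such lease retains
    // from exactly the copy's global checkpoint + 1.
    String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeId); // "peer_recovery/" + nodeId
    long expectedRetainingSeqNo = globalCheckpoint + 1;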
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java
index b14403406dd66..2510a177056d3 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java
@@ -14,6 +14,7 @@
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -75,7 +76,7 @@ public void testPrimaryTermFromFollower() throws IOException {
             operations, numOps - 1, followerPrimary, logger);
-        try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) {
+        try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", Engine.HistorySource.INDEX, 0)) {
             assertThat(snapshot.totalOperations(), equalTo(operations.size()));
             Translog.Operation operation;
             while ((operation = snapshot.next()) != null) {