From 9bc332a080034deb09147bd203a3a73b28273e9d Mon Sep 17 00:00:00 2001
From: David Turner
Date: Fri, 15 Mar 2019 07:36:05 +0000
Subject: [PATCH] Create retention leases file during recovery (#39359)

Today we load the shard history retention leases from disk whenever opening the
engine, and treat a missing file as an empty set of leases. However in some
cases this is inappropriate: we might be restoring from a snapshot (if the
target index already exists then there may be leases on disk) or
force-allocating a stale primary, and in neither case does it make sense to
restore the retention leases from disk.

With this change we write an empty retention leases file during recovery,
except for the following cases:

- During peer recovery the on-disk leases may be accurate and could be needed
  if the recovery target is made into a primary.

- During recovery from an existing store, as long as we are not
  force-allocating a stale primary.

Relates #37165
---
 .../cluster/routing/RecoverySource.java       | 14 +++++
 .../index/seqno/ReplicationTracker.java       |  8 +++
 .../elasticsearch/index/shard/IndexShard.java |  7 +++
 .../index/shard/StoreRecovery.java            | 10 +++
 .../indices/recovery/RecoveryTarget.java      |  9 +++
 .../index/seqno/RetentionLeaseIT.java         | 28 +++++++++
 .../shard/IndexShardRetentionLeaseTests.java  | 14 +++++
 .../ESBlobStoreRepositoryIntegTestCase.java   | 61 ++++++++++++++++++-
 8 files changed, 150 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
index 94cd536f63805..90ae4ec3b2f8d 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
@@ -96,6 +96,10 @@ public boolean shouldBootstrapNewHistoryUUID() {
         return false;
     }
 
+    public boolean expectEmptyRetentionLeases() {
+        return true;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -174,6 +178,11 @@ public Type getType() {
         public String toString() {
             return "existing store recovery; bootstrap_history_uuid=" + bootstrapNewHistoryUUID;
         }
+
+        @Override
+        public boolean expectEmptyRetentionLeases() {
+            return bootstrapNewHistoryUUID;
+        }
     }
 
     /**
      *
@@ -304,5 +313,10 @@ public Type getType() {
         public String toString() {
             return "peer recovery";
         }
+
+        @Override
+        public boolean expectEmptyRetentionLeases() {
+            return false;
+        }
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
index e238e86fa7224..702b829193bc2 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -329,6 +329,9 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
      */
    public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
        final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
+
+        // TODO after backporting we expect this never to happen in 8.x, so adjust this to throw an exception instead.
+        assert Version.CURRENT.major <= 8 : "throw an exception instead of returning EMPTY on null";
         if (retentionLeases == null) {
             return RetentionLeases.EMPTY;
         }
@@ -354,6 +357,11 @@ public void persistRetentionLeases(final Path path) throws WriteStateException {
         }
     }
 
+    public boolean assertRetentionLeasesPersisted(final Path path) throws IOException {
+        assert RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path) != null;
+        return true;
+    }
+
     public static class CheckpointState implements Writeable {
 
         /**
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 6476bbbb3ab48..99dab5a0f3918 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1434,6 +1434,9 @@ private void innerOpenEngineAndTranslog() throws IOException {
         final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
         replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
         updateRetentionLeasesOnReplica(loadRetentionLeases());
+        assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
+            : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+            + "] but got " + getRetentionLeases();
         trimUnsafeCommits();
         synchronized (mutex) {
             verifyNotClosed();
@@ -2029,6 +2032,10 @@ public void persistRetentionLeases() throws WriteStateException {
         replicationTracker.persistRetentionLeases(path.getShardStatePath());
     }
 
+    public boolean assertRetentionLeasesPersisted() throws IOException {
+        return replicationTracker.assertRetentionLeasesPersisted(path.getShardStatePath());
+    }
+
     /**
      * Syncs the current retention leases to all replicas.
      */
diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index d15de54c54e99..c97c19eb0f3ec 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -401,9 +401,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
                 final String translogUUID = Translog.createEmptyTranslog(
                     indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
                 store.associateIndexWithNewTranslog(translogUUID);
+                writeEmptyRetentionLeasesFile(indexShard);
             } else if (indexShouldExists) {
                 if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
                     store.bootstrapNewHistory();
+                    writeEmptyRetentionLeasesFile(indexShard);
                 }
                 // since we recover from local, just fill the files and size
                 try {
@@ -420,6 +422,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
                     indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId, indexShard.getPendingPrimaryTerm());
                 store.associateIndexWithNewTranslog(translogUUID);
+                writeEmptyRetentionLeasesFile(indexShard);
             }
             indexShard.openEngineAndRecoverFromTranslog();
             indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
@@ -432,6 +435,12 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
         }
     }
 
+    private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {
+        assert indexShard.getRetentionLeases().leases().isEmpty() : indexShard.getRetentionLeases(); // not loaded yet
+        indexShard.persistRetentionLeases();
+        assert indexShard.loadRetentionLeases().leases().isEmpty();
+    }
+
     private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
         final Directory directory = store.directory();
         for (String name : Lucene.files(si)) {
@@ -471,6 +480,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
                 indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
             store.associateIndexWithNewTranslog(translogUUID);
             assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
+            writeEmptyRetentionLeasesFile(indexShard);
             indexShard.openEngineAndRecoverFromTranslog();
             indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
             indexShard.finalizeRecovery();
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index b75cb23e9e656..a97208561962e 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -414,6 +414,15 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
                 indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, indexShard.getPendingPrimaryTerm());
             store.associateIndexWithNewTranslog(translogUUID);
+
+            if (indexShard.getRetentionLeases().leases().isEmpty()) {
+                // if empty, may be a fresh IndexShard, so write an empty leases file to disk
+                indexShard.persistRetentionLeases();
+                assert indexShard.loadRetentionLeases().leases().isEmpty();
+            } else {
+                assert indexShard.assertRetentionLeasesPersisted();
+            }
+
         } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
             // this is a fatal exception at this stage.
             // this means we transferred files from the remote that have not be checksummed and they are
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
index f005e35469d93..4839844781e12 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
@@ -19,9 +19,11 @@
 
 package org.elasticsearch.index.seqno;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -31,6 +33,7 @@
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -45,6 +48,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +57,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.contains;
@@ -388,6 +393,29 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
             currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
         }
 
+        // Cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
+        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
+            .put(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")));
+        final Semaphore recoveriesToDisrupt = new Semaphore(scaledRandomIntBetween(0, 4));
+        final MockTransportService primaryTransportService
+            = (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
+        primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && recoveriesToDisrupt.tryAcquire()) {
+                if (randomBoolean()) {
+                    // return a ConnectTransportException to the START_RECOVERY action
+                    final TransportService replicaTransportService
+                        = internalCluster().getInstance(TransportService.class, connection.getNode().getName());
+                    final DiscoveryNode primaryNode = primaryTransportService.getLocalNode();
+                    replicaTransportService.disconnectFromNode(primaryNode);
+                    replicaTransportService.connectToNode(primaryNode);
+                } else {
+                    // return an exception to the FINALIZE action
+                    throw new ElasticsearchException("failing recovery for test purposes");
+                }
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+
         // now allow the replicas to be allocated and wait for recovery to finalize
         allowNodes("index", 1 + numberOfReplicas);
         ensureGreen("index");
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
index 163bfd14647ce..974e060bf2520 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
@@ -256,6 +256,20 @@ public void testPersistence() throws IOException {
             } finally {
                 closeShards(recoveredShard);
             }
+
+            // we should not recover retention leases when force-allocating a stale primary
+            final IndexShard forceRecoveredShard = reinitShard(
+                indexShard,
+                ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
+                    RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE));
+            try {
+                recoverShardFromStore(forceRecoveredShard);
+                final RetentionLeases recoveredRetentionLeases = forceRecoveredShard.getEngine().config().retentionLeasesSupplier().get();
+                assertThat(recoveredRetentionLeases.leases(), empty());
+                assertThat(recoveredRetentionLeases.version(), equalTo(0L));
+            } finally {
+                closeShards(forceRecoveredShard);
+            }
         } finally {
             closeShards(indexShard);
         }
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
index 2b8fba34c2f3e..8187a46fa7425 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
@@ -23,9 +23,15 @@
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.seqno.RetentionLeaseActions;
+import org.elasticsearch.index.seqno.RetentionLeases;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
@@ -43,6 +49,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
+import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.equalTo;
@@ -87,8 +94,8 @@ public void testSnapshotAndRestore() throws Exception {
         int[] docCounts = new int[indexCount];
         String[] indexNames = generateRandomNames(indexCount);
         for (int i = 0; i < indexCount; i++) {
-            logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
             docCounts[i] = iterations(10, 1000);
+            logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
             addRandomDocuments(indexNames[i], docCounts[i]);
             assertHitCount(client().prepareSearch(indexNames[i]).setSize(0).get(), docCounts[i]);
         }
@@ -267,6 +274,58 @@ public void testIndicesDeletedFromRepository() throws Exception {
         }
     }
 
+    public void testRetentionLeasesClearedOnRestore() throws Exception {
+        final String repoName = randomAsciiName();
+        logger.info("--> creating repository {}", repoName);
+        createAndCheckTestRepository(repoName);
+
+        final String indexName = randomAsciiName();
+        final int shardCount = randomIntBetween(1, 5);
+        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
+            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)).get());
+        final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));
+
+        final int snapshotDocCount = iterations(10, 1000);
+        logger.info("--> indexing {} docs into {}", snapshotDocCount, indexName);
+        addRandomDocuments(indexName, snapshotDocCount);
+        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
+
+        final String leaseId = randomAsciiName();
+        logger.info("--> adding retention lease with id {} to {}", leaseId, shardId);
+        client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
+            shardId, leaseId, RETAIN_ALL, "test")).actionGet();
+
+        final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
+            .filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
+        final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
+        assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));
+
+        final String snapshotName = randomAsciiName();
+        logger.info("--> create snapshot {}:{}", repoName, snapshotName);
+        assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true).setIndices(indexName));
+
+        if (randomBoolean()) {
+            final int extraDocCount = iterations(10, 1000);
+            logger.info("--> indexing {} extra docs into {}", extraDocCount, indexName);
+            addRandomDocuments(indexName, extraDocCount);
+        }
+
+        logger.info("--> close index {}", indexName);
+        assertAcked(client().admin().indices().prepareClose(indexName));
+
+        logger.info("--> restore index {} from snapshot", indexName);
+        assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName).setWaitForCompletion(true));
+
+        ensureGreen();
+        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
+
+        final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
+            .getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
+            .getRetentionLeaseStats().retentionLeases();
+        assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId));
+    }
+
     protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
         IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
         for (int i = 0; i < numDocs; i++) {