Create retention leases file during recovery (elastic#39359)
Today we load the shard history retention leases from disk whenever opening the
engine, and treat a missing file as an empty set of leases. However, in some
cases this is inappropriate: we might be restoring from a snapshot (if the
target index already exists then there may be leases on disk) or
force-allocating a stale primary, and in neither case does it make sense to
restore the retention leases from disk.

With this change we write an empty retention leases file during recovery,
except for the following cases:

- During peer recovery the on-disk leases may be accurate and could be needed
  if the recovery target is made into a primary.

- During recovery from an existing store, as long as we are not
  force-allocating a stale primary.

Relates elastic#37165
DaveCTurner committed Mar 15, 2019
1 parent 7611210 commit a6b147f
Showing 8 changed files with 150 additions and 1 deletion.
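
In outline, the new behaviour keys off the recovery source. The following sketch is illustrative only; the committed logic is the expectEmptyRetentionLeases() overrides in RecoverySource.java (the first diff below), and the wrapper class name here is invented for the example:

    import org.elasticsearch.cluster.routing.RecoverySource;

    // Illustrative sketch, not the committed code: summarises when a shard's
    // recovery should begin with an empty retention leases file on disk.
    class RetentionLeasePolicySketch { // hypothetical name, for illustration
        static boolean expectEmptyRetentionLeases(RecoverySource source) {
            if (source.getType() == RecoverySource.Type.PEER) {
                // peer recovery: the on-disk leases may be accurate and are
                // needed if the recovery target is later promoted to primary
                return false;
            }
            if (source instanceof RecoverySource.ExistingStoreRecoverySource) {
                // existing store: keep the on-disk leases unless we are
                // force-allocating a stale primary, which bootstraps a new
                // history UUID
                return source.shouldBootstrapNewHistoryUUID();
            }
            // all other sources (empty store, snapshot restore, local shards):
            // start from an empty retention leases file
            return true;
        }
    }

StoreRecovery and RecoveryTarget below make the on-disk state match this expectation by persisting an empty leases file at the appropriate points, and IndexShard asserts it when opening the engine.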
server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
@@ -97,6 +97,10 @@ public boolean shouldBootstrapNewHistoryUUID() {
return false;
}

public boolean expectEmptyRetentionLeases() {
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -181,6 +185,11 @@ public Type getType() {
public String toString() {
return "existing store recovery; bootstrap_history_uuid=" + bootstrapNewHistoryUUID;
}

@Override
public boolean expectEmptyRetentionLeases() {
return bootstrapNewHistoryUUID;
}
}

/**
@@ -317,5 +326,10 @@ public Type getType() {
public String toString() {
return "peer recovery";
}

@Override
public boolean expectEmptyRetentionLeases() {
return false;
}
}
}
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -329,6 +329,9 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
*/
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);

// TODO after backporting we expect this never to happen in 8.x, so adjust this to throw an exception instead.
assert Version.CURRENT.major <= 8 : "throw an exception instead of returning EMPTY on null";
if (retentionLeases == null) {
return RetentionLeases.EMPTY;
}
@@ -354,6 +357,11 @@ public void persistRetentionLeases(final Path path) throws IOException {
}
}

public boolean assertRetentionLeasesPersisted(final Path path) throws IOException {
assert RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path) != null;
return true;
}

public static class CheckpointState implements Writeable {

/**
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1481,6 +1481,9 @@ private void innerOpenEngineAndTranslog() throws IOException {
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
updateRetentionLeasesOnReplica(loadRetentionLeases());
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+ "] but got " + getRetentionLeases();
trimUnsafeCommits();
synchronized (mutex) {
verifyNotClosed();
@@ -2080,6 +2083,10 @@ public void persistRetentionLeases() throws IOException {
replicationTracker.persistRetentionLeases(path.getShardStatePath());
}

public boolean assertRetentionLeasesPersisted() throws IOException {
return replicationTracker.assertRetentionLeasesPersisted(path.getShardStatePath());
}

/**
* Syncs the current retention leases to all replicas.
*/
server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -403,9 +403,11 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
store.bootstrapNewHistory();
writeEmptyRetentionLeasesFile(indexShard);
}
if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
if (store.ensureIndexHas6xCommitTags()) {
@@ -427,6 +429,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId,
indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
@@ -439,6 +442,12 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
}
}

private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {
assert indexShard.getRetentionLeases().leases().isEmpty() : indexShard.getRetentionLeases(); // not loaded yet
indexShard.persistRetentionLeases();
assert indexShard.loadRetentionLeases().leases().isEmpty();
}

private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
@@ -478,6 +487,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -414,6 +414,15 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);

if (indexShard.getRetentionLeases().leases().isEmpty()) {
// if empty, may be a fresh IndexShard, so write an empty leases file to disk
indexShard.persistRetentionLeases();
assert indexShard.loadRetentionLeases().leases().isEmpty();
} else {
assert indexShard.assertRetentionLeasesPersisted();
}

} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
@@ -19,9 +19,11 @@

package org.elasticsearch.index.seqno;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -31,6 +33,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
@@ -45,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +57,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
@@ -391,6 +396,29 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
}

// Cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
.put(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")));
final Semaphore recoveriesToDisrupt = new Semaphore(scaledRandomIntBetween(0, 4));
final MockTransportService primaryTransportService
= (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && recoveriesToDisrupt.tryAcquire()) {
if (randomBoolean()) {
// return a ConnectTransportException to the START_RECOVERY action
final TransportService replicaTransportService
= internalCluster().getInstance(TransportService.class, connection.getNode().getName());
final DiscoveryNode primaryNode = primaryTransportService.getLocalNode();
replicaTransportService.disconnectFromNode(primaryNode);
replicaTransportService.connectToNode(primaryNode);
} else {
// return an exception to the FINALIZE action
throw new ElasticsearchException("failing recovery for test purposes");
}
}
connection.sendRequest(requestId, action, request, options);
});

// now allow the replicas to be allocated and wait for recovery to finalize
allowNodes("index", 1 + numberOfReplicas);
ensureGreen("index");
server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
@@ -265,6 +265,20 @@ public void testPersistence() throws IOException {
} finally {
closeShards(recoveredShard);
}

// we should not recover retention leases when force-allocating a stale primary
final IndexShard forceRecoveredShard = reinitShard(
indexShard,
ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE));
try {
recoverShardFromStore(forceRecoveredShard);
final RetentionLeases recoveredRetentionLeases = forceRecoveredShard.getEngine().config().retentionLeasesSupplier().get();
assertThat(recoveredRetentionLeases.leases(), empty());
assertThat(recoveredRetentionLeases.version(), equalTo(0L));
} finally {
closeShards(forceRecoveredShard);
}
} finally {
closeShards(indexShard);
}
test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
@@ -23,9 +23,15 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -43,6 +49,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@@ -87,8 +94,8 @@ public void testSnapshotAndRestore() throws Exception {
int[] docCounts = new int[indexCount];
String[] indexNames = generateRandomNames(indexCount);
for (int i = 0; i < indexCount; i++) {
logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
docCounts[i] = iterations(10, 1000);
logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
addRandomDocuments(indexNames[i], docCounts[i]);
assertHitCount(client().prepareSearch(indexNames[i]).setSize(0).get(), docCounts[i]);
}
@@ -267,6 +274,58 @@ public void testIndicesDeletedFromRepository() throws Exception {
}
}

public void testRetentionLeasesClearedOnRestore() throws Exception {
final String repoName = randomAsciiName();
logger.info("--> creating repository {}", repoName);
createAndCheckTestRepository(repoName);

final String indexName = randomAsciiName();
final int shardCount = randomIntBetween(1, 5);
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)).get());
final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));

final int snapshotDocCount = iterations(10, 1000);
logger.info("--> indexing {} docs into {}", snapshotDocCount, indexName);
addRandomDocuments(indexName, snapshotDocCount);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);

final String leaseId = randomAsciiName();
logger.info("--> adding retention lease with id {} to {}", leaseId, shardId);
client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
shardId, leaseId, RETAIN_ALL, "test")).actionGet();

final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
.filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));

final String snapshotName = randomAsciiName();
logger.info("--> create snapshot {}:{}", repoName, snapshotName);
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).setIndices(indexName));

if (randomBoolean()) {
final int extraDocCount = iterations(10, 1000);
logger.info("--> indexing {} extra docs into {}", extraDocCount, indexName);
addRandomDocuments(indexName, extraDocCount);
}

logger.info("--> close index {}", indexName);
assertAcked(client().admin().indices().prepareClose(indexName));

logger.info("--> restore index {} from snapshot", indexName);
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName).setWaitForCompletion(true));

ensureGreen();
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);

final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
.getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
.getRetentionLeaseStats().retentionLeases();
assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId));
}

protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
