Recover peers from translog, ignoring soft deletes (#38904)
Today if soft deletes are enabled then we read the operations needed for peer
recovery from Lucene. However, we do not currently make any attempt to retain
history in Lucene specifically for peer recoveries, so we may discard it and
fall back to a more expensive file-based recovery. Yet we still retain
sufficient history in the translog to perform an operations-based peer
recovery.

In the long run we would like to fix this by retaining more history in Lucene,
possibly using shard history retention leases (#37165). For now, however, this
commit reverts to performing peer recoveries using the history retained in the
translog regardless of whether soft deletes are enabled or not.
DaveCTurner authored Feb 15, 2019
1 parent d10fa1c commit 3f1125f
Showing 7 changed files with 87 additions and 41 deletions.
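
To make the intent of the change concrete, here is a small self-contained sketch of the decision this commit affects. It is illustrative only: the class, method names, and signatures below are hypothetical and are not the actual RecoverySourceHandler or Engine APIs. A peer recovery can be operations-based only while the source still retains every operation from the target's starting sequence number; after this commit that retention question is answered from the translog whether or not soft deletes are enabled.

// Illustrative sketch only -- hypothetical names, not Elasticsearch's RecoverySourceHandler.
final class RecoveryPlanSketch {

    enum Plan { OPERATIONS_BASED, FILE_BASED }

    /**
     * @param historyRetainedFromSeqNo lowest sequence number for which the source still holds
     *                                 every operation (after this commit, taken from the translog)
     * @param targetLocalCheckpoint    highest sequence number the recovering replica already has
     */
    static Plan choosePlan(boolean sameHistoryUuid, long historyRetainedFromSeqNo, long targetLocalCheckpoint) {
        final long startingSeqNo = targetLocalCheckpoint + 1;
        if (sameHistoryUuid && historyRetainedFromSeqNo <= startingSeqNo) {
            return Plan.OPERATIONS_BASED; // replay the retained operations; no segment files copied
        }
        return Plan.FILE_BASED;           // copy segment files, then replay whatever is still needed
    }

    public static void main(String[] args) {
        System.out.println(choosePlan(true, 0, 41));   // OPERATIONS_BASED
        System.out.println(choosePlan(true, 100, 41)); // FILE_BASED: ops below seq# 42 were discarded
    }
}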
@@ -767,7 +767,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
                                                           MapperService mapperService, long startingSeqNo) throws IOException;

     /**
-     * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
+     * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog
      */
     public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;

@@ -502,16 +502,11 @@ public void syncTranslog() throws IOException {
     }

     /**
-     * Creates a new history snapshot for reading operations since the provided seqno.
-     * The returned snapshot can be retrieved from either Lucene index or translog files.
+     * Creates a new history snapshot for reading operations since the provided seqno from the translog.
      */
     @Override
     public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
-        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
-            return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
-        } else {
-            return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
-        }
+        return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
     }

     /**
@@ -2546,21 +2541,17 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS

     @Override
     public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
-        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
-            return getMinRetainedSeqNo() <= startingSeqNo;
-        } else {
-            final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
-            final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
-            try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
-                Translog.Operation operation;
-                while ((operation = snapshot.next()) != null) {
-                    if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
-                        tracker.markSeqNoAsCompleted(operation.seqNo());
-                    }
+        final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
+        final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
+        try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
+            Translog.Operation operation;
+            while ((operation = snapshot.next()) != null) {
+                if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                    tracker.markSeqNoAsCompleted(operation.seqNo());
                 }
             }
-            return tracker.getCheckpoint() >= currentLocalCheckpoint;
         }
+        return tracker.getCheckpoint() >= currentLocalCheckpoint;
     }

     /**
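
The rewritten hasCompleteOperationHistory above answers the retention question by replaying a translog snapshot into a fresh LocalCheckpointTracker and checking that the tracker catches up to the engine's local checkpoint, i.e. that there is no gap between startingSeqNo and the local checkpoint. Below is a minimal sketch of that check in which a plain Set stands in for the real tracker; the class and helper names are hypothetical, not Elasticsearch code.

// Minimal sketch of the gap check; a Set stands in for the LocalCheckpointTracker.
import java.util.Set;

final class HistoryCheckSketch {
    static boolean hasCompleteHistory(Set<Long> translogSeqNos, long startingSeqNo, long localCheckpoint) {
        for (long seqNo = startingSeqNo; seqNo <= localCheckpoint; seqNo++) {
            if (translogSeqNos.contains(seqNo) == false) {
                return false; // a missing operation means an ops-based recovery cannot be served
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(hasCompleteHistory(Set.of(5L, 6L, 7L, 8L, 9L), 5, 9)); // true
        System.out.println(hasCompleteHistory(Set.of(5L, 6L, 8L, 9L), 5, 9));     // false: seq# 7 is gone
    }
}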
@@ -2575,7 +2566,15 @@ public final long getMinRetainedSeqNo() {
     @Override
     public Closeable acquireRetentionLock() {
         if (softDeleteEnabled) {
-            return softDeletesPolicy.acquireRetentionLock();
+            final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
+            final Closeable translogRetentionLock;
+            try {
+                translogRetentionLock = translog.acquireRetentionLock();
+            } catch (Exception e) {
+                softDeletesRetentionLock.close();
+                throw e;
+            }
+            return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
         } else {
             return translog.acquireRetentionLock();
         }
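
With soft deletes enabled the engine now pins history in two places at once, so acquireRetentionLock must not leak the soft-deletes lock if acquiring the translog lock throws, and releasing the combined lock must release both. The following is a generic sketch of that acquire-both-or-release pattern under hypothetical types; it is not the Elasticsearch classes shown in the hunk above.

// Generic acquire-both-or-release sketch (hypothetical types, not Elasticsearch code).
import java.io.Closeable;
import java.io.IOException;

final class CombinedRetentionLockSketch {

    interface LockSupplier {
        Closeable acquire() throws IOException;
    }

    // Acquire a second lock on top of an already-held one; release the first if the second fails.
    static Closeable acquireBoth(Closeable first, LockSupplier second) throws IOException {
        final Closeable secondLock;
        try {
            secondLock = second.acquire();
        } catch (Exception e) {
            first.close(); // do not leak the already-acquired lock
            throw e;
        }
        return () -> {
            // try-with-resources closes in reverse declaration order: secondLock first, then first,
            // and `first` is closed even if closing `secondLock` throws
            try (Closeable c1 = first; Closeable c2 = secondLock) {
                // nothing to do; both locks are released on exit
            }
        };
    }
}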
@@ -177,10 +177,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
                 // We must have everything above the local checkpoint in the commit
                 requiredSeqNoRangeStart =
                     Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
-                // If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have
-                // the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly
-                // according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo.
-                startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
+                // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will
+                // still filter out legacy operations without seqNo.
+                startingSeqNo = 0;
                 try {
                     final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
                     sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
@@ -417,6 +417,10 @@ public synchronized void reset() {
             stopTime = 0;
         }

+        // for tests
+        public long getStartNanoTime() {
+            return startNanoTime;
+        }
     }

     public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable {
@@ -115,16 +115,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
             assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
         }
         if (syncNeeded && globalCheckPoint < numDocs - 1) {
-            if (shard.indexSettings.isSoftDeleteEnabled()) {
-                assertThat(resyncTask.getSkippedOperations(), equalTo(0));
-                assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations()));
-                assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
-            } else {
-                int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
-                assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
-                assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
-                assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
-            }
+            int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
+            assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
+            assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
+            assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
         } else {
             assertThat(resyncTask.getSkippedOperations(), equalTo(0));
             assertThat(resyncTask.getResyncedOperations(), equalTo(0));
@@ -26,11 +26,13 @@
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -786,4 +788,55 @@ public void sendRequest(Transport.Connection connection, long requestId, String
             assertHitCount(client().prepareSearch(indexName).get(), numDocs);
         }
     }
+
+    @TestLogging("org.elasticsearch.indices.recovery:TRACE")
+    public void testHistoryRetention() throws Exception {
+        internalCluster().startNodes(3);
+
+        final String indexName = "test";
+        client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)).get();
+        ensureGreen(indexName);
+
+        // Perform some replicated operations so the replica isn't simply empty, because ops-based recovery isn't better in that case
+        final List<IndexRequestBuilder> requests = new ArrayList<>();
+        final int replicatedDocCount = scaledRandomIntBetween(25, 250);
+        while (requests.size() < replicatedDocCount) {
+            requests.add(client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON));
+        }
+        indexRandom(true, requests);
+        if (randomBoolean()) {
+            flush(indexName);
+        }
+
+        internalCluster().stopRandomNode(s -> true);
+        internalCluster().stopRandomNode(s -> true);
+
+        final long desyncNanoTime = System.nanoTime();
+        while (System.nanoTime() <= desyncNanoTime) {
+            // time passes
+        }
+
+        final int numNewDocs = scaledRandomIntBetween(25, 250);
+        for (int i = 0; i < numNewDocs; i++) {
+            client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get();
+        }
+        // Flush twice to update the safe commit's local checkpoint
+        assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));
+        assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));
+
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)));
+        internalCluster().startNode();
+        ensureGreen(indexName);
+
+        final RecoveryResponse recoveryResponse = client().admin().indices().recoveries(new RecoveryRequest(indexName)).get();
+        final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(indexName);
+        recoveryStates.removeIf(r -> r.getTimer().getStartNanoTime() <= desyncNanoTime);
+
+        assertThat(recoveryStates, hasSize(1));
+        assertThat(recoveryStates.get(0).getIndex().totalFileCount(), is(0));
+        assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0));
+    }
 }
@@ -68,8 +68,7 @@ public void testTranslogHistoryTransferred() throws Exception {
             shards.addReplica();
             shards.startAll();
             final IndexShard replica = shards.getReplicas().get(0);
-            boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
-            assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs));
+            assertThat(getTranslog(replica).totalOperations(), equalTo(docs + moreDocs));
             shards.assertAllEqual(docs + moreDocs);
         }
     }
@@ -282,8 +281,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
             shards.recoverReplica(newReplica);
             // file based recovery should be made
             assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
-            boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
-            assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs));
+            assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs));

             // history uuid was restored
             assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
@@ -387,8 +385,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception {
             shards.recoverReplica(replica);
             // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
             assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
-            boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
-            assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
+            assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
             shards.assertAllEqual(numDocs);
         }
     }