Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File-based recovery with soft-deletes should send only operations after the local checkpoint #33190

Merged
merged 4 commits into from
Aug 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,13 @@ public RecoveryResponse recoverToTarget() throws IOException {
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
// we set this to 0 to create a translog roughly according to the retention policy
// on the target. Note that it will still filter out legacy operations with no sequence numbers
startingSeqNo = 0; //TODO: A follow-up to send only ops above the local checkpoint if soft-deletes enabled.
// but we must have everything above the local checkpoint in the commit
// We must have everything above the local checkpoint in the commit
requiredSeqNoRangeStart =
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
// If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have
// the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly
// according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo.
startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused));
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered));
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
// both cases will be zero once we start sending only ops after local checkpoint of the safe commit
int expectedTranslogOps = softDeleteEnabled ? numDocs + moreDocs : 0;
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(expectedTranslogOps));
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception {

@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE")
public void testRecoveryAfterPrimaryPromotion() throws Exception {
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
try (ReplicationGroup shards = createGroup(2, settings)) {
try (ReplicationGroup shards = createGroup(2)) {
shards.startAll();
int totalDocs = shards.indexDocs(randomInt(10));
int committedDocs = 0;
Expand All @@ -232,7 +231,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
final IndexShard oldPrimary = shards.getPrimary();
final IndexShard newPrimary = shards.getReplicas().get(0);
final IndexShard replica = shards.getReplicas().get(1);
boolean softDeleteEnabled = replica.indexSettings().isSoftDeleteEnabled();
if (randomBoolean()) {
// simulate docs that were inflight when primary failed, these will be rolled back
final int rollbackDocs = randomIntBetween(1, 5);
Expand Down Expand Up @@ -280,12 +278,13 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo()));
});
newPrimary.flush(new FlushRequest().force(true));
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
totalDocs += uncommittedOpsOnPrimary;
// we need an extra flush or refresh to advance the min_retained_seqno on the new primary so that ops-based won't happen
if (softDeleteEnabled) {
if (replica.indexSettings().isSoftDeleteEnabled()) {
// We need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen.
// The min_retained_seqno only advances when a merge asks for the retention query.
newPrimary.flush(new FlushRequest().force(true));
}
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
totalDocs += uncommittedOpsOnPrimary;
}

if (randomBoolean()) {
Expand All @@ -305,8 +304,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
} else {
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
int expectOps = softDeleteEnabled ? totalDocs : uncommittedOpsOnPrimary;
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectOps));
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
}

// roll back the extra ops in the replica
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ public void testTranslogHistoryTransferred() throws Exception {
int docs = shards.indexDocs(10);
getTranslog(shards.getPrimary()).rollGeneration();
shards.flush();
if (randomBoolean()) {
docs += shards.indexDocs(10);
}
int moreDocs = shards.indexDocs(randomInt(10));
shards.addReplica();
shards.startAll();
final IndexShard replica = shards.getReplicas().get(0);
assertThat(getTranslog(replica).totalOperations(), equalTo(docs));
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs));
shards.assertAllEqual(docs + moreDocs);
}
}

Expand Down Expand Up @@ -107,7 +107,7 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception {
}
}

public void testRecoveryWithOutOfOrderDelete() throws Exception {
public void testRecoveryWithOutOfOrderDeleteWithTranslog() throws Exception {
/*
* The flow of this test:
* - delete #1
Expand All @@ -117,12 +117,9 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
* - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained)
* - index #2
* - index #5
* - If flush and the translog/lucene retention disabled, delete #1 will be removed while index #0 is still retained and replayed.
* - If flush and the translog retention disabled, delete #1 will be removed while index #0 is still retained and replayed.
*/
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10)
// If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted
// index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build();
Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
try (ReplicationGroup shards = createGroup(1, settings)) {
shards.startAll();
// create out of order delete and index op on replica
Expand All @@ -131,7 +128,7 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {

// delete #1
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation
// index #0
orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON));
Expand All @@ -151,17 +148,16 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
final int translogOps;
if (randomBoolean()) {
if (randomBoolean()) {
logger.info("--> flushing shard (translog/soft-deletes will be trimmed)");
logger.info("--> flushing shard (translog will be trimmed)");
IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0));
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1"));
orgReplica.indexSettings().updateIndexMetaData(builder.build());
orgReplica.onSettingsChanged();
translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed).
} else {
logger.info("--> flushing shard (translog/soft-deletes will be retained)");
logger.info("--> flushing shard (translog will be retained)");
translogOps = 6; // 5 ops + seqno gaps
}
flushShard(orgReplica);
Expand All @@ -180,6 +176,62 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
}
}

public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10)
// If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted
// index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build();
try (ReplicationGroup shards = createGroup(1, settings)) {
shards.startAll();
// create out of order delete and index op on replica
final IndexShard orgReplica = shards.getReplicas().get(0);
final String indexName = orgReplica.shardId().getIndexName();

// delete #1
orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id");
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
// index #0
orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON));
// index #3
orgReplica.applyIndexOperationOnReplica(3, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON));
// Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1.
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
// index #2
orgReplica.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON));
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
// index #5 -> force NoOp #4.
orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON));

if (randomBoolean()) {
if (randomBoolean()) {
logger.info("--> flushing shard (translog/soft-deletes will be trimmed)");
IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0));
orgReplica.indexSettings().updateIndexMetaData(builder.build());
orgReplica.onSettingsChanged();
}
flushShard(orgReplica);
}

final IndexShard orgPrimary = shards.getPrimary();
shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed.

IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
shards.recoverReplica(newReplica);
shards.assertAllEqual(3);
try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", 0)) {
assertThat(snapshot, SnapshotMatchers.size(6));
}
}
}

public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(1)) {
shards.startAll();
Expand Down Expand Up @@ -228,7 +280,8 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
shards.recoverReplica(newReplica);
// file based recovery should be made
assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs));
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs));

// history uuid was restored
assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
Expand Down Expand Up @@ -332,7 +385,8 @@ public void testShouldFlushAfterPeerRecovery() throws Exception {
shards.recoverReplica(replica);
// Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
shards.assertAllEqual(numDocs);
}
}
Expand Down