Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always use soft-deletes in InternalEngine #50415

Merged
merged 10 commits into from
Dec 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,12 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {

protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}
reader = readerWrapperFunction.apply(reader);
return ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
}

protected DirectoryReader open(IndexCommit commit) throws IOException {
return DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES);
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES), Lucene.SOFT_DELETES_FIELD);
}

private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
Expand Down Expand Up @@ -313,10 +310,7 @@ public Closeable acquireHistoryRetentionLock(HistorySource historySource) {

@Override
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo,
boolean requiredFullRange) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled() == false) {
throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled");
}
boolean requiredFullRange) {
return newEmptySnapshot();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ protected void parseCreateField(ParseContext context, List<IndexableField> field
fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
}

if (originalSource != null && adaptedSource != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
if (originalSource != null && adaptedSource != originalSource) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
fields.add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2911,25 +2911,23 @@ public void testDocStats() throws Exception {
indexDoc(indexShard, "_doc", id);
}
// Need to update and sync the global checkpoint and the retention leases for the soft-deletes retention MergePolicy.
if (indexShard.indexSettings.isSoftDeleteEnabled()) {
final long newGlobalCheckpoint = indexShard.getLocalCheckpoint();
if (indexShard.routingEntry().primary()) {
indexShard.updateLocalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint());
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint());
indexShard.syncRetentionLeases();
} else {
indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");
final long newGlobalCheckpoint = indexShard.getLocalCheckpoint();
if (indexShard.routingEntry().primary()) {
indexShard.updateLocalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint());
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint());
indexShard.syncRetentionLeases();
} else {
indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");

final RetentionLeases retentionLeases = indexShard.getRetentionLeases();
indexShard.updateRetentionLeasesOnReplica(new RetentionLeases(
retentionLeases.primaryTerm(), retentionLeases.version() + 1,
retentionLeases.leases().stream().map(lease -> new RetentionLease(lease.id(), newGlobalCheckpoint + 1,
lease.timestamp(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)).collect(Collectors.toList())));
}
indexShard.sync();
final RetentionLeases retentionLeases = indexShard.getRetentionLeases();
indexShard.updateRetentionLeasesOnReplica(new RetentionLeases(
retentionLeases.primaryTerm(), retentionLeases.version() + 1,
retentionLeases.leases().stream().map(lease -> new RetentionLease(lease.id(), newGlobalCheckpoint + 1,
lease.timestamp(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE)).collect(Collectors.toList())));
}
indexShard.sync();
// flush the buffered deletes
final FlushRequest flushRequest = new FlushRequest();
flushRequest.force(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -982,9 +982,7 @@ public void testFilterCacheStats() throws Exception {
indexRandom(false, true,
client().prepareIndex("index").setId("1").setSource("foo", "bar"),
client().prepareIndex("index").setId("2").setSource("foo", "baz"));
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
}
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
refresh();
ensureGreen();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,7 @@ public static List<Translog.Operation> readAllOperationsInLucene(Engine engine,
* Asserts the provided engine has a consistent document history between translog and Lucene index.
*/
public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
if (mapper == null || mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false
|| (engine instanceof InternalEngine) == false) {
if (mapper == null || mapper.documentMapper() == null || (engine instanceof InternalEngine) == false) {
return;
}
final List<Translog.Operation> translogOps = new ArrayList<>();
Expand All @@ -1090,8 +1089,12 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint();
final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
final long seqNoForRecovery;
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
if (engine.config().getIndexSettings().isSoftDeleteEnabled()) {
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
}
} else {
seqNoForRecovery = engine.getMinRetainedSeqNo();
}
final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
for (Translog.Operation translogOp : translogOps) {
Expand Down