-
Notifications
You must be signed in to change notification settings - Fork 24.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Lucene soft-deletes in peer recovery #30522
Changes from 6 commits
b18eb06
4b2e385
176b497
9ae627c
ff2215c
188138d
6d901bf
8a78f65
6fe8847
cc2b3f0
0612a05
a61d00b
f3f1fa2
65b8458
fc3d7d1
bd1b8ac
b1e73aa
f7ea71c
04112c6
e34154a
1531024
86c3eba
b3d0d5f
8320647
6b95e21
3be0e30
ca3f781
65ede0b
a0e58b9
c1e03d1
c5ba76f
33be718
c717bf7
df132e1
1bcd443
f232c8a
591d521
76a035f
78c0d92
c30de4a
5ff18f9
8a37126
88950b3
dbe0472
f9eeb90
ccb6e80
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.index.engine; | ||
|
||
import org.apache.lucene.document.LongPoint; | ||
import org.apache.lucene.search.Query; | ||
import org.elasticsearch.common.lease.Releasable; | ||
import org.elasticsearch.index.mapper.SeqNoFieldMapper; | ||
import org.elasticsearch.index.seqno.SequenceNumbers; | ||
import org.elasticsearch.index.translog.Translog; | ||
|
||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.LongSupplier; | ||
|
||
/** | ||
* A policy that controls how many soft-deleted documents should be retained for peer-recovery and querying changes purpose. | ||
*/ | ||
final class SoftDeletesPolicy { | ||
private final LongSupplier globalCheckpointSupplier; | ||
private int retentionLockCount; | ||
private long checkpointOfSafeCommit; | ||
private long minRequiredSeqNoForRecovery; | ||
private long retentionOperations; | ||
|
||
SoftDeletesPolicy(LongSupplier globalCheckpointSupplier, long retentionOperations) { | ||
this.globalCheckpointSupplier = globalCheckpointSupplier; | ||
this.retentionOperations = retentionOperations; | ||
this.checkpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; | ||
this.minRequiredSeqNoForRecovery = checkpointOfSafeCommit; | ||
this.retentionLockCount = 0; | ||
} | ||
|
||
/** | ||
* Updates the number of soft-deleted prior to the global checkpoint to be retained | ||
* See {@link org.elasticsearch.index.IndexSettings#INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING} | ||
*/ | ||
synchronized void setRetentionOperations(long retentionOperations) { | ||
this.retentionOperations = retentionOperations; | ||
} | ||
|
||
/** | ||
* Sets the local checkpoint of the current safe commit. | ||
* All operations whose seqno are greater than this checkpoint will be retained until the new checkpoint is advanced. | ||
*/ | ||
synchronized void setCheckpointOfSafeCommit(long newCheckpoint) { | ||
if (newCheckpoint < this.checkpointOfSafeCommit) { | ||
throw new IllegalArgumentException("Local checkpoint can't go backwards; " + | ||
"new checkpoint [" + newCheckpoint + "]," + "current checkpoint [" + checkpointOfSafeCommit + "]"); | ||
} | ||
this.checkpointOfSafeCommit = newCheckpoint; | ||
updateMinRequiredSeqNoForRecovery(); | ||
} | ||
|
||
private void updateMinRequiredSeqNoForRecovery() { | ||
assert Thread.holdsLock(this) : Thread.currentThread().getName(); | ||
if (retentionLockCount == 0) { | ||
this.minRequiredSeqNoForRecovery = checkpointOfSafeCommit; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. name this storeRecovery? I think it will help clarify that this is only used for store recovery. |
||
} | ||
} | ||
|
||
/** | ||
* Acquires a lock on soft-deleted documents to prevent them from cleaning up in merge processes. This is necessary to | ||
* make sure that all operations after the local checkpoint of the safe commit are retained until the lock is released. | ||
* This is a analogy to the translog's retention lock; see {@link Translog#acquireRetentionLock()} | ||
*/ | ||
synchronized Releasable acquireRetentionLock() { | ||
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
Sorry, something went wrong. |
||
assert retentionLockCount >= 0 : "Invalid number of retention locks [" + retentionLockCount + "]"; | ||
retentionLockCount++; | ||
final AtomicBoolean released = new AtomicBoolean(); | ||
return () -> { | ||
if (released.compareAndSet(false, true)) { | ||
releaseRetentionLock(); | ||
} | ||
}; | ||
} | ||
|
||
private synchronized void releaseRetentionLock() { | ||
assert retentionLockCount > 0 : "Invalid number of retention locks [" + retentionLockCount + "]"; | ||
retentionLockCount--; | ||
updateMinRequiredSeqNoForRecovery(); | ||
} | ||
|
||
/** | ||
* Returns a soft-deletes retention query that will be used in {@link org.apache.lucene.index.SoftDeletesRetentionMergePolicy} | ||
* Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges. | ||
*/ | ||
Query retentionQuery() { | ||
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinSeqNoToRetain(), Long.MAX_VALUE); | ||
} | ||
|
||
// Package-level for testing | ||
synchronized long getMinSeqNoToRetain() { | ||
final long minSeqNoForQueryingChanges = globalCheckpointSupplier.getAsLong() - retentionOperations; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was doubting a bit if we should use the maxSeqNo for this as I thought it might be more intuitive. I got to the conclusion that I prefer the global checkpoint as the global checkpoint is the upper bound for CCR and the future changes API so people can set the retentionOperations and also expect to always get that much operations from the API. If you agree, do you mind adding a comment here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
return Math.min(minRequiredSeqNoForRecovery, minSeqNoForQueryingChanges) + 1; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we call this localCheckpointOfSafeCommit? better be clear and not confuse potentially with the global checkpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done