Fill LocalCheckpointTracker with Lucene commit #34474
Conversation
Today we rely on the LocalCheckpointTracker to ensure there are no duplicates when enabling the optimization using max_seq_no_of_updates. The problem is that the LocalCheckpointTracker is not fully reloaded when opening an engine with an out-of-order index commit. Suppose the starting commit contains seq#0 and seq#2; the current LocalCheckpointTracker would then return "false" when asked whether seq#2 was processed before, although seq#2 is in the commit. This change scans the existing sequence numbers in the starting commit, then marks them as completed in the LocalCheckpointTracker to ensure a consistent state between the LocalCheckpointTracker and the Lucene commit.
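In outline: read the local checkpoint and max_seq_no recovered from the commit, run a seq_no range query above the local checkpoint against the starting commit, and mark every sequence number found as completed. Below is a minimal, untested sketch assembled from the fragments quoted later in this review; the helper name `restoreLocalCheckpointFromCommit` and the exact wiring are assumptions, not the PR's code.

```java
import java.io.IOException;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;

// Hedged sketch only: mark every seq_no present in the starting commit that is
// above the recovered local checkpoint as completed in the tracker.
static void restoreLocalCheckpointFromCommit(DirectoryReader reader, long localCheckpoint, long maxSeqNo,
                                             LocalCheckpointTracker tracker) throws IOException {
    final IndexSearcher searcher = new IndexSearcher(reader);
    searcher.setQueryCache(null); // one-off recovery query, no point caching it
    final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, localCheckpoint + 1, maxSeqNo);
    for (LeafReaderContext leaf : reader.leaves()) {
        final Scorer scorer = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f).scorer(leaf);
        if (scorer == null) {
            continue; // no seq_nos above the local checkpoint in this segment
        }
        final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
        final DocIdSetIterator iterator = scorer.iterator();
        int docId;
        while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
            if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
                throw new IllegalStateException("no seq_no doc values for doc [" + docId + "]");
            }
            tracker.markSeqNoAsCompleted(seqNoDocValues.longValue());
        }
    }
}
```

The per-leaf createWeight call mirrors the diff quoted below; a later review comment suggests hoisting the Weight out of the loop.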
Pinging @elastic/es-distributed
final IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, localCheckpoint + 1, maxSeqNo);
for (LeafReaderContext leaf : reader.leaves()) {
@bleskes Let me know if you still prefer using the "Snapshot" API.
I left some comments on the testing and asked the Lucene SWAT team for support. Other than that, LGTM.
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
any chance we can make this method static and give it what it needs? I'm afraid we'll use an uninitialized field by mistake (as we call it from the constructor).
+1. I pushed 779c5c7.
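For illustration, a hedged sketch of the static shape being asked for; the parameter list and names are assumptions, not the code from 779c5c7. Because the method is handed everything it needs, a call from the constructor cannot accidentally read an uninitialized instance field.

```java
import java.util.Map;
import java.util.function.BiFunction;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;

// Hedged sketch; the real signature may differ.
private static LocalCheckpointTracker createLocalCheckpointTracker(
        Map<String, String> commitUserData, Logger logger,
        BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
    // read the seq_no stats stored in the commit user data
    final SequenceNumbers.CommitInfo seqNoStats =
        SequenceNumbers.loadSeqNoInfoFromLuceneCommit(commitUserData.entrySet());
    final long maxSeqNo = seqNoStats.maxSeqNo;
    final long localCheckpoint = seqNoStats.localCheckpoint;
    logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
    return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}
```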
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
It should be possible to use the reader's PointValues directly to speed up the search since they are always indexed. Something like the following untested snippet:
void markAsCompleted(IndexReader reader, LocalCheckpointTracker tracker, long minSeqNo, long maxSeqNo) throws IOException {
    for (LeafReaderContext ctx : reader.leaves()) {
        PointValues values = ctx.reader().getPointValues(SeqNoFieldMapper.NAME);
        if (values == null) {
            continue; // no seq_no points in this segment
        }
        byte[] lowerPoint = new byte[values.getBytesPerDimension()];
        LongPoint.encodeDimension(minSeqNo, lowerPoint, 0);
        byte[] upperPoint = new byte[values.getBytesPerDimension()];
        LongPoint.encodeDimension(maxSeqNo, upperPoint, 0);
        // treat all docs as live when the segment has no deletions
        final Bits liveDocs = ctx.reader().getLiveDocs() == null ?
            new Bits.MatchAllBits(ctx.reader().maxDoc()) : ctx.reader().getLiveDocs();
        int numBytes = values.getBytesPerDimension();
        values.intersect(new PointValues.IntersectVisitor() {
            @Override
            public void visit(int docID) throws IOException {
                throw new IllegalStateException("should never be called");
            }

            @Override
            public void visit(int docID, byte[] packedValue) throws IOException {
                if (liveDocs.get(docID) == false) {
                    return;
                }
                if (FutureArrays.compareUnsigned(packedValue, 0, numBytes, lowerPoint, 0, numBytes) < 0) {
                    // Doc's value is too low, in this dimension
                    return;
                }
                if (FutureArrays.compareUnsigned(packedValue, 0, numBytes, upperPoint, 0, numBytes) > 0) {
                    // Doc's value is too high, in this dimension
                    return;
                }
                long seqNo = LongPoint.decodeDimension(packedValue, 0);
                tracker.markSeqNoAsCompleted(seqNo);
            }

            @Override
            public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
                if (FutureArrays.compareUnsigned(minPackedValue, 0, numBytes, upperPoint, 0, numBytes) > 0 ||
                    FutureArrays.compareUnsigned(maxPackedValue, 0, numBytes, lowerPoint, 0, numBytes) < 0) {
                    return PointValues.Relation.CELL_OUTSIDE_QUERY;
                }
                return PointValues.Relation.CELL_CROSSES_QUERY;
            }
        });
    }
}
This would avoid the need to execute a query and then read from doc values, but I am not sure it's worth the complexity. If only a few documents should be visited this is probably useless and using a query is the right approach.
I pulled this in, and it works perfectly. However, I am not sure we should use it either. Normally we would expect to visit only a few to several hundred documents.
I agree this is probably a premature optimization. I wrote it mainly to see how it looks, but it shouldn't bring much benefit when the number of documents to visit is low. The query + doc_values approach seems easier to maintain.
@@ -279,6 +279,33 @@ public void testRetryBulkShardOperations() throws Exception {
        }
    }

    public void testOutOfOrderOnFollower() throws Exception {
it's not about out of order but rather about over-recovery of ops you already have. I checked and I can't find any existing tests that cover recovery (which can cover this case). I think we should have some, i.e., rename this test and extend it to cover general peer recovery (with and without holes). We also need concurrent recovery + indexing.
Yes, I'll do that.
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, localCheckpoint + 1, maxSeqNo);
for (LeafReaderContext leaf : reader.leaves()) {
    final Scorer scorer = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f).scorer(leaf);
You can create the weight once for all leaves and reuse it to build the Scorer?
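A small hedged sketch of that suggestion, assuming the same query and loop as the fragment quoted above: the Weight is built once and only the per-segment Scorer is created inside the loop.

```java
// Build the Weight once, then ask it for a Scorer per segment.
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : reader.leaves()) {
    final Scorer scorer = weight.scorer(leaf);
    if (scorer == null) {
        continue; // no matching seq_nos in this segment
    }
    // ... iterate scorer.iterator() and read the seq_no doc values as before
}
```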
Question, would it be enough to bootstrap the max_seq_no_of_updates with the max seqID in the index instead?
@s1monw Yes, it's good enough.
approach LGTM
can you leave a comment about this?
@bleskes I've added two tests which add new replicas during indexing. Could you please have another look?
LGTM. Left some minor suggestions. Feel free to reject.
public void testAddNewReplicas() throws Exception {
    try (ReplicationGroup shards = createGroup(between(0, 1))) {
        shards.startAll();
        Thread[] threads = new Thread[between(1, 8)];
maybe reduce the number of threads?
I've reduced it to between(1, 3).
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
    if (routing.primary()) {
        return new InternalEngineFactory(); // use the internal engine so we can index directly
Why can't we use the standard following engine and supply ops in the same way we do for real following? The goal is to check that everything works together like it should.
@bleskes Yes, we can, but I chose not to because we have an integration test that does the same thing (https://github.com/elastic/elasticsearch/pull/34474/files#diff-d36012b5317805e8b3ff77b77dbaa5ecR672). I am fine with updating this. WDYT?
ok, I didn't want to fully spec out the testing, but I would ideally have loved to see a randomized, non-multi-threaded test using replication groups that creates all kinds of scenarios (like the one you caught). The multi-threaded test can stay in integration land and make sure that if we miss a scenario, something will find it. Does that make sense?
@bleskes Thanks for the explanation 👍. I will update this test.
LGTM
Since elastic#34412 and elastic#34474, a follower must have soft-deletes enabled to work correctly. This change requires soft-deletes on the follower. Relates elastic#34412 Relates elastic#34474
A CCR test failure shows that the approach in #34474 is flawed. Restoring the LocalCheckpointTracker from an index commit can cause both FollowingEngine and InternalEngine to incorrectly ignore some deletes. Here is a small scenario illustrating the problem:

1. Delete a doc with seq#1 => the engine adds a delete tombstone to Lucene.
2. Flush a commit consisting of only the delete tombstone.
3. Index a doc with seq#0 => the engine adds that doc to Lucene, but soft-deleted.
4. Restart the engine with the commit from step 2; the engine fills its LocalCheckpointTracker with the delete tombstone in the commit.
5. Replay the local translog in reverse order: index#0, then delete#1.
6. When processing index#0, the engine adds it to Lucene as a live doc and advances the local checkpoint to 1 (seq#1 was restored from the commit in step 4).
7. When processing delete#1, the engine skips it because seq_no=1 is less than or equal to the local checkpoint.

We should have zero documents after recovering from the translog, but here we have one. Since all operations after the local checkpoint of the safe commit are retained, we should find them if the look-up also considers soft-deleted documents. This PR bridges the disparity between the version map and the local checkpoint tracker by taking soft-deleted documents into account when resolving the indexing strategy for engine operations. Relates #34474 Relates #33656
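To make the seven steps above concrete, here is a toy simulation (plain Java, not Elasticsearch code; the only rule it models is the local checkpoint advancing over contiguous processed seq_nos). It ends with one live document where the scenario expects zero.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the scenario above; names and structure are illustrative only.
final class ToyTracker {
    private final Set<Long> processed = new HashSet<>();
    private long localCheckpoint = -1;

    void markProcessed(long seqNo) {
        processed.add(seqNo);
        while (processed.contains(localCheckpoint + 1)) {
            localCheckpoint++; // advance over contiguous processed seq_nos
        }
    }

    long localCheckpoint() {
        return localCheckpoint;
    }

    public static void main(String[] args) {
        ToyTracker tracker = new ToyTracker();
        int liveDocs = 0;

        // Step 4: restore the tracker from a commit containing only the delete tombstone with seq#1.
        tracker.markProcessed(1);

        // Steps 5-6: replay the translog in reverse order; index#0 is applied
        // and the local checkpoint jumps to 1.
        tracker.markProcessed(0);
        liveDocs++;
        System.out.println("local checkpoint after index#0: " + tracker.localCheckpoint()); // 1

        // Step 7: delete#1 is skipped because seq#1 <= local checkpoint,
        // leaving one live document where there should be none.
        long deleteSeqNo = 1;
        if (deleteSeqNo > tracker.localCheckpoint()) {
            liveDocs--; // never reached in this scenario
        }
        System.out.println("live docs after replay: " + liveDocs); // 1, expected 0
    }
}
```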
In #34474, we added a new assertion to ensure that the LocalCheckpointTracker is always consistent with the Lucene index. However, resetting the LocalCheckpointTracker in testDedupByPrimaryTerm causes this assertion to be violated. This commit removes resetCheckpoint from LocalCheckpointTracker and rewrites testDedupByPrimaryTerm without resetting the local checkpoint. Relates #34474
Today we rely on the LocalCheckpointTracker to ensure there are no duplicates when enabling the optimization using max_seq_no_of_updates in the FollowingEngine. The problem is that the LocalCheckpointTracker is not fully reloaded when opening an engine with an out-of-order index commit. Suppose the starting commit contains seq#0 and seq#2; the current LocalCheckpointTracker would then return "false" when asked whether seq#2 was processed before, although seq#2 is in the commit.
This change scans the existing sequence numbers in the starting commit, then marks them as completed in the LocalCheckpointTracker to achieve a consistent state between the LocalCheckpointTracker and the Lucene commit.
(This is an unreleased bug in the FollowingEngine of CCR.)
Relates #33656