Skip to content

Commit

Permalink
Add method to get max seq no of given SegmentInfos
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Jan 25, 2023
1 parent c5e71b4 commit fae6fbe
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 27 deletions.
58 changes: 33 additions & 25 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCache;
Expand Down Expand Up @@ -282,34 +283,41 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}

/**
* Get max sequence number that was part of last refresh.
* Sequence number is part of each document that is indexed.
* This method fetches the _id of last indexed document that was part of refresh and
* retrieves the _seq_no of the document.
*/
public long getMaxSeqNoRefreshed(String source) throws IOException {
try (Engine.Searcher searcher = acquireSearcher(source, Engine.SearcherScope.INTERNAL)) {
searcher.setQueryCache(null);
ScoreDoc[] docs = searcher.search(
Queries.newMatchAllQuery(),
1,
new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.DOC, true))
).scoreDocs;
if (docs.length == 0) {
return SequenceNumbers.NO_OPS_PERFORMED;
}
org.apache.lucene.document.Document document = searcher.storedFields().document(docs[0].doc);
Term uidTerm = new Term(IdFieldMapper.NAME, document.getField(IdFieldMapper.NAME).binaryValue());
VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(
searcher.getIndexReader(),
uidTerm,
true
);
assert docIdAndVersion != null;
return docIdAndVersion.seqNo;
* Get max sequence number from segments that are referenced by given SegmentInfos
*/
public long getMaxSeqNoFromSegmentInfos(SegmentInfos segmentInfos) throws IOException {
    // Open a reader on the store's directory for the given SegmentInfos; try-with-resources
    // closes it as soon as the lookup has produced its result.
    try (DirectoryReader reader = StandardDirectoryReader.open(store.directory(), segmentInfos, null, null)) {
        // The actual lookup is delegated to the searcher-based variant.
        return getMaxSeqNoFromSearcher(new IndexSearcher(reader));
    }
}

/**
 * Returns the sequence number of the top hit of a match-all search over the given searcher,
 * or {@link SequenceNumbers#NO_OPS_PERFORMED} when the search yields no hits.
 * <p>
 * The top hit's stored _id is read and then resolved through {@link VersionsAndSeqNoResolver}
 * to obtain the _seq_no that is returned.
 * <p>
 * Note: the query cache of the supplied searcher is set to {@code null} by this method.
 *
 * @param searcher searcher to inspect
 * @return the resolved sequence number, or {@code NO_OPS_PERFORMED} if nothing matched
 * @throws IOException propagated from the search, stored-field read or id lookup
 */
protected long getMaxSeqNoFromSearcher(IndexSearcher searcher) throws IOException {
    searcher.setQueryCache(null);

    // NOTE(review): the sort is declared on the _seq_no field name but uses SortField.Type.DOC in
    // reverse; presumably this orders by document number rather than by the field value, i.e. it
    // picks the last document in index order. Confirm that this is the intended definition of "max".
    final Sort reversed = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.DOC, true));
    final ScoreDoc[] topHit = searcher.search(Queries.newMatchAllQuery(), 1, reversed).scoreDocs;

    if (topHit.length != 0) {
        // Load the stored fields of the single hit to get hold of its _id.
        final org.apache.lucene.document.Document lastDoc = searcher.storedFields().document(topHit[0].doc);
        final Term idTerm = new Term(IdFieldMapper.NAME, lastDoc.getField(IdFieldMapper.NAME).binaryValue());

        // Resolve the _id back to its doc id / version / seq no triple.
        final VersionsAndSeqNoResolver.DocIdAndVersion resolved = VersionsAndSeqNoResolver.loadDocIdAndVersion(
            searcher.getIndexReader(),
            idTerm,
            true
        );
        assert resolved != null;
        return resolved.seqNo;
    }

    return SequenceNumbers.NO_OPS_PERFORMED;
}

/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7565,7 +7565,9 @@ public void testGetMaxSeqNoRefreshedWithoutRefresh() throws IOException {
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
}

assertEquals(NO_OPS_PERFORMED, engine.getMaxSeqNoRefreshed("test"));
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(NO_OPS_PERFORMED, engine.getMaxSeqNoFromSearcher(searcher));
}

store.close();
engine.close();
Expand Down Expand Up @@ -7602,9 +7604,51 @@ public void testGetMaxSeqNoRefreshed() throws IOException {
}
}

assertEquals(totalNumberOfDocsRefreshed - 1, engine.getMaxSeqNoRefreshed("test"));
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(totalNumberOfDocsRefreshed - 1, engine.getMaxSeqNoFromSearcher(searcher));
}

store.close();
engine.close();
}

/**
 * Verifies that {@code getMaxSeqNoFromSegmentInfos} reports the sequence number of the last
 * document that was made visible by a refresh or flush, and ignores documents that were
 * indexed afterwards without a refresh.
 */
public void testGetMaxSeqNoFromSegmentInfos() throws IOException {
    // Close the store and engine already in scope; this test works on its own pair created below.
    IOUtils.close(store, engine);

    // A long refresh interval is configured, presumably so that no scheduled refresh interferes
    // with the explicit refresh/flush calls made by the test.
    final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings()).put("index.refresh_interval", "300s");
    final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
    final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

    Store store = createStore();
    InternalEngine engine = createEngine(indexSettings, store, createTempDir(), newMergePolicy());

    try {
        int totalNumberOfDocsRefreshed = 0;
        // Draw the number of rounds once; calling randomIntBetween inside the loop condition would
        // re-roll the bound on every iteration.
        final int rounds = randomIntBetween(1, 10);
        for (int j = 0; j < rounds; j++) {
            int numDocs = randomIntBetween(10, 100);
            for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + numDocs); i++) {
                engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
            }
            // this is just to make sure that refresh post flush has the same impact.
            if (randomBoolean()) {
                engine.refresh("test");
            } else {
                engine.flush();
            }
            totalNumberOfDocsRefreshed += numDocs;
        }
        // Optionally, index more docs without refreshing. These should not be reflected in the
        // value returned by getMaxSeqNoFromSegmentInfos.
        if (randomBoolean()) {
            // Same reasoning as above: fix the number of extra docs before entering the loop.
            final int numDocsNotRefreshed = randomIntBetween(10, 100);
            for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + numDocsNotRefreshed); i++) {
                engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
            }
        }

        // The expected value is the count of refreshed docs minus one, i.e. the test relies on
        // sequence numbers being handed out consecutively starting at 0.
        try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = engine.getSegmentInfosSnapshot()) {
            assertEquals(totalNumberOfDocsRefreshed - 1, engine.getMaxSeqNoFromSegmentInfos(segmentInfosGatedCloseable.get()));
        }
    } finally {
        // Release the local store and engine even when indexing or the assertion fails.
        IOUtils.close(store, engine);
    }
}

}

0 comments on commit fae6fbe

Please sign in to comment.