From 750bfc6216e1b67f865595bb37123d29d659ac2f Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 27 Jan 2023 17:03:16 +0530 Subject: [PATCH] Add method to Engine to fetch max seq no of given SegmentInfos (#5970) * Add method to Engine to fetch max seq no of last refresh Co-authored-by: Sachin Kale --- .../org/opensearch/index/engine/Engine.java | 44 ++++++++ .../index/engine/InternalEngineTests.java | 102 ++++++++++++++++++ 2 files changed, 146 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 604fa7a53e65c..9ceac9535b22c 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -42,11 +42,15 @@ import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; @@ -63,6 +67,7 @@ import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.common.lucene.search.Queries; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.opensearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; @@ -71,9 +76,11 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.index.VersionType; +import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.Mapping; import org.opensearch.index.mapper.ParseContext.Document; import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.mapper.SeqNoFieldMapper; import org.opensearch.index.merge.MergeStats; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; @@ -275,6 +282,42 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } + /** + * Get max sequence number from segments that are referenced by given SegmentInfos + */ + public long getMaxSeqNoFromSegmentInfos(SegmentInfos segmentInfos) throws IOException { + try (DirectoryReader innerReader = StandardDirectoryReader.open(store.directory(), segmentInfos, null, null)) { + final IndexSearcher searcher = new IndexSearcher(innerReader); + return getMaxSeqNoFromSearcher(searcher); + } + } + + /** + * Get max sequence number that is part of given searcher. Sequence number is part of each document that is indexed. + * This method fetches the _id of last indexed document that was part of the given searcher and + * retrieves the _seq_no of the retrieved document. + */ + protected long getMaxSeqNoFromSearcher(IndexSearcher searcher) throws IOException { + searcher.setQueryCache(null); + ScoreDoc[] docs = searcher.search( + Queries.newMatchAllQuery(), + 1, + new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.DOC, true)) + ).scoreDocs; + if (docs.length == 0) { + return SequenceNumbers.NO_OPS_PERFORMED; + } + org.apache.lucene.document.Document document = searcher.storedFields().document(docs[0].doc); + Term uidTerm = new Term(IdFieldMapper.NAME, document.getField(IdFieldMapper.NAME).binaryValue()); + VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion( + searcher.getIndexReader(), + uidTerm, + true + ); + assert docIdAndVersion != null; + return docIdAndVersion.seqNo; + } + /** * A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling @@ -2026,4 +2069,5 @@ public long getMaxSeenAutoIdTimestamp() { * to advance this marker to at least the given sequence number. */ public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary); + } diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index c635c708e3b52..214988af7c552 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -7549,4 +7549,106 @@ public void testGetProcessedLocalCheckpoint() throws IOException { store.close(); engine.close(); } + + public void testGetMaxSeqNoRefreshedWithoutRefresh() throws IOException { + IOUtils.close(store, engine); + + final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings()).put("index.refresh_interval", "300s"); + final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + + Store store = createStore(); + InternalEngine engine = createEngine(indexSettings, store, createTempDir(), newMergePolicy()); + + int numDocs = randomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); + } + + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(NO_OPS_PERFORMED, engine.getMaxSeqNoFromSearcher(searcher)); + } + + store.close(); + engine.close(); + } + + public void testGetMaxSeqNoRefreshed() throws IOException { + IOUtils.close(store, engine); + + final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings()).put("index.refresh_interval", "300s"); + final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + + Store store = createStore(); + InternalEngine engine = createEngine(indexSettings, store, createTempDir(), newMergePolicy()); + + int totalNumberOfDocsRefreshed = 0; + for (int j = 0; j < randomIntBetween(1, 10); j++) { + int numDocs = randomIntBetween(10, 100); + for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + numDocs); i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); + } + // this is just to make sure that refresh post flush has the same impact. + if (randomBoolean()) { + engine.refresh("test"); + } else { + engine.flush(); + } + totalNumberOfDocsRefreshed += numDocs; + } + // Optionally, index more docs without refreshing. These should not be part of getMaxSeqNoRefreshed + if (randomBoolean()) { + for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + randomIntBetween(10, 100)); i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); + } + } + + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(totalNumberOfDocsRefreshed - 1, engine.getMaxSeqNoFromSearcher(searcher)); + } + + store.close(); + engine.close(); + } + + public void testGetMaxSeqNoFromSegmentInfos() throws IOException { + IOUtils.close(store, engine); + + final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings()).put("index.refresh_interval", "300s"); + final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + + Store store = createStore(); + InternalEngine engine = createEngine(indexSettings, store, createTempDir(), newMergePolicy()); + + int totalNumberOfDocsRefreshed = 0; + for (int j = 0; j < randomIntBetween(1, 10); j++) { + int numDocs = randomIntBetween(10, 100); + for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + numDocs); i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); + } + // this is just to make sure that refresh post flush has the same impact. + if (randomBoolean()) { + engine.refresh("test"); + } else { + engine.flush(); + } + totalNumberOfDocsRefreshed += numDocs; + } + // Optionally, index more docs without refreshing. These should not be part of getMaxSeqNoRefreshed + if (randomBoolean()) { + for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + randomIntBetween(10, 100)); i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); + } + } + + try (GatedCloseable segmentInfosGatedCloseable = engine.getSegmentInfosSnapshot()) { + assertEquals(totalNumberOfDocsRefreshed - 1, engine.getMaxSeqNoFromSegmentInfos(segmentInfosGatedCloseable.get())); + } + + store.close(); + engine.close(); + } + }