diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0b6eea26e0357..8a560e02fe449 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -565,18 +565,10 @@ public enum SearcherScope { EXTERNAL, INTERNAL } - /** - * Returns the translog associated with this engine. - * Prefer to keep the translog package-private, so that an engine can control all accesses to the translog. - */ - abstract Translog getTranslog(); - /** * Checks if the underlying storage sync is required. */ - public boolean isTranslogSyncNeeded() { - return getTranslog().syncNeeded(); - } + public abstract boolean isTranslogSyncNeeded(); /** * Ensures that all locations in the given stream have been written to the underlying storage. @@ -585,35 +577,25 @@ public boolean isTranslogSyncNeeded() { public abstract void syncTranslog() throws IOException; - public Closeable acquireTranslogRetentionLock() { - return getTranslog().acquireRetentionLock(); - } + public abstract Closeable acquireTranslogRetentionLock(); /** * Creates a new translog snapshot from this engine for reading translog operations whose seq# at least the provided seq#. * The caller has to close the returned snapshot after finishing the reading. */ - public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { - return getTranslog().newSnapshotFromMinSeqNo(minSeqNo); - } + public abstract Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException; /** * Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#. */ - public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { - return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo); - } + public abstract int estimateTranslogOperationsFromMinSeq(long minSeqNo); - public TranslogStats getTranslogStats() { - return getTranslog().stats(); - } + public abstract TranslogStats getTranslogStats(); /** * Returns the last location that the translog of this engine has written into. */ - public Translog.Location getTranslogLastWriteLocation() { - return getTranslog().getLastWriteLocation(); - } + public abstract Translog.Location getTranslogLastWriteLocation(); protected final void ensureOpen(Exception suppressed) { if (isClosed.get()) { @@ -661,9 +643,7 @@ public CommitStats commitStats() { /** * Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint) */ - public long getLastSyncedGlobalCheckpoint() { - return getTranslog().getLastSyncedGlobalCheckpoint(); - } + public abstract long getLastSyncedGlobalCheckpoint(); /** * Global stats on segments. @@ -935,9 +915,7 @@ public final boolean refreshNeeded() { * * @return {@code true} if the current generation should be rolled to a new generation */ - public boolean shouldRollTranslogGeneration() { - return getTranslog().shouldRollGeneration(); - } + public abstract boolean shouldRollTranslogGeneration(); /** * Rolls the translog generation and cleans unneeded. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 53f209ccf6306..92c64d415ad0b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -73,8 +73,10 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogCorruptedException; import org.elasticsearch.index.translog.TranslogDeletionPolicy; +import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -422,12 +424,17 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier()); } - @Override + // Package private for testing purposes only Translog getTranslog() { ensureOpen(); return translog; } + @Override + public boolean isTranslogSyncNeeded() { + return getTranslog().syncNeeded(); + } + @Override public boolean ensureTranslogSynced(Stream locations) throws IOException { final boolean synced = translog.ensureSynced(locations); @@ -443,6 +450,31 @@ public void syncTranslog() throws IOException { revisitIndexDeletionPolicyOnTranslogSynced(); } + @Override + public Closeable acquireTranslogRetentionLock() { + return getTranslog().acquireRetentionLock(); + } + + @Override + public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { + return getTranslog().newSnapshotFromMinSeqNo(minSeqNo); + } + + @Override + public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { + return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo); + } + + @Override + public TranslogStats getTranslogStats() { + return getTranslog().stats(); + } + + @Override + public Translog.Location getTranslogLastWriteLocation() { + return getTranslog().getLastWriteLocation(); + } + private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { if (combinedDeletionPolicy.hasUnreferencedCommits()) { indexWriter.deleteUnusedFiles(); @@ -1570,6 +1602,11 @@ public void trimUnreferencedTranslogFiles() throws EngineException { } } + @Override + public boolean shouldRollTranslogGeneration() { + return getTranslog().shouldRollGeneration(); + } + @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException { try (ReleasableLock lock = readLock.acquire()) { @@ -2191,6 +2228,11 @@ LocalCheckpointTracker getLocalCheckpointTracker() { return localCheckpointTracker; } + @Override + public long getLastSyncedGlobalCheckpoint() { + return getTranslog().getLastSyncedGlobalCheckpoint(); + } + @Override public long getLocalCheckpoint() { return localCheckpointTracker.getCheckpoint(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d67148dbff2fc..2e89a66805ce1 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -731,7 +731,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s super.commitIndexWriter(writer, translog, syncId); } }; - assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs)); + assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); recoveringEngine.recoverFromTranslog(); assertTrue(committed.get()); } finally { @@ -758,7 +758,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); initialEngine.index(indexForDoc(doc)); if (rarely()) { - initialEngine.getTranslog().rollGeneration(); + getTranslog(initialEngine).rollGeneration(); } else if (rarely()) { initialEngine.flush(); } @@ -3983,14 +3983,14 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint()); trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get)); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); - assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations()); + assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations()); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint()); assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2)); // now snapshot the tlog and ensure the primary term is updated - try (Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(recoveringEngine).newSnapshot()) { assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations()); Translog.Operation operation; while ((operation = snapshot.next()) != null) { @@ -4005,7 +4005,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); if ((flushed = randomBoolean())) { globalCheckpoint.set(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); - recoveringEngine.getTranslog().sync(); + getTranslog(recoveringEngine).sync(); recoveringEngine.flush(true, true); } } @@ -4018,7 +4018,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { trimUnsafeCommits(replicaEngine.config()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); if (flushed) { - assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); } recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); @@ -4221,7 +4221,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null))); if (frequently()) { globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); - engine.getTranslog().sync(); + engine.syncTranslog(); } if (frequently()) { final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID); @@ -4243,7 +4243,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } // Make sure we keep all translog operations after the local checkpoint of the safe commit. long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(localCheckpointFromSafeCommit + 1, docId)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 0d5e693d62da6..a23e29b0bcd6b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -507,6 +507,8 @@ protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, * Exposes a translog associated with the given engine for testing purpose. */ public static Translog getTranslog(Engine engine) { - return engine.getTranslog(); + assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass(); + InternalEngine internalEngine = (InternalEngine) engine; + return internalEngine.getTranslog(); } }