Encapsulate Translog in Engine #31220

Merged
merged 2 commits into from
Jun 11, 2018
38 changes: 8 additions & 30 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -565,18 +565,10 @@ public enum SearcherScope {
EXTERNAL, INTERNAL
}

/**
* Returns the translog associated with this engine.
* Prefer to keep the translog package-private, so that an engine can control all accesses to the translog.
*/
abstract Translog getTranslog();

/**
* Checks if the underlying storage sync is required.
*/
public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();
}
public abstract boolean isTranslogSyncNeeded();
Contributor

I think this all looks ok, but I wonder if we should remove the Translog part from all these method names. They would read much cleaner to me, and whether the engine is backed by a translog would then become an implementation detail.

Member Author

So that would mean:

  • isTranslogSyncNeeded -> isSyncNeeded
  • syncTranslog -> sync
  • ensureTranslogSynced -> ensureSynced
  • acquireTranslogRetentionLock -> acquireRetentionLock
  • newTranslogSnapshotFromMinSeqNo -> newSnapshotFromMinSeqNo
  • estimateTranslogOperationsFromMinSeq -> estimateOperationsFromMinSeq
  • getTranslogStats -> getStats
  • getTranslogLastWriteLocation -> getLastWriteLocation

So, I don't think this would necessarily be bad, but I do think it's not trivial. For example, we'd then have both sync and flush, which seems strange to me (as an external person, is "sync" for synchronizing everything with Lucene?). We would also want getStats to return generic statistics, not just those from a translog. And does getLastWriteLocation return writes from Lucene or from the translog?

I think reorganizing Engine to be as small as possible is good, but if we want to hide the translog completely we need better terminology than just dropping "translog" from the names, especially since we still return Translog.Snapshot and Translog.Location. Right now, even if we rename, it's still not an implementation detail, because the classes are still Translog-specific.

For now I'll merge this and open a separate issue to make Translog completely an implementation detail of the Engine.

Member

I think the "translog" in the names is needed for now. The translog is not an implementation detail; it's a fundamental component of the system that other components interact with through the engine. A lot of these methods would not make sense if their names didn't say that they are doing something to the translog.

Contributor

+1 to what Lee and Jason said. It's a good idea, but I don't think we're ready yet. @dakrone do you mind adding your renaming list to #31248 to clarify the issue?

Member Author

Sure, I added the list
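
For readers following the thread, the shape of the change in this PR can be sketched in a few lines: the base class declares translog-related operations as abstract methods and exposes no translog accessor, while the concrete engine delegates to a translog it owns. This is only a minimal illustration. `SketchTranslog`, `SketchEngine`, `SketchInternalEngine`, and the `index` method are hypothetical stand-ins, not the real Elasticsearch classes or signatures.

```java
// Hypothetical stand-in for the real Translog; it only counts unsynced operations.
final class SketchTranslog {
    private int unsyncedOps = 0;

    void add() { unsyncedOps++; }
    boolean syncNeeded() { return unsyncedOps > 0; }
    void sync() { unsyncedOps = 0; }
}

// The base class names translog operations but offers no way to reach the translog itself.
abstract class SketchEngine {
    public abstract void index(String doc);
    public abstract boolean isTranslogSyncNeeded();
    public abstract void syncTranslog();
}

// Only the concrete engine holds a translog and decides how each operation maps onto it.
final class SketchInternalEngine extends SketchEngine {
    private final SketchTranslog translog = new SketchTranslog();

    // Package-private accessor, intended for tests only (as in the diff).
    SketchTranslog getTranslog() { return translog; }

    @Override public void index(String doc) { translog.add(); }
    @Override public boolean isTranslogSyncNeeded() { return translog.syncNeeded(); }
    @Override public void syncTranslog() { translog.sync(); }
}

class EncapsulationSketch {
    public static void main(String[] args) {
        SketchEngine engine = new SketchInternalEngine();
        engine.index("doc-1");
        System.out.println(engine.isTranslogSyncNeeded()); // true: one unsynced operation
        engine.syncTranslog();
        System.out.println(engine.isTranslogSyncNeeded()); // false: nothing left to sync
    }
}
```

Callers that hold a `SketchEngine` can no longer bypass the engine to touch the translog. As the thread points out, though, the real methods still return `Translog.Snapshot` and `Translog.Location`, so the translog is not fully hidden by this change alone.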


/**
* Ensures that all locations in the given stream have been written to the underlying storage.
Expand All @@ -585,35 +577,25 @@ public boolean isTranslogSyncNeeded() {

public abstract void syncTranslog() throws IOException;

public Closeable acquireTranslogRetentionLock() {
return getTranslog().acquireRetentionLock();
}
public abstract Closeable acquireTranslogRetentionLock();

/**
* Creates a new translog snapshot from this engine for reading translog operations whose seq# at least the provided seq#.
* The caller has to close the returned snapshot after finishing the reading.
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}
public abstract Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException;

/**
* Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#.
*/
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
}
public abstract int estimateTranslogOperationsFromMinSeq(long minSeqNo);

public TranslogStats getTranslogStats() {
return getTranslog().stats();
}
public abstract TranslogStats getTranslogStats();

/**
* Returns the last location that the translog of this engine has written into.
*/
public Translog.Location getTranslogLastWriteLocation() {
return getTranslog().getLastWriteLocation();
}
public abstract Translog.Location getTranslogLastWriteLocation();

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
Expand Down Expand Up @@ -661,9 +643,7 @@ public CommitStats commitStats() {
/**
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)
*/
public long getLastSyncedGlobalCheckpoint() {
return getTranslog().getLastSyncedGlobalCheckpoint();
}
public abstract long getLastSyncedGlobalCheckpoint();

/**
* Global stats on segments.
Expand Down Expand Up @@ -935,9 +915,7 @@ public final boolean refreshNeeded() {
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
public boolean shouldRollTranslogGeneration() {
return getTranslog().shouldRollGeneration();
}
public abstract boolean shouldRollTranslogGeneration();

/**
* Rolls the translog generation and cleans unneeded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -422,12 +424,17 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier());
}

@Override
// Package private for testing purposes only
Translog getTranslog() {
ensureOpen();
return translog;
}

@Override
public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
final boolean synced = translog.ensureSynced(locations);
Expand All @@ -443,6 +450,31 @@ public void syncTranslog() throws IOException {
revisitIndexDeletionPolicyOnTranslogSynced();
}

@Override
public Closeable acquireTranslogRetentionLock() {
return getTranslog().acquireRetentionLock();
}

@Override
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}

@Override
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
}

@Override
public TranslogStats getTranslogStats() {
return getTranslog().stats();
}

@Override
public Translog.Location getTranslogLastWriteLocation() {
return getTranslog().getLastWriteLocation();
}

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
Expand Down Expand Up @@ -1570,6 +1602,11 @@ public void trimUnreferencedTranslogFiles() throws EngineException {
}
}

@Override
public boolean shouldRollTranslogGeneration() {
return getTranslog().shouldRollGeneration();
}

@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
Expand Down Expand Up @@ -2191,6 +2228,11 @@ LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}

@Override
public long getLastSyncedGlobalCheckpoint() {
return getTranslog().getLastSyncedGlobalCheckpoint();
}

@Override
public long getLocalCheckpoint() {
return localCheckpointTracker.getCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
super.commitIndexWriter(writer, translog, syncId);
}
};
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs));
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
recoveringEngine.recoverFromTranslog();
assertTrue(committed.get());
} finally {
Expand All @@ -758,7 +758,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
initialEngine.index(indexForDoc(doc));
if (rarely()) {
initialEngine.getTranslog().rollGeneration();
getTranslog(initialEngine).rollGeneration();
} else if (rarely()) {
initialEngine.flush();
}
Expand Down Expand Up @@ -3983,14 +3983,14 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint());
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations());
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));

// now snapshot the tlog and ensure the primary term is updated
try (Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot()) {
try (Translog.Snapshot snapshot = getTranslog(recoveringEngine).newSnapshot()) {
assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations());
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
Expand All @@ -4005,7 +4005,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
if ((flushed = randomBoolean())) {
globalCheckpoint.set(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
recoveringEngine.getTranslog().sync();
getTranslog(recoveringEngine).sync();
recoveringEngine.flush(true, true);
}
}
Expand All @@ -4018,7 +4018,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
trimUnsafeCommits(replicaEngine.config());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
if (flushed) {
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
Expand Down Expand Up @@ -4221,7 +4221,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null)));
if (frequently()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.getTranslog().sync();
engine.syncTranslog();
}
if (frequently()) {
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
Expand All @@ -4243,7 +4243,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
}
// Make sure we keep all translog operations after the local checkpoint of the safe commit.
long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(localCheckpointFromSafeCommit + 1, docId));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo,
* Exposes a translog associated with the given engine for testing purpose.
*/
public static Translog getTranslog(Engine engine) {
return engine.getTranslog();
assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.getTranslog();
}
}
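
The test helper above narrows a generic engine to the one implementation that owns a translog. A minimal sketch of that pattern follows, using hypothetical `Demo*` types in place of the real classes. One deliberate difference: the sketch throws an exception instead of using `assert`, since the JVM disables assertions unless they are enabled (for example with `-ea`), and the sketch should fail loudly either way.

```java
// Hypothetical minimal types; the real Engine and InternalEngine are far larger.
final class DemoTranslog { }

abstract class DemoEngine { }

final class DemoInternalEngine extends DemoEngine {
    private final DemoTranslog translog = new DemoTranslog();

    // Package-private: visible to test helpers in the same package only.
    DemoTranslog getTranslog() { return translog; }
}

// Hypothetical engine type with no translog, used to show the failure path.
final class DemoReadOnlyEngine extends DemoEngine { }

final class DemoEngineTestHelper {
    // Exposes the translog of an engine for tests, rejecting engines that have none.
    static DemoTranslog getTranslog(DemoEngine engine) {
        if (!(engine instanceof DemoInternalEngine)) {
            throw new IllegalArgumentException(
                "only internal engines have translogs, got: " + engine.getClass());
        }
        return ((DemoInternalEngine) engine).getTranslog();
    }
}

class TestHelperSketch {
    public static void main(String[] args) {
        DemoTranslog translog = DemoEngineTestHelper.getTranslog(new DemoInternalEngine());
        System.out.println(translog != null);
        try {
            DemoEngineTestHelper.getTranslog(new DemoReadOnlyEngine());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping the cast in a single helper means tests call `getTranslog(engine)` everywhere, so a later change to how the translog is exposed touches one method rather than every test.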