Skip to content

Commit

Permalink
Create and swap engine in two steps
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Aug 8, 2019
1 parent 16cd233 commit bd56da8
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 247 deletions.

This file was deleted.

109 changes: 62 additions & 47 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile IndexShardState state;
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
private final Object engineMutex = new Object();
private final EngineReference currentEngineReference = new EngineReference();
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
final EngineFactory engineFactory;

private final IndexingOperationListener indexingOperationListeners;
Expand Down Expand Up @@ -1191,20 +1191,23 @@ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
* @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present.
*/
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex";
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
Engine engine;
synchronized (engineMutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
// That can be done out of mutex, since the engine can be closed half way.
engine = getEngineOrNull();
if (engine == null) {
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
synchronized (mutex) {
final Engine engine = getEngineOrNull();
if (engine != null) {
indexCommit = engine.acquireLastIndexCommit(false);
}
}
if (indexCommit == null) {
return store.getMetadata(null, true);
}
}
indexCommit = engine.acquireLastIndexCommit(false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
Expand Down Expand Up @@ -1318,14 +1321,15 @@ public void close(String reason, boolean flushEngine) throws IOException {
try {
changeState(IndexShardState.CLOSED, reason);
} finally {
final Engine engine = this.currentEngineReference.getAndSet(null);
try {
if (flushEngine) {
currentEngineReference.flushAndClose();
if (engine != null && flushEngine) {
engine.flushAndClose();
}
} finally {
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(currentEngineReference, globalCheckpointListeners, refreshListeners);
IOUtils.close(engine, globalCheckpointListeners, refreshListeners);
indexShardOperationPermits.close();
}
}
Expand All @@ -1347,7 +1351,7 @@ public IndexShard postRecovery(String reason)
// we need to refresh again to expose all operations that were index until now. Otherwise
// we may not expose operations that were indexed with a refresh listener that was immediately
// responded to in addRefreshListener.
refresh("post_recovery");
getEngine().refresh("post_recovery");
return this;
}

Expand Down Expand Up @@ -1420,7 +1424,9 @@ public long recoverLocallyUpToGlobalCheckpoint() {
getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
} finally {
currentEngineReference.swapReference(null);
synchronized (mutex) {
IOUtils.close(currentEngineReference.getAndSet(null));
}
}
} catch (Exception e) {
logger.debug(new ParameterizedMessage("failed to recover shard locally up to global checkpoint {}", globalCheckpoint), e);
Expand Down Expand Up @@ -1579,7 +1585,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException {
}

private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
assert Thread.holdsLock(mutex) == false : "opening engine under mutex [" + Thread.currentThread() + "]";
assert Thread.holdsLock(mutex) == false : "opening engine under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
Expand All @@ -1593,19 +1599,25 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+ "] but got " + getRetentionLeases();
synchronized (engineMutex) {
assert currentEngineReference.get() == null : "engine is running";
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
final Engine newEngine = engineFactory.newReadWriteEngine(config);
boolean success = false;
try {
currentEngineReference.swapReference(newEngine);
newEngine = null;
synchronized (mutex) {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
success = true;
}
} finally {
IOUtils.close(newEngine);
if (success == false) {
newEngine.close();
}
}
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
Expand All @@ -1628,6 +1640,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
}

private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
}

Expand All @@ -1637,7 +1650,7 @@ private void onNewEngine(Engine newEngine) {
public void performRecoveryRestart() throws IOException {
synchronized (mutex) {
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
currentEngineReference.swapReference(null);
IOUtils.close(currentEngineReference.getAndSet(null));
resetRecoveryStage();
}
}
Expand Down Expand Up @@ -2671,8 +2684,10 @@ private DocumentMapperForType docMapper(String type) {
private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
final Sort indexSort = indexSortSupplier.get();
final Engine.Warmer warmer = reader -> {
assert Thread.holdsLock(mutex) == false : "warming engine under mutex [" + Thread.currentThread() + "]";
this.warmer.warm(reader);
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
if (this.warmer != null) {
this.warmer.warm(reader);
}
};
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
Expand Down Expand Up @@ -3303,7 +3318,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
void resetEngineToGlobalCheckpoint() throws IOException {
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex [" + Thread.currentThread() + "]";
assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
: "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
sync(); // persist the global checkpoint to disk
Expand All @@ -3318,41 +3333,46 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
synchronized (engineMutex) {
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
// acquireXXXCommit and close works.
Engine readOnlyEngine =
final Engine readOnlyEngine =
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (engineMutex) {
synchronized (mutex) {
// ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
return newEngineReference.get().acquireLastIndexCommit(false);
}
}

@Override
public IndexCommitRef acquireSafeIndexCommit() {
synchronized (engineMutex) {
synchronized (mutex) {
return newEngineReference.get().acquireSafeIndexCommit();
}
}

@Override
public void close() throws IOException {
Engine newEngine;
synchronized (engineMutex) {
newEngine = newEngineReference.get();
if (newEngine == currentEngineReference.get()) {
// we successfully installed the new engine so do not close it.
newEngine = null;
}
assert Thread.holdsLock(mutex);

Engine newEngine = newEngineReference.get();
if (newEngine == currentEngineReference.get()) {
// we successfully installed the new engine so do not close it.
newEngine = null;
}
IOUtils.close(super::close, newEngine);
}
};
boolean success = false;
try {
currentEngineReference.swapReference(readOnlyEngine);
readOnlyEngine = null;
synchronized (mutex) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
success = true;
}
} finally {
IOUtils.close(readOnlyEngine);
if (success == false) {
readOnlyEngine.close();
}
}
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
Expand All @@ -3362,14 +3382,9 @@ public void close() throws IOException {
// TODO: add a dedicate recovery stats for the reset translog
});
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
synchronized (engineMutex) {
Engine newEngine = newEngineReference.get();
try {
currentEngineReference.swapReference(newEngine);
newEngine = null;
} finally {
IOUtils.close(newEngine);
}
synchronized (mutex) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
Expand Down
Loading

0 comments on commit bd56da8

Please sign in to comment.