Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposed engine must include all operations below global checkpoint during rollback #36159

Merged
merged 4 commits into from
Dec 9, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 75 additions & 60 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
Expand Down Expand Up @@ -155,6 +156,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -686,20 +688,20 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
long autoGeneratedTimestamp, boolean isRetry) throws IOException {
assert versionType.validateVersionForWrites(version);
return applyIndexOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp,
return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp,
isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
}

public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
boolean isRetry, SourceToParse sourceToParse)
throws IOException {
return applyIndexOperation(seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
Engine.Operation.Origin.REPLICA, sourceToParse);
}

private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType,
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
SourceToParse sourceToParse) throws IOException {
private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version,
@Nullable VersionType versionType, long autoGeneratedTimeStamp, boolean isRetry,
Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException {
assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
+ "]";
ensureWriteAllowed(origin);
Expand All @@ -721,7 +723,7 @@ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, l
return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
}

return index(getEngine(), operation);
return index(engine, operation);
}

public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version indexCreatedVersion, SourceToParse source, long seqNo,
Expand Down Expand Up @@ -755,17 +757,17 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
}

public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException {
return markSeqNoAsNoop(seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
return markSeqNoAsNoop(getEngine(), seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
}

private Engine.NoOpResult markSeqNoAsNoop(long seqNo, long opPrimaryTerm, String reason,
private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason,
Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
+ "]";
long startTime = System.nanoTime();
ensureWriteAllowed(origin);
final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason);
return noOp(getEngine(), noOp);
return noOp(engine, noOp);
}

private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
Expand All @@ -787,15 +789,15 @@ public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) {
public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
throws IOException {
assert versionType.validateVersionForWrites(version);
return applyDeleteOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
Engine.Operation.Origin.PRIMARY);
}

public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
return applyDeleteOperation(seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
return applyDeleteOperation(getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
}

private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id,
@Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
+ "]";
Expand Down Expand Up @@ -826,7 +828,7 @@ private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm,
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version,
versionType, origin);
return delete(getEngine(), delete);
return delete(engine, delete);
}

private Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
Expand Down Expand Up @@ -1265,6 +1267,11 @@ public void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimar
}

/**
 * Applies a single {@link Translog.Operation} to this shard's currently-installed engine.
 * Convenience overload that resolves the engine via {@code getEngine()} and delegates to the
 * engine-parameterized variant, so external callers need not hold an engine reference.
 */
public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
return applyTranslogOperation(getEngine(), operation, origin);
}

private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation operation,
Engine.Operation.Origin origin) throws IOException {
// If a translog op is replayed on the primary (e.g. CCR), we need to use external instead of null for its version type.
final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
final Engine.Result result;
Expand All @@ -1273,19 +1280,19 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
final Translog.Index index = (Translog.Index) operation;
// we set canHaveDuplicates to true all the time such that we de-optimize the translog case and ensure that all
// autoGeneratedID docs that are coming from the primary are updated correctly.
result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(),
versionType, index.getAutoGeneratedIdTimestamp(), true, origin,
source(shardId.getIndexName(), index.type(), index.id(), index.source(),
XContentHelper.xContentType(index.source())).routing(index.routing()));
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
versionType, origin);
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
result = markSeqNoAsNoop(noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);
result = markSeqNoAsNoop(engine, noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
Expand All @@ -1304,7 +1311,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
Engine.Result result = applyTranslogOperation(operation, origin);
Engine.Result result = applyTranslogOperation(engine, operation, origin);
switch (result.getResultType()) {
case FAILURE:
throw result.getFailure();
Expand Down Expand Up @@ -1384,18 +1391,26 @@ private void innerOpenEngineAndTranslog() throws IOException {
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
trimUnsafeCommits();

createNewEngine(config);
verifyNotClosed();
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
synchronized (mutex) {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}

private void trimUnsafeCommits() throws IOException {
assert currentEngineReference.get() == null : "engine is running";
assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running";
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
Expand Down Expand Up @@ -2224,31 +2239,6 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {
}
}

private Engine createNewEngine(EngineConfig config) {
synchronized (mutex) {
verifyNotClosed();
assert this.currentEngineReference.get() == null;
Engine engine = newEngine(config);
onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen
// inside the callback are not visible. This one enforces happens-before
this.currentEngineReference.set(engine);
}

// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which
// settings changes could possibly have happened, so here we forcefully push any config changes to the new engine:
Engine engine = getEngineOrNull();

// engine could perhaps be null if we were e.g. concurrently closed:
if (engine != null) {
engine.onSettingsChanged();
}
return engine;
}

protected Engine newEngine(EngineConfig config) {
return engineFactory.newReadWriteEngine(config);
}

private static void persistMetadata(
final ShardPath shardPath,
final IndexSettings indexSettings,
Expand Down Expand Up @@ -2852,21 +2842,46 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
void resetEngineToGlobalCheckpoint() throws IOException {
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
sync(); // persist the global checkpoint to disk
final long globalCheckpoint = getGlobalCheckpoint();
final Engine newEngine;
final SeqNoStats seqNoStats = seqNoStats();
final TranslogStats translogStats = translogStats();
// flush to make sure the latest commit, which will be opened by the read-only engine, includes all operations.
flush(new FlushRequest());
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
synchronized (mutex) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(null));
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine readOnlyEngine = new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity());
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
}

Engine newEngine = null;
try {
final long globalCheckpoint = getGlobalCheckpoint();
trimUnsafeCommits();
newEngine = createNewEngine(newEngineConfig());
active.set(true);
synchronized (mutex) {
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
newEngine = engineFactory.newReadWriteEngine(newEngineConfig());
onNewEngine(newEngine);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onNewEngine doesn't publish the new engine, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onNewEngine does not expose the engine itself but exposes only the last refresh translog location to RefreshListeners.

}
newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint);
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
// TODO: add dedicated recovery stats for the reset translog
});
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
synchronized (mutex) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(newEngine));
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
newEngine = null;
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
} finally {
IOUtils.close(newEngine);
}
newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint);
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
// TODO: add dedicated recovery stats for the reset translog
});
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
}

/**
Expand Down
Loading