diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 27c7c8f716943..30bc510cd0ab7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -81,6 +81,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.SegmentsStats; @@ -155,6 +156,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -686,20 +688,20 @@ private IndexShardState changeState(IndexShardState newState, String reason) { public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse, long autoGeneratedTimestamp, boolean isRetry) throws IOException { assert versionType.validateVersionForWrites(version); - return applyIndexOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp, + return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); } public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse) throws IOException { - return applyIndexOperation(seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry, + return 
applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse); } - private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType, - long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, - SourceToParse sourceToParse) throws IOException { + private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, + @Nullable VersionType versionType, long autoGeneratedTimeStamp, boolean isRetry, + Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm + "]"; ensureWriteAllowed(origin); @@ -721,7 +723,7 @@ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, l return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo); } - return index(getEngine(), operation); + return index(engine, operation); } public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version indexCreatedVersion, SourceToParse source, long seqNo, @@ -755,17 +757,17 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc } public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException { - return markSeqNoAsNoop(seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA); + return markSeqNoAsNoop(getEngine(), seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA); } - private Engine.NoOpResult markSeqNoAsNoop(long seqNo, long opPrimaryTerm, String reason, + private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason, Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" 
+ this.operationPrimaryTerm + "]"; long startTime = System.nanoTime(); ensureWriteAllowed(origin); final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason); - return noOp(getEngine(), noOp); + return noOp(engine, noOp); } private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) { @@ -787,15 +789,15 @@ public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) { public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType) throws IOException { assert versionType.validateVersionForWrites(version); - return applyDeleteOperation(UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType, + return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType, Engine.Operation.Origin.PRIMARY); } public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException { - return applyDeleteOperation(seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA); + return applyDeleteOperation(getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA); } - private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id, + private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id, @Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm + "]"; @@ -826,7 +828,7 @@ private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id)); final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version, 
versionType, origin); - return delete(getEngine(), delete); + return delete(engine, delete); } private Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, @@ -1265,6 +1267,11 @@ public void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimar } public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException { + return applyTranslogOperation(getEngine(), operation, origin); + } + + private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation operation, + Engine.Operation.Origin origin) throws IOException { // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type. final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null; final Engine.Result result; @@ -1273,19 +1280,19 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine final Translog.Index index = (Translog.Index) operation; // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all // autoGeneratedID docs that are coming from the primary are updated correctly. 
- result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(), + result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(), versionType, index.getAutoGeneratedIdTimestamp(), true, origin, source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())).routing(index.routing())); break; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; - result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(), + result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(), versionType, origin); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; - result = markSeqNoAsNoop(noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin); + result = markSeqNoAsNoop(engine, noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin); break; default: throw new IllegalStateException("No operation defined for [" + operation + "]"); @@ -1304,7 +1311,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat while ((operation = snapshot.next()) != null) { try { logger.trace("[translog] recover op {}", operation); - Engine.Result result = applyTranslogOperation(operation, origin); + Engine.Result result = applyTranslogOperation(engine, operation, origin); switch (result.getResultType()) { case FAILURE: throw result.getFailure(); @@ -1384,18 +1391,26 @@ private void innerOpenEngineAndTranslog() throws IOException { final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); trimUnsafeCommits(); - - createNewEngine(config); - verifyNotClosed(); - // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become 
inactive, - // we still give sync'd flush a chance to run: - active.set(true); + synchronized (mutex) { + verifyNotClosed(); + assert currentEngineReference.get() == null : "engine is running"; + // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). + final Engine newEngine = engineFactory.newReadWriteEngine(config); + onNewEngine(newEngine); + currentEngineReference.set(newEngine); + // We set active because we are now writing operations to the engine; this way, + // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. + active.set(true); + } + // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during + // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. + onSettingsChanged(); assertSequenceNumbersInCommit(); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } private void trimUnsafeCommits() throws IOException { - assert currentEngineReference.get() == null : "engine is running"; + assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running"; final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); @@ -2224,31 +2239,6 @@ public void onFailedEngine(String reason, @Nullable Exception failure) { } } - private Engine createNewEngine(EngineConfig config) { - synchronized (mutex) { - verifyNotClosed(); - assert this.currentEngineReference.get() == null; - Engine engine = newEngine(config); - onNewEngine(engine); // call this before we pass 
the memory barrier otherwise actions that happen - // inside the callback are not visible. This one enforces happens-before - this.currentEngineReference.set(engine); - } - - // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which - // settings changes could possibly have happened, so here we forcefully push any config changes to the new engine: - Engine engine = getEngineOrNull(); - - // engine could perhaps be null if we were e.g. concurrently closed: - if (engine != null) { - engine.onSettingsChanged(); - } - return engine; - } - - protected Engine newEngine(EngineConfig config) { - return engineFactory.newReadWriteEngine(config); - } - private static void persistMetadata( final ShardPath shardPath, final IndexSettings indexSettings, @@ -2852,21 +2842,47 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { void resetEngineToGlobalCheckpoint() throws IOException { assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; sync(); // persist the global checkpoint to disk - final long globalCheckpoint = getGlobalCheckpoint(); - final Engine newEngine; + final SeqNoStats seqNoStats = seqNoStats(); + final TranslogStats translogStats = translogStats(); + // flush to make sure the latest commit, which will be opened by the read-only engine, includes all operations. + flush(new FlushRequest().waitIfOngoing(true)); synchronized (mutex) { verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(null)); + // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). 
+ final Engine readOnlyEngine = new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity()); + IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + } + + Engine newEngine = null; + try { + final long globalCheckpoint = getGlobalCheckpoint(); trimUnsafeCommits(); - newEngine = createNewEngine(newEngineConfig()); - active.set(true); + synchronized (mutex) { + verifyNotClosed(); + // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). + newEngine = engineFactory.newReadWriteEngine(newEngineConfig()); + onNewEngine(newEngine); + } + newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint); + final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( + engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { + // TODO: add dedicated recovery stats for the reset translog + }); + newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); + synchronized (mutex) { + verifyNotClosed(); + IOUtils.close(currentEngineReference.getAndSet(newEngine)); + // We set active because we are now writing operations to the engine; this way, + // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. + active.set(true); + newEngine = null; + } + // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during + // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. 
+ onSettingsChanged(); + } finally { + IOUtils.close(newEngine); } - newEngine.advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint); - final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( - engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { - // TODO: add a dedicate recovery stats for the reset translog - }); - newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); } /** diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 05dac1ee32439..f4c29800cdbd3 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkShardRequest; @@ -37,9 +38,11 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.mapper.SourceToParse; @@ -64,13 +67,16 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -727,6 +733,64 @@ public void testAddNewReplicas() throws Exception { } } + public void testRollbackOnPromotion() throws Exception { + try (ReplicationGroup shards = createGroup(between(2, 3))) { + shards.startAll(); + IndexShard newPrimary = randomFrom(shards.getReplicas()); + int initDocs = shards.indexDocs(randomInt(100)); + int inFlightOpsOnNewPrimary = 0; + int inFlightOps = scaledRandomIntBetween(10, 200); + for (int i = 0; i < inFlightOps; i++) { + String id = "extra-" + i; + IndexRequest primaryRequest = new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON); + BulkShardRequest replicationRequest = indexOnPrimary(primaryRequest, shards.getPrimary()); + for (IndexShard replica : shards.getReplicas()) { + if (randomBoolean()) { + indexOnReplica(replicationRequest, shards, replica); + if (replica == newPrimary) { + inFlightOpsOnNewPrimary++; + } + } + } + if (randomBoolean()) { + shards.syncGlobalCheckpoint(); + } + if (rarely()) { + shards.flush(); + } + } + shards.refresh("test"); + List docsBelowGlobalCheckpoint = EngineTestCase.getDocIds(getEngine(newPrimary), randomBoolean()) + .stream().filter(doc -> doc.getSeqNo() <= newPrimary.getGlobalCheckpoint()).collect(Collectors.toList()); + CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean done = new AtomicBoolean(); + Thread thread = new Thread(() -> { + List replicas = new 
ArrayList<>(shards.getReplicas()); + replicas.remove(newPrimary); + latch.countDown(); + while (done.get() == false) { + try { + List exposedDocs = EngineTestCase.getDocIds(getEngine(randomFrom(replicas)), randomBoolean()); + assertThat(docsBelowGlobalCheckpoint, everyItem(isIn(exposedDocs))); + assertThat(randomFrom(replicas).getLocalCheckpoint(), greaterThanOrEqualTo(initDocs - 1L)); + } catch (AlreadyClosedException ignored) { + // replica swaps engine during rollback + } catch (Exception e) { + throw new AssertionError(e); + } + } + }); + thread.start(); + latch.await(); + shards.promoteReplicaToPrimary(newPrimary).get(); + shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary); + int moreDocsAfterRollback = shards.indexDocs(scaledRandomIntBetween(1, 20)); + shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback); + done.set(true); + thread.join(); + } + } + public static class BlockingTarget extends RecoveryTarget { private final CountDownLatch recoveryBlocked; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index d6803249c91f9..fb77f1def5a94 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -83,6 +83,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine.DeleteResult; import org.elasticsearch.index.engine.EngineException; @@ -175,13 +176,16 @@ import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; import 
static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -906,7 +910,10 @@ private void finish() { } else { assertTrue(onResponse.get()); assertNull(onFailure.get()); - assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, + // if rollback happens we roll translog twice: once when we flush a commit before opening a read-only engine + // and once after replaying translog (up to the global checkpoint); otherwise we roll translog once. 
+ either(equalTo(translogGen + 1)).or(equalTo(translogGen + 2))); assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); } @@ -3592,11 +3599,35 @@ public void testResetEngine() throws Exception { Set docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream() .filter(id -> Long.parseLong(id) <= globalCheckpoint).collect(Collectors.toSet()); TranslogStats translogStats = shard.translogStats(); + AtomicBoolean done = new AtomicBoolean(); + CountDownLatch latch = new CountDownLatch(1); + Thread thread = new Thread(() -> { + latch.countDown(); + int hitClosedExceptions = 0; + while (done.get() == false) { + try { + List exposedDocIds = EngineTestCase.getDocIds(getEngine(shard), rarely()) + .stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toList()); + assertThat("every operations before the global checkpoint must be reserved", + docBelowGlobalCheckpoint, everyItem(isIn(exposedDocIds))); + } catch (AlreadyClosedException ignored) { + hitClosedExceptions++; + } catch (IOException e) { + throw new AssertionError(e); + } + } + // engine reference was switched twice: current read/write engine -> read-only engine -> new read/write engine + assertThat(hitClosedExceptions, lessThanOrEqualTo(2)); + }); + thread.start(); + latch.await(); shard.resetEngineToGlobalCheckpoint(); assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint)); assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint)); assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations())); assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(globalCheckpoint)); + done.set(true); + thread.join(); closeShard(shard, false); }