From 5935e4ffb9fa5264e974600a4cb1806dec492a72 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 5 Sep 2018 20:44:14 -0400 Subject: [PATCH 01/10] Reset replica engine to global checkpoint on promotion When a replica starts following a newly promoted primary, it may have some operations which don't exist on the new primary. Thus we need to throw away those operations to align a replica with the new primary. This can be done by resetting an engine from the safe commit, then replaying the local translog up to the global checkpoint. --- .../elasticsearch/index/engine/Engine.java | 7 +- .../index/engine/InternalEngine.java | 7 +- .../elasticsearch/index/shard/IndexShard.java | 119 ++++++++++-------- .../discovery/AbstractDisruptionTestCase.java | 1 + .../IndexLevelReplicationTests.java | 14 +-- .../RecoveryDuringReplicationTests.java | 31 +---- .../index/shard/IndexShardTests.java | 49 +++++--- .../index/translog/SnapshotMatchers.java | 11 +- .../index/translog/TestTranslog.java | 11 ++ .../elasticsearch/recovery/RelocationIT.java | 1 + .../index/engine/EngineTestCase.java | 32 +++++ .../index/shard/IndexShardTestCase.java | 28 +++-- .../elasticsearch/test/ESIntegTestCase.java | 44 +++++++ .../test/InternalTestCluster.java | 18 +++ 14 files changed, 243 insertions(+), 130 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index a64c3f88eb3d2..c3e795e9982f1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1138,11 +1138,16 @@ public enum Origin { PRIMARY, REPLICA, PEER_RECOVERY, - LOCAL_TRANSLOG_RECOVERY; + LOCAL_TRANSLOG_RECOVERY, + LOCAL_RESETTING; public boolean isRecovery() { return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY; } + + boolean isLocal() { + return this == LOCAL_TRANSLOG_RECOVERY || this == LOCAL_RESETTING; + } } public Origin origin() { 
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ea2b53bea8d0a..2ac3af44aeacb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -737,6 +737,7 @@ private boolean canOptimizeAddDocument(Index index) { : "version: " + index.version() + " type: " + index.versionType(); return true; case LOCAL_TRANSLOG_RECOVERY: + case LOCAL_RESETTING: assert index.isRetry(); return true; // allow to optimize in order to update the max safe time stamp default: @@ -835,7 +836,7 @@ public IndexResult index(Index index) throws IOException { indexResult = new IndexResult( plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } - if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + if (index.origin().isLocal() == false) { final Translog.Location location; if (indexResult.getResultType() == Result.Type.SUCCESS) { location = translog.add(new Translog.Index(index, indexResult)); @@ -1175,7 +1176,7 @@ public DeleteResult delete(Delete delete) throws IOException { deleteResult = new DeleteResult( plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); } - if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + if (delete.origin().isLocal() == false) { final Translog.Location location; if (deleteResult.getResultType() == Result.Type.SUCCESS) { location = translog.add(new Translog.Delete(delete, deleteResult)); @@ -1413,7 +1414,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { } } final NoOpResult noOpResult = failure != null ? 
new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo()); - if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + if (noOp.origin().isLocal() == false) { final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bceb106aeef91..b71e49c1f1b16 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1275,14 +1275,26 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine // package-private for testing int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { - recoveryState.getTranslog().totalOperations(snapshot.totalOperations()); - recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations()); + final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); + translogRecoveryStats.totalOperations(snapshot.totalOperations()); + translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations()); + return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, + translogRecoveryStats::incrementRecoveredOperations); + } + + private int runTranslogRecoveryAfterResetting(Engine engine, Translog.Snapshot snapshot) throws IOException { + // TODO: record resetting stats + return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESETTING, () -> {}); + } + + private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, + Runnable onPerOperationRecovered) throws IOException { int opsRecovered = 0; Translog.Operation operation; while ((operation = snapshot.next()) != null) { 
try { logger.trace("[translog] recover op {}", operation); - Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY); + Engine.Result result = applyTranslogOperation(operation, origin); switch (result.getResultType()) { case FAILURE: throw result.getFailure(); @@ -1293,9 +1305,8 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce default: throw new AssertionError("Unknown result type [" + result.getResultType() + "]"); } - opsRecovered++; - recoveryState.getTranslog().incrementRecoveredOperations(); + onPerOperationRecovered.run(); } catch (Exception e) { if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { // mainly for MapperParsingException and Failure to detect xcontent @@ -1340,12 +1351,6 @@ private void innerOpenEngineAndTranslog() throws IOException { } } recoveryState.setStage(RecoveryState.Stage.TRANSLOG); - - final EngineConfig config = newEngineConfig(); - - // we disable deletes since we allow for operations to be executed against the shard while recovering - // but we need to make sure we don't loose deletes until we are done recovering - config.setEnableGcDeletes(false); // we have to set it before we open an engine and recover from the translog because // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in, // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in. 
@@ -1353,17 +1358,15 @@ private void innerOpenEngineAndTranslog() throws IOException { final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); - assertMaxUnsafeAutoIdInCommit(); - - final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); - store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); - - createNewEngine(config); - verifyNotClosed(); - // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, - // we still give sync'd flush a chance to run: - active.set(true); - assertSequenceNumbersInCommit(); + final EngineConfig config = newEngineConfig(); + // we disable deletes since we allow for operations to be executed against the shard while recovering + // but we need to make sure we don't loose deletes until we are done recovering + config.setEnableGcDeletes(false); + synchronized (mutex) { + assert currentEngineReference.get() == null : "engine is initialized already"; + currentEngineReference.set(createNewEngine(config)); + } + assert assertSequenceNumbersInCommit(); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } @@ -1463,7 +1466,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn if (origin == Engine.Operation.Origin.PRIMARY) { assert assertPrimaryMode(); } else { - assert origin == Engine.Operation.Origin.REPLICA; + assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESETTING; assert assertReplicationTarget(); } if (writeAllowedStates.contains(state) == false) { @@ -2164,33 +2167,21 @@ public void onFailedEngine(String reason, @Nullable Exception 
failure) { } } - private Engine createNewEngine(EngineConfig config) { - synchronized (mutex) { - if (state == IndexShardState.CLOSED) { - throw new AlreadyClosedException(shardId + " can't create engine - shard is closed"); - } - assert this.currentEngineReference.get() == null; - Engine engine = newEngine(config); - onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen - // inside the callback are not visible. This one enforces happens-before - this.currentEngineReference.set(engine); - } - - // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which - // settings changes could possibly have happened, so here we forcefully push any config changes to the new engine: - Engine engine = getEngineOrNull(); - - // engine could perhaps be null if we were e.g. concurrently closed: - if (engine != null) { - engine.onSettingsChanged(); - } + private Engine createNewEngine(EngineConfig config) throws IOException { + assert Thread.holdsLock(mutex); + verifyNotClosed(); + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); + final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); + store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); + assertMaxUnsafeAutoIdInCommit(); + final Engine engine = engineFactory.newReadWriteEngine(config); + onNewEngine(engine); + engine.onSettingsChanged(); + active.set(true); return engine; } - protected Engine newEngine(EngineConfig config) { - return engineFactory.newReadWriteEngine(config); - } - private static void persistMetadata( final ShardPath shardPath, final IndexSettings indexSettings, @@ -2320,13 +2311,15 @@ public void 
acquireReplicaOperationPermit(final long opPrimaryTerm, final long g } else { localCheckpoint = currentGlobalCheckpoint; } - logger.trace( - "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", - opPrimaryTerm, - getLocalCheckpoint(), - localCheckpoint); - getEngine().resetLocalCheckpoint(localCheckpoint); - getEngine().rollTranslogGeneration(); + logger.info("detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", + opPrimaryTerm, getLocalCheckpoint(), localCheckpoint); + final Engine engine = getEngine(); + engine.resetLocalCheckpoint(localCheckpoint); + if (localCheckpoint < engine.getSeqNoStats(localCheckpoint).getMaxSeqNo()) { + resetEngineToGlobalCheckpoint(); + } else { + engine.rollTranslogGeneration(); + } }); } } @@ -2687,4 +2680,22 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { } }; } + + /** + * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. 
+ */ + void resetEngineToGlobalCheckpoint() throws IOException { + assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; + sync(); // persist the global checkpoint to disk + final long globalCheckpoint = getGlobalCheckpoint(); + final long maxSeqNo = seqNoStats().getMaxSeqNo(); + logger.info("resetting replica engine from max_seq_no [{}] to global checkpoint [{}]", maxSeqNo, globalCheckpoint); + final Engine resettingEngine; + synchronized (mutex) { + IOUtils.close(currentEngineReference.getAndSet(null)); + resettingEngine = createNewEngine(newEngineConfig()); + currentEngineReference.set(resettingEngine); + } + resettingEngine.recoverFromTranslog(this::runTranslogRecoveryAfterResetting, globalCheckpoint); + } } diff --git a/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java b/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java index ac2f2b0d4f32e..c0b01eb5ec518 100644 --- a/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java +++ b/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java @@ -111,6 +111,7 @@ protected void beforeIndexDeletion() throws Exception { super.beforeIndexDeletion(); internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); assertSeqNos(); + assertSameDocIdsOnShards(); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index e471874f6d664..5a786120b405a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.shard.IndexShardTests; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.SnapshotMatchers; +import 
org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.threadpool.TestThreadPool; @@ -69,6 +70,7 @@ import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; @@ -520,17 +522,11 @@ public void testSeqNoCollision() throws Exception { logger.info("--> Recover replica3 from replica2"); recoverReplica(replica3, replica2, true); try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); - final List expectedOps = new ArrayList<>(initOperations); - expectedOps.add(op2); - assertThat(snapshot, containsOperationsInAnyOrder(expectedOps)); + List operations = TestTranslog.drainAll(snapshot); + assertThat(operations, contains(op2)); assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0)); } - // TODO: We should assert the content of shards in the ReplicationGroup. 
- // Without rollback replicas(current implementation), we don't have the same content across shards: - // - replica1 has {doc1} - // - replica2 has {doc1, doc2} - // - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery + shards.assertAllEqual(initDocs + 1); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 28122665e9bb6..a73d7385d9d4d 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -55,10 +55,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -306,14 +304,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary)); } - - // roll back the extra ops in the replica - shards.removeReplica(replica); - replica.close("resync", false); - replica.store().close(); - newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); - shards.recoverReplica(newReplica); - shards.assertAllEqual(totalDocs); // Make sure that flushing on a recovering shard is ok. 
shards.flush(); shards.assertAllEqual(totalDocs); @@ -406,31 +396,14 @@ public void testResyncAfterPrimaryPromotion() throws Exception { indexOnReplica(bulkShardRequest, shards, justReplica); } - logger.info("--> seqNo primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats()); - - logger.info("--> resyncing replicas"); + logger.info("--> resyncing replicas seqno_stats primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats()); PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get(); if (syncedGlobalCheckPoint) { assertEquals(extraDocs, task.getResyncedOperations()); } else { assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs)); } - List replicas = shards.getReplicas(); - - // check all docs on primary are available on replica - Set primaryIds = getShardDocUIDs(newPrimary); - assertThat(primaryIds.size(), equalTo(initialDocs + extraDocs)); - for (IndexShard replica : replicas) { - Set replicaIds = getShardDocUIDs(replica); - Set temp = new HashSet<>(primaryIds); - temp.removeAll(replicaIds); - assertThat(replica.routingEntry() + " is missing docs", temp, empty()); - temp = new HashSet<>(replicaIds); - temp.removeAll(primaryIds); - // yeah, replica has more docs as there is no Lucene roll back on it - assertThat(replica.routingEntry() + " has to have extra docs", temp, - extraDocsToBeTrimmed > 0 ? 
not(empty()) : empty()); - } + shards.assertAllEqual(initialDocs + extraDocs); // check translog on replica is trimmed int translogOperations = 0; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 713bc04634b0a..cc984efdf8c33 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -106,6 +106,7 @@ import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.translog.TranslogTests; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -944,27 +945,21 @@ public void onFailure(Exception e) { resyncLatch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); - - closeShards(indexShard); + closeShard(indexShard, false); } - public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException { + public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); // most of the time this is large enough that most of the time there will be at least one gap final int operations = 1024 - scaledRandomIntBetween(0, 1024); indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); - final long globalCheckpointOnReplica = - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); + final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); 
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); - - final int globalCheckpoint = - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); + final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); + Set docsBelowGlobalCheckpoint = getShardDocUIDs(indexShard).stream() + .filter(id -> Long.parseLong(id) <= Math.max(globalCheckpointOnReplica, globalCheckpoint)).collect(Collectors.toSet()); final CountDownLatch latch = new CountDownLatch(1); indexShard.acquireReplicaOperationPermit( indexShard.pendingPrimaryTerm + 1, @@ -990,12 +985,11 @@ public void onFailure(final Exception e) { } else { assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); } - + assertThat(getShardDocUIDs(indexShard), equalTo(docsBelowGlobalCheckpoint)); // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); - - closeShards(indexShard); + closeShard(indexShard, false); } public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException { @@ -1854,13 +1848,16 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); - // Simulate resync (without rollback): Noop #1, index #2 - acquireReplicaOperationPermitBlockingly(shard, shard.pendingPrimaryTerm + 1); + // Here we try to simulate the primary fail-over without rollback which is no longer the case. 
+ shard.pendingPrimaryTerm++; + shard.operationPrimaryTerm++; + shard.getEngine().rollTranslogGeneration(); shard.markSeqNoAsNoop(1, "test"); shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "_doc", "doc-2", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2")); + closeShard(shard, false); // Recovering from store should discard doc #1 final ShardRouting replicaRouting = shard.routingEntry(); IndexShard newShard = reinitShard(shard, @@ -2839,6 +2836,9 @@ private Result indexOnReplicaWithGaps( } else { gap = true; } + if (rarely()) { + indexShard.flush(new FlushRequest()); + } } assert localCheckpoint == indexShard.getLocalCheckpoint(); assert !gap || (localCheckpoint != max); @@ -3376,4 +3376,19 @@ public void testSupplyTombstoneDoc() throws Exception { closeShards(shard); } + + public void testResetEngine() throws Exception { + IndexShard shard = newStartedShard(false); + indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint())); + final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()); + shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); + Set docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream() + .filter(id -> Long.parseLong(id) <= globalCheckpoint).collect(Collectors.toSet()); + TranslogStats translogStats = shard.translogStats(); + shard.resetEngineToGlobalCheckpoint(); + assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint)); + assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint)); + assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations())); + closeShard(shard, false); + } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java 
b/server/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java index 21f7dd9481c5e..4d925096595fa 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java +++ b/server/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java @@ -148,15 +148,6 @@ public static class ContainingInAnyOrderMatcher extends TypeSafeMatcher notFoundOps; private List notExpectedOps; - static List drainAll(Translog.Snapshot snapshot) throws IOException { - final List actualOps = new ArrayList<>(); - Translog.Operation op; - while ((op = snapshot.next()) != null) { - actualOps.add(op); - } - return actualOps; - } - public ContainingInAnyOrderMatcher(Collection expectedOps) { this.expectedOps = expectedOps; } @@ -164,7 +155,7 @@ public ContainingInAnyOrderMatcher(Collection expectedOps) { @Override protected boolean matchesSafely(Translog.Snapshot snapshot) { try { - List actualOps = drainAll(snapshot); + List actualOps = TestTranslog.drainAll(snapshot); notFoundOps = expectedOps.stream() .filter(o -> actualOps.contains(o) == false) .collect(Collectors.toList()); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index f37ec5a8e55d5..fd61546a41fad 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -34,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; @@ -121,4 +122,14 @@ public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOExce public static long getCurrentTerm(Translog translog) { return translog.getCurrent().getPrimaryTerm(); } + + /** Drains all operations from the given translog snapshot */ + public static List drainAll(Translog.Snapshot 
snapshot) throws IOException { + final List actualOps = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + actualOps.add(op); + } + return actualOps; + } } diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index cb93d803bb7c6..8d0f1845be60d 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -103,6 +103,7 @@ protected Collection> nodePlugins() { protected void beforeIndexDeletion() throws Exception { super.beforeIndexDeletion(); assertSeqNos(); + assertSameDocIdsOnShards(); } public void testSimpleRelocationNoIndexing() { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 0e22d0a7eda2a..907c52e18101d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -33,7 +33,10 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; @@ -246,10 +249,12 @@ public void tearDown() throws Exception { if (engine != null && engine.isClosed.get() == false) { engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); + assertUniqueSeqNoInLucene(engine); } if 
(replicaEngine != null && replicaEngine.isClosed.get() == false) { replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); + assertUniqueSeqNoInLucene(replicaEngine); } IOUtils.close( replicaEngine, storeReplica, @@ -867,6 +872,33 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e } } + /** + * Asserts that there is at most one (root) document in Lucene for every sequence number. + */ + public static void assertUniqueSeqNoInLucene(Engine engine) throws IOException { + final Set foundSeqNos = new HashSet<>(); + try (Engine.Searcher searcher = engine.acquireSearcher("assert-seqno", Engine.SearcherScope.INTERNAL)) { + List leaves = searcher.reader().leaves(); + for (LeafReaderContext leaf : leaves) { + DocIdSetIterator rootDocIterator = DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator( + SeqNoFieldMapper.PRIMARY_TERM_NAME, leaf.reader()); + Bits liveDocs = leaf.reader().getLiveDocs(); + NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + while (rootDocIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (liveDocs == null || liveDocs.get(rootDocIterator.docID())) { + if (seqNoDocValues.advanceExact(rootDocIterator.docID()) == false) { + throw new AssertionError("seq_no docValues not found"); + } + long seqNo = seqNoDocValues.longValue(); + if (foundSeqNos.add(seqNo) == false) { + throw new AssertionError("seq_no [" + seqNo + "] appear twice"); + } + } + } + } + } + } + protected MapperService createMapperService(String type) throws IOException { IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(Settings.builder() diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 32db9bf0a2a04..419b65469cac6 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -458,15 +458,22 @@ protected void closeShards(IndexShard... shards) throws IOException { closeShards(Arrays.asList(shards)); } + protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTranslogAndLucene) throws IOException { + try { + if (assertConsistencyBetweenTranslogAndLucene) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + assertUniqueSeqNoInLucene(shard); + shard.close("test", false); + } finally { + IOUtils.close(shard.store()); + } + } + protected void closeShards(Iterable shards) throws IOException { for (IndexShard shard : shards) { if (shard != null) { - try { - assertConsistentHistoryBetweenTranslogAndLucene(shard); - shard.close("test", false); - } finally { - IOUtils.close(shard.store()); - } + closeShard(shard, true); } } } @@ -642,7 +649,7 @@ private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) th return result; } - protected Set getShardDocUIDs(final IndexShard shard) throws IOException { + public static Set getShardDocUIDs(final IndexShard shard) throws IOException { return EngineTestCase.getDocIds(shard.getEngine(), true); } @@ -663,6 +670,13 @@ public static void assertConsistentHistoryBetweenTranslogAndLucene(IndexShard sh } } + public static void assertUniqueSeqNoInLucene(IndexShard shard) throws IOException { + final Engine engine = shard.getEngineOrNull(); + if (engine != null) { + EngineTestCase.assertUniqueSeqNoInLucene(engine); + } + } + protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) throws IOException { return indexDoc(shard, type, id, "{}"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 68a862c109d98..6371d7c25ab70 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -132,6 +132,7 @@ import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; @@ -2380,6 +2381,49 @@ protected void assertSeqNos() throws Exception { }); } + /** + * Asserts that all shards with the same shardId contain the same set of document ids. + */ + public void assertSameDocIdsOnShards() throws Exception { + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + for (ObjectObjectCursor indexRoutingTable : state.routingTable().indicesRouting()) { + for (IntObjectCursor indexShardRoutingTable : indexRoutingTable.value.shards()) { + ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard(); + if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) { + continue; + } + DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId()); + IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName()) + .indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id()); + final Set docsOnPrimary; + try { + docsOnPrimary = IndexShardTestCase.getShardDocUIDs(primaryShard); + } catch (AlreadyClosedException ex) { + continue; + } + for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) { + if (replicaShardRouting.assignedToNode() == false) { + continue; + } + DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId()); + IndexShard replicaShard = internalCluster().getInstance(IndicesService.class,
replicaNode.getName()) + .indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id()); + final Set docsOnReplica; + try { + docsOnReplica = IndexShardTestCase.getShardDocUIDs(replicaShard); + } catch (AlreadyClosedException ex) { + continue; + } + assertThat("out of sync shards: primary=[" + primaryShardRouting + "] num_docs_on_primary=[" + docsOnPrimary.size() + + "] vs replica=[" + replicaShardRouting + "] num_docs_on_replica=[" + docsOnReplica.size() + "]", + docsOnReplica, equalTo(docsOnPrimary)); + } + } + } + }); + } + public static boolean inFipsJvm() { return Security.getProviders()[0].getName().toLowerCase(Locale.ROOT).contains("fips"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 3c46acd0fbe73..c13ff858d4416 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1141,6 +1141,7 @@ public void beforeIndexDeletion() throws Exception { //check that shards that have same sync id also contain same number of documents assertSameSyncIdSameDocs(); assertOpenTranslogReferences(); + assertUniqueSeqNoInLucene(); } private void assertSameSyncIdSameDocs() { @@ -1229,6 +1230,23 @@ public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOExce } } + /** Asserts that there is at most one (root) document in Lucene for every sequence number */ + private void assertUniqueSeqNoInLucene() throws IOException { + final Collection nodesAndClients = nodes.values(); + for (NodeAndClient nodeAndClient : nodesAndClients) { + IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); + for (IndexService indexService : indexServices) { + for (IndexShard indexShard : indexService) { + try { + IndexShardTestCase.assertUniqueSeqNoInLucene(indexShard); + } catch 
(AlreadyClosedException ignored) { + // shard is closed + } + } + } + } + } + private void randomlyResetClients() throws IOException { // only reset the clients on nightly tests, it causes heavy load... if (RandomizedTest.isNightly() && rarely(random)) { From 18851c141d74ad9384a868d88aefab5e4175e233 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 6 Sep 2018 14:49:44 -0400 Subject: [PATCH 02/10] fix test --- .../index/replication/IndexLevelReplicationTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 5a786120b405a..667ca50137c2a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -70,12 +70,12 @@ import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.Is.is; @@ -523,7 +523,7 @@ public void testSeqNoCollision() throws Exception { recoverReplica(replica3, replica2, true); try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) { List operations = TestTranslog.drainAll(snapshot); - assertThat(operations, contains(op2)); + assertThat(op2, isIn(operations)); assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0)); 
} shards.assertAllEqual(initDocs + 1); From 5ab9d12390ff25ecabba78b46a57f2be89d211cc Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 6 Sep 2018 16:30:54 -0400 Subject: [PATCH 03/10] assertUniqueSeqNoInLucene does not work with shrink --- .../index/engine/EngineTestCase.java | 32 ------------------- .../index/shard/IndexShardTestCase.java | 8 ----- .../test/InternalTestCluster.java | 18 ----------- 3 files changed, 58 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 907c52e18101d..0e22d0a7eda2a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -33,10 +33,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; @@ -249,12 +246,10 @@ public void tearDown() throws Exception { if (engine != null && engine.isClosed.get() == false) { engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); - assertUniqueSeqNoInLucene(engine); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); - assertUniqueSeqNoInLucene(replicaEngine); } IOUtils.close( replicaEngine, storeReplica, @@ 
-872,33 +867,6 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e } } - /** - * Asserts that there is at most one (root) document in Lucene for every sequence number. - */ - public static void assertUniqueSeqNoInLucene(Engine engine) throws IOException { - final Set foundSeqNos = new HashSet<>(); - try (Engine.Searcher searcher = engine.acquireSearcher("assert-seqno", Engine.SearcherScope.INTERNAL)) { - List leaves = searcher.reader().leaves(); - for (LeafReaderContext leaf : leaves) { - DocIdSetIterator rootDocIterator = DocValuesFieldExistsQuery.getDocValuesDocIdSetIterator( - SeqNoFieldMapper.PRIMARY_TERM_NAME, leaf.reader()); - Bits liveDocs = leaf.reader().getLiveDocs(); - NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - while (rootDocIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - if (liveDocs == null || liveDocs.get(rootDocIterator.docID())) { - if (seqNoDocValues.advanceExact(rootDocIterator.docID()) == false) { - throw new AssertionError("seq_no docValues not found"); - } - long seqNo = seqNoDocValues.longValue(); - if (foundSeqNos.add(seqNo) == false) { - throw new AssertionError("seq_no [" + seqNo + "] appear twice"); - } - } - } - } - } - } - protected MapperService createMapperService(String type) throws IOException { IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(Settings.builder() diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 419b65469cac6..25643b800ee9f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -463,7 +463,6 @@ protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTran if (assertConsistencyBetweenTranslogAndLucene) { 
assertConsistentHistoryBetweenTranslogAndLucene(shard); } - assertUniqueSeqNoInLucene(shard); shard.close("test", false); } finally { IOUtils.close(shard.store()); @@ -670,13 +669,6 @@ public static void assertConsistentHistoryBetweenTranslogAndLucene(IndexShard sh } } - public static void assertUniqueSeqNoInLucene(IndexShard shard) throws IOException { - final Engine engine = shard.getEngineOrNull(); - if (engine != null) { - EngineTestCase.assertUniqueSeqNoInLucene(engine); - } - } - protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) throws IOException { return indexDoc(shard, type, id, "{}"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c13ff858d4416..3c46acd0fbe73 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1141,7 +1141,6 @@ public void beforeIndexDeletion() throws Exception { //check that shards that have same sync id also contain same number of documents assertSameSyncIdSameDocs(); assertOpenTranslogReferences(); - assertUniqueSeqNoInLucene(); } private void assertSameSyncIdSameDocs() { @@ -1230,23 +1229,6 @@ public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOExce } } - /** Asserts that there is at most one (root) document in Lucene for every sequence number */ - private void assertUniqueSeqNoInLucene() throws IOException { - final Collection nodesAndClients = nodes.values(); - for (NodeAndClient nodeAndClient : nodesAndClients) { - IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); - for (IndexService indexService : indexServices) { - for (IndexShard indexShard : indexService) { - try { - IndexShardTestCase.assertUniqueSeqNoInLucene(indexShard); - } catch (AlreadyClosedException ignored) { - // shard is closed 
- } - } - } - } - } - private void randomlyResetClients() throws IOException { // only reset the clients on nightly tests, it causes heavy load... if (RandomizedTest.isNightly() && rarely(random)) { From 571fc99d29268d6560b45c52a8db0b7c4fd42212 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 6 Sep 2018 23:22:34 -0400 Subject: [PATCH 04/10] remove reset local checkpoint --- .../main/java/org/elasticsearch/index/engine/Engine.java | 6 ------ .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 ++-- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 1 - 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index c3e795e9982f1..43235e6231581 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -653,12 +653,6 @@ public CommitStats commitStats() { */ public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException; - /** - * Reset the local checkpoint in the tracker to the given local checkpoint - * @param localCheckpoint the new checkpoint to be set - */ - public abstract void resetLocalCheckpoint(long localCheckpoint); - /** * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2ac3af44aeacb..07dd9350d1259 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2374,8 +2374,8 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException { localCheckpointTracker.waitForOpsToComplete(seqNo); } - @Override - public void resetLocalCheckpoint(long localCheckpoint) { + // 
TODO: remove this method after we restore local history on primary promotion + void resetLocalCheckpoint(long localCheckpoint) { localCheckpointTracker.resetCheckpoint(localCheckpoint); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b71e49c1f1b16..b79fd29d67dcf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2314,7 +2314,6 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g logger.info("detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", opPrimaryTerm, getLocalCheckpoint(), localCheckpoint); final Engine engine = getEngine(); - engine.resetLocalCheckpoint(localCheckpoint); if (localCheckpoint < engine.getSeqNoStats(localCheckpoint).getMaxSeqNo()) { resetEngineToGlobalCheckpoint(); } else { From a7902f34284b950426ca097d5c2903c4a8ea73a4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 6 Sep 2018 23:45:08 -0400 Subject: [PATCH 05/10] add comment to remove `resetCheckpoint` --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 5 ----- .../elasticsearch/index/seqno/LocalCheckpointTracker.java | 1 + .../org/elasticsearch/index/engine/InternalEngineTests.java | 2 +- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 07dd9350d1259..82ff478907437 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2374,11 +2374,6 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException { localCheckpointTracker.waitForOpsToComplete(seqNo); } - // TODO: remove this method after we restore local 
history on primary promotion - void resetLocalCheckpoint(long localCheckpoint) { - localCheckpointTracker.resetCheckpoint(localCheckpoint); - } - @Override public SeqNoStats getSeqNoStats(long globalCheckpoint) { return localCheckpointTracker.getStats(globalCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index cd33c1bf046ed..90a8ca93cf3fb 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -109,6 +109,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) { * @param checkpoint the local checkpoint to reset this tracker to */ public synchronized void resetCheckpoint(final long checkpoint) { + // TODO: remove this method as we no longer need it. assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO; assert checkpoint <= this.checkpoint; processedSeqNo.clear(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 6d9cdd0f225d7..487073457dce4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4077,7 +4077,7 @@ public void markSeqNoAsCompleted(long seqNo) { final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint(); final long resetLocalCheckpoint = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint)); - actualEngine.resetLocalCheckpoint(resetLocalCheckpoint); + actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint); completedSeqNos.clear(); actualEngine.restoreLocalCheckpointFromTranslog(); final Set intersection = new HashSet<>(expectedCompletedSeqNos); From 
b5ec9577583245a6ed8768a9b037c80c6df7e288 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 10 Sep 2018 09:46:56 -0400 Subject: [PATCH 06/10] simon feedback --- .../elasticsearch/index/engine/Engine.java | 6 ++--- .../index/engine/InternalEngine.java | 8 +++--- .../elasticsearch/index/shard/IndexShard.java | 27 +++++++------------ 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 4d52a325bf6fb..15f7d6d465122 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1158,14 +1158,14 @@ public enum Origin { REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY, - LOCAL_RESETTING; + LOCAL_RESET; public boolean isRecovery() { return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY; } - boolean isLocal() { - return this == LOCAL_TRANSLOG_RECOVERY || this == LOCAL_RESETTING; + boolean isRemote() { + return this == PRIMARY || this == REPLICA || this == PEER_RECOVERY; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 19fecc13f566b..265651b28cdf4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -735,7 +735,7 @@ private boolean canOptimizeAddDocument(Index index) { : "version: " + index.version() + " type: " + index.versionType(); return true; case LOCAL_TRANSLOG_RECOVERY: - case LOCAL_RESETTING: + case LOCAL_RESET: assert index.isRetry(); return true; // allow to optimize in order to update the max safe time stamp default: @@ -834,7 +834,7 @@ public IndexResult index(Index index) throws IOException { indexResult = new IndexResult( plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, 
plan.currentNotFoundOrDeleted); } - if (index.origin().isLocal() == false) { + if (index.origin().isRemote()) { final Translog.Location location; if (indexResult.getResultType() == Result.Type.SUCCESS) { location = translog.add(new Translog.Index(index, indexResult)); @@ -1174,7 +1174,7 @@ public DeleteResult delete(Delete delete) throws IOException { deleteResult = new DeleteResult( plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); } - if (delete.origin().isLocal() == false) { + if (delete.origin().isRemote()) { final Translog.Location location; if (deleteResult.getResultType() == Result.Type.SUCCESS) { location = translog.add(new Translog.Delete(delete, deleteResult)); @@ -1412,7 +1412,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { } } final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo()); - if (noOp.origin().isLocal() == false) { + if (noOp.origin().isRemote()) { final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b79fd29d67dcf..5acff2936345b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1282,11 +1282,6 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce translogRecoveryStats::incrementRecoveredOperations); } - private int runTranslogRecoveryAfterResetting(Engine engine, Translog.Snapshot snapshot) throws IOException { - // TODO: record resetting stats - return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESETTING, () -> {}); - } - private int runTranslogRecovery(Engine 
engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, Runnable onPerOperationRecovered) throws IOException { int opsRecovered = 0; @@ -1466,7 +1461,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn if (origin == Engine.Operation.Origin.PRIMARY) { assert assertPrimaryMode(); } else { - assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESETTING; + assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET; assert assertReplicationTarget(); } if (writeAllowedStates.contains(state) == false) { @@ -2305,19 +2300,13 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g bumpPrimaryTerm(opPrimaryTerm, () -> { updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); final long currentGlobalCheckpoint = getGlobalCheckpoint(); - final long localCheckpoint; - if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) { - localCheckpoint = NO_OPS_PERFORMED; - } else { - localCheckpoint = currentGlobalCheckpoint; - } - logger.info("detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", - opPrimaryTerm, getLocalCheckpoint(), localCheckpoint); - final Engine engine = getEngine(); - if (localCheckpoint < engine.getSeqNoStats(localCheckpoint).getMaxSeqNo()) { + final long maxSeqNo = seqNoStats().getMaxSeqNo(); + logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", + opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); + if (currentGlobalCheckpoint < maxSeqNo) { resetEngineToGlobalCheckpoint(); } else { - engine.rollTranslogGeneration(); + getEngine().rollTranslogGeneration(); } }); } @@ -2695,6 +2684,8 @@ void resetEngineToGlobalCheckpoint() throws IOException { resettingEngine = createNewEngine(newEngineConfig()); currentEngineReference.set(resettingEngine); } - 
resettingEngine.recoverFromTranslog(this::runTranslogRecoveryAfterResetting, globalCheckpoint); + final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> + runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}); + resettingEngine.recoverFromTranslog(translogRunner, globalCheckpoint); } } From 7800e65d41daef27f42dd873035c834144186736 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 10 Sep 2018 11:07:15 -0400 Subject: [PATCH 07/10] single translog runner method --- .../elasticsearch/index/shard/IndexShard.java | 25 ++++++++----------- .../index/shard/IndexShardTests.java | 7 +++--- .../index/shard/IndexShardTestCase.java | 3 +-- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5acff2936345b..c38138ab15fac 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -163,7 +163,6 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.index.mapper.SourceToParse.source; -import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { @@ -1273,17 +1272,8 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine return result; } - // package-private for testing - int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { - final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); - translogRecoveryStats.totalOperations(snapshot.totalOperations()); - translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations()); - return runTranslogRecovery(engine, snapshot, 
Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, - translogRecoveryStats::incrementRecoveredOperations); - } - - private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, - Runnable onPerOperationRecovered) throws IOException { + int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, + Runnable onOperationRecovered) throws IOException { int opsRecovered = 0; Translog.Operation operation; while ((operation = snapshot.next()) != null) { @@ -1301,7 +1291,7 @@ private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engin throw new AssertionError("Unknown result type [" + result.getResultType() + "]"); } opsRecovered++; - onPerOperationRecovered.run(); + onOperationRecovered.run(); } catch (Exception e) { if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { // mainly for MapperParsingException and Failure to detect xcontent @@ -1319,8 +1309,15 @@ private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engin * Operations from the translog will be replayed to bring lucene up to date. 
**/ public void openEngineAndRecoverFromTranslog() throws IOException { + final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); + final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { + translogRecoveryStats.totalOperations(snapshot.totalOperations()); + translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations()); + return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, + translogRecoveryStats::incrementRecoveredOperations); + }; innerOpenEngineAndTranslog(); - getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE); + getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); } /** diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index bc342fe1fe038..f22660f46d941 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2245,10 +2245,11 @@ public Translog.Operation next() throws IOException { null)); primary.recoverFromStore(); + primary.recoveryState().getTranslog().totalOperations(snapshot.totalOperations()); + primary.recoveryState().getTranslog().totalOperationsOnStart(snapshot.totalOperations()); primary.state = IndexShardState.RECOVERING; // translog recovery on the next line would otherwise fail as we are in POST_RECOVERY - primary.runTranslogRecovery(primary.getEngine(), snapshot); - assertThat(primary.recoveryState().getTranslog().totalOperationsOnStart(), equalTo(numTotalEntries)); - assertThat(primary.recoveryState().getTranslog().totalOperations(), equalTo(numTotalEntries)); + primary.runTranslogRecovery(primary.getEngine(), snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, + primary.recoveryState().getTranslog()::incrementRecoveredOperations); 
assertThat(primary.recoveryState().getTranslog().recoveredOperations(), equalTo(numTotalEntries - numCorruptEntries)); closeShards(primary); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index be1ff03cf89e8..cb16d50262730 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -456,9 +456,8 @@ protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTran if (assertConsistencyBetweenTranslogAndLucene) { assertConsistentHistoryBetweenTranslogAndLucene(shard); } - shard.close("test", false); } finally { - IOUtils.close(shard.store()); + IOUtils.close(() -> shard.close("test", false), shard.store()); } } From 06f3c02fc99583047ebe38b653dfe0f3695d6cf2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 10 Sep 2018 14:52:36 -0400 Subject: [PATCH 08/10] boaz feedback --- .../elasticsearch/index/engine/Engine.java | 4 +- .../index/engine/InternalEngine.java | 6 +- .../index/seqno/LocalCheckpointTracker.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 87 ++++++++++++------- .../index/shard/IndexShardTests.java | 14 ++- .../index/engine/DocIdSeqNoAndTerm.java | 66 ++++++++++++++ .../index/engine/EngineTestCase.java | 29 +++++-- .../index/shard/IndexShardTestCase.java | 7 ++ .../elasticsearch/test/ESIntegTestCase.java | 9 +- 9 files changed, 175 insertions(+), 49 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 15f7d6d465122..647ac697371b8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ 
b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1164,8 +1164,8 @@ public boolean isRecovery() { return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY; } - boolean isRemote() { - return this == PRIMARY || this == REPLICA || this == PEER_RECOVERY; + boolean isFromTranslog() { + return this == LOCAL_TRANSLOG_RECOVERY || this == LOCAL_RESET; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 265651b28cdf4..7e13d5477cf1e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -834,7 +834,7 @@ public IndexResult index(Index index) throws IOException { indexResult = new IndexResult( plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } - if (index.origin().isRemote()) { + if (index.origin().isFromTranslog() == false) { final Translog.Location location; if (indexResult.getResultType() == Result.Type.SUCCESS) { location = translog.add(new Translog.Index(index, indexResult)); @@ -1174,7 +1174,7 @@ public DeleteResult delete(Delete delete) throws IOException { deleteResult = new DeleteResult( plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); } - if (delete.origin().isRemote()) { + if (delete.origin().isFromTranslog() == false) { final Translog.Location location; if (deleteResult.getResultType() == Result.Type.SUCCESS) { location = translog.add(new Translog.Delete(delete, deleteResult)); @@ -1412,7 +1412,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { } } final NoOpResult noOpResult = failure != null ? 
new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo()); - if (noOp.origin().isRemote()) { + if (noOp.origin().isFromTranslog() == false) { final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 90a8ca93cf3fb..9fad96940b87b 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -109,7 +109,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) { * @param checkpoint the local checkpoint to reset this tracker to */ public synchronized void resetCheckpoint(final long checkpoint) { - // TODO: remove this method as we no longer need it. + // TODO: remove this method after we restore the local history on promotion. assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO; assert checkpoint <= this.checkpoint; processedSeqNo.clear(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c38138ab15fac..cdc36e670aca1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1272,6 +1272,10 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine return result; } + /** + * Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}. + * The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully.
+ */ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin, Runnable onOperationRecovered) throws IOException { int opsRecovered = 0; @@ -1290,6 +1294,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat default: throw new AssertionError("Unknown result type [" + result.getResultType() + "]"); } + opsRecovered++; onOperationRecovered.run(); } catch (Exception e) { @@ -1343,25 +1348,37 @@ private void innerOpenEngineAndTranslog() throws IOException { } } recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + + final EngineConfig config = newEngineConfig(); + + // we disable deletes since we allow for operations to be executed against the shard while recovering + but we need to make sure we don't lose deletes until we are done recovering + config.setEnableGcDeletes(false); // we have to set it before we open an engine and recover from the translog because // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in, // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); + trimUnsafeCommits(); - final EngineConfig config = newEngineConfig(); - // we disable deletes since we allow for operations to be executed against the shard while recovering - // but we need to make sure we don't loose deletes until we are done recovering - config.setEnableGcDeletes(false); - synchronized (mutex) { - assert currentEngineReference.get() == null : "engine is initialized already"; - currentEngineReference.set(createNewEngine(config)); - } - assert assertSequenceNumbersInCommit(); + createNewEngine(config); + verifyNotClosed(); + // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, + // we still give sync'd flush a chance to run: + active.set(true); + assertSequenceNumbersInCommit(); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } + private void trimUnsafeCommits() throws IOException { + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); + final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); + assertMaxUnsafeAutoIdInCommit(); + store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, indexSettings.getIndexVersionCreated()); + } + private boolean assertSequenceNumbersInCommit() throws IOException { final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); assert 
userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint"; @@ -2159,21 +2176,31 @@ public void onFailedEngine(String reason, @Nullable Exception failure) { } } - private Engine createNewEngine(EngineConfig config) throws IOException { - assert Thread.holdsLock(mutex); - verifyNotClosed(); - final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); - final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID); - store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated()); - assertMaxUnsafeAutoIdInCommit(); - final Engine engine = engineFactory.newReadWriteEngine(config); - onNewEngine(engine); - engine.onSettingsChanged(); - active.set(true); + private Engine createNewEngine(EngineConfig config) { + synchronized (mutex) { + verifyNotClosed(); + assert this.currentEngineReference.get() == null; + Engine engine = newEngine(config); + onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen + // inside the callback are not visible. This one enforces happens-before + this.currentEngineReference.set(engine); + } + + // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which + // settings changes could possibly have happened, so here we forcefully push any config changes to the new engine: + Engine engine = getEngineOrNull(); + + // engine could perhaps be null if we were e.g. 
concurrently closed: + if (engine != null) { + engine.onSettingsChanged(); + } return engine; } + protected Engine newEngine(EngineConfig config) { + return engineFactory.newReadWriteEngine(config); + } + private static void persistMetadata( final ShardPath shardPath, final IndexSettings indexSettings, @@ -2673,16 +2700,18 @@ void resetEngineToGlobalCheckpoint() throws IOException { assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; sync(); // persist the global checkpoint to disk final long globalCheckpoint = getGlobalCheckpoint(); - final long maxSeqNo = seqNoStats().getMaxSeqNo(); - logger.info("resetting replica engine from max_seq_no [{}] to global checkpoint [{}]", maxSeqNo, globalCheckpoint); - final Engine resettingEngine; + final Engine newEngine; synchronized (mutex) { + verifyNotClosed(); IOUtils.close(currentEngineReference.getAndSet(null)); - resettingEngine = createNewEngine(newEngineConfig()); - currentEngineReference.set(resettingEngine); + trimUnsafeCommits(); + newEngine = createNewEngine(newEngineConfig()); + active.set(true); } - final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> - runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}); - resettingEngine.recoverFromTranslog(translogRunner, globalCheckpoint); + final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( + engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { + // TODO: add dedicated recovery stats for the reset translog + }); + newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f22660f46d941..4f5d16380febf 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++
b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -182,6 +182,7 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; /** * Simple unit-test IndexShard related operations. @@ -962,6 +963,8 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt Set docsBelowGlobalCheckpoint = getShardDocUIDs(indexShard).stream() .filter(id -> Long.parseLong(id) <= Math.max(globalCheckpointOnReplica, globalCheckpoint)).collect(Collectors.toSet()); final CountDownLatch latch = new CountDownLatch(1); + final boolean shouldRollback = Math.max(globalCheckpoint, globalCheckpointOnReplica) < indexShard.seqNoStats().getMaxSeqNo(); + final Engine beforeRollbackEngine = indexShard.getEngine(); indexShard.acquireReplicaOperationPermit( indexShard.pendingPrimaryTerm + 1, globalCheckpoint, @@ -980,13 +983,17 @@ public void onFailure(final Exception e) { ThreadPool.Names.SAME, ""); latch.await(); - if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO - && globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { + if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO && globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); } else { assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); } assertThat(getShardDocUIDs(indexShard), equalTo(docsBelowGlobalCheckpoint)); + if (shouldRollback) { + assertThat(indexShard.getEngine(), not(sameInstance(beforeRollbackEngine))); + } else { + assertThat(indexShard.getEngine(), sameInstance(beforeRollbackEngine)); + } // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances final Result result = indexOnReplicaWithGaps(indexShard, operations, 
Math.toIntExact(indexShard.getLocalCheckpoint())); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); @@ -1873,7 +1880,8 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); - // Here we try to simulate the primary fail-over without rollback which is no longer the case. + // Here we try to increase term (i.e. a new primary is promoted) without rolling back a replica so we can keep stale operations + // in the index commit; then verify that a recovery from store (started with the safe commit) will remove all stale operations. shard.pendingPrimaryTerm++; shard.operationPrimaryTerm++; shard.getEngine().rollTranslogGeneration(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java b/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java new file mode 100644 index 0000000000000..b24a010c1a0d6 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/DocIdSeqNoAndTerm.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + + +import java.util.Objects; + +/** A tuple of document id, sequence number and primary term of a document */ +public final class DocIdSeqNoAndTerm { + private final String id; + private final long seqNo; + private final long primaryTerm; + + public DocIdSeqNoAndTerm(String id, long seqNo, long primaryTerm) { + this.id = id; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + } + + public String getId() { + return id; + } + + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DocIdSeqNoAndTerm that = (DocIdSeqNoAndTerm) o; + return Objects.equals(id, that.id) && seqNo == that.seqNo && primaryTerm == that.primaryTerm; + } + + @Override + public int hashCode() { + return Objects.hash(id, seqNo, primaryTerm); + } + + @Override + public String toString() { + return "DocIdSeqNoAndTerm{" + "id='" + id + " seqNo=" + seqNo + " primaryTerm=" + primaryTerm + "}"; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 283a7b137533d..f9377afe6ed3e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -33,6 +33,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; @@ -95,11 +96,10 @@ 
import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -775,26 +775,41 @@ protected void concurrentlyApplyOps(List ops, InternalEngine e } /** - * Gets all docId from the given engine. + * Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine. */ - public static Set getDocIds(Engine engine, boolean refresh) throws IOException { + public static List getDocIds(Engine engine, boolean refresh) throws IOException { if (refresh) { engine.refresh("test_get_doc_ids"); } try (Engine.Searcher searcher = engine.acquireSearcher("test_get_doc_ids")) { - Set ids = new HashSet<>(); + List docs = new ArrayList<>(); for (LeafReaderContext leafContext : searcher.reader().leaves()) { LeafReader reader = leafContext.reader(); + NumericDocValues seqNoDocValues = reader.getNumericDocValues(SeqNoFieldMapper.NAME); + NumericDocValues primaryTermDocValues = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); Bits liveDocs = reader.getLiveDocs(); for (int i = 0; i < reader.maxDoc(); i++) { if (liveDocs == null || liveDocs.get(i)) { Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME)); BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME); - ids.add(Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length))); + String id = Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length)); + final long primaryTerm; + if (primaryTermDocValues.advanceExact(i)) { + primaryTerm = primaryTermDocValues.longValue(); + } else { + primaryTerm = 0; // non-root documents of a nested document. 
+ } + if (seqNoDocValues.advanceExact(i) == false) { + throw new AssertionError("seqNoDocValues not found for doc[" + i + "] id[" + id + "]"); + } + final long seqNo = seqNoDocValues.longValue(); + docs.add(new DocIdSeqNoAndTerm(id, seqNo, primaryTerm)); } } } - return ids; + docs.sort(Comparator.comparing(DocIdSeqNoAndTerm::getId) + .thenComparingLong(DocIdSeqNoAndTerm::getSeqNo).thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)); + return docs; } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index cb16d50262730..c012d5b52fc15 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.query.DisabledQueryCache; +import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineTestCase; @@ -82,12 +83,14 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.contains; @@ -641,6 +644,10 @@ private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) th } public static Set getShardDocUIDs(final IndexShard shard) throws IOException { + return 
getDocIdAndSeqNos(shard).stream().map(DocIdSeqNoAndTerm::getId).collect(Collectors.toSet()); + } + + public static List getDocIdAndSeqNos(final IndexShard shard) throws IOException { return EngineTestCase.getDocIds(shard.getEngine(), true); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 6371d7c25ab70..52ed2205ab5f5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -125,6 +125,7 @@ import org.elasticsearch.index.MergeSchedulerConfig; import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; @@ -2396,9 +2397,9 @@ public void assertSameDocIdsOnShards() throws Exception { DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId()); IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName()) .indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id()); - final Set docsOnPrimary; + final List docsOnPrimary; try { - docsOnPrimary = IndexShardTestCase.getShardDocUIDs(primaryShard); + docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard); } catch (AlreadyClosedException ex) { continue; } @@ -2409,9 +2410,9 @@ public void assertSameDocIdsOnShards() throws Exception { DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId()); IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName()) .indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id()); - final Set docsOnReplica; + final List docsOnReplica; 
try { - docsOnReplica = IndexShardTestCase.getShardDocUIDs(replicaShard); + docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard); } catch (AlreadyClosedException ex) { continue; } From 3ca021ed8ef1b54765a5bb32e347b68c8b42a545 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 10 Sep 2018 19:56:25 -0400 Subject: [PATCH 09/10] fix tests --- .../index/replication/IndexLevelReplicationTests.java | 10 +++++----- .../index/translog/SnapshotMatchers.java | 11 ++++++++++- .../elasticsearch/index/translog/TestTranslog.java | 11 ----------- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 667ca50137c2a..f2cdfbf8fc566 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -49,7 +49,6 @@ import org.elasticsearch.index.shard.IndexShardTests; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.SnapshotMatchers; -import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.threadpool.TestThreadPool; @@ -75,7 +74,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.Is.is; @@ -521,9 +519,11 @@ public void testSeqNoCollision() throws Exception { shards.promoteReplicaToPrimary(replica2).get(); logger.info("--> Recover replica3 from replica2"); recoverReplica(replica3, replica2, true); - try (Translog.Snapshot 
snapshot = getTranslog(replica3).newSnapshot()) { - List operations = TestTranslog.drainAll(snapshot); - assertThat(op2, isIn(operations)); + try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + final List expectedOps = new ArrayList<>(initOperations); + expectedOps.add(op2); + assertThat(snapshot, containsOperationsInAnyOrder(expectedOps)); assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0)); } shards.assertAllEqual(initDocs + 1); diff --git a/server/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java b/server/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java index 4d925096595fa..21f7dd9481c5e 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java +++ b/server/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java @@ -148,6 +148,15 @@ public static class ContainingInAnyOrderMatcher extends TypeSafeMatcher notFoundOps; private List notExpectedOps; + static List drainAll(Translog.Snapshot snapshot) throws IOException { + final List actualOps = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + actualOps.add(op); + } + return actualOps; + } + public ContainingInAnyOrderMatcher(Collection expectedOps) { this.expectedOps = expectedOps; } @@ -155,7 +164,7 @@ public ContainingInAnyOrderMatcher(Collection expectedOps) { @Override protected boolean matchesSafely(Translog.Snapshot snapshot) { try { - List actualOps = TestTranslog.drainAll(snapshot); + List actualOps = drainAll(snapshot); notFoundOps = expectedOps.stream() .filter(o -> actualOps.contains(o) == false) .collect(Collectors.toList()); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index fd61546a41fad..f37ec5a8e55d5 100644 --- 
a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -34,7 +34,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; @@ -122,14 +121,4 @@ public static long minTranslogGenUsedInRecovery(Path translogPath) throws IOExce public static long getCurrentTerm(Translog translog) { return translog.getCurrent().getPrimaryTerm(); } - - /** Drains all operations from the given translog snapshot */ - public static List drainAll(Translog.Snapshot snapshot) throws IOException { - final List actualOps = new ArrayList<>(); - Translog.Operation op; - while ((op = snapshot.next()) != null) { - actualOps.add(op); - } - return actualOps; - } } From e4ad8bf66033cd9f15ae53b72801511949fced15 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 11 Sep 2018 17:22:48 -0400 Subject: [PATCH 10/10] assert engine is not running --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cdc36e670aca1..4bb56c8b0d3b5 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1372,6 +1372,7 @@ private void innerOpenEngineAndTranslog() throws IOException { } private void trimUnsafeCommits() throws IOException { + assert currentEngineReference.get() == null : "engine is running"; final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); final long minRetainedTranslogGen = 
Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);