From 002f763c482d43867a3ffb381a223864c0acc98e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Sep 2018 13:21:11 -0400 Subject: [PATCH] Restore local history from translog on promotion (#33616) If a shard was serving as a replica when another shard was promoted to primary, then its Lucene index was reset to the global checkpoint. However, if the new primary fails before the primary/replica resync completes and we are now being promoted, we have to restore the reverted operations by replaying the translog to avoid losing acknowledged writes. Relates #33473 Relates #32867 --- .../elasticsearch/upgrades/RecoveryIT.java | 2 - .../elasticsearch/index/engine/Engine.java | 8 +- .../index/engine/InternalEngine.java | 11 +-- .../index/engine/ReadOnlyEngine.java | 3 +- .../elasticsearch/index/shard/IndexShard.java | 23 ++--- .../index/engine/InternalEngineTests.java | 87 +++++++++---------- .../index/shard/IndexShardTests.java | 15 ++-- 7 files changed, 68 insertions(+), 81 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 7bd5cc3a8d2b3..062016909b651 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -111,7 +111,6 @@ protected void doRun() throws Exception { return future; } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616") public void testRecoveryWithConcurrentIndexing() throws Exception { final String index = "recovery_with_concurrent_indexing"; Response response = client().performRequest(new Request("GET", "_nodes")); @@ -184,7 +183,6 @@ private String getNodeId(Predicate versionPredicate) throws IOException } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616") public void testRelocationWithConcurrentIndexing() throws Exception { final String index = "relocation_with_concurrent_indexing"; switch (CLUSTER_TYPE) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 09171d4df84b3..f50c9c0a51a9f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1720,12 +1720,12 @@ public interface Warmer { public abstract void deactivateThrottling(); /** - * Marks operations in the translog as completed. This is used to restore the state of the local checkpoint tracker on primary - * promotion. + * This method replays translog to restore the Lucene index which might be reverted previously. + * This ensures that all acknowledged writes are restored correctly when this engine is promoted. * - * @throws IOException if an I/O exception occurred reading the translog + * @return the number of translog operations have been recovered */ - public abstract void restoreLocalCheckpointFromTranslog() throws IOException; + public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException; /** * Fills up the local checkpoints history with no-ops until the local checkpoint diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 52dd4d3fcd09e..64dae5dc67de8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -332,17 +332,12 @@ protected int getRefCount(IndexSearcher reference) { } @Override - public void restoreLocalCheckpointFromTranslog() throws IOException { - try (ReleasableLock ignored = writeLock.acquire()) { + public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); final long localCheckpoint = localCheckpointTracker.getCheckpoint(); try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) { - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (operation.seqNo() > localCheckpoint) { - localCheckpointTracker.markSeqNoAsCompleted(operation.seqNo()); - } - } + return translogRecoveryRunner.run(this, snapshot); } } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 80b653939299f..9f7ceb614742c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -344,7 +344,8 @@ public void rollTranslogGeneration() { } @Override - public void restoreLocalCheckpointFromTranslog() { + public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) { + return 0; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5d947b0cf4069..8f97bc07c1393 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -494,17 +494,16 @@ public void updateShardState(final ShardRouting newRouting, try { replicationTracker.activatePrimaryMode(getLocalCheckpoint()); /* - * If this shard was serving as a replica shard when another shard was promoted to primary then the state of - * its local checkpoint tracker was reset during the primary term transition. In particular, the local - * checkpoint on this shard was thrown back to the global checkpoint and the state of the local checkpoint - * tracker above the local checkpoint was destroyed. If the other shard that was promoted to primary - * subsequently fails before the primary/replica re-sync completes successfully and we are now being - * promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence - * numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by - * replaying the translog and marking any operations there are completed. + * If this shard was serving as a replica shard when another shard was promoted to primary then + * its Lucene index was reset during the primary term transition. In particular, the Lucene index + * on this shard was reset to the global checkpoint and the operations above the local checkpoint + * were reverted. If the other shard that was promoted to primary subsequently fails before the + * primary/replica re-sync completes successfully and we are now being promoted, we have to restore + * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes. */ final Engine engine = getEngine(); - engine.restoreLocalCheckpointFromTranslog(); + engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) -> + runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {})); /* Rolling the translog generation is not strictly needed here (as we will never have collisions between * sequence numbers in a translog generation in a new primary as it takes the last known sequence number * as a starting point), but it simplifies reasoning about the relationship between primary terms and @@ -1452,9 +1451,11 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn } else { if (origin == Engine.Operation.Origin.PRIMARY) { assert assertPrimaryMode(); - } else { - assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET; + } else if (origin == Engine.Operation.Origin.REPLICA) { assert assertReplicationTarget(); + } else { + assert origin == Engine.Operation.Origin.LOCAL_RESET; + assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; } if (writeAllowedStates.contains(state) == false) { throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]"); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8f9d90154f8f4..2faced930a36d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -148,6 +148,7 @@ import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.ShardId; @@ -4047,56 +4048,52 @@ private Tuple getSequenceID(Engine engine, Engine.Get get) throws En } } - public void testRestoreLocalCheckpointFromTranslog() throws IOException { - engine.close(); - InternalEngine actualEngine = null; - try { - final Set completedSeqNos = new HashSet<>(); - final BiFunction supplier = (maxSeqNo, localCheckpoint) -> new LocalCheckpointTracker( - maxSeqNo, - localCheckpoint) { - @Override - public void markSeqNoAsCompleted(long seqNo) { - super.markSeqNoAsCompleted(seqNo); - completedSeqNos.add(seqNo); - } - }; - trimUnsafeCommits(engine.config()); - actualEngine = new InternalEngine(engine.config(), supplier); - final int operations = randomIntBetween(0, 1024); - final Set expectedCompletedSeqNos = new HashSet<>(); - for (int i = 0; i < operations; i++) { - if (rarely() && i < operations - 1) { + public void testRestoreLocalHistoryFromTranslog() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + final ArrayList seqNos = new ArrayList<>(); + final int numOps = randomIntBetween(0, 1024); + for (int i = 0; i < numOps; i++) { + if (rarely()) { continue; } - expectedCompletedSeqNos.add((long) i); + seqNos.add((long) i); } - - final ArrayList seqNos = new ArrayList<>(expectedCompletedSeqNos); Randomness.shuffle(seqNos); - for (final long seqNo : seqNos) { - final String id = Long.toString(seqNo); - final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); - final Term uid = newUid(doc); - final long time = System.nanoTime(); - actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, null, REPLICA, time, time, false)); - if (rarely()) { - actualEngine.rollTranslogGeneration(); + final EngineConfig engineConfig; + final SeqNoStats prevSeqNoStats; + final List prevDocs; + final int totalTranslogOps; + try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { + engineConfig = engine.config(); + for (final long seqNo : seqNos) { + final String id = Long.toString(seqNo); + final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 1, seqNo, false)); + if (rarely()) { + engine.rollTranslogGeneration(); + } + if (rarely()) { + engine.flush(); + } } - } - final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint(); - final long resetLocalCheckpoint = - randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint)); - actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint); - completedSeqNos.clear(); - actualEngine.restoreLocalCheckpointFromTranslog(); - final Set intersection = new HashSet<>(expectedCompletedSeqNos); - intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet())); - assertThat(completedSeqNos, equalTo(intersection)); - assertThat(actualEngine.getLocalCheckpoint(), equalTo(currentLocalCheckpoint)); - assertThat(generateNewSeqNo(actualEngine), equalTo((long) operations)); - } finally { - IOUtils.close(actualEngine); + globalCheckpoint.set(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, engine.getLocalCheckpoint())); + engine.syncTranslog(); + prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); + prevDocs = getDocIds(engine, true); + totalTranslogOps = engine.getTranslog().totalOperations(); + } + trimUnsafeCommits(engineConfig); + try (InternalEngine engine = new InternalEngine(engineConfig)) { + engine.recoverFromTranslog(translogHandler, globalCheckpoint.get()); + engine.restoreLocalHistoryFromTranslog(translogHandler); + assertThat(getDocIds(engine, true), equalTo(prevDocs)); + SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); + assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint())); + assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo())); + assertThat(engine.getTranslog().totalOperations(), equalTo(totalTranslogOps)); + } + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 64e3f481ff49b..7fd47c9994cc4 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -896,23 +896,17 @@ public void testGlobalCheckpointSync() throws IOException { closeShards(replicaShard, primaryShard); } - public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException { + public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); - final long globalCheckpointOnReplica = SequenceNumbers.UNASSIGNED_SEQ_NO; - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); + final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); - final int globalCheckpoint = - randomIntBetween( - Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO), - Math.toIntExact(indexShard.getLocalCheckpoint())); - + final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); + final Set docsBeforeRollback = getShardDocUIDs(indexShard); final CountDownLatch latch = new CountDownLatch(1); indexShard.acquireReplicaOperationPermit( indexShard.getPendingPrimaryTerm() + 1, @@ -946,6 +940,7 @@ public void onFailure(Exception e) { resyncLatch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); + assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback)); closeShard(indexShard, false); }