diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6449c979de40a..151675f54b84f 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -426,11 +426,6 @@ private void recoverFromTranslogInternal() throws IOException { } else if (translog.isCurrent(translogGeneration) == false) { commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); refreshLastCommittedSegmentInfos(); - } else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) { - assert historyUUID != null; - // put the history uuid into the index - commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); - refreshLastCommittedSegmentInfos(); } // clean up what's not needed translog.trimUnreferencedReaders(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7b29b2b632ee3..6974cbde97337 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -48,7 +48,6 @@ import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -66,7 +65,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import 
org.elasticsearch.common.util.concurrent.AsyncIOProcessor; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; @@ -411,14 +409,6 @@ public void updateShardState(final ShardRouting newRouting, logger.debug("failed to refresh due to move to cluster wide started", e); } - if (newRouting.primary()) { - final DiscoveryNode recoverySourceNode = recoveryState.getSourceNode(); - if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) { - // there was no primary context hand-off in < 6.0.0, need to manually activate the shard - getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint()); - } - } - changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); } else if (state == IndexShardState.RELOCATED && (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { @@ -480,15 +470,18 @@ public void updateShardState(final ShardRouting newRouting, * subsequently fails before the primary/replica re-sync completes successfully and we are now being * promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence * numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by - * replaying the translog and marking any operations there are completed. Rolling the translog generation is - * not strictly needed here (as we will never have collisions between sequence numbers in a translog - * generation in a new primary as it takes the last known sequence number as a starting point), but it - * simplifies reasoning about the relationship between primary terms and translog generations. + * replaying the translog and marking any operations there are completed. 
+ */ + final Engine engine = getEngine(); + engine.restoreLocalCheckpointFromTranslog(); + /* Rolling the translog generation is not strictly needed here (as we will never have collisions between + * sequence numbers in a translog generation in a new primary as it takes the last known sequence number + * as a starting point), but it simplifies reasoning about the relationship between primary terms and + * translog generations. */ - getEngine().rollTranslogGeneration(); - getEngine().restoreLocalCheckpointFromTranslog(); - getEngine().fillSeqNoGaps(newPrimaryTerm); - getEngine().seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(), + engine.rollTranslogGeneration(); + engine.fillSeqNoGaps(newPrimaryTerm); + engine.seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(), getEngine().seqNoService().getLocalCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @Override @@ -1332,6 +1325,17 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole active.set(true); newEngine.recoverFromTranslog(); } + assertSequenceNumbersInCommit(); + } + + private boolean assertSequenceNumbersInCommit() throws IOException { + final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); + assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint"; + assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number"; + assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid"; + assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid [" + + userData.get(Engine.HISTORY_UUID_KEY) + "] is different than engine [" + getHistoryUUID() + "]"; + return true; } private boolean assertMaxUnsafeAutoIdInCommit() throws IOException { diff --git 
a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 71ad21c14d7ec..a847088869b2e 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -149,12 +149,13 @@ public RecoveryResponse recoverToTarget() throws IOException { final Translog translog = shard.getTranslog(); final long startingSeqNo; + final long requiredSeqNoRangeStart; final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery(); - if (isSequenceNumberBasedRecoveryPossible) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); + requiredSeqNoRangeStart = startingSeqNo; } else { final Engine.IndexCommitRef phase1Snapshot; try { @@ -162,10 +163,12 @@ public RecoveryResponse recoverToTarget() throws IOException { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } - // we set this to unassigned to create a translog roughly according to the retention policy - // on the target - startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; - + // we set this to 0 to create a translog roughly according to the retention policy + // on the target. 
Note that it will still filter out legacy operations with no sequence numbers + startingSeqNo = 0; + // but we must have everything above the local checkpoint in the commit + requiredSeqNoRangeStart = + Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; try { phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); } catch (final Exception e) { @@ -178,6 +181,9 @@ public RecoveryResponse recoverToTarget() throws IOException { } } } + assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; + assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" + + startingSeqNo + "]"; runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); @@ -187,10 +193,19 @@ public RecoveryResponse recoverToTarget() throws IOException { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + /* + * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all + * operations in the required range will be available for replaying from the translog of the source. 
+ */ + cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); + + logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); + + logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); final long targetLocalCheckpoint; try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) { - targetLocalCheckpoint = phase2(startingSeqNo, snapshot); + targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } @@ -224,7 +239,8 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { /** * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source - * translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source. + * translog contains all operations above the local checkpoint on the target. We already know that the translog contains or will contain + * all ops above the source local checkpoint, so we can stop checking there. 
* * @return {@code true} if the source is ready for a sequence-number-based recovery * @throws IOException if an I/O exception occurred reading the translog snapshot @@ -232,18 +248,10 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { final long startingSeqNo = request.startingSeqNo(); assert startingSeqNo >= 0; - final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo); + final long localCheckpoint = shard.getLocalCheckpoint(); + logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint); // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one - if (startingSeqNo - 1 <= endingSeqNo) { - /* - * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all - * operations in the required range will be available for replaying from the translog of the source. - */ - cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); - - logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); - + if (startingSeqNo - 1 <= localCheckpoint) { final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; @@ -253,7 +261,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { } } } - return tracker.getCheckpoint() >= endingSeqNo; + return tracker.getCheckpoint() >= localCheckpoint; } else { return false; } @@ -433,13 +441,15 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { * point-in-time view of the translog). 
It then sends each translog operation to the target node so it can be replayed into the new * shard. * - * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all - * ops should be sent - * @param snapshot a snapshot of the translog - * + * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all + * ops should be sent + * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) + * @param endingSeqNo the highest sequence number that should be sent + * @param snapshot a snapshot of the translog * @return the local checkpoint on the target */ - long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { + long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) + throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } @@ -447,10 +457,11 @@ long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws I final StopWatch stopWatch = new StopWatch().start(); - logger.trace("recovery [phase2]: sending transaction log operations"); + logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + + "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); // send all the snapshot's translog operations to the target - final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot); + final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); stopWatch.stop(); logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime()); @@ -511,18 +522,26 @@ static class SendSnapshotResult { *

* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. * - * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent - * @param snapshot the translog snapshot to replay operations from - * @return the local checkpoint on the target and the total number of operations sent + * @param startingSeqNo the sequence number for which only operations with a sequence number greater than or equal to this will be sent + * @param requiredSeqNoRangeStart the lower sequence number of the required range + * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) + * @param snapshot the translog snapshot to replay operations from + * @return the local checkpoint on the target and the total number of operations sent * @throws IOException if an I/O exception occurred reading the translog snapshot */ - protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { + protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, + final Translog.Snapshot snapshot) throws IOException { + assert requiredSeqNoRangeStart <= endingSeqNo + 1: + "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; + assert startingSeqNo <= requiredSeqNoRangeStart : + "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; int ops = 0; long size = 0; int skippedOps = 0; int totalSentOps = 0; final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); final List operations = new ArrayList<>(); + final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); final int expectedTotalOps = snapshot.totalOperations(); if (expectedTotalOps == 0) { @@ -539,12 +558,9 @@ protected 
SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl throw new IndexShardClosedException(request.shardId()); } cancellableThreads.checkForCancel(); - /* - * If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and - * any ops before the starting sequence number. - */ + final long seqNo = operation.seqNo(); - if (startingSeqNo >= 0 && (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) { + if (seqNo < startingSeqNo || seqNo > endingSeqNo) { skippedOps++; continue; } @@ -552,6 +568,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl ops++; size += operation.estimateSize(); totalSentOps++; + requiredOpsTracker.markSeqNoAsCompleted(seqNo); // check if this request is past bytes threshold, and if so, send it off if (size >= chunkSizeInBytes) { @@ -569,8 +586,14 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl } assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps - : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", - expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps); + : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", + expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps); + + if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { + throw new IllegalStateException("translog replay failed to cover required sequence numbers" + + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). 
first missing op is [" + + (requiredOpsTracker.getCheckpoint() + 1) + "]"); + } logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 844d6b0aaf957..13e8d90eb47a7 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -374,15 +374,15 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); CountDownLatch recoveryStart = new CountDownLatch(1); - AtomicBoolean preparedForTranslog = new AtomicBoolean(false); + AtomicBoolean opsSent = new AtomicBoolean(false); final Future recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> { recoveryStart.countDown(); return new RecoveryTarget(indexShard, node, recoveryListener, l -> { }) { @Override - public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { - preparedForTranslog.set(true); - super.prepareForTranslogOperations(totalTranslogOps); + public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { + opsSent.set(true); + return super.indexTranslogOperations(operations, totalTranslogOps); } }; }); @@ -392,7 +392,7 @@ public void prepareForTranslogOperations(int totalTranslogOps) throws IOExceptio // index some more docs += shards.indexDocs(randomInt(5)); - assertFalse("recovery should wait on pending docs", preparedForTranslog.get()); + assertFalse("recovery should wait on pending docs", opsSent.get()); primaryEngineFactory.releaseLatchedIndexers(); pendingDocsDone.await(); diff --git 
a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 993cc84506498..c04d69bbed20e 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -70,15 +70,18 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -88,6 +91,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RecoverySourceHandlerTests extends ESTestCase { @@ -181,29 +185,68 @@ public void testSendSnapshotSendsOps() throws IOException { operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true))); } operations.add(null); - final long startingSeqNo = randomBoolean() ? 
SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16); - RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() { - @Override - public void close() { + final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); + final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); + final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1); + RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, + endingSeqNo, new Translog.Snapshot() { + @Override + public void close() { - } + } - private int counter = 0; + private int counter = 0; - @Override - public int totalOperations() { - return operations.size() - 1; - } + @Override + public int totalOperations() { + return operations.size() - 1; + } - @Override - public Translog.Operation next() throws IOException { - return operations.get(counter++); - } - }); - if (startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { - assertThat(result.totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers)); - } else { - assertThat(result.totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo))); + @Override + public Translog.Operation next() throws IOException { + return operations.get(counter++); + } + }); + final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); + assertThat(result.totalOperations, equalTo(expectedOps)); + final ArgumentCaptor shippedOpsCaptor = ArgumentCaptor.forClass(List.class); + verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture()); + List shippedOps = shippedOpsCaptor.getAllValues().stream() + .flatMap(List::stream).map(o -> (Translog.Operation) o).collect(Collectors.toList()); + 
shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo)); + assertThat(shippedOps.size(), equalTo(expectedOps)); + for (int i = 0; i < shippedOps.size(); i++) { + assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs))); + } + if (endingSeqNo >= requiredStartingSeqNo + 1) { + // check that missing ops blows up + List requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker + .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList()); + List opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); + expectThrows(IllegalStateException.class, () -> + handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, + endingSeqNo, new Translog.Snapshot() { + @Override + public void close() { + + } + + private int counter = 0; + + @Override + public int totalOperations() { + return operations.size() - 1 - opsToSkip.size(); + } + + @Override + public Translog.Operation next() throws IOException { + Translog.Operation op; + do { + op = operations.get(counter++); + } while (op != null && opsToSkip.contains(op)); + return op; + } + })); } } @@ -383,7 +426,7 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { } @Override - long phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException { + long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot) throws IOException { phase2Called.set(true); return SequenceNumbers.UNASSIGNED_SEQ_NO; } diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index 22859859f2521..00d700d2d5289 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ 
b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -26,8 +26,10 @@ import org.elasticsearch.Version; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -53,6 +55,8 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -585,6 +589,28 @@ public void testSingleDoc() throws IOException { assertThat(toStr(client().performRequest("GET", docLocation)), containsString(doc)); } + /** + * Tests that a single empty shard index is correctly recovered. Empty shards are often an edge case. + */ + public void testEmptyShard() throws IOException { + final String index = "test_empty_shard"; + + if (runningAgainstOldCluster) { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + // if the node with the replica is the first to be restarted, while a replica is still recovering + // then delayed allocation will kick in. 
When the node comes back, the master will search for a copy + // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN + // before timing out + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster + createIndex(index, settings.build()); + } + ensureGreen(index); + } + + /** * Tests recovery of an index with or without a translog and the * statistics we gather about that. diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 9de8954c531ff..958cd9fe3ef91 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -25,7 +25,6 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.seqno.SeqNoStats; @@ -49,14 +48,6 @@ public class IndexingIT extends ESRestTestCase { - private void updateIndexSetting(String name, Settings.Builder settings) throws IOException { - updateIndexSetting(name, settings.build()); - } - private void updateIndexSetting(String name, Settings settings) throws IOException { - assertOK(client().performRequest("PUT", name + "/_settings", Collections.emptyMap(), - new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON))); - } - private int indexDocs(String index, final int idStart, final int numDocs) throws IOException { for (int i = 0; i < numDocs; i++) { final int id = idStart + i; @@ -113,7 +104,7 @@ public void testIndexVersionPropagation() throws Exception { final int finalVersionForDoc1 = 
indexDocWithConcurrentUpdates(index, 1, nUpdates); logger.info("allowing shards on all nodes"); updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); List shards = buildShards(index, nodes, newNodeClient); Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); @@ -138,7 +129,7 @@ public void testIndexVersionPropagation() throws Exception { primary = shards.stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); - ensureGreen(); + ensureGreen(index); nUpdates = randomIntBetween(minUpdates, maxUpdates); logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates); final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates); @@ -151,7 +142,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("setting number of replicas to 0"); updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0)); - ensureGreen(); + ensureGreen(index); nUpdates = randomIntBetween(minUpdates, maxUpdates); logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates); final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates); @@ -164,7 +155,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("setting number of replicas to 1"); updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1)); - ensureGreen(); + ensureGreen(index); nUpdates = randomIntBetween(minUpdates, maxUpdates); logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates); 
final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates); @@ -199,7 +190,7 @@ public void testSeqNoCheckpoints() throws Exception { assertSeqNoOnShards(index, nodes, nodes.getBWCVersion().major >= 6 ? numDocs : 0, newNodeClient); logger.info("allowing shards on all nodes"); updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); for (final String bwcName : bwcNamesList) { assertCount(index, "_only_nodes:" + bwcName, numDocs); @@ -211,7 +202,7 @@ public void testSeqNoCheckpoints() throws Exception { Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); - ensureGreen(); + ensureGreen(index); int numDocsOnNewPrimary = 0; final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5); logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary); @@ -230,7 +221,7 @@ public void testSeqNoCheckpoints() throws Exception { numDocs += numberOfDocsAfterDroppingReplicas; logger.info("setting number of replicas to 1"); updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1)); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); for (Shard shard : buildShards(index, nodes, newNodeClient)) { @@ -272,7 +263,7 @@ public void testUpdateSnapshotStatus() throws Exception { final String index = "test-snapshot-index"; createIndex(index, settings.build()); indexDocs(index, 0, between(50, 100)); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); assertOK( @@ -282,7 +273,7 @@ public void 
testUpdateSnapshotStatus() throws Exception { // Allocating shards on all nodes, taking snapshots should happen on all nodes. updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); assertOK( diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 6c21d3e37ef30..36bc1b1a9273f 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -18,16 +18,29 @@ */ package org.elasticsearch.upgrades; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Response; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.yaml.ObjectPath; +import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.function.Predicate; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; +import static java.util.Collections.emptyMap; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY; import static org.hamcrest.Matchers.equalTo; import 
static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.notNullValue;
@@ -89,7 +102,7 @@ public void testHistoryUUIDIsGenerated() throws Exception {
                 .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms");
             createIndex(index, settings.build());
         } else if (clusterType == CLUSTER_TYPE.UPGRADED) {
-            ensureGreen();
+            ensureGreen(index);
             Response response = client().performRequest("GET", index + "/_stats", Collections.singletonMap("level", "shards"));
             assertOK(response);
             ObjectPath objectPath = ObjectPath.createFromResponse(response);
@@ -109,4 +122,184 @@ public void testHistoryUUIDIsGenerated() throws Exception {
         }
     }
 
+    /**
+     * Indexes {@code numDocs} documents with {@code PUT index/test/id} requests, using the consecutive ids
+     * {@code idStart}, {@code idStart + 1}, and so on. Each document has a single {@code test} field whose value
+     * ends in a random two-character suffix.
+     *
+     * @param index   index to write to
+     * @param idStart id of the first document
+     * @param numDocs number of documents to index
+     * @return {@code numDocs}
+     * @throws IOException if an index request fails
+     */
+    private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
+        for (int i = 0; i < numDocs; i++) {
+            final int id = idStart + i;
+            assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(),
+                new StringEntity("{\"test\": \"test_" + randomAsciiOfLength(2) + "\"}", ContentType.APPLICATION_JSON)));
+        }
+        return numDocs;
+    }
+
+    /**
+     * Runs {@link #indexDocs(String, int, int)} on a newly started background thread.
+     *
+     * @return a future that completes with {@code null} once {@code indexDocs} returns, or fails with the exception
+     *         reported to {@code onFailure}
+     */
+    private Future<Void> asyncIndexDocs(String index, final int idStart, final int numDocs) throws IOException {
+        PlainActionFuture<Void> future = new PlainActionFuture<>();
+        Thread background = new Thread(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                future.onFailure(e);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                indexDocs(index, idStart, numDocs);
+                future.onResponse(null);
+            }
+        });
+        background.start();
+        return future;
+    }
+
+    /**
+     * Indexes into a one-shard, one-replica index at every stage of the rolling upgrade. In the MIXED and UPGRADED
+     * stages it then waits for the index to be green and checks the document count on the primary and the replica.
+     * Allocation is limited to primaries at the end of the OLD and MIXED stages; the next stage removes the limit.
+     */
+    public void testRecoveryWithConcurrentIndexing() throws Exception {
+        final String index = "recovery_with_concurrent_indexing";
+        switch (clusterType) {
+            case OLD:
+                Settings.Builder settings = Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                    // if the node with the replica is the first to be restarted, while a replica is still recovering
+                    // then delayed allocation will kick in. When the node comes back, the master will search for a copy
+                    // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
+                    // before timing out
+                    .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                    .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
+                createIndex(index, settings.build());
+                indexDocs(index, 0, 10);
+                ensureGreen(index);
+                // make sure that we can index while the replicas are recovering
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries"));
+                break;
+            case MIXED:
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null));
+                asyncIndexDocs(index, 10, 50).get();
+                ensureGreen(index);
+                assertOK(client().performRequest("POST", index + "/_refresh"));
+                assertCount(index, "_primary", 60);
+                assertCount(index, "_replica", 60);
+                // make sure that we can index while the replicas are recovering
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries"));
+                break;
+            case UPGRADED:
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null));
+                asyncIndexDocs(index, 60, 50).get();
+                ensureGreen(index);
+                assertOK(client().performRequest("POST", index + "/_refresh"));
+                assertCount(index, "_primary", 110);
+                assertCount(index, "_replica", 110);
+                break;
+            default:
+                throw new IllegalStateException("unknown type " + clusterType);
+        }
+    }
+
+    /**
+     * Asserts that a {@code _count} request on the index, sent with the given {@code preference} parameter, returns
+     * {@code expectedCount}.
+     */
+    private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
+        final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
+        assertOK(response);
+        final int actualCount = Integer.parseInt(ObjectPath.createFromResponse(response).evaluate("count").toString());
+        assertThat(actualCount, equalTo(expectedCount));
+    }
+
+
+    /**
+     * Looks through the nodes returned by the {@code _nodes} API for one whose version satisfies the predicate.
+     *
+     * @param versionPredicate test applied to each node's version
+     * @return the id of the first matching node encountered, or {@code null} if no node matches
+     * @throws IOException if the request fails
+     */
+    private String getNodeId(Predicate<Version> versionPredicate) throws IOException {
+        Response response = client().performRequest("GET", "_nodes");
+        ObjectPath objectPath = ObjectPath.createFromResponse(response);
+        Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
+        for (String id : nodesAsMap.keySet()) {
+            Version version = Version.fromString(objectPath.evaluate("nodes." + id + ".version"));
+            if (versionPredicate.test(version)) {
+                return id;
+            }
+        }
+        return null;
+    }
+
+
+    /**
+     * Indexes into a one-shard index at every stage of the rolling upgrade. In the MIXED stage the replica is
+     * dropped and the {@code include._id} allocation filter is pointed first at an old node and then at an upgraded
+     * node; in the UPGRADED stage the filter is cleared and the replica is restored.
+     */
+    public void testRelocationWithConcurrentIndexing() throws Exception {
+        final String index = "relocation_with_concurrent_indexing";
+        switch (clusterType) {
+            case OLD:
+                Settings.Builder settings = Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                    // if the node with the replica is the first to be restarted, while a replica is still recovering
+                    // then delayed allocation will kick in. When the node comes back, the master will search for a copy
+                    // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
+                    // before timing out
+                    .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                    .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
+                createIndex(index, settings.build());
+                indexDocs(index, 0, 10);
+                ensureGreen(index);
+                // make sure that no shards are allocated, so we can make sure the primary stays on the old node (when one
+                // node stops, we lose the master too, so a replica will not be promoted)
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none"));
+                break;
+            case MIXED:
+                final String newNode = getNodeId(v -> v.equals(Version.CURRENT));
+                final String oldNode = getNodeId(v -> v.before(Version.CURRENT));
+                // remove the replica now that we know that the primary is an old node
+                updateIndexSetting(index, Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                    .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)
+                    .put("index.routing.allocation.include._id", oldNode)
+                );
+                updateIndexSetting(index, Settings.builder().put("index.routing.allocation.include._id", newNode));
+                asyncIndexDocs(index, 10, 50).get();
+                ensureGreen(index);
+                assertOK(client().performRequest("POST", index + "/_refresh"));
+                assertCount(index, "_primary", 60);
+                break;
+            case UPGRADED:
+                updateIndexSetting(index, Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                    .put("index.routing.allocation.include._id", (String)null)
+                );
+                asyncIndexDocs(index, 60, 50).get();
+                ensureGreen(index);
+                assertOK(client().performRequest("POST", index + "/_refresh"));
+                assertCount(index, "_primary", 110);
+                assertCount(index, "_replica", 110);
+                break;
+            default:
+                throw new IllegalStateException("unknown type " + clusterType);
+        }
+    }
+
 }
diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml
index b24025f356c3f..29790e2146190 100644
--- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml
+++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml
@@ -15,6 +15,16 @@
           # allocation will kick in, and the cluster health won't return to GREEN
           # before timing out
           index.unassigned.node_left.delayed_timeout: "100ms"
+
+  - do:
+      indices.create:
+        index: empty_index # index to ensure we can recover empty indices
+        body:
+          # if the node with the replica is the first to be restarted, then delayed
+          # allocation will kick in, and the cluster health won't return to GREEN
+          # before timing out
+          index.unassigned.node_left.delayed_timeout: "100ms"
+
   - do:
       bulk:
         refresh: true
diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml
b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml
index cdc94f638b5f5..af58a2f362c78 100644
--- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml
+++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml
@@ -7,6 +7,7 @@
         # wait for long enough that we give delayed unassigned shards to stop being delayed
         timeout: 70s
         level: shards
+        index: test_index,index_with_replicas,empty_index
 
   - do:
       search:
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
index cef820ac0096a..0da1aff2ef773 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
@@ -392,13 +392,21 @@ protected void assertOK(Response response) {
         assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
     }
 
-    protected void ensureGreen() throws IOException {
+    /**
+     * Asks the cluster health API to wait, for up to 70 seconds, until the given index is green and has no relocating
+     * shards, and asserts that the request succeeds. An index must be named explicitly because the tests share a
+     * cluster and often leave other indices in a non-green state.
+     *
+     * @param index index to test for
+     * @throws IOException if the health request fails
+     */
+    protected void ensureGreen(String index) throws IOException {
         Map<String, String> params = new HashMap<>();
         params.put("wait_for_status", "green");
         params.put("wait_for_no_relocating_shards", "true");
         params.put("timeout", "70s");
         params.put("level", "shards");
-        assertOK(client().performRequest("GET", "_cluster/health", params));
+        assertOK(client().performRequest("GET", "_cluster/health/" + index, params));
     }
 
     protected void createIndex(String name, Settings settings) throws IOException {
@@ -411,4 +419,22 @@ protected void createIndex(String name, Settings settings, String mapping) throw
             + ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON)));
     }
 
+    /**
+     * Builds the given settings and applies them to the index with a PUT to {@code index/_settings}.
+     *
+     * @param index    index to update
+     * @param settings builder holding the settings to apply
+     * @throws IOException if the request fails
+     */
+    protected void updateIndexSetting(String index, Settings.Builder settings) throws IOException {
+        updateIndexSetting(index, settings.build());
+    }
+
+    /**
+     * Sends the settings as the JSON body of a {@code PUT index/_settings} request and asserts that it succeeds.
+     */
+    private void updateIndexSetting(String index, Settings settings) throws IOException {
+        assertOK(client().performRequest("PUT", index + "/_settings", Collections.emptyMap(),
+            new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON)));
+    }
 }