diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 2359a4195c569..c04cf8da8c7f9 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -351,9 +351,11 @@ private void recoverFromTranslogInternal() throws IOException {
         } else if (translog.isCurrent(translogGeneration) == false) {
             commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
             refreshLastCommittedSegmentInfos();
-        } else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
-            assert historyUUID != null;
-            // put the history uuid into the index
+        } else if (lastCommittedSegmentInfos.getUserData().containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) == false) {
+            assert engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
+                "index was created on version " + engineConfig.getIndexSettings().getIndexVersionCreated() + " but has "
+                    + "no sequence numbers info in commit";
+            commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
             refreshLastCommittedSegmentInfos();
         }
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index b3db0c258ac30..3f9314b29c29f 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -422,7 +422,13 @@ public void updateShardState(final ShardRouting newRouting, final DiscoveryNode
                 recoverySourceNode = recoveryState.getSourceNode();
                 if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
                     // there was no primary context hand-off in < 6.0.0, need to manually activate the shard
-                    getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint());
+                    final Engine engine = getEngine();
+                    engine.seqNoService().activatePrimaryMode(engine.seqNoService().getLocalCheckpoint());
+                    // Flush the translog as it may contain operations with no sequence numbers. We want to make sure those
+                    // operations will never be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq#
+                    // (due to active indexing) and operations without a seq# coming from the translog. We therefore flush
+                    // to create a Lucene commit point that points to an empty translog file.
+                    engine.flush(false, true);
                 }
             }
@@ -487,15 +493,26 @@ public void updateShardState(final ShardRouting newRouting,
                          * subsequently fails before the primary/replica re-sync completes successfully and we are now being
                          * promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
                          * numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
-                         * replaying the translog and marking any operations there are completed. Rolling the translog generation is
-                         * not strictly needed here (as we will never have collisions between sequence numbers in a translog
-                         * generation in a new primary as it takes the last known sequence number as a starting point), but it
-                         * simplifies reasoning about the relationship between primary terms and translog generations.
+                         * replaying the translog and marking any operations there as completed.
                          */
-                        getEngine().rollTranslogGeneration();
-                        getEngine().restoreLocalCheckpointFromTranslog();
-                        getEngine().fillSeqNoGaps(newPrimaryTerm);
-                        getEngine().seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
+                        final Engine engine = getEngine();
+                        engine.restoreLocalCheckpointFromTranslog();
+                        if (indexSettings.getIndexVersionCreated().onOrBefore(Version.V_6_0_0_alpha1)) {
+                            // an index that was created before sequence numbers were introduced may contain operations in its
+                            // translog that do not have a sequence number. We want to make sure those operations will never
+                            // be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq# (due
+                            // to active indexing) and operations without a seq# coming from the translog. We therefore flush
+                            // to create a Lucene commit point that points to an empty translog file.
+                            engine.flush(false, true);
+                        }
+                        /* Rolling the translog generation is not strictly needed here (as we will never have collisions between
+                         * sequence numbers in a translog generation in a new primary as it takes the last known sequence number
+                         * as a starting point), but it simplifies reasoning about the relationship between primary terms and
+                         * translog generations.
+                         */
+                        engine.rollTranslogGeneration();
+                        engine.fillSeqNoGaps(newPrimaryTerm);
+                        engine.seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
                             getEngine().seqNoService().getLocalCheckpoint());
                         primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
                             @Override
@@ -1316,6 +1333,17 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole
             active.set(true);
             newEngine.recoverFromTranslog();
         }
+        assertSequenceNumbersInCommit();
+    }
+
+    private boolean assertSequenceNumbersInCommit() throws IOException {
+        final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
+        assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contain a local checkpoint";
+        assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contain a maximum sequence number";
+        assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contain a history uuid";
+        assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
+            + userData.get(Engine.HISTORY_UUID_KEY) + "] is different from the engine [" + getHistoryUUID() + "]";
+        return true;
     }
 
     private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 5f692d8e8f5fa..c0965cd1b0651 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -148,12 +148,13 @@ public RecoveryResponse recoverToTarget() throws IOException {
         final Translog translog = shard.getTranslog();
 
         final long startingSeqNo;
+        final long requiredSeqNoRangeStart;
         final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
             isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
-
         if (isSequenceNumberBasedRecoveryPossible) {
             logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
             startingSeqNo = request.startingSeqNo();
+            requiredSeqNoRangeStart = startingSeqNo;
         } else {
             final Engine.IndexCommitRef phase1Snapshot;
             try {
@@ -161,10 +162,12 @@ public RecoveryResponse recoverToTarget() throws IOException {
             } catch (final Exception e) {
                 throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
             }
-            // we set this to unassigned to create a translog roughly according to the retention policy
-            // on the target
-            startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
-
+            // we set this to 0 to create a translog roughly according to the retention policy
+            // on the target. Note that it will still filter out legacy operations with no sequence numbers
+            startingSeqNo = 0;
+            // but we must have everything above the local checkpoint in the commit
+            requiredSeqNoRangeStart =
+                Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
             try {
                 phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations);
             } catch (final Exception e) {
@@ -177,6 +180,9 @@ public RecoveryResponse recoverToTarget() throws IOException {
                 }
             }
         }
+        assert startingSeqNo >= 0 : "startingSeqNo must be non-negative. got: " + startingSeqNo;
+        assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
+            + startingSeqNo + "]";
 
         runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
 
@@ -186,10 +192,19 @@ public RecoveryResponse recoverToTarget() throws IOException {
             throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
         }
 
+        final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
+        /*
+         * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
+         * operations in the required range will be available for replaying from the translog of the source.
+         */
+        cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
+
+        logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
+
         logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
         final long targetLocalCheckpoint;
         try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) {
-            targetLocalCheckpoint = phase2(startingSeqNo, snapshot);
+            targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
         } catch (Exception e) {
             throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
         }
@@ -223,7 +238,8 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
     /**
      * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source
-     * translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source.
+     * translog contains all operations above the local checkpoint on the target. We already know that the translog contains or will
+     * contain all ops above the source local checkpoint, so we can stop checking there.
      *
      * @return {@code true} if the source is ready for a sequence-number-based recovery
      * @throws IOException if an I/O exception occurred reading the translog snapshot
      */
@@ -231,18 +247,10 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
     boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
         final long startingSeqNo = request.startingSeqNo();
         assert startingSeqNo >= 0;
-        final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
-        logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo);
+        final long localCheckpoint = shard.getLocalCheckpoint();
+        logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint);
         // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
-        if (startingSeqNo - 1 <= endingSeqNo) {
-            /*
-             * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
-             * operations in the required range will be available for replaying from the translog of the source.
-             */
-            cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
-
-            logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);
-
+        if (startingSeqNo - 1 <= localCheckpoint) {
             final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
             try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
                 Translog.Operation operation;
@@ -252,7 +260,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
                     }
                 }
             }
-            return tracker.getCheckpoint() >= endingSeqNo;
+            return tracker.getCheckpoint() >= localCheckpoint;
         } else {
             return false;
         }
@@ -432,13 +440,15 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
      * point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
     * shard.
      *
-     * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
-     *                      ops should be sent
-     * @param snapshot      a snapshot of the translog
-     *
+     * @param startingSeqNo           the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
+     *                                ops should be sent
+     * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
+     * @param endingSeqNo             the highest sequence number that should be sent
+     * @param snapshot                a snapshot of the translog
      * @return the local checkpoint on the target
      */
-    long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
+    long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot)
+        throws IOException {
         if (shard.state() == IndexShardState.CLOSED) {
             throw new IndexShardClosedException(request.shardId());
         }
@@ -446,10 +456,11 @@ long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws I
         final StopWatch stopWatch = new StopWatch().start();
 
-        logger.trace("recovery [phase2]: sending transaction log operations");
+        logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], "
+            + "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "])");
 
         // send all the snapshot's translog operations to the target
-        final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot);
+        final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
 
         stopWatch.stop();
         logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
@@ -510,18 +521,26 @@ static class SendSnapshotResult {
      *
      * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit.
      *
-     * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent
-     * @param snapshot      the translog snapshot to replay operations from
-     * @return the local checkpoint on the target and the total number of operations sent
+     * @param startingSeqNo           the sequence number for which only operations with a sequence number greater than this will be sent
+     * @param requiredSeqNoRangeStart the lower sequence number of the required range
+     * @param endingSeqNo             the upper bound of the sequence number range to be sent (inclusive)
+     * @param snapshot                the translog snapshot to replay operations from
+     * @return the local checkpoint on the target and the total number of operations sent
      * @throws IOException if an I/O exception occurred reading the translog snapshot
      */
-    protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException {
+    protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
+                                              final Translog.Snapshot snapshot) throws IOException {
+        assert requiredSeqNoRangeStart <= endingSeqNo + 1 :
+            "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
+        assert startingSeqNo <= requiredSeqNoRangeStart :
+            "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
         int ops = 0;
         long size = 0;
         int skippedOps = 0;
         int totalSentOps = 0;
         final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
         final List<Translog.Operation> operations = new ArrayList<>();
+        final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);
 
         final int expectedTotalOps = snapshot.totalOperations();
         if (expectedTotalOps == 0) {
@@ -538,12 +557,9 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
                 throw new IndexShardClosedException(request.shardId());
             }
             cancellableThreads.checkForCancel();
-            /*
-             * If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
-             * any ops before the starting sequence number.
-             */
+
             final long seqNo = operation.seqNo();
-            if (startingSeqNo >= 0 && (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
+            if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
                 skippedOps++;
                 continue;
             }
@@ -551,6 +567,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
             ops++;
             size += operation.estimateSize();
             totalSentOps++;
+            requiredOpsTracker.markSeqNoAsCompleted(seqNo);
 
             // check if this request is past bytes threshold, and if so, send it off
             if (size >= chunkSizeInBytes) {
@@ -567,6 +584,12 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
             cancellableThreads.executeIO(sendBatch);
         }
 
+        if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
+            throw new IllegalStateException("translog replay failed to cover required sequence numbers"
+                + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]). first missing op is ["
+                + (requiredOpsTracker.getCheckpoint() + 1) + "]");
+        }
+
         assert expectedTotalOps == skippedOps + totalSentOps :
             "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]";
diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index 96f6aa6d47acb..b0988d9cf9a5c 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -374,15 +374,15 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
         IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
         CountDownLatch recoveryStart = new CountDownLatch(1);
-        AtomicBoolean preparedForTranslog = new AtomicBoolean(false);
+        AtomicBoolean opsSent = new AtomicBoolean(false);
         final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
             recoveryStart.countDown();
             return new RecoveryTarget(indexShard, node, recoveryListener, l -> {
             }) {
                 @Override
-                public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
-                    preparedForTranslog.set(true);
-                    super.prepareForTranslogOperations(totalTranslogOps);
+                public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
+                    opsSent.set(true);
+                    return super.indexTranslogOperations(operations, totalTranslogOps);
                 }
             };
         });
@@ -392,7 +392,7 @@ public void prepareForTranslogOperations(int totalTranslogOps) throws IOExceptio
 
         // index some more
         docs += shards.indexDocs(randomInt(5));
-        assertFalse("recovery should wait on pending docs", preparedForTranslog.get());
+        assertFalse("recovery should wait on pending docs", opsSent.get());
 
         primaryEngineFactory.releaseLatchedIndexers();
         pendingDocsDone.await();
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 993cc84506498..c04d69bbed20e 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -70,15 +70,18 @@
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
+import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -88,6 +91,7 @@
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class RecoverySourceHandlerTests extends ESTestCase {
@@ -181,29 +185,68 @@ public void testSendSnapshotSendsOps() throws IOException {
             operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
         }
         operations.add(null);
-        final long startingSeqNo = randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
-        RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
-            @Override
-            public void close() {
+        final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
+        final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
+        final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
+        RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo,
+            endingSeqNo, new Translog.Snapshot() {
+                @Override
+                public void close() {
 
-            }
+                }
 
-            private int counter = 0;
+                private int counter = 0;
 
-            @Override
-            public int totalOperations() {
-                return operations.size() - 1;
-            }
+                @Override
+                public int totalOperations() {
+                    return operations.size() - 1;
+                }
 
-            @Override
-            public Translog.Operation next() throws IOException {
-                return operations.get(counter++);
-            }
-        });
-        if (startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
-            assertThat(result.totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers));
-        } else {
-            assertThat(result.totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo)));
+                @Override
+                public Translog.Operation next() throws IOException {
+                    return operations.get(counter++);
+                }
+            });
+        final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
+        assertThat(result.totalOperations, equalTo(expectedOps));
+        final ArgumentCaptor<List> shippedOpsCaptor = ArgumentCaptor.forClass(List.class);
+        verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture());
+        List<Translog.Operation> shippedOps = shippedOpsCaptor.getAllValues().stream()
+            .flatMap(List::stream).map(o -> (Translog.Operation) o).collect(Collectors.toList());
+        shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo));
+        assertThat(shippedOps.size(), equalTo(expectedOps));
+        for (int i = 0; i < shippedOps.size(); i++) {
+            assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
+        }
+        if (endingSeqNo >= requiredStartingSeqNo + 1) {
+            // check that missing ops blow up the replay
+            List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
+                .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
+            List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
+            expectThrows(IllegalStateException.class, () ->
+                handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo,
+                    endingSeqNo, new Translog.Snapshot() {
+                        @Override
+                        public void close() {
 
+                        }
+
+                        private int counter = 0;
+
+                        @Override
+                        public int totalOperations() {
+                            return operations.size() - 1 - opsToSkip.size();
+                        }
+
+                        @Override
+                        public Translog.Operation next() throws IOException {
+                            Translog.Operation op;
+                            do {
+                                op = operations.get(counter++);
+                            } while (op != null && opsToSkip.contains(op));
+                            return op;
+                        }
+                    }));
         }
     }
@@ -383,7 +426,7 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
         }
 
         @Override
-        long phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException {
+        long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot) throws IOException {
            phase2Called.set(true);
            return SequenceNumbers.UNASSIGNED_SEQ_NO;
        }
diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
index c7e708418c92c..5011c9e7006ac 100644
--- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
+++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
@@ -25,8 +25,10 @@
 import org.apache.http.util.EntityUtils;
 import org.elasticsearch.Version;
 import org.elasticsearch.client.Response;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -49,6 +51,8 @@
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
+import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -583,6 +587,28 @@ public void testSingleDoc() throws IOException {
         assertThat(toStr(client().performRequest("GET", docLocation)), containsString(doc));
     }
 
+    /**
+     * Tests that an index with a single empty shard is correctly recovered. Empty shards are a common edge case.
+     */
+    public void testEmptyShard() throws IOException {
+        final String index = "test_empty_shard";
+
+        if (runningAgainstOldCluster) {
+            Settings.Builder settings = Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                // if the node with the replica is the first to be restarted, while a replica is still recovering
+                // then delayed allocation will kick in. When the node comes back, the master will search for a copy
+                // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
+                // before timing out
+                .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
+            createIndex(index, settings.build());
+        }
+        ensureGreen(index);
+    }
+
     /**
      * Tests recovery of an index with or without a translog and the
     * statistics we gather about that.
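
Editor's note: the coverage check that `sendSnapshot` gains above is the heart of this change, so here is a minimal, self-contained sketch of the invariant it enforces: every sequence number in `[requiredSeqNoRangeStart, endingSeqNo]` must be observed while draining the translog snapshot, and recovery must fail loudly at the first gap. This is illustrative only; a plain `BitSet` stands in for `LocalCheckpointTracker`, and all names here are made up rather than taken from the PR.

```java
import java.util.BitSet;

final class RequiredRangeCheck {

    // Throws if any seq# in [requiredStart, endingSeqNo] was not replayed.
    static void assertRangeCovered(long[] replayedSeqNos, long requiredStart, long endingSeqNo) {
        final int width = Math.toIntExact(endingSeqNo - requiredStart + 1);
        final BitSet seen = new BitSet(width);
        for (final long seqNo : replayedSeqNos) {
            if (seqNo >= requiredStart && seqNo <= endingSeqNo) {
                seen.set(Math.toIntExact(seqNo - requiredStart)); // mark this required op as covered
            }
            // ops outside the required range may be sent or skipped freely; they carry no
            // obligation, mirroring the skippedOps accounting in sendSnapshot
        }
        final int firstMissing = seen.nextClearBit(0);
        if (firstMissing < width) {
            throw new IllegalStateException("translog replay failed to cover required sequence numbers"
                + " (required range [" + requiredStart + ":" + endingSeqNo + "]). first missing op is ["
                + (requiredStart + firstMissing) + "]");
        }
    }

    public static void main(String[] args) {
        assertRangeCovered(new long[]{3, 4, 5, 6}, 4, 6); // passes: 4..6 all replayed
        try {
            assertRangeCovered(new long[]{3, 4, 6}, 4, 6); // fails: 5 is missing
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // ... first missing op is [5]
        }
    }
}
```
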
diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java
index 0133546f3b301..e995796651c0a 100644
--- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java
+++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java
@@ -25,7 +25,6 @@
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.test.rest.ESRestTestCase;
@@ -44,19 +43,10 @@
 import static java.util.Collections.singletonMap;
 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
-import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
 
 public class IndexingIT extends ESRestTestCase {
 
-    private void updateIndexSetting(String name, Settings.Builder settings) throws IOException {
-        updateIndexSetting(name, settings.build());
-    }
-
-    private void updateIndexSetting(String name, Settings settings) throws IOException {
-        assertOK(client().performRequest("PUT", name + "/_settings", Collections.emptyMap(),
-            new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON)));
-    }
-
     private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
         for (int i = 0; i < numDocs; i++) {
             final int id = idStart + i;
@@ -113,7 +103,7 @@ public void testIndexVersionPropagation() throws Exception {
             final int finalVersionForDoc1 = indexDocWithConcurrentUpdates(index, 1, nUpdates);
             logger.info("allowing shards on all nodes");
             updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
-            ensureGreen();
+            ensureGreen(index);
             assertOK(client().performRequest("POST", index + "/_refresh"));
             List<Shard> shards = buildShards(index, nodes, newNodeClient);
             Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
@@ -138,7 +128,7 @@ public void testIndexVersionPropagation() throws Exception {
             primary = shards.stream().filter(Shard::isPrimary).findFirst().get();
             logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
             updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
-            ensureGreen();
+            ensureGreen(index);
             nUpdates = randomIntBetween(minUpdates, maxUpdates);
             logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates);
             final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates);
@@ -151,7 +141,7 @@ public void testIndexVersionPropagation() throws Exception {
 
             logger.info("setting number of replicas to 0");
             updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0));
-            ensureGreen();
+            ensureGreen(index);
             nUpdates = randomIntBetween(minUpdates, maxUpdates);
             logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates);
             final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates);
@@ -164,7 +154,7 @@ public void testIndexVersionPropagation() throws Exception {
 
             logger.info("setting number of replicas to 1");
             updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
-            ensureGreen();
+            ensureGreen(index);
             nUpdates = randomIntBetween(minUpdates, maxUpdates);
             logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates);
             final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates);
@@ -202,7 +192,7 @@ public void testSeqNoCheckpoints() throws Exception {
             assertSeqNoOnShards(index, nodes, nodes.getBWCVersion().major >= 6 ? numDocs : 0, newNodeClient);
             logger.info("allowing shards on all nodes");
             updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
-            ensureGreen();
+            ensureGreen(index);
             assertOK(client().performRequest("POST", index + "/_refresh"));
             for (final String bwcName : bwcNamesList) {
                 assertCount(index, "_only_nodes:" + bwcName, numDocs);
@@ -214,7 +204,7 @@ public void testSeqNoCheckpoints() throws Exception {
             Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
             logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
             updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
-            ensureGreen();
+            ensureGreen(index);
             int numDocsOnNewPrimary = 0;
             final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5);
             logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary);
@@ -233,7 +223,7 @@ public void testSeqNoCheckpoints() throws Exception {
             numDocs += numberOfDocsAfterDroppingReplicas;
             logger.info("setting number of replicas to 1");
             updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
-            ensureGreen();
+            ensureGreen(index);
             assertOK(client().performRequest("POST", index + "/_refresh"));
             // the number of documents on the primary and on the recovered replica should match the number of indexed documents
             assertCount(index, "_primary", numDocs);
diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
index 6c21d3e37ef30..36bc1b1a9273f 100644
--- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
+++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
@@ -18,16 +18,29 @@
  */
 package org.elasticsearch.upgrades;
 
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.test.rest.yaml.ObjectPath;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.Predicate;
 
+import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
+import static java.util.Collections.emptyMap;
 import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
+import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
+import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.notNullValue;
@@ -89,7 +102,7 @@ public void testHistoryUUIDIsGenerated() throws Exception {
                 .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms");
             createIndex(index, settings.build());
         } else if (clusterType == CLUSTER_TYPE.UPGRADED) {
-            ensureGreen();
+            ensureGreen(index);
             Response response = client().performRequest("GET", index + "/_stats", Collections.singletonMap("level", "shards"));
             assertOK(response);
             ObjectPath objectPath = ObjectPath.createFromResponse(response);
@@ -109,4 +122,146 @@ public void testHistoryUUIDIsGenerated() throws Exception {
         }
     }
 
+    private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
+        for (int i = 0; i < numDocs; i++) {
+            final int id = idStart + i;
+            assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(),
+                new StringEntity("{\"test\": \"test_" + randomAsciiOfLength(2) + "\"}", ContentType.APPLICATION_JSON)));
+        }
+        return numDocs;
+    }
+
+    private Future<Void> asyncIndexDocs(String index, final int idStart, final int numDocs) throws IOException {
+        PlainActionFuture<Void> future = new PlainActionFuture<>();
+        Thread background = new Thread(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                future.onFailure(e);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                indexDocs(index, idStart, numDocs);
+                future.onResponse(null);
+            }
+        });
+        background.start();
+        return future;
+    }
+
+    public void testRecoveryWithConcurrentIndexing() throws Exception {
+        final String index = "recovery_with_concurrent_indexing";
+        switch (clusterType) {
+            case OLD:
+                Settings.Builder settings = Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                    // if the node with the replica is the first to be restarted, while a replica is still recovering
+                    // then delayed allocation will kick in. When the node comes back, the master will search for a copy
+                    // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
+                    // before timing out
+                    .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                    .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
+                createIndex(index, settings.build());
+                indexDocs(index, 0, 10);
+                ensureGreen(index);
+                // make sure that we can index while the replicas are recovering
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries"));
+                break;
+            case MIXED:
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null));
+                asyncIndexDocs(index, 10, 50).get();
+                ensureGreen(index);
+                assertOK(client().performRequest("POST", index + "/_refresh"));
+                assertCount(index, "_primary", 60);
+                assertCount(index, "_replica", 60);
+                // make sure that we can index while the replicas are recovering
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries"));
+                break;
+            case UPGRADED:
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null));
+                asyncIndexDocs(index, 60, 50).get();
+                ensureGreen(index);
+                assertOK(client().performRequest("POST", index + "/_refresh"));
+                assertCount(index, "_primary", 110);
+                assertCount(index, "_replica", 110);
+                break;
+            default:
+                throw new IllegalStateException("unknown type " + clusterType);
+        }
+    }
+
+    private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
+        final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
+        assertOK(response);
+        final int actualCount = Integer.parseInt(ObjectPath.createFromResponse(response).evaluate("count").toString());
+        assertThat(actualCount, equalTo(expectedCount));
+    }
+
+    private String getNodeId(Predicate<Version> versionPredicate) throws IOException {
+        Response response = client().performRequest("GET", "_nodes");
+        ObjectPath objectPath = ObjectPath.createFromResponse(response);
+        Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
+        for (String id : nodesAsMap.keySet()) {
+            Version version = Version.fromString(objectPath.evaluate("nodes." + id + ".version"));
+            if (versionPredicate.test(version)) {
+                return id;
+            }
+        }
+        return null;
+    }
+
+    public void testRelocationWithConcurrentIndexing() throws Exception {
+        final String index = "relocation_with_concurrent_indexing";
+        switch (clusterType) {
+            case OLD:
+                Settings.Builder settings = Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                    // if the node with the replica is the first to be restarted, while a replica is still recovering
+                    // then delayed allocation will kick in. When the node comes back, the master will search for a copy
+                    // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
+                    // before timing out
+                    .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
+                    .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
+                createIndex(index, settings.build());
+                indexDocs(index, 0, 10);
+                ensureGreen(index);
+                // make sure that no shards are allocated, so we can make sure the primary stays on the old node (when one
+                // node stops, we lose the master too, so a replica will not be promoted)
+                updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none"));
+                break;
+            case MIXED:
+                final String newNode = getNodeId(v -> v.equals(Version.CURRENT));
+                final String oldNode = getNodeId(v -> v.before(Version.CURRENT));
+                // remove the replica now that we know that the primary is an old node
+                updateIndexSetting(index, Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                    .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null)
+                    .put("index.routing.allocation.include._id", oldNode)
+                );
+                updateIndexSetting(index, Settings.builder().put("index.routing.allocation.include._id", newNode));
+                asyncIndexDocs(index, 10, 50).get();
+                ensureGreen(index);
+                assertOK(client().performRequest("POST", index + "/_refresh"));
+                assertCount(index, "_primary", 60);
+                break;
+            case UPGRADED:
+                updateIndexSetting(index, Settings.builder()
+                    .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                    .put("index.routing.allocation.include._id", (String) null)
+                );
+                asyncIndexDocs(index, 60, 50).get();
+                ensureGreen(index);
+                assertOK(client().performRequest("POST", index + "/_refresh"));
+                assertCount(index, "_primary", 110);
+                assertCount(index, "_replica", 110);
+                break;
+            default:
+                throw new IllegalStateException("unknown type " + clusterType);
+        }
+    }
+
 }
diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml
index 499a39c4e4edb..30f891bf0e0d1 100644
--- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml
+++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml
@@ -16,6 +16,16 @@
           # allocation will kick in, and the cluster health won't return to GREEN
           # before timing out
           index.unassigned.node_left.delayed_timeout: "100ms"
+
+  - do:
+      indices.create:
+        index: empty_index # index to ensure we can recover empty indices
+        body:
+          # if the node with the replica is the first to be restarted, then delayed
+          # allocation will kick in, and the cluster health won't return to GREEN
+          # before timing out
+          index.unassigned.node_left.delayed_timeout: "100ms"
+
   - do:
       bulk:
         refresh: true
diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml
index e050e5dd451d1..3dddc2538af93 100644
--- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml
+++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml
@@ -7,6 +7,7 @@
       # wait for long enough that we give delayed unassigned shards to stop being delayed
       timeout: 70s
       level: shards
+      index: test_index,index_with_replicas,empty_index
 
   - do:
     search:
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
index cef820ac0096a..0da1aff2ef773 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
@@ -392,13 +392,18 @@ protected void assertOK(Response response) {
         assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
     }
 
-    protected void ensureGreen() throws IOException {
+    /**
+     * Checks that the specified index is green. We force selection of an index because the tests share a cluster and often
+     * leave indices in a non-green state.
+     * @param index the index to check
+     **/
+    protected void ensureGreen(String index) throws IOException {
         Map<String, String> params = new HashMap<>();
         params.put("wait_for_status", "green");
         params.put("wait_for_no_relocating_shards", "true");
         params.put("timeout", "70s");
         params.put("level", "shards");
-        assertOK(client().performRequest("GET", "_cluster/health", params));
+        assertOK(client().performRequest("GET", "_cluster/health/" + index, params));
     }
 
     protected void createIndex(String name, Settings settings) throws IOException {
@@ -411,4 +416,12 @@ protected void createIndex(String name, Settings settings, String mapping) throw
             + ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON)));
     }
 
+    protected void updateIndexSetting(String index, Settings.Builder settings) throws IOException {
+        updateIndexSetting(index, settings.build());
+    }
+
+    private void updateIndexSetting(String index, Settings settings) throws IOException {
+        assertOK(client().performRequest("PUT", index + "/_settings", Collections.emptyMap(),
+            new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON)));
+    }
 }
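
Editor's note: for readers following the file-based path in `recoverToTarget`, the new `requiredSeqNoRangeStart` is derived from the local checkpoint recorded in the user data of the Lucene commit shipped in phase1: everything at or below that checkpoint is fully contained in the shipped files, so the translog replay only has to cover what lies strictly above it. Below is a hedged, self-contained sketch of that derivation, not the PR's actual code; the `"local_checkpoint"` literal mirrors what `SequenceNumbers.LOCAL_CHECKPOINT_KEY` is assumed to be, and the helper name is invented.

```java
import org.apache.lucene.index.IndexCommit;

import java.io.IOException;
import java.util.Map;

final class RequiredRange {

    // Assumed to mirror SequenceNumbers.LOCAL_CHECKPOINT_KEY.
    static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";

    static long requiredSeqNoRangeStart(IndexCommit commit) throws IOException {
        final Map<String, String> userData = commit.getUserData();
        final String localCheckpoint = userData.get(LOCAL_CHECKPOINT_KEY);
        if (localCheckpoint == null) {
            // pre-6.0 commits carry no seq-no info; the engine/shard changes earlier in this
            // PR rewrite such commits before a recovery can rely on them
            throw new IllegalStateException("commit has no local checkpoint in its user data");
        }
        // ops up to and including the local checkpoint are complete in the shipped files,
        // so the replay must cover everything strictly above it
        return Long.parseLong(localCheckpoint) + 1;
    }
}
```
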