From 41d27c87d57b249f0a86ce81295ae130e1ce3e42 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 12:48:12 +0100 Subject: [PATCH 01/29] add testRecoveryWithConcurrentIndexing --- .../elasticsearch/backwards/IndexingIT.java | 10 --- .../elasticsearch/upgrades/RecoveryIT.java | 73 +++++++++++++++++++ .../test/rest/ESRestTestCase.java | 8 ++ 3 files changed, 81 insertions(+), 10 deletions(-) diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 0133546f3b301..c8d882870f84b 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -25,7 +25,6 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.test.rest.ESRestTestCase; @@ -44,19 +43,10 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; public class IndexingIT extends ESRestTestCase { - private void updateIndexSetting(String name, Settings.Builder settings) throws IOException { - updateIndexSetting(name, settings.build()); - } - private void updateIndexSetting(String name, Settings settings) throws IOException { - assertOK(client().performRequest("PUT", name + "/_settings", Collections.emptyMap(), - new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON))); - } - private int indexDocs(String index, final int idStart, final int numDocs) throws IOException { for 
(int i = 0; i < numDocs; i++) { final int id = idStart + i; diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 6c21d3e37ef30..f2cab9c1d9451 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -18,16 +18,26 @@ */ package org.elasticsearch.upgrades; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Response; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.yaml.ObjectPath; +import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.concurrent.Future; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; +import static java.util.Collections.emptyMap; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; @@ -109,4 +119,67 @@ public void testHistoryUUIDIsGenerated() throws Exception { } } + private int indexDocs(String index, final int idStart, final int numDocs) throws IOException { + for (int i = 0; i < numDocs; i++) { + final int id = idStart + i; + 
assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(), + new StringEntity("{\"test\": \"test_" + randomAsciiOfLength(2) + "\"}", ContentType.APPLICATION_JSON))); + } + return numDocs; + } + + private Future asyncIndexDocs(String index, final int idStart, final int numDocs) throws IOException { + PlainActionFuture future = new PlainActionFuture<>(); + Thread background = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + future.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + indexDocs(index, idStart, numDocs); + future.onResponse(null); + } + }); + background.start(); + return future; + } + + public void testRecoveryWithConcurrentIndexing() throws Exception { + final String index = "recovery_with_concurrent_indexing"; + switch (clusterType) { + case OLD: + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + // if the node with the replica is the first to be restarted, while a replica is still recovering + // then delayed allocation will kick in. 
When the node comes back, the master will search for a copy + // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN + // before timing out + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster + createIndex(index, settings.build()); + indexDocs(index, 0, 10); + ensureGreen(); + // make sure that we can index while the replicas are recovering + updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); + break; + case MIXED: + updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)); + asyncIndexDocs(index, 10, 50).get(); + ensureGreen(); + // make sure that we can index while the replicas are recovering + updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); + break; + case UPGRADED: + updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)); + asyncIndexDocs(index, 10, 50).get(); + ensureGreen(); + break; + default: + throw new IllegalStateException("unknown type " + clusterType); + } + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index cef820ac0096a..7527677f94ec8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -411,4 +411,12 @@ protected void createIndex(String name, Settings settings, String mapping) throw + ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON))); } + protected void updateIndexSetting(String index, Settings.Builder settings) throws IOException { + updateIndexSetting(index, settings.build()); + } + + private void 
updateIndexSetting(String index, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", index + "/_settings", Collections.emptyMap(), + new StringEntity(Strings.toString(settings), ContentType.APPLICATION_JSON))); + } } From e99de2081e0bf93b3fbbc83d86b900aa916986e0 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 13:58:14 +0100 Subject: [PATCH 02/29] fix for promotion --- .../index/seqno/SequenceNumbers.java | 20 +++++--- .../elasticsearch/index/shard/IndexShard.java | 19 +++++-- .../recovery/RecoverySourceHandler.java | 3 +- .../elasticsearch/upgrades/RecoveryIT.java | 51 +++++++++++++++++++ 4 files changed, 80 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java index 21b4134f9837e..a5a225109b2ff 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java @@ -72,9 +72,8 @@ public static SeqNoStats loadSeqNoStatsFromLuceneCommit( /** * Compute the minimum of the given current minimum sequence number and the specified sequence number, accounting for the fact that the * current minimum sequence number could be {@link SequenceNumbers#NO_OPS_PERFORMED} or - * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. When the current minimum sequence number is not - * {@link SequenceNumbers#NO_OPS_PERFORMED} nor {@link SequenceNumbers#UNASSIGNED_SEQ_NO}, the specified sequence number - * must not be {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. + * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. The method guarantees that once we see an operation we make sure all future operations + * either have an assigned sequence number (if the first operation had one) or do not (if the first operation didn't). 
* * @param minSeqNo the current minimum sequence number * @param seqNo the specified sequence number @@ -84,7 +83,10 @@ public static long min(final long minSeqNo, final long seqNo) { if (minSeqNo == NO_OPS_PERFORMED) { return seqNo; } else if (minSeqNo == UNASSIGNED_SEQ_NO) { - return seqNo; + if (seqNo != UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence number may not be assigned. got " + seqNo); + } + return UNASSIGNED_SEQ_NO; } else { if (seqNo == UNASSIGNED_SEQ_NO) { throw new IllegalArgumentException("sequence number must be assigned"); @@ -96,9 +98,8 @@ public static long min(final long minSeqNo, final long seqNo) { /** * Compute the maximum of the given current maximum sequence number and the specified sequence number, accounting for the fact that the * current maximum sequence number could be {@link SequenceNumbers#NO_OPS_PERFORMED} or - * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. When the current maximum sequence number is not - * {@link SequenceNumbers#NO_OPS_PERFORMED} nor {@link SequenceNumbers#UNASSIGNED_SEQ_NO}, the specified sequence number - * must not be {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. + * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. The method guarantees that once we see an operation we make sure all future operations + * either have an assigned sequence number (if the first operation had one) or do not (if the first operation didn't). * * @param maxSeqNo the current maximum sequence number * @param seqNo the specified sequence number @@ -108,7 +109,10 @@ public static long max(final long maxSeqNo, final long seqNo) { if (maxSeqNo == NO_OPS_PERFORMED) { return seqNo; } else if (maxSeqNo == UNASSIGNED_SEQ_NO) { - return seqNo; + if (seqNo != UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence number may not be assigned. 
got " + seqNo); + } + return UNASSIGNED_SEQ_NO; } else { if (seqNo == UNASSIGNED_SEQ_NO) { throw new IllegalArgumentException("sequence number must be assigned"); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b3db0c258ac30..da3b3b7d5f92d 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -492,10 +492,21 @@ public void updateShardState(final ShardRouting newRouting, * generation in a new primary as it takes the last known sequence number as a starting point), but it * simplifies reasoning about the relationship between primary terms and translog generations. */ - getEngine().rollTranslogGeneration(); - getEngine().restoreLocalCheckpointFromTranslog(); - getEngine().fillSeqNoGaps(newPrimaryTerm); - getEngine().seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(), + final Engine engine = getEngine(); + if (indexSettings.getIndexVersionCreated().onOrBefore(Version.V_6_0_0_alpha1) && + engine.getTranslog().uncommittedOperations() > 0) { + // an index that was created before sequence numbers were introduce may contain operations in its + // translog that do not have a sequence numbers. We want to make sure those operations will never + // be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq# (due + // to active indexing) and operations without a seq# coming from the translog. We therefore flush + // to create a lucene commit point to an empty translog file. 
+ engine.flush(false, true); + } else { + engine.rollTranslogGeneration(); + } + engine.restoreLocalCheckpointFromTranslog(); + engine.fillSeqNoGaps(newPrimaryTerm); + engine.seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(), getEngine().seqNoService().getLocalCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 5f692d8e8f5fa..8103e3de84010 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -543,7 +543,8 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl * any ops before the starting sequence number. */ final long seqNo = operation.seqNo(); - if (startingSeqNo >= 0 && (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) { + assert seqNo >= 0 : "translog operations must have a sequence number. 
got: " + operation; + if (seqNo < startingSeqNo) { skippedOps++; continue; } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index f2cab9c1d9451..ac0d557547690 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -20,6 +20,7 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; +import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Response; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -31,6 +32,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.Future; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; @@ -182,4 +184,53 @@ public void testRecoveryWithConcurrentIndexing() throws Exception { } } + private String getOldNodeName() throws IOException { + Response response = client().performRequest("GET", "_nodes"); + ObjectPath objectPath = ObjectPath.createFromResponse(response); + Map nodesAsMap = objectPath.evaluate("nodes"); + for (String id : nodesAsMap.keySet()) { + Version version = Version.fromString(objectPath.evaluate("nodes." + id + ".version")); + if (version.before(Version.CURRENT)) { + return objectPath.evaluate("nodes." 
+ id + ".name"); + } + } + throw new IllegalStateException("couldn't find an old node"); + } + + + public void testRelocationWithConcurrentIndexing() throws Exception { + final String index = "relocation_with_concurrent_indexing"; + switch (clusterType) { + case OLD: + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + // if the node with the replica is the first to be restarted, while a replica is still recovering + // then delayed allocation will kick in. When the node comes back, the master will search for a copy + // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN + // before timing out + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster + createIndex(index, settings.build()); + indexDocs(index, 0, 10); + ensureGreen(); + // make sure that the replicas are not started + updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); + break; + case MIXED: + asyncIndexDocs(index, 10, 50).get(); + ensureGreen(); + // make sure that we can index while the replicas are recovering + updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); + break; + case UPGRADED: + updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)); + asyncIndexDocs(index, 10, 50).get(); + ensureGreen(); + break; + default: + throw new IllegalStateException("unknown type " + clusterType); + } + } + } From 277c87aedeb368458c7c4746e9e1496617792c6a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 14:15:08 +0100 Subject: [PATCH 03/29] relax. 
You can't guarantee what you want --- .../index/seqno/SequenceNumbers.java | 20 ++++++++----------- .../recovery/RecoverySourceHandler.java | 7 +++---- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java index a5a225109b2ff..21b4134f9837e 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java @@ -72,8 +72,9 @@ public static SeqNoStats loadSeqNoStatsFromLuceneCommit( /** * Compute the minimum of the given current minimum sequence number and the specified sequence number, accounting for the fact that the * current minimum sequence number could be {@link SequenceNumbers#NO_OPS_PERFORMED} or - * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. The method guarantees that once we see an operation we make sure all future operations - * either have an assigned sequence number (if the first operation had one) or do not (if the first operation didn't). + * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. When the current minimum sequence number is not + * {@link SequenceNumbers#NO_OPS_PERFORMED} nor {@link SequenceNumbers#UNASSIGNED_SEQ_NO}, the specified sequence number + * must not be {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. * * @param minSeqNo the current minimum sequence number * @param seqNo the specified sequence number @@ -83,10 +84,7 @@ public static long min(final long minSeqNo, final long seqNo) { if (minSeqNo == NO_OPS_PERFORMED) { return seqNo; } else if (minSeqNo == UNASSIGNED_SEQ_NO) { - if (seqNo != UNASSIGNED_SEQ_NO) { - throw new IllegalArgumentException("sequence number may not be assigned. 
got " + seqNo); - } - return UNASSIGNED_SEQ_NO; + return seqNo; } else { if (seqNo == UNASSIGNED_SEQ_NO) { throw new IllegalArgumentException("sequence number must be assigned"); @@ -98,8 +96,9 @@ public static long min(final long minSeqNo, final long seqNo) { /** * Compute the maximum of the given current maximum sequence number and the specified sequence number, accounting for the fact that the * current maximum sequence number could be {@link SequenceNumbers#NO_OPS_PERFORMED} or - * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. The method guarantees that once we see an operation we make sure all future operations - * either have an assigned sequence number (if the first operation had one) or do not (if the first operation didn't). + * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. When the current maximum sequence number is not + * {@link SequenceNumbers#NO_OPS_PERFORMED} nor {@link SequenceNumbers#UNASSIGNED_SEQ_NO}, the specified sequence number + * must not be {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. * * @param maxSeqNo the current maximum sequence number * @param seqNo the specified sequence number @@ -109,10 +108,7 @@ public static long max(final long maxSeqNo, final long seqNo) { if (maxSeqNo == NO_OPS_PERFORMED) { return seqNo; } else if (maxSeqNo == UNASSIGNED_SEQ_NO) { - if (seqNo != UNASSIGNED_SEQ_NO) { - throw new IllegalArgumentException("sequence number may not be assigned. 
got " + seqNo); - } - return UNASSIGNED_SEQ_NO; + return seqNo; } else { if (seqNo == UNASSIGNED_SEQ_NO) { throw new IllegalArgumentException("sequence number must be assigned"); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 8103e3de84010..a49d42513180c 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -161,9 +161,9 @@ public RecoveryResponse recoverToTarget() throws IOException { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } - // we set this to unassigned to create a translog roughly according to the retention policy - // on the target - startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + // we set this to 0 to create a translog roughly according to the retention policy + // on the target. Note that it will still filter out legacy operations with no sequence numbers + startingSeqNo = 0; try { phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); @@ -543,7 +543,6 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl * any ops before the starting sequence number. */ final long seqNo = operation.seqNo(); - assert seqNo >= 0 : "translog operations must have a sequence number. 
got: " + operation; if (seqNo < startingSeqNo) { skippedOps++; continue; From d77e00abf2667c13c3fc4e70eae8629fd78339bf Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 16:28:13 +0100 Subject: [PATCH 04/29] assert we ship what we want to ship --- .../elasticsearch/index/shard/IndexShard.java | 2 +- .../recovery/RecoverySourceHandler.java | 69 +++++++++++++------ .../recovery/RecoverySourceHandlerTests.java | 4 +- 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index da3b3b7d5f92d..f13b50ed5cf72 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -493,6 +493,7 @@ public void updateShardState(final ShardRouting newRouting, * simplifies reasoning about the relationship between primary terms and translog generations. */ final Engine engine = getEngine(); + engine.restoreLocalCheckpointFromTranslog(); if (indexSettings.getIndexVersionCreated().onOrBefore(Version.V_6_0_0_alpha1) && engine.getTranslog().uncommittedOperations() > 0) { // an index that was created before sequence numbers were introduce may contain operations in its @@ -504,7 +505,6 @@ public void updateShardState(final ShardRouting newRouting, } else { engine.rollTranslogGeneration(); } - engine.restoreLocalCheckpointFromTranslog(); engine.fillSeqNoGaps(newPrimaryTerm); engine.seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(), getEngine().seqNoService().getLocalCheckpoint()); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index a49d42513180c..acb25787cbd85 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ 
b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -148,12 +148,14 @@ public RecoveryResponse recoverToTarget() throws IOException { final Translog translog = shard.getTranslog(); final long startingSeqNo; + final long requiredSeqNoRangeStart; + final long endingSeqNo = determineEndingSeqNo(); final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && - isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery(); - + isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery(endingSeqNo); if (isSequenceNumberBasedRecoveryPossible) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); + requiredSeqNoRangeStart = startingSeqNo; } else { final Engine.IndexCommitRef phase1Snapshot; try { @@ -164,7 +166,11 @@ public RecoveryResponse recoverToTarget() throws IOException { // we set this to 0 to create a translog roughly according to the retention policy // on the target. 
Note that it will still filter out legacy operations with no sequence numbers startingSeqNo = 0; - + // but we must have everything above the local checkpoint in the commit + requiredSeqNoRangeStart = + Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; + assert requiredSeqNoRangeStart >= 0 : + "base commit contains an illegal local checkpoint " + (requiredSeqNoRangeStart - 1); try { phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); } catch (final Exception e) { @@ -189,7 +195,7 @@ public RecoveryResponse recoverToTarget() throws IOException { logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); final long targetLocalCheckpoint; try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) { - targetLocalCheckpoint = phase2(startingSeqNo, snapshot); + targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } @@ -221,28 +227,31 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { }); } + private long determineEndingSeqNo() { + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + /* + * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all + * operations in the required range will be available for replaying from the translog of the source. + */ + cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); + + logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); + return endingSeqNo; + } + /** * Determines if the source translog is ready for a sequence-number-based peer recovery. 
The main condition here is that the source - * translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source. + * translog contains all operations between the local checkpoint on the target and desired endingSeqNo. * * @return {@code true} if the source is ready for a sequence-number-based recovery * @throws IOException if an I/O exception occurred reading the translog snapshot */ - boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { + boolean isTranslogReadyForSequenceNumberBasedRecovery(final long endingSeqNo) throws IOException { final long startingSeqNo = request.startingSeqNo(); assert startingSeqNo >= 0; - final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo); // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one if (startingSeqNo - 1 <= endingSeqNo) { - /* - * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all - * operations in the required range will be available for replaying from the translog of the source. 
- */ - cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); - - logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); - final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; @@ -434,11 +443,12 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { * * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all * ops should be sent + * @param requiredSeqNoRangeStart + *@param endingSeqNo * @param snapshot a snapshot of the translog - * - * @return the local checkpoint on the target + * @return the local checkpoint on the target */ - long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { + long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } @@ -449,7 +459,7 @@ long phase2(final long startingSeqNo, final Translog.Snapshot snapshot) throws I logger.trace("recovery [phase2]: sending transaction log operations"); // send all the snapshot's translog operations to the target - final SendSnapshotResult result = sendSnapshot(startingSeqNo, snapshot); + final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); stopWatch.stop(); logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime()); @@ -511,17 +521,25 @@ static class SendSnapshotResult { * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. 
* * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent - * @param snapshot the translog snapshot to replay operations from - * @return the local checkpoint on the target and the total number of operations sent + * @param requiredSeqNoRangeStart + *@param endingSeqNo + * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the total number of operations sent * @throws IOException if an I/O exception occurred reading the translog snapshot */ - protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Translog.Snapshot snapshot) throws IOException { + protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) throws IOException { + assert requiredSeqNoRangeStart <= endingSeqNo : + "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; + assert startingSeqNo <= endingSeqNo : + "startingSeqNo " + startingSeqNo + " is larger than endingSeqNo " + endingSeqNo; + assert startingSeqNo <= requiredSeqNoRangeStart : + "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; int ops = 0; long size = 0; int skippedOps = 0; int totalSentOps = 0; final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); final List operations = new ArrayList<>(); + final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); final int expectedTotalOps = snapshot.totalOperations(); if (expectedTotalOps == 0) { @@ -551,6 +569,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl ops++; size += operation.estimateSize(); totalSentOps++; + requiredOpsTracker.markSeqNoAsCompleted(seqNo); // check if this request is past bytes threshold, and if so, send it off 
if (size >= chunkSizeInBytes) { @@ -567,6 +586,12 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl cancellableThreads.executeIO(sendBatch); } + if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { + throw new IllegalStateException("translog replay failed to covered required sequence numbers" + + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]. first missing op is [" + + (requiredOpsTracker.getCheckpoint() + 1) + "]"); + } + assert expectedTotalOps == skippedOps + totalSentOps : "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]"; diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 993cc84506498..f9f6b1357d6f0 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -182,7 +182,7 @@ public void testSendSnapshotSendsOps() throws IOException { } operations.add(null); final long startingSeqNo = randomBoolean() ? 
SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16); - RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() { + RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, new Translog.Snapshot() { @Override public void close() { @@ -383,7 +383,7 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { } @Override - long phase2(long startingSeqNo, Translog.Snapshot snapshot) throws IOException { + long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot) throws IOException { phase2Called.set(true); return SequenceNumbers.UNASSIGNED_SEQ_NO; } From af3f8154b82e2a63ba67693a7ecfde79e4efc877 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 19:59:08 +0100 Subject: [PATCH 05/29] verify we ship the right ops --- .../recovery/RecoverySourceHandler.java | 36 +++++++++---------- .../recovery/RecoverySourceHandlerTests.java | 11 +++--- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index acb25787cbd85..dc94fe6a46667 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -149,9 +149,8 @@ public RecoveryResponse recoverToTarget() throws IOException { final long startingSeqNo; final long requiredSeqNoRangeStart; - final long endingSeqNo = determineEndingSeqNo(); final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && - isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery(endingSeqNo); + isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery(); if 
(isSequenceNumberBasedRecoveryPossible) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); @@ -191,6 +190,7 @@ public RecoveryResponse recoverToTarget() throws IOException { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } + final long endingSeqNo = determineEndingSeqNo(); logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); final long targetLocalCheckpoint; @@ -241,17 +241,19 @@ private long determineEndingSeqNo() { /** * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source - * translog contains all operations between the local checkpoint on the target and desired endingSeqNo. + * translog contains all operations above the local checkpoint on the target. We already know that the translog contains or will contain + * all ops above the source local checkpoint, so we can stop checking there. 
* * @return {@code true} if the source is ready for a sequence-number-based recovery * @throws IOException if an I/O exception occurred reading the translog snapshot */ - boolean isTranslogReadyForSequenceNumberBasedRecovery(final long endingSeqNo) throws IOException { + boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { final long startingSeqNo = request.startingSeqNo(); assert startingSeqNo >= 0; - logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, endingSeqNo); + final long localCheckpoint = shard.getLocalCheckpoint(); + logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint); // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one - if (startingSeqNo - 1 <= endingSeqNo) { + if (startingSeqNo - 1 <= localCheckpoint) { final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; @@ -261,7 +263,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final long endingSeqNo) th } } } - return tracker.getCheckpoint() >= endingSeqNo; + return tracker.getCheckpoint() >= localCheckpoint; } else { return false; } @@ -443,10 +445,10 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { * * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all * ops should be sent - * @param requiredSeqNoRangeStart - *@param endingSeqNo + * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) + * @param endingSeqNo the highest sequence number that should be sent * @param snapshot a snapshot of the translog - * @return the local checkpoint on the target + * @return the local checkpoint on the target */ long phase2(final long 
startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) throws IOException { if (shard.state() == IndexShardState.CLOSED) { @@ -521,9 +523,10 @@ static class SendSnapshotResult { * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. * * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent - * @param requiredSeqNoRangeStart - *@param endingSeqNo - * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the total number of operations sent + * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) + * @param endingSeqNo the highest sequence number that should be sent + * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the total + * number of operations sent * @throws IOException if an I/O exception occurred reading the translog snapshot */ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) throws IOException { @@ -556,12 +559,9 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require throw new IndexShardClosedException(request.shardId()); } cancellableThreads.checkForCancel(); - /* - * If we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and - * any ops before the starting sequence number. 
- */ + final long seqNo = operation.seqNo(); - if (seqNo < startingSeqNo) { + if (seqNo < startingSeqNo && seqNo > endingSeqNo) { skippedOps++; continue; } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index f9f6b1357d6f0..24ffac6285091 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -181,8 +181,9 @@ public void testSendSnapshotSendsOps() throws IOException { operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true))); } operations.add(null); - final long startingSeqNo = randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16); - RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, new Translog.Snapshot() { + final long startingSeqNo = randomIntBetween(0, 16); + // todo add proper tests + RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, startingSeqNo, 16, new Translog.Snapshot() { @Override public void close() { @@ -200,11 +201,7 @@ public Translog.Operation next() throws IOException { return operations.get(counter++); } }); - if (startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { - assertThat(result.totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers)); - } else { - assertThat(result.totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo))); - } + assertThat(result.totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo))); } private Engine.Index getIndex(final String id) { From 04cbccf26c86e371a544a33ff2ec4636ecb21573 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 20:26:27 +0100 Subject: 
[PATCH 06/29] logging --- .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index dc94fe6a46667..854f97994d76d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -458,7 +458,8 @@ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingS final StopWatch stopWatch = new StopWatch().start(); - logger.trace("recovery [phase2]: sending transaction log operations"); + logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + + "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); // send all the snapshot's translog operations to the target final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); From eb3bd721a1957e9468e411d1b2d145d7811b3818 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 21:59:24 +0100 Subject: [PATCH 07/29] doh --- .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 854f97994d76d..890c974c0decd 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -562,7 +562,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require cancellableThreads.checkForCancel(); final long seqNo = operation.seqNo(); - if (seqNo < startingSeqNo && seqNo > 
endingSeqNo) { + if (seqNo < startingSeqNo || seqNo > endingSeqNo) { skippedOps++; continue; } From 0a75735c2cafba713f59ad57094c44554e5c67cd Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 22:06:31 +0100 Subject: [PATCH 08/29] lint --- .../indices/recovery/RecoverySourceHandler.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 890c974c0decd..66683629a7a96 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -450,7 +450,8 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { * @param snapshot a snapshot of the translog * @return the local checkpoint on the target */ - long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) throws IOException { + long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) + throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } @@ -530,7 +531,8 @@ static class SendSnapshotResult { * number of operations sent * @throws IOException if an I/O exception occurred reading the translog snapshot */ - protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) throws IOException { + protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, + final Translog.Snapshot snapshot) throws IOException { assert requiredSeqNoRangeStart <= endingSeqNo : "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + 
endingSeqNo; assert startingSeqNo <= endingSeqNo : From 4c4a1bce95f0c303f92b0e0561f5e196de231f34 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 22:37:38 +0100 Subject: [PATCH 09/29] more intuitive range indication --- .../indices/recovery/RecoverySourceHandler.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 66683629a7a96..ba97745a5a6be 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -236,7 +236,7 @@ private long determineEndingSeqNo() { cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); - return endingSeqNo; + return endingSeqNo + 1; } /** @@ -525,8 +525,8 @@ static class SendSnapshotResult { * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. 
* * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent - * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) - * @param endingSeqNo the highest sequence number that should be sent + * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo - 1) + * @param endingSeqNo the upper bound of the sequence number range to be sent (exclusive) * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the total * number of operations sent * @throws IOException if an I/O exception occurred reading the translog snapshot @@ -534,9 +534,7 @@ static class SendSnapshotResult { protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) throws IOException { assert requiredSeqNoRangeStart <= endingSeqNo : - "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; - assert startingSeqNo <= endingSeqNo : - "startingSeqNo " + startingSeqNo + " is larger than endingSeqNo " + endingSeqNo; + "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo" + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; int ops = 0; @@ -564,7 +562,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require cancellableThreads.checkForCancel(); final long seqNo = operation.seqNo(); - if (seqNo < startingSeqNo || seqNo > endingSeqNo) { + if (seqNo < startingSeqNo || seqNo >= endingSeqNo) { skippedOps++; continue; } @@ -589,9 +587,9 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require cancellableThreads.executeIO(sendBatch); } - if 
(requiredOpsTracker.getCheckpoint() < endingSeqNo) { + if (requiredOpsTracker.getCheckpoint() < endingSeqNo - 1) { throw new IllegalStateException("translog replay failed to covered required sequence numbers" + - " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]. first missing op is [" + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" + (requiredOpsTracker.getCheckpoint() + 1) + "]"); } From a29acf225bbc84403f44d3effdb3f6bf6f934aff Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 28 Nov 2017 22:54:33 +0100 Subject: [PATCH 10/29] fix testSendSnapshotSendsOps --- .../indices/recovery/RecoverySourceHandlerTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 24ffac6285091..bf7cdcc2acad1 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -183,7 +183,8 @@ public void testSendSnapshotSendsOps() throws IOException { operations.add(null); final long startingSeqNo = randomIntBetween(0, 16); // todo add proper tests - RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, startingSeqNo, 16, new Translog.Snapshot() { + RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, startingSeqNo, + numberOfDocsWithValidSequenceNumbers, new Translog.Snapshot() { @Override public void close() { From 895b78b8350314b93df8ce0516071d5d6ac38cb1 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 09:35:24 +0100 Subject: [PATCH 11/29] add primary relocation test --- .../elasticsearch/upgrades/RecoveryIT.java | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git 
a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index ac0d557547690..d2bbee63008cc 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -171,6 +171,9 @@ public void testRecoveryWithConcurrentIndexing() throws Exception { updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)); asyncIndexDocs(index, 10, 50).get(); ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + assertCount(index, "_primary", 60); + assertCount(index, "_replica", 60); // make sure that we can index while the replicas are recovering updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); break; @@ -178,19 +181,30 @@ public void testRecoveryWithConcurrentIndexing() throws Exception { updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)); asyncIndexDocs(index, 10, 50).get(); ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + assertCount(index, "_primary", 110); + assertCount(index, "_replica", 110); break; default: throw new IllegalStateException("unknown type " + clusterType); } } - private String getOldNodeName() throws IOException { + private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { + final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference)); + assertOK(response); + final int actualCount = Integer.parseInt(ObjectPath.createFromResponse(response).evaluate("count").toString()); + assertThat(actualCount, equalTo(expectedCount)); + } + + + private String getNewNodeName() throws IOException { Response 
response = client().performRequest("GET", "_nodes"); ObjectPath objectPath = ObjectPath.createFromResponse(response); Map nodesAsMap = objectPath.evaluate("nodes"); for (String id : nodesAsMap.keySet()) { Version version = Version.fromString(objectPath.evaluate("nodes." + id + ".version")); - if (version.before(Version.CURRENT)) { + if (version.equals(Version.CURRENT)) { return objectPath.evaluate("nodes." + id + ".name"); } } @@ -214,19 +228,29 @@ public void testRelocationWithConcurrentIndexing() throws Exception { createIndex(index, settings.build()); indexDocs(index, 0, 10); ensureGreen(); - // make sure that the replicas are not started + // make sure that the replicas are not started (so we have the primary on an old node) updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); break; case MIXED: + final String newNodeName = getNewNodeName(); + // remove the replica now that we know that the primary is an old node + updateIndexSetting(index, Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null) + ); + updateIndexSetting(index, Settings.builder().put("index.routing.allocation.include._name", newNodeName)); asyncIndexDocs(index, 10, 50).get(); ensureGreen(); - // make sure that we can index while the replicas are recovering - updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); + assertOK(client().performRequest("POST", index + "/_refresh")); + assertCount(index, "_primary", 60); break; case UPGRADED: - updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)); + updateIndexSetting(index, Settings.builder().put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)); asyncIndexDocs(index, 10, 50).get(); ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); 
+ assertCount(index, "_primary", 110); + assertCount(index, "_replica", 110); break; default: throw new IllegalStateException("unknown type " + clusterType); From 8782377167d207ed5beb71617c6ed7466a3b4d07 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 09:57:17 +0100 Subject: [PATCH 12/29] index specific ensure green --- .../org/elasticsearch/backwards/IndexingIT.java | 14 +++++++------- .../org/elasticsearch/upgrades/RecoveryIT.java | 14 +++++++------- .../elasticsearch/test/rest/ESRestTestCase.java | 9 +++++++-- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index c8d882870f84b..e995796651c0a 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -103,7 +103,7 @@ public void testIndexVersionPropagation() throws Exception { final int finalVersionForDoc1 = indexDocWithConcurrentUpdates(index, 1, nUpdates); logger.info("allowing shards on all nodes"); updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); List shards = buildShards(index, nodes, newNodeClient); Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); @@ -128,7 +128,7 @@ public void testIndexVersionPropagation() throws Exception { primary = shards.stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); - ensureGreen(); + ensureGreen(index); nUpdates = randomIntBetween(minUpdates, maxUpdates); 
logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates); final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates); @@ -141,7 +141,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("setting number of replicas to 0"); updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0)); - ensureGreen(); + ensureGreen(index); nUpdates = randomIntBetween(minUpdates, maxUpdates); logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates); final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates); @@ -154,7 +154,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("setting number of replicas to 1"); updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1)); - ensureGreen(); + ensureGreen(index); nUpdates = randomIntBetween(minUpdates, maxUpdates); logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates); final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates); @@ -192,7 +192,7 @@ public void testSeqNoCheckpoints() throws Exception { assertSeqNoOnShards(index, nodes, nodes.getBWCVersion().major >= 6 ? 
numDocs : 0, newNodeClient); logger.info("allowing shards on all nodes"); updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); for (final String bwcName : bwcNamesList) { assertCount(index, "_only_nodes:" + bwcName, numDocs); @@ -204,7 +204,7 @@ public void testSeqNoCheckpoints() throws Exception { Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); - ensureGreen(); + ensureGreen(index); int numDocsOnNewPrimary = 0; final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5); logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary); @@ -223,7 +223,7 @@ public void testSeqNoCheckpoints() throws Exception { numDocs += numberOfDocsAfterDroppingReplicas; logger.info("setting number of replicas to 1"); updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1)); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); // the number of documents on the primary and on the recovered replica should match the number of indexed documents assertCount(index, "_primary", numDocs); diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index d2bbee63008cc..d2a6f8b4dc86d 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -101,7 +101,7 @@ public void testHistoryUUIDIsGenerated() throws Exception { 
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"); createIndex(index, settings.build()); } else if (clusterType == CLUSTER_TYPE.UPGRADED) { - ensureGreen(); + ensureGreen(index); Response response = client().performRequest("GET", index + "/_stats", Collections.singletonMap("level", "shards")); assertOK(response); ObjectPath objectPath = ObjectPath.createFromResponse(response); @@ -163,14 +163,14 @@ public void testRecoveryWithConcurrentIndexing() throws Exception { .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster createIndex(index, settings.build()); indexDocs(index, 0, 10); - ensureGreen(); + ensureGreen(index); // make sure that we can index while the replicas are recovering updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); break; case MIXED: updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)); asyncIndexDocs(index, 10, 50).get(); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); assertCount(index, "_primary", 60); assertCount(index, "_replica", 60); @@ -180,7 +180,7 @@ public void testRecoveryWithConcurrentIndexing() throws Exception { case UPGRADED: updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)); asyncIndexDocs(index, 10, 50).get(); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); assertCount(index, "_primary", 110); assertCount(index, "_replica", 110); @@ -227,7 +227,7 @@ public void testRelocationWithConcurrentIndexing() throws Exception { .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster createIndex(index, settings.build()); indexDocs(index, 0, 10); - ensureGreen(); + ensureGreen(index); // make sure that the replicas are not started (so we have the primary on an old node) updateIndexSetting(index, 
Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); break; @@ -240,14 +240,14 @@ public void testRelocationWithConcurrentIndexing() throws Exception { ); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.include._name", newNodeName)); asyncIndexDocs(index, 10, 50).get(); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); assertCount(index, "_primary", 60); break; case UPGRADED: updateIndexSetting(index, Settings.builder().put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)); asyncIndexDocs(index, 10, 50).get(); - ensureGreen(); + ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); assertCount(index, "_primary", 110); assertCount(index, "_replica", 110); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 7527677f94ec8..0da1aff2ef773 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -392,13 +392,18 @@ protected void assertOK(Response response) { assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); } - protected void ensureGreen() throws IOException { + /** + * checks that the specific index is green. 
we force a selection of an index as the tests share a cluster and often leave indices + * in an non green state + * @param index index to test for + **/ + protected void ensureGreen(String index) throws IOException { Map params = new HashMap<>(); params.put("wait_for_status", "green"); params.put("wait_for_no_relocating_shards", "true"); params.put("timeout", "70s"); params.put("level", "shards"); - assertOK(client().performRequest("GET", "_cluster/health", params)); + assertOK(client().performRequest("GET", "_cluster/health/" + index, params)); } protected void createIndex(String name, Settings settings) throws IOException { From b77d4e81a17732cd5feab71d1741376a31d55cda Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 10:01:26 +0100 Subject: [PATCH 13/29] fix counts --- .../src/test/java/org/elasticsearch/upgrades/RecoveryIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index d2a6f8b4dc86d..caf9ee8cbde29 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -179,7 +179,7 @@ public void testRecoveryWithConcurrentIndexing() throws Exception { break; case UPGRADED: updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null)); - asyncIndexDocs(index, 10, 50).get(); + asyncIndexDocs(index, 60, 50).get(); ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); assertCount(index, "_primary", 110); @@ -246,7 +246,7 @@ public void testRelocationWithConcurrentIndexing() throws Exception { break; case UPGRADED: updateIndexSetting(index, Settings.builder().put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)); - asyncIndexDocs(index, 10, 50).get(); + asyncIndexDocs(index, 
60, 50).get(); ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); assertCount(index, "_primary", 110); From 22d5bfa189f2cc24a1e7ee78f0340b4b12ec3ad8 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 10:35:54 +0100 Subject: [PATCH 14/29] tighten testRelocationWithConcurrentIndexing --- .../elasticsearch/upgrades/RecoveryIT.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index caf9ee8cbde29..36bc1b1a9273f 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.function.Predicate; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; import static java.util.Collections.emptyMap; @@ -198,17 +199,17 @@ private void assertCount(final String index, final String preference, final int } - private String getNewNodeName() throws IOException { + private String getNodeId(Predicate versionPredicate) throws IOException { Response response = client().performRequest("GET", "_nodes"); ObjectPath objectPath = ObjectPath.createFromResponse(response); Map nodesAsMap = objectPath.evaluate("nodes"); for (String id : nodesAsMap.keySet()) { Version version = Version.fromString(objectPath.evaluate("nodes." + id + ".version")); - if (version.equals(Version.CURRENT)) { - return objectPath.evaluate("nodes." 
+ id + ".name"); + if (versionPredicate.test(version)) { + return id; } } - throw new IllegalStateException("couldn't find an old node"); + return null; } @@ -228,24 +229,30 @@ public void testRelocationWithConcurrentIndexing() throws Exception { createIndex(index, settings.build()); indexDocs(index, 0, 10); ensureGreen(index); - // make sure that the replicas are not started (so we have the primary on an old node) - updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")); + // make sure that no shards are allocated, so we can make sure the primary stays on the old node (when one + // node stops, we lose the master too, so a replica will not be promoted) + updateIndexSetting(index, Settings.builder().put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")); break; case MIXED: - final String newNodeName = getNewNodeName(); + final String newNode = getNodeId(v -> v.equals(Version.CURRENT)); + final String oldNode = getNodeId(v -> v.before(Version.CURRENT)); // remove the replica now that we know that the primary is an old node updateIndexSetting(index, Settings.builder() .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String)null) + .put("index.routing.allocation.include._id", oldNode) ); - updateIndexSetting(index, Settings.builder().put("index.routing.allocation.include._name", newNodeName)); + updateIndexSetting(index, Settings.builder().put("index.routing.allocation.include._id", newNode)); asyncIndexDocs(index, 10, 50).get(); ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); assertCount(index, "_primary", 60); break; case UPGRADED: - updateIndexSetting(index, Settings.builder().put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)); + updateIndexSetting(index, Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + 
.put("index.routing.allocation.include._id", (String)null) + ); asyncIndexDocs(index, 60, 50).get(); ensureGreen(index); assertOK(client().performRequest("POST", index + "/_refresh")); From b2082b070b47bd419d3d40aba71f4780a4b03972 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 10:53:03 +0100 Subject: [PATCH 15/29] flush on relocation --- .../java/org/elasticsearch/index/shard/IndexShard.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f13b50ed5cf72..6c5990f2df2d7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -422,7 +422,13 @@ public void updateShardState(final ShardRouting newRouting, final DiscoveryNode recoverySourceNode = recoveryState.getSourceNode(); if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) { // there was no primary context hand-off in < 6.0.0, need to manually activate the shard - getEngine().seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint()); + final Engine engine = getEngine(); + engine.seqNoService().activatePrimaryMode(getEngine().seqNoService().getLocalCheckpoint()); + // Flush the translog as it may contain operations with no sequence numbers. We want to make sure those + // operations will never be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq# + // (due to active indexing) and operations without a seq# coming from the translog. We therefore flush + // to create a lucene commit point to an empty translog file. 
+ engine.flush(false, true); } } From 99352258ed4ea5d567ecec7decc69ee35beebd5f Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 11:49:52 +0100 Subject: [PATCH 16/29] simplify relation ship between flush and roll --- .../elasticsearch/index/shard/IndexShard.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6c5990f2df2d7..4f9d4f80983c6 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -493,24 +493,24 @@ public void updateShardState(final ShardRouting newRouting, * subsequently fails before the primary/replica re-sync completes successfully and we are now being * promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence * numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by - * replaying the translog and marking any operations there are completed. Rolling the translog generation is - * not strictly needed here (as we will never have collisions between sequence numbers in a translog - * generation in a new primary as it takes the last known sequence number as a starting point), but it - * simplifies reasoning about the relationship between primary terms and translog generations. + * replaying the translog and marking any operations there are completed. 
*/ final Engine engine = getEngine(); engine.restoreLocalCheckpointFromTranslog(); - if (indexSettings.getIndexVersionCreated().onOrBefore(Version.V_6_0_0_alpha1) && - engine.getTranslog().uncommittedOperations() > 0) { + if (indexSettings.getIndexVersionCreated().onOrBefore(Version.V_6_0_0_alpha1)) { // an index that was created before sequence numbers were introduce may contain operations in its // translog that do not have a sequence numbers. We want to make sure those operations will never // be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq# (due // to active indexing) and operations without a seq# coming from the translog. We therefore flush // to create a lucene commit point to an empty translog file. engine.flush(false, true); - } else { - engine.rollTranslogGeneration(); } + /* Rolling the translog generation is not strictly needed here (as we will never have collisions between + * sequence numbers in a translog generation in a new primary as it takes the last known sequence number + * as a starting point), but it simplifies reasoning about the relationship between primary terms and + * translog generations. 
+ */ + engine.rollTranslogGeneration(); engine.fillSeqNoGaps(newPrimaryTerm); engine.seqNoService().updateLocalCheckpointForShard(currentRouting.allocationId().getId(), getEngine().seqNoService().getLocalCheckpoint()); From 10fbb3e3a39f7055c1cef0a1f51083a5947b17a9 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 11:57:54 +0100 Subject: [PATCH 17/29] add explicit index names to health check --- .../resources/rest-api-spec/test/upgraded_cluster/10_basic.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml index e050e5dd451d1..5de13d77afc1c 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml @@ -7,6 +7,7 @@ # wait for long enough that we give delayed unassigned shards to stop being delayed timeout: 70s level: shards + index: test_index, index_with_replicas, multi_type_index - do: search: From fb8a105c45ec123b997258c1600ac8001c269f42 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 14:24:25 +0100 Subject: [PATCH 18/29] beef up testSendSnapshotSendsOps --- .../recovery/RecoverySourceHandlerTests.java | 84 +++++++++++++++---- 1 file changed, 68 insertions(+), 16 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index bf7cdcc2acad1..3315c5c95fa20 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -70,15 +70,18 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import 
org.elasticsearch.test.IndexSettingsModule; +import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -88,6 +91,8 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RecoverySourceHandlerTests extends ESTestCase { @@ -181,28 +186,75 @@ public void testSendSnapshotSendsOps() throws IOException { operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true))); } operations.add(null); - final long startingSeqNo = randomIntBetween(0, 16); + final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); + final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); + final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo, numberOfDocsWithValidSequenceNumbers); // todo add proper tests - RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, startingSeqNo, - numberOfDocsWithValidSequenceNumbers, new Translog.Snapshot() { - @Override - public void close() { + RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, + endingSeqNo, new Translog.Snapshot() { + @Override + public void close() { - } + } - private int counter = 0; + private int counter = 0; - @Override - public int totalOperations() { - return operations.size() - 1; - } + @Override + public int 
totalOperations() { + return operations.size() - 1; + } - @Override - public Translog.Operation next() throws IOException { - return operations.get(counter++); + @Override + public Translog.Operation next() throws IOException { + return operations.get(counter++); + } + }); + final int expectedOps = (int) (endingSeqNo - startingSeqNo); + assertThat(result.totalOperations, equalTo(expectedOps)); + final ArgumentCaptor shippedOpsCaptor = ArgumentCaptor.forClass(List.class); + if (expectedOps > 0) { + verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture()); + List shippedOps = shippedOpsCaptor.getAllValues().stream() + .flatMap(List::stream).map(o -> (Translog.Operation) o).collect(Collectors.toList()); + shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo)); + assertThat(shippedOps.size(), equalTo(expectedOps)); + for (int i = 0; i < shippedOps.size(); i++) { + assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs))); } - }); - assertThat(result.totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo))); + } else { + verify(recoveryTarget, never()).indexTranslogOperations(null, 0); + } + if (endingSeqNo >= requiredStartingSeqNo + 1) { + // check that missing ops blows up + List requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker + .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() < endingSeqNo).collect(Collectors.toList()); + List opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); + expectThrows(IllegalStateException.class, () -> + handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, + endingSeqNo, new Translog.Snapshot() { + @Override + public void close() { + + } + + private int counter = 0; + + @Override + public int totalOperations() { + return operations.size() - 1 - opsToSkip.size(); + } + + @Override + public 
Translog.Operation next() throws IOException { + Translog.Operation op; + do { + op = operations.get(counter++); + } + while (op != null && opsToSkip.contains(op)); + return op; + } + })); + } } private Engine.Index getIndex(final String id) { From c1a0cc74017a68cea7641faeb81265477e8b8660 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 15:08:24 +0100 Subject: [PATCH 19/29] fix testWaitForPendingSeqNo --- .../replication/RecoveryDuringReplicationTests.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 96f6aa6d47acb..b0988d9cf9a5c 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -374,15 +374,15 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); CountDownLatch recoveryStart = new CountDownLatch(1); - AtomicBoolean preparedForTranslog = new AtomicBoolean(false); + AtomicBoolean opsSent = new AtomicBoolean(false); final Future recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> { recoveryStart.countDown(); return new RecoveryTarget(indexShard, node, recoveryListener, l -> { }) { @Override - public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { - preparedForTranslog.set(true); - super.prepareForTranslogOperations(totalTranslogOps); + public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { + opsSent.set(true); + return super.indexTranslogOperations(operations, totalTranslogOps); } }; }); @@ -392,7 +392,7 @@ public void prepareForTranslogOperations(int 
totalTranslogOps) throws IOExceptio // index some more docs += shards.indexDocs(randomInt(5)); - assertFalse("recovery should wait on pending docs", preparedForTranslog.get()); + assertFalse("recovery should wait on pending docs", opsSent.get()); primaryEngineFactory.releaseLatchedIndexers(); pendingDocsDone.await(); From e8f65f320df031ac65b3d9e7f62b206dc612dba3 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 16:27:21 +0100 Subject: [PATCH 20/29] feedback --- .../recovery/RecoverySourceHandler.java | 39 +++++++++---------- .../recovery/RecoverySourceHandlerTests.java | 6 +-- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index ba97745a5a6be..096598e25c453 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -168,8 +168,6 @@ public RecoveryResponse recoverToTarget() throws IOException { // but we must have everything above the local checkpoint in the commit requiredSeqNoRangeStart = Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; - assert requiredSeqNoRangeStart >= 0 : - "base commit contains an illegal local checkpoint " + (requiredSeqNoRangeStart - 1); try { phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); } catch (final Exception e) { @@ -182,6 +180,9 @@ public RecoveryResponse recoverToTarget() throws IOException { } } } + assert startingSeqNo >= 0 : "startingSeqNo must be non negative. 
got: " + startingSeqNo; + assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" + + startingSeqNo + "]"; runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); @@ -190,7 +191,15 @@ public RecoveryResponse recoverToTarget() throws IOException { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } - final long endingSeqNo = determineEndingSeqNo(); + + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + /* + * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all + * operations in the required range will be available for replaying from the translog of the source. + */ + cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); + + logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); final long targetLocalCheckpoint; @@ -227,18 +236,6 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { }); } - private long determineEndingSeqNo() { - final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - /* - * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all - * operations in the required range will be available for replaying from the translog of the source. - */ - cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); - - logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); - return endingSeqNo + 1; - } - /** * Determines if the source translog is ready for a sequence-number-based peer recovery. 
The main condition here is that the source * translog contains all operations above the local checkpoint on the target. We already know the that translog contains or will contain @@ -525,16 +522,16 @@ static class SendSnapshotResult { * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. * * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent - * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo - 1) - * @param endingSeqNo the upper bound of the sequence number range to be sent (exclusive) + * @param requiredSeqNoRangeStart the lower sequence number of the required range + * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the total * number of operations sent * @throws IOException if an I/O exception occurred reading the translog snapshot */ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) throws IOException { - assert requiredSeqNoRangeStart <= endingSeqNo : - "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo" + endingSeqNo; + assert requiredSeqNoRangeStart <= endingSeqNo + 1: + "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; int ops = 0; @@ -562,7 +559,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require cancellableThreads.checkForCancel(); final long seqNo = operation.seqNo(); - if (seqNo < startingSeqNo || seqNo >= endingSeqNo) { + if (seqNo < startingSeqNo || seqNo > 
endingSeqNo) { skippedOps++; continue; } @@ -587,7 +584,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require cancellableThreads.executeIO(sendBatch); } - if (requiredOpsTracker.getCheckpoint() < endingSeqNo - 1) { + if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { throw new IllegalStateException("translog replay failed to covered required sequence numbers" + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" + (requiredOpsTracker.getCheckpoint() + 1) + "]"); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 3315c5c95fa20..c7358baf98a41 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -188,7 +188,7 @@ public void testSendSnapshotSendsOps() throws IOException { operations.add(null); final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); - final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo, numberOfDocsWithValidSequenceNumbers); + final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); // todo add proper tests RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, endingSeqNo, new Translog.Snapshot() { @@ -209,7 +209,7 @@ public Translog.Operation next() throws IOException { return operations.get(counter++); } }); - final int expectedOps = (int) (endingSeqNo - startingSeqNo); + final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); assertThat(result.totalOperations, equalTo(expectedOps)); final ArgumentCaptor 
shippedOpsCaptor = ArgumentCaptor.forClass(List.class); if (expectedOps > 0) { @@ -227,7 +227,7 @@ public Translog.Operation next() throws IOException { if (endingSeqNo >= requiredStartingSeqNo + 1) { // check that missing ops blows up List requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker - .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() < endingSeqNo).collect(Collectors.toList()); + .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList()); List opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); expectThrows(IllegalStateException.class, () -> handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, From 6c5140ccfe49adf49b0beb03b307fd6c40b2520c Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 17:03:51 +0100 Subject: [PATCH 21/29] more feedback --- .../elasticsearch/index/shard/IndexShard.java | 2 +- .../indices/recovery/RecoverySourceHandler.java | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4f9d4f80983c6..99ac7740becc3 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -498,7 +498,7 @@ public void updateShardState(final ShardRouting newRouting, final Engine engine = getEngine(); engine.restoreLocalCheckpointFromTranslog(); if (indexSettings.getIndexVersionCreated().onOrBefore(Version.V_6_0_0_alpha1)) { - // an index that was created before sequence numbers were introduce may contain operations in its + // an index that was created before sequence numbers were introduced may contain operations in its // translog that do not have a sequence numbers. 
We want to make sure those operations will never // be replayed as part of peer recovery to avoid an arbitrary mixture of operations with seq# (due // to active indexing) and operations without a seq# coming from the translog. We therefore flush diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 096598e25c453..fc28136705148 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -440,11 +440,11 @@ void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { * point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new * shard. * - * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all - * ops should be sent + * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all + * ops should be sent * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) - * @param endingSeqNo the highest sequence number that should be sent - * @param snapshot a snapshot of the translog + * @param endingSeqNo the highest sequence number that should be sent + * @param snapshot a snapshot of the translog * @return the local checkpoint on the target */ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) @@ -521,11 +521,11 @@ static class SendSnapshotResult { *

* Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. * - * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent + * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent * @param requiredSeqNoRangeStart the lower sequence number of the required range - * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) - * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the total - * number of operations sent + * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) + * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the + * total number of operations sent * @throws IOException if an I/O exception occurred reading the translog snapshot */ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, From dce71ab2e06b9f58d79f0476b03954c4a6700bc0 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 17:14:34 +0100 Subject: [PATCH 22/29] last feedback round --- .../indices/recovery/RecoverySourceHandler.java | 2 +- .../indices/recovery/RecoverySourceHandlerTests.java | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index fc28136705148..c0965cd1b0651 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -585,7 +585,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require } if 
(requiredOpsTracker.getCheckpoint() < endingSeqNo) { - throw new IllegalStateException("translog replay failed to covered required sequence numbers" + + throw new IllegalStateException("translog replay failed to cover required sequence numbers" + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is [" + (requiredOpsTracker.getCheckpoint() + 1) + "]"); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index c7358baf98a41..ddb1e2ff4aeab 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -188,8 +188,7 @@ public void testSendSnapshotSendsOps() throws IOException { operations.add(null); final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); - final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); - // todo add proper tests + final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1); RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, endingSeqNo, new Translog.Snapshot() { @Override @@ -222,7 +221,7 @@ public Translog.Operation next() throws IOException { assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs))); } } else { - verify(recoveryTarget, never()).indexTranslogOperations(null, 0); + verify(recoveryTarget, never()).indexTranslogOperations(any(), any()); } if (endingSeqNo >= requiredStartingSeqNo + 1) { // check that missing ops blows up @@ -249,8 +248,7 @@ public Translog.Operation 
next() throws IOException { Translog.Operation op; do { op = operations.get(counter++); - } - while (op != null && opsToSkip.contains(op)); + } while (op != null && opsToSkip.contains(op)); return op; } })); From 6989d52759021f5c15df93d696ed23e1ff0ca349 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 17:26:16 +0100 Subject: [PATCH 23/29] reduce the ensure green --- .../resources/rest-api-spec/test/upgraded_cluster/10_basic.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml index 5de13d77afc1c..9edee96fc0d98 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml @@ -7,7 +7,7 @@ # wait for long enough that we give delayed unassigned shards to stop being delayed timeout: 70s level: shards - index: test_index, index_with_replicas, multi_type_index + index: test_index, index_with_replicas - do: search: From c09e4191eed79b4a9b0b26d1059e13c613c14f9c Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 29 Nov 2017 23:32:19 +0100 Subject: [PATCH 24/29] fix testSendSnapshotSendsOps as we always send at least one (potentially empty) batch --- .../recovery/RecoverySourceHandlerTests.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index ddb1e2ff4aeab..c04d69bbed20e 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -91,7 +91,6 @@ import static org.mockito.Matchers.anyString; 
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -211,17 +210,13 @@ public Translog.Operation next() throws IOException { final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); assertThat(result.totalOperations, equalTo(expectedOps)); final ArgumentCaptor shippedOpsCaptor = ArgumentCaptor.forClass(List.class); - if (expectedOps > 0) { - verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture()); - List shippedOps = shippedOpsCaptor.getAllValues().stream() - .flatMap(List::stream).map(o -> (Translog.Operation) o).collect(Collectors.toList()); - shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo)); - assertThat(shippedOps.size(), equalTo(expectedOps)); - for (int i = 0; i < shippedOps.size(); i++) { - assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs))); - } - } else { - verify(recoveryTarget, never()).indexTranslogOperations(any(), any()); + verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture()); + List shippedOps = shippedOpsCaptor.getAllValues().stream() + .flatMap(List::stream).map(o -> (Translog.Operation) o).collect(Collectors.toList()); + shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo)); + assertThat(shippedOps.size(), equalTo(expectedOps)); + for (int i = 0; i < shippedOps.size(); i++) { + assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs))); } if (endingSeqNo >= requiredStartingSeqNo + 1) { // check that missing ops blows up From 53869b3a89c76c27ed30482e9de425a0ec36741e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 30 Nov 2017 00:05:38 +0100 Subject: [PATCH 25/29] extra space? 
--- .../resources/rest-api-spec/test/upgraded_cluster/10_basic.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml index 9edee96fc0d98..556d871e664eb 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml @@ -7,7 +7,7 @@ # wait for long enough that we give delayed unassigned shards to stop being delayed timeout: 70s level: shards - index: test_index, index_with_replicas + index: test_index,index_with_replicas - do: search: From 94677587a43e53e239d8b87bd95de0fcb9629469 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 30 Nov 2017 09:09:27 +0100 Subject: [PATCH 26/29] add empty shard test --- .../upgrades/FullClusterRestartIT.java | 26 +++++++++++++++++++ .../test/old_cluster/10_basic.yml | 10 +++++++ .../test/upgraded_cluster/10_basic.yml | 2 +- 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index c7e708418c92c..4939533883eae 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -25,8 +25,10 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.Version; import org.elasticsearch.client.Response; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import 
org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -49,6 +51,8 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; +import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -583,6 +587,28 @@ public void testSingleDoc() throws IOException { assertThat(toStr(client().performRequest("GET", docLocation)), containsString(doc)); } + /** + * Tests that a single empty shard index is correctly recovered. Empty shards are often an edge case. + */ + public void testEmptyShard() throws IOException { + final String index = "test_empty_hard"; + + if (runningAgainstOldCluster) { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + // if the node with the replica is the first to be restarted, while a replica is still recovering + // then delayed allocation will kick in. When the node comes back, the master will search for a copy + // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN + // before timing out + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster + createIndex(index, settings.build()); + } + ensureGreen(index); + } + + /** * Tests recovery of an index with or without a translog and the * statistics we gather about that. 
diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml index 499a39c4e4edb..30f891bf0e0d1 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/10_basic.yml @@ -16,6 +16,16 @@ # allocation will kick in, and the cluster health won't return to GREEN # before timing out index.unassigned.node_left.delayed_timeout: "100ms" + + - do: + indices.create: + index: empty_index # index to ensure we can recover empty indices + body: + # if the node with the replica is the first to be restarted, then delayed + # allocation will kick in, and the cluster health won't return to GREEN + # before timing out + index.unassigned.node_left.delayed_timeout: "100ms" + - do: bulk: refresh: true diff --git a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml index 556d871e664eb..3dddc2538af93 100644 --- a/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml +++ b/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/10_basic.yml @@ -7,7 +7,7 @@ # wait for long enough that we give delayed unassigned shards to stop being delayed timeout: 70s level: shards - index: test_index,index_with_replicas + index: test_index,index_with_replicas,empty_index - do: search: From 09f71334a89cf1fbbb52d7ac30a76b5c850b7920 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 30 Nov 2017 09:24:54 +0100 Subject: [PATCH 27/29] make sure seq no info is in commit if recovering an old index --- .../org/elasticsearch/index/engine/InternalEngine.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java 
b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2359a4195c569..c04cf8da8c7f9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -351,9 +351,11 @@ private void recoverFromTranslogInternal() throws IOException { } else if (translog.isCurrent(translogGeneration) == false) { commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); refreshLastCommittedSegmentInfos(); - } else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) { - assert historyUUID != null; - // put the history uuid into the index + } else if (lastCommittedSegmentInfos.getUserData().containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) == false) { + assert engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) : + "index was created on version " + engineConfig.getIndexSettings().getIndexVersionCreated() + "but has " + + "no sequence numbers info in commit"; + commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); refreshLastCommittedSegmentInfos(); } From 1501d4abb395cf4eef8ffe65c235e3fa99f466a3 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 30 Nov 2017 10:27:24 +0100 Subject: [PATCH 28/29] add assertions that commit point in store always has sequence numbers info once recovery is done. 
--- .../org/elasticsearch/index/shard/IndexShard.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 99ac7740becc3..3f9314b29c29f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1333,6 +1333,17 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole active.set(true); newEngine.recoverFromTranslog(); } + assertSequenceNumbersInCommit(); + } + + private boolean assertSequenceNumbersInCommit() throws IOException { + final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); + assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint"; + assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number"; + assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid"; + assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid [" + + userData.get(Engine.HISTORY_UUID_KEY) + "] is different than engine [" + getHistoryUUID() + "]"; + return true; } private boolean assertMaxUnsafeAutoIdInCommit() throws IOException { From e5a734ca378b149aa83f9343b91dbacc245c28ab Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 30 Nov 2017 10:28:12 +0100 Subject: [PATCH 29/29] hard -> shard --- .../java/org/elasticsearch/upgrades/FullClusterRestartIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index 4939533883eae..5011c9e7006ac 100644 --- 
a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -591,7 +591,7 @@ public void testSingleDoc() throws IOException { * Tests that a single empty shard index is correctly recovered. Empty shards are often an edge case. */ public void testEmptyShard() throws IOException { - final String index = "test_empty_hard"; + final String index = "test_empty_shard"; if (runningAgainstOldCluster) { Settings.Builder settings = Settings.builder()