diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index c396cdfe84570..37fc1c748c189 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -267,9 +267,7 @@ public synchronized int startReplicas(int numOfReplicasToStart) throws IOExcepti
         }

         public void startPrimary() throws IOException {
-            final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
-            primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
-            primary.recoverFromStore();
+            recoverPrimary(primary);
             HashSet<String> activeIds = new HashSet<>();
             activeIds.addAll(activeIds());
             activeIds.add(primary.routingEntry().allocationId().getId());
@@ -302,6 +300,11 @@ assert shardRoutings().stream()
             updateAllocationIDsOnPrimary();
         }

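+        // Extracted from startPrimary() so subclasses can override how a primary is recovered;
+        // the CCR tests below use this hook to restore a follower primary from its leader instead of the local store.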
+        protected synchronized void recoverPrimary(IndexShard primary) {
+            final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
+            primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
+            primary.recoverFromStore();
+        }
         public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
             final ShardRouting shardRouting = TestShardRouting.newShardRouting(
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
index dac1e70b86d8f..0f82bfab10287 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
@@ -7,6 +7,7 @@
 import com.carrotsearch.hppc.LongHashSet;
 import com.carrotsearch.hppc.LongSet;
+import org.apache.lucene.store.IOContext;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -17,9 +18,14 @@
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -31,9 +37,16 @@
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.shard.RestoreOnlyRepository;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@@ -57,6 +70,8 @@
 import java.util.function.LongConsumer;
 import java.util.stream.Collectors;

+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
@@ -64,233 +79,241 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {

     public void testSimpleCcrReplication() throws Exception {
-        try (ReplicationGroup leaderGroup = createGroup(randomInt(2));
-             ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) {
+        try (ReplicationGroup leaderGroup = createLeaderGroup(randomInt(2))) {
             leaderGroup.startAll();
-            int docCount = leaderGroup.appendDocs(randomInt(64));
-            leaderGroup.assertAllEqual(docCount);
-            followerGroup.startAll();
-            ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
-            final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
-            final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
-            shardFollowTask.start(
+            try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, randomInt(2))) {
+                int docCount = leaderGroup.appendDocs(randomInt(64));
+                leaderGroup.assertAllEqual(docCount);
+                followerGroup.startAll();
+                ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
+                final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
+                final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
+                shardFollowTask.start(
                 followerGroup.getPrimary().getHistoryUUID(),
                 leaderSeqNoStats.getGlobalCheckpoint(),
                 leaderSeqNoStats.getMaxSeqNo(),
                 followerSeqNoStats.getGlobalCheckpoint(),
                 followerSeqNoStats.getMaxSeqNo());
-            docCount += leaderGroup.appendDocs(randomInt(128));
-            leaderGroup.syncGlobalCheckpoint();
-            leaderGroup.assertAllEqual(docCount);
-            Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
-            assertBusy(() -> {
-                assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
-                followerGroup.assertAllEqual(indexedDocIds.size());
-            });
-            for (IndexShard shard : followerGroup) {
-                assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
-            }
-            // Deletes should be replicated to the follower
-            List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
-            for (String deleteId : deleteDocIds) {
-                BulkItemResponse resp = leaderGroup.delete(new DeleteRequest(index.getName(), "type", deleteId));
-                assertThat(resp.getResponse().getResult(), equalTo(DocWriteResponse.Result.DELETED));
+                docCount += leaderGroup.appendDocs(randomInt(128));
+                leaderGroup.syncGlobalCheckpoint();
+                leaderGroup.assertAllEqual(docCount);
+                Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
+                assertBusy(() -> {
+                    assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
+                    followerGroup.assertAllEqual(indexedDocIds.size());
+                });
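+                // Every replicated op should have gone through the follower engine's optimized indexing path.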
+                for (IndexShard shard : followerGroup) {
+                    assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
+                }
+                // Deletes should be replicated to the follower
+                List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
+                for (String deleteId : deleteDocIds) {
+                    BulkItemResponse resp = leaderGroup.delete(new DeleteRequest(index.getName(), "type", deleteId));
+                    assertThat(resp.getResponse().getResult(), equalTo(DocWriteResponse.Result.DELETED));
+                }
+                leaderGroup.syncGlobalCheckpoint();
+                assertBusy(() -> {
+                    assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
+                    followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
+                });
+                shardFollowTask.markAsCompleted();
+                assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true);
             }
-            leaderGroup.syncGlobalCheckpoint();
-            assertBusy(() -> {
-                assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
-                followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
-            });
-            shardFollowTask.markAsCompleted();
-            assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true);
         }
     }

     public void testAddRemoveShardOnLeader() throws Exception {
-        try (ReplicationGroup leaderGroup = createGroup(1 + randomInt(1));
-             ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) {
+        try (ReplicationGroup leaderGroup = createLeaderGroup(1 + randomInt(1))) {
             leaderGroup.startAll();
-            followerGroup.startAll();
-            ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
-            final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
-            final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
-            shardFollowTask.start(
+            try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, randomInt(2))) {
+                followerGroup.startAll();
+                ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
+                final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
+                final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
+                shardFollowTask.start(
                 followerGroup.getPrimary().getHistoryUUID(),
                 leaderSeqNoStats.getGlobalCheckpoint(),
                 leaderSeqNoStats.getMaxSeqNo(),
                 followerSeqNoStats.getGlobalCheckpoint(),
                 followerSeqNoStats.getMaxSeqNo());
-            int batches = between(0, 10);
-            int docCount = 0;
-            boolean hasPromotion = false;
-            for (int i = 0; i < batches; i++) {
-                docCount += leaderGroup.indexDocs(between(1, 5));
-                if (leaderGroup.getReplicas().isEmpty() == false && randomInt(100) < 5) {
-                    IndexShard closingReplica = randomFrom(leaderGroup.getReplicas());
-                    leaderGroup.removeReplica(closingReplica);
-                    closingReplica.close("test", false);
-                    closingReplica.store().close();
-                } else if (leaderGroup.getReplicas().isEmpty() == false && rarely()) {
-                    IndexShard newPrimary = randomFrom(leaderGroup.getReplicas());
-                    leaderGroup.promoteReplicaToPrimary(newPrimary).get();
-                    hasPromotion = true;
-                } else if (randomInt(100) < 5) {
-                    leaderGroup.addReplica();
-                    leaderGroup.startReplicas(1);
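+                // Randomly close, promote, or add leader replicas between batches to exercise the follow task across leader-side failovers.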
+                int batches = between(0, 10);
+                int docCount = 0;
+                boolean hasPromotion = false;
+                for (int i = 0; i < batches; i++) {
+                    docCount += leaderGroup.indexDocs(between(1, 5));
+                    if (leaderGroup.getReplicas().isEmpty() == false && randomInt(100) < 5) {
+                        IndexShard closingReplica = randomFrom(leaderGroup.getReplicas());
+                        leaderGroup.removeReplica(closingReplica);
+                        closingReplica.close("test", false);
+                        closingReplica.store().close();
+                    } else if (leaderGroup.getReplicas().isEmpty() == false && rarely()) {
+                        IndexShard newPrimary = randomFrom(leaderGroup.getReplicas());
+                        leaderGroup.promoteReplicaToPrimary(newPrimary).get();
+                        hasPromotion = true;
+                    } else if (randomInt(100) < 5) {
+                        leaderGroup.addReplica();
+                        leaderGroup.startReplicas(1);
+                    }
+                    leaderGroup.syncGlobalCheckpoint();
                 }
-                leaderGroup.syncGlobalCheckpoint();
+                leaderGroup.assertAllEqual(docCount);
+                assertThat(shardFollowTask.getFailure(), nullValue());
+                int expectedDoc = docCount;
+                assertBusy(() -> followerGroup.assertAllEqual(expectedDoc));
+                shardFollowTask.markAsCompleted();
+                assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, hasPromotion == false);
             }
-            leaderGroup.assertAllEqual(docCount);
-            assertThat(shardFollowTask.getFailure(), nullValue());
-            int expectedDoc = docCount;
-            assertBusy(() -> followerGroup.assertAllEqual(expectedDoc));
-            shardFollowTask.markAsCompleted();
-            assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, hasPromotion == false);
         }
     }

     public void testChangeLeaderHistoryUUID() throws Exception {
-        try (ReplicationGroup leaderGroup = createGroup(0);
-             ReplicationGroup followerGroup = createFollowGroup(0)) {
-            leaderGroup.startAll();
-            int docCount = leaderGroup.appendDocs(randomInt(64));
-            leaderGroup.assertAllEqual(docCount);
-            followerGroup.startAll();
-            ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
-            final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
-            final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
-            shardFollowTask.start(
-                followerGroup.getPrimary().getHistoryUUID(),
-                leaderSeqNoStats.getGlobalCheckpoint(),
-                leaderSeqNoStats.getMaxSeqNo(),
-                followerSeqNoStats.getGlobalCheckpoint(),
-                followerSeqNoStats.getMaxSeqNo());
-            leaderGroup.syncGlobalCheckpoint();
-            leaderGroup.assertAllEqual(docCount);
-            Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
-            assertBusy(() -> {
-                assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
-                followerGroup.assertAllEqual(indexedDocIds.size());
-            });
-
-            String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
-            leaderGroup.reinitPrimaryShard();
-            leaderGroup.getPrimary().store().bootstrapNewHistory();
-            recoverShardFromStore(leaderGroup.getPrimary());
-            String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
-
-            // force the global checkpoint on the leader to advance
-            leaderGroup.appendDocs(64);
-
-            assertBusy(() -> {
-                assertThat(shardFollowTask.isStopped(), is(true));
-                ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
-                assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
-                    "], actual [" + newHistoryUUID + "]"));
-            });
+        try (ReplicationGroup leaderGroup = createLeaderGroup(0)) {
+            try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, 0)) {
+                leaderGroup.startAll();
+                int docCount = leaderGroup.appendDocs(randomInt(64));
+                leaderGroup.assertAllEqual(docCount);
+                followerGroup.startAll();
+                ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
+                final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
+                final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
+                shardFollowTask.start(
+                    followerGroup.getPrimary().getHistoryUUID(),
+                    leaderSeqNoStats.getGlobalCheckpoint(),
+                    leaderSeqNoStats.getMaxSeqNo(),
+                    followerSeqNoStats.getGlobalCheckpoint(),
+                    followerSeqNoStats.getMaxSeqNo());
+                leaderGroup.syncGlobalCheckpoint();
+                leaderGroup.assertAllEqual(docCount);
+                Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
+                assertBusy(() -> {
+                    assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
+                    followerGroup.assertAllEqual(indexedDocIds.size());
+                });
+
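+                // Re-create the leader primary with a fresh history UUID; the follow task must detect the mismatch and fail.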
+                String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
+                leaderGroup.reinitPrimaryShard();
+                leaderGroup.getPrimary().store().bootstrapNewHistory();
+                recoverShardFromStore(leaderGroup.getPrimary());
+                String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
+
+                // force the global checkpoint on the leader to advance
+                leaderGroup.appendDocs(64);
+
+                assertBusy(() -> {
+                    assertThat(shardFollowTask.isStopped(), is(true));
+                    ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
+                    assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
+                        "], actual [" + newHistoryUUID + "]"));
+                });
+            }
         }
     }

     public void testChangeFollowerHistoryUUID() throws Exception {
-        try (ReplicationGroup leaderGroup = createGroup(0);
-             ReplicationGroup followerGroup = createFollowGroup(0)) {
+        try (ReplicationGroup leaderGroup = createLeaderGroup(0)) {
             leaderGroup.startAll();
-            int docCount = leaderGroup.appendDocs(randomInt(64));
-            leaderGroup.assertAllEqual(docCount);
-            followerGroup.startAll();
-            ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
-            final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
-            final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
-            shardFollowTask.start(
-                followerGroup.getPrimary().getHistoryUUID(),
-                leaderSeqNoStats.getGlobalCheckpoint(),
-                leaderSeqNoStats.getMaxSeqNo(),
-                followerSeqNoStats.getGlobalCheckpoint(),
-                followerSeqNoStats.getMaxSeqNo());
-            leaderGroup.syncGlobalCheckpoint();
-            leaderGroup.assertAllEqual(docCount);
-            Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
-            assertBusy(() -> {
-                assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
-                followerGroup.assertAllEqual(indexedDocIds.size());
-            });
-
-            String oldHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
-            followerGroup.reinitPrimaryShard();
-            followerGroup.getPrimary().store().bootstrapNewHistory();
-            recoverShardFromStore(followerGroup.getPrimary());
-            String newHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
-
-            // force the global checkpoint on the leader to advance
-            leaderGroup.appendDocs(64);
-
-            assertBusy(() -> {
-                assertThat(shardFollowTask.isStopped(), is(true));
-                ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
-                assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
-                    "], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated"));
-            });
+            try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, 0)) {
+                int docCount = leaderGroup.appendDocs(randomInt(64));
+                leaderGroup.assertAllEqual(docCount);
+                followerGroup.startAll();
+                ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
+                final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
+                final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
+                shardFollowTask.start(
+                    followerGroup.getPrimary().getHistoryUUID(),
+                    leaderSeqNoStats.getGlobalCheckpoint(),
+                    leaderSeqNoStats.getMaxSeqNo(),
+                    followerSeqNoStats.getGlobalCheckpoint(),
+                    followerSeqNoStats.getMaxSeqNo());
+                leaderGroup.syncGlobalCheckpoint();
+                leaderGroup.assertAllEqual(docCount);
+                Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
+                assertBusy(() -> {
+                    assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
+                    followerGroup.assertAllEqual(indexedDocIds.size());
+                });
+
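+                // Re-create the follower primary with a fresh history UUID; the follow task must refuse to write to the new shard copy.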
+                String oldHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
+                followerGroup.reinitPrimaryShard();
+                followerGroup.getPrimary().store().bootstrapNewHistory();
+                recoverShardFromStore(followerGroup.getPrimary());
+                String newHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
+
+                // force the global checkpoint on the leader to advance
+                leaderGroup.appendDocs(64);
+
+                assertBusy(() -> {
+                    assertThat(shardFollowTask.isStopped(), is(true));
+                    ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
+                    assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
+                        "], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated"));
+                });
+            }
         }
     }

     public void testRetryBulkShardOperations() throws Exception {
-        try (ReplicationGroup leaderGroup = createGroup(between(0, 1));
-             ReplicationGroup followerGroup = createFollowGroup(between(1, 3))) {
+        try (ReplicationGroup leaderGroup = createLeaderGroup(between(0, 1))) {
             leaderGroup.startAll();
-            followerGroup.startAll();
-            leaderGroup.appendDocs(between(10, 100));
-            leaderGroup.refresh("test");
-            for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) {
-                long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1;
-                Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(),
-                    Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i);
-                for (IndexShard shard : leaderGroup) {
-                    getEngine(shard).noOp(noOp);
+            try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, between(1, 3))) {
+                followerGroup.startAll();
+                leaderGroup.appendDocs(between(10, 100));
+                leaderGroup.refresh("test");
+                for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) {
+                    long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1;
+                    Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(),
+                        Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i);
+                    for (IndexShard shard : leaderGroup) {
+                        getEngine(shard).noOp(noOp);
+                    }
                 }
-            }
-            for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) {
-                BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId));
-                assertThat(resp.getFailure(), nullValue());
-            }
-            leaderGroup.syncGlobalCheckpoint();
-            IndexShard leadingPrimary = leaderGroup.getPrimary();
-            // Simulates some bulk requests are completed on the primary and replicated to some (but all) replicas of the follower
-            // but the primary of the follower crashed before these requests completed.
-            for (int numBulks = between(1, 5), i = 0; i < numBulks; i++) {
-                long fromSeqNo = randomLongBetween(0, leadingPrimary.getGlobalCheckpoint());
-                long toSeqNo = randomLongBetween(fromSeqNo, leadingPrimary.getGlobalCheckpoint());
-                int numOps = Math.toIntExact(toSeqNo + 1 - fromSeqNo);
-                Translog.Operation[] ops = ShardChangesAction.getOperations(leadingPrimary, leadingPrimary.getGlobalCheckpoint(),
-                    fromSeqNo, numOps, leadingPrimary.getHistoryUUID(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES));
-
-                IndexShard followingPrimary = followerGroup.getPrimary();
-                TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> primaryResult =
-                    TransportBulkShardOperationsAction.shardOperationOnPrimary(followingPrimary.shardId(),
-                        followingPrimary.getHistoryUUID(), Arrays.asList(ops), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
-                        followingPrimary, logger);
-                for (IndexShard replica : randomSubsetOf(followerGroup.getReplicas())) {
-                    final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
-                    replica.acquireReplicaOperationPermit(followingPrimary.getOperationPrimaryTerm(),
-                        followingPrimary.getGlobalCheckpoint(), followingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
-                        permitFuture, ThreadPool.Names.SAME, primaryResult);
-                    try (Releasable ignored = permitFuture.get()) {
-                        TransportBulkShardOperationsAction.shardOperationOnReplica(primaryResult.replicaRequest(), replica, logger);
+                for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) {
+                    BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId));
+                    assertThat(resp.getFailure(), nullValue());
+                }
+                leaderGroup.syncGlobalCheckpoint();
+                IndexShard leadingPrimary = leaderGroup.getPrimary();
+                // Simulates some bulk requests are completed on the primary and replicated to some (but not all) replicas of the follower
+                // but the primary of the follower crashed before these requests completed.
+                for (int numBulks = between(1, 5), i = 0; i < numBulks; i++) {
+                    long fromSeqNo = randomLongBetween(0, leadingPrimary.getGlobalCheckpoint());
+                    long toSeqNo = randomLongBetween(fromSeqNo, leadingPrimary.getGlobalCheckpoint());
+                    int numOps = Math.toIntExact(toSeqNo + 1 - fromSeqNo);
+                    Translog.Operation[] ops = ShardChangesAction.getOperations(leadingPrimary, leadingPrimary.getGlobalCheckpoint(),
+                        fromSeqNo, numOps, leadingPrimary.getHistoryUUID(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES));
+
+                    IndexShard followingPrimary = followerGroup.getPrimary();
+                    TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> primaryResult =
+                        TransportBulkShardOperationsAction.shardOperationOnPrimary(followingPrimary.shardId(),
+                            followingPrimary.getHistoryUUID(), Arrays.asList(ops), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
+                            followingPrimary, logger);
+                    for (IndexShard replica : randomSubsetOf(followerGroup.getReplicas())) {
+                        final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
+                        replica.acquireReplicaOperationPermit(followingPrimary.getOperationPrimaryTerm(),
+                            followingPrimary.getGlobalCheckpoint(), followingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
+                            permitFuture, ThreadPool.Names.SAME, primaryResult);
+                        try (Releasable ignored = permitFuture.get()) {
+                            TransportBulkShardOperationsAction.shardOperationOnReplica(primaryResult.replicaRequest(), replica, logger);
+                        }
                     }
                 }
-            }
-            // A follow-task retries these requests while the primary-replica resync is happening on the follower.
-            followerGroup.promoteReplicaToPrimary(randomFrom(followerGroup.getReplicas()));
-            ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
-            SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
-            shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(),
-                leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo());
-            try {
-                assertBusy(() -> {
-                    assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
-                    assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true);
-                });
-            } finally {
-                shardFollowTask.markAsCompleted();
+                // A follow-task retries these requests while the primary-replica resync is happening on the follower.
+                followerGroup.promoteReplicaToPrimary(randomFrom(followerGroup.getReplicas()));
+                ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
+                SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
+                shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(),
+                    leadingPrimary.getGlobalCheckpoint(),
+                    leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
+                    followerSeqNoStats.getGlobalCheckpoint(),
+                    followerSeqNoStats.getMaxSeqNo());
+                try {
+                    assertBusy(() -> {
+                        assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
+                        assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true);
+                    });
+                } finally {
+                    shardFollowTask.markAsCompleted();
+                }
             }
         }
     }
@@ -303,7 +326,17 @@ public void testAddNewFollowingReplica() throws Exception {
             operations.add(new Translog.Index("type", Integer.toString(i), i, primaryTerm, 0, source, null, -1));
         }
         Future<Void> recoveryFuture = null;
-        try (ReplicationGroup group = createFollowGroup(between(0, 1))) {
+        Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
+            .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
+            .build();
+        IndexMetaData indexMetaData = buildIndexMetaData(between(0, 1), settings, indexMapping);
+        try (ReplicationGroup group = new ReplicationGroup(indexMetaData) {
+            @Override
+            protected EngineFactory getEngineFactory(ShardRouting routing) {
+                return new FollowingEngineFactory();
+            }
+        }) {
             group.startAll();
             while (operations.isEmpty() == false) {
                 List<Translog.Operation> bulkOps = randomSubsetOf(between(1, operations.size()), operations);
@@ -330,35 +363,79 @@ public void testAddNewFollowingReplica() throws Exception {
         }
     }

-    @Override
-    protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
-        Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
-            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
-            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+    public void testSimpleRemoteRecovery() throws Exception {
+        try (ReplicationGroup leader = createLeaderGroup(between(0, 1))) {
+            leader.startAll();
+            leader.appendDocs(between(0, 100));
+            leader.flush();
+            leader.syncGlobalCheckpoint();
+            try (ReplicationGroup follower = createFollowGroup(leader, 0)) {
+                follower.startAll();
+                ShardFollowNodeTask followTask = createShardFollowTask(leader, follower);
+                followTask.start(
+                    follower.getPrimary().getHistoryUUID(),
+                    leader.getPrimary().getGlobalCheckpoint(),
+                    leader.getPrimary().seqNoStats().getMaxSeqNo(),
+                    follower.getPrimary().getGlobalCheckpoint(),
+                    follower.getPrimary().seqNoStats().getMaxSeqNo()
+                );
+                leader.appendDocs(between(0, 100));
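+                // Occasionally add a follower replica, which peer-recovers from the remotely restored follower primary.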
+                if (randomBoolean()) {
+                    follower.recoverReplica(follower.addReplica());
+                }
+                assertBusy(() -> assertConsistentHistoryBetweenLeaderAndFollower(leader, follower, false));
+                followTask.markAsCompleted();
+            }
+        }
+    }
+
+    private ReplicationGroup createLeaderGroup(int replicas) throws IOException {
+        Settings settings = Settings.builder()
             .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
             .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000)
-            .put(settings)
             .build();
-        if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(newSettings)) {
-            IndexMetaData metaData = buildIndexMetaData(replicas, newSettings, indexMapping);
-            return new ReplicationGroup(metaData) {
-
-                @Override
-                protected EngineFactory getEngineFactory(ShardRouting routing) {
-                    return new FollowingEngineFactory();
-                }
-            };
-        } else {
-            return super.createGroup(replicas, newSettings);
-        }
+        return createGroup(replicas, settings);
     }

-    private ReplicationGroup createFollowGroup(int replicas) throws IOException {
-        Settings.Builder settingsBuilder = Settings.builder();
-        settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
+    private ReplicationGroup createFollowGroup(ReplicationGroup leaderGroup, int replicas) throws IOException {
+        Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
             .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
-            .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB));
-        return createGroup(replicas, settingsBuilder.build());
+            .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
+            .build();
+        IndexMetaData indexMetaData = buildIndexMetaData(replicas, settings, indexMapping);
+        return new ReplicationGroup(indexMetaData) {
+            @Override
+            protected EngineFactory getEngineFactory(ShardRouting routing) {
+                return new FollowingEngineFactory();
+            }
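+            // Simulates CCR remote recovery: wipe the follower primary and bootstrap it by copying the Lucene files of the leader's safe commit.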
+            @Override
+            protected synchronized void recoverPrimary(IndexShard primary) {
+                DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+                Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID()));
+                ShardRouting routing = ShardRoutingHelper.newWithRestoreSource(primary.routingEntry(),
+                    new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test"));
+                primary.markAsRecovering("remote recovery from leader", new RecoveryState(routing, localNode, null));
+                primary.restoreFromRepository(new RestoreOnlyRepository(index.getName()) {
+                    @Override
+                    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version,
+                                             IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
+                        try {
+                            IndexShard leader = leaderGroup.getPrimary();
+                            Lucene.cleanLuceneIndex(primary.store().directory());
+                            try (Engine.IndexCommitRef sourceCommit = leader.acquireSafeIndexCommit()) {
+                                Store.MetadataSnapshot sourceSnapshot = leader.store().getMetadata(sourceCommit.getIndexCommit());
+                                for (StoreFileMetaData md : sourceSnapshot) {
+                                    primary.store().directory().copyFrom(
+                                        leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT);
+                                }
+                            }
+                        } catch (Exception ex) {
+                            throw new AssertionError(ex);
+                        }
+                    }
+                });
+            }
+        };
     }

     private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) {
@@ -483,7 +560,7 @@ private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup le
         final List<Tuple<String, Long>> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream()
             .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
         final Set<Tuple<Long, Translog.Operation.Type>> operationsOnLeader = new HashSet<>();
-        try (Translog.Snapshot snapshot = leader.getPrimary().getHistoryOperations("test", 0)) {
+        try (Translog.Snapshot snapshot = leader.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
             Translog.Operation op;
             while ((op = snapshot.next()) != null) {
                 operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType()));
@@ -497,13 +574,13 @@ private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup le
             .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
         assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader));
         final Set<Tuple<Long, Translog.Operation.Type>> operationsOnFollower = new HashSet<>();
-        try (Translog.Snapshot snapshot = followingShard.getHistoryOperations("test", 0)) {
+        try (Translog.Snapshot snapshot = followingShard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
             Translog.Operation op;
             while ((op = snapshot.next()) != null) {
                 operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType()));
             }
         }
-        assertThat(operationsOnFollower, equalTo(operationsOnLeader));
+        assertThat(followingShard.routingEntry().toString(), operationsOnFollower, equalTo(operationsOnLeader));
     }
 }