diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 0be7c392de2bc..0563feebfb1a9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -103,13 +103,21 @@ synchronized void coordinateReads() { params.getFollowShardId(), lastRequestedSeqno, leaderGlobalCheckpoint); final int maxBatchOperationCount = params.getMaxBatchOperationCount(); while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) { + final long from = lastRequestedSeqno + 1; + final long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxBatchOperationCount - 1); + final int requestBatchCount; + if (numConcurrentReads == 0) { + // This is the only request, we can optimistically fetch more documents if possible but not enforce max_required_seqno. + requestBatchCount = maxBatchOperationCount; + } else { + requestBatchCount = Math.toIntExact(maxRequiredSeqNo - from + 1); + } + assert 0 < requestBatchCount && requestBatchCount <= maxBatchOperationCount : "request_batch_count=" + requestBatchCount; + LOGGER.trace("{}[{} ongoing reads] read from_seqno={} max_required_seqno={} batch_count={}", + params.getFollowShardId(), numConcurrentReads, from, maxRequiredSeqNo, requestBatchCount); numConcurrentReads++; - long from = lastRequestedSeqno + 1; - // -1 is needed, because maxRequiredSeqno is inclusive - long maxRequiredSeqno = Math.min(leaderGlobalCheckpoint, (from + maxBatchOperationCount) - 1); - LOGGER.trace("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, maxRequiredSeqno, maxBatchOperationCount); - sendShardChangesRequest(from, maxBatchOperationCount, maxRequiredSeqno); - lastRequestedSeqno = maxRequiredSeqno; + sendShardChangesRequest(from, requestBatchCount, maxRequiredSeqNo); + lastRequestedSeqno = maxRequiredSeqNo; } if (numConcurrentReads == 0 && hasReadBudget()) { @@ -186,7 +194,13 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response)); } + /** Called when some operations are fetched from the leading */ + protected void onOperationsFetched(Translog.Operation[] operations) { + + } + synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { + onOperationsFetched(response.getOperations()); leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint()); final long newFromSeqNo; if (response.getOperations().length == 0) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 7b2032f2eac39..e08d87a94cb2b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -160,7 +160,7 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); - final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureYellow("index1"); @@ -218,7 +218,7 @@ public void testFollowIndex() throws Exception { } public void testSyncMappings() throws Exception { - final String leaderIndexSettings = getIndexSettings(2, + final String leaderIndexSettings = getIndexSettings(2, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureYellow("index1"); @@ -255,7 +255,8 @@ public void testSyncMappings() throws Exception { } public void testFollowIndex_backlog() throws Exception { - String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + String leaderIndexSettings = getIndexSettings(between(1, 5), between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override @@ -306,10 +307,10 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) public void testFollowIndexAndCloseNode() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); - String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - String followerIndexSettings = getIndexSettings(3, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + String followerIndexSettings = getIndexSettings(3, 1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); ensureGreen("index1", "index2"); @@ -366,13 +367,14 @@ public void testFollowIndexAndCloseNode() throws Exception { public void testFollowIndexWithNestedField() throws Exception { final String leaderIndexSettings = - getIndexSettingsWithNestedMapping(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); final String followerIndexSettings = - getIndexSettingsWithNestedMapping(1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); + internalCluster().ensureAtLeastNumDataNodes(2); ensureGreen("index1", "index2"); final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); @@ -455,7 +457,8 @@ public void testValidateFollowingIndexSettings() throws Exception { } public void testFollowIndex_lowMaxTranslogBytes() throws Exception { - final String leaderIndexSettings = getIndexSettings(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + final String leaderIndexSettings = getIndexSettings(1, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureYellow("index1"); @@ -554,15 +557,16 @@ private CheckedRunnable assertExpectedDocumentRunnable(final int valu }; } - private String getIndexSettings(final int numberOfPrimaryShards, final Map additionalIndexSettings) throws IOException { + private String getIndexSettings(final int numberOfShards, final int numberOfReplicas, + final Map additionalIndexSettings) throws IOException { final String settings; try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); { builder.startObject("settings"); { - builder.field("index.number_of_shards", numberOfPrimaryShards); - builder.field("index.number_of_replicas", 1); + builder.field("index.number_of_shards", numberOfShards); + builder.field("index.number_of_replicas", numberOfReplicas); for (final Map.Entry additionalSetting : additionalIndexSettings.entrySet()) { builder.field(additionalSetting.getKey(), additionalSetting.getValue()); } @@ -592,7 +596,7 @@ private String getIndexSettings(final int numberOfPrimaryShards, final Map additionalIndexSettings) throws IOException { final String settings; try (XContentBuilder builder = jsonBuilder()) { @@ -600,7 +604,8 @@ private String getIndexSettingsWithNestedMapping(final int numberOfPrimaryShards { builder.startObject("settings"); { - builder.field("index.number_of_shards", numberOfPrimaryShards); + builder.field("index.number_of_shards", numberOfShards); + builder.field("index.number_of_replicas", numberOfReplicas); for (final Map.Entry additionalSetting : additionalIndexSettings.entrySet()) { builder.field(additionalSetting.getKey(), additionalSetting.getValue()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 2d8d15903243d..c5b665e7577b7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -42,7 +42,7 @@ public void testSingleReaderWriter() throws Exception { public void testMultipleReaderWriter() throws Exception { int concurrency = randomIntBetween(2, 8); - TestRun testRun = createTestRun(0, 0, 1024); + TestRun testRun = createTestRun(0, 0, between(1, 1024)); ShardFollowNodeTask task = createShardFollowTask(concurrency, testRun); startAndAssertAndStopTask(task, testRun); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index bbe4ac6806e97..9f9a3a0e0c23e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -45,18 +45,18 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue followerGlobalCheckpoints; public void testCoordinateReads() { - ShardFollowNodeTask task = createShardFollowTask(8, 8, 8, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 64, -1); - + ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 3, -1); task.coordinateReads(); - assertThat(shardChangesRequests.size(), equalTo(8)); + assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request + shardChangesRequests.clear(); + task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 60L)); assertThat(shardChangesRequests, contains(new long[][]{ - {0L, 8L}, {8L, 8L}, {16L, 8L}, {24L, 8L}, {32L, 8L}, {40L, 8L}, {48L, 8L}, {56L, 8L}} + {6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}} )); - ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.getNumberOfConcurrentReads(), equalTo(8)); - assertThat(status.getLastRequestedSeqno(), equalTo(63L)); + assertThat(status.getNumberOfConcurrentReads(), equalTo(7)); + assertThat(status.getLastRequestedSeqno(), equalTo(60L)); } public void testWriteBuffer() { @@ -263,12 +263,12 @@ public void testReceiveLessThanRequested() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L); - task.innerHandleReadResponse(0L, 64L, response); + ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 31L); + task.innerHandleReadResponse(0L, 63L, response); assertThat(shardChangesRequests.size(), equalTo(1)); - assertThat(shardChangesRequests.get(0)[0], equalTo(32L)); - assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); + assertThat(shardChangesRequests.get(0)[0], equalTo(21L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(43L)); ShardFollowNodeTask.Status status = task.getStatus(); assertThat(status.getNumberOfConcurrentReads(), equalTo(1)); @@ -310,7 +310,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 64L, + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, new Translog.Operation[0])); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -675,9 +675,9 @@ protected void innerSendBulkShardOperationsRequest(List oper } @Override - protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, + protected void innerSendShardChangesRequest(long from, int requestBatchSize, Consumer handler, Consumer errorHandler) { - shardChangesRequests.add(new long[]{from, maxBatchOperationCount}); + shardChangesRequests.add(new long[]{from, requestBatchSize}); Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll(); if (readFailure != null) { errorHandler.accept(readFailure); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 1c973d11d207a..84b31cda6301c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ccr.action; +import com.carrotsearch.hppc.LongHashSet; +import com.carrotsearch.hppc.LongSet; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; @@ -72,6 +74,7 @@ public void testSimpleCcrReplication() throws Exception { followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size()); }); shardFollowTask.markAsCompleted(); + assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup); } } @@ -107,6 +110,7 @@ public void testFailLeaderReplicaShard() throws Exception { leaderGroup.assertAllEqual(docCount); assertBusy(() -> followerGroup.assertAllEqual(docCount)); shardFollowTask.markAsCompleted(); + assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup); } } @@ -141,12 +145,23 @@ private ReplicationGroup createFollowGroup(int replicas) throws IOException { private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) { ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), 1024, 1, Long.MAX_VALUE, 1, 10240, + new ShardId("leader_index", "", 0), between(1, 64), between(1, 8), Long.MAX_VALUE, between(1, 4), 10240, TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap()); BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); AtomicBoolean stopped = new AtomicBoolean(false); + LongSet fetchOperations = new LongHashSet(); return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) { + @Override + protected synchronized void onOperationsFetched(Translog.Operation[] operations) { + super.onOperationsFetched(operations); + for (Translog.Operation operation : operations) { + if (fetchOperations.add(operation.seqNo()) == false) { + throw new AssertionError("Operation [" + operation + " ] was fetched already"); + } + } + } + @Override protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { // noop, as mapping updates are not tested @@ -210,6 +225,13 @@ public void markAsFailed(Exception e) { }; } + private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup leader, ReplicationGroup follower) throws IOException { + int totalOps = leader.getPrimary().estimateNumberOfHistoryOperations("test", 0); + for (IndexShard followingShard : follower) { + assertThat(followingShard.estimateNumberOfHistoryOperations("test", 0), equalTo(totalOps)); + } + } + class CCRAction extends ReplicationAction { CCRAction(BulkShardOperationsRequest request, ActionListener listener, ReplicationGroup group) {