From 91b9e4d961a2f46bb54a315cd05c09a0a1302d40 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 30 May 2018 16:03:53 -0400 Subject: [PATCH 1/3] TEST: Only synced-flush after global checkpoint synced When the last indexing operation is completed, we will fire a global checkpoint sync. Since a global checkpoint sync request is a replication request, it will acquire an index shard permit on the primary when executing. If this happens at the same time while we are issuing the synced-flush, the synced-flush request will fail as it thinks there are in-flight operations. We can avoid such situation by not issue the synced-flush until the global checkpoint on the primary is propagated to replicas. --- .../indices/flush/SyncedFlushService.java | 12 ------- .../elasticsearch/indices/flush/FlushIT.java | 22 +----------- .../indices/flush/SyncedFlushUtil.java | 36 +++++++++++++++++-- 3 files changed, 34 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 52e0ac8ab860f..6ef6c1546d152 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices.flush; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -502,18 +501,7 @@ private InFlightOpsResponse performInFlightOps(InFlightOpsRequest request) { if (indexShard.routingEntry().primary() == false) { throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard"); } - if (Assertions.ENABLED) { - if (logger.isTraceEnabled()) { - logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations()); - } - } int opCount = indexShard.getActiveOperationsCount(); - // Need to snapshot the debug info twice as it's updated concurrently with the permit count. - if (Assertions.ENABLED) { - if (logger.isTraceEnabled()) { - logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations()); - } - } return new InFlightOpsResponse(opCount); } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index 94bd8e80898db..a543e87adcb46 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -46,16 +46,13 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -103,7 +100,7 @@ public void onFailure(Exception e) { } } - public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException { + public void testSyncedFlush() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)).get(); ensureGreen(); @@ -246,16 +243,6 @@ private void indexDoc(Engine engine, String id) throws IOException { assertThat(indexResult.getFailure(), nullValue()); } - private String syncedFlushDescription(ShardsSyncedFlushResult result) { - String detail = result.shardResponses().entrySet().stream() - .map(e -> "Shard [" + e.getKey() + "], result [" + e.getValue() + "]") - .collect(Collectors.joining(",")); - return String.format(Locale.ROOT, "Total shards: [%d], failed: [%s], reason: [%s], detail: [%s]", - result.totalShards(), result.failed(), result.failureReason(), detail); - } - - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392") - @TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE") public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception { internalCluster().ensureAtLeastNumDataNodes(between(2, 3)); final int numberOfReplicas = internalCluster().numDataNodes() - 1; @@ -281,7 +268,6 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception { indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i); } final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId); - logger.info("Partial seal: {}", syncedFlushDescription(partialResult)); assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1)); assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas)); assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo( @@ -297,8 +283,6 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception { assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392") - @TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE") public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception { internalCluster().ensureAtLeastNumDataNodes(between(2, 3)); final int numberOfReplicas = internalCluster().numDataNodes() - 1; @@ -315,11 +299,9 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception { index("test", "doc", Integer.toString(i)); } final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId); - logger.info("First seal: {}", syncedFlushDescription(firstSeal)); assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1)); // Do not renew synced-flush final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId); - logger.info("Second seal: {}", syncedFlushDescription(secondSeal)); assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1)); assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId())); // Shards were updated, renew synced flush. @@ -328,7 +310,6 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception { index("test", "doc", Integer.toString(i)); } final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId); - logger.info("Third seal: {}", syncedFlushDescription(thirdSeal)); assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1)); assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId()))); // Manually remove or change sync-id, renew synced flush. @@ -344,7 +325,6 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception { assertThat(shard.commitStats().syncId(), nullValue()); } final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId); - logger.info("Forth seal: {}", syncedFlushDescription(forthSeal)); assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1)); assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId()))); } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java index 987f69b65878a..eb1a86555d6d5 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java @@ -23,13 +23,21 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.InternalTestCluster; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; +import static org.elasticsearch.test.ESTestCase.assertBusy; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + /** Utils for SyncedFlush */ public class SyncedFlushUtil { @@ -40,10 +48,32 @@ private SyncedFlushUtil() { /** * Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)} */ - public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) { + public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) throws Exception { + /* + * When the last indexing operation is completed, we will fire a global checkpoint sync. + * Since a global checkpoint sync request is a replication request, it will acquire an index + * shard permit on the primary when executing. If this happens at the same time while we are + * issuing the synced-flush, the synced-flush request will fail as it thinks there are + * in-flight operations. We can avoid such situation by not issue the synced-flush until the + * global checkpoint on the primary is propagated to replicas. + */ + assertBusy(() -> { + long globalCheckpointOnPrimary = SequenceNumbers.NO_OPS_PERFORMED; + Set assignedNodes = cluster.nodesInclude(shardId.getIndexName()); + for (String node : assignedNodes) { + IndicesService indicesService = cluster.getInstance(IndicesService.class, node); + IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); + if (shard.routingEntry().primary()) { + globalCheckpointOnPrimary = shard.getGlobalCheckpoint(); + } + } + for (String node : assignedNodes) { + IndicesService indicesService = cluster.getInstance(IndicesService.class, node); + IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); + assertThat(shard.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpointOnPrimary)); + } + }); SyncedFlushService service = cluster.getInstance(SyncedFlushService.class); - logger.debug("Issue synced-flush on node [{}], shard [{}], cluster state [{}]", - service.nodeName(), shardId, cluster.clusterService(service.nodeName()).state()); LatchedListener listener = new LatchedListener<>(); service.attemptSyncedFlush(shardId, listener); try { From a0d69c82a2f7346c301763edb0056e27168313fe Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 31 May 2018 11:53:41 -0400 Subject: [PATCH 2/3] retry until no ongoing operations on primary --- .../indices/flush/SyncedFlushUtil.java | 51 ++++++++----------- 1 file changed, 20 insertions(+), 31 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java index eb1a86555d6d5..2ba2a0340d992 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java @@ -23,20 +23,18 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.InternalTestCluster; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.ESTestCase.assertBusy; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; /** Utils for SyncedFlush */ public class SyncedFlushUtil { @@ -54,37 +52,28 @@ public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, Internal * Since a global checkpoint sync request is a replication request, it will acquire an index * shard permit on the primary when executing. If this happens at the same time while we are * issuing the synced-flush, the synced-flush request will fail as it thinks there are - * in-flight operations. We can avoid such situation by not issue the synced-flush until the - * global checkpoint on the primary is propagated to replicas. + * in-flight operations. We can avoid such situation by continuing issuing another synced-flush + * if the synced-flush failed due to the ongoing operations on the primary. */ + SyncedFlushService service = cluster.getInstance(SyncedFlushService.class); + AtomicReference> listenerHolder = new AtomicReference<>(); assertBusy(() -> { - long globalCheckpointOnPrimary = SequenceNumbers.NO_OPS_PERFORMED; - Set assignedNodes = cluster.nodesInclude(shardId.getIndexName()); - for (String node : assignedNodes) { - IndicesService indicesService = cluster.getInstance(IndicesService.class, node); - IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); - if (shard.routingEntry().primary()) { - globalCheckpointOnPrimary = shard.getGlobalCheckpoint(); - } + LatchedListener listener = new LatchedListener<>(); + listenerHolder.set(listener); + service.attemptSyncedFlush(shardId, listener); + listener.latch.await(); + if (listener.error != null) { + return; // stop here so that we can preserve the error } - for (String node : assignedNodes) { - IndicesService indicesService = cluster.getInstance(IndicesService.class, node); - IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); - assertThat(shard.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpointOnPrimary)); + if (listener.result.failed()) { + // only retry if request failed due to ongoing operations on primary + assertThat(listener.result.failureReason(), not(containsString("ongoing operations on primary"))); } }); - SyncedFlushService service = cluster.getInstance(SyncedFlushService.class); - LatchedListener listener = new LatchedListener<>(); - service.attemptSyncedFlush(shardId, listener); - try { - listener.latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (listenerHolder.get().error != null) { + throw ExceptionsHelper.convertToElastic(listenerHolder.get().error); } - if (listener.error != null) { - throw ExceptionsHelper.convertToElastic(listener.error); - } - return listener.result; + return listenerHolder.get().result; } public static final class LatchedListener implements ActionListener { From 93d959c7e9375ccd071242d09a22f5108c6543fa Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 31 May 2018 15:58:51 -0400 Subject: [PATCH 3/3] boaz suggestion --- .../elasticsearch/indices/flush/SyncedFlushUtil.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java index 2ba2a0340d992..8a8d57295a502 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java @@ -32,9 +32,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.ESTestCase.assertBusy; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertThat; /** Utils for SyncedFlush */ public class SyncedFlushUtil { @@ -62,12 +59,9 @@ public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, Internal listenerHolder.set(listener); service.attemptSyncedFlush(shardId, listener); listener.latch.await(); - if (listener.error != null) { - return; // stop here so that we can preserve the error - } - if (listener.result.failed()) { - // only retry if request failed due to ongoing operations on primary - assertThat(listener.result.failureReason(), not(containsString("ongoing operations on primary"))); + if (listener.result != null && listener.result.failureReason() != null + && listener.result.failureReason().contains("ongoing operations on primary")) { + throw new AssertionError(listener.result.failureReason()); // cause the assert busy to retry } }); if (listenerHolder.get().error != null) {