diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index be757a02c3982..13269344a1ac6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -46,7 +46,6 @@ import java.io.IOException; import java.util.Objects; -import java.util.function.Consumer; public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction< TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> { @@ -130,10 +129,8 @@ class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy { } @Override - public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess, - final Consumer onPrimaryDemoted, final Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final ActionListener listener) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener); } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index e9a6e7b48152d..3f09f00b9ac1e 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -48,7 +48,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.function.Consumer; import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { + shardStateAction.remoteShardFailed( + replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, listener); } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index f001d9a29e29d..71483245ee3fa 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -21,12 +21,14 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; @@ -34,7 +36,9 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.transport.TransportException; import java.io.IOException; import java.util.ArrayList; @@ -43,7 +47,6 @@ import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; public class ReplicationOperation< Request extends ReplicationRequest, @@ -133,10 +136,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica for (String allocationId : replicationGroup.getUnavailableInSyncShards()) { pendingActions.incrementAndGet(); replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, - ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, - throwable -> decPendingAndFinishIfNeeded() - ); + ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); } } @@ -192,9 +192,8 @@ public void onFailure(Exception replicaException) { shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); } String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - replicasProxy.failShardIfNeeded(shard, message, - replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); + replicasProxy.failShardIfNeeded(shard, message, replicaException, + ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); } @Override @@ -204,13 +203,26 @@ public String toString() { }); } - private void onPrimaryDemoted(Exception demotionFailure) { - String primaryFail = String.format(Locale.ROOT, - "primary shard [%s] was demoted while failing replica shard", - primary.routingEntry()); - // we are no longer the primary, fail ourselves and start over - primary.failShard(primaryFail, demotionFailure); - finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure)); + private void onNoLongerPrimary(Exception failure) { + final boolean nodeIsClosing = failure instanceof NodeClosedException || + (failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage())); + final String message; + if (nodeIsClosing) { + message = String.format(Locale.ROOT, + "node with primary [%s] is shutting down while failing replica shard", primary.routingEntry()); + // We prefer not to fail the primary to avoid unnecessary warning log + // when the node with the primary shard is gracefully shutting down. + } else { + if (Assertions.ENABLED) { + if (failure instanceof ShardStateAction.NoLongerPrimaryShardException == false) { + throw new AssertionError("unexpected failure", failure); + } + } + // we are no longer the primary, fail ourselves and start over + message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard", primary.routingEntry()); + primary.failShard(message, failure); + } + finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure)); } /** @@ -370,31 +382,23 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo * of active shards. Whether a failure is needed is left up to the * implementation. * - * @param replica shard to fail - * @param message a (short) description of the reason - * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed - * @param onSuccess a callback to call when the shard has been successfully removed from the active set. - * @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted - * by the master. - * @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the + * @param replica shard to fail + * @param message a (short) description of the reason + * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed + * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set */ - void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener); /** * Marks shard copy as stale if needed, removing its allocation id from * the set of in-sync allocation ids. Whether marking as stale is needed * is left up to the implementation. * - * @param shardId shard id - * @param allocationId allocation id to remove from the set of in-sync allocation ids - * @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set. - * @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted - * by the master. - * @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored. + * @param shardId shard id + * @param allocationId allocation id to remove from the set of in-sync allocation ids + * @param listener a listener that will be notified when the failing shard has been removed from the in-sync set */ - void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 326f7bacdb8f6..cc0c69418d7eb 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -84,7 +84,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Supplier; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -1173,47 +1172,21 @@ public void performOn( } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { // This does not need to fail the shard. The idea is that this // is a non-write operation (something like a refresh or a global // checkpoint sync) and therefore the replica should still be // "alive" if it were to fail. - onSuccess.run(); + listener.onResponse(null); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { // This does not need to make the shard stale. The idea is that this // is a non-write operation (something like a refresh or a global // checkpoint sync) and therefore the replica should still be // "alive" if it were to be marked as stale. - onSuccess.run(); - } - - protected final ActionListener createShardActionListener(final Runnable onSuccess, - final Consumer onPrimaryDemoted, - final Consumer onIgnoredFailure) { - return new ActionListener() { - @Override - public void onResponse(Void aVoid) { - onSuccess.run(); - } - - @Override - public void onFailure(Exception shardFailedError) { - if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { - onPrimaryDemoted.accept(shardFailedError); - } else { - // these can occur if the node is shutting down and are okay - // any other exception here is not expected and merits investigation - assert shardFailedError instanceof TransportException || - shardFailedError instanceof NodeClosedException : shardFailedError; - onIgnoredFailure.accept(shardFailedError); - } - } - }; + listener.onResponse(null); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index f44694f55d960..4781682437545 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -376,20 +375,17 @@ class WriteActionReplicasProxy extends ReplicasProxy { } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { if (TransportActions.isShardNotAvailableException(exception) == false) { logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); } - shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + shardStateAction.remoteShardFailed( + replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, listener); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, - createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener); } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 8fa10c4ee26d7..adb79b1fe3bba 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -21,14 +21,16 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -39,7 +41,9 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportException; import java.util.ArrayList; import java.util.Collections; @@ -51,7 +55,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Supplier; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; @@ -115,10 +118,8 @@ public void testReplication() throws Exception { final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures); final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup); - final TestReplicationOperation op = new TestReplicationOperation(request, - primary, listener, replicasProxy); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy); op.execute(); - assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet())); @@ -162,7 +163,7 @@ private void addTrackingInfo(IndexShardRoutingTable indexShardRoutingTable, Shar } } - public void testDemotedPrimary() throws Exception { + public void testNoLongerPrimary() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); @@ -198,26 +199,32 @@ public void testDemotedPrimary() throws Exception { Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean(); + final Exception shardActionFailure; + if (randomBoolean()) { + shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT)); + } else if (randomBoolean()) { + shardActionFailure = new TransportException("TransportService is closed stopped can't send request"); + } else { + shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead"); + } final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures) { @Override public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, - Consumer onIgnoredFailure) { + ActionListener shardActionListener) { if (testPrimaryDemotedOnStaleShardCopies) { - super.failShardIfNeeded(replica, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.failShardIfNeeded(replica, message, exception, shardActionListener); } else { assertThat(replica, equalTo(failedReplica)); - onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); + shardActionListener.onFailure(shardActionFailure); } } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener shardActionListener) { if (testPrimaryDemotedOnStaleShardCopies) { - onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); + shardActionListener.onFailure(shardActionFailure); } else { - super.markShardCopyAsStaleIfNeeded(shardId, allocationId, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.markShardCopyAsStaleIfNeeded(shardId, allocationId, shardActionListener); } } }; @@ -225,6 +232,7 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup) { @Override public void failShard(String message, Exception exception) { + assertThat(exception, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); assertTrue(primaryFailed.compareAndSet(false, true)); } }; @@ -233,7 +241,11 @@ public void failShard(String message, Exception exception) { assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertTrue("listener is not marked as done", listener.isDone()); - assertTrue(primaryFailed.get()); + if (shardActionFailure instanceof ShardStateAction.NoLongerPrimaryShardException) { + assertTrue(primaryFailed.get()); + } else { + assertFalse(primaryFailed.get()); + } assertListenerThrows("should throw exception to trigger retry", listener, ReplicationOperation.RetryOnPrimaryException.class); } @@ -594,33 +606,23 @@ public void performOn( } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { if (failedReplicas.add(replica) == false) { fail("replica [" + replica + "] was failed twice"); } if (opFailures.containsKey(replica)) { - if (randomBoolean()) { - onSuccess.run(); - } else { - onIgnoredFailure.accept(new ElasticsearchException("simulated")); - } + listener.onResponse(null); } else { fail("replica [" + replica + "] was failed"); } } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { if (markedAsStaleCopies.add(allocationId) == false) { fail("replica [" + allocationId + "] was marked as stale twice"); } - if (randomBoolean()) { - onSuccess.run(); - } else { - onIgnoredFailure.accept(new ElasticsearchException("simulated")); - } + listener.onResponse(null); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 110ab9bcb99a2..ffc9c2bf70a8a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -744,11 +744,9 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { } AtomicReference failure = new AtomicReference<>(); - AtomicReference ignoredFailure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"), - () -> success.set(true), failure::set, ignoredFailure::set - ); + ActionListener.wrap(r -> success.set(true), failure::set)); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); // A replication action doesn't not fail the request assertEquals(0, shardFailedRequests.length); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 6e1ec3c76797d..f540374a56c20 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -311,11 +311,9 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { } AtomicReference failure = new AtomicReference<>(); - AtomicReference ignoredFailure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"), - () -> success.set(true), failure::set, ignoredFailure::set - ); + ActionListener.wrap(r -> success.set(true), failure::set)); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); // A write replication action proxy should fail the shard assertEquals(1, shardFailedRequests.length); @@ -329,8 +327,6 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); assertTrue(success.get()); assertNull(failure.get()); - assertNull(ignoredFailure.get()); - } else if (randomBoolean()) { // simulate the primary has been demoted transport.handleRemoteError(shardFailedRequest.requestId, @@ -338,15 +334,12 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { "shard-failed-test")); assertFalse(success.get()); assertNotNull(failure.get()); - assertNull(ignoredFailure.get()); - } else { - // simulated an "ignored" exception + // simulated a node closing exception transport.handleRemoteError(shardFailedRequest.requestId, new NodeClosedException(state.nodes().getLocalNode())); assertFalse(success.get()); - assertNull(failure.get()); - assertNotNull(ignoredFailure.get()); + assertNotNull(failure.get()); } } diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 9fd08511446d7..ac19fd68cde80 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -37,8 +37,12 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.disruption.NetworkDisruption; @@ -52,6 +56,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -66,6 +71,7 @@ import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; @@ -436,4 +442,46 @@ public void testIndicesDeleted() throws Exception { assertFalse(client().admin().indices().prepareExists(idxName).get().isExists()); } + public void testRestartNodeWhileIndexing() throws Exception { + startCluster(3); + String index = "restart_while_indexing"; + assertAcked(client().admin().indices().prepareCreate(index).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, between(1, 2)))); + AtomicBoolean stopped = new AtomicBoolean(); + Thread[] threads = new Thread[between(1, 4)]; + AtomicInteger docID = new AtomicInteger(); + Set ackedDocs = ConcurrentCollections.newConcurrentSet(); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + while (stopped.get() == false && docID.get() < 5000) { + String id = Integer.toString(docID.incrementAndGet()); + try { + IndexResponse response = client().prepareIndex(index, "_doc", id).setSource("{}", XContentType.JSON).get(); + assertThat(response.getResult(), isOneOf(CREATED, UPDATED)); + logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo()); + ackedDocs.add(response.getId()); + } catch (ElasticsearchException ignore) { + logger.info("--> fail to index id={}", id); + } + } + }); + threads[i].start(); + } + ensureGreen(index); + assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(100))); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback()); + ensureGreen(index); + assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(200))); + stopped.set(true); + for (Thread thread : threads) { + thread.join(); + } + ClusterState clusterState = internalCluster().clusterService().state(); + for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) { + String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); + IndexShard indexShard = indicesService.getShardOrNull(shardRouting.shardId()); + assertThat(IndexShardTestCase.getShardDocUIDs(indexShard), equalTo(ackedDocs)); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index d20d57aed64a9..8b984c22bfd0e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -96,7 +96,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -721,15 +720,12 @@ public void onFailure(Exception e) { } @Override - public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, - Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener listener) { throw new UnsupportedOperationException("failing shard " + replica + " isn't supported. failure: " + message, exception); } @Override - public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener listener) { throw new UnsupportedOperationException("can't mark " + shardId + ", aid [" + allocationId + "] as stale"); } }