From 5708796e5131bac7724ea57d6290b6f688a9b47a Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 16 Apr 2019 13:03:55 +0100 Subject: [PATCH] Inline TransportReplAct#createReplicatedOperation (#41197) `TransportReplicationAction.AsyncPrimaryAction#createReplicatedOperation` exists so it can be overridden in tests. This commit re-works these tests to use a real `ReplicationOperation` and inlines the now-unnecessary method. Relates #40706. --- .../replication/ReplicationOperation.java | 2 +- .../TransportReplicationAction.java | 62 +++---- .../TransportReplicationActionTests.java | 161 +++++++----------- 3 files changed, 93 insertions(+), 132 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index e5c2136aae56d..7917d9c05078b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -74,7 +74,7 @@ public class ReplicationOperation< private final long primaryTerm; // exposed for tests - final ActionListener resultListener; + private final ActionListener resultListener; private volatile PrimaryResultT primaryResult = null; diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index e9c071e5a0ed1..0c464d27e1957 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -357,37 +357,35 @@ public void handleException(TransportException exp) { }); } else { setPhase(replicationTask, "primary"); - createReplicatedOperation(primaryRequest.getRequest(), - ActionListener.wrap(result -> result.respond( - new ActionListener<>() { - @Override - public void onResponse(Response response) { - if (syncGlobalCheckpointAfterOperation) { - final IndexShard shard = primaryShardReference.indexShard; - try { - shard.maybeSyncGlobalCheckpoint("post-operation"); - } catch (final Exception e) { - // only log non-closed exceptions - if (ExceptionsHelper.unwrap( - e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { - // intentionally swallow, a missed global checkpoint sync should not fail this operation - logger.info( - new ParameterizedMessage( - "{} failed to execute post-operation global checkpoint sync", shard.shardId()), e); - } - } - } - primaryShardReference.close(); // release shard operation lock before responding to caller - setPhase(replicationTask, "finished"); - onCompletionListener.onResponse(response); - } - @Override - public void onFailure(Exception e) { - handleException(primaryShardReference, e); + final ActionListener referenceClosingListener = ActionListener.wrap(response -> { + primaryShardReference.close(); // release shard operation lock before responding to caller + setPhase(replicationTask, "finished"); + onCompletionListener.onResponse(response); + }, e -> handleException(primaryShardReference, e)); + + final ActionListener globalCheckpointSyncingListener = ActionListener.wrap(response -> { + if (syncGlobalCheckpointAfterOperation) { + final IndexShard shard = primaryShardReference.indexShard; + try { + shard.maybeSyncGlobalCheckpoint("post-operation"); + } catch (final Exception e) { + // only log non-closed exceptions + if (ExceptionsHelper.unwrap( + e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + // intentionally swallow, a missed global checkpoint sync should not fail this operation + logger.info( + new ParameterizedMessage( + "{} failed to execute post-operation global checkpoint sync", shard.shardId()), e); } - }), e -> handleException(primaryShardReference, e) - ), primaryShardReference).execute(); + } + } + referenceClosingListener.onResponse(response); + }, referenceClosingListener::onFailure); + + new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference, + ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure), + newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute(); } } catch (Exception e) { handleException(primaryShardReference, e); @@ -405,12 +403,6 @@ public void onFailure(Exception e) { onCompletionListener.onFailure(e); } - protected ReplicationOperation> createReplicatedOperation( - Request request, ActionListener> listener, - PrimaryShardReference primaryShardReference) { - return new ReplicationOperation<>(request, primaryShardReference, listener, - newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()); - } } public static class PrimaryResult, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 2cdd3ad2fe480..ccb23a9111a4e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -143,6 +143,7 @@ public static R resolveRequest(TransportRequest r if (requestOrWrappedRequest instanceof TransportReplicationAction.ConcreteShardRequest) { requestOrWrappedRequest = ((TransportReplicationAction.ConcreteShardRequest)requestOrWrappedRequest).getRequest(); } + //noinspection unchecked return (R) requestOrWrappedRequest; } @@ -209,7 +210,7 @@ private void setStateWithBlock(final ClusterService clusterService, final Cluste setState(clusterService, ClusterState.builder(clusterService.state()).blocks(blocks).build()); } - public void testBlocksInReroutePhase() throws Exception { + public void testBlocksInReroutePhase() { final ClusterBlock nonRetryableBlock = new ClusterBlock(1, "non retryable", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); final ClusterBlock retryableBlock = @@ -290,7 +291,6 @@ public ClusterBlockLevel indexBlockLevel() { TestAction testActionWithNoBlocks = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool); - listener = new PlainActionFuture<>(); TestAction.ReroutePhase reroutePhase = testActionWithNoBlocks.new ReroutePhase(task, requestWithTimeout, listener); reroutePhase.run(); assertListenerThrows("should fail with an IndexNotFoundException when no blocks", listener, IndexNotFoundException.class); @@ -350,7 +350,7 @@ public void assertIndexShardUninitialized() { assertEquals(0, count.get()); } - public void testNotStartedPrimary() throws InterruptedException, ExecutionException { + public void testNotStartedPrimary() { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); // no replicas in oder to skip the replication part @@ -399,7 +399,7 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept * This test checks that replication request is not routed back from relocation target to relocation source in case of * stale index routing table on relocation target. */ - public void testNoRerouteOnStaleClusterState() throws InterruptedException, ExecutionException { + public void testNoRerouteOnStaleClusterState() { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); ClusterState state = state(index, true, ShardRoutingState.RELOCATING); @@ -441,7 +441,7 @@ public void testNoRerouteOnStaleClusterState() throws InterruptedException, Exec assertIndexShardCounter(0); } - public void testUnknownIndexOrShardOnReroute() throws InterruptedException { + public void testUnknownIndexOrShardOnReroute() { final String index = "test"; // no replicas in oder to skip the replication part setState(clusterService, state(index, true, @@ -462,10 +462,9 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException { reroutePhase.run(); assertListenerThrows("must throw shard not found exception", listener, ShardNotFoundException.class); assertFalse(request.isRetrySet.get()); //TODO I'd have expected this to be true but we fail too early? - } - public void testClosedIndexOnReroute() throws InterruptedException { + public void testClosedIndexOnReroute() { final String index = "test"; // no replicas in oder to skip the replication part ClusterStateChanges clusterStateChanges = new ClusterStateChanges(xContentRegistry(), threadPool); @@ -488,7 +487,7 @@ public void testClosedIndexOnReroute() throws InterruptedException { assertFalse(request.isRetrySet.get()); } - public void testStalePrimaryShardOnReroute() throws InterruptedException { + public void testStalePrimaryShardOnReroute() { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); // no replicas in order to skip the replication part @@ -596,23 +595,17 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw } final TransportReplicationAction.ConcreteShardRequest primaryRequest = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); - action.new AsyncPrimaryAction(primaryRequest, listener, task) { + + new TestAction(Settings.EMPTY, "internal:testAction2", transportService, clusterService, shardStateAction, threadPool) { @Override - protected ReplicationOperation> - createReplicatedOperation( - Request request, - ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference) { - return new NoopReplicationOperation(request, actionListener, primaryTerm) { - @Override - public void execute() throws Exception { - assertPhase(task, "primary"); - assertFalse(executed.getAndSet(true)); - super.execute(); - } - }; + protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener) { + assertPhase(task, "primary"); + assertFalse(executed.getAndSet(true)); + super.shardOperationOnPrimary(shardRequest, primary, listener); } - }.run(); + }.new AsyncPrimaryAction(primaryRequest, listener, task).run(); + if (executeOnPrimary) { assertTrue(executed.get()); assertTrue(listener.isDone()); @@ -626,9 +619,12 @@ public void execute() throws Exception { transport.capturedRequestsByTargetNode().get(primaryShard.relocatingNodeId()); assertThat(requests, notNullValue()); assertThat(requests.size(), equalTo(1)); - assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("internal:testAction[p]")); - assertThat("primary term not properly set on primary delegation", - ((TransportReplicationAction.ConcreteShardRequest)requests.get(0).request).getPrimaryTerm(), equalTo(primaryTerm)); + assertThat("primary request was not delegated to relocation target", + requests.get(0).action, equalTo("internal:testAction2[p]")); + //noinspection unchecked + final TransportReplicationAction.ConcreteShardRequest concreteShardRequest + = (TransportReplicationAction.ConcreteShardRequest) requests.get(0).request; + assertThat("primary term not properly set on primary delegation", concreteShardRequest.getPrimaryTerm(), equalTo(primaryTerm)); assertPhase(task, "primary_delegation"); transport.handleResponse(requests.get(0).requestId, new TestResponse()); assertTrue(listener.isDone()); @@ -638,7 +634,7 @@ public void execute() throws Exception { } } - public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws Exception { + public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); ClusterState state = state(index, true, ShardRoutingState.RELOCATING); @@ -654,34 +650,24 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws AtomicBoolean executed = new AtomicBoolean(); final TransportReplicationAction.ConcreteShardRequest primaryRequest = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getRelocationId(), primaryTerm); - action.new AsyncPrimaryAction(primaryRequest, listener, task) { - @Override - protected ReplicationOperation> - createReplicatedOperation( - Request request, - ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference) { - return new NoopReplicationOperation(request, actionListener, primaryTerm) { - @Override - public void execute() throws Exception { - assertPhase(task, "primary"); - assertFalse(executed.getAndSet(true)); - super.execute(); - } - }; - } + new TestAction(Settings.EMPTY, "internal:testAction2", transportService, clusterService, shardStateAction, threadPool) { @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); + protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener) { + assertPhase(task, "primary"); + assertFalse(executed.getAndSet(true)); + super.shardOperationOnPrimary(shardRequest, primary, listener); } - }.run(); + }.new AsyncPrimaryAction(primaryRequest, listener, task).run(); assertThat(executed.get(), equalTo(true)); assertPhase(task, "finished"); assertFalse(request.isRetrySet.get()); + assertTrue(listener.isDone()); + listener.actionGet(); // throws no exception } - public void testPrimaryReference() throws Exception { + public void testPrimaryReference() { final IndexShard shard = mock(IndexShard.class); AtomicBoolean closed = new AtomicBoolean(); @@ -789,6 +775,7 @@ public void testSeqNoIsSetOnPrimary() { inSyncIds, shardRoutingTable.getAllAllocationIds())); doAnswer(invocation -> { + //noinspection unchecked ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); return null; }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); @@ -805,6 +792,7 @@ public void testSeqNoIsSetOnPrimary() { action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); assertThat(requestsToReplicas, arrayWithSize(1)); + //noinspection unchecked assertThat(((TransportReplicationAction.ConcreteShardRequest) requestsToReplicas[0].request).getPrimaryTerm(), equalTo(primaryTerm)); } @@ -821,47 +809,38 @@ public void testCounterOnPrimary() throws Exception { Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - int i = randomInt(3); - final boolean throwExceptionOnCreation = i == 1; - final boolean throwExceptionOnRun = i == 2; - final boolean respondWithError = i == 3; + int i = randomInt(2); + final boolean throwExceptionOnRun = i == 1; + final boolean respondWithError = i == 2; final TransportReplicationAction.ConcreteShardRequest primaryRequest = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); - action.new AsyncPrimaryAction(primaryRequest, listener, task) { + + new TestAction(Settings.EMPTY, "internal:testAction2", transportService, clusterService, shardStateAction, threadPool) { @Override - protected ReplicationOperation> - createReplicatedOperation( - Request request, - ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference) { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener) { assertIndexShardCounter(1); - if (throwExceptionOnCreation) { - throw new ElasticsearchException("simulated exception, during createReplicatedOperation"); + if (throwExceptionOnRun) { + throw new ElasticsearchException("simulated exception, during shardOperationOnPrimary"); + } else if (respondWithError) { + listener.onFailure(new ElasticsearchException("simulated exception, as a response")); + } else { + super.shardOperationOnPrimary(request, primary, listener); } - return new NoopReplicationOperation(request, actionListener, primaryTerm) { - @Override - public void execute() throws Exception { - assertIndexShardCounter(1); - assertPhase(task, "primary"); - if (throwExceptionOnRun) { - throw new ElasticsearchException("simulated exception, during performOnPrimary"); - } else if (respondWithError) { - this.resultListener.onFailure(new ElasticsearchException("simulated exception, as a response")); - } else { - super.execute(); - } - } - }; } - }.run(); + }.new AsyncPrimaryAction(primaryRequest, listener, task).run(); + assertIndexShardCounter(0); assertTrue(listener.isDone()); assertPhase(task, "finished"); try { listener.get(); + if (throwExceptionOnRun || respondWithError) { + fail("expected exception, but none was thrown"); + } } catch (ExecutionException e) { - if (throwExceptionOnCreation || throwExceptionOnRun || respondWithError) { + if (throwExceptionOnRun || respondWithError) { Throwable cause = e.getCause(); assertThat(cause, instanceOf(ElasticsearchException.class)); assertThat(cause.getMessage(), containsString("simulated")); @@ -871,7 +850,7 @@ public void execute() throws Exception { } } - public void testReplicasCounter() throws Exception { + public void testReplicasCounter() { final ShardId shardId = new ShardId("test", "_na_", 0); final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); setState(clusterService, state); @@ -909,7 +888,7 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl * This test ensures that replication operations adhere to the {@link IndexMetaData#SETTING_WAIT_FOR_ACTIVE_SHARDS} setting * when the request is using the default value for waitForActiveShards. */ - public void testDefaultWaitForActiveShardsUsesIndexSetting() throws Exception { + public void testDefaultWaitForActiveShardsUsesIndexSetting() { final String indexName = "test"; final ShardId shardId = new ShardId(indexName, "_na_", 0); @@ -1167,9 +1146,9 @@ private void assertPhase(@Nullable ReplicationTask task, Matcher phaseMa } public static class Request extends ReplicationRequest { - public AtomicBoolean processedOnPrimary = new AtomicBoolean(); - public AtomicInteger processedOnReplicas = new AtomicInteger(); - public AtomicBoolean isRetrySet = new AtomicBoolean(false); + AtomicBoolean processedOnPrimary = new AtomicBoolean(); + AtomicInteger processedOnReplicas = new AtomicInteger(); + AtomicBoolean isRetrySet = new AtomicBoolean(false); Request(StreamInput in) throws IOException { super(in); @@ -1284,6 +1263,7 @@ private IndexService mockIndexService(final IndexMetaData indexMetaData, Cluster return indexService; } + @SuppressWarnings("unchecked") private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { final IndexShard indexShard = mock(IndexShard.class); when(indexShard.shardId()).thenReturn(shardId); @@ -1319,21 +1299,10 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class)); when(indexShard.getPendingPrimaryTerm()).thenAnswer(i -> clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); - return indexShard; - } - - class NoopReplicationOperation extends ReplicationOperation> { - - NoopReplicationOperation(Request request, ActionListener> listener, - long primaryTerm) { - super(request, null, listener, null, TransportReplicationActionTests.this.logger, "noop", primaryTerm); - } - @Override - public void execute() throws Exception { - // Using the diamond operator (<>) prevents Eclipse from being able to compile this code - this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult(null, new TestResponse())); - } + ReplicationGroup replicationGroup = mock(ReplicationGroup.class); + when(indexShard.getReplicationGroup()).thenReturn(replicationGroup); + return indexShard; } /** @@ -1348,12 +1317,12 @@ public String getProfileName() { } @Override - public void sendResponse(TransportResponse response) throws IOException { + public void sendResponse(TransportResponse response) { listener.onResponse(((TestResponse) response)); } @Override - public void sendResponse(Exception exception) throws IOException { + public void sendResponse(Exception exception) { listener.onFailure(exception); }