From d4f6d3ed4201f5bdb4dd49a191d0665830ae856f Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Apr 2019 08:32:57 +0100 Subject: [PATCH] Remove some abstractions from `TransportReplicationAction` (#40706) `TransportReplicationAction` is a rather complex beast, and some of its concrete implementations do not need all of its features. More specifically, it (a) chases a primary around the cluster until it manages to pin it down and then (b) executes an action on that primary and all its replicas. There are some actions that are coordinated by the primary itself, meaning that there is no need for the chase-the-primary phases, and in the case of peer recovery retention leases and primary/replica resync it is important to bypass these first phases. This commit is a step towards separating the `TransportReplicationAction` into these two parts. It is a mostly mechanical sequence of steps to remove some abstractions that are no longer in use. --- .../refresh/TransportShardRefreshAction.java | 5 +- .../TransportResyncReplicationAction.java | 6 +- .../action/support/ChannelActionListener.java | 1 + .../TransportReplicationAction.java | 234 +++++------------- .../TransportReplicationActionTests.java | 38 +-- ...ReplicationAllPermitsAcquisitionTests.java | 39 +-- 6 files changed, 97 insertions(+), 226 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 0b5975cf025af..df3ff16ff8800 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -53,10 +53,11 @@ protected ReplicationResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) { + protected PrimaryResult shardOperationOnPrimary( + BasicReplicationRequest shardRequest, IndexShard primary) { primary.refresh("api"); logger.trace("{} refresh request executed on primary", primary.shardId()); - return new PrimaryResult(shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index c3aa39009ce8f..80a55e8ab2d41 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -66,16 +66,16 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran @Override protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); // we should never reject resync because of thread pool capacity on primary transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, true, true, - new PrimaryOperationTransportHandler()); + this::handlePrimaryRequest); transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, - new ReplicaOperationTransportHandler()); + this::handleReplicaRequest); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java index b23758758e24d..b93298eefe871 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java @@ -55,6 +55,7 @@ public void onFailure(Exception e) { try { channel.sendResponse(e); } catch (Exception e1) { + e1.addSuppressed(e); logger.warn(() -> new ParameterizedMessage( "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2ff1ff3e93a9f..ef294b52803de 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.client.transport.NoNodeAvailableException; @@ -69,10 +70,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse.Empty; @@ -154,14 +153,12 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, - new PrimaryOperationTransportHandler()); + this::handlePrimaryRequest); // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler(transportReplicaAction, - () -> new ConcreteReplicaRequest<>(replicaRequest), - executor, true, true, - new ReplicaOperationTransportHandler()); + transportService.registerRequestHandler( + transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); } @Override @@ -271,71 +268,30 @@ boolean isRetryableClusterBlockException(final Throwable e) { return false; } - protected class OperationTransportHandler implements TransportRequestHandler { - - public OperationTransportHandler() { - - } - - @Override - public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { - execute(task, request, new ActionListener() { - @Override - public void onResponse(Response result) { - try { - channel.sendResponse(result); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage("Failed to send response for {}", actionName), inner); - } - } - }); - } + protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) { + execute(task, request, new ChannelActionListener<>(channel, actionName, request)); } - protected class PrimaryOperationTransportHandler implements TransportRequestHandler> { - - public PrimaryOperationTransportHandler() { - - } - - @Override - public void messageReceived(ConcreteShardRequest request, TransportChannel channel, Task task) { - new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run(); - } + protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { + new AsyncPrimaryAction( + request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); } class AsyncPrimaryAction extends AbstractRunnable { - - private final Request request; - // targetAllocationID of the shard this request is meant for - private final String targetAllocationID; - // primary term of the shard this request is meant for - private final long primaryTerm; - private final TransportChannel channel; + private final ActionListener onCompletionListener; private final ReplicationTask replicationTask; + private final ConcreteShardRequest primaryRequest; - AsyncPrimaryAction(Request request, String targetAllocationID, long primaryTerm, TransportChannel channel, + AsyncPrimaryAction(ConcreteShardRequest primaryRequest, ActionListener onCompletionListener, ReplicationTask replicationTask) { - this.request = request; - this.targetAllocationID = targetAllocationID; - this.primaryTerm = primaryTerm; - this.channel = channel; + this.primaryRequest = primaryRequest; + this.onCompletionListener = onCompletionListener; this.replicationTask = replicationTask; } @Override protected void doRun() throws Exception { - final ShardId shardId = request.shardId(); + final ShardId shardId = primaryRequest.getRequest().shardId(); final IndexShard indexShard = getIndexShard(shardId); final ShardRouting shardRouting = indexShard.routingEntry(); // we may end up here if the cluster state used to route the primary is so stale that the underlying @@ -345,17 +301,17 @@ protected void doRun() throws Exception { throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting); } final String actualAllocationId = shardRouting.allocationId().getId(); - if (actualAllocationId.equals(targetAllocationID) == false) { - throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID, - actualAllocationId); + if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) { + throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", + primaryRequest.getTargetAllocationID(), actualAllocationId); } final long actualTerm = indexShard.getPendingPrimaryTerm(); - if (actualTerm != primaryTerm) { - throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID, - primaryTerm, actualTerm); + if (actualTerm != primaryRequest.getPrimaryTerm()) { + throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", + primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm); } - acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap( + acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap( releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)), this::onFailure )); @@ -387,11 +343,10 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere }; DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, - new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), + new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), + primaryRequest.getPrimaryTerm()), transportOptions, - new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary, - reader) { - + new ActionListenerResponseHandler(onCompletionListener, reader) { @Override public void handleResponse(Response response) { setPhase(replicationTask, "finished"); @@ -407,7 +362,7 @@ public void handleException(TransportException exp) { } else { setPhase(replicationTask, "primary"); final ActionListener listener = createResponseListener(primaryShardReference); - createReplicatedOperation(request, + createReplicatedOperation(primaryRequest.getRequest(), ActionListener.wrap(result -> result.respond(listener), listener::onFailure), primaryShardReference) .execute(); @@ -421,12 +376,7 @@ public void handleException(TransportException exp) { @Override public void onFailure(Exception e) { setPhase(replicationTask, "finished"); - try { - channel.sendResponse(e); - } catch (IOException inner) { - inner.addSuppressed(e); - logger.warn("failed to send response", inner); - } + onCompletionListener.onFailure(e); } private ActionListener createResponseListener(final PrimaryShardReference primaryShardReference) { @@ -451,22 +401,14 @@ public void onResponse(Response response) { } primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - try { - channel.sendResponse(response); - } catch (IOException e) { - onFailure(e); - } + onCompletionListener.onResponse(response); } @Override public void onFailure(Exception e) { primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); - try { - channel.sendResponse(e); - } catch (IOException e1) { - logger.warn("failed to send response", e); - } + onCompletionListener.onFailure(e); } }; } @@ -475,7 +417,7 @@ protected ReplicationOperation> listener, PrimaryShardReference primaryShardReference) { return new ReplicationOperation<>(request, primaryShardReference, listener, - newReplicasProxy(primaryTerm), logger, actionName); + newReplicasProxy(primaryRequest.getPrimaryTerm()), logger, actionName); } } @@ -544,24 +486,10 @@ public void respond(ActionListener listener) { } } - public class ReplicaOperationTransportHandler implements TransportRequestHandler> { - - @Override - public void messageReceived( - final ConcreteReplicaRequest replicaRequest, - final TransportChannel channel, - final Task task) - throws Exception { - new AsyncReplicaAction( - replicaRequest.getRequest(), - replicaRequest.getTargetAllocationID(), - replicaRequest.getPrimaryTerm(), - replicaRequest.getGlobalCheckpoint(), - replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(), - channel, - (ReplicationTask) task).run(); - } - + protected void handleReplicaRequest(final ConcreteReplicaRequest replicaRequest, + final TransportChannel channel, final Task task) { + new AsyncReplicaAction( + replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run(); } public static class RetryOnReplicaException extends ElasticsearchException { @@ -577,13 +505,7 @@ public RetryOnReplicaException(StreamInput in) throws IOException { } private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener { - private final ReplicaRequest request; - // allocation id of the replica this request is meant for - private final String targetAllocationID; - private final long primaryTerm; - private final long globalCheckpoint; - private final long maxSeqNoOfUpdatesOrDeletes; - private final TransportChannel channel; + private final ActionListener onCompletionListener; private final IndexShard replica; /** * The task on the node with the replica shard. @@ -592,23 +514,14 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio // important: we pass null as a timeout as failing a replica is // something we want to avoid at all costs private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); + private final ConcreteReplicaRequest replicaRequest; - AsyncReplicaAction( - ReplicaRequest request, - String targetAllocationID, - long primaryTerm, - long globalCheckpoint, - long maxSeqNoOfUpdatesOrDeletes, - TransportChannel channel, - ReplicationTask task) { - this.request = request; - this.channel = channel; + AsyncReplicaAction(ConcreteReplicaRequest replicaRequest, ActionListener onCompletionListener, + ReplicationTask task) { + this.replicaRequest = replicaRequest; + this.onCompletionListener = onCompletionListener; this.task = task; - this.targetAllocationID = targetAllocationID; - this.primaryTerm = primaryTerm; - this.globalCheckpoint = globalCheckpoint; - this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; - final ShardId shardId = request.shardId(); + final ShardId shardId = replicaRequest.getRequest().shardId(); assert shardId != null : "request shardId must be set"; this.replica = getIndexShard(shardId); } @@ -616,7 +529,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio @Override public void onResponse(Releasable releasable) { try { - final ReplicaResult replicaResult = shardOperationOnReplica(request, replica); + final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica); releasable.close(); // release shard operation lock before responding to caller final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint()); @@ -634,22 +547,17 @@ public void onFailure(Exception e) { () -> new ParameterizedMessage( "Retrying operation on replica, action [{}], request [{}]", transportReplicaAction, - request), + replicaRequest.getRequest()), e); - request.onRetry(); + replicaRequest.getRequest().onRetry(); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { // Forking a thread on local node via transport service so that custom transport service have an // opportunity to execute custom logic before the replica operation begins - String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; - TransportChannelResponseHandler handler = - new TransportChannelResponseHandler<>(logger, channel, extraMessage, - (in) -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, - new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, - globalCheckpoint, maxSeqNoOfUpdatesOrDeletes), - handler); + replicaRequest, + new ActionListenerResponseHandler<>(onCompletionListener, in -> new ReplicaResponse())); } @Override @@ -668,25 +576,20 @@ public void onTimeout(TimeValue timeout) { } protected void responseWithFailure(Exception e) { - try { - setPhase(task, "finished"); - channel.sendResponse(e); - } catch (IOException responseException) { - responseException.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage( - "failed to send error message back to client for action [{}]", transportReplicaAction), responseException); - } + setPhase(task, "finished"); + onCompletionListener.onFailure(e); } @Override protected void doRun() throws Exception { setPhase(task, "replica"); final String actualAllocationId = this.replica.routingEntry().allocationId().getId(); - if (actualAllocationId.equals(targetAllocationID) == false) { - throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID, - actualAllocationId); + if (actualAllocationId.equals(replicaRequest.getTargetAllocationID()) == false) { + throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", + replicaRequest.getTargetAllocationID(), actualAllocationId); } - acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); + acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(), + replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes()); } /** @@ -702,15 +605,12 @@ private class ResponseListener implements ActionListener> { protected final IndexShard indexShard; private final Releasable operationLock; - ShardReference(IndexShard indexShard, Releasable operationLock) { + PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { this.indexShard = indexShard; this.operationLock = operationLock; } @@ -1005,15 +906,6 @@ public ShardRouting routingEntry() { return indexShard.routingEntry(); } - } - - class PrimaryShardReference extends ShardReference - implements ReplicationOperation.Primary> { - - PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { - super(indexShard, operationLock); - } - public boolean isRelocated() { return indexShard.isRelocatedPrimary(); } @@ -1028,8 +920,8 @@ public void failShard(String reason, Exception e) { } @Override - public PrimaryResult perform(Request request) throws Exception { - PrimaryResult result = shardOperationOnPrimary(request, indexShard); + public PrimaryResult perform(Request request) throws Exception { + PrimaryResult result = shardOperationOnPrimary(request, indexShard); assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest() + "] with a primary failure [" + result.finalFailure + "]"; return result; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index ffc9c2bf70a8a..9164d9e4184eb 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -331,8 +331,10 @@ public ClusterBlockLevel indexBlockLevel() { final ReplicationTask task = maybeTask(); final PlainActionFuture listener = new PlainActionFuture<>(); + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, targetAllocationID, primaryTerm); final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks = - actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, createTransportChannel(listener), task); + actionWithBlocks.new AsyncPrimaryAction(primaryRequest, listener, task); asyncPrimaryActionWithBlocks.run(); final ExecutionException exception = expectThrows(ExecutionException.class, listener::get); @@ -589,7 +591,9 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw isRelocated.set(true); executeOnPrimary = false; } - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -645,8 +649,9 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), primaryTerm, - createTransportChannel(listener), task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getRelocationId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -792,9 +797,7 @@ protected IndexShard getIndexShard(ShardId shardId) { } }; - TransportReplicationAction.PrimaryOperationTransportHandler primaryPhase = - action.new PrimaryOperationTransportHandler(); - primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null); + action.handlePrimaryRequest(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); assertThat(requestsToReplicas, arrayWithSize(1)); assertThat(((TransportReplicationAction.ConcreteShardRequest) requestsToReplicas[0].request).getPrimaryTerm(), @@ -817,7 +820,9 @@ public void testCounterOnPrimary() throws Exception { final boolean throwExceptionOnCreation = i == 1; final boolean throwExceptionOnRun = i == 2; final boolean respondWithError = i == 3; - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), primaryTerm, createTransportChannel(listener), task) { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); + action.new AsyncPrimaryAction(primaryRequest, listener, task) { @Override protected ReplicationOperation> createReplicatedOperation( @@ -880,9 +885,8 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); try { - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>( new Request().setShardId(shardId), replicaRouting.allocationId().getId(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), @@ -938,7 +942,7 @@ public void testPrimaryActionRejectsWrongAidOrWrongTerm() throws Exception { final boolean wrongAllocationId = randomBoolean(); final long requestTerm = wrongAllocationId && randomBoolean() ? primaryTerm : primaryTerm + randomIntBetween(1, 10); Request request = new Request(shardId).timeout("1ms"); - action.new PrimaryOperationTransportHandler().messageReceived( + action.handlePrimaryRequest( new TransportReplicationAction.ConcreteShardRequest<>(request, wrongAllocationId ? "_not_a_valid_aid_" : primary.allocationId().getId(), requestTerm), @@ -973,7 +977,7 @@ public void testReplicaActionRejectsWrongAid() throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); - action.new ReplicaOperationTransportHandler().messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, "_not_a_valid_aid_", randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), createTransportChannel(listener), maybeTask() @@ -1015,12 +1019,11 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong(); - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint, maxSeqNoOfUpdatesOrDeletes), createTransportChannel(listener), task); @@ -1084,12 +1087,11 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); final long checkpoint = randomNonNegativeLong(); final long maxSeqNoOfUpdates = randomNonNegativeLong(); - replicaOperationTransportHandler.messageReceived( + action.handleReplicaRequest( new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, checkpoint, maxSeqNoOfUpdates), createTransportChannel(listener), task); @@ -1221,10 +1223,10 @@ protected TestResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception { + protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - return new PrimaryResult(shardRequest, new TestResponse()); + return new PrimaryResult<>(shardRequest, new TestResponse()); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 1cb1bfde34ea8..8fe204cee2c34 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -203,8 +203,10 @@ public void testTransportReplicationActionWithAllPermits() throws Exception { actions[threadId] = singlePermitAction; Thread thread = new Thread(() -> { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm()); TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - singlePermitAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(listener), null) { + singlePermitAction.new AsyncPrimaryAction(primaryRequest, listener, null) { @Override protected void doRun() throws Exception { if (delayed) { @@ -254,8 +256,10 @@ private void assertBlockIsPresentForDelayedOp() { final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); Thread thread = new Thread(() -> { + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request(), allocationId(), primaryTerm()); TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction = - allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(allPermitFuture), null) { + allPermitsAction.new AsyncPrimaryAction(primaryRequest, allPermitFuture, null) { @Override void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) { assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); @@ -407,9 +411,8 @@ protected void sendReplicaRequest(final ConcreteReplicaRequest replicaR final DiscoveryNode node, final ActionListener listener) { assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node); - ReplicaOperationTransportHandler replicaOperationTransportHandler = new ReplicaOperationTransportHandler(); try { - replicaOperationTransportHandler.messageReceived(replicaRequest, new TransportChannel() { + handleReplicaRequest(replicaRequest, new TransportChannel() { @Override public String getProfileName() { return null; @@ -530,32 +533,4 @@ public String toString() { static class Response extends ReplicationResponse { } - - /** - * Transport channel that is needed for replica operation testing. - */ - public TransportChannel transportChannel(final PlainActionFuture listener) { - return new TransportChannel() { - - @Override - public String getProfileName() { - return ""; - } - - @Override - public void sendResponse(TransportResponse response) throws IOException { - listener.onResponse(((Response) response)); - } - - @Override - public void sendResponse(Exception exception) throws IOException { - listener.onFailure(exception); - } - - @Override - public String getChannelType() { - return "replica_test"; - } - }; - } }