Skip to content

Commit

Permalink
Remove some abstractions from TransportReplicationAction (elastic#4…
Browse files Browse the repository at this point in the history
…0706)

`TransportReplicationAction` is a rather complex beast, and some of its
concrete implementations do not need all of its features. More specifically, it
(a) chases a primary around the cluster until it manages to pin it down and
then (b) executes an action on that primary and all its replicas. There are
some actions that are coordinated by the primary itself, meaning that there is
no need for the chase-the-primary phases, and in the case of peer recovery
retention leases and primary/replica resync it is important to bypass these
first phases.

This commit is a step towards separating the `TransportReplicationAction` into
these two parts. It is a mostly mechanical sequence of steps to remove some
abstractions that are no longer in use.
  • Loading branch information
DaveCTurner authored and Gurkan Kaymak committed May 27, 2019
1 parent f3bf5c5 commit d4f6d3e
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ protected ReplicationResponse newResponseInstance() {
}

@Override
protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) {
protected PrimaryResult<BasicReplicationRequest, ReplicationResponse> shardOperationOnPrimary(
BasicReplicationRequest shardRequest, IndexShard primary) {
primary.refresh("api");
logger.trace("{} refresh request executed on primary", primary.shardId());
return new PrimaryResult(shardRequest, new ReplicationResponse());
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
@Override
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
// we should never reject resync because of thread pool capacity on primary
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request),
executor, true, true,
new PrimaryOperationTransportHandler());
this::handlePrimaryRequest);
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
this::handleReplicaRequest);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
e1.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
}
Expand Down
Loading

0 comments on commit d4f6d3e

Please sign in to comment.