From f49436dc25d68ce61a180672d3b4f497f3092eeb Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 5 Apr 2019 17:46:05 +0200 Subject: [PATCH] Introduce Delegating ActionListener Wrappers (#40129) * Introduce Delegating ActionListener Wrappers * Dry up use cases of ActionListener that simply pass through the response or exception to another listener --- .../elasticsearch/action/ActionListener.java | 47 +++++++++++ .../tasks/get/TransportGetTaskAction.java | 19 ++--- .../TransportDeleteRepositoryAction.java | 16 +--- .../put/TransportPutRepositoryAction.java | 15 +--- .../TransportVerifyRepositoryAction.java | 16 +--- .../TransportRestoreSnapshotAction.java | 18 ++--- .../close/TransportCloseIndexAction.java | 18 +---- .../indices/shrink/TransportResizeAction.java | 18 ++--- .../upgrade/post/TransportUpgradeAction.java | 16 +--- .../action/bulk/TransportBulkAction.java | 42 +++------- .../master/TransportMasterNodeAction.java | 26 +++--- .../index/query/TermsQueryBuilder.java | 23 ++---- .../index/seqno/RetentionLeaseActions.java | 24 ++---- .../elasticsearch/index/shard/IndexShard.java | 19 ++--- .../CompletionPersistentTaskAction.java | 14 +--- .../RemovePersistentTaskAction.java | 15 +--- .../persistent/StartPersistentTaskAction.java | 15 +--- .../UpdatePersistentTaskStatusAction.java | 14 +--- .../repositories/RepositoriesService.java | 80 +++++-------------- .../TransportClientNodesServiceTests.java | 16 +--- .../RemoteClusterConnectionTests.java | 61 ++++++-------- .../ESIndexLevelReplicationTestCase.java | 53 +++++------- .../test/transport/StubbableTransport.java | 15 +--- .../ccr/action/TransportPutFollowAction.java | 38 +++------ 24 files changed, 206 insertions(+), 432 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index 158f8aa61fa3b..c21aa3b9d4b8f 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -72,6 +72,53 @@ public void onFailure(Exception e) { }; } + /** + * Creates a listener that delegates all responses it receives to another listener. + * + * @param delegate ActionListener to wrap and delegate any exception to + * @param bc BiConsumer invoked with delegate listener and exception + * @param Type of the listener + * @return Delegating listener + */ + static ActionListener delegateResponse(ActionListener delegate, BiConsumer, Exception> bc) { + return new ActionListener() { + + @Override + public void onResponse(T r) { + delegate.onResponse(r); + } + + @Override + public void onFailure(Exception e) { + bc.accept(delegate, e); + } + }; + } + + /** + * Creates a listener that delegates all exceptions it receives to another listener. + * + * @param delegate ActionListener to wrap and delegate any exception to + * @param bc BiConsumer invoked with delegate listener and response + * @param Type of the delegating listener's response + * @param Type of the wrapped listeners + * @return Delegating listener + */ + static ActionListener delegateFailure(ActionListener delegate, BiConsumer, T> bc) { + return new ActionListener() { + + @Override + public void onResponse(T r) { + bc.accept(delegate, r); + } + + @Override + public void onFailure(Exception e) { + delegate.onFailure(e); + } + }; + } + /** * Creates a listener that listens for a response (or failure) and executes the * corresponding runnable when the response (or failure) is received. diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 79b9b43abb237..fe07a4efe930e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -157,7 +157,7 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene // Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads. threadPool.generic().execute(new AbstractRunnable() { @Override - protected void doRun() throws Exception { + protected void doRun() { taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout())); waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener); } @@ -180,26 +180,17 @@ public void onFailure(Exception e) { */ void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask, ActionListener listener) { - getFinishedTaskFromIndex(thisTask, request, new ActionListener() { - @Override - public void onResponse(GetTaskResponse response) { - // We were able to load the task from the task index. Let's send that back. - listener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { + getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (delegatedListener, e) -> { /* * We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If * the error isn't a 404 then we'll just throw it back to the user. */ if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) { - listener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask))); + delegatedListener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask))); } else { - listener.onFailure(e); + delegatedListener.onFailure(e); } - } - }); + })); } /** diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java index 04901cbe256e4..48b9bdaa4511b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -69,17 +68,8 @@ protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, Clus protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state, final ActionListener listener) { repositoriesService.unregisterRepository( - request, - new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) { - listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged())); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + request, ActionListener.delegateFailure(listener, + (delegatedListener, unregisterRepositoryResponse) -> + delegatedListener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged())))); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java index 4a58edf64616b..a10e5c878a7c9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -68,17 +67,7 @@ protected ClusterBlockException checkBlock(PutRepositoryRequest request, Cluster @Override protected void masterOperation(final PutRepositoryRequest request, ClusterState state, final ActionListener listener) { - repositoriesService.registerRepository(request, new ActionListener() { - - @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new AcknowledgedResponse(response.isAcknowledged())); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + repositoriesService.registerRepository(request, ActionListener.delegateFailure(listener, + (delegatedListener, response) -> delegatedListener.onResponse(new AcknowledgedResponse(response.isAcknowledged())))); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java index aa973d4797a77..d4ec1d3a8bcb4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java @@ -33,8 +33,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.List; - /** * Transport action for verifying repository operation */ @@ -70,16 +68,8 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus @Override protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state, final ActionListener listener) { - repositoriesService.verifyRepository(request.name(), new ActionListener>() { - @Override - public void onResponse(List verifyResponse) { - listener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener, + (delegatedListener, verifyResponse) -> + delegatedListener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))))); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index a0f86719cad70..935902432d90f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.snapshots.RestoreService; -import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -75,20 +74,13 @@ protected ClusterBlockException checkBlock(RestoreSnapshotRequest request, Clust @Override protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state, final ActionListener listener) { - restoreService.restoreSnapshot(request, new ActionListener() { - @Override - public void onResponse(RestoreCompletionResponse restoreCompletionResponse) { + restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, + (delegatedListener, restoreCompletionResponse) -> { if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { - RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener); + RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener); } else { - listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); + delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); } - } - - @Override - public void onFailure(Exception t) { - listener.onFailure(t); - } - }); + })); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index 05f680af57ddf..a6f4b6f3d0c4a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -118,19 +118,9 @@ protected void masterOperation(final Task task, .masterNodeTimeout(request.masterNodeTimeout()) .waitForActiveShards(request.waitForActiveShards()) .indices(concreteIndices); - - indexStateService.closeIndices(closeRequest, new ActionListener() { - - @Override - public void onResponse(final CloseIndexResponse response) { - listener.onResponse(response); - } - - @Override - public void onFailure(final Exception t) { - logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t); - listener.onFailure(t); - } - }); + indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> { + logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t); + delegatedListener.onFailure(t); + })); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java index 579cf6498222b..ff4e643a08227 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; @@ -97,25 +96,18 @@ protected void masterOperation(final ResizeRequest resizeRequest, final ClusterS // there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex()); final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index()); - client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener() { - @Override - public void onResponse(IndicesStatsResponse indicesStatsResponse) { + client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute( + ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> { CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, - (i) -> { + i -> { IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); return shard == null ? null : shard.getPrimary().getDocs(); }, sourceIndex, targetIndex); createIndexService.createIndex( - updateRequest, ActionListener.map(listener, + updateRequest, ActionListener.map(delegatedListener, response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index())) ); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + })); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java index e7db98fa66f03..f2d046f3321b2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -104,7 +103,7 @@ protected UpgradeResponse newResponse(UpgradeRequest request, int totalShards, i versions.put(index, new Tuple<>(version, luceneVersion)); } } - Map> updatedVersions = new HashMap<>(); + Map> updatedVersions = new HashMap<>(); MetaData metaData = clusterState.metaData(); for (Map.Entry> versionEntry : versions.entrySet()) { String index = versionEntry.getKey(); @@ -209,16 +208,7 @@ public void onFailure(Exception e) { private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener listener) { UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions()); - client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse updateSettingsResponse) { - listener.onResponse(upgradeResponse); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, ActionListener.delegateFailure( + listener, (delegatedListener, updateSettingsResponse) -> delegatedListener.onResponse(upgradeResponse))); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9adc92e02bedb..0249aff74363b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -658,7 +658,15 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, return ActionListener.map(actionListener, response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis)); } else { - return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener); + return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> { + BulkItemResponse[] items = response.getItems(); + for (int i = 0; i < items.length; i++) { + itemResponses.add(originalSlots[i], response.getItems()[i]); + } + delegatedListener.onResponse( + new BulkResponse( + itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis)); + }); } } @@ -688,36 +696,4 @@ void markCurrentItemAsFailed(Exception e) { } } - - static final class IngestBulkResponseListener implements ActionListener { - - private final long ingestTookInMillis; - private final int[] originalSlots; - private final List itemResponses; - private final ActionListener actionListener; - - IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List itemResponses, - ActionListener actionListener) { - this.ingestTookInMillis = ingestTookInMillis; - this.itemResponses = itemResponses; - this.actionListener = actionListener; - this.originalSlots = originalSlots; - } - - @Override - public void onResponse(BulkResponse response) { - BulkItemResponse[] items = response.getItems(); - for (int i = 0; i < items.length; i++) { - itemResponses.add(originalSlots[i], response.getItems()[i]); - } - actionListener.onResponse(new BulkResponse( - itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), - response.getTook().getMillis(), ingestTookInMillis)); - } - - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); - } - } } diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 679bad1642e53..595b72f8da803 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.MasterNodeChangePredicate; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -39,7 +40,6 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; @@ -185,23 +185,15 @@ protected void doStart(ClusterState clusterState) { }); } } else { - ActionListener delegate = new ActionListener() { - @Override - public void onResponse(Response response) { - listener.onResponse(response); - } - - @Override - public void onFailure(Exception t) { - if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) { - logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " + - "stepped down before publishing action [{}], scheduling a retry", actionName), t); - retry(t, masterChangePredicate); - } else { - listener.onFailure(t); - } + ActionListener delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> { + if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) { + logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " + + "stepped down before publishing action [{}], scheduling a retry", actionName), t); + retry(t, masterChangePredicate); + } else { + delegatedListener.onFailure(t); } - }; + }); threadPool.executor(executor).execute(new ActionRunnable(delegate) { @Override protected void doRun() throws Exception { diff --git a/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java index 6fdedb8d6a97b..2f4f934727dc7 100644 --- a/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java @@ -27,7 +27,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; @@ -464,22 +463,14 @@ private void fetch(TermsLookup termsLookup, Client client, ActionListener() { - @Override - public void onResponse(GetResponse getResponse) { - List terms = new ArrayList<>(); - if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists - List extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap()); - terms.addAll(extractedValues); - } - actionListener.onResponse(terms); - } - - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); + client.get(getRequest, ActionListener.delegateFailure(actionListener, (delegatedListener, getResponse) -> { + List terms = new ArrayList<>(); + if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists + List extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap()); + terms.addAll(extractedValues); } - }); + delegatedListener.onResponse(terms); + })); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index 2f9043580a6a0..c503f1fa16377 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -102,23 +102,13 @@ protected void asyncShardOperation(T request, ShardId shardId, final ActionListe final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.id()); indexShard.acquirePrimaryOperationPermit( - new ActionListener() { - - @Override - public void onResponse(final Releasable releasable) { - try (Releasable ignore = releasable) { - doRetentionLeaseAction(indexShard, request, listener); - } - } - - @Override - public void onFailure(final Exception e) { - listener.onFailure(e); - } - - }, - ThreadPool.Names.SAME, - request); + ActionListener.delegateFailure(listener, (delegatedListener, releasable) -> { + try (Releasable ignore = releasable) { + doRetentionLeaseAction(indexShard, request, delegatedListener); + } + }), + ThreadPool.Names.SAME, + request); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ff1922a231dc8..90f7c662b91c5 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2688,9 +2688,8 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, // primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed // in the order submitted, combining both operations ensure that the term is updated before the operation is // executed. It also has the side effect of acquiring all the permits one time instead of two. - final ActionListener operationListener = new ActionListener() { - @Override - public void onResponse(final Releasable releasable) { + final ActionListener operationListener = ActionListener.delegateFailure(onPermitAcquired, + (delegatedListener, releasable) -> { if (opPrimaryTerm < getOperationPrimaryTerm()) { releasable.close(); final String message = String.format( @@ -2699,7 +2698,7 @@ public void onResponse(final Releasable releasable) { shardId, opPrimaryTerm, getOperationPrimaryTerm()); - onPermitAcquired.onFailure(new IllegalStateException(message)); + delegatedListener.onFailure(new IllegalStateException(message)); } else { assert assertReplicationTarget(); try { @@ -2707,18 +2706,12 @@ public void onResponse(final Releasable releasable) { advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); } catch (Exception e) { releasable.close(); - onPermitAcquired.onFailure(e); + delegatedListener.onFailure(e); return; } - onPermitAcquired.onResponse(releasable); + delegatedListener.onResponse(releasable); } - } - - @Override - public void onFailure(final Exception e) { - onPermitAcquired.onFailure(e); - } - }; + }); if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) { synchronized (mutex) { diff --git a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java index 86a657f8336cf..b09c33b59d2f5 100644 --- a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -165,17 +164,8 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception, - new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - listener.onResponse(new PersistentTaskResponse(task)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ActionListener.delegateFailure(listener, + (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task)))); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java index 5ea748de14e7c..877033fe4f33a 100644 --- a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -149,17 +148,9 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) @Override protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { - persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - listener.onResponse(new PersistentTaskResponse(task)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + persistentTasksClusterService.removePersistentTask( + request.taskId, ActionListener.delegateFailure(listener, + (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task)))); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index 72e7a43ad0497..977239ee2b990 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -217,18 +216,8 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params, - new ActionListener>() { - - @Override - public void onResponse(PersistentTask task) { - listener.onResponse(new PersistentTaskResponse(task)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ActionListener.delegateFailure(listener, + (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task)))); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java index bab58eb6001bf..218154d37c9b7 100644 --- a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -181,17 +180,8 @@ protected final void masterOperation(final Request request, final ClusterState state, final ActionListener listener) { persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state, - new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - listener.onResponse(new PersistentTaskResponse(task)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ActionListener.delegateFailure(listener, + (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task)))); } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index eb130f31e4b9d..e141d0d6014e7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -97,7 +97,15 @@ public void registerRepository(final PutRepositoryRequest request, final ActionL final ActionListener registrationListener; if (request.verify()) { - registrationListener = new VerifyingRegisterRepositoryListener(request.name(), listener); + registrationListener = ActionListener.delegateFailure(listener, (delegatedListener, clusterStateUpdateResponse) -> { + if (clusterStateUpdateResponse.isAcknowledged()) { + // The response was acknowledged - all nodes should know about the new repository, let's verify them + verifyRepository(request.name(), ActionListener.delegateFailure(delegatedListener, + (innerDelegatedListener, discoveryNodes) -> innerDelegatedListener.onResponse(clusterStateUpdateResponse))); + } else { + delegatedListener.onResponse(clusterStateUpdateResponse); + } + }); } else { registrationListener = listener; } @@ -229,27 +237,18 @@ public void verifyRepository(final String repositoryName, final ActionListener>() { - @Override - public void onResponse(List verifyResponse) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { - repository.endVerification(verificationToken); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage( - "[{}] failed to finish repository verification", repositoryName), e); - listener.onFailure(e); - return; - } - listener.onResponse(verifyResponse); - }); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + verifyAction.verify(repositoryName, verificationToken, ActionListener.delegateFailure(listener, + (delegatedListener, verifyResponse) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + repository.endVerification(verificationToken); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage( + "[{}] failed to finish repository verification", repositoryName), e); + delegatedListener.onFailure(e); + return; + } + delegatedListener.onResponse(verifyResponse); + }))); } catch (Exception e) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { @@ -424,41 +423,4 @@ private void ensureRepositoryNotInUse(ClusterState clusterState, String reposito throw new IllegalStateException("trying to modify or unregister repository that is currently used "); } } - - private class VerifyingRegisterRepositoryListener implements ActionListener { - - private final String name; - - private final ActionListener listener; - - VerifyingRegisterRepositoryListener(String name, final ActionListener listener) { - this.name = name; - this.listener = listener; - } - - @Override - public void onResponse(final ClusterStateUpdateResponse clusterStateUpdateResponse) { - if (clusterStateUpdateResponse.isAcknowledged()) { - // The response was acknowledged - all nodes should know about the new repository, let's verify them - verifyRepository(name, new ActionListener>() { - @Override - public void onResponse(List verifyResponse) { - listener.onResponse(clusterStateUpdateResponse); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } else { - listener.onResponse(clusterStateUpdateResponse); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - } } diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 6796a23ef0f26..bdcaf80ee19e9 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -363,19 +363,11 @@ public void testSniffNodesSamplerClosesConnections() throws Exception { final List establishedConnections = new CopyOnWriteArrayList<>(); clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile, listener) -> - transport.openConnection(discoveryNode, profile, new ActionListener() { - @Override - public void onResponse(Transport.Connection connection) { + transport.openConnection(discoveryNode, profile, + ActionListener.delegateFailure(listener, (delegatedListener, connection) -> { establishedConnections.add(connection); - listener.onResponse(connection); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - })); - + delegatedListener.onResponse(connection); + }))); clientService.start(); clientService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index f57894ed895d9..5f6d31dce3e97 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -1263,45 +1263,36 @@ public static Transport getProxyTransport(ThreadPool threadPool, Map() { - @Override - public void onResponse(Transport.Connection connection) { - Transport.Connection proxyConnection = new Transport.Connection() { - @Override - public DiscoveryNode getNode() { - return node; - } - - @Override - public void sendRequest(long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { - connection.sendRequest(requestId, action, request, options); - } + return t.openConnection(proxyNode, profile, ActionListener.delegateFailure(listener, + (delegatedListener, connection) -> delegatedListener.onResponse( + new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return node; + } - @Override - public void addCloseListener(ActionListener listener) { - connection.addCloseListener(listener); - } + @Override + public void sendRequest(long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { + connection.sendRequest(requestId, action, request, options); + } - @Override - public boolean isClosed() { - return connection.isClosed(); - } + @Override + public void addCloseListener(ActionListener listener) { + connection.addCloseListener(listener); + } - @Override - public void close() { - connection.close(); - } - }; - listener.onResponse(proxyConnection); - } + @Override + public boolean isClosed() { + return connection.isClosed(); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - }); + @Override + public void close() { + connection.close(); + } + }))); + }); return stubbableTransport; } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 4bba0e378fa2d..a884d7de948ef 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -433,7 +433,7 @@ public synchronized DiscoveryNode getPrimaryNode() { } public Future asyncRecoverReplica( - final IndexShard replica, final BiFunction targetSupplier) throws IOException { + final IndexShard replica, final BiFunction targetSupplier) { final FutureTask task = new FutureTask<>(() -> { recoverReplica(replica, targetSupplier); return null; @@ -609,17 +609,8 @@ protected ReplicationAction(Request request, ActionListener listener, public void execute() { try { new ReplicationOperation<>(request, new PrimaryRef(), - new ActionListener() { - @Override - public void onResponse(PrimaryResult result) { - result.respond(listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, new ReplicasRef(), logger, opType).execute(); + ActionListener.delegateFailure(listener, + (delegatedListener, result) -> result.respond(delegatedListener)), new ReplicasRef(), logger, opType).execute(); } catch (Exception e) { listener.onFailure(e); } @@ -693,28 +684,20 @@ public void performOn( final ActionListener listener) { IndexShard replica = replicationTargets.findReplicaShard(replicaRouting); replica.acquireReplicaOperationPermit( - getPrimaryShard().getPendingPrimaryTerm(), - globalCheckpoint, - maxSeqNoOfUpdatesOrDeletes, - new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - try { - performOnReplica(request, replica); - releasable.close(); - listener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint())); - } catch (final Exception e) { - Releasables.closeWhileHandlingException(releasable); - listener.onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, - ThreadPool.Names.WRITE, request); + getPrimaryShard().getPendingPrimaryTerm(), + globalCheckpoint, + maxSeqNoOfUpdatesOrDeletes, + ActionListener.delegateFailure(listener, (delegatedListener, releasable) -> { + try { + performOnReplica(request, replica); + releasable.close(); + delegatedListener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint())); + } catch (final Exception e) { + Releasables.closeWhileHandlingException(releasable); + delegatedListener.onFailure(e); + } + }), + ThreadPool.Names.WRITE, request); } @Override @@ -893,7 +876,7 @@ protected void performOnReplica(ResyncReplicationRequest request, IndexShard rep } private TransportWriteAction.WritePrimaryResult executeResyncOnPrimary( - IndexShard primary, ResyncReplicationRequest request) throws Exception { + IndexShard primary, ResyncReplicationRequest request) { final TransportWriteAction.WritePrimaryResult result = new TransportWriteAction.WritePrimaryResult<>(TransportResyncReplicationAction.performOnPrimary(request), new ResyncReplicationResponse(), null, null, primary, logger); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index e2ff8a7ef5da2..4ccc352158a73 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -129,18 +129,9 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, TransportAddress address = node.getAddress(); OpenConnectionBehavior behavior = connectBehaviors.getOrDefault(address, defaultConnectBehavior); - ActionListener wrappedListener = new ActionListener() { - - @Override - public void onResponse(Connection connection) { - listener.onResponse(new WrappedConnection(connection)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }; + ActionListener wrappedListener = + ActionListener.delegateFailure(listener, + (delegatedListener, connection) -> delegatedListener.onResponse(new WrappedConnection(connection))); if (behavior == null) { return delegate.openConnection(node, profile, wrappedListener); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 3f7f361d4b375..2a95c2a3c7aab 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; -import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; @@ -147,19 +146,10 @@ public void onFailure(Exception e) { } @Override - protected void doRun() throws Exception { - restoreService.restoreSnapshot(restoreRequest, new ActionListener() { - - @Override - public void onResponse(RestoreService.RestoreCompletionResponse response) { - afterRestoreStarted(clientWithHeaders, request, listener, response); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + protected void doRun() { + restoreService.restoreSnapshot(restoreRequest, + ActionListener.delegateFailure(listener, + (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response))); } }); } @@ -186,28 +176,20 @@ public void onFailure(Exception e) { listener = originalListener; } - RestoreClusterStateListener.createAndRegisterListener(clusterService, response, new ActionListener() { - @Override - public void onResponse(RestoreSnapshotResponse restoreSnapshotResponse) { + RestoreClusterStateListener.createAndRegisterListener(clusterService, response, + ActionListener.delegateFailure(listener, (delegatedListener, restoreSnapshotResponse) -> { RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo(); - if (restoreInfo == null) { // If restoreInfo is null then it is possible there was a master failure during the // restore. - listener.onResponse(new PutFollowAction.Response(true, false, false)); + delegatedListener.onResponse(new PutFollowAction.Response(true, false, false)); } else if (restoreInfo.failedShards() == 0) { - initiateFollowing(clientWithHeaders, request, listener); + initiateFollowing(clientWithHeaders, request, delegatedListener); } else { assert restoreInfo.failedShards() > 0 : "Should have failed shards"; - listener.onResponse(new PutFollowAction.Response(true, false, false)); + delegatedListener.onResponse(new PutFollowAction.Response(true, false, false)); } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + })); } private void initiateFollowing(