From 859c8e112854fe7c2b0aca846ee5c5211209ff94 Mon Sep 17 00:00:00 2001 From: Armin Date: Sat, 16 Mar 2019 19:50:57 +0100 Subject: [PATCH 1/3] Introduce Delegating ActionListener Wrappers * Dry up use cases of ActionListener that simply pass through the response or exception to another listener --- .../elasticsearch/action/ActionListener.java | 47 +++++++++++ .../tasks/get/TransportGetTaskAction.java | 19 ++--- .../TransportDeleteRepositoryAction.java | 13 +-- .../put/TransportPutRepositoryAction.java | 20 +---- .../TransportVerifyRepositoryAction.java | 20 ++--- .../TransportRestoreSnapshotAction.java | 21 ++--- .../close/TransportCloseIndexAction.java | 18 +---- .../indices/shrink/TransportResizeAction.java | 20 ++--- .../upgrade/post/TransportUpgradeAction.java | 16 +--- .../action/bulk/TransportBulkAction.java | 41 ++-------- .../master/TransportMasterNodeAction.java | 26 +++--- .../index/query/TermsQueryBuilder.java | 23 ++---- .../index/seqno/RetentionLeaseActions.java | 24 ++---- .../elasticsearch/index/shard/IndexShard.java | 50 +++++------- .../CompletionPersistentTaskAction.java | 13 +-- .../RemovePersistentTaskAction.java | 14 +--- .../persistent/StartPersistentTaskAction.java | 14 +--- .../UpdatePersistentTaskStatusAction.java | 13 +-- .../repositories/RepositoriesService.java | 80 +++++-------------- .../TransportClientNodesServiceTests.java | 17 +--- .../RemoteClusterConnectionTests.java | 52 +++++------- .../ESIndexLevelReplicationTestCase.java | 52 ++++-------- .../test/transport/StubbableTransport.java | 14 +--- .../ccr/action/TransportPutFollowAction.java | 39 +++------ 24 files changed, 214 insertions(+), 452 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index 158f8aa61fa3b..c21aa3b9d4b8f 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -72,6 +72,53 @@ public void onFailure(Exception e) { }; } + /** + * Creates a listener that delegates all responses it receives to another listener. + * + * @param delegate ActionListener to wrap and delegate any exception to + * @param bc BiConsumer invoked with delegate listener and exception + * @param Type of the listener + * @return Delegating listener + */ + static ActionListener delegateResponse(ActionListener delegate, BiConsumer, Exception> bc) { + return new ActionListener() { + + @Override + public void onResponse(T r) { + delegate.onResponse(r); + } + + @Override + public void onFailure(Exception e) { + bc.accept(delegate, e); + } + }; + } + + /** + * Creates a listener that delegates all exceptions it receives to another listener. + * + * @param delegate ActionListener to wrap and delegate any exception to + * @param bc BiConsumer invoked with delegate listener and response + * @param Type of the delegating listener's response + * @param Type of the wrapped listeners + * @return Delegating listener + */ + static ActionListener delegateFailure(ActionListener delegate, BiConsumer, T> bc) { + return new ActionListener() { + + @Override + public void onResponse(T r) { + bc.accept(delegate, r); + } + + @Override + public void onFailure(Exception e) { + delegate.onFailure(e); + } + }; + } + /** * Creates a listener that listens for a response (or failure) and executes the * corresponding runnable when the response (or failure) is received. diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 79b9b43abb237..021a5edcd0482 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -157,7 +157,7 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene // Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads. threadPool.generic().execute(new AbstractRunnable() { @Override - protected void doRun() throws Exception { + protected void doRun() { taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout())); waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener); } @@ -180,26 +180,17 @@ public void onFailure(Exception e) { */ void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask, ActionListener listener) { - getFinishedTaskFromIndex(thisTask, request, new ActionListener() { - @Override - public void onResponse(GetTaskResponse response) { - // We were able to load the task from the task index. Let's send that back. - listener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { + getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (l, e) -> { /* * We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If * the error isn't a 404 then we'll just throw it back to the user. */ if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) { - listener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask))); + l.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask))); } else { - listener.onFailure(e); + l.onFailure(e); } - } - }); + })); } /** diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java index 469c14f49bd40..6a0f7cf39a8c3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -71,16 +70,6 @@ protected void masterOperation(final DeleteRepositoryRequest request, ClusterSta repositoriesService.unregisterRepository( new RepositoriesService.UnregisterRepositoryRequest("delete_repository [" + request.name() + "]", request.name()) .masterNodeTimeout(request.masterNodeTimeout()).ackTimeout(request.timeout()), - new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) { - listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged())); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new AcknowledgedResponse(r.isAcknowledged())))); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java index a495ba72f35b7..bf61e564ab961 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -70,22 +69,11 @@ protected void masterOperation(final PutRepositoryRequest request, ClusterState final ActionListener listener) { repositoriesService.registerRepository( - new RepositoriesService.RegisterRepositoryRequest("put_repository [" + request.name() + "]", - request.name(), request.type(), request.verify()) + new RepositoriesService.RegisterRepositoryRequest("put_repository [" + request.name() + "]", + request.name(), request.type(), request.verify()) .settings(request.settings()) .masterNodeTimeout(request.masterNodeTimeout()) - .ackTimeout(request.timeout()), new ActionListener() { - - @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new AcknowledgedResponse(response.isAcknowledged())); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + .ackTimeout(request.timeout()), + ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new AcknowledgedResponse(r.isAcknowledged())))); } - } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java index 19fa4cbde15ca..cdf597093d6b5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java @@ -68,20 +68,12 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus @Override protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state, final ActionListener listener) { - repositoriesService.verifyRepository(request.name(), new ActionListener() { - @Override - public void onResponse(RepositoriesService.VerifyResponse verifyResponse) { - if (verifyResponse.failed()) { - listener.onFailure(new RepositoryVerificationException(request.name(), verifyResponse.failureDescription())); - } else { - listener.onResponse(new VerifyRepositoryResponse(verifyResponse.nodes())); - } + repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener, (l, r) -> { + if (r.failed()) { + l.onFailure(new RepositoryVerificationException(request.name(), r.failureDescription())); + } else { + l.onResponse(new VerifyRepositoryResponse(r.nodes())); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + })); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index d8dcc5eb8f846..fa5899964fd95 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.snapshots.RestoreService; -import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -73,20 +72,12 @@ protected ClusterBlockException checkBlock(RestoreSnapshotRequest request, Clust @Override protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state, final ActionListener listener) { - restoreService.restoreSnapshot(request, new ActionListener() { - @Override - public void onResponse(RestoreCompletionResponse restoreCompletionResponse) { - if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { - RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener); - } else { - listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); - } + restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, (l, r) -> { + if (r.getRestoreInfo() == null && request.waitForCompletion()) { + RestoreClusterStateListener.createAndRegisterListener(clusterService, r, l); + } else { + l.onResponse(new RestoreSnapshotResponse(r.getRestoreInfo())); } - - @Override - public void onFailure(Exception t) { - listener.onFailure(t); - } - }); + })); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index 05f680af57ddf..c5446a8d3300a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -118,19 +118,9 @@ protected void masterOperation(final Task task, .masterNodeTimeout(request.masterNodeTimeout()) .waitForActiveShards(request.waitForActiveShards()) .indices(concreteIndices); - - indexStateService.closeIndices(closeRequest, new ActionListener() { - - @Override - public void onResponse(final CloseIndexResponse response) { - listener.onResponse(response); - } - - @Override - public void onFailure(final Exception t) { - logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t); - listener.onFailure(t); - } - }); + indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (l, t) -> { + logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t); + listener.onFailure(t); + })); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java index 579cf6498222b..a1960535dc1b3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; @@ -97,25 +96,18 @@ protected void masterOperation(final ResizeRequest resizeRequest, final ClusterS // there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex()); final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index()); - client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener() { - @Override - public void onResponse(IndicesStatsResponse indicesStatsResponse) { + client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute( + ActionListener.delegateFailure(listener, (l, r) -> { CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, - (i) -> { - IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); + i -> { + IndexShardStats shard = r.getIndex(sourceIndex).getIndexShards().get(i); return shard == null ? null : shard.getPrimary().getDocs(); }, sourceIndex, targetIndex); createIndexService.createIndex( - updateRequest, ActionListener.map(listener, + updateRequest, ActionListener.map(l, response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index())) ); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + })); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java index e7db98fa66f03..a838395a30a4a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -104,7 +103,7 @@ protected UpgradeResponse newResponse(UpgradeRequest request, int totalShards, i versions.put(index, new Tuple<>(version, luceneVersion)); } } - Map> updatedVersions = new HashMap<>(); + Map> updatedVersions = new HashMap<>(); MetaData metaData = clusterState.metaData(); for (Map.Entry> versionEntry : versions.entrySet()) { String index = versionEntry.getKey(); @@ -209,16 +208,7 @@ public void onFailure(Exception e) { private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener listener) { UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions()); - client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse updateSettingsResponse) { - listener.onResponse(upgradeResponse); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, + ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(upgradeResponse))); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9adc92e02bedb..b8230be031ab2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -658,7 +658,14 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, return ActionListener.map(actionListener, response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis)); } else { - return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener); + return ActionListener.delegateFailure(actionListener, (l, r) -> { + BulkItemResponse[] items = r.getItems(); + for (int i = 0; i < items.length; i++) { + itemResponses.add(originalSlots[i], r.getItems()[i]); + } + l.onResponse( + new BulkResponse(itemResponses.toArray(new BulkItemResponse[0]), r.getTook().getMillis(), ingestTookInMillis)); + }); } } @@ -688,36 +695,4 @@ void markCurrentItemAsFailed(Exception e) { } } - - static final class IngestBulkResponseListener implements ActionListener { - - private final long ingestTookInMillis; - private final int[] originalSlots; - private final List itemResponses; - private final ActionListener actionListener; - - IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List itemResponses, - ActionListener actionListener) { - this.ingestTookInMillis = ingestTookInMillis; - this.itemResponses = itemResponses; - this.actionListener = actionListener; - this.originalSlots = originalSlots; - } - - @Override - public void onResponse(BulkResponse response) { - BulkItemResponse[] items = response.getItems(); - for (int i = 0; i < items.length; i++) { - itemResponses.add(originalSlots[i], response.getItems()[i]); - } - actionListener.onResponse(new BulkResponse( - itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), - response.getTook().getMillis(), ingestTookInMillis)); - } - - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); - } - } } diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 679bad1642e53..940a4f5c2c0ea 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.MasterNodeChangePredicate; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -39,7 +40,6 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; @@ -185,23 +185,15 @@ protected void doStart(ClusterState clusterState) { }); } } else { - ActionListener delegate = new ActionListener() { - @Override - public void onResponse(Response response) { - listener.onResponse(response); - } - - @Override - public void onFailure(Exception t) { - if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) { - logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " + - "stepped down before publishing action [{}], scheduling a retry", actionName), t); - retry(t, masterChangePredicate); - } else { - listener.onFailure(t); - } + ActionListener delegate = ActionListener.delegateResponse(listener, (l, t) -> { + if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) { + logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " + + "stepped down before publishing action [{}], scheduling a retry", actionName), t); + retry(t, masterChangePredicate); + } else { + l.onFailure(t); } - }; + }); threadPool.executor(executor).execute(new ActionRunnable(delegate) { @Override protected void doRun() throws Exception { diff --git a/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java index 6fdedb8d6a97b..e0007c0911cd1 100644 --- a/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java @@ -27,7 +27,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; @@ -464,22 +463,14 @@ private void fetch(TermsLookup termsLookup, Client client, ActionListener() { - @Override - public void onResponse(GetResponse getResponse) { - List terms = new ArrayList<>(); - if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists - List extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap()); - terms.addAll(extractedValues); - } - actionListener.onResponse(terms); - } - - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); + client.get(getRequest, ActionListener.delegateFailure(actionListener, (l, r) -> { + List terms = new ArrayList<>(); + if (r.isSourceEmpty() == false) { // extract terms only if the doc source exists + List extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), r.getSourceAsMap()); + terms.addAll(extractedValues); } - }); + l.onResponse(terms); + })); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index 3493271e8d79d..393a4b5d0da73 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -95,23 +95,13 @@ protected void asyncShardOperation(T request, ShardId shardId, final ActionListe final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.id()); indexShard.acquirePrimaryOperationPermit( - new ActionListener() { - - @Override - public void onResponse(final Releasable releasable) { - try (Releasable ignore = releasable) { - doRetentionLeaseAction(indexShard, request, listener); - } - } - - @Override - public void onFailure(final Exception e) { - listener.onFailure(e); - } - - }, - ThreadPool.Names.SAME, - request); + ActionListener.delegateFailure(listener, (l, r) -> { + try (Releasable ignore = r) { + doRetentionLeaseAction(indexShard, request, l); + } + }), + ThreadPool.Names.SAME, + request); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 99dab5a0f3918..a7fdec2b1a9d3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2685,37 +2685,29 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, // primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed // in the order submitted, combining both operations ensure that the term is updated before the operation is // executed. It also has the side effect of acquiring all the permits one time instead of two. - final ActionListener operationListener = new ActionListener() { - @Override - public void onResponse(final Releasable releasable) { - if (opPrimaryTerm < getOperationPrimaryTerm()) { - releasable.close(); - final String message = String.format( - Locale.ROOT, - "%s operation primary term [%d] is too old (current [%d])", - shardId, - opPrimaryTerm, - getOperationPrimaryTerm()); - onPermitAcquired.onFailure(new IllegalStateException(message)); - } else { - assert assertReplicationTarget(); - try { - updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); - advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); - } catch (Exception e) { - releasable.close(); - onPermitAcquired.onFailure(e); - return; - } - onPermitAcquired.onResponse(releasable); + final ActionListener operationListener = ActionListener.delegateFailure(onPermitAcquired, (l, r) -> { + if (opPrimaryTerm < getOperationPrimaryTerm()) { + r.close(); + final String message = String.format( + Locale.ROOT, + "%s operation primary term [%d] is too old (current [%d])", + shardId, + opPrimaryTerm, + getOperationPrimaryTerm()); + l.onFailure(new IllegalStateException(message)); + } else { + assert assertReplicationTarget(); + try { + updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); + advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); + } catch (Exception e) { + r.close(); + l.onFailure(e); + return; } + l.onResponse(r); } - - @Override - public void onFailure(final Exception e) { - onPermitAcquired.onFailure(e); - } - }; + }); if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) { synchronized (mutex) { diff --git a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java index 86a657f8336cf..fa6b59b75f9c4 100644 --- a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -165,17 +164,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception, - new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - listener.onResponse(new PersistentTaskResponse(task)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r)))); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java index 5ea748de14e7c..18edf652fff34 100644 --- a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -149,17 +148,8 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) @Override protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { - persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - listener.onResponse(new PersistentTaskResponse(task)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + persistentTasksClusterService.removePersistentTask( + request.taskId, ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r)))); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index 72e7a43ad0497..e87a89d07c001 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -217,18 +216,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params, - new ActionListener>() { - - @Override - public void onResponse(PersistentTask task) { - listener.onResponse(new PersistentTaskResponse(task)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r)))); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java index bab58eb6001bf..67b6277c8ae41 100644 --- a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -181,17 +180,7 @@ protected final void masterOperation(final Request request, final ClusterState state, final ActionListener listener) { persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state, - new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - listener.onResponse(new PersistentTaskResponse(task)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r)))); } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 033080c2c38e6..fc30c65459b4a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -98,7 +98,21 @@ public void registerRepository(final RegisterRepositoryRequest request, final Ac final ActionListener registrationListener; if (request.verify) { - registrationListener = new VerifyingRegisterRepositoryListener(request.name, listener); + final String name = request.name; + registrationListener = ActionListener.delegateFailure(listener, (outerListener, clusterStateUpdateResponse) -> { + if (clusterStateUpdateResponse.isAcknowledged()) { + // The response was acknowledged - all nodes should know about the new repository, let's verify them + verifyRepository(name, ActionListener.delegateFailure(outerListener, (l, r) -> { + if (r.failed()) { + l.onFailure(new RepositoryVerificationException(name, r.failureDescription())); + } else { + l.onResponse(clusterStateUpdateResponse); + } + })); + } else { + listener.onResponse(clusterStateUpdateResponse); + } + }); } else { registrationListener = listener; } @@ -229,27 +243,19 @@ public void verifyRepository(final String repositoryName, final ActionListener() { - @Override - public void onResponse(VerifyResponse verifyResponse) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + verifyAction.verify(repositoryName, verificationToken, + ActionListener.delegateFailure(listener, + (l, r) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { repository.endVerification(verificationToken); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage( "[{}] failed to finish repository verification", repositoryName), e); - listener.onFailure(e); + l.onFailure(e); return; } - listener.onResponse(verifyResponse); - }); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + l.onResponse(r); + }))); } catch (Exception e) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { @@ -425,47 +431,6 @@ private void ensureRepositoryNotInUse(ClusterState clusterState, String reposito } } - private class VerifyingRegisterRepositoryListener implements ActionListener { - - private final String name; - - private final ActionListener listener; - - VerifyingRegisterRepositoryListener(String name, final ActionListener listener) { - this.name = name; - this.listener = listener; - } - - @Override - public void onResponse(final ClusterStateUpdateResponse clusterStateUpdateResponse) { - if (clusterStateUpdateResponse.isAcknowledged()) { - // The response was acknowledged - all nodes should know about the new repository, let's verify them - verifyRepository(name, new ActionListener() { - @Override - public void onResponse(VerifyResponse verifyResponse) { - if (verifyResponse.failed()) { - listener.onFailure(new RepositoryVerificationException(name, verifyResponse.failureDescription())); - } else { - listener.onResponse(clusterStateUpdateResponse); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } else { - listener.onResponse(clusterStateUpdateResponse); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - } - /** * Register repository request */ @@ -527,7 +492,6 @@ public UnregisterRepositoryRequest(String cause, String name) { this.cause = cause; this.name = name; } - } /** @@ -562,7 +526,5 @@ public String failureDescription() { .map(failure -> failure.toString()) .collect(Collectors.joining(", ", "[", "]")); } - } - } diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 6796a23ef0f26..d5b31314e13c3 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -363,19 +363,10 @@ public void testSniffNodesSamplerClosesConnections() throws Exception { final List establishedConnections = new CopyOnWriteArrayList<>(); clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile, listener) -> - transport.openConnection(discoveryNode, profile, new ActionListener() { - @Override - public void onResponse(Transport.Connection connection) { - establishedConnections.add(connection); - listener.onResponse(connection); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - })); - + transport.openConnection(discoveryNode, profile, ActionListener.delegateFailure(listener, (l, r) -> { + establishedConnections.add(r); + listener.onResponse(r); + }))); clientService.start(); clientService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index f57894ed895d9..c355dd7707a88 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -1263,45 +1263,35 @@ public static Transport getProxyTransport(ThreadPool threadPool, Map() { + return t.openConnection(proxyNode, profile, ActionListener.delegateFailure(listener, (l, connection) -> l.onResponse( + new Transport.Connection() { @Override - public void onResponse(Transport.Connection connection) { - Transport.Connection proxyConnection = new Transport.Connection() { - @Override - public DiscoveryNode getNode() { - return node; - } - - @Override - public void sendRequest(long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { - connection.sendRequest(requestId, action, request, options); - } + public DiscoveryNode getNode() { + return node; + } - @Override - public void addCloseListener(ActionListener listener) { - connection.addCloseListener(listener); - } + @Override + public void sendRequest(long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { + connection.sendRequest(requestId, action, request, options); + } - @Override - public boolean isClosed() { - return connection.isClosed(); - } + @Override + public void addCloseListener(ActionListener listener) { + connection.addCloseListener(listener); + } - @Override - public void close() { - connection.close(); - } - }; - listener.onResponse(proxyConnection); + @Override + public boolean isClosed() { + return connection.isClosed(); } @Override - public void onFailure(Exception e) { - listener.onFailure(e); + public void close() { + connection.close(); } - }); - }); + }))); + }); return stubbableTransport; } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 37a18ef67ec06..95b66d1eac524 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -435,7 +435,7 @@ public synchronized DiscoveryNode getPrimaryNode() { } public Future asyncRecoverReplica( - final IndexShard replica, final BiFunction targetSupplier) throws IOException { + final IndexShard replica, final BiFunction targetSupplier) { final FutureTask task = new FutureTask<>(() -> { recoverReplica(replica, targetSupplier); return null; @@ -611,17 +611,7 @@ protected ReplicationAction(Request request, ActionListener listener, public void execute() { try { new ReplicationOperation<>(request, new PrimaryRef(), - new ActionListener() { - @Override - public void onResponse(PrimaryResult result) { - result.respond(listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, new ReplicasRef(), logger, opType).execute(); + ActionListener.delegateFailure(listener, (l, r) -> r.respond(l)), new ReplicasRef(), logger, opType).execute(); } catch (Exception e) { listener.onFailure(e); } @@ -695,28 +685,20 @@ public void performOn( final ActionListener listener) { IndexShard replica = replicationTargets.findReplicaShard(replicaRouting); replica.acquireReplicaOperationPermit( - getPrimaryShard().getPendingPrimaryTerm(), - globalCheckpoint, - maxSeqNoOfUpdatesOrDeletes, - new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - try { - performOnReplica(request, replica); - releasable.close(); - listener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint())); - } catch (final Exception e) { - Releasables.closeWhileHandlingException(releasable); - listener.onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, - ThreadPool.Names.WRITE, request); + getPrimaryShard().getPendingPrimaryTerm(), + globalCheckpoint, + maxSeqNoOfUpdatesOrDeletes, + ActionListener.delegateFailure(listener, (l, r) -> { + try { + performOnReplica(request, replica); + r.close(); + l.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint())); + } catch (final Exception e) { + Releasables.closeWhileHandlingException(r); + l.onFailure(e); + } + }), + ThreadPool.Names.WRITE, request); } @Override @@ -895,7 +877,7 @@ protected void performOnReplica(ResyncReplicationRequest request, IndexShard rep } private TransportWriteAction.WritePrimaryResult executeResyncOnPrimary( - IndexShard primary, ResyncReplicationRequest request) throws Exception { + IndexShard primary, ResyncReplicationRequest request) { final TransportWriteAction.WritePrimaryResult result = new TransportWriteAction.WritePrimaryResult<>(TransportResyncReplicationAction.performOnPrimary(request), new ResyncReplicationResponse(), null, null, primary, logger); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 673ed49387570..6508163de1da0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -134,18 +134,8 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, TransportAddress address = node.getAddress(); OpenConnectionBehavior behavior = connectBehaviors.getOrDefault(address, defaultConnectBehavior); - ActionListener wrappedListener = new ActionListener() { - - @Override - public void onResponse(Connection connection) { - listener.onResponse(new WrappedConnection(connection)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }; + ActionListener wrappedListener = + ActionListener.delegateFailure(listener, (l, connection) -> l.onResponse(new WrappedConnection(connection))); if (behavior == null) { return delegate.openConnection(node, profile, wrappedListener); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 3f7f361d4b375..8e79cdf627d02 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; -import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; @@ -147,19 +146,9 @@ public void onFailure(Exception e) { } @Override - protected void doRun() throws Exception { - restoreService.restoreSnapshot(restoreRequest, new ActionListener() { - - @Override - public void onResponse(RestoreService.RestoreCompletionResponse response) { - afterRestoreStarted(clientWithHeaders, request, listener, response); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + protected void doRun() { + restoreService.restoreSnapshot(restoreRequest, + ActionListener.delegateFailure(listener, (l, r) -> afterRestoreStarted(clientWithHeaders, request, l, r))); } }); } @@ -186,28 +175,20 @@ public void onFailure(Exception e) { listener = originalListener; } - RestoreClusterStateListener.createAndRegisterListener(clusterService, response, new ActionListener() { - @Override - public void onResponse(RestoreSnapshotResponse restoreSnapshotResponse) { - RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo(); - + RestoreClusterStateListener.createAndRegisterListener(clusterService, response, + ActionListener.delegateFailure(listener, (l, r) -> { + RestoreInfo restoreInfo = r.getRestoreInfo(); if (restoreInfo == null) { // If restoreInfo is null then it is possible there was a master failure during the // restore. - listener.onResponse(new PutFollowAction.Response(true, false, false)); + l.onResponse(new PutFollowAction.Response(true, false, false)); } else if (restoreInfo.failedShards() == 0) { - initiateFollowing(clientWithHeaders, request, listener); + initiateFollowing(clientWithHeaders, request, l); } else { assert restoreInfo.failedShards() > 0 : "Should have failed shards"; - listener.onResponse(new PutFollowAction.Response(true, false, false)); + l.onResponse(new PutFollowAction.Response(true, false, false)); } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + })); } private void initiateFollowing( From cf9763af4b2f1f7159c65647f5f3e19815b5ac7d Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 2 Apr 2019 12:49:11 +0200 Subject: [PATCH 2/3] fix merge conflicts --- .../repositories/RepositoriesService.java | 79 +++++-------------- 1 file changed, 20 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index eb130f31e4b9d..f9c79c30ce7b7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -97,7 +97,14 @@ public void registerRepository(final PutRepositoryRequest request, final ActionL final ActionListener registrationListener; if (request.verify()) { - registrationListener = new VerifyingRegisterRepositoryListener(request.name(), listener); + registrationListener = ActionListener.delegateFailure(listener, (l, r) -> { + if (r.isAcknowledged()) { + // The response was acknowledged - all nodes should know about the new repository, let's verify them + verifyRepository(request.name(), ActionListener.delegateFailure(l, (lis, resp) -> lis.onResponse(r))); + } else { + l.onResponse(r); + } + }); } else { registrationListener = listener; } @@ -229,27 +236,18 @@ public void verifyRepository(final String repositoryName, final ActionListener>() { - @Override - public void onResponse(List verifyResponse) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { - repository.endVerification(verificationToken); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage( - "[{}] failed to finish repository verification", repositoryName), e); - listener.onFailure(e); - return; - } - listener.onResponse(verifyResponse); - }); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + verifyAction.verify(repositoryName, verificationToken, ActionListener.delegateFailure(listener, + (l, r) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + repository.endVerification(verificationToken); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage( + "[{}] failed to finish repository verification", repositoryName), e); + l.onFailure(e); + return; + } + l.onResponse(r); + }))); } catch (Exception e) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { @@ -424,41 +422,4 @@ private void ensureRepositoryNotInUse(ClusterState clusterState, String reposito throw new IllegalStateException("trying to modify or unregister repository that is currently used "); } } - - private class VerifyingRegisterRepositoryListener implements ActionListener { - - private final String name; - - private final ActionListener listener; - - VerifyingRegisterRepositoryListener(String name, final ActionListener listener) { - this.name = name; - this.listener = listener; - } - - @Override - public void onResponse(final ClusterStateUpdateResponse clusterStateUpdateResponse) { - if (clusterStateUpdateResponse.isAcknowledged()) { - // The response was acknowledged - all nodes should know about the new repository, let's verify them - verifyRepository(name, new ActionListener>() { - @Override - public void onResponse(List verifyResponse) { - listener.onResponse(clusterStateUpdateResponse); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } else { - listener.onResponse(clusterStateUpdateResponse); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - } } From 1f0a804438209136bd617365883acc143e57fbae Mon Sep 17 00:00:00 2001 From: Armin Date: Thu, 4 Apr 2019 21:41:12 +0200 Subject: [PATCH 3/3] CR: fix naming --- .../tasks/get/TransportGetTaskAction.java | 6 +-- .../TransportDeleteRepositoryAction.java | 4 +- .../put/TransportPutRepositoryAction.java | 4 +- .../TransportVerifyRepositoryAction.java | 3 +- .../TransportRestoreSnapshotAction.java | 15 +++--- .../close/TransportCloseIndexAction.java | 4 +- .../indices/shrink/TransportResizeAction.java | 6 +-- .../upgrade/post/TransportUpgradeAction.java | 4 +- .../action/bulk/TransportBulkAction.java | 11 +++-- .../master/TransportMasterNodeAction.java | 4 +- .../index/query/TermsQueryBuilder.java | 8 +-- .../index/seqno/RetentionLeaseActions.java | 6 +-- .../elasticsearch/index/shard/IndexShard.java | 45 ++++++++--------- .../CompletionPersistentTaskAction.java | 3 +- .../RemovePersistentTaskAction.java | 3 +- .../persistent/StartPersistentTaskAction.java | 3 +- .../UpdatePersistentTaskStatusAction.java | 3 +- .../repositories/RepositoriesService.java | 15 +++--- .../TransportClientNodesServiceTests.java | 9 ++-- .../RemoteClusterConnectionTests.java | 49 ++++++++++--------- .../ESIndexLevelReplicationTestCase.java | 13 ++--- .../test/transport/StubbableTransport.java | 3 +- .../ccr/action/TransportPutFollowAction.java | 13 ++--- 23 files changed, 125 insertions(+), 109 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 021a5edcd0482..fe07a4efe930e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -180,15 +180,15 @@ public void onFailure(Exception e) { */ void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask, ActionListener listener) { - getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (l, e) -> { + getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (delegatedListener, e) -> { /* * We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If * the error isn't a 404 then we'll just throw it back to the user. */ if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) { - l.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask))); + delegatedListener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask))); } else { - l.onFailure(e); + delegatedListener.onFailure(e); } })); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java index 80dfcd4f66dc3..48b9bdaa4511b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java @@ -68,6 +68,8 @@ protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, Clus protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state, final ActionListener listener) { repositoriesService.unregisterRepository( - request, ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new AcknowledgedResponse(r.isAcknowledged())))); + request, ActionListener.delegateFailure(listener, + (delegatedListener, unregisterRepositoryResponse) -> + delegatedListener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged())))); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java index 8d32447161c75..a10e5c878a7c9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java @@ -67,7 +67,7 @@ protected ClusterBlockException checkBlock(PutRepositoryRequest request, Cluster @Override protected void masterOperation(final PutRepositoryRequest request, ClusterState state, final ActionListener listener) { - repositoriesService.registerRepository( - request, ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new AcknowledgedResponse(r.isAcknowledged())))); + repositoriesService.registerRepository(request, ActionListener.delegateFailure(listener, + (delegatedListener, response) -> delegatedListener.onResponse(new AcknowledgedResponse(response.isAcknowledged())))); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java index 4b8064f98aecc..d4ec1d3a8bcb4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java @@ -69,6 +69,7 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state, final ActionListener listener) { repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener, - (l, r) -> l.onResponse(new VerifyRepositoryResponse(r.toArray(new DiscoveryNode[0]))))); + (delegatedListener, verifyResponse) -> + delegatedListener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))))); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index fa5899964fd95..62b59f272c1b7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -72,12 +72,13 @@ protected ClusterBlockException checkBlock(RestoreSnapshotRequest request, Clust @Override protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state, final ActionListener listener) { - restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, (l, r) -> { - if (r.getRestoreInfo() == null && request.waitForCompletion()) { - RestoreClusterStateListener.createAndRegisterListener(clusterService, r, l); - } else { - l.onResponse(new RestoreSnapshotResponse(r.getRestoreInfo())); - } - })); + restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, + (delegatedListener, restoreCompletionResponse) -> { + if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { + RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener); + } else { + delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); + } + })); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index c5446a8d3300a..a6f4b6f3d0c4a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -118,9 +118,9 @@ protected void masterOperation(final Task task, .masterNodeTimeout(request.masterNodeTimeout()) .waitForActiveShards(request.waitForActiveShards()) .indices(concreteIndices); - indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (l, t) -> { + indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> { logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t); - listener.onFailure(t); + delegatedListener.onFailure(t); })); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java index a1960535dc1b3..ff4e643a08227 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java @@ -97,14 +97,14 @@ protected void masterOperation(final ResizeRequest resizeRequest, final ClusterS final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex()); final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index()); client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute( - ActionListener.delegateFailure(listener, (l, r) -> { + ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> { CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> { - IndexShardStats shard = r.getIndex(sourceIndex).getIndexShards().get(i); + IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); return shard == null ? null : shard.getPrimary().getDocs(); }, sourceIndex, targetIndex); createIndexService.createIndex( - updateRequest, ActionListener.map(l, + updateRequest, ActionListener.map(delegatedListener, response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index())) ); })); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java index a838395a30a4a..f2d046f3321b2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java @@ -208,7 +208,7 @@ public void onFailure(Exception e) { private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener listener) { UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions()); - client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, - ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(upgradeResponse))); + client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, ActionListener.delegateFailure( + listener, (delegatedListener, updateSettingsResponse) -> delegatedListener.onResponse(upgradeResponse))); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index b8230be031ab2..0249aff74363b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -658,13 +658,14 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, return ActionListener.map(actionListener, response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis)); } else { - return ActionListener.delegateFailure(actionListener, (l, r) -> { - BulkItemResponse[] items = r.getItems(); + return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> { + BulkItemResponse[] items = response.getItems(); for (int i = 0; i < items.length; i++) { - itemResponses.add(originalSlots[i], r.getItems()[i]); + itemResponses.add(originalSlots[i], response.getItems()[i]); } - l.onResponse( - new BulkResponse(itemResponses.toArray(new BulkItemResponse[0]), r.getTook().getMillis(), ingestTookInMillis)); + delegatedListener.onResponse( + new BulkResponse( + itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis)); }); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 940a4f5c2c0ea..595b72f8da803 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -185,13 +185,13 @@ protected void doStart(ClusterState clusterState) { }); } } else { - ActionListener delegate = ActionListener.delegateResponse(listener, (l, t) -> { + ActionListener delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> { if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) { logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " + "stepped down before publishing action [{}], scheduling a retry", actionName), t); retry(t, masterChangePredicate); } else { - l.onFailure(t); + delegatedListener.onFailure(t); } }); threadPool.executor(executor).execute(new ActionRunnable(delegate) { diff --git a/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java index e0007c0911cd1..2f4f934727dc7 100644 --- a/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java @@ -463,13 +463,13 @@ private void fetch(TermsLookup termsLookup, Client client, ActionListener { + client.get(getRequest, ActionListener.delegateFailure(actionListener, (delegatedListener, getResponse) -> { List terms = new ArrayList<>(); - if (r.isSourceEmpty() == false) { // extract terms only if the doc source exists - List extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), r.getSourceAsMap()); + if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists + List extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap()); terms.addAll(extractedValues); } - l.onResponse(terms); + delegatedListener.onResponse(terms); })); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index 6fac01cc0a01f..c503f1fa16377 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -102,9 +102,9 @@ protected void asyncShardOperation(T request, ShardId shardId, final ActionListe final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.id()); indexShard.acquirePrimaryOperationPermit( - ActionListener.delegateFailure(listener, (l, r) -> { - try (Releasable ignore = r) { - doRetentionLeaseAction(indexShard, request, l); + ActionListener.delegateFailure(listener, (delegatedListener, releasable) -> { + try (Releasable ignore = releasable) { + doRetentionLeaseAction(indexShard, request, delegatedListener); } }), ThreadPool.Names.SAME, diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index df729e1b0e2b2..90f7c662b91c5 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2688,29 +2688,30 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, // primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed // in the order submitted, combining both operations ensure that the term is updated before the operation is // executed. It also has the side effect of acquiring all the permits one time instead of two. - final ActionListener operationListener = ActionListener.delegateFailure(onPermitAcquired, (l, r) -> { - if (opPrimaryTerm < getOperationPrimaryTerm()) { - r.close(); - final String message = String.format( - Locale.ROOT, - "%s operation primary term [%d] is too old (current [%d])", - shardId, - opPrimaryTerm, - getOperationPrimaryTerm()); - l.onFailure(new IllegalStateException(message)); - } else { - assert assertReplicationTarget(); - try { - updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); - advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); - } catch (Exception e) { - r.close(); - l.onFailure(e); - return; + final ActionListener operationListener = ActionListener.delegateFailure(onPermitAcquired, + (delegatedListener, releasable) -> { + if (opPrimaryTerm < getOperationPrimaryTerm()) { + releasable.close(); + final String message = String.format( + Locale.ROOT, + "%s operation primary term [%d] is too old (current [%d])", + shardId, + opPrimaryTerm, + getOperationPrimaryTerm()); + delegatedListener.onFailure(new IllegalStateException(message)); + } else { + assert assertReplicationTarget(); + try { + updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); + advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); + } catch (Exception e) { + releasable.close(); + delegatedListener.onFailure(e); + return; + } + delegatedListener.onResponse(releasable); } - l.onResponse(r); - } - }); + }); if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) { synchronized (mutex) { diff --git a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java index fa6b59b75f9c4..b09c33b59d2f5 100644 --- a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java @@ -164,7 +164,8 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception, - ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r)))); + ActionListener.delegateFailure(listener, + (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task)))); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java index 18edf652fff34..877033fe4f33a 100644 --- a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java @@ -149,7 +149,8 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { persistentTasksClusterService.removePersistentTask( - request.taskId, ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r)))); + request.taskId, ActionListener.delegateFailure(listener, + (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task)))); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index e87a89d07c001..977239ee2b990 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -216,7 +216,8 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params, - ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r)))); + ActionListener.delegateFailure(listener, + (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task)))); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java index 67b6277c8ae41..218154d37c9b7 100644 --- a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java @@ -180,7 +180,8 @@ protected final void masterOperation(final Request request, final ClusterState state, final ActionListener listener) { persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state, - ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r)))); + ActionListener.delegateFailure(listener, + (delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task)))); } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index f9c79c30ce7b7..e141d0d6014e7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -97,12 +97,13 @@ public void registerRepository(final PutRepositoryRequest request, final ActionL final ActionListener registrationListener; if (request.verify()) { - registrationListener = ActionListener.delegateFailure(listener, (l, r) -> { - if (r.isAcknowledged()) { + registrationListener = ActionListener.delegateFailure(listener, (delegatedListener, clusterStateUpdateResponse) -> { + if (clusterStateUpdateResponse.isAcknowledged()) { // The response was acknowledged - all nodes should know about the new repository, let's verify them - verifyRepository(request.name(), ActionListener.delegateFailure(l, (lis, resp) -> lis.onResponse(r))); + verifyRepository(request.name(), ActionListener.delegateFailure(delegatedListener, + (innerDelegatedListener, discoveryNodes) -> innerDelegatedListener.onResponse(clusterStateUpdateResponse))); } else { - l.onResponse(r); + delegatedListener.onResponse(clusterStateUpdateResponse); } }); } else { @@ -237,16 +238,16 @@ public void verifyRepository(final String repositoryName, final ActionListener threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + (delegatedListener, verifyResponse) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { repository.endVerification(verificationToken); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage( "[{}] failed to finish repository verification", repositoryName), e); - l.onFailure(e); + delegatedListener.onFailure(e); return; } - l.onResponse(r); + delegatedListener.onResponse(verifyResponse); }))); } catch (Exception e) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index d5b31314e13c3..bdcaf80ee19e9 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -363,10 +363,11 @@ public void testSniffNodesSamplerClosesConnections() throws Exception { final List establishedConnections = new CopyOnWriteArrayList<>(); clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile, listener) -> - transport.openConnection(discoveryNode, profile, ActionListener.delegateFailure(listener, (l, r) -> { - establishedConnections.add(r); - listener.onResponse(r); - }))); + transport.openConnection(discoveryNode, profile, + ActionListener.delegateFailure(listener, (delegatedListener, connection) -> { + establishedConnections.add(connection); + delegatedListener.onResponse(connection); + }))); clientService.start(); clientService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index c355dd7707a88..5f6d31dce3e97 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -1263,34 +1263,35 @@ public static Transport getProxyTransport(ThreadPool threadPool, Map l.onResponse( - new Transport.Connection() { - @Override - public DiscoveryNode getNode() { - return node; - } + return t.openConnection(proxyNode, profile, ActionListener.delegateFailure(listener, + (delegatedListener, connection) -> delegatedListener.onResponse( + new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return node; + } - @Override - public void sendRequest(long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException { - connection.sendRequest(requestId, action, request, options); - } + @Override + public void sendRequest(long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { + connection.sendRequest(requestId, action, request, options); + } - @Override - public void addCloseListener(ActionListener listener) { - connection.addCloseListener(listener); - } + @Override + public void addCloseListener(ActionListener listener) { + connection.addCloseListener(listener); + } - @Override - public boolean isClosed() { - return connection.isClosed(); - } + @Override + public boolean isClosed() { + return connection.isClosed(); + } - @Override - public void close() { - connection.close(); - } - }))); + @Override + public void close() { + connection.close(); + } + }))); }); return stubbableTransport; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 8b566019926c9..48cc384176e3b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -611,7 +611,8 @@ protected ReplicationAction(Request request, ActionListener listener, public void execute() { try { new ReplicationOperation<>(request, new PrimaryRef(), - ActionListener.delegateFailure(listener, (l, r) -> r.respond(l)), new ReplicasRef(), logger, opType).execute(); + ActionListener.delegateFailure(listener, + (delegatedListener, result) -> result.respond(delegatedListener)), new ReplicasRef(), logger, opType).execute(); } catch (Exception e) { listener.onFailure(e); } @@ -688,14 +689,14 @@ public void performOn( getPrimaryShard().getPendingPrimaryTerm(), globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, - ActionListener.delegateFailure(listener, (l, r) -> { + ActionListener.delegateFailure(listener, (delegatedListener, releasable) -> { try { performOnReplica(request, replica); - r.close(); - l.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint())); + releasable.close(); + delegatedListener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint())); } catch (final Exception e) { - Releasables.closeWhileHandlingException(r); - l.onFailure(e); + Releasables.closeWhileHandlingException(releasable); + delegatedListener.onFailure(e); } }), ThreadPool.Names.WRITE, request); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 1e5fe5f26de99..4ccc352158a73 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -130,7 +130,8 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, OpenConnectionBehavior behavior = connectBehaviors.getOrDefault(address, defaultConnectBehavior); ActionListener wrappedListener = - ActionListener.delegateFailure(listener, (l, connection) -> l.onResponse(new WrappedConnection(connection))); + ActionListener.delegateFailure(listener, + (delegatedListener, connection) -> delegatedListener.onResponse(new WrappedConnection(connection))); if (behavior == null) { return delegate.openConnection(node, profile, wrappedListener); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 8e79cdf627d02..2a95c2a3c7aab 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -148,7 +148,8 @@ public void onFailure(Exception e) { @Override protected void doRun() { restoreService.restoreSnapshot(restoreRequest, - ActionListener.delegateFailure(listener, (l, r) -> afterRestoreStarted(clientWithHeaders, request, l, r))); + ActionListener.delegateFailure(listener, + (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response))); } }); } @@ -176,17 +177,17 @@ public void onFailure(Exception e) { } RestoreClusterStateListener.createAndRegisterListener(clusterService, response, - ActionListener.delegateFailure(listener, (l, r) -> { - RestoreInfo restoreInfo = r.getRestoreInfo(); + ActionListener.delegateFailure(listener, (delegatedListener, restoreSnapshotResponse) -> { + RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo(); if (restoreInfo == null) { // If restoreInfo is null then it is possible there was a master failure during the // restore. - l.onResponse(new PutFollowAction.Response(true, false, false)); + delegatedListener.onResponse(new PutFollowAction.Response(true, false, false)); } else if (restoreInfo.failedShards() == 0) { - initiateFollowing(clientWithHeaders, request, l); + initiateFollowing(clientWithHeaders, request, delegatedListener); } else { assert restoreInfo.failedShards() > 0 : "Should have failed shards"; - l.onResponse(new PutFollowAction.Response(true, false, false)); + delegatedListener.onResponse(new PutFollowAction.Response(true, false, false)); } })); }