diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index 5fca8abcc925c..8637de272757a 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -105,10 +105,11 @@ public RestResponse buildResponse(final Table table) throws Exception { } }); - sendGetSettingsRequest(indices, indicesOptions, local, masterNodeTimeout, client, new ActionListener() { + sendGetSettingsRequest(indices, indicesOptions, local, masterNodeTimeout, client, + new ActionListener.Delegating(listener) { @Override public void onResponse(final GetSettingsResponse getSettingsResponse) { - final GroupedActionListener groupedListener = createGroupedListener(request, 4, listener); + final GroupedActionListener groupedListener = createGroupedListener(request, 4, delegate); groupedListener.onResponse(getSettingsResponse); // The list of indices that will be returned is determined by the indices returned from the Get Settings call. @@ -132,11 +133,6 @@ public void onResponse(final GetSettingsResponse getSettingsResponse) { sendClusterHealthRequest(indices, subRequestIndicesOptions, local, masterNodeTimeout, client, ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)); } - - @Override - public void onFailure(final Exception e) { - listener.onFailure(e); - } }); }; } @@ -214,7 +210,7 @@ private void sendIndicesStatsRequest(final String[] indices, private GroupedActionListener createGroupedListener(final RestRequest request, final int size, final ActionListener listener) { - return new GroupedActionListener<>(new ActionListener>() { + return new GroupedActionListener<>(new ActionListener.Delegating, Table>(listener) { @Override public void onResponse(final Collection responses) { try { @@ -234,16 +230,11 @@ public void onResponse(final Collection responses) { Map indicesStats = statsResponse.getIndices(); Table responseTable = buildTable(request, indicesSettings, indicesHealths, indicesStats, indicesStates); - listener.onResponse(responseTable); + delegate.onResponse(responseTable); } catch (Exception e) { onFailure(e); } } - - @Override - public void onFailure(final Exception e) { - listener.onFailure(e); - } }, size); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f6bda9457d3b9..c6157695ba945 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -338,18 +338,10 @@ protected void doClose() { public void executeDfsPhase(ShardSearchRequest request, boolean keepStatesInContext, SearchShardTask task, ActionListener listener) { final IndexShard shard = getShard(request); - rewriteAndFetchShardRequest(shard, request, new ActionListener() { - @Override - public void onResponse(ShardSearchRequest rewritten) { - // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), listener); - } - - @Override - public void onFailure(Exception exc) { - listener.onFailure(exc); - } - }); + rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> { + // fork the execution in the search thread pool + runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), l); + })); } private DfsSearchResult executeDfsPhase(ShardSearchRequest request, @@ -385,34 +377,26 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1 : "empty responses require more than one shard"; final IndexShard shard = getShard(request); - rewriteAndFetchShardRequest(shard, request, new ActionListener() { - @Override - public void onResponse(ShardSearchRequest orig) { - // check if we can shortcut the query phase entirely. - if (orig.canReturnNullResponseIfMatchNoDocs()) { - assert orig.scroll() == null; - final CanMatchResponse canMatchResp; - try { - ShardSearchRequest clone = new ShardSearchRequest(orig); - canMatchResp = canMatch(clone, false); - } catch (Exception exc) { - listener.onFailure(exc); - return; - } - if (canMatchResp.canMatch == false) { - listener.onResponse(QuerySearchResult.nullInstance()); - return; - } + rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, orig) -> { + // check if we can shortcut the query phase entirely. + if (orig.canReturnNullResponseIfMatchNoDocs()) { + assert orig.scroll() == null; + final CanMatchResponse canMatchResp; + try { + ShardSearchRequest clone = new ShardSearchRequest(orig); + canMatchResp = canMatch(clone, false); + } catch (Exception exc) { + l.onFailure(exc); + return; + } + if (canMatchResp.canMatch == false) { + l.onResponse(QuerySearchResult.nullInstance()); + return; } - // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener); - } - - @Override - public void onFailure(Exception exc) { - listener.onFailure(exc); } - }); + // fork the execution in the search thread pool + runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), l); + })); } private IndexShard getShard(ShardSearchRequest request) { diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index ff206e035d686..76f1381904416 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -209,19 +209,11 @@ public void onFailure(Exception e) { DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT.minimumCompatibilityVersion()); - connectionManager.connectToNode(node, null, clusterNameValidator, new ActionListener() { - @Override - public void onResponse(Void v) { - compositeListener.onResponse(v); - } - - @Override - public void onFailure(Exception e) { - logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]", + connectionManager.connectToNode(node, null, clusterNameValidator, compositeListener.delegateResponse((l, e) -> { + logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]", clusterAlias, resolved), e); - compositeListener.onFailure(e); - } - }); + l.onFailure(e); + })); } } else { int openConnections = connectionManager.size(); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java index db02b57b24aa5..a4905fe728ae4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java @@ -102,7 +102,7 @@ protected void shardOperation(final ForgetFollowerAction.Request request, final final IndexShard indexShard = indicesService.indexServiceSafe(leaderIndex).getShard(shardRouting.shardId().id()); - indexShard.acquirePrimaryOperationPermit(new ActionListener() { + indexShard.acquirePrimaryOperationPermit(new ActionListener.Delegating(listener) { @Override public void onResponse(Releasable releasable) { try { @@ -110,25 +110,20 @@ public void onResponse(Releasable releasable) { @Override public void onResponse(ReplicationResponse replicationResponse) { releasable.close(); - listener.onResponse(EmptyResult.INSTANCE); + delegate.onResponse(EmptyResult.INSTANCE); } @Override public void onFailure(Exception e) { releasable.close(); - listener.onFailure(e); + delegate.onFailure(e); } }); } catch (Exception e) { releasable.close(); - listener.onFailure(e); + onFailure(e); } } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } }, ThreadPool.Names.SAME, request); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java index 6987d7409fe53..5657d9362117a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java @@ -46,35 +46,16 @@ public TransportXPackUsageAction(ThreadPool threadPool, TransportService transpo @Override protected void masterOperation(XPackUsageRequest request, ClusterState state, ActionListener listener) { - final ActionListener> usageActionListener = new ActionListener>() { - @Override - public void onResponse(List usages) { - listener.onResponse(new XPackUsageResponse(usages)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }; + final ActionListener> usageActionListener = + listener.delegateFailure((l, usages) -> l.onResponse(new XPackUsageResponse(usages))); final AtomicReferenceArray featureSetUsages = new AtomicReferenceArray<>(featureSets.size()); final AtomicInteger position = new AtomicInteger(0); final BiConsumer>> consumer = (featureSet, iteratingListener) -> { assert Transports.assertNotTransportThread("calculating usage can be more expensive than we allow on transport threads"); - featureSet.usage(new ActionListener() { - @Override - public void onResponse(Usage usage) { - featureSetUsages.set(position.getAndIncrement(), usage); - // the value sent back doesn't matter since our predicate keeps iterating - ActionRunnable invokeListener = ActionRunnable.supply(iteratingListener, Collections::emptyList); - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(invokeListener); - } - - @Override - public void onFailure(Exception e) { - iteratingListener.onFailure(e); - } - }); + featureSet.usage(iteratingListener.delegateFailure((l, usage) -> { + featureSetUsages.set(position.getAndIncrement(), usage); + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(ActionRunnable.supply(iteratingListener, Collections::emptyList)); + })); }; IteratingActionListener, XPackFeatureSet> iteratingActionListener = new IteratingActionListener<>(usageActionListener, consumer, featureSets, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java index b3f8a1703015f..ee612cf74d9be 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java @@ -126,17 +126,8 @@ private void getSearchResponseFromTask(AsyncExecutionId searchId, if (expirationTimeMillis != -1) { task.setExpirationTime(expirationTimeMillis); } - addCompletionListener.apply(task, new ActionListener() { - @Override - public void onResponse(Response response) { - sendFinalResponse(request, response, nowInMillis, listener); - } - - @Override - public void onFailure(Exception exc) { - listener.onFailure(exc); - } - }, request.getWaitForCompletionTimeout()); + addCompletionListener.apply(task, listener.delegateFailure((l, response) -> + sendFinalResponse(request, response, nowInMillis, l)), request.getWaitForCompletionTimeout()); } catch (Exception exc) { listener.onFailure(exc); } @@ -146,18 +137,7 @@ private void getSearchResponseFromIndex(AsyncExecutionId searchId, GetAsyncResultRequest request, long nowInMillis, ActionListener listener) { - store.getResponse(searchId, true, - new ActionListener() { - @Override - public void onResponse(Response response) { - sendFinalResponse(request, response, nowInMillis, listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + store.getResponse(searchId, true, listener.delegateFailure((l, response) -> sendFinalResponse(request, response, nowInMillis, l))); } private void sendFinalResponse(GetAsyncResultRequest request, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 2040a590c536d..10076bbad2f30 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -371,18 +371,8 @@ public void retrieveStatu SR response = statusProducerFromTask.apply(asyncTask); sendFinalStatusResponse(request, response, listener); } else { // get status response from index - getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex, - new ActionListener() { - @Override - public void onResponse(SR searchStatusResponse) { - sendFinalStatusResponse(request, searchStatusResponse, listener); - } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - } - ); + getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex, listener.delegateFailure( + (l, searchStatusResponse) -> sendFinalStatusResponse(request, searchStatusResponse, l))); } } catch (Exception exc) { listener.onFailure(exc); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 770e11ce4d5ea..b43e0b3b66e59 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -22,17 +22,13 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; -import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; @@ -40,7 +36,6 @@ import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; @@ -121,23 +116,15 @@ public void run() { logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices); GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices); // This call does not set the origin to ensure that the user executing the policy has permission to access the source index - client.admin().indices().getIndex(getIndexRequest, new ActionListener() { - @Override - public void onResponse(GetIndexResponse getIndexResponse) { - try { - validateMappings(getIndexResponse); - } catch (Exception e) { - listener.onFailure(e); - return; - } - prepareAndCreateEnrichIndex(); + client.admin().indices().getIndex(getIndexRequest, listener.delegateFailure((l, getIndexResponse) -> { + try { + validateMappings(getIndexResponse); + } catch (Exception e) { + l.onFailure(e); + return; } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + prepareAndCreateEnrichIndex(); + })); } private Map getMappings(final GetIndexResponse getIndexResponse, final String sourceIndexName) { @@ -312,33 +299,21 @@ private void prepareAndCreateEnrichIndex() { CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings); createEnrichIndexRequest.mapping(MapperService.SINGLE_MAPPING_NAME, resolveEnrichMapping(policy)); logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName); - enrichOriginClient().admin().indices().create(createEnrichIndexRequest, new ActionListener() { - @Override - public void onResponse(CreateIndexResponse createIndexResponse) { - prepareReindexOperation(enrichIndexName); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + enrichOriginClient().admin() + .indices() + .create( + createEnrichIndexRequest, + listener.delegateFailure((l, createIndexResponse) -> prepareReindexOperation(enrichIndexName)) + ); } private void prepareReindexOperation(final String destinationIndexName) { // Check to make sure that the enrich pipeline exists, and create it if it is missing. if (EnrichPolicyReindexPipeline.exists(clusterService.state()) == false) { - EnrichPolicyReindexPipeline.create(enrichOriginClient(), new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - transferDataToEnrichIndex(destinationIndexName); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + EnrichPolicyReindexPipeline.create( + enrichOriginClient(), + listener.delegateFailure((l, r) -> transferDataToEnrichIndex(destinationIndexName)) + ); } else { transferDataToEnrichIndex(destinationIndexName); } @@ -363,67 +338,66 @@ private void transferDataToEnrichIndex(final String destinationIndexName) { reindexRequest.getDestination().routing("discard"); reindexRequest.getDestination().setPipeline(EnrichPolicyReindexPipeline.pipelineName()); - client.execute(EnrichReindexAction.INSTANCE, reindexRequest, new ActionListener() { - @Override - public void onResponse(BulkByScrollResponse bulkByScrollResponse) { - // Do we want to fail the request if there were failures during the reindex process? - if (bulkByScrollResponse.getBulkFailures().size() > 0) { - logger.warn( - "Policy [{}]: encountered [{}] bulk failures. Turn on DEBUG logging for details.", - policyName, - bulkByScrollResponse.getBulkFailures().size() - ); - if (logger.isDebugEnabled()) { - for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) { - logger.debug( - new ParameterizedMessage( - "Policy [{}]: bulk index failed for index [{}], id [{}]", - policyName, - failure.getIndex(), - failure.getId() - ), - failure.getCause() - ); + client.execute( + EnrichReindexAction.INSTANCE, + reindexRequest, + new ActionListener.Delegating(listener) { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + // Do we want to fail the request if there were failures during the reindex process? + if (bulkByScrollResponse.getBulkFailures().size() > 0) { + logger.warn( + "Policy [{}]: encountered [{}] bulk failures. Turn on DEBUG logging for details.", + policyName, + bulkByScrollResponse.getBulkFailures().size() + ); + if (logger.isDebugEnabled()) { + for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) { + logger.debug( + new ParameterizedMessage( + "Policy [{}]: bulk index failed for index [{}], id [{}]", + policyName, + failure.getIndex(), + failure.getId() + ), + failure.getCause() + ); + } } - } - listener.onFailure(new ElasticsearchException("Encountered bulk failures during reindex process")); - } else if (bulkByScrollResponse.getSearchFailures().size() > 0) { - logger.warn( - "Policy [{}]: encountered [{}] search failures. Turn on DEBUG logging for details.", - policyName, - bulkByScrollResponse.getSearchFailures().size() - ); - if (logger.isDebugEnabled()) { - for (ScrollableHitSource.SearchFailure failure : bulkByScrollResponse.getSearchFailures()) { - logger.debug( - new ParameterizedMessage( - "Policy [{}]: search failed for index [{}], shard [{}] on node [{}]", - policyName, - failure.getIndex(), - failure.getShardId(), - failure.getNodeId() - ), - failure.getReason() - ); + delegate.onFailure(new ElasticsearchException("Encountered bulk failures during reindex process")); + } else if (bulkByScrollResponse.getSearchFailures().size() > 0) { + logger.warn( + "Policy [{}]: encountered [{}] search failures. Turn on DEBUG logging for details.", + policyName, + bulkByScrollResponse.getSearchFailures().size() + ); + if (logger.isDebugEnabled()) { + for (ScrollableHitSource.SearchFailure failure : bulkByScrollResponse.getSearchFailures()) { + logger.debug( + new ParameterizedMessage( + "Policy [{}]: search failed for index [{}], shard [{}] on node [{}]", + policyName, + failure.getIndex(), + failure.getShardId(), + failure.getNodeId() + ), + failure.getReason() + ); + } } + delegate.onFailure(new ElasticsearchException("Encountered search failures during reindex process")); + } else { + logger.info( + "Policy [{}]: Transferred [{}] documents to enrich index [{}]", + policyName, + bulkByScrollResponse.getCreated(), + destinationIndexName + ); + forceMergeEnrichIndex(destinationIndexName, 1); } - listener.onFailure(new ElasticsearchException("Encountered search failures during reindex process")); - } else { - logger.info( - "Policy [{}]: Transferred [{}] documents to enrich index [{}]", - policyName, - bulkByScrollResponse.getCreated(), - destinationIndexName - ); - forceMergeEnrichIndex(destinationIndexName, 1); } } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ); } private void forceMergeEnrichIndex(final String destinationIndexName, final int attempt) { @@ -436,116 +410,86 @@ private void forceMergeEnrichIndex(final String destinationIndexName, final int ); enrichOriginClient().admin() .indices() - .forceMerge(new ForceMergeRequest(destinationIndexName).maxNumSegments(1), new ActionListener() { - @Override - public void onResponse(ForceMergeResponse forceMergeResponse) { - refreshEnrichIndex(destinationIndexName, attempt); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + .forceMerge( + new ForceMergeRequest(destinationIndexName).maxNumSegments(1), + listener.delegateFailure((l, r) -> refreshEnrichIndex(destinationIndexName, attempt)) + ); } private void refreshEnrichIndex(final String destinationIndexName, final int attempt) { logger.debug("Policy [{}]: Refreshing enrich index [{}]", policyName, destinationIndexName); - enrichOriginClient().admin().indices().refresh(new RefreshRequest(destinationIndexName), new ActionListener() { - @Override - public void onResponse(RefreshResponse refreshResponse) { - ensureSingleSegment(destinationIndexName, attempt); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + enrichOriginClient().admin() + .indices() + .refresh( + new RefreshRequest(destinationIndexName), + listener.delegateFailure((l, r) -> ensureSingleSegment(destinationIndexName, attempt)) + ); } protected void ensureSingleSegment(final String destinationIndexName, final int attempt) { enrichOriginClient().admin() .indices() - .segments(new IndicesSegmentsRequest(destinationIndexName), new ActionListener() { - @Override - public void onResponse(IndicesSegmentResponse indicesSegmentResponse) { - IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName); - if (indexSegments == null) { - throw new ElasticsearchException( - "Could not locate segment information for newly created index [{}]", - destinationIndexName - ); - } - Map indexShards = indexSegments.getShards(); - assert indexShards.size() == 1 : "Expected enrich index to contain only one shard"; - ShardSegments[] shardSegments = indexShards.get(0).getShards(); - assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point"; - ShardSegments primarySegments = shardSegments[0]; - if (primarySegments.getSegments().size() > 1) { - int nextAttempt = attempt + 1; - if (nextAttempt > maxForceMergeAttempts) { - listener.onFailure( - new ElasticsearchException( - "Force merging index [{}] attempted [{}] times but did not result in one segment.", - destinationIndexName, - attempt, - maxForceMergeAttempts - ) + .segments( + new IndicesSegmentsRequest(destinationIndexName), + new ActionListener.Delegating(listener) { + @Override + public void onResponse(IndicesSegmentResponse indicesSegmentResponse) { + IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName); + if (indexSegments == null) { + throw new ElasticsearchException( + "Could not locate segment information for newly created index [{}]", + destinationIndexName ); + } + Map indexShards = indexSegments.getShards(); + assert indexShards.size() == 1 : "Expected enrich index to contain only one shard"; + ShardSegments[] shardSegments = indexShards.get(0).getShards(); + assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point"; + ShardSegments primarySegments = shardSegments[0]; + if (primarySegments.getSegments().size() > 1) { + int nextAttempt = attempt + 1; + if (nextAttempt > maxForceMergeAttempts) { + delegate.onFailure( + new ElasticsearchException( + "Force merging index [{}] attempted [{}] times but did not result in one segment.", + destinationIndexName, + attempt, + maxForceMergeAttempts + ) + ); + } else { + logger.debug( + "Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})", + policyName, + primarySegments.getSegments().size(), + nextAttempt, + maxForceMergeAttempts + ); + forceMergeEnrichIndex(destinationIndexName, nextAttempt); + } } else { - logger.debug( - "Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})", - policyName, - primarySegments.getSegments().size(), - nextAttempt, - maxForceMergeAttempts - ); - forceMergeEnrichIndex(destinationIndexName, nextAttempt); + // Force merge down to one segment successful + setIndexReadOnly(destinationIndexName); } - } else { - // Force merge down to one segment successful - setIndexReadOnly(destinationIndexName); } } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ); } private void setIndexReadOnly(final String destinationIndexName) { logger.debug("Policy [{}]: Setting new enrich index [{}] to be read only", policyName, destinationIndexName); UpdateSettingsRequest request = new UpdateSettingsRequest(destinationIndexName).setPreserveExisting(true) .settings(Settings.builder().put("index.auto_expand_replicas", "0-all").put("index.blocks.write", "true")); - enrichOriginClient().admin().indices().updateSettings(request, new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - waitForIndexGreen(destinationIndexName); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + enrichOriginClient().admin() + .indices() + .updateSettings(request, listener.delegateFailure((l, r) -> waitForIndexGreen(destinationIndexName))); } private void waitForIndexGreen(final String destinationIndexName) { ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus(); - enrichOriginClient().admin().cluster().health(request, new ActionListener() { - @Override - public void onResponse(ClusterHealthResponse clusterHealthResponse) { - updateEnrichPolicyAlias(destinationIndexName); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + enrichOriginClient().admin() + .cluster() + .health(request, listener.delegateFailure((l, r) -> updateEnrichPolicyAlias(destinationIndexName))); } private void updateEnrichPolicyAlias(final String destinationIndexName) { @@ -561,20 +505,12 @@ private void updateEnrichPolicyAlias(final String destinationIndexName) { aliasToggleRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove().indices(indices).alias(enrichIndexBase)); } aliasToggleRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(destinationIndexName).alias(enrichIndexBase)); - enrichOriginClient().admin().indices().aliases(aliasToggleRequest, new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - logger.info("Policy [{}]: Policy execution complete", policyName); - ExecuteEnrichPolicyStatus completeStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE); - task.setStatus(completeStatus); - listener.onResponse(completeStatus); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + enrichOriginClient().admin().indices().aliases(aliasToggleRequest, listener.delegateFailure((l, r) -> { + logger.info("Policy [{}]: Policy execution complete", policyName); + ExecuteEnrichPolicyStatus completeStatus = new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.COMPLETE); + task.setStatus(completeStatus); + l.onResponse(completeStatus); + })); } /** diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java index 9a0ec03bdc816..31cd2bbb8368e 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportExecuteEnrichPolicyAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; -import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor; import org.elasticsearch.xpack.enrich.EnrichPolicyLocks; @@ -80,17 +79,10 @@ protected void masterOperation( } if (request.isWaitForCompletion()) { - executor.runPolicy(request, new ActionListener() { - @Override - public void onResponse(ExecuteEnrichPolicyStatus executionStatus) { - listener.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + executor.runPolicy( + request, + listener.delegateFailure((l, executionStatus) -> l.onResponse(new ExecuteEnrichPolicyAction.Response(executionStatus))) + ); } else { Task executeTask = executor.runPolicy(request, LoggingTaskListener.instance()); TaskId taskId = new TaskId(clusterService.localNode().getId(), executeTask.getId()); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/ReverseListener.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/ReverseListener.java index 8774cc5aaaba2..22592565f5592 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/ReverseListener.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/ReverseListener.java @@ -11,21 +11,14 @@ import org.elasticsearch.xpack.eql.execution.payload.ReversePayload; import org.elasticsearch.xpack.eql.session.Payload; -public class ReverseListener implements ActionListener { - - private final ActionListener delegate; +public class ReverseListener extends ActionListener.Delegating { public ReverseListener(ActionListener delegate) { - this.delegate = delegate; + super(delegate); } @Override public void onResponse(Payload response) { delegate.onResponse(new ReversePayload(response)); } - - @Override - public void onFailure(Exception e) { - delegate.onFailure(e); - } } diff --git a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java index 510f0d4fa4034..3c8384c2484ba 100644 --- a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java +++ b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java @@ -313,7 +313,7 @@ synchronized void expand() { // System.out.println(source); logger.trace("executing expansion graph search request"); - client.search(searchRequest, new ActionListener() { + client.search(searchRequest, new ActionListener.Delegating(listener) { @Override public void onResponse(SearchResponse searchResponse) { // System.out.println(searchResponse); @@ -515,11 +515,6 @@ private double getExpandTotalSignalStrength(Hop lastHop, Hop currentHop, Sampler } return totalSignalOutput; } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } }); } @@ -660,7 +655,7 @@ public synchronized void start() { searchRequest.source(source); // System.out.println(source); logger.trace("executing initial graph search request"); - client.search(searchRequest, new ActionListener() { + client.search(searchRequest, new ActionListener.Delegating(listener) { @Override public void onResponse(SearchResponse searchResponse) { addShardFailures(searchResponse.getShardFailures()); @@ -718,11 +713,6 @@ private double getInitialTotalSignalStrength(Hop rootHop, Sampler sample) { } return totalSignalStrength; } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } }); } catch (Exception e) { logger.error("unable to execute the graph query", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 376b916ca640f..48379ccbb2af6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -475,16 +475,6 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo } } return true; - }, request.getCloseTimeout(), new ActionListener() { - @Override - public void onResponse(Boolean result) { - listener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + }, request.getCloseTimeout(), listener.delegateFailure((l, r) -> l.onResponse(response))); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index dc23961f3939c..d5ca01e80bb89 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -609,18 +609,7 @@ private void removePersistentTask(String jobId, ClusterState currentState, if (jobTask == null) { listener.onResponse(null); } else { - persistentTasksService.sendRemoveRequest(jobTask.getId(), - new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetadata.PersistentTask task) { - listener.onResponse(Boolean.TRUE); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + persistentTasksService.sendRemoveRequest(jobTask.getId(), listener.delegateFailure((l, task) -> l.onResponse(Boolean.TRUE))); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java index 99fc893dc7c38..bb759b520feeb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; @@ -85,24 +84,15 @@ protected void doExecute(Task task, DeleteModelSnapshotAction.Request request, // Delete the snapshot and any associated state files JobDataDeleter deleter = new JobDataDeleter(client, request.getJobId()); deleter.deleteModelSnapshots(Collections.singletonList(deleteCandidate), - new ActionListener() { - @Override - public void onResponse(BulkByScrollResponse bulkResponse) { - String msg = Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED, - deleteCandidate.getSnapshotId(), deleteCandidate.getDescription()); - - auditor.info(request.getJobId(), msg); - logger.debug(() -> new ParameterizedMessage("[{}] {}", request.getJobId(), msg)); - // We don't care about the bulk response, just that it succeeded - listener.onResponse(AcknowledgedResponse.TRUE); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + listener.delegateFailure((l, bulkResponse) -> { + String msg = Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED, + deleteCandidate.getSnapshotId(), deleteCandidate.getDescription()); + auditor.info(request.getJobId(), msg); + logger.debug(() -> new ParameterizedMessage("[{}] {}", request.getJobId(), msg)); + // We don't care about the bulk response, just that it succeeded + l.onResponse(AcknowledgedResponse.TRUE); + })); }, listener::onFailure )); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 67a59cf49addb..8c0b95e733fe2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -85,18 +85,10 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio // Fake DatafeedTimingStatsReporter that does not have access to results index new DatafeedTimingStatsReporter(new DatafeedTimingStats(datafeedConfig.getJobId()), (ts, refreshPolicy) -> { }), - new ActionListener() { - @Override - public void onResponse(DataExtractorFactory dataExtractorFactory) { - DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE); - threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + listener.delegateFailure((l, dataExtractorFactory) -> { + DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE); + threadPool.generic().execute(() -> previewDatafeed(dataExtractor, l)); + })); }); }, listener::onFailure)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index cf904a3c88604..28aa1ab8a4ccf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -171,17 +171,8 @@ private ActionListener wrapDeleteOldAnnotati eventsToDelete.add(Annotation.Event.DELAYED_DATA.toString()); // Because the model that changed is no longer in use as it has been rolled back to a time before those changes occurred eventsToDelete.add(Annotation.Event.MODEL_CHANGE.toString()); - dataDeleter.deleteAnnotationsFromTime(deleteAfter.getTime() + 1, eventsToDelete, new ActionListener() { - @Override - public void onResponse(Boolean success) { - listener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + dataDeleter.deleteAnnotationsFromTime(deleteAfter.getTime() + 1, eventsToDelete, + listener.delegateFailure((l, r) -> l.onResponse(response))); }, listener::onFailure); } @@ -198,17 +189,7 @@ private ActionListener wrapDeleteOldDataList logger.info("[{}] Removing intervening records after reverting model: deleting results after [{}]", jobId, deleteAfter); JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId); - dataDeleter.deleteResultsFromTime(deleteAfter.getTime() + 1, new ActionListener() { - @Override - public void onResponse(Boolean success) { - listener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + dataDeleter.deleteResultsFromTime(deleteAfter.getTime() + 1, listener.delegateFailure((l, r) -> l.onResponse(response))); }, listener::onFailure); } @@ -217,22 +198,10 @@ private ActionListener wrapRevertDataCountsL ModelSnapshot modelSnapshot, String jobId) { - return ActionListener.wrap(response -> { - jobResultsProvider.dataCounts(jobId, counts -> { - counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp()); - jobDataCountsPersister.persistDataCountsAsync(jobId, counts, new ActionListener() { - @Override - public void onResponse(Boolean aBoolean) { - listener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - }, listener::onFailure); - }, listener::onFailure); + return ActionListener.wrap(response -> jobResultsProvider.dataCounts(jobId, counts -> { + counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp()); + jobDataCountsPersister.persistDataCountsAsync(jobId, counts, listener.delegateFailure((l, r) -> l.onResponse(response))); + }, listener::onFailure), listener::onFailure); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index e7011e49c2576..00e309f6ef740 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -340,17 +340,7 @@ void waitForDatafeedStopped(List datafeedPersistentTaskIds, StopDatafeed } } return true; - }, request.getTimeout(), new ActionListener() { - @Override - public void onResponse(Boolean result) { - listener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + }, request.getTimeout(), listener.delegateFailure((l, result) -> l.onResponse(response))); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java index 4073381decf66..46817cf38d5b1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java @@ -139,31 +139,23 @@ public void onFailure(Exception e) { private void getFilterWithVersion(String filterId, ActionListener listener) { GetRequest getRequest = new GetRequest(MlMetaIndex.indexName(), MlFilter.documentId(filterId)); - executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener() { - @Override - public void onResponse(GetResponse getDocResponse) { - try { - if (getDocResponse.isExists()) { - BytesReference docSource = getDocResponse.getSourceAsBytesRef(); - try (InputStream stream = docSource.streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build(); - listener.onResponse(new FilterWithSeqNo(filter, getDocResponse)); - } - } else { - this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId))); + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, listener.delegateFailure((l, getDocResponse) -> { + try { + if (getDocResponse.isExists()) { + BytesReference docSource = getDocResponse.getSourceAsBytesRef(); + try (InputStream stream = docSource.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build(); + l.onResponse(new FilterWithSeqNo(filter, getDocResponse)); } - } catch (Exception e) { - this.onFailure(e); + } else { + l.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId))); } + } catch (Exception e) { + l.onFailure(e); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + })); } private static class FilterWithSeqNo { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index f399153e7f3c2..76e5ae5db2cad 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -222,21 +222,14 @@ public void findDatafeedsForJobIds(Collection jobIds, ActionListener actionListener) { DeleteRequest request = new DeleteRequest(MlConfigIndex.indexName(), DatafeedConfig.documentId(datafeedId)); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(DeleteResponse deleteResponse) { - if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - actionListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); - return; - } - assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED; - actionListener.onResponse(deleteResponse); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, actionListener.delegateFailure((l, deleteResponse) -> { + if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + l.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); + return; } - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); - } - }); + assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED; + l.onResponse(deleteResponse); + })); } /** @@ -259,14 +252,14 @@ public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map updatedConfigListener) { GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), DatafeedConfig.documentId(datafeedId)); - executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener() { + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, + new ActionListener.Delegating(updatedConfigListener) { @Override public void onResponse(GetResponse getResponse) { if (getResponse.isExists() == false) { - updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); + delegate.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); return; } - final long version = getResponse.getVersion(); final long seqNo = getResponse.getSeqNo(); final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); @@ -274,7 +267,7 @@ public void onResponse(GetResponse getResponse) { try { configBuilder = parseLenientlyFromSource(source); } catch (IOException e) { - updatedConfigListener.onFailure( + delegate.onFailure( new ElasticsearchParseException("Failed to parse datafeed config [" + datafeedId + "]", e)); return; } @@ -283,29 +276,17 @@ public void onResponse(GetResponse getResponse) { try { updatedConfig = update.apply(configBuilder.build(), headers); } catch (Exception e) { - updatedConfigListener.onFailure(e); + delegate.onFailure(e); return; } - ActionListener validatedListener = ActionListener.wrap( - ok -> { - indexUpdatedConfig(updatedConfig, seqNo, primaryTerm, ActionListener.wrap( - indexResponse -> { - assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; - updatedConfigListener.onResponse(updatedConfig); - }, - updatedConfigListener::onFailure)); - }, - updatedConfigListener::onFailure - ); - + ActionListener validatedListener = ActionListener.wrap(ok -> indexUpdatedConfig(updatedConfig, seqNo, primaryTerm, + ActionListener.wrap(indexResponse -> { + assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; + delegate.onResponse(updatedConfig); + }, delegate::onFailure)), delegate::onFailure); validator.accept(updatedConfig, validatedListener); } - - @Override - public void onFailure(Exception e) { - updatedConfigListener.onFailure(e); - } }); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java index 8848c56ed551c..808bb00b33172 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java @@ -254,7 +254,7 @@ public void getConfigsForJobsWithTasksLeniently(Set jobsWithTask, Action executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, - new ActionListener() { + new ActionListener.Delegating>(listener) { @Override public void onResponse(SearchResponse searchResponse) { SearchHit[] hits = searchResponse.getHits().getHits(); @@ -266,7 +266,7 @@ public void onResponse(SearchResponse searchResponse) { xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { configs.add(DataFrameAnalyticsConfig.LENIENT_PARSER.apply(parser, null).build()); } catch (IOException e) { - listener.onFailure(e); + delegate.onFailure(e); } } @@ -276,14 +276,8 @@ public void onResponse(SearchResponse searchResponse) { if (tasksWithoutConfigs.isEmpty() == false) { logger.warn("Data frame analytics tasks {} have no configs", tasksWithoutConfigs); } - listener.onResponse(configs); + delegate.onResponse(configs); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, - client::search); + }, client::search); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 23d85194b0f98..9b5f2c157e75b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -201,23 +201,16 @@ public void deleteJob(String jobId, boolean errorIfMissing, ActionListener() { - @Override - public void onResponse(DeleteResponse deleteResponse) { - if (errorIfMissing) { - if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - actionListener.onFailure(ExceptionsHelper.missingJobException(jobId)); - return; - } - assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED; + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, actionListener.delegateFailure((l, deleteResponse) -> { + if (errorIfMissing) { + if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + l.onFailure(ExceptionsHelper.missingJobException(jobId)); + return; } - actionListener.onResponse(deleteResponse); + assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED; } - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); - } - }); + l.onResponse(deleteResponse); + })); } /** @@ -232,15 +225,15 @@ public void onFailure(Exception e) { * are not changed. * @param updatedJobListener Updated job listener */ - public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, - ActionListener updatedJobListener) { + public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), Job.documentId(jobId)); - executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener() { + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, + new ActionListener.Delegating(updatedJobListener) { @Override public void onResponse(GetResponse getResponse) { if (getResponse.isExists() == false) { - updatedJobListener.onFailure(ExceptionsHelper.missingJobException(jobId)); + delegate.onFailure(ExceptionsHelper.missingJobException(jobId)); return; } @@ -251,8 +244,7 @@ public void onResponse(GetResponse getResponse) { try { jobBuilder = parseJobLenientlyFromSource(source); } catch (IOException e) { - updatedJobListener.onFailure( - new ElasticsearchParseException("Failed to parse job configuration [" + jobId + "]", e)); + delegate.onFailure(new ElasticsearchParseException("Failed to parse job configuration [" + jobId + "]", e)); return; } @@ -261,16 +253,11 @@ public void onResponse(GetResponse getResponse) { // Applying the update may result in a validation error updatedJob = update.mergeWithJob(jobBuilder.build(), maxModelMemoryLimit); } catch (Exception e) { - updatedJobListener.onFailure(e); + delegate.onFailure(e); return; } - indexUpdatedJob(updatedJob, seqNo, primaryTerm, updatedJobListener); - } - - @Override - public void onFailure(Exception e) { - updatedJobListener.onFailure(e); + indexUpdatedJob(updatedJob, seqNo, primaryTerm, delegate); } }); } @@ -299,11 +286,12 @@ public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValu UpdateValidator validator, ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), Job.documentId(jobId)); - executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener() { + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, + new ActionListener.Delegating(updatedJobListener) { @Override public void onResponse(GetResponse getResponse) { if (getResponse.isExists() == false) { - updatedJobListener.onFailure(ExceptionsHelper.missingJobException(jobId)); + delegate.onFailure(ExceptionsHelper.missingJobException(jobId)); return; } @@ -314,8 +302,7 @@ public void onResponse(GetResponse getResponse) { try { originalJob = parseJobLenientlyFromSource(source).build(); } catch (Exception e) { - updatedJobListener.onFailure( - new ElasticsearchParseException("Failed to parse job configuration [" + jobId + "]", e)); + delegate.onFailure(new ElasticsearchParseException("Failed to parse job configuration [" + jobId + "]", e)); return; } @@ -326,20 +313,15 @@ public void onResponse(GetResponse getResponse) { // Applying the update may result in a validation error updatedJob = update.mergeWithJob(originalJob, maxModelMemoryLimit); } catch (Exception e) { - updatedJobListener.onFailure(e); + delegate.onFailure(e); return; } - indexUpdatedJob(updatedJob, seqNo, primaryTerm, updatedJobListener); + indexUpdatedJob(updatedJob, seqNo, primaryTerm, delegate); }, - updatedJobListener::onFailure + delegate::onFailure )); } - - @Override - public void onFailure(Exception e) { - updatedJobListener.onFailure(e); - } }); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java index 3211171917b98..9f91702e4006a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.ToXContent; @@ -97,17 +96,8 @@ public void persistDataCountsAsync(String jobId, DataCounts counts, ActionListen .setRequireAlias(true) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(content); - executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(IndexResponse indexResponse) { - listener.onResponse(true); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, + listener.delegateFailure((l, r) -> l.onResponse(true))); } catch (IOException ioe) { String msg = new ParameterizedMessage("[{}] Failed writing data_counts stats", jobId).getFormattedMessage(); logger.error(msg, ioe); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index f32929328459d..625b5e4da979c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -210,7 +210,8 @@ public void checkForLeftOverDocuments(Job job, ActionListener listener) .add(resultDocSearch) .add(quantilesDocSearch); - ActionListener searchResponseActionListener = new ActionListener() { + ActionListener searchResponseActionListener = + new ActionListener.Delegating(listener) { @Override public void onResponse(MultiSearchResponse response) { List searchHits = new ArrayList<>(); @@ -232,14 +233,14 @@ public void onResponse(MultiSearchResponse response) { } } } - listener.onFailure(e); + delegate.onFailure(e); return; } searchHits.addAll(Arrays.asList(itemResponse.getResponse().getHits().getHits())); } if (searchHits.isEmpty()) { - listener.onResponse(true); + delegate.onResponse(true); } else { int quantileDocCount = 0; int categorizerStateDocCount = 0; @@ -259,17 +260,12 @@ public void onResponse(MultiSearchResponse response) { LOGGER.warn("{} result, {} quantile state and {} categorizer state documents exist for a prior job with Id [{}]", resultDocCount, quantileDocCount, categorizerStateDocCount, job.getId()); - listener.onFailure(ExceptionsHelper.conflictStatusException( + delegate.onFailure(ExceptionsHelper.conflictStatusException( "[" + resultDocCount + "] result and [" + (quantileDocCount + categorizerStateDocCount) + "] state documents exist for a prior job with Id [" + job.getId() + "]. " + "Please create the job with a different Id")); } } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } }; executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, msearch.request(), searchResponseActionListener, @@ -424,17 +420,9 @@ private void updateIndexMappingWithTermFields(String indexName, String mappingTy final PutMappingRequest request = client.admin().indices().preparePutMapping(indexName) .setType(mappingType) .setSource(termFieldsMapping).request(); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse putMappingResponse) { - listener.onResponse(putMappingResponse.isAcknowledged()); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, client.admin().indices()::putMapping); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, + listener.delegateFailure((l, putMappingResponse) -> + l.onResponse(putMappingResponse.isAcknowledged())), client.admin().indices()::putMapping); } catch (IOException e) { listener.onFailure(e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/cat/RestCatTrainedModelsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/cat/RestCatTrainedModelsAction.java index e454bd93cb4ba..5529ce4b28bd2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/cat/RestCatTrainedModelsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/cat/RestCatTrainedModelsAction.java @@ -206,22 +206,14 @@ private GroupedActionListener createGroupedListener(final RestRe final int size, final List configs, final ActionListener
listener) { - return new GroupedActionListener<>(new ActionListener>() { - @Override - public void onResponse(final Collection responses) { - GetTrainedModelsStatsAction.Response statsResponse = extractResponse(responses, GetTrainedModelsStatsAction.Response.class); - GetDataFrameAnalyticsAction.Response analytics = extractResponse(responses, GetDataFrameAnalyticsAction.Response.class); - listener.onResponse(buildTable(request, + return new GroupedActionListener<>(listener.delegateFailure((l, responses) -> { + GetTrainedModelsStatsAction.Response statsResponse = extractResponse(responses, GetTrainedModelsStatsAction.Response.class); + GetDataFrameAnalyticsAction.Response analytics = extractResponse(responses, GetDataFrameAnalyticsAction.Response.class); + l.onResponse(buildTable(request, statsResponse.getResources().results(), configs, analytics == null ? Collections.emptyList() : analytics.getResources().results())); - } - - @Override - public void onFailure(final Exception e) { - listener.onFailure(e); - } - }, size); + }), size); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java index 2d6e4201f1a2b..a640143cb2a93 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java @@ -22,7 +22,6 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; @@ -89,24 +88,16 @@ protected void masterOperation(PutRollupJobAction.Request request, ClusterState .indices(request.getConfig().getIndexPattern()) .fields(request.getConfig().getAllFields().toArray(new String[0])); - client.fieldCaps(fieldCapsRequest, new ActionListener() { - @Override - public void onResponse(FieldCapabilitiesResponse fieldCapabilitiesResponse) { - ActionRequestValidationException validationException = request.validateMappings(fieldCapabilitiesResponse.get()); - if (validationException != null) { - listener.onFailure(validationException); - return; - } - - RollupJob job = createRollupJob(request.getConfig(), threadPool); - createIndex(job, listener, persistentTasksService, client, logger); + client.fieldCaps(fieldCapsRequest, listener.delegateFailure((l, fieldCapabilitiesResponse) -> { + ActionRequestValidationException validationException = request.validateMappings(fieldCapabilitiesResponse.get()); + if (validationException != null) { + l.onFailure(validationException); + return; } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + RollupJob job = createRollupJob(request.getConfig(), threadPool); + createIndex(job, l, persistentTasksService, client, logger); + })); } static void checkForDeprecatedTZ(PutRollupJobAction.Request request) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleAction.java index 6563c236534ce..e60d0d89655f9 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleAction.java @@ -38,17 +38,7 @@ protected void doExecute(Task task, DeleteRoleRequest request, ActionListener() { - @Override - public void onResponse(Boolean found) { - listener.onResponse(new DeleteRoleResponse(found)); - } - - @Override - public void onFailure(Exception t) { - listener.onFailure(t); - } - }); + rolesStore.deleteRole(request, listener.delegateFailure((l, found) -> l.onResponse(new DeleteRoleResponse(found)))); } catch (Exception e) { logger.error((Supplier) () -> new ParameterizedMessage("failed to delete role [{}]", request.name()), e); listener.onFailure(e); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleAction.java index 54d65f4b9e399..fcb0ddc8f278d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleAction.java @@ -49,21 +49,13 @@ protected void doExecute(Task task, final PutRoleRequest request, final ActionLi return; } - rolesStore.putRole(request, request.roleDescriptor(), new ActionListener() { - @Override - public void onResponse(Boolean created) { - if (created) { - logger.info("added role [{}]", request.name()); - } else { - logger.info("updated role [{}]", request.name()); - } - listener.onResponse(new PutRoleResponse(created)); + rolesStore.putRole(request, request.roleDescriptor(), listener.delegateFailure((l, created) -> { + if (created) { + logger.info("added role [{}]", request.name()); + } else { + logger.info("updated role [{}]", request.name()); } - - @Override - public void onFailure(Exception t) { - listener.onFailure(t); - } - }); + l.onResponse(new PutRoleResponse(created)); + })); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/TransportDeleteRoleMappingAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/TransportDeleteRoleMappingAction.java index 310a48267e142..83b7cee80c857 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/TransportDeleteRoleMappingAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/TransportDeleteRoleMappingAction.java @@ -31,16 +31,7 @@ public TransportDeleteRoleMappingAction(ActionFilters actionFilters, TransportSe @Override protected void doExecute(Task task, DeleteRoleMappingRequest request, ActionListener listener) { - roleMappingStore.deleteRoleMapping(request, new ActionListener() { - @Override - public void onResponse(Boolean found) { - listener.onResponse(new DeleteRoleMappingResponse(found)); - } - - @Override - public void onFailure(Exception t) { - listener.onFailure(t); - } - }); + roleMappingStore.deleteRoleMapping(request, + listener.delegateFailure((l, found) -> l.onResponse(new DeleteRoleMappingResponse(found)))); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordAction.java index 110d0be776354..fda6e987dded1 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordAction.java @@ -52,16 +52,6 @@ protected void doExecute(Task task, ChangePasswordRequest request, ActionListene " [" + configPwdHashAlgo + "] is configured.")); return; } - nativeUsersStore.changePassword(request, new ActionListener() { - @Override - public void onResponse(Void v) { - listener.onResponse(ActionResponse.Empty.INSTANCE); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + nativeUsersStore.changePassword(request, listener.delegateFailure((l, v) -> l.onResponse(ActionResponse.Empty.INSTANCE))); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserAction.java index ea6f675d76488..3bfda0f091cf4 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserAction.java @@ -50,16 +50,6 @@ protected void doExecute(Task task, DeleteUserRequest request, final ActionListe return; } - usersStore.deleteUser(request, new ActionListener() { - @Override - public void onResponse(Boolean found) { - listener.onResponse(new DeleteUserResponse(found)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + usersStore.deleteUser(request, listener.delegateFailure((l, found) -> l.onResponse(new DeleteUserResponse(found)))); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledAction.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledAction.java index 2cf339ea6ed98..815cc548aa38d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledAction.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledAction.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.action.user.SetEnabledAction; @@ -28,16 +27,14 @@ public class TransportSetEnabledAction extends HandledTransportAction { private final Settings settings; - private final ThreadPool threadPool; private final SecurityContext securityContext; private final NativeUsersStore usersStore; @Inject - public TransportSetEnabledAction(Settings settings, ThreadPool threadPool, TransportService transportService, + public TransportSetEnabledAction(Settings settings, TransportService transportService, ActionFilters actionFilters, SecurityContext securityContext, NativeUsersStore usersStore) { super(SetEnabledAction.NAME, transportService, actionFilters, SetEnabledRequest::new); this.settings = settings; - this.threadPool = threadPool; this.securityContext = securityContext; this.usersStore = usersStore; } @@ -57,16 +54,7 @@ protected void doExecute(Task task, SetEnabledRequest request, ActionListener() { - @Override - public void onResponse(Void v) { - listener.onResponse(ActionResponse.Empty.INSTANCE); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + usersStore.setEnabled(username, request.enabled(), request.getRefreshPolicy(), + listener.delegateFailure((l, v) -> l.onResponse(ActionResponse.Empty.INSTANCE))); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java index 7aadbaf2d38bc..b3094a6784d73 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java @@ -168,17 +168,8 @@ void getUserCount(final ActionListener listener) { .setSize(0) .setTrackTotalHits(true) .request(), - new ActionListener() { - @Override - public void onResponse(SearchResponse response) { - listener.onResponse(response.getHits().getTotalHits().value); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, client::search)); + listener.delegateFailure( + (l, response) -> l.onResponse(response.getHits().getTotalHits().value)), client::search)); } } @@ -279,17 +270,7 @@ private void createReservedUser(String username, char[] passwordHash, RefreshPol .setSource(Fields.PASSWORD.getPreferredName(), String.valueOf(passwordHash), Fields.ENABLED.getPreferredName(), true, Fields.TYPE.getPreferredName(), RESERVED_USER_TYPE) .setRefreshPolicy(refresh).request(), - new ActionListener() { - @Override - public void onResponse(IndexResponse indexResponse) { - clearRealmCache(username, listener, null); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, client::index); + listener.delegateFailure((l, indexResponse) -> clearRealmCache(username, l, null)), client::index); }); } @@ -370,18 +351,8 @@ private void indexUser(final PutUserRequest putUserRequest, final ActionListener Fields.TYPE.getPreferredName(), USER_DOC_TYPE) .setRefreshPolicy(putUserRequest.getRefreshPolicy()) .request(), - new ActionListener() { - @Override - public void onResponse(IndexResponse updateResponse) { - clearRealmCache(putUserRequest.username(), listener, - updateResponse.getResult() == DocWriteResponse.Result.CREATED); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, client::index); + listener.delegateFailure((l, updateResponse) -> clearRealmCache(putUserRequest.username(), l, + updateResponse.getResult() == DocWriteResponse.Result.CREATED)), client::index); }); } @@ -444,21 +415,13 @@ private void setReservedUserEnabled(final String username, final boolean enabled Fields.TYPE.getPreferredName(), RESERVED_USER_TYPE) .setRefreshPolicy(refreshPolicy) .request(), - new ActionListener() { - @Override - public void onResponse(UpdateResponse updateResponse) { - if (clearCache) { - clearRealmCache(username, listener, null); - } else { - listener.onResponse(null); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + listener.delegateFailure((l, updateResponse) -> { + if (clearCache) { + clearRealmCache(username, l, null); + } else { + l.onResponse(null); } - }, client::update); + }), client::update); }); } @@ -475,18 +438,8 @@ public void deleteUser(final DeleteUserRequest deleteUserRequest, final ActionLi .request(); request.setRefreshPolicy(deleteUserRequest.getRefreshPolicy()); executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, - new ActionListener() { - @Override - public void onResponse(DeleteResponse deleteResponse) { - clearRealmCache(deleteUserRequest.username(), listener, - deleteResponse.getResult() == DocWriteResponse.Result.DELETED); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, client::delete); + listener.delegateFailure((l, deleteResponse) -> clearRealmCache(deleteUserRequest.username(), l, + deleteResponse.getResult() == DocWriteResponse.Result.DELETED)), client::delete); }); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java index 948fe0932717b..051a2ab811d08 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java @@ -270,20 +270,12 @@ public void getRoleMappings(Set names, ActionListener>() { - @Override - public void onResponse(List mappings) { - final List filtered = mappings.stream() - .filter(m -> names.contains(m.getName())) - .collect(Collectors.toList()); - listener.onResponse(filtered); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + getMappings(listener.delegateFailure((l, mappings) -> { + final List filtered = mappings.stream() + .filter(m -> names.contains(m.getName())) + .collect(Collectors.toList()); + l.onResponse(filtered); + })); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java index 7503eacf5ca2e..0908caa02ad76 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java @@ -276,7 +276,7 @@ public void usageStats(ActionListener> listener) { .setSize(0) .setTerminateAfter(1)) .request(), - new ActionListener() { + new ActionListener.Delegating>(listener) { @Override public void onResponse(MultiSearchResponse items) { Item[] responses = items.getResponses(); @@ -296,12 +296,7 @@ public void onResponse(MultiSearchResponse items) { } else { usageStats.put("dls", responses[2].getResponse().getHits().getTotalHits().value > 0L); } - listener.onResponse(usageStats); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + delegate.onResponse(usageStats); } }, client::multiSearch)); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java index cdbd35bdd13cc..a4d763f2ef803 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java @@ -72,7 +72,7 @@ public void testAnonymousUser() throws Exception { TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); - TransportSetEnabledAction action = new TransportSetEnabledAction(settings, threadPool, transportService, mock(ActionFilters.class), + TransportSetEnabledAction action = new TransportSetEnabledAction(settings, transportService, mock(ActionFilters.class), securityContext, usersStore); SetEnabledRequest request = new SetEnabledRequest(); @@ -114,7 +114,7 @@ public void testInternalUser() throws Exception { TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); - TransportSetEnabledAction action = new TransportSetEnabledAction(Settings.EMPTY, threadPool, transportService, + TransportSetEnabledAction action = new TransportSetEnabledAction(Settings.EMPTY, transportService, mock(ActionFilters.class), securityContext, usersStore); SetEnabledRequest request = new SetEnabledRequest(); @@ -172,7 +172,7 @@ public Void answer(InvocationOnMock invocation) { TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); - TransportSetEnabledAction action = new TransportSetEnabledAction(Settings.EMPTY, threadPool, transportService, + TransportSetEnabledAction action = new TransportSetEnabledAction(Settings.EMPTY, transportService, mock(ActionFilters.class), securityContext, usersStore); final AtomicReference throwableRef = new AtomicReference<>(); @@ -227,7 +227,7 @@ public Void answer(InvocationOnMock invocation) { TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); - TransportSetEnabledAction action = new TransportSetEnabledAction(Settings.EMPTY, threadPool, transportService, + TransportSetEnabledAction action = new TransportSetEnabledAction(Settings.EMPTY, transportService, mock(ActionFilters.class), securityContext, usersStore); final AtomicReference throwableRef = new AtomicReference<>(); @@ -270,7 +270,7 @@ public void testUserModifyingThemselves() throws Exception { TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); - TransportSetEnabledAction action = new TransportSetEnabledAction(Settings.EMPTY, threadPool, transportService, + TransportSetEnabledAction action = new TransportSetEnabledAction(Settings.EMPTY, transportService, mock(ActionFilters.class), securityContext, usersStore); final AtomicReference throwableRef = new AtomicReference<>(); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggCursor.java index ffdfdab477d85..847a474058e19 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggCursor.java @@ -135,21 +135,16 @@ public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry SearchRequest request = Querier.prepareRequest(client, query, cfg.pageTimeout(), includeFrozen, indices); - client.search(request, new ActionListener() { + client.search(request, new ActionListener.Delegating(listener) { @Override public void onResponse(SearchResponse response) { handle(response, request.source(), makeRowSet(response), makeCursor(), () -> client.search(request, this), - listener, + delegate, Schema.EMPTY); } - - @Override - public void onFailure(Exception ex) { - listener.onFailure(ex); - } }); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index 802b8a47b4fdb..cc93ba47c566f 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -188,10 +188,7 @@ protected static void logSearchResponse(SearchResponse response, Logger logger) * results back to the client. */ @SuppressWarnings("rawtypes") - class LocalAggregationSorterListener implements ActionListener { - - private final ActionListener listener; - + class LocalAggregationSorterListener extends ActionListener.Delegating { // keep the top N entries. private final AggSortingQueue data; private final AtomicInteger counter = new AtomicInteger(); @@ -202,7 +199,7 @@ class LocalAggregationSorterListener implements ActionListener { private final boolean noLimit; LocalAggregationSorterListener(ActionListener listener, List> sortingColumns, int limit) { - this.listener = listener; + super(listener); int size = MAXIMUM_SIZE; if (limit < 0) { @@ -266,12 +263,7 @@ private boolean consumeRowSet(RowSet rowSet) { } private void sendResponse() { - listener.onResponse(ListCursor.of(schema, data.asList(), cfg.pageSize())); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + delegate.onResponse(ListCursor.of(schema, data.asList(), cfg.pageSize())); } } @@ -280,7 +272,7 @@ public void onFailure(Exception e) { */ static class ImplicitGroupActionListener extends BaseAggActionListener { - private static List EMPTY_BUCKET = singletonList(new Bucket() { + private static final List EMPTY_BUCKET = singletonList(new Bucket() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -345,11 +337,10 @@ private void handleBuckets(List buckets, SearchResponse respon for (int i = mask.nextSetBit(0); i >= 0; i = mask.nextSetBit(i + 1)) { values[index++] = extractors.get(i).extract(implicitGroup); } - listener.onResponse(Page.last(Rows.singleton(schema, values))); + delegate.onResponse(Page.last(Rows.singleton(schema, values))); } else if (buckets.isEmpty()) { - listener.onResponse(Page.last(Rows.empty(schema))); - + delegate.onResponse(Page.last(Rows.empty(schema))); } else { throw new SqlIllegalArgumentException("Too many groups returned by the implicit group; expected 1, received {}", buckets.size()); @@ -529,9 +520,7 @@ private HitExtractor createExtractor(FieldExtraction ref) { * Base listener class providing clean-up and exception handling. * Handles both scroll queries (scan/scroll) and regular/composite-aggs queries. */ - abstract static class BaseActionListener implements ActionListener { - - final ActionListener listener; + abstract static class BaseActionListener extends ActionListener.Delegating { final Client client; final SqlConfiguration cfg; @@ -539,7 +528,7 @@ abstract static class BaseActionListener implements ActionListener listener, Client client, SqlConfiguration cfg, List output) { - this.listener = listener; + super(listener); this.client = client; this.cfg = cfg; @@ -555,7 +544,7 @@ public void onResponse(final SearchResponse response) { if (CollectionUtils.isEmpty(failure) == false) { cleanup(response, new SqlIllegalArgumentException(failure[0].reason(), failure[0].getCause())); } else { - handleResponse(response, ActionListener.wrap(listener::onResponse, e -> cleanup(response, e))); + handleResponse(response, ActionListener.wrap(delegate::onResponse, e -> cleanup(response, e))); } } catch (Exception ex) { cleanup(response, ex); @@ -569,12 +558,12 @@ protected final void cleanup(SearchResponse response, Exception ex) { if (response != null && response.getScrollId() != null) { client.prepareClearScroll().addScrollId(response.getScrollId()) // in case of failure, report the initial exception instead of the one resulting from cleaning the scroll - .execute(ActionListener.wrap(r -> listener.onFailure(ex), e -> { + .execute(ActionListener.wrap(r -> delegate.onFailure(ex), e -> { ex.addSuppressed(e); - listener.onFailure(ex); + delegate.onFailure(ex); })); } else { - listener.onFailure(ex); + delegate.onFailure(ex); } } @@ -586,11 +575,6 @@ protected final void clear(String scrollId, ActionListener listener) { listener.onResponse(false); } } - - @Override - public final void onFailure(Exception ex) { - listener.onFailure(ex); - } } @SuppressWarnings("rawtypes")