Skip to content

Commit

Permalink
Cleanup more ActionListener Delegation Spots (#69662) (#70438)
Browse files Browse the repository at this point in the history
Cleaning up the remaining spots where the short-wrapper methods
could be used that I could find.
  • Loading branch information
original-brownbear authored Mar 16, 2021
1 parent 57eaa54 commit 43c437c
Show file tree
Hide file tree
Showing 37 changed files with 346 additions and 832 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ public RestResponse buildResponse(final Table table) throws Exception {
}
});

sendGetSettingsRequest(indices, indicesOptions, local, masterNodeTimeout, client, new ActionListener<GetSettingsResponse>() {
sendGetSettingsRequest(indices, indicesOptions, local, masterNodeTimeout, client,
new ActionListener.Delegating<GetSettingsResponse, Table>(listener) {
@Override
public void onResponse(final GetSettingsResponse getSettingsResponse) {
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(request, 4, listener);
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(request, 4, delegate);
groupedListener.onResponse(getSettingsResponse);

// The list of indices that will be returned is determined by the indices returned from the Get Settings call.
Expand All @@ -132,11 +133,6 @@ public void onResponse(final GetSettingsResponse getSettingsResponse) {
sendClusterHealthRequest(indices, subRequestIndicesOptions, local, masterNodeTimeout, client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure));
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
});
};
}
Expand Down Expand Up @@ -214,7 +210,7 @@ private void sendIndicesStatsRequest(final String[] indices,

private GroupedActionListener<ActionResponse> createGroupedListener(final RestRequest request, final int size,
final ActionListener<Table> listener) {
return new GroupedActionListener<>(new ActionListener<Collection<ActionResponse>>() {
return new GroupedActionListener<>(new ActionListener.Delegating<Collection<ActionResponse>, Table>(listener) {
@Override
public void onResponse(final Collection<ActionResponse> responses) {
try {
Expand All @@ -234,16 +230,11 @@ public void onResponse(final Collection<ActionResponse> responses) {
Map<String, IndexStats> indicesStats = statsResponse.getIndices();

Table responseTable = buildTable(request, indicesSettings, indicesHealths, indicesStats, indicesStates);
listener.onResponse(responseTable);
delegate.onResponse(responseTable);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
}, size);
}

Expand Down
60 changes: 22 additions & 38 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,10 @@ protected void doClose() {
public void executeDfsPhase(ShardSearchRequest request, boolean keepStatesInContext,
SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
final IndexShard shard = getShard(request);
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest rewritten) {
// fork the execution in the search thread pool
runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), listener);
}

@Override
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
});
rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> {
// fork the execution in the search thread pool
runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), l);
}));
}

private DfsSearchResult executeDfsPhase(ShardSearchRequest request,
Expand Down Expand Up @@ -385,34 +377,26 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
: "empty responses require more than one shard";
final IndexShard shard = getShard(request);
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest orig) {
// check if we can shortcut the query phase entirely.
if (orig.canReturnNullResponseIfMatchNoDocs()) {
assert orig.scroll() == null;
final CanMatchResponse canMatchResp;
try {
ShardSearchRequest clone = new ShardSearchRequest(orig);
canMatchResp = canMatch(clone, false);
} catch (Exception exc) {
listener.onFailure(exc);
return;
}
if (canMatchResp.canMatch == false) {
listener.onResponse(QuerySearchResult.nullInstance());
return;
}
rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, orig) -> {
// check if we can shortcut the query phase entirely.
if (orig.canReturnNullResponseIfMatchNoDocs()) {
assert orig.scroll() == null;
final CanMatchResponse canMatchResp;
try {
ShardSearchRequest clone = new ShardSearchRequest(orig);
canMatchResp = canMatch(clone, false);
} catch (Exception exc) {
l.onFailure(exc);
return;
}
if (canMatchResp.canMatch == false) {
l.onResponse(QuerySearchResult.nullInstance());
return;
}
// fork the execution in the search thread pool
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
}

@Override
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
});
// fork the execution in the search thread pool
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), l);
}));
}

private IndexShard getShard(ShardSearchRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,11 @@ public void onFailure(Exception e) {
DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());

connectionManager.connectToNode(node, null, clusterNameValidator, new ActionListener<Void>() {
@Override
public void onResponse(Void v) {
compositeListener.onResponse(v);
}

@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]",
connectionManager.connectToNode(node, null, clusterNameValidator, compositeListener.delegateResponse((l, e) -> {
logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]",
clusterAlias, resolved), e);
compositeListener.onFailure(e);
}
});
l.onFailure(e);
}));
}
} else {
int openConnections = connectionManager.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,33 +102,28 @@ protected void shardOperation(final ForgetFollowerAction.Request request, final

final IndexShard indexShard = indicesService.indexServiceSafe(leaderIndex).getShard(shardRouting.shardId().id());

indexShard.acquirePrimaryOperationPermit(new ActionListener<Releasable>() {
indexShard.acquirePrimaryOperationPermit(new ActionListener.Delegating<Releasable, EmptyResult>(listener) {
@Override
public void onResponse(Releasable releasable) {
try {
indexShard.removeRetentionLease(id, new ActionListener<ReplicationResponse>() {
@Override
public void onResponse(ReplicationResponse replicationResponse) {
releasable.close();
listener.onResponse(EmptyResult.INSTANCE);
delegate.onResponse(EmptyResult.INSTANCE);
}

@Override
public void onFailure(Exception e) {
releasable.close();
listener.onFailure(e);
delegate.onFailure(e);
}
});
} catch (Exception e) {
releasable.close();
listener.onFailure(e);
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, ThreadPool.Names.SAME, request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,35 +46,16 @@ public TransportXPackUsageAction(ThreadPool threadPool, TransportService transpo

@Override
protected void masterOperation(XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener) {
final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener = new ActionListener<List<Usage>>() {
@Override
public void onResponse(List<Usage> usages) {
listener.onResponse(new XPackUsageResponse(usages));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener =
listener.delegateFailure((l, usages) -> l.onResponse(new XPackUsageResponse(usages)));
final AtomicReferenceArray<Usage> featureSetUsages = new AtomicReferenceArray<>(featureSets.size());
final AtomicInteger position = new AtomicInteger(0);
final BiConsumer<XPackFeatureSet, ActionListener<List<Usage>>> consumer = (featureSet, iteratingListener) -> {
assert Transports.assertNotTransportThread("calculating usage can be more expensive than we allow on transport threads");
featureSet.usage(new ActionListener<Usage>() {
@Override
public void onResponse(Usage usage) {
featureSetUsages.set(position.getAndIncrement(), usage);
// the value sent back doesn't matter since our predicate keeps iterating
ActionRunnable<?> invokeListener = ActionRunnable.supply(iteratingListener, Collections::emptyList);
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(invokeListener);
}

@Override
public void onFailure(Exception e) {
iteratingListener.onFailure(e);
}
});
featureSet.usage(iteratingListener.delegateFailure((l, usage) -> {
featureSetUsages.set(position.getAndIncrement(), usage);
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(ActionRunnable.supply(iteratingListener, Collections::emptyList));
}));
};
IteratingActionListener<List<XPackFeatureSet.Usage>, XPackFeatureSet> iteratingActionListener =
new IteratingActionListener<>(usageActionListener, consumer, featureSets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,8 @@ private void getSearchResponseFromTask(AsyncExecutionId searchId,
if (expirationTimeMillis != -1) {
task.setExpirationTime(expirationTimeMillis);
}
addCompletionListener.apply(task, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
sendFinalResponse(request, response, nowInMillis, listener);
}

@Override
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
}, request.getWaitForCompletionTimeout());
addCompletionListener.apply(task, listener.delegateFailure((l, response) ->
sendFinalResponse(request, response, nowInMillis, l)), request.getWaitForCompletionTimeout());
} catch (Exception exc) {
listener.onFailure(exc);
}
Expand All @@ -146,18 +137,7 @@ private void getSearchResponseFromIndex(AsyncExecutionId searchId,
GetAsyncResultRequest request,
long nowInMillis,
ActionListener<Response> listener) {
store.getResponse(searchId, true,
new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
sendFinalResponse(request, response, nowInMillis, listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
store.getResponse(searchId, true, listener.delegateFailure((l, response) -> sendFinalResponse(request, response, nowInMillis, l)));
}

private void sendFinalResponse(GetAsyncResultRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,18 +371,8 @@ public <T extends AsyncTask, SR extends SearchStatusResponse> void retrieveStatu
SR response = statusProducerFromTask.apply(asyncTask);
sendFinalStatusResponse(request, response, listener);
} else { // get status response from index
getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex,
new ActionListener<SR>() {
@Override
public void onResponse(SR searchStatusResponse) {
sendFinalStatusResponse(request, searchStatusResponse, listener);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex, listener.delegateFailure(
(l, searchStatusResponse) -> sendFinalStatusResponse(request, searchStatusResponse, l)));
}
} catch (Exception exc) {
listener.onFailure(exc);
Expand Down
Loading

0 comments on commit 43c437c

Please sign in to comment.