Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup more ActionListener Delegation Spots #69662

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ public RestResponse buildResponse(final Table table) throws Exception {
}
});

sendGetSettingsRequest(indices, indicesOptions, masterNodeTimeout, client, new ActionListener<>() {
sendGetSettingsRequest(indices, indicesOptions, masterNodeTimeout, client, new ActionListener.Delegating<>(listener) {
@Override
public void onResponse(final GetSettingsResponse getSettingsResponse) {
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(request, 4, listener);
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(request, 4, delegate);
groupedListener.onResponse(getSettingsResponse);

// The list of indices that will be returned is determined by the indices returned from the Get Settings call.
Expand All @@ -123,11 +123,6 @@ public void onResponse(final GetSettingsResponse getSettingsResponse) {
sendClusterHealthRequest(indices, subRequestIndicesOptions, masterNodeTimeout, client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure));
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
});
};
}
Expand Down Expand Up @@ -199,7 +194,7 @@ private void sendIndicesStatsRequest(final String[] indices,

private GroupedActionListener<ActionResponse> createGroupedListener(final RestRequest request, final int size,
final ActionListener<Table> listener) {
return new GroupedActionListener<>(new ActionListener<>() {
return new GroupedActionListener<>(new ActionListener.Delegating<>(listener) {
@Override
public void onResponse(final Collection<ActionResponse> responses) {
try {
Expand All @@ -219,16 +214,11 @@ public void onResponse(final Collection<ActionResponse> responses) {
Map<String, IndexStats> indicesStats = statsResponse.getIndices();

Table responseTable = buildTable(request, indicesSettings, indicesHealths, indicesStats, indicesStates);
listener.onResponse(responseTable);
delegate.onResponse(responseTable);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
}, size);
}

Expand Down
60 changes: 22 additions & 38 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,10 @@ protected void doClose() {
public void executeDfsPhase(ShardSearchRequest request, boolean keepStatesInContext,
SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
final IndexShard shard = getShard(request);
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest rewritten) {
// fork the execution in the search thread pool
runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), listener);
}

@Override
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
});
rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> {
// fork the execution in the search thread pool
runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), l);
}));
}

private DfsSearchResult executeDfsPhase(ShardSearchRequest request,
Expand Down Expand Up @@ -385,34 +377,26 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
: "empty responses require more than one shard";
final IndexShard shard = getShard(request);
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest orig) {
// check if we can shortcut the query phase entirely.
if (orig.canReturnNullResponseIfMatchNoDocs()) {
assert orig.scroll() == null;
final CanMatchResponse canMatchResp;
try {
ShardSearchRequest clone = new ShardSearchRequest(orig);
canMatchResp = canMatch(clone, false);
} catch (Exception exc) {
listener.onFailure(exc);
return;
}
if (canMatchResp.canMatch == false) {
listener.onResponse(QuerySearchResult.nullInstance());
return;
}
rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, orig) -> {
// check if we can shortcut the query phase entirely.
if (orig.canReturnNullResponseIfMatchNoDocs()) {
assert orig.scroll() == null;
final CanMatchResponse canMatchResp;
try {
ShardSearchRequest clone = new ShardSearchRequest(orig);
canMatchResp = canMatch(clone, false);
} catch (Exception exc) {
l.onFailure(exc);
return;
}
if (canMatchResp.canMatch == false) {
l.onResponse(QuerySearchResult.nullInstance());
return;
}
// fork the execution in the search thread pool
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
}

@Override
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
});
// fork the execution in the search thread pool
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), l);
}));
}

private IndexShard getShard(ShardSearchRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,11 @@ public void onFailure(Exception e) {
DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());

connectionManager.connectToNode(node, null, clusterNameValidator, new ActionListener<>() {
@Override
public void onResponse(Void v) {
compositeListener.onResponse(v);
}

@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]",
connectionManager.connectToNode(node, null, clusterNameValidator, compositeListener.delegateResponse((l, e) -> {
logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]",
clusterAlias, resolved), e);
compositeListener.onFailure(e);
}
});
l.onFailure(e);
}));
}
} else {
int openConnections = connectionManager.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,33 +102,28 @@ protected void shardOperation(final ForgetFollowerAction.Request request, final

final IndexShard indexShard = indicesService.indexServiceSafe(leaderIndex).getShard(shardRouting.shardId().id());

indexShard.acquirePrimaryOperationPermit(new ActionListener<Releasable>() {
indexShard.acquirePrimaryOperationPermit(new ActionListener.Delegating<>(listener) {
@Override
public void onResponse(Releasable releasable) {
try {
indexShard.removeRetentionLease(id, new ActionListener<ReplicationResponse>() {
@Override
public void onResponse(ReplicationResponse replicationResponse) {
releasable.close();
listener.onResponse(EmptyResult.INSTANCE);
delegate.onResponse(EmptyResult.INSTANCE);
}

@Override
public void onFailure(Exception e) {
releasable.close();
listener.onFailure(e);
delegate.onFailure(e);
}
});
} catch (Exception e) {
releasable.close();
listener.onFailure(e);
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, ThreadPool.Names.SAME, request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,34 +52,16 @@ protected List<XPackUsageFeatureAction> usageActions() {

@Override
protected void masterOperation(Task task, XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener) {
final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener = new ActionListener<>() {
@Override
public void onResponse(List<Usage> usages) {
listener.onResponse(new XPackUsageResponse(usages));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener =
listener.delegateFailure((l, usages) -> l.onResponse(new XPackUsageResponse(usages)));
final AtomicReferenceArray<Usage> featureSetUsages = new AtomicReferenceArray<>(usageActions.size());
final AtomicInteger position = new AtomicInteger(0);
final BiConsumer<XPackUsageFeatureAction, ActionListener<List<Usage>>> consumer = (featureUsageAction, iteratingListener) -> {
client.executeLocally(featureUsageAction, request, new ActionListener<>() {
@Override
public void onResponse(XPackUsageFeatureResponse usageResponse) {
final BiConsumer<XPackUsageFeatureAction, ActionListener<List<Usage>>> consumer = (featureUsageAction, iteratingListener) ->
client.executeLocally(featureUsageAction, request, iteratingListener.delegateFailure((l, usageResponse) -> {
featureSetUsages.set(position.getAndIncrement(), usageResponse.getUsage());
// the value sent back doesn't matter since our predicate keeps iterating
iteratingListener.onResponse(Collections.emptyList());
}

@Override
public void onFailure(Exception e) {
iteratingListener.onFailure(e);
}
});
};
l.onResponse(Collections.emptyList());
}));
IteratingActionListener<List<XPackFeatureSet.Usage>, XPackUsageFeatureAction> iteratingActionListener =
new IteratingActionListener<>(usageActionListener, consumer, usageActions,
threadPool.getThreadContext(), (ignore) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,8 @@ private void getSearchResponseFromTask(AsyncExecutionId searchId,
if (expirationTimeMillis != -1) {
task.setExpirationTime(expirationTimeMillis);
}
addCompletionListener.apply(task, new ActionListener<>() {
@Override
public void onResponse(Response response) {
sendFinalResponse(request, response, nowInMillis, listener);
}

@Override
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
}, request.getWaitForCompletionTimeout());
addCompletionListener.apply(task, listener.delegateFailure((l, response) ->
sendFinalResponse(request, response, nowInMillis, l)), request.getWaitForCompletionTimeout());
} catch (Exception exc) {
listener.onFailure(exc);
}
Expand All @@ -146,18 +137,7 @@ private void getSearchResponseFromIndex(AsyncExecutionId searchId,
GetAsyncResultRequest request,
long nowInMillis,
ActionListener<Response> listener) {
store.getResponse(searchId, true,
new ActionListener<>() {
@Override
public void onResponse(Response response) {
sendFinalResponse(request, response, nowInMillis, listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
store.getResponse(searchId, true, listener.delegateFailure((l, response) -> sendFinalResponse(request, response, nowInMillis, l)));
}

private void sendFinalResponse(GetAsyncResultRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,18 +355,8 @@ public <T extends AsyncTask, SR extends SearchStatusResponse> void retrieveStatu
SR response = statusProducerFromTask.apply(asyncTask);
sendFinalStatusResponse(request, response, listener);
} else { // get status response from index
getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex,
new ActionListener<>() {
@Override
public void onResponse(SR searchStatusResponse) {
sendFinalStatusResponse(request, searchStatusResponse, listener);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex, listener.delegateFailure(
(l, searchStatusResponse) -> sendFinalStatusResponse(request, searchStatusResponse, l)));
}
} catch (Exception exc) {
listener.onFailure(exc);
Expand Down
Loading