Skip to content

Commit

Permalink
Remove workaround channel listener in ESQL (elastic#103537)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn authored and navarone-feekery committed Dec 21, 2023
1 parent 284ccd2 commit 0aa8689
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 56 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.OwningChannelActionListener;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -117,7 +117,7 @@ public Status getStatus() {
private record DriverRequestHandler(TransportService transportService) implements TransportRequestHandler<DriverRequest> {
@Override
public void messageReceived(DriverRequest request, TransportChannel channel, Task task) {
var listener = new OwningChannelActionListener<TransportResponse.Empty>(channel);
var listener = new ChannelActionListener<TransportResponse.Empty>(channel);
Driver.start(
transportService.getThreadPool().getThreadContext(),
request.executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
Expand All @@ -21,7 +22,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.OwningChannelActionListener;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -193,7 +193,7 @@ private class ExchangeTransportAction implements TransportRequestHandler<Exchang
@Override
public void messageReceived(ExchangeRequest request, TransportChannel channel, Task task) {
final String exchangeId = request.exchangeId();
ActionListener<ExchangeResponse> listener = new OwningChannelActionListener<>(channel);
ActionListener<ExchangeResponse> listener = new ChannelActionListener<>(channel);
final ExchangeSinkHandler sinkHandler = sinks.get(exchangeId);
if (sinkHandler == null) {
listener.onResponse(new ExchangeResponse(null, true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void notifyListeners() {
promised.release();
}
onChanged();
listener.onResponse(response);
ActionListener.respondAndRelease(listener, response);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -24,7 +25,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.compute.OwningChannelActionListener;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput;
Expand Down Expand Up @@ -369,7 +369,7 @@ private class TransportHandler implements TransportRequestHandler<LookupRequest>
@Override
public void messageReceived(LookupRequest request, TransportChannel channel, Task task) {
request.incRef();
ActionListener<LookupResponse> listener = ActionListener.runBefore(new OwningChannelActionListener<>(channel), request::decRef);
ActionListener<LookupResponse> listener = ActionListener.runBefore(new ChannelActionListener<>(channel), request::decRef);
doLookup(
request.sessionId,
(CancellableTask) task,
Expand All @@ -378,7 +378,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
request.matchField,
request.inputPage,
request.extractFields,
listener.map(LookupResponse::new)
listener.delegateFailureAndWrap((l, outPage) -> ActionListener.respondAndRelease(l, new LookupResponse(outPage)))
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.compute.OwningChannelActionListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
Expand Down Expand Up @@ -110,7 +110,7 @@ public void messageReceived(ResolveRequest request, TransportChannel channel, Ta
String policyName = request.policyName;
EnrichPolicy policy = policies().get(policyName);
ThreadContext threadContext = threadPool.getThreadContext();
ActionListener<ResolveResponse> listener = new OwningChannelActionListener<>(channel);
ActionListener<ResolveResponse> listener = new ChannelActionListener<>(channel);
listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
indexResolver.resolveAsMergedMapping(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.search.SearchShardsRequest;
import org.elasticsearch.action.search.SearchShardsResponse;
import org.elasticsearch.action.search.TransportSearchShardsAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
Expand All @@ -26,7 +27,6 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.compute.OwningChannelActionListener;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Driver;
Expand Down Expand Up @@ -502,7 +502,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
final var sessionId = request.sessionId();
final var exchangeSink = exchangeService.getSinkHandler(sessionId);
parentTask.addListener(() -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException("task cancelled")));
final ActionListener<DataNodeResponse> listener = new OwningChannelActionListener<>(channel);
final ActionListener<DataNodeResponse> listener = new ChannelActionListener<>(channel);
final EsqlConfiguration configuration = request.configuration();
acquireSearchContexts(request.shardIds(), configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
var computeContext = new ComputeContext(sessionId, searchContexts, configuration, null, exchangeSink);
Expand Down

0 comments on commit 0aa8689

Please sign in to comment.