From c51d707d89c52b9fa406055edae0ae148e1abd70 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 19 Dec 2023 07:29:18 -0800 Subject: [PATCH] Remove workaround channel listener in ESQL (#103537) Relates #100528 --- .../compute/OwningChannelActionListener.java | 44 ------------------- .../compute/operator/DriverTaskRunner.java | 4 +- .../operator/exchange/ExchangeService.java | 4 +- .../exchange/ExchangeSinkHandler.java | 2 +- .../esql/enrich/EnrichLookupService.java | 6 +-- .../esql/enrich/EnrichPolicyResolver.java | 4 +- .../xpack/esql/plugin/ComputeService.java | 4 +- 7 files changed, 12 insertions(+), 56 deletions(-) delete mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/OwningChannelActionListener.java diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/OwningChannelActionListener.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/OwningChannelActionListener.java deleted file mode 100644 index 50a20ee6ee73d..0000000000000 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/OwningChannelActionListener.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.compute; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ChannelActionListener; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportResponse; - -/** - * Wraps a {@link ChannelActionListener} and takes ownership of responses passed to - * {@link org.elasticsearch.action.ActionListener#onResponse(Object)}; the reference count will be decreased once sending is done. - * - * Deprecated: use {@link ChannelActionListener} instead and ensure responses sent to it are properly closed after. - */ -@Deprecated(forRemoval = true) -public final class OwningChannelActionListener implements ActionListener { - private final ChannelActionListener listener; - - public OwningChannelActionListener(TransportChannel channel) { - this.listener = new ChannelActionListener<>(channel); - } - - @Override - public void onResponse(Response response) { - ActionListener.respondAndRelease(listener, response); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - public String toString() { - return "OwningChannelActionListener{" + listener + "}"; - } - -} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java index 1293118680824..38d879f8f7ad4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java @@ -11,9 +11,9 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.compute.OwningChannelActionListener; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -117,7 +117,7 @@ public Status getStatus() { private record DriverRequestHandler(TransportService transportService) implements TransportRequestHandler { @Override public void messageReceived(DriverRequest request, TransportChannel channel, Task task) { - var listener = new OwningChannelActionListener(channel); + var listener = new ChannelActionListener(channel); Driver.start( transportService.getThreadPool().getThreadContext(), request.executor, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index 8fb38ccf907d6..ab9582b20d4aa 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -13,6 +13,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; @@ -21,7 +22,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.compute.OwningChannelActionListener; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.core.TimeValue; @@ -193,7 +193,7 @@ private class ExchangeTransportAction implements TransportRequestHandler listener = new OwningChannelActionListener<>(channel); + ActionListener listener = new ChannelActionListener<>(channel); final ExchangeSinkHandler sinkHandler = sinks.get(exchangeId); if (sinkHandler == null) { listener.onResponse(new ExchangeResponse(null, true)); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java index 9f921c40851d4..c8a6dd9128d16 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java @@ -139,7 +139,7 @@ private void notifyListeners() { promised.release(); } onChanged(); - listener.onResponse(response); + ActionListener.respondAndRelease(listener, response); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index 945f543329c15..6d57b239e94a0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; @@ -24,7 +25,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.compute.OwningChannelActionListener; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockStreamInput; @@ -369,7 +369,7 @@ private class TransportHandler implements TransportRequestHandler @Override public void messageReceived(LookupRequest request, TransportChannel channel, Task task) { request.incRef(); - ActionListener listener = ActionListener.runBefore(new OwningChannelActionListener<>(channel), request::decRef); + ActionListener listener = ActionListener.runBefore(new ChannelActionListener<>(channel), request::decRef); doLookup( request.sessionId, (CancellableTask) task, @@ -378,7 +378,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas request.matchField, request.inputPage, request.extractFields, - listener.map(LookupResponse::new) + listener.delegateFailureAndWrap((l, outPage) -> ActionListener.respondAndRelease(l, new LookupResponse(outPage))) ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 246849896bcdf..1e21886a7ac4b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -9,12 +9,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.compute.OwningChannelActionListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -110,7 +110,7 @@ public void messageReceived(ResolveRequest request, TransportChannel channel, Ta String policyName = request.policyName; EnrichPolicy policy = policies().get(policyName); ThreadContext threadContext = threadPool.getThreadContext(); - ActionListener listener = new OwningChannelActionListener<>(channel); + ActionListener listener = new ChannelActionListener<>(channel); listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext); try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) { indexResolver.resolveAsMergedMapping( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 20fcc05e80440..53c5cf3c8698a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.search.SearchShardsRequest; import org.elasticsearch.action.search.SearchShardsResponse; import org.elasticsearch.action.search.TransportSearchShardsAction; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.RefCountingRunnable; @@ -26,7 +27,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.compute.OwningChannelActionListener; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Driver; @@ -502,7 +502,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T final var sessionId = request.sessionId(); final var exchangeSink = exchangeService.getSinkHandler(sessionId); parentTask.addListener(() -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException("task cancelled"))); - final ActionListener listener = new OwningChannelActionListener<>(channel); + final ActionListener listener = new ChannelActionListener<>(channel); final EsqlConfiguration configuration = request.configuration(); acquireSearchContexts(request.shardIds(), configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> { var computeContext = new ComputeContext(sessionId, searchContexts, configuration, null, exchangeSink);