Skip to content

Commit

Permalink
Remove dangerous default executor from HandledTransportAction (elasti…
Browse files Browse the repository at this point in the history
…c#100162)

Today subclasses of `HandledTransportAction` can specify the executor on
which they run, but the executor is optional and if omitted will use
`DIRECT_EXECUTOR_SERVICE`, which means the action runs on a transport
thread. This is a dangerous default behaviour because it makes it easy
to add new transport actions which implicitly run on a network thread,
which is very hard to pick up in reviews.

This commit makes the executor explicit in all callers, and marks the
dangerous methods for removal.
  • Loading branch information
DaveCTurner authored Oct 3, 2023
1 parent 9f08c33 commit 0a31ce6
Show file tree
Hide file tree
Showing 182 changed files with 869 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
Expand All @@ -30,7 +31,7 @@ public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest,

@Inject
public TransportNoopBulkAction(TransportService transportService, ActionFilters actionFilters) {
super(NoopBulkAction.NAME, transportService, actionFilters, BulkRequest::new);
super(NoopBulkAction.NAME, transportService, actionFilters, BulkRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregations;
Expand All @@ -30,7 +31,13 @@
public class TransportNoopSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
@Inject
public TransportNoopSearchAction(TransportService transportService, ActionFilters actionFilters) {
super(NoopSearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
super(
NoopSearchAction.NAME,
transportService,
actionFilters,
(Writeable.Reader<SearchRequest>) SearchRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.grok.GrokBuiltinPatterns;
import org.elasticsearch.grok.PatternBank;
import org.elasticsearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -139,7 +140,7 @@ public TransportAction(TransportService transportService, ActionFilters actionFi
PatternBank legacyGrokPatterns,
PatternBank ecsV1GrokPatterns
) {
super(NAME, transportService, actionFilters, Request::new);
super(NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.legacyGrokPatterns = legacyGrokPatterns.bank();
this.sortedLegacyGrokPatterns = new TreeMap<>(this.legacyGrokPatterns);
this.ecsV1GrokPatterns = ecsV1GrokPatterns.bank();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -44,7 +45,13 @@ public TransportMultiSearchTemplateAction(
NodeClient client,
UsageService usageService
) {
super(MultiSearchTemplateAction.NAME, transportService, actionFilters, MultiSearchTemplateRequest::new);
super(
MultiSearchTemplateAction.NAME,
transportService,
actionFilters,
MultiSearchTemplateRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.script.Script;
Expand Down Expand Up @@ -54,7 +55,7 @@ public TransportSearchTemplateAction(
NodeClient client,
UsageService usageService
) {
super(SearchTemplateAction.NAME, transportService, actionFilters, SearchTemplateRequest::new);
super(SearchTemplateAction.NAME, transportService, actionFilters, SearchTemplateRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.painless.PainlessScriptEngine;
import org.elasticsearch.painless.lookup.PainlessLookup;
import org.elasticsearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -142,7 +143,7 @@ public static class TransportAction extends HandledTransportAction<Request, Resp

@Inject
public TransportAction(TransportService transportService, ActionFilters actionFilters, PainlessScriptEngine painlessScriptEngine) {
super(NAME, transportService, actionFilters, (Writeable.Reader<Request>) Request::new);
super(NAME, transportService, actionFilters, (Writeable.Reader<Request>) Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.painlessScriptEngine = painlessScriptEngine;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
Expand Down Expand Up @@ -69,7 +70,7 @@ public TransportRankEvalAction(
ScriptService scriptService,
NamedXContentRegistry namedXContentRegistry
) {
super(RankEvalAction.NAME, transportService, actionFilters, RankEvalRequest::new);
super(RankEvalAction.NAME, transportService, actionFilters, RankEvalRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.scriptService = scriptService;
this.namedXContentRegistry = namedXContentRegistry;
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
Expand All @@ -40,7 +41,7 @@ public TransportDeleteByQueryAction(
ScriptService scriptService,
ClusterService clusterService
) {
super(DeleteByQueryAction.NAME, transportService, actionFilters, DeleteByQueryRequest::new);
super(DeleteByQueryAction.NAME, transportService, actionFilters, DeleteByQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
this.client = client;
this.scriptService = scriptService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.ReindexAction;
Expand Down Expand Up @@ -82,7 +83,7 @@ protected TransportReindexAction(
TransportService transportService,
ReindexSslConfig sslConfig
) {
super(name, transportService, actionFilters, ReindexRequest::new);
super(name, transportService, actionFilters, ReindexRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.client = client;
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.ScrollableHitSource;
Expand Down Expand Up @@ -53,7 +54,7 @@ public TransportUpdateByQueryAction(
ScriptService scriptService,
ClusterService clusterService
) {
super(UpdateByQueryAction.NAME, transportService, actionFilters, UpdateByQueryRequest::new);
super(UpdateByQueryAction.NAME, transportService, actionFilters, UpdateByQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
this.client = client;
this.scriptService = scriptService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.Task;
Expand All @@ -33,7 +34,7 @@ public TransportMainAction(
ActionFilters actionFilters,
ClusterService clusterService
) {
super(MainAction.NAME, transportService, actionFilters, MainRequest::new);
super(MainAction.NAME, transportService, actionFilters, MainRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.clusterService = clusterService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
Expand Down Expand Up @@ -185,7 +186,7 @@ public TestTransportAction(
PluginsService pluginsService,
ThreadPool threadPool
) {
super(NAME, transportService, actionFilters, in -> new TestRequest());
super(NAME, transportService, actionFilters, in -> new TestRequest(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
testPlugin = pluginsService.filterPlugins(TestPlugin.class).get(0);
this.threadPool = threadPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -129,7 +130,7 @@ public static class TransportAction extends HandledTransportAction<Request, Resp

@Inject
public TransportAction(TransportService transportService, ActionFilters actionFilters, MasterHistoryService masterHistoryService) {
super(MasterHistoryAction.NAME, transportService, actionFilters, MasterHistoryAction.Request::new);
super(MasterHistoryAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.masterHistoryService = masterHistoryService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public TransportGetTaskAction(
Client client,
NamedXContentRegistry xContentRegistry
) {
super(GetTaskAction.NAME, transportService, actionFilters, GetTaskRequest::new);
super(GetTaskAction.NAME, transportService, actionFilters, GetTaskRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.RemoteClusterServerInfo;
Expand Down Expand Up @@ -98,7 +99,7 @@ public static class TransportAction extends HandledTransportAction<Request, Resp

@Inject
public TransportAction(TransportService transportService, ActionFilters actionFilters) {
super(RemoteClusterNodesAction.NAME, transportService, actionFilters, Request::new);
super(RemoteClusterNodesAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.transportService = transportService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
Expand All @@ -30,7 +31,7 @@ public TransportRemoteInfoAction(
ActionFilters actionFilters,
SearchTransportService searchTransportService
) {
super(RemoteInfoAction.NAME, transportService, actionFilters, RemoteInfoRequest::new);
super(RemoteInfoAction.NAME, transportService, actionFilters, RemoteInfoRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.remoteClusterService = searchTransportService.getRemoteClusterService();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.script.ScriptContextInfo;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
Expand All @@ -24,7 +25,13 @@ public class TransportGetScriptContextAction extends HandledTransportAction<GetS

@Inject
public TransportGetScriptContextAction(TransportService transportService, ActionFilters actionFilters, ScriptService scriptService) {
super(GetScriptContextAction.NAME, transportService, actionFilters, GetScriptContextRequest::new);
super(
GetScriptContextAction.NAME,
transportService,
actionFilters,
GetScriptContextRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.scriptService = scriptService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
Expand All @@ -21,7 +22,13 @@ public class TransportGetScriptLanguageAction extends HandledTransportAction<Get

@Inject
public TransportGetScriptLanguageAction(TransportService transportService, ActionFilters actionFilters, ScriptService scriptService) {
super(GetScriptLanguageAction.NAME, transportService, actionFilters, GetScriptLanguageRequest::new);
super(
GetScriptLanguageAction.NAME,
transportService,
actionFilters,
GetScriptLanguageRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.scriptService = scriptService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.gateway.LocalAllocateDangledIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -49,7 +50,13 @@ public TransportImportDanglingIndexAction(
LocalAllocateDangledIndices danglingIndexAllocator,
NodeClient nodeClient
) {
super(ImportDanglingIndexAction.NAME, transportService, actionFilters, ImportDanglingIndexRequest::new);
super(
ImportDanglingIndexAction.NAME,
transportService,
actionFilters,
ImportDanglingIndexRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.danglingIndexAllocator = danglingIndexAllocator;
this.nodeClient = nodeClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

Expand All @@ -41,7 +42,13 @@ public TransportGetFieldMappingsAction(
IndexNameExpressionResolver indexNameExpressionResolver,
NodeClient client
) {
super(GetFieldMappingsAction.NAME, transportService, actionFilters, GetFieldMappingsRequest::new);
super(
GetFieldMappingsAction.NAME,
transportService,
actionFilters,
GetFieldMappingsRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ public TransportAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(NAME, transportService, actionFilters, Request::new);
super(NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

Expand All @@ -38,7 +39,7 @@ protected TransportSingleItemBulkWriteAction(
Writeable.Reader<Request> requestReader,
TransportBulkAction bulkAction
) {
super(actionName, transportService, actionFilters, requestReader);
super(actionName, transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.bulkAction = bulkAction;
}

Expand Down
Loading

0 comments on commit 0a31ce6

Please sign in to comment.