From cab7c791d26b5d0acbb0a3a1494ca4d4f88c2e22 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 26 Aug 2015 13:24:29 +0200 Subject: [PATCH] Removed the `operation_threaded` option. This low level option isn't worth the complexity and an operation should never happen on the network thread. --- .../broadcast/TransportBroadcastAction.java | 2 - .../replication/ReplicationRequest.java | 19 ---- .../ReplicationRequestBuilder.java | 10 --- .../TransportReplicationAction.java | 87 ++++++++----------- .../action/update/UpdateHelper.java | 3 - .../rest/action/delete/RestDeleteAction.java | 3 - .../rest/action/index/RestIndexAction.java | 1 - .../replication/ShardReplicationTests.java | 1 - 8 files changed, 34 insertions(+), 92 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index dcb6952dc3585..2386a82650a95 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -47,7 +47,6 @@ public abstract class TransportBroadcastAction extends HandledTransportAction { - protected final ThreadPool threadPool; protected final ClusterService clusterService; protected final TransportService transportService; @@ -59,7 +58,6 @@ protected TransportBroadcastAction(Settings settings, String actionName, ThreadP super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); this.clusterService = clusterService; this.transportService = transportService; - this.threadPool = threadPool; this.transportShardAction = actionName + "[s]"; transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler()); diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 39a3dae75698b..93907cf0aa631 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -46,7 +46,6 @@ public abstract class ReplicationRequest extends A protected TimeValue timeout = DEFAULT_TIMEOUT; protected String index; - private boolean threadedOperation = true; private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; private volatile boolean canHaveDuplicates = false; @@ -76,7 +75,6 @@ protected ReplicationRequest(T request, ActionRequest originalRequest) { super(originalRequest); this.timeout = request.timeout(); this.index = request.index(); - this.threadedOperation = request.operationThreaded(); this.consistencyLevel = request.consistencyLevel(); } @@ -91,23 +89,6 @@ public boolean canHaveDuplicates() { return canHaveDuplicates; } - /** - * Controls if the operation will be executed on a separate thread when executed locally. - */ - public final boolean operationThreaded() { - return threadedOperation; - } - - /** - * Controls if the operation will be executed on a separate thread when executed locally. Defaults - * to true when running in embedded mode. - */ - @SuppressWarnings("unchecked") - public final T operationThreaded(boolean threadedOperation) { - this.threadedOperation = threadedOperation; - return (T) this; - } - /** * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. */ diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java index 7762eaee7dc0e..bafb33be6a73e 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java @@ -35,16 +35,6 @@ protected ReplicationRequestBuilder(ElasticsearchClient client, Actiontrue when running in embedded mode. - */ - @SuppressWarnings("unchecked") - public final RequestBuilder setOperationThreaded(boolean threadedOperation) { - request.operationThreaded(threadedOperation); - return (RequestBuilder) this; - } - /** * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. */ diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index bc4094acc54f4..155c30756ca95 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -211,8 +211,6 @@ public T response() { class OperationTransportHandler implements TransportRequestHandler { @Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception { - // if we have a local operation, execute it on a thread since we don't spawn - request.operationThreaded(true); execute(request, new ActionListener() { @Override public void onResponse(Response result) { @@ -440,21 +438,17 @@ protected ShardRouting resolvePrimary(ShardIterator shardIt) { protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) { if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) { try { - if (internalRequest.request().operationThreaded()) { - threadPool.executor(executor).execute(new AbstractRunnable() { - @Override - public void onFailure(Throwable t) { - finishAsFailed(t); - } + threadPool.executor(executor).execute(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + finishAsFailed(t); + } - @Override - protected void doRun() throws Exception { - performOnPrimary(primary, shardsIt); - } - }); - } else { - performOnPrimary(primary, shardsIt); - } + @Override + protected void doRun() throws Exception { + performOnPrimary(primary, shardsIt); + } + }); } catch (Throwable t) { finishAsFailed(t); } @@ -506,9 +500,6 @@ void retry(Throwable failure) { finishAsFailed(failure); return; } - // make it threaded operation so we fork on the discovery listener thread - internalRequest.request().operationThreaded(true); - observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { @@ -904,43 +895,33 @@ public void handleException(TransportException exp) { }); } else { - if (replicaRequest.operationThreaded()) { - try { - threadPool.executor(executor).execute(new AbstractRunnable() { - @Override - protected void doRun() { - try { - shardOperationOnReplica(shard.shardId(), replicaRequest); - onReplicaSuccess(); - } catch (Throwable e) { - onReplicaFailure(nodeId, e); - failReplicaIfNeeded(shard.index(), shard.id(), e); - } + try { + threadPool.executor(executor).execute(new AbstractRunnable() { + @Override + protected void doRun() { + try { + shardOperationOnReplica(shard.shardId(), replicaRequest); + onReplicaSuccess(); + } catch (Throwable e) { + onReplicaFailure(nodeId, e); + failReplicaIfNeeded(shard.index(), shard.id(), e); } + } - // we must never reject on because of thread pool capacity on replicas - @Override - public boolean isForceExecution() { - return true; - } + // we must never reject on because of thread pool capacity on replicas + @Override + public boolean isForceExecution() { + return true; + } - @Override - public void onFailure(Throwable t) { - onReplicaFailure(nodeId, t); - } - }); - } catch (Throwable e) { - failReplicaIfNeeded(shard.index(), shard.id(), e); - onReplicaFailure(nodeId, e); - } - } else { - try { - shardOperationOnReplica(shard.shardId(), replicaRequest); - onReplicaSuccess(); - } catch (Throwable e) { - failReplicaIfNeeded(shard.index(), shard.id(), e); - onReplicaFailure(nodeId, e); - } + @Override + public void onFailure(Throwable t) { + onReplicaFailure(nodeId, t); + } + }); + } catch (Throwable e) { + failReplicaIfNeeded(shard.index(), shard.id(), e); + onReplicaFailure(nodeId, e); } } } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index c9401ca5392ec..fba2f23852e15 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -131,7 +131,6 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) { .routing(request.routing()) .parent(request.parent()) .consistencyLevel(request.consistencyLevel()); - indexRequest.operationThreaded(false); if (request.versionType() != VersionType.INTERNAL) { // in all but the internal versioning mode, we want to create the new document using the given version. indexRequest.version(request.version()).versionType(request.versionType()); @@ -227,13 +226,11 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) { .consistencyLevel(request.consistencyLevel()) .timestamp(timestamp).ttl(ttl) .refresh(request.refresh()); - indexRequest.operationThreaded(false); return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType); } else if ("delete".equals(operation)) { DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) .version(updateVersion).versionType(request.versionType()) .consistencyLevel(request.consistencyLevel()); - deleteRequest.operationThreaded(false); return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType); } else if ("none".equals(operation)) { UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false); diff --git a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java index 69f06cef1f129..209ab686ce5b3 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java @@ -50,9 +50,6 @@ public RestDeleteAction(Settings settings, RestController controller, Client cli @Override public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id")); - - deleteRequest.operationThreaded(true); - deleteRequest.routing(request.param("routing")); deleteRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT)); diff --git a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index 3d3ecdfa880ed..d0d0fe68a13c1 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -70,7 +70,6 @@ public void handleRequest(RestRequest request, RestChannel channel, final Client @Override public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id")); - indexRequest.operationThreaded(true); indexRequest.routing(request.param("routing")); indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing indexRequest.timestamp(request.param("timestamp")); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java index d08840b077184..d7fb2ddd54d5d 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java @@ -661,7 +661,6 @@ static class Request extends ReplicationRequest { public AtomicInteger processedOnReplicas = new AtomicInteger(); Request() { - this.operationThreaded(randomBoolean()); } Request(ShardId shardId) {