Skip to content

Commit

Permalink
Removed the operation_threaded option.
Browse files Browse the repository at this point in the history
This low level option isn't worth the complexity and an operation should never happen on the network thread.
  • Loading branch information
martijnvg committed Aug 26, 2015
1 parent 7de70b4 commit cab7c79
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
public abstract class TransportBroadcastAction<Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends BroadcastShardRequest, ShardResponse extends BroadcastShardResponse>
extends HandledTransportAction<Request, Response> {

protected final ThreadPool threadPool;
protected final ClusterService clusterService;
protected final TransportService transportService;

Expand All @@ -59,7 +58,6 @@ protected TransportBroadcastAction(Settings settings, String actionName, ThreadP
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
this.transportShardAction = actionName + "[s]";

transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public abstract class ReplicationRequest<T extends ReplicationRequest> extends A
protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;

private boolean threadedOperation = true;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private volatile boolean canHaveDuplicates = false;

Expand Down Expand Up @@ -76,7 +75,6 @@ protected ReplicationRequest(T request, ActionRequest originalRequest) {
super(originalRequest);
this.timeout = request.timeout();
this.index = request.index();
this.threadedOperation = request.operationThreaded();
this.consistencyLevel = request.consistencyLevel();
}

Expand All @@ -91,23 +89,6 @@ public boolean canHaveDuplicates() {
return canHaveDuplicates;
}

/**
* Controls if the operation will be executed on a separate thread when executed locally.
*/
public final boolean operationThreaded() {
return threadedOperation;
}

/**
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
* to <tt>true</tt> when running in embedded mode.
*/
@SuppressWarnings("unchecked")
public final T operationThreaded(boolean threadedOperation) {
this.threadedOperation = threadedOperation;
return (T) this;
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ protected ReplicationRequestBuilder(ElasticsearchClient client, Action<Request,
super(client, action, request);
}

/**
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
* to <tt>true</tt> when running in embedded mode.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setOperationThreaded(boolean threadedOperation) {
request.operationThreaded(threadedOperation);
return (RequestBuilder) this;
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,6 @@ public <T extends ActionWriteResponse> T response() {
class OperationTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// if we have a local operation, execute it on a thread since we don't spawn
request.operationThreaded(true);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
Expand Down Expand Up @@ -440,21 +438,17 @@ protected ShardRouting resolvePrimary(ShardIterator shardIt) {
protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) {
if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
try {
if (internalRequest.request().operationThreaded()) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
finishAsFailed(t);
}
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
finishAsFailed(t);
}

@Override
protected void doRun() throws Exception {
performOnPrimary(primary, shardsIt);
}
});
} else {
performOnPrimary(primary, shardsIt);
}
@Override
protected void doRun() throws Exception {
performOnPrimary(primary, shardsIt);
}
});
} catch (Throwable t) {
finishAsFailed(t);
}
Expand Down Expand Up @@ -506,9 +500,6 @@ void retry(Throwable failure) {
finishAsFailed(failure);
return;
}
// make it threaded operation so we fork on the discovery listener thread
internalRequest.request().operationThreaded(true);

observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
Expand Down Expand Up @@ -904,43 +895,33 @@ public void handleException(TransportException exp) {

});
} else {
if (replicaRequest.operationThreaded()) {
try {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() {
try {
shardOperationOnReplica(shard.shardId(), replicaRequest);
onReplicaSuccess();
} catch (Throwable e) {
onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
try {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() {
try {
shardOperationOnReplica(shard.shardId(), replicaRequest);
onReplicaSuccess();
} catch (Throwable e) {
onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
}

// we must never reject on because of thread pool capacity on replicas
@Override
public boolean isForceExecution() {
return true;
}
// we must never reject on because of thread pool capacity on replicas
@Override
public boolean isForceExecution() {
return true;
}

@Override
public void onFailure(Throwable t) {
onReplicaFailure(nodeId, t);
}
});
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
} else {
try {
shardOperationOnReplica(shard.shardId(), replicaRequest);
onReplicaSuccess();
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
@Override
public void onFailure(Throwable t) {
onReplicaFailure(nodeId, t);
}
});
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) {
.routing(request.routing())
.parent(request.parent())
.consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
if (request.versionType() != VersionType.INTERNAL) {
// in all but the internal versioning mode, we want to create the new document using the given version.
indexRequest.version(request.version()).versionType(request.versionType());
Expand Down Expand Up @@ -227,13 +226,11 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) {
.consistencyLevel(request.consistencyLevel())
.timestamp(timestamp).ttl(ttl)
.refresh(request.refresh());
indexRequest.operationThreaded(false);
return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType);
} else if ("delete".equals(operation)) {
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.version(updateVersion).versionType(request.versionType())
.consistencyLevel(request.consistencyLevel());
deleteRequest.operationThreaded(false);
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ public RestDeleteAction(Settings settings, RestController controller, Client cli
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));

deleteRequest.operationThreaded(true);

deleteRequest.routing(request.param("routing"));
deleteRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public void handleRequest(RestRequest request, RestChannel channel, final Client
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
indexRequest.operationThreaded(true);
indexRequest.routing(request.param("routing"));
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
indexRequest.timestamp(request.param("timestamp"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,6 @@ static class Request extends ReplicationRequest<Request> {
public AtomicInteger processedOnReplicas = new AtomicInteger();

Request() {
this.operationThreaded(randomBoolean());
}

Request(ShardId shardId) {
Expand Down

0 comments on commit cab7c79

Please sign in to comment.