Skip to content

Commit

Permalink
Remove TransportSingleItemBulkWriteAction as replication action (#40424)
Browse files Browse the repository at this point in the history
The implementation of TransportIndexAction and TransportDeleteAction as
TransportReplicationAction existed for interoperability with older 5.x nodes, as these older nodes
coordinated single index / deletes as replication requests. This BWC layer is no longer needed in 7.x,
where these single actions are now mapped to bulk requests. Completely removing the deprecated
transport actions is not possible yet if we want to keep BWC with a 6.x transport client. The best
way here is to wait for the transport client to go away and then just remove the actions.
  • Loading branch information
ywelsch authored Mar 27, 2019
1 parent 79a3b93 commit 78636e3
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,12 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.function.Supplier;
Expand All @@ -45,68 +38,21 @@
public abstract class TransportSingleItemBulkWriteAction<
Request extends ReplicatedWriteRequest<Request>,
Response extends ReplicationResponse & WriteResponse
> extends TransportWriteAction<Request, Request, Response> {
> extends HandledTransportAction<Request, Response> {

private final TransportBulkAction bulkAction;
private final TransportShardBulkAction shardBulkAction;


protected TransportSingleItemBulkWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<Request> replicaRequest, String executor,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor);
protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters,
Supplier<Request> request, TransportBulkAction bulkAction) {
super(actionName, transportService, actionFilters, request);
this.bulkAction = bulkAction;
this.shardBulkAction = shardBulkAction;
}


@Override
protected void doExecute(Task task, final Request request, final ActionListener<Response> listener) {
bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener));
}

@Override
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
Request request, final IndexShard primary) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest<?>) request));
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
WritePrimaryResult<BulkShardRequest, BulkShardResponse> bulkResult =
shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary);
assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response";
BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0];
final Response response;
final Exception failure;
if (itemResponse.isFailed()) {
failure = itemResponse.getFailure().getCause();
response = null;
} else {
response = (Response) itemResponse.getResponse();
failure = null;
}
return new WritePrimaryResult<>(request, response, bulkResult.location, failure, primary, logger);
}

@Override
protected WriteReplicaResult<Request> shardOperationOnReplica(
Request replicaRequest, IndexShard replica) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy();
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest<?>) replicaRequest));
BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests);
WriteReplicaResult<BulkShardRequest> result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica);
// a replica operation can never throw a document-level failure,
// as the same document has been already indexed successfully in the primary
return new WriteReplicaResult<>(replicaRequest, result.location, null, replica, logger);
}


public static <Response extends ReplicationResponse & WriteResponse>
ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) {
return ActionListener.wrap(bulkItemResponses -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,9 @@
package org.elasticsearch.action.delete;

import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

/**
Expand All @@ -41,17 +34,7 @@
public class TransportDeleteAction extends TransportSingleItemBulkWriteAction<DeleteRequest, DeleteResponse> {

@Inject
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.WRITE,
bulkAction, shardBulkAction);
}

@Override
protected DeleteResponse newResponseInstance() {
return new DeleteResponse();
public TransportDeleteAction(TransportService transportService, ActionFilters actionFilters, TransportBulkAction bulkAction) {
super(DeleteAction.NAME, transportService, actionFilters, DeleteRequest::new, bulkAction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,9 @@
package org.elasticsearch.action.index;

import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

/**
Expand All @@ -48,18 +41,7 @@
public class TransportIndexAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {

@Inject
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE,
bulkAction, shardBulkAction);
}

@Override
protected IndexResponse newResponseInstance() {
return new IndexResponse();
public TransportIndexAction(ActionFilters actionFilters, TransportService transportService, TransportBulkAction bulkAction) {
super(IndexAction.NAME, transportService, actionFilters, IndexRequest::new, bulkAction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ protected void registerRequestHandlers(String actionName, TransportService trans

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
assert request.shardId() != null : "request shardId must be set";
new ReroutePhase((ReplicationTask) task, request, listener).run();
}

Expand Down Expand Up @@ -779,7 +780,6 @@ protected void doRun() {

// resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
"request waitForActiveShards must be set in resolveRequest";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
Expand Down Expand Up @@ -156,15 +155,8 @@ void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResp
class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {

TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) {
super(SETTINGS, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
TransportBulkActionIngestTests.this.clusterService,
null, null, null, new ActionFilters(Collections.emptySet()), null,
IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE, bulkAction, null);
}

@Override
protected IndexResponse newResponseInstance() {
return new IndexResponse();
super(IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
new ActionFilters(Collections.emptySet()), IndexRequest::new, bulkAction);
}
}

Expand Down

0 comments on commit 78636e3

Please sign in to comment.