Skip to content

Commit

Permalink
INGEST: Simplify IngestService (elastic#33008)
Browse files Browse the repository at this point in the history
* INGEST: Simplify IngestService

* Follow up to elastic#32617
* Flatten redundant inner classes of `IngestService`
  • Loading branch information
original-brownbear committed Aug 31, 2018
1 parent fdc5817 commit e428306
Show file tree
Hide file tree
Showing 21 changed files with 1,386 additions and 1,497 deletions.
20 changes: 10 additions & 10 deletions server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
* {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare
* the one with the least casts.
*/
final List<DocWriteRequest> requests = new ArrayList<>();
final List<DocWriteRequest<?>> requests = new ArrayList<>();
private final Set<String> indices = new HashSet<>();
List<Object> payloads = null;

Expand All @@ -105,14 +105,14 @@ public BulkRequest() {
/**
* Adds a list of requests to be executed. Either index or delete requests.
*/
public BulkRequest add(DocWriteRequest... requests) {
for (DocWriteRequest request : requests) {
public BulkRequest add(DocWriteRequest<?>... requests) {
for (DocWriteRequest<?> request : requests) {
add(request, null);
}
return this;
}

public BulkRequest add(DocWriteRequest request) {
public BulkRequest add(DocWriteRequest<?> request) {
return add(request, null);
}

Expand All @@ -122,7 +122,7 @@ public BulkRequest add(DocWriteRequest request) {
* @param payload Optional payload
* @return the current bulk request
*/
public BulkRequest add(DocWriteRequest request, @Nullable Object payload) {
public BulkRequest add(DocWriteRequest<?> request, @Nullable Object payload) {
if (request instanceof IndexRequest) {
add((IndexRequest) request, payload);
} else if (request instanceof DeleteRequest) {
Expand All @@ -139,8 +139,8 @@ public BulkRequest add(DocWriteRequest request, @Nullable Object payload) {
/**
* Adds a list of requests to be executed. Either index or delete requests.
*/
public BulkRequest add(Iterable<DocWriteRequest> requests) {
for (DocWriteRequest request : requests) {
public BulkRequest add(Iterable<DocWriteRequest<?>> requests) {
for (DocWriteRequest<?> request : requests) {
add(request);
}
return this;
Expand Down Expand Up @@ -229,7 +229,7 @@ private void addPayload(Object payload) {
/**
* The list of requests in this bulk request.
*/
public List<DocWriteRequest> requests() {
public List<DocWriteRequest<?>> requests() {
return this.requests;
}

Expand Down Expand Up @@ -550,7 +550,7 @@ public ActionRequestValidationException validate() {
if (requests.isEmpty()) {
validationException = addValidationError("no requests added", validationException);
}
for (DocWriteRequest request : requests) {
for (DocWriteRequest<?> request : requests) {
// We first check if refresh has been set
if (((WriteRequest<?>) request).getRefreshPolicy() != RefreshPolicy.NONE) {
validationException = addValidationError(
Expand Down Expand Up @@ -585,7 +585,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
waitForActiveShards.writeTo(out);
out.writeVInt(requests.size());
for (DocWriteRequest request : requests) {
for (DocWriteRequest<?> request : requests) {
DocWriteRequest.writeDocumentRequest(out, request);
}
refreshPolicy.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ private long relativeTime() {
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.getPipelineExecutionService().executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
ingestService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception);
bulkRequestModifier.markCurrentItemAsFailed(exception);
Expand All @@ -549,7 +549,7 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
});
}

static final class BulkRequestModifier implements Iterator<DocWriteRequest> {
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {

final BulkRequest bulkRequest;
final SparseFixedBitSet failedSlots;
Expand Down Expand Up @@ -584,7 +584,7 @@ BulkRequest getBulkRequest() {
modifiedBulkRequest.timeout(bulkRequest.timeout());

int slot = 0;
List<DocWriteRequest> requests = bulkRequest.requests();
List<DocWriteRequest<?>> requests = bulkRequest.requests();
originalSlots = new int[requests.size()]; // oversize, but that's ok
for (int i = 0; i < requests.size(); i++) {
DocWriteRequest request = requests.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,23 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

public class DeletePipelineTransportAction extends TransportMasterNodeAction<DeletePipelineRequest, AcknowledgedResponse> {

private final PipelineStore pipelineStore;
private final IngestService ingestService;
private final ClusterService clusterService;

@Inject
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
IndexNameExpressionResolver indexNameExpressionResolver, IngestService ingestService) {
super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool,
actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
this.clusterService = clusterService;
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
this.ingestService = ingestService;
}

@Override
Expand All @@ -60,8 +60,9 @@ protected AcknowledgedResponse newResponse() {
}

@Override
protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) throws Exception {
pipelineStore.delete(clusterService, request, listener);
protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
throws Exception {
ingestService.delete(request, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,18 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

public class GetPipelineTransportAction extends TransportMasterNodeReadAction<GetPipelineRequest, GetPipelineResponse> {

private final PipelineStore pipelineStore;

@Inject
public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new);
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, GetPipelineRequest::new);
}

@Override
Expand All @@ -58,7 +55,7 @@ protected GetPipelineResponse newResponse() {

@Override
protected void masterOperation(GetPipelineRequest request, ClusterState state, ActionListener<GetPipelineResponse> listener) throws Exception {
listener.onResponse(new GetPipelineResponse(pipelineStore.getPipelines(state, request.getIds())));
listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -46,19 +45,19 @@

public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPipelineRequest, AcknowledgedResponse> {

private final PipelineStore pipelineStore;
private final IngestService ingestService;
private final ClusterService clusterService;
private final TransportNodesInfoAction nodesInfoAction;

@Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService,
IndexNameExpressionResolver indexNameExpressionResolver, IngestService ingestService,
TransportNodesInfoAction nodesInfoAction) {
super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.clusterService = clusterService;
this.nodesInfoAction = nodesInfoAction;
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
this.ingestService = ingestService;
}

@Override
Expand All @@ -84,7 +83,7 @@ public void onResponse(NodesInfoResponse nodeInfos) {
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
}
pipelineStore.put(clusterService, ingestInfos, request, listener);
ingestService.putPipeline(ingestInfos, request, listener);
} catch (Exception e) {
onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineStore;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -160,24 +160,23 @@ public boolean isVerbose() {
}
}

private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";

static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) {
static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, IngestService ingestService) {
if (pipelineId == null) {
throw new IllegalArgumentException("param [pipeline] is null");
}
Pipeline pipeline = pipelineStore.get(pipelineId);
Pipeline pipeline = ingestService.getPipeline(pipelineId);
if (pipeline == null) {
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
}
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -35,13 +35,13 @@

public class SimulatePipelineTransportAction extends HandledTransportAction<SimulatePipelineRequest, SimulatePipelineResponse> {

private final PipelineStore pipelineStore;
private final IngestService ingestService;
private final SimulateExecutionService executionService;

@Inject
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
this.ingestService = nodeService.getIngestService();
this.executionService = new SimulateExecutionService(threadPool);
}

Expand All @@ -52,9 +52,9 @@ protected void doExecute(SimulatePipelineRequest request, ActionListener<Simulat
final SimulatePipelineRequest.Parsed simulateRequest;
try {
if (request.getId() != null) {
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore);
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService);
} else {
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore);
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService);
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Loading

0 comments on commit e428306

Please sign in to comment.