Skip to content

Commit

Permalink
Keep pipelines BulkRequestPreprocessor into the IngestService for sim…
Browse files Browse the repository at this point in the history
…plicity, wire Node and security
  • Loading branch information
carlosdelest committed Oct 24, 2023
1 parent d350f15 commit 46e2634
Show file tree
Hide file tree
Showing 11 changed files with 586 additions and 507 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.BulkRequestPreprocessor;
import org.elasticsearch.ingest.FieldInferenceBulkRequestPreprocessor;
import org.elasticsearch.ingest.PipelinesBulkRequestPreprocessor;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -111,7 +111,7 @@ public TransportBulkAction(
ThreadPool threadPool,
TransportService transportService,
ClusterService clusterService,
PipelinesBulkRequestPreprocessor pipelinesBulkRequestPreprocessor,
IngestService ingestService,
FieldInferenceBulkRequestPreprocessor fieldInferenceBulkRequestPreprocessor,
NodeClient client,
ActionFilters actionFilters,
Expand All @@ -123,7 +123,7 @@ public TransportBulkAction(
threadPool,
transportService,
clusterService,
pipelinesBulkRequestPreprocessor,
ingestService,
fieldInferenceBulkRequestPreprocessor,
client,
actionFilters,
Expand All @@ -138,7 +138,7 @@ public TransportBulkAction(
ThreadPool threadPool,
TransportService transportService,
ClusterService clusterService,
PipelinesBulkRequestPreprocessor pipelinesBulkRequestPreprocessor,
IngestService ingestService,
FieldInferenceBulkRequestPreprocessor fieldInferenceBulkRequestPreprocessor,
NodeClient client,
ActionFilters actionFilters,
Expand All @@ -151,7 +151,7 @@ public TransportBulkAction(
Objects.requireNonNull(relativeTimeProvider);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.bulkRequestPreprocessors = List.of(pipelinesBulkRequestPreprocessor, fieldInferenceBulkRequestPreprocessor);
this.bulkRequestPreprocessors = List.of(ingestService, fieldInferenceBulkRequestPreprocessor);
this.relativeTimeProvider = relativeTimeProvider;
this.ingestForwarder = new IngestActionForwarder(transportService);
this.client = client;
Expand Down Expand Up @@ -355,12 +355,7 @@ protected void doRun() {
}
}

private boolean preprocessBulkRequest(
Task task,
BulkRequest bulkRequest,
String executorName,
ActionListener<BulkResponse> listener
) {
private boolean preprocessBulkRequest(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
final Metadata metadata = clusterService.state().getMetadata();
final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
boolean needsProcessing = false;
Expand All @@ -385,12 +380,12 @@ private boolean preprocessBulkRequest(
// this path is never taken.
ActionListener.run(listener, l -> {
if (Assertions.ENABLED) {
final boolean areRequestsProcessed = bulkRequest.requests()
final boolean allRequestsUnprocessed = bulkRequest.requests()
.stream()
.map(TransportBulkAction::getIndexWriteRequest)
.filter(Objects::nonNull)
.allMatch(preprocessor::hasBeenProcessed);
assert areRequestsProcessed : bulkRequest;
.noneMatch(preprocessor::hasBeenProcessed);
assert allRequestsUnprocessed : bulkRequest;
}
if ((preprocessor.shouldExecuteOnIngestNode() == false) || clusterService.localNode().isIngestNode()) {
preprocessBulkRequestWithPreprocessor(preprocessor, task, bulkRequest, executorName, l);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ protected abstract void processIndexRequest(
int slot,
RefCountingRunnable refs,
IntConsumer onDropped,
BiConsumer<Integer, Exception> onFailure);
BiConsumer<Integer, Exception> onFailure
);

/**
* Updates an index request based on the source of an ingest document, guarding against self-references if necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void processBulkRequest(
String executorName
);

boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest indexRequest, Metadata metadata);
boolean needsProcessing(DocWriteRequest<?> docWriteRequest, IndexRequest indexRequest, Metadata metadata);

boolean hasBeenProcessed(IndexRequest indexRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected void processIndexRequest(
}

@Override
public boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest indexRequest, Metadata metadata) {
public boolean needsProcessing(DocWriteRequest<?> docWriteRequest, IndexRequest indexRequest, Metadata metadata) {
return (indexRequest.isFieldInferenceResolved() == false)
&& indexRequest.sourceAsMap().keySet().stream().anyMatch(fieldName -> fieldNeedsInference(indexRequest.index(), fieldName));
}
Expand Down
Loading

0 comments on commit 46e2634

Please sign in to comment.