diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 22638602fa7d0..dce0a3325bce1 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -790,18 +790,7 @@ private boolean preprocessBulkRequest(Task task, BulkRequest bulkRequest, String } if (needsProcessing) { - // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but - // also with requests updated with processing information. This ensures that this on the second time through this method - // this path is never taken. ActionListener.run(listener, l -> { - if (Assertions.ENABLED) { - final boolean allRequestsUnprocessed = bulkRequest.requests() - .stream() - .map(TransportBulkAction::getIndexWriteRequest) - .filter(Objects::nonNull) - .noneMatch(preprocessor::hasBeenProcessed); - assert allRequestsUnprocessed : bulkRequest; - } if ((preprocessor.shouldExecuteOnIngestNode() == false) || clusterService.localNode().isIngestNode()) { preprocessBulkRequestWithPreprocessor(preprocessor, task, bulkRequest, executorName, l); } else { diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 2a671835d155e..56699af7f8dcf 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -106,7 +106,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean isPipelineResolved; - private boolean isFieldInferenceResolved; + private boolean isFieldInferenceDone; private boolean requireAlias; /** @@ -192,7 +192,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio : new 
ArrayList<>(possiblyImmutableExecutedPipelines); } } - isFieldInferenceResolved = in.getTransportVersion().before(SEMANTIC_TEXT_FIELD_ADDED) || in.readBoolean(); + isFieldInferenceDone = in.getTransportVersion().before(SEMANTIC_TEXT_FIELD_ADDED) || in.readBoolean(); } public IndexRequest() { @@ -380,13 +380,13 @@ public boolean isPipelineResolved() { } /** - * Sets if field inference for this request has been resolved by the coordinating node. + * Sets if field inference for this request has been done by the coordinating node. * - * @param isFieldInferenceResolved true if the field inference has been resolved + * @param isFieldInferenceDone true if the field inference has been done * @return the request */ - public IndexRequest isFieldInferenceResolved(final boolean isFieldInferenceResolved) { - this.isFieldInferenceResolved = isFieldInferenceResolved; + public IndexRequest isFieldInferenceDone(final boolean isFieldInferenceDone) { + this.isFieldInferenceDone = isFieldInferenceDone; return this; } @@ -395,8 +395,8 @@ public IndexRequest isFieldInferenceResolved(final boolean isFieldInferenceResol * * @return true if the pipeline has been resolved */ - public boolean isFieldInferenceResolved() { - return this.isFieldInferenceResolved; + public boolean isFieldInferenceDone() { + return this.isFieldInferenceDone; } /** @@ -780,7 +780,7 @@ private void writeBody(StreamOutput out) throws IOException { } } if (out.getTransportVersion().onOrAfter(SEMANTIC_TEXT_FIELD_ADDED)) { - out.writeBoolean(isFieldInferenceResolved); + out.writeBoolean(isFieldInferenceDone); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/FieldInferenceBulkRequestPreprocessor.java b/server/src/main/java/org/elasticsearch/ingest/FieldInferenceBulkRequestPreprocessor.java index 587875be7804c..beea8c77d868c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/FieldInferenceBulkRequestPreprocessor.java +++
b/server/src/main/java/org/elasticsearch/ingest/FieldInferenceBulkRequestPreprocessor.java @@ -42,6 +42,7 @@ protected void processIndexRequest( IntConsumer onDropped, final BiConsumer onFailure ) { + assert indexRequest.isFieldInferenceDone() == false; String index = indexRequest.index(); Map sourceMap = indexRequest.sourceAsMap(); @@ -52,13 +53,13 @@ protected void processIndexRequest( @Override public boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest indexRequest, Metadata metadata) { - return (indexRequest.isFieldInferenceResolved() == false) + return (indexRequest.isFieldInferenceDone() == false) && indexRequest.sourceAsMap().keySet().stream().anyMatch(fieldName -> fieldNeedsInference(indexRequest.index(), fieldName)); } @Override public boolean hasBeenProcessed(IndexRequest indexRequest) { - return indexRequest.isFieldInferenceResolved(); + return indexRequest.isFieldInferenceDone(); } @Override @@ -100,17 +101,21 @@ private void runInferenceForField( public void onResponse(InferenceAction.Response response) { ingestDocument.setFieldValue(fieldName + "_inference", response.getResult().asMap(fieldName).get(fieldName)); updateIndexRequestSource(indexRequest, ingestDocument); - indexRequest.isFieldInferenceResolved(true); } @Override public void onFailure(Exception e) { - onFailure.accept(position, e); + // Wrap exception in an illegal argument exception, as there is a problem with the model or model config + onFailure.accept( + position, + new IllegalArgumentException("Error performing inference for field [" + fieldName + "]: " + e.getMessage(), e) + ); ingestMetric.ingestFailed(); } }, () -> { // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate // that we're finished with this document + indexRequest.isFieldInferenceDone(true); final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; ingestMetric.postIngest(ingestTimeInNanos); refs.close(); diff --git 
a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 83c6e73795d2e..9210e927417b2 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; @@ -45,6 +46,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; @@ -545,7 +547,7 @@ public boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest @Override public boolean hasBeenProcessed(IndexRequest indexRequest) { - return indexRequest.isPipelineResolved(); + return hasPipeline(indexRequest) && indexRequest.isPipelineResolved(); } @Override @@ -561,6 +563,8 @@ protected void processIndexRequest( IntConsumer onDropped, final BiConsumer onFailure ) { + assert indexRequest.isPipelineResolved(); + IngestService.PipelineIterator pipelines = getAndResetPipelines(indexRequest); if (pipelines.hasNext() == false) { return; @@ -1342,10 +1346,6 @@ private static Optional resolvePipelinesFromIndexTemplates(IndexReque return Optional.of(new Pipelines(defaultPipeline, finalPipeline)); } - public boolean needsProcessing(IndexRequest indexRequest) { - return hasPipeline(indexRequest); - } - /** * Checks whether an IndexRequest has at least one 
pipeline defined. *