Skip to content

Commit

Permalink
Translate exceptions, change assertions to individual methods
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosdelest committed Oct 25, 2023
1 parent 2adde5d commit 8ae3cc5
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -790,18 +790,7 @@ private boolean preprocessBulkRequest(Task task, BulkRequest bulkRequest, String
}

if (needsProcessing) {
// this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
// also with requests updated with processing information. This ensures that this on the second time through this method
// this path is never taken.
ActionListener.run(listener, l -> {
if (Assertions.ENABLED) {
final boolean allRequestsUnprocessed = bulkRequest.requests()
.stream()
.map(TransportBulkAction::getIndexWriteRequest)
.filter(Objects::nonNull)
.noneMatch(preprocessor::hasBeenProcessed);
assert allRequestsUnprocessed : bulkRequest;
}
if ((preprocessor.shouldExecuteOnIngestNode() == false) || clusterService.localNode().isIngestNode()) {
preprocessBulkRequestWithPreprocessor(preprocessor, task, bulkRequest, executorName, l);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement

private boolean isPipelineResolved;

private boolean isFieldInferenceResolved;
private boolean isFieldInferenceDone;

private boolean requireAlias;
/**
Expand Down Expand Up @@ -192,7 +192,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
: new ArrayList<>(possiblyImmutableExecutedPipelines);
}
}
isFieldInferenceResolved = in.getTransportVersion().before(SEMANTIC_TEXT_FIELD_ADDED) || in.readBoolean();
isFieldInferenceDone = in.getTransportVersion().before(SEMANTIC_TEXT_FIELD_ADDED) || in.readBoolean();
}

public IndexRequest() {
Expand Down Expand Up @@ -380,13 +380,13 @@ public boolean isPipelineResolved() {
}

/**
* Sets if field inference for this request has been resolved by the coordinating node.
* Sets if field inference for this request has been done by the coordinating node.
*
* @param isFieldInferenceResolved true if the field inference has been resolved
* @param isFieldInferenceDone true if the field inference has been done
* @return the request
*/
public IndexRequest isFieldInferenceResolved(final boolean isFieldInferenceResolved) {
this.isFieldInferenceResolved = isFieldInferenceResolved;
public IndexRequest isFieldInferenceDone(final boolean isFieldInferenceDone) {
this.isFieldInferenceDone = isFieldInferenceDone;
return this;
}

Expand All @@ -395,8 +395,8 @@ public IndexRequest isFieldInferenceResolved(final boolean isFieldInferenceResol
*
* @return true if the field inference has been done
*/
public boolean isFieldInferenceResolved() {
return this.isFieldInferenceResolved;
public boolean isFieldInferenceDone() {
return this.isFieldInferenceDone;
}

/**
Expand Down Expand Up @@ -780,7 +780,7 @@ private void writeBody(StreamOutput out) throws IOException {
}
}
if (out.getTransportVersion().onOrAfter(SEMANTIC_TEXT_FIELD_ADDED)) {
out.writeBoolean(isFieldInferenceResolved);
out.writeBoolean(isFieldInferenceDone);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected void processIndexRequest(
IntConsumer onDropped,
final BiConsumer<Integer, Exception> onFailure
) {
assert indexRequest.isFieldInferenceDone() == false;

String index = indexRequest.index();
Map<String, Object> sourceMap = indexRequest.sourceAsMap();
Expand All @@ -52,13 +53,13 @@ protected void processIndexRequest(

@Override
public boolean needsProcessing(DocWriteRequest<?> docWriteRequest, IndexRequest indexRequest, Metadata metadata) {
return (indexRequest.isFieldInferenceResolved() == false)
return (indexRequest.isFieldInferenceDone() == false)
&& indexRequest.sourceAsMap().keySet().stream().anyMatch(fieldName -> fieldNeedsInference(indexRequest.index(), fieldName));
}

@Override
public boolean hasBeenProcessed(IndexRequest indexRequest) {
return indexRequest.isFieldInferenceResolved();
return indexRequest.isFieldInferenceDone();
}

@Override
Expand Down Expand Up @@ -100,17 +101,21 @@ private void runInferenceForField(
public void onResponse(InferenceAction.Response response) {
ingestDocument.setFieldValue(fieldName + "_inference", response.getResult().asMap(fieldName).get(fieldName));
updateIndexRequestSource(indexRequest, ingestDocument);
indexRequest.isFieldInferenceResolved(true);
}

@Override
public void onFailure(Exception e) {
onFailure.accept(position, e);
// Wrap exception in an illegal argument exception, as there is a problem with the model or model config
onFailure.accept(
position,
new IllegalArgumentException("Error performing inference for field [" + fieldName + "]: " + e.getMessage(), e)
);
ingestMetric.ingestFailed();
}
}, () -> {
// regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate
// that we're finished with this document
indexRequest.isFieldInferenceDone(true);
final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
ingestMetric.postIngest(ingestTimeInNanos);
refs.close();
Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
Expand Down Expand Up @@ -45,6 +46,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -545,7 +547,7 @@ public boolean needsProcessing(DocWriteRequest<?> docWriteRequest, IndexRequest

@Override
public boolean hasBeenProcessed(IndexRequest indexRequest) {
return indexRequest.isPipelineResolved();
return hasPipeline(indexRequest) && indexRequest.isPipelineResolved();
}

@Override
Expand All @@ -561,6 +563,8 @@ protected void processIndexRequest(
IntConsumer onDropped,
final BiConsumer<Integer, Exception> onFailure
) {
assert indexRequest.isPipelineResolved();

IngestService.PipelineIterator pipelines = getAndResetPipelines(indexRequest);
if (pipelines.hasNext() == false) {
return;
Expand Down Expand Up @@ -1342,10 +1346,6 @@ private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(IndexReque
return Optional.of(new Pipelines(defaultPipeline, finalPipeline));
}

public boolean needsProcessing(IndexRequest indexRequest) {
return hasPipeline(indexRequest);
}

/**
* Checks whether an IndexRequest has at least one pipeline defined.
* <p>
Expand Down

0 comments on commit 8ae3cc5

Please sign in to comment.