diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 0c171f7eef6f7..e75c2304d8c98 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -61,7 +61,7 @@ import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.BulkRequestPreprocessor; import org.elasticsearch.ingest.FieldInferenceBulkRequestPreprocessor; -import org.elasticsearch.ingest.PipelinesBulkRequestPreprocessor; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -111,7 +111,7 @@ public TransportBulkAction( ThreadPool threadPool, TransportService transportService, ClusterService clusterService, - PipelinesBulkRequestPreprocessor pipelinesBulkRequestPreprocessor, + IngestService ingestService, FieldInferenceBulkRequestPreprocessor fieldInferenceBulkRequestPreprocessor, NodeClient client, ActionFilters actionFilters, @@ -123,7 +123,7 @@ public TransportBulkAction( threadPool, transportService, clusterService, - pipelinesBulkRequestPreprocessor, + ingestService, fieldInferenceBulkRequestPreprocessor, client, actionFilters, @@ -138,7 +138,7 @@ public TransportBulkAction( ThreadPool threadPool, TransportService transportService, ClusterService clusterService, - PipelinesBulkRequestPreprocessor pipelinesBulkRequestPreprocessor, + IngestService ingestService, FieldInferenceBulkRequestPreprocessor fieldInferenceBulkRequestPreprocessor, NodeClient client, ActionFilters actionFilters, @@ -151,7 +151,7 @@ public TransportBulkAction( Objects.requireNonNull(relativeTimeProvider); this.threadPool = threadPool; this.clusterService = clusterService; - this.bulkRequestPreprocessors = List.of(pipelinesBulkRequestPreprocessor, fieldInferenceBulkRequestPreprocessor); + this.bulkRequestPreprocessors = List.of(ingestService, fieldInferenceBulkRequestPreprocessor); this.relativeTimeProvider = relativeTimeProvider; this.ingestForwarder = new IngestActionForwarder(transportService); this.client = client; @@ -355,12 +355,7 @@ protected void doRun() { } } - private boolean preprocessBulkRequest( - Task task, - BulkRequest bulkRequest, - String executorName, - ActionListener listener - ) { + private boolean preprocessBulkRequest(Task task, BulkRequest bulkRequest, String executorName, ActionListener listener) { final Metadata metadata = clusterService.state().getMetadata(); final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion(); boolean needsProcessing = false; @@ -385,12 +380,12 @@ private boolean preprocessBulkRequest( // this path is never taken. ActionListener.run(listener, l -> { if (Assertions.ENABLED) { - final boolean areRequestsProcessed = bulkRequest.requests() + final boolean allRequestsUnprocessed = bulkRequest.requests() .stream() .map(TransportBulkAction::getIndexWriteRequest) .filter(Objects::nonNull) - .allMatch(preprocessor::hasBeenProcessed); - assert areRequestsProcessed : bulkRequest; + .noneMatch(preprocessor::hasBeenProcessed); + assert allRequestsUnprocessed : bulkRequest; } if ((preprocessor.shouldExecuteOnIngestNode() == false) || clusterService.localNode().isIngestNode()) { preprocessBulkRequestWithPreprocessor(preprocessor, task, bulkRequest, executorName, l); diff --git a/server/src/main/java/org/elasticsearch/ingest/AbstractBulkRequestPreprocessor.java b/server/src/main/java/org/elasticsearch/ingest/AbstractBulkRequestPreprocessor.java index 72d7ad820dcfd..4dca78b58511a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/AbstractBulkRequestPreprocessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/AbstractBulkRequestPreprocessor.java @@ -70,7 +70,8 @@ protected abstract void processIndexRequest( int slot, RefCountingRunnable refs, IntConsumer onDropped, - BiConsumer onFailure); + BiConsumer onFailure + ); /** * Updates an index request based on the source of an ingest document, guarding against self-references if necessary. diff --git a/server/src/main/java/org/elasticsearch/ingest/BulkRequestPreprocessor.java b/server/src/main/java/org/elasticsearch/ingest/BulkRequestPreprocessor.java index 7296175ff7c7c..73adba4726dd5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/BulkRequestPreprocessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/BulkRequestPreprocessor.java @@ -27,7 +27,7 @@ void processBulkRequest( String executorName ); - boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest indexRequest, Metadata metadata); + boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest indexRequest, Metadata metadata); boolean hasBeenProcessed(IndexRequest indexRequest); diff --git a/server/src/main/java/org/elasticsearch/ingest/FieldInferenceBulkRequestPreprocessor.java b/server/src/main/java/org/elasticsearch/ingest/FieldInferenceBulkRequestPreprocessor.java index afc23b0095966..876322fa38c35 100644 --- a/server/src/main/java/org/elasticsearch/ingest/FieldInferenceBulkRequestPreprocessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/FieldInferenceBulkRequestPreprocessor.java @@ -51,7 +51,7 @@ protected void processIndexRequest( } @Override - public boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest indexRequest, Metadata metadata) { + public boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest indexRequest, Metadata metadata) { return (indexRequest.isFieldInferenceResolved() == false) && indexRequest.sourceAsMap().keySet().stream().anyMatch(fieldName -> fieldNeedsInference(indexRequest.index(), fieldName)); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b91d709839ab9..4d98d4b7d04fd 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -15,11 +15,14 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -27,7 +30,13 @@ import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; @@ -35,14 +44,18 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.grok.MatcherWatchdog; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.node.ReportingService; import org.elasticsearch.plugins.IngestPlugin; @@ -64,19 +77,24 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.IntConsumer; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.elasticsearch.core.Strings.format; + /** * Holder class for several ingest related services. */ -public class IngestService implements ClusterStateApplier, ReportingService { +public class IngestService extends AbstractBulkRequestPreprocessor implements ClusterStateApplier, ReportingService { public static final String NOOP_PIPELINE_NAME = "_none"; @@ -87,14 +105,15 @@ public class IngestService implements ClusterStateApplier, ReportingService taskQueue; private final ClusterService clusterService; private final ScriptService scriptService; - protected final Supplier documentParsingObserverSupplier; private final Map processorFactories; + protected final Client client; // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. // We know of all the processor factories when a node with all its plugin have been initialized. Also some // processor factories rely on other node services. Custom metadata is statically registered when classes // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. private volatile Map pipelines = Map.of(); protected final ThreadPool threadPool; + private final IngestMetric totalMetrics = new IngestMetric(); private final List> ingestClusterStateListeners = new CopyOnWriteArrayList<>(); private volatile ClusterState state; @@ -134,8 +153,6 @@ public static MatcherWatchdog createGrokThreadWatchdog(Environment env, ThreadPo : batchExecutionContext.initialState().copyAndUpdateMetadata(b -> b.putCustom(IngestMetadata.TYPE, finalIngestMetadata)); }; - - /** * Specialized cluster state update task specifically for ingest pipeline operations. * These operations all receive an AcknowledgedResponse. @@ -167,9 +184,10 @@ public IngestService( MatcherWatchdog matcherWatchdog, Supplier documentParsingObserverSupplier ) { + super(documentParsingObserverSupplier); this.clusterService = clusterService; this.scriptService = scriptService; - this.documentParsingObserverSupplier = documentParsingObserverSupplier; + this.client = client; this.processorFactories = processorFactories( ingestPlugins, new Processor.Parameters( @@ -203,6 +221,53 @@ private static Map processorFactories(List + * Also, this method marks the request as `isPipelinesResolved = true`: Due to the request could be rerouted from a coordinating node + * to an ingest node, we have to be able to avoid double resolving the pipelines and also able to distinguish that either the pipeline + * comes as part of the request or resolved from this method. All this is made to later be able to reject the request in case the + * pipeline was set by a required pipeline **and** the request also has a pipeline request too. + * + * @param originalRequest Original write request received. + * @param indexRequest The {@link org.elasticsearch.action.index.IndexRequest} object to update. + * @param metadata Cluster metadata from where the pipeline information could be derived. + */ + public static void resolvePipelinesAndUpdateIndexRequest( + final DocWriteRequest originalRequest, + final IndexRequest indexRequest, + final Metadata metadata + ) { + resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, metadata, System.currentTimeMillis()); + } + + static void resolvePipelinesAndUpdateIndexRequest( + final DocWriteRequest originalRequest, + final IndexRequest indexRequest, + final Metadata metadata, + final long epochMillis + ) { + if (indexRequest.isPipelineResolved()) { + return; + } + + String requestPipeline = indexRequest.getPipeline(); + + Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis) // + .or(() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)) + .orElse(Pipelines.NO_PIPELINES_DEFINED); + + // The pipeline coming as part of the request always has priority over the resolved one from metadata or templates + if (requestPipeline != null) { + indexRequest.setPipeline(requestPipeline); + } else { + indexRequest.setPipeline(pipelines.defaultPipeline); + } + indexRequest.setFinalPipeline(pipelines.finalPipeline); + indexRequest.isPipelineResolved(true); + } + public ClusterService getClusterService() { return clusterService; } @@ -476,6 +541,74 @@ private static void collectProcessorMetrics( } } + @Override + public boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest indexRequest, Metadata metadata) { + resolvePipelinesAndUpdateIndexRequest(docWriteRequest, indexRequest, metadata); + return hasPipeline(indexRequest); + } + + @Override + public boolean hasBeenProcessed(IndexRequest indexRequest) { + return indexRequest.isPipelineResolved(); + } + + @Override + public boolean shouldExecuteOnIngestNode() { + return true; + } + + @Override + protected void processIndexRequest( + IndexRequest indexRequest, + int slot, + RefCountingRunnable refs, + IntConsumer onDropped, + final BiConsumer onFailure + ) { + IngestService.PipelineIterator pipelines = getAndResetPipelines(indexRequest); + if (pipelines.hasNext() == false) { + return; + } + + // start the stopwatch and acquire a ref to indicate that we're working on this document + final long startTimeInNanos = System.nanoTime(); + ingestMetric.preIngest(); + final Releasable ref = refs.acquire(); + // the document listener gives us three-way logic: a document can fail processing (1), or it can + // be successfully processed. a successfully processed document can be kept (2) or dropped (3). + final ActionListener documentListener = ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(Boolean kept) { + assert kept != null; + if (kept == false) { + onDropped.accept(slot); + } + } + + @Override + public void onFailure(Exception e) { + ingestMetric.ingestFailed(); + onFailure.accept(slot, e); + } + }, () -> { + // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate + // that we're finished with this document + final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; + ingestMetric.postIngest(ingestTimeInNanos); + ref.close(); + }); + DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); + + IngestDocument ingestDocument = newIngestDocument(indexRequest); + + executePipelinesOnActionRequest(pipelines, indexRequest, ingestDocument, documentListener); + indexRequest.setPipelinesHaveRun(); + + assert indexRequest.index() != null; + documentParsingObserver.setIndexName(indexRequest.index()); + documentParsingObserver.close(); + } + /** * Used in this class and externally by the {@link org.elasticsearch.action.ingest.ReservedPipelineAction} */ @@ -588,9 +721,98 @@ void validatePipeline(Map ingestInfos, String pipelin ExceptionsHelper.rethrowAndSuppress(exceptions); } + public void processBulkRequest( + final int numberOfActionRequests, + final Iterable> actionRequests, + final IntConsumer onDropped, + final BiConsumer onFailure, + final BiConsumer onCompletion, + final String executorName + ) { + assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]"; + + threadPool.executor(executorName).execute(new AbstractRunnable() { + + @Override + public void onFailure(Exception e) { + onCompletion.accept(null, e); + } + + @Override + protected void doRun() { + final Thread originalThread = Thread.currentThread(); + try (var refs = new RefCountingRunnable(() -> onCompletion.accept(originalThread, null))) { + int slot = 0; + for (DocWriteRequest actionRequest : actionRequests) { + IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); + if (indexRequest != null) { + processIndexRequest(indexRequest, slot, refs, onDropped, onFailure); + } + slot++; + } + } + } + }); + } + + private void executePipelinesOnActionRequest( + DocWriteRequest actionRequest, + final int slot, + final Releasable ref, + IndexRequest indexRequest, + PipelineIterator pipelines, + IntConsumer onDropped, + BiConsumer onFailure + ) { + // start the stopwatch and acquire a ref to indicate that we're working on this document + final long startTimeInNanos = System.nanoTime(); + totalMetrics.preIngest(); + // the document listener gives us three-way logic: a document can fail processing (1), or it can + // be successfully processed. a successfully processed document can be kept (2) or dropped (3). + final ActionListener documentListener = ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(Boolean kept) { + assert kept != null; + if (kept == false) { + onDropped.accept(slot); + } + } + + @Override + public void onFailure(Exception e) { + totalMetrics.ingestFailed(); + onFailure.accept(slot, e); + } + }, () -> { + // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate + // that we're finished with this document + final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; + totalMetrics.postIngest(ingestTimeInNanos); + ref.close(); + }); + DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); + IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver); + executePipelinesOnActionRequest(pipelines, indexRequest, ingestDocument, documentListener); + indexRequest.setPipelinesHaveRun(); + + assert actionRequest.index() != null; + documentParsingObserver.setIndexName(actionRequest.index()); + documentParsingObserver.close(); + } + /** + * Returns the pipelines of the request, and updates the request so that it no longer references + * any pipelines (both the default and final pipeline are set to the noop pipeline). + */ + private PipelineIterator getAndResetPipelines(IndexRequest indexRequest) { + final String pipelineId = indexRequest.getPipeline(); + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + final String finalPipelineId = indexRequest.getFinalPipeline(); + indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); + return new PipelineIterator(pipelineId, finalPipelineId); + } /** * A triple for tracking the non-null id of a pipeline, the pipeline itself, and whether the pipeline is a final pipeline. @@ -599,19 +821,19 @@ void validatePipeline(Map ingestInfos, String pipelin * @param pipeline a possibly-null reference to the pipeline for the given pipeline id * @param isFinal true if the pipeline is a final pipeline */ - record PipelineSlot(String id, @Nullable Pipeline pipeline, boolean isFinal) { + private record PipelineSlot(String id, @Nullable Pipeline pipeline, boolean isFinal) { public PipelineSlot { Objects.requireNonNull(id); } } - class PipelineIterator implements Iterator { + private class PipelineIterator implements Iterator { private final String defaultPipeline; private final String finalPipeline; private final Iterator pipelineSlotIterator; - PipelineIterator(String defaultPipeline, String finalPipeline) { + private PipelineIterator(String defaultPipeline, String finalPipeline) { this.defaultPipeline = NOOP_PIPELINE_NAME.equals(defaultPipeline) ? null : defaultPipeline; this.finalPipeline = NOOP_PIPELINE_NAME.equals(finalPipeline) ? null : finalPipeline; this.pipelineSlotIterator = iterator(); @@ -652,6 +874,160 @@ public PipelineSlot next() { } } + private void executePipelinesOnActionRequest( + final PipelineIterator pipelines, + final IndexRequest indexRequest, + final IngestDocument ingestDocument, + final ActionListener listener + ) { + assert pipelines.hasNext(); + PipelineSlot slot = pipelines.next(); + final String pipelineId = slot.id(); + final Pipeline pipeline = slot.pipeline(); + final boolean isFinalPipeline = slot.isFinal(); + + // reset the reroute flag, at the start of a new pipeline execution this document hasn't been rerouted yet + ingestDocument.resetReroute(); + + try { + if (pipeline == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + indexRequest.addPipeline(pipelineId); + final String originalIndex = indexRequest.indices()[0]; + executePipeline(ingestDocument, pipeline, (keep, e) -> { + assert keep != null; + + if (e != null) { + logger.debug( + () -> format( + "failed to execute pipeline [%s] for document [%s/%s]", + pipelineId, + indexRequest.index(), + indexRequest.id() + ), + e + ); + listener.onFailure(e); + return; // document failed! + } + + if (keep == false) { + listener.onResponse(false); + return; // document dropped! + } + + // update the index request so that we can execute additional pipelines (if any), etc + updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata()); + try { + // check for self-references if necessary, (i.e. if a script processor has run), and clear the bit + if (ingestDocument.doNoSelfReferencesCheck()) { + CollectionUtils.ensureNoSelfReferences(ingestDocument.getSource(), null); + ingestDocument.doNoSelfReferencesCheck(false); + } + } catch (IllegalArgumentException ex) { + // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. + // In that case, we catch and wrap the exception, so we can include more details + listener.onFailure( + new IllegalArgumentException( + format( + "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]", + pipelineId, + indexRequest.index(), + indexRequest.id() + ), + ex + ) + ); + return; // document failed! + } + + PipelineIterator newPipelines = pipelines; + final String newIndex = indexRequest.indices()[0]; + + if (Objects.equals(originalIndex, newIndex) == false) { + // final pipelines cannot change the target index (either directly or by way of a reroute) + if (isFinalPipeline) { + listener.onFailure( + new IllegalStateException( + format( + "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]", + pipelineId, + originalIndex, + newIndex, + indexRequest.id() + ) + ) + ); + return; // document failed! + } + + // add the index to the document's index history, and check for cycles in the visited indices + boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false; + if (cycle) { + List indexCycle = new ArrayList<>(ingestDocument.getIndexHistory()); + indexCycle.add(newIndex); + listener.onFailure( + new IllegalStateException( + format( + "index cycle detected while processing pipeline [%s] for document [%s]: %s", + pipelineId, + indexRequest.id(), + indexCycle + ) + ) + ); + return; // document failed! + } + + // clear the current pipeline, then re-resolve the pipelines for this request + indexRequest.setPipeline(null); + indexRequest.isPipelineResolved(false); + resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata()); + newPipelines = getAndResetPipelines(indexRequest); + + // for backwards compatibility, when a pipeline changes the target index for a document without using the reroute + // mechanism, do not invoke the default pipeline of the new target index + if (ingestDocument.isReroute() == false) { + newPipelines = newPipelines.withoutDefaultPipeline(); + } + } + + if (newPipelines.hasNext()) { + executePipelinesOnActionRequest(newPipelines, indexRequest, ingestDocument, listener); + } else { + // update the index request's source and (potentially) cache the timestamp for TSDB + updateIndexRequestSource(indexRequest, ingestDocument); + cacheRawTimestamp(indexRequest, ingestDocument); + listener.onResponse(true); // document succeeded! + } + }); + } catch (Exception e) { + logger.debug( + () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()), + e + ); + listener.onFailure(e); // document failed! + } + } + + private static void executePipeline( + final IngestDocument ingestDocument, + final Pipeline pipeline, + final BiConsumer handler + ) { + // adapt our {@code BiConsumer} handler shape to the + // {@code BiConsumer} handler shape used internally + // by ingest pipelines and processors + ingestDocument.executePipeline(pipeline, (result, e) -> { + if (e != null) { + handler.accept(true, e); + } else { + handler.accept(result != null, null); + } + }); + } + public IngestStats stats() { IngestStats.Builder statsBuilder = new IngestStats.Builder(); statsBuilder.addTotalMetrics(totalMetrics); @@ -703,8 +1079,72 @@ static String getProcessorName(Processor processor) { return sb.toString(); } + /** + * Builds a new ingest document from the passed-in index request. + */ + protected static IngestDocument newIngestDocument(final IndexRequest request, DocumentParsingObserver documentParsingObserver) { + return new IngestDocument( + request.index(), + request.id(), + request.version(), + request.routing(), + request.versionType(), + request.sourceAsMap(documentParsingObserver) + ); + } + /** + * Updates an index request based on the metadata of an ingest document. + */ + private static void updateIndexRequestMetadata(final IndexRequest request, final org.elasticsearch.script.Metadata metadata) { + // it's fine to set all metadata fields all the time, as ingest document holds their starting values + // before ingestion, which might also get modified during ingestion. + request.index(metadata.getIndex()); + request.id(metadata.getId()); + request.routing(metadata.getRouting()); + request.version(metadata.getVersion()); + if (metadata.getVersionType() != null) { + request.versionType(VersionType.fromString(metadata.getVersionType())); + } + Number number; + if ((number = metadata.getIfSeqNo()) != null) { + request.setIfSeqNo(number.longValue()); + } + if ((number = metadata.getIfPrimaryTerm()) != null) { + request.setIfPrimaryTerm(number.longValue()); + } + Map map; + if ((map = metadata.getDynamicTemplates()) != null) { + Map mergedDynamicTemplates = new HashMap<>(request.getDynamicTemplates()); + mergedDynamicTemplates.putAll(map); + request.setDynamicTemplates(mergedDynamicTemplates); + } + } + /** + * Updates an index request based on the source of an ingest document, guarding against self-references if necessary. + */ + protected static void updateIndexRequestSource(final IndexRequest request, final IngestDocument document) { + boolean ensureNoSelfReferences = document.doNoSelfReferencesCheck(); + // we already check for self references elsewhere (and clear the bit), so this should always be false, + // keeping the check and assert as a guard against extraordinarily surprising circumstances + assert ensureNoSelfReferences == false; + request.source(document.getSource(), request.getContentType(), ensureNoSelfReferences); + } + + /** + * Grab the @timestamp and store it on the index request so that TSDB can use it without needing to parse + * the source for this document. + */ + private static void cacheRawTimestamp(final IndexRequest request, final IngestDocument document) { + if (request.getRawTimestamp() == null) { + // cache the @timestamp from the ingest document's source map if there is one + Object rawTimestamp = document.getSource().get(DataStream.TIMESTAMP_FIELD_NAME); + if (rawTimestamp != null) { + request.setRawTimestamp(rawTimestamp); + } + } + } @Override public void applyClusterState(final ClusterChangedEvent event) { @@ -908,6 +1348,89 @@ record PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) { } } + private static Optional resolvePipelinesFromMetadata( + DocWriteRequest originalRequest, + IndexRequest indexRequest, + Metadata metadata, + long epochMillis + ) { + IndexMetadata indexMetadata = null; + // start to look for default or final pipelines via settings found in the cluster metadata + if (originalRequest != null) { + indexMetadata = metadata.indices() + .get(IndexNameExpressionResolver.resolveDateMathExpression(originalRequest.index(), epochMillis)); + } + // check the alias for the index request (this is how normal index requests are modeled) + if (indexMetadata == null && indexRequest.index() != null) { + IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(indexRequest.index()); + if (indexAbstraction != null && indexAbstraction.getWriteIndex() != null) { + indexMetadata = metadata.index(indexAbstraction.getWriteIndex()); + } + } + // check the alias for the action request (this is how upserts are modeled) + if (indexMetadata == null && originalRequest != null && originalRequest.index() != null) { + IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(originalRequest.index()); + if (indexAbstraction != null && indexAbstraction.getWriteIndex() != null) { + indexMetadata = metadata.index(indexAbstraction.getWriteIndex()); + } + } + + if (indexMetadata == null) { + return Optional.empty(); + } + + final Settings settings = indexMetadata.getSettings(); + return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings))); + } + + private static Optional resolvePipelinesFromIndexTemplates(IndexRequest indexRequest, Metadata metadata) { + if (indexRequest.index() == null) { + return Optional.empty(); + } + + // the index does not exist yet (and this is a valid request), so match index + // templates to look for pipelines in either a matching V2 template (which takes + // precedence), or if a V2 template does not match, any V1 templates + String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false); + if (v2Template != null) { + final Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template); + return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings))); + } + + String defaultPipeline = null; + String finalPipeline = null; + List templates = MetadataIndexTemplateService.findV1Templates(metadata, indexRequest.index(), null); + // order of templates are the highest order first + for (final IndexTemplateMetadata template : templates) { + final Settings settings = template.settings(); + + // note: the exists/get trickiness here is because we explicitly *don't* want the default value + // of the settings -- a non-null value would terminate the search too soon + if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) { + defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); + // we can not break in case a lower-order template has a final pipeline that we need to collect + } + if (finalPipeline == null && IndexSettings.FINAL_PIPELINE.exists(settings)) { + finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings); + // we can not break in case a lower-order template has a default pipeline that we need to collect + } + if (defaultPipeline != null && finalPipeline != null) { + // we can break if we have already collected a default and final pipeline + break; + } + } + + // having exhausted the search, if nothing was found, then use the default noop pipeline names + defaultPipeline = Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME); + finalPipeline = Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME); + + return Optional.of(new Pipelines(defaultPipeline, finalPipeline)); + } + + public boolean needsProcessing(IndexRequest indexRequest) { + return hasPipeline(indexRequest); + } + /** * Checks whether an IndexRequest has at least one pipeline defined. *

@@ -920,4 +1443,14 @@ public static boolean hasPipeline(IndexRequest indexRequest) { return NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false || NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false; } + + private record Pipelines(String defaultPipeline, String finalPipeline) { + + private static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME); + + public Pipelines { + Objects.requireNonNull(defaultPipeline); + Objects.requireNonNull(finalPipeline); + } + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelinesBulkRequestPreprocessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelinesBulkRequestPreprocessor.java deleted file mode 100644 index 0898220bca0f9..0000000000000 --- a/server/src/main/java/org/elasticsearch/ingest/PipelinesBulkRequestPreprocessor.java +++ /dev/null @@ -1,475 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.ingest; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.RefCountingRunnable; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.cluster.metadata.IndexAbstraction; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.core.Releasable; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.plugins.internal.DocumentParsingObserver; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.function.BiConsumer; -import java.util.function.IntConsumer; -import java.util.function.Supplier; - -import static org.elasticsearch.core.Strings.format; -import static org.elasticsearch.ingest.IngestService.NOOP_PIPELINE_NAME; - -public class PipelinesBulkRequestPreprocessor extends AbstractBulkRequestPreprocessor implements ClusterStateApplier { - - private final IngestService ingestService; - - private volatile ClusterState state; - - public PipelinesBulkRequestPreprocessor(Supplier documentParsingObserver, IngestService ingestService) { - super(documentParsingObserver); - this.ingestService = ingestService; - } - - @Override - public void applyClusterState(final ClusterChangedEvent event) { - state = event.state(); - } - - private static final Logger logger = LogManager.getLogger(PipelinesBulkRequestPreprocessor.class); - - @Override - public boolean needsProcessing(DocWriteRequest docWriteRequest, IndexRequest indexRequest, Metadata metadata) { - resolvePipelinesAndUpdateIndexRequest(docWriteRequest, indexRequest, metadata); - return ingestService.hasPipeline(indexRequest); - } - - @Override - public boolean hasBeenProcessed(IndexRequest indexRequest) { - return indexRequest.isPipelineResolved(); - } - - @Override - public boolean shouldExecuteOnIngestNode() { - return true; - } - - @Override - protected void processIndexRequest(IndexRequest indexRequest, int slot, RefCountingRunnable refs, IntConsumer onDropped, - final BiConsumer onFailure) { - IngestService.PipelineIterator pipelines = getAndResetPipelines(indexRequest); - if (pipelines.hasNext() == false) { - return; - } - - // start the stopwatch and acquire a ref to indicate that we're working on this document - final long startTimeInNanos = System.nanoTime(); - ingestMetric.preIngest(); - final Releasable ref = refs.acquire(); - // the document listener gives us three-way logic: a document can fail processing (1), or it can - // be successfully processed. a successfully processed document can be kept (2) or dropped (3). - final ActionListener documentListener = ActionListener.runAfter(new ActionListener<>() { - @Override - public void onResponse(Boolean kept) { - assert kept != null; - if (kept == false) { - onDropped.accept(slot); - } - } - - @Override - public void onFailure(Exception e) { - ingestMetric.ingestFailed(); - onFailure.accept(slot, e); - } - }, () -> { - // regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate - // that we're finished with this document - final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; - ingestMetric.postIngest(ingestTimeInNanos); - ref.close(); - }); - DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get(); - - IngestDocument ingestDocument = newIngestDocument(indexRequest); - - executePipelines(pipelines, indexRequest, ingestDocument, documentListener); - indexRequest.setPipelinesHaveRun(); - - assert indexRequest.index() != null; - documentParsingObserver.setIndexName(indexRequest.index()); - documentParsingObserver.close(); - } - - private void executePipelines( - final IngestService.PipelineIterator pipelines, - final IndexRequest indexRequest, - final IngestDocument ingestDocument, - final ActionListener listener - ) { - assert pipelines.hasNext(); - IngestService.PipelineSlot slot = pipelines.next(); - final String pipelineId = slot.id(); - final Pipeline pipeline = slot.pipeline(); - final boolean isFinalPipeline = slot.isFinal(); - - // reset the reroute flag, at the start of a new pipeline execution this document hasn't been rerouted yet - ingestDocument.resetReroute(); - - try { - if (pipeline == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); - } - indexRequest.addPipeline(pipelineId); - final String originalIndex = indexRequest.indices()[0]; - executePipeline(ingestDocument, pipeline, (keep, e) -> { - assert keep != null; - - if (e != null) { - logger.debug( - () -> format( - "failed to execute pipeline [%s] for document [%s/%s]", - pipelineId, - indexRequest.index(), - indexRequest.id() - ), - e - ); - listener.onFailure(e); - return; // document failed! - } - - if (keep == false) { - listener.onResponse(false); - return; // document dropped! - } - - // update the index request so that we can execute additional pipelines (if any), etc - updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata()); - try { - // check for self-references if necessary, (i.e. if a script processor has run), and clear the bit - if (ingestDocument.doNoSelfReferencesCheck()) { - CollectionUtils.ensureNoSelfReferences(ingestDocument.getSource(), null); - ingestDocument.doNoSelfReferencesCheck(false); - } - } catch (IllegalArgumentException ex) { - // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. - // In that case, we catch and wrap the exception, so we can include more details - listener.onFailure( - new IllegalArgumentException( - format( - "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]", - pipelineId, - indexRequest.index(), - indexRequest.id() - ), - ex - ) - ); - return; // document failed! - } - - IngestService.PipelineIterator newPipelines = pipelines; - final String newIndex = indexRequest.indices()[0]; - - if (Objects.equals(originalIndex, newIndex) == false) { - // final pipelines cannot change the target index (either directly or by way of a reroute) - if (isFinalPipeline) { - listener.onFailure( - new IllegalStateException( - format( - "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]", - pipelineId, - originalIndex, - newIndex, - indexRequest.id() - ) - ) - ); - return; // document failed! - } - - // add the index to the document's index history, and check for cycles in the visited indices - boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false; - if (cycle) { - List indexCycle = new ArrayList<>(ingestDocument.getIndexHistory()); - indexCycle.add(newIndex); - listener.onFailure( - new IllegalStateException( - format( - "index cycle detected while processing pipeline [%s] for document [%s]: %s", - pipelineId, - indexRequest.id(), - indexCycle - ) - ) - ); - return; // document failed! - } - - // clear the current pipeline, then re-resolve the pipelines for this request - indexRequest.setPipeline(null); - indexRequest.isPipelineResolved(false); - resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata()); - newPipelines = getAndResetPipelines(indexRequest); - - // for backwards compatibility, when a pipeline changes the target index for a document without using the reroute - // mechanism, do not invoke the default pipeline of the new target index - if (ingestDocument.isReroute() == false) { - newPipelines = newPipelines.withoutDefaultPipeline(); - } - } - - if (newPipelines.hasNext()) { - executePipelines(newPipelines, indexRequest, ingestDocument, listener); - } else { - // update the index request's source and (potentially) cache the timestamp for TSDB - updateIndexRequestSource(indexRequest, ingestDocument); - cacheRawTimestamp(indexRequest, ingestDocument); - listener.onResponse(true); // document succeeded! - } - }); - } catch (Exception e) { - logger.debug( - () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()), - e - ); - listener.onFailure(e); // document failed! - } - } - - private static void executePipeline( - final IngestDocument ingestDocument, - final Pipeline pipeline, - final BiConsumer handler - ) { - // adapt our {@code BiConsumer} handler shape to the - // {@code BiConsumer} handler shape used internally - // by ingest pipelines and processors - ingestDocument.executePipeline(pipeline, (result, e) -> { - if (e != null) { - handler.accept(true, e); - } else { - handler.accept(result != null, null); - } - }); - } - - - /** - * Updates an index request based on the metadata of an ingest document. - */ - private static void updateIndexRequestMetadata(final IndexRequest request, final org.elasticsearch.script.Metadata metadata) { - // it's fine to set all metadata fields all the time, as ingest document holds their starting values - // before ingestion, which might also get modified during ingestion. - request.index(metadata.getIndex()); - request.id(metadata.getId()); - request.routing(metadata.getRouting()); - request.version(metadata.getVersion()); - if (metadata.getVersionType() != null) { - request.versionType(VersionType.fromString(metadata.getVersionType())); - } - Number number; - if ((number = metadata.getIfSeqNo()) != null) { - request.setIfSeqNo(number.longValue()); - } - if ((number = metadata.getIfPrimaryTerm()) != null) { - request.setIfPrimaryTerm(number.longValue()); - } - Map map; - if ((map = metadata.getDynamicTemplates()) != null) { - Map mergedDynamicTemplates = new HashMap<>(request.getDynamicTemplates()); - mergedDynamicTemplates.putAll(map); - request.setDynamicTemplates(mergedDynamicTemplates); - } - } - - /** - * Returns the pipelines of the request, and updates the request so that it no longer references - * any pipelines (both the default and final pipeline are set to the noop pipeline). - */ - private IngestService.PipelineIterator getAndResetPipelines(IndexRequest indexRequest) { - final String pipelineId = indexRequest.getPipeline(); - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - final String finalPipelineId = indexRequest.getFinalPipeline(); - indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); - return ingestService.new PipelineIterator(pipelineId, finalPipelineId); - } - - - /** - * Resolves the potential pipelines (default and final) from the requests or templates associated to the index and then **mutates** - * the {@link org.elasticsearch.action.index.IndexRequest} passed object with the pipeline information. - *

- * Also, this method marks the request as `isPipelinesResolved = true`: Due to the request could be rerouted from a coordinating node - * to an ingest node, we have to be able to avoid double resolving the pipelines and also able to distinguish that either the pipeline - * comes as part of the request or resolved from this method. All this is made to later be able to reject the request in case the - * pipeline was set by a required pipeline **and** the request also has a pipeline request too. - * - * @param originalRequest Original write request received. - * @param indexRequest The {@link org.elasticsearch.action.index.IndexRequest} object to update. - * @param metadata Cluster metadata from where the pipeline information could be derived. - */ - public static void resolvePipelinesAndUpdateIndexRequest( - final DocWriteRequest originalRequest, - final IndexRequest indexRequest, - final Metadata metadata - ) { - resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, metadata, System.currentTimeMillis()); - } - - static void resolvePipelinesAndUpdateIndexRequest( - final DocWriteRequest originalRequest, - final IndexRequest indexRequest, - final Metadata metadata, - final long epochMillis - ) { - if (indexRequest.isPipelineResolved()) { - return; - } - - String requestPipeline = indexRequest.getPipeline(); - - Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis) // - .or(() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)) - .orElse(Pipelines.NO_PIPELINES_DEFINED); - - // The pipeline coming as part of the request always has priority over the resolved one from metadata or templates - if (requestPipeline != null) { - indexRequest.setPipeline(requestPipeline); - } else { - indexRequest.setPipeline(pipelines.defaultPipeline); - } - indexRequest.setFinalPipeline(pipelines.finalPipeline); - indexRequest.isPipelineResolved(true); - } - - - private static Optional resolvePipelinesFromMetadata( - DocWriteRequest originalRequest, - IndexRequest indexRequest, - Metadata metadata, - long epochMillis - ) { - IndexMetadata indexMetadata = null; - // start to look for default or final pipelines via settings found in the cluster metadata - if (originalRequest != null) { - indexMetadata = metadata.indices() - .get(IndexNameExpressionResolver.resolveDateMathExpression(originalRequest.index(), epochMillis)); - } - // check the alias for the index request (this is how normal index requests are modeled) - if (indexMetadata == null && indexRequest.index() != null) { - IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(indexRequest.index()); - if (indexAbstraction != null && indexAbstraction.getWriteIndex() != null) { - indexMetadata = metadata.index(indexAbstraction.getWriteIndex()); - } - } - // check the alias for the action request (this is how upserts are modeled) - if (indexMetadata == null && originalRequest != null && originalRequest.index() != null) { - IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(originalRequest.index()); - if (indexAbstraction != null && indexAbstraction.getWriteIndex() != null) { - indexMetadata = metadata.index(indexAbstraction.getWriteIndex()); - } - } - - if (indexMetadata == null) { - return Optional.empty(); - } - - final Settings settings = indexMetadata.getSettings(); - return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings))); - } - - private static Optional resolvePipelinesFromIndexTemplates(IndexRequest indexRequest, Metadata metadata) { - if (indexRequest.index() == null) { - return Optional.empty(); - } - - // the index does not exist yet (and this is a valid request), so match index - // templates to look for pipelines in either a matching V2 template (which takes - // precedence), or if a V2 template does not match, any V1 templates - String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false); - if (v2Template != null) { - final Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template); - return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings))); - } - - String defaultPipeline = null; - String finalPipeline = null; - List templates = MetadataIndexTemplateService.findV1Templates(metadata, indexRequest.index(), null); - // order of templates are the highest order first - for (final IndexTemplateMetadata template : templates) { - final Settings settings = template.settings(); - - // note: the exists/get trickiness here is because we explicitly *don't* want the default value - // of the settings -- a non-null value would terminate the search too soon - if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) { - defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); - // we can not break in case a lower-order template has a final pipeline that we need to collect - } - if (finalPipeline == null && IndexSettings.FINAL_PIPELINE.exists(settings)) { - finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings); - // we can not break in case a lower-order template has a default pipeline that we need to collect - } - if (defaultPipeline != null && finalPipeline != null) { - // we can break if we have already collected a default and final pipeline - break; - } - } - - // having exhausted the search, if nothing was found, then use the default noop pipeline names - defaultPipeline = Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME); - finalPipeline = Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME); - - return Optional.of(new Pipelines(defaultPipeline, finalPipeline)); - } - - - /** - * Grab the @timestamp and store it on the index request so that TSDB can use it without needing to parse - * the source for this document. - */ - private static void cacheRawTimestamp(final IndexRequest request, final IngestDocument document) { - if (request.getRawTimestamp() == null) { - // cache the @timestamp from the ingest document's source map if there is one - Object rawTimestamp = document.getSource().get(DataStream.TIMESTAMP_FIELD_NAME); - if (rawTimestamp != null) { - request.setRawTimestamp(rawTimestamp); - } - } - } - - record Pipelines(String defaultPipeline, String finalPipeline) { - - static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME); - - public Pipelines { - Objects.requireNonNull(defaultPipeline); - Objects.requireNonNull(finalPipeline); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index dd55b7736b333..b377c7226db07 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -149,7 +149,6 @@ import org.elasticsearch.inference.InferenceServiceRegistry; import org.elasticsearch.ingest.FieldInferenceBulkRequestPreprocessor; import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.PipelinesBulkRequestPreprocessor; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.fs.FsHealthService; import org.elasticsearch.monitor.jvm.JvmInfo; @@ -553,10 +552,6 @@ protected Node( IngestService.createGrokThreadWatchdog(this.environment, threadPool), documentParsingObserverSupplier ); - final PipelinesBulkRequestPreprocessor pipelinesBulkRequestPreprocessor = new PipelinesBulkRequestPreprocessor( - documentParsingObserverSupplier, - ingestService - ); final FieldInferenceBulkRequestPreprocessor fieldInferenceBulkRequestPreprocessor = new FieldInferenceBulkRequestPreprocessor( documentParsingObserverSupplier, client @@ -1097,6 +1092,7 @@ protected Node( b.bind(ScriptService.class).toInstance(scriptService); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); + b.bind(FieldInferenceBulkRequestPreprocessor.class).toInstance(fieldInferenceBulkRequestPreprocessor); b.bind(IndexingPressure.class).toInstance(indexingLimits); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 7e9556be18f6d..cf4d5cab8b617 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -247,6 +247,7 @@ static class TestTransportBulkAction extends TransportBulkAction { transportService, clusterService, null, + null, client, actionFilters, indexNameExpressionResolver, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java index 652d6815eea46..1f5912f09d077 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java @@ -15,8 +15,10 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.downsample.DownsampleAction; +import org.elasticsearch.action.inference.InferenceAction; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; +import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; import org.elasticsearch.xpack.core.security.support.MetadataUtils; import java.util.Collection; @@ -189,7 +191,26 @@ public class InternalUsers { null, new RoleDescriptor.IndicesPrivileges[] { RoleDescriptor.IndicesPrivileges.builder().indices(".synonyms*").privileges("all").allowRestrictedIndices(true).build(), - RoleDescriptor.IndicesPrivileges.builder().indices("*").privileges(ReloadAnalyzerAction.NAME).build(), }, + RoleDescriptor.IndicesPrivileges.builder().indices("*").privileges(ReloadAnalyzerAction.NAME).build()}, + null, + null, + null, + MetadataUtils.DEFAULT_RESERVED_METADATA, + Map.of() + ) + ); + + public static final InternalUser SEMANTIC_TEXT_USER = new InternalUser( + UsernamesField.SEMANTIC_TEXT_USER_NAME, + new RoleDescriptor( + UsernamesField.SEMANTIC_TEXT_ROLE_NAME, + new String[] { "monitor" }, + new RoleDescriptor.IndicesPrivileges[] { + RoleDescriptor.IndicesPrivileges.builder() + .indices(".inference*", ".secrets-inference*") + .privileges("read") + .allowRestrictedIndices(true) + .build()}, null, null, null, @@ -211,7 +232,8 @@ public class InternalUsers { ASYNC_SEARCH_USER, STORAGE_USER, DATA_STREAM_LIFECYCLE_USER, - SYNONYMS_USER + SYNONYMS_USER, + SEMANTIC_TEXT_USER ).collect(Collectors.toUnmodifiableMap(InternalUser::principal, Function.identity())); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java index 821d222bb930c..57f6d50327496 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/UsernamesField.java @@ -34,6 +34,8 @@ public final class UsernamesField { public static final String STORAGE_ROLE_NAME = "_storage"; public static final String SYNONYMS_USER_NAME = "_synonyms"; public static final String SYNONYMS_ROLE_NAME = "_synonyms"; + public static final String SEMANTIC_TEXT_USER_NAME = "_semantic_text"; + public static final String SEMANTIC_TEXT_ROLE_NAME = "_semantic_text"; public static final String REMOTE_MONITORING_NAME = "remote_monitoring_user"; public static final String REMOTE_MONITORING_COLLECTION_ROLE = "remote_monitoring_collector"; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index d93ee6ad36c67..3b9b7b68237a0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -8,6 +8,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.ingest.FieldInferenceBulkRequestPreprocessor; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.authc.Authentication; @@ -159,6 +160,9 @@ public static void switchUserBasedOnActionOriginAndExecute( case SYNONYMS_ORIGIN: securityContext.executeAsInternalUser(InternalUsers.SYNONYMS_USER, version, consumer); break; + case FieldInferenceBulkRequestPreprocessor.SEMANTIC_TEXT_ORIGIN: + securityContext.executeAsInternalUser(InternalUsers.SEMANTIC_TEXT_USER, version, consumer); + break; default: assert false : "action.origin [" + actionOrigin + "] is unknown!"; throw new IllegalStateException("action.origin [" + actionOrigin + "] should always be a known value");