Skip to content

Commit

Permalink
Add the ability to require an ingest pipeline (#46847)
Browse files Browse the repository at this point in the history
This commit adds the ability to require an ingest pipeline on an
index. Today we can have a default pipeline, but that could be
overridden by a request pipeline parameter. This commit introduces a new
index setting index.required_pipeline that acts similarly to
index.default_pipeline, except that it can not be overridden by a
request pipeline parameter. Additionally, a default pipeline and a
required pipeline can not both be set. The required pipeline can be set
to _none to ensure that no pipeline ever runs for index requests on that
index.
  • Loading branch information
jasontedor authored Sep 19, 2019
1 parent 9f65af9 commit 19b710a
Show file tree
Hide file tree
Showing 9 changed files with 511 additions and 30 deletions.
9 changes: 8 additions & 1 deletion docs/reference/index-modules.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,20 @@ specific index module:
The length of time that a <<delete-versioning,deleted document's version number>> remains available for <<index-versioning,further versioned operations>>.
Defaults to `60s`.

`index.default_pipeline`::
`index.default_pipeline`::

The default <<ingest,ingest node>> pipeline for this index. Index requests will fail
if the default pipeline is set and the pipeline does not exist. The default may be
overridden using the `pipeline` parameter. The special pipeline name `_none` indicates
no ingest pipeline should be run.

`index.required_pipeline`::
The required <<ingest,ingest node>> pipeline for this index. Index requests
will fail if the required pipeline is set and the pipeline does not exist.
The required pipeline can not be overridden with the `pipeline` parameter. A
default pipeline and a required pipeline can not both be set. The special
pipeline name `_none` indicates no ingest pipeline will run.

[float]
=== Settings in other index modules

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test index with required pipeline":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }
# required pipeline via index
- do:
indices.create:
index: test
body:
settings:
index:
required_pipeline: "my_pipeline"
aliases:
test_alias: {}

- do:
index:
index: test
id: 1
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via alias
- do:
index:
index: test_alias
id: 2
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
id: 2
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via upsert
- do:
update:
index: test
id: 3
body:
script:
source: "ctx._source.ran_script = true"
lang: "painless"
upsert: { "bytes_source_field":"1kb" }
- do:
get:
index: test
id: 3
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via scripted upsert
- do:
update:
index: test
id: 4
body:
script:
source: "ctx._source.bytes_source_field = '1kb'"
lang: "painless"
upsert : {}
scripted_upsert: true
- do:
get:
index: test
id: 4
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via doc_as_upsert
- do:
update:
index: test
id: 5
body:
doc: { "bytes_source_field":"1kb" }
doc_as_upsert: true
- do:
get:
index: test
id: 5
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via bulk upsert
# note - bulk scripted upserts execute the pipeline before the script, so any data referenced by the pipeline
# needs to be in the upsert, not the script
- do:
bulk:
refresh: true
body: |
{"update":{"_id":"6","_index":"test"}}
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
{"update":{"_id":"7","_index":"test"}}
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8","_index":"test"}}
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
{"update":{"_id":"6_alias","_index":"test_alias"}}
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
{"update":{"_id":"7_alias","_index":"test_alias"}}
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8_alias","_index":"test_alias"}}
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
- do:
mget:
body:
docs:
- { _index: "test", _id: "6" }
- { _index: "test", _id: "7" }
- { _index: "test", _id: "8" }
- { _index: "test", _id: "6_alias" }
- { _index: "test", _id: "7_alias" }
- { _index: "test", _id: "8_alias" }
- match: { docs.0._index: "test" }
- match: { docs.0._id: "6" }
- match: { docs.0._source.bytes_source_field: "1kb" }
- match: { docs.0._source.bytes_target_field: 1024 }
- is_false: docs.0._source.ran_script
- match: { docs.1._index: "test" }
- match: { docs.1._id: "7" }
- match: { docs.1._source.bytes_source_field: "2kb" }
- match: { docs.1._source.bytes_target_field: 2048 }
- match: { docs.2._index: "test" }
- match: { docs.2._id: "8" }
- match: { docs.2._source.bytes_source_field: "3kb" }
- match: { docs.2._source.bytes_target_field: 3072 }
- match: { docs.2._source.ran_script: true }
- match: { docs.3._index: "test" }
- match: { docs.3._id: "6_alias" }
- match: { docs.3._source.bytes_source_field: "1kb" }
- match: { docs.3._source.bytes_target_field: 1024 }
- is_false: docs.3._source.ran_script
- match: { docs.4._index: "test" }
- match: { docs.4._id: "7_alias" }
- match: { docs.4._source.bytes_source_field: "2kb" }
- match: { docs.4._source.bytes_target_field: 2048 }
- match: { docs.5._index: "test" }
- match: { docs.5._id: "8_alias" }
- match: { docs.5._source.bytes_source_field: "3kb" }
- match: { docs.5._source.bytes_target_field: 3072 }
- match: { docs.5._source.ran_script: true }

# bad request, request pipeline can not be specified
- do:
catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/
index:
index: test
id: 9
pipeline: "pipeline"
body: {bytes_source_field: "1kb"}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
Expand Down Expand Up @@ -76,6 +77,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -156,11 +158,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);

if (indexRequest != null) {
// get pipeline from request
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
// start to look for default pipeline via settings found in the index meta data
if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
Expand All @@ -179,34 +184,86 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
}
}
if (indexMetaData != null) {
// Find the default pipeline if one is defined from and existing index.
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
} else if (indexRequest.index() != null) {
// No index exists yet (and is valid request), so matching index templates to look for a default pipeline
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
// order of templates are highest order first, break if we find a default_pipeline
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
break;
// we can not break in case a lower-order template has a required pipeline that we need to reject
}
}
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME);
}
indexRequest.setPipeline(pipeline);
}

if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {

if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
/*
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
* pipeline parameter too.
*/
indexRequest.isPipelineResolved(true);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
}

}

if (hasIndexRequestsWithPipelines) {
Expand All @@ -217,6 +274,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
if (Assertions.ENABLED) {
final boolean allAreForwardedRequests = bulkRequest.requests()
.stream()
.map(TransportBulkAction::getIndexWriteRequest)
.filter(Objects::nonNull)
.allMatch(IndexRequest::isPipelineResolved);
assert allAreForwardedRequests : bulkRequest;
}
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 19b710a

Please sign in to comment.