Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to require an ingest pipeline #46847

Merged
merged 6 commits into from
Sep 19, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/reference/index-modules.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,20 @@ specific index module:
The length of time that a <<delete-versioning,deleted document's version number>> remains available for <<index-versioning,further versioned operations>>.
Defaults to `60s`.

`index.default_pipeline`::
`index.default_pipeline`::

The default <<ingest,ingest node>> pipeline for this index. Index requests will fail
if the default pipeline is set and the pipeline does not exist. The default may be
overridden using the `pipeline` parameter. The special pipeline name `_none` indicates
no ingest pipeline should be run.

`index.required_pipeline`::
The required <<ingest,ingest node>> pipeline for this index. Index requests
will fail if the required pipeline is set and the pipeline does not exist.
The required pipeline can not be overridden with the `pipeline` parameter. A
default pipeline and a required pipeline can not both be set. The special
pipeline name `_none` indicates no ingest pipeline will run.

[float]
=== Settings in other index modules

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test index with required pipeline":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }
# required pipeline via index
- do:
indices.create:
index: test
body:
settings:
index:
required_pipeline: "my_pipeline"
aliases:
test_alias: {}

- do:
index:
index: test
id: 1
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via alias
- do:
index:
index: test_alias
id: 2
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
id: 2
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via upsert
- do:
update:
index: test
id: 3
body:
script:
source: "ctx._source.ran_script = true"
lang: "painless"
upsert: { "bytes_source_field":"1kb" }
- do:
get:
index: test
id: 3
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via scripted upsert
- do:
update:
index: test
id: 4
body:
script:
source: "ctx._source.bytes_source_field = '1kb'"
lang: "painless"
upsert : {}
scripted_upsert: true
- do:
get:
index: test
id: 4
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via doc_as_upsert
- do:
update:
index: test
id: 5
body:
doc: { "bytes_source_field":"1kb" }
doc_as_upsert: true
- do:
get:
index: test
id: 5
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via bulk upsert
# note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline
# needs to be in the upsert, not the script
- do:
bulk:
refresh: true
body: |
{"update":{"_id":"6","_index":"test"}}
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
{"update":{"_id":"7","_index":"test"}}
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8","_index":"test"}}
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
{"update":{"_id":"6_alias","_index":"test_alias"}}
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
{"update":{"_id":"7_alias","_index":"test_alias"}}
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8_alias","_index":"test_alias"}}
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}

- do:
mget:
body:
docs:
- { _index: "test", _id: "6" }
- { _index: "test", _id: "7" }
- { _index: "test", _id: "8" }
- { _index: "test", _id: "6_alias" }
- { _index: "test", _id: "7_alias" }
- { _index: "test", _id: "8_alias" }
- match: { docs.0._index: "test" }
- match: { docs.0._id: "6" }
- match: { docs.0._source.bytes_source_field: "1kb" }
- match: { docs.0._source.bytes_target_field: 1024 }
- is_false: docs.0._source.ran_script
- match: { docs.1._index: "test" }
- match: { docs.1._id: "7" }
- match: { docs.1._source.bytes_source_field: "2kb" }
- match: { docs.1._source.bytes_target_field: 2048 }
- match: { docs.2._index: "test" }
- match: { docs.2._id: "8" }
- match: { docs.2._source.bytes_source_field: "3kb" }
- match: { docs.2._source.bytes_target_field: 3072 }
- match: { docs.2._source.ran_script: true }
- match: { docs.3._index: "test" }
- match: { docs.3._id: "6_alias" }
- match: { docs.3._source.bytes_source_field: "1kb" }
- match: { docs.3._source.bytes_target_field: 1024 }
- is_false: docs.3._source.ran_script
- match: { docs.4._index: "test" }
- match: { docs.4._id: "7_alias" }
- match: { docs.4._source.bytes_source_field: "2kb" }
- match: { docs.4._source.bytes_target_field: 2048 }
- match: { docs.5._index: "test" }
- match: { docs.5._id: "8_alias" }
- match: { docs.5._source.bytes_source_field: "3kb" }
- match: { docs.5._source.bytes_target_field: 3072 }
- match: { docs.5._source.ran_script: true }

# bad request, request pipeline can not be specified
- do:
catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/
index:
index: test
id: 9
pipeline: "pipeline"
body: {bytes_source_field: "1kb"}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
Expand Down Expand Up @@ -76,6 +77,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -155,11 +157,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);

if (indexRequest != null) {
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
// get pipeline from request
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
// start to look for default pipeline via settings found in the index meta data
if (indexRequest.isForwardedRequest() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
Expand All @@ -178,34 +183,88 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
}
}
if (indexMetaData != null) {
// Find the default pipeline if one is defined from and existing index.
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
}
} else if (indexRequest.index() != null) {
// No index exists yet (and is valid request), so matching index templates to look for a default pipeline
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
// order of templates are highest order first, break if we find a default_pipeline
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
break;
// we can not break in case a lower-order template has a required pipeline that we need to reject
}
}
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME);
}
indexRequest.setPipeline(pipeline);
}

if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {

if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
if (clusterService.localNode().isIngestNode() == false) {
/*
* We have to track whether or not the pipeline for this request has already been derived. It can happen that the
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk
* request has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating
* node will have already derived the pipeline for this request. It is important that we are able to distinguish
* this situation as we can not double-derive the pipeline because we will not be able to distinguish the case of
* the pipeline having been set from a request pipeline parameter versus having been set by the derivation. We need
* to be able to distinguish these cases as we need to reject the request if the pipeline was set by a required
* pipeline and there is a request pipeline parameter too.
*/
indexRequest.isForwardedRequest(true);
jasontedor marked this conversation as resolved.
Show resolved Hide resolved
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
}

}

if (hasIndexRequestsWithPipelines) {
Expand All @@ -216,6 +275,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
if (Assertions.ENABLED) {
final boolean allAreForwardedRequests = bulkRequest.requests()
.stream()
.map(TransportBulkAction::getIndexWriteRequest)
.filter(Objects::nonNull)
.allMatch(IndexRequest::isForwardedRequest);
assert allAreForwardedRequests : bulkRequest;
}
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
} catch (Exception e) {
Expand Down
Loading