-
Notifications
You must be signed in to change notification settings - Fork 24.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingest ingest then create index #39607
Changes from all commits
12aad7e
44b4ef1
f64fdc3
efdaf53
18a30a2
80d71b4
c528171
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,4 +91,4 @@ teardown: | |
get: | ||
index: test | ||
id: 3 | ||
- match: { found: false } | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is removed because with this change the |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
--- | ||
teardown: | ||
- do: | ||
ingest.delete_pipeline: | ||
id: "retarget" | ||
ignore: 404 | ||
|
||
- do: | ||
indices.delete: | ||
index: foo | ||
|
||
--- | ||
"Test Change Target Index with Explicit Pipeline": | ||
|
||
- do: | ||
ingest.put_pipeline: | ||
id: "retarget" | ||
body: > | ||
{ | ||
"processors": [ | ||
{ | ||
"set" : { | ||
"field" : "_index", | ||
"value" : "foo" | ||
} | ||
} | ||
] | ||
} | ||
- match: { acknowledged: true } | ||
|
||
# no indices | ||
- do: | ||
cat.indices: {} | ||
|
||
- match: | ||
$body: | | ||
/^$/ | ||
|
||
- do: | ||
index: | ||
index: test | ||
id: 1 | ||
pipeline: "retarget" | ||
body: { | ||
a: true | ||
} | ||
|
||
- do: | ||
get: | ||
index: foo | ||
id: 1 | ||
- match: { _source.a: true } | ||
|
||
# only the foo index | ||
- do: | ||
cat.indices: | ||
h: i | ||
|
||
- match: | ||
$body: | | ||
/^foo\n$/ | ||
|
||
--- | ||
"Test Change Target Index with Default Pipeline": | ||
|
||
- do: | ||
indices.put_template: | ||
name: index_template | ||
body: | ||
index_patterns: test | ||
settings: | ||
default_pipeline: "retarget" | ||
|
||
- do: | ||
ingest.put_pipeline: | ||
id: "retarget" | ||
body: > | ||
{ | ||
"processors": [ | ||
{ | ||
"set" : { | ||
"field" : "_index", | ||
"value" : "foo" | ||
} | ||
} | ||
] | ||
} | ||
- match: { acknowledged: true } | ||
|
||
# no indices | ||
- do: | ||
cat.indices: {} | ||
|
||
- match: | ||
$body: | | ||
/^$/ | ||
|
||
- do: | ||
index: | ||
index: test | ||
id: 1 | ||
body: { | ||
a: true | ||
} | ||
|
||
- do: | ||
get: | ||
index: foo | ||
id: 1 | ||
- match: { _source.a: true } | ||
|
||
# only the foo index | ||
- do: | ||
cat.indices: | ||
h: i | ||
|
||
- match: | ||
$body: | | ||
/^foo\n$/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,11 +47,14 @@ | |
import org.elasticsearch.cluster.metadata.AliasOrIndex; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; | ||
import org.elasticsearch.cluster.metadata.MappingMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.collect.ImmutableOpenMap; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
import org.elasticsearch.common.util.concurrent.AtomicArray; | ||
|
@@ -151,6 +154,72 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk | |
final long startTime = relativeTime(); | ||
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size()); | ||
|
||
boolean hasIndexRequestsWithPipelines = false; | ||
final MetaData metaData = clusterService.state().getMetaData(); | ||
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices(); | ||
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) { | ||
IndexRequest indexRequest = getIndexWriteRequest(actionRequest); | ||
if (indexRequest != null) { | ||
// get pipeline from request | ||
String pipeline = indexRequest.getPipeline(); | ||
if (pipeline == null) { | ||
// start to look for default pipeline via settings found in the index meta data | ||
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); | ||
if (indexMetaData == null && indexRequest.index() != null) { | ||
// if the write request is through an alias, use the write index's meta data | ||
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); | ||
if (indexOrAlias != null && indexOrAlias.isAlias()) { | ||
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; | ||
indexMetaData = alias.getWriteIndex(); | ||
} | ||
} | ||
if (indexMetaData != null) { | ||
// Find the default pipeline if one is defined from an existing index. | ||
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); | ||
indexRequest.setPipeline(defaultPipeline); | ||
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { | ||
hasIndexRequestsWithPipelines = true; | ||
} | ||
} else if (indexRequest.index() != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Moving the ingest node execution before the create index is the primary change of this PR. However, to prevent re-introducing #32758, we need to look up the default pipeline from the matched templates, which is what this code block does. |
||
// No index exists yet (and is valid request), so matching index templates to look for a default pipeline | ||
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); | ||
assert (templates != null); | ||
String defaultPipeline = IngestService.NOOP_PIPELINE_NAME; | ||
// order of templates are highest order first, break if we find a default_pipeline | ||
for (IndexTemplateMetaData template : templates) { | ||
final Settings settings = template.settings(); | ||
if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) { | ||
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); | ||
break; | ||
} | ||
} | ||
indexRequest.setPipeline(defaultPipeline); | ||
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { | ||
hasIndexRequestsWithPipelines = true; | ||
} | ||
} | ||
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { | ||
hasIndexRequestsWithPipelines = true; | ||
} | ||
} | ||
} | ||
|
||
if (hasIndexRequestsWithPipelines) { | ||
// this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but | ||
// also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that, the second time through this method, | ||
// this path is never taken. | ||
try { | ||
if (clusterService.localNode().isIngestNode()) { | ||
processBulkIndexIngestRequest(task, bulkRequest, listener); | ||
} else { | ||
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); | ||
} | ||
} catch (Exception e) { | ||
listener.onFailure(e); | ||
} | ||
return; | ||
} | ||
|
||
if (needToCheck()) { | ||
// Attempt to create all the indices that we're going to need during the bulk before we start. | ||
// Step 1: collect all the indices in the request | ||
|
@@ -181,15 +250,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk | |
} | ||
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back. | ||
if (autoCreateIndices.isEmpty()) { | ||
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); | ||
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); | ||
} else { | ||
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); | ||
for (String index : autoCreateIndices) { | ||
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() { | ||
@Override | ||
public void onResponse(CreateIndexResponse result) { | ||
if (counter.decrementAndGet() == 0) { | ||
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); | ||
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); | ||
} | ||
} | ||
|
||
|
@@ -205,7 +274,7 @@ public void onFailure(Exception e) { | |
} | ||
} | ||
if (counter.decrementAndGet() == 0) { | ||
executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { | ||
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { | ||
inner.addSuppressed(e); | ||
listener.onFailure(inner); | ||
}), responses, indicesThatCannotBeCreated); | ||
|
@@ -215,56 +284,7 @@ public void onFailure(Exception e) { | |
} | ||
} | ||
} else { | ||
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap()); | ||
} | ||
} | ||
|
||
private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, | ||
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses, | ||
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) { | ||
boolean hasIndexRequestsWithPipelines = false; | ||
final MetaData metaData = clusterService.state().getMetaData(); | ||
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices(); | ||
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) { | ||
IndexRequest indexRequest = getIndexWriteRequest(actionRequest); | ||
if(indexRequest != null){ | ||
String pipeline = indexRequest.getPipeline(); | ||
if (pipeline == null) { | ||
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); | ||
if (indexMetaData == null && indexRequest.index() != null) { | ||
//check the alias | ||
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); | ||
if (indexOrAlias != null && indexOrAlias.isAlias()) { | ||
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; | ||
indexMetaData = alias.getWriteIndex(); | ||
} | ||
} | ||
if (indexMetaData == null) { | ||
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); | ||
} else { | ||
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); | ||
indexRequest.setPipeline(defaultPipeline); | ||
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { | ||
hasIndexRequestsWithPipelines = true; | ||
} | ||
} | ||
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { | ||
hasIndexRequestsWithPipelines = true; | ||
} | ||
} | ||
} | ||
if (hasIndexRequestsWithPipelines) { | ||
try { | ||
if (clusterService.localNode().isIngestNode()) { | ||
processBulkIndexIngestRequest(task, bulkRequest, listener); | ||
} else { | ||
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); | ||
} | ||
} catch (Exception e) { | ||
listener.onFailure(e); | ||
} | ||
} else { | ||
executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated); | ||
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap()); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these were removed because now that the ingest happens before index creation, if the index doesn't exist yet, and we only drop a document, the index never exists, and thus the response is different.