From e263910be5cb3afe1f460d450fda50867f5dac0e Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Mon, 25 Mar 2024 16:40:52 +0800 Subject: [PATCH 1/4] Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches with the index template Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../rest-api-spec/test/ingest/70_bulk.yml | 37 +++++++++++++++++++ .../org/opensearch/ingest/IngestService.java | 17 +++++---- .../opensearch/ingest/IngestServiceTests.java | 16 ++++++++ 4 files changed, 64 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 346913f025f4d..62e2540e32713 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,6 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed +- Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches with the index template ### Security diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml index 2dfa17174b139..1793c821a14b1 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml @@ -41,6 +41,10 @@ teardown: ingest.delete_pipeline: id: "pipeline2" ignore: 404 + - do: + indices.delete_index_template: + name: test_index_template_for_bulk + ignore: 404 --- "Test bulk request without default pipeline": @@ -144,3 +148,36 @@ teardown: - is_false: _source.field1 - match: {_source.field2: value2} + +--- +"Test bulk upsert honors default_pipeline and final_pipeline when the auto-created index matches with the index template": + - skip: + version: " - 2.99.99" + reason: "fixed in 3.0.0" + - do: + indices.put_index_template: + name: test_for_bulk_upsert_index_template + body: + index_patterns: test_bulk_upsert_* + template: + settings: + number_of_shards: 1 + number_of_replicas: 0 + default_pipeline: pipeline1 + final_pipeline: pipeline2 + + - do: + bulk: + refresh: true + body: + - '{"update": {"_index": "test_bulk_upsert_index", "_id": "test_id3"}}' + - '{"upsert": {"f1": "v2", "f2": 47}, "doc": {"x": 1}}' + + - match: { errors: false } + - match: { items.0.update.result: created } + + - do: + get: + index: test_bulk_upsert_index + id: test_id3 + - match: { _source: {"f1": "v2", "f2": 47, "field1": "value1", "field2": "value2"}} diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 2d4439e86461b..e24493e5f2069 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -211,11 +211,18 @@ public static boolean resolvePipelines( finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings); indexRequest.setFinalPipeline(finalPipeline); } - } else if (indexRequest.index() != null) { + } else if (indexRequest.index() != null || originalRequest != null && originalRequest.index() != null) { // the index does not exist yet (and this is a valid request), so match index // templates to look for pipelines in either a matching V2 template (which takes // precedence), or if a V2 template does not match, any V1 templates - String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false); + String indexName; + if (indexRequest.index() != null) { + indexName = indexRequest.index(); + } else { + assert originalRequest != null; + indexName = originalRequest.index(); + } + String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexName, false); if (v2Template != null) { Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template); if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) { @@ -229,11 +236,7 @@ public static boolean resolvePipelines( indexRequest.setPipeline(defaultPipeline != null ? defaultPipeline : NOOP_PIPELINE_NAME); indexRequest.setFinalPipeline(finalPipeline != null ? finalPipeline : NOOP_PIPELINE_NAME); } else { - List templates = MetadataIndexTemplateService.findV1Templates( - metadata, - indexRequest.index(), - null - ); + List templates = MetadataIndexTemplateService.findV1Templates(metadata, indexName, null); // order of templates are highest order first for (final IndexTemplateMetadata template : templates) { final Settings settings = template.settings(); diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 2edfe87387c92..2a7a8e4531c55 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -1582,6 +1582,14 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() { assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("default-pipeline")); + + // index name matches with ITMD within bulk upsert: + IndexRequest upsertRequest = new IndexRequest().source(emptyMap()); + UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(upsertRequest).script(mockScript("1")); + result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); + assertThat(result, is(true)); + assertThat(upsertRequest.isPipelineResolved(), is(true)); + assertThat(upsertRequest.getPipeline(), equalTo("default-pipeline")); } public void testResolveFinalPipeline() { @@ -1619,6 +1627,14 @@ public void testResolveFinalPipeline() { assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("_none")); assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline")); + + // index name matches with ITMD within bulk upsert: + IndexRequest upsertRequest = new IndexRequest().source(emptyMap()); + UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(upsertRequest).script(mockScript("1")); + result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); + assertThat(result, is(true)); + assertThat(upsertRequest.isPipelineResolved(), is(true)); + assertThat(upsertRequest.getFinalPipeline(), equalTo("final-pipeline")); } public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { From f74b0b8191c12106aae04a2807861aa8ae092f7c Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Mon, 25 Mar 2024 20:42:47 +0800 Subject: [PATCH 2/4] Modify changelog & comment Signed-off-by: Gao Binlong --- CHANGELOG.md | 2 +- .../test/java/org/opensearch/ingest/IngestServiceTests.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62e2540e32713..344f643a55e23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,7 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed -- Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches with the index template +- Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches the index template ([#12891](https://github.com/opensearch-project/OpenSearch/pull/12891)) ### Security diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 2a7a8e4531c55..01248dda3980b 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -1583,7 +1583,7 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() { assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("default-pipeline")); - // index name matches with ITMD within bulk upsert: + // index name matches with ITMD for bulk upsert IndexRequest upsertRequest = new IndexRequest().source(emptyMap()); UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(upsertRequest).script(mockScript("1")); result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); @@ -1628,7 +1628,7 @@ public void testResolveFinalPipeline() { assertThat(indexRequest.getPipeline(), equalTo("_none")); assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline")); - // index name matches with ITMD within bulk upsert: + // index name matches with ITMD for bulk upsert: IndexRequest upsertRequest = new IndexRequest().source(emptyMap()); UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(upsertRequest).script(mockScript("1")); result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); From dd79c54539dc50d68dd945a9daa6a8c7847e8583 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Tue, 16 Jul 2024 18:11:48 +0800 Subject: [PATCH 3/4] Use new approach Signed-off-by: Gao Binlong --- .../opensearch/action/update/UpdateRequest.java | 4 ++-- .../org/opensearch/ingest/IngestService.java | 17 +++++++---------- .../action/update/UpdateRequestTests.java | 2 ++ 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/update/UpdateRequest.java b/server/src/main/java/org/opensearch/action/update/UpdateRequest.java index 9654bd1c114ba..6cb5e049e0f1e 100644 --- a/server/src/main/java/org/opensearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/opensearch/action/update/UpdateRequest.java @@ -717,7 +717,7 @@ public IndexRequest doc() { private IndexRequest safeDoc() { if (doc == null) { - doc = new IndexRequest(); + doc = new IndexRequest(index); } return doc; } @@ -803,7 +803,7 @@ public IndexRequest upsertRequest() { private IndexRequest safeUpsertRequest() { if (upsertRequest == null) { - upsertRequest = new IndexRequest(); + upsertRequest = new IndexRequest(index); } return upsertRequest; } diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 5c0ecb2f6bc56..ab8e823199447 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -213,18 +213,11 @@ public static boolean resolvePipelines( finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings); indexRequest.setFinalPipeline(finalPipeline); } - } else if (indexRequest.index() != null || originalRequest != null && originalRequest.index() != null) { + } else if (indexRequest.index() != null) { // the index does not exist yet (and this is a valid request), so match index // templates to look for pipelines in either a matching V2 template (which takes // precedence), or if a V2 template does not match, any V1 templates - String indexName; - if (indexRequest.index() != null) { - indexName = indexRequest.index(); - } else { - assert originalRequest != null; - indexName = originalRequest.index(); - } - String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexName, false); + String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false); if (v2Template != null) { Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template); if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) { @@ -238,7 +231,11 @@ public static boolean resolvePipelines( indexRequest.setPipeline(defaultPipeline != null ? defaultPipeline : NOOP_PIPELINE_NAME); indexRequest.setFinalPipeline(finalPipeline != null ? finalPipeline : NOOP_PIPELINE_NAME); } else { - List templates = MetadataIndexTemplateService.findV1Templates(metadata, indexName, null); + List templates = MetadataIndexTemplateService.findV1Templates( + metadata, + indexRequest.index(), + null + ); // order of templates are highest order first for (final IndexTemplateMetadata template : templates) { final Settings settings = template.settings(); diff --git a/server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java index b70fda0d86240..fa9618024ddcb 100644 --- a/server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java @@ -247,6 +247,7 @@ public void testFromXContent() throws Exception { assertThat(params, notNullValue()); assertThat(params.size(), equalTo(1)); assertThat(params.get("param1").toString(), equalTo("value1")); + assertThat(request.upsertRequest().index(), equalTo("test")); Map upsertDoc = XContentHelper.convertToMap( request.upsertRequest().source(), true, @@ -304,6 +305,7 @@ public void testFromXContent() throws Exception { ) ); Map doc = request.doc().sourceAsMap(); + assertThat(request.doc().index(), equalTo("test")); assertThat(doc.get("field1").toString(), equalTo("value1")); assertThat(((Map) doc.get("compound")).get("field2").toString(), equalTo("value2")); } From 5f9e3047e6ab1d96d432c57461616a1a3ea16701 Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Tue, 16 Jul 2024 20:32:43 +0800 Subject: [PATCH 4/4] Fix test failure Signed-off-by: Gao Binlong --- .../action/update/UpdateRequestTests.java | 2 +- .../org/opensearch/ingest/IngestServiceTests.java | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java index fa9618024ddcb..e85dfa8cca556 100644 --- a/server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java @@ -664,7 +664,7 @@ public void testToString() throws IOException { request.toString(), equalTo( "update {[test][1], doc_as_upsert[false], " - + "doc[index {[null][null], source[{\"body\":\"bar\"}]}], scripted_upsert[false], detect_noop[true]}" + + "doc[index {[test][null], source[{\"body\":\"bar\"}]}], scripted_upsert[false], detect_noop[true]}" ) ); } diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 2f90b71e9ae86..e61fbb6e1dbff 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -1607,12 +1607,11 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() { assertThat(indexRequest.getPipeline(), equalTo("default-pipeline")); // index name matches with ITMD for bulk upsert - IndexRequest upsertRequest = new IndexRequest().source(emptyMap()); - UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(upsertRequest).script(mockScript("1")); + UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(emptyMap()).script(mockScript("1")); result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); assertThat(result, is(true)); - assertThat(upsertRequest.isPipelineResolved(), is(true)); - assertThat(upsertRequest.getPipeline(), equalTo("default-pipeline")); + assertThat(updateRequest.upsertRequest().isPipelineResolved(), is(true)); + assertThat(updateRequest.upsertRequest().getPipeline(), equalTo("default-pipeline")); } public void testResolveFinalPipeline() { @@ -1652,12 +1651,11 @@ public void testResolveFinalPipeline() { assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline")); // index name matches with ITMD for bulk upsert: - IndexRequest upsertRequest = new IndexRequest().source(emptyMap()); - UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(upsertRequest).script(mockScript("1")); + UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(emptyMap()).script(mockScript("1")); result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); assertThat(result, is(true)); - assertThat(upsertRequest.isPipelineResolved(), is(true)); - assertThat(upsertRequest.getFinalPipeline(), equalTo("final-pipeline")); + assertThat(updateRequest.upsertRequest().isPipelineResolved(), is(true)); + assertThat(updateRequest.upsertRequest().getFinalPipeline(), equalTo("final-pipeline")); } public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {