From 006afa4344a2e63bdf8f401a78ef40660a2cf9fa Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 10 Sep 2024 14:01:50 +0100 Subject: [PATCH] Introduce test utils for ingest pipelines Replaces the somewhat-awkward API on `ClusterAdminClient` for manipulating ingest pipelines with some test-specific utilities that are easier to use. Relates #107984 in that this change massively reduces the noise that would otherwise result from removing the trappy timeouts in these APIs. --- .../IngestFailureStoreMetricsIT.java | 7 +- .../ingest/common/IngestRestartIT.java | 34 +-- .../ingest/common/ManyNestedPipelinesIT.java | 9 +- ...eteringParserDecoratorWithPipelinesIT.java | 7 +- .../geoip/EnterpriseGeoIpDownloaderIT.java | 34 +-- .../ingest/geoip/GeoIpDownloaderIT.java | 162 ++++++------- .../ingest/geoip/GeoIpDownloaderStatsIT.java | 30 +-- .../geoip/GeoIpProcessorNonIngestNodeIT.java | 67 +++--- .../elasticsearch/reindex/CancelTests.java | 15 +- .../action/bulk/BulkIntegrationIT.java | 21 +- .../elasticsearch/index/FinalPipelineIT.java | 64 ++--- .../ingest/IngestAsyncProcessorIT.java | 6 +- .../elasticsearch/ingest/IngestClientIT.java | 179 ++++++-------- .../ingest/IngestFileSettingsIT.java | 5 +- ...gestProcessorNotInstalledOnAllNodesIT.java | 53 +++-- .../ingest/IngestStatsNamesAndTypesIT.java | 6 +- .../SnapshotCustomPluginStateIT.java | 20 +- .../ingest/DeletePipelineRequestBuilder.java | 29 --- .../ingest/GetPipelineRequestBuilder.java | 23 -- .../ingest/PutPipelineRequestBuilder.java | 22 -- .../action/ingest/ReservedPipelineAction.java | 2 +- .../client/internal/ClusterAdminClient.java | 42 ---- .../elasticsearch/ingest/IngestService.java | 9 +- .../ingest/RestDeletePipelineAction.java | 3 +- .../action/ingest/RestGetPipelineAction.java | 7 +- .../action/ingest/RestPutPipelineAction.java | 3 +- .../ingest/PutPipelineRequestTests.java | 6 +- .../ingest/IngestServiceTests.java | 221 +++++------------- .../ingest/IngestPipelineTestUtils.java | 121 ++++++++++ .../elasticsearch/test/ESIntegTestCase.java | 52 +++++ .../test/ESSingleNodeTestCase.java | 32 +++ .../xpack/enrich/EnrichMultiNodeIT.java | 9 +- .../enrich/EnrichPolicyReindexPipeline.java | 4 +- .../xpack/enrich/BasicEnrichTests.java | 43 ++-- .../xpack/enrich/EnrichPolicyUpdateTests.java | 9 +- .../xpack/enrich/EnrichResiliencyTests.java | 46 +--- .../ml/integration/TestFeatureResetIT.java | 24 +- .../license/MachineLearningLicensingIT.java | 41 +--- .../TestFeatureLicenseTrackingIT.java | 25 +- 39 files changed, 606 insertions(+), 886 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineRequestBuilder.java delete mode 100644 server/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequestBuilder.java delete mode 100644 server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequestBuilder.java create mode 100644 test/framework/src/main/java/org/elasticsearch/ingest/IngestPipelineTestUtils.java diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java index 18ba5f4bc1213..66bb06ca4240a 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java @@ -19,14 +19,11 @@ import 
org.elasticsearch.action.bulk.FailureStoreMetrics; import org.elasticsearch.action.datastreams.CreateDataStreamAction; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Template; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.core.Strings; import org.elasticsearch.index.mapper.DateFieldMapper; @@ -319,9 +316,7 @@ private void createReroutePipeline(String destination) { } private void createPipeline(String processor) { - String pipelineDefinition = Strings.format("{\"processors\": [{%s}]}", processor); - BytesReference bytes = new BytesArray(pipelineDefinition); - clusterAdmin().putPipeline(new PutPipelineRequest(pipeline, bytes, XContentType.JSON)).actionGet(); + putJsonPipeline(pipeline, Strings.format("{\"processors\": [{%s}]}", processor)); } private void indexDocs(String dataStream, int numDocs, String pipeline) { diff --git a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java index f1c592e6e8345..4a0a55dce9483 100644 --- a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -15,14 +15,11 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Requests; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNodeRole; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Strings; @@ -79,7 +76,7 @@ public void testFailureInConditionalProcessor() { internalCluster().ensureAtLeastNumDataNodes(1); internalCluster().startMasterOnlyNode(); final String pipelineId = "foo"; - clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(Strings.format(""" + putJsonPipeline(pipelineId, Strings.format(""" { "processors": [ { @@ -99,7 +96,7 @@ public void testFailureInConditionalProcessor() { } } ] - }""", MockScriptEngine.NAME)), XContentType.JSON).get(); + }""", MockScriptEngine.NAME)); Exception e = expectThrows( Exception.class, @@ -126,22 +123,16 @@ public void testScriptDisabled() throws Exception { String pipelineIdWithScript = pipelineIdWithoutScript + "_script"; internalCluster().startNode(); - BytesReference pipelineWithScript = new BytesArray(Strings.format(""" + putJsonPipeline(pipelineIdWithScript, Strings.format(""" { "processors": [ { "script": { "lang": "%s", "source": "my_script" } } ] }""", MockScriptEngine.NAME)); - 
BytesReference pipelineWithoutScript = new BytesArray(""" + putJsonPipeline(pipelineIdWithoutScript, """ { "processors": [ { "set": { "field": "y", "value": 0 } } ] }"""); - Consumer<String> checkPipelineExists = (id) -> assertThat( - clusterAdmin().prepareGetPipeline(id).get().pipelines().get(0).getId(), - equalTo(id) - ); - - clusterAdmin().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get(); - clusterAdmin().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get(); + Consumer<String> checkPipelineExists = (id) -> assertThat(getPipelines(id).pipelines().get(0).getId(), equalTo(id)); checkPipelineExists.accept(pipelineIdWithScript); checkPipelineExists.accept(pipelineIdWithoutScript); @@ -197,14 +188,13 @@ public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exceptio putJsonStoredScript("1", Strings.format(""" {"script": {"lang": "%s", "source": "my_script"} } """, MockScriptEngine.NAME)); - BytesReference pipeline = new BytesArray(""" + putJsonPipeline("_id", """ { "processors" : [ {"set" : {"field": "y", "value": 0}}, {"script" : {"id": "1"}} ] }"""); - clusterAdmin().preparePutPipeline("_id", pipeline, XContentType.JSON).get(); prepareIndex("index").setId("1").setSource("x", 0).setPipeline("_id").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); @@ -232,13 +222,12 @@ public void testWithDedicatedIngestNode() throws Exception { String node = internalCluster().startNode(); String ingestNode = internalCluster().startNode(onlyRole(DiscoveryNodeRole.INGEST_ROLE)); - BytesReference pipeline = new BytesArray(""" + putJsonPipeline("_id", """ { "processors" : [ {"set" : {"field": "y", "value": 0}} ] }"""); - clusterAdmin().preparePutPipeline("_id", pipeline, XContentType.JSON).get(); prepareIndex("index").setId("1").setSource("x", 0).setPipeline("_id").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); @@ -264,7 +253,7 @@ public void testWithDedicatedIngestNode() throws Exception { public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception { internalCluster().startNode(); - final var pipeline = new BytesArray(""" + putJsonPipeline("test_pipeline", """ { "processors" : [ { "set": { "field": "value", "value": 42 } } ] }"""); + final TimeValue timeout = TimeValue.timeValueSeconds(10); - client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get(timeout); client().admin().indices().preparePutTemplate("pipeline_template").setPatterns(Collections.singletonList("*")).setSettings(""" { "index" : { @@ -357,16 +346,13 @@ public void testForwardBulkWithSystemWritePoolDisabled() throws Exception { // Create Bulk Request createIndex("index"); - BytesReference source = new BytesArray(""" + putJsonPipeline("_id", """ { "processors" : [ {"set" : {"field": "y", "value": 0}} ] }"""); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); int numRequests = scaledRandomIntBetween(32, 128); BulkRequest bulkRequest = new BulkRequest(); BulkResponse response; diff --git a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/ManyNestedPipelinesIT.java index c9f3f023b43ef..2c9ea27805a18 100644 ---
a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/ManyNestedPipelinesIT.java +++ b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/ManyNestedPipelinesIT.java @@ -15,7 +15,6 @@ import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult; import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.ingest.SimulateProcessorResult; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.Strings; import org.elasticsearch.ingest.GraphStructureException; @@ -166,7 +165,7 @@ private void createChainedPipelines(String prefix, int count) { private void createChainedPipeline(String prefix, int number) { String pipelineId = prefix + "pipeline_" + number; String nextPipelineId = prefix + "pipeline_" + (number + 1); - String pipelineTemplate = """ + putJsonPipeline(pipelineId, Strings.format(""" { "processors": [ { @@ -176,9 +175,7 @@ private void createChainedPipeline(String prefix, int number) { } ] } - """; - String pipeline = Strings.format(pipelineTemplate, nextPipelineId); - clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get(); + """, nextPipelineId)); } private void createLastPipeline(String prefix, int number) { @@ -195,6 +192,6 @@ private void createLastPipeline(String prefix, int number) { ] } """; - clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get(); + putJsonPipeline(pipelineId, pipeline); } } diff --git a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java index 7f0910ea5cc4d..0b93609b3156e 100644 --- a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java +++ b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java @@ -10,9 +10,6 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.ingest.common.IngestCommonPlugin; @@ -21,7 +18,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.FilterXContentParserWrapper; import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xcontent.XContentType; import java.io.IOException; import java.util.Collection; @@ -44,7 +40,7 @@ public class XContentMeteringParserDecoratorWithPipelinesIT extends ESIntegTestC public void testDocumentIsReportedWithPipelines() throws Exception { hasWrappedParser = false; // pipeline adding fields, changing destination is not affecting reporting - final BytesReference pipelineBody = new BytesArray(""" + putJsonPipeline("pipeline", """ { "processors": [ { @@ -62,7 +58,6 @@ public void testDocumentIsReportedWithPipelines() throws Exception { ] } """); - clusterAdmin().putPipeline(new PutPipelineRequest("pipeline", pipelineBody, XContentType.JSON)).actionGet(); 
client().index( new IndexRequest(TEST_INDEX_NAME).setPipeline("pipeline") diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderIT.java index cc757c413713d..15e7299dc104f 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderIT.java @@ -19,10 +19,8 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; @@ -36,9 +34,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.RemoteTransportException; -import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; -import org.elasticsearch.xcontent.json.JsonXContent; import org.junit.ClassRule; import java.io.IOException; @@ -47,7 +43,6 @@ import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER; import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.MAXMIND_LICENSE_KEY_SETTING; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase { @@ -155,31 +150,24 @@ private void configureDatabase(String databaseType) throws Exception { } private void createGeoIpPipeline(String pipelineName, String databaseType, String sourceField, String targetField) throws IOException { - final BytesReference bytes; - try (XContentBuilder builder = JsonXContent.contentBuilder()) { - builder.startObject(); + putJsonPipeline(pipelineName, (builder, params) -> { + builder.field("description", "test"); + builder.startArray("processors"); { - builder.field("description", "test"); - builder.startArray("processors"); + builder.startObject(); { - builder.startObject(); + builder.startObject("geoip"); { - builder.startObject("geoip"); - { - builder.field("field", sourceField); - builder.field("target_field", targetField); - builder.field("database_file", databaseType + ".mmdb"); - } - builder.endObject(); + builder.field("field", sourceField); + builder.field("target_field", targetField); + builder.field("database_file", databaseType + ".mmdb"); } builder.endObject(); } - builder.endArray(); + builder.endObject(); } - builder.endObject(); - bytes = BytesReference.bytes(builder); - } - assertAcked(clusterAdmin().putPipeline(new PutPipelineRequest(pipelineName, bytes, XContentType.JSON)).actionGet()); + return builder.endArray(); + }); } private String ingestDocument(String indexName, String pipelineName, String sourceField) { diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java 
index d994bd70eb7a0..41d711be2dee9 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java @@ -530,91 +530,84 @@ private void putGeoIpPipeline(String pipelineId) throws IOException { * @throws IOException */ private void putGeoIpPipeline(String pipelineId, boolean downloadDatabaseOnPipelineCreation) throws IOException { - BytesReference bytes; - try (XContentBuilder builder = JsonXContent.contentBuilder()) { - builder.startObject(); + putJsonPipeline(pipelineId, ((builder, params) -> { + builder.startArray("processors"); { - builder.startArray("processors"); + /* + * First we add a non-geo pipeline with a random field value. This is purely here so that each call to this method + * creates a pipeline that is unique. Creating a pipeline twice with the same ID and exact same bytes + * results in a no-op, meaning that the pipeline won't actually be updated and won't actually trigger all of the + * things we expect it to. + */ + builder.startObject(); { - /* - * First we add a non-geo pipeline with a random field value. This is purely here so that each call to this method - * creates a pipeline that is unique. Creating a pipeline twice with the same ID and exact same bytes - * results in a no-op, meaning that the pipeline won't actually be updated and won't actually trigger all of the - * things we expect it to. - */ - builder.startObject(); + builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); { - builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); - { - builder.field("randomField", randomAlphaOfLength(20)); - } - builder.endObject(); + builder.field("randomField", randomAlphaOfLength(20)); } builder.endObject(); + } + builder.endObject(); - builder.startObject(); + builder.startObject(); + { + builder.startObject("geoip"); { - builder.startObject("geoip"); - { - builder.field("field", "ip"); - builder.field("target_field", "ip-city"); - builder.field("database_file", "GeoLite2-City.mmdb"); - if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { - builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); - } + builder.field("field", "ip"); + builder.field("target_field", "ip-city"); + builder.field("database_file", "GeoLite2-City.mmdb"); + if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { + builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); } - builder.endObject(); } builder.endObject(); - builder.startObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject("geoip"); { - builder.startObject("geoip"); - { - builder.field("field", "ip"); - builder.field("target_field", "ip-country"); - builder.field("database_file", "GeoLite2-Country.mmdb"); - if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { - builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); } + builder.field("field", "ip"); + builder.field("target_field", "ip-country"); + builder.field("database_file", "GeoLite2-Country.mmdb"); + if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { + builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); } - builder.endObject(); } builder.endObject(); - builder.startObject(); + } + builder.endObject(); + builder.startObject(); + {
builder.startObject("geoip"); { - builder.startObject("geoip"); - { - builder.field("field", "ip"); - builder.field("target_field", "ip-asn"); - builder.field("database_file", "GeoLite2-ASN.mmdb"); - if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { - builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); - } + builder.field("field", "ip"); + builder.field("target_field", "ip-asn"); + builder.field("database_file", "GeoLite2-ASN.mmdb"); + if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { + builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); } - builder.endObject(); } builder.endObject(); - builder.startObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject("geoip"); { - builder.startObject("geoip"); - { - builder.field("field", "ip"); - builder.field("target_field", "ip-city"); - builder.field("database_file", "MyCustomGeoLite2-City.mmdb"); - if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { - builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); - } + builder.field("field", "ip"); + builder.field("target_field", "ip-city"); + builder.field("database_file", "MyCustomGeoLite2-City.mmdb"); + if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) { + builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation); } - builder.endObject(); } builder.endObject(); } - builder.endArray(); + builder.endObject(); } - builder.endObject(); - bytes = BytesReference.bytes(builder); - } - assertAcked(clusterAdmin().preparePutPipeline(pipelineId, bytes, XContentType.JSON).get()); + return builder.endArray(); + })); } /** @@ -626,40 +619,33 @@ private void putNonGeoipPipeline(String pipelineId) throws IOException { * Adding the exact same pipeline twice is treated as a no-op. The random values that go into randomField make each pipeline * created by this method is unique to avoid this. 
*/ - BytesReference bytes; - try (XContentBuilder builder = JsonXContent.contentBuilder()) { - builder.startObject(); + putJsonPipeline(pipelineId, ((builder, params) -> { + builder.startArray("processors"); { - builder.startArray("processors"); + builder.startObject(); { - builder.startObject(); - { - builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); - builder.field("randomField", randomAlphaOfLength(20)); - builder.endObject(); - } + builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); + builder.field("randomField", randomAlphaOfLength(20)); builder.endObject(); - builder.startObject(); - { - builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); - builder.field("randomField", randomAlphaOfLength(20)); - builder.endObject(); - } + } + builder.endObject(); + builder.startObject(); + { + builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); + builder.field("randomField", randomAlphaOfLength(20)); builder.endObject(); - builder.startObject(); - { - builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); - builder.field("randomField", randomAlphaOfLength(20)); - builder.endObject(); - } + } + builder.endObject(); + builder.startObject(); + { + builder.startObject(NonGeoProcessorsPlugin.NON_GEO_PROCESSOR_TYPE); + builder.field("randomField", randomAlphaOfLength(20)); builder.endObject(); } - builder.endArray(); + builder.endObject(); } - builder.endObject(); - bytes = BytesReference.bytes(builder); - } - assertAcked(clusterAdmin().preparePutPipeline(pipelineId, bytes, XContentType.JSON).get()); + return builder.endArray(); })); } private List<Path> getGeoIpTmpDirs() throws IOException { diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java index ec54317e144d1..51ad7cedba98a 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderStatsIT.java @@ -19,8 +19,6 @@ import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; -import org.elasticsearch.xcontent.XContentType; -import org.elasticsearch.xcontent.json.JsonXContent; import org.junit.After; import java.io.IOException; @@ -29,7 +27,6 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -98,30 +95,23 @@ public void testStats() throws Exception { } private void putPipeline() throws IOException { - BytesReference bytes; - try (XContentBuilder builder = JsonXContent.contentBuilder()) { - builder.startObject(); + putJsonPipeline("_id", (builder, params) -> { + builder.startArray("processors"); { - builder.startArray("processors"); + builder.startObject(); { - builder.startObject(); + builder.startObject("geoip"); { - builder.startObject("geoip"); - { - builder.field("field", "ip"); - builder.field("target_field", "ip-city"); -
builder.field("database_file", "GeoLite2-City.mmdb"); } builder.endObject(); } - builder.endArray(); + builder.endObject(); } - builder.endObject(); - bytes = BytesReference.bytes(builder); - } - assertAcked(clusterAdmin().preparePutPipeline("_id", bytes, XContentType.JSON).get()); + return builder.endArray(); + }); } public static Map convertToMap(ToXContent part) throws IOException { diff --git a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java index f34f647a01e05..58fdc81b72ae6 100644 --- a/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java +++ b/modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java @@ -11,22 +11,16 @@ import org.apache.lucene.util.Constants; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.NodeRoles; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentType; -import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; import java.util.Arrays; import java.util.Map; import static org.elasticsearch.test.NodeRoles.nonIngestNode; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; public class GeoIpProcessorNonIngestNodeIT extends AbstractGeoIpIT { @@ -43,53 +37,46 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { */ public void testLazyLoading() throws IOException { assumeFalse("https://github.com/elastic/elasticsearch/issues/37342", Constants.WINDOWS); - final BytesReference bytes; - try (XContentBuilder builder = JsonXContent.contentBuilder()) { - builder.startObject(); + putJsonPipeline("geoip", (builder, params) -> { + builder.field("description", "test"); + builder.startArray("processors"); { - builder.field("description", "test"); - builder.startArray("processors"); + builder.startObject(); { - builder.startObject(); + builder.startObject("geoip"); { - builder.startObject("geoip"); - { - builder.field("field", "ip"); - builder.field("target_field", "ip-city"); - builder.field("database_file", "GeoLite2-City.mmdb"); - } - builder.endObject(); + builder.field("field", "ip"); + builder.field("target_field", "ip-city"); + builder.field("database_file", "GeoLite2-City.mmdb"); } builder.endObject(); - builder.startObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject("geoip"); { - builder.startObject("geoip"); - { - builder.field("field", "ip"); - builder.field("target_field", "ip-country"); - builder.field("database_file", "GeoLite2-Country.mmdb"); - } - builder.endObject(); + builder.field("field", "ip"); + builder.field("target_field", "ip-country"); + builder.field("database_file", "GeoLite2-Country.mmdb"); } builder.endObject(); - builder.startObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject("geoip"); { - builder.startObject("geoip"); - { - builder.field("field", "ip"); - 
builder.field("target_field", "ip-asn"); - builder.field("database_file", "GeoLite2-ASN.mmdb"); - } - builder.endObject(); + builder.field("field", "ip"); + builder.field("target_field", "ip-asn"); + builder.field("database_file", "GeoLite2-ASN.mmdb"); } builder.endObject(); } - builder.endArray(); + builder.endObject(); } - builder.endObject(); - bytes = BytesReference.bytes(builder); - } - assertAcked(clusterAdmin().putPipeline(new PutPipelineRequest("geoip", bytes, XContentType.JSON)).actionGet()); + return builder.endArray(); + }); // the geo-IP databases should not be loaded on any nodes as they are all non-ingest nodes Arrays.stream(internalCluster().getNodeNames()).forEach(node -> assertDatabaseLoadStatus(node, false)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/CancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/CancelTests.java index a2911090ab931..4c914764cdb52 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/CancelTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/CancelTests.java @@ -15,9 +15,6 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.ingest.DeletePipelineRequest; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine.Operation.Origin; @@ -35,7 +32,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskInfo; -import org.elasticsearch.xcontent.XContentType; import org.hamcrest.Matcher; import org.junit.Before; @@ -47,7 +43,6 @@ import java.util.stream.IntStream; import static org.elasticsearch.index.query.QueryBuilders.termQuery; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; @@ -231,14 +226,13 @@ public void testReindexCancel() throws Exception { } public void testUpdateByQueryCancel() throws Exception { - BytesReference pipeline = new BytesArray(""" + putJsonPipeline("set-processed", """ { "description" : "sets processed to true", "processors" : [ { "test" : {} } ] }"""); - assertAcked(clusterAdmin().preparePutPipeline("set-processed", pipeline, XContentType.JSON).get()); testCancel( UpdateByQueryAction.INSTANCE, @@ -250,7 +244,7 @@ public void testUpdateByQueryCancel() throws Exception { equalTo("update-by-query [" + INDEX + "]") ); - assertAcked(clusterAdmin().deletePipeline(new DeletePipelineRequest("set-processed")).get()); + deletePipeline("set-processed"); } public void testDeleteByQueryCancel() throws Exception { @@ -279,14 +273,13 @@ public void testReindexCancelWithWorkers() throws Exception { } public void testUpdateByQueryCancelWithWorkers() throws Exception { - BytesReference pipeline = new BytesArray(""" + putJsonPipeline("set-processed", """ { "description" : "sets processed to true", "processors" : [ { "test" : {} } ] }"""); - assertAcked(clusterAdmin().preparePutPipeline("set-processed", pipeline, XContentType.JSON).get()); testCancel( UpdateByQueryAction.INSTANCE, @@ -298,7 +291,7 @@ public 
void testUpdateByQueryCancelWithWorkers() throws Exception { equalTo("update-by-query [" + INDEX + "]") ); - assertAcked(clusterAdmin().deletePipeline(new DeletePipelineRequest("set-processed")).get()); + deletePipeline("set-processed"); } public void testDeleteByQueryCancelWithWorkers() throws Exception { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java index 38d5719287292..300ef1691a07d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java @@ -14,15 +14,12 @@ import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.ingest.IngestTestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; import java.io.IOException; @@ -31,7 +28,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -39,7 +35,6 @@ import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED; import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -134,17 +129,11 @@ public void testBulkWithGlobalDefaults() throws Exception { } } - private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException { - XContentBuilder pipeline = jsonBuilder().startObject() - .startArray("processors") - .startObject() - .startObject("test") - .endObject() - .endObject() - .endArray() - .endObject(); - - assertAcked(clusterAdmin().putPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(pipeline), XContentType.JSON))); + private void createSamplePipeline(String pipelineId) throws IOException { + putJsonPipeline( + pipelineId, + (builder, params) -> builder.startArray("processors").startObject().startObject("test").endObject().endObject().endArray() + ); } /** This test ensures that index deletion makes indexing fail quickly, not wait on the index that has disappeared */ diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index 216d5e25218e3..026bb00d69bbb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -12,12 +12,8 @@ import 
org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.GetPipelineResponse; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.ConfigurationUtils; @@ -28,7 +24,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.xcontent.XContentType; import org.junit.After; import java.io.IOException; @@ -56,10 +51,9 @@ protected Collection<Class<? extends Plugin>> nodePlugins() { public void cleanUpPipelines() { indicesAdmin().prepareDelete("*").get(); - final GetPipelineResponse response = clusterAdmin().prepareGetPipeline("default_pipeline", "final_pipeline", "request_pipeline") - .get(); + final GetPipelineResponse response = getPipelines("default_pipeline", "final_pipeline", "request_pipeline"); for (final PipelineConfiguration pipeline : response.pipelines()) { - clusterAdmin().deletePipeline(new DeletePipelineRequest(pipeline.getId())).actionGet(); + deletePipeline(pipeline.getId()); } } @@ -67,9 +61,8 @@ public void testFinalPipelineCantChangeDestination() { final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); createIndex("index", settings); - final BytesReference finalPipelineBody = new BytesArray(""" + putJsonPipeline("final_pipeline", """ {"processors": [{"changing_dest": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); final IllegalStateException e = expectThrows( IllegalStateException.class, @@ -87,9 +80,8 @@ public void testFinalPipelineCantRerouteDestination() { final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); createIndex("index", settings); - final BytesReference finalPipelineBody = new BytesArray(""" + putJsonPipeline("final_pipeline", """ {"processors": [{"reroute": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); final IllegalStateException e = expectThrows( IllegalStateException.class, @@ -110,13 +102,11 @@ public void testFinalPipelineOfOldDestinationIsNotInvoked() { .build(); createIndex("index", settings); - BytesReference defaultPipelineBody = new BytesArray(""" + putJsonPipeline("default_pipeline", """ {"processors": [{"changing_dest": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)).actionGet(); - BytesReference finalPipelineBody = new BytesArray(""" + putJsonPipeline("final_pipeline", """ {"processors": [{"final": {"exists":"no_such_field"}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); DocWriteResponse indexResponse = prepareIndex("index").setId("1") .setSource(Map.of("field", "value")) @@ -136,13 +126,11 @@ public void testFinalPipelineOfNewDestinationIsInvoked() { settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(),
"final_pipeline").build(); createIndex("target", settings); - BytesReference defaultPipelineBody = new BytesArray(""" + putJsonPipeline("default_pipeline", """ {"processors": [{"changing_dest": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)).actionGet(); - BytesReference finalPipelineBody = new BytesArray(""" + putJsonPipeline("final_pipeline", """ {"processors": [{"final": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); DocWriteResponse indexResponse = prepareIndex("index").setId("1") .setSource(Map.of("field", "value")) @@ -162,13 +150,11 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() { settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); createIndex("target", settings); - BytesReference defaultPipelineBody = new BytesArray(""" + putJsonPipeline("default_pipeline", """ {"processors": [{"changing_dest": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)).actionGet(); - BytesReference targetPipeline = new BytesArray(""" + putJsonPipeline("target_default_pipeline", """ {"processors": [{"final": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)).actionGet(); DocWriteResponse indexResponse = prepareIndex("index").setId("1") .setSource(Map.of("field", "value")) @@ -188,13 +174,11 @@ public void testDefaultPipelineOfRerouteDestinationIsInvoked() { settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); createIndex("target", settings); - BytesReference defaultPipelineBody = new BytesArray(""" + putJsonPipeline("default_pipeline", """ {"processors": [{"reroute": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)).actionGet(); - BytesReference targetPipeline = new BytesArray(""" + putJsonPipeline("target_default_pipeline", """ {"processors": [{"final": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)).actionGet(); DocWriteResponse indexResponse = prepareIndex("index").setId("1") .setSource(Map.of("field", "value")) @@ -214,13 +198,11 @@ public void testAvoidIndexingLoop() { settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); createIndex("target", settings); - BytesReference defaultPipelineBody = new BytesArray(""" + putJsonPipeline("default_pipeline", """ {"processors": [{"reroute": {"dest": "target"}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)).actionGet(); - BytesReference targetPipeline = new BytesArray(""" + putJsonPipeline("target_default_pipeline", """ {"processors": [{"reroute": {"dest": "index"}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)).actionGet(); IllegalStateException exception = expectThrows( IllegalStateException.class, @@ -245,12 +227,10 @@ public void testFinalPipeline() { } public void testRequestPipelineAndFinalPipeline() { - final BytesReference requestPipelineBody = new BytesArray(""" + putJsonPipeline("request_pipeline", """ {"processors": [{"request": {}}]}"""); - 
clusterAdmin().putPipeline(new PutPipelineRequest("request_pipeline", requestPipelineBody, XContentType.JSON)).actionGet(); - final BytesReference finalPipelineBody = new BytesArray(""" + putJsonPipeline("final_pipeline", """ {"processors": [{"final": {"exists":"request"}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); createIndex("index", settings); final IndexRequestBuilder index = prepareIndex("index").setId("1"); @@ -270,12 +250,10 @@ public void testRequestPipelineAndFinalPipeline() { } public void testDefaultAndFinalPipeline() { - final BytesReference defaultPipelineBody = new BytesArray(""" + putJsonPipeline("default_pipeline", """ {"processors": [{"default": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)).actionGet(); - final BytesReference finalPipelineBody = new BytesArray(""" + putJsonPipeline("final_pipeline", """ {"processors": [{"final": {"exists":"default"}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); final Settings settings = Settings.builder() .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") .put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline") @@ -297,12 +275,10 @@ public void testDefaultAndFinalPipeline() { } public void testDefaultAndFinalPipelineFromTemplates() { - final BytesReference defaultPipelineBody = new BytesArray(""" + putJsonPipeline("default_pipeline", """ {"processors": [{"default": {}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)).actionGet(); - final BytesReference finalPipelineBody = new BytesArray(""" + putJsonPipeline("final_pipeline", """ {"processors": [{"final": {"exists":"default"}}]}"""); - clusterAdmin().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1); final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE); final int finalPipelineOrder; diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestAsyncProcessorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestAsyncProcessorIT.java index 2e515b07b59a0..828c02a2ba89c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestAsyncProcessorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestAsyncProcessorIT.java @@ -14,10 +14,7 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -56,8 +53,7 @@ protected Collection> getPlugins() { public void testAsyncProcessorImplementation() { // A pipeline with 2 processors: the test async processor and sync test processor. 
- BytesReference pipelineBody = new BytesArray("{\"processors\": [{\"test-async\": {}, \"test\": {}}]}"); - clusterAdmin().putPipeline(new PutPipelineRequest("_id", pipelineBody, XContentType.JSON)).actionGet(); + putJsonPipeline("_id", "{\"processors\": [{\"test-async\": {}, \"test\": {}}]}"); BulkRequest bulkRequest = new BulkRequest(); int numDocs = randomIntBetween(8, 256); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java index 9fd7aaabaf2f5..4b26240d81652 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java @@ -16,13 +16,12 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.GetPipelineResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineTransportAction; import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.internal.Requests; import org.elasticsearch.common.bytes.BytesReference; @@ -30,7 +29,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; import java.util.Collection; @@ -38,6 +36,7 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.ingest.IngestPipelineTestUtils.putJsonPipelineRequest; import static org.elasticsearch.test.NodeRoles.nonIngestNode; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; @@ -63,19 +62,17 @@ protected Collection> nodePlugins() { } public void testSimulate() throws Exception { - BytesReference pipelineSource = BytesReference.bytes( - jsonBuilder().startObject() - .field("description", "my_pipeline") + putJsonPipeline( + "_id", + (builder, params) -> builder.field("description", "my_pipeline") .startArray("processors") .startObject() .startObject("test") .endObject() .endObject() .endArray() - .endObject() ); - clusterAdmin().preparePutPipeline("_id", pipelineSource, XContentType.JSON).get(); - GetPipelineResponse getResponse = clusterAdmin().prepareGetPipeline("_id").get(); + GetPipelineResponse getResponse = getPipelines("_id"); assertThat(getResponse.isFound(), is(true)); assertThat(getResponse.pipelines().size(), equalTo(1)); assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id")); @@ -118,26 +115,22 @@ public void testSimulate() throws Exception { assertThat(simulateDocumentBaseResult.getFailure(), nullValue()); // cleanup - AcknowledgedResponse deletePipelineResponse = clusterAdmin().prepareDeletePipeline("_id").get(); - assertTrue(deletePipelineResponse.isAcknowledged()); + deletePipeline("_id"); } public void testBulkWithIngestFailures() throws Exception { createIndex("index"); - BytesReference source = BytesReference.bytes( - 
jsonBuilder().startObject() - .field("description", "my_pipeline") + putJsonPipeline( + "_id", + (builder, params) -> builder.field("description", "my_pipeline") .startArray("processors") .startObject() .startObject("test") .endObject() .endObject() .endArray() - .endObject() ); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); int numRequests = scaledRandomIntBetween(32, 128); BulkRequest bulkRequest = new BulkRequest(); @@ -169,26 +162,22 @@ public void testBulkWithIngestFailures() throws Exception { } // cleanup - AcknowledgedResponse deletePipelineResponse = clusterAdmin().prepareDeletePipeline("_id").get(); - assertTrue(deletePipelineResponse.isAcknowledged()); + deletePipeline("_id"); } public void testBulkWithUpsert() throws Exception { createIndex("index"); - BytesReference source = BytesReference.bytes( - jsonBuilder().startObject() - .field("description", "my_pipeline") + putJsonPipeline( + "_id", + (builder, params) -> builder.field("description", "my_pipeline") .startArray("processors") .startObject() .startObject("test") .endObject() .endObject() .endArray() - .endObject() ); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); BulkRequest bulkRequest = new BulkRequest(); IndexRequest indexRequest = new IndexRequest("index").id("1").setPipeline("_id"); @@ -211,21 +200,18 @@ public void testBulkWithUpsert() throws Exception { } public void test() throws Exception { - BytesReference source = BytesReference.bytes( - jsonBuilder().startObject() - .field("description", "my_pipeline") + putJsonPipeline( + "_id", + (builder, params) -> builder.field("description", "my_pipeline") .startArray("processors") .startObject() .startObject("test") .endObject() .endObject() .endArray() - .endObject() ); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); - GetPipelineResponse getResponse = clusterAdmin().prepareGetPipeline("_id").get(); + GetPipelineResponse getResponse = getPipelines("_id"); assertThat(getResponse.isFound(), is(true)); assertThat(getResponse.pipelines().size(), equalTo(1)); assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id")); @@ -241,11 +227,9 @@ public void test() throws Exception { assertThat(doc.get("field"), equalTo("value2")); assertThat(doc.get("processed"), equalTo(true)); - DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest("_id"); - AcknowledgedResponse response = clusterAdmin().deletePipeline(deletePipelineRequest).get(); - assertThat(response.isAcknowledged(), is(true)); + deletePipeline("_id"); - getResponse = clusterAdmin().prepareGetPipeline("_id").get(); + getResponse = getPipelines("_id"); assertThat(getResponse.isFound(), is(false)); assertThat(getResponse.pipelines().size(), equalTo(0)); } @@ -263,29 +247,29 @@ public void testPutWithPipelineFactoryError() throws Exception { .endArray() .endObject() ); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id2", source, XContentType.JSON); - Exception e = expectThrows(ElasticsearchParseException.class, clusterAdmin().putPipeline(putPipelineRequest)); + PutPipelineRequest putPipelineRequest = putJsonPipelineRequest("_id2", source); + Exception e = expectThrows( + ElasticsearchParseException.class, + client().execute(PutPipelineTransportAction.TYPE, 
putPipelineRequest) + ); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); - GetPipelineResponse response = clusterAdmin().prepareGetPipeline("_id2").get(); + GetPipelineResponse response = getPipelines("_id2"); assertFalse(response.isFound()); } public void testWithDedicatedMaster() throws Exception { String masterOnlyNode = internalCluster().startMasterOnlyNode(); - BytesReference source = BytesReference.bytes( - jsonBuilder().startObject() - .field("description", "my_pipeline") + putJsonPipeline( + "_id", + (builder, params) -> builder.field("description", "my_pipeline") .startArray("processors") .startObject() .startObject("test") .endObject() .endObject() .endArray() - .endObject() ); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); BulkItemResponse item = client(masterOnlyNode).prepareBulk() .add(prepareIndex("test").setSource("field", "value2", "drop", true).setPipeline("_id")) @@ -296,56 +280,38 @@ public void testWithDedicatedMaster() throws Exception { } public void testPipelineOriginHeader() throws Exception { - { - XContentBuilder source = jsonBuilder().startObject(); + putJsonPipeline("1", (source, params) -> { + source.startArray("processors"); + source.startObject(); { - source.startArray("processors"); - source.startObject(); - { - source.startObject("pipeline"); - source.field("name", "2"); - source.endObject(); - } + source.startObject("pipeline"); + source.field("name", "2"); source.endObject(); - source.endArray(); } source.endObject(); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); - } - { - XContentBuilder source = jsonBuilder().startObject(); + return source.endArray(); + }); + putJsonPipeline("2", (source, params) -> { + source.startArray("processors"); + source.startObject(); { - source.startArray("processors"); - source.startObject(); - { - source.startObject("pipeline"); - source.field("name", "3"); - source.endObject(); - } + source.startObject("pipeline"); + source.field("name", "3"); source.endObject(); - source.endArray(); } source.endObject(); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); - } - { - XContentBuilder source = jsonBuilder().startObject(); + return source.endArray(); + }); + putJsonPipeline("3", (source, params) -> { + source.startArray("processors"); + source.startObject(); { - source.startArray("processors"); - source.startObject(); - { - source.startObject("fail"); - source.endObject(); - } + source.startObject("fail"); source.endObject(); - source.endArray(); } source.endObject(); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); - } + return source.endArray(); + }); Exception e = expectThrows(Exception.class, () -> { IndexRequest indexRequest = new IndexRequest("test"); @@ -359,8 +325,7 @@ public void testPipelineOriginHeader() throws Exception { } public void testPipelineProcessorOnFailure() throws Exception { - { - XContentBuilder source = jsonBuilder().startObject(); + putJsonPipeline("1", (source, params) -> { { source.startArray("processors"); source.startObject(); @@ -382,43 
+347,29 @@ public void testPipelineProcessorOnFailure() throws Exception { source.endObject(); source.endArray(); } - source.endObject(); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", BytesReference.bytes(source), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); - } - { - XContentBuilder source = jsonBuilder().startObject(); + return source; + }); + putJsonPipeline("2", (source, params) -> { + source.startArray("processors"); + source.startObject(); { - source.startArray("processors"); - source.startObject(); - { - source.startObject("pipeline"); - source.field("name", "3"); - source.endObject(); - } + source.startObject("pipeline"); + source.field("name", "3"); source.endObject(); - source.endArray(); } source.endObject(); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("2", BytesReference.bytes(source), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); - } - { - XContentBuilder source = jsonBuilder().startObject(); + return source.endArray(); + }); + putJsonPipeline("3", (source, params) -> { + source.startArray("processors"); + source.startObject(); { - source.startArray("processors"); - source.startObject(); - { - source.startObject("fail"); - source.endObject(); - } + source.startObject("fail"); source.endObject(); - source.endArray(); } source.endObject(); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("3", BytesReference.bytes(source), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).get(); - } + return source.endArray(); + }); prepareIndex("test").setId("1").setSource("{}", XContentType.JSON).setPipeline("1").get(); Map inserted = client().prepareGet("test", "1").get().getSourceAsMap(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestFileSettingsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestFileSettingsIT.java index 0fa1ef1208593..5ec3e18d124e3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestFileSettingsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestFileSettingsIT.java @@ -19,7 +19,6 @@ import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.Strings; import org.elasticsearch.core.Tuple; import org.elasticsearch.plugins.Plugin; @@ -41,6 +40,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.common.bytes.BytesReference.bytes; +import static org.elasticsearch.ingest.IngestPipelineTestUtils.putJsonPipelineRequest; import static org.elasticsearch.xcontent.XContentType.JSON; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -252,7 +253,7 @@ private PutPipelineRequest sampleRestRequest(String id) throws Exception { var builder = XContentFactory.contentBuilder(JSON) ) { builder.map(parser.map()); - return new PutPipelineRequest(id, BytesReference.bytes(builder), JSON); + return putJsonPipelineRequest(id, bytes(builder)); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java index 5f036681f8492..08c3d690ef472 100644 --- 
a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java
@@ -9,12 +9,13 @@
 package org.elasticsearch.ingest;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ingest.PutPipelineTransportAction;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.node.NodeService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -24,7 +25,6 @@
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 
 @ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
@@ -51,7 +51,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
         return installPlugin ? Arrays.asList(IngestTestPlugin.class) : Collections.emptyList();
     }
 
-    public void testFailPipelineCreation() throws Exception {
+    public void testFailPipelineCreation() {
         installPlugin = true;
         String node1 = internalCluster().startNode();
         installPlugin = false;
@@ -59,12 +59,22 @@ public void testFailPipelineCreation() throws Exception {
         ensureStableCluster(2, node1);
         ensureStableCluster(2, node2);
 
-        try {
-            clusterAdmin().preparePutPipeline("_id", pipelineSource, XContentType.JSON).get();
-            fail("exception expected");
-        } catch (ElasticsearchParseException e) {
-            assertThat(e.getMessage(), containsString("Processor type [test] is not installed on node"));
-        }
+        assertThat(
+            asInstanceOf(
+                ElasticsearchParseException.class,
+                ExceptionsHelper.unwrapCause(
+                    safeAwaitFailure(
+                        AcknowledgedResponse.class,
+                        l -> client().execute(
+                            PutPipelineTransportAction.TYPE,
+                            IngestPipelineTestUtils.putJsonPipelineRequest("id", pipelineSource),
+                            l
+                        )
+                    )
+                )
+            ).getMessage(),
+            containsString("Processor type [test] is not installed on node")
+        );
     }
 
     public void testFailPipelineCreationProcessorNotInstalledOnMasterNode() throws Exception {
@@ -72,12 +82,22 @@ public void testFailPipelineCreationProcessorNotInstalledOnMasterNode() throws E
         installPlugin = true;
         internalCluster().startNode();
 
-        try {
-            clusterAdmin().preparePutPipeline("_id", pipelineSource, XContentType.JSON).get();
-            fail("exception expected");
-        } catch (ElasticsearchParseException e) {
-            assertThat(e.getMessage(), equalTo("No processor type exists with name [test]"));
-        }
+        assertThat(
+            asInstanceOf(
+                ElasticsearchParseException.class,
+                ExceptionsHelper.unwrapCause(
+                    safeAwaitFailure(
+                        AcknowledgedResponse.class,
+                        l -> client().execute(
+                            PutPipelineTransportAction.TYPE,
+                            IngestPipelineTestUtils.putJsonPipelineRequest("id", pipelineSource),
+                            l
+                        )
+                    )
+                )
+            ).getMessage(),
+            equalTo("No processor type exists with name [test]")
+        );
     }
 
     // If there is pipeline defined and a node joins that doesn't have the processor installed then
@@ -86,8 +106,7 @@ public void testFailStartNode() throws Exception {
         installPlugin = true;
         String node1 = internalCluster().startNode();
 
-        AcknowledgedResponse response = clusterAdmin().preparePutPipeline("_id", pipelineSource, XContentType.JSON).get();
-
assertThat(response.isAcknowledged(), is(true)); + putJsonPipeline("_id", pipelineSource); Pipeline pipeline = internalCluster().getInstance(NodeService.class, node1).getIngestService().getPipeline("_id"); assertThat(pipeline, notNullValue()); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestStatsNamesAndTypesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestStatsNamesAndTypesIT.java index 86e1d2e332f36..63a16eae6e1ec 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestStatsNamesAndTypesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestStatsNamesAndTypesIT.java @@ -14,10 +14,7 @@ import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.MockScriptPlugin; @@ -92,8 +89,7 @@ public void testIngestStatsNamesAndTypes() throws IOException { ] } """, MockScriptEngine.NAME, MockScriptEngine.NAME); - BytesReference pipeline1Reference = new BytesArray(pipeline1); - clusterAdmin().putPipeline(new PutPipelineRequest("pipeline1", pipeline1Reference, XContentType.JSON)).actionGet(); + putJsonPipeline("pipeline1", pipeline1); // index a single document through the pipeline BulkRequest bulkRequest = new BulkRequest(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotCustomPluginStateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotCustomPluginStateIT.java index 8f2702099c102..a3f1f0038a03f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotCustomPluginStateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotCustomPluginStateIT.java @@ -18,15 +18,12 @@ import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; import org.elasticsearch.action.admin.cluster.storedscripts.TransportDeleteStoredScriptAction; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; -import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.GetPipelineResponse; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.ingest.IngestTestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.StoredScriptsIT; import org.elasticsearch.xcontent.XContentFactory; -import org.elasticsearch.xcontent.XContentType; import java.util.Arrays; import java.util.Collection; @@ -36,7 +33,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateExists; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateMissing; -import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -84,18 +80,16 @@ public void testIncludeGlobalState() throws Exception { if (testPipeline) { logger.info("--> creating test pipeline"); - 
BytesReference pipelineSource = BytesReference.bytes( - jsonBuilder().startObject() - .field("description", "my_pipeline") + putJsonPipeline( + "barbaz", + (builder, params) -> builder.field("description", "my_pipeline") .startArray("processors") .startObject() .startObject("test") .endObject() .endObject() .endArray() - .endObject() ); - assertAcked(clusterAdmin().preparePutPipeline("barbaz", pipelineSource, XContentType.JSON).get()); } if (testScript) { @@ -144,7 +138,7 @@ public void testIncludeGlobalState() throws Exception { if (testPipeline) { logger.info("--> delete test pipeline"); - assertAcked(clusterAdmin().deletePipeline(new DeletePipelineRequest("barbaz")).get()); + deletePipeline("barbaz"); } if (testScript) { @@ -184,7 +178,7 @@ public void testIncludeGlobalState() throws Exception { if (testPipeline) { logger.info("--> check that pipeline is restored"); - GetPipelineResponse getPipelineResponse = clusterAdmin().prepareGetPipeline("barbaz").get(); + GetPipelineResponse getPipelineResponse = getPipelines("barbaz"); assertTrue(getPipelineResponse.isFound()); } @@ -218,7 +212,7 @@ public void testIncludeGlobalState() throws Exception { cluster().wipeTemplates("test-template"); } if (testPipeline) { - assertAcked(clusterAdmin().deletePipeline(new DeletePipelineRequest("barbaz")).get()); + deletePipeline("barbaz"); } if (testScript) { @@ -245,7 +239,7 @@ public void testIncludeGlobalState() throws Exception { logger.info("--> check that global state wasn't restored but index was"); getIndexTemplatesResponse = indicesAdmin().prepareGetTemplates().get(); assertIndexTemplateMissing(getIndexTemplatesResponse, "test-template"); - assertFalse(clusterAdmin().prepareGetPipeline("barbaz").get().isFound()); + assertFalse(getPipelines("barbaz").isFound()); assertNull(safeExecute(GetStoredScriptAction.INSTANCE, new GetStoredScriptRequest(TEST_REQUEST_TIMEOUT, "foobar")).getSource()); assertDocCount("test-idx", 100L); } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineRequestBuilder.java deleted file mode 100644 index ef08f64765f98..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineRequestBuilder.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.internal.ElasticsearchClient; - -public class DeletePipelineRequestBuilder extends ActionRequestBuilder { - - public DeletePipelineRequestBuilder(ElasticsearchClient client, String id) { - super(client, DeletePipelineTransportAction.TYPE, new DeletePipelineRequest(id)); - } - - /** - * Sets the id of the pipeline to delete. 
- */ - public DeletePipelineRequestBuilder setId(String id) { - request.setId(id); - return this; - } - -} diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequestBuilder.java deleted file mode 100644 index ca873c5aa3843..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineRequestBuilder.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; -import org.elasticsearch.client.internal.ElasticsearchClient; - -public class GetPipelineRequestBuilder extends MasterNodeReadOperationRequestBuilder< - GetPipelineRequest, - GetPipelineResponse, - GetPipelineRequestBuilder> { - - public GetPipelineRequestBuilder(ElasticsearchClient client, String[] ids) { - super(client, GetPipelineAction.INSTANCE, new GetPipelineRequest(ids)); - } - -} diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequestBuilder.java deleted file mode 100644 index 2fce285d83f06..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequestBuilder.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. 
- */
-
-package org.elasticsearch.action.ingest;
-
-import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.internal.ElasticsearchClient;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.xcontent.XContentType;
-
-public class PutPipelineRequestBuilder extends ActionRequestBuilder<PutPipelineRequest, AcknowledgedResponse> {
-
-    public PutPipelineRequestBuilder(ElasticsearchClient client, String id, BytesReference source, XContentType xContentType) {
-        super(client, PutPipelineTransportAction.TYPE, new PutPipelineRequest(id, source, xContentType));
-    }
-}
diff --git a/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java b/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java
index aca9bb81fb53f..ba0c06db968e9 100644
--- a/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java
+++ b/server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java
@@ -100,7 +100,7 @@ public TransformState transform(Object source, TransformState prevState) throws
         toDelete.removeAll(entities);
 
         for (var pipelineToDelete : toDelete) {
-            var task = new IngestService.DeletePipelineClusterStateUpdateTask(pipelineToDelete);
+            var task = new IngestService.DeletePipelineClusterStateUpdateTask(null, new DeletePipelineRequest(pipelineToDelete));
             state = wrapIngestTaskExecute(task, state);
         }
 
diff --git a/server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java b/server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java
index 1509e398fbffa..95dd1ccdf86de 100644
--- a/server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java
+++ b/server/src/main/java/org/elasticsearch/client/internal/ClusterAdminClient.java
@@ -94,16 +94,6 @@
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequestBuilder;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
 import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction;
-import org.elasticsearch.action.ingest.DeletePipelineRequest;
-import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder;
-import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
-import org.elasticsearch.action.ingest.GetPipelineAction;
-import org.elasticsearch.action.ingest.GetPipelineRequest;
-import org.elasticsearch.action.ingest.GetPipelineRequestBuilder;
-import org.elasticsearch.action.ingest.GetPipelineResponse;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
-import org.elasticsearch.action.ingest.PutPipelineRequestBuilder;
-import org.elasticsearch.action.ingest.PutPipelineTransportAction;
 import org.elasticsearch.action.ingest.SimulatePipelineAction;
 import org.elasticsearch.action.ingest.SimulatePipelineRequest;
 import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder;
@@ -370,38 +360,6 @@ public SnapshotsStatusRequestBuilder prepareSnapshotStatus(TimeValue masterNodeT
         return new SnapshotsStatusRequestBuilder(this, masterNodeTimeout);
     }
 
-    public void putPipeline(PutPipelineRequest request, ActionListener<AcknowledgedResponse> listener) {
-        execute(PutPipelineTransportAction.TYPE, request, listener);
-    }
-
-    public ActionFuture<AcknowledgedResponse> putPipeline(PutPipelineRequest request) {
-        return execute(PutPipelineTransportAction.TYPE, request);
-    }
-
-    public PutPipelineRequestBuilder preparePutPipeline(String id, BytesReference source, XContentType xContentType) {
-        return new PutPipelineRequestBuilder(this, id, source, xContentType);
-    }
-
-    public void deletePipeline(DeletePipelineRequest request, ActionListener<AcknowledgedResponse> listener) {
-        execute(DeletePipelineTransportAction.TYPE, request, listener);
-    }
-
-    public ActionFuture<AcknowledgedResponse> deletePipeline(DeletePipelineRequest request) {
-        return execute(DeletePipelineTransportAction.TYPE, request);
-    }
-
-    public DeletePipelineRequestBuilder prepareDeletePipeline(String id) {
-        return new DeletePipelineRequestBuilder(this, id);
-    }
-
-    public void getPipeline(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener) {
-        execute(GetPipelineAction.INSTANCE, request, listener);
-    }
-
-    public GetPipelineRequestBuilder prepareGetPipeline(String... ids) {
-        return new GetPipelineRequestBuilder(this, ids);
-    }
-
     public void simulatePipeline(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
         execute(SimulatePipelineAction.INSTANCE, request, listener);
     }
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 20f97e1871483..c54bdbf71a38a 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -323,18 +323,11 @@ public void delete(DeletePipelineRequest request, ActionListener<AcknowledgedRe
-        DeletePipelineClusterStateUpdateTask(ActionListener<AcknowledgedResponse> listener, DeletePipelineRequest request) {
+        public DeletePipelineClusterStateUpdateTask(ActionListener<AcknowledgedResponse> listener, DeletePipelineRequest request) {
             super(listener);
             this.request = request;
         }
 
-        /**
-         * Used by the {@link org.elasticsearch.action.ingest.ReservedPipelineAction}
-         */
-        public DeletePipelineClusterStateUpdateTask(String id) {
-            this(null, new DeletePipelineRequest(id));
-        }
-
         @Override
         public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<IndexMetadata> allIndexMetadata) {
             if (currentIngestMetadata == null) {
diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java
index da22a211bd58f..2659fed00879e 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestDeletePipelineAction.java
@@ -9,6 +9,7 @@
 package org.elasticsearch.rest.action.ingest;
 
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
+import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
@@ -40,6 +41,6 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
         DeletePipelineRequest request = new DeletePipelineRequest(restRequest.param("id"));
         request.masterNodeTimeout(getMasterNodeTimeout(restRequest));
         request.ackTimeout(getAckTimeout(restRequest));
-        return channel -> client.admin().cluster().deletePipeline(request, new RestToXContentListener<>(channel));
+        return channel -> client.execute(DeletePipelineTransportAction.TYPE, request, new RestToXContentListener<>(channel));
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java
index d6712b44f3e03..6f8d8ce926ae6 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestGetPipelineAction.java
@@ -8,6 +8,7 @@
 package org.elasticsearch.rest.action.ingest;
 
+import org.elasticsearch.action.ingest.GetPipelineAction;
 import org.elasticsearch.action.ingest.GetPipelineRequest;
 import org.elasticsearch.action.ingest.GetPipelineResponse;
 import org.elasticsearch.client.internal.node.NodeClient;
@@ -44,6 +45,10 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
             Strings.splitStringByCommaToArray(restRequest.param("id"))
         );
         request.masterNodeTimeout(getMasterNodeTimeout(restRequest));
-        return channel -> client.admin().cluster().getPipeline(request, new RestToXContentListener<>(channel, GetPipelineResponse::status));
+        return channel -> client.execute(
+            GetPipelineAction.INSTANCE,
+            request,
+            new RestToXContentListener<>(channel, GetPipelineResponse::status)
+        );
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java
index 520855b8987cd..e58a6e62d689b 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java
@@ -9,6 +9,7 @@
 package org.elasticsearch.rest.action.ingest;
 
 import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.ingest.PutPipelineTransportAction;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.core.Tuple;
@@ -58,6 +59,6 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
         PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1(), ifVersion);
         request.masterNodeTimeout(getMasterNodeTimeout(restRequest));
         request.ackTimeout(getAckTimeout(restRequest));
-        return channel -> client.admin().cluster().putPipeline(request, new RestToXContentListener<>(channel));
+        return channel -> client.execute(PutPipelineTransportAction.TYPE, request, new RestToXContentListener<>(channel));
     }
 }
diff --git a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineRequestTests.java b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineRequestTests.java
index 576b9fb7224d5..9e021e4ec1b91 100644
--- a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineRequestTests.java
@@ -8,7 +8,6 @@
 package org.elasticsearch.action.ingest;
 
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -19,12 +18,13 @@
 import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
+
+import static org.elasticsearch.ingest.IngestPipelineTestUtils.putJsonPipelineRequest;
 
 public class PutPipelineRequestTests extends ESTestCase {
 
     public void testSerializationWithXContent() throws IOException {
-        PutPipelineRequest request = new PutPipelineRequest("1", new BytesArray("{}".getBytes(StandardCharsets.UTF_8)), XContentType.JSON);
+        PutPipelineRequest request = putJsonPipelineRequest("1", "{}");
         assertEquals(XContentType.JSON, request.getXContentType());
 
         BytesStreamOutput output = new BytesStreamOutput();
diff
--git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 5c07c2344cf13..635be15ff8990 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -97,6 +97,7 @@ import static org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils.executeAndAssertSuccessful; import static org.elasticsearch.core.Tuple.tuple; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; +import static org.elasticsearch.ingest.IngestPipelineTestUtils.putJsonPipelineRequest; import static org.elasticsearch.ingest.IngestService.NOOP_PIPELINE_NAME; import static org.elasticsearch.ingest.IngestService.hasPipeline; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -422,8 +423,8 @@ public void testDelete() { public void testValidateNoIngestInfo() throws Exception { IngestService ingestService = createWithProcessors(); - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(""" - {"processors": [{"set" : {"field": "_field", "value": "_value"}}]}"""), XContentType.JSON); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """ + {"processors": [{"set" : {"field": "_field", "value": "_value"}}]}"""); var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2(); Exception e = expectThrows( @@ -511,7 +512,7 @@ public void testGetProcessorsInPipeline() throws Exception { assertThat(pipeline, nullValue()); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(""" + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """ { "processors": [ { @@ -528,7 +529,7 @@ public void testGetProcessorsInPipeline() throws Exception { } } ] - }"""), XContentType.JSON); + }"""); ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -554,7 +555,7 @@ public void testGetPipelineWithProcessorType() throws Exception { ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousClusterState = clusterState; - PutPipelineRequest putRequest1 = new PutPipelineRequest("_id1", new BytesArray(""" + PutPipelineRequest putRequest1 = putJsonPipelineRequest("_id1", """ { "processors": [ { @@ -571,10 +572,10 @@ public void testGetPipelineWithProcessorType() throws Exception { } } ] - }"""), XContentType.JSON); + }"""); clusterState = executePut(putRequest1, clusterState); - PutPipelineRequest putRequest2 = new PutPipelineRequest("_id2", new BytesArray(""" - {"processors": [{"set" : {"field": "_field", "value": "_value", "tag": "tag2"}}]}"""), XContentType.JSON); + PutPipelineRequest putRequest2 = putJsonPipelineRequest("_id2", """ + {"processors": [{"set" : {"field": "_field", "value": "_value", "tag": "tag2"}}]}"""); clusterState = executePut(putRequest2, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -611,8 +612,8 @@ public String getType() { ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousClusterState = clusterState; - PutPipelineRequest putRequest1 = new PutPipelineRequest("_id1", 
new BytesArray(""" - {"processors": [{"set" : {"field": "_field", "value": "_value", "tag": "tag1"}}]}"""), XContentType.JSON); + PutPipelineRequest putRequest1 = putJsonPipelineRequest("_id1", """ + {"processors": [{"set" : {"field": "_field", "value": "_value", "tag": "tag1"}}]}"""); clusterState = executePut(putRequest1, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -676,8 +677,8 @@ public void testGetProcessorsInPipelineComplexConditional() throws Exception { assertThat(pipeline, nullValue()); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty - PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray(""" - {"processors": [{"complexSet" : {"field": "_field", "value": "_value"}}]}"""), XContentType.JSON); + PutPipelineRequest putRequest = putJsonPipelineRequest(id, """ + {"processors": [{"complexSet" : {"field": "_field", "value": "_value"}}]}"""); ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -699,8 +700,8 @@ public void testCrud() throws Exception { assertThat(pipeline, nullValue()); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty - PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray(""" - {"processors": [{"set" : {"field": "_field", "value": "_value"}}]}"""), XContentType.JSON); + PutPipelineRequest putRequest = putJsonPipelineRequest(id, """ + {"processors": [{"set" : {"field": "_field", "value": "_value"}}]}"""); ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -727,7 +728,7 @@ public void testPut() { ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // add a new pipeline: - PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}"), XContentType.JSON); + PutPipelineRequest putRequest = putJsonPipelineRequest(id, "{\"processors\": []}"); ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -738,8 +739,8 @@ public void testPut() { assertThat(pipeline.getProcessors().size(), equalTo(0)); // overwrite existing pipeline: - putRequest = new PutPipelineRequest(id, new BytesArray(""" - {"processors": [], "description": "_description"}"""), XContentType.JSON); + putRequest = putJsonPipelineRequest(id, """ + {"processors": [], "description": "_description"}"""); previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -757,11 +758,7 @@ public void testPutWithErrorResponse() throws IllegalAccessException { assertThat(pipeline, nullValue()); ClusterState previousClusterState = ClusterState.builder(new ClusterName("_name")).build(); - PutPipelineRequest putRequest = new PutPipelineRequest( - id, - new BytesArray("{\"description\": \"empty processors\"}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest(id, "{\"description\": \"empty processors\"}"); ClusterState clusterState = 
executePut(putRequest, previousClusterState); MockLog.assertThatLogger( () -> ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)), @@ -962,7 +959,7 @@ public void testGetPipelines() { public void testValidateProcessorTypeOnAllNodes() throws Exception { IngestService ingestService = createWithProcessors(); - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(""" + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """ { "processors": [ { @@ -979,7 +976,7 @@ public void testValidateProcessorTypeOnAllNodes() throws Exception { } } ] - }"""), XContentType.JSON); + }"""); var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2(); DiscoveryNode node1 = DiscoveryNodeUtils.create("_node_id1", buildNewFakeTransportAddress(), Map.of(), Set.of()); @@ -1006,7 +1003,7 @@ public void testValidateConfigurationExceptions() { // ordinary validation issues happen at processor construction time throw newConfigurationException("fail_validation", tag, "no_property_name", "validation failure reason"); })); - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(""" + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """ { "processors": [ { @@ -1014,7 +1011,7 @@ public void testValidateConfigurationExceptions() { } } ] - }"""), XContentType.JSON); + }"""); var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2(); // other validation actually consults this map, but this validation does not. however, it must not be empty. @@ -1040,7 +1037,7 @@ public void extraValidation() throws Exception { } }; })); - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(""" + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """ { "processors": [ { @@ -1048,7 +1045,7 @@ public void extraValidation() throws Exception { } } ] - }"""), XContentType.JSON); + }"""); var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2(); // other validation actually consults this map, but this validation does not. however, it must not be empty. 
@@ -1080,11 +1077,7 @@ public String getType() { ); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty String id = "_id"; - PutPipelineRequest putRequest = new PutPipelineRequest( - id, - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest(id, "{\"processors\": [{\"mock\" : {}}]}"); ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -1129,11 +1122,7 @@ public void testExecuteBulkPipelineDoesNotExist() { Map.of("mock", (factories, tag, description, config) -> mockCompoundProcessor()) ); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1201,11 +1190,7 @@ public XContentParser decorate(XContentParser xContentParser) { documentParsingProvider ); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1239,11 +1224,7 @@ public void testExecuteSuccess() { IngestService ingestService = createWithProcessors( Map.of("mock", (factories, tag, description, config) -> mockCompoundProcessor()) ); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1282,11 +1263,7 @@ public void testDynamicTemplates() throws Exception { ) ) ); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"set\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"set\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1314,8 +1291,8 @@ public void testDynamicTemplates() throws Exception { public void testExecuteEmptyPipeline() throws Exception { IngestService ingestService = createWithProcessors(Map.of()); - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(""" - {"processors": [], "description": "_description"}"""), XContentType.JSON); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """ + {"processors": [], "description": "_description"}"""); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // 
Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1345,11 +1322,7 @@ public void testExecuteEmptyPipeline() throws Exception { public void testExecutePropagateAllMetadataUpdates() throws Exception { final CompoundProcessor processor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor)); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1423,17 +1396,9 @@ public void testExecuteFailure() throws Exception { (factories, tag, description, config) -> new FakeProcessor("set", "", "", (ingestDocument) -> fail()) ) ); - PutPipelineRequest putRequest1 = new PutPipelineRequest( - "_id1", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest1 = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}]}"); // given that set -> fail() above, it's a failure if a document executes against this pipeline - PutPipelineRequest putRequest2 = new PutPipelineRequest( - "_id2", - new BytesArray("{\"processors\": [{\"set\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest2 = putJsonPipelineRequest("_id2", "{\"processors\": [{\"set\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest1, clusterState); @@ -1490,11 +1455,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception { List.of(new CompoundProcessor(onFailureProcessor)) ); IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> compoundProcessor)); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1536,11 +1497,7 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { List.of(new CompoundProcessor(false, processors, onFailureProcessors)) ); IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> compoundProcessor)); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1608,11 +1565,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception { return null; }).when(processor).execute(any(), any()); IngestService ingestService = 
createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor)); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1647,17 +1600,9 @@ public void testExecuteFailureRedirection() throws Exception { (factories, tag, description, config) -> new FakeProcessor("set", "", "", (ingestDocument) -> fail()) ) ); - PutPipelineRequest putRequest1 = new PutPipelineRequest( - "_id1", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest1 = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}]}"); // given that set -> fail() above, it's a failure if a document executes against this pipeline - PutPipelineRequest putRequest2 = new PutPipelineRequest( - "_id2", - new BytesArray("{\"processors\": [{\"set\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest2 = putJsonPipelineRequest("_id2", "{\"processors\": [{\"set\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest1, clusterState); @@ -1707,11 +1652,7 @@ public void testExecuteFailureRedirectionWithNestedOnFailure() throws Exception List.of(new CompoundProcessor(false, processors, onFailureProcessors)) ); IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> compoundProcessor)); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1783,11 +1724,7 @@ public void testBulkRequestExecutionWithRedirectedFailures() throws Exception { return null; }).when(processor).execute(any(), any()); IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor)); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1849,8 +1786,8 @@ public void testBulkRequestExecution() throws Exception { map.put("mock", (factories, tag, description, config) -> processor); IngestService ingestService = createWithProcessors(map); - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(""" - {"processors": [{"mock": {}}], "description": "_description"}"""), XContentType.JSON); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """ + {"processors": [{"mock": {}}], "description": 
"_description"}"""); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -1929,22 +1866,10 @@ public String execute() { } // put some pipelines, and now there are pipeline and processor stats, too - PutPipelineRequest putRequest1 = new PutPipelineRequest( - "_id1", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest1 = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}]}"); // n.b. this 'pipeline' processor will always run the '_id3' pipeline, see the mocking/plumbing above and below - PutPipelineRequest putRequest2 = new PutPipelineRequest( - "_id2", - new BytesArray("{\"processors\": [{\"pipeline\" : {}}]}"), - XContentType.JSON - ); - PutPipelineRequest putRequest3 = new PutPipelineRequest( - "_id3", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest2 = putJsonPipelineRequest("_id2", "{\"processors\": [{\"pipeline\" : {}}]}"); + PutPipelineRequest putRequest3 = putJsonPipelineRequest("_id3", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest1, clusterState); @@ -2035,16 +1960,12 @@ public void testStats() throws Exception { assertThat(initialStats.pipelineStats().size(), equalTo(0)); assertStats(initialStats.totalStats(), 0, 0, 0); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id1", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - putRequest = new PutPipelineRequest("_id2", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + putRequest = putJsonPipelineRequest("_id2", "{\"processors\": [{\"mock\" : {}}]}"); previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -2109,11 +2030,7 @@ public void testStats() throws Exception { assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0); // update cluster state and ensure that new stats are added to old stats - putRequest = new PutPipelineRequest( - "_id1", - new BytesArray("{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}"), - XContentType.JSON - ); + putRequest = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}"); previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -2146,8 +2063,8 @@ public void testStats() throws Exception { assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0); // test a failure, and that the processor stats are added from the old stats - putRequest = new PutPipelineRequest("_id1", new BytesArray(""" - {"processors": [{"failure-mock" : { "on_failure": [{"mock" : {}}]}}, {"mock" : 
{}}]}"""), XContentType.JSON); + putRequest = putJsonPipelineRequest("_id1", """ + {"processors": [{"failure-mock" : { "on_failure": [{"mock" : {}}]}}, {"mock" : {}}]}"""); previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -2177,7 +2094,7 @@ public void testStats() throws Exception { assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0); // test with drop processor - putRequest = new PutPipelineRequest("_id3", new BytesArray("{\"processors\": [{\"drop\" : {}}]}"), XContentType.JSON); + putRequest = putJsonPipelineRequest("_id3", "{\"processors\": [{\"drop\" : {}}]}"); previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -2255,11 +2172,7 @@ public String getDescription() { } }); IngestService ingestService = createWithProcessors(factories); - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"drop\" : {}}, {\"mock\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"drop\" : {}}, {\"mock\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -2330,11 +2243,7 @@ public Map getProcessors(Processor.Parameters paramet ingestService.addIngestClusterStateListener(ingestClusterStateListener); // Create pipeline and apply the resulting cluster state, which should update the counter in the right order: - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"test\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"test\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest, clusterState); @@ -2353,11 +2262,7 @@ public void testCBORParsing() throws Exception { ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousClusterState = clusterState; - PutPipelineRequest putRequest = new PutPipelineRequest( - "_id", - new BytesArray("{\"processors\": [{\"foo\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"foo\" : {}}]}"); clusterState = executePut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); assertThat(ingestService.getPipeline("_id"), notNullValue()); @@ -2403,16 +2308,8 @@ public void testSetsRawTimestamp() { ) ); - PutPipelineRequest putRequest1 = new PutPipelineRequest( - "_id1", - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - XContentType.JSON - ); - PutPipelineRequest putRequest2 = new PutPipelineRequest( - "_id2", - new BytesArray("{\"processors\": [{\"set\" : {}}]}"), - XContentType.JSON - ); + PutPipelineRequest putRequest1 = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}]}"); + PutPipelineRequest putRequest2 = putJsonPipelineRequest("_id2", "{\"processors\": [{\"set\" : {}}]}"); ClusterState clusterState = ClusterState.builder(new 
ClusterName("_name")).build(); // Start empty ClusterState previousClusterState = clusterState; clusterState = executePut(putRequest1, clusterState); @@ -2724,7 +2621,7 @@ public long getExecutionCount() { } }; - var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON); + var request = putJsonPipelineRequest(pipelineId, pipelineString); ingestService.putPipeline(request, listener, consumer); latch.await(); diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/IngestPipelineTestUtils.java b/test/framework/src/main/java/org/elasticsearch/ingest/IngestPipelineTestUtils.java new file mode 100644 index 0000000000000..1acd5b7d637cf --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/ingest/IngestPipelineTestUtils.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.DeletePipelineTransportAction; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineTransportAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.ElasticsearchClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; + +import static org.elasticsearch.test.ESTestCase.safeGet; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; + +/** + * Utils for creating/retrieving/deleting ingest pipelines in a test cluster. + */ +public class IngestPipelineTestUtils { + private static final Logger logger = LogManager.getLogger(IngestPipelineTestUtils.class); + + private IngestPipelineTestUtils() { /* no instances */ } + + /** + * @param id The pipeline id. + * @param source The body of the {@link PutPipelineRequest} as a JSON-formatted {@link BytesReference}. + * @return a new {@link PutPipelineRequest} with the given {@code id} and body. + */ + public static PutPipelineRequest putJsonPipelineRequest(String id, BytesReference source) { + return new PutPipelineRequest(id, source, XContentType.JSON); + } + + /** + * @param id The pipeline id. + * @param jsonString The body of the {@link PutPipelineRequest} as a JSON-formatted {@link String}. + * @return a new {@link PutPipelineRequest} with the given {@code id} and body. + */ + public static PutPipelineRequest putJsonPipelineRequest(String id, String jsonString) { + return putJsonPipelineRequest(id, new BytesArray(jsonString)); + } + + /** + * Create an ingest pipeline with the given ID and body, using the given {@link ElasticsearchClient}. 
+     *
+     * @param client The client to use to execute the {@link PutPipelineTransportAction}.
+     * @param id The pipeline id.
+     * @param source The body of the {@link PutPipelineRequest} as a JSON-formatted {@link BytesReference}.
+     */
+    public static void putJsonPipeline(ElasticsearchClient client, String id, BytesReference source) {
+        assertAcked(safeGet(client.execute(PutPipelineTransportAction.TYPE, putJsonPipelineRequest(id, source))));
+    }
+
+    /**
+     * Create an ingest pipeline with the given ID and body, using the given {@link ElasticsearchClient}.
+     *
+     * @param client The client to use to execute the {@link PutPipelineTransportAction}.
+     * @param id The pipeline id.
+     * @param jsonString The body of the {@link PutPipelineRequest} as a JSON-formatted {@link String}.
+     */
+    public static void putJsonPipeline(ElasticsearchClient client, String id, String jsonString) {
+        putJsonPipeline(client, id, new BytesArray(jsonString));
+    }
+
+    /**
+     * Create an ingest pipeline with the given ID and body, using the given {@link ElasticsearchClient}.
+     *
+     * @param client The client to use to execute the {@link PutPipelineTransportAction}.
+     * @param id The pipeline id.
+     * @param toXContent The body of the {@link PutPipelineRequest} as a {@link ToXContentFragment}.
+     */
+    public static void putJsonPipeline(ElasticsearchClient client, String id, ToXContentFragment toXContent) throws IOException {
+        try (var xContentBuilder = jsonBuilder()) {
+            xContentBuilder.startObject();
+            toXContent.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
+            xContentBuilder.endObject();
+            putJsonPipeline(client, id, BytesReference.bytes(xContentBuilder));
+        }
+    }
+
+    /**
+     * Attempt to delete the ingest pipelines with the given {@code ids}, using the given {@link ElasticsearchClient}, and logging (but
+     * otherwise ignoring) the result.
+     */
+    public static void deletePipelinesIgnoringExceptions(ElasticsearchClient client, Iterable<String> ids) {
+        for (final var id : ids) {
+            ESTestCase.safeAwait(
+                l -> client.execute(DeletePipelineTransportAction.TYPE, new DeletePipelineRequest(id), new ActionListener<>() {
+                    @Override
+                    public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                        logger.info("delete pipeline [{}] success [acknowledged={}]", id, acknowledgedResponse.isAcknowledged());
+                        l.onResponse(null);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.warn(Strings.format("delete pipeline [%s] failure", id), e);
+                        l.onResponse(null);
+                    }
+                })
+            );
+        }
+    }
+}
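By way of illustration, a test that manages its own ElasticsearchClient can now create and tear down a pipeline in a couple of lines. This is a hedged sketch, not part of the patch; the client() helper and the pipeline id are invented for the example:

    // Hypothetical usage of the utilities above; client() and the id are illustrative only.
    import java.util.List;
    import static org.elasticsearch.ingest.IngestPipelineTestUtils.deletePipelinesIgnoringExceptions;
    import static org.elasticsearch.ingest.IngestPipelineTestUtils.putJsonPipeline;

    // inside a test method with access to an ElasticsearchClient:
    putJsonPipeline(client(), "example-pipeline", """
        {"processors": [{"set": {"field": "greeting", "value": "hello"}}]}""");
    // ... index documents through the pipeline and assert on the results ...
    // best-effort cleanup: logs and swallows any failure rather than failing the test
    deletePipelinesIgnoringExceptions(client(), List.of("example-pipeline"));
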
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 71628967bf266..b41819c83ffcc 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -48,6 +48,12 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.ingest.DeletePipelineRequest;
+import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
+import org.elasticsearch.action.ingest.GetPipelineAction;
+import org.elasticsearch.action.ingest.GetPipelineRequest;
+import org.elasticsearch.action.ingest.GetPipelineResponse;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.search.ClearScrollResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -126,6 +132,7 @@ import org.elasticsearch.indices.IndicesRequestCache;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.store.IndicesStore;
+import org.elasticsearch.ingest.IngestPipelineTestUtils;
 import org.elasticsearch.monitor.jvm.HotThreads;
 import org.elasticsearch.node.NodeMocksPlugin;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@@ -150,6 +157,7 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.ToXContentFragment;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentType;
@@ -2613,4 +2621,48 @@ private static long fullyAllocate(CircuitBreaker circuitBreaker) {
         }
         return totalAllocated;
     }
+
+    /**
+     * Create an ingest pipeline with the given ID and body, using the default {@link ESIntegTestCase#client()}.
+     *
+     * @param id The pipeline id.
+     * @param source The body of the {@link PutPipelineRequest} as a JSON-formatted {@link BytesReference}.
+     */
+    protected static void putJsonPipeline(String id, BytesReference source) {
+        IngestPipelineTestUtils.putJsonPipeline(client(), id, source);
+    }
+
+    /**
+     * Create an ingest pipeline with the given ID and body, using the default {@link ESIntegTestCase#client()}.
+     *
+     * @param id The pipeline id.
+     * @param jsonString The body of the {@link PutPipelineRequest} as a JSON-formatted {@link String}.
+     */
+    protected static void putJsonPipeline(String id, String jsonString) {
+        IngestPipelineTestUtils.putJsonPipeline(client(), id, jsonString);
+    }
+
+    /**
+     * Create an ingest pipeline with the given ID and body, using the default {@link ESIntegTestCase#client()}.
+     *
+     * @param id The pipeline id.
+     * @param toXContent The body of the {@link PutPipelineRequest} as a {@link ToXContentFragment}.
+     */
+    protected static void putJsonPipeline(String id, ToXContentFragment toXContent) throws IOException {
+        IngestPipelineTestUtils.putJsonPipeline(client(), id, toXContent);
+    }
+
+    /**
+     * @return the result of running the {@link GetPipelineAction} on the given IDs, using the default {@link ESIntegTestCase#client()}.
+     */
+    protected static GetPipelineResponse getPipelines(String... ids) {
+        return safeGet(client().execute(GetPipelineAction.INSTANCE, new GetPipelineRequest(ids)));
+    }
+
+    /**
+     * Delete the ingest pipeline with the given {@code id}, using the default {@link ESIntegTestCase#client()}.
+     */
+    protected static void deletePipeline(String id) {
+        assertAcked(safeGet(client().execute(DeletePipelineTransportAction.TYPE, new DeletePipelineRequest(id))));
+    }
 }
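For example (a sketch, not part of the patch; the pipeline id and processor are invented), a subclass test can now drive the whole pipeline lifecycle through these inherited helpers:

    public void testPipelineLifecycle() throws Exception {
        // acked-or-fail creation, with no request/listener boilerplate
        putJsonPipeline("example-pipeline", """
            {"processors": [{"lowercase": {"field": "message"}}]}""");

        // read it back via the GetPipelineAction helper
        assertTrue(getPipelines("example-pipeline").isFound());

        // delete, asserting that the response was acknowledged
        deletePipeline("example-pipeline");
    }
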
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java
index 6eb8e0474225a..fe9fe6395271b 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java
@@ -20,6 +20,9 @@ import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteComposableIndexTemplateAction;
 import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.ingest.DeletePipelineRequest;
+import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.support.DestructiveOperations;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.internal.AdminClient;
@@ -45,6 +48,7 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
+import org.elasticsearch.ingest.IngestPipelineTestUtils;
 import org.elasticsearch.node.MockNode;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeValidationException;
@@ -56,6 +60,7 @@ import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.transport.TransportSettings;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xcontent.ToXContentFragment;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -493,4 +498,31 @@ protected void awaitIndexShardCloseAsyncTasks() {
         getInstanceFromNode(IndicesClusterStateService.class).onClusterStateShardsClosed(latch::countDown);
         safeAwait(latch);
     }
+
+    /**
+     * Create an ingest pipeline with the given ID and body, using the default {@link ESSingleNodeTestCase#client()}.
+     *
+     * @param id The pipeline id.
+     * @param jsonString The body of the {@link PutPipelineRequest} as a JSON-formatted {@link String}.
+     */
+    protected final void putJsonPipeline(String id, String jsonString) {
+        IngestPipelineTestUtils.putJsonPipeline(client(), id, jsonString);
+    }
+
+    /**
+     * Create an ingest pipeline with the given ID and body, using the default {@link ESSingleNodeTestCase#client()}.
+     *
+     * @param id The pipeline id.
+     * @param toXContent The body of the {@link PutPipelineRequest} as a {@link ToXContentFragment}.
+     */
+    protected final void putJsonPipeline(String id, ToXContentFragment toXContent) throws IOException {
+        IngestPipelineTestUtils.putJsonPipeline(client(), id, toXContent);
+    }
+
+    /**
+     * Delete the ingest pipeline with the given {@code id}, using the default {@link ESSingleNodeTestCase#client()}.
+     */
+    protected final void deletePipeline(String id) {
+        assertAcked(safeGet(client().execute(DeletePipelineTransportAction.TYPE, new DeletePipelineRequest(id))));
+    }
 }
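Because ToXContent.toXContent is the sole abstract method, the ToXContentFragment overload accepts a lambda, so a pipeline body can be built programmatically without hand-managing an XContentBuilder (the utility supplies the enclosing start/end object). A sketch with an invented pipeline id and processor; the same pattern appears in the EnrichResiliencyTests changes below:

    putJsonPipeline("generated-pipeline", (builder, params) -> {
        builder.startArray("processors");
        {
            // a single "set" processor, built field by field
            builder.startObject().startObject("set");
            builder.field("field", "greeting").field("value", "hello");
            builder.endObject().endObject();
        }
        return builder.endArray();
    });
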
diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
index 24669b694a33b..c7eb24ba5f627 100644
--- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
+++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
@@ -18,9 +18,7 @@ import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.core.Strings;
@@ -29,7 +27,6 @@ import org.elasticsearch.reindex.ReindexPlugin;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
@@ -352,11 +349,9 @@ private static void createPipeline() {
     }
 
     private static void createPipeline(String policyName, String pipelineName) {
-        String pipelineBody = Strings.format("""
+        putJsonPipeline(pipelineName, Strings.format("""
             {
               "processors": [ { "enrich": { "policy_name": "%s", "field": "%s", "target_field": "user" } } ]
-            }""", policyName, MATCH_FIELD);
-        PutPipelineRequest request = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
-        clusterAdmin().putPipeline(request).actionGet();
+            }""", policyName, MATCH_FIELD));
     }
 }
diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java
index 5b7020c3f2bb0..8d9da1ba631f6 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyReindexPipeline.java
@@ -9,6 +9,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.ingest.PutPipelineTransportAction;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
@@ -67,8 +68,7 @@ static boolean exists(ClusterState clusterState) {
      */
     public static void create(Client client, ActionListener<AcknowledgedResponse> listener) {
         final BytesReference pipeline = BytesReference.bytes(currentEnrichPipelineDefinition(XContentType.JSON));
-        final PutPipelineRequest request = new PutPipelineRequest(pipelineName(), pipeline, XContentType.JSON);
-        client.admin().cluster().putPipeline(request, listener);
+        client.execute(PutPipelineTransportAction.TYPE, new PutPipelineRequest(pipelineName(), pipeline, XContentType.JSON), listener);
     }
 
     private static XContentBuilder currentEnrichPipelineDefinition(XContentType xContentType) {
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
index d17728fdd8037..afb5da07268cb 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
@@ -16,9 +16,7 @@ import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.ingest.common.IngestCommonPlugin;
@@ -26,7 +24,6 @@ import org.elasticsearch.reindex.ReindexPlugin;
 import org.elasticsearch.script.mustache.MustachePlugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
-import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
@@ -99,7 +96,7 @@ public void testIngestDataWithMatchProcessor() {
             .actionGet();
 
         String pipelineName = "my-pipeline";
-        String pipelineBody = Strings.format("""
+        putJsonPipeline(pipelineName, Strings.format("""
             {
               "processors": [
                 {
@@ -111,9 +108,7 @@ public void testIngestDataWithMatchProcessor() {
                   }
                 }
              ]
-            }""", policyName, MATCH_FIELD, maxMatches);
-        PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
-        clusterAdmin().putPipeline(putPipelineRequest).actionGet();
+            }""", policyName, MATCH_FIELD, maxMatches));
 
         BulkRequest bulkRequest = new BulkRequest("my-index");
         for (int i = 0; i < numDocs; i++) {
@@ -195,7 +190,7 @@ public void testIngestDataWithGeoMatchProcessor() {
             .actionGet();
 
         String pipelineName = "my-pipeline";
-        String pipelineBody = Strings.format("""
+        putJsonPipeline(pipelineName, Strings.format("""
            {
              "processors": [
                {
@@ -207,9 +202,7 @@ public void testIngestDataWithGeoMatchProcessor() {
                  }
                }
              ]
-            }""", policyName, matchField);
-        PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
-        clusterAdmin().putPipeline(putPipelineRequest).actionGet();
+            }""", policyName, matchField));
 
         BulkRequest bulkRequest = new BulkRequest("my-index");
         IndexRequest indexRequest = new IndexRequest();
@@ -258,12 +251,10 @@ public void testMultiplePolicies() {
                 .actionGet();
 
             String pipelineName = "pipeline" + i;
-            String pipelineBody = Strings.format("""
+
putJsonPipeline(pipelineName, Strings.format(""" { "processors": [ { "enrich": { "policy_name": "%s", "field": "key", "target_field": "target" } } ] - }""", policyName); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).actionGet(); + }""", policyName)); } BulkRequest bulkRequest = new BulkRequest("my-index"); @@ -316,12 +307,10 @@ public void testAsyncTaskExecute() throws Exception { }); String pipelineName = "test-pipeline"; - String pipelineBody = Strings.format(""" + putJsonPipeline(pipelineName, Strings.format(""" { "processors": [ { "enrich": { "policy_name": "%s", "field": "key", "target_field": "target" } } ] - }""", policyName); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).actionGet(); + }""", policyName)); BulkRequest bulkRequest = new BulkRequest("my-index"); int numTestDocs = randomIntBetween(3, 10); @@ -359,13 +348,14 @@ public void testTemplating() throws Exception { .actionGet(); String pipelineName = "my-pipeline"; - String pipelineBody = Strings.format( - """ - {"processors": [{"enrich": {"policy_name":"%s", "field": "{{indirection1}}", "target_field": "{{indirection2}}"}}]}""", - policyName + putJsonPipeline( + pipelineName, + Strings.format( + """ + {"processors": [{"enrich": {"policy_name":"%s", "field": "{{indirection1}}", "target_field": "{{indirection2}}"}}]}""", + policyName + ) ); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).actionGet(); IndexRequest indexRequest = new IndexRequest("my-index").id("1") .setPipeline(pipelineName) @@ -404,8 +394,7 @@ public void testFailureAfterEnrich() throws Exception { + "\", \"field\": \"email\", \"target_field\": \"users\"}}," + "{ \"foreach\": {\"field\":\"users\", \"processor\":{\"append\":{\"field\":\"matched2\",\"value\":\"{{_ingest._value}}\"}}}}" + "]}"; - PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); - clusterAdmin().putPipeline(putPipelineRequest).actionGet(); + putJsonPipeline(pipelineName, pipelineBody); for (int i = 0; i < 5; i++) { IndexRequest indexRequest = new IndexRequest("my-index").id("1") diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java index b015e97909179..efbe39b244de3 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; @@ -16,7 +14,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.XPackSettings; 
import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; @@ -63,10 +60,8 @@ public void testUpdatePolicyOnly() { equalTo(true) ); - String pipelineConfig = """ - {"processors":[{"enrich": {"policy_name": "my_policy", "field": "key", "target_field": "target"}}]}"""; - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", new BytesArray(pipelineConfig), XContentType.JSON); - assertAcked(clusterAdmin().putPipeline(putPipelineRequest).actionGet()); + putJsonPipeline("1", """ + {"processors":[{"enrich": {"policy_name": "my_policy", "field": "key", "target_field": "target"}}]}"""); Pipeline pipelineInstance1 = ingestService.getPipeline("1"); assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(MatchProcessor.class)); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java index 3a2bfd87cff14..881c3f9245d26 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java @@ -12,10 +12,7 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.action.ingest.PutPipelineTransportAction; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.ingest.common.IngestCommonPlugin; @@ -23,7 +20,6 @@ import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -91,9 +87,7 @@ public void testWriteThreadLivenessBackToBack() throws Exception { new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, enrichPolicyName).setWaitForCompletion(true) ).actionGet(); - XContentBuilder pipe1 = JsonXContent.contentBuilder(); - pipe1.startObject(); - { + putJsonPipeline(enrichPipelineName, (pipe1, params) -> { pipe1.startArray("processors"); { pipe1.startObject(); @@ -119,14 +113,8 @@ public void testWriteThreadLivenessBackToBack() throws Exception { } pipe1.endObject(); } - pipe1.endArray(); - } - pipe1.endObject(); - - client().execute( - PutPipelineTransportAction.TYPE, - new PutPipelineRequest(enrichPipelineName, BytesReference.bytes(pipe1), XContentType.JSON) - ).actionGet(); + return pipe1.endArray(); + }); client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet(); @@ -191,9 +179,7 @@ public void testWriteThreadLivenessWithPipeline() throws Exception { new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, enrichPolicyName).setWaitForCompletion(true) ).actionGet(); - XContentBuilder pipe1 = JsonXContent.contentBuilder(); - pipe1.startObject(); - { + putJsonPipeline(enrichPipelineName1, (pipe1, params) -> { pipe1.startArray("processors"); { pipe1.startObject(); @@ -217,13 +203,10 @@ public void testWriteThreadLivenessWithPipeline() throws Exception { } 
pipe1.endObject(); } - pipe1.endArray(); - } - pipe1.endObject(); + return pipe1.endArray(); + }); - XContentBuilder pipe2 = JsonXContent.contentBuilder(); - pipe2.startObject(); - { + putJsonPipeline(enrichPipelineName2, (pipe2, params) -> { pipe2.startArray("processors"); { pipe2.startObject(); @@ -238,19 +221,8 @@ public void testWriteThreadLivenessWithPipeline() throws Exception { } pipe2.endObject(); } - pipe2.endArray(); - } - pipe2.endObject(); - - client().execute( - PutPipelineTransportAction.TYPE, - new PutPipelineRequest(enrichPipelineName1, BytesReference.bytes(pipe1), XContentType.JSON) - ).actionGet(); - - client().execute( - PutPipelineTransportAction.TYPE, - new PutPipelineRequest(enrichPipelineName2, BytesReference.bytes(pipe2), XContentType.JSON) - ).actionGet(); + return pipe2.endArray(); + }); client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java index da86bcf01b406..d179a28aa9890 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureResetIT.java @@ -8,12 +8,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateAction; import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest; -import org.elasticsearch.action.ingest.DeletePipelineRequest; -import org.elasticsearch.action.ingest.DeletePipelineTransportAction; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.action.ingest.PutPipelineTransportAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.ingest.IngestPipelineTestUtils; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -112,13 +109,7 @@ protected void cleanUpResources() { @After public void cleanup() throws Exception { cleanUp(); - for (String pipeline : createdPipelines) { - try { - client().execute(DeletePipelineTransportAction.TYPE, new DeletePipelineRequest(pipeline)).actionGet(); - } catch (Exception ex) { - logger.warn(() -> "error cleaning up pipeline [" + pipeline + "]", ex); - } - } + IngestPipelineTestUtils.deletePipelinesIgnoringExceptions(client(), createdPipelines); } public void testMLFeatureReset() throws Exception { @@ -130,7 +121,7 @@ public void testMLFeatureReset() throws Exception { for (int i = 0; i < 100; i++) { indexDocForInference("feature_reset_inference_pipeline"); } - client().execute(DeletePipelineTransportAction.TYPE, new DeletePipelineRequest("feature_reset_inference_pipeline")).actionGet(); + deletePipeline("feature_reset_inference_pipeline"); createdPipelines.remove("feature_reset_inference_pipeline"); assertBusy( @@ -160,8 +151,7 @@ public void testMLFeatureResetFailureDueToPipelines() throws Exception { "Unable to reset machine learning feature as there are ingest pipelines still referencing trained machine learning models" ) ); - client().execute(DeletePipelineTransportAction.TYPE, new DeletePipelineRequest("feature_reset_failure_inference_pipeline")) - 
.actionGet();
+        deletePipeline("feature_reset_failure_inference_pipeline");
         createdPipelines.remove("feature_reset_failure_inference_pipeline");
         assertThat(isResetMode(), is(false));
     }
@@ -294,8 +284,8 @@ private void startRealtime(String jobId) throws Exception {
         }, 30, TimeUnit.SECONDS);
     }
 
-    private void putTrainedModelIngestPipeline(String pipelineId) throws Exception {
-        client().execute(PutPipelineTransportAction.TYPE, new PutPipelineRequest(pipelineId, new BytesArray("""
+    private void putTrainedModelIngestPipeline(String pipelineId) {
+        putJsonPipeline(pipelineId, """
             {
               "processors": [
                 {
@@ -306,7 +296,7 @@ private void putTrainedModelIngestPipeline(String pipelineId) th
                 }
              ]
-            }"""), XContentType.JSON)).actionGet();
+            }""");
     }
 
     private void indexDocForInference(String pipelineId) {
diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/license/MachineLearningLicensingIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/license/MachineLearningLicensingIT.java
index 98ad515680734..08d09f70cb46b 100644
--- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/license/MachineLearningLicensingIT.java
+++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/license/MachineLearningLicensingIT.java
@@ -9,8 +9,6 @@ import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
-import org.elasticsearch.action.ingest.PutPipelineTransportAction;
 import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
 import org.elasticsearch.action.ingest.SimulatePipelineAction;
 import org.elasticsearch.action.ingest.SimulatePipelineRequest;
@@ -527,18 +525,7 @@ public void testMachineLearningCreateInferenceProcessorRestricted() {
             }]}
             """;
         // Creating a pipeline should work
-        PlainActionFuture<AcknowledgedResponse> putPipelineListener = new PlainActionFuture<>();
-        client().execute(
-            PutPipelineTransportAction.TYPE,
-            new PutPipelineRequest(
-                "test_infer_license_pipeline",
-                new BytesArray(pipeline.getBytes(StandardCharsets.UTF_8)),
-                XContentType.JSON
-            ),
-            putPipelineListener
-        );
-        AcknowledgedResponse putPipelineResponse = putPipelineListener.actionGet();
-        assertTrue(putPipelineResponse.isAcknowledged());
+        putJsonPipeline("test_infer_license_pipeline", pipeline);
 
         prepareIndex("infer_license_test").setPipeline("test_infer_license_pipeline").setSource("{}", XContentType.JSON).get();
 
@@ -575,18 +562,7 @@ public void testMachineLearningCreateInferenceProcessorRestricted() {
         }
 
         // Creating a new pipeline with an inference processor should work
-        putPipelineListener = new PlainActionFuture<>();
-        client().execute(
-            PutPipelineTransportAction.TYPE,
-            new PutPipelineRequest(
-                "test_infer_license_pipeline_again",
-                new BytesArray(pipeline.getBytes(StandardCharsets.UTF_8)),
-                XContentType.JSON
-            ),
-            putPipelineListener
-        );
-        putPipelineResponse = putPipelineListener.actionGet();
-        assertTrue(putPipelineResponse.isAcknowledged());
+        putJsonPipeline("test_infer_license_pipeline_again", pipeline);
 
         // Inference against the new pipeline should fail since it has never previously succeeded
         ElasticsearchSecurityException e = expectThrows(ElasticsearchSecurityException.class, () -> {
@@ -609,18 +585,7 @@ public void testMachineLearningCreateInferenceProcessorRestricted() {
         enableLicensing(mode);
         assertMLAllowed(true);
         // test that license restricted apis do now work
-        PlainActionFuture<AcknowledgedResponse> putPipelineListenerNewLicense = new PlainActionFuture<>();
-        client().execute(
-            PutPipelineTransportAction.TYPE,
-            new PutPipelineRequest(
-                "test_infer_license_pipeline",
-                new BytesArray(pipeline.getBytes(StandardCharsets.UTF_8)),
-                XContentType.JSON
-            ),
-            putPipelineListenerNewLicense
-        );
-        AcknowledgedResponse putPipelineResponseNewLicense = putPipelineListenerNewLicense.actionGet();
-        assertTrue(putPipelineResponseNewLicense.isAcknowledged());
+        putJsonPipeline("test_infer_license_pipeline", pipeline);
 
         PlainActionFuture<SimulatePipelineResponse> simulatePipelineListenerNewLicense = new PlainActionFuture<>();
         client().execute(
diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureLicenseTrackingIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureLicenseTrackingIT.java
index 936e499e94feb..ce270c570c8cd 100644
--- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureLicenseTrackingIT.java
+++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/TestFeatureLicenseTrackingIT.java
@@ -7,16 +7,10 @@
 
 package org.elasticsearch.xpack.ml.integration;
 
-import org.elasticsearch.action.ingest.DeletePipelineRequest;
-import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
-import org.elasticsearch.action.ingest.PutPipelineTransportAction;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.core.Strings;
+import org.elasticsearch.ingest.IngestPipelineTestUtils;
 import org.elasticsearch.license.GetFeatureUsageRequest;
 import org.elasticsearch.license.GetFeatureUsageResponse;
 import org.elasticsearch.license.TransportGetFeatureUsageAction;
-import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
@@ -42,6 +36,7 @@ import java.util.Map;
 import java.util.Set;
 
+import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.xpack.core.ml.MachineLearningField.ML_FEATURE_FAMILY;
 import static org.elasticsearch.xpack.ml.inference.loadingservice.LocalModelTests.buildClassification;
 import static org.elasticsearch.xpack.ml.integration.ModelInferenceActionIT.buildTrainedModelConfigBuilder;
@@ -59,13 +54,7 @@ public class TestFeatureLicenseTrackingIT extends MlSingleNodeTestCase {
 
     @After
     public void cleanup() throws Exception {
-        for (String pipeline : createdPipelines) {
-            try {
-                client().execute(DeletePipelineTransportAction.TYPE, new DeletePipelineRequest(pipeline)).actionGet();
-            } catch (Exception ex) {
-                logger.warn(() -> "error cleaning up pipeline [" + pipeline + "]", ex);
-            }
-        }
+        IngestPipelineTestUtils.deletePipelinesIgnoringExceptions(client(), createdPipelines);
         // Some of the tests have async side effects. We need to wait for these to complete before continuing
         // the cleanup, otherwise unexpected indices may get created during the cleanup process.
         BaseMlIntegTestCase.waitForPendingTasks(client());
@@ -170,7 +159,7 @@ public void testFeatureTrackingInferenceModelPipeline() throws Exception {
             assertThat(lastUsage.toInstant(), lessThan(recentUsage.toInstant()));
         });
 
-        client().execute(DeletePipelineTransportAction.TYPE, new DeletePipelineRequest(pipelineId)).actionGet();
+        deletePipeline(pipelineId);
         createdPipelines.remove(pipelineId);
 
         // Make sure that feature usage keeps the last usage once the model is removed
@@ -210,8 +199,8 @@ private List<GetJobsStatsAction.Response.JobStats> getJobStats(String jobId) {
         return response.getResponse().results();
     }
 
-    private void putTrainedModelIngestPipeline(String pipelineId, String modelId) throws Exception {
-        client().execute(PutPipelineTransportAction.TYPE, new PutPipelineRequest(pipelineId, new BytesArray(Strings.format("""
+    private void putTrainedModelIngestPipeline(String pipelineId, String modelId) {
+        putJsonPipeline(pipelineId, format("""
             {
              "processors": [
               {
@@ -222,7 +211,7 @@ private void putTrainedModelIngestPipeline(String pipelineId, String modelId) th
              }
            ]
-            }""", modelId)), XContentType.JSON)).actionGet();
+            }""", modelId));
     }
 }
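As a closing sketch (hypothetical, not part of the patch; executePut stands in for whatever harness a unit test already uses, as in the IngestServiceTests changes above), putJsonPipelineRequest gives unit tests the same one-liner convenience when they need the raw request rather than a round trip through a client:

    import static org.elasticsearch.ingest.IngestPipelineTestUtils.putJsonPipelineRequest;

    // build the JSON request in one line instead of BytesArray/XContentType boilerplate
    PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """
        {"processors": [{"set": {"field": "foo", "value": "bar"}}]}""");
    // hand the request to the harness under test, e.g. something like:
    // clusterState = executePut(putRequest, clusterState);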