From 59ad8f93b874d6c15d97a1cbfddbd6ce6fcb1de1 Mon Sep 17 00:00:00 2001 From: weizijun Date: Tue, 12 Oct 2021 20:31:02 +0800 Subject: [PATCH] Add indices pipeline settings check when deleting a pipeline (#77013) --- modules/ingest-common/build.gradle | 9 ++ .../test/ingest/200_default_pipeline.yml | 5 + .../test/ingest/240_required_pipeline.yml | 14 ++ .../elasticsearch/index/FinalPipelineIT.java | 2 + .../elasticsearch/ingest/IngestService.java | 37 ++++++ .../ingest/IngestServiceTests.java | 120 +++++++++++++++++- 6 files changed, 186 insertions(+), 1 deletion(-) diff --git a/modules/ingest-common/build.gradle b/modules/ingest-common/build.gradle index cba6774f464e7..6175ad84c45c7 100644 --- a/modules/ingest-common/build.gradle +++ b/modules/ingest-common/build.gradle @@ -47,3 +47,12 @@ tasks.named("thirdPartyAudit").configure { tasks.named("yamlRestTestV7CompatTransform").configure { task -> task.addAllowedWarningRegex("\\[types removal\\].*") } + +tasks.named("yamlRestTestV7CompatTest").configure { + systemProperty 'tests.rest.blacklist', [ + //marked as not needing compatible api + 'ingest/200_default_pipeline/Test index with default pipeline', + 'ingest/240_required_pipeline/Test index with final pipeline', + 'ingest/240_required_pipeline/Test final pipeline when target index is changed', + ].join(',') +} diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/200_default_pipeline.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/200_default_pipeline.yml index 3d100fad3b027..cee76d0eaca64 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/200_default_pipeline.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/200_default_pipeline.yml @@ -187,3 +187,8 @@ teardown: id: 10 pipeline: "" body: {bytes_source_field: "1kb"} + + # Delete the index + - do: + indices.delete: + index: test diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/240_required_pipeline.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/240_required_pipeline.yml index 611d43dd493b3..dafbe0510c321 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/240_required_pipeline.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/240_required_pipeline.yml @@ -180,6 +180,11 @@ teardown: - match: { docs.5._source.bytes_target_field: 3072 } - match: { docs.5._source.ran_script: true } + # Delete the index + - do: + indices.delete: + index: test + --- "Test final pipeline when target index is changed": - do: @@ -273,3 +278,12 @@ teardown: - match: { _source.foo: "bar" } - match: { _source.final_pipeline_2: true } - is_false: _source.final_pipeline_1 + + # Delete the index + - do: + indices.delete: + index: index_with_final_pipeline_1 + + - do: + indices.delete: + index: index_with_final_pipeline_2 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index 0b2525bbf1148..d46d7083d57ff 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -64,6 +64,8 @@ protected Collection> nodePlugins() { @After public void cleanUpPipelines() { + client().admin().indices().prepareDelete("*").get(); + final GetPipelineResponse response = client().admin() .cluster() .getPipeline(new GetPipelineRequest("default_pipeline", "final_pipeline", "request_pipeline")) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 98ea8a351e607..3d97f2670233a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -8,6 +8,7 @@ package org.elasticsearch.ingest; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -36,6 +37,7 @@ import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -66,6 +68,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.IntConsumer; +import java.util.stream.Collectors; /** * Holder class for several ingest related services. @@ -281,7 +284,9 @@ static ClusterState innerDelete(DeletePipelineRequest request, ClusterState curr return currentState; } final Map pipelinesCopy = new HashMap<>(pipelines); + ImmutableOpenMap indices = currentState.metadata().indices(); for (String key : toRemove) { + validateNotInUse(key, indices); pipelinesCopy.remove(key); } ClusterState.Builder newState = ClusterState.builder(currentState); @@ -291,6 +296,38 @@ static ClusterState innerDelete(DeletePipelineRequest request, ClusterState curr return newState.build(); } + static void validateNotInUse(String pipeline, ImmutableOpenMap indices) { + List defaultPipelineIndices = new ArrayList<>(); + List finalPipelineIndices = new ArrayList<>(); + for (IndexMetadata indexMetadata : indices.values()) { + String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings()); + String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings()); + if (pipeline.equals(defaultPipeline)) { + defaultPipelineIndices.add(indexMetadata.getIndex().getName()); + } + + if (pipeline.equals(finalPipeline)) { + finalPipelineIndices.add(indexMetadata.getIndex().getName()); + } + } + + if (defaultPipelineIndices.size() > 0 || finalPipelineIndices.size() > 0) { + throw new IllegalArgumentException( + "pipeline [" + + pipeline + + "] cannot be deleted because it is the default pipeline for " + + defaultPipelineIndices.size() + + " indices including [" + + defaultPipelineIndices.stream().limit(3).collect(Collectors.joining(",")) + + "] and the final pipeline for " + + finalPipelineIndices.size() + + " indices including [" + + finalPipelineIndices.stream().limit(3).collect(Collectors.joining(",")) + + "]" + ); + } + } + /** * @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines * may be returned diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 087f961b61ac0..b774393548f92 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; @@ -85,6 +86,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.containsString; import static org.elasticsearch.core.Tuple.tuple; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -269,7 +271,7 @@ public void testDelete() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); assertThat(ingestService.getPipeline("_id"), nullValue()); - // Delete existing pipeline: + // Delete not existing pipeline: try { IngestService.innerDelete(deleteRequest, clusterState); fail("exception expected"); @@ -296,6 +298,55 @@ public void testValidateNoIngestInfo() throws Exception { ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest.getId(), pipelineConfig); } + public void testValidateNotInUse() { + String pipeline = "pipeline"; + ImmutableOpenMap.Builder indices = ImmutableOpenMap.builder(); + int defaultPipelineCount = 0; + int finalPipelineCount = 0; + int indicesCount = randomIntBetween(5, 10); + for (int i = 0; i < indicesCount; i++) { + IndexMetadata.Builder builder = IndexMetadata.builder("index" + i).numberOfShards(1).numberOfReplicas(1); + Settings.Builder settingsBuilder = settings(Version.CURRENT); + if (randomBoolean()) { + settingsBuilder.put(IndexSettings.DEFAULT_PIPELINE.getKey(), pipeline); + defaultPipelineCount++; + } + + if (randomBoolean()) { + settingsBuilder.put(IndexSettings.FINAL_PIPELINE.getKey(), pipeline); + finalPipelineCount++; + } + + builder.settings(settingsBuilder); + IndexMetadata indexMetadata = builder.settings(settingsBuilder).numberOfShards(1).numberOfReplicas(1).build(); + indices.put(indexMetadata.getIndex().getName(), indexMetadata); + } + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> IngestService.validateNotInUse(pipeline, indices.build()) + ); + assertThat(e.getMessage(), containsString("default pipeline for " + defaultPipelineCount + " indices including")); + assertThat(e.getMessage(), containsString("final pipeline for " + finalPipelineCount + " indices including")); + if (defaultPipelineCount >= 3) { + // assert index limit + String content = "default pipeline for " + defaultPipelineCount + " indices including ["; + int start = e.getMessage().indexOf(content) + content.length(); + int end = e.getMessage().indexOf("] and the final pipeline"); + // indices content length, eg: index0,index1,index2 + assertEquals(end - start, (6 + 1 + 6 + 1 + 6)); + } + + if (finalPipelineCount >= 3) { + // assert index limit + String content = "final pipeline for " + finalPipelineCount + " indices including ["; + int start = e.getMessage().indexOf(content) + content.length(); + int end = e.getMessage().lastIndexOf("]"); + // indices content length, eg: index0,index1,index2 + assertEquals(end - start, (6 + 1 + 6 + 1 + 6)); + } + } + public void testGetProcessorsInPipeline() throws Exception { IngestService ingestService = createWithProcessors(); String id = "_id"; @@ -552,6 +603,73 @@ public void testDeleteWithExistingUnmatchedPipelines() { } } + public void testDeleteWithIndexUsePipeline() { + IngestService ingestService = createWithProcessors(); + PipelineConfiguration config = new PipelineConfiguration( + "_id", + new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), + XContentType.JSON + ); + IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config)); + Metadata.Builder builder = Metadata.builder(); + for (int i = 0; i < randomIntBetween(2, 10); i++) { + builder.put( + IndexMetadata.builder("test" + i).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1).build(), + true + ); + } + builder.putCustom(IngestMetadata.TYPE, ingestMetadata); + Metadata metadata = builder.build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).metadata(metadata).build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("_id"), notNullValue()); + + DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id"); + + { + // delete pipeline which is in used of default_pipeline + IndexMetadata indexMetadata = IndexMetadata.builder("pipeline-index") + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "_id")) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + ClusterState finalClusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder(metadata).put(indexMetadata, true)) + .build(); + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> IngestService.innerDelete(deleteRequest, finalClusterState) + ); + assertThat(e.getMessage(), containsString("default pipeline for 1 indices including [pipeline-index]")); + } + + { + // delete pipeline which is in used of final_pipeline + IndexMetadata indexMetadata = IndexMetadata.builder("pipeline-index") + .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "_id")) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + ClusterState finalClusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder(metadata).put(indexMetadata, true)) + .build(); + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> IngestService.innerDelete(deleteRequest, finalClusterState) + ); + assertThat(e.getMessage(), containsString("final pipeline for 1 indices including [pipeline-index]")); + } + + // Delete pipeline: + previousClusterState = clusterState; + clusterState = IngestService.innerDelete(deleteRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("_id"), nullValue()); + } + public void testGetPipelines() { Map configs = new HashMap<>(); configs.put("_id1", new PipelineConfiguration(