Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Add indices pipeline settings check when deleting a pipeline (#77013) #79353

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,8 @@ teardown:
id: 10
pipeline: ""
body: {bytes_source_field: "1kb"}

# Delete the index
- do:
indices.delete:
index: test
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ teardown:
- match: { docs.5._source.bytes_target_field: 3072 }
- match: { docs.5._source.ran_script: true }

# Delete the index
- do:
indices.delete:
index: test

---
"Test final pipeline when target index is changed":
- do:
Expand Down Expand Up @@ -273,3 +278,12 @@ teardown:
- match: { _source.foo: "bar" }
- match: { _source.final_pipeline_2: true }
- is_false: _source.final_pipeline_1

# Delete the index
- do:
indices.delete:
index: index_with_final_pipeline_1

- do:
indices.delete:
index: index_with_final_pipeline_2
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@After
public void cleanUpPipelines() {
client().admin().indices().prepareDelete("*").get();

final GetPipelineResponse response = client().admin()
.cluster()
.getPipeline(new GetPipelineRequest("default_pipeline", "final_pipeline", "request_pipeline"))
Expand Down
37 changes: 37 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.ingest;


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -37,6 +38,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -68,6 +70,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;

/**
* Holder class for several ingest related services.
Expand Down Expand Up @@ -283,7 +286,9 @@ static ClusterState innerDelete(DeletePipelineRequest request, ClusterState curr
return currentState;
}
final Map<String, PipelineConfiguration> pipelinesCopy = new HashMap<>(pipelines);
ImmutableOpenMap<String, IndexMetadata> indices = currentState.metadata().indices();
for (String key : toRemove) {
validateNotInUse(key, indices);
pipelinesCopy.remove(key);
}
ClusterState.Builder newState = ClusterState.builder(currentState);
Expand All @@ -293,6 +298,38 @@ static ClusterState innerDelete(DeletePipelineRequest request, ClusterState curr
return newState.build();
}

static void validateNotInUse(String pipeline, ImmutableOpenMap<String, IndexMetadata> indices) {
List<String> defaultPipelineIndices = new ArrayList<>();
List<String> finalPipelineIndices = new ArrayList<>();
for (IndexMetadata indexMetadata : indices.values()) {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
if (pipeline.equals(defaultPipeline)) {
defaultPipelineIndices.add(indexMetadata.getIndex().getName());
}

if (pipeline.equals(finalPipeline)) {
finalPipelineIndices.add(indexMetadata.getIndex().getName());
}
}

if (defaultPipelineIndices.size() > 0 || finalPipelineIndices.size() > 0) {
throw new IllegalArgumentException(
"pipeline ["
+ pipeline
+ "] cannot be deleted because it is the default pipeline for "
+ defaultPipelineIndices.size()
+ " indices including ["
+ defaultPipelineIndices.stream().limit(3).collect(Collectors.joining(","))
+ "] and the final pipeline for "
+ finalPipelineIndices.size()
+ " indices including ["
+ finalPipelineIndices.stream().limit(3).collect(Collectors.joining(","))
+ "]"
);
}
}

/**
* @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines
* may be returned
Expand Down
120 changes: 119 additions & 1 deletion server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
Expand Down Expand Up @@ -85,6 +86,7 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.elasticsearch.core.Tuple.tuple;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -288,7 +290,7 @@ public void testDelete() {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
assertThat(ingestService.getPipeline("_id"), nullValue());

// Delete existing pipeline:
// Delete not existing pipeline:
try {
IngestService.innerDelete(deleteRequest, clusterState);
fail("exception expected");
Expand All @@ -315,6 +317,55 @@ public void testValidateNoIngestInfo() throws Exception {
ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest.getId(), pipelineConfig);
}

public void testValidateNotInUse() {
String pipeline = "pipeline";
ImmutableOpenMap.Builder<String, IndexMetadata> indices = ImmutableOpenMap.builder();
int defaultPipelineCount = 0;
int finalPipelineCount = 0;
int indicesCount = randomIntBetween(5, 10);
for (int i = 0; i < indicesCount; i++) {
IndexMetadata.Builder builder = IndexMetadata.builder("index" + i).numberOfShards(1).numberOfReplicas(1);
Settings.Builder settingsBuilder = settings(Version.CURRENT);
if (randomBoolean()) {
settingsBuilder.put(IndexSettings.DEFAULT_PIPELINE.getKey(), pipeline);
defaultPipelineCount++;
}

if (randomBoolean()) {
settingsBuilder.put(IndexSettings.FINAL_PIPELINE.getKey(), pipeline);
finalPipelineCount++;
}

builder.settings(settingsBuilder);
IndexMetadata indexMetadata = builder.settings(settingsBuilder).numberOfShards(1).numberOfReplicas(1).build();
indices.put(indexMetadata.getIndex().getName(), indexMetadata);
}

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> IngestService.validateNotInUse(pipeline, indices.build())
);
assertThat(e.getMessage(), containsString("default pipeline for " + defaultPipelineCount + " indices including"));
assertThat(e.getMessage(), containsString("final pipeline for " + finalPipelineCount + " indices including"));
if (defaultPipelineCount >= 3) {
// assert index limit
String content = "default pipeline for " + defaultPipelineCount + " indices including [";
int start = e.getMessage().indexOf(content) + content.length();
int end = e.getMessage().indexOf("] and the final pipeline");
// indices content length, eg: index0,index1,index2
assertEquals(end - start, (6 + 1 + 6 + 1 + 6));
}

if (finalPipelineCount >= 3) {
// assert index limit
String content = "final pipeline for " + finalPipelineCount + " indices including [";
int start = e.getMessage().indexOf(content) + content.length();
int end = e.getMessage().lastIndexOf("]");
// indices content length, eg: index0,index1,index2
assertEquals(end - start, (6 + 1 + 6 + 1 + 6));
}
}

public void testGetProcessorsInPipeline() throws Exception {
IngestService ingestService = createWithProcessors();
String id = "_id";
Expand Down Expand Up @@ -571,6 +622,73 @@ public void testDeleteWithExistingUnmatchedPipelines() {
}
}

public void testDeleteWithIndexUsePipeline() {
IngestService ingestService = createWithProcessors();
PipelineConfiguration config = new PipelineConfiguration(
"_id",
new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"),
XContentType.JSON
);
IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config));
Metadata.Builder builder = Metadata.builder();
for (int i = 0; i < randomIntBetween(2, 10); i++) {
builder.put(
IndexMetadata.builder("test" + i).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1).build(),
true
);
}
builder.putCustom(IngestMetadata.TYPE, ingestMetadata);
Metadata metadata = builder.build();

ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
ClusterState previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).metadata(metadata).build();
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
assertThat(ingestService.getPipeline("_id"), notNullValue());

DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id");

{
// delete pipeline which is in used of default_pipeline
IndexMetadata indexMetadata = IndexMetadata.builder("pipeline-index")
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "_id"))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
ClusterState finalClusterState = ClusterState.builder(clusterState)
.metadata(Metadata.builder(metadata).put(indexMetadata, true))
.build();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> IngestService.innerDelete(deleteRequest, finalClusterState)
);
assertThat(e.getMessage(), containsString("default pipeline for 1 indices including [pipeline-index]"));
}

{
// delete pipeline which is in used of final_pipeline
IndexMetadata indexMetadata = IndexMetadata.builder("pipeline-index")
.settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "_id"))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
ClusterState finalClusterState = ClusterState.builder(clusterState)
.metadata(Metadata.builder(metadata).put(indexMetadata, true))
.build();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> IngestService.innerDelete(deleteRequest, finalClusterState)
);
assertThat(e.getMessage(), containsString("final pipeline for 1 indices including [pipeline-index]"));
}

// Delete pipeline:
previousClusterState = clusterState;
clusterState = IngestService.innerDelete(deleteRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
assertThat(ingestService.getPipeline("_id"), nullValue());
}

public void testGetPipelines() {
Map<String, PipelineConfiguration> configs = new HashMap<>();
configs.put("_id1", new PipelineConfiguration(
Expand Down