Skip to content

Commit

Permalink
Add indices pipeline settings check when deleting a pipeline (#77013) (
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann authored Oct 18, 2021
1 parent 2ce1fe1 commit 60e8292
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,8 @@ teardown:
id: 10
pipeline: ""
body: {bytes_source_field: "1kb"}

# Delete the index
- do:
indices.delete:
index: test
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ teardown:
- match: { docs.5._source.bytes_target_field: 3072 }
- match: { docs.5._source.ran_script: true }

# Delete the index
- do:
indices.delete:
index: test

---
"Test final pipeline when target index is changed":
- do:
Expand Down Expand Up @@ -273,3 +278,12 @@ teardown:
- match: { _source.foo: "bar" }
- match: { _source.final_pipeline_2: true }
- is_false: _source.final_pipeline_1

# Delete the index
- do:
indices.delete:
index: index_with_final_pipeline_1

- do:
indices.delete:
index: index_with_final_pipeline_2
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@After
public void cleanUpPipelines() {
client().admin().indices().prepareDelete("*").get();

final GetPipelineResponse response = client().admin()
.cluster()
.getPipeline(new GetPipelineRequest("default_pipeline", "final_pipeline", "request_pipeline"))
Expand Down
37 changes: 37 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.ingest;


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -37,6 +38,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -68,6 +70,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;

/**
* Holder class for several ingest related services.
Expand Down Expand Up @@ -283,7 +286,9 @@ static ClusterState innerDelete(DeletePipelineRequest request, ClusterState curr
return currentState;
}
final Map<String, PipelineConfiguration> pipelinesCopy = new HashMap<>(pipelines);
ImmutableOpenMap<String, IndexMetadata> indices = currentState.metadata().indices();
for (String key : toRemove) {
validateNotInUse(key, indices);
pipelinesCopy.remove(key);
}
ClusterState.Builder newState = ClusterState.builder(currentState);
Expand All @@ -293,6 +298,38 @@ static ClusterState innerDelete(DeletePipelineRequest request, ClusterState curr
return newState.build();
}

static void validateNotInUse(String pipeline, ImmutableOpenMap<String, IndexMetadata> indices) {
List<String> defaultPipelineIndices = new ArrayList<>();
List<String> finalPipelineIndices = new ArrayList<>();
for (IndexMetadata indexMetadata : indices.values()) {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
if (pipeline.equals(defaultPipeline)) {
defaultPipelineIndices.add(indexMetadata.getIndex().getName());
}

if (pipeline.equals(finalPipeline)) {
finalPipelineIndices.add(indexMetadata.getIndex().getName());
}
}

if (defaultPipelineIndices.size() > 0 || finalPipelineIndices.size() > 0) {
throw new IllegalArgumentException(
"pipeline ["
+ pipeline
+ "] cannot be deleted because it is the default pipeline for "
+ defaultPipelineIndices.size()
+ " indices including ["
+ defaultPipelineIndices.stream().limit(3).collect(Collectors.joining(","))
+ "] and the final pipeline for "
+ finalPipelineIndices.size()
+ " indices including ["
+ finalPipelineIndices.stream().limit(3).collect(Collectors.joining(","))
+ "]"
);
}
}

/**
* @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines
* may be returned
Expand Down
120 changes: 119 additions & 1 deletion server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
Expand Down Expand Up @@ -85,6 +86,7 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.elasticsearch.core.Tuple.tuple;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -288,7 +290,7 @@ public void testDelete() {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
assertThat(ingestService.getPipeline("_id"), nullValue());

// Delete existing pipeline:
// Delete not existing pipeline:
try {
IngestService.innerDelete(deleteRequest, clusterState);
fail("exception expected");
Expand All @@ -315,6 +317,55 @@ public void testValidateNoIngestInfo() throws Exception {
ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest.getId(), pipelineConfig);
}

public void testValidateNotInUse() {
String pipeline = "pipeline";
ImmutableOpenMap.Builder<String, IndexMetadata> indices = ImmutableOpenMap.builder();
int defaultPipelineCount = 0;
int finalPipelineCount = 0;
int indicesCount = randomIntBetween(5, 10);
for (int i = 0; i < indicesCount; i++) {
IndexMetadata.Builder builder = IndexMetadata.builder("index" + i).numberOfShards(1).numberOfReplicas(1);
Settings.Builder settingsBuilder = settings(Version.CURRENT);
if (randomBoolean()) {
settingsBuilder.put(IndexSettings.DEFAULT_PIPELINE.getKey(), pipeline);
defaultPipelineCount++;
}

if (randomBoolean()) {
settingsBuilder.put(IndexSettings.FINAL_PIPELINE.getKey(), pipeline);
finalPipelineCount++;
}

builder.settings(settingsBuilder);
IndexMetadata indexMetadata = builder.settings(settingsBuilder).numberOfShards(1).numberOfReplicas(1).build();
indices.put(indexMetadata.getIndex().getName(), indexMetadata);
}

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> IngestService.validateNotInUse(pipeline, indices.build())
);
assertThat(e.getMessage(), containsString("default pipeline for " + defaultPipelineCount + " indices including"));
assertThat(e.getMessage(), containsString("final pipeline for " + finalPipelineCount + " indices including"));
if (defaultPipelineCount >= 3) {
// assert index limit
String content = "default pipeline for " + defaultPipelineCount + " indices including [";
int start = e.getMessage().indexOf(content) + content.length();
int end = e.getMessage().indexOf("] and the final pipeline");
// indices content length, eg: index0,index1,index2
assertEquals(end - start, (6 + 1 + 6 + 1 + 6));
}

if (finalPipelineCount >= 3) {
// assert index limit
String content = "final pipeline for " + finalPipelineCount + " indices including [";
int start = e.getMessage().indexOf(content) + content.length();
int end = e.getMessage().lastIndexOf("]");
// indices content length, eg: index0,index1,index2
assertEquals(end - start, (6 + 1 + 6 + 1 + 6));
}
}

public void testGetProcessorsInPipeline() throws Exception {
IngestService ingestService = createWithProcessors();
String id = "_id";
Expand Down Expand Up @@ -571,6 +622,73 @@ public void testDeleteWithExistingUnmatchedPipelines() {
}
}

public void testDeleteWithIndexUsePipeline() {
IngestService ingestService = createWithProcessors();
PipelineConfiguration config = new PipelineConfiguration(
"_id",
new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"),
XContentType.JSON
);
IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config));
Metadata.Builder builder = Metadata.builder();
for (int i = 0; i < randomIntBetween(2, 10); i++) {
builder.put(
IndexMetadata.builder("test" + i).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1).build(),
true
);
}
builder.putCustom(IngestMetadata.TYPE, ingestMetadata);
Metadata metadata = builder.build();

ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
ClusterState previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).metadata(metadata).build();
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
assertThat(ingestService.getPipeline("_id"), notNullValue());

DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id");

{
// delete pipeline which is in used of default_pipeline
IndexMetadata indexMetadata = IndexMetadata.builder("pipeline-index")
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "_id"))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
ClusterState finalClusterState = ClusterState.builder(clusterState)
.metadata(Metadata.builder(metadata).put(indexMetadata, true))
.build();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> IngestService.innerDelete(deleteRequest, finalClusterState)
);
assertThat(e.getMessage(), containsString("default pipeline for 1 indices including [pipeline-index]"));
}

{
// delete pipeline which is in used of final_pipeline
IndexMetadata indexMetadata = IndexMetadata.builder("pipeline-index")
.settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "_id"))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
ClusterState finalClusterState = ClusterState.builder(clusterState)
.metadata(Metadata.builder(metadata).put(indexMetadata, true))
.build();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> IngestService.innerDelete(deleteRequest, finalClusterState)
);
assertThat(e.getMessage(), containsString("final pipeline for 1 indices including [pipeline-index]"));
}

// Delete pipeline:
previousClusterState = clusterState;
clusterState = IngestService.innerDelete(deleteRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
assertThat(ingestService.getPipeline("_id"), nullValue());
}

public void testGetPipelines() {
Map<String, PipelineConfiguration> configs = new HashMap<>();
configs.put("_id1", new PipelineConfiguration(
Expand Down

0 comments on commit 60e8292

Please sign in to comment.