Skip to content

Commit

Permalink
Introduce test utils for ingest pipelines (#112733)
Browse files Browse the repository at this point in the history
Replaces the somewhat-awkward API on `ClusterAdminClient` for
manipulating ingest pipelines with some test-specific utilities that are
easier to use.

Relates #107984 in that this change massively reduces the noise that
would otherwise result from removing the trappy timeouts in these APIs.
  • Loading branch information
DaveCTurner authored and davidkyle committed Sep 12, 2024
1 parent ca30b69 commit 1bbb739
Show file tree
Hide file tree
Showing 39 changed files with 606 additions and 886 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@
import org.elasticsearch.action.bulk.FailureStoreMetrics;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.core.Strings;
import org.elasticsearch.index.mapper.DateFieldMapper;
Expand Down Expand Up @@ -319,9 +316,7 @@ private void createReroutePipeline(String destination) {
}

private void createPipeline(String processor) {
String pipelineDefinition = Strings.format("{\"processors\": [{%s}]}", processor);
BytesReference bytes = new BytesArray(pipelineDefinition);
clusterAdmin().putPipeline(new PutPipelineRequest(pipeline, bytes, XContentType.JSON)).actionGet();
putJsonPipeline(pipeline, Strings.format("{\"processors\": [{%s}]}", processor));
}

private void indexDocs(String dataStream, int numDocs, String pipeline) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Strings;
Expand Down Expand Up @@ -79,7 +76,7 @@ public void testFailureInConditionalProcessor() {
internalCluster().ensureAtLeastNumDataNodes(1);
internalCluster().startMasterOnlyNode();
final String pipelineId = "foo";
clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(Strings.format("""
putJsonPipeline(pipelineId, Strings.format("""
{
"processors": [
{
Expand All @@ -99,7 +96,7 @@ public void testFailureInConditionalProcessor() {
}
}
]
}""", MockScriptEngine.NAME)), XContentType.JSON).get();
}""", MockScriptEngine.NAME));

Exception e = expectThrows(
Exception.class,
Expand All @@ -126,22 +123,16 @@ public void testScriptDisabled() throws Exception {
String pipelineIdWithScript = pipelineIdWithoutScript + "_script";
internalCluster().startNode();

BytesReference pipelineWithScript = new BytesArray(Strings.format("""
putJsonPipeline(pipelineIdWithScript, Strings.format("""
{
"processors": [ { "script": { "lang": "%s", "source": "my_script" } } ]
}""", MockScriptEngine.NAME));
BytesReference pipelineWithoutScript = new BytesArray("""
putJsonPipeline(pipelineIdWithoutScript, """
{
"processors": [ { "set": { "field": "y", "value": 0 } } ]
}""");

Consumer<String> checkPipelineExists = (id) -> assertThat(
clusterAdmin().prepareGetPipeline(id).get().pipelines().get(0).getId(),
equalTo(id)
);

clusterAdmin().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get();
clusterAdmin().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get();
Consumer<String> checkPipelineExists = (id) -> assertThat(getPipelines(id).pipelines().get(0).getId(), equalTo(id));

checkPipelineExists.accept(pipelineIdWithScript);
checkPipelineExists.accept(pipelineIdWithoutScript);
Expand Down Expand Up @@ -197,14 +188,13 @@ public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exceptio
putJsonStoredScript("1", Strings.format("""
{"script": {"lang": "%s", "source": "my_script"} }
""", MockScriptEngine.NAME));
BytesReference pipeline = new BytesArray("""
putJsonPipeline("_id", """
{
"processors" : [
{"set" : {"field": "y", "value": 0}},
{"script" : {"id": "1"}}
]
}""");
clusterAdmin().preparePutPipeline("_id", pipeline, XContentType.JSON).get();

prepareIndex("index").setId("1").setSource("x", 0).setPipeline("_id").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

Expand Down Expand Up @@ -232,13 +222,12 @@ public void testWithDedicatedIngestNode() throws Exception {
String node = internalCluster().startNode();
String ingestNode = internalCluster().startNode(onlyRole(DiscoveryNodeRole.INGEST_ROLE));

BytesReference pipeline = new BytesArray("""
putJsonPipeline("_id", """
{
"processors" : [
{"set" : {"field": "y", "value": 0}}
]
}""");
clusterAdmin().preparePutPipeline("_id", pipeline, XContentType.JSON).get();

prepareIndex("index").setId("1").setSource("x", 0).setPipeline("_id").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

Expand All @@ -264,7 +253,7 @@ public void testWithDedicatedIngestNode() throws Exception {
public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
internalCluster().startNode();

final var pipeline = new BytesArray("""
putJsonPipeline("test_pipeline", """
{
"processors" : [
{
Expand All @@ -275,8 +264,8 @@ public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
}
]
}""");

final TimeValue timeout = TimeValue.timeValueSeconds(10);
client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get(timeout);
client().admin().indices().preparePutTemplate("pipeline_template").setPatterns(Collections.singletonList("*")).setSettings("""
{
"index" : {
Expand Down Expand Up @@ -357,16 +346,13 @@ public void testForwardBulkWithSystemWritePoolDisabled() throws Exception {
// Create Bulk Request
createIndex("index");

BytesReference source = new BytesArray("""
putJsonPipeline("_id", """
{
"processors" : [
{"set" : {"field": "y", "value": 0}}
]
}""");

PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON);
clusterAdmin().putPipeline(putPipelineRequest).get();

int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
BulkResponse response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.Strings;
import org.elasticsearch.ingest.GraphStructureException;
Expand Down Expand Up @@ -166,7 +165,7 @@ private void createChainedPipelines(String prefix, int count) {
private void createChainedPipeline(String prefix, int number) {
String pipelineId = prefix + "pipeline_" + number;
String nextPipelineId = prefix + "pipeline_" + (number + 1);
String pipelineTemplate = """
putJsonPipeline(pipelineId, Strings.format("""
{
"processors": [
{
Expand All @@ -176,9 +175,7 @@ private void createChainedPipeline(String prefix, int number) {
}
]
}
""";
String pipeline = Strings.format(pipelineTemplate, nextPipelineId);
clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get();
""", nextPipelineId));
}

private void createLastPipeline(String prefix, int number) {
Expand All @@ -195,6 +192,6 @@ private void createLastPipeline(String prefix, int number) {
]
}
""";
clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get();
putJsonPipeline(pipelineId, pipeline);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
Expand All @@ -21,7 +18,6 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.FilterXContentParserWrapper;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Collection;
Expand All @@ -44,7 +40,7 @@ public class XContentMeteringParserDecoratorWithPipelinesIT extends ESIntegTestC
public void testDocumentIsReportedWithPipelines() throws Exception {
hasWrappedParser = false;
// pipeline adding fields, changing destination is not affecting reporting
final BytesReference pipelineBody = new BytesArray("""
putJsonPipeline("pipeline", """
{
"processors": [
{
Expand All @@ -62,7 +58,6 @@ public void testDocumentIsReportedWithPipelines() throws Exception {
]
}
""");
clusterAdmin().putPipeline(new PutPipelineRequest("pipeline", pipelineBody, XContentType.JSON)).actionGet();

client().index(
new IndexRequest(TEST_INDEX_NAME).setPipeline("pipeline")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
Expand All @@ -36,9 +34,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.ClassRule;

import java.io.IOException;
Expand All @@ -47,7 +43,6 @@

import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER;
import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.MAXMIND_LICENSE_KEY_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase {
Expand Down Expand Up @@ -155,31 +150,24 @@ private void configureDatabase(String databaseType) throws Exception {
}

private void createGeoIpPipeline(String pipelineName, String databaseType, String sourceField, String targetField) throws IOException {
final BytesReference bytes;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
putJsonPipeline(pipelineName, (builder, params) -> {
builder.field("description", "test");
builder.startArray("processors");
{
builder.field("description", "test");
builder.startArray("processors");
builder.startObject();
{
builder.startObject();
builder.startObject("geoip");
{
builder.startObject("geoip");
{
builder.field("field", sourceField);
builder.field("target_field", targetField);
builder.field("database_file", databaseType + ".mmdb");
}
builder.endObject();
builder.field("field", sourceField);
builder.field("target_field", targetField);
builder.field("database_file", databaseType + ".mmdb");
}
builder.endObject();
}
builder.endArray();
builder.endObject();
}
builder.endObject();
bytes = BytesReference.bytes(builder);
}
assertAcked(clusterAdmin().putPipeline(new PutPipelineRequest(pipelineName, bytes, XContentType.JSON)).actionGet());
return builder.endArray();
});
}

private String ingestDocument(String indexName, String pipelineName, String sourceField) {
Expand Down
Loading

0 comments on commit 1bbb739

Please sign in to comment.