Introduce test utils for ingest pipelines #112733

Merged: 1 commit, Sep 12, 2024
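This PR replaces hand-rolled PutPipelineRequest plumbing in integration tests with shared helpers such as putJsonPipeline and getPipelines. The helpers' implementation is not part of the excerpt below; as orientation, here is a minimal sketch of what the String-based overload could look like, assuming it lives in a shared test base class and simply wraps the boilerplate the diff deletes (the class name and method shape are assumptions, not the PR's actual code):

import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentType;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

public abstract class IngestPipelineTestUtilsSketch extends ESIntegTestCase {
    // Sketch: hide the PutPipelineRequest/BytesArray/XContentType boilerplate
    // that each test previously repeated inline.
    protected void putJsonPipeline(String id, String jsonString) {
        assertAcked(clusterAdmin().putPipeline(new PutPipelineRequest(id, new BytesArray(jsonString), XContentType.JSON)).actionGet());
    }
}

With a helper of this shape, a call site collapses to a single line, e.g. putJsonPipeline("my-pipeline", "{\"processors\": []}"), which is the pattern visible throughout the hunks below.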
@@ -19,14 +19,11 @@
 import org.elasticsearch.action.bulk.FailureStoreMetrics;
 import org.elasticsearch.action.datastreams.CreateDataStreamAction;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Template;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -319,9 +316,7 @@ private void createReroutePipeline(String destination) {
     }

     private void createPipeline(String processor) {
-        String pipelineDefinition = Strings.format("{\"processors\": [{%s}]}", processor);
-        BytesReference bytes = new BytesArray(pipelineDefinition);
-        clusterAdmin().putPipeline(new PutPipelineRequest(pipeline, bytes, XContentType.JSON)).actionGet();
+        putJsonPipeline(pipeline, Strings.format("{\"processors\": [{%s}]}", processor));
     }

     private void indexDocs(String dataStream, int numDocs, String pipeline) {
[next file]
@@ -15,14 +15,11 @@
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.get.MultiGetResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.internal.Requests;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Strings;
@@ -79,7 +76,7 @@ public void testFailureInConditionalProcessor() {
         internalCluster().ensureAtLeastNumDataNodes(1);
         internalCluster().startMasterOnlyNode();
         final String pipelineId = "foo";
-        clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(Strings.format("""
+        putJsonPipeline(pipelineId, Strings.format("""
             {
               "processors": [
                 {
@@ -99,7 +96,7 @@
                   }
                 }
               ]
-            }""", MockScriptEngine.NAME)), XContentType.JSON).get();
+            }""", MockScriptEngine.NAME));

         Exception e = expectThrows(
             Exception.class,
@@ -126,22 +123,16 @@ public void testScriptDisabled() throws Exception {
         String pipelineIdWithScript = pipelineIdWithoutScript + "_script";
         internalCluster().startNode();

-        BytesReference pipelineWithScript = new BytesArray(Strings.format("""
+        putJsonPipeline(pipelineIdWithScript, Strings.format("""
            {
              "processors": [ { "script": { "lang": "%s", "source": "my_script" } } ]
            }""", MockScriptEngine.NAME));
-        BytesReference pipelineWithoutScript = new BytesArray("""
+        putJsonPipeline(pipelineIdWithoutScript, """
            {
              "processors": [ { "set": { "field": "y", "value": 0 } } ]
            }""");

-        Consumer<String> checkPipelineExists = (id) -> assertThat(
-            clusterAdmin().prepareGetPipeline(id).get().pipelines().get(0).getId(),
-            equalTo(id)
-        );
-
-        clusterAdmin().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get();
-        clusterAdmin().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get();
+        Consumer<String> checkPipelineExists = (id) -> assertThat(getPipelines(id).pipelines().get(0).getId(), equalTo(id));

         checkPipelineExists.accept(pipelineIdWithScript);
         checkPipelineExists.accept(pipelineIdWithoutScript);
@@ -197,14 +188,13 @@ public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception {
         putJsonStoredScript("1", Strings.format("""
             {"script": {"lang": "%s", "source": "my_script"} }
             """, MockScriptEngine.NAME));
-        BytesReference pipeline = new BytesArray("""
+        putJsonPipeline("_id", """
             {
               "processors" : [
                   {"set" : {"field": "y", "value": 0}},
                   {"script" : {"id": "1"}}
               ]
             }""");
-        clusterAdmin().preparePutPipeline("_id", pipeline, XContentType.JSON).get();

         prepareIndex("index").setId("1").setSource("x", 0).setPipeline("_id").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

@@ -232,13 +222,12 @@ public void testWithDedicatedIngestNode() throws Exception {
         String node = internalCluster().startNode();
         String ingestNode = internalCluster().startNode(onlyRole(DiscoveryNodeRole.INGEST_ROLE));

-        BytesReference pipeline = new BytesArray("""
+        putJsonPipeline("_id", """
             {
               "processors" : [
                   {"set" : {"field": "y", "value": 0}}
               ]
             }""");
-        clusterAdmin().preparePutPipeline("_id", pipeline, XContentType.JSON).get();

         prepareIndex("index").setId("1").setSource("x", 0).setPipeline("_id").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

@@ -264,7 +253,7 @@ public void testWithDedicatedIngestNode() throws Exception {
     public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
         internalCluster().startNode();

-        final var pipeline = new BytesArray("""
+        putJsonPipeline("test_pipeline", """
             {
               "processors" : [
                 {
@@ -275,8 +264,8 @@ public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
                 }
               ]
             }""");
+
         final TimeValue timeout = TimeValue.timeValueSeconds(10);
-        client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get(timeout);
         client().admin().indices().preparePutTemplate("pipeline_template").setPatterns(Collections.singletonList("*")).setSettings("""
             {
               "index" : {
@@ -357,16 +346,13 @@ public void testForwardBulkWithSystemWritePoolDisabled() throws Exception {
         // Create Bulk Request
         createIndex("index");

-        BytesReference source = new BytesArray("""
+        putJsonPipeline("_id", """
             {
               "processors" : [
                   {"set" : {"field": "y", "value": 0}}
               ]
             }""");

-        PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON);
-        clusterAdmin().putPipeline(putPipelineRequest).get();
-
         int numRequests = scaledRandomIntBetween(32, 128);
         BulkRequest bulkRequest = new BulkRequest();
         BulkResponse response;
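Alongside putJsonPipeline, the testScriptDisabled hunk above swaps a direct clusterAdmin().prepareGetPipeline(id).get() call for a getPipelines(id) helper. Only the call site appears in the diff; a plausible sketch, under the same shared-base-class assumption as before:

import org.elasticsearch.action.ingest.GetPipelineResponse;

// Sketch: wrap the prepareGetPipeline call that the old assertion issued
// directly against the cluster admin client.
protected GetPipelineResponse getPipelines(String... ids) {
    return clusterAdmin().prepareGetPipeline(ids).get();
}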
[next file]
@@ -15,7 +15,6 @@
 import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
 import org.elasticsearch.action.ingest.SimulatePipelineResponse;
 import org.elasticsearch.action.ingest.SimulateProcessorResult;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.ingest.GraphStructureException;
@@ -166,7 +165,7 @@ private void createChainedPipelines(String prefix, int count) {
     private void createChainedPipeline(String prefix, int number) {
         String pipelineId = prefix + "pipeline_" + number;
         String nextPipelineId = prefix + "pipeline_" + (number + 1);
-        String pipelineTemplate = """
+        putJsonPipeline(pipelineId, Strings.format("""
             {
               "processors": [
                 {
@@ -176,9 +175,7 @@
                 }
               ]
             }
-            """;
-        String pipeline = Strings.format(pipelineTemplate, nextPipelineId);
-        clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get();
+            """, nextPipelineId));
     }

     private void createLastPipeline(String prefix, int number) {
@@ -195,6 +192,6 @@ private void createLastPipeline(String prefix, int number) {
               ]
             }
             """;
-        clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get();
+        putJsonPipeline(pipelineId, pipeline);
     }
 }
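The hunk header above references a createChainedPipelines(String prefix, int count) driver that this PR leaves unchanged, so its body is not shown. A likely reconstruction, inferred only from the two helpers it calls and their signatures visible in the hunk context (an assumption, not the file's actual code):

// Inferred shape: chain `count` pipelines, each handing off to the next via a
// `pipeline` processor, terminated by a pipeline with no onward hop.
private void createChainedPipelines(String prefix, int count) {
    for (int i = 0; i < count - 1; i++) {
        createChainedPipeline(prefix, i);
    }
    createLastPipeline(prefix, count - 1);
}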
[next file: XContentMeteringParserDecoratorWithPipelinesIT]
@@ -10,9 +10,6 @@

 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.ingest.common.IngestCommonPlugin;
@@ -21,7 +18,6 @@
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.xcontent.FilterXContentParserWrapper;
 import org.elasticsearch.xcontent.XContentParser;
-import org.elasticsearch.xcontent.XContentType;

 import java.io.IOException;
 import java.util.Collection;
@@ -44,7 +40,7 @@ public class XContentMeteringParserDecoratorWithPipelinesIT extends ESIntegTestCase {
     public void testDocumentIsReportedWithPipelines() throws Exception {
         hasWrappedParser = false;
         // pipeline adding fields, changing destination is not affecting reporting
-        final BytesReference pipelineBody = new BytesArray("""
+        putJsonPipeline("pipeline", """
             {
               "processors": [
                 {
@@ -62,7 +58,6 @@ public void testDocumentIsReportedWithPipelines() throws Exception {
               ]
             }
             """);
-        clusterAdmin().putPipeline(new PutPipelineRequest("pipeline", pipelineBody, XContentType.JSON)).actionGet();

         client().index(
             new IndexRequest(TEST_INDEX_NAME).setPipeline("pipeline")
[next file: EnterpriseGeoIpDownloaderIT]
@@ -19,10 +19,8 @@
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.CollectionUtils;
@@ -36,9 +34,7 @@
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.transport.RemoteTransportException;
-import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentType;
-import org.elasticsearch.xcontent.json.JsonXContent;
 import org.junit.ClassRule;

 import java.io.IOException;
@@ -47,7 +43,6 @@

 import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER;
 import static org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderTaskExecutor.MAXMIND_LICENSE_KEY_SETTING;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.equalTo;

 public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase {
@@ -155,31 +150,24 @@ private void configureDatabase(String databaseType) throws Exception {
     }

     private void createGeoIpPipeline(String pipelineName, String databaseType, String sourceField, String targetField) throws IOException {
-        final BytesReference bytes;
-        try (XContentBuilder builder = JsonXContent.contentBuilder()) {
-            builder.startObject();
-            {
-                builder.field("description", "test");
-                builder.startArray("processors");
-                {
-                    builder.startObject();
-                    {
-                        builder.startObject("geoip");
-                        {
-                            builder.field("field", sourceField);
-                            builder.field("target_field", targetField);
-                            builder.field("database_file", databaseType + ".mmdb");
-                        }
-                        builder.endObject();
-                    }
-                    builder.endObject();
-                }
-                builder.endArray();
-            }
-            builder.endObject();
-            bytes = BytesReference.bytes(builder);
-        }
-        assertAcked(clusterAdmin().putPipeline(new PutPipelineRequest(pipelineName, bytes, XContentType.JSON)).actionGet());
+        putJsonPipeline(pipelineName, (builder, params) -> {
+            builder.field("description", "test");
+            builder.startArray("processors");
+            {
+                builder.startObject();
+                {
+                    builder.startObject("geoip");
+                    {
+                        builder.field("field", sourceField);
+                        builder.field("target_field", targetField);
+                        builder.field("database_file", databaseType + ".mmdb");
+                    }
+                    builder.endObject();
+                }
+                builder.endObject();
+            }
+            return builder.endArray();
+        });
     }

     private String ingestDocument(String indexName, String pipelineName, String sourceField) {
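The createGeoIpPipeline hunk above uses a second overload of putJsonPipeline that takes a (builder, params) -> ... lambda returning the builder, which is exactly the shape of Elasticsearch's ToXContent functional interface. Again only the call site is in the diff; a sketch of what that overload might do, assuming it wraps the lambda's output in the enclosing JSON object and delegates to the String overload:

import java.io.IOException;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

// Sketch: render the caller-supplied ToXContent into a JSON object, then reuse
// the String-based overload to register the pipeline.
protected void putJsonPipeline(String id, ToXContent toXContent) throws IOException {
    try (XContentBuilder builder = JsonXContent.contentBuilder()) {
        builder.startObject();
        toXContent.toXContent(builder, ToXContent.EMPTY_PARAMS);
        builder.endObject();
        putJsonPipeline(id, BytesReference.bytes(builder).utf8ToString());
    }
}

This also explains the diff's shape: the old code called builder.startObject() and builder.endObject() itself, while the new lambda starts directly at builder.field("description", "test"), so the wrapper presumably supplies the outer object.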