From 4c157259e6318a1826daebaa98ab8e8be0876aea Mon Sep 17 00:00:00 2001 From: pgomulka Date: Thu, 17 Jun 2021 10:36:21 +0200 Subject: [PATCH] [Rest Api Compatibility] Type metadata for docs used in simulate request This commit allows to provide _type field on document ingested in simulate pipeline requests. relates main meta issue #51816 relates types removal issue #54160 --- .../ingest/SimulatePipelineRequest.java | 29 ++++- .../SimulatePipelineTransportAction.java | 5 +- .../elasticsearch/ingest/IngestDocument.java | 1 + .../SimulatePipelineRequestParsingTests.java | 120 ++++++++++++++++-- 4 files changed, 137 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index d8aa4989bce33..1ed322ec245ac 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -13,10 +13,12 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; @@ -33,17 +35,24 @@ import java.util.Objects; public class SimulatePipelineRequest extends ActionRequest implements ToXContentObject { + private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class); private String id; private boolean verbose; private BytesReference source; private XContentType xContentType; + private RestApiVersion restApiVersion; /** * Creates a new request with the given source and its content type */ public SimulatePipelineRequest(BytesReference source, XContentType xContentType) { + this(source, xContentType, RestApiVersion.current()); + } + + public SimulatePipelineRequest(BytesReference source, XContentType xContentType, RestApiVersion restApiVersion) { this.source = Objects.requireNonNull(source); this.xContentType = Objects.requireNonNull(xContentType); + this.restApiVersion = restApiVersion; } SimulatePipelineRequest() { @@ -133,7 +142,8 @@ public boolean isVerbose() { static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline"; - static Parsed parseWithPipelineId(String pipelineId, Map config, boolean verbose, IngestService ingestService) { + static Parsed parseWithPipelineId(String pipelineId, Map config, boolean verbose, IngestService ingestService, + RestApiVersion restApiVersion) { if (pipelineId == null) { throw new IllegalArgumentException("param [pipeline] is null"); } @@ -141,20 +151,21 @@ static Parsed parseWithPipelineId(String pipelineId, Map config, if (pipeline == null) { throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist"); } - List ingestDocumentList = parseDocs(config); + List ingestDocumentList = parseDocs(config, restApiVersion); return new Parsed(pipeline, ingestDocumentList, verbose); } - static Parsed parse(Map config, boolean verbose, IngestService ingestService) throws Exception { + static Parsed parse(Map config, boolean verbose, IngestService ingestService, RestApiVersion restApiVersion) + throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); Pipeline pipeline = Pipeline.create( SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService() ); - List ingestDocumentList = parseDocs(config); + List ingestDocumentList = parseDocs(config, restApiVersion); return new Parsed(pipeline, ingestDocumentList, verbose); } - private static List parseDocs(Map config) { + private static List parseDocs(Map config, RestApiVersion restApiVersion) { List> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS); if (docs.isEmpty()) { @@ -174,6 +185,10 @@ private static List parseDocs(Map config) { dataMap, Metadata.ID.getFieldName(), "_id"); String routing = ConfigurationUtils.readOptionalStringOrIntProperty(null, null, dataMap, Metadata.ROUTING.getFieldName()); + if (restApiVersion == RestApiVersion.V_7 && dataMap.containsKey(Metadata.TYPE.getFieldName())) { + deprecationLogger.compatibleApiWarning("simulate_pipeline_with_types", + "[types removal] specifying _type in pipeline simulation requests is deprecated"); + } Long version = null; if (dataMap.containsKey(Metadata.VERSION.getFieldName())) { String versionValue = ConfigurationUtils.readOptionalStringOrLongProperty(null, null, @@ -224,4 +239,8 @@ private static List parseDocs(Map config) { } return ingestDocumentList; } + + public RestApiVersion getRestApiVersion() { + return restApiVersion; + } } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index 9ec44f4661904..4600baf9b921f 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -42,9 +42,10 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe final SimulatePipelineRequest.Parsed simulateRequest; try { if (request.getId() != null) { - simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService); + simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService, + request.getRestApiVersion()); } else { - simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService); + simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService, request.getRestApiVersion()); } } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 952277977debe..8557a12ae8801 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -810,6 +810,7 @@ public String toString() { public enum Metadata { INDEX(IndexFieldMapper.NAME), + TYPE("type"), ID(IdFieldMapper.NAME), ROUTING(RoutingFieldMapper.NAME), VERSION(VersionFieldMapper.NAME), diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index e285e06671f22..c272787405dee 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; @@ -31,12 +32,13 @@ import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields; import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID; import static org.elasticsearch.ingest.IngestDocument.Metadata.ID; +import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_PRIMARY_TERM; +import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_SEQ_NO; import static org.elasticsearch.ingest.IngestDocument.Metadata.INDEX; import static org.elasticsearch.ingest.IngestDocument.Metadata.ROUTING; +import static org.elasticsearch.ingest.IngestDocument.Metadata.TYPE; import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION; import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION_TYPE; -import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_SEQ_NO; -import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_PRIMARY_TERM; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -49,7 +51,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { @Before public void init() throws IOException { - TestProcessor processor = new TestProcessor(ingestDocument -> {}); + TestProcessor processor = new TestProcessor(ingestDocument -> { + }); CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor); Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor); Map registry = @@ -84,7 +87,8 @@ public void testParseUsingPipelineStore() throws Exception { } SimulatePipelineRequest.Parsed actualRequest = - SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, ingestService); + SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, ingestService, + RestApiVersion.current()); assertThat(actualRequest.isVerbose(), equalTo(false)); assertThat(actualRequest.getDocuments().size(), equalTo(numDocs)); Iterator> expectedDocsIterator = expectedDocs.iterator(); @@ -112,7 +116,7 @@ public void testParseWithProvidedPipeline() throws Exception { Map doc = new HashMap<>(); Map expectedDoc = new HashMap<>(); List fields = Arrays.asList(INDEX, ID, ROUTING, VERSION, VERSION_TYPE, IF_SEQ_NO, IF_PRIMARY_TERM); - for(IngestDocument.Metadata field : fields) { + for (IngestDocument.Metadata field : fields) { if (field == VERSION) { Object value = randomBoolean() ? randomLong() : randomInt(); doc.put(field.getFieldName(), randomBoolean() ? value : value.toString()); @@ -177,7 +181,8 @@ public void testParseWithProvidedPipeline() throws Exception { requestContent.put(Fields.PIPELINE, pipelineConfig); - SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService); + SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService, + RestApiVersion.current()); assertThat(actualRequest.isVerbose(), equalTo(false)); assertThat(actualRequest.getDocuments().size(), equalTo(numDocs)); Iterator> expectedDocsIterator = expectedDocs.iterator(); @@ -204,7 +209,7 @@ public void testNullPipelineId() { List> docs = new ArrayList<>(); requestContent.put(Fields.DOCS, docs); Exception e = expectThrows(IllegalArgumentException.class, - () -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, ingestService)); + () -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, ingestService, RestApiVersion.current())); assertThat(e.getMessage(), equalTo("param [pipeline] is null")); } @@ -214,7 +219,7 @@ public void testNonExistentPipelineId() { List> docs = new ArrayList<>(); requestContent.put(Fields.DOCS, docs); Exception e = expectThrows(IllegalArgumentException.class, - () -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, ingestService)); + () -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, ingestService, RestApiVersion.current())); assertThat(e.getMessage(), equalTo("pipeline [" + pipelineId + "] does not exist")); } @@ -227,7 +232,7 @@ public void testNotValidDocs() { requestContent.put(Fields.DOCS, docs); requestContent.put(Fields.PIPELINE, pipelineConfig); Exception e1 = expectThrows(IllegalArgumentException.class, - () -> SimulatePipelineRequest.parse(requestContent, false, ingestService)); + () -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current())); assertThat(e1.getMessage(), equalTo("must specify at least one document in [docs]")); List stringList = new ArrayList<>(); @@ -236,14 +241,107 @@ public void testNotValidDocs() { requestContent.put(Fields.DOCS, stringList); requestContent.put(Fields.PIPELINE, pipelineConfig); Exception e2 = expectThrows(IllegalArgumentException.class, - () -> SimulatePipelineRequest.parse(requestContent, false, ingestService)); + () -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current())); assertThat(e2.getMessage(), equalTo("malformed [docs] section, should include an inner object")); docs.add(new HashMap<>()); requestContent.put(Fields.DOCS, docs); requestContent.put(Fields.PIPELINE, pipelineConfig); Exception e3 = expectThrows(ElasticsearchParseException.class, - () -> SimulatePipelineRequest.parse(requestContent, false, ingestService)); + () -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current())); assertThat(e3.getMessage(), containsString("required property is missing")); } + + public void testIngestPipelineWithDocumentsWithType() throws Exception { + int numDocs = randomIntBetween(1, 10); + + Map requestContent = new HashMap<>(); + List> docs = new ArrayList<>(); + List> expectedDocs = new ArrayList<>(); + requestContent.put(Fields.DOCS, docs); + for (int i = 0; i < numDocs; i++) { + Map doc = new HashMap<>(); + Map expectedDoc = new HashMap<>(); + List fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, VERSION, VERSION_TYPE); + for (IngestDocument.Metadata field : fields) { + if (field == VERSION) { + Long value = randomLong(); + doc.put(field.getFieldName(), value); + expectedDoc.put(field.getFieldName(), value); + } else if (field == VERSION_TYPE) { + String value = VersionType.toString( + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE) + ); + doc.put(field.getFieldName(), value); + expectedDoc.put(field.getFieldName(), value); + } else if (field == TYPE) { + String value = randomAlphaOfLengthBetween(1, 10); + doc.put(field.getFieldName(), value); + expectedDoc.put(field.getFieldName(), value); + } else { + if (randomBoolean()) { + String value = randomAlphaOfLengthBetween(1, 10); + doc.put(field.getFieldName(), value); + expectedDoc.put(field.getFieldName(), value); + } else { + Integer value = randomIntBetween(1, 1000000); + doc.put(field.getFieldName(), value); + expectedDoc.put(field.getFieldName(), String.valueOf(value)); + } + } + } + String fieldName = randomAlphaOfLengthBetween(1, 10); + String fieldValue = randomAlphaOfLengthBetween(1, 10); + doc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue)); + docs.add(doc); + expectedDoc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue)); + expectedDocs.add(expectedDoc); + } + Map pipelineConfig = new HashMap<>(); + List> processors = new ArrayList<>(); + int numProcessors = randomIntBetween(1, 10); + for (int i = 0; i < numProcessors; i++) { + Map processorConfig = new HashMap<>(); + List> onFailureProcessors = new ArrayList<>(); + int numOnFailureProcessors = randomIntBetween(0, 1); + for (int j = 0; j < numOnFailureProcessors; j++) { + onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap())); + } + if (numOnFailureProcessors > 0) { + processorConfig.put("on_failure", onFailureProcessors); + } + processors.add(Collections.singletonMap("mock_processor", processorConfig)); + } + pipelineConfig.put("processors", processors); + List> onFailureProcessors = new ArrayList<>(); + int numOnFailureProcessors = randomIntBetween(0, 1); + for (int i = 0; i < numOnFailureProcessors; i++) { + onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap())); + } + if (numOnFailureProcessors > 0) { + pipelineConfig.put("on_failure", onFailureProcessors); + } + requestContent.put(Fields.PIPELINE, pipelineConfig); + SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService, + RestApiVersion.V_7); + assertThat(actualRequest.isVerbose(), equalTo(false)); + assertThat(actualRequest.getDocuments().size(), equalTo(numDocs)); + Iterator> expectedDocsIterator = expectedDocs.iterator(); + for (IngestDocument ingestDocument : actualRequest.getDocuments()) { + Map expectedDocument = expectedDocsIterator.next(); + Map metadataMap = ingestDocument.extractMetadata(); + assertThat(metadataMap.get(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName()))); + assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName()))); + assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName()))); + assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName()))); + assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName()))); + assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE))); + } + assertThat(actualRequest.getPipeline().getId(), equalTo(SIMULATED_PIPELINE_ID)); + assertThat(actualRequest.getPipeline().getDescription(), nullValue()); + assertThat(actualRequest.getPipeline().getProcessors().size(), equalTo(numProcessors)); + + assertWarnings("[types removal] specifying _type in pipeline simulation requests is deprecated"); + + } }