From d0ebc3c674be419279d26773a2c767cd2142fcec Mon Sep 17 00:00:00 2001 From: David Venable Date: Thu, 18 Aug 2022 12:51:22 -0500 Subject: [PATCH] Added the router to the Data Prepper pipeline model. (#1666) Added the router to the Data Prepper pipeline model. Signed-off-by: David Venable --- .../model/configuration/ConditionalRoute.java | 101 +++++++++++++++ .../model/configuration/PipelineModel.java | 15 ++- .../configuration/ConditionalRouteTest.java | 118 ++++++++++++++++++ .../configuration/PipelineModelTest.java | 77 +++++++++--- .../PipelinesDataFlowModelTest.java | 2 +- .../conditional_route_invalid_just_value.yaml | 3 + .../conditional_route_invalid_non_string.yaml | 4 + .../conditional_route_invalid_object.yaml | 4 + .../configuration/conditional_route_list.yaml | 4 + .../conditional_route_single.yaml | 2 + .../logstash/mapping/LogstashMapper.java | 2 +- .../logstash/mapping/LogstashMapperTest.java | 2 +- 12 files changed, 315 insertions(+), 19 deletions(-) create mode 100644 data-prepper-api/src/main/java/com/amazon/dataprepper/model/configuration/ConditionalRoute.java create mode 100644 data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/ConditionalRouteTest.java create mode 100644 data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_just_value.yaml create mode 100644 data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_non_string.yaml create mode 100644 data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_object.yaml create mode 100644 data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_list.yaml create mode 100644 data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_single.yaml diff --git a/data-prepper-api/src/main/java/com/amazon/dataprepper/model/configuration/ConditionalRoute.java b/data-prepper-api/src/main/java/com/amazon/dataprepper/model/configuration/ConditionalRoute.java new file mode 100644 index 0000000000..59c2f21fe3 --- /dev/null +++ b/data-prepper-api/src/main/java/com/amazon/dataprepper/model/configuration/ConditionalRoute.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.model.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.exc.InvalidFormatException; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + * Model representing a route within a pipeline. + * + * @since 2.0 + */ +@JsonSerialize(using = ConditionalRoute.ConditionalRouteSerializer.class) +@JsonDeserialize(using = ConditionalRoute.ConditionalRouteDeserializer.class) +public class ConditionalRoute { + private final String name; + private final String condition; + + @JsonCreator + public ConditionalRoute(final String name, final String condition) { + this.name = name; + this.condition = condition; + } + + /** + * Gets the name of the route. + * + * @return the route name + * @since 2.0 + */ + public String getName() { + return name; + } + + /** + * Gets the condition which applies for this route. + * + * @return the condition + * @since 2.0 + */ + public String getCondition() { + return condition; + } + + static class ConditionalRouteSerializer extends StdSerializer { + + protected ConditionalRouteSerializer() { + super(ConditionalRoute.class); + } + + @Override + public void serialize(final ConditionalRoute value, final JsonGenerator gen, final SerializerProvider provider) throws IOException { + gen.writeStartObject(); + gen.writeObjectField(value.name, value.condition); + gen.writeEndObject(); + } + } + + static class ConditionalRouteDeserializer extends StdDeserializer { + + protected ConditionalRouteDeserializer() { + super(ConditionalRoute.class); + } + + @Override + public ConditionalRoute deserialize(final JsonParser parser, final DeserializationContext context) throws IOException, JacksonException { + final JsonNode node = context.readTree(parser); + + final Iterator> fields = node.fields(); + final Map.Entry onlyField = fields.next(); + + final String routeName = onlyField.getKey(); + final JsonNode value = onlyField.getValue(); + if(!value.isTextual()) + throw new InvalidFormatException(parser, "Route has a condition which is not a string.", value, String.class); + final String condition = value.asText(); + + if(fields.hasNext()) + throw new InvalidFormatException(parser, "Route has too many fields.", null, String.class); + + return new ConditionalRoute(routeName, condition); + } + } +} diff --git a/data-prepper-api/src/main/java/com/amazon/dataprepper/model/configuration/PipelineModel.java b/data-prepper-api/src/main/java/com/amazon/dataprepper/model/configuration/PipelineModel.java index e8cb877798..7d51937ff1 100644 --- a/data-prepper-api/src/main/java/com/amazon/dataprepper/model/configuration/PipelineModel.java +++ b/data-prepper-api/src/main/java/com/amazon/dataprepper/model/configuration/PipelineModel.java @@ -12,6 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -64,6 +65,10 @@ else if (preppers != null) { @JsonInclude(JsonInclude.Include.NON_NULL) private final PluginModel buffer; + @JsonProperty("router") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + private List router; + @JsonProperty("sink") private final List sinks; @@ -79,6 +84,7 @@ public PipelineModel( final PluginModel source, final PluginModel buffer, final List processors, + final List router, final List sinks, final Integer workers, final Integer delay) { @@ -90,6 +96,7 @@ public PipelineModel( this.source = source; this.buffer = buffer; this.processors = processors; + this.router = router != null ? router : new ArrayList<>(); this.sinks = sinks; this.workers = workers; this.readBatchDelay = delay; @@ -111,10 +118,11 @@ public PipelineModel( @JsonProperty("buffer") final PluginModel buffer, @Deprecated @JsonProperty("prepper") final List preppers, @JsonProperty("processor") final List processors, + @JsonProperty("router") final List router, @JsonProperty("sink") final List sinks, @JsonProperty("workers") final Integer workers, @JsonProperty("delay") final Integer delay) { - this(source, buffer, validateProcessor(preppers, processors), sinks, workers, delay); + this(source, buffer, validateProcessor(preppers, processors), router, sinks, workers, delay); } public PluginModel getSource() { @@ -133,6 +141,10 @@ public List getProcessors() { return processors; } + public List getRouter() { + return router; + } + public List getSinks() { return sinks; } @@ -144,5 +156,4 @@ public Integer getWorkers() { public Integer getReadBatchDelay() { return readBatchDelay; } - } diff --git a/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/ConditionalRouteTest.java b/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/ConditionalRouteTest.java new file mode 100644 index 0000000000..7f128b05e2 --- /dev/null +++ b/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/ConditionalRouteTest.java @@ -0,0 +1,118 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.model.configuration; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.InvalidFormatException; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ConditionalRouteTest { + + public static final String KNOWN_ROUTE_NAME = "testRouteName"; + public static final String KNOWN_CONDITION = "/my/property==value"; + + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + } + + @Test + void serialize_single() throws IOException { + final ConditionalRoute conditionalRoute = new ConditionalRoute(KNOWN_ROUTE_NAME, KNOWN_CONDITION); + + final String serialized = objectMapper.writeValueAsString(conditionalRoute); + + final InputStream inputStream = PluginModelTests.class.getResourceAsStream("conditional_route_single.yaml"); + assertThat(serialized, notNullValue()); + assertThat(serialized, equalTo(createStringFromInputStream(inputStream))); + } + + @Test + void deserialize_single() throws IOException { + final InputStream inputStream = PluginModelTests.class.getResourceAsStream("conditional_route_single.yaml"); + final ConditionalRoute conditionalRoute = objectMapper.readValue(inputStream, ConditionalRoute.class); + + assertThat(conditionalRoute, notNullValue()); + assertThat(conditionalRoute.getName(), equalTo(KNOWN_ROUTE_NAME)); + assertThat(conditionalRoute.getCondition(), equalTo(KNOWN_CONDITION)); + } + + @ParameterizedTest + @ValueSource(strings = { + "conditional_route_invalid_object.yaml", + "conditional_route_invalid_non_string.yaml", + "conditional_route_invalid_just_value.yaml" + }) + void deserialize_single_invalid(final String invalidResourceName) throws IOException { + final InputStream inputStream = PluginModelTests.class.getResourceAsStream(invalidResourceName); + + final String invalidYaml = createStringFromInputStream(inputStream); + + final InvalidFormatException actualException = assertThrows(InvalidFormatException.class, () -> + objectMapper.readValue(invalidYaml, ConditionalRoute.class)); + + assertThat(actualException.getMessage(), containsString("Route")); + } + + @Test + void serialize_list() throws IOException { + final List routes = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + final String routeName = KNOWN_ROUTE_NAME + i; + final String condition = KNOWN_CONDITION + i; + final ConditionalRoute route = new ConditionalRoute(routeName, condition); + routes.add(route); + } + + final String serialized = objectMapper.writeValueAsString(routes); + + final InputStream inputStream = PluginModelTests.class.getResourceAsStream("conditional_route_list.yaml"); + assertThat(serialized, notNullValue()); + assertThat(serialized, equalTo(createStringFromInputStream(inputStream))); + } + + @Test + void deserialize_list() throws IOException { + final InputStream inputStream = PluginModelTests.class.getResourceAsStream("conditional_route_list.yaml"); + + final TypeReference> listTypeReference = new TypeReference<>() { + }; + final List conditionalRouteList = objectMapper.readValue(inputStream, listTypeReference); + + assertThat(conditionalRouteList, notNullValue()); + assertThat(conditionalRouteList.size(), equalTo(3)); + + for (int i = 0; i < 3; i++) { + assertThat(conditionalRouteList.get(i), notNullValue()); + assertThat(conditionalRouteList.get(i).getName(), equalTo(KNOWN_ROUTE_NAME + i)); + assertThat(conditionalRouteList.get(i).getCondition(), equalTo(KNOWN_CONDITION + i)); + } + } + + static String createStringFromInputStream(final InputStream inputStream) throws IOException { + return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); + } +} \ No newline at end of file diff --git a/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/PipelineModelTest.java b/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/PipelineModelTest.java index 8dc9b74d6b..b0aafc8806 100644 --- a/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/PipelineModelTest.java +++ b/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/PipelineModelTest.java @@ -5,7 +5,7 @@ package com.amazon.dataprepper.model.configuration; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; @@ -21,7 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class PipelineModelTest { +class PipelineModelTest { public static Random random = new Random(); public static final Integer TEST_WORKERS = random.nextInt(30); @@ -32,11 +32,12 @@ public class PipelineModelTest { public static PluginModel TEST_VALID_SINKS_PLUGIN_MODEL = new PluginModel("sink", validPluginSettings()); @Test - public void testPipelineModelCreation() { + void testPipelineModelCreation() { final PipelineModel pipelineModel = new PipelineModel( validSourcePluginModel(), validBufferPluginModel(), validPreppersPluginModel(), + validPipelineRouter(), validSinksPluginModel(), TEST_WORKERS, TEST_READ_BATCH_DELAY @@ -56,42 +57,49 @@ public void testPipelineModelCreation() { assertThat(originalBuffer.getPluginSettings(), is(equalTo(TEST_VALID_BUFFER_PLUGIN_MODEL.getPluginSettings()))); assertThat(originalPreppers.get(0).getPluginName(), is(equalTo(TEST_VALID_PREPPERS_PLUGIN_MODEL.getPluginName()))); assertThat(originalPreppers.get(0).getPluginSettings(), is(equalTo(TEST_VALID_PREPPERS_PLUGIN_MODEL.getPluginSettings()))); + assertThat(pipelineModel.getRouter(), notNullValue()); + assertThat(pipelineModel.getRouter().size(), equalTo(1)); assertThat(originalSinks.get(0).getPluginName(), is(equalTo(TEST_VALID_SINKS_PLUGIN_MODEL.getPluginName()))); assertThat(originalSinks.get(0).getPluginSettings(), is(equalTo(TEST_VALID_SINKS_PLUGIN_MODEL.getPluginSettings()))); assertThat(pipelineModel.getWorkers(), is(TEST_WORKERS)); assertThat(pipelineModel.getReadBatchDelay(), is(TEST_READ_BATCH_DELAY)); } - public static Map validPluginSettings() { + static Map validPluginSettings() { final Map settings = new HashMap<>(); settings.put("property", "value"); return settings; } - public static PluginModel validSourcePluginModel() { + static PluginModel validSourcePluginModel() { return new PluginModel("source-plugin", validPluginSettings()); } - public static PluginModel validBufferPluginModel() { + static PluginModel validBufferPluginModel() { return new PluginModel("buffer", validPluginSettings()); } - public static List validPreppersPluginModel() { + static List validPreppersPluginModel() { return Collections.singletonList(new PluginModel("prepper", validPluginSettings())); } - public static List validSinksPluginModel() { + private static List validPipelineRouter() { + return Collections.singletonList(new ConditionalRoute("router", "/a==b")); + } + + static List validSinksPluginModel() { return Collections.singletonList(new PluginModel("sink", validPluginSettings())); } @Test - public void testPipelineModelWithPrepperAndProcessorConfigThrowsException() { + void testPipelineModelWithPrepperAndProcessorConfigThrowsException() { final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel( validSourcePluginModel(), validBufferPluginModel(), validPreppersPluginModel(), validPreppersPluginModel(), + validPipelineRouter(), validSinksPluginModel(), TEST_WORKERS, TEST_READ_BATCH_DELAY @@ -105,13 +113,14 @@ public void testPipelineModelWithPrepperAndProcessorConfigThrowsException() { } @Test - public void testPipelineModelWithValidPrepperConfig() { + void testPipelineModelWithValidPrepperConfig() { final List expectedPreppersPluginModel = validPreppersPluginModel(); final PipelineModel pipelineModel = new PipelineModel( validSourcePluginModel(), null, expectedPreppersPluginModel, null, + null, validSinksPluginModel(), TEST_WORKERS, TEST_READ_BATCH_DELAY @@ -122,13 +131,14 @@ public void testPipelineModelWithValidPrepperConfig() { } @Test - public void testPipelineModelWithValidProcessorConfig() { + void testPipelineModelWithValidProcessorConfig() { final List expectedPreppersPluginModel = validPreppersPluginModel(); final PipelineModel pipelineModel = new PipelineModel( validSourcePluginModel(), null, null, expectedPreppersPluginModel, + validPipelineRouter(), validSinksPluginModel(), TEST_WORKERS, TEST_READ_BATCH_DELAY @@ -139,11 +149,12 @@ public void testPipelineModelWithValidProcessorConfig() { } @Test - public void testPipelineModelWithNullSourceThrowsException() { + void testPipelineModelWithNullSourceThrowsException() { final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel( null, validBufferPluginModel(), validPreppersPluginModel(), + validPipelineRouter(), validSinksPluginModel(), TEST_WORKERS, TEST_READ_BATCH_DELAY @@ -155,11 +166,12 @@ public void testPipelineModelWithNullSourceThrowsException() { } @Test - public void testPipelineModelWithNullSinksThrowsException() { + void testPipelineModelWithNullSinksThrowsException() { final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel( validSourcePluginModel(), validBufferPluginModel(), validPreppersPluginModel(), + validPipelineRouter(), null, TEST_WORKERS, TEST_READ_BATCH_DELAY @@ -171,11 +183,12 @@ public void testPipelineModelWithNullSinksThrowsException() { } @Test - public void testPipelineModelWithEmptySinksThrowsException() { + void testPipelineModelWithEmptySinksThrowsException() { final Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineModel( validSourcePluginModel(), validBufferPluginModel(), validPreppersPluginModel(), + validPipelineRouter(), Collections.emptyList(), TEST_WORKERS, TEST_READ_BATCH_DELAY @@ -185,4 +198,40 @@ public void testPipelineModelWithEmptySinksThrowsException() { assertThat(exception.getMessage(), equalTo(expected)); } + + @Test + void testPipelineModelCreation_with_null_router_creates_model_with_empty_router() { + final PipelineModel pipelineModel = new PipelineModel( + validSourcePluginModel(), + validBufferPluginModel(), + validPreppersPluginModel(), + null, + validSinksPluginModel(), + TEST_WORKERS, + TEST_READ_BATCH_DELAY + ); + final PluginModel originalSource = pipelineModel.getSource(); + final PluginModel originalBuffer = pipelineModel.getBuffer(); + final List originalPreppers = pipelineModel.getProcessors(); + final List originalSinks = pipelineModel.getSinks(); + + assertThat(originalSource, notNullValue()); + assertThat(originalBuffer, notNullValue()); + assertThat(originalPreppers, notNullValue()); + assertThat(originalSinks, notNullValue()); + assertThat(originalSource.getPluginName(), is(equalTo(TEST_VALID_SOURCE_PLUGIN_MODEL.getPluginName()))); + assertThat(originalSource.getPluginSettings(), is(equalTo(TEST_VALID_SOURCE_PLUGIN_MODEL.getPluginSettings()))); + assertThat(originalBuffer.getPluginName(), is(equalTo(TEST_VALID_BUFFER_PLUGIN_MODEL.getPluginName()))); + assertThat(originalBuffer.getPluginSettings(), is(equalTo(TEST_VALID_BUFFER_PLUGIN_MODEL.getPluginSettings()))); + assertThat(originalPreppers.get(0).getPluginName(), is(equalTo(TEST_VALID_PREPPERS_PLUGIN_MODEL.getPluginName()))); + assertThat(originalPreppers.get(0).getPluginSettings(), is(equalTo(TEST_VALID_PREPPERS_PLUGIN_MODEL.getPluginSettings()))); + assertThat(originalSinks.get(0).getPluginName(), is(equalTo(TEST_VALID_SINKS_PLUGIN_MODEL.getPluginName()))); + assertThat(originalSinks.get(0).getPluginSettings(), is(equalTo(TEST_VALID_SINKS_PLUGIN_MODEL.getPluginSettings()))); + assertThat(pipelineModel.getWorkers(), is(TEST_WORKERS)); + assertThat(pipelineModel.getReadBatchDelay(), is(TEST_READ_BATCH_DELAY)); + + assertThat(pipelineModel.getRouter(), notNullValue()); + assertThat(pipelineModel.getRouter().size(), equalTo(0)); + } + } diff --git a/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/PipelinesDataFlowModelTest.java b/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/PipelinesDataFlowModelTest.java index c5e422a935..c4542d7011 100644 --- a/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/PipelinesDataFlowModelTest.java +++ b/data-prepper-api/src/test/java/com/amazon/dataprepper/model/configuration/PipelinesDataFlowModelTest.java @@ -42,7 +42,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an final PluginModel source = new PluginModel("testSource", null); final List preppers = Collections.singletonList(new PluginModel("testPrepper", null)); final List sinks = Collections.singletonList(new PluginModel("testSink", null)); - final PipelineModel pipelineModel = new PipelineModel(source, null, preppers, sinks, 8, 50); + final PipelineModel pipelineModel = new PipelineModel(source, null, preppers, null, sinks, 8, 50); final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel)); diff --git a/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_just_value.yaml b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_just_value.yaml new file mode 100644 index 0000000000..ee6cf914c2 --- /dev/null +++ b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_just_value.yaml @@ -0,0 +1,3 @@ +--- +a: b +c: d diff --git a/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_non_string.yaml b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_non_string.yaml new file mode 100644 index 0000000000..f7ee1d477b --- /dev/null +++ b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_non_string.yaml @@ -0,0 +1,4 @@ +--- +testRouteName: + this: is + invalid: format diff --git a/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_object.yaml b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_object.yaml new file mode 100644 index 0000000000..f7ee1d477b --- /dev/null +++ b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_invalid_object.yaml @@ -0,0 +1,4 @@ +--- +testRouteName: + this: is + invalid: format diff --git a/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_list.yaml b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_list.yaml new file mode 100644 index 0000000000..d875244867 --- /dev/null +++ b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_list.yaml @@ -0,0 +1,4 @@ +--- +- testRouteName0: "/my/property==value0" +- testRouteName1: "/my/property==value1" +- testRouteName2: "/my/property==value2" diff --git a/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_single.yaml b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_single.yaml new file mode 100644 index 0000000000..47b7ae0cb7 --- /dev/null +++ b/data-prepper-api/src/test/resources/com/amazon/dataprepper/model/configuration/conditional_route_single.yaml @@ -0,0 +1,2 @@ +--- +testRouteName: "/my/property==value" diff --git a/data-prepper-logstash-configuration/src/main/java/org/opensearch/dataprepper/logstash/mapping/LogstashMapper.java b/data-prepper-logstash-configuration/src/main/java/org/opensearch/dataprepper/logstash/mapping/LogstashMapper.java index d4cbb03558..c93fc0a6c6 100644 --- a/data-prepper-logstash-configuration/src/main/java/org/opensearch/dataprepper/logstash/mapping/LogstashMapper.java +++ b/data-prepper-logstash-configuration/src/main/java/org/opensearch/dataprepper/logstash/mapping/LogstashMapper.java @@ -37,7 +37,7 @@ public PipelineModel mapPipeline(LogstashConfiguration logstashConfiguration) { throw new LogstashMappingException("At least one logstash output plugin is required"); } - return new PipelineModel(sourcePlugin, null, prepperPluginModels, sinkPluginModels, null, null); + return new PipelineModel(sourcePlugin, null, prepperPluginModels, null, sinkPluginModels, null, null); } private List mapPluginSection(LogstashConfiguration logstashConfiguration, LogstashPluginType logstashPluginType) { diff --git a/data-prepper-logstash-configuration/src/test/java/org/opensearch/dataprepper/logstash/mapping/LogstashMapperTest.java b/data-prepper-logstash-configuration/src/test/java/org/opensearch/dataprepper/logstash/mapping/LogstashMapperTest.java index 3e45d04af6..eff006b501 100644 --- a/data-prepper-logstash-configuration/src/test/java/org/opensearch/dataprepper/logstash/mapping/LogstashMapperTest.java +++ b/data-prepper-logstash-configuration/src/test/java/org/opensearch/dataprepper/logstash/mapping/LogstashMapperTest.java @@ -40,7 +40,7 @@ void mapPipeline_returns_pipeline_model() { PipelineModel actualPipelineModel = logstashMapper.mapPipeline(logstashConfiguration); PipelineModel expectedPipelineModel = new PipelineModel(TestDataProvider.samplePluginModel(), - null, null, Collections.singletonList(TestDataProvider.samplePluginModel()), null, null); + null, null, null, Collections.singletonList(TestDataProvider.samplePluginModel()), null, null); assertThat(actualPipelineModel.getSource().getPluginName(), equalTo(expectedPipelineModel.getSource().getPluginName()));