Skip to content

Commit

Permalink
Added the router to the Data Prepper pipeline model. (#1666)
Browse files Browse the repository at this point in the history
Added the router to the Data Prepper pipeline model.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Aug 18, 2022
1 parent c283c1c commit fabbb17
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.model.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.exc.InvalidFormatException;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

/**
* Model representing a route within a pipeline.
*
* @since 2.0
*/
@JsonSerialize(using = ConditionalRoute.ConditionalRouteSerializer.class)
@JsonDeserialize(using = ConditionalRoute.ConditionalRouteDeserializer.class)
public class ConditionalRoute {
private final String name;
private final String condition;

@JsonCreator
public ConditionalRoute(final String name, final String condition) {
this.name = name;
this.condition = condition;
}

/**
* Gets the name of the route.
*
* @return the route name
* @since 2.0
*/
public String getName() {
return name;
}

/**
* Gets the condition which applies for this route.
*
* @return the condition
* @since 2.0
*/
public String getCondition() {
return condition;
}

static class ConditionalRouteSerializer extends StdSerializer<ConditionalRoute> {

protected ConditionalRouteSerializer() {
super(ConditionalRoute.class);
}

@Override
public void serialize(final ConditionalRoute value, final JsonGenerator gen, final SerializerProvider provider) throws IOException {
gen.writeStartObject();
gen.writeObjectField(value.name, value.condition);
gen.writeEndObject();
}
}

static class ConditionalRouteDeserializer extends StdDeserializer<ConditionalRoute> {

protected ConditionalRouteDeserializer() {
super(ConditionalRoute.class);
}

@Override
public ConditionalRoute deserialize(final JsonParser parser, final DeserializationContext context) throws IOException, JacksonException {
final JsonNode node = context.readTree(parser);

final Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
final Map.Entry<String, JsonNode> onlyField = fields.next();

final String routeName = onlyField.getKey();
final JsonNode value = onlyField.getValue();
if(!value.isTextual())
throw new InvalidFormatException(parser, "Route has a condition which is not a string.", value, String.class);
final String condition = value.asText();

if(fields.hasNext())
throw new InvalidFormatException(parser, "Route has too many fields.", null, String.class);

return new ConditionalRoute(routeName, condition);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -64,6 +65,10 @@ else if (preppers != null) {
@JsonInclude(JsonInclude.Include.NON_NULL)
private final PluginModel buffer;

@JsonProperty("router")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private List<ConditionalRoute> router;

@JsonProperty("sink")
private final List<PluginModel> sinks;

Expand All @@ -79,6 +84,7 @@ public PipelineModel(
final PluginModel source,
final PluginModel buffer,
final List<PluginModel> processors,
final List<ConditionalRoute> router,
final List<PluginModel> sinks,
final Integer workers,
final Integer delay) {
Expand All @@ -90,6 +96,7 @@ public PipelineModel(
this.source = source;
this.buffer = buffer;
this.processors = processors;
this.router = router != null ? router : new ArrayList<>();
this.sinks = sinks;
this.workers = workers;
this.readBatchDelay = delay;
Expand All @@ -111,10 +118,11 @@ public PipelineModel(
@JsonProperty("buffer") final PluginModel buffer,
@Deprecated @JsonProperty("prepper") final List<PluginModel> preppers,
@JsonProperty("processor") final List<PluginModel> processors,
@JsonProperty("router") final List<ConditionalRoute> router,
@JsonProperty("sink") final List<PluginModel> sinks,
@JsonProperty("workers") final Integer workers,
@JsonProperty("delay") final Integer delay) {
this(source, buffer, validateProcessor(preppers, processors), sinks, workers, delay);
this(source, buffer, validateProcessor(preppers, processors), router, sinks, workers, delay);
}

public PluginModel getSource() {
Expand All @@ -133,6 +141,10 @@ public List<PluginModel> getProcessors() {
return processors;
}

public List<ConditionalRoute> getRouter() {
return router;
}

public List<PluginModel> getSinks() {
return sinks;
}
Expand All @@ -144,5 +156,4 @@ public Integer getWorkers() {
public Integer getReadBatchDelay() {
return readBatchDelay;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.model.configuration;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidFormatException;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ConditionalRouteTest {

public static final String KNOWN_ROUTE_NAME = "testRouteName";
public static final String KNOWN_CONDITION = "/my/property==value";

private ObjectMapper objectMapper;

@BeforeEach
void setUp() {
objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));
}

@Test
void serialize_single() throws IOException {
final ConditionalRoute conditionalRoute = new ConditionalRoute(KNOWN_ROUTE_NAME, KNOWN_CONDITION);

final String serialized = objectMapper.writeValueAsString(conditionalRoute);

final InputStream inputStream = PluginModelTests.class.getResourceAsStream("conditional_route_single.yaml");
assertThat(serialized, notNullValue());
assertThat(serialized, equalTo(createStringFromInputStream(inputStream)));
}

@Test
void deserialize_single() throws IOException {
final InputStream inputStream = PluginModelTests.class.getResourceAsStream("conditional_route_single.yaml");
final ConditionalRoute conditionalRoute = objectMapper.readValue(inputStream, ConditionalRoute.class);

assertThat(conditionalRoute, notNullValue());
assertThat(conditionalRoute.getName(), equalTo(KNOWN_ROUTE_NAME));
assertThat(conditionalRoute.getCondition(), equalTo(KNOWN_CONDITION));
}

@ParameterizedTest
@ValueSource(strings = {
"conditional_route_invalid_object.yaml",
"conditional_route_invalid_non_string.yaml",
"conditional_route_invalid_just_value.yaml"
})
void deserialize_single_invalid(final String invalidResourceName) throws IOException {
final InputStream inputStream = PluginModelTests.class.getResourceAsStream(invalidResourceName);

final String invalidYaml = createStringFromInputStream(inputStream);

final InvalidFormatException actualException = assertThrows(InvalidFormatException.class, () ->
objectMapper.readValue(invalidYaml, ConditionalRoute.class));

assertThat(actualException.getMessage(), containsString("Route"));
}

@Test
void serialize_list() throws IOException {
final List<ConditionalRoute> routes = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String routeName = KNOWN_ROUTE_NAME + i;
final String condition = KNOWN_CONDITION + i;
final ConditionalRoute route = new ConditionalRoute(routeName, condition);
routes.add(route);
}

final String serialized = objectMapper.writeValueAsString(routes);

final InputStream inputStream = PluginModelTests.class.getResourceAsStream("conditional_route_list.yaml");
assertThat(serialized, notNullValue());
assertThat(serialized, equalTo(createStringFromInputStream(inputStream)));
}

@Test
void deserialize_list() throws IOException {
final InputStream inputStream = PluginModelTests.class.getResourceAsStream("conditional_route_list.yaml");

final TypeReference<List<ConditionalRoute>> listTypeReference = new TypeReference<>() {
};
final List<ConditionalRoute> conditionalRouteList = objectMapper.readValue(inputStream, listTypeReference);

assertThat(conditionalRouteList, notNullValue());
assertThat(conditionalRouteList.size(), equalTo(3));

for (int i = 0; i < 3; i++) {
assertThat(conditionalRouteList.get(i), notNullValue());
assertThat(conditionalRouteList.get(i).getName(), equalTo(KNOWN_ROUTE_NAME + i));
assertThat(conditionalRouteList.get(i).getCondition(), equalTo(KNOWN_CONDITION + i));
}
}

static String createStringFromInputStream(final InputStream inputStream) throws IOException {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}
Loading

0 comments on commit fabbb17

Please sign in to comment.