Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the router to the Data Prepper pipeline model. #1666

Merged
merged 2 commits into from
Aug 18, 2022

Conversation

dlvenable
Copy link
Member

Description

Adds router to the pipeline model to support conditional routing.

Some additional code and tests had to change to support the new model design.

Issues Resolved

Resolves #1665

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dlvenable dlvenable requested a review from a team as a code owner August 15, 2022 13:17
Comment on lines 97 to +98
this.processors = processors;
this.router = router != null ? router : new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processors are optional and we do not perform this check. Are we building these objects in a different way?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably want to make processors return an empty list too.

Right now, this class has two constructors. One used by Jackson and the other used by other code in Data Prepper. I'd like to replace the second constructor with a builder pattern. It would be too much for this PR though. In that scenario, we could always use empty arrays instead of null to make development easier.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created #1669 to track these two changes. I propose there that processors should return an empty list.

Comment on lines 60 to 61
assertThat(pipelineModel.getRouter(), notNullValue());
assertThat(pipelineModel.getRouter(), notNullValue());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate assertions

@@ -64,6 +65,9 @@ else if (preppers != null) {
@JsonInclude(JsonInclude.Include.NON_NULL)
private final PluginModel buffer;

@JsonProperty("router")
private List<PipelineConditionalRoute> router;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a router a list of PipelineConditionalRoutes?

From a users perspective, am I creating routes or a router when I put a route in my pipeline? Should the JsonProperty be routes?

Copy link
Member Author

@dlvenable dlvenable Aug 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RFC calls for:

router:
  - myRouteA: '/some/conditional'
  - myRouteA: '/some/conditional'

I tried to use Jackson to create a Router class as an abstraction of the whole router concept. It could then have routes (which are not exposed in the YAML model, only the Java model) But, I hit quite a few limitations with Jackson that prevented it from working the way I would have liked.

There is a bit of discussion in that RFC, but overall the concept of a router seemed most appropriate. We could rename it to routes. We could also consider making a model like the following:

router:
  routes:
    - myRouteA: '/some/conditional'
    - myRouteB: '/some/conditional'

This last approach actually lines up with what I was trying to do with the Java model. I think that extra nesting is not valuable to pipeline authors.

*/
@JsonSerialize(using = PipelineConditionalRoute.PipelineConditionalRouteSerializer.class)
@JsonDeserialize(using = PipelineConditionalRoute.PipelineConditionalRouteDeserializer.class)
public class PipelineConditionalRoute {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PipelineCondiationalRoute seems overly verbose. Are we expecting to have routes which are not part of the pipeliens or conditional based? Can we simplify the naming here?

If we are or do plan on extending (to support non-conditional routes). We should define an interface and have this class be the first implementation.

The java docs does not mention anything about being a conditional route either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not expecting non-conditional routes. But, at some point in the code we will have routes applied to events. I was hoping to distinguish between these routes here and then routes which are added to events at runtime.

I'll try to think about a better name here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed it to ConditionalRoute. I think this may help clear it up from a "route" which has already been applied to an event at runtime.

final List<PluginModel> expectedPreppersPluginModel = validPreppersPluginModel();
final PipelineModel pipelineModel = new PipelineModel(
validSourcePluginModel(),
null,
expectedPreppersPluginModel,
null,
null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious as to why you elected not to include a router here? This test should be eliminated when prepper is deprecated. I want to make sure we are capturing this test long term if it's required.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We plan to remove prepper as you noted, so I'm not too worried about this test. But, we do need to cover the scenario for now (or test coverage is too low).

@@ -64,6 +65,9 @@ else if (preppers != null) {
@JsonInclude(JsonInclude.Include.NON_NULL)
private final PluginModel buffer;

@JsonProperty("router")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have the @JsonInclude(JsonInclude.Include.NON_NULL annotation? It is an optional configuration similar to buffer and processor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this to use NON_EMPTY. This will not serialize an empty list. This makes null and empty list semantically the same and allows Java clients to not have to worry about null.

I create #1669 to make some related changes. In that, I will change processor to also have the same behavior. This should make it easier to code against these Java models (no null lists).


@Override
public PipelineConditionalRoute deserialize(final JsonParser parser, final DeserializationContext context) throws IOException, JacksonException {
final JsonNode node = parser.getCodec().readTree(parser);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the input to readTree be some serialized data ?

}

@Override
public PipelineConditionalRoute deserialize(final JsonParser parser, final DeserializationContext context) throws IOException, JacksonException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is context used here ?

@JsonDeserialize(using = PipelineConditionalRoute.PipelineConditionalRouteDeserializer.class)
public class PipelineConditionalRoute {
private final String name;
private final String condition;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we think about an interface for condition ? This will help us do some validation over condition.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this, but I really think it should be a larger change that covers any Data Prepper condition (or even expression). It thus affects the data-prepper-expression package and would be a fairly significant change.

}

@Override
public void serialize(final PipelineConditionalRoute value, final JsonGenerator gen, final SerializerProvider provider) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan to use the serializer provider ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but this is an overridden method from a Jackson interface.

final JsonNode node = parser.getCodec().readTree(parser);

final Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
final Map.Entry<String, JsonNode> onlyField = fields.next();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do validation for single value entry ?

Comment on lines 98 to 106
final StringBuilder stringBuilder = new StringBuilder();
try (final Reader reader = new BufferedReader(new InputStreamReader(inputStream, Charset.forName(StandardCharsets.UTF_8.name())))) {
int counter = 0;
while ((counter = reader.read()) != -1) {
stringBuilder.append((char) counter);
}
}
return stringBuilder.toString();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java 9:

new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);

…alize router as non-empty, negative test cases.

Signed-off-by: David Venable <[email protected]>
@dlvenable dlvenable merged commit fabbb17 into opensearch-project:main Aug 18, 2022
engechas pushed a commit to engechas/data-prepper that referenced this pull request Sep 12, 2022
…ect#1666)

Added the router to the Data Prepper pipeline model.

Signed-off-by: David Venable <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add the router to the Data Prepper pipeline model
4 participants