Implementation of conditional routing of sinks #1832

Merged

Conversation

dlvenable
Member

@dlvenable dlvenable commented Sep 29, 2022

Description

This PR implements conditional routing.

I've rebased this PR to include #1840, and it now works with multiple processor threads. Before the rebase, it only worked for single-threaded pipelines due to #1189.

Issues Resolved

Resolves #1337

Testing Information

Sample Pipeline:

simple-test-pipeline:
  workers: 1
  delay: "5000"
  source:
    file:
      path: /usr/share/data-prepper-file-source/test-in.txt
      record_type: event
      format: json
  route:
    - alpha: '/value == "a"'
    - beta: '/value == "b"'
    - sigma: '/value == "s"'
  sink:
    - file:
        path: /usr/share/data-prepper-file-source/out-alpha.txt
        routes:
          - alpha
    - file:
        path: /usr/share/data-prepper-file-source/out-beta.txt
        routes:
          - beta
    - file:
        path: /usr/share/data-prepper-file-source/out-should-be-empty.txt
        routes:
          - sigma
    - file:
        path: /usr/share/data-prepper-file-source/out-alpha-beta.txt
        routes:
          - alpha
          - beta
    - file:
        path: /usr/share/data-prepper-file-source/out-all.txt

File Input Source:

cat file-source/test-in.txt
{"value":"a", "other":"x", "sequence" : 1}
{"value":"a", "other":"x", "sequence" : 2}
{"value":"b", "sequence" : 3}
{"value":"b", "sequence" : 4}
{"value":"c", "sequence" : 5}
{"value":"d", "sequence" : 6}
{"value":"b", "sequence" : 7}
{"value":"a", "sequence" : 8}

Run with Docker:

docker run --name data-prepper -p 4900:4900 \
  -v ${PWD}/pipelines.yaml:/usr/share/data-prepper/pipelines/pipelines.yaml \
  -v ${PWD}/file-source:/usr/share/data-prepper-file-source/ \
  opensearch-data-prepper:2.0.0-SNAPSHOT

Results:

tail -n +1 file-source/out-*
==> file-source/out-all.txt <==
{"value":"a","other":"x","sequence":1}
{"value":"a","other":"x","sequence":2}
{"value":"b","sequence":3}
{"value":"b","sequence":4}
{"value":"c","sequence":5}
{"value":"d","sequence":6}
{"value":"b","sequence":7}
{"value":"a","sequence":8}

==> file-source/out-alpha-beta.txt <==
{"value":"a","other":"x","sequence":1}
{"value":"a","other":"x","sequence":2}
{"value":"b","sequence":3}
{"value":"b","sequence":4}
{"value":"b","sequence":7}
{"value":"a","sequence":8}

==> file-source/out-alpha.txt <==
{"value":"a","other":"x","sequence":1}
{"value":"a","other":"x","sequence":2}
{"value":"a","sequence":8}

==> file-source/out-beta.txt <==
{"value":"b","sequence":3}
{"value":"b","sequence":4}
{"value":"b","sequence":7}

==> file-source/out-should-be-empty.txt <==
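
The results above suggest two rules: a sink with no `routes` receives every event, and a sink with `routes` receives an event when at least one of its routes matches. The following is a minimal, self-contained sketch of those rules, not the Data Prepper implementation. The class name, the method `eventsForSink`, and the use of plain predicates in place of the route expressions are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration only; none of these names exist in Data Prepper.
public class RoutingSemanticsSketch {

    // Returns the values a sink would receive. A sink with no routes receives
    // everything; otherwise a value must match at least one of the sink's routes.
    // Assumes every name in sinkRoutes is defined in the routes map.
    static List<String> eventsForSink(final List<String> values,
                                      final Map<String, Predicate<String>> routes,
                                      final Set<String> sinkRoutes) {
        final List<String> received = new ArrayList<>();
        for (final String value : values) {
            final boolean accepted = sinkRoutes.isEmpty()
                    || sinkRoutes.stream().anyMatch(name -> routes.get(name).test(value));
            if (accepted) {
                received.add(value);
            }
        }
        return received;
    }

    public static void main(final String[] args) {
        // Stand-ins for the expressions '/value == "a"', '/value == "b"', '/value == "s"'.
        final Map<String, Predicate<String>> routes = new LinkedHashMap<>();
        routes.put("alpha", "a"::equals);
        routes.put("beta", "b"::equals);
        routes.put("sigma", "s"::equals);

        // The /value field of each line in test-in.txt, in order.
        final List<String> values = List.of("a", "a", "b", "b", "c", "d", "b", "a");

        System.out.println("out-alpha:           " + eventsForSink(values, routes, Set.of("alpha")));
        System.out.println("out-beta:            " + eventsForSink(values, routes, Set.of("beta")));
        System.out.println("out-should-be-empty: " + eventsForSink(values, routes, Set.of("sigma")));
        System.out.println("out-alpha-beta:      " + eventsForSink(values, routes, Set.of("alpha", "beta")));
        System.out.println("out-all:             " + eventsForSink(values, routes, Set.of()));
    }
}
```

Each printed list corresponds to the sequence of `/value` fields in the matching output file above.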

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dlvenable dlvenable requested a review from a team as a code owner September 29, 2022 02:25
@dlvenable dlvenable force-pushed the conditional-routing-impl branch from 8756767 to a878b30 Compare September 29, 2022 15:18
@dlvenable dlvenable force-pushed the conditional-routing-impl branch from af8a123 to fac39ba Compare September 29, 2022 23:41
Collaborator

@oeyh oeyh left a comment

Works great! I was also able to run a small test with routes set on a pipeline connector.
A few small things below:

return new DataFlowComponent<>(sink, pluginSetting.getRoutes());
}

private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
// TODO: This will return an object which can perform routing.
Collaborator

Does this TODO item refer to the method defined around Line 249 and thus can be resolved?

Member Author

Yes, that is correct. Thanks for noting this; I will remove it.

}

@Test
void getComponent_returns_input_routes() {
Collaborator

nit:

Suggested change
void getComponent_returns_input_routes() {
void getRoutes_returns_input_routes() {

}

@Test
void getComponent_returns_input_routes_when_empty() {
Collaborator

Same here:

Suggested change
void getComponent_returns_input_routes_when_empty() {
void getRoutes_returns_input_routes_when_empty() {

void parseConfiguration_with_routes_creates_correct_pipeline() {
mockDataPrepperConfigurationAccesses();
final PipelineParser pipelineParser =
createObjectUnderTest("src/test/resources/valid_multiple_sinks.yml");
Collaborator

@oeyh oeyh Oct 4, 2022

Pipeline filename should be valid_multiple_sinks_with_routes.yml.

…cted test names, correct test file.

Signed-off-by: David Venable <[email protected]>
oeyh
oeyh previously approved these changes Oct 4, 2022
Comment on lines 43 to 66
if (data instanceof Event) {
    final Event event = (Event) data;

    recordsToRoutes.put(record, new HashSet<>());

    for (ConditionalRoute route : routes) {
        Boolean routed;
        try {
            routed = evaluator.evaluate(route.getCondition(), event);
        } catch (final Exception ex) {
            routed = false;
            LOG.error("Failed to evaluate route. This route will not be applied to any events.", ex);
        }
        if (routed) {
            recordsToRoutes
                    .get(record)
                    .add(route.getName());
        }
    }
} else {
    nonEventRecords++;
    recordsToRoutes.put(record, Collections.emptySet());
}
Contributor

This can be simplified by extracting the route matching into its own method. That also eliminates the variable reassignment and reads better:

    if (data instanceof Event) {
        final Event event = (Event) data;
        final Set<String> matchedRoutes = findMatchedRoutes(event);
        recordsToRoutes.put(record, matchedRoutes);
    } else {
        nonEventRecords++;
        recordsToRoutes.put(record, Collections.emptySet());
    }

...

private Set<String> findMatchedRoutes(final Event event) {
    final Set<String> matchRoutes = new HashSet<>();
    for (final ConditionalRoute route : routes) {
        try {
            if (evaluator.evaluate(route.getCondition(), event)) {
                matchRoutes.add(route.getName());
            }
        } catch (final Exception ex) {
            // A route whose condition fails to evaluate is treated as not matched.
            LOG.error("Failed to evaluate route. This route will not be applied to any events.", ex);
        }
    }
    return matchRoutes;
}
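
To make the error-handling behavior of the suggested helper concrete, here is a small runnable sketch of the same idea. It is only an approximation under stated assumptions: routes are modeled as a map from route name to condition string, the event is a plain `Map`, and the evaluator is a `BiPredicate` standing in for the real expression evaluator. None of these stand-ins are the types used in the PR.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

// Hypothetical illustration only; the real code uses ConditionalRoute, Event and an expression evaluator.
public class FindMatchedRoutesSketch {
    // Assumed stand-in: route name -> condition string.
    private final Map<String, String> routes;
    // Assumed stand-in for the evaluator: (condition, event) -> whether the event matches.
    private final BiPredicate<String, Map<String, Object>> evaluator;

    FindMatchedRoutesSketch(final Map<String, String> routes,
                            final BiPredicate<String, Map<String, Object>> evaluator) {
        this.routes = routes;
        this.evaluator = evaluator;
    }

    // Collects the names of all routes whose condition matches the event.
    // A condition that throws is logged and skipped; other routes are still evaluated.
    Set<String> findMatchedRoutes(final Map<String, Object> event) {
        final Set<String> matchedRoutes = new HashSet<>();
        for (final Map.Entry<String, String> route : routes.entrySet()) {
            try {
                if (evaluator.test(route.getValue(), event)) {
                    matchedRoutes.add(route.getKey());
                }
            } catch (final Exception ex) {
                System.err.println("Failed to evaluate route " + route.getKey() + ": " + ex.getMessage());
            }
        }
        return matchedRoutes;
    }

    public static void main(final String[] args) {
        final Map<String, String> routes = new LinkedHashMap<>();
        routes.put("alpha", "a");
        routes.put("broken", "!");
        routes.put("beta", "b");

        // Toy evaluator: the condition is the expected value of the "value" field,
        // and "!" simulates an expression that cannot be evaluated.
        final FindMatchedRoutesSketch sketch = new FindMatchedRoutesSketch(routes, (condition, event) -> {
            if ("!".equals(condition)) {
                throw new IllegalArgumentException("cannot parse condition");
            }
            return condition.equals(event.get("value"));
        });

        final Map<String, Object> event = Map.of("value", "b");
        // The broken route is skipped, and the later "beta" route is still checked.
        System.out.println(sketch.findMatchedRoutes(event));
    }
}
```

Because the `try` block sits inside the loop, one failing condition does not prevent the remaining routes from being evaluated for the same event.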

Comment on lines 35 to 48
final Set<String> routesForEvent = recordsToRoutes
        .getOrDefault(event, Collections.emptySet());

boolean routed = false;
for (String route : dataFlowComponent.getRoutes()) {
    if (routesForEvent.contains(route)) {
        routed = true;
        break;
    }
}

if (routed) {
    recordsForComponent.add(event);
}
Contributor

This can be simplified to eliminate the break. This should still short circuit and not loop through every item if a match occurs.

final Set<String> routesForEvent = recordsToRoutes
    .getOrDefault(event, Collections.emptySet());

final Set<String> dataFlowComponentRoutes = dataFlowComponent.getRoutes();

if (routesForEvent.stream().anyMatch(dataFlowComponentRoutes::contains)) {
    recordsForComponent.add(event);
}
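
Both versions answer the same question: do the event's matched routes and the component's routes share at least one name? The sketch below compares the loop-with-break form, the `anyMatch` form, and a third option not mentioned in the PR, `Collections.disjoint` from the standard library. The class and method names are hypothetical and exist only for this comparison.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical illustration only; these helpers are not part of the PR.
public class RouteMatchSketch {

    // Original form: iterate over the component's routes and stop at the first hit.
    static boolean matchesWithLoop(final Set<String> routesForEvent, final Set<String> componentRoutes) {
        for (final String route : componentRoutes) {
            if (routesForEvent.contains(route)) {
                return true;
            }
        }
        return false;
    }

    // Suggested form: anyMatch is a short-circuiting terminal operation.
    static boolean matchesWithStream(final Set<String> routesForEvent, final Set<String> componentRoutes) {
        return routesForEvent.stream().anyMatch(componentRoutes::contains);
    }

    // Alternative (an assumption, not from the PR): the sets overlap exactly when they are not disjoint.
    static boolean matchesWithDisjoint(final Set<String> routesForEvent, final Set<String> componentRoutes) {
        return !Collections.disjoint(routesForEvent, componentRoutes);
    }

    public static void main(final String[] args) {
        final Set<String> routesForEvent = Set.of("alpha");
        final Set<String> alphaBetaSink = Set.of("alpha", "beta");
        final Set<String> sigmaSink = Set.of("sigma");

        System.out.println(matchesWithLoop(routesForEvent, alphaBetaSink));     // true
        System.out.println(matchesWithStream(routesForEvent, alphaBetaSink));   // true
        System.out.println(matchesWithDisjoint(routesForEvent, alphaBetaSink)); // true
        System.out.println(matchesWithStream(routesForEvent, sigmaSink));       // false
    }
}
```

Note that the loop iterates over the component's routes while the stream iterates over the event's routes; the result is the same because set intersection is symmetric.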

@dlvenable dlvenable merged commit 42b732f into opensearch-project:main Oct 5, 2022
@dlvenable dlvenable deleted the conditional-routing-impl branch October 6, 2022 22:44
Successfully merging this pull request may close these issues.

Implement Conditional Routing for Sinks
4 participants