Skip to content

Commit

Permalink
Implementation of conditional routing of sinks.
Browse files Browse the repository at this point in the history
Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Sep 29, 2022
1 parent 1aff0e6 commit 0b0981f
Show file tree
Hide file tree
Showing 19 changed files with 1,344 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.parser;

import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/**
* Represents a part of the pipeline through which data flows. This includes
* {@link com.amazon.dataprepper.model.sink.Sink} and {@link com.amazon.dataprepper.model.processor.Processor}.
*
* @param <T> The type of component.
*/
public class DataFlowComponent<T> {
private final T component;
private final Set<String> routes;

DataFlowComponent(final T component, final Collection<String> routes) {
this.component = Objects.requireNonNull(component);
this.routes = new HashSet<>(Objects.requireNonNull(routes));
}

public T getComponent() {
return component;
}

public Set<String> getRoutes() {
return routes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.opensearch.dataprepper.pipeline.Pipeline;
import org.opensearch.dataprepper.pipeline.PipelineConnector;
import org.opensearch.dataprepper.plugins.MultiBufferDecorator;
import org.opensearch.dataprepper.pipeline.router.Router;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,6 +59,7 @@ public class PipelineParser {
private static final String PIPELINE_TYPE = "pipeline";
private static final String ATTRIBUTE_NAME = "name";
private final String pipelineConfigurationFileLocation;
private final RouterFactory routerFactory;
private final DataPrepperConfiguration dataPrepperConfiguration;
private final Map<String, PipelineConnector> sourceConnectorMap = new HashMap<>(); //TODO Remove this and rely only on pipelineMap
private final PluginFactory pluginFactory;
Expand All @@ -65,10 +68,12 @@ public class PipelineParser {
public PipelineParser(final String pipelineConfigurationFileLocation,
final PluginFactory pluginFactory,
final PeerForwarderProvider peerForwarderProvider,
final RouterFactory routerFactory,
final DataPrepperConfiguration dataPrepperConfiguration) {
this.pipelineConfigurationFileLocation = pipelineConfigurationFileLocation;
this.pluginFactory = Objects.requireNonNull(pluginFactory);
this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider);
this.routerFactory = routerFactory;
this.dataPrepperConfiguration = Objects.requireNonNull(dataPrepperConfiguration);
}

Expand Down Expand Up @@ -176,15 +181,17 @@ private void buildPipelineFromConfiguration(
final int readBatchDelay = pipelineConfiguration.getReadBatchDelay();

LOG.info("Building sinks for the pipeline [{}]", pipelineName);
final List<Sink> sinks = pipelineConfiguration.getSinkPluginSettings().stream()
.map(this::buildSinkOrConnector)
final List<DataFlowComponent<Sink>> sinks = pipelineConfiguration.getSinkPluginSettings().stream()
.map(this::buildRoutedSinkOrConnector)
.collect(Collectors.toList());

final List<Buffer> secondaryBuffers = getSecondaryBuffers();
LOG.info("Constructing MultiBufferDecorator with [{}] secondary buffers for pipeline [{}]", secondaryBuffers.size(), pipelineName);
final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(buffer, secondaryBuffers);

final Pipeline pipeline = new Pipeline(pipelineName, source, multiBufferDecorator, decoratedProcessorSets, sinks, processorThreads, readBatchDelay,
final Router router = routerFactory.createRouter(pipelineConfiguration.getRoutes());

final Pipeline pipeline = new Pipeline(pipelineName, source, multiBufferDecorator, decoratedProcessorSets, sinks, router, processorThreads, readBatchDelay,
dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(),
getPeerForwarderDrainTimeout(dataPrepperConfiguration));
pipelineMap.put(pipelineName, pipeline);
Expand Down Expand Up @@ -239,7 +246,13 @@ private Optional<Source> getSourceIfPipelineType(
return Optional.empty();
}

private Sink buildSinkOrConnector(final RoutedPluginSetting pluginSetting) {
private DataFlowComponent<Sink> buildRoutedSinkOrConnector(final RoutedPluginSetting pluginSetting) {
final Sink sink = buildSinkOrConnector(pluginSetting);

return new DataFlowComponent<>(sink, pluginSetting.getRoutes());
}

private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
// TODO: This will return an object which can perform routing.
LOG.info("Building [{}] as sink component", pluginSetting.getName());
final Optional<String> pipelineNameOptional = getPipelineNameIfPipelineType(pluginSetting);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.parser.PipelineParser;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -20,11 +21,13 @@ public PipelineParser pipelineParser(
final DataPrepperArgs dataPrepperArgs,
final PluginFactory pluginFactory,
final PeerForwarderProvider peerForwarderProvider,
final RouterFactory routerFactory,
final DataPrepperConfiguration dataPrepperConfiguration
) {
return new PipelineParser(dataPrepperArgs.getPipelineConfigFileLocation(),
pluginFactory,
peerForwarderProvider,
routerFactory,
dataPrepperConfiguration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.parser.model;

import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand All @@ -13,9 +14,11 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class PipelineConfiguration {
Expand All @@ -28,8 +31,10 @@ public class PipelineConfiguration {
private final PluginSetting bufferPluginSetting;
private final List<PluginSetting> processorPluginSettings;
private final List<RoutedPluginSetting> sinkPluginSettings;

private final Integer workers;
private final Integer readBatchDelay;
private final Set<ConditionalRoute> routes;

public PipelineConfiguration(final PipelineModel pipelineModel) {
this.sourcePluginSetting = getSourceFromPluginModel(pipelineModel.getSource());
Expand All @@ -38,6 +43,7 @@ public PipelineConfiguration(final PipelineModel pipelineModel) {
this.sinkPluginSettings = getSinksFromPluginModel(pipelineModel.getSinks());
this.workers = getWorkersFromPipelineModel(pipelineModel);
this.readBatchDelay = getReadBatchDelayFromPipelineModel(pipelineModel);
routes = new HashSet<>(pipelineModel.getRoutes());
}

public PluginSetting getSourcePluginSetting() {
Expand All @@ -56,6 +62,10 @@ public List<RoutedPluginSetting> getSinkPluginSettings() {
return sinkPluginSettings;
}

public Set<ConditionalRoute> getRoutes() {
return routes;
}

public Integer getWorkers() {
return workers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import com.google.common.base.Preconditions;
import org.opensearch.dataprepper.parser.DataFlowComponent;
import org.opensearch.dataprepper.pipeline.common.PipelineThreadFactory;
import org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor;
import org.opensearch.dataprepper.pipeline.router.Router;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -42,7 +44,8 @@ public class Pipeline {
private final Source source;
private final Buffer buffer;
private final List<List<Processor>> processorSets;
private final List<Sink> sinks;
private final List<DataFlowComponent<Sink>> sinks;
private final Router router;
private final int processorThreads;
private final int readBatchTimeoutInMillis;
private final Duration processorShutdownTimeout;
Expand Down Expand Up @@ -75,7 +78,8 @@ public Pipeline(
@Nonnull final Source source,
@Nonnull final Buffer buffer,
@Nonnull final List<List<Processor>> processorSets,
@Nonnull final List<Sink> sinks,
@Nonnull final List<DataFlowComponent<Sink>> sinks,
@Nonnull final Router router,
final int processorThreads,
final int readBatchTimeoutInMillis,
final Duration processorShutdownTimeout,
Expand All @@ -88,6 +92,7 @@ public Pipeline(
this.buffer = buffer;
this.processorSets = processorSets;
this.sinks = sinks;
this.router = router;
this.processorThreads = processorThreads;
this.readBatchTimeoutInMillis = readBatchTimeoutInMillis;
this.processorShutdownTimeout = processorShutdownTimeout;
Expand Down Expand Up @@ -127,8 +132,11 @@ public Buffer getBuffer() {
/**
* @return {@link Sink} of this pipeline.
*/
// TODO: We can probably remove this
public Collection<Sink> getSinks() {
return this.sinks;
return this.sinks.stream()
.map(DataFlowComponent::getComponent)
.collect(Collectors.toList());
}

public boolean isStopRequested() {
Expand Down Expand Up @@ -202,7 +210,9 @@ public void shutdown() {
shutdownExecutorService(processorExecutorService, processorShutdownTimeout.toMillis());

processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown));
sinks.forEach(Sink::shutdown);
sinks.stream()
.map(DataFlowComponent::getComponent)
.forEach(Sink::shutdown);

shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout.toMillis());
}
Expand Down Expand Up @@ -233,10 +243,9 @@ private void shutdownExecutorService(final ExecutorService executorService, fina
List<Future<Void>> publishToSinks(final Collection<Record> records) {
final int sinksSize = sinks.size();
final List<Future<Void>> sinkFutures = new ArrayList<>(sinksSize);
for (int i = 0; i < sinksSize; i++) {
final int finalI = i;
sinkFutures.add(sinkExecutorService.submit(() -> sinks.get(finalI).output(records), null));
}
router.route(records, sinks, (sink, events) ->
sinkFutures.add(sinkExecutorService.submit(() -> sink.output(events), null))
);
return sinkFutures;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline.router;

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.parser.DataFlowComponent;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

/**
* Package-protected utility to route for a single {@link DataFlowComponent}. This is
* intended to help break apart {@link Router} for better testing.
*/
class DataFlowComponentRouter {

<C> void route(final Collection<Record> allRecords,
final DataFlowComponent<C> dataFlowComponent,
final Map<Record, Set<String>> recordsToRoutes,
final BiConsumer<C, Collection<Record>> componentRecordsConsumer) {

final Collection<Record> recordsForComponent;
if (dataFlowComponent.getRoutes().isEmpty()) {
recordsForComponent = allRecords;
} else {
recordsForComponent = new ArrayList<>();
for (Record event : allRecords) {
final Set<String> routesForEvent = recordsToRoutes
.getOrDefault(event, Collections.emptySet());

boolean routed = false;
for (String route : dataFlowComponent.getRoutes()) {
if (routesForEvent.contains(route)) {
routed = true;
break;
}
}

if (routed) {
recordsForComponent.add(event);
}
}

}
componentRecordsConsumer.accept(dataFlowComponent.getComponent(), recordsForComponent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline.router;

import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class RouteEventEvaluator {

private static final Logger LOG = LoggerFactory.getLogger(RouteEventEvaluator.class);

private final ExpressionEvaluator<Boolean> evaluator;
private final Collection<ConditionalRoute> routes;

RouteEventEvaluator(final ExpressionEvaluator<Boolean> evaluator, final Collection<ConditionalRoute> routes) {
this.evaluator = evaluator;
this.routes = routes;
}

Map<Record, Set<String>> evaluateEventRoutes(final Collection<Record> records) {
final Map<Record, Set<String>> recordsToRoutes = new HashMap<>();

int nonEventRecords = 0;

for (Record record : records) {

final Object data = record.getData();

if(data instanceof Event) {

final Event event = (Event) data;

recordsToRoutes.put(record, new HashSet<>());

for (ConditionalRoute route : routes) {
Boolean routed;
try {
routed = evaluator.evaluate(route.getCondition(), event);
} catch (final Exception ex) {
routed = false;
LOG.error("Failed to evaluate route. This route will not be applied to any events.", ex);
}
if (routed) {
recordsToRoutes
.get(record)
.add(route.getName());
}
}
} else {
nonEventRecords++;
recordsToRoutes.put(record, Collections.emptySet());
}
}

if(nonEventRecords > 0) {
LOG.warn("Received {} records which are not events. These will have no routes applied.", nonEventRecords);
}

return recordsToRoutes;
}
}
Loading

0 comments on commit 0b0981f

Please sign in to comment.