Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of conditional routing of sinks #1832

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.parser;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/**
 * Represents a part of the pipeline through which data flows. This includes
 * {@link org.opensearch.dataprepper.model.sink.Sink} and {@link org.opensearch.dataprepper.model.processor.Processor}.
 * <p>
 * Each instance pairs a component with the names of the routes assigned to it. The route
 * names are copied when the instance is created and are exposed as a read-only set, so the
 * routing of a component cannot be altered after construction.
 *
 * @param <T> The type of component.
 */
public class DataFlowComponent<T> {
    private final T component;
    private final Set<String> routes;

    /**
     * @param component the wrapped component; must not be null
     * @param routes    the route names assigned to the component; must not be null, may be empty
     * @throws NullPointerException if either argument is null
     */
    DataFlowComponent(final T component, final Collection<String> routes) {
        this.component = Objects.requireNonNull(component, "component must not be null");
        // Copy first so later changes to the caller's collection are not observed,
        // then wrap so callers of getRoutes() cannot mutate the internal state.
        this.routes = Collections.unmodifiableSet(
                new HashSet<>(Objects.requireNonNull(routes, "routes must not be null")));
    }

    /**
     * @return the wrapped component; never null
     */
    public T getComponent() {
        return component;
    }

    /**
     * @return an unmodifiable set of the route names assigned to this component; never null
     */
    public Set<String> getRoutes() {
        return routes;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.opensearch.dataprepper.pipeline.Pipeline;
import org.opensearch.dataprepper.pipeline.PipelineConnector;
import org.opensearch.dataprepper.plugins.MultiBufferDecorator;
import org.opensearch.dataprepper.pipeline.router.Router;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,6 +59,7 @@ public class PipelineParser {
private static final String PIPELINE_TYPE = "pipeline";
private static final String ATTRIBUTE_NAME = "name";
private final String pipelineConfigurationFileLocation;
private final RouterFactory routerFactory;
private final DataPrepperConfiguration dataPrepperConfiguration;
private final Map<String, PipelineConnector> sourceConnectorMap = new HashMap<>(); //TODO Remove this and rely only on pipelineMap
private final PluginFactory pluginFactory;
Expand All @@ -65,10 +68,12 @@ public class PipelineParser {
/**
 * Creates a parser for the pipeline configuration at the given location.
 *
 * @param pipelineConfigurationFileLocation location of the pipeline configuration (stored as given, not validated here)
 * @param pluginFactory                     factory used to build plugins; must not be null
 * @param peerForwarderProvider             peer-forwarder provider; must not be null
 * @param routerFactory                     factory used to create the router of each pipeline; must not be null
 * @param dataPrepperConfiguration          Data Prepper configuration; must not be null
 * @throws NullPointerException if any of the required collaborators is null
 */
public PipelineParser(final String pipelineConfigurationFileLocation,
                      final PluginFactory pluginFactory,
                      final PeerForwarderProvider peerForwarderProvider,
                      final RouterFactory routerFactory,
                      final DataPrepperConfiguration dataPrepperConfiguration) {
    this.pipelineConfigurationFileLocation = pipelineConfigurationFileLocation;
    this.pluginFactory = Objects.requireNonNull(pluginFactory);
    this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider);
    // The router factory is dereferenced unconditionally when a pipeline is built,
    // so fail fast here like the other required collaborators.
    this.routerFactory = Objects.requireNonNull(routerFactory);
    this.dataPrepperConfiguration = Objects.requireNonNull(dataPrepperConfiguration);
}

Expand Down Expand Up @@ -176,15 +181,17 @@ private void buildPipelineFromConfiguration(
final int readBatchDelay = pipelineConfiguration.getReadBatchDelay();

LOG.info("Building sinks for the pipeline [{}]", pipelineName);
final List<Sink> sinks = pipelineConfiguration.getSinkPluginSettings().stream()
.map(this::buildSinkOrConnector)
final List<DataFlowComponent<Sink>> sinks = pipelineConfiguration.getSinkPluginSettings().stream()
.map(this::buildRoutedSinkOrConnector)
.collect(Collectors.toList());

final List<Buffer> secondaryBuffers = getSecondaryBuffers();
LOG.info("Constructing MultiBufferDecorator with [{}] secondary buffers for pipeline [{}]", secondaryBuffers.size(), pipelineName);
final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(buffer, secondaryBuffers);

final Pipeline pipeline = new Pipeline(pipelineName, source, multiBufferDecorator, decoratedProcessorSets, sinks, processorThreads, readBatchDelay,
final Router router = routerFactory.createRouter(pipelineConfiguration.getRoutes());

final Pipeline pipeline = new Pipeline(pipelineName, source, multiBufferDecorator, decoratedProcessorSets, sinks, router, processorThreads, readBatchDelay,
dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(),
getPeerForwarderDrainTimeout(dataPrepperConfiguration));
pipelineMap.put(pipelineName, pipeline);
Expand Down Expand Up @@ -239,8 +246,13 @@ private Optional<Source> getSourceIfPipelineType(
return Optional.empty();
}

private Sink buildSinkOrConnector(final RoutedPluginSetting pluginSetting) {
// TODO: This will return an object which can perform routing.
/**
 * Builds the sink (or pipeline connector) described by the given setting and pairs it
 * with the routes declared on that setting.
 *
 * @param pluginSetting the routed sink setting
 * @return the built sink wrapped together with its routes
 */
private DataFlowComponent<Sink> buildRoutedSinkOrConnector(final RoutedPluginSetting pluginSetting) {
    return new DataFlowComponent<>(buildSinkOrConnector(pluginSetting), pluginSetting.getRoutes());
}

private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
LOG.info("Building [{}] as sink component", pluginSetting.getName());
final Optional<String> pipelineNameOptional = getPipelineNameIfPipelineType(pluginSetting);
if (pipelineNameOptional.isPresent()) { //update to ifPresentOrElse when using JDK9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.parser.PipelineParser;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -20,11 +21,13 @@ public PipelineParser pipelineParser(
final DataPrepperArgs dataPrepperArgs,
final PluginFactory pluginFactory,
final PeerForwarderProvider peerForwarderProvider,
final RouterFactory routerFactory,
final DataPrepperConfiguration dataPrepperConfiguration
) {
return new PipelineParser(dataPrepperArgs.getPipelineConfigFileLocation(),
pluginFactory,
peerForwarderProvider,
routerFactory,
dataPrepperConfiguration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.parser.model;

import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand All @@ -13,9 +14,11 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class PipelineConfiguration {
Expand All @@ -28,8 +31,10 @@ public class PipelineConfiguration {
private final PluginSetting bufferPluginSetting;
private final List<PluginSetting> processorPluginSettings;
private final List<RoutedPluginSetting> sinkPluginSettings;

private final Integer workers;
private final Integer readBatchDelay;
private final Set<ConditionalRoute> routes;

public PipelineConfiguration(final PipelineModel pipelineModel) {
this.sourcePluginSetting = getSourceFromPluginModel(pipelineModel.getSource());
Expand All @@ -38,6 +43,7 @@ public PipelineConfiguration(final PipelineModel pipelineModel) {
this.sinkPluginSettings = getSinksFromPluginModel(pipelineModel.getSinks());
this.workers = getWorkersFromPipelineModel(pipelineModel);
this.readBatchDelay = getReadBatchDelayFromPipelineModel(pipelineModel);
routes = new HashSet<>(pipelineModel.getRoutes());
}

public PluginSetting getSourcePluginSetting() {
Expand All @@ -56,6 +62,10 @@ public List<RoutedPluginSetting> getSinkPluginSettings() {
return sinkPluginSettings;
}

/**
 * Returns the conditional routes that were copied from the pipeline model at construction.
 *
 * @return an unmodifiable view of the routes, so callers cannot alter this configuration
 */
public Set<ConditionalRoute> getRoutes() {
    return Collections.unmodifiableSet(routes);
}

/**
 * Returns the workers value that was resolved from the pipeline model when this
 * configuration was constructed.
 *
 * @return the workers value
 */
public Integer getWorkers() {
return workers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import com.google.common.base.Preconditions;
import org.opensearch.dataprepper.parser.DataFlowComponent;
import org.opensearch.dataprepper.pipeline.common.PipelineThreadFactory;
import org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor;
import org.opensearch.dataprepper.pipeline.router.Router;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -42,7 +44,8 @@ public class Pipeline {
private final Source source;
private final Buffer buffer;
private final List<List<Processor>> processorSets;
private final List<Sink> sinks;
private final List<DataFlowComponent<Sink>> sinks;
private final Router router;
private final int processorThreads;
private final int readBatchTimeoutInMillis;
private final Duration processorShutdownTimeout;
Expand Down Expand Up @@ -75,7 +78,8 @@ public Pipeline(
@Nonnull final Source source,
@Nonnull final Buffer buffer,
@Nonnull final List<List<Processor>> processorSets,
@Nonnull final List<Sink> sinks,
@Nonnull final List<DataFlowComponent<Sink>> sinks,
@Nonnull final Router router,
final int processorThreads,
final int readBatchTimeoutInMillis,
final Duration processorShutdownTimeout,
Expand All @@ -88,6 +92,7 @@ public Pipeline(
this.buffer = buffer;
this.processorSets = processorSets;
this.sinks = sinks;
this.router = router;
this.processorThreads = processorThreads;
this.readBatchTimeoutInMillis = readBatchTimeoutInMillis;
this.processorShutdownTimeout = processorShutdownTimeout;
Expand Down Expand Up @@ -127,8 +132,11 @@ public Buffer getBuffer() {
/**
 * Returns the sinks of this pipeline without their routing information.
 *
 * @return a newly collected list holding the {@link Sink} of every configured
 * {@link DataFlowComponent}, in the order the components are held by this pipeline.
 */
// TODO: We can probably remove this
public Collection<Sink> getSinks() {
    // The sinks are stored wrapped in DataFlowComponent, so they have to be unwrapped;
    // returning the field directly would not match the declared return type.
    return this.sinks.stream()
            .map(DataFlowComponent::getComponent)
            .collect(Collectors.toList());
}

public boolean isStopRequested() {
Expand Down Expand Up @@ -202,7 +210,9 @@ public void shutdown() {
shutdownExecutorService(processorExecutorService, processorShutdownTimeout.toMillis());

processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown));
sinks.forEach(Sink::shutdown);
sinks.stream()
.map(DataFlowComponent::getComponent)
.forEach(Sink::shutdown);

shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout.toMillis());
}
Expand Down Expand Up @@ -233,10 +243,9 @@ private void shutdownExecutorService(final ExecutorService executorService, fina
List<Future<Void>> publishToSinks(final Collection<Record> records) {
    /*
     * Hands the records to the router together with the routed sinks. For every
     * (sink, events) pair the router passes back, one output task is submitted to the
     * sink executor and its future is collected. Dispatch goes through the router only:
     * submitting each sink directly as well would deliver the records a second time.
     */
    final int sinksSize = sinks.size();
    final List<Future<Void>> sinkFutures = new ArrayList<>(sinksSize);
    router.route(records, sinks, (sink, events) ->
            sinkFutures.add(sinkExecutorService.submit(() -> sink.output(events), null))
    );
    return sinkFutures;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline.router;

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.parser.DataFlowComponent;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

/**
 * Package-protected helper that performs routing for one {@link DataFlowComponent}.
 * It exists so that {@link Router} can be broken into smaller, separately testable parts.
 */
class DataFlowComponentRouter {

    /**
     * Selects the records destined for the given component and hands them to the consumer.
     * A component without routes receives the supplied collection unchanged; otherwise it
     * receives a new list of the records that matched at least one of the component's routes.
     *
     * @param allRecords               every record being routed
     * @param dataFlowComponent        the component and its routes
     * @param recordsToRoutes          the matched route names per record; a missing entry counts as no routes
     * @param componentRecordsConsumer receives the component and its selected records, exactly once
     * @param <C>                      the component type
     */
    <C> void route(final Collection<Record> allRecords,
                   final DataFlowComponent<C> dataFlowComponent,
                   final Map<Record, Set<String>> recordsToRoutes,
                   final BiConsumer<C, Collection<Record>> componentRecordsConsumer) {

        final Set<String> acceptedRoutes = dataFlowComponent.getRoutes();

        final Collection<Record> selected = acceptedRoutes.isEmpty()
                ? allRecords
                : selectRoutedRecords(allRecords, acceptedRoutes, recordsToRoutes);

        componentRecordsConsumer.accept(dataFlowComponent.getComponent(), selected);
    }

    /** Collects, in iteration order, the records having at least one route in {@code acceptedRoutes}. */
    private Collection<Record> selectRoutedRecords(final Collection<Record> allRecords,
                                                   final Set<String> acceptedRoutes,
                                                   final Map<Record, Set<String>> recordsToRoutes) {
        final Collection<Record> selected = new ArrayList<>();
        for (final Record candidate : allRecords) {
            final Set<String> candidateRoutes = recordsToRoutes
                    .getOrDefault(candidate, Collections.emptySet());
            if (sharesAnyRoute(candidateRoutes, acceptedRoutes)) {
                selected.add(candidate);
            }
        }
        return selected;
    }

    /** Tells whether any name in {@code candidateRoutes} is contained in {@code acceptedRoutes}. */
    private static boolean sharesAnyRoute(final Set<String> candidateRoutes, final Set<String> acceptedRoutes) {
        for (final String routeName : candidateRoutes) {
            if (acceptedRoutes.contains(routeName)) {
                return true;
            }
        }
        return false;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline.router;

import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Evaluates the configured conditional routes against records and reports, per record,
 * the names of the routes whose condition evaluated to true.
 */
class RouteEventEvaluator {

    private static final Logger LOG = LoggerFactory.getLogger(RouteEventEvaluator.class);

    private final ExpressionEvaluator<Boolean> evaluator;
    private final Collection<ConditionalRoute> routes;

    /**
     * @param evaluator evaluates a route condition against an event
     * @param routes    the conditional routes to test for every event
     */
    RouteEventEvaluator(final ExpressionEvaluator<Boolean> evaluator, final Collection<ConditionalRoute> routes) {
        this.evaluator = evaluator;
        this.routes = routes;
    }

    /**
     * Builds a map from each record to the set of route names it matched. Records whose
     * data is not an {@link Event} are mapped to an empty set; a single warning with their
     * count is logged after all records have been processed.
     *
     * @param records the records to evaluate
     * @return the matched route names keyed by record
     */
    Map<Record, Set<String>> evaluateEventRoutes(final Collection<Record> records) {
        final Map<Record, Set<String>> routesByRecord = new HashMap<>();
        int skippedCount = 0;

        for (final Record current : records) {
            final Object payload = current.getData();

            if (!(payload instanceof Event)) {
                skippedCount++;
                routesByRecord.put(current, Collections.emptySet());
                continue;
            }

            routesByRecord.put(current, findMatchedRoutes((Event) payload));
        }

        if (skippedCount > 0) {
            LOG.warn("Received {} records which are not events. These will have no routes applied.", skippedCount);
        }

        return routesByRecord;
    }

    /**
     * Tests every configured route against the event. A route whose evaluation throws is
     * logged and treated as not matched for this event.
     */
    private Set<String> findMatchedRoutes(final Event event) {
        final Set<String> matched = new HashSet<>();
        for (final ConditionalRoute candidate : routes) {
            try {
                final boolean applies = evaluator.evaluate(candidate.getCondition(), event);
                if (applies) {
                    matched.add(candidate.getName());
                }
            } catch (final Exception ex) {
                LOG.error("Failed to evaluate route. This route will not be applied to any events.", ex);
            }
        }
        return matched;
    }
}
Loading