Skip to content

Commit

Permalink
Updated the PipelineModel to return a SinkModel for sinks. This inclu…
Browse files Browse the repository at this point in the history
…des fields for conditional routing.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Sep 24, 2022
1 parent 715c7df commit d497df2
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class PipelineModel {
private List<ConditionalRoute> routes;

@JsonProperty("sink")
private final List<PluginModel> sinks;
private final List<SinkModel> sinks;

@JsonProperty("workers")
@JsonInclude(JsonInclude.Include.NON_NULL)
Expand All @@ -65,7 +65,7 @@ public PipelineModel(
@JsonProperty("buffer") final PluginModel buffer,
@JsonProperty("processor") final List<PluginModel> processors,
@JsonProperty("router") final List<ConditionalRoute> routes,
@JsonProperty("sink") final List<PluginModel> sinks,
@JsonProperty("sink") final List<SinkModel> sinks,
@JsonProperty("workers") final Integer workers,
@JsonProperty("delay") final Integer delay) {
checkArgument(Objects.nonNull(source), "Source must not be null");
Expand Down Expand Up @@ -96,7 +96,7 @@ public List<ConditionalRoute> getRoutes() {
return routes;
}

public List<PluginModel> getSinks() {
public List<SinkModel> getSinks() {
return sinks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -45,6 +46,25 @@ public Collection<String> getRoutes() {
return this.<SinkInternalJsonModel>getInternalJsonModel().routes;
}

public static class SinkModelBuilder {

private final PluginModel pluginModel;
private final List<String> routes;

private SinkModelBuilder(final PluginModel pluginModel) {
this.pluginModel = pluginModel;
this.routes = Collections.emptyList();
}

public SinkModel build() {
return new SinkModel(pluginModel.getPluginName(), routes, pluginModel.getPluginSettings());
}
}

public static SinkModelBuilder builder(final PluginModel pluginModel) {
return new SinkModelBuilder(pluginModel);
}

private static class SinkInternalJsonModel extends InternalJsonModel {
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("routes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void testPipelineModelCreation() {
final PluginModel originalSource = pipelineModel.getSource();
final PluginModel originalBuffer = pipelineModel.getBuffer();
final List<PluginModel> originalPreppers = pipelineModel.getProcessors();
final List<PluginModel> originalSinks = pipelineModel.getSinks();
final List<SinkModel> originalSinks = pipelineModel.getSinks();

assertThat(originalSource, notNullValue());
assertThat(originalBuffer, notNullValue());
Expand Down Expand Up @@ -87,8 +87,8 @@ private static List<ConditionalRoute> validPipelineRouter() {
return Collections.singletonList(new ConditionalRoute("my-route", "/a==b"));
}

static List<PluginModel> validSinksPluginModel() {
return Collections.singletonList(new PluginModel("sink", validPluginSettings()));
static List<SinkModel> validSinksPluginModel() {
return Collections.singletonList(SinkModel.builder(new PluginModel("sink", validPluginSettings())).build());
}

@Test
Expand Down Expand Up @@ -172,7 +172,7 @@ void testPipelineModelCreation_with_null_router_creates_model_with_empty_router(
final PluginModel originalSource = pipelineModel.getSource();
final PluginModel originalBuffer = pipelineModel.getBuffer();
final List<PluginModel> originalPreppers = pipelineModel.getProcessors();
final List<PluginModel> originalSinks = pipelineModel.getSinks();
final List<SinkModel> originalSinks = pipelineModel.getSinks();

assertThat(originalSource, notNullValue());
assertThat(originalBuffer, notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.jupiter.api.Assertions.assertAll;

class PipelinesDataFlowModelTest {

Expand All @@ -43,7 +45,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<PluginModel> sinks = Collections.singletonList(new PluginModel("testSink", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
Expand All @@ -64,7 +66,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> preppers = Collections.singletonList(new PluginModel("testPrepper", (Map<String, Object>) null));
final List<PluginModel> sinks = Collections.singletonList(new PluginModel("testSink", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null));
final PipelineModel pipelineModel = new PipelineModel(source, null, preppers, Collections.singletonList(new ConditionalRoute("my-route", "/a==b")), sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
Expand Down Expand Up @@ -144,6 +146,11 @@ void deserialize_PipelinesDataFlowModel_with_route() throws IOException {
assertThat(pipelineModel.getSinks().size(), equalTo(1));
assertThat(pipelineModel.getSinks().get(0), notNullValue());
assertThat(pipelineModel.getSinks().get(0).getPluginName(), equalTo("testSink"));
assertThat(pipelineModel.getSinks().get(0).getRoutes(), notNullValue());
assertAll(
() -> assertThat(pipelineModel.getSinks().get(0).getRoutes().size(), equalTo(1)),
() -> assertThat(pipelineModel.getSinks().get(0).getRoutes(), hasItem("my-route"))
);

assertThat(pipelineModel.getRoutes(), notNullValue());
assertThat(pipelineModel.getRoutes().size(), equalTo(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -19,13 +20,17 @@
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class SinkModelTest {
private ObjectMapper objectMapper;
Expand Down Expand Up @@ -127,6 +132,33 @@ void serialize_with_just_pluginModel() throws IOException {
assertThat("---\n" + actualJson, equalTo(expectedJson));
}

@Nested
class BuilderTest {
private PluginModel pluginModel;
private String pluginName;
private Map<String, Object> pluginSettings;

@BeforeEach
void setUp() {
pluginName = UUID.randomUUID().toString();
pluginSettings = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
pluginModel = mock(PluginModel.class);
when(pluginModel.getPluginName()).thenReturn(pluginName);
when(pluginModel.getPluginSettings()).thenReturn(pluginSettings);
}

@Test
void build_with_only_PluginModel_should_return_expected_SinkModel() {
final SinkModel actualSinkModel = SinkModel.builder(pluginModel).build();

assertThat(actualSinkModel, notNullValue());
assertThat(actualSinkModel.getPluginName(), equalTo(pluginName));
assertThat(actualSinkModel.getPluginSettings(), equalTo(pluginSettings));
assertThat(actualSinkModel.getRoutes(), notNullValue());
assertThat(actualSinkModel.getRoutes(), empty());
}
}

private static String createStringFromInputStream(final InputStream inputStream) throws IOException {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ test-pipeline:
processor:
- testPrepper: null
sink:
- testSink: null
- testSink:
routes:
- "my-route"
workers: 8
delay: 50
route:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.amazon.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.parser.model.RoutedPluginSetting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,7 +68,7 @@ private static void visitAndValidate(
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipeline);
touchedPipelineSet.add(pipeline);
//if validation is successful, then there is definitely sink
final List<PluginSetting> connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings();
final List<RoutedPluginSetting> connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings();
//Recursively check connected pipelines
for (PluginSetting pluginSetting : connectedPipelinesSettings) {
//Further process only if the sink is of pipeline type
Expand Down Expand Up @@ -144,7 +145,7 @@ private static void validateForOrphans(
throw new RuntimeException("Invalid configuration, cannot proceed with ambiguous configuration");
}
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(currentPipelineName);
final List<PluginSetting> pluginSettings = pipelineConfiguration.getSinkPluginSettings();
final List<RoutedPluginSetting> pluginSettings = pipelineConfiguration.getSinkPluginSettings();
for (PluginSetting pluginSetting : pluginSettings) {
if (PIPELINE_TYPE.equals(pluginSetting.getName()) &&
pluginSetting.getAttributeFromSettings(PIPELINE_ATTRIBUTE_NAME) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.parser.model.RoutedPluginSetting;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.peerforwarder.PeerForwardingProcessorDecorator;
import org.opensearch.dataprepper.pipeline.Pipeline;
Expand Down Expand Up @@ -233,7 +234,8 @@ private Optional<Source> getSourceIfPipelineType(
return Optional.empty();
}

private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
private Sink buildSinkOrConnector(final RoutedPluginSetting pluginSetting) {
// TODO: This will return an object which can perform routing.
LOG.info("Building [{}] as sink component", pluginSetting.getName());
final Optional<String> pipelineNameOptional = getPipelineNameIfPipelineType(pluginSetting);
if (pipelineNameOptional.isPresent()) { //update to ifPresentOrElse when using JDK9
Expand Down Expand Up @@ -272,7 +274,7 @@ private void removeConnectedPipelines(
sourcePipeline, pipelineConfigurationMap, pipelineMap));

//remove sink connected pipelines
final List<PluginSetting> sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings();
final List<RoutedPluginSetting> sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings();
sinkPluginSettings.forEach(sinkPluginSetting -> {
getPipelineNameIfPipelineType(sinkPluginSetting).ifPresent(sinkPipeline -> processRemoveIfRequired(
sinkPipeline, pipelineConfigurationMap, pipelineMap));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.amazon.dataprepper.model.configuration.PipelineModel;
import com.amazon.dataprepper.model.configuration.PluginModel;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.configuration.SinkModel;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;

import java.util.Collections;
Expand All @@ -25,7 +26,7 @@ public class PipelineConfiguration {
private final PluginSetting sourcePluginSetting;
private final PluginSetting bufferPluginSetting;
private final List<PluginSetting> processorPluginSettings;
private final List<PluginSetting> sinkPluginSettings;
private final List<RoutedPluginSetting> sinkPluginSettings;
private final Integer workers;
private final Integer readBatchDelay;

Expand All @@ -50,7 +51,7 @@ public List<PluginSetting> getProcessorPluginSettings() {
return processorPluginSettings;
}

public List<PluginSetting> getSinkPluginSettings() {
public List<RoutedPluginSetting> getSinkPluginSettings() {
return sinkPluginSettings;
}

Expand Down Expand Up @@ -92,12 +93,12 @@ private PluginSetting getBufferFromPluginModelOrDefault(
return getPluginSettingFromPluginModel(pluginModel);
}

private List<PluginSetting> getSinksFromPluginModel(
final List<PluginModel> sinkConfigurations) {
private List<RoutedPluginSetting> getSinksFromPluginModel(
final List<SinkModel> sinkConfigurations) {
if (sinkConfigurations == null || sinkConfigurations.isEmpty()) {
throw new IllegalArgumentException("Invalid configuration, at least one sink is required");
}
return sinkConfigurations.stream().map(PipelineConfiguration::getPluginSettingFromPluginModel)
return sinkConfigurations.stream().map(PipelineConfiguration::getRoutedPluginSettingFromSinkModel)
.collect(Collectors.toList());
}

Expand All @@ -116,6 +117,11 @@ private static PluginSetting getPluginSettingFromPluginModel(final PluginModel p
return new PluginSetting(pluginModel.getPluginName(), settingsMap == null ? new HashMap<>() : settingsMap);
}

private static RoutedPluginSetting getRoutedPluginSettingFromSinkModel(final SinkModel sinkModel) {
final Map<String, Object> settingsMap = sinkModel.getPluginSettings();
return new RoutedPluginSetting(sinkModel.getPluginName(), settingsMap == null ? new HashMap<>() : settingsMap, sinkModel.getRoutes());
}

private Integer getWorkersFromPipelineModel(final PipelineModel pipelineModel) {
final Integer configuredWorkers = pipelineModel.getWorkers();
validateConfiguration(configuredWorkers, WORKERS_COMPONENT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.parser.model;

import com.amazon.dataprepper.model.configuration.PluginSetting;

import java.util.Collection;
import java.util.Map;

public class RoutedPluginSetting extends PluginSetting {
private final Collection<String> routes;

public RoutedPluginSetting(final String name, final Map<String, Object> settings, final Collection<String> routes) {
super(name, settings);
this.routes = routes;
}

public Collection<String> getRoutes() {
return routes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.sink.Sink;
import com.amazon.dataprepper.model.source.Source;
import com.google.common.base.Preconditions;
import org.opensearch.dataprepper.pipeline.common.PipelineThreadFactory;
import org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -162,7 +162,7 @@ public void execute() {
}
}
).collect(Collectors.toList());
processorExecutorService.submit(new ProcessWorker(buffer, processors, sinks, this));
processorExecutorService.submit(new ProcessWorker(buffer, processors, this));
}
} catch (Exception ex) {
//source failed to start - Cannot proceed further with the current pipeline, skipping further execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.processor.Processor;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.pipeline.common.FutureHelperResult;
import org.slf4j.Logger;
Expand All @@ -26,18 +25,15 @@ public class ProcessWorker implements Runnable {

private final Buffer readBuffer;
private final List<Processor> processors;
private final Collection<Sink> sinks;
private final Pipeline pipeline;
private boolean isEmptyRecordsLogged = false;

public ProcessWorker(
final Buffer readBuffer,
final List<Processor> processors,
final Collection<Sink> sinks,
final Pipeline pipeline) {
this.readBuffer = readBuffer;
this.processors = processors;
this.sinks = sinks;
this.pipeline = pipeline;
}

Expand Down
Loading

0 comments on commit d497df2

Please sign in to comment.