diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java index bd3974646e..0511cdc75d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java @@ -7,6 +7,7 @@ import com.amazon.dataprepper.model.annotations.SingleThread; import com.amazon.dataprepper.model.buffer.Buffer; +import com.amazon.dataprepper.model.configuration.PipelinesDataFlowModel; import com.amazon.dataprepper.model.configuration.PluginSetting; import com.amazon.dataprepper.model.peerforwarder.RequiresPeerForwarding; import com.amazon.dataprepper.model.plugin.NoPluginFoundException; @@ -15,14 +16,13 @@ import com.amazon.dataprepper.model.processor.Processor; import com.amazon.dataprepper.model.sink.Sink; import com.amazon.dataprepper.model.source.Source; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.opensearch.dataprepper.parser.model.PipelineConfiguration; import org.opensearch.dataprepper.peerforwarder.PeerForwardingProcessorDecorator; import org.opensearch.dataprepper.pipeline.Pipeline; import org.opensearch.dataprepper.pipeline.PipelineConnector; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +59,15 @@ public PipelineParser(final String pipelineConfigurationFileLocation, final Plug */ public Map parseConfiguration() { try { - final Map pipelineConfigurationMap = OBJECT_MAPPER.readValue( - new File(pipelineConfigurationFileLocation), - new TypeReference>() { - }); + final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(new File(pipelineConfigurationFileLocation), + PipelinesDataFlowModel.class); + + final Map pipelineConfigurationMap = pipelinesDataFlowModel.getPipelines().entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> new PipelineConfiguration(entry.getValue()) + )); final List allPipelineNames = PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigurationMap); // LinkedHashMap to preserve insertion order diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java index 72e486c472..b5f52c771e 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java @@ -5,12 +5,10 @@ package org.opensearch.dataprepper.parser.model; +import com.amazon.dataprepper.model.configuration.PipelineModel; +import com.amazon.dataprepper.model.configuration.PluginModel; import com.amazon.dataprepper.model.configuration.PluginSetting; import com.amazon.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; @@ -18,39 +16,7 @@ import java.util.Map; import java.util.stream.Collectors; -import static java.lang.String.format; - public class PipelineConfiguration { - private static final Logger LOG = LoggerFactory.getLogger(PipelineConfiguration.class); - - /** - * @throws IllegalArgumentException If a non-null value is provided to both parameters. - * Guarantees only a prepper, processor, or neither is provided, not both. - * @param preppers Deserialized preppers plugin configuration, cannot be used in combination with the processors parameter, nullable - * @param processors Deserialized processors plugin configuration, cannot be used in combination with the preppers parameter, nullable - * @return the non-null parameter passed or null if both parameters are null. - */ - private static List>> validateProcessor( - final List>> preppers, - final List>> processors) { - if (preppers != null) { - LOG.warn("Prepper configurations are deprecated, processor configurations will be required in Data Prepper 2.0"); - } - - if (preppers != null && processors != null) { - final String message = "Pipeline configuration cannot specify a prepper and processor configuration. " + - "It is recommended to move prepper configurations to the processor section to maintain " + - "compatibility with DataPrepper version 1.2 and above."; - throw new IllegalArgumentException(message); - } - else if (preppers != null) { - return preppers; - } - else { - return processors; - } - } - private static final String WORKERS_COMPONENT = "workers"; private static final String DELAY_COMPONENT = "delay"; private static final int DEFAULT_READ_BATCH_DELAY = 3_000; @@ -63,43 +29,13 @@ else if (preppers != null) { private final Integer workers; private final Integer readBatchDelay; - public PipelineConfiguration( - final Map.Entry> source, - final Map.Entry> buffer, - final List>> processors, - final List>> sinks, - final Integer workers, - final Integer delay) { - this.sourcePluginSetting = getSourceFromConfiguration(source); - this.bufferPluginSetting = getBufferFromConfigurationOrDefault(buffer); - this.processorPluginSettings = getProcessorsFromConfiguration(processors); - this.sinkPluginSettings = getSinksFromConfiguration(sinks); - this.workers = getWorkersFromConfiguration(workers); - this.readBatchDelay = getReadBatchDelayFromConfiguration(delay); - } - - /** - * @since 1.2 - * Constructor for deserialized Json data. - * @param source Deserialized source plugin configuration - * @param buffer Deserialized buffer plugin configuration, nullable - * @param preppers Deserialized preppers plugin configuration, cannot be used in combination with the processors parameter, nullable - * @param processors Deserialized processors plugin configuration, cannot be used in combination with the preppers parameter, nullable - * @param sinks Deserialized sinks plugin configuration - * @param workers Deserialized workers plugin configuration, nullable - * @param delay Deserialized delay plugin configuration, nullable - */ - @JsonCreator - @Deprecated - public PipelineConfiguration( - @JsonProperty("source") final Map.Entry> source, - @JsonProperty("buffer") final Map.Entry> buffer, - @Deprecated @JsonProperty("prepper") final List>> preppers, - @JsonProperty("processor") final List>> processors, - @JsonProperty("sink") final List>> sinks, - @JsonProperty("workers") final Integer workers, - @JsonProperty("delay") final Integer delay) { - this(source, buffer, validateProcessor(preppers, processors), sinks, workers, delay); + public PipelineConfiguration(final PipelineModel pipelineModel) { + this.sourcePluginSetting = getSourceFromPluginModel(pipelineModel.getSource()); + this.bufferPluginSetting = getBufferFromPluginModelOrDefault(pipelineModel.getBuffer()); + this.processorPluginSettings = getProcessorsFromPluginModel(pipelineModel.getProcessors()); + this.sinkPluginSettings = getSinksFromPluginModel(pipelineModel.getSinks()); + this.workers = getWorkersFromPipelineModel(pipelineModel); + this.readBatchDelay = getReadBatchDelayFromPipelineModel(pipelineModel); } public PluginSetting getSourcePluginSetting() { @@ -141,62 +77,61 @@ private void updatePluginSetting( pluginSetting.setProcessWorkers(this.workers); } - private PluginSetting getSourceFromConfiguration(final Map.Entry> sourceConfiguration) { - if (sourceConfiguration == null) { + private PluginSetting getSourceFromPluginModel(final PluginModel pluginModel) { + if (pluginModel == null) { throw new IllegalArgumentException("Invalid configuration, source is a required component"); } - return getPluginSettingFromConfiguration(sourceConfiguration); + return getPluginSettingFromPluginModel(pluginModel); } - private PluginSetting getBufferFromConfigurationOrDefault( - final Map.Entry> bufferConfiguration) { - if (bufferConfiguration == null) { + private PluginSetting getBufferFromPluginModelOrDefault( + final PluginModel pluginModel) { + if (pluginModel == null) { return BlockingBuffer.getDefaultPluginSettings(); } - return getPluginSettingFromConfiguration(bufferConfiguration); + return getPluginSettingFromPluginModel(pluginModel); } - private List getSinksFromConfiguration( - final List>> sinkConfigurations) { + private List getSinksFromPluginModel( + final List sinkConfigurations) { if (sinkConfigurations == null || sinkConfigurations.isEmpty()) { throw new IllegalArgumentException("Invalid configuration, at least one sink is required"); } - return sinkConfigurations.stream().map(PipelineConfiguration::getPluginSettingFromConfiguration) + return sinkConfigurations.stream().map(PipelineConfiguration::getPluginSettingFromPluginModel) .collect(Collectors.toList()); } - private List getProcessorsFromConfiguration( - final List>> processorConfigurations) { + private List getProcessorsFromPluginModel( + final List processorConfigurations) { if (processorConfigurations == null || processorConfigurations.isEmpty()) { return Collections.emptyList(); } - return processorConfigurations.stream().map(PipelineConfiguration::getPluginSettingFromConfiguration) + return processorConfigurations.stream().map(PipelineConfiguration::getPluginSettingFromPluginModel) .collect(Collectors.toList()); } - private static PluginSetting getPluginSettingFromConfiguration( - final Map.Entry> configuration) { - final Map settingsMap = configuration.getValue(); - //PluginSettings is required to update pipeline name - return new PluginSetting(configuration.getKey(), settingsMap == null ? new HashMap<>() : settingsMap); + private static PluginSetting getPluginSettingFromPluginModel(final PluginModel pluginModel) { + final Map settingsMap = pluginModel.getPluginSettings(); + return new PluginSetting(pluginModel.getPluginName(), settingsMap == null ? new HashMap<>() : settingsMap); } - private Integer getWorkersFromConfiguration(final Integer workersConfiguration) { - final Integer configuredWorkers = getValueFromConfiguration(workersConfiguration, WORKERS_COMPONENT); + private Integer getWorkersFromPipelineModel(final PipelineModel pipelineModel) { + final Integer configuredWorkers = pipelineModel.getWorkers(); + validateConfiguration(configuredWorkers, WORKERS_COMPONENT); return configuredWorkers == null ? DEFAULT_WORKERS : configuredWorkers; } - private Integer getReadBatchDelayFromConfiguration(final Integer delayConfiguration) { - final Integer configuredDelay = getValueFromConfiguration(delayConfiguration, DELAY_COMPONENT); + private Integer getReadBatchDelayFromPipelineModel(final PipelineModel pipelineModel) { + final Integer configuredDelay = pipelineModel.getReadBatchDelay(); + validateConfiguration(configuredDelay, DELAY_COMPONENT); return configuredDelay == null ? DEFAULT_READ_BATCH_DELAY : configuredDelay; } - private Integer getValueFromConfiguration(final Integer configuration, final String component) { + private void validateConfiguration(final Integer configuration, final String component) { if (configuration != null && configuration <= 0) { - throw new IllegalArgumentException(format("Invalid configuration, %s cannot be %s", + throw new IllegalArgumentException(String.format("Invalid configuration, %s cannot be %s", component, configuration)); } - return configuration; } } \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java index 2b4df175e5..01b124040b 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper; +import com.amazon.dataprepper.model.configuration.PluginModel; import com.amazon.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.parser.model.PipelineConfiguration; import com.fasterxml.jackson.core.type.TypeReference; @@ -22,6 +23,9 @@ import java.util.Map; import java.util.Set; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestDataProvider { public static final String TEST_PIPELINE_NAME = "test-pipeline-1"; public static final String TEST_PLUGIN_NAME_1 = "test-plugin-1"; @@ -84,16 +88,23 @@ private static Map validSettingsForPlugin() { return settingsMap; } - public static Map.Entry> validSingleConfiguration() { - return Map.entry(TEST_PLUGIN_NAME_1, validSettingsForPlugin()); + public static PluginModel validSingleConfiguration() { + return validPluginModel(TEST_PLUGIN_NAME_1); + } + + private static PluginModel validPluginModel(final String pluginName) { + final PluginModel pluginModel = mock(PluginModel.class); + when(pluginModel.getPluginName()).thenReturn(pluginName); + when(pluginModel.getPluginSettings()).thenReturn(validSettingsForPlugin()); + return pluginModel; } - public static List>> validMultipleConfiguration() { - return Arrays.asList(Map.entry(TEST_PLUGIN_NAME_1, validSettingsForPlugin()), Map.entry(TEST_PLUGIN_NAME_2, validSettingsForPlugin())); + public static List validMultipleConfiguration() { + return Arrays.asList(validPluginModel(TEST_PLUGIN_NAME_1), validPluginModel(TEST_PLUGIN_NAME_2)); } - public static List>> validMultipleConfigurationOfSizeOne() { - return Collections.singletonList(Map.entry(TEST_PLUGIN_NAME_1, validSettingsForPlugin())); + public static List validMultipleConfigurationOfSizeOne() { + return Collections.singletonList(validSingleConfiguration()); } public static Map readConfigFile(final String configFilePath) throws IOException { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/PipelineConfigurationTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/PipelineConfigurationTests.java index 5900c1eafc..e57a452dc1 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/PipelineConfigurationTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/PipelineConfigurationTests.java @@ -5,36 +5,53 @@ package org.opensearch.dataprepper.parser.model; +import com.amazon.dataprepper.model.configuration.PipelineModel; +import com.amazon.dataprepper.model.configuration.PluginModel; import com.amazon.dataprepper.model.configuration.PluginSetting; import com.amazon.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; -import org.opensearch.dataprepper.TestDataProvider; import org.hamcrest.CoreMatchers; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.TestDataProvider; import java.util.ArrayList; -import java.util.InputMismatchException; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PipelineConfigurationTests { + + private PluginModel source; + private List processors; + private List sinks; -public class PipelineConfigurationTests { + @BeforeEach + void setUp() { + source = TestDataProvider.validSingleConfiguration(); + processors = TestDataProvider.validMultipleConfigurationOfSizeOne(); + sinks = TestDataProvider.validMultipleConfiguration(); + } @Test - public void testPipelineConfigurationCreation() { - final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(TestDataProvider.validSingleConfiguration(), - null, - TestDataProvider.validMultipleConfigurationOfSizeOne(), - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.TEST_WORKERS, TestDataProvider.TEST_DELAY); + void testPipelineConfigurationCreation() { + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getSource()).thenReturn(source); + when(pipelineModel.getProcessors()).thenReturn(processors); + when(pipelineModel.getPreppers()).thenReturn(null); + when(pipelineModel.getSinks()).thenReturn(sinks); + when(pipelineModel.getWorkers()).thenReturn(TestDataProvider.TEST_WORKERS); + when(pipelineModel.getReadBatchDelay()).thenReturn(TestDataProvider.TEST_DELAY); + final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(pipelineModel); + final PluginSetting actualSourcePluginSetting = pipelineConfiguration.getSourcePluginSetting(); final PluginSetting actualBufferPluginSetting = pipelineConfiguration.getBufferPluginSetting(); final List actualProcesserPluginSettings = pipelineConfiguration.getProcessorPluginSettings(); @@ -67,78 +84,16 @@ public void testPipelineConfigurationCreation() { } @Test - public void testExceptionThrownWhenPrepperAndProcessorAreConfigured() { - Exception exception = assertThrows(IllegalArgumentException.class, () -> new PipelineConfiguration( - TestDataProvider.validSingleConfiguration(), - null, - TestDataProvider.validMultipleConfigurationOfSizeOne(), - TestDataProvider.validMultipleConfigurationOfSizeOne(), - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.TEST_WORKERS, - TestDataProvider.TEST_DELAY)); - - final String expected = "Pipeline configuration cannot specify a prepper and processor configuration. It is " + - "recommended to move prepper configurations to the processor section to maintain compatibility with " + - "DataPrepper version 1.2 and above."; - - assertTrue(exception.getMessage().contains(expected)); - } - - private void assertEqualProcessorPluginSettings( - final List>> expectedPluginSettings, - final List actualPluginSettings) { - assertEquals(expectedPluginSettings.size(), actualPluginSettings.size()); - - expectedPluginSettings.forEach(expectedSetting -> { - final PluginSetting actualSetting = actualPluginSettings.stream() - .filter(plugin -> expectedSetting.getKey().equals(plugin.getName())) - .findFirst() - .orElseThrow(() -> new InputMismatchException("Expected setting named " + expectedSetting.getKey())); - - final Map expectedSettingValue = expectedSetting.getValue(); - final Set expectedKeySet = expectedSettingValue.keySet(); - final Map settings = actualSetting.getSettings(); - assertEquals(expectedKeySet.size(), settings.size()); - - expectedKeySet.forEach(key -> { - assertEquals(expectedSettingValue.get(key), settings.get(key)); - }); - }); - } - - @Test - public void testPipelineConfigurationWithPrepperOrProcessorAreEquivalent() { - Map.Entry> sourcePluginSettings = TestDataProvider.validSingleConfiguration(); - List>> expectedPluginSettings = TestDataProvider.validMultipleConfigurationOfSizeOne(); - - PipelineConfiguration prepperConfig = new PipelineConfiguration( - sourcePluginSettings, - null, - expectedPluginSettings, - null, - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.TEST_WORKERS, - TestDataProvider.TEST_DELAY); - PipelineConfiguration processorConfig = new PipelineConfiguration( - sourcePluginSettings, - null, - null, - expectedPluginSettings, - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.TEST_WORKERS, - TestDataProvider.TEST_DELAY); - - assertEqualProcessorPluginSettings(expectedPluginSettings, prepperConfig.getProcessorPluginSettings()); - assertEqualProcessorPluginSettings(expectedPluginSettings, processorConfig.getProcessorPluginSettings()); - } - - @Test - public void testOnlySourceAndSink() { - final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(TestDataProvider.validSingleConfiguration(), - null, - null, - TestDataProvider.validMultipleConfigurationOfSizeOne(), - null, null); + void testOnlySourceAndSink() { + sinks = TestDataProvider.validMultipleConfigurationOfSizeOne(); + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getSource()).thenReturn(source); + when(pipelineModel.getSinks()).thenReturn(sinks); + when(pipelineModel.getProcessors()).thenReturn(null); + when(pipelineModel.getPreppers()).thenReturn(null); + when(pipelineModel.getWorkers()).thenReturn(null); + when(pipelineModel.getReadBatchDelay()).thenReturn(null); + final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(pipelineModel); final PluginSetting actualSourcePluginSetting = pipelineConfiguration.getSourcePluginSetting(); final PluginSetting actualBufferPluginSetting = pipelineConfiguration.getBufferPluginSetting(); final List actualProcessorPluginSettings = pipelineConfiguration.getProcessorPluginSettings(); @@ -155,108 +110,102 @@ public void testOnlySourceAndSink() { assertThat(pipelineConfiguration.getReadBatchDelay(), CoreMatchers.is(TestDataProvider.DEFAULT_READ_BATCH_DELAY)); } - @Test //not using expected to assert the message - public void testNoSourceConfiguration() { - try { - new PipelineConfiguration( - null, - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.TEST_WORKERS, TestDataProvider.TEST_DELAY); - } catch (IllegalArgumentException ex) { - assertThat(ex.getMessage(), is("Invalid configuration, source is a required component")); - } + @Test + void testNoSourceConfiguration() { + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getProcessors()).thenReturn(processors); + when(pipelineModel.getPreppers()).thenReturn(null); + when(pipelineModel.getSinks()).thenReturn(sinks); + when(pipelineModel.getWorkers()).thenReturn(TestDataProvider.TEST_WORKERS); + when(pipelineModel.getReadBatchDelay()).thenReturn(TestDataProvider.TEST_DELAY); + + final IllegalArgumentException actual = assertThrows(IllegalArgumentException.class, () -> new PipelineConfiguration(pipelineModel)); + + assertThat(actual.getMessage(), equalTo("Invalid configuration, source is a required component")); } @Test - public void testNoProcessorAndNoPrepperConfiguration() { - final PipelineConfiguration nullProcessorConfiguration = new PipelineConfiguration( - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validSingleConfiguration(), - null, - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.TEST_WORKERS, TestDataProvider.TEST_DELAY); - assertThat(nullProcessorConfiguration.getProcessorPluginSettings(), isA(Iterable.class)); - assertThat(nullProcessorConfiguration.getProcessorPluginSettings().size(), is(0)); - - final PipelineConfiguration emptyProcessorsConfiguration = new PipelineConfiguration( - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validSingleConfiguration(), - new ArrayList<>(), - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.TEST_WORKERS, TestDataProvider.TEST_DELAY); - assertThat(emptyProcessorsConfiguration.getProcessorPluginSettings(), isA(Iterable.class)); - assertThat(emptyProcessorsConfiguration.getProcessorPluginSettings().size(), is(0)); + void testNullProcessorAndNoPrepperConfiguration() { + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getSource()).thenReturn(source); + when(pipelineModel.getProcessors()).thenReturn(null); + when(pipelineModel.getPreppers()).thenReturn(null); + when(pipelineModel.getSinks()).thenReturn(sinks); + when(pipelineModel.getWorkers()).thenReturn(TestDataProvider.TEST_WORKERS); + when(pipelineModel.getReadBatchDelay()).thenReturn(TestDataProvider.TEST_DELAY); + final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(pipelineModel); + assertThat(pipelineConfiguration.getProcessorPluginSettings(), isA(Iterable.class)); + assertThat(pipelineConfiguration.getProcessorPluginSettings().size(), is(0)); } - @Test //not using expected to assert the message - public void testNoSinkConfiguration() { - try { - new PipelineConfiguration( - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validMultipleConfiguration(), - null, - TestDataProvider.TEST_WORKERS, TestDataProvider.TEST_DELAY); - } catch (IllegalArgumentException ex) { - assertThat(ex.getMessage(), is("Invalid configuration, at least one sink is required")); - } + @Test + void testEmptyProcessorAndNoPrepperConfiguration() { + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getSource()).thenReturn(source); + when(pipelineModel.getProcessors()).thenReturn(new ArrayList<>()); + when(pipelineModel.getPreppers()).thenReturn(null); + when(pipelineModel.getSinks()).thenReturn(sinks); + when(pipelineModel.getWorkers()).thenReturn(TestDataProvider.TEST_WORKERS); + when(pipelineModel.getReadBatchDelay()).thenReturn(TestDataProvider.TEST_DELAY); + final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(pipelineModel); + assertThat(pipelineConfiguration.getProcessorPluginSettings(), isA(Iterable.class)); + assertThat(pipelineConfiguration.getProcessorPluginSettings().size(), is(0)); + } - try { - new PipelineConfiguration( - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validMultipleConfiguration(), - new ArrayList<>(), - TestDataProvider.TEST_WORKERS, TestDataProvider.TEST_DELAY); - } catch (IllegalArgumentException ex) { - assertThat(ex.getMessage(), is("Invalid configuration, at least one sink is required")); - } + @Test + void testNullSinkConfiguration() { + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getSource()).thenReturn(source); + when(pipelineModel.getProcessors()).thenReturn(processors); + when(pipelineModel.getPreppers()).thenReturn(null); + when(pipelineModel.getSinks()).thenReturn(Collections.emptyList()); + when(pipelineModel.getWorkers()).thenReturn(TestDataProvider.TEST_WORKERS); + when(pipelineModel.getReadBatchDelay()).thenReturn(TestDataProvider.TEST_DELAY); + + final IllegalArgumentException actual = assertThrows(IllegalArgumentException.class, () -> new PipelineConfiguration(pipelineModel)); + + assertThat(actual.getMessage(), equalTo("Invalid configuration, at least one sink is required")); } - @Test //not using expected to assert the message - public void testInvalidWorkersConfiguration() { - try { - new PipelineConfiguration( - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.validMultipleConfiguration(), - 0, TestDataProvider.TEST_DELAY); - } catch (IllegalArgumentException ex) { - assertThat(ex.getMessage(), is("Invalid configuration, workers cannot be 0")); - } + @Test + void testEmptySinkConfiguration() { + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getSource()).thenReturn(source); + when(pipelineModel.getProcessors()).thenReturn(processors); + when(pipelineModel.getPreppers()).thenReturn(null); + when(pipelineModel.getSinks()).thenReturn(new ArrayList<>()); + when(pipelineModel.getWorkers()).thenReturn(TestDataProvider.TEST_WORKERS); + when(pipelineModel.getReadBatchDelay()).thenReturn(TestDataProvider.TEST_DELAY); + + final IllegalArgumentException actual = assertThrows(IllegalArgumentException.class, () -> new PipelineConfiguration(pipelineModel)); + + assertThat(actual.getMessage(), equalTo("Invalid configuration, at least one sink is required")); } - @Test //not using expected to assert the message - public void testInvalidDelayConfiguration() { - try { - new PipelineConfiguration( - TestDataProvider.validSingleConfiguration(), - TestDataProvider.validSingleConfiguration(), - null, - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.validMultipleConfiguration(), - TestDataProvider.TEST_WORKERS, 0); - } catch (IllegalArgumentException ex) { - assertThat(ex.getMessage(), is("Invalid configuration, delay cannot be 0")); - } + @Test + void testInvalidWorkersConfiguration() { + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getSource()).thenReturn(source); + when(pipelineModel.getProcessors()).thenReturn(processors); + when(pipelineModel.getPreppers()).thenReturn(null); + when(pipelineModel.getSinks()).thenReturn(sinks); + when(pipelineModel.getWorkers()).thenReturn(0); + when(pipelineModel.getReadBatchDelay()).thenReturn(TestDataProvider.TEST_DELAY); + final IllegalArgumentException actual = assertThrows(IllegalArgumentException.class, () -> new PipelineConfiguration(pipelineModel)); + assertThat(actual.getMessage(), equalTo("Invalid configuration, workers cannot be 0")); } @Test - public void testPipelineConfigurationWithoutPluginSettingAttributes() throws Exception { - final Map pipelineConfigurationMap = TestDataProvider.readConfigFile( - TestDataProvider.VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE); - assertThat(pipelineConfigurationMap.size(), is(equalTo(1))); - final PipelineConfiguration actualPipelineConfiguration = pipelineConfigurationMap.get(TestDataProvider.TEST_PIPELINE_NAME); - assertThat(actualPipelineConfiguration, notNullValue()); - assertThat(actualPipelineConfiguration.getSourcePluginSetting(), notNullValue()); - assertThat(actualPipelineConfiguration.getBufferPluginSetting(), notNullValue()); - assertThat(actualPipelineConfiguration.getProcessorPluginSettings(), notNullValue()); - assertThat(actualPipelineConfiguration.getProcessorPluginSettings().size(), is(equalTo(0))); - assertThat(actualPipelineConfiguration.getSinkPluginSettings(), notNullValue()); - assertThat(actualPipelineConfiguration.getSinkPluginSettings().size(), is(equalTo(1))); + void testInvalidDelayConfiguration() { + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getSource()).thenReturn(source); + when(pipelineModel.getProcessors()).thenReturn(processors); + when(pipelineModel.getPreppers()).thenReturn(null); + when(pipelineModel.getSinks()).thenReturn(sinks); + when(pipelineModel.getWorkers()).thenReturn(TestDataProvider.TEST_WORKERS); + when(pipelineModel.getReadBatchDelay()).thenReturn(0); + final IllegalArgumentException actual = assertThrows(IllegalArgumentException.class, () -> new PipelineConfiguration(pipelineModel)); + assertThat(actual.getMessage(), equalTo("Invalid configuration, delay cannot be 0")); } private void comparePluginSettings(final PluginSetting actual, final PluginSetting expected) {