From c1e0559de16347dbf5e07e8ff439875f09897cc5 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Wed, 13 Mar 2024 00:28:53 -0500 Subject: [PATCH 1/2] Refactor PipelinesDataflowModelParser to take in an InputStream instead of a file path Signed-off-by: Taylor Gray --- .../config/PipelineParserConfiguration.java | 12 +- .../parser/PipelineTransformerTests.java | 4 +- .../PipelineConfigurationFileReader.java | 66 ++++++++ .../parser/PipelineConfigurationReader.java | 14 ++ .../parser/PipelinesDataflowModelParser.java | 53 ++----- .../PipelineConfigurationFileReaderTest.java | 98 ++++++++++++ .../PipelinesDataflowModelParserTest.java | 149 +++++++++++++----- 7 files changed, 315 insertions(+), 81 deletions(-) create mode 100644 data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java create mode 100644 data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationReader.java create mode 100644 data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReaderTest.java diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java index 555dd7a987..107302ce06 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java @@ -11,6 +11,8 @@ import org.opensearch.dataprepper.parser.PipelineTransformer; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider; +import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader; +import 
org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader; import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser; import org.opensearch.dataprepper.pipeline.router.RouterFactory; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; @@ -46,9 +48,15 @@ public PipelineTransformer pipelineParser( } @Bean - public PipelinesDataflowModelParser pipelinesDataflowModelParser( + public PipelineConfigurationReader pipelineConfigurationReader( final FileStructurePathProvider fileStructurePathProvider) { - return new PipelinesDataflowModelParser(fileStructurePathProvider.getPipelineConfigFileLocation()); + return new PipelineConfigurationFileReader(fileStructurePathProvider.getPipelineConfigFileLocation()); + } + + @Bean + public PipelinesDataflowModelParser pipelinesDataflowModelParser( + final PipelineConfigurationReader pipelineConfigurationReader) { + return new PipelinesDataflowModelParser(pipelineConfigurationReader); } @Bean diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java index a2f7979d48..3ac23171ad 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java @@ -31,6 +31,7 @@ import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.peerforwarder.PeerForwarderReceiveBuffer; import org.opensearch.dataprepper.pipeline.Pipeline; +import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader; import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser; import org.opensearch.dataprepper.pipeline.router.RouterFactory; import org.opensearch.dataprepper.plugin.DefaultPluginFactory; @@ -115,8 +116,9 @@ void tearDown() { } 
private PipelineTransformer createObjectUnderTest(final String pipelineConfigurationFileLocation) { + final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataflowModelParser( - pipelineConfigurationFileLocation).parseConfiguration(); + new PipelineConfigurationFileReader(pipelineConfigurationFileLocation)).parseConfiguration(); return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager, sourceCoordinatorFactory); diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java new file mode 100644 index 0000000000..35519f31d1 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java @@ -0,0 +1,66 @@ +package org.opensearch.dataprepper.pipeline.parser; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.lang.String.format; + +public class PipelineConfigurationFileReader implements PipelineConfigurationReader { + private static final Logger LOG = LoggerFactory.getLogger(PipelineConfigurationFileReader.class); + private final String pipelineConfigurationFileLocation; + + public PipelineConfigurationFileReader(final String pipelineConfigurationFileLocation) { + this.pipelineConfigurationFileLocation = pipelineConfigurationFileLocation; + } + + @Override + public List getPipelineConfigurationInputStreams() { + return 
getInputStreamsForConfigurationFiles(); + } + + private List getInputStreamsForConfigurationFiles() { + final File configurationLocation = new File(pipelineConfigurationFileLocation); + + if (configurationLocation.isFile()) { + return Stream.of(configurationLocation).map(this::getInputStreamForFile) + .filter(Objects::nonNull).collect(Collectors.toList()); + } else if (configurationLocation.isDirectory()) { + FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml")); + List inputStreams = Stream.of(configurationLocation.listFiles(yamlFilter)) + .map(this::getInputStreamForFile) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + if (inputStreams.isEmpty()) { + LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); + throw new ParseException( + format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); + } + + return inputStreams; + } else { + LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); + throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); + } + } + + private InputStream getInputStreamForFile(final File pipelineConfigurationFile) { + + try { + return new FileInputStream(pipelineConfigurationFile); + } catch (IOException e) { + LOG.warn("Pipeline configuration file {} not found", pipelineConfigurationFile.getName()); + return null; + } + } +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationReader.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationReader.java new file mode 100644 index 0000000000..80a4c43c7b --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationReader.java @@ -0,0 +1,14 @@ +package 
org.opensearch.dataprepper.pipeline.parser; + +import java.io.InputStream; +import java.util.List; + +public interface PipelineConfigurationReader { + + /** + * + * @return a List of InputStreams containing each of the pipeline configurations. + * The caller of this method is responsible for closing these input streams after they are used. + */ + List getPipelineConfigurationInputStreams(); +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParser.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParser.java index 9ab813812f..43021a17c5 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParser.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParser.java @@ -15,17 +15,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileFilter; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.lang.String.format; @@ -34,14 +29,14 @@ public class PipelinesDataflowModelParser { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()) .enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY); - private final String pipelineConfigurationFileLocation; + private final PipelineConfigurationReader pipelineConfigurationReader; - public PipelinesDataflowModelParser(final String pipelineConfigurationFileLocation) { - this.pipelineConfigurationFileLocation = pipelineConfigurationFileLocation; + public PipelinesDataflowModelParser(final PipelineConfigurationReader pipelineConfigurationReader) { + 
this.pipelineConfigurationReader = pipelineConfigurationReader; } public PipelinesDataFlowModel parseConfiguration() { - final List pipelinesDataFlowModels = parsePipelineConfigurationFiles(); + final List pipelinesDataFlowModels = parseStreamsToPipelinesDataFlowModel(); return mergePipelinesDataModels(pipelinesDataFlowModels); } @@ -53,35 +48,14 @@ private void validateDataPrepperVersion(final DataPrepperVersion version) { } } - private List parsePipelineConfigurationFiles() { - final File configurationLocation = new File(pipelineConfigurationFileLocation); - - if (configurationLocation.isFile()) { - return Stream.of(configurationLocation).map(this::parsePipelineConfigurationFile) - .filter(Objects::nonNull).collect(Collectors.toList()); - } else if (configurationLocation.isDirectory()) { - FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml")); - List pipelinesDataFlowModels = Stream.of(configurationLocation.listFiles(yamlFilter)) - .map(this::parsePipelineConfigurationFile) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - if (pipelinesDataFlowModels.isEmpty()) { - LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); - throw new ParseException( - format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); - } - - return pipelinesDataFlowModels; - } else { - LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); - throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); - } + private List parseStreamsToPipelinesDataFlowModel() { + return pipelineConfigurationReader.getPipelineConfigurationInputStreams().stream() + .map(this::parseStreamToPipelineDataFlowModel) + .collect(Collectors.toList()); } - private PipelinesDataFlowModel parsePipelineConfigurationFile(final File pipelineConfigurationFile) { - try (final InputStream 
pipelineConfigurationInputStream = new FileInputStream(pipelineConfigurationFile)) { - LOG.info("Reading pipeline configuration from {}", pipelineConfigurationFile.getName()); + private PipelinesDataFlowModel parseStreamToPipelineDataFlowModel(final InputStream configurationInputStream) { + try (final InputStream pipelineConfigurationInputStream = configurationInputStream) { final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(pipelineConfigurationInputStream, PipelinesDataFlowModel.class); @@ -90,12 +64,7 @@ private PipelinesDataFlowModel parsePipelineConfigurationFile(final File pipelin return pipelinesDataFlowModel; } catch (IOException e) { - if (e instanceof FileNotFoundException) { - LOG.warn("Pipeline configuration file {} not found", pipelineConfigurationFile.getName()); - return null; - } - LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation); - throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e); + throw new ParseException("Failed to parse the configuration", e); } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReaderTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReaderTest.java new file mode 100644 index 0000000000..2811551e86 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReaderTest.java @@ -0,0 +1,98 @@ +package org.opensearch.dataprepper.pipeline.parser; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import 
java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@ExtendWith(MockitoExtension.class) +public class PipelineConfigurationFileReaderTest { + + @TempDir + Path tempDir; + + @Test + void getPipelineConfigurationInputStreams_from_directory_with_no_yaml_files_should_throw() { + final PipelineConfigurationReader objectUnderTest = + new PipelineConfigurationFileReader(TestConfigurationProvider.EMPTY_PIPELINE_DIRECTOTRY); + + + final RuntimeException actualException = assertThrows(RuntimeException.class, + objectUnderTest::getPipelineConfigurationInputStreams); + assertThat(actualException.getMessage(), equalTo( + String.format("Pipelines configuration file not found at %s", TestConfigurationProvider.EMPTY_PIPELINE_DIRECTOTRY))); + } + + @Test + void getPipelineConfigurationInputStreams_with_a_configuration_file_which_does_not_exist_should_throw() { + final PipelineConfigurationReader objectUnderTest = + new PipelineConfigurationFileReader("file_does_not_exist.yml"); + + final RuntimeException actualException = assertThrows(RuntimeException.class, + objectUnderTest::getPipelineConfigurationInputStreams); + assertThat(actualException.getMessage(), equalTo("Pipelines configuration file not found at file_does_not_exist.yml")); + } + + @Test + void getPipelineConfigurationInput_streams_from_existing_file() throws IOException { + + final String yamlContent = UUID.randomUUID().toString(); + final Path file = tempDir.resolve("test-pipeline.yaml"); + Files.writeString(file, yamlContent); + + final PipelineConfigurationReader objectUnderTest = + new PipelineConfigurationFileReader(file.toString()); + + final List inputStreams = objectUnderTest.getPipelineConfigurationInputStreams(); + + 
assertThat(inputStreams.size(), equalTo(1)); + + try (final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStreams.get(0), StandardCharsets.UTF_8))) { + final String content = bufferedReader.lines().collect(Collectors.joining(System.lineSeparator())); + assertThat(content, equalTo(yamlContent)); + } + } + + @Test + void getPipelineConfigurationInput_streams_from_existing_directory() throws IOException { + + + final String yamlContentPipelineOne = UUID.randomUUID().toString(); + final String yamlContentPipelineTwo = UUID.randomUUID().toString(); + + Files.writeString(tempDir.resolve("test-pipeline-1.yaml"), yamlContentPipelineOne); + Files.writeString(tempDir.resolve("tset-pipeline-2.yml"), yamlContentPipelineTwo); + + final PipelineConfigurationReader objectUnderTest = + new PipelineConfigurationFileReader(tempDir.toString()); + + final List inputStreams = objectUnderTest.getPipelineConfigurationInputStreams(); + + assertThat(inputStreams.size(), equalTo(2)); + + try (final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStreams.get(0), StandardCharsets.UTF_8))) { + final String content = bufferedReader.lines().collect(Collectors.joining(System.lineSeparator())); + assertThat(content, equalTo(yamlContentPipelineOne)); + } + + try (final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStreams.get(1), StandardCharsets.UTF_8))) { + final String content = bufferedReader.lines().collect(Collectors.joining(System.lineSeparator())); + assertThat(content, equalTo(yamlContentPipelineTwo)); + } + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java index 83370b0093..4dd5ae2b98 100644 --- 
a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java @@ -6,33 +6,92 @@ package org.opensearch.dataprepper.pipeline.parser; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) class PipelinesDataflowModelParserTest { + + @Mock + private PipelineConfigurationReader pipelineConfigurationReader; + @Test - void parseConfiguration_with_multiple_valid_pipelines() { + void parseConfiguration_with_multiple_valid_pipelines() throws FileNotFoundException { + when(pipelineConfigurationReader.getPipelineConfigurationInputStreams()) + .thenReturn(List.of(new FileInputStream(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE))); + final PipelinesDataflowModelParser pipelinesDataflowModelParser = - new PipelinesDataflowModelParser(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); + 
new PipelinesDataflowModelParser(pipelineConfigurationReader); final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), equalTo(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_NAMES)); } @Test - void parseConfiguration_with_valid_pipelines_and_extensions() { + void parseConfiguration_from_string_input_stream_creates_the_correct_model() { + + final String configurationYamlString = "# this configuration file is solely for testing formatting\n" + + "test-pipeline-1:\n" + + " source:\n" + + " file:\n" + + " path: \"/tmp/file-source.tmp\"\n" + + " buffer:\n" + + " bounded_blocking: #to check non object nodes for plugins\n" + + " sink:\n" + + " - pipeline:\n" + + " name: \"test-pipeline-2\"\n" + + "test-pipeline-2:\n" + + " source:\n" + + " pipeline:\n" + + " name: \"test-pipeline-1\"\n" + + " sink:\n" + + " - pipeline:\n" + + " name: \"test-pipeline-3\"\n" + + "test-pipeline-3:\n" + + " source:\n" + + " pipeline:\n" + + " name: \"test-pipeline-2\"\n" + + " sink:\n" + + " - file:\n" + + " path: \"/tmp/todelete.txt\""; + + when(pipelineConfigurationReader.getPipelineConfigurationInputStreams()) + .thenReturn(List.of(new ByteArrayInputStream(configurationYamlString.getBytes()))); + final PipelinesDataflowModelParser pipelinesDataflowModelParser = - new PipelinesDataflowModelParser(TestConfigurationProvider.VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS); + new PipelinesDataflowModelParser(pipelineConfigurationReader); + final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); + assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), + equalTo(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_NAMES)); + } + + @Test + void parseConfiguration_with_valid_pipelines_and_extensions() throws FileNotFoundException { + when(pipelineConfigurationReader.getPipelineConfigurationInputStreams()) + 
.thenReturn(List.of(new FileInputStream(TestConfigurationProvider.VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS))); + + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(pipelineConfigurationReader); final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), equalTo(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_NAMES)); @@ -43,9 +102,12 @@ void parseConfiguration_with_valid_pipelines_and_extensions() { } @Test - void parseConfiguration_with_incompatible_version_should_throw() { + void parseConfiguration_with_incompatible_version_should_throw() throws FileNotFoundException { + when(pipelineConfigurationReader.getPipelineConfigurationInputStreams()) + .thenReturn(List.of(new FileInputStream(TestConfigurationProvider.INCOMPATIBLE_VERSION_CONFIG_FILE))); + final PipelinesDataflowModelParser pipelinesDataflowModelParser = - new PipelinesDataflowModelParser(TestConfigurationProvider.INCOMPATIBLE_VERSION_CONFIG_FILE); + new PipelinesDataflowModelParser(pipelineConfigurationReader); final RuntimeException actualException = assertThrows( RuntimeException.class, pipelinesDataflowModelParser::parseConfiguration); @@ -54,33 +116,45 @@ void parseConfiguration_with_incompatible_version_should_throw() { } @Test - void parseConfiguration_with_a_configuration_file_which_does_not_exist_should_throw() { - final PipelinesDataflowModelParser pipelinesDataflowModelParser = - new PipelinesDataflowModelParser("file_does_no_exist.yml"); - final RuntimeException actualException = assertThrows(RuntimeException.class, - pipelinesDataflowModelParser::parseConfiguration); - assertThat(actualException.getMessage(), equalTo("Pipelines configuration file not found at file_does_no_exist.yml")); - } + void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_model() throws Exception { + final File 
directoryLocation = new File(TestConfigurationProvider.MULTI_FILE_PIPELINE_DIRECTOTRY); + final List fileInputStreams = Stream.of(directoryLocation.listFiles()) + .map(file -> { + try { + return new FileInputStream(file); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + + when(pipelineConfigurationReader.getPipelineConfigurationInputStreams()).thenReturn(fileInputStreams); - @Test - void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_model() { final PipelinesDataflowModelParser pipelinesDataflowModelParser = - new PipelinesDataflowModelParser(TestConfigurationProvider.MULTI_FILE_PIPELINE_DIRECTOTRY); + new PipelinesDataflowModelParser(pipelineConfigurationReader); final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), equalTo(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_NAMES)); assertThat(actualPipelinesDataFlowModel.getDataPrepperVersion(), nullValue()); } - @ParameterizedTest - @ValueSource(strings = { - TestConfigurationProvider.MULTI_FILE_PIPELINE_WITH_DISTRIBUTED_PIPELINE_CONFIGURATIONS_DIRECTOTRY, - TestConfigurationProvider.MULTI_FILE_PIPELINE_WITH_SINGLE_PIPELINE_CONFIGURATIONS_DIRECTOTRY - }) + @Test void parseConfiguration_from_directory_with_multiple_files_and_pipeline_extensions_should_throw() { + final File directoryLocation = new File(TestConfigurationProvider.MULTI_FILE_PIPELINE_WITH_DISTRIBUTED_PIPELINE_CONFIGURATIONS_DIRECTOTRY); + final List fileInputStreams = Stream.of(directoryLocation.listFiles()) + .map(file -> { + try { + return new FileInputStream(file); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + + when(pipelineConfigurationReader.getPipelineConfigurationInputStreams()).thenReturn(fileInputStreams); + final PipelinesDataflowModelParser 
pipelinesDataflowModelParser = - new PipelinesDataflowModelParser( - TestConfigurationProvider.MULTI_FILE_PIPELINE_WITH_DISTRIBUTED_PIPELINE_CONFIGURATIONS_DIRECTOTRY); + new PipelinesDataflowModelParser(pipelineConfigurationReader); final ParseException actualException = assertThrows( ParseException.class, pipelinesDataflowModelParser::parseConfiguration); assertThat(actualException.getMessage(), equalTo( @@ -90,20 +164,23 @@ void parseConfiguration_from_directory_with_multiple_files_and_pipeline_extensio @Test void parseConfiguration_from_directory_with_single_file_creates_the_correct_model() { + final File directoryLocation = new File(TestConfigurationProvider.SINGLE_FILE_PIPELINE_DIRECTOTRY); + final List fileInputStreams = Stream.of(directoryLocation.listFiles()) + .map(file -> { + try { + return new FileInputStream(file); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + + when(pipelineConfigurationReader.getPipelineConfigurationInputStreams()).thenReturn(fileInputStreams); + final PipelinesDataflowModelParser pipelinesDataflowModelParser = - new PipelinesDataflowModelParser(TestConfigurationProvider.SINGLE_FILE_PIPELINE_DIRECTOTRY); + new PipelinesDataflowModelParser(pipelineConfigurationReader); final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), equalTo(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_NAMES)); } - - @Test - void parseConfiguration_from_directory_with_no_yaml_files_should_throw() { - final PipelinesDataflowModelParser pipelinesDataflowModelParser = - new PipelinesDataflowModelParser(TestConfigurationProvider.EMPTY_PIPELINE_DIRECTOTRY); - final RuntimeException actualException = assertThrows(RuntimeException.class, - pipelinesDataflowModelParser::parseConfiguration); - assertThat(actualException.getMessage(), equalTo( - String.format("Pipelines 
configuration file not found at %s", TestConfigurationProvider.EMPTY_PIPELINE_DIRECTOTRY))); - } } \ No newline at end of file From 641b6c6e5491582c89537f38f851c8a0090c6eb8 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 19 Mar 2024 10:57:21 -0500 Subject: [PATCH 2/2] Address PR comments Signed-off-by: Taylor Gray --- .../parser/PipelineConfigurationFileReader.java | 9 +++++++-- .../PipelineConfigurationFileReaderTest.java | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java index 35519f31d1..0867b1e824 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java @@ -32,8 +32,13 @@ private List getInputStreamsForConfigurationFiles() { final File configurationLocation = new File(pipelineConfigurationFileLocation); if (configurationLocation.isFile()) { - return Stream.of(configurationLocation).map(this::getInputStreamForFile) + final List inputStreams = Stream.of(configurationLocation).map(this::getInputStreamForFile) .filter(Objects::nonNull).collect(Collectors.toList()); + + if (inputStreams.size() != 1) { + throw new ParseException(format("Pipeline configuration file not loadable at %s", configurationLocation.getName())); + } + return inputStreams; } else if (configurationLocation.isDirectory()) { FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml")); List inputStreams = Stream.of(configurationLocation.listFiles(yamlFilter)) @@ -59,7 +64,7 @@ private InputStream getInputStreamForFile(final File pipelineConfigurationFile) try 
{ return new FileInputStream(pipelineConfigurationFile); } catch (IOException e) { - LOG.warn("Pipeline configuration file {} not found", pipelineConfigurationFile.getName()); + LOG.warn("Unable to load pipeline configuration file {}", pipelineConfigurationFile.getName()); return null; } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReaderTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReaderTest.java index 2811551e86..467e7816ae 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReaderTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReaderTest.java @@ -48,6 +48,22 @@ void getPipelineConfigurationInputStreams_with_a_configuration_file_which_does_n assertThat(actualException.getMessage(), equalTo("Pipelines configuration file not found at file_does_not_exist.yml")); } + @Test + void getPipelineConfigurationInputStreams_with_a_configuration_file_exists_and_is_not_loadable_should_throw() throws IOException { + final String yamlContent = UUID.randomUUID().toString(); + final Path file = tempDir.resolve("test-pipeline.yaml"); + Files.writeString(file, yamlContent); + + file.toFile().setReadable(false, false); + + final PipelineConfigurationReader objectUnderTest = + new PipelineConfigurationFileReader(file.toString()); + + final RuntimeException actualException = assertThrows(RuntimeException.class, + objectUnderTest::getPipelineConfigurationInputStreams); + assertThat(actualException.getMessage(), equalTo("Pipeline configuration file not loadable at test-pipeline.yaml")); + } + @Test void getPipelineConfigurationInput_streams_from_existing_file() throws IOException {