Skip to content

Commit

Permalink
MAINT: deprecate pipeline_configurations with extension (#4428)
Browse files Browse the repository at this point in the history
* MAINT: deprecate-pipeline_configurations with extension

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Apr 19, 2024
1 parent 8255920 commit 2e8ea82
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.configuration;

import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -30,7 +31,8 @@ public class PipelinesDataFlowModel {
@JsonInclude(JsonInclude.Include.NON_NULL)
private DataPrepperVersion version;

@JsonProperty("pipeline_configurations")
@JsonAlias("pipeline_configurations")
@JsonProperty("extension")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSetter(nulls = Nulls.SKIP)
private PipelineExtensions pipelineExtensions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class PipelinesDataFlowModelTest {
private static final String RESOURCE_PATH_WITH_ROUTES = "/pipelines_data_flow_routes.yaml";
private static final String RESOURCE_PATH_WITH_SHORT_HAND_VERSION = "/pipeline_with_short_hand_version.yaml";
private static final String RESOURCE_PATH_WITH_VERSION = "/pipeline_with_version.yaml";
private static final String RESOURCE_PATH_WITH_DEPRECATED_EXTENSION = "/pipeline_with_depreciated_extension.yaml";
private static final String RESOURCE_PATH_WITH_EXTENSION = "/pipeline_with_extension.yaml";
private ObjectMapper objectMapper;

Expand Down Expand Up @@ -272,12 +273,21 @@ void deserialize_PipelinesDataFlowModel_with_shorthand_version() throws IOExcept
assertThat(version.getMinorVersion(), is(equalTo(Optional.empty())));
}

@Test
void deserialize_PipelinesDataFlowModel_with_deprecated_pipeline_configurations() throws IOException {
final InputStream inputStream = this.getClass().getResourceAsStream(RESOURCE_PATH_WITH_DEPRECATED_EXTENSION);

final PipelinesDataFlowModel actualModel = objectMapper.readValue(inputStream, PipelinesDataFlowModel.class);
assertThat(actualModel, notNullValue());
assertThat(actualModel.getPipelineExtensions(), notNullValue());
assertThat(actualModel.getPipelineExtensions().getExtensionMap().containsKey("test_extension"), is(true));
}

@Test
void deserialize_PipelinesDataFlowModel_with_extension() throws IOException {
final InputStream inputStream = this.getClass().getResourceAsStream(RESOURCE_PATH_WITH_EXTENSION);

final PipelinesDataFlowModel actualModel = objectMapper.readValue(inputStream, PipelinesDataFlowModel.class);

assertThat(actualModel, notNullValue());
assertThat(actualModel.getPipelineExtensions(), notNullValue());
assertThat(actualModel.getPipelineExtensions().getExtensionMap().containsKey("test_extension"), is(true));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
pipeline_configurations:
test_extension:
test-pipeline:
source:
testSource: null
processor:
- testProcessor: null
sink:
- testSink: null
workers: 8
delay: 50
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pipeline_configurations:
extension:
test_extension:
test-pipeline:
source:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private PipelinesDataFlowModel mergePipelinesDataModels(
if (pipelineExtensionsList.size() > 1 ||
(pipelineExtensionsList.size() == 1 && pipelinesDataFlowModels.size() > 1)) {
throw new ParseException(
"pipeline_configurations and definition must all be defined in a single YAML file if pipeline_configurations is configured.");
"extension/pipeline_configurations and definition must all be defined in a single YAML file if extension/pipeline_configurations is configured.");
}
return pipelineExtensionsList.isEmpty() ? new PipelinesDataFlowModel(pipelinesDataFlowModelMap) :
new PipelinesDataFlowModel(pipelineExtensionsList.get(0), pipelinesDataFlowModelMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
Expand Down Expand Up @@ -85,10 +87,12 @@ void parseConfiguration_from_string_input_stream_creates_the_correct_model() {
equalTo(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_NAMES));
}

@Test
void parseConfiguration_with_valid_pipelines_and_extensions() throws FileNotFoundException {
@ParameterizedTest
@ValueSource(strings = {TestConfigurationProvider.VALID_PIPELINE_CONFIG_FILE_WITH_DEPRECATED_EXTENSIONS,
TestConfigurationProvider.VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSION})
void parseConfiguration_with_valid_pipelines_and_extension(final String filePath) throws FileNotFoundException {
when(pipelineConfigurationReader.getPipelineConfigurationInputStreams())
.thenReturn(List.of(new FileInputStream(TestConfigurationProvider.VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS)));
.thenReturn(List.of(new FileInputStream(filePath)));

final PipelinesDataflowModelParser pipelinesDataflowModelParser =
new PipelinesDataflowModelParser(pipelineConfigurationReader);
Expand Down Expand Up @@ -158,8 +162,8 @@ void parseConfiguration_from_directory_with_multiple_files_and_pipeline_extensio
final ParseException actualException = assertThrows(
ParseException.class, pipelinesDataflowModelParser::parseConfiguration);
assertThat(actualException.getMessage(), equalTo(
"pipeline_configurations and definition must all be defined in a single YAML file " +
"if pipeline_configurations is configured."));
"extension/pipeline_configurations and definition must all be defined in a single YAML file " +
"if extension/pipeline_configurations is configured."));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public class TestConfigurationProvider {
public static final Integer DEFAULT_READ_BATCH_DELAY = 3_000;
public static final Integer TEST_DELAY = 3_000;
public static final String VALID_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/valid_multiple_pipeline_configuration.yml";
public static final String VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS = "src/test/resources/valid_pipeline_configuration_with_extensions.yml";
public static final String VALID_PIPELINE_CONFIG_FILE_WITH_DEPRECATED_EXTENSIONS = "src/test/resources/valid_pipeline_configuration_with_deprecated_extensions.yml";
public static final String VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSION = "src/test/resources/valid_pipeline_configuration_with_extension.yml";
public static final String MULTI_FILE_PIPELINE_DIRECTOTRY = "src/test/resources/multi-pipelines";
public static final String MULTI_FILE_PIPELINE_WITH_DISTRIBUTED_PIPELINE_CONFIGURATIONS_DIRECTOTRY = "src/test/resources/multi-pipelines-distributed-pipeline-configurations";
public static final String MULTI_FILE_PIPELINE_WITH_SINGLE_PIPELINE_CONFIGURATIONS_DIRECTOTRY = "src/test/resources/multi-pipelines-single-pipeline-configurations";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# this configuration file is solely for testing formatting
extension:
test_extension:
test_attribute: test_string_1
test-pipeline-1:
source:
file:
path: "/tmp/file-source.tmp"
buffer:
bounded_blocking: #to check non object nodes for plugins
sink:
- pipeline:
name: "test-pipeline-2"
test-pipeline-2:
source:
pipeline:
name: "test-pipeline-1"
sink:
- pipeline:
name: "test-pipeline-3"
test-pipeline-3:
source:
pipeline:
name: "test-pipeline-2"
sink:
- file:
path: "/tmp/todelete.txt"

0 comments on commit 2e8ea82

Please sign in to comment.