Skip to content

Commit

Permalink
Add support for writing tags along with events to Sink (#2850)
Browse files Browse the repository at this point in the history
* Updated to pass SinkContext to Sink constructors as suggested in the previous comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Fixed checkstyle errors and renamed RoutedPluginSetting to SinkContextPluginSetting

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Fixed s3-sink integration test

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Added javadoc for SinkContext

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
kkondaka and Krishna Kondaka authored Jun 27, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 51722ba commit 37297e7
Showing 33 changed files with 451 additions and 131 deletions.
Original file line number Diff line number Diff line change
@@ -28,8 +28,8 @@
@JsonDeserialize(using = SinkModel.SinkModelDeserializer.class)
public class SinkModel extends PluginModel {

SinkModel(final String pluginName, final List<String> routes, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, pluginSettings));
SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, pluginSettings));
}

private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
@@ -46,18 +46,30 @@ public Collection<String> getRoutes() {
return this.<SinkInternalJsonModel>getInternalJsonModel().routes;
}

/**
 * Returns the tags target key configured for this Sink.
 *
 * @return the value of the {@code tags_target_key} property held by the internal
 *         JSON model; {@code null} when no key was supplied
 * @since 2.4
 */
public String getTagsTargetKey() {
    final SinkInternalJsonModel sinkInternalJsonModel = this.<SinkInternalJsonModel>getInternalJsonModel();
    return sinkInternalJsonModel.tagsTargetKey;
}

public static class SinkModelBuilder {

private final PluginModel pluginModel;
private final List<String> routes;
private final String tagsTargetKey;

private SinkModelBuilder(final PluginModel pluginModel) {
this.pluginModel = pluginModel;
this.routes = Collections.emptyList();
this.tagsTargetKey = null;
}

public SinkModel build() {
return new SinkModel(pluginModel.getPluginName(), routes, pluginModel.getPluginSettings());
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, pluginModel.getPluginSettings());
}
}

@@ -70,21 +82,27 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
@JsonProperty("routes")
private final List<String> routes;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("tags_target_key")
private final String tagsTargetKey;

@JsonCreator
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes) {
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey) {
super();
this.routes = routes != null ? routes : new ArrayList<>();
this.tagsTargetKey = tagsTargetKey;
}

private SinkInternalJsonModel(final List<String> routes, final Map<String, Object> pluginSettings) {
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
super(pluginSettings);
this.routes = routes != null ? routes : new ArrayList<>();
this.tagsTargetKey = tagsTargetKey;
}
}

static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkModel, SinkInternalJsonModel> {
SinkModelDeserializer() {
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null));
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null));
}
}
}
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.plugin;

import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.configuration.PluginSetting;

import java.util.List;
@@ -27,6 +28,18 @@ public interface PluginFactory {
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting);

/**
* Loads a new instance of a plugin, passing a {@link SinkContext} in addition to
* the plugin's own {@link PluginSetting}.
*
* @param baseClass The class type that the plugin is supporting.
* @param pluginSetting The {@link PluginSetting} to configure this plugin
* @param sinkContext The {@link SinkContext} to configure this plugin
* @param <T> The type
* @return A new instance of your plugin, configured
* @since 2.4
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final SinkContext sinkContext);

/**
* Loads a specified number of plugin instances. The total number of instances is provided
* by the numberOfInstancesFunction.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import java.util.Collection;

/**
 * Data Prepper Sink Context class. This is the class for keeping global
 * sink configuration as context so that individual sinks may use it.
 * <p>
 * Instances are immutable holders: both values are stored exactly as passed to
 * the constructor (no copying, no null substitution) and handed back unchanged.
 */
public class SinkContext {
    private final Collection<String> routes;
    private final String tagsTargetKey;

    /**
     * Creates a context from the sink-level settings.
     *
     * @param tagsTargetKey the target key name for tags; stored as given
     * @param routes the routes configured for the sink; stored as given
     */
    public SinkContext(final String tagsTargetKey, final Collection<String> routes) {
        this.routes = routes;
        this.tagsTargetKey = tagsTargetKey;
    }

    /**
     * Returns the routes if configured for a given sink.
     *
     * @return the same collection instance that was supplied at construction
     */
    public Collection<String> getRoutes() {
        return this.routes;
    }

    /**
     * Returns the target key name for tags if configured for a given sink.
     *
     * @return the tags target key supplied at construction
     */
    public String getTagsTargetKey() {
        return this.tagsTargetKey;
    }
}

Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
@@ -72,7 +72,7 @@ void testSerializing_PipelinesDataFlowModel_with_Version() throws JsonProcessing
final DataPrepperVersion version = DataPrepperVersion.parse("2.0");
final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(version, Collections.singletonMap(pipelineName, pipelineModel));
@@ -93,7 +93,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> preppers = Collections.singletonList(new PluginModel("testPrepper", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null, null));
final PipelineModel pipelineModel = new PipelineModel(source, null, preppers, Collections.singletonList(new ConditionalRoute("my-route", "/a==b")), sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
@@ -74,13 +75,15 @@ void serialize_into_known_SinkModel() throws IOException {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
pluginSettings.put("key1", "value1");
pluginSettings.put("key2", "value2");
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), pluginSettings);
final String tagsTargetKey = "tags";
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), tagsTargetKey, pluginSettings);

final String actualJson = objectMapper.writeValueAsString(sinkModel);

final String expectedJson = createStringFromInputStream(this.getClass().getResourceAsStream("sink_plugin.yaml"));

assertThat("---\n" + actualJson, equalTo(expectedJson));
assertThat(sinkModel.getTagsTargetKey(), equalTo(tagsTargetKey));
}

@Test
@@ -93,7 +96,8 @@ void deserialize_with_any_pluginModel() throws IOException {
assertAll(
() -> assertThat(sinkModel.getPluginName(), equalTo("customPlugin")),
() -> assertThat(sinkModel.getPluginSettings(), notNullValue()),
() -> assertThat(sinkModel.getRoutes(), notNullValue())
() -> assertThat(sinkModel.getRoutes(), notNullValue()),
() -> assertThat(sinkModel.getTagsTargetKey(), nullValue())
);
assertAll(
() -> assertThat(sinkModel.getPluginSettings().size(), equalTo(3)),
@@ -123,7 +127,7 @@ void serialize_with_just_pluginModel() throws IOException {
pluginSettings.put("key1", "value1");
pluginSettings.put("key2", "value2");
pluginSettings.put("key3", "value3");
final SinkModel sinkModel = new SinkModel("customPlugin", null, pluginSettings);
final SinkModel sinkModel = new SinkModel("customPlugin", null, null, pluginSettings);

final String actualJson = objectMapper.writeValueAsString(sinkModel);

@@ -156,10 +160,11 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
assertThat(actualSinkModel.getPluginSettings(), equalTo(pluginSettings));
assertThat(actualSinkModel.getRoutes(), notNullValue());
assertThat(actualSinkModel.getRoutes(), empty());
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
}
}

// Drains the given stream completely and decodes the bytes as UTF-8 text.
private static String createStringFromInputStream(final InputStream inputStream) throws IOException {
    final byte[] contents = inputStream.readAllBytes();
    return new String(contents, StandardCharsets.UTF_8);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import org.apache.commons.lang3.RandomStringUtils;



public class SinkContextTest {

    /**
     * Verifies that a SinkContext hands back exactly the tags target key and
     * routes it was constructed with.
     */
    @Test
    public void testSinkContextBasic() {
        final List<String> expectedRoutes = Collections.emptyList();
        final String expectedTagsTargetKey = RandomStringUtils.randomAlphabetic(6);

        final SinkContext sinkContext = new SinkContext(expectedTagsTargetKey, expectedRoutes);

        assertThat(sinkContext.getTagsTargetKey(), equalTo(expectedTagsTargetKey));
        assertThat(sinkContext.getRoutes(), equalTo(expectedRoutes));
    }
}

Original file line number Diff line number Diff line change
@@ -3,5 +3,6 @@ customSinkPlugin:
routes:
- "routeA"
- "routeB"
tags_target_key: "tags"
key1: "value1"
key2: "value2"
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
import org.apache.commons.collections.CollectionUtils;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.parser.model.RoutedPluginSetting;
import org.opensearch.dataprepper.parser.model.SinkContextPluginSetting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -82,7 +82,7 @@ private static void visitAndValidate(
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipeline);
touchedPipelineSet.add(pipeline);
//if validation is successful, then there is definitely sink
final List<RoutedPluginSetting> connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings();
final List<SinkContextPluginSetting> connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings();
//Recursively check connected pipelines
for (PluginSetting pluginSetting : connectedPipelinesSettings) {
//Further process only if the sink is of pipeline type
@@ -159,7 +159,7 @@ private static void validateForOrphans(
throw new RuntimeException("Invalid configuration, cannot proceed with ambiguous configuration");
}
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(currentPipelineName);
final List<RoutedPluginSetting> pluginSettings = pipelineConfiguration.getSinkPluginSettings();
final List<SinkContextPluginSetting> pluginSettings = pipelineConfiguration.getSinkPluginSettings();
for (PluginSetting pluginSetting : pluginSettings) {
if (PIPELINE_TYPE.equals(pluginSetting.getName()) &&
pluginSetting.getAttributeFromSettings(PIPELINE_ATTRIBUTE_NAME) != null) {
Original file line number Diff line number Diff line change
@@ -19,9 +19,10 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.parser.model.RoutedPluginSetting;
import org.opensearch.dataprepper.parser.model.SinkContextPluginSetting;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.peerforwarder.PeerForwardingProcessorDecorator;
@@ -292,13 +293,13 @@ private Optional<Source> getSourceIfPipelineType(
return Optional.empty();
}

private DataFlowComponent<Sink> buildRoutedSinkOrConnector(final RoutedPluginSetting pluginSetting) {
final Sink sink = buildSinkOrConnector(pluginSetting);
private DataFlowComponent<Sink> buildRoutedSinkOrConnector(final SinkContextPluginSetting pluginSetting) {
final Sink sink = buildSinkOrConnector(pluginSetting, pluginSetting.getSinkContext());

return new DataFlowComponent<>(sink, pluginSetting.getRoutes());
return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes());
}

private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
private Sink buildSinkOrConnector(final PluginSetting pluginSetting, final SinkContext sinkContext) {
LOG.info("Building [{}] as sink component", pluginSetting.getName());
final Optional<String> pipelineNameOptional = getPipelineNameIfPipelineType(pluginSetting);
if (pipelineNameOptional.isPresent()) { //update to ifPresentOrElse when using JDK9
@@ -307,7 +308,7 @@ private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
sourceConnectorMap.put(pipelineName, pipelineConnector); //TODO retrieve from parent Pipeline using name
return pipelineConnector;
} else {
return pluginFactory.loadPlugin(Sink.class, pluginSetting);
return pluginFactory.loadPlugin(Sink.class, pluginSetting, sinkContext);
}
}

@@ -337,7 +338,7 @@ private void removeConnectedPipelines(
sourcePipeline, pipelineConfigurationMap, pipelineMap));

//remove sink connected pipelines
final List<RoutedPluginSetting> sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings();
final List<SinkContextPluginSetting> sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings();
sinkPluginSettings.forEach(sinkPluginSetting -> {
getPipelineNameIfPipelineType(sinkPluginSetting).ifPresent(sinkPipeline -> processRemoveIfRequired(
sinkPipeline, pipelineConfigurationMap, pipelineMap));
Loading

0 comments on commit 37297e7

Please sign in to comment.