Skip to content

Commit

Permalink
Data Prepper Extensions #2636, #2637 (#2730)
Browse files Browse the repository at this point in the history
Data Prepper Extensions #2636, #2637

Initial work supports the basic model and the ability to inject shared objects across plugins.

Signed-off-by: David Venable <[email protected]>

---------

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored May 24, 2023
1 parent 5e92474 commit 3ce5b53
Show file tree
Hide file tree
Showing 25 changed files with 1,253 additions and 185 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.plugin;

/**
* The interface to implement to become an extension.
*
* @since 2.3
*/
public interface ExtensionPlugin {
/**
* Register your extension with the available {@link ExtensionPoints} provided
* by Data Prepper.
* <p>
* Each extension will have this method called once on start-up.
*
* @param extensionPoints The {@link ExtensionPoints} wherein the extension can extend behaviors.
*/
void apply(ExtensionPoints extensionPoints);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.plugin;

/**
* A model for extending Data Prepper. A Data Prepper extension will call methods in a provided instance
* of this class.
*
* @since 2.3
*/
public interface ExtensionPoints {
/**
* Adds an {@link ExtensionProvider} to Data Prepper. This allows an extension to make a class
* available to plugins within Data Prepper.
*
* @param extensionProvider The {@link ExtensionProvider} which this extension is creating.
* @since 2.3
*/
void addExtensionProvider(ExtensionProvider<?> extensionProvider);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.plugin;

import java.util.Optional;

/**
* An interface to be provided by extensions which wish to provide classes to plugins.
*
* @param <T> The type of class provided.
* @since 2.3
*/
public interface ExtensionProvider<T> {
/**
* Returns an instance of the class being provided.
* <p>
* This is called everytime a plugin requires an instance. The implementor can re-use
* instances, or create them on-demand depending on the intention of the extension
* author.
*
* @param context The context for the request. This is currently a placeholder.
* @return An instance as requested.
*/
Optional<T> provideInstance(Context context);

/**
* Returns the Java {@link Class} which this extension is providing.
*
* @return A {@link Class}.
*/
Class<T> supportedClass();

/**
* The context for creating a new instance.
*
* @since 2.3
*/
interface Context {

}
}
1 change: 1 addition & 0 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
implementation 'org.apache.logging.log4j:log4j-core'
implementation 'org.apache.logging.log4j:log4j-slf4j2-impl'
implementation 'javax.inject:javax.inject:1'
implementation 'javax.annotation:javax.annotation-api:1.3.2'
implementation(libs.spring.core) {
exclude group: 'commons-logging', module: 'commons-logging'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.plugin.ExtensionPlugin;
import org.reflections.Reflections;
import org.reflections.util.ConfigurationBuilder;
import org.reflections.util.FilterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Named;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Implements {@link ExtensionClassProvider} using the classpath to detect extensions.
* This uses the same {@link PluginPackagesSupplier} as {@link ClasspathPluginProvider}.
*/
@Named
public class ClasspathExtensionClassProvider implements ExtensionClassProvider {
private static final Logger LOG = LoggerFactory.getLogger(ClasspathExtensionClassProvider.class);
private final Reflections reflections;
private Set<Class<? extends ExtensionPlugin>> extensionPluginClasses;

@Inject
public ClasspathExtensionClassProvider() {
this(createReflections());
}

private static Reflections createReflections() {
final String[] packages = new PluginPackagesSupplier().get();
FilterBuilder filterBuilder = new FilterBuilder();
for (String packageToInclude : packages) {
filterBuilder = filterBuilder.includePackage(packageToInclude);
}

return new Reflections(new ConfigurationBuilder()
.forPackages(packages)
.filterInputsBy(filterBuilder));
}

/**
* For testing purposes.
*
* @param reflections A {@link Reflections} object.
*/
ClasspathExtensionClassProvider(final Reflections reflections) {
this.reflections = reflections;
}

@Override
public Collection<Class<? extends ExtensionPlugin>> loadExtensionPluginClasses() {
if (extensionPluginClasses == null) {
extensionPluginClasses = scanForExtensionPlugins();
}
return extensionPluginClasses;
}

private Set<Class<? extends ExtensionPlugin>> scanForExtensionPlugins() {
final Set<Class<? extends ExtensionPlugin>> extensionClasses = reflections.getSubTypesOf(ExtensionPlugin.class);

if (LOG.isDebugEnabled()) {
LOG.debug("Found {} extension classes.", extensionClasses.size());
LOG.debug("Extensions classes: {}",
extensionClasses.stream().map(Class::getName).collect(Collectors.joining(", ")));
}

return extensionClasses;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/**
* An internal class which represents all the data which can be provided
* when constructing a new plugin.
*/
class ComponentPluginArgumentsContext implements PluginArgumentsContext {
private static final String UNABLE_TO_CREATE_PLUGIN_PARAMETER = "Unable to create an argument for required plugin parameter type: ";
private final Map<Class<?>, Supplier<Object>> typedArgumentsSuppliers;

@Nullable
private final BeanFactory beanFactory;

private ComponentPluginArgumentsContext(final Builder builder) {
Objects.requireNonNull(builder.pluginSetting,
"PluginArgumentsContext received a null Builder object. This is likely an error in the plugin framework.");

beanFactory = builder.beanFactory;

typedArgumentsSuppliers = new HashMap<>();

typedArgumentsSuppliers.put(PluginSetting.class, () -> builder.pluginSetting);

if(builder.pluginConfiguration != null) {
typedArgumentsSuppliers.put(builder.pluginConfiguration.getClass(), () -> builder.pluginConfiguration);
}

typedArgumentsSuppliers.put(PluginMetrics.class, () -> PluginMetrics.fromPluginSetting(builder.pluginSetting));

if (builder.pipelineDescription != null) {
typedArgumentsSuppliers.put(PipelineDescription.class, () -> builder.pipelineDescription);
}

if (builder.pluginFactory != null) {
typedArgumentsSuppliers.put(PluginFactory.class, () -> builder.pluginFactory);
}

if (builder.eventFactory != null) {
typedArgumentsSuppliers.put(EventFactory.class, () -> builder.eventFactory);
}

if (builder.acknowledgementSetManager != null) {
typedArgumentsSuppliers.put(AcknowledgementSetManager.class, () -> builder.acknowledgementSetManager);
}
}

@Override
public Object[] createArguments(final Class<?>[] parameterTypes) {
return Arrays.stream(parameterTypes)
.map(this::getRequiredArgumentSupplier)
.map(Supplier::get)
.toArray();
}

private Supplier<Object> getRequiredArgumentSupplier(final Class<?> parameterType) {
if(typedArgumentsSuppliers.containsKey(parameterType)) {
return typedArgumentsSuppliers.get(parameterType);
}
else if (beanFactory != null) {
return createBeanSupplier(parameterType, beanFactory);
}
else {
throw new InvalidPluginDefinitionException(UNABLE_TO_CREATE_PLUGIN_PARAMETER + parameterType);
}
}

/**
* @since 1.3
*
* Create a supplier to return a bean of type <pre>parameterType</pre> if one is available in <pre>beanFactory</pre>
*
* @param parameterType type of bean requested
* @param beanFactory bean source the generated supplier will use
* @return supplier of object type bean
* @throws InvalidPluginDefinitionException if no bean is available from beanFactory
*/
private <T> Supplier<T> createBeanSupplier(final Class<? extends T> parameterType, final BeanFactory beanFactory) {
return () -> {
try {
return beanFactory.getBean(parameterType);
} catch (final BeansException e) {
throw new InvalidPluginDefinitionException(UNABLE_TO_CREATE_PLUGIN_PARAMETER + parameterType, e);
}
};
}

static class Builder {
private Object pluginConfiguration;
private PluginSetting pluginSetting;
private PluginFactory pluginFactory;
private PipelineDescription pipelineDescription;
private BeanFactory beanFactory;
private EventFactory eventFactory;
private AcknowledgementSetManager acknowledgementSetManager;

Builder withPluginConfiguration(final Object pluginConfiguration) {
this.pluginConfiguration = pluginConfiguration;
return this;
}

Builder withPluginSetting(final PluginSetting pluginSetting) {
this.pluginSetting = pluginSetting;
return this;
}

Builder withEventFactory(final EventFactory eventFactory) {
this.eventFactory = eventFactory;
return this;
}

Builder withAcknowledgementSetManager(final AcknowledgementSetManager acknowledgementSetManager) {
this.acknowledgementSetManager = acknowledgementSetManager;
return this;
}

Builder withPluginFactory(final PluginFactory pluginFactory) {
this.pluginFactory = pluginFactory;
return this;
}

Builder withPipelineDescription(final PipelineDescription pipelineDescription) {
this.pipelineDescription = pipelineDescription;
return this;
}

Builder withBeanFactory(final BeanFactory beanFactory) {
this.beanFactory = beanFactory;
return this;
}

ComponentPluginArgumentsContext build() {
return new ComponentPluginArgumentsContext(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.plugin.ExtensionPoints;
import org.opensearch.dataprepper.model.plugin.ExtensionProvider;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.support.GenericApplicationContext;

import javax.inject.Inject;
import javax.inject.Named;
import java.util.Objects;

@Named
public class DataPrepperExtensionPoints implements ExtensionPoints {
private static final ExtensionProvider.Context EMPTY_CONTEXT = new EmptyContext();
private final GenericApplicationContext sharedApplicationContext;

@Inject
public DataPrepperExtensionPoints(
final PluginBeanFactoryProvider pluginBeanFactoryProvider) {
Objects.requireNonNull(pluginBeanFactoryProvider);
Objects.requireNonNull(pluginBeanFactoryProvider.getSharedPluginApplicationContext());
this.sharedApplicationContext = pluginBeanFactoryProvider.getSharedPluginApplicationContext();
}

@Override
public void addExtensionProvider(final ExtensionProvider extensionProvider) {
sharedApplicationContext.registerBean(
extensionProvider.supportedClass(),
() -> extensionProvider.provideInstance(EMPTY_CONTEXT),
b -> b.setScope(BeanDefinition.SCOPE_PROTOTYPE));
}

private static class EmptyContext implements ExtensionProvider.Context {

}
}
Loading

0 comments on commit 3ce5b53

Please sign in to comment.