Skip to content

Commit

Permalink
KAFKA-18419: KIP-891 Connect Multiversion Support (Transformation and…
Browse files Browse the repository at this point in the history
… Predicate Changes) (#17742)

Reviewers: Greg Harris <[email protected]>
  • Loading branch information
snehashisp authored Jan 6, 2025
1 parent 23e77ed commit ad33698
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -859,9 +859,11 @@ ConfigInfos validateConnectorConfig(

addNullValuedErrors(connectorProps, validatedConnectorConfig);

ConfigInfos connectorConfigInfo = validateConnectorPluginSpecifiedConfigs(connectorProps, validatedConnectorConfig, enrichedConfigDef, connector, reportStage);
// the order of operations here is important, converter validations can add error messages to the connector config
// which are collected and converted to ConfigInfos in validateConnectorPluginSpecifiedConfigs
ConfigInfos converterConfigInfo = validateAllConverterConfigs(connectorProps, validatedConnectorConfig, connectorLoader, reportStage);
ConfigInfos clientOverrideInfo = validateClientOverrides(connectorProps, connectorType, connector.getClass(), reportStage, doLog);
ConfigInfos connectorConfigInfo = validateConnectorPluginSpecifiedConfigs(connectorProps, validatedConnectorConfig, enrichedConfigDef, connector, reportStage);

return mergeConfigInfos(connType,
connectorConfigInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime;

import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;

Expand All @@ -31,8 +32,8 @@ public class CachedConnectors {
private static final String LATEST_VERSION = "latest";

private final Map<String, Map<String, Connector>> connectors;
private final Map<String, Exception> invalidConnectors;
private final Map<String, Map<String, Exception>> invalidVersions;
private final Map<String, Throwable> invalidConnectors;
private final Map<String, Map<String, VersionedPluginLoadingException>> invalidVersions;
private final Plugins plugins;

public CachedConnectors(Plugins plugins) {
Expand All @@ -42,14 +43,14 @@ public CachedConnectors(Plugins plugins) {
this.invalidVersions = new ConcurrentHashMap<>();
}

private void validate(String connectorName, VersionRange range) throws Exception {
private void validate(String connectorName, VersionRange range) throws ConnectException, VersionedPluginLoadingException {
if (invalidConnectors.containsKey(connectorName)) {
throw new Exception(invalidConnectors.get(connectorName));
throw new ConnectException(invalidConnectors.get(connectorName));
}

String version = range == null ? LATEST_VERSION : range.toString();
if (invalidVersions.containsKey(connectorName) && invalidVersions.get(connectorName).containsKey(version)) {
throw new Exception(invalidVersions.get(connectorName).get(version));
throw new VersionedPluginLoadingException(invalidVersions.get(connectorName).get(version).getMessage());
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import org.apache.maven.artifact.versioning.ArtifactVersion;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
Expand All @@ -28,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -127,19 +127,7 @@ String resolveFullClassName(String classOrAlias) {
return aliases.getOrDefault(classOrAlias, classOrAlias);
}

String latestVersion(String classOrAlias) {
if (classOrAlias == null) {
return null;
}
String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner == null) {
return null;
}
return inner.lastKey().version();
}

String versionInLocation(String classOrAlias, String location) {
PluginDesc<?> pluginDesc(String classOrAlias, String preferredLocation, Set<PluginType> allowedTypes) {
if (classOrAlias == null) {
return null;
}
Expand All @@ -148,12 +136,17 @@ String versionInLocation(String classOrAlias, String location) {
if (inner == null) {
return null;
}
PluginDesc<?> result = null;
for (Map.Entry<PluginDesc<?>, ClassLoader> entry : inner.entrySet()) {
if (entry.getKey().location().equals(location)) {
return entry.getKey().version();
if (!allowedTypes.contains(entry.getKey().type())) {
continue;
}
result = entry.getKey();
if (result.location().equals(preferredLocation)) {
return result;
}
}
return null;
return result;
}

private ClassLoader findPluginLoader(
Expand All @@ -170,7 +163,6 @@ private ClassLoader findPluginLoader(
+ "Provided soft version: %s ", range));
}

ArtifactVersion version = null;
ClassLoader loader = null;
for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet()) {
// the entries should be in sorted order of versions so this should end up picking the latest version which matches the range
Expand Down Expand Up @@ -227,19 +219,19 @@ protected Class<?> loadVersionedPluginClass(
if (range == null) {
return plugin;
}
verifyClasspathVersionedPlugin(name, plugin, range);
verifyClasspathVersionedPlugin(fullName, plugin, range);
}
return plugin;
}

private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, VersionRange range) throws VersionedPluginLoadingException {
private void verifyClasspathVersionedPlugin(String fullName, Class<?> plugin, VersionRange range) throws VersionedPluginLoadingException {
String pluginVersion;
SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = pluginLoaders.get(name);
SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = pluginLoaders.get(fullName);

if (scannedPlugin == null) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s is not part of Connect's plugin loading mechanism (ClassPath or Plugin Path)",
name
fullName
));
}

Expand All @@ -255,7 +247,7 @@ private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, Versio
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has multiple versions specified in class path, "
+ "only one version is allowed in class path for loading a plugin with version range",
name
fullName
));
} else if (classpathPlugins.isEmpty()) {
throw new VersionedPluginLoadingException("Invalid plugin found in classpath");
Expand All @@ -264,7 +256,7 @@ private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, Versio
if (!range.containsVersion(new DefaultArtifactVersion(pluginVersion))) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has version %s which does not match the required version range %s",
name,
fullName,
pluginVersion,
range
), Collections.singletonList(pluginVersion));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -265,15 +266,17 @@ public Function<ClassLoader, LoaderSwap> safeLoaderSwapper() {
};
}

public String latestVersion(String classOrAlias) {
return delegatingLoader.latestVersion(classOrAlias);
public String latestVersion(String classOrAlias, PluginType... allowedTypes) {
return pluginVersion(classOrAlias, null, allowedTypes);
}

public String pluginVersion(String classOrAlias, ClassLoader sourceLoader) {
if (!(sourceLoader instanceof PluginClassLoader)) {
return latestVersion(classOrAlias);
public String pluginVersion(String classOrAlias, ClassLoader sourceLoader, PluginType... allowedTypes) {
String location = (sourceLoader instanceof PluginClassLoader) ? ((PluginClassLoader) sourceLoader).location() : null;
PluginDesc<?> desc = delegatingLoader.pluginDesc(classOrAlias, location, new HashSet<>(Arrays.asList(allowedTypes)));
if (desc != null) {
return desc.version();
}
return delegatingLoader.versionInLocation(classOrAlias, ((PluginClassLoader) sourceLoader).location());
return null;
}

public DelegatingClassLoader delegatingLoader() {
Expand Down Expand Up @@ -376,7 +379,7 @@ public Object newPlugin(String classOrAlias, VersionRange range) throws Versione

public Object newPlugin(String classOrAlias, VersionRange range, ClassLoader sourceLoader) throws ClassNotFoundException {
if (range == null && sourceLoader instanceof PluginClassLoader) {
sourceLoader.loadClass(classOrAlias);
return newPlugin(sourceLoader.loadClass(classOrAlias));
}
return newPlugin(classOrAlias, range);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -74,6 +77,14 @@ public HeaderConverterPluginVersionRecommender headerConverterPluginVersionRecom
return headerConverterPluginVersionRecommender;
}

public TransformationPluginRecommender transformationPluginRecommender(String classOrAlias) {
return new TransformationPluginRecommender(classOrAlias);
}

public PredicatePluginRecommender predicatePluginRecommender(String classOrAlias) {
return new PredicatePluginRecommender(classOrAlias);
}

public class ConnectorPluginVersionRecommender implements ConfigDef.Recommender {

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -195,4 +206,60 @@ protected Function<String, List<Object>> recommendations() {
.map(PluginDesc::version).distinct().collect(Collectors.toList());
}
}

// Recommender for transformation and predicate plugins
public abstract class SMTPluginRecommender<T> implements ConfigDef.Recommender {

protected abstract Function<String, Set<PluginDesc<T>>> plugins();

protected final String classOrAliasConfig;

public SMTPluginRecommender(String classOrAliasConfig) {
this.classOrAliasConfig = classOrAliasConfig;
}

@Override
@SuppressWarnings({"rawtypes"})
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
if (plugins == null) {
return Collections.emptyList();
}
if (parsedConfig.get(classOrAliasConfig) == null) {
return Collections.emptyList();
}

Class classOrAlias = (Class) parsedConfig.get(classOrAliasConfig);
return plugins().apply(classOrAlias.getName())
.stream().map(PluginDesc::version).distinct().collect(Collectors.toList());
}

@Override
public boolean visible(String name, Map<String, Object> parsedConfig) {
return true;
}
}

public class TransformationPluginRecommender extends SMTPluginRecommender<Transformation<?>> {

public TransformationPluginRecommender(String classOrAliasConfig) {
super(classOrAliasConfig);
}

@Override
protected Function<String, Set<PluginDesc<Transformation<?>>>> plugins() {
return plugins::transformations;
}
}

public class PredicatePluginRecommender extends SMTPluginRecommender<Predicate<?>> {

public PredicatePluginRecommender(String classOrAliasConfig) {
super(classOrAliasConfig);
}

@Override
protected Function<String, Set<PluginDesc<Predicate<?>>>> plugins() {
return plugins::predicates;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand Down Expand Up @@ -559,12 +560,14 @@ public void testConfigValidationTopicsRegexWithDlq() {
}

@Test
public void testConfigValidationTransformsExtendResults() {
@SuppressWarnings("rawtypes")
public void testConfigValidationTransformsExtendResults() throws ClassNotFoundException {
final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);

// 2 transform aliases defined -> 2 plugin lookups
when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), null, classLoader)).thenReturn(new SampleTransformation());

// Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
// class info that should generate an error.
Expand All @@ -575,6 +578,7 @@ public void testConfigValidationTransformsExtendResults() {
config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
config.put("required", "value"); // connector required config
ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);

assertEquals(herder.connectorType(config), ConnectorType.SOURCE);

// We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
Expand All @@ -596,7 +600,7 @@ public void testConfigValidationTransformsExtendResults() {
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(31, infos.size());
assertEquals(33, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
assertEquals("transforms.xformA.type",
infos.get("transforms.xformA.type").configValue().name());
Expand All @@ -611,12 +615,15 @@ public void testConfigValidationTransformsExtendResults() {
}

@Test
public void testConfigValidationPredicatesExtendResults() {
@SuppressWarnings("rawtypes")
public void testConfigValidationPredicatesExtendResults() throws ClassNotFoundException {
final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);

when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
Mockito.lenient().when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), null, classLoader)).thenReturn(new SampleTransformation());
Mockito.lenient().when(plugins.newPlugin(SamplePredicate.class.getName(), null, classLoader)).thenReturn(new SamplePredicate());

// Define 2 predicates. One has a class defined and so can get embedded configs, the other is missing
// class info that should generate an error.
Expand Down Expand Up @@ -653,7 +660,7 @@ public void testConfigValidationPredicatesExtendResults() {
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(33, infos.size());
assertEquals(36, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
assertEquals("transforms.xformA.type", infos.get("transforms.xformA.type").configValue().name());
assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ConnectorConfigTest<R extends ConnectRecord<R>> {

Expand Down Expand Up @@ -455,13 +457,19 @@ public static class Value<R extends ConnectRecord<R>> extends AbstractKeyValueTr
}

@Test
public void testEnrichedConfigDef() {
@SuppressWarnings("rawtypes")
public void testEnrichedConfigDef() throws ClassNotFoundException {
String alias = "hdt";
String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), props, false);
Plugins mockPlugins = mock(Plugins.class);
when(mockPlugins.newPlugin(HasDuplicateConfigTransformation.class.getName(),
null, (ClassLoader) null)).thenReturn(new HasDuplicateConfigTransformation());
when(mockPlugins.transformations()).thenReturn(Collections.emptySet());
ConfigDef def = ConnectorConfig.enrich(mockPlugins, new ConfigDef(), props, false);
assertEnrichedConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
assertEnrichedConfigDef(def, prefix, TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.STRING);
assertEnrichedConfigDef(def, prefix, TransformationStage.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
Expand Down
Loading

0 comments on commit ad33698

Please sign in to comment.