Skip to content

Commit

Permalink
KAFKA-15473: Hide duplicate plugins in /connector-plugins (apache#14398)
Browse files Browse the repository at this point in the history
Reviewers: Yash Mayya <[email protected]>, Sagar Rao <[email protected]>, Hector Geraldino <[email protected]>, Chris Egerton <[email protected]>
  • Loading branch information
gharris1727 authored Sep 19, 2023
1 parent 7872a1f commit b088307
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand All @@ -55,12 +57,12 @@ public class ConnectorPluginsResource implements ConnectResource {

private static final String ALIAS_SUFFIX = "Connector";
private final Herder herder;
private final List<PluginInfo> connectorPlugins;
private final Set<PluginInfo> connectorPlugins;
private long requestTimeoutMs;

public ConnectorPluginsResource(Herder herder) {
this.herder = herder;
this.connectorPlugins = new ArrayList<>();
this.connectorPlugins = new LinkedHashSet<>();
this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;

// TODO: improve once plugins are allowed to be added/removed during runtime.
Expand Down Expand Up @@ -126,7 +128,7 @@ public List<PluginInfo> listConnectorPlugins(
.filter(p -> PluginType.SINK.toString().equals(p.type()) || PluginType.SOURCE.toString().equals(p.type()))
.collect(Collectors.toList()));
} else {
return Collections.unmodifiableList(connectorPlugins);
return Collections.unmodifiableList(new ArrayList<>(connectorPlugins));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.kafka.connect.runtime.SampleSinkConnector;
import org.apache.kafka.connect.runtime.SampleSourceConnector;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoaderTest;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
Expand Down Expand Up @@ -62,9 +64,11 @@
import org.mockito.junit.MockitoJUnitRunner;

import javax.ws.rs.BadRequestException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -120,6 +124,7 @@ public class ConnectorPluginsResourceTest {
static {
try {
ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader();
ClassLoader pluginClassLoader = new PluginClassLoader(DelegatingClassLoaderTest.ARBITRARY_URL, new URL[]{}, classLoader);
String appVersion = AppInfoParser.getVersion();
SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK, classLoader));
SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK, classLoader));
Expand All @@ -132,6 +137,10 @@ public class ConnectorPluginsResourceTest {
CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.CONVERTER, classLoader));
CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.CONVERTER, classLoader));

// Same class, version, and type, but loaded from a different classloader
CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.CONVERTER, pluginClassLoader));
CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.CONVERTER, pluginClassLoader));

HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.HEADER_CONVERTER, classLoader));
HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.HEADER_CONVERTER, classLoader));

Expand Down Expand Up @@ -386,7 +395,7 @@ public void testConnectorPluginsIncludesClassTypeAndVersionInformation() throws

@Test
public void testListAllPlugins() {
Set<PluginInfo> expectedConnectorPlugins = Stream.of(
List<PluginInfo> expectedConnectorPlugins = Stream.of(
SINK_CONNECTOR_PLUGINS,
SOURCE_CONNECTOR_PLUGINS,
CONVERTER_PLUGINS,
Expand All @@ -395,8 +404,14 @@ public void testListAllPlugins() {
PREDICATE_PLUGINS
).flatMap(Collection::stream)
.map(PluginInfo::new)
.collect(Collectors.toSet());
Set<PluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
.distinct()
.collect(Collectors.toList());
List<PluginInfo> actualConnectorPlugins = new ArrayList<>(connectorPluginsResource.listConnectorPlugins(false));
Comparator<PluginInfo> compare = Comparator.comparing(PluginInfo::className)
.thenComparing(PluginInfo::type)
.thenComparing(PluginInfo::version);
actualConnectorPlugins.sort(compare);
expectedConnectorPlugins.sort(compare);
assertEquals(expectedConnectorPlugins, actualConnectorPlugins);
verify(herder, atLeastOnce()).plugins();
}
Expand Down

0 comments on commit b088307

Please sign in to comment.