diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 947c467ae1a50..037d98b68e6fd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -42,8 +42,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -55,12 +57,12 @@ public class ConnectorPluginsResource implements ConnectResource { private static final String ALIAS_SUFFIX = "Connector"; private final Herder herder; - private final List connectorPlugins; + private final Set connectorPlugins; private long requestTimeoutMs; public ConnectorPluginsResource(Herder herder) { this.herder = herder; - this.connectorPlugins = new ArrayList<>(); + this.connectorPlugins = new LinkedHashSet<>(); this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS; // TODO: improve once plugins are allowed to be added/removed during runtime. @@ -126,7 +128,7 @@ public List listConnectorPlugins( .filter(p -> PluginType.SINK.toString().equals(p.type()) || PluginType.SOURCE.toString().equals(p.type())) .collect(Collectors.toList())); } else { - return Collections.unmodifiableList(connectorPlugins); + return Collections.unmodifiableList(new ArrayList<>(connectorPlugins)); } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index c39017adc402f..52ac14ca1cd64 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -34,6 +34,8 @@ import org.apache.kafka.connect.runtime.SampleSinkConnector; import org.apache.kafka.connect.runtime.SampleSourceConnector; import org.apache.kafka.connect.runtime.distributed.DistributedHerder; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoaderTest; +import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -62,9 +64,11 @@ import org.mockito.junit.MockitoJUnitRunner; import javax.ws.rs.BadRequestException; +import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -120,6 +124,7 @@ public class ConnectorPluginsResourceTest { static { try { ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader(); + ClassLoader pluginClassLoader = new PluginClassLoader(DelegatingClassLoaderTest.ARBITRARY_URL, new URL[]{}, classLoader); String appVersion = AppInfoParser.getVersion(); SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK, classLoader)); SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK, classLoader)); @@ -132,6 +137,10 @@ public class ConnectorPluginsResourceTest { CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.CONVERTER, classLoader)); CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.CONVERTER, classLoader)); + // Same class, version, and type, but loaded from a different classloader + CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.CONVERTER, pluginClassLoader)); + CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.CONVERTER, pluginClassLoader)); + HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.HEADER_CONVERTER, classLoader)); HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.HEADER_CONVERTER, classLoader)); @@ -386,7 +395,7 @@ public void testConnectorPluginsIncludesClassTypeAndVersionInformation() throws @Test public void testListAllPlugins() { - Set expectedConnectorPlugins = Stream.of( + List expectedConnectorPlugins = Stream.of( SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS, CONVERTER_PLUGINS, @@ -395,8 +404,14 @@ public void testListAllPlugins() { PREDICATE_PLUGINS ).flatMap(Collection::stream) .map(PluginInfo::new) - .collect(Collectors.toSet()); - Set actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false)); + .distinct() + .collect(Collectors.toList()); + List actualConnectorPlugins = new ArrayList<>(connectorPluginsResource.listConnectorPlugins(false)); + Comparator compare = Comparator.comparing(PluginInfo::className) + .thenComparing(PluginInfo::type) + .thenComparing(PluginInfo::version); + actualConnectorPlugins.sort(compare); + expectedConnectorPlugins.sort(compare); assertEquals(expectedConnectorPlugins, actualConnectorPlugins); verify(herder, atLeastOnce()).plugins(); }