diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java index f93e561542eeb..53418744b5486 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java @@ -197,7 +197,8 @@ protected EntryFilter load(EntryFilterMetaData metadata) + " does not implement entry filter interface"); } EntryFilter pi = (EntryFilter) filter; - return new EntryFilterWithClassLoader(pi, ncl); + // the classloader is shared with the broker, the instance doesn't own it + return new EntryFilterWithClassLoader(pi, ncl, false); } catch (Throwable e) { if (e instanceof IOException) { throw (IOException) e; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java index c5c5721087788..aab46c62acdb4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java @@ -30,15 +30,23 @@ public class EntryFilterWithClassLoader implements EntryFilter { private final EntryFilter entryFilter; private final NarClassLoader classLoader; + private final boolean classLoaderOwned; - public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) { + public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader, boolean classLoaderOwned) { this.entryFilter = entryFilter; this.classLoader = classLoader; + this.classLoaderOwned = classLoaderOwned; } @Override public FilterResult filterEntry(Entry entry, FilterContext context) { - return entryFilter.filterEntry(entry, context); + ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(classLoader); + return entryFilter.filterEntry(entry, context); + } finally { + Thread.currentThread().setContextClassLoader(currentClassLoader); + } } @VisibleForTesting @@ -48,11 +56,20 @@ public EntryFilter getEntryFilter() { @Override public void close() { - entryFilter.close(); + ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); try { - classLoader.close(); - } catch (IOException e) { - log.error("close EntryFilterWithClassLoader failed", e); + Thread.currentThread().setContextClassLoader(classLoader); + entryFilter.close(); + } finally { + Thread.currentThread().setContextClassLoader(currentClassLoader); + } + if (classLoaderOwned) { + log.info("Closing classloader {} for EntryFilter {}", classLoader, entryFilter.getClass().getName()); + try { + classLoader.close(); + } catch (IOException e) { + log.error("close EntryFilterWithClassLoader failed", e); + } } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 7b3daddcd9da0..f7388ef9eb990 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -239,9 +239,9 @@ public void testFilter() throws Exception { hasFilterField.setAccessible(true); NarClassLoader narClassLoader = mock(NarClassLoader.class); EntryFilter filter1 = new EntryFilterTest(); - EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter1, narClassLoader); + EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter1, narClassLoader, false); EntryFilter filter2 = new EntryFilter2Test(); - EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter2, narClassLoader); + EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter2, narClassLoader, false); field.set(dispatcher, List.of(loader1, loader2)); hasFilterField.set(dispatcher, true); @@ -371,9 +371,9 @@ public void testFilteredMsgCount(String topic) throws Throwable { hasFilterField.setAccessible(true); NarClassLoader narClassLoader = mock(NarClassLoader.class); EntryFilter filter1 = new EntryFilterTest(); - EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); + EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader, false); EntryFilter filter2 = new EntryFilter2Test(); - EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader); + EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader, false); field.set(dispatcher, List.of(loader1, loader2)); hasFilterField.set(dispatcher, true); @@ -463,10 +463,10 @@ public void testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscriptio NarClassLoader narClassLoader = mock(NarClassLoader.class); EntryFilter filter1 = new EntryFilterTest(); EntryFilterWithClassLoader loader1 = - spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); + spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader, false); EntryFilter filter2 = new EntryFilterTest(); EntryFilterWithClassLoader loader2 = - spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader); + spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader, false); field.set(dispatcher, List.of(loader1, loader2)); hasFilterField.set(dispatcher, true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 024d8582fa213..5b2998216e8e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -409,7 +409,7 @@ public void testAvgMessagesPerEntry() throws Exception { EntryFilter filter = new EntryFilterProducerTest(); EntryFilterWithClassLoader loader = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter, - narClassLoader); + narClassLoader, false); Pair> entryFilters = Pair.of("filter", List.of(loader)); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index 3e71d8f211101..bc4cb73e5b6fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -208,7 +208,7 @@ public void testSubscriptionStats(final String topic, final String subName, bool NarClassLoader narClassLoader = mock(NarClassLoader.class); EntryFilter filter1 = new EntryFilterTest(); EntryFilterWithClassLoader loader1 = - spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); + spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader, false); field.set(dispatcher, List.of(loader1)); hasFilterField.set(dispatcher, true); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java index 9736d8b47ef71..44cfc2872ef6b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java @@ -40,6 +40,7 @@ import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -135,6 +136,7 @@ public class NarClassLoader extends URLClassLoader { * The NAR for which this ClassLoader is responsible. */ private final File narWorkingDirectory; + private final AtomicBoolean closed = new AtomicBoolean(); private static final String TMP_DIR_PREFIX = "pulsar-nar"; @@ -292,4 +294,18 @@ protected String findLibrary(final String libname) { public String toString() { return NarClassLoader.class.getName() + "[" + narWorkingDirectory.getPath() + "]"; } + + @Override + protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + if (closed.get()) { + log.warn("Loading class {} from a closed classloader ({})", name, this); + } + return super.loadClass(name, resolve); + } + + @Override + public void close() throws IOException { + closed.set(true); + super.close(); + } }