From f74464030313ca61e81d324024f4b6b55c143f4c Mon Sep 17 00:00:00 2001 From: Shuchang Li Date: Tue, 14 May 2024 00:01:53 +0800 Subject: [PATCH] fix(java): ThreadPoolFury#factoryCallback don't work when create new classLoaderFuryPooled (#1628) ## What does this PR do? ThreadPoolFury#factoryCallback don't work when create new classLoaderFuryPooled. ## Related issues ## Does this PR introduce any user-facing change? - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark --- .../fury/pool/FuryPooledObjectFactory.java | 9 +++- .../org/apache/fury/pool/ThreadPoolFury.java | 9 +++- .../org/apache/fury/ThreadSafeFuryTest.java | 46 +++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java index 9efc584a3e..1bc03403b2 100644 --- a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java +++ b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java @@ -22,6 +22,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import org.apache.fury.Fury; import org.apache.fury.logging.Logger; @@ -62,15 +63,20 @@ public class FuryPooledObjectFactory { */ private final int maxPoolSize; + /** factoryCallback will be set in every new classLoaderFuryPooled so that can deal every fury. */ + private final Consumer factoryCallback; + public FuryPooledObjectFactory( Function furyFactory, int minPoolSize, int maxPoolSize, long expireTime, - TimeUnit timeUnit) { + TimeUnit timeUnit, + Consumer factoryCallback) { this.minPoolSize = minPoolSize; this.maxPoolSize = maxPoolSize; this.furyFactory = furyFactory; + this.factoryCallback = factoryCallback; classLoaderFuryPooledCache = CacheBuilder.newBuilder() .weakKeys() @@ -138,6 +144,7 @@ private synchronized ClassLoaderFuryPooled getOrAddCache(ClassLoader classLoader if (classLoaderFuryPooled == null) { classLoaderFuryPooled = new ClassLoaderFuryPooled(classLoader, furyFactory, minPoolSize, maxPoolSize); + classLoaderFuryPooled.setFactoryCallback(factoryCallback); classLoaderFuryPooledCache.put(classLoader, classLoaderFuryPooled); } return classLoaderFuryPooled; diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java index e9367dcfc6..f1c4bd6bb8 100644 --- a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java @@ -47,7 +47,13 @@ public ThreadPoolFury( long expireTime, TimeUnit timeUnit) { this.furyPooledObjectFactory = - new FuryPooledObjectFactory(furyFactory, minPoolSize, maxPoolSize, expireTime, timeUnit); + new FuryPooledObjectFactory( + furyFactory, + minPoolSize, + maxPoolSize, + expireTime, + timeUnit, + fury -> factoryCallback.accept(fury)); } @Override @@ -56,7 +62,6 @@ protected void processCallback(Consumer callback) { for (ClassLoaderFuryPooled furyPooled : furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) { furyPooled.allFury.keySet().forEach(callback); - furyPooled.setFactoryCallback(factoryCallback); } } diff --git a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java index 464ea6f5bc..294585729a 100644 --- a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java +++ b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java @@ -30,12 +30,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import lombok.Data; import org.apache.fury.config.Language; import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.resolver.MetaContext; import org.apache.fury.serializer.Serializer; import org.apache.fury.test.bean.BeanA; import org.apache.fury.test.bean.BeanB; +import org.apache.fury.test.bean.Foo; import org.apache.fury.test.bean.Struct; import org.apache.fury.util.LoaderBinding.StagingType; import org.testng.Assert; @@ -319,4 +321,48 @@ public void testSerializeJavaObject() { Assert.assertEquals(fury.deserializeJavaObject(buffer, String.class), "abc"); } } + + @Data + static class Foo { + int f1; + } + + public static class FooSerializer extends Serializer { + public FooSerializer(Fury fury, Class type) { + super(fury, type); + } + + @Override + public void write(MemoryBuffer buffer, Foo value) { + buffer.writeInt32(value.f1); + } + + @Override + public Foo read(MemoryBuffer buffer) { + final Foo foo = new Foo(); + foo.f1 = buffer.readInt32(); + return foo; + } + } + + public static class CustomClassLoader extends ClassLoader { + public CustomClassLoader(ClassLoader parent) { + super(parent); + } + } + + @Test + public void testSerializerRegister() { + final ThreadSafeFury threadSafeFury = + Fury.builder().requireClassRegistration(false).buildThreadSafeFuryPool(0, 2); + threadSafeFury.registerSerializer(Foo.class, FooSerializer.class); + // create a new classLoader + threadSafeFury.setClassLoader(new CustomClassLoader(ClassLoader.getSystemClassLoader())); + threadSafeFury.execute( + fury -> { + Assert.assertEquals( + fury.getClassResolver().getSerializer(Foo.class).getClass(), FooSerializer.class); + return null; + }); + } }