diff --git a/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java new file mode 100644 index 0000000000..9f60a1915c --- /dev/null +++ b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fury; + +import java.util.function.Consumer; +import org.apache.fury.serializer.Serializer; + +public abstract class AbstractThreadSafeFury implements ThreadSafeFury { + @Override + public void register(Class clz) { + processCallback(fury -> fury.register(clz)); + } + + @Override + public void register(Class cls, boolean createSerializer) { + processCallback(fury -> fury.register(cls, createSerializer)); + } + + @Override + public void register(Class cls, Short id) { + processCallback(fury -> fury.register(cls, id)); + } + + @Override + public void register(Class cls, Short id, boolean createSerializer) { + processCallback(fury -> fury.register(cls, id, createSerializer)); + } + + @Override + public void registerSerializer(Class type, Class serializerClass) { + processCallback(fury -> fury.registerSerializer(type, serializerClass)); + } + + protected abstract void processCallback(Consumer callback); +} diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java index 60ca0da643..cd8bc0f11f 100644 --- a/java/fury-core/src/main/java/org/apache/fury/Fury.java +++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java @@ -54,6 +54,7 @@ import org.apache.fury.serializer.PrimitiveSerializers.LongSerializer; import org.apache.fury.serializer.Serializer; import org.apache.fury.serializer.SerializerFactory; +import org.apache.fury.serializer.Serializers; import org.apache.fury.serializer.StringSerializer; import org.apache.fury.type.Generics; import org.apache.fury.type.Type; @@ -178,6 +179,13 @@ public void register(Class cls, String typeTag) { classResolver.register(cls, typeTag); } + /** + * Register a Serializer. + * + * @param type class needed to be serialized/deserialized + * @param serializerClass serializer class can be created with {@link Serializers#newSerializer} + * @param type of class + */ public void registerSerializer(Class type, Class serializerClass) { classResolver.registerSerializer(type, serializerClass); } diff --git a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java index daf248f701..3dac5ba7e0 100644 --- a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java @@ -20,6 +20,8 @@ package org.apache.fury; import java.nio.ByteBuffer; +import java.util.WeakHashMap; +import java.util.function.Consumer; import java.util.function.Function; import javax.annotation.concurrent.ThreadSafe; import org.apache.fury.memory.MemoryBuffer; @@ -36,18 +38,24 @@ * @author chaokunyang */ @ThreadSafe -public class ThreadLocalFury implements ThreadSafeFury { +public class ThreadLocalFury extends AbstractThreadSafeFury { private final ThreadLocal bufferLocal = ThreadLocal.withInitial(() -> MemoryUtils.buffer(32)); private final ThreadLocal bindingThreadLocal; + private Consumer factoryCallback; + private final WeakHashMap allFury; public ThreadLocalFury(Function furyFactory) { + factoryCallback = f -> {}; + allFury = new WeakHashMap<>(); bindingThreadLocal = ThreadLocal.withInitial( () -> { LoaderBinding binding = new LoaderBinding(furyFactory); + binding.setBindingCallback(factoryCallback); binding.setClassLoader(Thread.currentThread().getContextClassLoader()); + allFury.put(binding, null); return binding; }); // 1. init and warm for current thread. @@ -59,6 +67,15 @@ public ThreadLocalFury(Function furyFactory) { fury.getConfig().getConfigHash(), fury.getClassResolver()); } + @Override + protected void processCallback(Consumer callback) { + factoryCallback = factoryCallback.andThen(callback); + for (LoaderBinding binding : allFury.keySet()) { + binding.visitAllFury(callback); + binding.setBindingCallback(factoryCallback); + } + } + public R execute(Function action) { Fury fury = bindingThreadLocal.get().get(); return action.apply(fury); diff --git a/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java b/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java index fd4e0ddad2..46c6e92426 100644 --- a/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import java.util.function.Function; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.serializer.Serializer; +import org.apache.fury.serializer.Serializers; import org.apache.fury.util.LoaderBinding; /** @@ -32,6 +34,40 @@ */ public interface ThreadSafeFury { + /** register class. */ + void register(Class cls); + + /** + * Register class. + * + * @param cls class to register + * @param createSerializer whether to create serializer, if true and codegen enabled, this will + * generate the serializer code too. + */ + void register(Class cls, boolean createSerializer); + + /** register class with given id. */ + void register(Class cls, Short id); + + /** + * Register class with specified id. + * + * @param cls class to register + * @param id id for provided class. + * @param createSerializer whether to create serializer, if true and codegen enabled, this will + * generate the serializer code too. + */ + void register(Class cls, Short id, boolean createSerializer); + + /** + * Register a Serializer. + * + * @param type class needed to be serialized/deserialized + * @param serializerClass serializer class can be created with {@link Serializers#newSerializer} + * @param type of class + */ + void registerSerializer(Class type, Class serializerClass); + /** * Provide a context to execution operations on {@link Fury} directly and return the executed * result. diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java b/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java index 9483ac7cb9..e99c8edd4c 100644 --- a/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java +++ b/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java @@ -20,11 +20,13 @@ package org.apache.fury.pool; import java.util.Queue; +import java.util.WeakHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.Function; import org.apache.fury.Fury; import org.apache.fury.util.LoggerFactory; @@ -36,6 +38,7 @@ public class ClassLoaderFuryPooled { private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderFuryPooled.class); private final Function furyFactory; + private Consumer factoryCallback = f -> {}; private final ClassLoader classLoader; @@ -45,6 +48,8 @@ public class ClassLoaderFuryPooled { */ private final Queue idleCacheQueue; + final WeakHashMap allFury = new WeakHashMap<>(); + /** active cache size's number change by : 1. getLoaderBind() 2. returnObject(LoaderBinding). */ private final AtomicInteger activeCacheNumber = new AtomicInteger(0); @@ -112,6 +117,12 @@ public void returnFury(Fury fury) { private void addFury() { Fury fury = furyFactory.apply(classLoader); + factoryCallback.accept(fury); idleCacheQueue.add(fury); + allFury.put(fury, null); + } + + void setFactoryCallback(Consumer factoryCallback) { + this.factoryCallback = factoryCallback; } } diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java index 095a76ec05..f0611d81dc 100644 --- a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java +++ b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java @@ -47,7 +47,7 @@ public class FuryPooledObjectFactory { * @see Cache * @see com.google.common.cache.CacheBuilder */ - private final Cache classLoaderFuryPooledCache; + final Cache classLoaderFuryPooledCache; /** ThreadLocal: ClassLoader. */ private final ThreadLocal classLoaderLocal = diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java index 4c49c2d8c1..8fc4a677d8 100644 --- a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java @@ -21,18 +21,20 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import javax.annotation.concurrent.ThreadSafe; +import org.apache.fury.AbstractThreadSafeFury; import org.apache.fury.Fury; -import org.apache.fury.ThreadSafeFury; import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.memory.MemoryUtils; import org.apache.fury.util.LoaderBinding; @ThreadSafe -public class ThreadPoolFury implements ThreadSafeFury { +public class ThreadPoolFury extends AbstractThreadSafeFury { private final FuryPooledObjectFactory furyPooledObjectFactory; + private Consumer factoryCallback = f -> {}; public ThreadPoolFury( Function furyFactory, @@ -44,6 +46,16 @@ public ThreadPoolFury( new FuryPooledObjectFactory(furyFactory, minPoolSize, maxPoolSize, expireTime, timeUnit); } + @Override + protected void processCallback(Consumer callback) { + factoryCallback = factoryCallback.andThen(callback); + for (ClassLoaderFuryPooled furyPooled : + furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) { + furyPooled.allFury.keySet().forEach(callback); + furyPooled.setFactoryCallback(factoryCallback); + } + } + public R execute(Function action) { Fury fury = null; try { diff --git a/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java b/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java index 44137a5ef9..830485616a 100644 --- a/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java +++ b/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java @@ -20,7 +20,10 @@ package org.apache.fury.util; import java.lang.ref.SoftReference; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; import java.util.WeakHashMap; import java.util.function.Consumer; import java.util.function.Function; @@ -54,6 +57,33 @@ public Fury get() { return fury; } + public void visitAllFury(Consumer consumer) { + if (furySoftMap.isEmpty()) { + for (Fury f : furyMap.values()) { + consumer.accept(f); + } + } else if (furyMap.isEmpty()) { + for (SoftReference ref : furySoftMap.values()) { + Fury f = ref.get(); + if (f != null) { + consumer.accept(f); + } + } + } else { + Set furySet = new HashSet<>(furyMap.size()); + Collections.addAll(furyMap.values()); + for (SoftReference ref : furySoftMap.values()) { + Fury f = ref.get(); + if (f != null) { + furySet.add(f); + } + } + for (Fury f : furySet) { + consumer.accept(f); + } + } + } + public ClassLoader getClassLoader() { return loader; } @@ -145,6 +175,10 @@ public void register(Class clz, int id) { bindingCallback = bindingCallback.andThen(fury -> fury.register(clz, (short) id)); } + public void setBindingCallback(Consumer bindingCallback) { + this.bindingCallback = bindingCallback; + } + public enum StagingType { /** * Don't cache fury. A new {@link Fury} will be created if classloader is switched to a new one. diff --git a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java index bd28ba9466..464ea6f5bc 100644 --- a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java +++ b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java @@ -35,6 +35,7 @@ import org.apache.fury.resolver.MetaContext; import org.apache.fury.serializer.Serializer; import org.apache.fury.test.bean.BeanA; +import org.apache.fury.test.bean.BeanB; import org.apache.fury.test.bean.Struct; import org.apache.fury.util.LoaderBinding.StagingType; import org.testng.Assert; @@ -72,6 +73,42 @@ public void testPoolSerialize() { assertFalse(hasException); } + @Test + public void testRegistration() throws Exception { + BeanB bean = BeanB.createBeanB(2); + ExecutorService executor = Executors.newSingleThreadExecutor(); + AtomicReference ex = new AtomicReference<>(); + { + ThreadSafeFury fury = + Fury.builder().requireClassRegistration(true).buildThreadSafeFuryPool(2, 4); + fury.register(BeanB.class); + Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean); + executor.execute( + () -> { + try { + Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean); + } catch (Throwable t) { + ex.set(t); + } + }); + Assert.assertNull(ex.get()); + } + { + ThreadSafeFury fury = Fury.builder().requireClassRegistration(true).buildThreadLocalFury(); + fury.register(BeanB.class); + Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean); + executor.execute( + () -> { + try { + Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean); + } catch (Throwable t) { + ex.set(t); + } + }); + Assert.assertNull(ex.get()); + } + } + @Test public void testSerialize() throws Exception { BeanA beanA = BeanA.createBeanA(2);