Skip to content

Commit

Permalink
[Java] Support registration in thread safe fury (#1280)
Browse files Browse the repository at this point in the history
Closes #1279
  • Loading branch information
chaokunyang authored Dec 29, 2023
1 parent 03f0ff4 commit bdada2d
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.fury;

import java.util.function.Consumer;
import org.apache.fury.serializer.Serializer;

public abstract class AbstractThreadSafeFury implements ThreadSafeFury {
@Override
public void register(Class<?> clz) {
processCallback(fury -> fury.register(clz));
}

@Override
public void register(Class<?> cls, boolean createSerializer) {
processCallback(fury -> fury.register(cls, createSerializer));
}

@Override
public void register(Class<?> cls, Short id) {
processCallback(fury -> fury.register(cls, id));
}

@Override
public void register(Class<?> cls, Short id, boolean createSerializer) {
processCallback(fury -> fury.register(cls, id, createSerializer));
}

@Override
public <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass) {
processCallback(fury -> fury.registerSerializer(type, serializerClass));
}

protected abstract void processCallback(Consumer<Fury> callback);
}
8 changes: 8 additions & 0 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.fury.serializer.PrimitiveSerializers.LongSerializer;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.SerializerFactory;
import org.apache.fury.serializer.Serializers;
import org.apache.fury.serializer.StringSerializer;
import org.apache.fury.type.Generics;
import org.apache.fury.type.Type;
Expand Down Expand Up @@ -176,6 +177,13 @@ public void register(Class<?> cls, String typeTag) {
classResolver.register(cls, typeTag);
}

/**
* Register a Serializer.
*
* @param type class needed to be serialized/deserialized
* @param serializerClass serializer class can be created with {@link Serializers#newSerializer}
* @param <T> type of class
*/
public <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass) {
classResolver.registerSerializer(type, serializerClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.fury;

import java.nio.ByteBuffer;
import java.util.WeakHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.fury.memory.MemoryBuffer;
Expand All @@ -34,18 +36,24 @@
* will be created and destroyed frequently, which is slow.
*/
@ThreadSafe
public class ThreadLocalFury implements ThreadSafeFury {
public class ThreadLocalFury extends AbstractThreadSafeFury {
private final ThreadLocal<MemoryBuffer> bufferLocal =
ThreadLocal.withInitial(() -> MemoryUtils.buffer(32));

private final ThreadLocal<LoaderBinding> bindingThreadLocal;
private Consumer<Fury> factoryCallback;
private final WeakHashMap<LoaderBinding, Object> allFury;

public ThreadLocalFury(Function<ClassLoader, Fury> furyFactory) {
factoryCallback = f -> {};
allFury = new WeakHashMap<>();
bindingThreadLocal =
ThreadLocal.withInitial(
() -> {
LoaderBinding binding = new LoaderBinding(furyFactory);
binding.setBindingCallback(factoryCallback);
binding.setClassLoader(Thread.currentThread().getContextClassLoader());
allFury.put(binding, null);
return binding;
});
// 1. init and warm for current thread.
Expand All @@ -57,6 +65,15 @@ public ThreadLocalFury(Function<ClassLoader, Fury> furyFactory) {
fury.getConfig().getConfigHash(), fury.getClassResolver());
}

@Override
protected void processCallback(Consumer<Fury> callback) {
factoryCallback = factoryCallback.andThen(callback);
for (LoaderBinding binding : allFury.keySet()) {
binding.visitAllFury(callback);
binding.setBindingCallback(factoryCallback);
}
}

public <R> R execute(Function<Fury, R> action) {
Fury fury = bindingThreadLocal.get().get();
return action.apply(fury);
Expand Down
36 changes: 36 additions & 0 deletions java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.Serializers;
import org.apache.fury.util.LoaderBinding;

/**
Expand All @@ -30,6 +32,40 @@
*/
public interface ThreadSafeFury {

/** register class. */
void register(Class<?> cls);

/**
* Register class.
*
* @param cls class to register
* @param createSerializer whether to create serializer, if true and codegen enabled, this will
* generate the serializer code too.
*/
void register(Class<?> cls, boolean createSerializer);

/** register class with given id. */
void register(Class<?> cls, Short id);

/**
* Register class with specified id.
*
* @param cls class to register
* @param id id for provided class.
* @param createSerializer whether to create serializer, if true and codegen enabled, this will
* generate the serializer code too.
*/
void register(Class<?> cls, Short id, boolean createSerializer);

/**
* Register a Serializer.
*
* @param type class needed to be serialized/deserialized
* @param serializerClass serializer class can be created with {@link Serializers#newSerializer}
* @param <T> type of class
*/
<T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass);

/**
* Provide a context to execution operations on {@link Fury} directly and return the executed
* result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package org.apache.fury.pool;

import java.util.Queue;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.fury.Fury;
import org.apache.fury.util.LoggerFactory;
Expand All @@ -36,6 +38,7 @@ public class ClassLoaderFuryPooled {
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderFuryPooled.class);

private final Function<ClassLoader, Fury> furyFactory;
private Consumer<Fury> factoryCallback = f -> {};

private final ClassLoader classLoader;

Expand All @@ -45,6 +48,8 @@ public class ClassLoaderFuryPooled {
*/
private final Queue<Fury> idleCacheQueue;

final WeakHashMap<Fury, Object> allFury = new WeakHashMap<>();

/** active cache size's number change by : 1. getLoaderBind() 2. returnObject(LoaderBinding). */
private final AtomicInteger activeCacheNumber = new AtomicInteger(0);

Expand Down Expand Up @@ -112,6 +117,12 @@ public void returnFury(Fury fury) {

private void addFury() {
Fury fury = furyFactory.apply(classLoader);
factoryCallback.accept(fury);
idleCacheQueue.add(fury);
allFury.put(fury, null);
}

void setFactoryCallback(Consumer<Fury> factoryCallback) {
this.factoryCallback = factoryCallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class FuryPooledObjectFactory {
* @see Cache
* @see com.google.common.cache.CacheBuilder
*/
private final Cache<ClassLoader, ClassLoaderFuryPooled> classLoaderFuryPooledCache;
final Cache<ClassLoader, ClassLoaderFuryPooled> classLoaderFuryPooledCache;

/** ThreadLocal: ClassLoader. */
private final ThreadLocal<ClassLoader> classLoaderLocal =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.fury.AbstractThreadSafeFury;
import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryUtils;
import org.apache.fury.util.LoaderBinding;

@ThreadSafe
public class ThreadPoolFury implements ThreadSafeFury {
public class ThreadPoolFury extends AbstractThreadSafeFury {

private final FuryPooledObjectFactory furyPooledObjectFactory;
private Consumer<Fury> factoryCallback = f -> {};

public ThreadPoolFury(
Function<ClassLoader, Fury> furyFactory,
Expand All @@ -44,6 +46,16 @@ public ThreadPoolFury(
new FuryPooledObjectFactory(furyFactory, minPoolSize, maxPoolSize, expireTime, timeUnit);
}

@Override
protected void processCallback(Consumer<Fury> callback) {
factoryCallback = factoryCallback.andThen(callback);
for (ClassLoaderFuryPooled furyPooled :
furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) {
furyPooled.allFury.keySet().forEach(callback);
furyPooled.setFactoryCallback(factoryCallback);
}
}

public <R> R execute(Function<Fury, R> action) {
Fury fury = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package org.apache.fury.util;

import java.lang.ref.SoftReference;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -52,6 +55,33 @@ public Fury get() {
return fury;
}

public void visitAllFury(Consumer<Fury> consumer) {
if (furySoftMap.isEmpty()) {
for (Fury f : furyMap.values()) {
consumer.accept(f);
}
} else if (furyMap.isEmpty()) {
for (SoftReference<Fury> ref : furySoftMap.values()) {
Fury f = ref.get();
if (f != null) {
consumer.accept(f);
}
}
} else {
Set<Fury> furySet = new HashSet<>(furyMap.size());
Collections.addAll(furyMap.values());
for (SoftReference<Fury> ref : furySoftMap.values()) {
Fury f = ref.get();
if (f != null) {
furySet.add(f);
}
}
for (Fury f : furySet) {
consumer.accept(f);
}
}
}

public ClassLoader getClassLoader() {
return loader;
}
Expand Down Expand Up @@ -143,6 +173,10 @@ public void register(Class<?> clz, int id) {
bindingCallback = bindingCallback.andThen(fury -> fury.register(clz, (short) id));
}

public void setBindingCallback(Consumer<Fury> bindingCallback) {
this.bindingCallback = bindingCallback;
}

public enum StagingType {
/**
* Don't cache fury. A new {@link Fury} will be created if classloader is switched to a new one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.fury.resolver.MetaContext;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.test.bean.BeanA;
import org.apache.fury.test.bean.BeanB;
import org.apache.fury.test.bean.Struct;
import org.apache.fury.util.LoaderBinding.StagingType;
import org.testng.Assert;
Expand Down Expand Up @@ -72,6 +73,42 @@ public void testPoolSerialize() {
assertFalse(hasException);
}

@Test
public void testRegistration() throws Exception {
BeanB bean = BeanB.createBeanB(2);
ExecutorService executor = Executors.newSingleThreadExecutor();
AtomicReference<Throwable> ex = new AtomicReference<>();
{
ThreadSafeFury fury =
Fury.builder().requireClassRegistration(true).buildThreadSafeFuryPool(2, 4);
fury.register(BeanB.class);
Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean);
executor.execute(
() -> {
try {
Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean);
} catch (Throwable t) {
ex.set(t);
}
});
Assert.assertNull(ex.get());
}
{
ThreadSafeFury fury = Fury.builder().requireClassRegistration(true).buildThreadLocalFury();
fury.register(BeanB.class);
Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean);
executor.execute(
() -> {
try {
Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean);
} catch (Throwable t) {
ex.set(t);
}
});
Assert.assertNull(ex.get());
}
}

@Test
public void testSerialize() throws Exception {
BeanA beanA = BeanA.createBeanA(2);
Expand Down

0 comments on commit bdada2d

Please sign in to comment.