Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Support registration in thread safe fury #1280

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.fury;
chaokunyang marked this conversation as resolved.
Show resolved Hide resolved

import java.util.function.Consumer;
import org.apache.fury.serializer.Serializer;

public abstract class AbstractThreadSafeFury implements ThreadSafeFury {
@Override
public void register(Class<?> clz) {
processCallback(fury -> fury.register(clz));
}

@Override
public void register(Class<?> cls, boolean createSerializer) {
processCallback(fury -> fury.register(cls, createSerializer));
}

@Override
public void register(Class<?> cls, Short id) {
processCallback(fury -> fury.register(cls, id));
}

@Override
public void register(Class<?> cls, Short id, boolean createSerializer) {
processCallback(fury -> fury.register(cls, id, createSerializer));
}

@Override
public <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass) {
processCallback(fury -> fury.registerSerializer(type, serializerClass));
}

protected abstract void processCallback(Consumer<Fury> callback);
}
8 changes: 8 additions & 0 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.fury.serializer.PrimitiveSerializers.LongSerializer;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.SerializerFactory;
import org.apache.fury.serializer.Serializers;
import org.apache.fury.serializer.StringSerializer;
import org.apache.fury.type.Generics;
import org.apache.fury.type.Type;
Expand Down Expand Up @@ -178,6 +179,13 @@ public void register(Class<?> cls, String typeTag) {
classResolver.register(cls, typeTag);
}

/**
* Register a Serializer.
*
* @param type class needed to be serialized/deserialized
* @param serializerClass serializer class can be created with {@link Serializers#newSerializer}
* @param <T> type of class
*/
public <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass) {
classResolver.registerSerializer(type, serializerClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.fury;

import java.nio.ByteBuffer;
import java.util.WeakHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.fury.memory.MemoryBuffer;
Expand All @@ -36,11 +38,13 @@
* @author chaokunyang
*/
@ThreadSafe
public class ThreadLocalFury implements ThreadSafeFury {
public class ThreadLocalFury extends AbstractThreadSafeFury {
private final ThreadLocal<MemoryBuffer> bufferLocal =
ThreadLocal.withInitial(() -> MemoryUtils.buffer(32));

private final ThreadLocal<LoaderBinding> bindingThreadLocal;
private Consumer<Fury> factoryCallback = f -> {};
private final WeakHashMap<Fury, Object> allFury = new WeakHashMap<>();

public ThreadLocalFury(Function<ClassLoader, Fury> furyFactory) {
bindingThreadLocal =
Expand All @@ -57,6 +61,14 @@ public ThreadLocalFury(Function<ClassLoader, Fury> furyFactory) {
Fury fury = bindingThreadLocal.get().get();
ClassResolver._addGraalvmClassRegistry(
fury.getConfig().getConfigHash(), fury.getClassResolver());
factoryCallback.accept(fury);
allFury.put(fury, null);
}

@Override
protected void processCallback(Consumer<Fury> callback) {
allFury.keySet().forEach(callback);
factoryCallback = factoryCallback.andThen(callback);
}

public <R> R execute(Function<Fury, R> action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.Serializers;
import org.apache.fury.util.LoaderBinding;

/**
Expand All @@ -32,6 +34,40 @@
*/
public interface ThreadSafeFury {

/** register class. */
void register(Class<?> cls);

/**
* Register class.
*
* @param cls class to register
* @param createSerializer whether to create serializer, if true and codegen enabled, this will
* generate the serializer code too.
*/
void register(Class<?> cls, boolean createSerializer);

/** register class with given id. */
void register(Class<?> cls, Short id);

/**
* Register class with specified id.
*
* @param cls class to register
* @param id id for provided class.
* @param createSerializer whether to create serializer, if true and codegen enabled, this will
* generate the serializer code too.
*/
void register(Class<?> cls, Short id, boolean createSerializer);

/**
* Register a Serializer.
*
* @param type class needed to be serialized/deserialized
* @param serializerClass serializer class can be created with {@link Serializers#newSerializer}
* @param <T> type of class
*/
<T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass);

/**
* Provide a context to execution operations on {@link Fury} directly and return the executed
* result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package org.apache.fury.pool;

import java.util.Queue;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.fury.Fury;
import org.apache.fury.util.LoggerFactory;
Expand All @@ -36,6 +38,7 @@ public class ClassLoaderFuryPooled {
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderFuryPooled.class);

private final Function<ClassLoader, Fury> furyFactory;
private Consumer<Fury> factoryCallback = f -> {};

private final ClassLoader classLoader;

Expand All @@ -45,6 +48,8 @@ public class ClassLoaderFuryPooled {
*/
private final Queue<Fury> idleCacheQueue;

final WeakHashMap<Fury, Object> allFury = new WeakHashMap<>();

/** active cache size's number change by : 1. getLoaderBind() 2. returnObject(LoaderBinding). */
private final AtomicInteger activeCacheNumber = new AtomicInteger(0);

Expand Down Expand Up @@ -112,6 +117,12 @@ public void returnFury(Fury fury) {

private void addFury() {
Fury fury = furyFactory.apply(classLoader);
factoryCallback.accept(fury);
idleCacheQueue.add(fury);
allFury.put(fury, null);
}

void setFactoryCallback(Consumer<Fury> factoryCallback) {
this.factoryCallback = factoryCallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class FuryPooledObjectFactory {
* @see Cache
* @see com.google.common.cache.CacheBuilder
*/
private final Cache<ClassLoader, ClassLoaderFuryPooled> classLoaderFuryPooledCache;
final Cache<ClassLoader, ClassLoaderFuryPooled> classLoaderFuryPooledCache;

/** ThreadLocal: ClassLoader. */
private final ThreadLocal<ClassLoader> classLoaderLocal =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.fury.AbstractThreadSafeFury;
import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryUtils;
import org.apache.fury.util.LoaderBinding;

@ThreadSafe
public class ThreadPoolFury implements ThreadSafeFury {
public class ThreadPoolFury extends AbstractThreadSafeFury {

private final FuryPooledObjectFactory furyPooledObjectFactory;
private Consumer<Fury> factoryCallback = f -> {};

public ThreadPoolFury(
Function<ClassLoader, Fury> furyFactory,
Expand All @@ -44,6 +46,16 @@ public ThreadPoolFury(
new FuryPooledObjectFactory(furyFactory, minPoolSize, maxPoolSize, expireTime, timeUnit);
}

@Override
protected void processCallback(Consumer<Fury> callback) {
factoryCallback = factoryCallback.andThen(callback);
for (ClassLoaderFuryPooled furyPooled :
furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) {
furyPooled.allFury.keySet().forEach(callback);
furyPooled.setFactoryCallback(factoryCallback);
}
}

public <R> R execute(Function<Fury, R> action) {
Fury fury = null;
try {
Expand Down
Loading