Skip to content

Commit

Permalink
fix(java): ThreadPoolFury#factoryCallback don't work when create new …
Browse files Browse the repository at this point in the history
…classLoaderFuryPooled (#1628)



## What does this PR do?

<!-- Describe the purpose of this PR. -->

ThreadPoolFury#factoryCallback don't work when create new
classLoaderFuryPooled.

## Related issues

<!--
Is there any related issue? Please attach here.

- #xxxx0
- #xxxx1
- #xxxx2
-->


## Does this PR introduce any user-facing change?

<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/incubator-fury/issues/new/choose)
describing the need to do so and update the document if necessary.
-->

- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?


## Benchmark

<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
  • Loading branch information
MrChang0 authored May 13, 2024
1 parent 9897574 commit f744640
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.fury.Fury;
import org.apache.fury.logging.Logger;
Expand Down Expand Up @@ -62,15 +63,20 @@ public class FuryPooledObjectFactory {
*/
private final int maxPoolSize;

/** factoryCallback will be set in every new classLoaderFuryPooled so that can deal every fury. */
private final Consumer<Fury> factoryCallback;

public FuryPooledObjectFactory(
Function<ClassLoader, Fury> furyFactory,
int minPoolSize,
int maxPoolSize,
long expireTime,
TimeUnit timeUnit) {
TimeUnit timeUnit,
Consumer<Fury> factoryCallback) {
this.minPoolSize = minPoolSize;
this.maxPoolSize = maxPoolSize;
this.furyFactory = furyFactory;
this.factoryCallback = factoryCallback;
classLoaderFuryPooledCache =
CacheBuilder.newBuilder()
.weakKeys()
Expand Down Expand Up @@ -138,6 +144,7 @@ private synchronized ClassLoaderFuryPooled getOrAddCache(ClassLoader classLoader
if (classLoaderFuryPooled == null) {
classLoaderFuryPooled =
new ClassLoaderFuryPooled(classLoader, furyFactory, minPoolSize, maxPoolSize);
classLoaderFuryPooled.setFactoryCallback(factoryCallback);
classLoaderFuryPooledCache.put(classLoader, classLoaderFuryPooled);
}
return classLoaderFuryPooled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ public ThreadPoolFury(
long expireTime,
TimeUnit timeUnit) {
this.furyPooledObjectFactory =
new FuryPooledObjectFactory(furyFactory, minPoolSize, maxPoolSize, expireTime, timeUnit);
new FuryPooledObjectFactory(
furyFactory,
minPoolSize,
maxPoolSize,
expireTime,
timeUnit,
fury -> factoryCallback.accept(fury));
}

@Override
Expand All @@ -56,7 +62,6 @@ protected void processCallback(Consumer<Fury> callback) {
for (ClassLoaderFuryPooled furyPooled :
furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) {
furyPooled.allFury.keySet().forEach(callback);
furyPooled.setFactoryCallback(factoryCallback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Data;
import org.apache.fury.config.Language;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.resolver.MetaContext;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.test.bean.BeanA;
import org.apache.fury.test.bean.BeanB;
import org.apache.fury.test.bean.Foo;
import org.apache.fury.test.bean.Struct;
import org.apache.fury.util.LoaderBinding.StagingType;
import org.testng.Assert;
Expand Down Expand Up @@ -319,4 +321,48 @@ public void testSerializeJavaObject() {
Assert.assertEquals(fury.deserializeJavaObject(buffer, String.class), "abc");
}
}

@Data
static class Foo {
int f1;
}

public static class FooSerializer extends Serializer<Foo> {
public FooSerializer(Fury fury, Class<Foo> type) {
super(fury, type);
}

@Override
public void write(MemoryBuffer buffer, Foo value) {
buffer.writeInt32(value.f1);
}

@Override
public Foo read(MemoryBuffer buffer) {
final Foo foo = new Foo();
foo.f1 = buffer.readInt32();
return foo;
}
}

public static class CustomClassLoader extends ClassLoader {
public CustomClassLoader(ClassLoader parent) {
super(parent);
}
}

@Test
public void testSerializerRegister() {
final ThreadSafeFury threadSafeFury =
Fury.builder().requireClassRegistration(false).buildThreadSafeFuryPool(0, 2);
threadSafeFury.registerSerializer(Foo.class, FooSerializer.class);
// create a new classLoader
threadSafeFury.setClassLoader(new CustomClassLoader(ClassLoader.getSystemClassLoader()));
threadSafeFury.execute(
fury -> {
Assert.assertEquals(
fury.getClassResolver().getSerializer(Foo.class).getClass(), FooSerializer.class);
return null;
});
}
}

0 comments on commit f744640

Please sign in to comment.