Skip to content

Commit

Permalink
feat(java): fast object copy framework in fury java (#1701)
Browse files Browse the repository at this point in the history
## What does this PR do?

Object deep copy framework in fury java. This PR only covers the non-JIT
serializers.

- For immutable objects such as `String`, `java.time` types, `int`, `byte`, etc.,
return the object itself.
- For mutable objects, create a new object and set all attributes.
- For `pojo` / `bean` objects, use `ObjectSerializer.copy(obj)` to create
a new object.
- For `Arrays`, `Collection` and `Map` objects, create a new object, traverse
the elements, call `fury.copy(obj)` to generate new elements and set
them on the new object. Some serializers have special `copy` methods.



## Related issues

[[Java] fast object copy framework in fury java
#1679](#1679)



## Does this PR introduce any user-facing change?

- [x] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?



1. Provide the `fury.copy(obj)` interface, which deep copies an object
without serializing / deserializing it. For example:

```java
User user = new User();
user.setName("a");

// If the object contains circular references, enable copy ref tracking via FuryBuilder#withCopyRefTracking(true)
Fury fury = Fury.builder().withLanguage(Language.JAVA).withCopyRefTracking(true).build();
User newUser = fury.copy(user);
```



2. Provide the `FuryCopyable<T>` interface, which lets a class customize how
its objects are copied. For example:

```java
public class User implements Serializable, FuryCopyable<User> {
  
  @Override
  public User copy(Fury fury) {
    // do something
    System.out.println("object custom copy method");
    return newUser;
  }
}
```





## Benchmark
> Device

windows 10、12 cores、24G

> JDK

java version "1.8.0_202"
Java(TM) SE Runtime Environment (build 1.8.0_202-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode)

java version "17.0.8" 2023-07-18 LTS
Java(TM) SE Runtime Environment (build 17.0.8+9-LTS-211)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.8+9-LTS-211, mixed mode,
sharing)

> Test data

see [benchmark
code](https://github.com/zhaommmmomo/fury/tree/copy_benchmark/java/fury-benchmark/src/main/java/org/apache/fury/benchmark)
```java
int size = 128;
int[] intArr = new int[size];
List<Object> list = new ArrayList<>(size);
Map<Object, Object> map = new ConcurrentHashMap<>();
BeanA beanA = BeanA.createBeanA(size); // org.apache.fury.test.bean.BeanA

for (int i = 0; i < size; i++) {
  intArr[i] = i;
  list.add(i);
  map.put(i, UUID.randomUUID().toString());
}
```

> Test

**JMH**:
benchmarkMode({Mode.Throughput})、fork(1)、threads(1)、warmup(iterations =
3, time = 1)、measurement(iterations = 5, time =
5)、outputTimeUnit(TimeUnit.SECONDS)
*JDK8*


*JDK17*


**JMH**:
benchmarkMode({Mode.AverageTime})、fork(1)、threads(1)、warmup(iterations =
3, time = 1)、measurement(iterations = 5, time =
5)、outputTimeUnit(TimeUnit.MILLISECONDS)
**Each benchmark method will be looped 10,000 times**
*JDK8*


*JDK17*
  • Loading branch information
zhaommmmomo authored Jul 18, 2024
1 parent a8a140b commit 64c995d
Show file tree
Hide file tree
Showing 40 changed files with 1,400 additions and 112 deletions.
3 changes: 3 additions & 0 deletions java/fury-core/src/main/java/org/apache/fury/BaseFury.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,7 @@ public interface BaseFury {
/** This method is deprecated, please use {@link #deserialize} instead. */
@Deprecated
Object deserializeJavaObjectAndClass(FuryReadableChannel channel);

/**
 * Deep copy the <code>obj</code>.
 *
 * @param obj object to copy
 * @param <T> type of the object being copied
 * @return the copy of <code>obj</code>
 */
<T> T copy(T obj);
}
148 changes: 148 additions & 0 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.fury.builder.JITContext;
import org.apache.fury.collection.IdentityMap;
import org.apache.fury.config.CompatibleMode;
import org.apache.fury.config.Config;
import org.apache.fury.config.FuryBuilder;
Expand Down Expand Up @@ -57,6 +60,8 @@
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.SerializerFactory;
import org.apache.fury.serializer.StringSerializer;
import org.apache.fury.serializer.collection.CollectionSerializers.ArrayListSerializer;
import org.apache.fury.serializer.collection.MapSerializers.HashMapSerializer;
import org.apache.fury.type.Generics;
import org.apache.fury.type.Type;
import org.apache.fury.util.ExceptionUtils;
Expand Down Expand Up @@ -106,6 +111,8 @@ public final class Fury implements BaseFury {
private MemoryBuffer buffer;
private final List<Object> nativeObjects;
private final StringSerializer stringSerializer;
private final ArrayListSerializer arrayListSerializer;
private final HashMapSerializer hashMapSerializer;
private final Language language;
private final boolean compressInt;
private final LongEncoding longEncoding;
Expand All @@ -115,13 +122,17 @@ public final class Fury implements BaseFury {
private Iterator<MemoryBuffer> outOfBandBuffers;
private boolean peerOutOfBandEnabled;
private int depth;
private int copyDepth;
private final boolean copyRefTracking;
private final IdentityMap<Object, Object> originToCopyMap;

public Fury(FuryBuilder builder, ClassLoader classLoader) {
// Avoid set classLoader in `FuryBuilder`, which won't be clear when
// `org.apache.fury.ThreadSafeFury.clearClassLoader` is called.
config = new Config(builder);
this.language = config.getLanguage();
this.refTracking = config.trackingRef();
this.copyRefTracking = config.copyTrackingRef();
compressInt = config.compressInt();
longEncoding = config.longEncoding();
if (refTracking) {
Expand All @@ -138,6 +149,9 @@ public Fury(FuryBuilder builder, ClassLoader classLoader) {
nativeObjects = new ArrayList<>();
generics = new Generics(this);
stringSerializer = new StringSerializer(this);
arrayListSerializer = new ArrayListSerializer(this);
hashMapSerializer = new HashMapSerializer(this);
originToCopyMap = new IdentityMap<>();
LOG.info("Created new fury {}", this);
}

Expand Down Expand Up @@ -288,6 +302,19 @@ private StackOverflowError processStackOverflowError(StackOverflowError e) {
throw e;
}

/**
 * Decorates a {@link StackOverflowError} raised during copy with a hint about circular
 * references.
 *
 * <p>When copy ref tracking is already enabled, or the message cannot be attached, the original
 * error is rethrown unchanged.
 *
 * @param e the error raised while copying
 * @return the error carrying the hint message, for the caller to throw
 */
private StackOverflowError processCopyStackOverflowError(StackOverflowError e) {
  if (copyRefTracking) {
    // Ref tracking is on, so a missing-ref-tracking hint would be misleading.
    throw e;
  }
  String hint =
      "Object may contain circular references, please enable ref tracking "
          + "by `FuryBuilder#withCopyRefTracking(true)`";
  StackOverflowError withHint = ExceptionUtils.trySetStackOverflowErrorMessage(e, hint);
  if (withHint == null) {
    throw e;
  }
  return withHint;
}

public MemoryBuffer getBuffer() {
MemoryBuffer buf = buffer;
if (buf == null) {
Expand Down Expand Up @@ -1220,6 +1247,113 @@ public Object deserializeJavaObjectAndClass(FuryReadableChannel channel) {
return deserializeJavaObjectAndClass(buf);
}

/**
 * Deep copy the <code>obj</code> by delegating to {@link #copyObject(Object)}.
 *
 * <p>A {@link StackOverflowError} raised while copying is passed through
 * {@code processCopyStackOverflowError} so that it can carry a hint about circular references.
 * Per-copy state is cleaned up before this method returns, whether it completes normally or
 * exceptionally.
 *
 * @param obj object to copy
 * @return copied object
 */
@Override
public <T> T copy(T obj) {
  try {
    return copyObject(obj);
  } catch (StackOverflowError e) {
    throw processCopyStackOverflowError(e);
  } finally {
    if (copyRefTracking) {
      resetCopy();
    } else {
      // copyObject increments copyDepth before delegating and only decrements it on normal
      // return, so an exception would otherwise leave a stale depth for the next copy.
      copyDepth = 0;
    }
  }
}

/**
 * Copy object. This method provides a fast copy of common types.
 *
 * <ul>
 *   <li>{@code null} is returned as {@code null}.
 *   <li>Objects whose class id is a boxed/primitive type or {@code String} are returned as-is.
 *   <li>Primitive arrays and {@code String[]} are duplicated into a new array of the same length.
 *   <li>{@code ArrayList} and {@code HashMap} are copied by their dedicated serializers.
 *   <li>Everything else is copied by the serializer of the resolved {@code ClassInfo}.
 * </ul>
 *
 * <p>{@code copyDepth} is incremented before, and decremented after, every serializer-based copy.
 *
 * @param obj object to copy
 * @return copied object
 */
public <T> T copyObject(T obj) {
  if (obj == null) {
    return null;
  }
  final ClassInfo info = classResolver.getOrUpdateClassInfo(obj.getClass());
  final Object duplicate;
  switch (info.getClassId()) {
    case ClassResolver.BOOLEAN_CLASS_ID:
    case ClassResolver.BYTE_CLASS_ID:
    case ClassResolver.CHAR_CLASS_ID:
    case ClassResolver.SHORT_CLASS_ID:
    case ClassResolver.INTEGER_CLASS_ID:
    case ClassResolver.FLOAT_CLASS_ID:
    case ClassResolver.LONG_CLASS_ID:
    case ClassResolver.DOUBLE_CLASS_ID:
    case ClassResolver.PRIMITIVE_BOOLEAN_CLASS_ID:
    case ClassResolver.PRIMITIVE_BYTE_CLASS_ID:
    case ClassResolver.PRIMITIVE_CHAR_CLASS_ID:
    case ClassResolver.PRIMITIVE_SHORT_CLASS_ID:
    case ClassResolver.PRIMITIVE_INT_CLASS_ID:
    case ClassResolver.PRIMITIVE_FLOAT_CLASS_ID:
    case ClassResolver.PRIMITIVE_LONG_CLASS_ID:
    case ClassResolver.PRIMITIVE_DOUBLE_CLASS_ID:
    case ClassResolver.STRING_CLASS_ID:
      // Immutable values can be shared between the original and the copy.
      return obj;
    case ClassResolver.PRIMITIVE_BOOLEAN_ARRAY_CLASS_ID:
      return (T) ((boolean[]) obj).clone();
    case ClassResolver.PRIMITIVE_BYTE_ARRAY_CLASS_ID:
      return (T) ((byte[]) obj).clone();
    case ClassResolver.PRIMITIVE_CHAR_ARRAY_CLASS_ID:
      return (T) ((char[]) obj).clone();
    case ClassResolver.PRIMITIVE_SHORT_ARRAY_CLASS_ID:
      return (T) ((short[]) obj).clone();
    case ClassResolver.PRIMITIVE_INT_ARRAY_CLASS_ID:
      return (T) ((int[]) obj).clone();
    case ClassResolver.PRIMITIVE_FLOAT_ARRAY_CLASS_ID:
      return (T) ((float[]) obj).clone();
    case ClassResolver.PRIMITIVE_LONG_ARRAY_CLASS_ID:
      return (T) ((long[]) obj).clone();
    case ClassResolver.PRIMITIVE_DOUBLE_ARRAY_CLASS_ID:
      return (T) ((double[]) obj).clone();
    case ClassResolver.STRING_ARRAY_CLASS_ID:
      // Elements are immutable strings, so a new array holding the same elements is sufficient.
      return (T) ((String[]) obj).clone();
    case ClassResolver.ARRAYLIST_CLASS_ID:
      copyDepth++;
      duplicate = arrayListSerializer.copy((ArrayList) obj);
      copyDepth--;
      return (T) duplicate;
    case ClassResolver.HASHMAP_CLASS_ID:
      copyDepth++;
      duplicate = hashMapSerializer.copy((HashMap) obj);
      copyDepth--;
      return (T) duplicate;
      // todo: add fastpath for other types.
    default:
      copyDepth++;
      duplicate = info.getSerializer().copy(obj);
      copyDepth--;
      return (T) duplicate;
  }
}

/**
 * Records the mapping from an original object to its copy.
 *
 * <p>Call this method immediately after a composite object such as an object
 * array/map/collection/bean is created, so that circular references can be copied correctly.
 * A {@code null} original is ignored.
 *
 * @param o1 object before copying
 * @param o2 the copied object
 */
public <T> void reference(T o1, T o2) {
  if (o1 == null) {
    return;
  }
  originToCopyMap.put(o1, o2);
}

/**
 * Looks up the copy registered for {@code originObj} in the origin-to-copy map.
 *
 * @param originObj object before copying
 * @return whatever the map holds for {@code originObj}
 */
public Object getCopyObject(Object originObj) {
  final Object registeredCopy = originToCopyMap.get(originObj);
  return registeredCopy;
}

private void serializeToStream(OutputStream outputStream, Consumer<MemoryBuffer> function) {
MemoryBuffer buf = getBuffer();
if (outputStream.getClass() == ByteArrayOutputStream.class) {
Expand Down Expand Up @@ -1257,6 +1391,7 @@ public void reset() {
peerOutOfBandEnabled = false;
bufferCallback = null;
depth = 0;
resetCopy();
}

public void resetWrite() {
Expand All @@ -1279,6 +1414,11 @@ public void resetRead() {
depth = 0;
}

/** Resets copy state: sets the copy depth back to zero and clears the origin-to-copy map. */
public void resetCopy() {
  this.copyDepth = 0;
  this.originToCopyMap.clear();
}

private void throwDepthSerializationException() {
String method = "Fury#" + (language != Language.JAVA ? "x" : "") + "writeXXX";
throw new IllegalStateException(
Expand Down Expand Up @@ -1339,6 +1479,10 @@ public void incDepth(int diff) {
this.depth += diff;
}

/**
 * Adjusts the current copy depth.
 *
 * @param diff amount added to the copy depth; may be negative
 */
public void incCopyDepth(int diff) {
  copyDepth = copyDepth + diff;
}

// Invoked by jit
public StringSerializer getStringSerializer() {
return stringSerializer;
Expand All @@ -1356,6 +1500,10 @@ public boolean trackingRef() {
return refTracking;
}

/** Returns whether ref tracking is enabled for copy, as read from the config at construction. */
public boolean copyTrackingRef() {
  return this.copyRefTracking;
}

public boolean isStringRefIgnored() {
return config.isStringRefIgnored();
}
Expand Down
29 changes: 29 additions & 0 deletions java/fury-core/src/main/java/org/apache/fury/FuryCopyable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.fury;

/**
 * Fury copy interface. Implement it to customize how objects of a class are copied.
 *
 * @param <T> type of the object produced by the custom copy method
 */
public interface FuryCopyable<T> {
/**
 * Creates the copy of this object.
 *
 * @param fury the fury instance passed to the custom copy method; presumably usable to copy
 *     nested values (verify against the serializer that invokes this method)
 * @return the copied object
 */
T copy(Fury fury);
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ public Object deserializeJavaObjectAndClass(FuryReadableChannel channel) {
return bindingThreadLocal.get().get().deserializeJavaObjectAndClass(channel);
}

/** Deep copies {@code obj} by delegating to the instance obtained from {@code bindingThreadLocal}. */
@Override
public <T> T copy(T obj) {
return bindingThreadLocal.get().get().copy(obj);
}

@Override
public void setClassLoader(ClassLoader classLoader) {
setClassLoader(classLoader, StagingType.STRONG_STAGING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.meta.ClassDef;
import org.apache.fury.reflect.ReflectionUtils;
import org.apache.fury.serializer.AbstractObjectSerializer;
import org.apache.fury.serializer.CompatibleSerializerBase;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.util.Preconditions;
Expand All @@ -39,7 +40,7 @@
public interface Generated {

/** Base class for all generated serializers. */
abstract class GeneratedSerializer extends Serializer implements Generated {
abstract class GeneratedSerializer extends AbstractObjectSerializer implements Generated {
public GeneratedSerializer(Fury fury, Class<?> cls) {
super(fury, cls);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class Config implements Serializable {
private final boolean basicTypesRefIgnored;
private final boolean stringRefIgnored;
private final boolean timeRefIgnored;
private final boolean copyTrackingRef;
private final boolean codeGenEnabled;
private final boolean checkClassVersion;
private final CompatibleMode compatibleMode;
Expand Down Expand Up @@ -65,6 +66,7 @@ public Config(FuryBuilder builder) {
basicTypesRefIgnored = !trackingRef || builder.basicTypesRefIgnored;
stringRefIgnored = !trackingRef || builder.stringRefIgnored;
timeRefIgnored = !trackingRef || builder.timeRefIgnored;
copyTrackingRef = builder.copyTrackingRef;
compressString = builder.compressString;
compressInt = builder.compressInt;
longEncoding = builder.longEncoding;
Expand Down Expand Up @@ -99,6 +101,10 @@ public boolean trackingRef() {
return trackingRef;
}

/** Returns the copy ref tracking flag captured from the builder. */
public boolean copyTrackingRef() {
  return this.copyTrackingRef;
}

public boolean isBasicTypesRefIgnored() {
return basicTypesRefIgnored;
}
Expand Down Expand Up @@ -247,6 +253,7 @@ public boolean equals(Object o) {
&& basicTypesRefIgnored == config.basicTypesRefIgnored
&& stringRefIgnored == config.stringRefIgnored
&& timeRefIgnored == config.timeRefIgnored
&& copyTrackingRef == config.copyTrackingRef
&& codeGenEnabled == config.codeGenEnabled
&& checkClassVersion == config.checkClassVersion
&& checkJdkClassSerializable == config.checkJdkClassSerializable
Expand Down Expand Up @@ -276,6 +283,7 @@ public int hashCode() {
basicTypesRefIgnored,
stringRefIgnored,
timeRefIgnored,
copyTrackingRef,
codeGenEnabled,
checkClassVersion,
compatibleMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public final class FuryBuilder {
boolean checkClassVersion = false;
Language language = Language.JAVA;
boolean trackingRef = false;
boolean copyTrackingRef = false;
boolean basicTypesRefIgnored = true;
boolean stringRefIgnored = true;
boolean timeRefIgnored = true;
Expand Down Expand Up @@ -98,6 +99,12 @@ public FuryBuilder withRefTracking(boolean trackingRef) {
return this;
}

/**
 * Whether track {@link Fury#copy(Object)} circular references.
 *
 * @param copyTrackingRef value stored in this builder's {@code copyTrackingRef} field, which
 *     is initialized to {@code false}
 * @return this builder
 */
public FuryBuilder withCopyRefTracking(boolean copyTrackingRef) {
this.copyTrackingRef = copyTrackingRef;
return this;
}

/** Whether ignore basic types shared reference. */
public FuryBuilder ignoreBasicTypesRef(boolean ignoreBasicTypesRef) {
this.basicTypesRefIgnored = ignoreBasicTypesRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ public Object deserializeJavaObjectAndClass(FuryReadableChannel channel) {
return execute(fury -> fury.deserializeJavaObjectAndClass(channel));
}

/** Deep copies {@code obj} by running {@code fury.copy(obj)} through {@code execute}. */
@Override
public <T> T copy(T obj) {
return execute(fury -> fury.copy(obj));
}

@Override
public void setClassLoader(ClassLoader classLoader) {
setClassLoader(classLoader, LoaderBinding.StagingType.SOFT_STAGING);
Expand Down
Loading

0 comments on commit 64c995d

Please sign in to comment.