Skip to content

Commit

Permalink
fix(java): fix streaming classdef read (#1758)
Browse files Browse the repository at this point in the history
## What does this PR do?

 fix streaming classdef read

## Related issues

Closes #1757 

## Does this PR introduce any user-facing change?

<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fury/issues/new/choose) describing the
need to do so and update the document if necessary.
-->

- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?


## Benchmark

<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
  • Loading branch information
chaokunyang authored Jul 24, 2024
1 parent 1a5c357 commit 1e2a528
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 11 deletions.
34 changes: 27 additions & 7 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public final class Fury implements BaseFury {

private final Config config;
private final boolean refTracking;
private final boolean shareMeta;
private final RefResolver refResolver;
private final ClassResolver classResolver;
private final MetaStringResolver metaStringResolver;
Expand All @@ -125,6 +126,7 @@ public final class Fury implements BaseFury {
private int copyDepth;
private final boolean copyRefTracking;
private final IdentityMap<Object, Object> originToCopyMap;
private int classDefEndOffset;

public Fury(FuryBuilder builder, ClassLoader classLoader) {
// Avoid set classLoader in `FuryBuilder`, which won't be clear when
Expand All @@ -133,6 +135,7 @@ public Fury(FuryBuilder builder, ClassLoader classLoader) {
this.language = config.getLanguage();
this.refTracking = config.trackingRef();
this.copyRefTracking = config.copyTrackingRef();
this.shareMeta = config.isMetaShareEnabled();
compressInt = config.compressInt();
longEncoding = config.longEncoding();
if (refTracking) {
Expand Down Expand Up @@ -332,7 +335,6 @@ public void resetBuffer() {

private void write(MemoryBuffer buffer, Object obj) {
int startOffset = buffer.writerIndex();
boolean shareMeta = config.isMetaShareEnabled();
if (shareMeta) {
buffer.writeInt32(-1); // preserve 4-byte for nativeObjects start offsets.
}
Expand Down Expand Up @@ -781,15 +783,18 @@ public Object deserialize(MemoryBuffer buffer, Iterable<MemoryBuffer> outOfBandB
if (isTargetXLang) {
obj = xdeserializeInternal(buffer);
} else {
if (config.isMetaShareEnabled()) {
classResolver.readClassDefs(buffer);
if (shareMeta) {
readClassDefs(buffer);
}
obj = readRef(buffer);
}
return obj;
} catch (Throwable t) {
throw ExceptionUtils.handleReadFailed(this, t);
} finally {
if (shareMeta) {
buffer.readerIndex(classDefEndOffset);
}
resetRead();
jitContext.unlock();
}
Expand Down Expand Up @@ -1097,8 +1102,8 @@ public <T> T deserializeJavaObject(MemoryBuffer buffer, Class<T> cls) {
if (depth != 0) {
throwDepthDeserializationException();
}
if (config.isMetaShareEnabled()) {
classResolver.readClassDefs(buffer);
if (shareMeta) {
readClassDefs(buffer);
}
T obj;
int nextReadRefId = refResolver.tryPreserveRefId(buffer);
Expand All @@ -1111,6 +1116,9 @@ public <T> T deserializeJavaObject(MemoryBuffer buffer, Class<T> cls) {
} catch (Throwable t) {
throw ExceptionUtils.handleReadFailed(this, t);
} finally {
if (shareMeta) {
buffer.readerIndex(classDefEndOffset);
}
resetRead();
jitContext.unlock();
}
Expand Down Expand Up @@ -1211,13 +1219,16 @@ public Object deserializeJavaObjectAndClass(MemoryBuffer buffer) {
if (depth != 0) {
throwDepthDeserializationException();
}
if (config.isMetaShareEnabled()) {
classResolver.readClassDefs(buffer);
if (shareMeta) {
readClassDefs(buffer);
}
return readRef(buffer);
} catch (Throwable t) {
throw ExceptionUtils.handleReadFailed(this, t);
} finally {
if (shareMeta) {
buffer.readerIndex(classDefEndOffset);
}
resetRead();
jitContext.unlock();
}
Expand Down Expand Up @@ -1409,6 +1420,15 @@ private void serializeToStream(OutputStream outputStream, Consumer<MemoryBuffer>
}
}

private void readClassDefs(MemoryBuffer buffer) {
int classDefOffset = buffer.readInt32();
int readerIndex = buffer.readerIndex();
buffer.readerIndex(classDefOffset);
classResolver.readClassDefs(buffer);
classDefEndOffset = buffer.readerIndex();
buffer.readerIndex(readerIndex);
}

public void reset() {
refResolver.reset();
classResolver.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1470,9 +1470,6 @@ private void writeClassDefs(
*/
public void readClassDefs(MemoryBuffer buffer) {
MetaContext metaContext = fury.getSerializationContext().getMetaContext();
int classDefOffset = buffer.readInt32();
int readerIndex = buffer.readerIndex();
buffer.readerIndex(classDefOffset);
int numClassDefs = buffer.readVarUint32Small14();
for (int i = 0; i < numClassDefs; i++) {
long id = buffer.readInt64();
Expand All @@ -1491,7 +1488,6 @@ public void readClassDefs(MemoryBuffer buffer) {
// can be created still.
metaContext.readClassInfos.add(null);
}
buffer.readerIndex(readerIndex);
}

private Tuple2<ClassDef, ClassInfo> readClassDef(MemoryBuffer buffer, long header) {
Expand Down
31 changes: 31 additions & 0 deletions java/fury-core/src/test/java/org/apache/fury/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,28 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Lists;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.fury.config.CompatibleMode;
import org.apache.fury.io.FuryInputStream;
import org.apache.fury.io.FuryReadableChannel;
import org.apache.fury.io.FuryStreamReader;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.reflect.ReflectionUtils;
import org.apache.fury.test.bean.BeanA;
import org.testng.Assert;
import org.testng.annotations.Test;

public class StreamTest {

@Test
public void testBufferStream() {
MemoryBuffer buffer0 = MemoryBuffer.newHeapBuffer(10);
Expand Down Expand Up @@ -320,4 +326,29 @@ public void testReadableChannel() throws IOException {
}
}
}

@Test
public void testScopedMetaShare() throws IOException {
Fury fury =
Fury.builder()
.requireClassRegistration(false)
.withCompatibleMode(CompatibleMode.COMPATIBLE)
.withScopedMetaShare(true)
.build();
ByteArrayOutputStream bas = new ByteArrayOutputStream();
ArrayList<Integer> list = Lists.newArrayList(1, 2, 3);
fury.serialize(bas, list);
HashMap<String, String> map = new HashMap<>();
map.put("key", "value");
fury.serialize(bas, map);
ArrayList<Integer> list2 = Lists.newArrayList(10, 9, 7);
fury.serialize(bas, list2);
bas.flush();

InputStream bis = new ByteArrayInputStream(bas.toByteArray());
FuryInputStream stream = of(bis);
Assert.assertEquals(fury.deserialize(stream), list);
Assert.assertEquals(fury.deserialize(stream), map);
Assert.assertEquals(fury.deserialize(stream), list2);
}
}

0 comments on commit 1e2a528

Please sign in to comment.