Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(java): fix streaming classdef read #1758

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public final class Fury implements BaseFury {

private final Config config;
private final boolean refTracking;
private final boolean shareMeta;
private final RefResolver refResolver;
private final ClassResolver classResolver;
private final MetaStringResolver metaStringResolver;
Expand All @@ -125,6 +126,7 @@ public final class Fury implements BaseFury {
private int copyDepth;
private final boolean copyRefTracking;
private final IdentityMap<Object, Object> originToCopyMap;
private int classDefEndOffset;

public Fury(FuryBuilder builder, ClassLoader classLoader) {
// Avoid set classLoader in `FuryBuilder`, which won't be clear when
Expand All @@ -133,6 +135,7 @@ public Fury(FuryBuilder builder, ClassLoader classLoader) {
this.language = config.getLanguage();
this.refTracking = config.trackingRef();
this.copyRefTracking = config.copyTrackingRef();
this.shareMeta = config.isMetaShareEnabled();
compressInt = config.compressInt();
longEncoding = config.longEncoding();
if (refTracking) {
Expand Down Expand Up @@ -332,7 +335,6 @@ public void resetBuffer() {

private void write(MemoryBuffer buffer, Object obj) {
int startOffset = buffer.writerIndex();
boolean shareMeta = config.isMetaShareEnabled();
if (shareMeta) {
buffer.writeInt32(-1); // preserve 4-byte for nativeObjects start offsets.
}
Expand Down Expand Up @@ -781,15 +783,18 @@ public Object deserialize(MemoryBuffer buffer, Iterable<MemoryBuffer> outOfBandB
if (isTargetXLang) {
obj = xdeserializeInternal(buffer);
} else {
if (config.isMetaShareEnabled()) {
classResolver.readClassDefs(buffer);
if (shareMeta) {
readClassDefs(buffer);
}
obj = readRef(buffer);
}
return obj;
} catch (Throwable t) {
throw ExceptionUtils.handleReadFailed(this, t);
} finally {
if (shareMeta) {
buffer.readerIndex(classDefEndOffset);
}
resetRead();
jitContext.unlock();
}
Expand Down Expand Up @@ -1097,8 +1102,8 @@ public <T> T deserializeJavaObject(MemoryBuffer buffer, Class<T> cls) {
if (depth != 0) {
throwDepthDeserializationException();
}
if (config.isMetaShareEnabled()) {
classResolver.readClassDefs(buffer);
if (shareMeta) {
readClassDefs(buffer);
}
T obj;
int nextReadRefId = refResolver.tryPreserveRefId(buffer);
Expand All @@ -1111,6 +1116,9 @@ public <T> T deserializeJavaObject(MemoryBuffer buffer, Class<T> cls) {
} catch (Throwable t) {
throw ExceptionUtils.handleReadFailed(this, t);
} finally {
if (shareMeta) {
buffer.readerIndex(classDefEndOffset);
}
resetRead();
jitContext.unlock();
}
Expand Down Expand Up @@ -1211,13 +1219,16 @@ public Object deserializeJavaObjectAndClass(MemoryBuffer buffer) {
if (depth != 0) {
throwDepthDeserializationException();
}
if (config.isMetaShareEnabled()) {
classResolver.readClassDefs(buffer);
if (shareMeta) {
readClassDefs(buffer);
}
return readRef(buffer);
} catch (Throwable t) {
throw ExceptionUtils.handleReadFailed(this, t);
} finally {
if (shareMeta) {
buffer.readerIndex(classDefEndOffset);
}
resetRead();
jitContext.unlock();
}
Expand Down Expand Up @@ -1409,6 +1420,15 @@ private void serializeToStream(OutputStream outputStream, Consumer<MemoryBuffer>
}
}

private void readClassDefs(MemoryBuffer buffer) {
int classDefOffset = buffer.readInt32();
int readerIndex = buffer.readerIndex();
buffer.readerIndex(classDefOffset);
classResolver.readClassDefs(buffer);
classDefEndOffset = buffer.readerIndex();
buffer.readerIndex(readerIndex);
}

public void reset() {
refResolver.reset();
classResolver.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1470,9 +1470,6 @@ private void writeClassDefs(
*/
public void readClassDefs(MemoryBuffer buffer) {
MetaContext metaContext = fury.getSerializationContext().getMetaContext();
int classDefOffset = buffer.readInt32();
int readerIndex = buffer.readerIndex();
buffer.readerIndex(classDefOffset);
int numClassDefs = buffer.readVarUint32Small14();
for (int i = 0; i < numClassDefs; i++) {
long id = buffer.readInt64();
Expand All @@ -1491,7 +1488,6 @@ public void readClassDefs(MemoryBuffer buffer) {
// can be created still.
metaContext.readClassInfos.add(null);
}
buffer.readerIndex(readerIndex);
}

private Tuple2<ClassDef, ClassInfo> readClassDef(MemoryBuffer buffer, long header) {
Expand Down
31 changes: 31 additions & 0 deletions java/fury-core/src/test/java/org/apache/fury/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,28 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Lists;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.fury.config.CompatibleMode;
import org.apache.fury.io.FuryInputStream;
import org.apache.fury.io.FuryReadableChannel;
import org.apache.fury.io.FuryStreamReader;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.reflect.ReflectionUtils;
import org.apache.fury.test.bean.BeanA;
import org.testng.Assert;
import org.testng.annotations.Test;

public class StreamTest {

@Test
public void testBufferStream() {
MemoryBuffer buffer0 = MemoryBuffer.newHeapBuffer(10);
Expand Down Expand Up @@ -320,4 +326,29 @@ public void testReadableChannel() throws IOException {
}
}
}

@Test
public void testScopedMetaShare() throws IOException {
Fury fury =
Fury.builder()
.requireClassRegistration(false)
.withCompatibleMode(CompatibleMode.COMPATIBLE)
.withScopedMetaShare(true)
.build();
ByteArrayOutputStream bas = new ByteArrayOutputStream();
ArrayList<Integer> list = Lists.newArrayList(1, 2, 3);
fury.serialize(bas, list);
HashMap<String, String> map = new HashMap<>();
map.put("key", "value");
fury.serialize(bas, map);
ArrayList<Integer> list2 = Lists.newArrayList(10, 9, 7);
fury.serialize(bas, list2);
bas.flush();

InputStream bis = new ByteArrayInputStream(bas.toByteArray());
FuryInputStream stream = of(bis);
Assert.assertEquals(fury.deserialize(stream), list);
Assert.assertEquals(fury.deserialize(stream), map);
Assert.assertEquals(fury.deserialize(stream), list2);
}
}
Loading