From 1e2a52843ad2540eb0bd444d970f8cfa3cc8cee3 Mon Sep 17 00:00:00 2001 From: Shawn Yang Date: Wed, 24 Jul 2024 20:02:15 +0800 Subject: [PATCH] fix(java): fix streaming classdef read (#1758) ## What does this PR do? fix streaming classdef read ## Related issues Closes #1757 ## Does this PR introduce any user-facing change? - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark --- .../src/main/java/org/apache/fury/Fury.java | 34 +++++++++++++++---- .../apache/fury/resolver/ClassResolver.java | 4 --- .../test/java/org/apache/fury/StreamTest.java | 31 +++++++++++++++++ 3 files changed, 58 insertions(+), 11 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java index 87c635d77f..9f3e84e1bd 100644 --- a/java/fury-core/src/main/java/org/apache/fury/Fury.java +++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java @@ -102,6 +102,7 @@ public final class Fury implements BaseFury { private final Config config; private final boolean refTracking; + private final boolean shareMeta; private final RefResolver refResolver; private final ClassResolver classResolver; private final MetaStringResolver metaStringResolver; @@ -125,6 +126,7 @@ public final class Fury implements BaseFury { private int copyDepth; private final boolean copyRefTracking; private final IdentityMap originToCopyMap; + private int classDefEndOffset; public Fury(FuryBuilder builder, ClassLoader classLoader) { // Avoid set classLoader in `FuryBuilder`, which won't be clear when @@ -133,6 +135,7 @@ public Fury(FuryBuilder builder, ClassLoader classLoader) { this.language = config.getLanguage(); this.refTracking = config.trackingRef(); this.copyRefTracking = config.copyTrackingRef(); + this.shareMeta = config.isMetaShareEnabled(); compressInt = config.compressInt(); longEncoding = config.longEncoding(); if (refTracking) { @@ -332,7 +335,6 @@ public void resetBuffer() { private void write(MemoryBuffer buffer, Object obj) { int startOffset = buffer.writerIndex(); - boolean shareMeta = config.isMetaShareEnabled(); if (shareMeta) { buffer.writeInt32(-1); // preserve 4-byte for nativeObjects start offsets. } @@ -781,8 +783,8 @@ public Object deserialize(MemoryBuffer buffer, Iterable outOfBandB if (isTargetXLang) { obj = xdeserializeInternal(buffer); } else { - if (config.isMetaShareEnabled()) { - classResolver.readClassDefs(buffer); + if (shareMeta) { + readClassDefs(buffer); } obj = readRef(buffer); } @@ -790,6 +792,9 @@ public Object deserialize(MemoryBuffer buffer, Iterable outOfBandB } catch (Throwable t) { throw ExceptionUtils.handleReadFailed(this, t); } finally { + if (shareMeta) { + buffer.readerIndex(classDefEndOffset); + } resetRead(); jitContext.unlock(); } @@ -1097,8 +1102,8 @@ public T deserializeJavaObject(MemoryBuffer buffer, Class cls) { if (depth != 0) { throwDepthDeserializationException(); } - if (config.isMetaShareEnabled()) { - classResolver.readClassDefs(buffer); + if (shareMeta) { + readClassDefs(buffer); } T obj; int nextReadRefId = refResolver.tryPreserveRefId(buffer); @@ -1111,6 +1116,9 @@ public T deserializeJavaObject(MemoryBuffer buffer, Class cls) { } catch (Throwable t) { throw ExceptionUtils.handleReadFailed(this, t); } finally { + if (shareMeta) { + buffer.readerIndex(classDefEndOffset); + } resetRead(); jitContext.unlock(); } @@ -1211,13 +1219,16 @@ public Object deserializeJavaObjectAndClass(MemoryBuffer buffer) { if (depth != 0) { throwDepthDeserializationException(); } - if (config.isMetaShareEnabled()) { - classResolver.readClassDefs(buffer); + if (shareMeta) { + readClassDefs(buffer); } return readRef(buffer); } catch (Throwable t) { throw ExceptionUtils.handleReadFailed(this, t); } finally { + if (shareMeta) { + buffer.readerIndex(classDefEndOffset); + } resetRead(); jitContext.unlock(); } @@ -1409,6 +1420,15 @@ private void serializeToStream(OutputStream outputStream, Consumer } } + private void readClassDefs(MemoryBuffer buffer) { + int classDefOffset = buffer.readInt32(); + int readerIndex = buffer.readerIndex(); + buffer.readerIndex(classDefOffset); + classResolver.readClassDefs(buffer); + classDefEndOffset = buffer.readerIndex(); + buffer.readerIndex(readerIndex); + } + public void reset() { refResolver.reset(); classResolver.reset(); diff --git a/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java b/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java index 93a3701e29..ce4dfa274d 100644 --- a/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java +++ b/java/fury-core/src/main/java/org/apache/fury/resolver/ClassResolver.java @@ -1470,9 +1470,6 @@ private void writeClassDefs( */ public void readClassDefs(MemoryBuffer buffer) { MetaContext metaContext = fury.getSerializationContext().getMetaContext(); - int classDefOffset = buffer.readInt32(); - int readerIndex = buffer.readerIndex(); - buffer.readerIndex(classDefOffset); int numClassDefs = buffer.readVarUint32Small14(); for (int i = 0; i < numClassDefs; i++) { long id = buffer.readInt64(); @@ -1491,7 +1488,6 @@ public void readClassDefs(MemoryBuffer buffer) { // can be created still. metaContext.readClassInfos.add(null); } - buffer.readerIndex(readerIndex); } private Tuple2 readClassDef(MemoryBuffer buffer, long header) { diff --git a/java/fury-core/src/test/java/org/apache/fury/StreamTest.java b/java/fury-core/src/test/java/org/apache/fury/StreamTest.java index 918e700508..ea81e605cc 100644 --- a/java/fury-core/src/test/java/org/apache/fury/StreamTest.java +++ b/java/fury-core/src/test/java/org/apache/fury/StreamTest.java @@ -23,6 +23,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import com.google.common.collect.Lists; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -30,15 +31,20 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import org.apache.fury.config.CompatibleMode; import org.apache.fury.io.FuryInputStream; import org.apache.fury.io.FuryReadableChannel; import org.apache.fury.io.FuryStreamReader; import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.reflect.ReflectionUtils; import org.apache.fury.test.bean.BeanA; +import org.testng.Assert; import org.testng.annotations.Test; public class StreamTest { + @Test public void testBufferStream() { MemoryBuffer buffer0 = MemoryBuffer.newHeapBuffer(10); @@ -320,4 +326,29 @@ public void testReadableChannel() throws IOException { } } } + + @Test + public void testScopedMetaShare() throws IOException { + Fury fury = + Fury.builder() + .requireClassRegistration(false) + .withCompatibleMode(CompatibleMode.COMPATIBLE) + .withScopedMetaShare(true) + .build(); + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + ArrayList list = Lists.newArrayList(1, 2, 3); + fury.serialize(bas, list); + HashMap map = new HashMap<>(); + map.put("key", "value"); + fury.serialize(bas, map); + ArrayList list2 = Lists.newArrayList(10, 9, 7); + fury.serialize(bas, list2); + bas.flush(); + + InputStream bis = new ByteArrayInputStream(bas.toByteArray()); + FuryInputStream stream = of(bis); + Assert.assertEquals(fury.deserialize(stream), list); + Assert.assertEquals(fury.deserialize(stream), map); + Assert.assertEquals(fury.deserialize(stream), list2); + } }