Skip to content

Commit

Permalink
fix(java): fix big buffer streaming MetaShared read offset (#1760)
Browse files Browse the repository at this point in the history
## What does this PR do?

 fix big buffer streaming MetaShared read by using relative offset

## Related issues

Closes #1759 

## Does this PR introduce any user-facing change?

<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fury/issues/new/choose) describing the
need to do so and update the document if necessary.
-->

- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?


## Benchmark

<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
  • Loading branch information
chaokunyang authored Jul 25, 2024
1 parent 7b6e9ed commit 6e4d8a0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
8 changes: 4 additions & 4 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ private void write(MemoryBuffer buffer, Object obj) {
writeData(buffer, classInfo, obj);
}
if (shareMeta) {
buffer.putInt32(startOffset, buffer.writerIndex());
buffer.putInt32(startOffset, buffer.writerIndex() - startOffset - 4);
classResolver.writeClassDefs(buffer);
}
}
Expand Down Expand Up @@ -1064,7 +1064,7 @@ public void serializeJavaObject(MemoryBuffer buffer, Object obj) {
ClassInfo classInfo = classResolver.getOrUpdateClassInfo(obj.getClass());
writeData(buffer, classInfo, obj);
}
buffer.putInt32(startOffset, buffer.writerIndex());
buffer.putInt32(startOffset, buffer.writerIndex() - startOffset - 4);
classResolver.writeClassDefs(buffer);
} else {
if (!refResolver.writeRefOrNull(buffer, obj)) {
Expand Down Expand Up @@ -1421,9 +1421,9 @@ private void serializeToStream(OutputStream outputStream, Consumer<MemoryBuffer>
}

private void readClassDefs(MemoryBuffer buffer) {
int classDefOffset = buffer.readInt32();
int relativeClassDefOffset = buffer.readInt32();
int readerIndex = buffer.readerIndex();
buffer.readerIndex(classDefOffset);
buffer.readerIndex(readerIndex + relativeClassDefOffset);
classResolver.readClassDefs(buffer);
classDefEndOffset = buffer.readerIndex();
buffer.readerIndex(readerIndex);
Expand Down
29 changes: 28 additions & 1 deletion java/fury-core/src/test/java/org/apache/fury/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.fury.config.CompatibleMode;
import org.apache.fury.io.FuryInputStream;
import org.apache.fury.io.FuryReadableChannel;
Expand All @@ -43,7 +44,7 @@
import org.testng.Assert;
import org.testng.annotations.Test;

public class StreamTest {
public class StreamTest extends FuryTestBase {

@Test
public void testBufferStream() {
Expand Down Expand Up @@ -351,4 +352,30 @@ public void testScopedMetaShare() throws IOException {
Assert.assertEquals(fury.deserialize(stream), map);
Assert.assertEquals(fury.deserialize(stream), list2);
}

@Test
public void testBigBufferStreamingMetaShared() throws IOException {
Fury fury = builder().withCompatibleMode(CompatibleMode.COMPATIBLE).build();
ByteArrayOutputStream bas = new ByteArrayOutputStream();
List<Integer> list = new ArrayList<>();
HashMap<String, String> map = new HashMap<>();
for (int i = 0; i < 5000; i++) {
list.add(i);
map.put("key" + i, "value" + i);
}
fury.serialize(bas, list);
fury.serialize(bas, map);
fury.serialize(bas, list);
fury.serialize(bas, new long[5000]);
fury.serialize(bas, new int[5000]);
bas.flush();

InputStream bis = new ByteArrayInputStream(bas.toByteArray());
FuryInputStream stream = of(bis);
assertEquals(fury.deserialize(stream), list);
assertEquals(fury.deserialize(stream), map);
assertEquals(fury.deserialize(stream), list);
assertEquals(fury.deserialize(stream), new long[5000]);
assertEquals(fury.deserialize(stream), new int[5000]);
}
}

0 comments on commit 6e4d8a0

Please sign in to comment.