feat(java): native streaming mode deserialization (#1451)
## What does this PR do?
This PR implements streaming-mode deserialization. It makes the protocol
used by `Fury#serialize(buffer, o)` and `Fury#serialize(stream, o)`
consistent, and it lets deserialization run in true streaming mode, which
reduces latency and makes it easier to pipeline deserialization with
data transfer.

The changes include:
- Introduce a `FuryStreamReader` to allow buffered streaming reads.
- Make `MemoryBuffer` support streaming reads without adding cost
to the critical path.
- Implement an `InputStream`-based `FuryStreamReader` for Fury.

`java.nio.Channel` based zero-copy is not implemented in this PR.
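
As a rough illustration of the "no cost on the critical path" idea in the list above, the sketch below keeps the common read path to a single bounds check and only touches the underlying `InputStream` when the buffer runs short. `SketchStreamBuffer`, `readInt` and `fill` are hypothetical names made up for this example; this is not the `FuryStreamReader` or `MemoryBuffer` code from this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical sketch of fill-on-demand reading; not Fury's implementation. */
final class SketchStreamBuffer {
  private final InputStream in;
  private byte[] data;
  private int readerIndex; // index of the next unread byte
  private int size; // number of valid bytes in data

  SketchStreamBuffer(InputStream in, int initialCapacity) {
    this.in = in;
    this.data = new byte[Math.max(initialCapacity, 1)];
  }

  /** Reads a little-endian int; the stream is only touched when fewer than 4 bytes are buffered. */
  int readInt() {
    if (size - readerIndex < 4) { // the only extra work on the fast path
      fill(4);
    }
    byte[] d = data;
    int i = readerIndex;
    readerIndex = i + 4;
    return (d[i] & 0xFF)
        | ((d[i + 1] & 0xFF) << 8)
        | ((d[i + 2] & 0xFF) << 16)
        | ((d[i + 3] & 0xFF) << 24);
  }

  /** Slow path: blocks until at least `needed` unread bytes are buffered. */
  private void fill(int needed) {
    // Move unread bytes to the front so the free space is contiguous.
    int unread = size - readerIndex;
    System.arraycopy(data, readerIndex, data, 0, unread);
    size = unread;
    readerIndex = 0;
    if (data.length < needed) {
      data = Arrays.copyOf(data, Math.max(needed, data.length * 2));
    }
    try {
      while (size < needed) {
        int n = in.read(data, size, data.length - size);
        if (n < 0) {
          throw new IllegalStateException("stream ended before " + needed + " bytes arrived");
        }
        size += n;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] bytes = {1, 0, 0, 0, 0x78, 0x56, 0x34, 0x12};
    // A deliberately tiny initial capacity forces the slow path to grow and refill.
    SketchStreamBuffer buffer = new SketchStreamBuffer(new ByteArrayInputStream(bytes), 2);
    System.out.println(buffer.readInt());
    System.out.println(Integer.toHexString(buffer.readInt()));
  }
}
```

Because the refill loop tolerates short reads, the same code works when the stream delivers data a few bytes at a time, which is the situation a network-backed stream would create.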

## Related issues
Closes #1435

## Media Benchmark
With this PR:
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT         false  thrpt   50  2867721.514 ± 55102.830  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT          true  thrpt   50  1841586.657 ± 46192.190  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT         false  thrpt   50  2542987.512 ± 49835.129  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT          true  thrpt   50  1712919.362 ± 26500.068  ops/s
```
Before this PR (4c60eb9):
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT         false  thrpt   50  2971081.559 ± 52348.975  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT          true  thrpt   50  1938252.031 ± 25077.333  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT         false  thrpt   50  2511729.964 ± 39278.437  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT          true  thrpt   50  1714610.534 ± 29778.339  ops/s
```

Baseline(7bb21d0):
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT         false  thrpt   50  2928012.663 ± 65849.325  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT          true  thrpt   50  1937231.421 ± 24616.628  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT         false  thrpt   50  2539983.720 ± 43379.751  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT          true  thrpt   50  1711973.100 ± 19195.431  ops/s
```
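
One rough way to read the tables above is to compare two scores and check whether their score ± error ranges overlap. The helper below is a hypothetical reading aid, not part of the PR; the figures are copied from the array / MEDIA_CONTENT / references=false rows.

```java
/** Hypothetical helper for comparing two JMH rows; not part of the PR. */
final class ThroughputCompare {
  /** Relative change of `after` versus `before`, in percent. */
  static double percentChange(double before, double after) {
    return (after - before) / before * 100.0;
  }

  /** True when the ranges [a - aErr, a + aErr] and [b - bErr, b + bErr] overlap. */
  static boolean intervalsOverlap(double a, double aErr, double b, double bErr) {
    return Math.abs(a - b) <= aErr + bErr;
  }

  public static void main(String[] args) {
    // array / MEDIA_CONTENT / references=false, copied from the tables above.
    double before = 2971081.559, beforeErr = 52348.975; // before this PR
    double after = 2867721.514, afterErr = 55102.830; // with this PR
    System.out.println("change in percent: " + percentChange(before, after));
    System.out.println("ranges overlap: " + intervalsOverlap(before, beforeErr, after, afterErr));
  }
}
```

For that row the score is roughly 3.5% lower with this PR, and the two ranges overlap slightly, so the gap is within the reported error.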

## Struct Benchmark
With this PR:
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                false  thrpt   50  2603998.829 ± 35062.683  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                 true  thrpt   50  3438343.670 ± 48911.824  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                false  thrpt   50  2631586.246 ± 34760.277  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                 true  thrpt   50  3425883.355 ± 40436.642  ops/s
```

Before this PR:
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                false  thrpt   50  2589559.872 ± 28062.176  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                 true  thrpt   50  3468978.381 ± 37684.130  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                false  thrpt   50  2640599.313 ± 36759.090  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                 true  thrpt   50  3451992.441 ± 51964.331  ops/s
```

Baseline:
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                false  thrpt   50  1576536.898 ± 54571.741  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                 true  thrpt   50  3194178.195 ± 60680.268  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                false  thrpt   50  1641797.023 ± 16222.155  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                 true  thrpt   50  3163612.265 ± 49756.995  ops/s
```
chaokunyang authored Apr 8, 2024
1 parent 5c5b1d4 commit fd424b8
Showing 49 changed files with 1,349 additions and 744 deletions.
@@ -101,7 +101,7 @@ public Object bytesCopyUnaligned(MemoryState state) {
// @Benchmark
public Object charsCopyAligned(MemoryState state) {
state.heapBuffer.writerIndex(0);
- state.heapBuffer.writePrimitiveArrayWithSizeEmbedded(
+ state.heapBuffer.writePrimitiveArrayWithSize(
state.chars, Platform.CHAR_ARRAY_OFFSET, state.chars.length * 2);
return state.heapBuffer;
}
@@ -110,15 +110,15 @@ public Object charsCopyAligned(MemoryState state) {
public Object charsCopyUnaligned(MemoryState state) {
state.heapBuffer.writerIndex(0);
state.heapBuffer.writeBoolean(false);
- state.heapBuffer.writePrimitiveArrayWithSizeEmbedded(
+ state.heapBuffer.writePrimitiveArrayWithSize(
state.chars, Platform.CHAR_ARRAY_OFFSET, state.chars.length * 2);
return state.heapBuffer;
}

// @Benchmark
public Object longsCopyAligned(MemoryState state) {
state.heapBuffer.writerIndex(0);
- state.heapBuffer.writePrimitiveArrayWithSizeEmbedded(
+ state.heapBuffer.writePrimitiveArrayWithSize(
state.longs, Platform.LONG_ARRAY_OFFSET, state.longs.length * 8);
return state.heapBuffer;
}
@@ -127,7 +127,7 @@ public Object longsCopyAligned(MemoryState state) {
public Object longsCopyUnaligned(MemoryState state) {
state.heapBuffer.writerIndex(0);
state.heapBuffer.writeBoolean(false);
- state.heapBuffer.writePrimitiveArrayWithSizeEmbedded(
+ state.heapBuffer.writePrimitiveArrayWithSize(
state.longs, Platform.LONG_ARRAY_OFFSET, state.longs.length * 8);
return state.heapBuffer;
}
10 changes: 5 additions & 5 deletions java/fury-core/src/main/java/org/apache/fury/BaseFury.java
@@ -19,8 +19,8 @@

package org.apache.fury;

- import java.io.InputStream;
import java.io.OutputStream;
+ import org.apache.fury.io.FuryInputStream;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.serializer.BufferCallback;
import org.apache.fury.serializer.Serializer;
@@ -110,9 +110,9 @@ public interface BaseFury {

Object deserialize(MemoryBuffer buffer, Iterable<MemoryBuffer> outOfBandBuffers);

- Object deserialize(InputStream inputStream);
+ Object deserialize(FuryInputStream inputStream);

- Object deserialize(InputStream inputStream, Iterable<MemoryBuffer> outOfBandBuffers);
+ Object deserialize(FuryInputStream inputStream, Iterable<MemoryBuffer> outOfBandBuffers);

/**
* Serialize java object without class info, deserialization should use {@link
@@ -140,7 +140,7 @@ public interface BaseFury {
*/
<T> T deserializeJavaObject(MemoryBuffer buffer, Class<T> cls);

- <T> T deserializeJavaObject(InputStream inputStream, Class<T> cls);
+ <T> T deserializeJavaObject(FuryInputStream inputStream, Class<T> cls);

byte[] serializeJavaObjectAndClass(Object obj);

@@ -152,5 +152,5 @@ public interface BaseFury {

Object deserializeJavaObjectAndClass(MemoryBuffer buffer);

- Object deserializeJavaObjectAndClass(InputStream inputStream);
+ Object deserializeJavaObjectAndClass(FuryInputStream inputStream);
}
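
The stream overloads above now take a `FuryInputStream` instead of a plain `InputStream`. One plausible reason, stated here as an assumption rather than something this PR spells out, is that a buffered reader pulls more bytes from the source than one message needs, so the read-ahead has to live in a wrapper that survives across calls. The sketch below shows that effect with hypothetical `ReadAheadStream` and `ReadAheadDemo` classes and a made-up one-byte length prefix; it is not Fury's API or wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a buffering wrapper; not Fury's FuryInputStream. */
final class ReadAheadStream {
  private final InputStream source;
  private final byte[] buffer = new byte[64];
  private int pos; // next unread byte in buffer
  private int limit; // number of valid bytes in buffer

  ReadAheadStream(InputStream source) {
    this.source = source;
  }

  /** Returns the next byte, refilling the buffer (and reading ahead) when it is empty. */
  int next() {
    if (pos == limit) {
      try {
        int n = source.read(buffer, 0, buffer.length);
        if (n <= 0) {
          throw new IllegalStateException("stream ended");
        }
        pos = 0;
        limit = n;
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return buffer[pos++] & 0xFF;
  }
}

final class ReadAheadDemo {
  /** Reads one message: a 1-byte length followed by that many ASCII bytes (made-up format). */
  static String readMessage(ReadAheadStream in) {
    int length = in.next();
    StringBuilder text = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      text.append((char) in.next());
    }
    return text.toString();
  }

  public static void main(String[] args) {
    InputStream raw = new ByteArrayInputStream(new byte[] {2, 'h', 'i', 3, 'y', 'o', 'u'});
    ReadAheadStream in = new ReadAheadStream(raw);
    System.out.println(readMessage(in)); // prints "hi"
    System.out.println(raw.available()); // prints 0: the wrapper already pulled every byte
    System.out.println(readMessage(in)); // prints "you", served from the wrapper's buffer
  }
}
```

After the first message the raw stream is already drained, so a second reader created over the same raw stream would find nothing; reusing the wrapper is what keeps the second message reachable.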