Fury fails when deserializing from InputStream #1435

Closed
2 tasks done
Munoon opened this issue Mar 30, 2024 · 8 comments · Fixed by #1451
Labels
bug Something isn't working

Comments

@Munoon
Contributor

Munoon commented Mar 30, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

Version

0.5.0-SNAPSHOT, 0.4.1

Component(s)

Java

Minimal reproduce step

import org.apache.fury.Fury;
import org.apache.fury.config.Language;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Arrays;

public class Reproducer {
    public static void main(String[] args) {
        Fury fury = Fury.builder()
                .withLanguage(Language.JAVA)
                .requireClassRegistration(false)
                .withMetaContextShare(false)
                .build();

        byte[] data = fury.serializeJavaObject(new TestRecord("Some string", 123));

        try {
            TestRecord result = fury.deserializeJavaObject(Arrays.copyOf(data, data.length), TestRecord.class);
            System.out.println("Byte array result: " + result);
        } catch (Exception e) {
            System.out.println("Byte array result: " + e.getClass().getSimpleName());
            e.printStackTrace();
        }

        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(Arrays.copyOf(data, data.length));
        try {
            TestRecord result = fury.deserializeJavaObject(byteArrayInputStream, TestRecord.class);
            System.out.println("ByteArrayInputStream result: " + result);
        } catch (Exception e) {
            System.out.println("ByteArrayInputStream result: " + e.getClass().getSimpleName());
            e.printStackTrace();
        }

        // hack: Fury has an optimisation for ByteArrayInputStream, which we want to avoid
        InputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(Arrays.copyOf(data, data.length)));
        try {
            TestRecord result = fury.deserializeJavaObject(inputStream, TestRecord.class);
            System.out.println("InputStream result: " + result);
        } catch (Exception e) {
            System.out.println("InputStream result: " + e.getClass().getSimpleName());
            e.printStackTrace();
        }
    }

    public static class TestRecord {
        public final String a;
        public final int b;

        public TestRecord(String a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public String toString() {
            return "TestRecord{" +
                    "a='" + a + '\'' +
                    ", b=" + b +
                    '}';
        }
    }
}

What did you expect to see?

No exception should be thrown; the data should be deserialized correctly.

What did you see instead?

java.lang.UnsupportedOperationException: Unsupported coder 111 when deserializing with ByteArrayInputStream and java.lang.IllegalArgumentException when deserializing with any other InputStream.

Anything Else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Munoon added the bug label Mar 30, 2024

@Munoon
Contributor Author

Munoon commented Mar 30, 2024

After hours of coding I finally figured out that I have to manually prefix the byte array with its original size. However, I wasn't able to find any information about this in the documentation.
I think one of the following should happen: the documentation should be updated, the InputStream size read should be refactored to use the available() method, or this size should be written by the Fury method itself.

@chaokunyang
Collaborator

Hi @Munoon, fury.deserializeJavaObject(inputstream, class) should only be used with fury.serializeJavaObject(outputstream, object). You can't use it to deserialize the data from fury.serializeJavaObject(object).

Our documentation didn't point this out, sorry for the inconvenience. We do have plans to align the implementation between those methods, but that may take a little time.

@Munoon
Contributor Author

Munoon commented Mar 30, 2024

Hi @Munoon, fury.deserializeJavaObject(inputstream, class) should only be used with fury.serializeJavaObject(outputstream, object). You can't use it to deserialize the data from fury.serializeJavaObject(object).

Can you please explain the benefit of writing this size?
In my app I am using Fury to write data to a file (currently implemented as Files.write(path, fury.serializeJavaObject(obj))) and then read it back later.
So I'm trying to understand which approach suits this case better.

Also, I found that the fastest way to read the data from a file with Fury is to go through a FileChannel -> ByteBuffer -> MemoryBuffer wrapper. Since I guess this is a very common use case for Fury, such an API could be added.
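
The FileChannel -> ByteBuffer step mentioned above can be sketched with the JDK alone. The class name FileChannelRead and the method readAll are hypothetical, and the final hand-off of the buffer to Fury's MemoryBuffer is deliberately left out, since the exact wrapper call is not shown in this thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper, not part of Fury: loads a file into a ByteBuffer via a FileChannel. */
final class FileChannelRead {

    /** Reads the whole file into a heap ByteBuffer that is ready for reading. */
    static ByteBuffer readAll(Path file) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = channel.size();
            if (size > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("File too large for a single buffer: " + size);
            }
            ByteBuffer buffer = ByteBuffer.allocate((int) size);
            // A single read() may return fewer bytes than requested, so loop until full or EOF.
            while (buffer.hasRemaining()) {
                if (channel.read(buffer) < 0) {
                    break;
                }
            }
            buffer.flip(); // switch from filling the buffer to reading from it
            return buffer;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("fury-demo", ".bin");
        try {
            Files.write(file, new byte[] {1, 2, 3});
            ByteBuffer buffer = readAll(file);
            System.out.println("Read " + buffer.remaining() + " bytes");
            // The buffer would then be wrapped and passed to the deserializer (not shown).
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

A memory-mapped buffer (FileChannel.map) is another option for large files; the plain read loop is used here only because it is the simplest variant to follow.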

@chaokunyang
Collaborator

Currently Fury doesn't support streaming reads, so we prefix the size in fury.serializeJavaObject(outputstream, object). Then, during deserialization, we can read the size first and then read the binary from the stream to deserialize the object, like you did manually. If you want Fury to do this for you, you must use fury.serializeJavaObject(outputstream, object) for serialization.
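
The size-prefix idea described above can be illustrated with plain JDK streams. This is only a sketch of the general technique: the class LengthPrefixedFraming, its method names, and the 4-byte big-endian prefix are assumptions made for the example and do not reproduce Fury's actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: frames a payload as [4-byte length][payload bytes]. */
final class LengthPrefixedFraming {

    /** Writes the payload size first, then the payload itself. */
    static void writeFrame(OutputStream out, byte[] payload) {
        try {
            DataOutputStream data = new DataOutputStream(out);
            data.writeInt(payload.length); // assumption: 4-byte big-endian prefix
            data.write(payload);
            data.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the size, then blocks until exactly that many bytes have been read. */
    static byte[] readFrame(InputStream in) {
        try {
            DataInputStream data = new DataInputStream(in);
            int size = data.readInt();
            byte[] payload = new byte[size];
            data.readFully(payload); // throws EOFException if the stream ends early
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "Some string".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeFrame(out, payload);
        byte[] restored = readFrame(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```

Because the reader knows the size up front, it never has to guess where one object ends, which is why bytes produced without the prefix cannot be fed to a reader that expects it.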

@chaokunyang
Collaborator

chaokunyang commented Mar 31, 2024

It is possible to support streaming natively. We can make MemoryBuffer hold a read channel or an input stream object, and read data only in the checkReadableBytes method.

I prefer using a channel since it provides opportunities for zero copy.

It's feasible, but we don't have time for this currently.
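
The proposal above (pull bytes from the source only when a read would run past what is already buffered) might look roughly like the following. StreamingBuffer is a hypothetical stand-in written for this illustration; it is not Fury's MemoryBuffer, and the little-endian int layout is an arbitrary choice for the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical buffer that reads from a stream only when a read needs more bytes. */
final class StreamingBuffer {
    private final InputStream source;
    private byte[] bytes = new byte[8];
    private int readerIndex; // index of the next byte to hand out
    private int size;        // number of valid bytes currently in `bytes`

    StreamingBuffer(InputStream source) {
        this.source = source;
    }

    /** Ensures at least `needed` unread bytes are buffered, filling from the stream if not. */
    private void checkReadableBytes(int needed) {
        int target = readerIndex + needed;
        if (target <= size) {
            return; // fast path: everything requested is already in memory
        }
        if (target > bytes.length) {
            bytes = Arrays.copyOf(bytes, Math.max(target, bytes.length * 2));
        }
        try {
            while (size < target) {
                int n = source.read(bytes, size, bytes.length - size);
                if (n < 0) {
                    throw new EOFException("Stream ended; " + (target - size) + " more bytes needed");
                }
                size += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    byte readByte() {
        checkReadableBytes(1);
        return bytes[readerIndex++];
    }

    /** Reads a 4-byte little-endian int (byte order chosen arbitrarily for this sketch). */
    int readInt32() {
        checkReadableBytes(4);
        int value = (bytes[readerIndex] & 0xFF)
                | (bytes[readerIndex + 1] & 0xFF) << 8
                | (bytes[readerIndex + 2] & 0xFF) << 16
                | (bytes[readerIndex + 3] & 0xFF) << 24;
        readerIndex += 4;
        return value;
    }

    public static void main(String[] args) {
        byte[] data = {7, 0x7B, 0, 0, 0}; // one byte, then 123 as a little-endian int
        StreamingBuffer buffer = new StreamingBuffer(new ByteArrayInputStream(data));
        System.out.println(buffer.readByte());
        System.out.println(buffer.readInt32());
    }
}
```

The point of the design is that the common case costs only one comparison in checkReadableBytes; the stream is touched only on the slow path. This sketch never discards consumed bytes, which a real implementation would need to handle.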

@Munoon
Contributor Author

Munoon commented Apr 2, 2024

Fury seems like an interesting project to me, so I'd actually like to contribute.
If you think this feature is worth implementing, I'd like to give it a try.

Also, I think MemoryBuffer could become an interface now, with different implementations holding different data sources (native address, byte array, InputStream, channel, etc.).

@chaokunyang
Collaborator

chaokunyang commented Apr 2, 2024

Fury seems like an interesting project to me, so I'd actually like to contribute. If you think this feature is worth implementing, I'd like to give it a try.

Also, I think MemoryBuffer could become an interface now, with different implementations holding different data sources (native address, byte array, InputStream, channel, etc.).

That would be really great, @Munoon .

I tried to implement native streaming deserialization in #1451. It's a little complicated because we don't employ a polymorphic design for MemoryBuffer. Streaming reads from InputStream are basically finished, but support for java.nio.Channel is not included in that PR. Maybe it can be taken as an example for implementing streaming channel deserialization.

I thought about making MemoryBuffer polymorphic when I created Fury several years ago, but polymorphism would make invocation inlining and other performance optimizations such as bounds-check elimination infeasible.
And considering that many cases involve small-object deserialization, where streaming is unnecessary, I made it a final class in the end.

@chaokunyang
Collaborator

These days, objects are getting bigger and bigger, so streaming is important to reduce latency for big objects, especially for big data and tensor computing.

chaokunyang added a commit that referenced this issue Apr 8, 2024
## What does this PR do?
This PR implements streaming mode deserialization, which makes the
protocol between `Fury#serialize(buffer, o)` and `Fury#serialize(stream,
o)` consistent, makes deserialization support real streaming mode
to reduce latency, and facilitates pipelining between deserialization
and transfer.

The changes include:
- Introduce a `FuryStreamReader` to allow buffered streaming reading.
- Make `MemoryBuffer` support streaming reading without introducing cost
on the critical path.
- Implement an `InputStream`-based `FuryStreamReader` for Fury.

`java.nio.Channel` based zero-copy is not implemented in this PR.

## Related issues
Closes #1435

## Media Benchmark
With this PR:
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT         false  thrpt   50  2867721.514 ± 55102.830  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT          true  thrpt   50  1841586.657 ± 46192.190  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT         false  thrpt   50  2542987.512 ± 49835.129  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT          true  thrpt   50  1712919.362 ± 26500.068  ops/s
```
Before this PR: 4c60eb9
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT         false  thrpt   50  2971081.559 ± 52348.975  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT          true  thrpt   50  1938252.031 ± 25077.333  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT         false  thrpt   50  2511729.964 ± 39278.437  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT          true  thrpt   50  1714610.534 ± 29778.339  ops/s
```

Baseline(7bb21d0):
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT         false  thrpt   50  2928012.663 ± 65849.325  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  MEDIA_CONTENT          true  thrpt   50  1937231.421 ± 24616.628  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT         false  thrpt   50  2539983.720 ± 43379.751  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  MEDIA_CONTENT          true  thrpt   50  1711973.100 ± 19195.431  ops/s
```

## Struct Benchmark
With this PR:
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                false  thrpt   50  2603998.829 ± 35062.683  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                 true  thrpt   50  3438343.670 ± 48911.824  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                false  thrpt   50  2631586.246 ± 34760.277  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                 true  thrpt   50  3425883.355 ± 40436.642  ops/s
```

Before this PR:
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                false  thrpt   50  2589559.872 ± 28062.176  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                 true  thrpt   50  3468978.381 ± 37684.130  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                false  thrpt   50  2640599.313 ± 36759.090  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                 true  thrpt   50  3451992.441 ± 51964.331  ops/s
```

Baseline:
```
Benchmark                                  (bufferType)   (objectType)  (references)   Mode  Cnt        Score       Error  Units
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                false  thrpt   50  1576536.898 ± 54571.741  ops/s
UserTypeDeserializeSuite.fury_deserialize         array  STRUCT                 true  thrpt   50  3194178.195 ± 60680.268  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                false  thrpt   50  1641797.023 ± 16222.155  ops/s
UserTypeDeserializeSuite.fury_deserialize  directBuffer  STRUCT                 true  thrpt   50  3163612.265 ± 49756.995  ops/s
```