Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Publisher<ByteBuffer>in RESTEasy reactive #33130

Closed
scrocquesel opened this issue May 4, 2023 · 20 comments
Closed

Support Publisher<ByteBuffer>in RESTEasy reactive #33130

scrocquesel opened this issue May 4, 2023 · 20 comments
Labels
area/rest kind/enhancement New feature or request

Comments

@scrocquesel
Copy link
Contributor

scrocquesel commented May 4, 2023

Description

I would like to be able to pipe a Publisher<ByteBuffer> into a RestResponse.

Given a Uni<Publisher<ByteBuffer>>, I would like to be able to do

    @GET
    @Path("download/{objectKey}")
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    public Uni<RestResponse<Publisher<ByteBuffer>>> downloadFile(String objectKey) {
        return Uni.createFrom().item( (Publisher<ByteBuffer>)getPublisherByteBuffer())
                .onItem()
                .transform(publisher -> ResponseBuilder.ok( (Publisher<ByteBuffer>)publisher)
                        .header("Content-Disposition", "attachment;filename=" + objectKey)
                        .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM).build());
    }

I tried to produce Multi<byte[]> with

    @GET
    @Path("download/{objectKey}")
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    public Uni<RestResponse<Multi<byte[]>>> downloadFile(String objectKey) {

        return Uni.createFrom().item( (Publisher<ByteBuffer>)getPublisherByteBuffer())
                .onItem()
                .transform(object -> ResponseBuilder.ok(Multi.createFrom().publisher(AdaptersToFlow.publisher(object))
                        .onItem().transform(this::getByteArray))
                        .header("Content-Disposition", "attachment;filename=" + objectKey)
                        .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM).build());
    }

but got an exception :

 java.lang.ClassCastException: class io.smallrye.mutiny.operators.multi.MultiMapOp cannot be cast to class [B (io.smallrye.mutiny.operators.multi.MultiMapOp is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @290dbf45; [B is in module java.base of loader 'bootstrap')
        at org.jboss.resteasy.reactive.server.providers.serialisers.ServerByteArrayMessageBodyHandler.writeResponse(ServerByteArrayMessageBodyHandler.java:16)

Implementation ideas

No response

@scrocquesel scrocquesel added the kind/enhancement New feature or request label May 4, 2023
@quarkus-bot quarkus-bot bot added the area/rest label May 4, 2023
@quarkus-bot
Copy link

quarkus-bot bot commented May 4, 2023

/cc @FroMage (resteasy-reactive), @Sgitario (resteasy-reactive), @geoand (resteasy-reactive), @stuartwdouglas (resteasy-reactive)

@geoand
Copy link
Contributor

geoand commented May 6, 2023

Seems like a reasonable ask

Actually, it will be very hard to make Uni<RestResponse>> work with Publisher. I think we even have a similar issue somewhere...

@geoand
Copy link
Contributor

geoand commented May 6, 2023

But I do I need to think of how we can address the general idea of what is being attempted here - which is to to support streaming data but also setting headers

@geoand
Copy link
Contributor

geoand commented May 6, 2023

@FroMage @cescoffier what if we introduce some kind of custom Multi or Publisher that would allow users to first set HTTP headers and then use a Multi<T> for the data?

@cescoffier
Copy link
Member

Yes, we can try this. If you implement Publisher, Mutiny will consider you unsafe and may introduce additional checks. If you extend AbstractMulti we can avoid these checks (but you must make sure your code is correct)

@geoand
Copy link
Contributor

geoand commented May 8, 2023

#33197 is what I have in mind.

We still need to see about supporting ByteBuffer although I think that our support for io.vertx.core.buffer.Buffer should suffice.

@scrocquesel
Copy link
Contributor Author

#33197 is what I have in mind.

We still need to see about supporting ByteBuffer although I think that our support for io.vertx.core.buffer.Buffer should suffice.

@geoand Thanks, got it working with

    @GET
    @Path("download/{objectKey}")
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    public Multi<Buffer> downloadFile(String objectKey, RoutingContext ctx) {

        return Uni.createFrom()
                .completionStage(() -> s3.getObject(buildGetRequest(objectKey),
                        AsyncResponseTransformer.toPublisher()))
                .onItem()
                .transformToMulti(object -> RestMulti
                        .from(Multi.createFrom().safePublisher(AdaptersToFlow.publisher((Publisher<ByteBuffer>) object))
                                .map(this::getBuffer))
                        .header("Content-Disposition", "attachment;filename=" + objectKey)
                        .header("Content-Type", object.response().contentType()).build());
    }

    private Buffer getBuffer(ByteBuffer bytebuffer) {
        byte[] result = new byte[bytebuffer.remaining()];
        bytebuffer.get(result);
        return Buffer.buffer(result);
    }

Regarding ByteBuffer, I don't know if there is a better way to avoid copying the backed array of bytes back and forth.

@scrocquesel
Copy link
Contributor Author

scrocquesel commented May 8, 2023

#33197 is what I have in mind.
We still need to see about supporting ByteBuffer although I think that our support for io.vertx.core.buffer.Buffer should suffice.

@geoand Thanks, got it working with

    @GET
    @Path("download/{objectKey}")
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    public Multi<Buffer> downloadFile(String objectKey, RoutingContext ctx) {

        return Uni.createFrom()
                .completionStage(() -> s3.getObject(buildGetRequest(objectKey),
                        AsyncResponseTransformer.toPublisher()))
                .onItem()
                .transformToMulti(object -> RestMulti
                        .from(Multi.createFrom().safePublisher(AdaptersToFlow.publisher((Publisher<ByteBuffer>) object))
                                .map(this::getBuffer))
                        .header("Content-Disposition", "attachment;filename=" + objectKey)
                        .header("Content-Type", object.response().contentType()).build());
    }

    private Buffer getBuffer(ByteBuffer bytebuffer) {
        byte[] result = new byte[bytebuffer.remaining()];
        bytebuffer.get(result);
        return Buffer.buffer(result);
    }

Regarding ByteBuffer, I don't know if there is a better way to avoid copying the backed array of bytes back and forth.

Spoke a bit fast, actually custom headers are not produced in my case

@geoand
Copy link
Contributor

geoand commented May 8, 2023

Yeah, you need my PR for that

@scrocquesel
Copy link
Contributor Author

Yeah, you need my PR for that

I'm testing with it (I believe I do at least :)).
I guess the transformToMulti is not returning the RestMulti itself ? I cannot cast it to RestMulti either.

@geoand
Copy link
Contributor

geoand commented May 8, 2023

See the tests for it's used

@scrocquesel
Copy link
Contributor Author

I see the tests but headers are hard coded and they will usually come from a call to something else, thus from a Uni or something alike.

I also tried to do it without transformToMulti returning Uni<RestMulti<ByteBuffer>> but it return the toString value of the RestMulti instance.

@geoand
Copy link
Contributor

geoand commented May 8, 2023

In your case the header just comes from a method parameter, correct?

That should just work

@geoand
Copy link
Contributor

geoand commented May 8, 2023

RestMulti needs to be the return type, you can't mix it with Uni

@scrocquesel
Copy link
Contributor Author

scrocquesel commented May 8, 2023

In your case the header just comes from a method parameter, correct?

That should just work

No, I make a call to an external reactive service. The uni response provides both some headers and a Publisher of byte.

It's like having a resteasy client with a method return a Uni<Response> and wiring it to the server resource result like a simple proxy.

Quick pseudo code

    @Path("/proxy")
    public interface ProxyClient {
        Uni<Response> proxy();
    }

    ProxyClient proxy;

    @Path("/test")
    public Multi<Buffer> test() {
        // this doesn't work and I don't know how to wire things up to make it works
        return proxy.proxy().onItem().transformToMulti(response -> RestMulti.from(response.getPublisher())
                .header("Content-Lengnth", response.getLength())
                .header("x-custom-1", response.getHeaderString("z-custom-2")));
    }

    @Path("/proxy")
    public RestMulti<String> proxy() {
        // this should work as expected with the PR
        return RestMulti.from(Multi.createFrom().items("foo", "bar")).header("z-custom-2", "banana").build();
    }

@geoand
Copy link
Contributor

geoand commented May 8, 2023

Gotcha, thanks

@geoand
Copy link
Contributor

geoand commented May 8, 2023

I'll have a look tomorrow at how something like that can be done.

@geoand
Copy link
Contributor

geoand commented May 9, 2023

This request actually becomes very problematic in light of: #22762 - i.e. when returning SSE events we need to return the initial HTTP frame ASAP, we can't wait for the actual response.

So I am wondering if we should limit the feature to non-SSE streamed responses...

geoand added a commit to geoand/quarkus that referenced this issue May 9, 2023
@geoand
Copy link
Contributor

geoand commented May 9, 2023

I have pushed a second commit which addresses your use case

@scrocquesel
Copy link
Contributor Author

This request actually becomes very problematic in light of: #22762 - i.e. when returning SSE events we need to return the initial HTTP frame ASAP, we can't wait for the actual response.

So I am wondering if we should limit the feature to non-SSE streamed responses...

That's what I thought at first. I was thinking of a dedicated server message serializer but I struggle to wire up the multi with the resteasy server response object. Trying to make things like the asyncfile support.

Will look at your second commit soon

geoand added a commit that referenced this issue May 10, 2023
Introduce a way to set headers and status code for streaming response
manofthepeace pushed a commit to manofthepeace/quarkus that referenced this issue May 16, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/rest kind/enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants