Skip to content

Commit

Permalink
Introduce an async variant of RestMulti
Browse files Browse the repository at this point in the history
  • Loading branch information
geoand authored and manofthepeace committed May 16, 2023
1 parent f4cc168 commit c640504
Show file tree
Hide file tree
Showing 7 changed files with 503 additions and 82 deletions.
53 changes: 49 additions & 4 deletions docs/src/main/asciidoc/resteasy-reactive.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -933,8 +933,7 @@ impression that you can set headers or HTTP status codes, which is not true afte
response.
Exception mappers are also not invoked because part of the response may already have been written.

[TIP]
====
==== Customizing headers and status
If you need to set custom HTTP headers and / or the HTTP response, then you can return `org.jboss.resteasy.reactive.RestMulti` instead, like this:

[source,java]
Expand All @@ -959,11 +958,52 @@ public class Endpoint {
@GET
public Multi<String> streamLogs() {
return RestMulti.from(logs).status(222).header("foo", "bar").build();
return RestMulti.fromMultiData(logs).status(222).header("foo", "bar").build();
}
}
----

In more advanced cases where the headers and / or status can only be obtained from the results of an async call, the `RestMulti.fromUniResponse` needs to be used.
Here is an example of such a use case:

[source,java]
----
package org.acme.rest;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import java.util.List;import java.util.Map;import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.mutiny.Multi;
import org.jboss.resteasy.reactive.RestMulti;
@Path("logs")
public class Endpoint {
interface SomeService {
Uni<SomeResponse> get();
}
interface SomeResponse {
Multi<byte[]> data;
String myHeader();
}
private final SomeService someService;
public Endpoint(SomeService someService) {
this.someService = someService;
}
@GET
public Multi<String> streamLogs() {
return RestMulti.fromUniResponse(someService.get(), SomeResponse::data, (r -> Map.of("MyHeader", List.of(r.myHeader()))));
}
}
----
====

=== Server-Sent Event (SSE) support

Expand Down Expand Up @@ -1054,6 +1094,11 @@ public class Endpoint {
<3> Set the event name, i.e. the value of the `event` field of a SSE message.
<4> Set the data, i.e. the value of the `data` field of a SSE message.

[WARNING]
====
Manipulation of the returned HTTP headers and status code is not possible via `RestMulti.fromUniResponse` because when returning SSE responses the headers and status code cannot be delayed until the response becomes available.
====

=== Controlling HTTP Caching features

RESTEasy Reactive provides the link:{resteasy-reactive-common-api}/org/jboss/resteasy/reactive/Cache.html[`@Cache`]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.streams;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import jakarta.ws.rs.GET;
Expand All @@ -19,6 +21,7 @@

import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

@Path("streams")
public class StreamResource {
Expand Down Expand Up @@ -98,7 +101,7 @@ public void sseJson2(Sse sse, SseEventSink sink) throws IOException {
@GET
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiJson() {
return RestMulti.from(Multi.createFrom().items(new Message("hello"), new Message("stef")))
return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef")))
.header("foo", "bar").build();
}

Expand All @@ -112,11 +115,24 @@ public Multi<Message> multiDefaultElementType() {
@Path("ndjson/multi")
@GET
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiNdJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

@Path("ndjson/multi2")
@GET
@Produces(RestMediaType.APPLICATION_NDJSON)
public Multi<Message> multiNdJson2() {

return RestMulti.fromUniResponse(
Uni.createFrom().item(
() -> new Wrapper(Multi.createFrom().items(new Message("hello"), new Message("stef")),
new AbstractMap.SimpleEntry<>("foo", "bar"), 222)),
Wrapper::getData,
Wrapper::getHeaders,
Wrapper::getStatus);
}

@Path("stream-json/multi")
@GET
@Produces(RestMediaType.APPLICATION_STREAM_JSON)
Expand All @@ -137,9 +153,34 @@ public Multi<Message> multiStreamJsonFast() {
for (int i = 0; i < 5000; i++) {
ids.add(UUID.randomUUID());
}
return RestMulti.from(Multi.createFrom().items(ids::stream)
return RestMulti.fromMultiData(Multi.createFrom().items(ids::stream)
.onItem().transform(id -> new Message(id.toString()))
.onOverflow().buffer(81920)).header("foo", "bar").build();
}

private static final class Wrapper {
public final Multi<Message> data;

public final Map<String, List<String>> headers;
private final Integer status;

public Wrapper(Multi<Message> data, Map.Entry<String, String> header, Integer status) {
this.data = data;
this.status = status;
this.headers = Map.of(header.getKey(), List.of(header.getValue()));
}

public Multi<Message> getData() {
return data;
}

public Map<String, List<String>> getHeaders() {
return headers;
}

public Integer getStatus() {
return status;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ public void testNdJsonMultiFromMulti() {
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_NDJSON));
}

@Test
public void testNdJsonMultiFromMulti2() {
when().get(uri.toString() + "streams/ndjson/multi2")
.then().statusCode(222)
// @formatter:off
.body(is("{\"name\":\"hello\"}\n"
+ "{\"name\":\"stef\"}\n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_NDJSON))
.header("foo", "bar");
}

@Test
public void testStreamJsonMultiFromMulti() {
when().get(uri.toString() + "streams/stream-json/multi")
Expand All @@ -141,7 +153,7 @@ public void testStreamJsonMultiFromMulti() {

private void testJsonMulti(String path) {
Client client = ClientBuilder.newBuilder().register(new JacksonBasicMessageBodyReader(new ObjectMapper())).build();
;

WebTarget target = client.target(uri.toString() + path);
Multi<Message> multi = target.request().rx(MultiInvoker.class).get(Message.class);
List<Message> list = multi.collect().asList().await().atMost(Duration.ofSeconds(30));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package io.quarkus.resteasy.reactive.server.test.headers;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertFalse;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;

import org.jboss.resteasy.reactive.ResponseHeader;
import org.jboss.resteasy.reactive.ResponseStatus;
Expand Down Expand Up @@ -155,6 +158,27 @@ public void testReturnRestMulti2() {
"Keep-Alive", "dummy"));
}

@Test
public void testReturnRestMulti3() {
RestAssured
.given()
.get("/test/rest-multi3")
.then()
.statusCode(200)
.headers(Map.of(
"header1", "foo",
"header2", "bar"));

RestAssured
.given()
.get("/test/rest-multi3?h1=h1&h2=h2")
.then()
.statusCode(200)
.headers(Map.of(
"header1", "h1",
"header2", "h2"));
}

@Path("/test")
public static class TestResource {

Expand Down Expand Up @@ -232,21 +256,53 @@ public String throwExceptionPlain() {
@GET
@Path("/rest-multi")
public RestMulti<String> getTestRestMulti() {
return RestMulti.from(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo")
return RestMulti.fromMultiData(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo")
.header("Keep-Alive", "bar").build();
}

@GET
@Path("/rest-multi2")
public RestMulti<String> getTestRestMulti2(@DefaultValue("bar") @RestQuery String keepAlive) {
return RestMulti.from(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo")
return RestMulti.fromMultiData(Multi.createFrom().item("test")).header("Access-Control-Allow-Origin", "foo")
.header("Keep-Alive", keepAlive).build();
}

@GET
@Path("/rest-multi3")
@Produces("application/octet-stream")
public RestMulti<byte[]> getTestRestMulti3(@DefaultValue("foo") @RestQuery("h1") String header1,
@DefaultValue("bar") @RestQuery("h2") String header2) {
return RestMulti.fromUniResponse(getWrapper(header1, header2), Wrapper::getData, Wrapper::getHeaders);
}

private IllegalArgumentException createException() {
IllegalArgumentException result = new IllegalArgumentException();
result.setStackTrace(EMPTY_STACK_TRACE);
return result;
}

private Uni<Wrapper> getWrapper(String header1, String header2) {
return Uni.createFrom().item(
() -> new Wrapper(Multi.createFrom().item("test".getBytes(StandardCharsets.UTF_8)), header1, header2));
}

private static final class Wrapper {
public final Multi<byte[]> data;

public final Map<String, List<String>> headers;

public Wrapper(Multi<byte[]> data, String header1, String header2) {
this.data = data;
this.headers = Map.of("header1", List.of(header1), "header2", List.of(header2));
}

public Multi<byte[]> getData() {
return data;
}

public Map<String, List<String>> getHeaders() {
return headers;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import org.jboss.resteasy.reactive.ResponseStatus;
import org.jboss.resteasy.reactive.RestMulti;
import org.jboss.resteasy.reactive.RestQuery;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand Down Expand Up @@ -112,6 +114,36 @@ public void testReturnRestMulti2() {
.statusCode(211);
}

@Test
public void testReturnRestMulti3() {
RestAssured
.given()
.get("/test/rest-multi3")
.then()
.statusCode(200);

RestAssured
.given()
.get("/test/rest-multi3?status=212")
.then()
.statusCode(212);
}

@Test
public void testReturnRestMulti4() {
RestAssured
.given()
.get("/test/rest-multi4")
.then()
.statusCode(200);

RestAssured
.given()
.get("/test/rest-multi4")
.then()
.statusCode(200);
}

@Path("/test")
public static class TestResource {

Expand Down Expand Up @@ -177,13 +209,26 @@ public String throwExceptionPlain() {
@GET
@Path("/rest-multi")
public RestMulti<String> getTestRestMulti() {
return RestMulti.from(Multi.createFrom().item("test")).status(210).build();
return RestMulti.fromMultiData(Multi.createFrom().item("test")).status(210).build();
}

@GET
@Path("/rest-multi2")
public RestMulti<String> getTestRestMulti2() {
return RestMulti.from(Multi.createFrom().item("test")).status(211).build();
return RestMulti.fromMultiData(Multi.createFrom().item("test")).status(211).build();
}

@GET
@Path("/rest-multi3")
public RestMulti<String> getTestRestMulti3(@DefaultValue("200") @RestQuery Integer status) {
return RestMulti.fromUniResponse(Uni.createFrom().item("unused"), s -> Multi.createFrom().item("test"), null,
s -> status);
}

@GET
@Path("/rest-multi4")
public RestMulti<String> getTestRestMulti4() {
return RestMulti.fromUniResponse(Uni.createFrom().item("unused"), s -> Multi.createFrom().item("test"));
}

private IllegalArgumentException createException() {
Expand Down
Loading

0 comments on commit c640504

Please sign in to comment.