Skip to content

Commit

Permalink
Merge pull request #38801 from snazy/rest-multi-enhancements
Browse files Browse the repository at this point in the history
Enhance RestMulti, configurable demand + distinct objects
  • Loading branch information
geoand authored Feb 16, 2024
2 parents a9bfdce + 0b1bf99 commit e97f3e2
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 101 deletions.
118 changes: 118 additions & 0 deletions docs/src/main/asciidoc/resteasy-reactive.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,124 @@ public class Endpoint {
}
----

=== Concurrent stream element processing

By default, `RestMulti` ensures serial/sequential order of the items/elements produced by the wrapped
`Multi` by using a value of 1 for the demand signaled to the publishers. To enable concurrent
processing/generation of multiple items, use `withDemand(long demand)`.

Using a demand higher than 1 is useful when multiple items shall be returned and the production of each
item takes some time, i.e. when parallel/concurrent production improves the service response time. Be
aware the concurrent processing also requires more resources and puts a higher load on services or
resources that are needed to produce the items. Also consider using `Multi.capDemandsTo(long)` and
`Multi.capDemandsUsing(LongFunction)`.

The example below produces 5 (JSON) strings, but the _order_ of the strings in the returned JSON array
is not guaranteed. The below example also works for JSON objects and not just simple types.

[source,java]
----
package org.acme.rest;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import io.smallrye.mutiny.Multi;
import org.jboss.resteasy.reactive.RestMulti;
@Path("message-stream")
public class Endpoint {
@GET
public Multi<String> streamMessages() {
Multi<String> sourceMulti = Multi
.createBy()
.merging()
.streams(
Multi.createFrom().items(
"message-1",
"message-2",
"message-3",
"message-4",
"message-5"
)
);
return RestMulti
.fromMultiData(sourceMulti)
.withDemand(5)
.build();
}
}
----

Example response, the order is non-deterministic.

[source,text]
----
"message-3"
"message-5"
"message-4"
"message-1"
"message-2"
----

=== Returning multiple JSON objects

By default, `RestMulti` returns items/elements produced by the wrapped `Multi` as a JSON array, if the
media-type is `application/json`. To return separate JSON objects that are not wrapped in a JSON array,
use `encodeAsArray(false)` (`encodeAsArray(true)` is the default). Note that streaming multiple
objects this way requires a slightly different parsing on the client side, but objects can be parsed and
consumed as they appear without having to deserialize a possibly huge result at once.

The example below produces 5 (JSON) strings, that are not wrapped in an array, like this:

[source,text]
----
"message-1"
"message-2"
"message-3"
"message-4"
"message-5"
----

[source,java]
----
package org.acme.rest;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import io.smallrye.mutiny.Multi;
import org.jboss.resteasy.reactive.RestMulti;
@Path("message-stream")
public class Endpoint {
@GET
public Multi<String> streamMessages() {
Multi<String> sourceMulti = Multi
.createBy()
.merging()
.streams(
Multi.createFrom().items(
"message-1",
"message-2",
"message-3",
"message-4",
"message-5"
)
);
return RestMulti
.fromMultiData(sourceMulti)
.encodeAsJsonArray(false)
.build();
}
}
----


=== Server-Sent Event (SSE) support

If you want to stream JSON objects in your response, you can use
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.streams;

import java.util.List;

public class Demands {
public List<Long> demands;

public Demands(List<Long> demands) {
this.demands = demands;
}

// for Jsonb
public Demands() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Flow;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
Expand All @@ -22,6 +23,9 @@
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.StrictMultiSubscriber;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;

@Path("streams")
public class StreamResource {
Expand Down Expand Up @@ -105,6 +109,68 @@ public Multi<Message> multiJson() {
.header("foo", "bar").build();
}

@Path("json/multi-alt")
@GET
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiJsonAlt() {
return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef")))
.header("foo", "bar").encodeAsJsonArray(true).build();
}

@Path("json/multi-docs")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Multi<Message> multiJsonMultiDocs() {
return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef")))
.header("foo", "bar").encodeAsJsonArray(false).build();
}

@Path("json/multi-docs-huge-demand")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Multi<Object> multiJsonMultiDocsHigherDemand() {
List<Long> demands = new ArrayList<>();

Multi<Object> inner = Multi.createBy().merging()
// Add some messages
.streams(Multi.createFrom().items(
new Message("hello"),
new Message("stef"),
new Message("snazy"),
new Message("stef"),
new Message("elani"),
new Message("foo"),
new Message("bar"),
new Message("baz")));

Multi<Object> items = Multi.createBy().concatenating().streams(
inner,
// Add "collected" demand values as the last JSON object, produce "lazily" to
// make sure that we "see" the demands signaled via Publisher.request(long).
Multi.createFrom().item(() -> new Demands(demands)));

Multi<Object> outer = new AbstractMultiOperator<>(items) {
@Override
public void subscribe(Flow.Subscriber<? super Object> subscriber) {
this.upstream.subscribe()
.withSubscriber(new MultiOperatorProcessor<Object, Object>(new StrictMultiSubscriber<>(subscriber)) {
@Override
public void request(long numberOfItems) {
// Collect the "demands" to return to the test case
demands.add(numberOfItems);
super.request(numberOfItems);
}
});
}
}.log("outer");

return RestMulti.fromMultiData(
Multi.createBy().concatenating().streams(outer).log())
.withDemand(5)
.encodeAsJsonArray(false)
.header("foo", "bar").build();
}

@Path("json/multi2")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import static io.restassured.RestAssured.when;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

import java.net.URI;
Expand Down Expand Up @@ -44,7 +47,7 @@ public class StreamTestCase {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(StreamResource.class, Message.class));
.addClasses(StreamResource.class, Message.class, Demands.class));

@Test
public void testSseFromSse() throws Exception {
Expand Down Expand Up @@ -110,13 +113,44 @@ public void testJsonMultiFromSse() {
@Test
public void testJsonMultiFromMulti() {
testJsonMulti("streams/json/multi");
testJsonMulti("streams/json/multi-alt");
}

@Test
public void testJsonMultiFromMultiWithDefaultElementType() {
testJsonMulti("streams/json/multi2");
}

@Test
public void testJsonMultiMultiDoc() {
when().get(uri.toString() + "streams/json/multi-docs")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(is("{\"name\":\"hello\"}\n"
+ "{\"name\":\"stef\"}\n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_JSON));
}

@Test
public void testJsonMultiMultiDocHigherDemand() {
when().get(uri.toString() + "streams/json/multi-docs-huge-demand")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(allOf(
containsString("{\"name\":\"hello\"}\n"),
containsString("{\"name\":\"stef\"}\n"),
containsString("{\"name\":\"snazy\"}\n"),
containsString("{\"name\":\"elani\"}\n"),
containsString("{\"name\":\"foo\"}\n"),
containsString("{\"name\":\"bar\"}\n"),
containsString("{\"name\":\"baz\"}\n"),
endsWith("{\"demands\":[5,5]}\n")))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_JSON))
.header("foo", equalTo("bar"));
}

@Test
public void testNdJsonMultiFromMulti() {
when().get(uri.toString() + "streams/ndjson/multi")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongFunction;

import org.jboss.resteasy.reactive.common.util.CaseInsensitiveMap;
import org.jboss.resteasy.reactive.common.util.MultivaluedTreeMap;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiMerge;
import io.smallrye.mutiny.helpers.EmptyUniSubscription;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.infrastructure.Infrastructure;
Expand Down Expand Up @@ -69,6 +71,8 @@ public static class SyncRestMulti<T> extends RestMulti<T> {
private final Multi<T> multi;
private final Integer status;
private final MultivaluedTreeMap<String, String> headers;
private final long demand;
private final boolean encodeAsJsonArray;

@Override
public void subscribe(MultiSubscriber<? super T> subscriber) {
Expand All @@ -79,6 +83,8 @@ private SyncRestMulti(Builder<T> builder) {
this.multi = builder.multi;
this.status = builder.status;
this.headers = builder.headers;
this.demand = builder.demand;
this.encodeAsJsonArray = builder.encodeAsJsonArray;
}

@Override
Expand All @@ -91,17 +97,62 @@ public Map<String, List<String>> getHeaders() {
return headers;
}

public static class Builder<T> {
private final Multi<T> multi;
public long getDemand() {
return demand;
}

private Integer status;
public boolean encodeAsJsonArray() {
return encodeAsJsonArray;
}

public static class Builder<T> {
private final Multi<T> multi;
private final MultivaluedTreeMap<String, String> headers = new CaseInsensitiveMap<>();
private Integer status;
private long demand = 1;
private boolean encodeAsJsonArray = true;

private Builder(Multi<T> multi) {
this.multi = Objects.requireNonNull(multi, "multi cannot be null");
}

/**
* Configure the {@code demand} signaled to the wrapped {@link Multi}, defaults to {@code 1}.
*
* <p>
* A demand of {@code 1} guarantees serial/sequential processing, any higher demand supports
* concurrent processing. A demand greater {@code 1}, with concurrent {@link Multi} processing,
* does not guarantee element order - this means that elements emitted by the
* {@link RestMulti#fromMultiData(Multi) RestMulti.fromMultiData(Multi)} source <code>Multi</code>}
* will be produced in a non-deterministic order.
*
* @see MultiMerge#withConcurrency(int) Multi.createBy().merging().withConcurrency(int)
* @see Multi#capDemandsTo(long)
* @see Multi#capDemandsUsing(LongFunction)
*/
public Builder<T> withDemand(long demand) {
if (demand <= 0) {
throw new IllegalArgumentException("Demand must be greater than zero");
}
this.demand = demand;
return this;
}

/**
* Configure whether objects produced by the wrapped {@link Multi} are encoded as JSON array elements, which is the
* default.
*
* <p>
* {@code encodeAsJsonArray(false)} produces separate JSON objects.
*
* <p>
* This property is only used for JSON object results and ignored for SSE and chunked streaming.
*/
public Builder<T> encodeAsJsonArray(boolean encodeAsJsonArray) {
this.encodeAsJsonArray = encodeAsJsonArray;
return this;
}

public Builder<T> status(int status) {
this.status = status;
return this;
Expand Down
Loading

0 comments on commit e97f3e2

Please sign in to comment.