Skip to content

Commit

Permalink
Enhance RestMulti, configurable demand + distinct objects
Browse files Browse the repository at this point in the history
Adds two enhancements:

1. Produce multiple JSON objects, not just an array
2. Make requested demand configurable

== Produce multiple JSON objects

Currently, `PublisherResponseHandler.StreamingMultiSubscriber` produces a JSON array, where each emitted item is encoded as a JSON array element. For some use cases it is easier to consume a bunch of "bare" JSON objects - i.e. just write the individual JSON objects, possibly separated by a newline. As an option, of course.

Proposal to add:
```java
RestMulti.fromMultiData(multi).encodeAsArray(false)...
```

With `encodeAsArray(false)`, the produced JSON would look like this:
```json
{"some": "value"}
{"some": "value"}
{"some": "value"}
```

`encodeAsArray(true)` or omitting it would use the current behavior and produce something like this:
```json
[{"some": "value"},
{"some": "value"},
{"some": "value"}
}
```

== Configure request-demand

All implementations of `PublisherResponseHandler.AbstractMultiSubscriber` work with a hard-coded request-demand of `1`, which means that every emitted item is "produced"/"computed" serially / one-after-the-other. If the computation of individual items takes somewhat longer, possibly waiting for remote resources to reply, it makes sense to use a higher demand to produce multiple items concurrently.

For example, if each item takes maybe 250 ms (requesting data from a remote source) to be produced, and 100 items are produced, it currently takes 25 seconds. With a higher concurrency it would take a fraction of that time. I.e. if the use case is known to be not CPU but (async) I/O bound, it _might_ be legit/feasible to use a high demand.

Proposal to add:
```
RestMulti.fromMultiData(multi).withDemand( (long) 123 )...
```

Which would pass `123` as the demand for all call sites to `Subscription.request` in implementations of `PublisherResponseHandler.AbstractMultiSubscriber`.
  • Loading branch information
snazy committed Feb 15, 2024
1 parent 9d5e5b2 commit b2dd31d
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 103 deletions.
118 changes: 118 additions & 0 deletions docs/src/main/asciidoc/resteasy-reactive.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,124 @@ public class Endpoint {
}
----

=== Concurrent stream element processing

By default, `RestMulti` ensures serial/sequential order of the items/elements produced by the wrapped
`Multi` by using a value of 1 for the demand signaled to the publishers. To enable concurrent
processing/generation of multiple items, use `withDemand(long demand)`.

Check warning on line 1012 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.SentenceLength] Try to keep sentences to an average of 32 words or fewer. Raw Output: {"message": "[Quarkus.SentenceLength] Try to keep sentences to an average of 32 words or fewer.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1012, "column": 63}}}, "severity": "INFO"}

Using a demand higher than 1 is useful when multiple items shall be returned and the production of each
item takes some time, i.e. when parallel/concurrent production improves the service response time. Be

Check failure on line 1015 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsErrors] Use 'you' rather than 'i'. Raw Output: {"message": "[Quarkus.TermsErrors] Use 'you' rather than 'i'.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1015, "column": 12}}}, "severity": "ERROR"}

Check warning on line 1015 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsWarnings] Consider using 'that is' rather than 'i.e.' unless updating existing content that uses the term. Raw Output: {"message": "[Quarkus.TermsWarnings] Consider using 'that is' rather than 'i.e.' unless updating existing content that uses the term.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1015, "column": 12}}}, "severity": "WARNING"}
aware the concurrent processing also requires more resources and puts a higher load on services or
resources that are needed to produce the items. Also consider using `Multi.capDemandsTo(long)` and
`Multi.capDemandsUsing(LongFunction)`.

The example below produces 5 (JSON) strings, but the _order_ of the strings in the returned JSON array
is not guaranteed. The below example also works for JSON objects and not just simple types.

[source,java]
----
package org.acme.rest;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import io.smallrye.mutiny.Multi;
import org.jboss.resteasy.reactive.RestMulti;
@Path("message-stream")
public class Endpoint {
@GET
public Multi<String> streamMessages() {
Multi<String> sourceMulti = Multi
.createBy()
.merging()
.streams(
Multi.createFrom().items(
"message-1",
"message-2",
"message-3",
"message-4",
"message-5"
)
);
return RestMulti
.fromMultiData(sourceMulti)
.withDemand(5)
.build();
}
}
----

Example response, the order is non-deterministic.

[source,text]
----
"message-3"
"message-5"
"message-4"
"message-1"
"message-2"

Check warning on line 1067 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Headings] Use sentence-style capitalization in 'Returning multiple JSON objects'. Raw Output: {"message": "[Quarkus.Headings] Use sentence-style capitalization in 'Returning multiple JSON objects'.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1067, "column": 12}}}, "severity": "INFO"}
----

=== Returning multiple JSON objects

By default, `RestMulti` returns items/elements produced by the wrapped `Multi` as a JSON array, if the

Check warning on line 1072 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1072, "column": 69}}}, "severity": "INFO"}
media-type is `application/json`. To return separate JSON objects that are not wrapped in a JSON array,
use `encodeAsArray(false)` (`encodeAsArray(true)` is the default). Note that streaming multiple
objects this way requires a slightly different parsing on the client side, but objects can be parsed and
consumed as they appear without having to deserialize a possibly huge result at once.

Check warning on line 1076 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Spelling] Use correct American English spelling. Did you really mean 'deserialize'? Raw Output: {"message": "[Quarkus.Spelling] Use correct American English spelling. Did you really mean 'deserialize'?", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1076, "column": 32}}}, "severity": "WARNING"}

The example below produces 5 (JSON) strings, that are not wrapped in an array, like this:

[source,text]
----
"message-1"
"message-2"
"message-3"
"message-4"
"message-5"
----

[source,java]
----
package org.acme.rest;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import io.smallrye.mutiny.Multi;
import org.jboss.resteasy.reactive.RestMulti;
@Path("message-stream")
public class Endpoint {
@GET
public Multi<String> streamMessages() {
Multi<String> sourceMulti = Multi
.createBy()
.merging()
.streams(
Multi.createFrom().items(
"message-1",
"message-2",
"message-3",
"message-4",
"message-5"
)
);
return RestMulti
.fromMultiData(sourceMulti)
.encodeAsJsonArray(false)
.build();
}
}
----

Check warning on line 1123 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Headings] Use sentence-style capitalization in 'Server-Sent Event (SSE) support'. Raw Output: {"message": "[Quarkus.Headings] Use sentence-style capitalization in 'Server-Sent Event (SSE) support'.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1123, "column": 1}}}, "severity": "INFO"}


=== Server-Sent Event (SSE) support

If you want to stream JSON objects in your response, you can use
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.streams;

import java.util.List;

public class Demands {
public List<Long> demands;

public Demands(List<Long> demands) {
this.demands = demands;
}

// for Jsonb
public Demands() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Flow;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
Expand All @@ -22,6 +23,9 @@
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.StrictMultiSubscriber;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;

@Path("streams")
public class StreamResource {
Expand Down Expand Up @@ -105,6 +109,70 @@ public Multi<Message> multiJson() {
.header("foo", "bar").build();
}

@Path("json/multi-alt")
@GET
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiJsonAlt() {
return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef")))
.header("foo", "bar").encodeAsJsonArray(true).build();
}

@Path("json/multi-docs")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Multi<Message> multiJsonMultiDocs() {
return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef")))
.header("foo", "bar").encodeAsJsonArray(false).build();
}

@Path("json/multi-docs-huge-demand")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Multi<Object> multiJsonMultiDocsHigherDemand() {
List<Long> demands = new ArrayList<>();

Multi<Object> inner = Multi.createBy().merging()
// Add some messages
.streams(Multi.createFrom().items(
new Message("hello"),
new Message("stef"),
new Message("snazy"),
new Message("stef"),
new Message("elani"),
new Message("foo"),
new Message("bar"),
new Message("baz")
));

Multi<Object> items = Multi.createBy().concatenating().streams(
inner,
// Add "collected" demand values as the last JSON object, produce "lazily" to
// make sure that we "see" the demands signaled via Publisher.request(long).
Multi.createFrom().item(() -> new Demands(demands))
);

Multi<Object> outer = new AbstractMultiOperator<>(items) {
@Override
public void subscribe(Flow.Subscriber<? super Object> subscriber) {
this.upstream.subscribe()
.withSubscriber(new MultiOperatorProcessor<Object, Object>(new StrictMultiSubscriber<>(subscriber)) {
@Override
public void request(long numberOfItems) {
// Collect the "demands" to return to the test case
demands.add(numberOfItems);
super.request(numberOfItems);
}
});
}
}.log("outer");

return RestMulti.fromMultiData(
Multi.createBy().concatenating().streams(outer).log())
.withDemand(5)
.encodeAsJsonArray(false)
.header("foo", "bar").build();
}

@Path("json/multi2")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import static io.restassured.RestAssured.when;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

import java.net.URI;
import java.time.Duration;
Expand Down Expand Up @@ -44,7 +43,7 @@ public class StreamTestCase {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(StreamResource.class, Message.class));
.addClasses(StreamResource.class, Message.class, Demands.class));

@Test
public void testSseFromSse() throws Exception {
Expand Down Expand Up @@ -110,13 +109,44 @@ public void testJsonMultiFromSse() {
@Test
public void testJsonMultiFromMulti() {
testJsonMulti("streams/json/multi");
testJsonMulti("streams/json/multi-alt");
}

@Test
public void testJsonMultiFromMultiWithDefaultElementType() {
testJsonMulti("streams/json/multi2");
}

@Test
public void testJsonMultiMultiDoc() {
when().get(uri.toString() + "streams/json/multi-docs")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(is("{\"name\":\"hello\"}\n"
+ "{\"name\":\"stef\"}\n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_JSON));
}

@Test
public void testJsonMultiMultiDocHigherDemand() {
when().get(uri.toString() + "streams/json/multi-docs-huge-demand")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(allOf(
containsString("{\"name\":\"hello\"}\n"),
containsString("{\"name\":\"stef\"}\n"),
containsString("{\"name\":\"snazy\"}\n"),
containsString("{\"name\":\"elani\"}\n"),
containsString("{\"name\":\"foo\"}\n"),
containsString("{\"name\":\"bar\"}\n"),
containsString("{\"name\":\"baz\"}\n"),
endsWith("{\"demands\":[5,5]}\n")))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_JSON))
.header("foo", equalTo("bar"));
}

@Test
public void testNdJsonMultiFromMulti() {
when().get(uri.toString() + "streams/ndjson/multi")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongFunction;

import org.jboss.resteasy.reactive.common.util.CaseInsensitiveMap;
import org.jboss.resteasy.reactive.common.util.MultivaluedTreeMap;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiMerge;
import io.smallrye.mutiny.helpers.EmptyUniSubscription;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.infrastructure.Infrastructure;
Expand Down Expand Up @@ -69,6 +71,8 @@ public static class SyncRestMulti<T> extends RestMulti<T> {
private final Multi<T> multi;
private final Integer status;
private final MultivaluedTreeMap<String, String> headers;
private final long demand;
private final boolean encodeAsJsonArray;

@Override
public void subscribe(MultiSubscriber<? super T> subscriber) {
Expand All @@ -79,6 +83,8 @@ private SyncRestMulti(Builder<T> builder) {
this.multi = builder.multi;
this.status = builder.status;
this.headers = builder.headers;
this.demand = builder.demand;
this.encodeAsJsonArray = builder.encodeAsJsonArray;
}

@Override
Expand All @@ -91,17 +97,62 @@ public Map<String, List<String>> getHeaders() {
return headers;
}

public static class Builder<T> {
private final Multi<T> multi;
public long getDemand() {
return demand;
}

private Integer status;
public boolean encodeAsJsonArray() {
return encodeAsJsonArray;
}

public static class Builder<T> {
private final Multi<T> multi;
private final MultivaluedTreeMap<String, String> headers = new CaseInsensitiveMap<>();
private Integer status;
private long demand = 1;
private boolean encodeAsJsonArray = true;

private Builder(Multi<T> multi) {
this.multi = Objects.requireNonNull(multi, "multi cannot be null");
}

/**
* Configure the {@code demand} signaled to the wrapped {@link Multi}, defaults to {@code 1}.
*
* <p>
* A demand of {@code 1} guarantees serial/sequential processing, any higher demand supports
* concurrent processing. A demand greater {@code 1}, with concurrent {@link Multi} processing,
* does not guarantee element order - this means that elements emitted by the
* {@link RestMulti#fromMultiData(Multi) RestMulti.fromMultiData(Multi)} source <code>Multi</code>}
* will be produced in a non-deterministic order.
*
* @see MultiMerge#withConcurrency(int) Multi.createBy().merging().withConcurrency(int)
* @see Multi#capDemandsTo(long)
* @see Multi#capDemandsUsing(LongFunction)
*/
public Builder<T> withDemand(long demand) {
if (demand <= 0) {
throw new IllegalArgumentException("Demand must be greater than zero");
}
this.demand = demand;
return this;
}

/**
* Configure whether objects produced by the wrapped {@link Multi} are encoded as JSON array elements, which is the
* default.
*
* <p>
* {@code encodeAsJsonArray(false)} produces separate JSON objects.
*
* <p>
* This property is only used for JSON object results and ignored for SSE and chunked streaming.
*/
public Builder<T> encodeAsJsonArray(boolean encodeAsJsonArray) {
this.encodeAsJsonArray = encodeAsJsonArray;
return this;
}

public Builder<T> status(int status) {
this.status = status;
return this;
Expand Down
Loading

0 comments on commit b2dd31d

Please sign in to comment.