Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance RestMulti, configurable demand + distinct objects #38801

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions docs/src/main/asciidoc/resteasy-reactive.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,124 @@
}
----

=== Concurrent stream element processing

By default, `RestMulti` ensures serial/sequential order of the items/elements produced by the wrapped
`Multi` by using a value of 1 for the demand signaled to the publishers. To enable concurrent
processing/generation of multiple items, use `withDemand(long demand)`.

Check warning on line 1012 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.SentenceLength] Try to keep sentences to an average of 32 words or fewer. Raw Output: {"message": "[Quarkus.SentenceLength] Try to keep sentences to an average of 32 words or fewer.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1012, "column": 63}}}, "severity": "INFO"}

Using a demand higher than 1 is useful when multiple items shall be returned and the production of each
item takes some time, i.e. when parallel/concurrent production improves the service response time. Be

Check failure on line 1015 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsErrors] Use 'you' rather than 'i'. Raw Output: {"message": "[Quarkus.TermsErrors] Use 'you' rather than 'i'.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1015, "column": 12}}}, "severity": "ERROR"}

Check warning on line 1015 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsWarnings] Consider using 'that is' rather than 'i.e.' unless updating existing content that uses the term. Raw Output: {"message": "[Quarkus.TermsWarnings] Consider using 'that is' rather than 'i.e.' unless updating existing content that uses the term.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1015, "column": 12}}}, "severity": "WARNING"}
aware the concurrent processing also requires more resources and puts a higher load on services or
resources that are needed to produce the items. Also consider using `Multi.capDemandsTo(long)` and
`Multi.capDemandsUsing(LongFunction)`.

The example below produces 5 (JSON) strings, but the _order_ of the strings in the returned JSON array
is not guaranteed. The below example also works for JSON objects and not just simple types.

[source,java]
----
package org.acme.rest;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import io.smallrye.mutiny.Multi;
import org.jboss.resteasy.reactive.RestMulti;

@Path("message-stream")
public class Endpoint {
@GET
public Multi<String> streamMessages() {
Multi<String> sourceMulti = Multi
.createBy()
.merging()
.streams(
Multi.createFrom().items(
"message-1",
"message-2",
"message-3",
"message-4",
"message-5"
)
);

return RestMulti
.fromMultiData(sourceMulti)
.withDemand(5)
.build();
}
}
----

Example response, the order is non-deterministic.

[source,text]
----
"message-3"
"message-5"
"message-4"
"message-1"
"message-2"

Check warning on line 1067 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Headings] Use sentence-style capitalization in 'Returning multiple JSON objects'. Raw Output: {"message": "[Quarkus.Headings] Use sentence-style capitalization in 'Returning multiple JSON objects'.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1067, "column": 12}}}, "severity": "INFO"}
----

=== Returning multiple JSON objects

By default, `RestMulti` returns items/elements produced by the wrapped `Multi` as a JSON array, if the

Check warning on line 1072 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1072, "column": 69}}}, "severity": "INFO"}
media-type is `application/json`. To return separate JSON objects that are not wrapped in a JSON array,
use `encodeAsArray(false)` (`encodeAsArray(true)` is the default). Note that streaming multiple
objects this way requires a slightly different parsing on the client side, but objects can be parsed and
consumed as they appear without having to deserialize a possibly huge result at once.

Check warning on line 1076 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Spelling] Use correct American English spelling. Did you really mean 'deserialize'? Raw Output: {"message": "[Quarkus.Spelling] Use correct American English spelling. Did you really mean 'deserialize'?", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1076, "column": 32}}}, "severity": "WARNING"}

The example below produces 5 (JSON) strings, that are not wrapped in an array, like this:

[source,text]
----
"message-1"
"message-2"
"message-3"
"message-4"
"message-5"
----

[source,java]
----
package org.acme.rest;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import io.smallrye.mutiny.Multi;
import org.jboss.resteasy.reactive.RestMulti;

@Path("message-stream")
public class Endpoint {
@GET
public Multi<String> streamMessages() {
Multi<String> sourceMulti = Multi
.createBy()
.merging()
.streams(
Multi.createFrom().items(
"message-1",
"message-2",
"message-3",
"message-4",
"message-5"
)
);

return RestMulti
.fromMultiData(sourceMulti)
.encodeAsJsonArray(false)
.build();
}
}
----

Check warning on line 1123 in docs/src/main/asciidoc/resteasy-reactive.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Headings] Use sentence-style capitalization in 'Server-Sent Event (SSE) support'. Raw Output: {"message": "[Quarkus.Headings] Use sentence-style capitalization in 'Server-Sent Event (SSE) support'.", "location": {"path": "docs/src/main/asciidoc/resteasy-reactive.adoc", "range": {"start": {"line": 1123, "column": 1}}}, "severity": "INFO"}


=== Server-Sent Event (SSE) support

If you want to stream JSON objects in your response, you can use
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.streams;

import java.util.List;

public class Demands {
public List<Long> demands;

public Demands(List<Long> demands) {
this.demands = demands;
}

// for Jsonb
public Demands() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Flow;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
Expand All @@ -22,6 +23,9 @@
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.StrictMultiSubscriber;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;

@Path("streams")
public class StreamResource {
Expand Down Expand Up @@ -105,6 +109,68 @@ public Multi<Message> multiJson() {
.header("foo", "bar").build();
}

@Path("json/multi-alt")
@GET
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiJsonAlt() {
return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef")))
.header("foo", "bar").encodeAsJsonArray(true).build();
}

@Path("json/multi-docs")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Multi<Message> multiJsonMultiDocs() {
return RestMulti.fromMultiData(Multi.createFrom().items(new Message("hello"), new Message("stef")))
.header("foo", "bar").encodeAsJsonArray(false).build();
}

@Path("json/multi-docs-huge-demand")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Multi<Object> multiJsonMultiDocsHigherDemand() {
List<Long> demands = new ArrayList<>();

Multi<Object> inner = Multi.createBy().merging()
// Add some messages
.streams(Multi.createFrom().items(
new Message("hello"),
new Message("stef"),
new Message("snazy"),
new Message("stef"),
new Message("elani"),
new Message("foo"),
new Message("bar"),
new Message("baz")));

Multi<Object> items = Multi.createBy().concatenating().streams(
inner,
// Add "collected" demand values as the last JSON object, produce "lazily" to
// make sure that we "see" the demands signaled via Publisher.request(long).
Multi.createFrom().item(() -> new Demands(demands)));

Multi<Object> outer = new AbstractMultiOperator<>(items) {
@Override
public void subscribe(Flow.Subscriber<? super Object> subscriber) {
this.upstream.subscribe()
.withSubscriber(new MultiOperatorProcessor<Object, Object>(new StrictMultiSubscriber<>(subscriber)) {
@Override
public void request(long numberOfItems) {
// Collect the "demands" to return to the test case
demands.add(numberOfItems);
super.request(numberOfItems);
}
});
}
}.log("outer");

return RestMulti.fromMultiData(
Multi.createBy().concatenating().streams(outer).log())
.withDemand(5)
.encodeAsJsonArray(false)
.header("foo", "bar").build();
}

@Path("json/multi2")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import static io.restassured.RestAssured.when;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

import java.net.URI;
Expand Down Expand Up @@ -44,7 +47,7 @@ public class StreamTestCase {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(StreamResource.class, Message.class));
.addClasses(StreamResource.class, Message.class, Demands.class));

@Test
public void testSseFromSse() throws Exception {
Expand Down Expand Up @@ -110,13 +113,44 @@ public void testJsonMultiFromSse() {
@Test
public void testJsonMultiFromMulti() {
testJsonMulti("streams/json/multi");
testJsonMulti("streams/json/multi-alt");
}

@Test
public void testJsonMultiFromMultiWithDefaultElementType() {
testJsonMulti("streams/json/multi2");
}

@Test
public void testJsonMultiMultiDoc() {
when().get(uri.toString() + "streams/json/multi-docs")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(is("{\"name\":\"hello\"}\n"
+ "{\"name\":\"stef\"}\n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_JSON));
}

@Test
public void testJsonMultiMultiDocHigherDemand() {
when().get(uri.toString() + "streams/json/multi-docs-huge-demand")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(allOf(
containsString("{\"name\":\"hello\"}\n"),
containsString("{\"name\":\"stef\"}\n"),
containsString("{\"name\":\"snazy\"}\n"),
containsString("{\"name\":\"elani\"}\n"),
containsString("{\"name\":\"foo\"}\n"),
containsString("{\"name\":\"bar\"}\n"),
containsString("{\"name\":\"baz\"}\n"),
endsWith("{\"demands\":[5,5]}\n")))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_JSON))
.header("foo", equalTo("bar"));
}

@Test
public void testNdJsonMultiFromMulti() {
when().get(uri.toString() + "streams/ndjson/multi")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongFunction;

import org.jboss.resteasy.reactive.common.util.CaseInsensitiveMap;
import org.jboss.resteasy.reactive.common.util.MultivaluedTreeMap;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiMerge;
import io.smallrye.mutiny.helpers.EmptyUniSubscription;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.infrastructure.Infrastructure;
Expand Down Expand Up @@ -69,6 +71,8 @@ public static class SyncRestMulti<T> extends RestMulti<T> {
private final Multi<T> multi;
private final Integer status;
private final MultivaluedTreeMap<String, String> headers;
private final long demand;
private final boolean encodeAsJsonArray;

@Override
public void subscribe(MultiSubscriber<? super T> subscriber) {
Expand All @@ -79,6 +83,8 @@ private SyncRestMulti(Builder<T> builder) {
this.multi = builder.multi;
this.status = builder.status;
this.headers = builder.headers;
this.demand = builder.demand;
this.encodeAsJsonArray = builder.encodeAsJsonArray;
}

@Override
Expand All @@ -91,17 +97,62 @@ public Map<String, List<String>> getHeaders() {
return headers;
}

public static class Builder<T> {
private final Multi<T> multi;
public long getDemand() {
return demand;
}

private Integer status;
public boolean encodeAsJsonArray() {
return encodeAsJsonArray;
}

public static class Builder<T> {
private final Multi<T> multi;
private final MultivaluedTreeMap<String, String> headers = new CaseInsensitiveMap<>();
private Integer status;
private long demand = 1;
private boolean encodeAsJsonArray = true;

private Builder(Multi<T> multi) {
this.multi = Objects.requireNonNull(multi, "multi cannot be null");
}

/**
* Configure the {@code demand} signaled to the wrapped {@link Multi}, defaults to {@code 1}.
*
* <p>
* A demand of {@code 1} guarantees serial/sequential processing, any higher demand supports
* concurrent processing. A demand greater {@code 1}, with concurrent {@link Multi} processing,
* does not guarantee element order - this means that elements emitted by the
* {@link RestMulti#fromMultiData(Multi) RestMulti.fromMultiData(Multi)} source <code>Multi</code>}
* will be produced in a non-deterministic order.
*
* @see MultiMerge#withConcurrency(int) Multi.createBy().merging().withConcurrency(int)
* @see Multi#capDemandsTo(long)
* @see Multi#capDemandsUsing(LongFunction)
*/
public Builder<T> withDemand(long demand) {
if (demand <= 0) {
throw new IllegalArgumentException("Demand must be greater than zero");
}
this.demand = demand;
return this;
}

/**
* Configure whether objects produced by the wrapped {@link Multi} are encoded as JSON array elements, which is the
* default.
*
* <p>
* {@code encodeAsJsonArray(false)} produces separate JSON objects.
*
* <p>
* This property is only used for JSON object results and ignored for SSE and chunked streaming.
*/
public Builder<T> encodeAsJsonArray(boolean encodeAsJsonArray) {
this.encodeAsJsonArray = encodeAsJsonArray;
return this;
}

public Builder<T> status(int status) {
this.status = status;
return this;
Expand Down
Loading
Loading