diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java index d491d213eec61..1ebf9a0e2a769 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java @@ -1,8 +1,10 @@ package io.quarkus.resteasy.reactive.jackson.deployment.processor; +import static org.jboss.resteasy.reactive.common.util.RestMediaType.APPLICATION_NDJSON; +import static org.jboss.resteasy.reactive.common.util.RestMediaType.APPLICATION_STREAM_JSON; + import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -126,29 +128,29 @@ void additionalProviders(List jacksonFeatureBuildItems, additionalReaders .produce(new MessageBodyReaderBuildItem(ServerJacksonMessageBodyReader.class.getName(), Object.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON))); additionalReaders .produce(new MessageBodyReaderBuildItem(VertxJsonArrayMessageBodyReader.class.getName(), JsonArray.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON))); additionalReaders .produce(new MessageBodyReaderBuildItem(VertxJsonObjectMessageBodyReader.class.getName(), JsonObject.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON))); additionalWriters .produce(new MessageBodyWriterBuildItem(getJacksonMessageBodyWriter(applicationNeedsSpecialJacksonFeatures), Object.class.getName(), - List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, + List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, RestMediaType.APPLICATION_STREAM_JSON))); additionalWriters .produce(new MessageBodyWriterBuildItem(VertxJsonArrayMessageBodyWriter.class.getName(), JsonArray.class.getName(), - List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, + List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, RestMediaType.APPLICATION_STREAM_JSON))); additionalWriters .produce(new MessageBodyWriterBuildItem(VertxJsonObjectMessageBodyWriter.class.getName(), JsonObject.class.getName(), - List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, + List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, RestMediaType.APPLICATION_STREAM_JSON))); } @@ -165,7 +167,7 @@ void handleJsonAnnotations(Optional resourceSca BuildProducer reflectiveClassProducer, BuildProducer jacksonFeaturesProducer, ResteasyReactiveServerJacksonRecorder recorder, ShutdownContextBuildItem shutdown) { - if (!resourceScanningResultBuildItem.isPresent()) { + if (resourceScanningResultBuildItem.isEmpty()) { return; } Collection resourceClasses = resourceScanningResultBuildItem.get().getResult().getScannedResources() diff --git a/extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/main/java/io/quarkus/rest/client/reactive/jackson/deployment/RestClientReactiveJacksonProcessor.java b/extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/main/java/io/quarkus/rest/client/reactive/jackson/deployment/RestClientReactiveJacksonProcessor.java index a23df6d744d90..78ddcdacc33db 100644 --- a/extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/main/java/io/quarkus/rest/client/reactive/jackson/deployment/RestClientReactiveJacksonProcessor.java +++ b/extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/main/java/io/quarkus/rest/client/reactive/jackson/deployment/RestClientReactiveJacksonProcessor.java @@ -1,6 +1,8 @@ package io.quarkus.rest.client.reactive.jackson.deployment; import static io.quarkus.deployment.Feature.REST_CLIENT_REACTIVE_JACKSON; +import static org.jboss.resteasy.reactive.common.util.RestMediaType.APPLICATION_NDJSON; +import static org.jboss.resteasy.reactive.common.util.RestMediaType.APPLICATION_STREAM_JSON; import java.util.Collections; import java.util.List; @@ -48,15 +50,15 @@ void additionalProviders( additionalReaders .produce(new MessageBodyReaderBuildItem(JacksonBasicMessageBodyReader.class.getName(), Object.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON))); additionalReaders .produce(new MessageBodyReaderBuildItem(VertxJsonArrayBasicMessageBodyReader.class.getName(), JsonArray.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON))); additionalReaders .produce(new MessageBodyReaderBuildItem(VertxJsonObjectBasicMessageBodyReader.class.getName(), JsonObject.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON))); additionalWriters .produce(new MessageBodyWriterBuildItem(ClientJacksonMessageBodyWriter.class.getName(), Object.class.getName(), Collections.singletonList(MediaType.APPLICATION_JSON))); diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/pom.xml b/extensions/resteasy-reactive/rest-client-reactive/deployment/pom.xml index 98669622f5ecc..6a78f6cc87b85 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/deployment/pom.xml +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/pom.xml @@ -39,6 +39,11 @@ quarkus-smallrye-fault-tolerance-deployment test + + io.quarkus + quarkus-reactive-routes-deployment + test + io.quarkus quarkus-junit5-internal diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/MultiNdjsonTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/MultiNdjsonTest.java new file mode 100644 index 0000000000000..9e6d1b956ed70 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/MultiNdjsonTest.java @@ -0,0 +1,236 @@ +package io.quarkus.rest.client.reactive; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.jboss.resteasy.reactive.RestStreamElementType; +import org.jboss.resteasy.reactive.common.util.RestMediaType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.vertx.web.ReactiveRoutes; +import io.quarkus.vertx.web.Route; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Vertx; +import io.vertx.ext.web.RoutingContext; + +public class MultiNdjsonTest { + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest(); + + @TestHTTPResource + URI uri; + + @Test + void shouldReadNdjsonStringAsMulti() throws InterruptedException { + var client = RestClientBuilder.newBuilder().baseUri(uri) + .build(Client.class); + var collected = new CopyOnWriteArrayList(); + var completionLatch = new CountDownLatch(1); + client.readString().onCompletion().invoke(completionLatch::countDown) + .subscribe().with(collected::add); + + if (!completionLatch.await(5, TimeUnit.SECONDS)) { + fail("Streaming did not complete in time"); + } + assertThat(collected).hasSize(4) + .contains("one", "two", "three", "four"); + } + + @Test + void shouldReadNdjsonPojoAsMulti() throws InterruptedException { + var client = RestClientBuilder.newBuilder().baseUri(uri) + .build(Client.class); + var collected = new CopyOnWriteArrayList(); + var completionLatch = new CountDownLatch(1); + client.readPojo().onCompletion().invoke(completionLatch::countDown) + .subscribe().with(collected::add); + + if (!completionLatch.await(5, TimeUnit.SECONDS)) { + fail("Streaming did not complete in time"); + } + var expected = Arrays.asList(Message.of("one", "1"), + Message.of("two", "2"), Message.of("three", "3"), + Message.of("four", "4")); + assertThat(collected).hasSize(4).containsAll(expected); + } + + @Test + void shouldReadNdjsonPojoFromReactiveRoutes() throws InterruptedException { + URI reactiveRoutesBaseUri = URI.create(uri.toString() + "/rr"); + var client = RestClientBuilder.newBuilder().baseUri(reactiveRoutesBaseUri) + .build(Client.class); + var collected = new CopyOnWriteArrayList(); + var completionLatch = new CountDownLatch(1); + client.readPojo().onCompletion().invoke(completionLatch::countDown) + .subscribe().with(collected::add); + + if (!completionLatch.await(5, TimeUnit.SECONDS)) { + fail("Streaming did not complete in time"); + } + var expected = Arrays.asList(Message.of("superman", "1"), + Message.of("batman", "2"), Message.of("spiderman", "3")); + assertThat(collected).hasSize(3).containsAll(expected); + } + + @Test + void shouldReadNdjsonFromSingleMessage() throws InterruptedException { + var client = RestClientBuilder.newBuilder().baseUri(uri) + .build(Client.class); + var collected = new CopyOnWriteArrayList(); + var completionLatch = new CountDownLatch(1); + client.readPojoSingle().onCompletion().invoke(completionLatch::countDown) + .subscribe().with(collected::add); + + if (!completionLatch.await(5, TimeUnit.SECONDS)) { + fail("Streaming did not complete in time"); + } + var expected = Arrays.asList( + Message.of("zero", "0"), Message.of("one", "1"), + Message.of("two", "2"), Message.of("three", "3")); + assertThat(collected).hasSize(4).containsAll(expected); + } + + @Path("/stream") + public interface Client { + @GET + @Path("/string") + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + Multi readString(); + + @GET + @Path("/pojo") + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + Multi readPojo(); + + @GET + @Path("/single-pojo") + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + Multi readPojoSingle(); + } + + public static class ReactiveRoutesResource { + @Route(path = "/rr/stream/pojo", produces = ReactiveRoutes.ND_JSON) + Multi people(RoutingContext context) { + return Multi.createFrom().items( + Message.of("superman", "1"), + Message.of("batman", "2"), + Message.of("spiderman", "3")); + } + } + + @Path("/stream") + public static class StreamingResource { + @Inject + Vertx vertx; + + @GET + @Path("/string") + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + public Multi readString() { + return Multi.createFrom().emitter( + em -> { + em.emit("one"); + em.emit("two"); + em.emit("three"); + em.emit("four"); + em.complete(); + }); + } + + @GET + @Path("/pojo") + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + public Multi readPojo() { + return Multi.createFrom().emitter( + em -> { + em.emit(Message.of("one", "1")); + em.emit(Message.of("two", "2")); + em.emit(Message.of("three", "3")); + vertx.setTimer(100, id -> { + em.emit(Message.of("four", "4")); + em.complete(); + }); + }); + } + + @GET + @Path("/single-pojo") + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + public String getPojosAsString() throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + StringBuilder result = new StringBuilder(); + ObjectWriter objectWriter = mapper.writerFor(Message.class); + for (var msg : List.of(Message.of("zero", "0"), + Message.of("one", "1"), + Message.of("two", "2"), + Message.of("three", "3"))) { + result.append(objectWriter.writeValueAsString(msg)); + result.append("\n"); + } + return result.toString(); + } + } + + public static class Message { + public String name; + public String value; + + public static Message of(String name, String value) { + Message message = new Message(); + message.name = name; + message.value = value; + return message; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Message message = (Message) o; + return Objects.equals(name, message.name) && Objects.equals(value, message.value); + } + + @Override + public int hashCode() { + return Objects.hash(name, value); + } + + @Override + public String toString() { + return "Message{" + + "name='" + name + '\'' + + ", value='" + value + '\'' + + '}'; + } + } +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/StreamJsonTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/StreamJsonTest.java new file mode 100644 index 0000000000000..602575ec5173d --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/StreamJsonTest.java @@ -0,0 +1,236 @@ +package io.quarkus.rest.client.reactive; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.jboss.resteasy.reactive.RestStreamElementType; +import org.jboss.resteasy.reactive.common.util.RestMediaType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.vertx.web.ReactiveRoutes; +import io.quarkus.vertx.web.Route; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Vertx; +import io.vertx.ext.web.RoutingContext; + +public class StreamJsonTest { + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest(); + + @TestHTTPResource + URI uri; + + @Test + void shouldReadStreamJsonStringAsMulti() throws InterruptedException { + var client = RestClientBuilder.newBuilder().baseUri(uri) + .build(Client.class); + var collected = new CopyOnWriteArrayList(); + var completionLatch = new CountDownLatch(1); + client.readString().onCompletion().invoke(completionLatch::countDown) + .subscribe().with(collected::add); + + if (!completionLatch.await(5, TimeUnit.SECONDS)) { + fail("Streaming did not complete in time"); + } + assertThat(collected).hasSize(4) + .contains("one", "two", "3", "four"); + } + + @Test + void shouldReadNdjsonPojoAsMulti() throws InterruptedException { + var client = RestClientBuilder.newBuilder().baseUri(uri) + .build(Client.class); + var collected = new CopyOnWriteArrayList(); + var completionLatch = new CountDownLatch(1); + client.readPojo().onCompletion().invoke(completionLatch::countDown) + .subscribe().with(collected::add); + + if (!completionLatch.await(5, TimeUnit.SECONDS)) { + fail("Streaming did not complete in time"); + } + var expected = Arrays.asList(Message.of("one", "1"), + Message.of("two", "2"), Message.of("three", "3"), + Message.of("four", "4")); + assertThat(collected).hasSize(4).containsAll(expected); + } + + @Test + void shouldReadNdjsonPojoFromReactiveRoutes() throws InterruptedException { + URI reactiveRoutesBaseUri = URI.create(uri.toString() + "/rr"); + var client = RestClientBuilder.newBuilder().baseUri(reactiveRoutesBaseUri) + .build(Client.class); + var collected = new CopyOnWriteArrayList(); + var completionLatch = new CountDownLatch(1); + client.readPojo().onCompletion().invoke(completionLatch::countDown) + .subscribe().with(collected::add); + + if (!completionLatch.await(5, TimeUnit.SECONDS)) { + fail("Streaming did not complete in time"); + } + var expected = Arrays.asList(Message.of("superman", "1"), + Message.of("batman", "2"), Message.of("spiderman", "3")); + assertThat(collected).hasSize(3).containsAll(expected); + } + + @Test + void shouldReadNdjsonFromSingleMessage() throws InterruptedException { + var client = RestClientBuilder.newBuilder().baseUri(uri) + .build(Client.class); + var collected = new CopyOnWriteArrayList(); + var completionLatch = new CountDownLatch(1); + client.readPojoSingle().onCompletion().invoke(completionLatch::countDown) + .subscribe().with(collected::add); + + if (!completionLatch.await(5, TimeUnit.SECONDS)) { + fail("Streaming did not complete in time"); + } + var expected = Arrays.asList( + Message.of("zero", "0"), Message.of("one", "1"), + Message.of("two", "2"), Message.of("three", "3")); + assertThat(collected).hasSize(4).containsAll(expected); + } + + @Path("/stream") + public interface Client { + @GET + @Path("/string") + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + Multi readString(); + + @GET + @Path("/pojo") + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + Multi readPojo(); + + @GET + @Path("/single-pojo") + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + Multi readPojoSingle(); + } + + public static class ReactiveRoutesResource { + @Route(path = "/rr/stream/pojo", produces = ReactiveRoutes.JSON_STREAM) + Multi people(RoutingContext context) { + return Multi.createFrom().items( + Message.of("superman", "1"), + Message.of("batman", "2"), + Message.of("spiderman", "3")); + } + } + + @Path("/stream") + public static class StreamingResource { + @Inject + Vertx vertx; + + @GET + @Path("/string") + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + public Multi readString() { + return Multi.createFrom().emitter( + em -> { + em.emit("one"); + em.emit("two"); + em.emit("3"); + em.emit("four"); + em.complete(); + }); + } + + @GET + @Path("/pojo") + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + public Multi readPojo() { + return Multi.createFrom().emitter( + em -> { + em.emit(Message.of("one", "1")); + em.emit(Message.of("two", "2")); + em.emit(Message.of("three", "3")); + vertx.setTimer(100, id -> { + em.emit(Message.of("four", "4")); + em.complete(); + }); + }); + } + + @GET + @Path("/single-pojo") + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) + public String getPojosAsString() throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + StringBuilder result = new StringBuilder(); + ObjectWriter objectWriter = mapper.writerFor(Message.class); + for (var msg : List.of(Message.of("zero", "0"), + Message.of("one", "1"), + Message.of("two", "2"), + Message.of("three", "3"))) { + result.append(objectWriter.writeValueAsString(msg)); + result.append("\n"); + } + return result.toString(); + } + } + + public static class Message { + public String name; + public String value; + + public static Message of(String name, String value) { + Message message = new Message(); + message.name = name; + message.value = value; + return message; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Message message = (Message) o; + return Objects.equals(name, message.name) && Objects.equals(value, message.value); + } + + @Override + public int hashCode() { + return Objects.hash(name, value); + } + + @Override + public String toString() { + return "Message{" + + "name='" + name + '\'' + + ", value='" + value + '\'' + + '}'; + } + } +} diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java index a94327152dae6..0b17f299d2572 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java @@ -13,6 +13,8 @@ import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.jboss.resteasy.reactive.common.jaxrs.ResponseImpl; +import org.jboss.resteasy.reactive.common.util.RestMediaType; public class MultiInvoker extends AbstractRxInvoker> { @@ -114,6 +116,11 @@ public Multi method(String name, Entity entity, GenericType respons }); } + private boolean isNewlineDelimited(ResponseImpl response) { + return RestMediaType.APPLICATION_STREAM_JSON_TYPE.isCompatible(response.getMediaType()) || + RestMediaType.APPLICATION_NDJSON_TYPE.isCompatible(response.getMediaType()); + } + private void registerForSse(MultiRequest multiRequest, GenericType responseType, Response response, @@ -143,8 +150,9 @@ private void registerForSse(MultiRequest multiRequest, private void registerForChunks(MultiRequest multiRequest, RestClientRequestContext restClientRequestContext, GenericType responseType, - Response response, + ResponseImpl response, HttpClientResponse vertxClientResponse) { + boolean isNewlineDelimited = isNewlineDelimited(response); // make sure we get exceptions on the response, like close events, otherwise they // will be logged as errors by vertx vertxClientResponse.exceptionHandler(t -> { @@ -161,17 +169,70 @@ private void registerForChunks(MultiRequest multiRequest, @Override public void handle(Buffer buffer) { try { - ByteArrayInputStream in = new ByteArrayInputStream(buffer.getBytes()); - R item = restClientRequestContext.readEntity(in, responseType, response.getMediaType(), - response.getMetadata()); - multiRequest.emitter.emit(item); + byte[] bytes = buffer.getBytes(); + MediaType mediaType = response.getMediaType(); + + if (isNewlineDelimited) { + String charset = mediaType.getParameters().get(MediaType.CHARSET_PARAMETER); + charset = charset == null ? "UTF-8" : charset; + byte[] separator = "\n".getBytes(charset); + int start = 0; + if (startsWith(bytes, separator)) { + start += separator.length; + } + while (start < bytes.length) { + int end = bytes.length; + for (int i = start; i < bytes.length - separator.length; i++) { + if (bytes[i] == separator[0]) { + int j; + boolean matches = true; + for (j = 1; j < separator.length; j++) { + if (bytes[i + j] != separator[j]) { + matches = false; + break; + } + } + if (matches) { + end = i; + break; + } + } + } + + if (start < end) { + ByteArrayInputStream in = new ByteArrayInputStream(bytes, start, end - start); + R item = restClientRequestContext.readEntity(in, responseType, mediaType, + response.getMetadata()); + multiRequest.emitter.emit(item); + } + start = end + separator.length; + } + } else { + ByteArrayInputStream in = new ByteArrayInputStream(bytes); + R item = restClientRequestContext.readEntity(in, responseType, mediaType, + response.getMetadata()); + multiRequest.emitter.emit(item); + } } catch (Throwable t) { // FIXME: probably close the client too? watch out that it doesn't call our close handler // which calls emitter.complete() multiRequest.emitter.fail(t); } } + + private boolean startsWith(byte[] array, byte[] prefix) { + if (array.length < prefix.length) { + return false; + } + for (int i = 0; i < prefix.length; i++) { + if (array[i] != prefix[i]) { + return false; + } + } + return true; + } }); + // this captures the end of the response // FIXME: won't this call complete twice()? vertxClientResponse.endHandler(v -> { diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/providers/serialisers/AbstractJsonMessageBodyReader.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/providers/serialisers/AbstractJsonMessageBodyReader.java index 201e7e9d9a7ee..5048ee079e8fe 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/providers/serialisers/AbstractJsonMessageBodyReader.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/providers/serialisers/AbstractJsonMessageBodyReader.java @@ -21,7 +21,8 @@ protected boolean isReadable(MediaType mediaType, Class type) { } String subtype = mediaType.getSubtype(); boolean isApplicationMediaType = "application".equals(mediaType.getType()); - return (isApplicationMediaType && "json".equalsIgnoreCase(subtype) || subtype.endsWith("+json")) + return (isApplicationMediaType && "json".equalsIgnoreCase(subtype) || subtype.endsWith("+json") + || subtype.equalsIgnoreCase("x-ndjson")) || (mediaType.isWildcardSubtype() && (mediaType.isWildcardType() || isApplicationMediaType)); } }