diff --git a/extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/test/java/io/quarkus/rest/client/reactive/jackson/test/MultiSseTest.java b/extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/test/java/io/quarkus/rest/client/reactive/jackson/test/MultiSseTest.java index aa715e04fb948..780bb6b931694 100644 --- a/extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/test/java/io/quarkus/rest/client/reactive/jackson/test/MultiSseTest.java +++ b/extensions/resteasy-reactive/rest-client-reactive-jackson/deployment/src/test/java/io/quarkus/rest/client/reactive/jackson/test/MultiSseTest.java @@ -8,15 +8,21 @@ import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.sse.OutboundSseEvent; +import jakarta.ws.rs.sse.Sse; +import jakarta.ws.rs.sse.SseEventSink; import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; import org.jboss.resteasy.reactive.RestStreamElementType; +import org.jboss.resteasy.reactive.client.SseEvent; import org.jboss.resteasy.reactive.server.jackson.JacksonBasicMessageBodyReader; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -112,6 +118,63 @@ void shouldRestStreamElementTypeOverwriteProducesAtClassLevel() { .containsExactly(new Dto("foo", "bar"), new Dto("chocolate", "bar"))); } + @Test + void shouldBeAbleReadEntireEvent() { + var resultList = new CopyOnWriteArrayList<>(); + createClient() + .event() + .subscribe().with(new Consumer<>() { + @Override + public void accept(SseEvent event) { + resultList.add(new EventContainer(event.id(), event.name(), event.data())); + } + }); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted( + () -> assertThat(resultList).containsExactly( + new EventContainer("id0", "name0", new Dto("name0", "0")), + new EventContainer("id1", "name1", new Dto("name1", "1")))); + } + + static class EventContainer { + final String id; + final String name; + final Dto dto; + + EventContainer(String id, String name, Dto dto) { + this.id = id; + this.name = name; + this.dto = dto; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EventContainer that = (EventContainer) o; + return Objects.equals(id, that.id) && Objects.equals(name, that.name) + && Objects.equals(dto, that.dto); + } + + @Override + public int hashCode() { + return Objects.hash(id, name, dto); + } + + @Override + public String toString() { + return "EventContainer{" + + "id='" + id + '\'' + + ", name='" + name + '\'' + + ", dto=" + dto + + '}'; + } + } + private SseClient createClient() { return QuarkusRestClientBuilder.newBuilder() .baseUri(uri) @@ -144,6 +207,11 @@ public interface SseClient { @Produces(MediaType.SERVER_SENT_EVENTS) @Path("/with-entity-json") Multi> postAndReadAsMap(String entity); + + @GET + @Path("/event") + @Produces(MediaType.SERVER_SENT_EVENTS) + Multi> event(); } @Path("/sse") @@ -175,6 +243,24 @@ public Multi post(String entity) { public Multi postAndReadAsMap(String entity) { return Multi.createBy().repeating().supplier(() -> new Dto("foo", entity)).atMost(3); } + + @GET + @Path("/event") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void event(@Context SseEventSink sink, @Context Sse sse) { + // send a stream of few events + try (sink) { + for (int i = 0; i < 2; i++) { + final OutboundSseEvent.Builder builder = sse.newEventBuilder(); + builder.id("id" + i) + .mediaType(MediaType.APPLICATION_JSON_TYPE) + .data(Dto.class, new Dto("name" + i, String.valueOf(i))) + .name("name" + i); + + sink.send(builder.build()); + } + } + } } @Path("/sse-rest-stream-element-type") @@ -226,5 +312,13 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(name, value); } + + @Override + public String toString() { + return "Dto{" + + "name='" + name + '\'' + + ", value='" + value + '\'' + + '}'; + } } } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/SseEvent.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/SseEvent.java new file mode 100644 index 0000000000000..a6978b93d2dc7 --- /dev/null +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/SseEvent.java @@ -0,0 +1,15 @@ +package org.jboss.resteasy.reactive.client; + +/** + * Represents the entire SSE response from the server + */ +public interface SseEvent { + + String id(); + + String name(); + + String comment(); + + T data(); +} diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java index fe6a93492c42f..e483baa0ce357 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java @@ -2,6 +2,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.lang.reflect.ParameterizedType; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -10,6 +11,7 @@ import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; +import org.jboss.resteasy.reactive.client.SseEvent; import org.jboss.resteasy.reactive.common.jaxrs.ResponseImpl; import org.jboss.resteasy.reactive.common.util.RestMediaType; @@ -151,10 +153,17 @@ private boolean isNewlineDelimited(ResponseImpl response) { RestMediaType.APPLICATION_NDJSON_TYPE.isCompatible(response.getMediaType()); } + @SuppressWarnings({ "unchecked", "rawtypes" }) private void registerForSse(MultiRequest multiRequest, GenericType responseType, Response response, HttpClientResponse vertxResponse, String defaultContentType) { + + boolean returnSseEvent = SseEvent.class.equals(responseType.getRawType()); + GenericType responseTypeFirstParam = responseType.getType() instanceof ParameterizedType + ? new GenericType(((ParameterizedType) responseType.getType()).getActualTypeArguments()[0]) + : null; + // honestly, isn't reconnect contradictory with completion? // FIXME: Reconnect settings? // For now we don't want multi to reconnect @@ -165,10 +174,39 @@ private void registerForSse(MultiRequest multiRequest, sseSource.register(event -> { // DO NOT pass the response mime type because it's SSE: let the event pick between the X-SSE-Content-Type header or // the content-type SSE field - R item = event.readData(responseType); - if (item != null) { // we don't emit null because it breaks Multi (by design) - multiRequest.emit(item); + if (returnSseEvent) { + multiRequest.emit((R) new SseEvent() { + @Override + public String id() { + return event.getId(); + } + + @Override + public String name() { + return event.getName(); + } + + @Override + public String comment() { + return event.getComment(); + } + + @Override + public Object data() { + if (responseTypeFirstParam != null) { + return event.readData(responseTypeFirstParam); + } else { + return event.readData(); // TODO: is this correct? + } + } + }); + } else { + R item = event.readData(responseType); + if (item != null) { // we don't emit null because it breaks Multi (by design) + multiRequest.emit(item); + } } + }, multiRequest::fail, multiRequest::complete); // watch for user cancelling sseSource.registerAfterRequest(vertxResponse);