From c15713cd312712c20d0855417dbec4fe8afcf5b8 Mon Sep 17 00:00:00 2001 From: Martin Ocenas Date: Mon, 15 Jan 2024 14:42:57 +0100 Subject: [PATCH 1/3] add test for Sse method indexing on native --- .../reactive/resources/SseResource.java | 67 +++++++++++++++++++ .../reactive/ReactiveRestClientIT.java | 12 ++++ 2 files changed, 79 insertions(+) create mode 100644 http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/resources/SseResource.java diff --git a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/resources/SseResource.java b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/resources/SseResource.java new file mode 100644 index 000000000..96236cacc --- /dev/null +++ b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/resources/SseResource.java @@ -0,0 +1,67 @@ +package io.quarkus.ts.http.restclient.reactive.resources; + +import java.util.Arrays; +import java.util.concurrent.locks.LockSupport; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.client.ClientBuilder; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.sse.OutboundSseEvent; +import jakarta.ws.rs.sse.Sse; +import jakarta.ws.rs.sse.SseEventSink; +import jakarta.ws.rs.sse.SseEventSource; + +import org.eclipse.microprofile.config.ConfigProvider; + +/** + * Reproducer resource for https://github.com/quarkusio/quarkus/issues/36986 + */ +@ApplicationScoped +@Path("sse") +public class SseResource { + @Context + Sse sse; + + @GET + @Path("client") + @Produces(MediaType.TEXT_PLAIN) + public Response clientUpdate() { + String host = ConfigProvider.getConfig().getValue("quarkus.http.host", String.class); + int port = ConfigProvider.getConfig().getValue("quarkus.http.port", Integer.class); + StringBuilder eventCapture = new StringBuilder(); + + WebTarget target = ClientBuilder.newClient().target("http://" + host + ":" + port + "/sse/server-update"); + try (SseEventSource eventSource = SseEventSource.target(target).build()) { + eventSource.register( + ev -> eventCapture.append("event: name=").append(ev.getName()).append(" data={").append(ev.readData()) + .append("} and is empty: ").append(ev.isEmpty()).append("\n"), + thr -> eventCapture.append("Error: ").append(thr.getMessage()).append("\n") + .append(Arrays.toString(thr.getStackTrace()))); + + eventSource.open(); + LockSupport.parkNanos(2_000_000_000L); + } + + return Response.ok(eventCapture).build(); + } + + @GET + @Path("server-update") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void updates(@Context SseEventSink eventSink) { + eventSink.send(createEvent("SSE data", "random SSE data")); + } + + private OutboundSseEvent createEvent(String name, String data) { + return sse.newEventBuilder() + .name(name) + .data(data) + .build(); + } +} diff --git a/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientIT.java b/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientIT.java index d156121c1..cd224a578 100644 --- a/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientIT.java +++ b/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientIT.java @@ -2,6 +2,7 @@ import static com.github.tomakehurst.wiremock.core.Options.ChunkedEncodingPolicy.NEVER; import static io.quarkus.ts.http.restclient.reactive.resources.PlainBookResource.SEARCH_TERM_VAL; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -22,6 +23,7 @@ import io.quarkus.test.bootstrap.Protocol; import io.quarkus.test.bootstrap.RestService; import io.quarkus.test.scenarios.QuarkusScenario; +import io.quarkus.test.scenarios.annotations.EnabledOnNative; import io.quarkus.test.services.QuarkusApplication; import io.quarkus.ts.http.restclient.reactive.json.Book; import io.quarkus.ts.http.restclient.reactive.json.BookRepository; @@ -255,6 +257,16 @@ public void malformedChunk() { Assertions.assertEquals("io.vertx.core.http.HttpClosedException", response.body().asString()); } + @Test + @EnabledOnNative + // reproduced for: https://github.com/quarkusio/quarkus/issues/36986 + public void sseIndexMethodOnNativeTest() { + app.given().get("/sse/client") + .then() + .statusCode(200) + .body(containsString("random SSE data")); + } + @AfterAll static void afterAll() { mockServer.stop(); From 11b696d20b5d9a2b911159e24c3301caca111f4e Mon Sep 17 00:00:00 2001 From: Martin Ocenas Date: Mon, 15 Jan 2024 16:04:14 +0100 Subject: [PATCH 2/3] use tag for linked issue --- .../ts/http/restclient/reactive/ReactiveRestClientIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientIT.java b/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientIT.java index cd224a578..d577acb47 100644 --- a/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientIT.java +++ b/http/rest-client-reactive/src/test/java/io/quarkus/ts/http/restclient/reactive/ReactiveRestClientIT.java @@ -259,7 +259,7 @@ public void malformedChunk() { @Test @EnabledOnNative - // reproduced for: https://github.com/quarkusio/quarkus/issues/36986 + @Tag("https://github.com/quarkusio/quarkus/issues/36986") public void sseIndexMethodOnNativeTest() { app.given().get("/sse/client") .then() From aec03236ac676a17ee3034d8ef09339cde2dcee0 Mon Sep 17 00:00:00 2001 From: Martin Ocenas Date: Tue, 16 Jan 2024 09:51:28 +0100 Subject: [PATCH 3/3] Update Sse Resource to use thread safe constructs --- .../reactive/resources/SseResource.java | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/resources/SseResource.java b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/resources/SseResource.java index 96236cacc..ad8b642b7 100644 --- a/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/resources/SseResource.java +++ b/http/rest-client-reactive/src/main/java/io/quarkus/ts/http/restclient/reactive/resources/SseResource.java @@ -1,7 +1,10 @@ package io.quarkus.ts.http.restclient.reactive.resources; import java.util.Arrays; -import java.util.concurrent.locks.LockSupport; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import jakarta.enterprise.context.ApplicationScoped; import jakarta.ws.rs.GET; @@ -31,24 +34,29 @@ public class SseResource { @GET @Path("client") @Produces(MediaType.TEXT_PLAIN) - public Response clientUpdate() { + public Response clientUpdate() throws InterruptedException { String host = ConfigProvider.getConfig().getValue("quarkus.http.host", String.class); int port = ConfigProvider.getConfig().getValue("quarkus.http.port", Integer.class); - StringBuilder eventCapture = new StringBuilder(); + List receivedData = new CopyOnWriteArrayList<>(); WebTarget target = ClientBuilder.newClient().target("http://" + host + ":" + port + "/sse/server-update"); try (SseEventSource eventSource = SseEventSource.target(target).build()) { eventSource.register( - ev -> eventCapture.append("event: name=").append(ev.getName()).append(" data={").append(ev.readData()) - .append("} and is empty: ").append(ev.isEmpty()).append("\n"), - thr -> eventCapture.append("Error: ").append(thr.getMessage()).append("\n") - .append(Arrays.toString(thr.getStackTrace()))); - + ev -> { + String event = "event: name=" + ev.getName() + " data={" + ev.readData() + "} and is empty: " + + ev.isEmpty() + + "\n"; + receivedData.add(event); + }, thr -> { + String event = "Error: " + thr.getMessage() + "\n" + Arrays.toString(thr.getStackTrace()); + receivedData.add(event); + }); + CountDownLatch latch = new CountDownLatch(2); eventSource.open(); - LockSupport.parkNanos(2_000_000_000L); + latch.await(1, TimeUnit.SECONDS); } - return Response.ok(eventCapture).build(); + return Response.ok(String.join("\n", receivedData)).build(); } @GET