From c28a8a499986eae9e97766e7283ebc998a7818d3 Mon Sep 17 00:00:00 2001 From: jansupol Date: Thu, 25 Jul 2024 22:50:11 +0200 Subject: [PATCH] Propagate WebApplicationException from SseEventSource to provided error handler Signed-off-by: jansupol --- .../media/sse/internal/EventProcessor.java | 31 ++++- .../sse/internal/JerseySseEventSource.java | 11 +- ...seEventSourceRegisterErrorHandlerTest.java | 126 ++++++++++++++++++ 3 files changed, 166 insertions(+), 2 deletions(-) create mode 100644 media/sse/src/test/java/org/glassfish/jersey/media/sse/SseEventSourceRegisterErrorHandlerTest.java diff --git a/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/EventProcessor.java b/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/EventProcessor.java index 1e5b775788..bc7b84ff13 100644 --- a/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/EventProcessor.java +++ b/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/EventProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -87,6 +88,10 @@ public class EventProcessor implements Runnable, EventListener { * A map of listeners bound to receive only events of a particular name. */ private final Map> boundListeners; + /** + * A list of Error Consumers. + */ + private final List> throwableConsumers; /** * Shutdown handler is invoked when Event processor reaches terminal stage. @@ -111,6 +116,7 @@ private EventProcessor(final EventProcessor that) { this.unboundListeners = that.unboundListeners; this.eventListener = that.eventListener; this.shutdownHandler = that.shutdownHandler; + this.throwableConsumers = that.throwableConsumers; } private EventProcessor(Builder builder) { @@ -128,6 +134,7 @@ private EventProcessor(Builder builder) { this.unboundListeners = builder.unboundListeners == null ? Collections.EMPTY_LIST : builder.unboundListeners; this.eventListener = builder.eventListener; this.shutdownHandler = builder.shutdownHandler; + this.throwableConsumers = builder.throwableConsumers; } /** @@ -199,6 +206,16 @@ public void run() { } // if we're here, an unrecoverable error has occurred - just turn off the lights... shutdownHandler.shutdown(); + // and notify error handlers + if (throwableConsumers != null) { + for (Consumer consumer : throwableConsumers) { + try { + consumer.accept(ex); + } catch (Throwable throwable) { + LOGGER.fine(String.format("User throwable ignored: %s", throwable.getMessage())); + } + } + } } finally { if (eventInput != null && !eventInput.isClosed()) { eventInput.close(); @@ -357,6 +374,7 @@ public static class Builder { private boolean disableKeepAlive; private List unboundListeners; private Map> boundListeners; + private List> throwableConsumers = null; private Builder(WebTarget target, AtomicReference state, @@ -420,6 +438,17 @@ public Builder disableKeepAlive() { return this; } + /** + * Set the consumers of {@link Throwable} occurring during connection. + * + * @param throwableConsumers a list of consumers of throwable. + * @return updated builder instance. + */ + public Builder throwableConsumers(List> throwableConsumers) { + this.throwableConsumers = throwableConsumers; + return this; + } + /** * Build the {@link EventProcessor}. * diff --git a/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/JerseySseEventSource.java b/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/JerseySseEventSource.java index bb8b5171cd..abf7c24736 100644 --- a/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/JerseySseEventSource.java +++ b/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/JerseySseEventSource.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -16,6 +16,8 @@ package org.glassfish.jersey.media.sse.internal; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -72,6 +74,10 @@ public class JerseySseEventSource implements SseEventSource { * Client provided executor facade. */ private final ClientExecutor clientExecutor; + /** + * List of Throwable consumers passed to EventProcessor.Builder. + */ + private final List> throwableConsumers = new ArrayList<>(); /** * Private constructor. @@ -110,11 +116,13 @@ public void register(final Consumer onEvent) { public void register(final Consumer onEvent, final Consumer onError) { this.subscribe(DEFAULT_SUBSCRIPTION_HANDLER, onEvent, onError, () -> { }); + throwableConsumers.add(onError); } @Override public void register(final Consumer onEvent, final Consumer onError, final Runnable onComplete) { this.subscribe(DEFAULT_SUBSCRIPTION_HANDLER, onEvent, onError, onComplete); + throwableConsumers.add(onError); } private void subscribe(final Consumer onSubscribe, @@ -173,6 +181,7 @@ public void open() { EventProcessor processor = EventProcessor .builder(endpoint, state, clientExecutor, this::onEvent, this::close) .reconnectDelay(reconnectDelay, reconnectTimeUnit) + .throwableConsumers(throwableConsumers) .build(); clientExecutor.submit(processor); diff --git a/media/sse/src/test/java/org/glassfish/jersey/media/sse/SseEventSourceRegisterErrorHandlerTest.java b/media/sse/src/test/java/org/glassfish/jersey/media/sse/SseEventSourceRegisterErrorHandlerTest.java new file mode 100644 index 0000000000..430d96cbd9 --- /dev/null +++ b/media/sse/src/test/java/org/glassfish/jersey/media/sse/SseEventSourceRegisterErrorHandlerTest.java @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.media.sse; + +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; + +import javax.ws.rs.BadRequestException; +import javax.ws.rs.GET; +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.NotFoundException; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; +import javax.ws.rs.sse.SseEventSource; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +public class SseEventSourceRegisterErrorHandlerTest extends JerseyTest { + @Path("sse") + public static class SseEventSourceRegisterTestSseEndpoint { + + @Path("hello") + @GET + @Produces(SseFeature.SERVER_SENT_EVENTS) + public void hello(@Context SseEventSink output, @Context Sse sse) throws InterruptedException { + output.send(sse.newEvent("HELLO")); + } + + @Path("close") + @GET + @Produces(SseFeature.SERVER_SENT_EVENTS) + public void close(@Context SseEventSink output, @Context Sse sse) throws InterruptedException { + output.close(); + } + + @Path("500") + @GET + @Produces(SseFeature.SERVER_SENT_EVENTS) + public void throw500(@Context SseEventSink output, @Context Sse sse) throws InterruptedException { + throw new WebApplicationException(); + } + + @Path("400") + @GET + @Produces(SseFeature.SERVER_SENT_EVENTS) + public void throw400(@Context SseEventSink output, @Context Sse sse) throws InterruptedException { + throw new BadRequestException(); + } + } + + @Override + protected Application configure() { + return new ResourceConfig(SseEventSourceRegisterTestSseEndpoint.class); + } + + private static final Consumer EMPTY = event -> { + }; + + @Test + public void testConnection404() throws InterruptedException { + WebTarget sseTarget = target("sse"); + AtomicReference throwable = new AtomicReference<>(); + CountDownLatch completeLatch = new CountDownLatch(1); + + SseEventSource eventSource = SseEventSource.target(sseTarget).build(); + eventSource.register(EMPTY, throwable::set, completeLatch::countDown); + eventSource.open(); + completeLatch.await(10_000, TimeUnit.MILLISECONDS); + MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue()); + MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(NotFoundException.class)); + } + + @Test + public void testError500() throws InterruptedException { + WebTarget sseTarget = target("sse/500"); + AtomicReference throwable = new AtomicReference<>(); + CountDownLatch completeLatch = new CountDownLatch(1); + + SseEventSource eventSource = SseEventSource.target(sseTarget).build(); + eventSource.register(EMPTY, throwable::set, completeLatch::countDown); + eventSource.open(); + completeLatch.await(10_000, TimeUnit.MILLISECONDS); + MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue()); + MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(InternalServerErrorException.class)); + } + + @Test + public void testError400() throws InterruptedException { + WebTarget sseTarget = target("sse/400"); + AtomicReference throwable = new AtomicReference<>(); + CountDownLatch completeLatch = new CountDownLatch(1); + + SseEventSource eventSource = SseEventSource.target(sseTarget).build(); + eventSource.register(EMPTY, throwable::set, completeLatch::countDown); + eventSource.open(); + completeLatch.await(10_000, TimeUnit.MILLISECONDS); + MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue()); + MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(BadRequestException.class)); + } +}