Skip to content

Commit

Permalink
Propagate WebApplicationException from SseEventSource to provided err…
Browse files Browse the repository at this point in the history
…or handler

Signed-off-by: jansupol <[email protected]>
  • Loading branch information
jansupol committed Jul 25, 2024
1 parent c4adbb3 commit c28a8a4
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -24,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -87,6 +88,10 @@ public class EventProcessor implements Runnable, EventListener {
* A map of listeners bound to receive only events of a particular name.
*/
private final Map<String, List<EventListener>> boundListeners;
/**
* A list of Error Consumers.
*/
private final List<Consumer<Throwable>> throwableConsumers;

/**
* Shutdown handler is invoked when Event processor reaches terminal stage.
Expand All @@ -111,6 +116,7 @@ private EventProcessor(final EventProcessor that) {
this.unboundListeners = that.unboundListeners;
this.eventListener = that.eventListener;
this.shutdownHandler = that.shutdownHandler;
this.throwableConsumers = that.throwableConsumers;
}

private EventProcessor(Builder builder) {
Expand All @@ -128,6 +134,7 @@ private EventProcessor(Builder builder) {
this.unboundListeners = builder.unboundListeners == null ? Collections.EMPTY_LIST : builder.unboundListeners;
this.eventListener = builder.eventListener;
this.shutdownHandler = builder.shutdownHandler;
this.throwableConsumers = builder.throwableConsumers;
}

/**
Expand Down Expand Up @@ -199,6 +206,16 @@ public void run() {
}
// if we're here, an unrecoverable error has occurred - just turn off the lights...
shutdownHandler.shutdown();
// and notify error handlers
if (throwableConsumers != null) {
for (Consumer<Throwable> consumer : throwableConsumers) {
try {
consumer.accept(ex);
} catch (Throwable throwable) {
LOGGER.fine(String.format("User throwable ignored: %s", throwable.getMessage()));
}
}
}
} finally {
if (eventInput != null && !eventInput.isClosed()) {
eventInput.close();
Expand Down Expand Up @@ -357,6 +374,7 @@ public static class Builder {
private boolean disableKeepAlive;
private List<EventListener> unboundListeners;
private Map<String, List<EventListener>> boundListeners;
private List<Consumer<Throwable>> throwableConsumers = null;

private Builder(WebTarget target,
AtomicReference<State> state,
Expand Down Expand Up @@ -420,6 +438,17 @@ public Builder disableKeepAlive() {
return this;
}

/**
* Set the consumers of {@link Throwable} occurring during connection.
*
* @param throwableConsumers a list of consumers of throwable.
* @return updated builder instance.
*/
public Builder throwableConsumers(List<Consumer<Throwable>> throwableConsumers) {
this.throwableConsumers = throwableConsumers;
return this;
}

/**
* Build the {@link EventProcessor}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,6 +16,8 @@

package org.glassfish.jersey.media.sse.internal;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -72,6 +74,10 @@ public class JerseySseEventSource implements SseEventSource {
* Client provided executor facade.
*/
private final ClientExecutor clientExecutor;
/**
* List of Throwable consumers passed to EventProcessor.Builder.
*/
private final List<Consumer<Throwable>> throwableConsumers = new ArrayList<>();

/**
* Private constructor.
Expand Down Expand Up @@ -110,11 +116,13 @@ public void register(final Consumer<InboundSseEvent> onEvent) {
public void register(final Consumer<InboundSseEvent> onEvent, final Consumer<Throwable> onError) {
this.subscribe(DEFAULT_SUBSCRIPTION_HANDLER, onEvent, onError, () -> {
});
throwableConsumers.add(onError);
}

@Override
public void register(final Consumer<InboundSseEvent> onEvent, final Consumer<Throwable> onError, final Runnable onComplete) {
this.subscribe(DEFAULT_SUBSCRIPTION_HANDLER, onEvent, onError, onComplete);
throwableConsumers.add(onError);
}

private void subscribe(final Consumer<Flow.Subscription> onSubscribe,
Expand Down Expand Up @@ -173,6 +181,7 @@ public void open() {
EventProcessor processor = EventProcessor
.builder(endpoint, state, clientExecutor, this::onEvent, this::close)
.reconnectDelay(reconnectDelay, reconnectTimeUnit)
.throwableConsumers(throwableConsumers)
.build();
clientExecutor.submit(processor);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.media.sse;

import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import javax.ws.rs.BadRequestException;
import javax.ws.rs.GET;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Context;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseEventSource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class SseEventSourceRegisterErrorHandlerTest extends JerseyTest {
@Path("sse")
public static class SseEventSourceRegisterTestSseEndpoint {

@Path("hello")
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
public void hello(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
output.send(sse.newEvent("HELLO"));
}

@Path("close")
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
public void close(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
output.close();
}

@Path("500")
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
public void throw500(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
throw new WebApplicationException();
}

@Path("400")
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
public void throw400(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
throw new BadRequestException();
}
}

@Override
protected Application configure() {
return new ResourceConfig(SseEventSourceRegisterTestSseEndpoint.class);
}

private static final Consumer<InboundSseEvent> EMPTY = event -> {
};

@Test
public void testConnection404() throws InterruptedException {
WebTarget sseTarget = target("sse");
AtomicReference<Throwable> throwable = new AtomicReference<>();
CountDownLatch completeLatch = new CountDownLatch(1);

SseEventSource eventSource = SseEventSource.target(sseTarget).build();
eventSource.register(EMPTY, throwable::set, completeLatch::countDown);
eventSource.open();
completeLatch.await(10_000, TimeUnit.MILLISECONDS);
MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue());
MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(NotFoundException.class));
}

@Test
public void testError500() throws InterruptedException {
WebTarget sseTarget = target("sse/500");
AtomicReference<Throwable> throwable = new AtomicReference<>();
CountDownLatch completeLatch = new CountDownLatch(1);

SseEventSource eventSource = SseEventSource.target(sseTarget).build();
eventSource.register(EMPTY, throwable::set, completeLatch::countDown);
eventSource.open();
completeLatch.await(10_000, TimeUnit.MILLISECONDS);
MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue());
MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(InternalServerErrorException.class));
}

@Test
public void testError400() throws InterruptedException {
WebTarget sseTarget = target("sse/400");
AtomicReference<Throwable> throwable = new AtomicReference<>();
CountDownLatch completeLatch = new CountDownLatch(1);

SseEventSource eventSource = SseEventSource.target(sseTarget).build();
eventSource.register(EMPTY, throwable::set, completeLatch::countDown);
eventSource.open();
completeLatch.await(10_000, TimeUnit.MILLISECONDS);
MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue());
MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(BadRequestException.class));
}
}

0 comments on commit c28a8a4

Please sign in to comment.