Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.0' into 'origin/3.1'
Browse files Browse the repository at this point in the history
Signed-off-by: Maxim Nesen <[email protected]>
  • Loading branch information
senivam committed Jul 31, 2024
2 parents 503195b + 982e6db commit ad5df8e
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -35,6 +35,7 @@
import org.glassfish.jersey.internal.PropertiesDelegate;
import org.glassfish.jersey.internal.util.ReflectionHelper;
import org.glassfish.jersey.message.MessageBodyWorkers;
import org.glassfish.jersey.message.internal.ReaderInterceptorExecutor;

/**
* {@link jakarta.ws.rs.ext.MessageBodyWriter} for {@link ChunkedInput}.
Expand Down Expand Up @@ -71,7 +72,7 @@ public ChunkedInput readFrom(Class<ChunkedInput> chunkedInputClass,

return new ChunkedInput(
chunkType,
inputStream,
ReaderInterceptorExecutor.closeableInputStream(inputStream),
annotations,
mediaType,
headers,
Expand Down
34 changes: 17 additions & 17 deletions ext/spring6/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,23 @@
</build>

<profiles>
<profile>
<id>HK2_JDK8_dependencyConvergence_skip</id>
<activation>
<jdk>1.8</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<configuration>
<rulesToSkip>dependencyConvergence</rulesToSkip>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>SpringExclude</id>
<activation>
Expand Down Expand Up @@ -382,22 +399,5 @@
</plugins>
</build>
</profile>
<profile>
<id>HK2_JDK8_dependencyConvergence_skip</id>
<activation>
<jdk>1.8</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<configuration>
<rulesToSkip>dependencyConvergence</rulesToSkip>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -24,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -87,6 +88,10 @@ public class EventProcessor implements Runnable, EventListener {
* A map of listeners bound to receive only events of a particular name.
*/
private final Map<String, List<EventListener>> boundListeners;
/**
* A list of Error Consumers.
*/
private final List<Consumer<Throwable>> throwableConsumers;

/**
* Shutdown handler is invoked when Event processor reaches terminal stage.
Expand All @@ -111,6 +116,7 @@ private EventProcessor(final EventProcessor that) {
this.unboundListeners = that.unboundListeners;
this.eventListener = that.eventListener;
this.shutdownHandler = that.shutdownHandler;
this.throwableConsumers = that.throwableConsumers;
}

private EventProcessor(Builder builder) {
Expand All @@ -128,6 +134,7 @@ private EventProcessor(Builder builder) {
this.unboundListeners = builder.unboundListeners == null ? Collections.EMPTY_LIST : builder.unboundListeners;
this.eventListener = builder.eventListener;
this.shutdownHandler = builder.shutdownHandler;
this.throwableConsumers = builder.throwableConsumers;
}

/**
Expand Down Expand Up @@ -199,6 +206,16 @@ public void run() {
}
// if we're here, an unrecoverable error has occurred - just turn off the lights...
shutdownHandler.shutdown();
// and notify error handlers
if (throwableConsumers != null) {
for (Consumer<Throwable> consumer : throwableConsumers) {
try {
consumer.accept(ex);
} catch (Throwable throwable) {
LOGGER.fine(String.format("User throwable ignored: %s", throwable.getMessage()));
}
}
}
} finally {
if (eventInput != null && !eventInput.isClosed()) {
eventInput.close();
Expand Down Expand Up @@ -357,6 +374,7 @@ public static class Builder {
private boolean disableKeepAlive;
private List<EventListener> unboundListeners;
private Map<String, List<EventListener>> boundListeners;
private List<Consumer<Throwable>> throwableConsumers = null;

private Builder(WebTarget target,
AtomicReference<State> state,
Expand Down Expand Up @@ -420,6 +438,17 @@ public Builder disableKeepAlive() {
return this;
}

/**
* Set the consumers of {@link Throwable} occurring during connection.
*
* @param throwableConsumers a list of consumers of throwable.
* @return updated builder instance.
*/
public Builder throwableConsumers(List<Consumer<Throwable>> throwableConsumers) {
this.throwableConsumers = throwableConsumers;
return this;
}

/**
* Build the {@link EventProcessor}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,6 +16,8 @@

package org.glassfish.jersey.media.sse.internal;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -72,6 +74,10 @@ public class JerseySseEventSource implements SseEventSource {
* Client provided executor facade.
*/
private final ClientExecutor clientExecutor;
/**
* List of Throwable consumers passed to EventProcessor.Builder.
*/
private final List<Consumer<Throwable>> throwableConsumers = new ArrayList<>();

/**
* Private constructor.
Expand Down Expand Up @@ -110,11 +116,13 @@ public void register(final Consumer<InboundSseEvent> onEvent) {
public void register(final Consumer<InboundSseEvent> onEvent, final Consumer<Throwable> onError) {
this.subscribe(DEFAULT_SUBSCRIPTION_HANDLER, onEvent, onError, () -> {
});
throwableConsumers.add(onError);
}

@Override
public void register(final Consumer<InboundSseEvent> onEvent, final Consumer<Throwable> onError, final Runnable onComplete) {
this.subscribe(DEFAULT_SUBSCRIPTION_HANDLER, onEvent, onError, onComplete);
throwableConsumers.add(onError);
}

private void subscribe(final Consumer<Flow.Subscription> onSubscribe,
Expand Down Expand Up @@ -173,6 +181,7 @@ public void open() {
EventProcessor processor = EventProcessor
.builder(endpoint, state, clientExecutor, this::onEvent, this::close)
.reconnectDelay(reconnectDelay, reconnectTimeUnit)
.throwableConsumers(throwableConsumers)
.build();
clientExecutor.submit(processor);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.media.sse;

import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.InternalServerErrorException;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.Application;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.sse.InboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;
import jakarta.ws.rs.sse.SseEventSource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class SseEventSourceRegisterErrorHandlerTest extends JerseyTest {
@Path("sse")
public static class SseEventSourceRegisterTestSseEndpoint {

@Path("hello")
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
public void hello(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
output.send(sse.newEvent("HELLO"));
}

@Path("close")
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
public void close(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
output.close();
}

@Path("500")
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
public void throw500(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
throw new WebApplicationException();
}

@Path("400")
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
public void throw400(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
throw new BadRequestException();
}
}

@Override
protected Application configure() {
return new ResourceConfig(SseEventSourceRegisterTestSseEndpoint.class);
}

private static final Consumer<InboundSseEvent> EMPTY = event -> {
};

@Test
public void testConnection404() throws InterruptedException {
WebTarget sseTarget = target("sse");
AtomicReference<Throwable> throwable = new AtomicReference<>();
CountDownLatch completeLatch = new CountDownLatch(1);

SseEventSource eventSource = SseEventSource.target(sseTarget).build();
eventSource.register(EMPTY, throwable::set, completeLatch::countDown);
eventSource.open();
completeLatch.await(10_000, TimeUnit.MILLISECONDS);
MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue());
MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(NotFoundException.class));
}

@Test
public void testError500() throws InterruptedException {
WebTarget sseTarget = target("sse/500");
AtomicReference<Throwable> throwable = new AtomicReference<>();
CountDownLatch completeLatch = new CountDownLatch(1);

SseEventSource eventSource = SseEventSource.target(sseTarget).build();
eventSource.register(EMPTY, throwable::set, completeLatch::countDown);
eventSource.open();
completeLatch.await(10_000, TimeUnit.MILLISECONDS);
MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue());
MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(InternalServerErrorException.class));
}

@Test
public void testError400() throws InterruptedException {
WebTarget sseTarget = target("sse/400");
AtomicReference<Throwable> throwable = new AtomicReference<>();
CountDownLatch completeLatch = new CountDownLatch(1);

SseEventSource eventSource = SseEventSource.target(sseTarget).build();
eventSource.register(EMPTY, throwable::set, completeLatch::countDown);
eventSource.open();
completeLatch.await(10_000, TimeUnit.MILLISECONDS);
MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue());
MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(BadRequestException.class));
}
}
Loading

0 comments on commit ad5df8e

Please sign in to comment.