Skip to content

Commit

Permalink
Merge pull request #4154 from jamezp/RESTEASY-2952
Browse files Browse the repository at this point in the history
[RESTEASY-2952 & RESTEASY-3498] Clean up the SSE callback processing
  • Loading branch information
jamezp authored May 13, 2024
2 parents cba3089 + 2969b26 commit 5a02fd4
Show file tree
Hide file tree
Showing 6 changed files with 625 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
/*
* JBoss, Home of Professional Open Source.
*
* Copyright 2024 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jboss.resteasy.plugins.providers.sse.client;

import java.io.IOException;
Expand Down Expand Up @@ -215,9 +234,7 @@ public boolean close(final long timeout, final TimeUnit unit) {
try {
return sseEventSourceScheduler.awaitTermination(timeout, unit);
} catch (InterruptedException e) {
onErrorConsumers.forEach(consumer -> {
consumer.accept(e);
});
runOnErrorConsumers(e);
Thread.currentThread().interrupt();
return false;
}
Expand All @@ -229,6 +246,12 @@ private void runCompleteConsumers() {
}
}

private void runOnErrorConsumers(final Throwable t) {
// Ensure the onComplete callbacks do not get invoked
completeListenersInvoked.set(true);
onErrorConsumers.forEach(onError -> onError.accept(t));
}

private void internalClose() {
if (state.getAndSet(State.CLOSED) == State.CLOSED) {
return;
Expand All @@ -238,9 +261,7 @@ private void internalClose() {
try {
clientResponse.releaseConnection(false);
} catch (IOException e) {
onErrorConsumers.forEach(consumer -> {
consumer.accept(e);
});
runOnErrorConsumers(e);
}
}
sseEventSourceScheduler.shutdownNow();
Expand All @@ -255,9 +276,9 @@ private class EventHandler implements Runnable {

private long reconnectDelay;

private String verb;
private Entity<?> entity;
private MediaType[] mediaTypes;
private final String verb;
private final Entity<?> entity;
private final MediaType[] mediaTypes;

EventHandler(final long reconnectDelay, final String lastEventId, final String verb, final Entity<?> entity,
final MediaType... mediaTypes) {
Expand Down Expand Up @@ -285,9 +306,11 @@ public void run() {
}

SseEventInputImpl eventInput = null;
InboundSseEvent event = null;
final Providers providers = (ClientConfiguration) target.getConfiguration();
try {
final Invocation.Builder requestBuilder = buildRequest(mediaTypes);
Invocation request = null;
Invocation request;
if (entity == null) {
request = requestBuilder.build(verb);
} else {
Expand All @@ -305,13 +328,17 @@ public void run() {
eventInput = clientResponse.readEntity(SseEventInputImpl.class);
//if 200<= response code <300 and response contentType is null, fail the connection.
if (eventInput == null) {
if (!alwaysReconnect) {
internalClose();
} else {
if (alwaysReconnect) {
reconnect(this.reconnectDelay);
} else {
// Run the onComplete callback, then close as something went wrong
runCompleteConsumers();
internalClose();
}
return;
}
// Success, read the event data
event = eventInput.read(providers);
} else {
//Let's buffer the entity in case the response contains an entity the user would like to retrieve from the exception.
//This will also ensure that the connection is correctly closed.
Expand All @@ -321,10 +348,15 @@ public void run() {
}
} catch (ServiceUnavailableException ex) {
if (ex.hasRetryAfter()) {
onConnection();
Date requestTime = new Date();
long localReconnectDelay = ex.getRetryTime(requestTime).getTime() - requestTime.getTime();
reconnect(localReconnectDelay);
// Reconnect, but if an error occurs this is unrecoverable, see https://issues.redhat.com/browse/RESTEASY-2952
try {
onConnection();
Date requestTime = new Date();
long localReconnectDelay = ex.getRetryTime(requestTime).getTime() - requestTime.getTime();
reconnect(localReconnectDelay);
} catch (Throwable t) {
onUnrecoverableError(t);
}
} else {
onUnrecoverableError(ex);
}
Expand All @@ -333,21 +365,23 @@ public void run() {
onUnrecoverableError(e);
return;
}
final Providers providers = (ClientConfiguration) target.getConfiguration();

while (!Thread.currentThread().isInterrupted() && state.get() == State.OPEN) {
if (eventInput == null || eventInput.isClosed()) {
if (event == null || eventInput.isClosed()) {
if (alwaysReconnect) {
reconnect(reconnectDelay);
} else {
// Run the onComplete callback, then close as something went wrong
runCompleteConsumers();
internalClose();
}
break;
}
try {
InboundSseEvent event = eventInput.read(providers);
if (event != null) {
onEvent(event);
}
// Process the event
onEvent(event);
// Read next event
event = eventInput.read(providers);
} catch (IOException e) {
reconnect(reconnectDelay);
break;
Expand All @@ -370,9 +404,7 @@ private void onConnection() {

private void onUnrecoverableError(Throwable throwable) {
connectedLatch.countDown();
onErrorConsumers.forEach(consumer -> {
consumer.accept(throwable);
});
runOnErrorConsumers(throwable);
internalClose();
}

Expand Down
8 changes: 8 additions & 0 deletions resteasy-dependencies-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<version.jakarta.annotation.jakarta-annotation-api>2.1.1</version.jakarta.annotation.jakarta-annotation-api>
<version.jakarta.el.el-api>4.0.0</version.jakarta.el.el-api>
<version.jakarta.ejb.ejb-api>4.0.1</version.jakarta.ejb.ejb-api>
<version.jakarta.enterprise.concurrent>3.0.3</version.jakarta.enterprise.concurrent>
<version.jakarta.interceptor.interceptor-api>2.1.0</version.jakarta.interceptor.interceptor-api>
<version.jakarta.jms.jms-api>3.1.0</version.jakarta.jms.jms-api>
<version.jakarta.ws.rs>4.0.0</version.jakarta.ws.rs>
Expand Down Expand Up @@ -309,6 +310,13 @@
<version>${version.jakarta.ws.rs}</version>
</dependency>

<dependency>
<groupId>jakarta.enterprise.concurrent</groupId>
<artifactId>jakarta.enterprise.concurrent-api</artifactId>
<version>${version.jakarta.enterprise.concurrent}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.ejb</groupId>
<artifactId>jakarta.ejb-api</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions testsuite/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>jakarta.enterprise.concurrent</groupId>
<artifactId>jakarta.enterprise.concurrent-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.persistence</groupId>
<artifactId>jakarta.persistence-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public void testException() throws Exception {
}, t -> {
String s = t.getMessage();
Assertions.assertTrue(s.contains("HTTP 500 Internal Server Error"));
// We need to count down here as well. Per the SseEventSource.register() the onComplete
// callback should not be invoked if the onError callback is invoked.
latch.countDown();
}, latch::countDown);
source.open();
Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
Expand Down
Loading

0 comments on commit 5a02fd4

Please sign in to comment.