Skip to content

Commit

Permalink
Merge pull request quarkusio#38369 from geoand/quarkusio#38325
Browse files Browse the repository at this point in the history
Ensure that response body of unsuccessful SSE request can be read
  • Loading branch information
geoand authored Jan 24, 2024
2 parents 71f3200 + e2668c5 commit e9ce49c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;

import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;

import org.eclipse.microprofile.rest.client.annotation.ClientHeaderParam;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.jboss.resteasy.reactive.RestHeader;
import org.jboss.resteasy.reactive.RestStreamElementType;
import org.jboss.resteasy.reactive.client.SseEvent;
import org.jboss.resteasy.reactive.client.SseEventFilter;
Expand All @@ -31,6 +37,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.rest.client.reactive.ClientExceptionMapper;
import io.quarkus.rest.client.reactive.QuarkusRestClientBuilder;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
Expand All @@ -53,6 +60,29 @@ void shouldConsume() {
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> assertThat(resultList).containsExactly("foo", "bar"));

}

@Test
void shouldReadBodyFromFailedResponse() {
var errorBody = new AtomicReference<String>();
createClient()
.fail()
.subscribe().with(new Consumer<Object>() {
@Override
public void accept(Object o) {

}
}, new Consumer<>() {
@Override
public void accept(Throwable t) {
errorBody.set(t.getMessage());
}
});

await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> assertThat(errorBody.get()).isEqualTo("invalid input provided"));
}

@Test
Expand Down Expand Up @@ -209,6 +239,11 @@ public interface SseClient {
@Produces(MediaType.SERVER_SENT_EVENTS)
Multi<String> get();

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@ClientHeaderParam(name = "fail", value = "true")
Multi<String> fail();

@GET
@Path("/json")
@Produces(MediaType.SERVER_SENT_EVENTS)
Expand Down Expand Up @@ -239,6 +274,14 @@ public interface SseClient {
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseEventFilter(CustomFilter.class)
Multi<SseEvent<Dto>> eventWithFilter();

@ClientExceptionMapper
static RuntimeException toException(Response response) {
if (response.getStatusInfo().getStatusCode() == 400) {
return new IllegalArgumentException(response.readEntity(String.class));
}
return null;
}
}

public static class CustomFilter implements Predicate<SseEvent<String>> {
Expand All @@ -260,7 +303,10 @@ public static class SseResource {

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> get() {
public Multi<String> get(@DefaultValue("false") @RestHeader boolean fail) {
if (fail) {
throw new WebApplicationException(Response.status(400).entity("invalid input provided").build());
}
return Multi.createFrom().items("foo", "bar");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Variant;

import org.jboss.logging.Logger;
Expand Down Expand Up @@ -246,7 +247,8 @@ public void handle(Void event) {
requestContext.resume();
}
});
} else if (!requestContext.isRegisterBodyHandler()) {
} else if (!requestContext.isRegisterBodyHandler()
&& (Response.Status.Family.familyOf(status) == Response.Status.Family.SUCCESSFUL)) { // we force the registration of a body handler if there was an error, so we can ensure the body can be read
clientResponse.pause();
if (loggingScope != LoggingScope.NONE) {
clientLogger.logResponse(clientResponse, false);
Expand Down

0 comments on commit e9ce49c

Please sign in to comment.