Skip to content

Commit

Permalink
Allow REST Client to return the entire SSE event
Browse files Browse the repository at this point in the history
This can be useful when the id or the name of
the event contain useful metadata

Closes: #37107
  • Loading branch information
geoand committed Nov 21, 2023
1 parent e56b8ec commit 6b66359
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.jboss.resteasy.reactive.RestStreamElementType;
import org.jboss.resteasy.reactive.client.SseEvent;
import org.jboss.resteasy.reactive.server.jackson.JacksonBasicMessageBodyReader;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand Down Expand Up @@ -112,6 +118,63 @@ void shouldRestStreamElementTypeOverwriteProducesAtClassLevel() {
.containsExactly(new Dto("foo", "bar"), new Dto("chocolate", "bar")));
}

@Test
void shouldBeAbleReadEntireEvent() {
var resultList = new CopyOnWriteArrayList<>();
createClient()
.event()
.subscribe().with(new Consumer<>() {
@Override
public void accept(SseEvent<Dto> event) {
resultList.add(new EventContainer(event.id(), event.name(), event.data()));
}
});
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> assertThat(resultList).containsExactly(
new EventContainer("id0", "name0", new Dto("name0", "0")),
new EventContainer("id1", "name1", new Dto("name1", "1"))));
}

static class EventContainer {
final String id;
final String name;
final Dto dto;

EventContainer(String id, String name, Dto dto) {
this.id = id;
this.name = name;
this.dto = dto;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EventContainer that = (EventContainer) o;
return Objects.equals(id, that.id) && Objects.equals(name, that.name)
&& Objects.equals(dto, that.dto);
}

@Override
public int hashCode() {
return Objects.hash(id, name, dto);
}

@Override
public String toString() {
return "EventContainer{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", dto=" + dto +
'}';
}
}

private SseClient createClient() {
return QuarkusRestClientBuilder.newBuilder()
.baseUri(uri)
Expand Down Expand Up @@ -144,6 +207,11 @@ public interface SseClient {
@Produces(MediaType.SERVER_SENT_EVENTS)
@Path("/with-entity-json")
Multi<Map<String, String>> postAndReadAsMap(String entity);

@GET
@Path("/event")
@Produces(MediaType.SERVER_SENT_EVENTS)
Multi<SseEvent<Dto>> event();
}

@Path("/sse")
Expand Down Expand Up @@ -175,6 +243,24 @@ public Multi<String> post(String entity) {
public Multi<Dto> postAndReadAsMap(String entity) {
return Multi.createBy().repeating().supplier(() -> new Dto("foo", entity)).atMost(3);
}

@GET
@Path("/event")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void event(@Context SseEventSink sink, @Context Sse sse) {
// send a stream of few events
try (sink) {
for (int i = 0; i < 2; i++) {
final OutboundSseEvent.Builder builder = sse.newEventBuilder();
builder.id("id" + i)
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Dto.class, new Dto("name" + i, String.valueOf(i)))
.name("name" + i);

sink.send(builder.build());
}
}
}
}

@Path("/sse-rest-stream-element-type")
Expand Down Expand Up @@ -226,5 +312,13 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(name, value);
}

@Override
public String toString() {
return "Dto{" +
"name='" + name + '\'' +
", value='" + value + '\'' +
'}';
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.jboss.resteasy.reactive.client;

/**
* Represents the entire SSE response from the server
*/
public interface SseEvent<T> {

String id();

String name();

String comment();

T data();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -10,6 +11,7 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import org.jboss.resteasy.reactive.client.SseEvent;
import org.jboss.resteasy.reactive.common.jaxrs.ResponseImpl;
import org.jboss.resteasy.reactive.common.util.RestMediaType;

Expand Down Expand Up @@ -151,10 +153,17 @@ private boolean isNewlineDelimited(ResponseImpl response) {
RestMediaType.APPLICATION_NDJSON_TYPE.isCompatible(response.getMediaType());
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private <R> void registerForSse(MultiRequest<? super R> multiRequest,
GenericType<R> responseType,
Response response,
HttpClientResponse vertxResponse, String defaultContentType) {

boolean returnSseEvent = SseEvent.class.equals(responseType.getRawType());
GenericType responseTypeFirstParam = responseType.getType() instanceof ParameterizedType
? new GenericType(((ParameterizedType) responseType.getType()).getActualTypeArguments()[0])
: null;

// honestly, isn't reconnect contradictory with completion?
// FIXME: Reconnect settings?
// For now we don't want multi to reconnect
Expand All @@ -165,10 +174,39 @@ private <R> void registerForSse(MultiRequest<? super R> multiRequest,
sseSource.register(event -> {
// DO NOT pass the response mime type because it's SSE: let the event pick between the X-SSE-Content-Type header or
// the content-type SSE field
R item = event.readData(responseType);
if (item != null) { // we don't emit null because it breaks Multi (by design)
multiRequest.emit(item);
if (returnSseEvent) {
multiRequest.emit((R) new SseEvent() {
@Override
public String id() {
return event.getId();
}

@Override
public String name() {
return event.getName();
}

@Override
public String comment() {
return event.getComment();
}

@Override
public Object data() {
if (responseTypeFirstParam != null) {
return event.readData(responseTypeFirstParam);
} else {
return event.readData(); // TODO: is this correct?
}
}
});
} else {
R item = event.readData(responseType);
if (item != null) { // we don't emit null because it breaks Multi (by design)
multiRequest.emit(item);
}
}

}, multiRequest::fail, multiRequest::complete);
// watch for user cancelling
sseSource.registerAfterRequest(vertxResponse);
Expand Down

0 comments on commit 6b66359

Please sign in to comment.