Skip to content

Commit

Permalink
Merge pull request #24236 from michalszynkiewicz/rcr-ndjson
Browse files Browse the repository at this point in the history
Support consuming json stream and ndjson in REST Client Reactive
  • Loading branch information
michalszynkiewicz authored Mar 10, 2022
2 parents e43c637 + 2fa8961 commit e48cd9a
Show file tree
Hide file tree
Showing 7 changed files with 560 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.quarkus.resteasy.reactive.jackson.deployment.processor;

import static org.jboss.resteasy.reactive.common.util.RestMediaType.APPLICATION_NDJSON;
import static org.jboss.resteasy.reactive.common.util.RestMediaType.APPLICATION_STREAM_JSON;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -126,29 +128,29 @@ void additionalProviders(List<JacksonFeatureBuildItem> jacksonFeatureBuildItems,

additionalReaders
.produce(new MessageBodyReaderBuildItem(ServerJacksonMessageBodyReader.class.getName(), Object.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON)));
additionalReaders
.produce(new MessageBodyReaderBuildItem(VertxJsonArrayMessageBodyReader.class.getName(),
JsonArray.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON)));
additionalReaders
.produce(new MessageBodyReaderBuildItem(VertxJsonObjectMessageBodyReader.class.getName(),
JsonObject.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON)));
additionalWriters
.produce(new MessageBodyWriterBuildItem(getJacksonMessageBodyWriter(applicationNeedsSpecialJacksonFeatures),
Object.class.getName(),
List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON,
List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON,
RestMediaType.APPLICATION_STREAM_JSON)));
additionalWriters
.produce(new MessageBodyWriterBuildItem(VertxJsonArrayMessageBodyWriter.class.getName(),
JsonArray.class.getName(),
List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON,
List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON,
RestMediaType.APPLICATION_STREAM_JSON)));
additionalWriters
.produce(new MessageBodyWriterBuildItem(VertxJsonObjectMessageBodyWriter.class.getName(),
JsonObject.class.getName(),
List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON,
List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON,
RestMediaType.APPLICATION_STREAM_JSON)));
}

Expand All @@ -165,7 +167,7 @@ void handleJsonAnnotations(Optional<ResourceScanningResultBuildItem> resourceSca
BuildProducer<ReflectiveClassBuildItem> reflectiveClassProducer,
BuildProducer<JacksonFeatureBuildItem> jacksonFeaturesProducer,
ResteasyReactiveServerJacksonRecorder recorder, ShutdownContextBuildItem shutdown) {
if (!resourceScanningResultBuildItem.isPresent()) {
if (resourceScanningResultBuildItem.isEmpty()) {
return;
}
Collection<ClassInfo> resourceClasses = resourceScanningResultBuildItem.get().getResult().getScannedResources()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.quarkus.rest.client.reactive.jackson.deployment;

import static io.quarkus.deployment.Feature.REST_CLIENT_REACTIVE_JACKSON;
import static org.jboss.resteasy.reactive.common.util.RestMediaType.APPLICATION_NDJSON;
import static org.jboss.resteasy.reactive.common.util.RestMediaType.APPLICATION_STREAM_JSON;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -48,15 +50,15 @@ void additionalProviders(

additionalReaders
.produce(new MessageBodyReaderBuildItem(JacksonBasicMessageBodyReader.class.getName(), Object.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON)));
additionalReaders
.produce(new MessageBodyReaderBuildItem(VertxJsonArrayBasicMessageBodyReader.class.getName(),
JsonArray.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON)));
additionalReaders
.produce(new MessageBodyReaderBuildItem(VertxJsonObjectBasicMessageBodyReader.class.getName(),
JsonObject.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, APPLICATION_NDJSON, APPLICATION_STREAM_JSON)));
additionalWriters
.produce(new MessageBodyWriterBuildItem(ClientJacksonMessageBodyWriter.class.getName(), Object.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>quarkus-smallrye-fault-tolerance-deployment</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-routes-deployment</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package io.quarkus.rest.client.reactive;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.rest.client.RestClientBuilder;
import org.jboss.resteasy.reactive.RestStreamElementType;
import org.jboss.resteasy.reactive.common.util.RestMediaType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.vertx.web.ReactiveRoutes;
import io.quarkus.vertx.web.Route;
import io.smallrye.mutiny.Multi;
import io.vertx.core.Vertx;
import io.vertx.ext.web.RoutingContext;

public class MultiNdjsonTest {
@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest();

@TestHTTPResource
URI uri;

@Test
void shouldReadNdjsonStringAsMulti() throws InterruptedException {
var client = RestClientBuilder.newBuilder().baseUri(uri)
.build(Client.class);
var collected = new CopyOnWriteArrayList<String>();
var completionLatch = new CountDownLatch(1);
client.readString().onCompletion().invoke(completionLatch::countDown)
.subscribe().with(collected::add);

if (!completionLatch.await(5, TimeUnit.SECONDS)) {
fail("Streaming did not complete in time");
}
assertThat(collected).hasSize(4)
.contains("one", "two", "three", "four");
}

@Test
void shouldReadNdjsonPojoAsMulti() throws InterruptedException {
var client = RestClientBuilder.newBuilder().baseUri(uri)
.build(Client.class);
var collected = new CopyOnWriteArrayList<Message>();
var completionLatch = new CountDownLatch(1);
client.readPojo().onCompletion().invoke(completionLatch::countDown)
.subscribe().with(collected::add);

if (!completionLatch.await(5, TimeUnit.SECONDS)) {
fail("Streaming did not complete in time");
}
var expected = Arrays.asList(Message.of("one", "1"),
Message.of("two", "2"), Message.of("three", "3"),
Message.of("four", "4"));
assertThat(collected).hasSize(4).containsAll(expected);
}

@Test
void shouldReadNdjsonPojoFromReactiveRoutes() throws InterruptedException {
URI reactiveRoutesBaseUri = URI.create(uri.toString() + "/rr");
var client = RestClientBuilder.newBuilder().baseUri(reactiveRoutesBaseUri)
.build(Client.class);
var collected = new CopyOnWriteArrayList<Message>();
var completionLatch = new CountDownLatch(1);
client.readPojo().onCompletion().invoke(completionLatch::countDown)
.subscribe().with(collected::add);

if (!completionLatch.await(5, TimeUnit.SECONDS)) {
fail("Streaming did not complete in time");
}
var expected = Arrays.asList(Message.of("superman", "1"),
Message.of("batman", "2"), Message.of("spiderman", "3"));
assertThat(collected).hasSize(3).containsAll(expected);
}

@Test
void shouldReadNdjsonFromSingleMessage() throws InterruptedException {
var client = RestClientBuilder.newBuilder().baseUri(uri)
.build(Client.class);
var collected = new CopyOnWriteArrayList<Message>();
var completionLatch = new CountDownLatch(1);
client.readPojoSingle().onCompletion().invoke(completionLatch::countDown)
.subscribe().with(collected::add);

if (!completionLatch.await(5, TimeUnit.SECONDS)) {
fail("Streaming did not complete in time");
}
var expected = Arrays.asList(
Message.of("zero", "0"), Message.of("one", "1"),
Message.of("two", "2"), Message.of("three", "3"));
assertThat(collected).hasSize(4).containsAll(expected);
}

@Path("/stream")
public interface Client {
@GET
@Path("/string")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<String> readString();

@GET
@Path("/pojo")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<Message> readPojo();

@GET
@Path("/single-pojo")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
Multi<Message> readPojoSingle();
}

public static class ReactiveRoutesResource {
@Route(path = "/rr/stream/pojo", produces = ReactiveRoutes.ND_JSON)
Multi<Message> people(RoutingContext context) {
return Multi.createFrom().items(
Message.of("superman", "1"),
Message.of("batman", "2"),
Message.of("spiderman", "3"));
}
}

@Path("/stream")
public static class StreamingResource {
@Inject
Vertx vertx;

@GET
@Path("/string")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<String> readString() {
return Multi.createFrom().emitter(
em -> {
em.emit("one");
em.emit("two");
em.emit("three");
em.emit("four");
em.complete();
});
}

@GET
@Path("/pojo")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> readPojo() {
return Multi.createFrom().emitter(
em -> {
em.emit(Message.of("one", "1"));
em.emit(Message.of("two", "2"));
em.emit(Message.of("three", "3"));
vertx.setTimer(100, id -> {
em.emit(Message.of("four", "4"));
em.complete();
});
});
}

@GET
@Path("/single-pojo")
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public String getPojosAsString() throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
StringBuilder result = new StringBuilder();
ObjectWriter objectWriter = mapper.writerFor(Message.class);
for (var msg : List.of(Message.of("zero", "0"),
Message.of("one", "1"),
Message.of("two", "2"),
Message.of("three", "3"))) {
result.append(objectWriter.writeValueAsString(msg));
result.append("\n");
}
return result.toString();
}
}

public static class Message {
public String name;
public String value;

public static Message of(String name, String value) {
Message message = new Message();
message.name = name;
message.value = value;
return message;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Message message = (Message) o;
return Objects.equals(name, message.name) && Objects.equals(value, message.value);
}

@Override
public int hashCode() {
return Objects.hash(name, value);
}

@Override
public String toString() {
return "Message{" +
"name='" + name + '\'' +
", value='" + value + '\'' +
'}';
}
}
}
Loading

0 comments on commit e48cd9a

Please sign in to comment.