Skip to content

Commit

Permalink
RESTEasy Reactive: implement cancel for Multi on both client and server
Browse files Browse the repository at this point in the history
  • Loading branch information
FroMage committed Jan 5, 2021
1 parent 75ee1d5 commit 7b87b68
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package io.quarkus.resteasy.reactive.server.test.stream;

import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
Expand All @@ -9,6 +14,7 @@

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.buffer.Buffer;

@Path("stream")
Expand Down Expand Up @@ -76,4 +82,45 @@ public static Uni<Buffer> concatenateBuffers(Multi<Buffer> multi) {
return multi.collectItems().in(() -> Buffer.buffer(INITIAL_BUFFER_SIZE),
(accumulatingBuffer, receivedBuffer) -> accumulatingBuffer.appendBuffer(receivedBuffer));
}

private boolean receivedCancel = false;

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("infinite/stream")
public Multi<String> infiniteStream() {
receivedCancel = false;
return Multi.createFrom().emitter(emitter -> {
ScheduledExecutorService scheduler = Infrastructure.getDefaultWorkerPool();
// this should never complete, but let's kill it after 30 seconds
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
String str = "Called at " + new Date();
emitter.emit(str);
}, 0, 1, TimeUnit.SECONDS);

// catch client close
emitter.onTermination(() -> {
if (emitter.isCancelled()) {
receivedCancel = true;
if (!future.isCancelled())
future.cancel(true);
}
});

// die in 30s max
scheduler.schedule(() -> {
if (!future.isCancelled()) {
future.cancel(true);
// just in case
emitter.complete();
}
}, 30, TimeUnit.SECONDS);
});
}

@GET
@Path("infinite/stream-was-cancelled")
public String infiniteStreamWasCancelled() {
return receivedCancel ? "OK" : "KO";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
Expand All @@ -20,6 +22,7 @@
import io.quarkus.test.common.http.TestHTTPResource;
import io.restassured.RestAssured;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.Cancellable;

public class StreamTestCase {

Expand Down Expand Up @@ -80,4 +83,44 @@ public void testClientStreaming() throws Exception {
Assertions.assertEquals("foo", list.get(0));
Assertions.assertEquals("bar", list.get(1));
}

@Test
public void testInfiniteStreamClosedByClientImmediately() throws Exception {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + "stream/infinite/stream");
Multi<String> multi = target.request().rx(MultiInvoker.class).get(String.class);
Cancellable cancellable = multi.subscribe().with(item -> {
System.err.println("Received " + item);
});
// immediately cancel
cancellable.cancel();
// give it some time and check
Thread.sleep(2000);

WebTarget checkTarget = client.target(uri.toString() + "stream/infinite/stream-was-cancelled");
String check = checkTarget.request().get(String.class);
Assertions.assertEquals("OK", check);
}

@Test
public void testInfiniteStreamClosedByClientAfterRegistration() throws Exception {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + "stream/infinite/stream");
Multi<String> multi = target.request().rx(MultiInvoker.class).get(String.class);
// cancel after two items
CountDownLatch latch = new CountDownLatch(2);
Cancellable cancellable = multi.subscribe().with(item -> {
System.err.println("Received " + item);
latch.countDown();
});
Assertions.assertTrue(latch.await(30, TimeUnit.SECONDS));
// now cancel
cancellable.cancel();
// give it some time and check
Thread.sleep(2000);

WebTarget checkTarget = client.target(uri.toString() + "stream/infinite/stream-was-cancelled");
String check = checkTarget.request().get(String.class);
Assertions.assertEquals("OK", check);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import io.vertx.core.net.impl.ConnectionBase;
import java.io.ByteArrayInputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.jboss.resteasy.reactive.client.impl.MultiInvoker.MultiRequest;

public class MultiInvoker extends AbstractRxInvoker<Multi<?>> {

Expand All @@ -34,34 +36,87 @@ public <R> Multi<R> get(GenericType<R> responseType) {
return (Multi<R>) super.get(responseType);
}

/**
* We need this class to work around a bug in Mutiny where we can register our cancel listener
* after the subscription is cancelled and we never get notified
* See https://github.com/smallrye/smallrye-mutiny/issues/417
*/
static class MultiRequest<R> {

private final AtomicReference<Runnable> onCancel = new AtomicReference<>();

private MultiEmitter<? super R> emitter;

private static final Runnable CLEARED = () -> {
};

public MultiRequest(MultiEmitter<? super R> emitter) {
this.emitter = emitter;
emitter.onTermination(() -> {
if (emitter.isCancelled()) {
this.cancel();
}
});
}

public boolean isCancelled() {
return onCancel.get() == CLEARED;
}

private void cancel() {
Runnable action = onCancel.getAndSet(CLEARED);
if (action != null && action != CLEARED) {
action.run();
}
}

public void onCancel(Runnable onCancel) {
if (this.onCancel.compareAndSet(null, onCancel)) {
// this was a first set
} else if (this.onCancel.get() == CLEARED) {
// already cleared
if (onCancel != null)
onCancel.run();
} else {
// it was already set
throw new IllegalArgumentException("onCancel was already called");
}
}
}

@Override
public <R> Multi<R> method(String name, Entity<?> entity, GenericType<R> responseType) {
AsyncInvokerImpl invoker = (AsyncInvokerImpl) target.request().rx();
// FIXME: backpressure setting?
return Multi.createFrom().emitter(emitter -> {
MultiRequest<R> multiRequest = new MultiRequest<>(emitter);
RestClientRequestContext restClientRequestContext = invoker.performRequestInternal(name, entity, responseType,
false);
restClientRequestContext.getResult().handle((response, connectionError) -> {
if (connectionError != null) {
emitter.fail(connectionError);
} else {
HttpClientResponse vertxResponse = restClientRequestContext.getVertxClientResponse();
// FIXME: this is probably not good enough
if (response.getStatus() == 200
&& MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) {
registerForSse(emitter, responseType, response, vertxResponse);
if (!emitter.isCancelled()) {
// FIXME: this is probably not good enough
if (response.getStatus() == 200
&& MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) {
registerForSse(multiRequest, responseType, response, vertxResponse);
} else {
// read stuff in chunks
registerForChunks(multiRequest, restClientRequestContext, responseType, response, vertxResponse);
}
vertxResponse.resume();
} else {
// read stuff in chunks
registerForChunks(emitter, restClientRequestContext, responseType, response, vertxResponse);
vertxResponse.request().connection().close();
}
vertxResponse.resume();
}
return null;
});
});
}

private <R> void registerForSse(MultiEmitter<? super R> emitter,
private <R> void registerForSse(MultiRequest<? super R> multiRequest,
GenericType<R> responseType,
Response response,
HttpClientResponse vertxResponse) {
Expand All @@ -73,16 +128,20 @@ private <R> void registerForSse(MultiEmitter<? super R> emitter,
sseSource.register(event -> {
// DO NOT pass the response mime type because it's SSE: let the event pick between the X-SSE-Content-Type header or
// the content-type SSE field
emitter.emit((R) event.readData(responseType));
multiRequest.emitter.emit((R) event.readData(responseType));
}, error -> {
emitter.fail(error);
multiRequest.emitter.fail(error);
}, () -> {
emitter.complete();
multiRequest.emitter.complete();
});
// watch for user cancelling
multiRequest.onCancel(() -> {
sseSource.close();
});
sseSource.registerAfterRequest(vertxResponse);
}

private <R> void registerForChunks(MultiEmitter<? super R> emitter,
private <R> void registerForChunks(MultiRequest<? super R> multiRequest,
RestClientRequestContext restClientRequestContext,
GenericType<R> responseType,
Response response,
Expand All @@ -93,13 +152,13 @@ private <R> void registerForChunks(MultiEmitter<? super R> emitter,
if (t == ConnectionBase.CLOSED_EXCEPTION) {
// we can ignore this one since we registered a closeHandler
} else {
emitter.fail(t);
multiRequest.emitter.fail(t);
}
});
HttpConnection connection = vertxClientResponse.request().connection();
// this captures the server closing
connection.closeHandler(v -> {
emitter.complete();
multiRequest.emitter.complete();
});
vertxClientResponse.handler(new Handler<Buffer>() {
@Override
Expand All @@ -108,18 +167,22 @@ public void handle(Buffer buffer) {
ByteArrayInputStream in = new ByteArrayInputStream(buffer.getBytes());
R item = restClientRequestContext.readEntity(in, responseType, response.getMediaType(),
response.getMetadata());
emitter.emit(item);
multiRequest.emitter.emit(item);
} catch (Throwable t) {
// FIXME: probably close the client too? watch out that it doesn't call our close handler
// which calls emitter.complete()
emitter.fail(t);
multiRequest.emitter.fail(t);
}
}
});
// this captures the end of the response
// FIXME: won't this call complete twice()?
vertxClientResponse.endHandler(v -> {
emitter.complete();
multiRequest.emitter.complete();
});
// watch for user cancelling
multiRequest.onCancel(() -> {
vertxClientResponse.request().connection().close();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,17 @@ public Object apply(Object v, Throwable t) {
static abstract class AbstractMultiSubscriber implements Subscriber<Object> {
protected Subscription subscription;
protected ResteasyReactiveRequestContext requestContext;
private boolean weClosed = false;

AbstractMultiSubscriber(ResteasyReactiveRequestContext requestContext) {
this.requestContext = requestContext;
// let's make sure we never restart by accident, also make sure we're not marked as completed
requestContext.restart(AWOL, true);
requestContext.serverResponse().addCloseHandler(() -> {
if (!weClosed && this.subscription != null) {
subscription.cancel();
}
});
}

@Override
Expand All @@ -88,6 +94,8 @@ public void onSubscribe(Subscription s) {

@Override
public void onComplete() {
// make sure we don't trigger cancel with our onCloseHandler
weClosed = true;
// no need to cancel on complete
// FIXME: are we interested in async completion?
requestContext.serverResponse().end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ public interface ServerHttpResponse {
OutputStream createResponseOutputStream();

void setPreCommitListener(Consumer<ResteasyReactiveRequestContext> task);

ServerHttpResponse addCloseHandler(Runnable onClose);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public VertxResteasyReactiveRequestContext(Deployment deployment, ProvidersImpl
}
}

@Override
public ServerHttpResponse addCloseHandler(Runnable onClose) {
this.response.closeHandler(v -> {
onClose.run();
});
return this;
}

public RoutingContext getContext() {
return context;
}
Expand Down

0 comments on commit 7b87b68

Please sign in to comment.