diff --git a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java index 7102f37d5f..93f8aba948 100644 --- a/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java +++ b/connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010, 2023 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -31,9 +31,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -521,7 +523,7 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing try { final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request); - responseContext.setEntityStream(getInputStream(response, closingMechanism)); + responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled())); } catch (final IOException e) { LOGGER.log(Level.SEVERE, null, e); } @@ -730,13 +732,14 @@ private static Map writeOutBoundHeaders(final ClientRequest clie } private static InputStream getInputStream(final CloseableHttpResponse response, - final ConnectionClosingMechanism closingMechanism) throws IOException { + final ConnectionClosingMechanism closingMechanism, + final Supplier isCancelled) throws IOException { final InputStream inputStream; if (response.getEntity() == null) { inputStream = new ByteArrayInputStream(new byte[0]); } else { - final InputStream i = response.getEntity().getContent(); + final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled); if (i.markSupported()) { inputStream = i; } else { @@ -885,4 +888,68 @@ protected void prepareSocket(SSLSocket socket) throws IOException { } } } + + private static class CancellableInputStream extends InputStream { + private final InputStream in; + private final Supplier isCancelled; + + private CancellableInputStream(InputStream in, Supplier isCancelled) { + this.in = in; + this.isCancelled = isCancelled; + } + + public int read(byte b[]) throws IOException { + checkAborted(); + return in.read(); + } + + public int read(byte b[], int off, int len) throws IOException { + checkAborted(); + return in.read(b, off, len); + } + + @Override + public int read() throws IOException { + checkAborted(); + return in.read(); + } + + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public long skip(long n) throws IOException { + checkAborted(); + return in.skip(n); + } + + @Override + public int available() throws IOException { + checkAborted(); + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public synchronized void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + checkAborted(); + in.reset(); + } + + private void checkAborted() throws IOException { + if (isCancelled.get()) { + throw new IOException(new CancellationException()); + } + } + } } diff --git a/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java b/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java index d8328ff451..92fb44becc 100644 --- a/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java +++ b/connectors/apache5-connector/src/main/java/org/glassfish/jersey/apache5/connector/Apache5Connector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -32,8 +32,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -531,7 +533,7 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing try { final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request); - responseContext.setEntityStream(getInputStream(response, closingMechanism)); + responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled())); } catch (final IOException e) { LOGGER.log(Level.SEVERE, null, e); } @@ -741,13 +743,14 @@ private static Map writeOutBoundHeaders(final ClientRequest clie } private static InputStream getInputStream(final CloseableHttpResponse response, - final ConnectionClosingMechanism closingMechanism) throws IOException { + final ConnectionClosingMechanism closingMechanism, + final Supplier isCancelled) throws IOException { final InputStream inputStream; if (response.getEntity() == null) { inputStream = new ByteArrayInputStream(new byte[0]); } else { - final InputStream i = response.getEntity().getContent(); + final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled); if (i.markSupported()) { inputStream = i; } else { @@ -889,4 +892,69 @@ protected void prepareSocket(SSLSocket socket) throws IOException { } } } + + private static class CancellableInputStream extends InputStream { + private final InputStream in; + private final Supplier isCancelled; + + private CancellableInputStream(InputStream in, Supplier isCancelled) { + this.in = in; + this.isCancelled = isCancelled; + } + + public int read(byte b[]) throws IOException { + checkAborted(); + return in.read(); + } + + public int read(byte b[], int off, int len) throws IOException { + checkAborted(); + return in.read(b, off, len); + } + + @Override + public int read() throws IOException { + checkAborted(); + return in.read(); + } + + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public long skip(long n) throws IOException { + checkAborted(); + return in.skip(n); + } + + @Override + public int available() throws IOException { + checkAborted(); + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public synchronized void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + checkAborted(); + in.reset(); + } + + private void checkAborted() throws IOException { + if (isCancelled.get()) { + throw new IOException(new CancellationException()); + } + } + } + } diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java index 5ee77c4f32..717290514f 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -24,6 +24,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; @@ -157,6 +158,10 @@ protected void notifyResponse() { @Override public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { + if (jerseyRequest.isCancelled()) { + responseAvailable.completeExceptionally(new CancellationException()); + return; + } if (msg instanceof HttpResponse) { final HttpResponse response = (HttpResponse) msg; jerseyResponse = new ClientResponse(new Response.StatusType() { diff --git a/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java b/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java index 46af3b4af4..f20f7cdbf2 100644 --- a/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java +++ b/core-client/src/main/java/org/glassfish/jersey/client/ClientRequest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -25,6 +25,10 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -90,6 +94,8 @@ public class ClientRequest extends OutboundMessageContext private LazyValue propertiesResolver = Values.lazy( (Value) () -> PropertiesResolver.create(getConfiguration(), getPropertiesDelegate()) ); + // by default nothing to be cancelled. + private Future cancellable = NotCancellable.INSTANCE; private static final Logger LOGGER = Logger.getLogger(ClientRequest.class.getName()); @@ -126,6 +132,7 @@ public ClientRequest(final ClientRequest original) { this.writerInterceptors = original.writerInterceptors; this.propertiesDelegate = new MapPropertiesDelegate(original.propertiesDelegate); this.ignoreUserAgent = original.ignoreUserAgent; + this.cancellable = original.cancellable; } @Override @@ -584,4 +591,66 @@ public boolean ignoreUserAgent() { public void ignoreUserAgent(final boolean ignore) { this.ignoreUserAgent = ignore; } + + /** + * Sets the new {@code Future} that may cancel this {@link ClientRequest}. + * @param cancellable + */ + void setCancellable(Future cancellable) { + this.cancellable = cancellable; + } + + /** + * Cancels this {@link ClientRequest}. May result in {@link java.util.concurrent.CancellationException} later in this + * request processing if this {@link ClientRequest} is backed by a {@link Future} provided to + * {@link JerseyInvocation.Builder#setCancellable(Future)}. + * @param mayInterruptIfRunning may have no effect or {@code true} if the thread executing this task should be interrupted + * (if the thread is known to the implementation); + * otherwise, in-progress tasks are allowed to complete + */ + public void cancel(boolean mayInterruptIfRunning) { + cancellable.cancel(mayInterruptIfRunning); + } + + /** + * Returns {@code true} if this {@link ClientRequest} was cancelled + * before it completed normally. + * + * @return {@code true} if this {@link ClientRequest} was cancelled + * before it completed normally + */ + public boolean isCancelled() { + return cancellable.isCancelled(); + } + + private static class NotCancellable implements Future { + public static final Future INSTANCE = new NotCancellable(); + private boolean isCancelled = false; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + isCancelled = true; + return isCancelled; + } + + @Override + public boolean isCancelled() { + return isCancelled; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + return null; + } + + @Override + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return null; + } + } } diff --git a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java index 2f59ae7f00..3dfa5a0184 100644 --- a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java +++ b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011, 2021 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2011, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -573,6 +573,18 @@ private T createRxInvoker(Class clazz throw new IllegalStateException( LocalizationMessages.CLIENT_RX_PROVIDER_NOT_REGISTERED(clazz.getSimpleName())); } + + /** + * Sets Future that backs {@link ClientRequest} {@link ClientRequest#isCancelled()} method. Can be used for instance + * by {@link CompletionStageRxInvoker} to pass the created {@link CompletableFuture} to the provided {@link SyncInvoker}. + * @param cancellable the {@link Future} whose result of {@link Future#cancel(boolean)} will be available by + * {@link ClientRequest#isCancelled()}. + * @return the updated builder. + */ + public Builder setCancellable(Future cancellable) { + requestContext.setCancellable(cancellable); + return this; + } } /* package */ static class AsyncInvoker extends CompletableFutureAsyncInvoker implements javax.ws.rs.client.AsyncInvoker { @@ -711,6 +723,8 @@ private T call(Producer producer, RequestScope scope) public Future submit() { final CompletableFuture responseFuture = new CompletableFuture<>(); final ClientRuntime runtime = request().getClientRuntime(); + + requestContext.setCancellable(responseFuture); runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext), new InvocationResponseCallback<>(responseFuture, (request, scope) -> translate(request, scope, Response.class)))); @@ -725,6 +739,7 @@ public Future submit(final Class responseType) { final CompletableFuture responseFuture = new CompletableFuture<>(); final ClientRuntime runtime = request().getClientRuntime(); + requestContext.setCancellable(responseFuture); runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext), new InvocationResponseCallback(responseFuture, (request, scope) -> translate(request, scope, responseType)))); @@ -764,6 +779,7 @@ public Future submit(final GenericType responseType) { final CompletableFuture responseFuture = new CompletableFuture<>(); final ClientRuntime runtime = request().getClientRuntime(); + requestContext.setCancellable(responseFuture); runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext), new InvocationResponseCallback(responseFuture, (request, scope) -> translate(request, scope, responseType)))); @@ -888,6 +904,7 @@ public void failed(final ProcessingException error) { } }; final ClientRuntime runtime = request().getClientRuntime(); + requestContext.setCancellable(responseFuture); runtime.submit(runtime.createRunnableForAsyncProcessing(requestForCall(requestContext), responseCallback)); } catch (final Throwable error) { final ProcessingException ce; diff --git a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java index afea5f0bb8..ebe11845f9 100644 --- a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java +++ b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011, 2023 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2011, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Supplier; @@ -155,7 +156,7 @@ public HttpUrlConnector( ); } - private static InputStream getInputStream(final HttpURLConnection uc) throws IOException { + private static InputStream getInputStream(final HttpURLConnection uc, final ClientRequest clientRequest) throws IOException { return new InputStream() { private final UnsafeValue in = Values.lazy(new UnsafeValue() { @Override @@ -190,6 +191,10 @@ private void throwIOExceptionIfClosed() throws IOException { if (closed) { throw new IOException("Stream closed"); } + if (clientRequest.isCancelled()) { + close(); + throw new IOException(new CancellationException()); + } } @Override @@ -311,7 +316,7 @@ protected void secureConnection(final JerseyClient client, final HttpURLConnecti if (DEFAULT_SSL_SOCKET_FACTORY.get() == suc.getSSLSocketFactory()) { // indicates that the custom socket factory was not set suc.setSSLSocketFactory(sslSocketFactory.get()); - } + } } } @@ -448,7 +453,7 @@ private ClientResponse _apply(final ClientRequest request) throws IOException { ); try { - InputStream inputStream = getInputStream(uc); + InputStream inputStream = getInputStream(uc, request); responseContext.setEntityStream(inputStream); } catch (IOException ioe) { // allow at least a partial response in a ResponseProcessingException diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/FutureCancelTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/FutureCancelTest.java new file mode 100644 index 0000000000..31a64d7f4b --- /dev/null +++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/connector/FutureCancelTest.java @@ -0,0 +1,202 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.tests.e2e.client.connector; + +import org.glassfish.jersey.apache.connector.ApacheConnectorProvider; +import org.glassfish.jersey.apache5.connector.Apache5ConnectorProvider; +import org.glassfish.jersey.client.AbstractRxInvoker; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.HttpUrlConnectorProvider; +import org.glassfish.jersey.client.JerseyInvocation; +import org.glassfish.jersey.client.RequestEntityProcessing; +import org.glassfish.jersey.client.spi.ConnectorProvider; +import org.glassfish.jersey.netty.connector.NettyConnectorProvider; +import org.glassfish.jersey.server.ChunkedOutput; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.CompletionStageRxInvoker; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.RxInvokerProvider; +import javax.ws.rs.client.SyncInvoker; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Function; +import java.util.function.Supplier; + +public class FutureCancelTest extends JerseyTest { + + public static final long SLEEP = 100L; + + public static List testData() { + return Arrays.asList( + new ApacheConnectorProvider(), + new Apache5ConnectorProvider(), + new HttpUrlConnectorProvider(), + new NettyConnectorProvider() + ); + } + + @Path("/") + public static class FutureCancelResource { + @GET + public ChunkedOutput sendData() { + ChunkedOutput chunkedOutput = new ChunkedOutput<>(String.class); + Thread newThread = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i != 100; i++) { + try { + chunkedOutput.write(String.valueOf(i)); + Thread.sleep(SLEEP); + } catch (Exception e) { + // consume + } + } + } + }); + newThread.start(); + + return chunkedOutput; + } + } + + @Override + protected Application configure() { + return new ResourceConfig(FutureCancelResource.class); + } + + @ParameterizedTest + @MethodSource("testData") + public void testFutureCancel(ConnectorProvider connectorProvider) throws InterruptedException, ExecutionException { + ClientConfig config = new ClientConfig(); + config.connectorProvider(connectorProvider); + + Future> future = ClientBuilder.newClient(config) + .register(new FutureCancelRxInvokerProvider()) + .property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED) + .target(target().getUri()).request().rx(FutureCancelRxInvoker.class).get().toCompletableFuture(); + + int expectedSize = 2; + + while (RX_LIST.size() < expectedSize) { + Thread.sleep(SLEEP); + } + future.cancel(true); + + Thread.sleep(2 * SLEEP); // wait to see no new messages arrive + int size = RX_LIST.size(); // some might have beween RX_LIST.size() and cancel() + while (size > expectedSize) { // be sure no more come + Thread.sleep(SLEEP); + expectedSize = size; + size = RX_LIST.size(); + } + + Assertions.assertTrue(size < 10, "Received " + size + " messages"); + } + + private static List RX_LIST = new LinkedList<>(); + + public static class FutureCancelRxInvokerProvider implements RxInvokerProvider { + + Function function = new Function() { + @Override + public Object apply(InputStream inputStream) { + byte[] number = new byte[8]; + int len = 0; + do { + try { + if ((len = inputStream.read(number)) != -1) { + RX_LIST.add(new String(number).substring(0, len)); + } else { + break; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } while (true); + return RX_LIST; + } + }; + + @Override + public boolean isProviderFor(Class clazz) { + return FutureCancelRxInvoker.class.equals(clazz); + } + + @Override + public FutureCancelRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) { + return new FutureCancelRxInvoker(syncInvoker, executorService, function); + } + } + + private static class FutureCancelRxInvoker extends AbstractRxInvoker implements CompletionStageRxInvoker { + private final Function consumer; + + public FutureCancelRxInvoker(SyncInvoker syncInvoker, ExecutorService executor, Function consumer) { + super(syncInvoker, executor); + this.consumer = consumer; + } + + @Override + public CompletionStage method(String name, Entity entity, Class responseType) { + CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() { + @Override + public R get() { + Response r = getSyncInvoker().get(); + InputStream is = r.readEntity(InputStream.class); + Object o = consumer.apply(is); + return (R) o; + } + }, getExecutorService()); + ((JerseyInvocation.Builder) getSyncInvoker()).setCancellable(completableFuture); + return completableFuture; + } + + @Override + public CompletionStage method(String name, Entity entity, GenericType responseType) { + CompletableFuture completableFuture = CompletableFuture.supplyAsync(new Supplier() { + @Override + public R get() { + Response r = getSyncInvoker().get(); + InputStream is = r.readEntity(InputStream.class); + Object o = consumer.apply(is); + return (R) o; + } + }, getExecutorService()); + return completableFuture; + } + } +}