From e2ab65d5e685b8dde9a081c33ed8e11b3bf5b67a Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Tue, 12 Sep 2023 19:51:55 +0200 Subject: [PATCH 1/3] Netty Expect:100-continue feature support Signed-off-by: Maxim Nesen --- .../Expect100ContinueConnectorExtension.java | 78 +++++++++++ .../netty/connector/NettyConnector.java | 28 +++- .../httpserver/JerseyServerInitializer.java | 4 +- .../client/internal/ConnectorExtension.java | 4 +- .../client/internal/HttpUrlConnector.java | 4 +- tests/e2e-client/pom.xml | 5 + .../nettyconnector/Expect100ContinueTest.java | 128 ++++++++++++++++++ 7 files changed, 239 insertions(+), 12 deletions(-) create mode 100644 connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/Expect100ContinueConnectorExtension.java create mode 100644 tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/nettyconnector/Expect100ContinueTest.java diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/Expect100ContinueConnectorExtension.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/Expect100ContinueConnectorExtension.java new file mode 100644 index 0000000000..e93bf0044b --- /dev/null +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/Expect100ContinueConnectorExtension.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.netty.connector; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpRequest; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.ClientRequest; +import org.glassfish.jersey.client.RequestEntityProcessing; +import org.glassfish.jersey.client.internal.ConnectorExtension; + +import javax.ws.rs.HttpMethod; +import java.io.IOException; +import java.net.ProtocolException; + +class Expect100ContinueConnectorExtension + implements ConnectorExtension { + + private static final String EXCEPTION_MESSAGE = "Server rejected operation"; + + @Override + public void invoke(ClientRequest request, HttpRequest extensionParam) { + + + final long length = request.getLengthLong(); + final RequestEntityProcessing entityProcessing = request.resolveProperty( + ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class); + + final Boolean expectContinueActivated = request.resolveProperty( + ClientProperties.EXPECT_100_CONTINUE, Boolean.class); + final Long expectContinueSizeThreshold = request.resolveProperty( + ClientProperties.EXPECT_100_CONTINUE_THRESHOLD_SIZE, + ClientProperties.DEFAULT_EXPECT_100_CONTINUE_THRESHOLD_SIZE); + + final boolean allowStreaming = length > expectContinueSizeThreshold + || entityProcessing == RequestEntityProcessing.CHUNKED; + + if (!Boolean.TRUE.equals(expectContinueActivated) + || !(HttpMethod.POST.equals(request.getMethod()) || HttpMethod.PUT.equals(request.getMethod())) + || !allowStreaming + ) { + return; + } + extensionParam.headers().add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE); + + + } + + @Override + public void postConnectionProcessing(HttpRequest extensionParam) { + + } + + @Override + public boolean handleException(ClientRequest request, HttpRequest extensionParam, IOException ex) { + final Boolean expectContinueActivated = request.resolveProperty( + ClientProperties.EXPECT_100_CONTINUE, Boolean.FALSE); + + return expectContinueActivated + && (ex instanceof ProtocolException && ex.getMessage().equals(EXCEPTION_MESSAGE)); + } + +} diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java index b3289e1def..3274b0f503 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java @@ -45,6 +45,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -384,14 +385,11 @@ protected void initChannel(SocketChannel ch) throws Exception { if (jerseyRequest.hasEntity()) { // guard against prematurely closed channel final GenericFutureListener> closeListener = - new GenericFutureListener>() { - @Override - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + future -> { if (!responseDone.isDone()) { responseDone.completeExceptionally(new IOException("Channel closed.")); } - } - }; + }; ch.closeFuture().addListener(closeListener); final NettyEntityWriter entityWriter = NettyEntityWriter.getInstance(jerseyRequest, ch); @@ -407,8 +405,24 @@ public void operationComplete(io.netty.util.concurrent.Future futu // break; } - // Send the HTTP request. - entityWriter.writeAndFlush(nettyRequest); + //check for 100-Continue presence/availability + final Expect100ContinueConnectorExtension expect100ContinueExtension + = new Expect100ContinueConnectorExtension(); + + final DefaultFullHttpRequest rq = new DefaultFullHttpRequest(nettyRequest.protocolVersion(), + nettyRequest.method(), nettyRequest.uri()); + rq.headers().setAll(nettyRequest.headers()); + expect100ContinueExtension.invoke(jerseyRequest, rq); + + if (HttpUtil.is100ContinueExpected(rq)) { + ch.pipeline().writeAndFlush(rq).sync().awaitUninterruptibly().addListener( + (ChannelFutureListener) channelFuture -> + ch.pipeline().writeAndFlush(nettyRequest) + ); + } else { + // Send the HTTP request. + entityWriter.writeAndFlush(nettyRequest); + } jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() { @Override diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerInitializer.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerInitializer.java index d492449c56..5f4a9595c6 100644 --- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerInitializer.java +++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerInitializer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -25,6 +25,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpServerExpectContinueHandler; import io.netty.handler.codec.http.HttpServerUpgradeHandler; import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder; @@ -115,6 +116,7 @@ private void configureClearText(SocketChannel ch) { final HttpServerCodec sourceCodec = new HttpServerCodec(); p.addLast(sourceCodec); + p.addLast("respondExpectContinue", new HttpServerExpectContinueHandler()); p.addLast(new HttpServerUpgradeHandler(sourceCodec, new HttpServerUpgradeHandler.UpgradeCodecFactory() { @Override public HttpServerUpgradeHandler.UpgradeCodec newUpgradeCodec(CharSequence protocol) { diff --git a/core-client/src/main/java/org/glassfish/jersey/client/internal/ConnectorExtension.java b/core-client/src/main/java/org/glassfish/jersey/client/internal/ConnectorExtension.java index 022cbc6b2e..98e47bb4bd 100644 --- a/core-client/src/main/java/org/glassfish/jersey/client/internal/ConnectorExtension.java +++ b/core-client/src/main/java/org/glassfish/jersey/client/internal/ConnectorExtension.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2020, 2023 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -26,7 +26,7 @@ * * @since 2.33 */ -interface ConnectorExtension { +public interface ConnectorExtension { /** * Main function which allows extension of connector's functionality diff --git a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java index 2054dcf079..1dbd9d8b1e 100644 --- a/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java +++ b/core-client/src/main/java/org/glassfish/jersey/client/internal/HttpUrlConnector.java @@ -405,7 +405,7 @@ private ClientResponse _apply(final ClientRequest request) throws IOException { } } - processExtentions(request, uc); + processExtensions(request, uc); request.setStreamProvider(contentLength -> { setOutboundHeaders(request.getStringHeaders(), uc); @@ -579,7 +579,7 @@ public Object run() throws NoSuchFieldException, } } - private void processExtentions(ClientRequest request, HttpURLConnection uc) { + private void processExtensions(ClientRequest request, HttpURLConnection uc) { connectorExtension.invoke(request, uc); } diff --git a/tests/e2e-client/pom.xml b/tests/e2e-client/pom.xml index 4105828640..5561b4700f 100644 --- a/tests/e2e-client/pom.xml +++ b/tests/e2e-client/pom.xml @@ -172,6 +172,11 @@ jersey-jdk-connector test + + org.glassfish.jersey.connectors + jersey-netty-connector + test + org.glassfish.jersey.security oauth1-signature diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/nettyconnector/Expect100ContinueTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/nettyconnector/Expect100ContinueTest.java new file mode 100644 index 0000000000..9051d3a685 --- /dev/null +++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/nettyconnector/Expect100ContinueTest.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2020, 2023 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.tests.e2e.client.nettyconnector; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.RequestEntityProcessing; +import org.glassfish.jersey.client.http.Expect100ContinueFeature; +import org.glassfish.jersey.netty.connector.NettyConnectorProvider; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.jupiter.api.Test; + +import javax.ws.rs.HeaderParam; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class Expect100ContinueTest extends JerseyTest { + + private static final String RESOURCE_PATH = "expect"; + private static final String ENTITY_STRING = "1234567890123456789012345678901234567890123456789012" + + "3456789012345678901234567890"; + + + @Path(RESOURCE_PATH) + public static class Expect100ContinueResource { + + @POST + public Response publishResource(@HeaderParam("Expect") String expect) { + if ("100-Continue".equalsIgnoreCase(expect)) { + return Response.noContent().build(); + } + return Response.ok("TEST").build(); + } + + } + + @Override + protected Application configure() { + return new ResourceConfig(Expect100ContinueResource.class); + } + + @Override + protected void configureClient(ClientConfig config) { + config.connectorProvider(new NettyConnectorProvider()); + } + + @Test + public void testExpect100Continue() { + final Response response = target(RESOURCE_PATH).request().post(Entity.text(ENTITY_STRING)); + assertEquals(200, response.getStatus(), "Expected 200"); //no Expect header sent - response OK + } + + @Test + public void testExpect100ContinueChunked() { + final Response response = target(RESOURCE_PATH).register(Expect100ContinueFeature.basic()) + .property(ClientProperties.REQUEST_ENTITY_PROCESSING, + RequestEntityProcessing.CHUNKED).request().post(Entity.text(ENTITY_STRING)); + assertEquals(204, response.getStatus(), "Expected 204"); //Expect header sent - No Content response + } + + @Test + public void testExpect100ContinueBuffered() { + final Response response = target(RESOURCE_PATH).register(Expect100ContinueFeature.basic()) + .property(ClientProperties.REQUEST_ENTITY_PROCESSING, + RequestEntityProcessing.BUFFERED).request().header(HttpHeaders.CONTENT_LENGTH, 67000L) + .post(Entity.text(ENTITY_STRING)); + assertEquals(100, response.getStatus(), "Expected 100"); //Expect header sent - No Content response + } + + @Test + public void testExpect100ContinueCustomLength() { + final Response response = target(RESOURCE_PATH).register(Expect100ContinueFeature.withCustomThreshold(100L)) + .request().header(HttpHeaders.CONTENT_LENGTH, 101L) + .post(Entity.text(ENTITY_STRING)); + assertEquals(100, response.getStatus(), "Expected 100"); //Expect header sent - No Content response + } + + @Test + public void testExpect100ContinueCustomLengthWrong() { + final Response response = target(RESOURCE_PATH).register(Expect100ContinueFeature.withCustomThreshold(100L)) + .request().header(HttpHeaders.CONTENT_LENGTH, 99L) + .post(Entity.text(ENTITY_STRING)); + assertEquals(200, response.getStatus(), "Expected 200"); //Expect header NOT sent - low request size + } + + @Test + public void testExpect100ContinueCustomLengthProperty() { + final Response response = target(RESOURCE_PATH) + .property(ClientProperties.EXPECT_100_CONTINUE_THRESHOLD_SIZE, 555L) + .property(ClientProperties.EXPECT_100_CONTINUE, Boolean.TRUE) + .register(Expect100ContinueFeature.withCustomThreshold(555L)) + .request().header(HttpHeaders.CONTENT_LENGTH, 666L) + .post(Entity.text(ENTITY_STRING)); + assertNotNull(response.getStatus()); //Expect header sent - No Content response + } + + @Test + public void testExpect100ContinueRegisterViaCustomProperty() { + final Response response = target(RESOURCE_PATH) + .property(ClientProperties.EXPECT_100_CONTINUE_THRESHOLD_SIZE, 43L) + .property(ClientProperties.EXPECT_100_CONTINUE, Boolean.TRUE) + .request().header(HttpHeaders.CONTENT_LENGTH, 44L) + .post(Entity.text(ENTITY_STRING)); + assertEquals(100, response.getStatus(), "Expected 100"); //Expect header sent - No Content response + } +} From efd2f6cbc58d4b4f56e1beb4ea9a5366ec2d7feb Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Thu, 21 Sep 2023 16:16:10 +0200 Subject: [PATCH 2/3] connector adjustments Signed-off-by: Maxim Nesen --- .../Expect100ContinueConnectorExtension.java | 11 ++-------- .../netty/connector/NettyConnector.java | 22 ++++++++++++++----- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/Expect100ContinueConnectorExtension.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/Expect100ContinueConnectorExtension.java index e93bf0044b..5d1e7d2990 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/Expect100ContinueConnectorExtension.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/Expect100ContinueConnectorExtension.java @@ -30,13 +30,10 @@ class Expect100ContinueConnectorExtension implements ConnectorExtension { - private static final String EXCEPTION_MESSAGE = "Server rejected operation"; - @Override public void invoke(ClientRequest request, HttpRequest extensionParam) { - final long length = request.getLengthLong(); final RequestEntityProcessing entityProcessing = request.resolveProperty( ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class); @@ -52,18 +49,15 @@ public void invoke(ClientRequest request, HttpRequest extensionParam) { if (!Boolean.TRUE.equals(expectContinueActivated) || !(HttpMethod.POST.equals(request.getMethod()) || HttpMethod.PUT.equals(request.getMethod())) - || !allowStreaming - ) { + || !allowStreaming) { return; } extensionParam.headers().add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE); - } @Override public void postConnectionProcessing(HttpRequest extensionParam) { - } @Override @@ -74,5 +68,4 @@ public boolean handleException(ClientRequest request, HttpRequest extensionParam return expectContinueActivated && (ex instanceof ProtocolException && ex.getMessage().equals(EXCEPTION_MESSAGE)); } - -} +} \ No newline at end of file diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java index 3274b0f503..a76d931db2 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java @@ -45,6 +45,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; @@ -385,11 +386,14 @@ protected void initChannel(SocketChannel ch) throws Exception { if (jerseyRequest.hasEntity()) { // guard against prematurely closed channel final GenericFutureListener> closeListener = - future -> { + new GenericFutureListener>() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { if (!responseDone.isDone()) { responseDone.completeExceptionally(new IOException("Channel closed.")); } - }; + } + }; ch.closeFuture().addListener(closeListener); final NettyEntityWriter entityWriter = NettyEntityWriter.getInstance(jerseyRequest, ch); @@ -414,11 +418,14 @@ protected void initChannel(SocketChannel ch) throws Exception { rq.headers().setAll(nettyRequest.headers()); expect100ContinueExtension.invoke(jerseyRequest, rq); + ChannelFutureListener expect100ContinueListener = null; + ChannelFuture expect100ContinueFuture = null; + if (HttpUtil.is100ContinueExpected(rq)) { - ch.pipeline().writeAndFlush(rq).sync().awaitUninterruptibly().addListener( - (ChannelFutureListener) channelFuture -> - ch.pipeline().writeAndFlush(nettyRequest) - ); + expect100ContinueListener = + future -> ch.pipeline().writeAndFlush(nettyRequest); + expect100ContinueFuture = ch.pipeline().writeAndFlush(rq).syncUninterruptibly(); + expect100ContinueFuture.addListener(expect100ContinueListener); } else { // Send the HTTP request. entityWriter.writeAndFlush(nettyRequest); @@ -436,6 +443,9 @@ public OutputStream getOutputStream(int contentLength) throws IOException { } else { entityWriter.write(entityWriter.getChunkedInput()); } + if (expect100ContinueFuture != null && expect100ContinueListener != null) { + expect100ContinueFuture.removeListener(expect100ContinueListener); + } executorService.execute(new Runnable() { @Override From 7be0f2a6ec44ce187f16b0a445273bbf5b82cd92 Mon Sep 17 00:00:00 2001 From: Maxim Nesen Date: Fri, 22 Sep 2023 07:39:50 +0200 Subject: [PATCH 3/3] connector adjustments Signed-off-by: Maxim Nesen --- .../org/glassfish/jersey/netty/connector/NettyConnector.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java index a76d931db2..e6cb98c3e9 100644 --- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java +++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java @@ -424,8 +424,8 @@ public void operationComplete(io.netty.util.concurrent.Future futu if (HttpUtil.is100ContinueExpected(rq)) { expect100ContinueListener = future -> ch.pipeline().writeAndFlush(nettyRequest); - expect100ContinueFuture = ch.pipeline().writeAndFlush(rq).syncUninterruptibly(); - expect100ContinueFuture.addListener(expect100ContinueListener); + expect100ContinueFuture = ch.pipeline().writeAndFlush(rq).sync().awaitUninterruptibly() + .addListener(expect100ContinueListener); } else { // Send the HTTP request. entityWriter.writeAndFlush(nettyRequest);