From f59e503729210c1f8bfeeaa0a44383b2150e5fd8 Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Sat, 19 Aug 2023 18:47:59 +0200 Subject: [PATCH] Reproducer for #973 Http2Exception: Flow control window exceeded for stream: 0 when sending a ~64 KiB attachment --- integration-tests/mtom/README.adoc | 9 +++ .../cxf/it/ws/mtom/server/DHRequest.java | 2 +- .../cxf/it/ws/mtom/server/DHResponse.java | 2 +- .../it/ws/mtom/server/MtomServiceImpl.java | 14 ++-- .../ws/mtom/server/RandomBytesDataSource.java | 72 +++++++++++++++++ .../src/main/resources/application.properties | 2 +- .../cxf/it/ws/mtom/server/MtomTest.java | 79 ++++++++++++++++--- 7 files changed, 159 insertions(+), 21 deletions(-) create mode 100644 integration-tests/mtom/README.adoc create mode 100644 integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/RandomBytesDataSource.java diff --git a/integration-tests/mtom/README.adoc b/integration-tests/mtom/README.adoc new file mode 100644 index 000000000..6224ef09d --- /dev/null +++ b/integration-tests/mtom/README.adoc @@ -0,0 +1,9 @@ += Quarkus CXF MTOM tests + +The number of iterations performed by `io.quarkiverse.cxf.it.ws.mtom.server.MtomTest.soak()` can be set by setting + +[source,shell] +---- +$ export QUARKUS_CXF_MTOM_SOAK_ITERATIONS=100000 +$ mvn clean test -Dtest=MtomTest#soak +---- diff --git a/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/DHRequest.java b/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/DHRequest.java index 07aed307b..98163ca56 100644 --- a/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/DHRequest.java +++ b/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/DHRequest.java @@ -15,7 +15,7 @@ public DHRequest(DataHandler dataHandler) { this.dataHandler = dataHandler; } - @XmlMimeType("text/plain") + @XmlMimeType("application/octet-stream") public DataHandler getDataHandler() { return dataHandler; } diff --git a/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/DHResponse.java b/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/DHResponse.java index e66139914..2061a5999 100644 --- a/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/DHResponse.java +++ b/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/DHResponse.java @@ -16,7 +16,7 @@ public DHResponse(DataHandler dataHandler) { this.dataHandler = dataHandler; } - @XmlMimeType("text/plain") + @XmlMimeType("application/octet-stream") public DataHandler getDataHandler() { return dataHandler; } diff --git a/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/MtomServiceImpl.java b/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/MtomServiceImpl.java index 76d0f0e76..90acdca44 100644 --- a/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/MtomServiceImpl.java +++ b/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/MtomServiceImpl.java @@ -20,16 +20,18 @@ public DHResponse echoDataHandler(DHRequest request) { DataHandler dataHandler = request.getDataHandler(); - log.infof("Received content type %s", dataHandler.getContentType()); try { - String message = dataHandler.getContent().toString(); - log.infof("Received content %s", message); - - DataHandler responseData = new DataHandler(message + " echoed from the server", "text/plain"); + int length = RandomBytesDataSource.count(dataHandler.getInputStream()); + log.infof("Received %d bytes of content type %s", length, dataHandler.getContentType()); + + /* + * We do not send back the original bytes, because we do not want to keep them in memory. + * We mainly care for testing the transport + */ + DataHandler responseData = new DataHandler(new RandomBytesDataSource(length)); return new DHResponse(responseData); } catch (IOException e) { throw new RuntimeException(e); } } - } diff --git a/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/RandomBytesDataSource.java b/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/RandomBytesDataSource.java new file mode 100644 index 000000000..c80956b41 --- /dev/null +++ b/integration-tests/mtom/src/main/java/io/quarkiverse/cxf/it/ws/mtom/server/RandomBytesDataSource.java @@ -0,0 +1,72 @@ +package io.quarkiverse.cxf.it.ws.mtom.server; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Random; + +import jakarta.activation.DataSource; + +public class RandomBytesDataSource implements DataSource { + private final int sizeInBytes; + + public static int count(InputStream inputStream) { + byte[] buffer = new byte[1024]; + int result = 0; + int bytesRead; + + try { + while ((bytesRead = inputStream.read(buffer)) != -1) { + result += bytesRead; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return result; + } + + public RandomBytesDataSource(int sizeInBytes) { + this.sizeInBytes = sizeInBytes; + } + + @Override + public InputStream getInputStream() throws IOException { + return new RandomBytesInputStream(sizeInBytes); + } + + @Override + public OutputStream getOutputStream() throws IOException { + throw new UnsupportedOperationException("Writing to this DataSource is not supported"); + } + + @Override + public String getContentType() { + return "application/octet-stream"; + } + + @Override + public String getName() { + return "RandomBytesDataSource"; + } + + private static class RandomBytesInputStream extends InputStream { + private final int size; + private int bytesRead; + private final Random random; + + public RandomBytesInputStream(int size) { + this.size = size; + this.bytesRead = 0; + this.random = new Random(); + } + + @Override + public int read() throws IOException { + if (bytesRead >= size) { + return -1; + } + bytesRead++; + return random.nextInt(256); + } + } +} \ No newline at end of file diff --git a/integration-tests/mtom/src/main/resources/application.properties b/integration-tests/mtom/src/main/resources/application.properties index 0a4139cc9..cacbcb6fd 100644 --- a/integration-tests/mtom/src/main/resources/application.properties +++ b/integration-tests/mtom/src/main/resources/application.properties @@ -1,5 +1,5 @@ quarkus.cxf.endpoint."/mtom".implementor = io.quarkiverse.cxf.it.ws.mtom.server.MtomServiceImpl -quarkus.cxf.endpoint."/mtom".features = org.apache.cxf.ext.logging.LoggingFeature +#quarkus.cxf.endpoint."/mtom".features = org.apache.cxf.ext.logging.LoggingFeature quarkus.cxf.endpoint."/mtom".handlers = io.quarkiverse.cxf.it.ws.mtom.server.MtomEnforcer quarkus.native.resources.includes = *.properties,*.jks,*.wsdl,*.xml diff --git a/integration-tests/mtom/src/test/java/io/quarkiverse/cxf/it/ws/mtom/server/MtomTest.java b/integration-tests/mtom/src/test/java/io/quarkiverse/cxf/it/ws/mtom/server/MtomTest.java index 31665430d..fa2badb64 100644 --- a/integration-tests/mtom/src/test/java/io/quarkiverse/cxf/it/ws/mtom/server/MtomTest.java +++ b/integration-tests/mtom/src/test/java/io/quarkiverse/cxf/it/ws/mtom/server/MtomTest.java @@ -1,46 +1,101 @@ package io.quarkiverse.cxf.it.ws.mtom.server; +import java.io.IOException; +import java.net.MalformedURLException; import java.net.URL; +import java.util.Optional; import javax.xml.namespace.QName; import jakarta.activation.DataHandler; import jakarta.xml.ws.Service; +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.frontend.ClientProxy; +import org.apache.cxf.transport.Conduit; +import org.apache.cxf.transport.http.HTTPConduit; import org.assertj.core.api.Assertions; +import org.jboss.logging.Logger; import org.junit.jupiter.api.Test; -import com.sun.xml.messaging.saaj.soap.AttachmentPartImpl; - import io.quarkiverse.cxf.test.QuarkusCxfClientTestUtil; import io.quarkus.test.junit.QuarkusTest; @QuarkusTest public class MtomTest { + private static final Logger log = Logger.getLogger(MtomTest.class); + private static final int KiB = 1024; + + /** + * A reproducer for + * https://github.com/quarkiverse/quarkus-cxf/issues/973 + * + * @throws Exception + */ + @Test + public void soak() throws Exception { + // The following fail with + // Http2Exception: Flow control window exceeded for stream: 0 + // at io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:109) + // final int size = 63 * KiB + 137; // fails at round 0 + // final int size = 63 * KiB + 136; // fails at round 5 + final int size = 63 * KiB + 135; // fails at round 5 + // final int size = 63 * KiB + 134; // fails at round 50 + // final int size = 63 * KiB + 133; // fails at round 50 + // final int size = 63 * KiB + 132; // fails at round 500 + // final int size = 63 * KiB + 131; // fails at round 500 + // final int size = 63 * KiB + 130; // fails at round 5000 + // final int size = 63 * KiB + 129; // fails at round 5000 + + // This one fails with a different exception: + // final int size = 63 * KiB + 128; // fails at round 10763 Corrupted channel by directly writing to native stream + final int requestCount = Integer + .parseInt(Optional.ofNullable(System.getenv("QUARKUS_CXF_MTOM_SOAK_ITERATIONS")).orElse("5000")); + log.infof("Performing %d interations", requestCount); + for (int i = 0; i < requestCount; i++) { + log.infof("Soaking with %d bytes, round %d", size, i); + assertMtom(size); + } + } + + /** + * A reproducer for + * https://github.com/quarkiverse/quarkus-cxf/issues/973 + * + * @throws Exception + */ @Test - public void dataHandler() throws Exception { + public void largeAttachment() throws Exception { + + int increment = 100 * KiB; + int maxSize = 10 * KiB * KiB - 512; - /* - * This is required only in native mode, where the test code is isolated from the server and thus the server - * does not call AttachmentPartImpl.initializeJavaActivationHandlers() for us - */ - AttachmentPartImpl.initializeJavaActivationHandlers(); + for (int size = increment; size <= maxSize; size += increment) { + log.infof("Sending large attachment: %d bytes", size); + assertMtom(size); + } + } + + static void assertMtom(int size) throws MalformedURLException, IOException { final URL serviceUrl = new URL(QuarkusCxfClientTestUtil.getServerUrl() + "/services/mtom?wsdl"); final QName qName = new QName("https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/mtom", MtomService.class.getSimpleName()); final Service service = jakarta.xml.ws.Service.create(serviceUrl, qName); final MtomService proxy = service.getPort(MtomService.class); + final Client client = ClientProxy.getClient(proxy); + final HTTPConduit conduit = (HTTPConduit) client.getConduit(); + /* Avoid read timeouts on GH actions */ + conduit.getClient().setReceiveTimeout(120000L); - DataHandler dh = new DataHandler("Hello from client", "text/plain"); + DataHandler dh = new DataHandler(new RandomBytesDataSource(size)); DHResponse response = proxy.echoDataHandler(new DHRequest(dh)); Assertions.assertThat(response).isNotNull(); DataHandler dataHandler = response.getDataHandler(); - Assertions.assertThat(dataHandler.getContent()).isEqualTo("Hello from client echoed from the server"); - Assertions.assertThat(dataHandler.getContentType()).isEqualTo("text/plain"); - + Assertions.assertThat(RandomBytesDataSource.count(dataHandler.getDataSource().getInputStream())).isEqualTo(size); + Assertions.assertThat(dataHandler.getContentType()).isEqualTo("application/octet-stream"); } }