Skip to content

Commit

Permalink
Reproducer for quarkiverse#973 Http2Exception: Flow control window ex…
Browse files Browse the repository at this point in the history
…ceeded for stream: 0 when sending a ~64 KiB attachment
  • Loading branch information
ppalaga committed Sep 1, 2023
1 parent 7866e99 commit bf0dfad
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 21 deletions.
9 changes: 9 additions & 0 deletions integration-tests/mtom/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
= Quarkus CXF MTOM tests

The number of iterations performed by `io.quarkiverse.cxf.it.ws.mtom.server.MtomTest.soak()` can be set by setting

[source,shell]
----
$ export QUARKUS_CXF_MTOM_SOAK_ITERATIONS=100000
$ mvn clean test -Dtest=MtomTest#soak
----
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public DHRequest(DataHandler dataHandler) {
this.dataHandler = dataHandler;
}

@XmlMimeType("text/plain")
@XmlMimeType("application/octet-stream")
public DataHandler getDataHandler() {
return dataHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public DHResponse(DataHandler dataHandler) {
this.dataHandler = dataHandler;
}

@XmlMimeType("text/plain")
@XmlMimeType("application/octet-stream")
public DataHandler getDataHandler() {
return dataHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ public DHResponse echoDataHandler(DHRequest request) {

DataHandler dataHandler = request.getDataHandler();

log.infof("Received content type %s", dataHandler.getContentType());
try {
String message = dataHandler.getContent().toString();
log.infof("Received content %s", message);

DataHandler responseData = new DataHandler(message + " echoed from the server", "text/plain");
int length = RandomBytesDataSource.count(dataHandler.getInputStream());
log.infof("Received %d bytes of content type %s", length, dataHandler.getContentType());

/*
* We do not send back the original bytes, because we do not want to keep them in memory.
* We mainly care for testing the transport
*/
DataHandler responseData = new DataHandler(new RandomBytesDataSource(length));
return new DHResponse(responseData);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.quarkiverse.cxf.it.ws.mtom.server;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;

import jakarta.activation.DataSource;

public class RandomBytesDataSource implements DataSource {
private final int sizeInBytes;

public static int count(InputStream inputStream) {
byte[] buffer = new byte[1024];
int result = 0;
int bytesRead;

try {
while ((bytesRead = inputStream.read(buffer)) != -1) {
result += bytesRead;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return result;
}

public RandomBytesDataSource(int sizeInBytes) {
this.sizeInBytes = sizeInBytes;
}

@Override
public InputStream getInputStream() throws IOException {
return new RandomBytesInputStream(sizeInBytes);
}

@Override
public OutputStream getOutputStream() throws IOException {
throw new UnsupportedOperationException("Writing to this DataSource is not supported");
}

@Override
public String getContentType() {
return "application/octet-stream";
}

@Override
public String getName() {
return "RandomBytesDataSource";
}

private static class RandomBytesInputStream extends InputStream {
private final int size;
private int bytesRead;
private final Random random;

public RandomBytesInputStream(int size) {
this.size = size;
this.bytesRead = 0;
this.random = new Random();
}

@Override
public int read() throws IOException {
if (bytesRead >= size) {
return -1;
}
bytesRead++;
return random.nextInt(256);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
quarkus.cxf.endpoint."/mtom".implementor = io.quarkiverse.cxf.it.ws.mtom.server.MtomServiceImpl
quarkus.cxf.endpoint."/mtom".features = org.apache.cxf.ext.logging.LoggingFeature
#quarkus.cxf.endpoint."/mtom".features = org.apache.cxf.ext.logging.LoggingFeature
quarkus.cxf.endpoint."/mtom".handlers = io.quarkiverse.cxf.it.ws.mtom.server.MtomEnforcer

quarkus.native.resources.includes = *.properties,*.jks,*.wsdl,*.xml
Original file line number Diff line number Diff line change
@@ -1,46 +1,101 @@
package io.quarkiverse.cxf.it.ws.mtom.server;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Optional;

import javax.xml.namespace.QName;

import jakarta.activation.DataHandler;
import jakarta.xml.ws.Service;

import org.apache.cxf.endpoint.Client;
import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.http.HTTPConduit;
import org.assertj.core.api.Assertions;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Test;

import com.sun.xml.messaging.saaj.soap.AttachmentPartImpl;

import io.quarkiverse.cxf.test.QuarkusCxfClientTestUtil;
import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class MtomTest {

private static final Logger log = Logger.getLogger(MtomTest.class);
private static final int KiB = 1024;

/**
* A reproducer for
* <a href="https://github.com/quarkiverse/quarkus-cxf/issues/973">https://github.com/quarkiverse/quarkus-cxf/issues/973</a>
*
* @throws Exception
*/
@Test
public void soak() throws Exception {
// The following fail with
// Http2Exception: Flow control window exceeded for stream: 0
// at io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:109)
// final int size = 63 * KiB + 137; // fails at round 0
// final int size = 63 * KiB + 136; // fails at round 5
final int size = 63 * KiB + 135; // fails at round 5
// final int size = 63 * KiB + 134; // fails at round 50
// final int size = 63 * KiB + 133; // fails at round 50
// final int size = 63 * KiB + 132; // fails at round 500
// final int size = 63 * KiB + 131; // fails at round 500
// final int size = 63 * KiB + 130; // fails at round 5000
// final int size = 63 * KiB + 129; // fails at round 5000

// This one fails with a different exception:
// final int size = 63 * KiB + 128; // fails at round 10763 Corrupted channel by directly writing to native stream
final int requestCount = Integer
.parseInt(Optional.ofNullable(System.getenv("QUARKUS_CXF_MTOM_SOAK_ITERATIONS")).orElse("5000"));
log.infof("Performing %d interations", requestCount);
for (int i = 0; i < requestCount; i++) {
log.infof("Soaking with %d bytes, round %d", size, i);
assertMtom(size);
}
}

/**
* A reproducer for
* <a href="https://github.com/quarkiverse/quarkus-cxf/issues/973">https://github.com/quarkiverse/quarkus-cxf/issues/973</a>
*
* @throws Exception
*/
@Test
public void dataHandler() throws Exception {
public void largeAttachment() throws Exception {

int increment = 100 * KiB;
int maxSize = 10 * KiB * KiB - 512;

/*
* This is required only in native mode, where the test code is isolated from the server and thus the server
* does not call AttachmentPartImpl.initializeJavaActivationHandlers() for us
*/
AttachmentPartImpl.initializeJavaActivationHandlers();
for (int size = increment; size <= maxSize; size += increment) {
log.infof("Sending large attachment: %d bytes", size);
assertMtom(size);
}

}

static void assertMtom(int size) throws MalformedURLException, IOException {
final URL serviceUrl = new URL(QuarkusCxfClientTestUtil.getServerUrl() + "/services/mtom?wsdl");
final QName qName = new QName("https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/mtom",
MtomService.class.getSimpleName());
final Service service = jakarta.xml.ws.Service.create(serviceUrl, qName);
final MtomService proxy = service.getPort(MtomService.class);
final Client client = ClientProxy.getClient(proxy);
final HTTPConduit conduit = (HTTPConduit) client.getConduit();
/* Avoid read timeouts on GH actions */
conduit.getClient().setReceiveTimeout(240_000L);

DataHandler dh = new DataHandler("Hello from client", "text/plain");
DataHandler dh = new DataHandler(new RandomBytesDataSource(size));
DHResponse response = proxy.echoDataHandler(new DHRequest(dh));
Assertions.assertThat(response).isNotNull();

DataHandler dataHandler = response.getDataHandler();
Assertions.assertThat(dataHandler.getContent()).isEqualTo("Hello from client echoed from the server");
Assertions.assertThat(dataHandler.getContentType()).isEqualTo("text/plain");

Assertions.assertThat(RandomBytesDataSource.count(dataHandler.getDataSource().getInputStream())).isEqualTo(size);
Assertions.assertThat(dataHandler.getContentType()).isEqualTo("application/octet-stream");
}

}

0 comments on commit bf0dfad

Please sign in to comment.