From 36168be84601ca885acf52eab406b188aab75566 Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Sun, 24 Nov 2024 19:52:29 +0100 Subject: [PATCH] VertxHttpClientHTTPConduit-backed client hangs when receiving large responses under load, fix #1613 --- .../client-server/application.properties | 15 +- .../client/VertxHttpClientHTTPConduit.java | 355 ++++++++++++++---- .../client/InputStreamWriteStreamTest.java | 292 ++++++++++++++ integration-tests/client-server/pom.xml | 4 + .../cxf/it/large/slow/LargeSlowRest.java | 35 ++ .../it/large/slow/LargeSlowServiceImpl.java | 37 ++ .../it/large/slow/generated/LargeSlow.java | 71 ++++ .../large/slow/generated/LargeSlowOutput.java | 98 +++++ .../slow/generated/LargeSlowResponse.java | 62 +++ .../slow/generated/LargeSlowService.java | 59 +++ .../generated/LargeSlowService_Service.java | 88 +++++ .../large/slow/generated/ObjectFactory.java | 93 +++++ .../it/large/slow/generated/package-info.java | 2 + .../src/main/resources/application.properties | 15 +- .../wsdl/LargeSlow-async-binding.xml | 10 + .../src/main/resources/wsdl/LargeSlow.wsdl | 64 ++++ .../cxf/it/large/slow/LargeSlowIT.java | 8 + .../cxf/it/large/slow/LargeSlowTest.java | 130 +++++++ 18 files changed, 1350 insertions(+), 88 deletions(-) create mode 100644 extensions/core/runtime/src/test/java/io/quarkiverse/cxf/vertx/http/client/InputStreamWriteStreamTest.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/LargeSlowRest.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/LargeSlowServiceImpl.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlow.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowOutput.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowResponse.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowService.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowService_Service.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/ObjectFactory.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/package-info.java create mode 100644 integration-tests/client-server/src/main/resources/wsdl/LargeSlow-async-binding.xml create mode 100644 integration-tests/client-server/src/main/resources/wsdl/LargeSlow.wsdl create mode 100644 integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/large/slow/LargeSlowIT.java create mode 100644 integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/large/slow/LargeSlowTest.java diff --git a/docs/modules/ROOT/examples/client-server/application.properties b/docs/modules/ROOT/examples/client-server/application.properties index 99245fdf9..157860932 100644 --- a/docs/modules/ROOT/examples/client-server/application.properties +++ b/docs/modules/ROOT/examples/client-server/application.properties @@ -21,9 +21,9 @@ quarkus.cxf.endpoint."/addressing-anonymous".implementor = io.quarkiverse.cxf.it quarkus.cxf.endpoint."/addressing-decoupled".implementor = io.quarkiverse.cxf.it.ws.addressing.server.decoupled.WsAddressingImpl # XML Schema validation -quarkus.cxf.codegen.wsdl2java.includes = wsdl/*.wsdl -quarkus.cxf.codegen.wsdl2java.package-names = io.quarkiverse.cxf.it.server.xml.schema.validation.model -quarkus.cxf.codegen.wsdl2java.wsdl-location = classpath:wsdl/calculator.wsdl +quarkus.cxf.codegen.wsdl2java.schema-validation.includes = wsdl/calculator.wsdl +quarkus.cxf.codegen.wsdl2java.schema-validation.package-names = io.quarkiverse.cxf.it.server.xml.schema.validation.model +quarkus.cxf.codegen.wsdl2java.schema-validation.wsdl-location = classpath:wsdl/calculator.wsdl # Service endpoints quarkus.cxf.endpoint."/annotation-schema-validated-calculator".implementor = io.quarkiverse.cxf.it.server.xml.schema.validation.AnnotationSchemaValidatedCalculatorServiceImpl @@ -81,4 +81,13 @@ quarkus.cxf.client.basicAuthSecureWsdl.secure-wsdl-access = true quarkus.cxf.client.helloMock.client-endpoint-url = http://localhost:${quarkus.http.test-port}/soap/helloMock quarkus.cxf.client.helloMock.service-interface = io.quarkiverse.cxf.it.HelloService +# Large slow client +quarkus.cxf.client.largeSlow.client-endpoint-url = http://localhost:${quarkus.http.test-port}/soap/largeSlow +quarkus.cxf.client.largeSlow.service-interface = io.quarkiverse.cxf.it.large.slow.generated.LargeSlowService +# Uncomment, regenerate and copy the generated classes to /src/main/java, if needed +#quarkus.cxf.codegen.wsdl2java.large-slow.includes = wsdl/LargeSlow.wsdl +#quarkus.cxf.codegen.wsdl2java.large-slow.package-names = io.quarkiverse.cxf.it.large.slow.generated +#quarkus.cxf.codegen.wsdl2java.large-slow.additional-params = -b,src/main/resources/wsdl/LargeSlow-async-binding.xml + + quarkus.default-locale = en_US \ No newline at end of file diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java index 75064e138..97a20ad84 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.io.PushbackInputStream; import java.net.HttpURLConnection; import java.net.InetSocketAddress; @@ -31,6 +29,7 @@ import java.net.Proxy.Type; import java.net.SocketTimeoutException; import java.net.URI; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -38,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -75,8 +75,10 @@ import io.quarkus.runtime.BlockingOperationControl; import io.vertx.core.AsyncResult; import io.vertx.core.Context; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientRequest; @@ -87,6 +89,7 @@ import io.vertx.core.http.RequestOptions; import io.vertx.core.net.ProxyOptions; import io.vertx.core.net.ProxyType; +import io.vertx.core.streams.WriteStream; /** */ @@ -544,33 +547,26 @@ public void handle(RequestBodyEvent event) throws IOException { } void finishRequest(HttpClientRequest req, Buffer buffer) { - try { - final PipedOutputStream pipedOutputStream = new PipedOutputStream(); - final ExceptionAwarePipedInputStream pipedInputStream = new ExceptionAwarePipedInputStream( - pipedOutputStream); - - req.response() - .onComplete(ar -> { - if (ar.succeeded()) { - pipe(ar.result(), pipedOutputStream, pipedInputStream); + + req.response() + .onComplete(ar -> { + final InputStreamWriteStream sink = new InputStreamWriteStream(2); + if (ar.succeeded()) { + ar.result().pipeTo(sink); + } else { + if (ar.cause() instanceof IOException) { + sink.setException((IOException) ar.cause()); } else { - if (ar.cause() instanceof IOException) { - pipedInputStream.setException((IOException) ar.cause()); - } else { - pipedInputStream.setException(new IOException(ar.cause())); - } + sink.setException(new IOException(ar.cause())); } - mode.responseReady(new Result<>(new ResponseEvent(ar.result(), pipedInputStream), - ar.cause())); - }); + } + mode.responseReady(new Result<>(new ResponseEvent(ar.result(), sink), ar.cause())); + }); - req - .end(buffer) - .onFailure(t -> mode.responseFailed(t, true)); + req + .end(buffer) + .onFailure(t -> mode.responseFailed(t, true)); - } catch (IOException e) { - throw new VertxHttpException(e); - } } void failResponse(Throwable t) { @@ -674,39 +670,8 @@ HttpClientRequest awaitRequest() throws IOException { } } - static void pipe( - HttpClientResponse response, - PipedOutputStream pipedOutputStream, - ExceptionAwarePipedInputStream pipedInputStream - - ) { - - response.handler(buffer -> { - try { - pipedOutputStream.write(buffer.getBytes()); - } catch (IOException e) { - pipedInputStream.setException(e); - } - }); - - response.endHandler(v -> { - try { - pipedOutputStream.close(); - } catch (IOException e) { - pipedInputStream.setException(e); - } - }); - - response.exceptionHandler(e -> { - final IOException ioe = e instanceof IOException - ? (IOException) e - : new IOException(e); - pipedInputStream.setException(ioe); - }); - } - void awaitWriteable(HttpClientRequest request) throws IOException, InterruptedException { - assert lock.isHeldByCurrentThread(); + // assert lock.isHeldByCurrentThread(); while (request.writeQueueFull()) { if (this.request.cause() != null) { throw new IOException(this.request.cause()); @@ -802,7 +767,7 @@ public void responseFailed(Throwable t, boolean lockIfNeeded) { lock.unlock(); } } else { - assert lock.isHeldByCurrentThread(); + // assert lock.isHeldByCurrentThread(); response = Result.failure(t); responseReceived.signal(); } @@ -1141,51 +1106,277 @@ static boolean isOneway(Exchange exchange) { } - static class ExceptionAwarePipedInputStream extends PipedInputStream { - private IOException exception; - private final Object lock = new Object(); + static class InputStreamWriteStream extends InputStream implements WriteStream { + + private static Buffer END; + + private final Queue queue; + private final ReentrantLock lock = new ReentrantLock(); + private final Condition queueChange = lock.newCondition(); + + /** Written from event loop, read from the consumer worker thread */ + private volatile Handler drainHandler; + private volatile IOException exception; + private int maxQueueSize; // volatile not needed as we assume the value stays stable after being set on init + + /** Read and written from the from the consumer worker thread */ + private Buffer readBuffer; + private int readPosition = 0; - public ExceptionAwarePipedInputStream(PipedOutputStream pipedOutputStream) throws IOException { - super(pipedOutputStream); + public InputStreamWriteStream(int queueSize) { + if (END == null) { + END = Buffer.buffer(); + } + setWriteQueueMaxSize(queueSize); + this.queue = new ArrayDeque<>(queueSize); } - public void setException(IOException exception) { - synchronized (lock) { + @Override + public WriteStream exceptionHandler(Handler handler) { + throw new UnsupportedOperationException(); + } + + @Override + public Future write(Buffer data) { + final Promise promise = Promise.promise(); + write(data, promise); + return promise.future(); + } + + @Override + public void write(Buffer data, Handler> handler) { + try { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + queue.offer(data); + // Log.infof("Adding buffer %d with size %d bytes; queue size after %d", System.identityHashCode(data), + // data.length(), queue.size()); + queueChange.signal(); + } finally { + lock.unlock(); + } + handler.handle(Future.succeededFuture()); + } catch (Throwable e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (this.exception == null) { - /* Ignore subsequent exceptions */ - this.exception = exception; + this.exception = e instanceof IOException ? (IOException) e : new IOException(e); } + handler.handle(Future.failedFuture(e)); + } + } + + @Override + public void end(Handler> handler) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + queue.offer(END); + // Log.info("Adding final buffer"); + queueChange.signal(); + } finally { + lock.unlock(); + } + } + + @Override + public WriteStream setWriteQueueMaxSize(int maxSize) { + if (maxSize < 1) { + throw new IllegalArgumentException("maxSize must be >= 1"); } + this.maxQueueSize = maxSize; + return this; + } + + @Override + public boolean writeQueueFull() { + // int size = queue.size(); + // Log.infof("Queue %s: %d", (size >= maxQueueSize ? "full" : "not full"), size); + return queue.size() >= maxQueueSize; + } + + @Override + public WriteStream drainHandler(Handler handler) { + this.drainHandler = handler; + return this; } @Override public int read() throws IOException { - synchronized (lock) { - if (exception != null) { - throw exception; - } + // Log.infof("> reading 1 byte"); + final IOException e; + if ((e = exception) != null) { + throw e; } - return super.read(); + + final Buffer rb = takeBuffer(true); + return rb != null ? rb.getByte(readPosition++) : -1; } @Override - public int read(byte[] b, int off, int len) throws IOException { - synchronized (lock) { - if (exception != null) { - throw exception; + public int read(byte b[], final int off, int len) throws IOException { + final IOException e; + if ((e = exception) != null) { + throw e; + } + // Log.infof("Ready to read up to %d bytes", len); + + Buffer rb = takeBuffer(true); + if (rb == null) { + return -1; + } + int rbLen = rb.length(); + int readable = rbLen - readPosition; + // Log.infof("Readable %d bytes", readable); + + int result; + if (readable >= len) { + readable = len; + // Log.infof("Downsized readable to %d bytes", readable); + rb.getBytes(readPosition, readPosition + readable, b, off); + // Log.infof("After read 1: %s", new String(b, off, readable, StandardCharsets.UTF_8)); + readPosition += readable; + // Log.infof("readPosition now at %d", readPosition); + if (readPosition >= rbLen) { + /* Nothing more to read from this buffer, so dereference it so that it can be GC's earlier; */ + // Log.infof("Buffer read out completely"); + if (readBuffer != END) { + readBuffer = null; // deref. so that it can be GC's earlier; + } + } + result = readable; + } else { + /* readable < len so we read out the current buffer completely and we try the subsequent ones if available */ + rb.getBytes(readPosition, readPosition + readable, b, off); + // Log.infof("Read out current buffer %d completely: %s", System.identityHashCode(rb), + // new String(b, off, readable, StandardCharsets.UTF_8)); + readPosition += readable; + // assert readPosition == rbLen; + len -= readable; + int off2 = off + readable; + + result = readable; + /* check whether we get more buffers */ + while (len > 0 && (rb = takeBuffer(false)) != null) { + rbLen = rb.length(); + readable = rbLen - readPosition; + if (readable > len) { + readable = len; + // Log.infof("Downsized readable to %d bytes", readable); + } + rb.getBytes(readPosition, readPosition + readable, b, off2); + // Log.infof("Read 2 from buffer %d %d to %d: %s", System.identityHashCode(rb), readPosition, + // readPosition + readable, + // new String(b, off2, readable, StandardCharsets.UTF_8)); + readPosition += readable; + len -= readable; + off2 += readable; + // Log.infof("readPosition now at %d", readPosition); + result += readable; + } + if (readPosition == rbLen) { + // Log.infof("Buffer read out completely"); + if (readBuffer != END) { + readBuffer = null; // deref. so that it can be GC's earlier; + } } + // assert readPosition <= rbLen; } - return super.read(b, off, len); + // Log.infof("> read %d bytes: %s", result, new String(b, off, result, StandardCharsets.UTF_8)); + return result; } @Override - public void close() throws IOException { - synchronized (lock) { - if (exception != null) { - throw exception; + public void close() { + readBuffer = null; + // assert queueEmpty() : "Queue still has " + queue.size() + " items"; + } + + @Override + public int available() throws IOException { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Buffer rb = takeBuffer(false); + if (rb != null) { + int result = rb.length() - readPosition; + for (Buffer b : queue) { + if (rb != b) { + /* Skip the buffer returned by takeBuffer() above */ + result += b.length(); + } + } + return result; + } + } finally { + lock.unlock(); + } + return 0; + } + + private Buffer takeBuffer(boolean blockingAwaitBuffer) throws IOException { + // Log.infof("About to take buffer at queue size %d %s", queue.size(), + // blockingAwaitBuffer ? "with blocking" : "without blocking"); + Buffer rb = readBuffer; + if (rb == END) { + return null; + } + // Log.infof("Buffer is null: %s; %d >= %d: %s", rb == null, readPosition, (rb == null ? -1 : rb.length()), + // rb != null && readPosition >= rb.length()); + if (rb == null || readPosition >= rb.length()) { + // Log.info("Buffer is null or empty"); + + final ReentrantLock lock = this.lock; + try { + lock.lockInterruptibly(); + if (blockingAwaitBuffer) { + while ((readBuffer = rb = queue.poll()) == null) { + //Log.infof("Awaiting a buffer at queue size %d", queue.size()); + queueChange.await(); + } + } else { + readBuffer = rb = queue.poll(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } finally { + lock.unlock(); + } + if (rb == END) { + return null; + } + readPosition = 0; + + final Handler dh; + if (!writeQueueFull() && (dh = drainHandler) != null) { + dh.handle(null); } } - super.close(); + // Log.infof("Taken a %s buffer %d, will read from %d to %d; queue size after: %d", + // (rb != null ? "valid" : "null"), + // System.identityHashCode(rb), + // readPosition, (rb != null ? rb.length() : -1), queue.size()); + return rb; + } + + private boolean queueEmpty() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return queue.isEmpty(); + } finally { + lock.unlock(); + } + } + + public void setException(IOException exception) { + if (this.exception == null) { + /* Ignore subsequent exceptions */ + this.exception = exception; + } } } diff --git a/extensions/core/runtime/src/test/java/io/quarkiverse/cxf/vertx/http/client/InputStreamWriteStreamTest.java b/extensions/core/runtime/src/test/java/io/quarkiverse/cxf/vertx/http/client/InputStreamWriteStreamTest.java new file mode 100644 index 000000000..5ad923502 --- /dev/null +++ b/extensions/core/runtime/src/test/java/io/quarkiverse/cxf/vertx/http/client/InputStreamWriteStreamTest.java @@ -0,0 +1,292 @@ +package io.quarkiverse.cxf.vertx.http.client; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkiverse.cxf.vertx.http.client.VertxHttpClientHTTPConduit.InputStreamWriteStream; +import io.vertx.core.buffer.Buffer; + +public class InputStreamWriteStreamTest { + + @Test + void readBeforeWriteTimeout() throws IOException, InterruptedException { + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch readFinished = new CountDownLatch(1); + Thread t = new Thread(() -> { + started.countDown(); + try { + ws.read(new byte[8]); // should block + readFinished.countDown(); // should never be reached + } catch (IOException e) { + throw new RuntimeException(e); + } + }, "test-worker"); + t.start(); + Assertions.assertThat(started.await(500, TimeUnit.MILLISECONDS)).isTrue(); + /* + * ws.read(new byte[8]) should block so readFinished.countDown() will never be reached, so the + * awaiting readFinished latch should timeout + */ + Assertions.assertThat(readFinished.await(500, TimeUnit.MILLISECONDS)).isFalse(); + } + } + + @Test + void readBeforeWrite() throws IOException, InterruptedException { + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final CountDownLatch started = new CountDownLatch(1); + + final CountDownLatch read1Finished = new CountDownLatch(1); + final AtomicInteger read1 = new AtomicInteger(); + final byte[] arr1 = new byte[8]; + + final CountDownLatch write2Finished = new CountDownLatch(1); + final CountDownLatch read2Finished = new CountDownLatch(1); + final AtomicInteger read2 = new AtomicInteger(); + final byte[] arr2 = new byte[14]; + + final CountDownLatch read3Finished = new CountDownLatch(1); + final AtomicInteger read3 = new AtomicInteger(); + final byte[] arr3 = new byte[1]; + + Thread t = new Thread(() -> { + started.countDown(); + try { + read1.set(ws.read(arr1)); + read1Finished.countDown(); + + write2Finished.await(500, TimeUnit.MILLISECONDS); + read2.set(ws.read(arr2)); + read2Finished.countDown(); + + read3.set(ws.read(arr3)); + read3Finished.countDown(); + + } catch (Exception e) { + throw new RuntimeException(e); + } + }, "test-worker"); + t.start(); + + Assertions.assertThat(started.await(500, TimeUnit.MILLISECONDS)).isTrue(); + + ws.write(Buffer.buffer("abcd")); + Assertions.assertThat(read1Finished.await(500, TimeUnit.MILLISECONDS)).isTrue(); + Assertions.assertThat(read1.get()).isEqualTo(4); + Assertions.assertThat(arr1).isEqualTo("abcd\0\0\0\0".getBytes(StandardCharsets.UTF_8)); + + ws.write(Buffer.buffer("efgh")); + ws.write(Buffer.buffer("ijkl")); + ws.write(Buffer.buffer("mnop")); + write2Finished.countDown(); + + Assertions.assertThat(read2Finished.await(500, TimeUnit.MILLISECONDS)).isTrue(); + Assertions.assertThat(read2.get()).isEqualTo(12); + Assertions.assertThat(arr2).isEqualTo("efghijklmnop\0\0".getBytes(StandardCharsets.UTF_8)); + + ws.end(); + Assertions.assertThat(read3Finished.await(500, TimeUnit.MILLISECONDS)).isTrue(); + Assertions.assertThat(read3.get()).isEqualTo(-1); + Assertions.assertThat(arr3).isEqualTo("\0".getBytes(StandardCharsets.UTF_8)); + + } + } + + @Test + void available0AfterEnd() throws IOException { + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + ws.end(); + Assertions.assertThat(ws.available()).isEqualTo(0); + Assertions.assertThat(ws.read(new byte[8])).isEqualTo(-1); + + } + } + + @Test + void available0BeforeEnd() throws IOException { + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + Assertions.assertThat(ws.available()).isEqualTo(0); + ws.end(); + Assertions.assertThat(ws.read(new byte[8])).isEqualTo(-1); + } + } + + @Test + void readEmpty() throws IOException { + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + ws.end(); + Assertions.assertThat(ws.read(new byte[8])).isEqualTo(-1); + } + } + + @Test + void singleBuffer() throws IOException { + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT = "abcd"; + final Buffer b = Buffer.buffer(INPUT); + ws.write(b); + ws.end(); + byte[] arr = new byte[8]; + Assertions.assertThat(ws.read(arr)).isEqualTo(4); + Assertions.assertThat(arr).isEqualTo((INPUT + "\0\0\0\0").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo((INPUT + "\0\0\0\0").getBytes(StandardCharsets.UTF_8)); + } + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT = "abcd"; + final Buffer b = Buffer.buffer(INPUT); + ws.write(b); + ws.end(); + byte[] arr = new byte[4]; + Assertions.assertThat(ws.read(arr)).isEqualTo(4); + Assertions.assertThat(arr).isEqualTo(INPUT.getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo(INPUT.getBytes(StandardCharsets.UTF_8)); + } + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT = "abcd"; + final Buffer b = Buffer.buffer(INPUT); + ws.write(b); + ws.end(); + byte[] arr = new byte[2]; + Assertions.assertThat(ws.read(arr)).isEqualTo(2); + Assertions.assertThat(arr).isEqualTo("ab".getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(2); + Assertions.assertThat(arr).isEqualTo("cd".getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo("cd".getBytes(StandardCharsets.UTF_8)); + } + + } + + @Test + void twoBuffers() throws IOException { + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT1 = "abcd"; + final String INPUT2 = "efgh"; + ws.write(Buffer.buffer(INPUT1)); + ws.write(Buffer.buffer(INPUT2)); + ws.end(); + byte[] arr = new byte[8]; + Assertions.assertThat(ws.read(arr)).isEqualTo(8); + Assertions.assertThat(arr).isEqualTo((INPUT1 + INPUT2).getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo((INPUT1 + INPUT2).getBytes(StandardCharsets.UTF_8)); + } + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT1 = "abcd"; + final String INPUT2 = "efgh"; + ws.write(Buffer.buffer(INPUT1)); + ws.write(Buffer.buffer(INPUT2)); + ws.end(); + byte[] arr = new byte[10]; + Assertions.assertThat(ws.read(arr)).isEqualTo(8); + Assertions.assertThat(arr).isEqualTo((INPUT1 + INPUT2 + "\0\0").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo((INPUT1 + INPUT2 + "\0\0").getBytes(StandardCharsets.UTF_8)); + } + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT1 = "abcd"; + final String INPUT2 = "efgh"; + ws.write(Buffer.buffer(INPUT1)); + ws.write(Buffer.buffer(INPUT2)); + ws.end(); + byte[] arr = new byte[6]; + Assertions.assertThat(ws.read(arr)).isEqualTo(6); + Assertions.assertThat(arr).isEqualTo(("abcdef").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(2); + Assertions.assertThat(arr).isEqualTo(("ghcdef").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo(("ghcdef").getBytes(StandardCharsets.UTF_8)); + } + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT1 = "abcd"; + final String INPUT2 = "efgh"; + ws.write(Buffer.buffer(INPUT1)); + ws.write(Buffer.buffer(INPUT2)); + ws.end(); + byte[] arr = new byte[2]; + Assertions.assertThat(ws.read(arr)).isEqualTo(2); + Assertions.assertThat(arr).isEqualTo(("ab").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(2); + Assertions.assertThat(arr).isEqualTo(("cd").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(2); + Assertions.assertThat(arr).isEqualTo(("ef").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(2); + Assertions.assertThat(arr).isEqualTo(("gh").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo(("gh").getBytes(StandardCharsets.UTF_8)); + } + } + + @Test + void threeBuffers() throws IOException { + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT1 = "abcd"; + final String INPUT2 = "efgh"; + final String INPUT3 = "ijkl"; + ws.write(Buffer.buffer(INPUT1)); + ws.write(Buffer.buffer(INPUT2)); + ws.write(Buffer.buffer(INPUT3)); + ws.end(); + byte[] arr = new byte[12]; + Assertions.assertThat(ws.read(arr)).isEqualTo(12); + Assertions.assertThat(arr).isEqualTo((INPUT1 + INPUT2 + INPUT3).getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo((INPUT1 + INPUT2 + INPUT3).getBytes(StandardCharsets.UTF_8)); + } + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT1 = "abcd"; + final String INPUT2 = "efgh"; + final String INPUT3 = "ijkl"; + ws.write(Buffer.buffer(INPUT1)); + ws.write(Buffer.buffer(INPUT2)); + ws.write(Buffer.buffer(INPUT3)); + ws.end(); + byte[] arr = new byte[14]; + Assertions.assertThat(ws.read(arr)).isEqualTo(12); + Assertions.assertThat(arr).isEqualTo((INPUT1 + INPUT2 + INPUT3 + "\0\0").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo((INPUT1 + INPUT2 + INPUT3 + "\0\0").getBytes(StandardCharsets.UTF_8)); + } + + try (InputStreamWriteStream ws = new InputStreamWriteStream(2)) { + final String INPUT1 = "abcd"; + final String INPUT2 = "efgh"; + final String INPUT3 = "ijkl"; + ws.write(Buffer.buffer(INPUT1)); + ws.write(Buffer.buffer(INPUT2)); + ws.write(Buffer.buffer(INPUT3)); + ws.end(); + byte[] arr = new byte[10]; + Assertions.assertThat(ws.read(arr)).isEqualTo(10); + Assertions.assertThat(arr).isEqualTo(("abcdefghij").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(2); + Assertions.assertThat(arr).isEqualTo(("klcdefghij").getBytes(StandardCharsets.UTF_8)); + Assertions.assertThat(ws.read(arr)).isEqualTo(-1); + Assertions.assertThat(arr).isEqualTo(("klcdefghij").getBytes(StandardCharsets.UTF_8)); + } + + } +} diff --git a/integration-tests/client-server/pom.xml b/integration-tests/client-server/pom.xml index b83b00153..df8a5be62 100644 --- a/integration-tests/client-server/pom.xml +++ b/integration-tests/client-server/pom.xml @@ -22,6 +22,10 @@ io.quarkus quarkus-resteasy + + io.quarkus + quarkus-resteasy-mutiny + io.quarkus quarkus-elytron-security-properties-file diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/LargeSlowRest.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/LargeSlowRest.java new file mode 100644 index 000000000..7bcc1de7d --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/LargeSlowRest.java @@ -0,0 +1,35 @@ +package io.quarkiverse.cxf.it.large.slow; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; + +import io.quarkiverse.cxf.annotation.CXFClient; +import io.quarkiverse.cxf.it.large.slow.generated.LargeSlowService; +import io.smallrye.mutiny.Uni; + +@Path("/LargeSlowRest") +public class LargeSlowRest { + + @CXFClient("largeSlow") + LargeSlowService largeSlow; + + @Path("/largeHelloAsync") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Uni largeHelloAsync(@QueryParam("sizeBytes") int sizeBytes, @QueryParam("delayMs") int delayMs) { + return Uni.createFrom() + .future(largeSlow.largeSlowAsync(sizeBytes, delayMs)) + .map(addResponse -> addResponse.getReturn().getPayload()); + } + + @Path("/largeHelloSync") + @GET + @Produces(MediaType.TEXT_PLAIN) + public String largeHelloSync(@QueryParam("sizeBytes") int sizeBytes, @QueryParam("delayMs") int delayMs) { + return largeSlow.largeSlow(sizeBytes, delayMs).getPayload(); + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/LargeSlowServiceImpl.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/LargeSlowServiceImpl.java new file mode 100644 index 000000000..b298350fa --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/LargeSlowServiceImpl.java @@ -0,0 +1,37 @@ +package io.quarkiverse.cxf.it.large.slow; + +import java.util.concurrent.Future; + +import jakarta.jws.WebService; +import jakarta.xml.ws.AsyncHandler; +import jakarta.xml.ws.Response; + +import io.quarkiverse.cxf.annotation.CXFEndpoint; +import io.quarkiverse.cxf.it.large.slow.generated.LargeSlowOutput; +import io.quarkiverse.cxf.it.large.slow.generated.LargeSlowResponse; +import io.quarkiverse.cxf.it.large.slow.generated.LargeSlowService; + +@WebService(serviceName = "LargeSlowService", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test") +@CXFEndpoint("/largeSlow") +public class LargeSlowServiceImpl implements LargeSlowService { + + @Override + public LargeSlowOutput largeSlow(int sizeBytes, int delayMs) { + final StringBuilder sb = new StringBuilder(); + while (sb.length() < sizeBytes) { + sb.append("0123456789"); + } + sb.setLength(sizeBytes); + return new LargeSlowOutput(delayMs, sb.toString()); + } + + @Override + public Response largeSlowAsync(int arg0, int arg1) { + throw new UnsupportedOperationException(); + } + + @Override + public Future largeSlowAsync(int arg0, int arg1, AsyncHandler asyncHandler) { + throw new UnsupportedOperationException(); + } +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlow.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlow.java new file mode 100644 index 000000000..3c8c7a104 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlow.java @@ -0,0 +1,71 @@ + +package io.quarkiverse.cxf.it.large.slow.generated; + +import jakarta.xml.bind.annotation.XmlAccessType; +import jakarta.xml.bind.annotation.XmlAccessorType; +import jakarta.xml.bind.annotation.XmlType; + + +/** + *

Java class for largeSlow complex type

. + * + *

The following schema fragment specifies the expected content contained within this class.

+ * + *
{@code
+ * 
+ *   
+ *     
+ *       
+ *         
+ *         
+ *       
+ *     
+ *   
+ * 
+ * }
+ * + * + */ +@XmlAccessorType(XmlAccessType.FIELD) +@XmlType(name = "largeSlow", propOrder = { + "arg0", + "arg1" +}) +public class LargeSlow { + + protected int arg0; + protected int arg1; + + /** + * Gets the value of the arg0 property. + * + */ + public int getArg0() { + return arg0; + } + + /** + * Sets the value of the arg0 property. + * + */ + public void setArg0(int value) { + this.arg0 = value; + } + + /** + * Gets the value of the arg1 property. + * + */ + public int getArg1() { + return arg1; + } + + /** + * Sets the value of the arg1 property. + * + */ + public void setArg1(int value) { + this.arg1 = value; + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowOutput.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowOutput.java new file mode 100644 index 000000000..35f45f651 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowOutput.java @@ -0,0 +1,98 @@ + +package io.quarkiverse.cxf.it.large.slow.generated; + +import io.quarkus.logging.Log; +import jakarta.xml.bind.annotation.XmlAccessType; +import jakarta.xml.bind.annotation.XmlAccessorType; +import jakarta.xml.bind.annotation.XmlElement; +import jakarta.xml.bind.annotation.XmlType; + + +/** + *

Java class for largeSlowOutput complex type

. + * + *

The following schema fragment specifies the expected content contained within this class.

+ * + *
{@code
+ * 
+ *   
+ *     
+ *       
+ *         
+ *         
+ *       
+ *     
+ *   
+ * 
+ * }
+ * + * + */ +@XmlAccessorType(XmlAccessType.PROPERTY) +@XmlType(name = "largeSlowOutput", propOrder = { + "delayMs", + "payload" +}) +public class LargeSlowOutput { + + protected int delayMs; + protected String payload; + + public LargeSlowOutput() { + + } + public LargeSlowOutput(int delayMs, String payload) { + this.delayMs = delayMs; + this.payload = payload; + } + + /** + * Gets the value of the delayMs property. + * + */ + public int getDelayMs() { + return delayMs; + } + + /** + * Sets the value of the delayMs property. + * + */ + @XmlElement(name = "delayMs") + public void setDelayMs(int delayMs) { + Log.infof("Sleeping for %d ms", delayMs); + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + this.delayMs = delayMs; + } + + /** + * Gets the value of the payload property. + * + * @return + * possible object is + * {@link String } + * + */ + public String getPayload() { + return payload; + } + + /** + * Sets the value of the payload property. + * + * @param value + * allowed object is + * {@link String } + * + */ + @XmlElement(name = "payload") + public void setPayload(String value) { + this.payload = value; + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowResponse.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowResponse.java new file mode 100644 index 000000000..260a41301 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowResponse.java @@ -0,0 +1,62 @@ + +package io.quarkiverse.cxf.it.large.slow.generated; + +import jakarta.xml.bind.annotation.XmlAccessType; +import jakarta.xml.bind.annotation.XmlAccessorType; +import jakarta.xml.bind.annotation.XmlElement; +import jakarta.xml.bind.annotation.XmlType; + + +/** + *

Java class for largeSlowResponse complex type

. + * + *

The following schema fragment specifies the expected content contained within this class.

+ * + *
{@code
+ * 
+ *   
+ *     
+ *       
+ *         
+ *       
+ *     
+ *   
+ * 
+ * }
+ * + * + */ +@XmlAccessorType(XmlAccessType.FIELD) +@XmlType(name = "largeSlowResponse", propOrder = { + "_return" +}) +public class LargeSlowResponse { + + @XmlElement(name = "return") + protected LargeSlowOutput _return; + + /** + * Gets the value of the return property. + * + * @return + * possible object is + * {@link LargeSlowOutput } + * + */ + public LargeSlowOutput getReturn() { + return _return; + } + + /** + * Sets the value of the return property. + * + * @param value + * allowed object is + * {@link LargeSlowOutput } + * + */ + public void setReturn(LargeSlowOutput value) { + this._return = value; + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowService.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowService.java new file mode 100644 index 000000000..5e379fa2c --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowService.java @@ -0,0 +1,59 @@ +package io.quarkiverse.cxf.it.large.slow.generated; + +import jakarta.jws.WebMethod; +import jakarta.jws.WebParam; +import jakarta.jws.WebResult; +import jakarta.jws.WebService; +import jakarta.xml.bind.annotation.XmlSeeAlso; +import jakarta.xml.ws.AsyncHandler; +import jakarta.xml.ws.RequestWrapper; +import jakarta.xml.ws.Response; +import jakarta.xml.ws.ResponseWrapper; +import java.util.concurrent.Future; + +/** + * This class was generated by Apache CXF 4.0.5 + * 2024-11-21T16:40:52.615+01:00 + * Generated source version: 4.0.5 + * + */ +@WebService(targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", name = "LargeSlowService") +@XmlSeeAlso({ObjectFactory.class}) +public interface LargeSlowService { + + @WebMethod(operationName = "largeSlow") + @RequestWrapper(localName = "largeSlow", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.large.slow.generated.LargeSlow") + @ResponseWrapper(localName = "largeSlowResponse", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.large.slow.generated.LargeSlowResponse") + public Response largeSlowAsync( + + @WebParam(name = "arg0", targetNamespace = "") + int arg0, + @WebParam(name = "arg1", targetNamespace = "") + int arg1 + ); + + @WebMethod(operationName = "largeSlow") + @ResponseWrapper(localName = "largeSlowResponse", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.large.slow.generated.LargeSlowResponse") + @RequestWrapper(localName = "largeSlow", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.large.slow.generated.LargeSlow") + public Future largeSlowAsync( + + @WebParam(name = "arg0", targetNamespace = "") + int arg0, + @WebParam(name = "arg1", targetNamespace = "") + int arg1, + @WebParam(name = "asyncHandler", targetNamespace = "") + AsyncHandler asyncHandler + ); + + @WebMethod + @RequestWrapper(localName = "largeSlow", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.large.slow.generated.LargeSlow") + @ResponseWrapper(localName = "largeSlowResponse", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.large.slow.generated.LargeSlowResponse") + @WebResult(name = "return", targetNamespace = "") + public io.quarkiverse.cxf.it.large.slow.generated.LargeSlowOutput largeSlow( + + @WebParam(name = "arg0", targetNamespace = "") + int arg0, + @WebParam(name = "arg1", targetNamespace = "") + int arg1 + ); +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowService_Service.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowService_Service.java new file mode 100644 index 000000000..96a421086 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/LargeSlowService_Service.java @@ -0,0 +1,88 @@ +package io.quarkiverse.cxf.it.large.slow.generated; + +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import javax.xml.namespace.QName; +import jakarta.xml.ws.WebEndpoint; +import jakarta.xml.ws.WebServiceClient; +import jakarta.xml.ws.WebServiceFeature; +import jakarta.xml.ws.Service; + +/** + * This class was generated by Apache CXF 4.0.5 + * 2024-11-21T16:40:52.628+01:00 + * Generated source version: 4.0.5 + * + */ +@WebServiceClient(name = "LargeSlowService", + wsdlLocation = "file:/home/ppalaga/orgs/cxf/qcxf/integration-tests/client-server/src/main/resources/wsdl/LargeSlow.wsdl", + targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test") +public class LargeSlowService_Service extends Service { + + public static final URL WSDL_LOCATION; + + public static final QName SERVICE = new QName("https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", "LargeSlowService"); + public static final QName LargeSlowServiceImplPort = new QName("https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", "LargeSlowServiceImplPort"); + static { + URL url = null; + try { + url = URI.create("file:/home/ppalaga/orgs/cxf/qcxf/integration-tests/client-server/src/main/resources/wsdl/LargeSlow.wsdl").toURL(); + } catch (MalformedURLException e) { + java.util.logging.Logger.getLogger(LargeSlowService_Service.class.getName()) + .log(java.util.logging.Level.INFO, + "Can not initialize the default wsdl from {0}", "file:/home/ppalaga/orgs/cxf/qcxf/integration-tests/client-server/src/main/resources/wsdl/LargeSlow.wsdl"); + } + WSDL_LOCATION = url; + } + + public LargeSlowService_Service(URL wsdlLocation) { + super(wsdlLocation, SERVICE); + } + + public LargeSlowService_Service(URL wsdlLocation, QName serviceName) { + super(wsdlLocation, serviceName); + } + + public LargeSlowService_Service() { + super(WSDL_LOCATION, SERVICE); + } + + public LargeSlowService_Service(WebServiceFeature ... features) { + super(WSDL_LOCATION, SERVICE, features); + } + + public LargeSlowService_Service(URL wsdlLocation, WebServiceFeature ... features) { + super(wsdlLocation, SERVICE, features); + } + + public LargeSlowService_Service(URL wsdlLocation, QName serviceName, WebServiceFeature ... features) { + super(wsdlLocation, serviceName, features); + } + + + + + /** + * + * @return + * returns LargeSlowService + */ + @WebEndpoint(name = "LargeSlowServiceImplPort") + public LargeSlowService getLargeSlowServiceImplPort() { + return super.getPort(LargeSlowServiceImplPort, LargeSlowService.class); + } + + /** + * + * @param features + * A list of {@link jakarta.xml.ws.WebServiceFeature} to configure on the proxy. Supported features not in the features parameter will have their default values. + * @return + * returns LargeSlowService + */ + @WebEndpoint(name = "LargeSlowServiceImplPort") + public LargeSlowService getLargeSlowServiceImplPort(WebServiceFeature... features) { + return super.getPort(LargeSlowServiceImplPort, LargeSlowService.class, features); + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/ObjectFactory.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/ObjectFactory.java new file mode 100644 index 000000000..5c3bc6cc2 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/ObjectFactory.java @@ -0,0 +1,93 @@ + +package io.quarkiverse.cxf.it.large.slow.generated; + +import javax.xml.namespace.QName; +import jakarta.xml.bind.JAXBElement; +import jakarta.xml.bind.annotation.XmlElementDecl; +import jakarta.xml.bind.annotation.XmlRegistry; + + +/** + * This object contains factory methods for each + * Java content interface and Java element interface + * generated in the io.quarkiverse.cxf.it.large.slow.generated package. + *

An ObjectFactory allows you to programmatically + * construct new instances of the Java representation + * for XML content. The Java representation of XML + * content can consist of schema derived interfaces + * and classes representing the binding of schema + * type definitions, element declarations and model + * groups. Factory methods for each of these are + * provided in this class. + * + */ +@XmlRegistry +public class ObjectFactory { + + private static final QName _LargeSlow_QNAME = new QName("https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", "largeSlow"); + private static final QName _LargeSlowResponse_QNAME = new QName("https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", "largeSlowResponse"); + + /** + * Create a new ObjectFactory that can be used to create new instances of schema derived classes for package: io.quarkiverse.cxf.it.large.slow.generated + * + */ + public ObjectFactory() { + } + + /** + * Create an instance of {@link LargeSlow } + * + * @return + * the new instance of {@link LargeSlow } + */ + public LargeSlow createLargeSlow() { + return new LargeSlow(); + } + + /** + * Create an instance of {@link LargeSlowResponse } + * + * @return + * the new instance of {@link LargeSlowResponse } + */ + public LargeSlowResponse createLargeSlowResponse() { + return new LargeSlowResponse(); + } + + /** + * Create an instance of {@link LargeSlowOutput } + * + * @return + * the new instance of {@link LargeSlowOutput } + */ + public LargeSlowOutput createLargeSlowOutput() { + return new LargeSlowOutput(); + } + + /** + * Create an instance of {@link JAXBElement }{@code <}{@link LargeSlow }{@code >} + * + * @param value + * Java instance representing xml element's value. + * @return + * the new instance of {@link JAXBElement }{@code <}{@link LargeSlow }{@code >} + */ + @XmlElementDecl(namespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", name = "largeSlow") + public JAXBElement createLargeSlow(LargeSlow value) { + return new JAXBElement<>(_LargeSlow_QNAME, LargeSlow.class, null, value); + } + + /** + * Create an instance of {@link JAXBElement }{@code <}{@link LargeSlowResponse }{@code >} + * + * @param value + * Java instance representing xml element's value. + * @return + * the new instance of {@link JAXBElement }{@code <}{@link LargeSlowResponse }{@code >} + */ + @XmlElementDecl(namespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", name = "largeSlowResponse") + public JAXBElement createLargeSlowResponse(LargeSlowResponse value) { + return new JAXBElement<>(_LargeSlowResponse_QNAME, LargeSlowResponse.class, null, value); + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/package-info.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/package-info.java new file mode 100644 index 000000000..ffd15e565 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/large/slow/generated/package-info.java @@ -0,0 +1,2 @@ +@jakarta.xml.bind.annotation.XmlSchema(namespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test") +package io.quarkiverse.cxf.it.large.slow.generated; diff --git a/integration-tests/client-server/src/main/resources/application.properties b/integration-tests/client-server/src/main/resources/application.properties index 99245fdf9..157860932 100644 --- a/integration-tests/client-server/src/main/resources/application.properties +++ b/integration-tests/client-server/src/main/resources/application.properties @@ -21,9 +21,9 @@ quarkus.cxf.endpoint."/addressing-anonymous".implementor = io.quarkiverse.cxf.it quarkus.cxf.endpoint."/addressing-decoupled".implementor = io.quarkiverse.cxf.it.ws.addressing.server.decoupled.WsAddressingImpl # XML Schema validation -quarkus.cxf.codegen.wsdl2java.includes = wsdl/*.wsdl -quarkus.cxf.codegen.wsdl2java.package-names = io.quarkiverse.cxf.it.server.xml.schema.validation.model -quarkus.cxf.codegen.wsdl2java.wsdl-location = classpath:wsdl/calculator.wsdl +quarkus.cxf.codegen.wsdl2java.schema-validation.includes = wsdl/calculator.wsdl +quarkus.cxf.codegen.wsdl2java.schema-validation.package-names = io.quarkiverse.cxf.it.server.xml.schema.validation.model +quarkus.cxf.codegen.wsdl2java.schema-validation.wsdl-location = classpath:wsdl/calculator.wsdl # Service endpoints quarkus.cxf.endpoint."/annotation-schema-validated-calculator".implementor = io.quarkiverse.cxf.it.server.xml.schema.validation.AnnotationSchemaValidatedCalculatorServiceImpl @@ -81,4 +81,13 @@ quarkus.cxf.client.basicAuthSecureWsdl.secure-wsdl-access = true quarkus.cxf.client.helloMock.client-endpoint-url = http://localhost:${quarkus.http.test-port}/soap/helloMock quarkus.cxf.client.helloMock.service-interface = io.quarkiverse.cxf.it.HelloService +# Large slow client +quarkus.cxf.client.largeSlow.client-endpoint-url = http://localhost:${quarkus.http.test-port}/soap/largeSlow +quarkus.cxf.client.largeSlow.service-interface = io.quarkiverse.cxf.it.large.slow.generated.LargeSlowService +# Uncomment, regenerate and copy the generated classes to /src/main/java, if needed +#quarkus.cxf.codegen.wsdl2java.large-slow.includes = wsdl/LargeSlow.wsdl +#quarkus.cxf.codegen.wsdl2java.large-slow.package-names = io.quarkiverse.cxf.it.large.slow.generated +#quarkus.cxf.codegen.wsdl2java.large-slow.additional-params = -b,src/main/resources/wsdl/LargeSlow-async-binding.xml + + quarkus.default-locale = en_US \ No newline at end of file diff --git a/integration-tests/client-server/src/main/resources/wsdl/LargeSlow-async-binding.xml b/integration-tests/client-server/src/main/resources/wsdl/LargeSlow-async-binding.xml new file mode 100644 index 000000000..978342f2b --- /dev/null +++ b/integration-tests/client-server/src/main/resources/wsdl/LargeSlow-async-binding.xml @@ -0,0 +1,10 @@ + + + + true + + diff --git a/integration-tests/client-server/src/main/resources/wsdl/LargeSlow.wsdl b/integration-tests/client-server/src/main/resources/wsdl/LargeSlow.wsdl new file mode 100644 index 000000000..b6a30e502 --- /dev/null +++ b/integration-tests/client-server/src/main/resources/wsdl/LargeSlow.wsdl @@ -0,0 +1,64 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/large/slow/LargeSlowIT.java b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/large/slow/LargeSlowIT.java new file mode 100644 index 000000000..c2d63337a --- /dev/null +++ b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/large/slow/LargeSlowIT.java @@ -0,0 +1,8 @@ +package io.quarkiverse.cxf.it.large.slow; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +class LargeSlowIT extends LargeSlowTest { + +} diff --git a/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/large/slow/LargeSlowTest.java b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/large/slow/LargeSlowTest.java new file mode 100644 index 000000000..5167eb651 --- /dev/null +++ b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/large/slow/LargeSlowTest.java @@ -0,0 +1,130 @@ +package io.quarkiverse.cxf.it.large.slow; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.assertj.core.api.Assertions; +import org.eclipse.microprofile.config.ConfigProvider; +import org.junit.jupiter.api.Test; + +import io.quarkus.logging.Log; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; + +@QuarkusTest +class LargeSlowTest { + private static final int KiB = 1024; + private static final int DELAY_MS = 4000; + private static final int PAYLOAD_SIZE = 9 * KiB; + private static final int WORKERS_COUNT = 5; + + @Test + void largeHelloAsync() throws InterruptedException, ExecutionException { + assertEndpoint("largeHelloAsync"); + } + + @Test + void largeHelloSync() throws InterruptedException, ExecutionException { + assertEndpoint("largeHelloSync"); + } + + private void assertEndpoint(String endpoint) throws InterruptedException, ExecutionException { + final ExecutorService executor = Executors.newFixedThreadPool(WORKERS_COUNT); + final long startTime = System.currentTimeMillis(); + + List> futures = new ArrayList<>(WORKERS_COUNT); + try { + + for (int i = 0; i < WORKERS_COUNT; i++) { + final Future f = executor.submit(() -> { + Log.infof("Sending a request with delay %d ms", DELAY_MS); + String result = RestAssured.given() + .queryParam("sizeBytes", PAYLOAD_SIZE) + .queryParam("delayMs", DELAY_MS) + .get("/LargeSlowRest/" + endpoint) + .then() + .statusCode(200) + .extract().body().asString(); + Log.infof("Received payload of size %d", result.length()); + return result; + }); + futures.add(f); + } + + // Ensure all tasks are completed + for (Future future : futures) { + final String payload = future.get(); + Assertions.assertThat(payload).hasSize(PAYLOAD_SIZE); + } + } finally { + executor.shutdown(); + } + /* + * Asserting that the requests pass in DELAY_MS plus some snallish constant time proves that their execution + * does not block each other. Otherwise, it would take nearly WORKERS_COUNT times DELAY_MS. + */ + long diff = System.currentTimeMillis() - startTime; + Log.infof("%d slow (%d ms) requests passed in %d ms in parallel", WORKERS_COUNT, DELAY_MS, diff); + Assertions.assertThat(diff).isLessThanOrEqualTo(DELAY_MS + 2000); + + /* Make sure the Thread.sleep() was not removed from LargeSlowOutput.setDelayMs(int) */ + Assertions.assertThat(diff).isGreaterThan(DELAY_MS); + } + + /** + * Make sure that our static copy is the same as the WSDL served by the container + * + * @throws IOException + */ + @Test + void wsdlUpToDate() throws IOException { + final int port = ConfigProvider.getConfig() + .getValue("quarkus.http.test-port", Integer.class); + final String wsdlUrl = "http://localhost:" + port + "/soap/largeSlow?wsdl"; + Path staticCopyPath = Paths.get("src/main/resources/wsdl/LargeSlow.wsdl"); + + final String expected = RestAssured.given() + .get(wsdlUrl) + .then() + .statusCode(200) + .extract().body().asString(); + + if (!Files.isRegularFile(staticCopyPath)) { + /* + * This test can be run from the test jar on Quarkus Platform + * In that case target/classes does not exist an we have to copy + * what's needed manually + */ + staticCopyPath = Paths.get("target/classes/wsdl/LargeSlow.wsdl"); + Files.createDirectories(staticCopyPath.getParent()); + try (InputStream in = LargeSlowTest.class.getClassLoader() + .getResourceAsStream("wsdl/LargeSlow.wsdl")) { + Files.copy(in, staticCopyPath, StandardCopyOption.REPLACE_EXISTING); + } + } + /* The changing Docker IP address in the WSDL should not matter */ + final String sanitizerRegex = ""; + final String staticCopyContent = Files + .readString(staticCopyPath, StandardCharsets.UTF_8) + .replaceAll(sanitizerRegex, ""); + + if (!expected.replaceAll(sanitizerRegex, "").equals(staticCopyContent)) { + Files.writeString(staticCopyPath, expected, StandardCharsets.UTF_8); + Assertions.fail("The static WSDL copy in " + staticCopyPath + + " went out of sync with the WSDL served by the container. The content was updated by the test, you just need to review and commit the changes."); + } + + } + +}