diff --git a/docs/modules/ROOT/examples/hc5/Hc5Resource.java b/docs/modules/ROOT/examples/hc5/Hc5Resource.java index 41251c933..d6434d1f3 100644 --- a/docs/modules/ROOT/examples/hc5/Hc5Resource.java +++ b/docs/modules/ROOT/examples/hc5/Hc5Resource.java @@ -43,6 +43,31 @@ public Uni addAsync(@QueryParam("a") int a, @QueryParam("b") int b) { public int addSync(@QueryParam("a") int a, @QueryParam("b") int b) { return myCalculator.add(a, b); } + + @Inject + @CXFClient("observableCalculator") + CalculatorService observableCalculator; + + @SuppressWarnings("unchecked") + @Path("/add-async-observable") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Uni addAsyncObservable(@QueryParam("a") int a, @QueryParam("b") int b) { + return Uni.createFrom() + .future( + (Future) observableCalculator + .addAsync(a, b, res -> { + })) + .map(addResponse -> addResponse.getReturn()); + } + + @Path("/add-sync-observable") + @GET + @Produces(MediaType.TEXT_PLAIN) + public int addSyncObservable(@QueryParam("a") int a, @QueryParam("b") int b) { + return observableCalculator.add(a, b); + } + // tag::quarkus-cxf-rt-transports-http-hc5.usage.mutiny[] } // end::quarkus-cxf-rt-transports-http-hc5.usage.mutiny[] diff --git a/docs/modules/ROOT/examples/hc5/application.properties b/docs/modules/ROOT/examples/hc5/application.properties index a90bef02d..73ba2de17 100644 --- a/docs/modules/ROOT/examples/hc5/application.properties +++ b/docs/modules/ROOT/examples/hc5/application.properties @@ -2,6 +2,15 @@ quarkus.cxf.client.myCalculator.wsdl = ${cxf.it.calculator.baseUri}/calculator-w quarkus.cxf.client.myCalculator.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService quarkus.cxf.client.myCalculator.service-interface = org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService +quarkus.cxf.client.observableCalculator.wsdl = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService?wsdl +quarkus.cxf.client.observableCalculator.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService +quarkus.cxf.client.observableCalculator.service-interface = org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService +quarkus.cxf.client.observableCalculator.features=io.quarkiverse.cxf.metrics.QuarkusCxfMetricsFeature + +quarkus.micrometer.export.json.enabled = true +quarkus.micrometer.export.json.path = metrics/json +quarkus.micrometer.export.prometheus.path = metrics/prometheus + # tag::quarkus-cxf-rt-transports-http-hc5.usage.wsdl2java[] quarkus.cxf.codegen.wsdl2java.includes = wsdl/*.wsdl quarkus.cxf.codegen.wsdl2java.additional-params = -b,src/main/resources/wsdl/async-binding.xml diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfBusBuildItem.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/BuildTimeBusBuildItem.java similarity index 73% rename from extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfBusBuildItem.java rename to extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/BuildTimeBusBuildItem.java index 286ac3b7e..0e193ad1d 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfBusBuildItem.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/BuildTimeBusBuildItem.java @@ -7,14 +7,14 @@ /** * Holds the build time {@link Bus} instance. */ -public final class CxfBusBuildItem extends SimpleBuildItem { +public final class BuildTimeBusBuildItem extends SimpleBuildItem { private final Bus bus; public Bus getBus() { return bus; } - public CxfBusBuildItem(Bus bus) { + public BuildTimeBusBuildItem(Bus bus) { super(); this.bus = bus; } diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/QuarkusCxfProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/QuarkusCxfProcessor.java index c5fc585f0..d7a529678 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/QuarkusCxfProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/QuarkusCxfProcessor.java @@ -165,10 +165,10 @@ void markBeansAsUnremovable(BuildProducer unremovables } @BuildStep - CxfBusBuildItem bus() { + BuildTimeBusBuildItem bus() { final Bus bus = BusFactory.getDefaultBus(); // setup class capturing - return new CxfBusBuildItem(bus); + return new BuildTimeBusBuildItem(bus); } @BuildStep @@ -188,7 +188,7 @@ void generateRuntimeBusServiceFile(BuildProducer gen @BuildStep void generateClasses( - CxfBusBuildItem busBuildItem, + BuildTimeBusBuildItem busBuildItem, List clients, List endpointImplementations, BuildProducer generatedBeans, @@ -591,6 +591,16 @@ NativeImageResourceBuildItem nativeImageResourceBuildItem() { "schemas/configuration/security.xsd"); } + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + void setupRuntimeBusCustomizers( + CXFRecorder recorder, + List customizers) { + for (RuntimeBusCustomizerBuildItem customizer : customizers) { + recorder.addRuntimeBusCustomizer(customizer.getCustomizer()); + } + } + @BuildStep @Record(ExecutionTime.RUNTIME_INIT) void shutDown( diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/RuntimeBusBuildItem.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/RuntimeBusBuildItem.java new file mode 100644 index 000000000..b9d5723da --- /dev/null +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/RuntimeBusBuildItem.java @@ -0,0 +1,23 @@ +package io.quarkiverse.cxf.deployment; + +import org.apache.cxf.Bus; + +import io.quarkus.builder.item.SimpleBuildItem; +import io.quarkus.runtime.RuntimeValue; + +/** + * Holds the runtime {@link Bus} reference. + */ +public final class RuntimeBusBuildItem extends SimpleBuildItem { + private final RuntimeValue bus; + + public RuntimeValue getBus() { + return bus; + } + + public RuntimeBusBuildItem(RuntimeValue bus) { + super(); + this.bus = bus; + } + +} \ No newline at end of file diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/RuntimeBusCustomizerBuildItem.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/RuntimeBusCustomizerBuildItem.java new file mode 100644 index 000000000..b25563c7b --- /dev/null +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/RuntimeBusCustomizerBuildItem.java @@ -0,0 +1,25 @@ +package io.quarkiverse.cxf.deployment; + +import java.util.function.Consumer; + +import org.apache.cxf.Bus; + +import io.quarkus.builder.item.MultiBuildItem; +import io.quarkus.runtime.RuntimeValue; + +/** + * Holds a {@link Consumer} that will be used to customize the runtime {@link Bus} right after its creation. + */ +public final class RuntimeBusCustomizerBuildItem extends MultiBuildItem { + private final RuntimeValue> customizer; + + public RuntimeBusCustomizerBuildItem(RuntimeValue> customizer) { + super(); + this.customizer = customizer; + } + + public RuntimeValue> getCustomizer() { + return customizer; + } + +} \ No newline at end of file diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java index 94be99a52..5aad53271 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java @@ -6,7 +6,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; +import org.apache.cxf.Bus; import org.jboss.logging.Logger; import io.quarkiverse.cxf.devconsole.DevCxfServerInfosSupplier; @@ -147,4 +149,8 @@ public Handler initServer(RuntimeValue infos, B public void resetDestinationRegistry(ShutdownContext context) { context.addShutdownTask(VertxDestinationFactory::resetRegistry); } + + public void addRuntimeBusCustomizer(RuntimeValue> customizer) { + QuarkusBusFactory.addBusCustomizer(customizer.getValue()); + } } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/QuarkusBusFactory.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/QuarkusBusFactory.java index 016a15212..e6960c4e0 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/QuarkusBusFactory.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/QuarkusBusFactory.java @@ -1,7 +1,10 @@ package io.quarkiverse.cxf; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; import org.apache.cxf.Bus; import org.apache.cxf.bus.CXFBusFactory; @@ -22,12 +25,18 @@ public class QuarkusBusFactory extends CXFBusFactory { + /** {@link List} of customizers passed via {@code RuntimeBusCustomizerBuildItem} */ + private static final List> customizers = new CopyOnWriteArrayList<>(); + @Override public Bus createBus(Map, Object> extensions, Map properties) { if (extensions == null) { extensions = new HashMap, Object>(); } final Bus bus = super.createBus(extensions, properties); + for (Consumer customizer : customizers) { + customizer.accept(bus); + } bus.setExtension(new WrapperHelperClassLoader(bus), WrapperHelperCreator.class); bus.setExtension(new ExtensionClassLoader(bus), ExtensionClassCreator.class); bus.setExtension(new ExceptionClassLoader(bus), ExceptionClassCreator.class); @@ -38,4 +47,11 @@ public Bus createBus(Map, Object> extensions, Map prope return bus; } + /** + * @param customizer a {@link Consumer} to run right after the creation of the runtime {@link Bus} + */ + static void addBusCustomizer(Consumer customizer) { + customizers.add(customizer); + } + } diff --git a/extensions/transports-http-hc5/deployment/pom.xml b/extensions/transports-http-hc5/deployment/pom.xml index 818257ad1..e8917a6c6 100644 --- a/extensions/transports-http-hc5/deployment/pom.xml +++ b/extensions/transports-http-hc5/deployment/pom.xml @@ -16,6 +16,10 @@ io.quarkus quarkus-core-deployment + + io.quarkiverse.cxf + quarkus-cxf-deployment + io.quarkiverse.cxf quarkus-cxf-rt-transports-http-hc5 diff --git a/extensions/transports-http-hc5/deployment/src/main/java/io/quarkiverse/cxf/transport/http/hc5/deployment/QuarkusCxfTransportsHTTPAsyncProcessor.java b/extensions/transports-http-hc5/deployment/src/main/java/io/quarkiverse/cxf/transport/http/hc5/deployment/QuarkusCxfTransportsHTTPAsyncProcessor.java index 9235b8c27..cfbb28b24 100644 --- a/extensions/transports-http-hc5/deployment/src/main/java/io/quarkiverse/cxf/transport/http/hc5/deployment/QuarkusCxfTransportsHTTPAsyncProcessor.java +++ b/extensions/transports-http-hc5/deployment/src/main/java/io/quarkiverse/cxf/transport/http/hc5/deployment/QuarkusCxfTransportsHTTPAsyncProcessor.java @@ -3,7 +3,12 @@ import java.util.Arrays; import java.util.List; +import io.quarkiverse.cxf.deployment.RuntimeBusCustomizerBuildItem; +import io.quarkiverse.cxf.transport.http.hc5.Hc5Recorder; +import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem; public class QuarkusCxfTransportsHTTPAsyncProcessor { @@ -15,4 +20,12 @@ List runtimeInitializedClasses() { new RuntimeInitializedClassBuildItem("org.apache.hc.client5.http.impl.auth.NTLMEngineImpl")); } + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + void customizers( + Hc5Recorder recorder, + BuildProducer customizers) { + customizers.produce(new RuntimeBusCustomizerBuildItem(recorder.customizeBus())); + } + } diff --git a/extensions/transports-http-hc5/runtime/pom.xml b/extensions/transports-http-hc5/runtime/pom.xml index 1f55e9055..31fcaa0aa 100644 --- a/extensions/transports-http-hc5/runtime/pom.xml +++ b/extensions/transports-http-hc5/runtime/pom.xml @@ -21,6 +21,10 @@ io.quarkus quarkus-arc + + io.quarkiverse.cxf + quarkus-cxf + org.apache.cxf cxf-rt-transports-http-hc5 diff --git a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/Hc5Recorder.java b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/Hc5Recorder.java new file mode 100644 index 000000000..9fb43cd76 --- /dev/null +++ b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/Hc5Recorder.java @@ -0,0 +1,30 @@ +package io.quarkiverse.cxf.transport.http.hc5; + +import java.util.function.Consumer; + +import org.apache.cxf.Bus; +import org.apache.cxf.transport.http.HTTPConduitFactory; +import org.apache.cxf.workqueue.WorkQueueManager; + +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.runtime.annotations.Recorder; + +@Recorder +public class Hc5Recorder { + + /** + * Customize the runtime {@link Bus} by adding a custom work queue for {@code http-conduit} and a custom + * {@link HTTPConduitFactory}. Both are there to enable context propagation for async clients. + * + * @return a new {@link RuntimeValue} holding a {@link Consumer} to customize the runtime {@link Bus} + */ + public RuntimeValue> customizeBus() { + + return new RuntimeValue<>(bus -> { + final WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class); + wqm.addNamedWorkQueue("http-conduit", new QuarkusWorkQueueImpl("http-conduit")); + + bus.setExtension(new QuarkusAsyncHTTPConduitFactory(bus), HTTPConduitFactory.class); + }); + } +} diff --git a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduit.java b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduit.java new file mode 100644 index 000000000..166eaf6ae --- /dev/null +++ b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduit.java @@ -0,0 +1,219 @@ +package io.quarkiverse.cxf.transport.http.hc5; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +import org.apache.cxf.Bus; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory; +import org.apache.cxf.ws.addressing.EndpointReferenceType; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.HandlerFactory; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorStatus; +import org.apache.hc.core5.util.TimeValue; +import org.eclipse.microprofile.context.ThreadContext; + +import io.quarkus.arc.Arc; + +/** + * An {@link AsyncHTTPConduit} with custom {@link #getHttpAsyncClient(TlsStrategy)}. + */ +public class QuarkusAsyncHTTPConduit extends AsyncHTTPConduit { + + public QuarkusAsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t, AsyncHTTPConduitFactory factory) + throws IOException { + super(b, ei, t, factory); + } + + /** + * @param tlsStrategy + * @return a Custom {@link CloseableHttpAsyncClient} whose {@code execute(*)} methods contextualize (see + * {@link ThreadContext}) the passed {@link AsyncResponseConsumer} + * @throws IOException + */ + @Override + public synchronized CloseableHttpAsyncClient getHttpAsyncClient(TlsStrategy tlsStrategy) throws IOException { + return new QuarkusAsyncClient(super.getHttpAsyncClient(tlsStrategy)); + } + + static class QuarkusAsyncClient extends CloseableHttpAsyncClient { + private final CloseableHttpAsyncClient delegate; + + public QuarkusAsyncClient(CloseableHttpAsyncClient delegate) { + super(); + this.delegate = delegate; + } + + @Override + public void close(CloseMode closeMode) { + delegate.close(closeMode); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public int hashCode() { + return delegate.hashCode(); + } + + @Override + public void start() { + delegate.start(); + } + + @Override + public IOReactorStatus getStatus() { + return delegate.getStatus(); + } + + @Override + public void awaitShutdown(TimeValue waitTime) throws InterruptedException { + delegate.awaitShutdown(waitTime); + } + + @Override + public void initiateShutdown() { + delegate.initiateShutdown(); + } + + @Override + public boolean equals(Object obj) { + return delegate.equals(obj); + } + + @Override + public void register(String hostname, String uriPattern, Supplier supplier) { + delegate.register(hostname, uriPattern, supplier); + } + + @Override + public String toString() { + return delegate.toString(); + } + + @Override + protected Future doExecute( + final HttpHost target, + final AsyncRequestProducer requestProducer, + final AsyncResponseConsumer responseConsumer, + final HandlerFactory pushHandlerFactory, + final HttpContext context, + final FutureCallback callback) { + return delegate.execute( + target, + requestProducer, + new ContextualizedResponseConsumer(responseConsumer), + pushHandlerFactory, + context, + callback); + } + } + + /** + * Wraps the delegate in {@link ThreadContext#contextualConsumer(Consumer)} so that context propagation works for + * async clients + * + * @param + */ + static class ContextualizedResponseConsumer implements AsyncResponseConsumer { + private final AsyncResponseConsumer delegate; + private Consumer> contextualConsumer; + + public ContextualizedResponseConsumer(AsyncResponseConsumer delegate) { + super(); + this.delegate = delegate; + final ThreadContext threadContext = Arc.container().select(ThreadContext.class).get(); + /* + * We need to call this threadContext.contextualConsumer() here in the constructor to store the context + * because consumeResponse() is called from another thread where the context is not available anymore + */ + this.contextualConsumer = threadContext.contextualConsumer(args -> { + try { + delegate.consumeResponse( + args.response, + args.entityDetails, + args.context, + args.resultCallback); + } catch (HttpException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void updateCapacity(CapacityChannel capacityChannel) throws IOException { + delegate.updateCapacity(capacityChannel); + } + + @Override + public void consumeResponse(HttpResponse response, EntityDetails entityDetails, HttpContext context, + FutureCallback resultCallback) throws HttpException, IOException { + contextualConsumer.accept(new ConsumeResponseArgs<>(response, entityDetails, context, resultCallback)); + } + + @Override + public void releaseResources() { + delegate.releaseResources(); + } + + @Override + public void consume(ByteBuffer src) throws IOException { + delegate.consume(src); + } + + @Override + public void informationResponse(HttpResponse response, HttpContext context) throws HttpException, IOException { + delegate.informationResponse(response, context); + } + + @Override + public void streamEnd(List trailers) throws HttpException, IOException { + delegate.streamEnd(trailers); + } + + @Override + public void failed(Exception cause) { + delegate.failed(cause); + } + + static class ConsumeResponseArgs { + public ConsumeResponseArgs(HttpResponse response, EntityDetails entityDetails, HttpContext context, + FutureCallback resultCallback) { + super(); + this.response = response; + this.entityDetails = entityDetails; + this.context = context; + this.resultCallback = resultCallback; + } + + private final HttpResponse response; + private final EntityDetails entityDetails; + private final HttpContext context; + private final FutureCallback resultCallback; + } + } + +} diff --git a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduitFactory.java b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduitFactory.java new file mode 100644 index 000000000..3c3af49a4 --- /dev/null +++ b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduitFactory.java @@ -0,0 +1,31 @@ +package io.quarkiverse.cxf.transport.http.hc5; + +import java.io.IOException; +import java.util.Map; + +import org.apache.cxf.Bus; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.transport.http.HTTPConduit; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory; +import org.apache.cxf.ws.addressing.EndpointReferenceType; + +/** + * Returns {@link QuarkusAsyncHTTPConduit}s from {@link #createConduit(Bus, EndpointInfo, EndpointReferenceType)}. + */ +public class QuarkusAsyncHTTPConduitFactory extends AsyncHTTPConduitFactory { + + public QuarkusAsyncHTTPConduitFactory(Bus b) { + super(b); + } + + public QuarkusAsyncHTTPConduitFactory(Map conf) { + super(conf); + } + + @Override + public HTTPConduit createConduit(Bus bus, EndpointInfo localInfo, + EndpointReferenceType target) throws IOException { + return new QuarkusAsyncHTTPConduit(bus, localInfo, target, this); + } + +} diff --git a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusWorkQueueImpl.java b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusWorkQueueImpl.java new file mode 100644 index 000000000..3f578e869 --- /dev/null +++ b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusWorkQueueImpl.java @@ -0,0 +1,74 @@ +package io.quarkiverse.cxf.transport.http.hc5; + +import org.apache.cxf.common.classloader.ClassLoaderUtils; +import org.apache.cxf.common.classloader.ClassLoaderUtils.ClassLoaderHolder; +import org.apache.cxf.workqueue.AutomaticWorkQueue; +import org.eclipse.microprofile.context.ManagedExecutor; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.InstanceHandle; + +/** + * Executes the tasks using {@link ManagedExecutor} for the sake of context propagation. + */ +public class QuarkusWorkQueueImpl implements AutomaticWorkQueue { + + private final String name; + + public QuarkusWorkQueueImpl(String name) { + this.name = name; + } + + @Override + public void execute(final Runnable command) { + //Grab the context classloader of this thread. We'll make sure we use that + //on the thread the runnable actually runs on. + + final ClassLoader loader = Thread.currentThread().getContextClassLoader(); + Runnable r = new Runnable() { + public void run() { + ClassLoaderHolder orig = ClassLoaderUtils.setThreadContextClassloader(loader); + try { + command.run(); + } finally { + if (orig != null) { + orig.reset(); + } + } + } + }; + InstanceHandle managedExecutorInst = Arc.container().instance(ManagedExecutor.class); + if (managedExecutorInst.isAvailable()) { + ManagedExecutor managedExecutor = managedExecutorInst.get(); + managedExecutor.execute(r); + } else { + throw new IllegalStateException(ManagedExecutor.class.getName() + " not available in Arc"); + } + } + + @Override + public void execute(Runnable work, long timeout) { + execute(work); + } + + @Override + public synchronized void schedule(final Runnable work, final long delay) { + throw new UnsupportedOperationException(); + } + + @Override + public String getName() { + return name; + } + + @Override + public void shutdown(boolean processRemainingWorkItems) { + // we do not shot down the managed executor instance + } + + @Override + public boolean isShutdown() { + return false; + } + +} diff --git a/integration-tests/hc5/pom.xml b/integration-tests/hc5/pom.xml index eb0bd06e4..bf7f2fd4a 100644 --- a/integration-tests/hc5/pom.xml +++ b/integration-tests/hc5/pom.xml @@ -21,6 +21,18 @@ io.quarkiverse.cxf quarkus-cxf-rt-transports-http-hc5 + + io.quarkiverse.cxf + quarkus-cxf-rt-features-metrics + + + io.quarkus + quarkus-micrometer-registry-prometheus + + + io.quarkus + quarkus-smallrye-context-propagation + io.quarkus quarkus-resteasy-reactive diff --git a/integration-tests/hc5/src/main/java/io/quarkiverse/cxf/hc5/it/Hc5Resource.java b/integration-tests/hc5/src/main/java/io/quarkiverse/cxf/hc5/it/Hc5Resource.java index 41251c933..d6434d1f3 100644 --- a/integration-tests/hc5/src/main/java/io/quarkiverse/cxf/hc5/it/Hc5Resource.java +++ b/integration-tests/hc5/src/main/java/io/quarkiverse/cxf/hc5/it/Hc5Resource.java @@ -43,6 +43,31 @@ public Uni addAsync(@QueryParam("a") int a, @QueryParam("b") int b) { public int addSync(@QueryParam("a") int a, @QueryParam("b") int b) { return myCalculator.add(a, b); } + + @Inject + @CXFClient("observableCalculator") + CalculatorService observableCalculator; + + @SuppressWarnings("unchecked") + @Path("/add-async-observable") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Uni addAsyncObservable(@QueryParam("a") int a, @QueryParam("b") int b) { + return Uni.createFrom() + .future( + (Future) observableCalculator + .addAsync(a, b, res -> { + })) + .map(addResponse -> addResponse.getReturn()); + } + + @Path("/add-sync-observable") + @GET + @Produces(MediaType.TEXT_PLAIN) + public int addSyncObservable(@QueryParam("a") int a, @QueryParam("b") int b) { + return observableCalculator.add(a, b); + } + // tag::quarkus-cxf-rt-transports-http-hc5.usage.mutiny[] } // end::quarkus-cxf-rt-transports-http-hc5.usage.mutiny[] diff --git a/integration-tests/hc5/src/main/java/io/quarkiverse/cxf/hc5/it/MeterFilterProducer.java b/integration-tests/hc5/src/main/java/io/quarkiverse/cxf/hc5/it/MeterFilterProducer.java new file mode 100644 index 000000000..c45e7a914 --- /dev/null +++ b/integration-tests/hc5/src/main/java/io/quarkiverse/cxf/hc5/it/MeterFilterProducer.java @@ -0,0 +1,31 @@ +package io.quarkiverse.cxf.hc5.it; + +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.config.MeterFilter; + +@Singleton +public class MeterFilterProducer { + + @Inject + RequestScopedHeader requestScopedHeader; + + @Produces + @Singleton + public MeterFilter addTags() { + return new MeterFilter() { + @Override + public Meter.Id map(Meter.Id id) { + if (id.getName().startsWith("http.client") || id.getName().startsWith("cxf.client")) { + return id.withTag(Tag.of("my-header", requestScopedHeader.getHeaderValue())); + } else { + return id; + } + } + }; + } +} diff --git a/integration-tests/hc5/src/main/resources/application.properties b/integration-tests/hc5/src/main/resources/application.properties index a90bef02d..73ba2de17 100644 --- a/integration-tests/hc5/src/main/resources/application.properties +++ b/integration-tests/hc5/src/main/resources/application.properties @@ -2,6 +2,15 @@ quarkus.cxf.client.myCalculator.wsdl = ${cxf.it.calculator.baseUri}/calculator-w quarkus.cxf.client.myCalculator.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService quarkus.cxf.client.myCalculator.service-interface = org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService +quarkus.cxf.client.observableCalculator.wsdl = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService?wsdl +quarkus.cxf.client.observableCalculator.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService +quarkus.cxf.client.observableCalculator.service-interface = org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService +quarkus.cxf.client.observableCalculator.features=io.quarkiverse.cxf.metrics.QuarkusCxfMetricsFeature + +quarkus.micrometer.export.json.enabled = true +quarkus.micrometer.export.json.path = metrics/json +quarkus.micrometer.export.prometheus.path = metrics/prometheus + # tag::quarkus-cxf-rt-transports-http-hc5.usage.wsdl2java[] quarkus.cxf.codegen.wsdl2java.includes = wsdl/*.wsdl quarkus.cxf.codegen.wsdl2java.additional-params = -b,src/main/resources/wsdl/async-binding.xml diff --git a/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5ContextlessTest.java b/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5ContextlessTest.java new file mode 100644 index 000000000..92114ccd7 --- /dev/null +++ b/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5ContextlessTest.java @@ -0,0 +1,42 @@ +package io.quarkiverse.cxf.hc5.it; + +import java.util.concurrent.ExecutionException; + +import jakarta.inject.Inject; + +import org.assertj.core.api.Assertions; +import org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService; +import org.junit.jupiter.api.Test; + +import io.quarkiverse.cxf.annotation.CXFClient; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(Hc5TestResource.class) +class Hc5ContextlessTest { + + @Inject + @CXFClient("myCalculator") + CalculatorService myCalculator; + + /** + * Make sure that the client still works when called from a thread where the request context is not set + * + * @throws InterruptedException + */ + @Test + void contextless() throws InterruptedException { + Assertions.assertThat(myCalculator.add(5, 5)).isEqualTo(10); + myCalculator.addAsync(5, 7, response -> { + try { + Assertions.assertThat(response.get().getReturn()).isEqualTo(12); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5IT.java b/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5IT.java new file mode 100644 index 000000000..86f25ad31 --- /dev/null +++ b/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5IT.java @@ -0,0 +1,7 @@ +package io.quarkiverse.cxf.hc5.it; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +class Hc5IT extends Hc5Test { +} diff --git a/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5Test.java b/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5Test.java index 696fb10bf..5e27dcf2f 100644 --- a/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5Test.java +++ b/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5Test.java @@ -8,8 +8,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Map; import org.assertj.core.api.Assertions; +import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.ConfigProvider; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -18,21 +20,39 @@ import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.restassured.RestAssured; +import io.restassured.path.json.JsonPath; @QuarkusTest @QuarkusTestResource(Hc5TestResource.class) class Hc5Test { @ParameterizedTest - @ValueSource(strings = { "sync", "async" }) + @ValueSource(strings = { "sync", "async", "sync-observable", "async-observable" }) void add(String syncMode) { RestAssured.given() + .header(RequestScopedHeader.header, syncMode + "-header-value") .queryParam("a", 7) .queryParam("b", 4) .get("/hc5/add-" + syncMode) .then() .statusCode(200) .body(is("11")); + + if (syncMode.endsWith("-observable")) { + /* Make sure that the tagging done in MeterFilterProducer actually works */ + + final Config config = ConfigProvider.getConfig(); + final String baseUri = config.getValue("cxf.it.calculator.baseUri", String.class); + final Map metrics = getMetrics(); + + @SuppressWarnings("unchecked") + Map clientRequests = (Map) metrics.get("cxf.client.requests"); + Assertions.assertThat(clientRequests).isNotNull(); + String key = "count;exception=None;faultCode=None;method=POST;my-header=" + syncMode + + "-header-value;operation=add;outcome=SUCCESS;status=200;uri=" + + baseUri + "/calculator-ws/CalculatorService"; + Assertions.assertThat((Integer) clientRequests.get(key)).isGreaterThan(0); + } } /** @@ -77,4 +97,15 @@ void wsdlUpToDate() throws IOException { } } -} \ No newline at end of file + + private Map getMetrics() { + final String body = RestAssured.given() + .header("Content-Type", "application/json") + .get("/q/metrics/json") + .then() + .statusCode(200) + .extract().body().asString(); + final JsonPath jp = new JsonPath(body); + return jp.getJsonObject("$"); + } +}