Skip to content

Commit

Permalink
Context propagation not working in async calls fix #947
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed Aug 9, 2023
1 parent b18d814 commit 6cea9f2
Show file tree
Hide file tree
Showing 22 changed files with 653 additions and 7 deletions.
25 changes: 25 additions & 0 deletions docs/modules/ROOT/examples/hc5/Hc5Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ public Uni<Integer> addAsync(@QueryParam("a") int a, @QueryParam("b") int b) {
public int addSync(@QueryParam("a") int a, @QueryParam("b") int b) {
return myCalculator.add(a, b);
}

@Inject
@CXFClient("observableCalculator")
CalculatorService observableCalculator;

@SuppressWarnings("unchecked")
@Path("/add-async-observable")
@GET
@Produces(MediaType.TEXT_PLAIN)
public Uni<Integer> addAsyncObservable(@QueryParam("a") int a, @QueryParam("b") int b) {
return Uni.createFrom()
.future(
(Future<AddResponse>) observableCalculator
.addAsync(a, b, res -> {
}))
.map(addResponse -> addResponse.getReturn());
}

@Path("/add-sync-observable")
@GET
@Produces(MediaType.TEXT_PLAIN)
public int addSyncObservable(@QueryParam("a") int a, @QueryParam("b") int b) {
return observableCalculator.add(a, b);
}

// tag::quarkus-cxf-rt-transports-http-hc5.usage.mutiny[]
}
// end::quarkus-cxf-rt-transports-http-hc5.usage.mutiny[]
9 changes: 9 additions & 0 deletions docs/modules/ROOT/examples/hc5/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ quarkus.cxf.client.myCalculator.wsdl = ${cxf.it.calculator.baseUri}/calculator-w
quarkus.cxf.client.myCalculator.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService
quarkus.cxf.client.myCalculator.service-interface = org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService

quarkus.cxf.client.observableCalculator.wsdl = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService?wsdl
quarkus.cxf.client.observableCalculator.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService
quarkus.cxf.client.observableCalculator.service-interface = org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService
quarkus.cxf.client.observableCalculator.features=io.quarkiverse.cxf.metrics.QuarkusCxfMetricsFeature

quarkus.micrometer.export.json.enabled = true
quarkus.micrometer.export.json.path = metrics/json
quarkus.micrometer.export.prometheus.path = metrics/prometheus

# tag::quarkus-cxf-rt-transports-http-hc5.usage.wsdl2java[]
quarkus.cxf.codegen.wsdl2java.includes = wsdl/*.wsdl
quarkus.cxf.codegen.wsdl2java.additional-params = -b,src/main/resources/wsdl/async-binding.xml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
/**
* Holds the build time {@link Bus} instance.
*/
public final class CxfBusBuildItem extends SimpleBuildItem {
public final class BuildTimeBusBuildItem extends SimpleBuildItem {
private final Bus bus;

public Bus getBus() {
return bus;
}

public CxfBusBuildItem(Bus bus) {
public BuildTimeBusBuildItem(Bus bus) {
super();
this.bus = bus;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ void markBeansAsUnremovable(BuildProducer<UnremovableBeanBuildItem> unremovables
}

@BuildStep
CxfBusBuildItem bus() {
BuildTimeBusBuildItem bus() {
final Bus bus = BusFactory.getDefaultBus();
// setup class capturing
return new CxfBusBuildItem(bus);
return new BuildTimeBusBuildItem(bus);
}

@BuildStep
Expand All @@ -188,7 +188,7 @@ void generateRuntimeBusServiceFile(BuildProducer<GeneratedResourceBuildItem> gen

@BuildStep
void generateClasses(
CxfBusBuildItem busBuildItem,
BuildTimeBusBuildItem busBuildItem,
List<ClientSeiBuildItem> clients,
List<ServiceSeiBuildItem> endpointImplementations,
BuildProducer<GeneratedBeanBuildItem> generatedBeans,
Expand Down Expand Up @@ -591,6 +591,16 @@ NativeImageResourceBuildItem nativeImageResourceBuildItem() {
"schemas/configuration/security.xsd");
}

@BuildStep
@Record(ExecutionTime.STATIC_INIT)
void setupRuntimeBusCustomizers(
CXFRecorder recorder,
List<RuntimeBusCustomizerBuildItem> customizers) {
for (RuntimeBusCustomizerBuildItem customizer : customizers) {
recorder.addRuntimeBusCustomizer(customizer.getCustomizer());
}
}

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
void shutDown(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkiverse.cxf.deployment;

import org.apache.cxf.Bus;

import io.quarkus.builder.item.SimpleBuildItem;
import io.quarkus.runtime.RuntimeValue;

/**
* Holds the runtime {@link Bus} reference.
*/
public final class RuntimeBusBuildItem extends SimpleBuildItem {
private final RuntimeValue<Bus> bus;

public RuntimeValue<Bus> getBus() {
return bus;
}

public RuntimeBusBuildItem(RuntimeValue<Bus> bus) {
super();
this.bus = bus;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.quarkiverse.cxf.deployment;

import java.util.function.Consumer;

import org.apache.cxf.Bus;

import io.quarkus.builder.item.MultiBuildItem;
import io.quarkus.runtime.RuntimeValue;

/**
* Holds a {@link Consumer} that will be used to customize the runtime {@link Bus} right after its creation.
*/
public final class RuntimeBusCustomizerBuildItem extends MultiBuildItem {
private final RuntimeValue<Consumer<Bus>> customizer;

public RuntimeBusCustomizerBuildItem(RuntimeValue<Consumer<Bus>> customizer) {
super();
this.customizer = customizer;
}

public RuntimeValue<Consumer<Bus>> getCustomizer() {
return customizer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.cxf.Bus;
import org.jboss.logging.Logger;

import io.quarkiverse.cxf.devconsole.DevCxfServerInfosSupplier;
Expand Down Expand Up @@ -147,4 +149,8 @@ public Handler<RoutingContext> initServer(RuntimeValue<CXFServletInfos> infos, B
public void resetDestinationRegistry(ShutdownContext context) {
context.addShutdownTask(VertxDestinationFactory::resetRegistry);
}

public void addRuntimeBusCustomizer(RuntimeValue<Consumer<Bus>> customizer) {
QuarkusBusFactory.addBusCustomizer(customizer.getValue());
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.quarkiverse.cxf;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import org.apache.cxf.Bus;
import org.apache.cxf.bus.CXFBusFactory;
Expand All @@ -22,12 +25,18 @@

public class QuarkusBusFactory extends CXFBusFactory {

/** {@link List} of customizers passed via {@code RuntimeBusCustomizerBuildItem} */
private static final List<Consumer<Bus>> customizers = new CopyOnWriteArrayList<>();

@Override
public Bus createBus(Map<Class<?>, Object> extensions, Map<String, Object> properties) {
if (extensions == null) {
extensions = new HashMap<Class<?>, Object>();
}
final Bus bus = super.createBus(extensions, properties);
for (Consumer<Bus> customizer : customizers) {
customizer.accept(bus);
}
bus.setExtension(new WrapperHelperClassLoader(bus), WrapperHelperCreator.class);
bus.setExtension(new ExtensionClassLoader(bus), ExtensionClassCreator.class);
bus.setExtension(new ExceptionClassLoader(bus), ExceptionClassCreator.class);
Expand All @@ -38,4 +47,11 @@ public Bus createBus(Map<Class<?>, Object> extensions, Map<String, Object> prope
return bus;
}

/**
* @param customizer a {@link Consumer} to run right after the creation of the runtime {@link Bus}
*/
static void addBusCustomizer(Consumer<Bus> customizer) {
customizers.add(customizer);
}

}
4 changes: 4 additions & 0 deletions extensions/transports-http-hc5/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.cxf</groupId>
<artifactId>quarkus-cxf-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.cxf</groupId>
<artifactId>quarkus-cxf-rt-transports-http-hc5</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import java.util.Arrays;
import java.util.List;

import io.quarkiverse.cxf.deployment.RuntimeBusCustomizerBuildItem;
import io.quarkiverse.cxf.transport.http.hc5.Hc5Recorder;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;

public class QuarkusCxfTransportsHTTPAsyncProcessor {
Expand All @@ -15,4 +20,12 @@ List<RuntimeInitializedClassBuildItem> runtimeInitializedClasses() {
new RuntimeInitializedClassBuildItem("org.apache.hc.client5.http.impl.auth.NTLMEngineImpl"));
}

@BuildStep
@Record(ExecutionTime.STATIC_INIT)
void customizers(
Hc5Recorder recorder,
BuildProducer<RuntimeBusCustomizerBuildItem> customizers) {
customizers.produce(new RuntimeBusCustomizerBuildItem(recorder.customizeBus()));
}

}
4 changes: 4 additions & 0 deletions extensions/transports-http-hc5/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.cxf</groupId>
<artifactId>quarkus-cxf</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-hc5</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.quarkiverse.cxf.transport.http.hc5;

import java.util.function.Consumer;

import org.apache.cxf.Bus;
import org.apache.cxf.transport.http.HTTPConduitFactory;
import org.apache.cxf.workqueue.WorkQueueManager;

import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;

@Recorder
public class Hc5Recorder {

/**
* Customize the runtime {@link Bus} by adding a custom work queue for {@code http-conduit} and a custom
* {@link HTTPConduitFactory}. Both are there to enable context propagation for async clients.
*
* @return a new {@link RuntimeValue} holding a {@link Consumer} to customize the runtime {@link Bus}
*/
public RuntimeValue<Consumer<Bus>> customizeBus() {

return new RuntimeValue<>(bus -> {
final WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class);
wqm.addNamedWorkQueue("http-conduit", new QuarkusWorkQueueImpl("http-conduit"));

bus.setExtension(new QuarkusAsyncHTTPConduitFactory(bus), HTTPConduitFactory.class);
});
}
}
Loading

0 comments on commit 6cea9f2

Please sign in to comment.