Skip to content

Commit

Permalink
CXF clients based on java.net.http.HttpClient leak threads fix quarki…
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed Nov 22, 2023
1 parent 7431baf commit 1795955
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,14 @@ quarkus.cxf.client.proxiedCalculator.proxy-server = ${cxf.it.calculator.proxy.ho
quarkus.cxf.client.proxiedCalculator.proxy-server-port = ${cxf.it.calculator.proxy.port}
quarkus.cxf.client.proxiedCalculator.proxy-username = ${cxf.it.calculator.proxy.user}
quarkus.cxf.client.proxiedCalculator.proxy-password = ${cxf.it.calculator.proxy.password}


quarkus.cxf.client.requestScopedHttpClient.wsdl = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService?wsdl
quarkus.cxf.client.requestScopedHttpClient.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService
quarkus.cxf.client.requestScopedHttpClient.http-conduit-factory = HttpClientHTTPConduitFactory

quarkus.cxf.client.requestScopedUrlConnectionClient.wsdl = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService?wsdl
quarkus.cxf.client.requestScopedUrlConnectionClient.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService
quarkus.cxf.client.requestScopedUrlConnectionClient.http-conduit-factory = URLConnectionHTTPConduitFactory
# Get rid of the Could not find endpoint/port warning
quarkus.log.category."org.apache.cxf.frontend.AbstractWSDLBasedEndpointFactory".level = ERROR
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,34 @@ Use the property below to prevent launching the HTTP server at startup:
----
quarkus.http.host-enabled = false
----

[[resource-leaks]]
== Prevent resource leaks

CXF client proxies implement `java.io.Closeable`.
Therefore, it is important to call `((Closeable) proxy).close()` once the client is not needed anymore
to free all associated system resources, such as threads.

{quarkus-cxf-project-name} takes care for closing the clients injected via `@io.quarkiverse.cxf.annotation.CXFClient` automatically
as soon as they are disposed by the CDI container.

For client proxies created manually, it is up to you to call `((Closeable) proxy).close()`:

[source,java]
----
import java.net.URL;
import javax.xml.namespace.QName;
import jakarta.xml.ws.Service;
import java.io.Closeable;
final URL serviceUrl = new URL("http://localhost/myService?wsdl");
final QName qName = new QName("http://acme.org/myNamespace", "MyService");
final Service service = jakarta.xml.ws.Service.create(serviceUrl, qName);
final MyService proxy = service.getPort(MyService.class);
try {
proxy.doSomething();
} finally {
((Closeable) proxy).close();
}
----
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.stream.Collectors;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Disposes;
import jakarta.enterprise.inject.Produces;
import jakarta.enterprise.inject.spi.InjectionPoint;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -55,6 +56,7 @@
import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.util.IoUtil;
import io.quarkus.gizmo.AnnotatedElement;
import io.quarkus.gizmo.ClassCreator;
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.gizmo.FieldCreator;
Expand Down Expand Up @@ -406,37 +408,42 @@ private void generateCxfClientProducer(

}

// try (MethodCreator createInfo = classCreator.getMethodCreator(
// "createInfo",
// "io.quarkiverse.cxf.CXFClientInfo",
// p0class)) {
// createInfo.addAnnotation(Produces.class);
// createInfo.addAnnotation(CXFClient.class);
//
// // p0 (InjectionPoint);
// ResultHandle p0;
// ResultHandle p1;
// ResultHandle cxfClient;
//
// p0 = createInfo.getMethodParam(0);
//
// MethodDescriptor loadCxfInfo = MethodDescriptor.ofMethod(
// CxfClientProducer.class,
// "loadCxfClientInfo",
// "java.lang.Object",
// p0class,
// p1class);
// // >> .. {
// // >> Object cxfInfo = this().loadCxfInfo(ip, this.info);
// // >> return (CXFClientInfo)cxfInfo;
// // >> }
//
// p1 = createInfo.readInstanceField(info.getFieldDescriptor(), createInfo.getThis());
// cxfClient = createInfo.invokeVirtualMethod(loadCxfInfo, createInfo.getThis(), p0, p1);
// createInfo.returnValue(createInfo.checkCast(cxfClient, "io.quarkiverse.cxf
// .CXFClientInfo"));
// }
/*
* The client proxy implements Closeable so we need to generate a disposer method
* that calls proxy.close(). This is important because e.g. java.net.http.HttpClient based CXF clients
* have some associated threads that is better to stop immediately.
*/
// create method
// >> @Produces
// >> @CXFClient
// >> void closeClient(@Disposes @CXFClient {SEI} client) { .. }
try (MethodCreator createService = classCreator.getMethodCreator(
"closeClient",
void.class,
sei)) {
AnnotatedElement clientParamAnnotations = createService.getParameterAnnotations(0);
clientParamAnnotations.addAnnotation(Disposes.class);
clientParamAnnotations.addAnnotation(CXFClient.class);

final ResultHandle thisHandle = createService.getThis();
final ResultHandle clientHandle = createService.getMethodParam(0);

MethodDescriptor closeClient = MethodDescriptor.ofMethod(
CxfClientProducer.class,
"closeCxfClient",
void.class,
Object.class);
// >> .. {
// >> this.closeCxfClient(client);
// >> }

createService.invokeVirtualMethod(
closeClient,
thisHandle,
clientHandle);
createService.returnVoid();

}
}

// Eventually let's produce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.util.stream.Collectors.toList;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -59,6 +60,20 @@ public Object loadCxfClient(InjectionPoint ip, CXFClientInfo meta) {
return produceCxfClient(selectorCXFClientInfo(config, fixedConfig, ip, meta));
}

/**
* Called from the <code>{SEI}CxfClientProducer.closeClient(@Disposes @CXFClient {SEI} client)</code> generated in
* {@code io.quarkiverse.cxf.deployment.CxfClientProcessor.generateCxfClientProducer()}.
*
* @param client the CXF client to close
*/
public void closeCxfClient(Object client) {
try {
((Closeable) client).close();
} catch (IOException e) {
throw new RuntimeException("Could not close CXF client " + client.getClass().getName(), e);
}
}

/**
* The main workhorse producing a CXF client proxy.
*
Expand Down
15 changes: 15 additions & 0 deletions integration-tests/client/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,18 @@ And then overwrite the existing file with the new content from the container:
----
curl http://localhost:8080/calculator-ws/CalculatorService?wsdl --output src/main/cxf-codegen-resources/wsdl/CalculatorService.wsdl
----

[[soak]]
== `CxfClientTest.soakRequestScopedHttpClient()` and `CxfClientTest.soakRequestScopedUrlConnectionClient()`

The number of iterations performed by these tests can be set via

[source,shell]
----
$ export QUARKUS_CXF_CLIENT_SOAK_ITERATIONS=100000
$ mvn clean test -Dtest=CxfClientTest#soakRequestScopedHttpClient
$ mvn clean test -Dtest=CxfClientTest#soakRequestScopedUrlConnectionClient
----

The hard-coded default of 300 is intentionally low to make the test pass on Quarkus Platform CI.
Higher values are more likely to uncover resource leaks.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.StringWriter;
import java.util.List;

import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.ws.rs.GET;
Expand Down Expand Up @@ -60,6 +61,20 @@ public class CxfClientResource {
@CXFClient("proxiedCalculator")
CalculatorService proxiedCalculator;

@Inject
RequestScopedClients requestScopedClients;

@GET
@Path("/calculator/{client}/add")
@Produces(MediaType.TEXT_PLAIN)
public Response add(@PathParam("client") String client, @QueryParam("a") int a, @QueryParam("b") int b) {
try {
return Response.ok(getClient(client).add(a, b)).build();
} catch (Exception e) {
return Response.serverError().entity(e.getMessage()).build();
}
}

@GET
@Path("/calculator/{client}/multiply")
@Produces(MediaType.TEXT_PLAIN)
Expand Down Expand Up @@ -136,7 +151,7 @@ private CalculatorService getClient(String client) {
case "proxiedCalculator":
return proxiedCalculator;
default:
throw new IllegalStateException("Unexpected client key " + client);
return requestScopedClients.getClient(client);
}
}

Expand Down Expand Up @@ -194,4 +209,27 @@ public String createEscapeHandler(@PathParam("className") String className, Stri
return out.toString();
}

@RequestScoped
public static class RequestScopedClients {

@Inject
@CXFClient("requestScopedHttpClient")
CalculatorService requestScopedHttpClient;

@Inject
@CXFClient("requestScopedUrlConnectionClient")
CalculatorService requestScopedUrlConnectionClient;

public CalculatorService getClient(String client) {
switch (client) {
case "requestScopedHttpClient":
return requestScopedHttpClient;
case "requestScopedUrlConnectionClient":
return requestScopedUrlConnectionClient;
default:
throw new IllegalStateException("Unexpected client key " + client);
}
}

}
}
11 changes: 11 additions & 0 deletions integration-tests/client/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,14 @@ quarkus.cxf.client.proxiedCalculator.proxy-server = ${cxf.it.calculator.proxy.ho
quarkus.cxf.client.proxiedCalculator.proxy-server-port = ${cxf.it.calculator.proxy.port}
quarkus.cxf.client.proxiedCalculator.proxy-username = ${cxf.it.calculator.proxy.user}
quarkus.cxf.client.proxiedCalculator.proxy-password = ${cxf.it.calculator.proxy.password}


quarkus.cxf.client.requestScopedHttpClient.wsdl = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService?wsdl
quarkus.cxf.client.requestScopedHttpClient.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService
quarkus.cxf.client.requestScopedHttpClient.http-conduit-factory = HttpClientHTTPConduitFactory

quarkus.cxf.client.requestScopedUrlConnectionClient.wsdl = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService?wsdl
quarkus.cxf.client.requestScopedUrlConnectionClient.client-endpoint-url = ${cxf.it.calculator.baseUri}/calculator-ws/CalculatorService
quarkus.cxf.client.requestScopedUrlConnectionClient.http-conduit-factory = URLConnectionHTTPConduitFactory
# Get rid of the Could not find endpoint/port warning
quarkus.log.category."org.apache.cxf.frontend.AbstractWSDLBasedEndpointFactory".level = ERROR
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.http.HttpClient;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Random;

import org.apache.commons.io.IOUtils;
import org.assertj.core.api.Assertions;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -27,6 +32,8 @@
@QuarkusTestResource(CxfClientTestResource.class)
public class CxfClientTest {

private static final Logger log = Logger.getLogger(CxfClientTest.class);

@Test
void add() {
RestAssured.given()
Expand Down Expand Up @@ -362,4 +369,47 @@ void createEscapeHandler() {
.body(is("Tom & Jerry"));
}

/**
* Make sure that a request scoped client backed by {@link HttpClient} does not leak threads
* - see <a href=
* "https://github.com/quarkiverse/quarkus-cxf/issues/992">https://github.com/quarkiverse/quarkus-cxf/issues/992</a>.
*/
@Test
void soakRequestScopedHttpClient() {
soak("requestScopedHttpClient");

}

/**
* Make sure that a request scoped client backed by {@link HttpURLConnection} does not leak threads.
*/
@Test
void soakRequestScopedUrlConnectionClient() {
soak("requestScopedUrlConnectionClient");

}

private void soak(String client) {

final Random rnd = new Random();
// we divide by 2 to avoid overflow
int a = rnd.nextInt() / 2;
int b = rnd.nextInt() / 2;
int expected = a + b;

final int requestCount = Integer
.parseInt(Optional.ofNullable(System.getenv("QUARKUS_CXF_CLIENT_SOAK_ITERATIONS")).orElse("300"));
log.infof("Performing %d interations", requestCount);
for (int i = 0; i < requestCount; i++) {
log.infof("Soaking round %d", i);
RestAssured.given()
.queryParam("a", a)
.queryParam("b", b)
.get("/cxf/client/calculator/" + client + "/add")
.then()
.statusCode(200)
.body(is(String.valueOf(expected)));
}
}

}
2 changes: 2 additions & 0 deletions integration-tests/mtom/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ $ mvn clean test -Dtest=MtomTest#soak
The hard-coded default of 300 is intentionally low to make the test pass on Quarkus Platform CI.
Higher values are more likely to uncover resource leaks.

Note that there are two more soak tests in CxfClientTest - see link:../client/README.adoc#soak[../client/README.adoc].

== `MtomTest.largeAttachment()`

`MtomTest.largeAttachment()` can be tuned via
Expand Down

0 comments on commit 1795955

Please sign in to comment.