From 81bd5f9e338d6d7f492d05cdf0332269f590c4b2 Mon Sep 17 00:00:00 2001
From: Tyutin Andrey <38692521+atyutin90@users.noreply.github.com>
Date: Tue, 28 May 2024 09:29:42 +0300
Subject: [PATCH] add: instrumentation for apache cxf client (#1335)
---
build.sbt | 29 ++++-
.../src/main/resources/reference.conf | 99 ++++++++++++++
.../cxf/client/ApacheCxfClientHelper.scala | 103 +++++++++++++++
...lientProxyFactoryBeanInstrumentation.scala | 32 +++++
.../apache/cxf/client/TraceScopeHolder.scala | 17 +++
.../cxf/client/TracingClientFeature.scala | 22 ++++
.../TracingClientFeatureInitializer.scala | 16 +++
.../cxf/client/TracingClientInterceptor.scala | 121 ++++++++++++++++++
.../src/test/resources/application.conf | 19 +++
.../src/test/resources/logback.xml | 17 +++
.../apache/cxf/client/MessageSpec.scala | 105 +++++++++++++++
.../cxf/client/util/FailingInterceptor.scala | 25 ++++
.../cxf/client/util/HelloWorldService.scala | 9 ++
.../client/util/MockServerExpectations.scala | 104 +++++++++++++++
14 files changed, 715 insertions(+), 3 deletions(-)
create mode 100644 instrumentation/kamon-apache-cxf/src/main/resources/reference.conf
create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ApacheCxfClientHelper.scala
create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ClientProxyFactoryBeanInstrumentation.scala
create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TraceScopeHolder.scala
create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeature.scala
create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeatureInitializer.scala
create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientInterceptor.scala
create mode 100644 instrumentation/kamon-apache-cxf/src/test/resources/application.conf
create mode 100644 instrumentation/kamon-apache-cxf/src/test/resources/logback.xml
create mode 100644 instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala
create mode 100644 instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/FailingInterceptor.scala
create mode 100644 instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/HelloWorldService.scala
create mode 100644 instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/MockServerExpectations.scala
diff --git a/build.sbt b/build.sbt
index a35db90e4..4240e0486 100644
--- a/build.sbt
+++ b/build.sbt
@@ -148,7 +148,8 @@ val instrumentationProjects = Seq[ProjectReference](
`kamon-alpakka-kafka`,
`kamon-http4s-1_0`,
`kamon-http4s-0_23`,
- `kamon-apache-httpclient`
+ `kamon-apache-httpclient`,
+ `kamon-apache-cxf`
)
lazy val instrumentation = (project in file("instrumentation"))
@@ -839,6 +840,26 @@ lazy val `kamon-apache-httpclient` = (project in file("instrumentation/kamon-apa
)
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")
+lazy val `kamon-apache-cxf` = (project in file("instrumentation/kamon-apache-cxf"))
+ .disablePlugins(AssemblyPlugin)
+ .enablePlugins(JavaAgent)
+ .settings(instrumentationSettings)
+ .settings(
+ crossScalaVersions := Seq(`scala_2.13_version`, `scala_3_version`),
+ libraryDependencies ++= Seq(
+ kanelaAgent % "provided",
+ "org.apache.cxf" % "cxf-rt-frontend-simple" % "3.3.6" % "provided",
+ slf4jApi % "provided",
+
+ scalatest % "test",
+ logbackClassic % "test",
+ "org.mock-server" % "mockserver-client-java" % "5.13.2" % "test",
+ "com.dimafeng" %% "testcontainers-scala" % "0.41.0" % "test",
+ "com.dimafeng" %% "testcontainers-scala-mockserver" % "0.41.0" % "test",
+ "org.apache.cxf" % "cxf-rt-frontend-jaxws" % "3.3.6" % "test",
+ "org.apache.cxf" % "cxf-rt-transports-http" % "3.3.6" % "test",
+ )
+ ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")
/**
* Reporters
@@ -1107,7 +1128,8 @@ lazy val `kamon-bundle-dependencies-all` = (project in file("bundle/kamon-bundle
`kamon-caffeine`,
`kamon-lagom`,
`kamon-aws-sdk`,
- `kamon-apache-httpclient`
+ `kamon-apache-httpclient`,
+ `kamon-apache-cxf`
)
/**
@@ -1172,7 +1194,8 @@ lazy val `kamon-bundle-dependencies-3` = (project in file("bundle/kamon-bundle-d
`kamon-pekko`,
`kamon-pekko-http`,
`kamon-pekko-grpc`,
- `kamon-apache-httpclient`
+ `kamon-apache-httpclient`,
+ `kamon-apache-cxf`
)
lazy val `kamon-bundle` = (project in file("bundle/kamon-bundle"))
diff --git a/instrumentation/kamon-apache-cxf/src/main/resources/reference.conf b/instrumentation/kamon-apache-cxf/src/main/resources/reference.conf
new file mode 100644
index 000000000..cc2d862e2
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/main/resources/reference.conf
@@ -0,0 +1,99 @@
+# ================================================== #
+# kamon Apache CXF client reference configuration #
+# ================================================== #
+
+# Settings to control the CXF Client instrumentation
+#
+# IMPORTANT: The entire configuration of the CXF Client Instrumentation is based on the constructs provided by the
+# Kamon Instrumentation Common library which will always fallback to the settings found under the
+# "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to
+# find and understand in the context of this project and commented out so that any changes to the default settings
+# will actually have effect.
+#
+kamon.instrumentation.apache.cxf {
+
+ #
+ # Configuration for CXF context propagation.
+ #
+ propagation {
+
+ # Enables or disables HTTP context propagation on this HTTP client instrumentation. Please note that if
+ # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
+ # be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
+ #enabled = yes
+
+ # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
+ # configuration for more details on how to configure the detault HTTP context propagation.
+ #channel = "default"
+ }
+
+ tracing {
+
+ # Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests
+ # and finish them when the response is received from the server.
+ #enabled = yes
+
+ # Enables collection of span metrics using the `span.processing-time` metric.
+ #span-metrics = on
+
+ # Select which tags should be included as span and span metric tags. The possible options are:
+ # - span: the tag is added as a Span tag (i.e. using span.tag(...))
+ # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...))
+ # - off: the tag is not used.
+ #
+ tags {
+
+ # Use the http.url tag.
+ #url = span
+
+ # Use the http.method tag.
+ #method = metric
+
+ # Use the http.status_code tag.
+ #status-code = metric
+
+ # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type
+ # tag from the context into the HTTP Client Span created by the instrumentation, the following configuration
+ # should be added:
+ #
+ # from-context {
+ # customer_type = span
+ # }
+ #
+ from-context {
+
+ }
+ }
+
+ operations {
+
+ # The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP
+ # Client instrumentation will always try to use the HTTP Operation Name Generator configured below to get
+ # a name, but if it fails to generate it then this name will be used.
+ default = "apache.cxf.client"
+
+ # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms:
+ # - hostname: Uses the request Host as the operation name.
+ # - method: Uses the request HTTP method as the operation name.
+ #
+ #name-generator = "method"
+ }
+ }
+
+}
+
+kanela {
+ modules {
+ apache-cxf {
+ name = "Apache CXF Client"
+ description = "Provides tracing of client calls made with the official Apache CXF library."
+ instrumentations = [
+ "kamon.instrumentation.apache.cxf.client.ClientProxyFactoryBeanInstrumentation"
+ ]
+ enabled = true
+ within = [
+ "org.apache.cxf..*",
+ ]
+ }
+ }
+}
diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ApacheCxfClientHelper.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ApacheCxfClientHelper.scala
new file mode 100644
index 000000000..04cbeef17
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ApacheCxfClientHelper.scala
@@ -0,0 +1,103 @@
+package kamon.instrumentation.apache.cxf.client
+
+import kamon.instrumentation.http.HttpMessage
+import org.apache.cxf.message.Message
+import org.apache.cxf.message.Message.{HTTP_REQUEST_METHOD, PROTOCOL_HEADERS, RESPONSE_CODE}
+import org.slf4j.LoggerFactory
+
+import java.net.{URI, URISyntaxException}
+import java.util.Collections.{emptyMap => jEmptyMap, singletonList => jList}
+import java.util.{List => JList, Map => JMap}
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, MapHasAsScala}
+
+class ApacheCxfClientHelper
+
+object ApacheCxfClientHelper {
+
+ private val _logger = LoggerFactory.getLogger(classOf[ApacheCxfClientHelper])
+
+ def toRequestBuilder(request: Message): HttpMessage.RequestBuilder[Message] =
+ new RequestReader with HttpMessage.RequestBuilder[Message] {
+
+ val delegate: Message = request
+
+ val uri: URI = getUri(request)
+
+ override def write(header: String, value: String): Unit = {
+ val headers: mutable.Map[String, String] = getAllHeaders(delegate).to(mutable.Map)
+ headers.put(header, value)
+ delegate.put(Message.PROTOCOL_HEADERS, headers.map(m => m._1 -> jList(m._2)).toMap.asJava)
+ }
+
+ override def build(): Message = {
+ _logger.trace("Prepared request for instrumentation: {}", this)
+ delegate
+ }
+
+ override def toString(): String = s"RequestReader(host=$host,port=$port,method=$method,path=$path)"
+ }
+
+ def toResponse(message: Message): HttpMessage.Response = new HttpMessage.Response {
+ override def statusCode: Int = message.get(RESPONSE_CODE) match {
+ case code: Integer => code
+ case _ =>
+ _logger.debug("Not able to retrieve status code from response")
+ -1
+ }
+ }
+ private def getUri(message: Message): URI =
+ try {
+ getUriAsString(message).map(s => new URI(s)).orNull
+ } catch {
+ case e: URISyntaxException => throw new RuntimeException(e.getMessage, e)
+ }
+
+ private def safeGet(message: Message, key: String): Option[String] =
+ if (message.containsKey(key)) {
+ message.get(key) match {
+ case value: String => Option.apply(value)
+ case _ => Option.empty
+ }
+ } else Option.empty
+
+ private def getUriAsString(message: Message): Option[String] = {
+ val requestUrl: Option[String] = safeGet(message, Message.REQUEST_URL).orElse {
+ var address = safeGet(message, Message.ENDPOINT_ADDRESS)
+ val requestUri = safeGet(message, Message.REQUEST_URI)
+ if (requestUri.exists(r => r.startsWith("/"))) {
+ if (!address.exists(a => a.startsWith(requestUri.get))) {
+ if (address.exists(t => t.endsWith("/") && t.length > 1)) {
+ address = address.map(a => a.substring(0, a.length))
+ }
+ address.map(a => a + requestUri.getOrElse(""))
+ } else requestUri
+ } else address
+ }
+ safeGet(message, Message.QUERY_STRING).map(q => requestUrl.map(u => s"$u?$q")).getOrElse(requestUrl)
+ }
+
+ private def getAllHeaders(message: Message): Map[String, String] = (message.get(PROTOCOL_HEADERS) match {
+ case hs: JMap[String, JList[String]] => hs
+ case _ => jEmptyMap[String, JList[String]]()
+ }).asScala.map { case (key, values) => key -> values.asScala.mkString(", ") }.toMap
+
+ private trait RequestReader extends HttpMessage.Request {
+ def uri: URI
+ def delegate: Message
+
+ override def host: String = if (uri != null) uri.getHost else null
+
+ override def port: Int = if (uri != null) uri.getPort else 0
+
+ override def method: String = delegate.get(HTTP_REQUEST_METHOD).asInstanceOf[String]
+
+ override def path: String = if (uri != null) uri.getPath else null
+
+ override def read(header: String): Option[String] = getAllHeaders(delegate).get(header)
+
+ override def readAll(): Map[String, String] = getAllHeaders(delegate)
+
+ override def url: String = if (uri != null) uri.toString else null
+ }
+}
diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ClientProxyFactoryBeanInstrumentation.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ClientProxyFactoryBeanInstrumentation.scala
new file mode 100644
index 000000000..8e0138f6b
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ClientProxyFactoryBeanInstrumentation.scala
@@ -0,0 +1,32 @@
+package kamon.instrumentation.apache.cxf.client
+
+import kamon.Kamon
+import kamon.instrumentation.http.HttpClientInstrumentation
+import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler
+import kanela.agent.api.instrumentation.InstrumentationBuilder
+import org.apache.cxf.message.Message
+
+class ClientProxyFactoryBeanInstrumentation extends InstrumentationBuilder {
+
+ onSubTypesOf("org.apache.cxf.frontend.ClientProxyFactoryBean")
+ .advise(method("initFeatures"), classOf[TracingClientFeatureInitializer])
+}
+
+object ClientProxyFactoryBeanInstrumentation {
+ Kamon.onReconfigure(_ =>
+ ClientProxyFactoryBeanInstrumentation.rebuildHttpClientInstrumentation(): Unit
+ )
+
+ @volatile var cxfClientInstrumentation: HttpClientInstrumentation = rebuildHttpClientInstrumentation()
+
+ private def rebuildHttpClientInstrumentation(): HttpClientInstrumentation = {
+ val httpClientConfig = Kamon.config().getConfig("kamon.instrumentation.apache.cxf")
+ cxfClientInstrumentation = HttpClientInstrumentation.from(httpClientConfig, "apache.cxf.client")
+ cxfClientInstrumentation
+ }
+
+ def processResponse(handler: RequestHandler[_], message: Message, t: Throwable = null): Unit = {
+ if (t != null) handler.span.fail(t).finish()
+ else handler.processResponse(ApacheCxfClientHelper.toResponse(message))
+ }
+}
diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TraceScopeHolder.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TraceScopeHolder.scala
new file mode 100644
index 000000000..0ed2ea365
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TraceScopeHolder.scala
@@ -0,0 +1,17 @@
+package kamon.instrumentation.apache.cxf.client
+
+import kamon.context.Storage.Scope
+import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler
+import org.apache.cxf.message.Message
+
+import java.io.Closeable
+
+private class TraceScopeHolder(val traceScope: Option[TraceScope], val detached: Boolean = false) extends Serializable
+
+private case class TraceScope(handler: RequestHandler[Message], scope: Option[Scope]) extends Closeable {
+
+ override def close(): Unit = {
+ if (handler != null && handler.span != null) handler.span.finish()
+ if (scope.nonEmpty) scope.get.close()
+ }
+}
diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeature.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeature.scala
new file mode 100644
index 000000000..735c143b2
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeature.scala
@@ -0,0 +1,22 @@
+package kamon.instrumentation.apache.cxf.client
+
+import org.apache.cxf.Bus
+import org.apache.cxf.feature.AbstractFeature
+import org.apache.cxf.interceptor.InterceptorProvider
+
+private class TracingClientFeature extends AbstractFeature {
+
+ private val setup = new TracingClientSetupInterceptor()
+ private val receive = new TracingClientReceiveInterceptor()
+ private val postInvoke = new TracingClientPostInvokeInterceptor()
+
+ override def initializeProvider(provider: InterceptorProvider, bus: Bus): Unit = {
+ provider.getOutInterceptors.add(0, setup)
+ provider.getOutFaultInterceptors.add(0, setup)
+ provider.getInInterceptors.add(0, receive)
+ provider.getInInterceptors.add(postInvoke)
+ provider.getInFaultInterceptors.add(0, receive)
+ provider.getInFaultInterceptors.add(postInvoke)
+ super.initializeProvider(provider, bus)
+ }
+}
diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeatureInitializer.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeatureInitializer.scala
new file mode 100644
index 000000000..48708862a
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeatureInitializer.scala
@@ -0,0 +1,16 @@
+package kamon.instrumentation.apache.cxf.client
+
+import kanela.agent.libs.net.bytebuddy.asm.Advice
+import kanela.agent.libs.net.bytebuddy.asm.Advice.This
+import org.apache.cxf.frontend.ClientProxyFactoryBean
+
+import scala.annotation.static
+
+class TracingClientFeatureInitializer
+object TracingClientFeatureInitializer {
+
+ @Advice.OnMethodEnter
+ @static def onEnter(@This clientProxyFactoryBean: Any) = clientProxyFactoryBean match {
+ case c: ClientProxyFactoryBean => c.getFeatures.add(new TracingClientFeature)
+ }
+}
diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientInterceptor.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientInterceptor.scala
new file mode 100644
index 000000000..b0281f7f8
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientInterceptor.scala
@@ -0,0 +1,121 @@
+package kamon.instrumentation.apache.cxf.client
+
+import kamon.Kamon
+import kamon.context.Storage.Scope
+import kamon.trace.Span
+import org.apache.cxf.message.Message
+import org.apache.cxf.phase.Phase.{POST_INVOKE, RECEIVE, SETUP}
+import org.apache.cxf.phase.{PhaseInterceptor, PhaseInterceptorChain}
+
+import java.util.Collections
+
+private sealed abstract class AbstractTracingClientInterceptor(phase: String) extends PhaseInterceptor[Message] {
+
+ protected val TRACE_SCOPE = "org.apache.cxf.tracing.client.kamon.traceScope"
+
+ override def getAfter: java.util.Set[String] = Collections.emptySet()
+
+ override def getBefore: java.util.Set[String] = Collections.emptySet()
+
+ override def getId: String = getClass.getName
+
+ override def getPhase: String = phase
+
+ override def getAdditionalInterceptors: java.util.Collection[PhaseInterceptor[_ <: Message]] = null
+
+ override def handleFault(message: Message): Unit = {}
+
+ protected def isAsyncResponse: Boolean = !PhaseInterceptorChain.getCurrentMessage.getExchange.isSynchronous
+
+ protected def processFailed(message: Message): Unit = {
+ message.getExchange.get(TRACE_SCOPE) match {
+ case holder: TraceScopeHolder =>
+ if (holder != null) {
+ holder.traceScope.foreach(t =>
+ if (t.handler != null) {
+ val exception = message.getContent(classOf[Exception])
+ ClientProxyFactoryBeanInstrumentation.processResponse(t.handler, message, exception)
+ t.close()
+ }
+ )
+ }
+ }
+ }
+}
+
+private class TracingClientSetupInterceptor extends AbstractTracingClientInterceptor(SETUP) {
+
+ override def handleMessage(message: Message): Unit = {
+ val parentContext = Kamon.currentContext()
+ val builder = ApacheCxfClientHelper.toRequestBuilder(message)
+ val handler = ClientProxyFactoryBeanInstrumentation.cxfClientInstrumentation.createHandler(builder, parentContext)
+ val scope = Kamon.storeContext(parentContext.withEntry(Span.Key, handler.span))
+
+ val holder: TraceScopeHolder =
+ // In case of asynchronous client invocation, the span should be detached as JAX-WS
+ // client request / response interceptors are going to be executed in different threads.
+ if (isAsyncResponse) new TraceScopeHolder(Option(TraceScope(handler, Option(scope))), true)
+ else new TraceScopeHolder(Option(TraceScope(handler, Option(scope))))
+
+ message.getExchange.put(TRACE_SCOPE, holder)
+ }
+
+ override def handleFault(message: Message): Unit = message.getExchange.get(TRACE_SCOPE) match {
+ case holder: TraceScopeHolder =>
+ if (holder != null) {
+ holder.traceScope.foreach(t => {
+ var newScope: Scope = null
+ try {
+ // If the client invocation was asynchronous, the trace span has been created
+ // in another thread and should be re-attached to the current one.
+ if (holder.detached) {
+ newScope = t.scope.map(s => Kamon.storeContext(s.context.withEntry(Span.Key, t.handler.span))).orNull
+ }
+ val exception = message.getContent(classOf[Exception])
+ ClientProxyFactoryBeanInstrumentation.processResponse(t.handler, message, exception)
+ } finally {
+ if (newScope != null) newScope.close()
+ t.close()
+ }
+ })
+ }
+ }
+}
+
+private class TracingClientReceiveInterceptor extends AbstractTracingClientInterceptor(RECEIVE) {
+
+ override def handleMessage(message: Message): Unit = message.getExchange.get(TRACE_SCOPE) match {
+ case holder: TraceScopeHolder =>
+ if (holder != null) {
+ holder.traceScope.foreach(t =>
+ // If the client invocation was asynchronous, the trace span has been created
+ // in another thread and should be re-attached to the current one.
+ if (holder.detached) {
+ // Close scope which has been created in another thread
+ t.scope.foreach(s => s.close())
+ val newScope = t.scope.map(s => Kamon.storeContext(s.context.withEntry(Span.Key, t.handler.span)))
+ val modifyHolder = new TraceScopeHolder(Option(TraceScope(t.handler, newScope)), holder.detached)
+ message.getExchange.put(TRACE_SCOPE, modifyHolder)
+ }
+ )
+ }
+ }
+ override def handleFault(message: Message): Unit = processFailed(message)
+}
+
+private class TracingClientPostInvokeInterceptor extends AbstractTracingClientInterceptor(POST_INVOKE) {
+
+ override def handleMessage(message: Message): Unit = message.getExchange.get(TRACE_SCOPE) match {
+ case holder: TraceScopeHolder =>
+ if (holder != null) {
+ holder.traceScope.foreach(t =>
+ try {
+ ClientProxyFactoryBeanInstrumentation.processResponse(t.handler, message)
+ } finally {
+ t.close()
+ }
+ )
+ }
+ }
+ override def handleFault(message: Message): Unit = processFailed(message)
+}
diff --git a/instrumentation/kamon-apache-cxf/src/test/resources/application.conf b/instrumentation/kamon-apache-cxf/src/test/resources/application.conf
new file mode 100644
index 000000000..3deb48156
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/test/resources/application.conf
@@ -0,0 +1,19 @@
+kamon {
+ trace.sampler = "always"
+}
+
+kanela {
+ # debug-mode = true
+ # log-level = "DEBUG"
+}
+
+kamon.instrumentation.apache.cxf {
+ tracing {
+ operations {
+ mappings {
+ "/CustomHelloWorldService" = "custom-named-from-config"
+ }
+ }
+ }
+}
+
diff --git a/instrumentation/kamon-apache-cxf/src/test/resources/logback.xml b/instrumentation/kamon-apache-cxf/src/test/resources/logback.xml
new file mode 100644
index 000000000..6823d19eb
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/test/resources/logback.xml
@@ -0,0 +1,17 @@
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala
new file mode 100644
index 000000000..9ddf9c379
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala
@@ -0,0 +1,105 @@
+package kamon.instrumentation.apache.cxf.client
+
+import com.dimafeng.testcontainers.{ForAllTestContainer, MockServerContainer}
+import kamon.instrumentation.apache.cxf.client.util.MockServerExpectations
+import kamon.tag.Lookups.{plain, plainBoolean, plainLong}
+import kamon.testkit.{InitAndStopKamonAfterAll, Reconfigure, TestSpanReporter}
+import org.scalatest.OptionValues
+import org.scalatest.concurrent.Eventually
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.SpanSugar
+import org.scalatest.wordspec.AnyWordSpec
+import org.slf4j.LoggerFactory
+
+class MessageSpec
+ extends AnyWordSpec
+ with Matchers
+ with Eventually
+ with SpanSugar
+ with Reconfigure
+ with OptionValues
+ with TestSpanReporter
+ with InitAndStopKamonAfterAll
+ with ForAllTestContainer {
+
+ private val _logger = LoggerFactory.getLogger(classOf[MessageSpec])
+
+ "The apache cxf client making call" should {
+ "create client span when call method sayHello()" in {
+ clientExpectation.simpleExpectation()
+ val client = clientExpectation.simpleClient
+ val target = clientExpectation.simpleTarget
+ val response = client.sayHello()
+
+ eventually(timeout(10 seconds)) {
+ val span = testSpanReporter().nextSpan().value
+ _logger.info("Received Span: {}", span)
+ span.operationName shouldBe "apache.cxf.client"
+ span.tags.get(plain("http.url")) shouldBe target
+ span.metricTags.get(plain("component")) shouldBe "apache.cxf.client"
+ }
+ }
+ "replace operation name from config" in {
+ clientExpectation.customExpectation()
+ val client = clientExpectation.customClient
+ val target = clientExpectation.customTarget
+ val response = client.sayHello()
+
+ eventually(timeout(10 seconds)) {
+ val span = testSpanReporter().nextSpan().value
+ _logger.debug("Received Span: {}", span)
+ span.operationName shouldBe "custom-named-from-config"
+ span.tags.get(plain("http.url")) shouldBe target
+ span.metricTags.get(plain("component")) shouldBe "apache.cxf.client"
+ }
+ }
+ "mark spans as errors when request fails" in {
+ clientExpectation.test500Expectation()
+ val client = clientExpectation.test500Client
+ val target = clientExpectation.test500Target
+ val response = client.sayHello()
+
+ eventually(timeout(10 seconds)) {
+ val span = testSpanReporter().nextSpan().value
+ _logger.debug("Received Span: {}", span)
+ span.operationName shouldBe "apache.cxf.client"
+ span.tags.get(plain("http.url")) shouldBe target
+ span.metricTags.get(plain("component")) shouldBe "apache.cxf.client"
+ span.metricTags.get(plainBoolean("error")) shouldBe true
+ span.metricTags.get(plainLong("http.status_code")) shouldBe 500
+ span.hasError shouldBe true
+ }
+ }
+ "mark spans as errors when while processing the response throws an exception" in {
+ clientExpectation.failingExpectation()
+ val client = clientExpectation.failingClient
+ val target = clientExpectation.failingTarget
+ assertThrows[RuntimeException] {
+ val response = client.sayHello()
+ }
+
+ eventually(timeout(10 seconds)) {
+ val span = testSpanReporter().nextSpan().value
+ _logger.info("Received Span: {}", span)
+ span.operationName shouldBe "apache.cxf.client"
+ span.tags.get(plain("http.url")) shouldBe target
+ span.tags.get(plain("error.message")) should not be null
+ span.metricTags.get(plain("component")) shouldBe "apache.cxf.client"
+ span.metricTags.get(plainBoolean("error")) shouldBe true
+ }
+ }
+ }
+
+ override val container: MockServerContainer = MockServerContainer()
+ lazy val clientExpectation: MockServerExpectations = new MockServerExpectations("localhost", container.serverPort)
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ container.start()
+ }
+
+ override protected def afterAll(): Unit = {
+ container.stop()
+ super.afterAll()
+ }
+}
diff --git a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/FailingInterceptor.scala b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/FailingInterceptor.scala
new file mode 100644
index 000000000..70015a8b7
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/FailingInterceptor.scala
@@ -0,0 +1,25 @@
+package kamon.instrumentation.apache.cxf.client.util
+
+import org.apache.cxf.message.Message
+import org.apache.cxf.phase.Phase.{POST_INVOKE, RECEIVE}
+import org.apache.cxf.phase.PhaseInterceptor
+
+import java.util.Collections
+
+class FailingInterceptor extends PhaseInterceptor[Message] {
+ override def handleMessage(message: Message): Unit = {
+ throw new RuntimeException("Dummy Exception")
+ }
+
+ override def getAfter: java.util.Set[String] = Collections.emptySet()
+
+ override def getBefore: java.util.Set[String] = Collections.emptySet()
+
+ override def getId: String = getClass.getName
+
+ override def getPhase: String = RECEIVE
+
+ override def getAdditionalInterceptors: java.util.Collection[PhaseInterceptor[_ <: Message]] = null
+
+ override def handleFault(message: Message): Unit = {}
+}
diff --git a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/HelloWorldService.scala b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/HelloWorldService.scala
new file mode 100644
index 000000000..b0fdf5393
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/HelloWorldService.scala
@@ -0,0 +1,9 @@
+package kamon.instrumentation.apache.cxf.client.util
+
+import javax.jws.{WebMethod, WebService}
+
+@WebService
+trait HelloWorldService {
+ @WebMethod
+ def sayHello(): String
+}
diff --git a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/MockServerExpectations.scala b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/MockServerExpectations.scala
new file mode 100644
index 000000000..252e15d06
--- /dev/null
+++ b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/MockServerExpectations.scala
@@ -0,0 +1,104 @@
+package kamon.instrumentation.apache.cxf.client.util
+
+import org.apache.cxf.interceptor.Interceptor
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean
+import org.apache.cxf.message.Message
+import org.mockserver.client.MockServerClient
+import org.mockserver.model.HttpRequest.request
+import org.mockserver.model.HttpResponse.response
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters.SeqHasAsJava
+
+class MockServerExpectations(private val host: String, private val port: Int) {
+
+ private val _logger = LoggerFactory.getLogger(classOf[MockServerExpectations])
+
+ private[util] def client: MockServerClient = new MockServerClient(host, port)
+ def endpoint = s"http://$host:$port"
+ def simplePath: String = "/HelloWorldService"
+ def customPath: String = "/CustomHelloWorldService"
+ def test500Path: String = "/Test500HelloWorldService"
+
+ def failingPath: String = "/FailingHelloWorldService"
+
+ def simpleTarget = s"$endpoint$simplePath"
+ def customTarget = s"$endpoint$customPath"
+ def test500Target = s"$endpoint$test500Path"
+ def failingTarget = s"$endpoint$failingPath"
+
+ val simpleClient = new HelloWorldServiceClient(simpleTarget)
+
+ val customClient = new HelloWorldServiceClient(customTarget)
+
+ val test500Client = new HelloWorldServiceClient(test500Target)
+
+ val failingClient =
+ new HelloWorldServiceClient(address = failingTarget, inInterceptors = List(new FailingInterceptor))
+
+ private val RESPONSE: String =
+ "Hello!"
+
+ def simpleExpectation(): Unit = client.when(
+ request()
+ .withMethod("POST")
+ .withPath(simplePath)
+ ).respond(
+ response()
+ .withStatusCode(200)
+ .withBody(RESPONSE)
+ )
+
+ def customExpectation(): Unit = client.when(
+ request()
+ .withMethod("POST")
+ .withPath(customPath)
+ ).respond(
+ response()
+ .withStatusCode(200)
+ .withBody(RESPONSE)
+ )
+
+ def test500Expectation(): Unit = client.when(
+ request()
+ .withMethod("POST")
+ .withPath(test500Path)
+ ).respond(
+ response()
+ .withStatusCode(500)
+ .withBody(RESPONSE)
+ )
+ def failingExpectation(): Unit = client.when(
+ request()
+ .withMethod("POST")
+ .withPath(failingPath)
+ ).respond(
+ response()
+ .withStatusCode(200)
+ .withBody(RESPONSE)
+ )
+}
+
+class HelloWorldServiceClient(
+ address: String,
+ outInterceptors: List[Interceptor[Message]] = List.empty,
+ inInterceptors: List[Interceptor[Message]] = List.empty
+) {
+ val client: HelloWorldService = HelloWorldServiceClientFactory.create(address, outInterceptors, inInterceptors)
+ def sayHello(): String = client.sayHello()
+}
+
+object HelloWorldServiceClientFactory {
+ def create(
+ address: String,
+ outInterceptors: List[Interceptor[Message]],
+ inInterceptors: List[Interceptor[Message]]
+ ): HelloWorldService = {
+ val factory = new JaxWsProxyFactoryBean()
+ factory.setServiceClass(classOf[HelloWorldService])
+ factory.setAddress(address)
+ factory.getInInterceptors.addAll(inInterceptors.asJava)
+ factory.getOutInterceptors.addAll(outInterceptors.asJava)
+ factory.create().asInstanceOf[HelloWorldService]
+ }
+}