Skip to content

Commit

Permalink
add: instrumentation for apache cxf client (kamon-io#1335)
Browse files Browse the repository at this point in the history
  • Loading branch information
atyutin90 authored May 28, 2024
1 parent 4fb6dc5 commit 81bd5f9
Show file tree
Hide file tree
Showing 14 changed files with 715 additions and 3 deletions.
29 changes: 26 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ val instrumentationProjects = Seq[ProjectReference](
`kamon-alpakka-kafka`,
`kamon-http4s-1_0`,
`kamon-http4s-0_23`,
`kamon-apache-httpclient`
`kamon-apache-httpclient`,
`kamon-apache-cxf`
)

lazy val instrumentation = (project in file("instrumentation"))
Expand Down Expand Up @@ -839,6 +840,26 @@ lazy val `kamon-apache-httpclient` = (project in file("instrumentation/kamon-apa
)
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

lazy val `kamon-apache-cxf` = (project in file("instrumentation/kamon-apache-cxf"))
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(instrumentationSettings)
.settings(
crossScalaVersions := Seq(`scala_2.13_version`, `scala_3_version`),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"org.apache.cxf" % "cxf-rt-frontend-simple" % "3.3.6" % "provided",
slf4jApi % "provided",

scalatest % "test",
logbackClassic % "test",
"org.mock-server" % "mockserver-client-java" % "5.13.2" % "test",
"com.dimafeng" %% "testcontainers-scala" % "0.41.0" % "test",
"com.dimafeng" %% "testcontainers-scala-mockserver" % "0.41.0" % "test",
"org.apache.cxf" % "cxf-rt-frontend-jaxws" % "3.3.6" % "test",
"org.apache.cxf" % "cxf-rt-transports-http" % "3.3.6" % "test",
)
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

/**
* Reporters
Expand Down Expand Up @@ -1107,7 +1128,8 @@ lazy val `kamon-bundle-dependencies-all` = (project in file("bundle/kamon-bundle
`kamon-caffeine`,
`kamon-lagom`,
`kamon-aws-sdk`,
`kamon-apache-httpclient`
`kamon-apache-httpclient`,
`kamon-apache-cxf`
)

/**
Expand Down Expand Up @@ -1172,7 +1194,8 @@ lazy val `kamon-bundle-dependencies-3` = (project in file("bundle/kamon-bundle-d
`kamon-pekko`,
`kamon-pekko-http`,
`kamon-pekko-grpc`,
`kamon-apache-httpclient`
`kamon-apache-httpclient`,
`kamon-apache-cxf`
)

lazy val `kamon-bundle` = (project in file("bundle/kamon-bundle"))
Expand Down
99 changes: 99 additions & 0 deletions instrumentation/kamon-apache-cxf/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# ================================================== #
# kamon Apache CXF client reference configuration #
# ================================================== #

# Settings to control the CXF Client instrumentation
#
# IMPORTANT: The entire configuration of the CXF Client Instrumentation is based on the constructs provided by the
# Kamon Instrumentation Common library which will always fallback to the settings found under the
# "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to
# find and understand in the context of this project and commented out so that any changes to the default settings
# will actually have effect.
#
kamon.instrumentation.apache.cxf {

#
# Configuration for CXF context propagation.
#
propagation {

# Enables or disables HTTP context propagation on this HTTP client instrumentation. Please note that if
# propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
# be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
#enabled = yes

# HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
# configuration for more details on how to configure the detault HTTP context propagation.
#channel = "default"
}

tracing {

# Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests
# and finish them when the response is received from the server.
#enabled = yes

# Enables collection of span metrics using the `span.processing-time` metric.
#span-metrics = on

# Select which tags should be included as span and span metric tags. The possible options are:
# - span: the tag is added as a Span tag (i.e. using span.tag(...))
# - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...))
# - off: the tag is not used.
#
tags {

# Use the http.url tag.
#url = span

# Use the http.method tag.
#method = metric

# Use the http.status_code tag.
#status-code = metric

# Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type
# tag from the context into the HTTP Client Span created by the instrumentation, the following configuration
# should be added:
#
# from-context {
# customer_type = span
# }
#
from-context {

}
}

operations {

# The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP
# Client instrumentation will always try to use the HTTP Operation Name Generator configured below to get
# a name, but if it fails to generate it then this name will be used.
default = "apache.cxf.client"

# FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms:
# - hostname: Uses the request Host as the operation name.
# - method: Uses the request HTTP method as the operation name.
#
#name-generator = "method"
}
}

}

kanela {
modules {
apache-cxf {
name = "Apache CXF Client"
description = "Provides tracing of client calls made with the official Apache CXF library."
instrumentations = [
"kamon.instrumentation.apache.cxf.client.ClientProxyFactoryBeanInstrumentation"
]
enabled = true
within = [
"org.apache.cxf..*",
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package kamon.instrumentation.apache.cxf.client

import kamon.instrumentation.http.HttpMessage
import org.apache.cxf.message.Message
import org.apache.cxf.message.Message.{HTTP_REQUEST_METHOD, PROTOCOL_HEADERS, RESPONSE_CODE}
import org.slf4j.LoggerFactory

import java.net.{URI, URISyntaxException}
import java.util.Collections.{emptyMap => jEmptyMap, singletonList => jList}
import java.util.{List => JList, Map => JMap}
import scala.collection.mutable
import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, MapHasAsScala}

class ApacheCxfClientHelper

object ApacheCxfClientHelper {

private val _logger = LoggerFactory.getLogger(classOf[ApacheCxfClientHelper])

def toRequestBuilder(request: Message): HttpMessage.RequestBuilder[Message] =
new RequestReader with HttpMessage.RequestBuilder[Message] {

val delegate: Message = request

val uri: URI = getUri(request)

override def write(header: String, value: String): Unit = {
val headers: mutable.Map[String, String] = getAllHeaders(delegate).to(mutable.Map)
headers.put(header, value)
delegate.put(Message.PROTOCOL_HEADERS, headers.map(m => m._1 -> jList(m._2)).toMap.asJava)
}

override def build(): Message = {
_logger.trace("Prepared request for instrumentation: {}", this)
delegate
}

override def toString(): String = s"RequestReader(host=$host,port=$port,method=$method,path=$path)"
}

def toResponse(message: Message): HttpMessage.Response = new HttpMessage.Response {
override def statusCode: Int = message.get(RESPONSE_CODE) match {
case code: Integer => code
case _ =>
_logger.debug("Not able to retrieve status code from response")
-1
}
}
private def getUri(message: Message): URI =
try {
getUriAsString(message).map(s => new URI(s)).orNull
} catch {
case e: URISyntaxException => throw new RuntimeException(e.getMessage, e)
}

private def safeGet(message: Message, key: String): Option[String] =
if (message.containsKey(key)) {
message.get(key) match {
case value: String => Option.apply(value)
case _ => Option.empty
}
} else Option.empty

private def getUriAsString(message: Message): Option[String] = {
val requestUrl: Option[String] = safeGet(message, Message.REQUEST_URL).orElse {
var address = safeGet(message, Message.ENDPOINT_ADDRESS)
val requestUri = safeGet(message, Message.REQUEST_URI)
if (requestUri.exists(r => r.startsWith("/"))) {
if (!address.exists(a => a.startsWith(requestUri.get))) {
if (address.exists(t => t.endsWith("/") && t.length > 1)) {
address = address.map(a => a.substring(0, a.length))
}
address.map(a => a + requestUri.getOrElse(""))
} else requestUri
} else address
}
safeGet(message, Message.QUERY_STRING).map(q => requestUrl.map(u => s"$u?$q")).getOrElse(requestUrl)
}

private def getAllHeaders(message: Message): Map[String, String] = (message.get(PROTOCOL_HEADERS) match {
case hs: JMap[String, JList[String]] => hs
case _ => jEmptyMap[String, JList[String]]()
}).asScala.map { case (key, values) => key -> values.asScala.mkString(", ") }.toMap

private trait RequestReader extends HttpMessage.Request {
def uri: URI
def delegate: Message

override def host: String = if (uri != null) uri.getHost else null

override def port: Int = if (uri != null) uri.getPort else 0

override def method: String = delegate.get(HTTP_REQUEST_METHOD).asInstanceOf[String]

override def path: String = if (uri != null) uri.getPath else null

override def read(header: String): Option[String] = getAllHeaders(delegate).get(header)

override def readAll(): Map[String, String] = getAllHeaders(delegate)

override def url: String = if (uri != null) uri.toString else null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kamon.instrumentation.apache.cxf.client

import kamon.Kamon
import kamon.instrumentation.http.HttpClientInstrumentation
import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler
import kanela.agent.api.instrumentation.InstrumentationBuilder
import org.apache.cxf.message.Message

class ClientProxyFactoryBeanInstrumentation extends InstrumentationBuilder {

onSubTypesOf("org.apache.cxf.frontend.ClientProxyFactoryBean")
.advise(method("initFeatures"), classOf[TracingClientFeatureInitializer])
}

object ClientProxyFactoryBeanInstrumentation {
Kamon.onReconfigure(_ =>
ClientProxyFactoryBeanInstrumentation.rebuildHttpClientInstrumentation(): Unit
)

@volatile var cxfClientInstrumentation: HttpClientInstrumentation = rebuildHttpClientInstrumentation()

private def rebuildHttpClientInstrumentation(): HttpClientInstrumentation = {
val httpClientConfig = Kamon.config().getConfig("kamon.instrumentation.apache.cxf")
cxfClientInstrumentation = HttpClientInstrumentation.from(httpClientConfig, "apache.cxf.client")
cxfClientInstrumentation
}

def processResponse(handler: RequestHandler[_], message: Message, t: Throwable = null): Unit = {
if (t != null) handler.span.fail(t).finish()
else handler.processResponse(ApacheCxfClientHelper.toResponse(message))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kamon.instrumentation.apache.cxf.client

import kamon.context.Storage.Scope
import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler
import org.apache.cxf.message.Message

import java.io.Closeable

private class TraceScopeHolder(val traceScope: Option[TraceScope], val detached: Boolean = false) extends Serializable

private case class TraceScope(handler: RequestHandler[Message], scope: Option[Scope]) extends Closeable {

override def close(): Unit = {
if (handler != null && handler.span != null) handler.span.finish()
if (scope.nonEmpty) scope.get.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package kamon.instrumentation.apache.cxf.client

import org.apache.cxf.Bus
import org.apache.cxf.feature.AbstractFeature
import org.apache.cxf.interceptor.InterceptorProvider

private class TracingClientFeature extends AbstractFeature {

private val setup = new TracingClientSetupInterceptor()
private val receive = new TracingClientReceiveInterceptor()
private val postInvoke = new TracingClientPostInvokeInterceptor()

override def initializeProvider(provider: InterceptorProvider, bus: Bus): Unit = {
provider.getOutInterceptors.add(0, setup)
provider.getOutFaultInterceptors.add(0, setup)
provider.getInInterceptors.add(0, receive)
provider.getInInterceptors.add(postInvoke)
provider.getInFaultInterceptors.add(0, receive)
provider.getInFaultInterceptors.add(postInvoke)
super.initializeProvider(provider, bus)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kamon.instrumentation.apache.cxf.client

import kanela.agent.libs.net.bytebuddy.asm.Advice
import kanela.agent.libs.net.bytebuddy.asm.Advice.This
import org.apache.cxf.frontend.ClientProxyFactoryBean

import scala.annotation.static

class TracingClientFeatureInitializer
object TracingClientFeatureInitializer {

@Advice.OnMethodEnter
@static def onEnter(@This clientProxyFactoryBean: Any) = clientProxyFactoryBean match {
case c: ClientProxyFactoryBean => c.getFeatures.add(new TracingClientFeature)
}
}
Loading

0 comments on commit 81bd5f9

Please sign in to comment.