Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added instrumentation for apache cxf client. #1335

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ val instrumentationProjects = Seq[ProjectReference](
`kamon-alpakka-kafka`,
`kamon-http4s-1_0`,
`kamon-http4s-0_23`,
`kamon-apache-httpclient`
`kamon-apache-httpclient`,
`kamon-apache-cxf`
)

lazy val instrumentation = (project in file("instrumentation"))
Expand Down Expand Up @@ -839,6 +840,26 @@ lazy val `kamon-apache-httpclient` = (project in file("instrumentation/kamon-apa
)
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

lazy val `kamon-apache-cxf` = (project in file("instrumentation/kamon-apache-cxf"))
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(instrumentationSettings)
.settings(
crossScalaVersions := Seq(`scala_2.13_version`, `scala_3_version`),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"org.apache.cxf" % "cxf-rt-frontend-simple" % "3.3.6" % "provided",
slf4jApi % "provided",

scalatest % "test",
logbackClassic % "test",
"org.mock-server" % "mockserver-client-java" % "5.13.2" % "test",
"com.dimafeng" %% "testcontainers-scala" % "0.41.0" % "test",
"com.dimafeng" %% "testcontainers-scala-mockserver" % "0.41.0" % "test",
"org.apache.cxf" % "cxf-rt-frontend-jaxws" % "3.3.6" % "test",
"org.apache.cxf" % "cxf-rt-transports-http" % "3.3.6" % "test",
)
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

/**
* Reporters
Expand Down Expand Up @@ -1107,7 +1128,8 @@ lazy val `kamon-bundle-dependencies-all` = (project in file("bundle/kamon-bundle
`kamon-caffeine`,
`kamon-lagom`,
`kamon-aws-sdk`,
`kamon-apache-httpclient`
`kamon-apache-httpclient`,
`kamon-apache-cxf`
)

/**
Expand Down Expand Up @@ -1172,7 +1194,8 @@ lazy val `kamon-bundle-dependencies-3` = (project in file("bundle/kamon-bundle-d
`kamon-pekko`,
`kamon-pekko-http`,
`kamon-pekko-grpc`,
`kamon-apache-httpclient`
`kamon-apache-httpclient`,
`kamon-apache-cxf`
)

lazy val `kamon-bundle` = (project in file("bundle/kamon-bundle"))
Expand Down
99 changes: 99 additions & 0 deletions instrumentation/kamon-apache-cxf/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# ================================================== #
# kamon Apache CXF client reference configuration #
# ================================================== #

# Settings to control the CXF Client instrumentation
#
# IMPORTANT: The entire configuration of the CXF Client Instrumentation is based on the constructs provided by the
# Kamon Instrumentation Common library which will always fallback to the settings found under the
# "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to
# find and understand in the context of this project and commented out so that any changes to the default settings
# will actually have effect.
#
kamon.instrumentation.apache.cxf {

#
# Configuration for CXF context propagation.
#
propagation {

# Enables or disables HTTP context propagation on this HTTP client instrumentation. Please note that if
# propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
# be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
#enabled = yes

# HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
# configuration for more details on how to configure the detault HTTP context propagation.
#channel = "default"
}

tracing {

# Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests
# and finish them when the response is received from the server.
#enabled = yes

# Enables collection of span metrics using the `span.processing-time` metric.
#span-metrics = on

# Select which tags should be included as span and span metric tags. The possible options are:
# - span: the tag is added as a Span tag (i.e. using span.tag(...))
# - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...))
# - off: the tag is not used.
#
tags {

# Use the http.url tag.
#url = span

# Use the http.method tag.
#method = metric

# Use the http.status_code tag.
#status-code = metric

# Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type
# tag from the context into the HTTP Client Span created by the instrumentation, the following configuration
# should be added:
#
# from-context {
# customer_type = span
# }
#
from-context {

}
}

operations {

# The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP
# Client instrumentation will always try to use the HTTP Operation Name Generator configured below to get
# a name, but if it fails to generate it then this name will be used.
default = "apache.cxf.client"

# FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms:
# - hostname: Uses the request Host as the operation name.
# - method: Uses the request HTTP method as the operation name.
#
#name-generator = "method"
}
}

}

kanela {
modules {
apache-cxf {
name = "Apache CXF Client"
description = "Provides tracing of client calls made with the official Apache CXF library."
instrumentations = [
"kamon.instrumentation.apache.cxf.client.ClientProxyFactoryBeanInstrumentation"
]
enabled = true
within = [
"org.apache.cxf..*",
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package kamon.instrumentation.apache.cxf.client

import kamon.instrumentation.http.HttpMessage
import org.apache.cxf.message.Message
import org.apache.cxf.message.Message.{HTTP_REQUEST_METHOD, PROTOCOL_HEADERS, RESPONSE_CODE}
import org.slf4j.LoggerFactory

import java.net.{URI, URISyntaxException}
import java.util.Collections.{emptyMap => jEmptyMap, singletonList => jList}
import java.util.{List => JList, Map => JMap}
import scala.collection.mutable
import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, MapHasAsScala}

class ApacheCxfClientHelper

object ApacheCxfClientHelper {

private val _logger = LoggerFactory.getLogger(classOf[ApacheCxfClientHelper])

def toRequestBuilder(request: Message): HttpMessage.RequestBuilder[Message] =
new RequestReader with HttpMessage.RequestBuilder[Message] {

val delegate: Message = request

val uri: URI = getUri(request)

override def write(header: String, value: String): Unit = {
val headers: mutable.Map[String, String] = getAllHeaders(delegate).to(mutable.Map)
headers.put(header, value)
delegate.put(Message.PROTOCOL_HEADERS, headers.map(m => m._1 -> jList(m._2)).toMap.asJava)
}

override def build(): Message = {
_logger.trace("Prepared request for instrumentation: {}", this)
delegate
}

override def toString(): String = s"RequestReader(host=$host,port=$port,method=$method,path=$path)"
}

def toResponse(message: Message): HttpMessage.Response = new HttpMessage.Response {
override def statusCode: Int = message.get(RESPONSE_CODE) match {
case code: Integer => code
case _ =>
_logger.debug("Not able to retrieve status code from response")
-1
}
}
private def getUri(message: Message): URI =
try {
getUriAsString(message).map(s => new URI(s)).orNull
} catch {
case e: URISyntaxException => throw new RuntimeException(e.getMessage, e)
}

private def safeGet(message: Message, key: String): Option[String] =
if (message.containsKey(key)) {
message.get(key) match {
case value: String => Option.apply(value)
case _ => Option.empty
}
} else Option.empty

private def getUriAsString(message: Message): Option[String] = {
val requestUrl: Option[String] = safeGet(message, Message.REQUEST_URL).orElse {
var address = safeGet(message, Message.ENDPOINT_ADDRESS)
val requestUri = safeGet(message, Message.REQUEST_URI)
if (requestUri.exists(r => r.startsWith("/"))) {
if (!address.exists(a => a.startsWith(requestUri.get))) {
if (address.exists(t => t.endsWith("/") && t.length > 1)) {
address = address.map(a => a.substring(0, a.length))
}
address.map(a => a + requestUri.getOrElse(""))
} else requestUri
} else address
}
safeGet(message, Message.QUERY_STRING).map(q => requestUrl.map(u => s"$u?$q")).getOrElse(requestUrl)
}

private def getAllHeaders(message: Message): Map[String, String] = (message.get(PROTOCOL_HEADERS) match {
case hs: JMap[String, JList[String]] => hs
case _ => jEmptyMap[String, JList[String]]()
}).asScala.map { case (key, values) => key -> values.asScala.mkString(", ") }.toMap

private trait RequestReader extends HttpMessage.Request {
def uri: URI
def delegate: Message

override def host: String = if (uri != null) uri.getHost else null

override def port: Int = if (uri != null) uri.getPort else 0

override def method: String = delegate.get(HTTP_REQUEST_METHOD).asInstanceOf[String]

override def path: String = if (uri != null) uri.getPath else null

override def read(header: String): Option[String] = getAllHeaders(delegate).get(header)

override def readAll(): Map[String, String] = getAllHeaders(delegate)

override def url: String = if (uri != null) uri.toString else null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kamon.instrumentation.apache.cxf.client

import kamon.Kamon
import kamon.instrumentation.http.HttpClientInstrumentation
import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler
import kanela.agent.api.instrumentation.InstrumentationBuilder
import org.apache.cxf.message.Message

class ClientProxyFactoryBeanInstrumentation extends InstrumentationBuilder {

onSubTypesOf("org.apache.cxf.frontend.ClientProxyFactoryBean")
.advise(method("initFeatures"), classOf[TracingClientFeatureInitializer])
}

object ClientProxyFactoryBeanInstrumentation {
Kamon.onReconfigure(_ =>
ClientProxyFactoryBeanInstrumentation.rebuildHttpClientInstrumentation(): Unit
)

@volatile var cxfClientInstrumentation: HttpClientInstrumentation = rebuildHttpClientInstrumentation()

private def rebuildHttpClientInstrumentation(): HttpClientInstrumentation = {
val httpClientConfig = Kamon.config().getConfig("kamon.instrumentation.apache.cxf")
cxfClientInstrumentation = HttpClientInstrumentation.from(httpClientConfig, "apache.cxf.client")
cxfClientInstrumentation
}

def processResponse(handler: RequestHandler[_], message: Message, t: Throwable = null): Unit = {
if (t != null) handler.span.fail(t).finish()
else handler.processResponse(ApacheCxfClientHelper.toResponse(message))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kamon.instrumentation.apache.cxf.client

import kamon.context.Storage.Scope
import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler
import org.apache.cxf.message.Message

import java.io.Closeable

private class TraceScopeHolder(val traceScope: Option[TraceScope], val detached: Boolean = false) extends Serializable

private case class TraceScope(handler: RequestHandler[Message], scope: Option[Scope]) extends Closeable {

override def close(): Unit = {
if (handler != null && handler.span != null) handler.span.finish()
if (scope.nonEmpty) scope.get.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package kamon.instrumentation.apache.cxf.client

import org.apache.cxf.Bus
import org.apache.cxf.feature.AbstractFeature
import org.apache.cxf.interceptor.InterceptorProvider

private class TracingClientFeature extends AbstractFeature {

private val setup = new TracingClientSetupInterceptor()
private val receive = new TracingClientReceiveInterceptor()
private val postInvoke = new TracingClientPostInvokeInterceptor()

override def initializeProvider(provider: InterceptorProvider, bus: Bus): Unit = {
provider.getOutInterceptors.add(0, setup)
provider.getOutFaultInterceptors.add(0, setup)
provider.getInInterceptors.add(0, receive)
provider.getInInterceptors.add(postInvoke)
provider.getInFaultInterceptors.add(0, receive)
provider.getInFaultInterceptors.add(postInvoke)
super.initializeProvider(provider, bus)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kamon.instrumentation.apache.cxf.client

import kanela.agent.libs.net.bytebuddy.asm.Advice
import kanela.agent.libs.net.bytebuddy.asm.Advice.This
import org.apache.cxf.frontend.ClientProxyFactoryBean

import scala.annotation.static

class TracingClientFeatureInitializer
object TracingClientFeatureInitializer {

@Advice.OnMethodEnter
@static def onEnter(@This clientProxyFactoryBean: Any) = clientProxyFactoryBean match {
case c: ClientProxyFactoryBean => c.getFeatures.add(new TracingClientFeature)
}
}
Loading