diff --git a/build.sbt b/build.sbt index 0496973d1..b2425a29f 100644 --- a/build.sbt +++ b/build.sbt @@ -569,6 +569,22 @@ lazy val `kamon-finagle` = (project in file("instrumentation/kamon-finagle")) ) ).dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test") +lazy val `kamon-netty` = (project in file("instrumentation/kamon-netty")) + .disablePlugins(AssemblyPlugin) + .enablePlugins(JavaAgent) + .settings(instrumentationSettings) + .settings( + libraryDependencies ++= Seq( + kanelaAgent % "provided", + "io.netty" % "netty-all" % "4.1.65.Final" % "provided", + "io.netty" % "netty-transport-native-epoll" % "4.1.65.Final" % "provided" classifier "linux-x86_64", + + scalatest % "test", + logbackClassic % "test", + ) + ).dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test") + + /** * Reporters */ diff --git a/instrumentation/kamon-netty/src/main/java/kamon/netty/instrumentation/advisor/ClientEncodeMethodAdvisor.java b/instrumentation/kamon-netty/src/main/java/kamon/netty/instrumentation/advisor/ClientEncodeMethodAdvisor.java new file mode 100644 index 000000000..ac7221a2e --- /dev/null +++ b/instrumentation/kamon-netty/src/main/java/kamon/netty/instrumentation/advisor/ClientEncodeMethodAdvisor.java @@ -0,0 +1,18 @@ +package kamon.netty.instrumentation.advisor; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpRequest; +import kamon.netty.instrumentation.HttpRequestContext; +import kanela.agent.libs.net.bytebuddy.asm.Advice.Argument; +import kanela.agent.libs.net.bytebuddy.asm.Advice.OnMethodEnter; + +public class ClientEncodeMethodAdvisor { + + @OnMethodEnter + static void onEnter(@Argument(value = 0) ChannelHandlerContext ctx, + @Argument(value = 1, readOnly = false) Object request) { + if (request instanceof HttpRequest) { + request = HttpRequestContext.withContext((HttpRequest)request, ctx); + } + } +} diff --git a/instrumentation/kamon-netty/src/main/java/kamon/netty/instrumentation/advisor/NewTaskQueueMethodAdvisor.java b/instrumentation/kamon-netty/src/main/java/kamon/netty/instrumentation/advisor/NewTaskQueueMethodAdvisor.java new file mode 100644 index 000000000..173e42b26 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/java/kamon/netty/instrumentation/advisor/NewTaskQueueMethodAdvisor.java @@ -0,0 +1,16 @@ +package kamon.netty.instrumentation.advisor; + +import io.netty.channel.EventLoop; +import java.util.Queue; +import kamon.netty.util.MonitoredQueue; +import kanela.agent.libs.net.bytebuddy.asm.Advice.OnMethodExit; +import kanela.agent.libs.net.bytebuddy.asm.Advice.Return; +import kanela.agent.libs.net.bytebuddy.asm.Advice.This; + +public class NewTaskQueueMethodAdvisor { + + @OnMethodExit + static void onExit(@This Object eventLoop, @Return(readOnly = false) Queue queue) { + MonitoredQueue.apply((EventLoop) eventLoop, queue); + } +} diff --git a/instrumentation/kamon-netty/src/main/java/kamon/netty/util/QueueWrapperAdapter.java b/instrumentation/kamon-netty/src/main/java/kamon/netty/util/QueueWrapperAdapter.java new file mode 100644 index 000000000..bca390eb7 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/java/kamon/netty/util/QueueWrapperAdapter.java @@ -0,0 +1,121 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + + +package kamon.netty.util; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; + +public class QueueWrapperAdapter implements Queue { + + private final Queue underlying; + + public QueueWrapperAdapter(Queue underlying) { + this.underlying = underlying; + } + + @Override + public int size() { + return underlying.size(); + } + + @Override + public boolean isEmpty() { + return underlying.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return underlying.contains(o); + } + + @Override + public Iterator iterator() { + return underlying.iterator(); + } + + @Override + public Object[] toArray() { + return underlying.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return underlying.toArray(a); + } + + @Override + public boolean add(E e) { + return underlying.add(e); + } + + @Override + public boolean remove(Object o) { + return underlying.remove(o); + } + + @Override + public boolean containsAll(Collection c) { + return underlying.containsAll(c); + } + + @Override + public boolean addAll(Collection c) { + return underlying.addAll(c); + } + + @Override + public boolean removeAll(Collection c) { + return underlying.removeAll(c); + } + + @Override + public boolean retainAll(Collection c) { + return underlying.retainAll(c); + } + + @Override + public void clear() { + underlying.clear(); + } + + @Override + public boolean offer(E e) { + return underlying.offer(e); + } + + @Override + public E remove() { + return underlying.remove(); + } + + @Override + public E poll() { + return underlying.poll(); + } + + @Override + public E element() { + return underlying.element(); + } + + @Override + public E peek() { + return underlying.peek(); + } +} diff --git a/instrumentation/kamon-netty/src/main/resources/META-INF/aop.xml b/instrumentation/kamon-netty/src/main/resources/META-INF/aop.xml new file mode 100644 index 000000000..2ea77c878 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/resources/META-INF/aop.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + diff --git a/instrumentation/kamon-netty/src/main/resources/logback.xml b/instrumentation/kamon-netty/src/main/resources/logback.xml new file mode 100644 index 000000000..898887fe6 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/resources/logback.xml @@ -0,0 +1,14 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/instrumentation/kamon-netty/src/main/resources/reference.conf b/instrumentation/kamon-netty/src/main/resources/reference.conf new file mode 100644 index 000000000..78ad42db0 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/resources/reference.conf @@ -0,0 +1,260 @@ +# =================================== # +# kamon-netty reference configuration # +# =================================== # + + +kamon.instrumentation.netty { + + # Settings to control the HTTP Server instrumentation. + # + # IMPORTANT: Besides the "initial-operation-name" and "unhandled-operation-name" settings, the entire configuration of + # the HTTP Server Instrumentation is based on the constructs provided by the Kamon Instrumentation Common library + # which will always fallback to the settings found under the "kamon.instrumentation.http-server.default" path. The + # default settings have been included here to make them easy to find and understand in the context of this project and + # commented out so that any changes to the default settings will actually have effect. + # + server { + + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + #enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + + + } + + + # + # Configuration for HTTP server metrics collection. + # + metrics { + + # Enables collection of HTTP server metrics. When enabled the following metrics will be collected, assuming + # that the instrumentation is fully compliant: + # + # - http.server.requets + # - http.server.request.active + # - http.server.request.size + # - http.server.response.size + # - http.server.connection.lifetime + # - http.server.connection.usage + # - http.server.connection.open + # + # All metrics have at least three tags: component, interface and port. Additionally, the http.server.requests + # metric will also have a status_code tag with the status code group (1xx, 2xx and so on). + # + #enabled = yes + } + + + # + # Configuration for HTTP request tracing. + # + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests + # and finish them when the response is sent back to the clients. + #enabled = yes + + # Select a context tag that provides a preferred trace identifier. The preferred trace identifier will be used + # only if all these conditions are met: + # - the context tag is present. + # - there is no parent Span on the incoming context (i.e. this is the first service on the trace). + # - the identifier is valid in accordance to the identity provider. + #preferred-trace-id-tag = "none" + + # Enables collection of span metrics using the `span.processing-time` metric. + #span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + #url = span + + # Use the http.method tag. + #method = metric + + # Use the http.status_code tag. + #status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span crekamon.trace.sampler = alwaysated by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + # Controls writing trace and span identifiers to HTTP response headers sent by the instrumented servers. The + # configuration can be set to either "none" to disable writing the identifiers on the response headers or to + # the header name to be used when writing the identifiers. + response-headers { + + # HTTP response header name for the trace identifier, or "none" to disable it. + #trace-id = "trace-id" + + # HTTP response header name for the server span identifier, or "none" to disable it. + #span-id = none + } + + # Custom mappings between routes and operation names. + operations { + + # The default operation name to be used when creating Spans to handle the HTTP server requests. In most + # cases it is not possible to define an operation name right at the moment of starting the HTTP server Span + # and in those cases, this operation name will be initially assigned to the Span. Instrumentation authors + # should do their best effort to provide a suitable operation name or make use of the "mappings" facilities. + default = "http.server.request" + + # The operation name to be assigned when an application cannot find any route/endpoint/controller to handle + # a given request. Depending on the instrumented framework, it might be possible to apply this operation + # name automatically or not, check the frameworks' instrumentation docs for more details. + unhandled = "unhandled" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - default: Uses the set default operation name + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.netty.DefaultNameGenerator" + + # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode + # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. + # For example, with the following configuration: + # mappings { + # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" + # "/events/*/rsvps" = "EventRSVPs" + # } + # + # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have + # the same operation name "/organization/:orgID/user/:userID/profile". + # + # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation + # name "EventRSVPs". + # + # The patterns are expressed as globs and the operation names are free form. + # + mappings { + + } + } + } + } + + # Settings to control the HTTP Client instrumentation + # + # IMPORTANT: The entire configuration of the HTTP Client Instrumentation is based on the constructs provided by the + # Kamon Instrumentation Common library which will always fallback to the settings found under the + # "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to + # find and understand in the context of this project and commented out so that any changes to the default settings + # will actually have effect. + # + client { + + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + #enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + #channel = "default" + } + + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests + # and finish them when the response is received from the server. + #enabled = yes + + # Enables collection of span metrics using the `span.processing-time` metric. + #span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + #url = span + + # Use the http.method tag. + #method = metric + + # Use the http.status_code tag. + #status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + operations { + + # The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP + # Client instrumentation will always try to use the HTTP Operation Name Generator configured bellow to get + # a name, but if it fails to generate it then this name will be used. + #default = "http.client.request" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - hostname: Uses the request Host as the operation name. + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.netty.PathOperationNameGenerator" + } + } + } +} + +kanela { + show-banner = false + + modules { + netty { + name = "Netty Intrumentation Module" + stoppable = false + instrumentations = [ + "kamon.netty.instrumentation.ChannelInstrumentation", + "kamon.netty.instrumentation.EventLoopInstrumentation", + "kamon.netty.instrumentation.HttpClientInstrumentation", + "kamon.netty.instrumentation.HttpMessageInstrumentation", + "kamon.netty.instrumentation.HttpServerInstrumentation", + "kamon.netty.instrumentation.ServerBootstrapInstrumentation" + ] + within = [ "io.netty.*" ] + } + } +} \ No newline at end of file diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/Metrics.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/Metrics.scala new file mode 100644 index 000000000..eac73ea97 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/Metrics.scala @@ -0,0 +1,57 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty + +import kamon.Kamon +import kamon.metric.MeasurementUnit._ +import kamon.metric._ +import kamon.tag.TagSet + + +object Metrics { + + /** + * Metrics for Netty Event Loops: + * + * - registered-channels:The number of registered Channels. + * - task-processing-time: The the number of nanoseconds the last processing of all tasks took. + * - task-queue-size: The number of tasks that are pending for processing. + * - task-waiting-time: The waiting time in the queue. + */ + val registeredChannelsMetric = Kamon.rangeSampler("netty.event-loop.registered-channels") + val taskProcessingTimeMetric = Kamon.histogram("netty.event-loop.task-processing-time", time.nanoseconds) + val taskQueueSizeMetric = Kamon.rangeSampler("netty.event-loop.task-queue-size") + val taskWaitingTimeMetric = Kamon.histogram("netty.event-loop.task-waiting-time", time.nanoseconds) + + + def forEventLoop(name: String): EventLoopMetrics = { + val eventLoopTags = Map("name" -> name) + EventLoopMetrics( + eventLoopTags, + registeredChannelsMetric.withTags(TagSet.from(eventLoopTags)), + taskProcessingTimeMetric.withTags(TagSet.from(eventLoopTags)), + taskQueueSizeMetric.withTags(TagSet.from(eventLoopTags)), + taskWaitingTimeMetric.withTags(TagSet.from(eventLoopTags)) + ) + } + + case class EventLoopMetrics(tags: Map[String, String], + registeredChannels: RangeSampler, + taskProcessingTime: Histogram, + taskQueueSize: RangeSampler, + taskWaitingTime: Histogram) +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/Netty.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/Netty.scala new file mode 100644 index 000000000..e16666dd3 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/Netty.scala @@ -0,0 +1,94 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty + +import java.net.URI + +import com.typesafe.config.Config +import io.netty.handler.codec.http.HttpRequest +import kamon.Kamon +import kamon.instrumentation.http.HttpMessage.Request +import kamon.instrumentation.http.{HttpClientInstrumentation, HttpOperationNameGenerator, HttpServerInstrumentation} +import kamon.util.DynamicAccess + +object Netty { +// private var nameGenerator: NameGenerator = new PathOperationNameGenerator() + +// loadConfiguration(Kamon.config()) + +// def generateOperationName(request: HttpRequest): String = +// nameGenerator.generateOperationName(request) + +// def generateHttpClientOperationName(request: HttpRequest): String = +// nameGenerator.generateHttpClientOperationName(request) + + + val httpServerConfig = Kamon.config().getConfig("kamon.instrumentation.netty.server") + val httpClientConfig = Kamon.config().getConfig("kamon.instrumentation.netty.client") + + + val serverInstrumentation = HttpServerInstrumentation.from(httpServerConfig, "netty.server", "0.0.0.0", 8080) + val clientInstrumentation = HttpClientInstrumentation.from(httpClientConfig, "netty.client") + +// Kamon.onReconfigure((newConfig: Config) => Netty.loadConfiguration(newConfig)) + +// private def loadConfiguration(config: Config): Unit = synchronized { +// val dynamic = new DynamicAccess(getClass.getClassLoader) +// val nameGeneratorFQCN = config.getString("kamon.netty.name-generator") +// nameGenerator = dynamic.createInstanceFor[NameGenerator](nameGeneratorFQCN, Nil) +// } +// + @volatile var nameGenerator: HttpOperationNameGenerator = nameGeneratorFromConfig(Kamon.config()) + + private def nameGeneratorFromConfig(config: Config): HttpOperationNameGenerator = { + val dynamic = new DynamicAccess(getClass.getClassLoader) + val nameGeneratorFQCN = config.getString("kamon.instrumentation.netty.client.tracing.operations.name-generator") + dynamic.createInstanceFor[HttpOperationNameGenerator](nameGeneratorFQCN, Nil) + } + + Kamon.onReconfigure { newConfig => + nameGenerator = nameGeneratorFromConfig(newConfig) + } +} + +class DefaultNameGenerator extends HttpOperationNameGenerator { + import java.util.Locale + + import scala.collection.concurrent.TrieMap + + private val localCache = TrieMap.empty[String, String] + private val normalizePattern = """\$([^<]+)<[^>]+>""".r + +// override def generateHttpClientOperationName(request: HttpRequest): String = { +// val uri = new URI(request.getUri) +// s"${uri.getAuthority}${uri.getPath}" +// } + + def name(request: Request): Option[String] = { + Some( + localCache.getOrElseUpdate(s"${request.method}${request.path}", { + // Convert paths of form GET /foo/bar/$paramname/blah to foo.bar.paramname.blah.get + val p = normalizePattern.replaceAllIn(request.path, "$1").replace('/', '.').dropWhile(_ == '.') + val normalisedPath = { + if (p.lastOption.exists(_ != '.')) s"$p." + else p + } + s"$normalisedPath${request.method.toLowerCase(Locale.ENGLISH)}" + }) + ) + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/PathOperationNameGenerator.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/PathOperationNameGenerator.scala new file mode 100644 index 000000000..5007d2d65 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/PathOperationNameGenerator.scala @@ -0,0 +1,8 @@ +package kamon.netty + +import kamon.instrumentation.http.{HttpMessage, HttpOperationNameGenerator} + +class PathOperationNameGenerator extends HttpOperationNameGenerator { + override def name(request: HttpMessage.Request): Option[String] = + Some(request.path) +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/ChannelInstrumentation.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/ChannelInstrumentation.scala new file mode 100644 index 000000000..9e3fe12fa --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/ChannelInstrumentation.scala @@ -0,0 +1,10 @@ +package kamon.netty.instrumentation + +import kamon.netty.instrumentation.mixin.ChannelContextAwareMixin +import kanela.agent.api.instrumentation.InstrumentationBuilder + +class ChannelInstrumentation extends InstrumentationBuilder { + + onSubTypesOf("io.netty.channel.Channel") + .mixin(classOf[ChannelContextAwareMixin]) +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/EventLoopInstrumentation.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/EventLoopInstrumentation.scala new file mode 100644 index 000000000..73115c5ab --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/EventLoopInstrumentation.scala @@ -0,0 +1,37 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty.instrumentation + +import kamon.netty.instrumentation.advisor.{EpollAddMethodAdvisor, NewTaskQueueMethodAdvisor, NioCancelMethodAdvisor, RemoveMethodAdvisor} +import kanela.agent.api.instrumentation.InstrumentationBuilder + +class EventLoopInstrumentation extends InstrumentationBuilder { + + onType("io.netty.channel.nio.NioEventLoop") + .advise(method("cancel"), classOf[NioCancelMethodAdvisor]) + .advise(method("newTaskQueue"), classOf[NewTaskQueueMethodAdvisor]) + + onType("io.netty.channel.epoll.EpollEventLoop") + .advise(method("add"), classOf[EpollAddMethodAdvisor]) + .advise(method("remove"), classOf[RemoveMethodAdvisor]) + .advise(method("newTaskQueue"), classOf[NewTaskQueueMethodAdvisor]) + + onType("io.netty.channel.SingleThreadEventLoop") + .advise(method("register"), classOf[EpollAddMethodAdvisor]) +} + + diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpClientInstrumentation.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpClientInstrumentation.scala new file mode 100644 index 000000000..54e2817a7 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpClientInstrumentation.scala @@ -0,0 +1,29 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty.instrumentation + +import kamon.netty.instrumentation.advisor.{ClientDecodeMethodAdvisor, ClientEncodeMethodAdvisor} +import kanela.agent.api.instrumentation.InstrumentationBuilder + +class HttpClientInstrumentation extends InstrumentationBuilder { + + onType("io.netty.handler.codec.http.HttpClientCodec$Decoder") + .advise(method("decode").and(takesArguments(3)), classOf[ClientDecodeMethodAdvisor]) + + onType("io.netty.handler.codec.http.HttpClientCodec$Encoder") + .advise(method("encode").and(takesArguments(3)), classOf[ClientEncodeMethodAdvisor]) +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpMessageInstrumentation.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpMessageInstrumentation.scala new file mode 100644 index 000000000..dded3b638 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpMessageInstrumentation.scala @@ -0,0 +1,10 @@ +package kamon.netty.instrumentation + +import kamon.netty.instrumentation.mixin.RequestContextAwareMixin +import kanela.agent.api.instrumentation.InstrumentationBuilder + +class HttpMessageInstrumentation extends InstrumentationBuilder { + + onSubTypesOf("io.netty.handler.codec.http.HttpMessage") + .mixin(classOf[RequestContextAwareMixin]) +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpRequestContext.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpRequestContext.scala new file mode 100644 index 000000000..fe61572d9 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpRequestContext.scala @@ -0,0 +1,72 @@ +package kamon.netty.instrumentation + +import java.net.URI + +import io.netty.channel.{ChannelHandler, ChannelHandlerContext, ChannelInboundHandlerAdapter} +import io.netty.handler.codec.http.HttpRequest +import kamon.Kamon +import kamon.instrumentation.http.HttpMessage +import kamon.netty.Netty + +object HttpRequestContext { + + def withContext(request: HttpRequest, ctx: ChannelHandlerContext): HttpRequest = { + val currentContext = request.getContext() + val handler = Netty.clientInstrumentation.createHandler(toRequestBuilder(request), currentContext) + + ctx.channel().toContextAware().setClientHandler(handler) + request + } + + + private def toRequestBuilder(request: HttpRequest): HttpMessage.RequestBuilder[HttpRequest] = + new HttpMessage.RequestBuilder[HttpRequest] { + + val uri = new URI(request.uri()) + + import scala.collection.JavaConverters._ + + private var _newHttpHeaders: List[(String, String)] = List.empty + + override def write(header: String, value: String): Unit = + _newHttpHeaders = (header -> value) :: _newHttpHeaders + + override def build(): HttpRequest = { + _newHttpHeaders.foreach{ case(key, value) => request.headers().add(key, value)} + request + } + + override def read(header: String): Option[String] = + Option(request.headers().get(header)) + + override def readAll(): Map[String, String] = + request.headers.asScala.map(m => (m.getKey, m.getValue)).toMap + + override def url: String = + uri.toURL.toString + + override def path: String = + uri.getPath + + override def method: String = + request.method().name() + + override def host: String = + uri.getHost + + override def port: Int = + uri.getPort + } + +} + +object KamonHandlerPortable { + + @ChannelHandler.Sharable + class KamonHandler extends ChannelInboundHandlerAdapter { + override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = { + ctx.channel().toContextAware().setStartTime(Kamon.clock().instant()) + super.channelRead(ctx, msg) + } + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpServerInstrumentation.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpServerInstrumentation.scala new file mode 100644 index 000000000..a0d5ddede --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/HttpServerInstrumentation.scala @@ -0,0 +1,57 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty.instrumentation + +import kamon.netty.instrumentation.advisor.{ServerDecodeMethodAdvisor, ServerEncodeMethodAdvisor} +import kanela.agent.api.instrumentation.InstrumentationBuilder + +class HttpServerInstrumentation extends InstrumentationBuilder { + + onSubTypesOf("io.netty.handler.codec.http.HttpObjectDecoder") + .advise(method("decode").and(takesArguments(3)), classOf[ServerDecodeMethodAdvisor]) + + onSubTypesOf("io.netty.handler.codec.http.HttpObjectEncoder") + .advise(method("encode").and(takesArguments(3)), classOf[ServerEncodeMethodAdvisor]) + +// @After("execution(* io.netty.handler.codec.http.HttpObjectDecoder+.decode(..)) && args(ctx, *, out)") +// def onDecodeRequest(ctx: ChannelHandlerContext, out:java.util.List[AnyRef]): Unit = { +// if (out.size() > 0 && out.get(0).isHttpRequest()) { +// val request = out.get(0).toHttpRequest() +// val channel = ctx.channel().toContextAware() +// val incomingContext = decodeContext(request) +// val serverSpan = Kamon.buildSpan(Netty.generateOperationName(request)) +// .asChildOf(incomingContext.get(Span.ContextKey)) +// .withStartTimestamp(channel.startTime) +// .withSpanTag("span.kind", "server") +// .withSpanTag("component", "netty") +// .withSpanTag("http.method", request.getMethod.name()) +// .withSpanTag("http.url", request.getUri) +// .start() +// +// channel.setContext(incomingContext.withKey(Span.ContextKey, serverSpan)) +// } +// } +// +// @Before("execution(* io.netty.handler.codec.http.HttpObjectEncoder+.encode(..)) && args(ctx, response, *)") +// def onEncodeResponse(ctx: ChannelHandlerContext, response:HttpResponse): Unit = { +// val serverSpan = ctx.channel().getContext().get(Span.ContextKey) +// if(isError(response.getStatus.code())) +// serverSpan.addSpanTag("error", value = true) +// serverSpan.finish() +// } +} + diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/ServerBootstrapInstrumentation.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/ServerBootstrapInstrumentation.scala new file mode 100644 index 000000000..7491c692a --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/ServerBootstrapInstrumentation.scala @@ -0,0 +1,67 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty.instrumentation + +import kamon.netty.instrumentation.advisor.{ServerChannelReadMethodAdvisor, ServerGroupMethodAdvisor} +import kamon.netty.instrumentation.mixin.EventLoopMixin +import kanela.agent.api.instrumentation.InstrumentationBuilder + +class ServerBootstrapInstrumentation extends InstrumentationBuilder { + + onSubTypesOf("io.netty.channel.EventLoopGroup") + .mixin(classOf[EventLoopMixin]) + + onType("io.netty.bootstrap.ServerBootstrap") + .advise(method("group").and(takesArguments(2)), classOf[ServerGroupMethodAdvisor]) + + onType("io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor") + .advise(method("channelRead").and(takesArguments(2)), classOf[ServerChannelReadMethodAdvisor]) + +// @Before("execution(* io.netty.bootstrap.ServerBootstrap.group(..)) && args(bossGroup, workerGroup)") +// def onNewServerBootstrap(bossGroup:NamedEventLoopGroup, workerGroup:NamedEventLoopGroup):Unit = { +// if(bossGroup == workerGroup) { +// bossGroup.name = BossGroupName +// workerGroup.name = BossGroupName +// } else { +// bossGroup.name = BossGroupName +// workerGroup.name = WorkerGroupName +// } +// } + +// @After("execution(* io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead(..)) && args(ctx, child)") +// def onChannelRead(ctx: ChannelHandlerContext, child: Channel):Unit = { +// val pipeline = child.pipeline() +// if(pipeline.get(KamonHandler) == null) +// pipeline.addFirst(KamonHandler, new KamonHandler()) +// } +} + +object ServerBootstrapInstrumentation { + val BossGroupName = "boss-group" + val WorkerGroupName = "worker-group" + val KamonHandler = "kamon-handler" +} + +//@Aspect +//class EventLoopMixin { +// @DeclareMixin("io.netty.channel.EventLoopGroup+") +// def mixinEventLoopGroupWithNamedEventLoopGroup: NamedEventLoopGroup = new NamedEventLoopGroup {} +//} +// +//trait NamedEventLoopGroup { +// var name:String = _ +//} \ No newline at end of file diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ClientDecodeMethodAdvisor.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ClientDecodeMethodAdvisor.scala new file mode 100644 index 000000000..d788c9ead --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ClientDecodeMethodAdvisor.scala @@ -0,0 +1,31 @@ +package kamon.netty.instrumentation +package advisor + +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.http.HttpResponse +import kamon.instrumentation.http.HttpMessage +import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodExit, Thrown} + +class ClientDecodeMethodAdvisor +object ClientDecodeMethodAdvisor { + + @OnMethodExit(onThrowable = classOf[Throwable], inline = false) + def onExit(@Argument(0) _ctx: AnyRef, + @Argument(2) out: java.util.List[AnyRef], + @Thrown failure: Throwable): Unit = { + + val ctx = _ctx.asInstanceOf[ChannelHandlerContext] + + if (failure != null) { + val handler = ctx.channel().toContextAware().getClientHandler + handler.span.fail(failure.getMessage, failure).finish() + throw failure + } else if (out.size() > 0 && out.get(0).isHttpResponse()) { + val response = out.get(0).asInstanceOf[HttpResponse] + val handler = ctx.channel().toContextAware().getClientHandler + handler.processResponse(new HttpMessage.Response { + override def statusCode: Int = response.status().code() + }) + } + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/EpollAddMethodAdvisor.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/EpollAddMethodAdvisor.scala new file mode 100644 index 000000000..eac2705de --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/EpollAddMethodAdvisor.scala @@ -0,0 +1,15 @@ +package kamon.netty.instrumentation.advisor + +import io.netty.util.concurrent.EventExecutor +import kamon.netty.Metrics +import kamon.netty.util.EventLoopUtils.name +import kanela.agent.libs.net.bytebuddy.asm.Advice.{OnMethodExit, This} + +class EpollAddMethodAdvisor +object EpollAddMethodAdvisor { + + @OnMethodExit + def onExit(@This eventLoop: EventExecutor): Unit = { + Metrics.forEventLoop(name(eventLoop)).registeredChannels.increment() + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/NioCancelMethodAdvisor.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/NioCancelMethodAdvisor.scala new file mode 100644 index 000000000..61b6d8f08 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/NioCancelMethodAdvisor.scala @@ -0,0 +1,16 @@ +package kamon.netty.instrumentation.advisor + +import io.netty.util.concurrent.EventExecutor +import kamon.netty.Metrics +import kamon.netty.util.EventLoopUtils.name +import kanela.agent.libs.net.bytebuddy.asm.Advice.{OnMethodEnter, This} + +class NioCancelMethodAdvisor +object NioCancelMethodAdvisor { + + @OnMethodEnter + def onEnter(@This eventLoop: EventExecutor): Unit = { + val registeredChannels = Metrics.forEventLoop(name(eventLoop)).registeredChannels + registeredChannels.decrement() + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/RegisterMethodAdvisor.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/RegisterMethodAdvisor.scala new file mode 100644 index 000000000..43d3abb7d --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/RegisterMethodAdvisor.scala @@ -0,0 +1,22 @@ +package kamon.netty.instrumentation.advisor + +import io.netty.channel.ChannelFuture +import io.netty.util.concurrent.EventExecutor +import kamon.netty.Metrics +import kamon.netty.util.EventLoopUtils.name +import kanela.agent.libs.net.bytebuddy.asm.Advice.{OnMethodExit, Return, This} + +class RegisterMethodAdvisor +object RegisterMethodAdvisor { + + @OnMethodExit + def onExit(@This eventLoop: EventExecutor, @Return _channelFuture: AnyRef): Unit = { + val channelFuture = _channelFuture.asInstanceOf[ChannelFuture] + val registeredChannels = Metrics.forEventLoop(name(eventLoop)).registeredChannels + + if (channelFuture.isSuccess) registeredChannels.increment() + else channelFuture.addListener((future: ChannelFuture) => { + if(future.isSuccess) registeredChannels.increment() + }) + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/RemoveMethodAdvisor.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/RemoveMethodAdvisor.scala new file mode 100644 index 000000000..565443fe6 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/RemoveMethodAdvisor.scala @@ -0,0 +1,17 @@ +package kamon.netty.instrumentation.advisor + +import io.netty.channel.Channel +import io.netty.util.concurrent.EventExecutor +import kamon.netty.Metrics +import kamon.netty.util.EventLoopUtils.name +import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodExit, This} + +class RemoveMethodAdvisor +object RemoveMethodAdvisor { + + @OnMethodExit + def onExit(@This eventLoop: EventExecutor, @Argument(0) channel: Any): Unit = { + if(channel.asInstanceOf[Channel].isOpen) + Metrics.forEventLoop(name(eventLoop)).registeredChannels.decrement() + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerChannelReadMethodAdvisor.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerChannelReadMethodAdvisor.scala new file mode 100644 index 000000000..997b71ec4 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerChannelReadMethodAdvisor.scala @@ -0,0 +1,17 @@ +package kamon.netty.instrumentation +package advisor + +import io.netty.channel.Channel +import kamon.netty.instrumentation.ServerBootstrapInstrumentation.KamonHandler +import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodExit} + +class ServerChannelReadMethodAdvisor +object ServerChannelReadMethodAdvisor { + + @OnMethodExit + def onExit(@Argument(1) child: AnyRef): Unit = { + val pipeline = child.asInstanceOf[Channel].pipeline() + if(pipeline.get(KamonHandler) == null) + pipeline.addFirst(KamonHandler, new KamonHandlerPortable.KamonHandler()) + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerDecodeMethodAdvisor.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerDecodeMethodAdvisor.scala new file mode 100644 index 000000000..b9de4deba --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerDecodeMethodAdvisor.scala @@ -0,0 +1,45 @@ +package kamon.netty.instrumentation +package advisor + +import java.net.URI + +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.http.HttpRequest +import kamon.instrumentation.http.HttpMessage +import kamon.netty.Netty +import kamon.netty.instrumentation.mixin.ChannelContextAware +import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodExit} + +class ServerDecodeMethodAdvisor +object ServerDecodeMethodAdvisor { + + @OnMethodExit + def onExit(@Argument(0) ctx: AnyRef, + @Argument(2) out: java.util.List[AnyRef]): Unit = { + + if (out.size() > 0 && out.get(0).isHttpRequest()) { + val request = out.get(0).toHttpRequest() + val channel = ctx.asInstanceOf[ChannelHandlerContext].channel().asInstanceOf[ChannelContextAware] + val serverRequestHandler = Netty.serverInstrumentation.createHandler(toRequest(request)) + channel.setHandler(serverRequestHandler) + } + } + + private def toRequest(request: HttpRequest): HttpMessage.Request = new HttpMessage.Request { + import scala.collection.JavaConverters._ + + val uri = new URI(request.uri()) + + override def url: String = uri.toURL.toString + override def path: String = uri.getPath + override def method: String = request.method().name() + override def host: String = uri.getHost + override def port: Int = uri.getPort + + override def read(header: String): Option[String] = + Option(request.headers().get(header)) + + override def readAll(): Map[String, String] = + request.headers().entries().asScala.map(e => e.getKey -> e.getValue).toMap + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerEncodeMethodAdvisor.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerEncodeMethodAdvisor.scala new file mode 100644 index 000000000..86ecce1c3 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerEncodeMethodAdvisor.scala @@ -0,0 +1,34 @@ +package kamon.netty.instrumentation +package advisor + +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.http.HttpResponse +import kamon.instrumentation.http.HttpMessage +import kamon.netty.instrumentation.mixin.ChannelContextAware + +class ServerEncodeMethodAdvisor +object ServerEncodeMethodAdvisor { + import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodEnter} + + @OnMethodEnter + def onEnter(@Argument(0) ctx: ChannelHandlerContext, + @Argument(1) response: AnyRef): Unit = { + + if (response.isHttpResponse()) { + val handler = ctx.channel().asInstanceOf[ChannelContextAware].getHandler + handler.buildResponse(toResponse(response.toHttpResponse()),handler.context) + handler.responseSent() + } + } + + private def toResponse(response: HttpResponse): HttpMessage.ResponseBuilder[HttpResponse] = new HttpMessage.ResponseBuilder[HttpResponse] { + override def build(): HttpResponse = + response + + override def statusCode: Int = + response.status().code() + + override def write(header: String, value: String): Unit = + response.headers().add(header, value) + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerGroupMethodAdvisor.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerGroupMethodAdvisor.scala new file mode 100644 index 000000000..cda074785 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/advisor/ServerGroupMethodAdvisor.scala @@ -0,0 +1,23 @@ +package kamon.netty.instrumentation.advisor + +import kamon.netty.instrumentation.ServerBootstrapInstrumentation.{BossGroupName, WorkerGroupName} +import kamon.netty.instrumentation.mixin.NamedEventLoopGroup +import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodEnter} + +class ServerGroupMethodAdvisor +object ServerGroupMethodAdvisor { + + @OnMethodEnter + def onEnter(@Argument(value = 0) _bossGroup: AnyRef, + @Argument(value = 1) _workerGroup: AnyRef): Unit = { + val bossGroup = _bossGroup.asInstanceOf[NamedEventLoopGroup] + val workerGroup = _workerGroup.asInstanceOf[NamedEventLoopGroup] + if(bossGroup == workerGroup) { + bossGroup.setName(BossGroupName) + workerGroup.setName(BossGroupName) + } else { + bossGroup.setName(BossGroupName) + workerGroup.setName(WorkerGroupName) + } + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/mixin/ChannelMixin.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/mixin/ChannelMixin.scala new file mode 100644 index 000000000..1241bbb23 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/mixin/ChannelMixin.scala @@ -0,0 +1,67 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty.instrumentation.mixin + +import java.time.Instant + +import kamon.Kamon +import kamon.context.Context +import kamon.instrumentation.http.HttpClientInstrumentation +import kamon.instrumentation.http.HttpServerInstrumentation.RequestHandler +import kanela.agent.api.instrumentation.mixin.Initializer + + +trait ChannelContextAware { + def setClientHandler(handler: HttpClientInstrumentation.RequestHandler[_]): Unit + def getClientHandler: HttpClientInstrumentation.RequestHandler[_] + def setStartTime(value: Instant): Unit + def getStartTime: Instant + def setContext(ctx: Context):Unit + def getContext: Context + def setHandler(handler:RequestHandler): Unit + def getHandler:RequestHandler +} + +/** + * -- + */ +class ChannelContextAwareMixin extends ChannelContextAware { + @volatile var startTime: Instant = _ + @volatile var context:Context = _ + @volatile var serverHandler:RequestHandler = _ + @volatile var clientHandler:HttpClientInstrumentation.RequestHandler[_] = _ + + override def setStartTime(value: Instant): Unit = startTime = value + + override def getStartTime: Instant = startTime + + override def setContext(ctx: Context): Unit = context = ctx + + override def getContext: Context = context + + override def setHandler(handler: RequestHandler): Unit = serverHandler = handler + + override def getHandler: RequestHandler = serverHandler + + override def setClientHandler(handler: HttpClientInstrumentation.RequestHandler[_]): Unit = clientHandler = handler + + override def getClientHandler: HttpClientInstrumentation.RequestHandler[_] = clientHandler + + @Initializer + def init(): Unit = context = Kamon.currentContext() + +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/mixin/EventLoopMixin.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/mixin/EventLoopMixin.scala new file mode 100644 index 000000000..dde54ce8d --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/mixin/EventLoopMixin.scala @@ -0,0 +1,15 @@ +package kamon.netty.instrumentation.mixin + +trait NamedEventLoopGroup { + def setName(name: String): Unit + def getName: String +} + +class EventLoopMixin extends NamedEventLoopGroup { + + @volatile var _name: String = _ + + override def setName(name: String): Unit = _name = name + + override def getName: String = _name +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/mixin/HttpMessageMixin.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/mixin/HttpMessageMixin.scala new file mode 100644 index 000000000..3a08f6667 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/mixin/HttpMessageMixin.scala @@ -0,0 +1,26 @@ +package kamon.netty.instrumentation.mixin + +import kamon.Kamon +import kamon.context.Context +import kanela.agent.api.instrumentation.mixin.Initializer + + +trait RequestContextAware { + def setContext(ctx: Context): Unit + def getContext: Context +} + + +/** + * -- + */ +class RequestContextAwareMixin extends RequestContextAware { + @volatile var context: Context = _ + + override def setContext(ctx: Context): Unit = context = ctx + + override def getContext: Context = context + + @Initializer + def init(): Unit = context = Kamon.currentContext() +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/package.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/package.scala new file mode 100644 index 000000000..93955c7c3 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/instrumentation/package.scala @@ -0,0 +1,58 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty + + +import io.netty.handler.codec.http.{HttpRequest, HttpResponse} +import kamon.context.Context +import kamon.netty.instrumentation.mixin.{ChannelContextAware, RequestContextAware} + +package object instrumentation { + + implicit class ChannelSyntax(val channel: io.netty.channel.Channel) extends AnyVal { + def toContextAware(): ChannelContextAware = + channel.asInstanceOf[ChannelContextAware] + + def getContext(): Context = + channel.toContextAware().getContext + } + + implicit class RequestSyntax(val request: HttpRequest) extends AnyVal { + def toContextAware(): RequestContextAware = + request.asInstanceOf[RequestContextAware] + + def getContext(): Context = + request.toContextAware().getContext + } + + implicit class HttpSyntax(val obj: AnyRef) extends AnyVal { + def toHttpRequest(): HttpRequest = + obj.asInstanceOf[HttpRequest] + + def isHttpRequest(): Boolean = + obj.isInstanceOf[HttpRequest] + + def toHttpResponse(): HttpResponse = + obj.asInstanceOf[HttpResponse] + + def isHttpResponse(): Boolean = + obj.isInstanceOf[HttpResponse] + } + + def isError(statusCode: Int): Boolean = + statusCode >= 500 && statusCode < 600 +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/util/EventLoopUtils.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/util/EventLoopUtils.scala new file mode 100644 index 000000000..c2f3ff712 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/util/EventLoopUtils.scala @@ -0,0 +1,27 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty.util + +import io.netty.util.concurrent.EventExecutor +import kamon.netty.instrumentation.mixin.NamedEventLoopGroup + +object EventLoopUtils { + def name(eventLoop: EventExecutor): String = { + val sanitize:String => String = str => str.replaceAll("(.)(\\p{Upper})", "$1-$2").toLowerCase() + s"${eventLoop.parent().asInstanceOf[NamedEventLoopGroup].getName}-${sanitize(eventLoop.getClass.getSimpleName)}" + } +} diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/util/Latency.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/util/Latency.scala new file mode 100644 index 000000000..f5d7b0d43 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/util/Latency.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty.util + +import kamon.Kamon +import kamon.metric.Histogram + + +object Latency { + def measure[A](histogram: Histogram)(thunk: ⇒ A): A = { + val start = Kamon.clock.nanos() + try thunk finally { + val latency = Kamon.clock().nanos() - start + histogram.record(latency) + } + } +} \ No newline at end of file diff --git a/instrumentation/kamon-netty/src/main/scala/kamon/netty/util/MonitoredQueue.scala b/instrumentation/kamon-netty/src/main/scala/kamon/netty/util/MonitoredQueue.scala new file mode 100644 index 000000000..8a0a930f0 --- /dev/null +++ b/instrumentation/kamon-netty/src/main/scala/kamon/netty/util/MonitoredQueue.scala @@ -0,0 +1,82 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty.util + +import java.util + +import io.netty.channel.EventLoop +import kamon.Kamon +import kamon.context.Context +import kamon.netty.Metrics +import kamon.netty.Metrics.EventLoopMetrics +import kamon.netty.util.EventLoopUtils.name + +class MonitoredQueue(eventLoop:EventLoop, underlying:util.Queue[Runnable]) extends QueueWrapperAdapter[Runnable](underlying) { + + import MonitoredQueue._ + + implicit lazy val eventLoopMetrics: EventLoopMetrics = Metrics.forEventLoop(name(eventLoop)) + + override def add(runnable: Runnable): Boolean = { + eventLoopMetrics.taskQueueSize.increment() + underlying.add(new TimedTask(runnable)) + } + + override def offer(runnable: Runnable): Boolean = { + eventLoopMetrics.taskQueueSize.increment() + underlying.offer(new TimedTask(runnable)) + } + + override def remove(): Runnable = { + val runnable = underlying.remove() + eventLoopMetrics.taskQueueSize.decrement() + eventLoopMetrics.taskWaitingTime.record(timeInQueue(runnable)) + runnable + } + + override def poll(): Runnable = { + val runnable = underlying.poll() + + if(runnable != null) { + eventLoopMetrics.taskQueueSize.decrement() + eventLoopMetrics.taskWaitingTime.record(timeInQueue(runnable)) + } + runnable + } +} + +object MonitoredQueue { + def apply(eventLoop: EventLoop, underlying: util.Queue[Runnable]): MonitoredQueue = + new MonitoredQueue(eventLoop, underlying) + + def timeInQueue(runnable: Runnable):Long = + runnable.asInstanceOf[TimedTask].timeInQueue + +} + +private[this] class TimedTask(underlying:Runnable)(implicit metrics: EventLoopMetrics) extends Runnable { + val startTime:Long = Kamon.clock().nanos() + val context: Context = Kamon.currentContext() + + override def run(): Unit = + Kamon.runWithContext(context) { + Latency.measure(metrics.taskProcessingTime)(underlying.run()) + } + + def timeInQueue: Long = Kamon.clock().nanos() - startTime + +} diff --git a/instrumentation/kamon-netty/src/test/scala/kamon/netty/Clients.scala b/instrumentation/kamon-netty/src/test/scala/kamon/netty/Clients.scala new file mode 100644 index 000000000..c80c1d3ed --- /dev/null +++ b/instrumentation/kamon-netty/src/test/scala/kamon/netty/Clients.scala @@ -0,0 +1,105 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty + +import java.net.InetSocketAddress +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} + +import io.netty.bootstrap.Bootstrap +import io.netty.buffer.Unpooled +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.channel.{Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter, ChannelInitializer} +import io.netty.handler.codec.http._ +import io.netty.util.CharsetUtil + +class NioEventLoopBasedClient(port: Int) { + + private val clientMessagesReceived = new LinkedBlockingQueue[AnyRef]() + private val group = new NioEventLoopGroup(1) + private val b = new Bootstrap + + b.group(group) + .channel(classOf[NioSocketChannel]) + .handler(new HttpClientInitializer(clientMessagesReceived)) + + val channel: Channel = b.connect(new InetSocketAddress(port)).sync.channel + + def close(): Unit = { + channel.close + group.shutdownGracefully() + } + + def execute(request: DefaultFullHttpRequest, timeoutMillis: Long = 2000): FullHttpResponse = { + val future = channel.write(request) + channel.flush + future.await(timeoutMillis) + response() + } + + def executeWithContent(request: DefaultHttpRequest, content: Seq[HttpContent], timeoutMillis: Long = 2000): FullHttpResponse = { + val allFutures = (request +: content).map(channel.write) + channel.flush + allFutures.foreach(_.await(timeoutMillis)) + response() + } + + def get(path: String): DefaultFullHttpRequest = { + val request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path) + HttpHeaders.setContentLength(request, 0) + request + } + + def postWithChunks(path: String, chunks: String*): (DefaultHttpRequest, Seq[DefaultHttpContent]) = { + val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, path) + HttpHeaders.setTransferEncodingChunked(request) + val httpChunks = chunks.map(chunk => new DefaultHttpContent(Unpooled.copiedBuffer(chunk, CharsetUtil.UTF_8))) + (request, httpChunks :+ new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER)) + } + + private def response(): FullHttpResponse = + clientMessagesReceived.poll(2, TimeUnit.SECONDS).asInstanceOf[FullHttpResponse] +} + +object NioEventLoopBasedClient { + def apply(bindAddress: Int): NioEventLoopBasedClient = new NioEventLoopBasedClient(bindAddress) +} + +object Clients { + def withNioClient[A](bindAddress:Int = 9001)(thunk: NioEventLoopBasedClient => A): A = { + val client = new NioEventLoopBasedClient(bindAddress) + try thunk(client) finally client.close() + } +} + +private class HttpClientInitializer(received:java.util.Queue[AnyRef]) extends ChannelInitializer[SocketChannel] { + override def initChannel(ch: SocketChannel): Unit = { + val p = ch.pipeline + p.addLast(new HttpClientCodec) + p.addLast(new HttpObjectAggregator(1024)) + p.addLast(new HttpClientHandler(received)) + } +} + +private class HttpClientHandler(received:java.util.Queue[AnyRef]) extends ChannelInboundHandlerAdapter { + override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = { + received.add(msg) + } +} + + diff --git a/instrumentation/kamon-netty/src/test/scala/kamon/netty/NettyHTTPTracingSpec.scala b/instrumentation/kamon-netty/src/test/scala/kamon/netty/NettyHTTPTracingSpec.scala new file mode 100644 index 000000000..737770fbf --- /dev/null +++ b/instrumentation/kamon-netty/src/test/scala/kamon/netty/NettyHTTPTracingSpec.scala @@ -0,0 +1,175 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty + +import kamon.Kamon +import kamon.context.Context +import kamon.netty.Clients.withNioClient +import kamon.netty.Servers.withNioServer +import kamon.testkit.{MetricInspection, SpanInspection, TestSpanReporter} +import kamon.trace.Span +import org.scalatest._ +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.SpanSugar._ +import org.scalatest.wordspec.AnyWordSpec + +class NettyHTTPTracingSpec extends AnyWordSpec + with Matchers + with MetricInspection.Syntax + with Eventually + with SpanInspection + with TestSpanReporter + with OptionValues { + + "The Netty HTTP span propagation" should { + "propagate the span from the client to the server" in { + withNioServer(9001) { port => + withNioClient(port) { httpClient => + val clientSpan = Kamon.spanBuilder("test-span").start() + Kamon.runWithContext(Context.of(Span.Key, clientSpan)) { + val httpGet = httpClient.get(s"http://localhost:$port/route?param=123") + httpClient.execute(httpGet) + + eventually(timeout(5 seconds)) { + val serverSpan = testSpanReporter().nextSpan().value + val clientSpan = testSpanReporter.nextSpan().value + + serverSpan.operationName shouldBe "route.get" + serverSpan.kind shouldBe Span.Kind.Server + + clientSpan.operationName shouldBe "/route" + clientSpan.kind shouldBe Span.Kind.Client + } + } + } + } + } + + "contain a span error when an internal server error(500) occurs" in { + withNioServer(9002) { port => + withNioClient(port) { httpClient => + val clientSpan = Kamon.spanBuilder("test-span-with-error").start() + Kamon.runWithContext(Context.of(Span.Key, clientSpan)) { + val httpGet = httpClient.get(s"http://localhost:$port/error") + httpClient.execute(httpGet) + + + eventually(timeout(5 seconds)) { + val serverSpan = testSpanReporter().nextSpan().value + val clientSpan = testSpanReporter.nextSpan().value + + serverSpan.operationName shouldBe "error.get" + serverSpan.kind shouldBe Span.Kind.Server + + clientSpan.operationName shouldBe "/error" + clientSpan.kind shouldBe Span.Kind.Client + } + } + } + } + } + + "propagate the span from the client to the server with chunk-encoded request" in { + withNioServer(9003) { port => + withNioClient(port) { httpClient => + val clientSpan = Kamon.spanBuilder("client-chunk-span").start() + Kamon.runWithContext(Context.of(Span.Key, clientSpan)) { + val (httpPost, chunks) = httpClient.postWithChunks(s"http://localhost:$port/fetch-in-chunks-request", "test 1", "test 2") + httpClient.executeWithContent(httpPost, chunks) + + eventually(timeout(5 seconds)) { + val serverSpan = testSpanReporter().nextSpan().value + val clientSpan = testSpanReporter.nextSpan().value + + serverSpan.operationName shouldBe "fetch-in-chunks-request.post" + serverSpan.kind shouldBe Span.Kind.Server + + clientSpan.operationName shouldBe s"/fetch-in-chunks-request" + clientSpan.kind shouldBe Span.Kind.Client + } + } + } + } + } + + "propagate the span from the client to the server with chunk-encoded response" in { + withNioServer(9004) { port => + withNioClient(port) { httpClient => + val clientSpan = Kamon.spanBuilder("client-chunk-span").start() + Kamon.runWithContext(Context.of(Span.Key, clientSpan)) { + val (httpPost, chunks) = httpClient.postWithChunks(s"http://localhost:$port/fetch-in-chunks-response", "test 1", "test 2") + httpClient.executeWithContent(httpPost, chunks) + + eventually(timeout(5 seconds)) { + val serverSpan = testSpanReporter().nextSpan().value + val clientSpan = testSpanReporter.nextSpan().value + + serverSpan.operationName shouldBe "fetch-in-chunks-response.post" + serverSpan.kind shouldBe Span.Kind.Server + + clientSpan.operationName shouldBe s"/fetch-in-chunks-response" + clientSpan.kind shouldBe Span.Kind.Client + } + } + } + } + } + + "create a new span when it's coming a request without one" in { + withNioServer(9005) { port => + withNioClient(port) { httpClient => + val httpGet = httpClient.get(s"http://localhost:$port/route?param=123") + httpClient.execute(httpGet) + + eventually(timeout(5 seconds)) { + val serverSpan = testSpanReporter().nextSpan().value + + serverSpan.operationName shouldBe "route.get" + serverSpan.kind shouldBe Span.Kind.Server + } + } + } + } + + "create a new span for each request" in { + withNioServer(9006) { port => + withNioClient(port) { httpClient => + val clientSpan = Kamon.spanBuilder("test-span").start() + Kamon.runWithContext(Context.of(Span.Key, clientSpan)) { + httpClient.execute(httpClient.get(s"http://localhost:$port/route?param=123")) + httpClient.execute(httpClient.get(s"http://localhost:$port/route?param=123")) + + eventually(timeout(5 seconds)) { + val serverSpan = testSpanReporter().nextSpan().value + val clientSpan = testSpanReporter.nextSpan().value + + serverSpan.operationName shouldBe "route.get" + serverSpan.kind shouldBe Span.Kind.Server + + clientSpan.operationName shouldBe "/route" + clientSpan.kind shouldBe Span.Kind.Client + + serverSpan.trace.id shouldBe clientSpan.trace.id + + } + } + } + } + } + } +} diff --git a/instrumentation/kamon-netty/src/test/scala/kamon/netty/NettyMetricsSpec.scala b/instrumentation/kamon-netty/src/test/scala/kamon/netty/NettyMetricsSpec.scala new file mode 100644 index 000000000..93760721e --- /dev/null +++ b/instrumentation/kamon-netty/src/test/scala/kamon/netty/NettyMetricsSpec.scala @@ -0,0 +1,98 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty + +import kamon.netty.Clients.withNioClient +import kamon.netty.Metrics.{registeredChannelsMetric, taskProcessingTimeMetric, taskQueueSizeMetric, taskWaitingTimeMetric} +import kamon.netty.Servers.{withEpollServer, withNioServer} +import kamon.testkit.{InstrumentInspection, MetricInspection} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class NettyMetricsSpec extends AnyWordSpec with Matchers with MetricInspection.Syntax with InstrumentInspection.Syntax { + + "The NettyMetrics" should { + + "track the NioEventLoop in boss-group and worker-group" in { + withNioServer() { port => + withNioClient(port) { httpClient => + val httpGet = httpClient.get(s"http://localhost:$port/route?param=123") + httpClient.execute(httpGet) + + registeredChannelsMetric.tagValues("name") should contain atLeastOneOf("boss-group-nio-event-loop", "worker-group-nio-event-loop") + taskProcessingTimeMetric.tagValues("name") should contain atLeastOneOf("boss-group-nio-event-loop", "worker-group-nio-event-loop") + taskQueueSizeMetric.tagValues("name") should contain atLeastOneOf("boss-group-nio-event-loop", "worker-group-nio-event-loop") + taskWaitingTimeMetric.tagValues("name") should contain atLeastOneOf("boss-group-nio-event-loop", "worker-group-nio-event-loop") + } + } + } + + "track the EpollEventLoop in boss-group and worker-group" in { + withEpollServer() { port => + withNioClient(port) { httpClient => + val httpGet = httpClient.get(s"http://localhost:$port/route?param=123") + httpClient.execute(httpGet) + + registeredChannelsMetric.tagValues("name") should contain atLeastOneOf("boss-group-epoll-event-loop", "worker-group-epoll-event-loop") + taskProcessingTimeMetric.tagValues("name") should contain atLeastOneOf("boss-group-epoll-event-loop", "worker-group-epoll-event-loop") + taskQueueSizeMetric.tagValues("name") should contain atLeastOneOf("boss-group-epoll-event-loop", "worker-group-epoll-event-loop") + taskWaitingTimeMetric.tagValues("name") should contain atLeastOneOf("boss-group-epoll-event-loop", "worker-group-epoll-event-loop") + } + } + } + + "track the registered channels, task processing time and task queue size for NioEventLoop" in { + withNioServer() { port => + withNioClient(port) { httpClient => + val httpGet = httpClient.get(s"http://localhost:$port/route?param=123") + val response = httpClient.execute(httpGet) + response.status.code() should be(200) + + registeredChannelsMetric.tagValues("name") should contain("boss-group-nio-event-loop") + + val metrics = Metrics.forEventLoop("boss-group-nio-event-loop") + + metrics.registeredChannels.distribution().max should be > 0L + metrics.taskProcessingTime.distribution().max should be > 0L + metrics.taskQueueSize.distribution().max should be > 0L + metrics.taskWaitingTime.distribution().max should be > 0L + } + } + } + + "track the registered channels, task processing time and task queue size for EpollEventLoop" in { + withEpollServer() { port => + withNioClient(port) { httpClient => + val httpGet = httpClient.get(s"http://localhost:$port/route?param=123") + val response = httpClient.execute(httpGet) + response.status.code() should be(200) + + registeredChannelsMetric.tagValues("name") should contain("boss-group-epoll-event-loop") + + val metrics = Metrics.forEventLoop("boss-group-epoll-event-loop") + + metrics.registeredChannels.distribution().max should be >= 0L + metrics.taskProcessingTime.distribution().max should be > 0L + metrics.taskQueueSize.distribution().max should be >= 0L + metrics.taskWaitingTime.distribution().max should be > 0L + } + } + } + } +} + + diff --git a/instrumentation/kamon-netty/src/test/scala/kamon/netty/Servers.scala b/instrumentation/kamon-netty/src/test/scala/kamon/netty/Servers.scala new file mode 100644 index 000000000..331320433 --- /dev/null +++ b/instrumentation/kamon-netty/src/test/scala/kamon/netty/Servers.scala @@ -0,0 +1,146 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.netty + +import io.netty.bootstrap.ServerBootstrap +import io.netty.buffer.{ByteBuf, Unpooled} +import io.netty.channel.epoll.{EpollEventLoopGroup, EpollServerSocketChannel} +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioServerSocketChannel +import io.netty.channel.{ChannelFutureListener, _} +import io.netty.handler.codec.http._ +import io.netty.handler.stream.ChunkedWriteHandler +import io.netty.util.CharsetUtil + + +class NioEventLoopBasedServer(port: Int) { + val bossGroup = new NioEventLoopGroup(1) + val workerGroup = new NioEventLoopGroup + val b = new ServerBootstrap + + b.group(bossGroup, workerGroup) + .channel(classOf[NioServerSocketChannel]) + .childHandler(new HttpServerInitializer) + + val channel: Channel = b.bind(port).sync.channel + + def close(): Unit = { + channel.close + bossGroup.shutdownGracefully() + workerGroup.shutdownGracefully() + } +} + +class EpollEventLoopBasedServer(port: Int) { + val bossGroup = new EpollEventLoopGroup(1) + val workerGroup = new EpollEventLoopGroup + val b = new ServerBootstrap + + b.group(bossGroup, workerGroup) + .channel(classOf[EpollServerSocketChannel]) + .childHandler(new HttpServerInitializer) + + val channel: Channel = b.bind(port).sync.channel + + def close(): Unit = { + channel.close + bossGroup.shutdownGracefully() + workerGroup.shutdownGracefully() + } +} + +object Servers { + def withNioServer[A](port:Int = 9001)(thunk: Int => A): A = { + val server = new NioEventLoopBasedServer(port) + try thunk(port) finally server.close() + } + + def withEpollServer[A](port:Int = 9001)(thunk: Int => A): A = { + val server = new EpollEventLoopBasedServer(port) + try thunk(port) finally server.close() + } +} + +private class HttpServerInitializer extends ChannelInitializer[SocketChannel] { + override def initChannel(ch: SocketChannel): Unit = { + val p = ch.pipeline + p.addLast(new HttpRequestDecoder(4096, 8192, 8192)) + p.addLast(new HttpResponseEncoder()) + p.addLast(new ChunkedWriteHandler) + p.addLast(new HttpServerHandler) + } +} + +private class HttpServerHandler extends ChannelInboundHandlerAdapter { + private val ContentOk = Array[Byte]('H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd') + private val ContentError = Array[Byte]('E', 'r', 'r', 'o', 'r') + + override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = { + if (msg.isInstanceOf[HttpRequest]) { + val request = msg.asInstanceOf[HttpRequest] + + val isKeepAlive = HttpUtil.isKeepAlive(request) + + if (request.uri().contains("/error")) { + val response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(ContentError)) + response.headers.set("Content-Type", "text/plain") + response.headers.set("Content-Length", response.content.readableBytes) + val channelFuture = ctx.write(response) + addCloseListener(isKeepAlive)(channelFuture) + } else if (request.uri().contains("/fetch-in-chunks")) { + val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) + HttpUtil.setTransferEncodingChunked(response, true) + response.headers.set("Content-Type", "text/plain") + + + ctx.write(response) + .addListener((cf: ChannelFuture) => + writeChunk(cf.channel()).addListener((cf: ChannelFuture) => + writeChunk(cf.channel()).addListener((cf: ChannelFuture) => + writeChunk(cf.channel()).addListener((cf: ChannelFuture) => + (writeLastContent _).andThen(addCloseListener(isKeepAlive))(cf.channel()))))) + } else { + val response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(ContentOk)) + response.headers.set("Content-Type", "text/plain") + response.headers.set("Content-Length", response.content.readableBytes) + val channelFuture = ctx.write(response) + addCloseListener(isKeepAlive)(channelFuture) + } + + } + + } + + override def channelReadComplete(ctx: ChannelHandlerContext): Unit = + ctx.flush() + + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = + ctx.close() + + private def writeChunk(channel: Channel, content: ByteBuf = Unpooled.wrappedBuffer(ContentOk)): ChannelFuture = { + channel.writeAndFlush(new DefaultHttpContent(Unpooled.copiedBuffer("chunkkkkkkkkkkkkk", CharsetUtil.UTF_8))) + } + + private def writeLastContent(channel: Channel): ChannelFuture = { + channel.writeAndFlush(new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER)) + } + + private def addCloseListener(isKeepAlive: Boolean)(f: ChannelFuture): Unit = { + if (!isKeepAlive) f.addListener(ChannelFutureListener.CLOSE) + } +}