Skip to content

Commit

Permalink
Update Akka, pekko, and pekko-http to support scala 3 (#1311)
Browse files Browse the repository at this point in the history
* + kamon-akka: add scala-3.2.0 reloease

* + kamon-pekko: add scala-3.2.0 release

* = kamon-akka: update akka version

* = kamon-pekko: Fix compile-errors in tests

* test failures

* some tests passing

* fix some more tests

* fix more tests

* fix last test

* fix akka 2.5 for now

* add @static annotation to various kamon-akka classes

* split ActorInstrumentation into scala2 and scala 3 variants

* fix some tests

* fix last kamon-akka tests

* oh, right. Figured out why ActorMonitorInstrumentation was throwing in scala 3. Patch it back in.

* rm gratuitous catch cases

* tests passing for scala 3 akka-http

---------

Co-authored-by: tjarko grossmann <[email protected]>
Co-authored-by: Tjarko <[email protected]>
  • Loading branch information
3 people authored Nov 30, 2023
1 parent a8bef44 commit eb3628c
Show file tree
Hide file tree
Showing 64 changed files with 721 additions and 736 deletions.
33 changes: 25 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ lazy val `kamon-akka` = (project in file("instrumentation/kamon-akka"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings: _*)
.settings(crossScalaVersions += `scala_3_version`)
.dependsOn(
`kamon-scala-future` % "compile,common,akka-2.5,akka-2.6",
`kamon-testkit` % "test,test-common,test-akka-2.5,test-akka-2.6"
Expand All @@ -464,31 +465,45 @@ lazy val `kamon-akka` = (project in file("instrumentation/kamon-akka"))

def akkaHttpVersion(scalaVersion: String) = scalaVersion match {
case "2.11" => "10.1.12"
case "3" => "10.5.0"
case _ => "10.2.8"
}
def akkaStreamVersion(scalaVersion: String) = scalaVersion match {
case "3" => "2.7.0"
case _ => "2.5.32"
}

def versionedScalaSourceDirectories(sourceDir: File, scalaVersion: String): List[File] =
scalaVersion match {
case "3" => List(sourceDir / "scala-2.13+")
case "2.13" => List(sourceDir / "scala-2.13+")
case _ => Nil
}

lazy val `kamon-akka-http` = (project in file("instrumentation/kamon-akka-http"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings)
.settings(Seq(
Compile / unmanagedSourceDirectories ++= versionedScalaSourceDirectories((Compile / sourceDirectory).value, scalaBinaryVersion.value),
resolvers += Resolver.bintrayRepo("hseeberger", "maven"),
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test",
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion(scalaBinaryVersion.value) % "provided",
"com.typesafe.akka" %% "akka-http2-support" % akkaHttpVersion(scalaBinaryVersion.value) % "provided",
"com.typesafe.akka" %% "akka-stream" % "2.5.32" % "provided",
"com.typesafe.akka" %% "akka-stream" % akkaStreamVersion(scalaBinaryVersion.value) % "provided",

scalatest % "test",
slf4jApi % "test",
slf4jnop % "test",
okHttp % "test",
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion(scalaBinaryVersion.value) % "test",
"de.heikoseeberger" %% "akka-http-json4s" % "1.27.0" % "test",
"org.json4s" %% "json4s-native" % "3.6.7" % "test",
),
)).dependsOn(`kamon-akka`, `kamon-testkit` % "test")
"de.heikoseeberger" %% "akka-http-json4s" % "1.27.0" % "test" cross CrossVersion.for3Use2_13 intransitive(),
"org.json4s" %% "json4s-native" % "4.0.6" % "test",
)))
.settings(crossScalaVersions += `scala_3_version`)
.dependsOn(`kamon-akka`, `kamon-testkit` % "test")



Expand All @@ -497,7 +512,10 @@ lazy val `kamon-pekko` = (project in file("instrumentation/kamon-pekko"))
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings: _*)
.settings(Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoHttpVersion % "provided"
)
))
.dependsOn(
`kamon-scala-future` % "compile",
Expand All @@ -511,8 +529,7 @@ lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings)
.settings(Seq(
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test",
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % "provided",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched}
import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet}
import akka.http.scaladsl.server.directives.RouteDirectives.reject
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.util.Tupler
import akka.http.scaladsl.server.util.{Tuple, Tupler}
import akka.http.scaladsl.util.FastFuture
import kamon.Kamon
import kamon.instrumentation.akka.http.HasMatchingContext.PathMatchingContext
Expand All @@ -27,6 +27,7 @@ import akka.stream.scaladsl.Flow
import kamon.context.Context
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic

import scala.annotation.static
import scala.collection.immutable


Expand All @@ -53,15 +54,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])

onType("akka.http.impl.engine.http2.Http2Blueprint$")
.intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor)
.intercept(method("handleWithStreamIdHeader"), classOf[Http2BlueprintInterceptor])

/**
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
* of variables) and take a Sampling Decision in case none has been taken so far.
*/
onType("akka.http.scaladsl.server.RequestContextImpl")
.mixin(classOf[HasMatchingContext.Mixin])
.intercept(method("copy"), RequestContextCopyInterceptor)
.intercept(method("copy"), classOf[RequestContextCopyInterceptor])

onType("akka.http.scaladsl.server.directives.PathDirectives")
.intercept(method("rawPathPrefix"), classOf[PathDirectivesRawPathPrefixInterceptor])
Expand Down Expand Up @@ -263,10 +264,11 @@ object LastAutomaticOperationNameEdit {
new LastAutomaticOperationNameEdit(operationName, allowAutomaticChanges)
}

class RequestContextCopyInterceptor
object RequestContextCopyInterceptor {

@RuntimeType
def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = {
@static def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = {
val copiedRequestContext = copyCall.call()
copiedRequestContext.asInstanceOf[HasMatchingContext].setMatchingContext(context.asInstanceOf[HasMatchingContext].matchingContext)
copiedRequestContext
Expand All @@ -277,8 +279,8 @@ class PathDirectivesRawPathPrefixInterceptor
object PathDirectivesRawPathPrefixInterceptor {
import BasicDirectives._

def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = {
implicit val LIsTuple = matcher.ev
@static def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = {
implicit val LIsTuple: Tuple[T] = matcher.ev

extract { ctx =>
val fullPath = ctx.unmatchedPath.toString()
Expand All @@ -294,7 +296,7 @@ object PathDirectivesRawPathPrefixInterceptor {
(ctx, matching)
} flatMap {
case (ctx, Matched(rest, values)) =>
tprovide(values) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult =>
tprovide[T](values) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult =>

if(routeResult.isInstanceOf[Rejected])
ctx.asInstanceOf[HasMatchingContext].popOneMatchingContext()
Expand All @@ -307,6 +309,7 @@ object PathDirectivesRawPathPrefixInterceptor {
}
}

class Http2BlueprintInterceptor
object Http2BlueprintInterceptor {

case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
Expand All @@ -316,7 +319,7 @@ object Http2BlueprintInterceptor {
}

@RuntimeType
def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse],
@static def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = {

handler match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.OptionValues

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

class AkkaHttpClientTracingSpec extends AnyWordSpecLike with Matchers with InitAndStopKamonAfterAll with MetricInspection.Syntax
with Reconfigure with TestWebServer with Eventually with OptionValues with TestSpanReporter {

import TestWebServer.Endpoints._

implicit private val system = ActorSystem("http-client-instrumentation-spec")
implicit private val executor = system.dispatcher
implicit private val materializer = ActorMaterializer()
implicit private val system: ActorSystem = ActorSystem("http-client-instrumentation-spec")
implicit private val executor: ExecutionContextExecutor = system.dispatcher
implicit private val materializer: ActorMaterializer = ActorMaterializer()

val timeoutTest: FiniteDuration = 5 second
val interface = "127.0.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.OptionValues

import scala.concurrent.Future
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration._

class AkkaHttpServerMetricsSpec extends AnyWordSpecLike with Matchers with InitAndStopKamonAfterAll with InstrumentInspection.Syntax
with Reconfigure with TestWebServer with Eventually with OptionValues {

import TestWebServer.Endpoints._

implicit private val system = ActorSystem("http-server-metrics-instrumentation-spec")
implicit private val executor = system.dispatcher
implicit private val materializer = ActorMaterializer()
implicit private val system: ActorSystem = ActorSystem("http-server-metrics-instrumentation-spec")
implicit private val executor: ExecutionContextExecutor = system.dispatcher
implicit private val materializer: ActorMaterializer = ActorMaterializer()

val port = 8083
val interface = "127.0.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ import java.util.UUID
import javax.net.ssl.{HostnameVerifier, SSLSession}
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContextExecutor
import scala.util.Try

class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with ScalaFutures with Inside with InitAndStopKamonAfterAll
with MetricInspection.Syntax with Reconfigure with TestWebServer with Eventually with OptionValues with TestSpanReporter {

import TestWebServer.Endpoints._

implicit private val system = ActorSystem("http-server-instrumentation-spec")
implicit private val executor = system.dispatcher
implicit private val materializer = ActorMaterializer()
implicit private val system: ActorSystem = ActorSystem("http-server-instrumentation-spec")
implicit private val executor: ExecutionContextExecutor = system.dispatcher
implicit private val materializer: ActorMaterializer = ActorMaterializer()

val (sslSocketFactory, trustManager) = clientSSL()
val okHttp = new OkHttpClient.Builder()
Expand Down Expand Up @@ -228,7 +230,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala

"correctly time entity transfer timings" in {
val target = s"$protocol://$interface:$port/$stream"
client.newCall(new Request.Builder().url(target).build()).execute()
def probablyScala3 = util.Properties.releaseVersion.contains("2.13.10")

def makeCall = client.newCall(new Request.Builder().url(target).build()).execute()
// akka 2.7.0 is flaky on this
if (probablyScala3) Try(makeCall).orElse(Try(makeCall))
else makeCall

val span = eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.ExecutionContextExecutor

class ServerFlowWrapperSpec extends AnyWordSpecLike with Matchers with ScalaFutures with InitAndStopKamonAfterAll {

implicit private val system = ActorSystem("http-client-instrumentation-spec")
implicit private val executor = system.dispatcher
implicit private val materializer = ActorMaterializer()
implicit private val system: ActorSystem = ActorSystem("http-client-instrumentation-spec")
implicit private val executor: ExecutionContextExecutor = system.dispatcher
implicit private val materializer: ActorMaterializer = ActorMaterializer()

private val okReturningFlow = Flow[HttpRequest].map { _ =>
HttpResponse(status = StatusCodes.OK, entity = HttpEntity("OK"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,19 @@ import kamon.instrumentation.akka.http.TracingDirectives
import org.json4s.{DefaultFormats, native}
import kamon.tag.Lookups.plain
import kamon.trace.Trace
import org.json4s.native.Serialization
import scala.concurrent.{ExecutionContext, Future}

trait TestWebServer extends TracingDirectives {
implicit val serialization = native.Serialization
implicit val formats = DefaultFormats
implicit val serialization: Serialization.type = native.Serialization
implicit val formats: DefaultFormats.type = DefaultFormats
import Json4sSupport._

def startServer(interface: String, port: Int, https: Boolean = false)(implicit system: ActorSystem): WebServer = {
import Endpoints._

implicit val ec: ExecutionContext = system.dispatcher
implicit val materializer = ActorMaterializer()
implicit val materializer: ActorMaterializer = ActorMaterializer()

val routes = logRequest("routing-request") {
get {
Expand Down
Loading

0 comments on commit eb3628c

Please sign in to comment.