Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Akka, pekko, and pekko-http to support scala 3 #1311

Merged
merged 18 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ lazy val `kamon-akka` = (project in file("instrumentation/kamon-akka"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings: _*)
.settings(crossScalaVersions += `scala_3_version`)
.dependsOn(
`kamon-scala-future` % "compile,common,akka-2.5,akka-2.6",
`kamon-testkit` % "test,test-common,test-akka-2.5,test-akka-2.6"
Expand All @@ -464,31 +465,45 @@ lazy val `kamon-akka` = (project in file("instrumentation/kamon-akka"))

def akkaHttpVersion(scalaVersion: String) = scalaVersion match {
case "2.11" => "10.1.12"
case "3" => "10.5.0"
case _ => "10.2.8"
}
def akkaStreamVersion(scalaVersion: String) = scalaVersion match {
case "3" => "2.7.0"
case _ => "2.5.32"
}

def versionedScalaSourceDirectories(sourceDir: File, scalaVersion: String): List[File] =
scalaVersion match {
case "3" => List(sourceDir / "scala-2.13+")
case "2.13" => List(sourceDir / "scala-2.13+")
case _ => Nil
}

lazy val `kamon-akka-http` = (project in file("instrumentation/kamon-akka-http"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings)
.settings(Seq(
Compile / unmanagedSourceDirectories ++= versionedScalaSourceDirectories((Compile / sourceDirectory).value, scalaBinaryVersion.value),
resolvers += Resolver.bintrayRepo("hseeberger", "maven"),
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test",
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion(scalaBinaryVersion.value) % "provided",
"com.typesafe.akka" %% "akka-http2-support" % akkaHttpVersion(scalaBinaryVersion.value) % "provided",
"com.typesafe.akka" %% "akka-stream" % "2.5.32" % "provided",
"com.typesafe.akka" %% "akka-stream" % akkaStreamVersion(scalaBinaryVersion.value) % "provided",

scalatest % "test",
slf4jApi % "test",
slf4jnop % "test",
okHttp % "test",
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion(scalaBinaryVersion.value) % "test",
"de.heikoseeberger" %% "akka-http-json4s" % "1.27.0" % "test",
"org.json4s" %% "json4s-native" % "3.6.7" % "test",
),
)).dependsOn(`kamon-akka`, `kamon-testkit` % "test")
"de.heikoseeberger" %% "akka-http-json4s" % "1.27.0" % "test" cross CrossVersion.for3Use2_13 intransitive(),
"org.json4s" %% "json4s-native" % "4.0.6" % "test",
)))
.settings(crossScalaVersions += `scala_3_version`)
.dependsOn(`kamon-akka`, `kamon-testkit` % "test")



Expand All @@ -497,7 +512,10 @@ lazy val `kamon-pekko` = (project in file("instrumentation/kamon-pekko"))
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings: _*)
.settings(Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoHttpVersion % "provided"
)
))
.dependsOn(
`kamon-scala-future` % "compile",
Expand All @@ -511,8 +529,7 @@ lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings)
.settings(Seq(
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test",
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % "provided",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched}
import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet}
import akka.http.scaladsl.server.directives.RouteDirectives.reject
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.util.Tupler
import akka.http.scaladsl.server.util.{Tuple, Tupler}
import akka.http.scaladsl.util.FastFuture
import kamon.Kamon
import kamon.instrumentation.akka.http.HasMatchingContext.PathMatchingContext
Expand All @@ -27,6 +27,7 @@ import akka.stream.scaladsl.Flow
import kamon.context.Context
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic

import scala.annotation.static
import scala.collection.immutable


Expand All @@ -53,15 +54,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])

onType("akka.http.impl.engine.http2.Http2Blueprint$")
.intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor)
.intercept(method("handleWithStreamIdHeader"), classOf[Http2BlueprintInterceptor])

/**
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
* of variables) and take a Sampling Decision in case none has been taken so far.
*/
onType("akka.http.scaladsl.server.RequestContextImpl")
.mixin(classOf[HasMatchingContext.Mixin])
.intercept(method("copy"), RequestContextCopyInterceptor)
.intercept(method("copy"), classOf[RequestContextCopyInterceptor])

onType("akka.http.scaladsl.server.directives.PathDirectives")
.intercept(method("rawPathPrefix"), classOf[PathDirectivesRawPathPrefixInterceptor])
Expand Down Expand Up @@ -263,10 +264,11 @@ object LastAutomaticOperationNameEdit {
new LastAutomaticOperationNameEdit(operationName, allowAutomaticChanges)
}

class RequestContextCopyInterceptor
object RequestContextCopyInterceptor {

@RuntimeType
def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = {
@static def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = {
val copiedRequestContext = copyCall.call()
copiedRequestContext.asInstanceOf[HasMatchingContext].setMatchingContext(context.asInstanceOf[HasMatchingContext].matchingContext)
copiedRequestContext
Expand All @@ -277,8 +279,8 @@ class PathDirectivesRawPathPrefixInterceptor
object PathDirectivesRawPathPrefixInterceptor {
import BasicDirectives._

def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = {
implicit val LIsTuple = matcher.ev
@static def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = {
implicit val LIsTuple: Tuple[T] = matcher.ev

extract { ctx =>
val fullPath = ctx.unmatchedPath.toString()
Expand All @@ -294,7 +296,7 @@ object PathDirectivesRawPathPrefixInterceptor {
(ctx, matching)
} flatMap {
case (ctx, Matched(rest, values)) =>
tprovide(values) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult =>
tprovide[T](values) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult =>

if(routeResult.isInstanceOf[Rejected])
ctx.asInstanceOf[HasMatchingContext].popOneMatchingContext()
Expand All @@ -307,6 +309,7 @@ object PathDirectivesRawPathPrefixInterceptor {
}
}

class Http2BlueprintInterceptor
object Http2BlueprintInterceptor {

case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
Expand All @@ -316,7 +319,7 @@ object Http2BlueprintInterceptor {
}

@RuntimeType
def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse],
@static def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = {

handler match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.OptionValues

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

class AkkaHttpClientTracingSpec extends AnyWordSpecLike with Matchers with InitAndStopKamonAfterAll with MetricInspection.Syntax
with Reconfigure with TestWebServer with Eventually with OptionValues with TestSpanReporter {

import TestWebServer.Endpoints._

implicit private val system = ActorSystem("http-client-instrumentation-spec")
implicit private val executor = system.dispatcher
implicit private val materializer = ActorMaterializer()
implicit private val system: ActorSystem = ActorSystem("http-client-instrumentation-spec")
implicit private val executor: ExecutionContextExecutor = system.dispatcher
implicit private val materializer: ActorMaterializer = ActorMaterializer()

val timeoutTest: FiniteDuration = 5 second
val interface = "127.0.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.OptionValues

import scala.concurrent.Future
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration._

class AkkaHttpServerMetricsSpec extends AnyWordSpecLike with Matchers with InitAndStopKamonAfterAll with InstrumentInspection.Syntax
with Reconfigure with TestWebServer with Eventually with OptionValues {

import TestWebServer.Endpoints._

implicit private val system = ActorSystem("http-server-metrics-instrumentation-spec")
implicit private val executor = system.dispatcher
implicit private val materializer = ActorMaterializer()
implicit private val system: ActorSystem = ActorSystem("http-server-metrics-instrumentation-spec")
implicit private val executor: ExecutionContextExecutor = system.dispatcher
implicit private val materializer: ActorMaterializer = ActorMaterializer()

val port = 8083
val interface = "127.0.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ import java.util.UUID
import javax.net.ssl.{HostnameVerifier, SSLSession}
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContextExecutor
import scala.util.Try

class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with ScalaFutures with Inside with InitAndStopKamonAfterAll
with MetricInspection.Syntax with Reconfigure with TestWebServer with Eventually with OptionValues with TestSpanReporter {

import TestWebServer.Endpoints._

implicit private val system = ActorSystem("http-server-instrumentation-spec")
implicit private val executor = system.dispatcher
implicit private val materializer = ActorMaterializer()
implicit private val system: ActorSystem = ActorSystem("http-server-instrumentation-spec")
implicit private val executor: ExecutionContextExecutor = system.dispatcher
implicit private val materializer: ActorMaterializer = ActorMaterializer()

val (sslSocketFactory, trustManager) = clientSSL()
val okHttp = new OkHttpClient.Builder()
Expand Down Expand Up @@ -228,7 +230,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala

"correctly time entity transfer timings" in {
val target = s"$protocol://$interface:$port/$stream"
client.newCall(new Request.Builder().url(target).build()).execute()
def probablyScala3 = util.Properties.releaseVersion.contains("2.13.10")

def makeCall = client.newCall(new Request.Builder().url(target).build()).execute()
// akka 2.7.0 is flaky on this
if (probablyScala3) Try(makeCall).orElse(Try(makeCall))
else makeCall

val span = eventually(timeout(10 seconds)) {
val span = testSpanReporter().nextSpan().value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.ExecutionContextExecutor

class ServerFlowWrapperSpec extends AnyWordSpecLike with Matchers with ScalaFutures with InitAndStopKamonAfterAll {

implicit private val system = ActorSystem("http-client-instrumentation-spec")
implicit private val executor = system.dispatcher
implicit private val materializer = ActorMaterializer()
implicit private val system: ActorSystem = ActorSystem("http-client-instrumentation-spec")
implicit private val executor: ExecutionContextExecutor = system.dispatcher
implicit private val materializer: ActorMaterializer = ActorMaterializer()

private val okReturningFlow = Flow[HttpRequest].map { _ =>
HttpResponse(status = StatusCodes.OK, entity = HttpEntity("OK"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,19 @@ import kamon.instrumentation.akka.http.TracingDirectives
import org.json4s.{DefaultFormats, native}
import kamon.tag.Lookups.plain
import kamon.trace.Trace
import org.json4s.native.Serialization
import scala.concurrent.{ExecutionContext, Future}

trait TestWebServer extends TracingDirectives {
implicit val serialization = native.Serialization
implicit val formats = DefaultFormats
implicit val serialization: Serialization.type = native.Serialization
implicit val formats: DefaultFormats.type = DefaultFormats
import Json4sSupport._

def startServer(interface: String, port: Int, https: Boolean = false)(implicit system: ActorSystem): WebServer = {
import Endpoints._

implicit val ec: ExecutionContext = system.dispatcher
implicit val materializer = ActorMaterializer()
implicit val materializer: ActorMaterializer = ActorMaterializer()

val routes = logRequest("routing-request") {
get {
Expand Down
Loading