Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for scala 2.13 async operations #109

Merged
merged 1 commit into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions instrumentation/scala-2.13.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apply plugin: 'scala'

dependencies {
implementation(project(":newrelic-api"))
implementation(project(":agent-bridge"))
implementation(project(":newrelic-weaver-api"))
implementation("org.scala-lang:scala-library:2.13.3")
}

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.scala-2.13.0',
'Implementation-Title-Alias': 'scala_instrumentation' }
}

verifyInstrumentation {
passes 'org.scala-lang:scala-library:[2.13.0,)'
fails 'org.scala-lang:scala-library:(,2.13.0-M4]'
}

test {
onlyIf {
!project.hasProperty('test7')
}
}

site {
title 'Scala'
type 'Other'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.agent.instrumentation.scala;

import com.newrelic.agent.bridge.AgentBridge;

import java.util.regex.Pattern;

public class ScalaUtils {

public static final boolean scalaFuturesAsSegments = AgentBridge.getAgent().getConfig().getValue("scala_futures_as_segments.enabled", false);

/**
* Strip out compiler generated names:
* <ul>
* <li>functions named $anonfun</li>
* <li>compiler generated classes like $1</li>
* <li>Trailing $ chars</li>
* </ul>
*/
private static final Pattern compilerNames = Pattern.compile("(\\$anonfun|\\$[0-9]+)");

/**
* Replace consecutive $ chars with a single $
*/
private static final Pattern singleDollar = Pattern.compile("\\$\\$+");

/**
* Remove trailing $ char
*/
private static final Pattern trailingDollar = Pattern.compile("(\\$+$)");

/**
* Remove any Lambda generated names
*/
private static final Pattern lambdaNames = Pattern.compile("(\\$Lambda.*$)");

/**
* Provides a metric name for compiled scala class name.
*/
public static String nameScalaFunction(String scalaFunctionName) {
String metricName = scalaFunctionName;
metricName = compilerNames.matcher(metricName).replaceAll("");
metricName = singleDollar.matcher(metricName).replaceAll("\\$");
metricName = trailingDollar.matcher(metricName).replaceAll("");
metricName = lambdaNames.matcher(metricName).replaceAll("");
return metricName;
}

private ScalaUtils() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package scala.concurrent.impl;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.Transaction;
import com.newrelic.api.agent.Segment;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.agent.instrumentation.scala.ScalaUtils;
import scala.Function1;
import scala.concurrent.ExecutionContext;
import scala.util.Try;

import java.util.concurrent.atomic.AtomicInteger;

import static com.nr.agent.instrumentation.scala.ScalaUtils.scalaFuturesAsSegments;

@Weave(originalName = "scala.concurrent.impl.Promise$Transformation")
public class Transformation_Instrumentation<T> {
private Try<T> _arg = Weaver.callOriginal();
@NewField
private AgentBridge.TokenAndRefCount tokenAndRefCount;

/**
* Override the submitWithValue so we can replace it with our wrapped version.
* It plays similar role to the 2.12 setter for "value" on the old CallbackRunnable
*/
public final Promise.Transformation submitWithValue(final Try<T> resolved) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be wrong, but shouldn't we have this annotation here?

@Trace(async = true, excludeFromTransactionTrace = true)

This way the segments are reported and it allows to align the ProducerConsumerWithFutures.testProducerConsumer test with how it looks for 2.12.
Currently it has zero segments reported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, the annotation with async=true should only be necessary when calling Token::link. For the 2.12 instrumentation we had to create a second token in the CallbackRunnable constructor to fix an issue Akka-Http. It's not clear to me the Java/scala.concurrent.impl.CallbackRunnable/value_$eq segments are valuable and ideally they wouldn't be included in traces either, but I'd be interested to hear if you find them useful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe they are useful. Until I worked on the scala instrumentation I had no idea that method even existed. I think most scala users would actually find those segments confusing because they are completely unaware that futures work like that under the hood.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion here. If for practical purposes it's better to leave these segments out - I'm totally fine with it. Just pointed out the difference in behavior here.

AgentBridge.TokenAndRefCount tokenAndRefCount = AgentBridge.activeToken.get();
if (tokenAndRefCount == null) {
Transaction tx = AgentBridge.getAgent().getTransaction(false);
if (tx != null) {
this.tokenAndRefCount = new AgentBridge.TokenAndRefCount(tx.getToken(),
AgentBridge.getAgent().getTracedMethod(), new AtomicInteger(1));
}
} else {
this.tokenAndRefCount = tokenAndRefCount;
this.tokenAndRefCount.refCount.incrementAndGet();
}
return Weaver.callOriginal();
}

@Trace(excludeFromTransactionTrace = true)
public void run() {
Segment segment = null;
boolean remove = false;

if (tokenAndRefCount != null) {
// If we are here and there is no activeToken in progress we are the first one so we set this boolean in
// order to correctly remove the "activeToken" from the thread local after the original run() method executes
remove = AgentBridge.activeToken.get() == null;
AgentBridge.activeToken.set(tokenAndRefCount);
if (scalaFuturesAsSegments && remove) {
Transaction tx = AgentBridge.getAgent().getTransaction(false);
if (tx != null) {
segment = tx.startSegment("Scala", "Callback");
segment.setMetricName("Scala", "Callback",
ScalaUtils.nameScalaFunction(_arg.getClass().getName()));
}
}
}

try {
Weaver.callOriginal();
} finally {
if (tokenAndRefCount != null) {
if (segment != null) {
segment.end();
}

if (remove) {
AgentBridge.activeToken.remove();
}

if (tokenAndRefCount.refCount.decrementAndGet() == 0) {
tokenAndRefCount.token.expire();
tokenAndRefCount.token = null;
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.agent.instrumentation.scala;

import org.junit.Assert;
import org.junit.Test;

public class ScalaUtilsTest {
@Test
public void testNaming() {
// remove scalac generated anonfun and numbered anonymous classes.
Assert.assertEquals("scala.concurrent.Future$flatMap", ScalaUtils.nameScalaFunction(
"scala.concurrent.Future$$anonfun$flatMap$1"));
Assert.assertEquals("scala.concurrent.Future$flatMap", ScalaUtils.nameScalaFunction(
"scala.concurrent.Future$$anonfun$flatMap$98"));
Assert.assertEquals("scala.concurrent.Future$flatMap", ScalaUtils.nameScalaFunction(
"scala.concurrent.Future$$anonfun$flatMap$98"));
// remove Lambda generated names
Assert.assertEquals("scala.concurrent.Future$flatMap", ScalaUtils.nameScalaFunction(
"scala.concurrent.Future$flatMap$Lambda$success"));
Assert.assertEquals("scala.concurrent.Future$flatMap", ScalaUtils.nameScalaFunction(
"scala.concurrent.Future$flatMap$Lambda12345$/108380"));

// remove trailing $
Assert.assertEquals("dollar$dollar", ScalaUtils.nameScalaFunction("dollar$$$$$$$$$dollar$"));
Assert.assertEquals("dollar$dollar", ScalaUtils.nameScalaFunction("dollar$$$$$$$$$dollar$$$$"));
Assert.assertEquals("dollar$dollar", ScalaUtils.nameScalaFunction("dollar$$$$$$$$$dollar$$1"));
// remove multiple $
Assert.assertEquals("dollar$dollar", ScalaUtils.nameScalaFunction("dollar$$$$$$$$$dollar"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.agent.instrumentation.scala

import java.util

import com.newrelic.agent.bridge.AgentBridge
import com.newrelic.agent.introspec._
import com.newrelic.api.agent.Trace
import org.junit.runner.RunWith
import org.junit.{Assert, Test}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}

@RunWith(classOf[InstrumentationTestRunner])
@InstrumentationTestConfig(includePrefixes = Array("scala.concurrent.impl."))
class ProducerConsumerWithFutures {
def produceSomething(f: Future[Int]): Int = {
val r = scala.util.Random
var result = r.nextInt(100)
println("Producer produced " + result + " to be sent to " + f)
result
}

def startDoingSomething(): Unit = {
println("Consumer sleeping for 100ms")
Thread.sleep(scala.util.Random.nextInt(100))
}

@Trace(dispatcher = true)
def producerConsumerTx() {
for (i <- 1 to 5) {
val p = Promise[Int]
val f = p.future
val producer = Future {
val res = produceSomething(f)
p success res
println("Producer sleeping for 100ms")
Thread.sleep(scala.util.Random.nextInt(200))
}
val consumer = Future {
startDoingSomething()
f.onComplete(res => println("Consumer " + f + " consumed " + res))
}
}
}

@Test
def testProducerConsumer(): Unit = {
producerConsumerTx()
Thread.sleep(1000)
val introspector: Introspector = InstrumentationTestRunner.getIntrospector
Assert.assertEquals(1, introspector.getFinishedTransactionCount(30000))

var asyncTransaction: String = null
println("TransactionNames: " + introspector.getTransactionNames)
val it: java.util.Iterator[String] = introspector.getTransactionNames.iterator()
while (it.hasNext) {
val txName: String = it.next()
println("txName: " + txName)
if (txName.matches(".*ProducerConsumerWithFutures.*")) {
asyncTransaction = txName
}
}
Assert.assertNotNull("Unable to find test transaction", asyncTransaction)
val names: util.Collection[String] = introspector.getTransactionNames
val txName: String = new String("OtherTransaction/Custom/" + this.getClass.getName + "/producerConsumerTx")
Assert.assertTrue("Unable to find transaction " + txName, names.contains(txName))

val transactionTraces: util.Collection[TransactionTrace] = introspector.getTransactionTracesForTransaction(txName)
val ttIter: java.util.Iterator[TransactionTrace] = transactionTraces.iterator()
while (ttIter.hasNext) {
val txTrace: TransactionTrace = ttIter.next()
println("Tracer name: " + txTrace.getInitialTraceSegment.getName)
val traceSegIter: java.util.Iterator[TraceSegment] = txTrace.getInitialTraceSegment.getChildren.iterator()
while (traceSegIter.hasNext) {
System.out.println("trace segment name: " + traceSegIter.next().getName)
}
TracerSegmentUtils.checkChildrenTraces(txTrace.getInitialTraceSegment.getChildren,
List(),
List(),
List())
}
}
}
Loading