Skip to content

Commit

Permalink
Merge pull request #961 from jatcwang/master
Browse files Browse the repository at this point in the history
Add Storage.CrossThreadLocal
  • Loading branch information
SimunKaracic authored Mar 24, 2021
2 parents 0e84df5 + a803097 commit ce27fb5
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package kamon.bench

import java.util.concurrent.TimeUnit
import kamon.context.Storage.CrossThreadLocal

import kamon.context.Storage.Scope
import java.util.concurrent.TimeUnit
import kamon.context.{Context, Storage}
import org.openjdk.jmh.annotations._

Expand All @@ -28,19 +28,19 @@ class ThreadLocalStorageBenchmark {
val TestKey: Context.Key[Int] = Context.key("test-key", 0)
val ContextWithKey: Context = Context.of(TestKey, 43)

val TLS: Storage = new OldThreadLocal
val CrossTLS: Storage = new CrossThreadLocal
val FTLS: Storage = new Storage.ThreadLocal


@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Fork
def currentThreadLocal: Context = {
val scope = TLS.store(ContextWithKey)
TLS.current()
def crossThreadLocal: Context = {
val scope = CrossTLS.store(ContextWithKey)
CrossTLS.current()
scope.close()
TLS.current()
CrossTLS.current()
}

@Benchmark
Expand All @@ -54,28 +54,3 @@ class ThreadLocalStorageBenchmark {
FTLS.current()
}
}


class OldThreadLocal extends Storage {
private val tls = new java.lang.ThreadLocal[Context]() {
override def initialValue(): Context = Context.Empty
}

override def current(): Context =
tls.get()

override def store(context: Context): Scope = {
val newContext = context
val previousContext = tls.get()
tls.set(newContext)

new Scope {
override def context: Context = newContext
override def close(): Unit = tls.set(previousContext)
}
}
}

object OldThreadLocal {
def apply(): OldThreadLocal = new OldThreadLocal()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@
package kamon.context


import org.scalatest.{Matchers, WordSpec}
import kamon.context.Storage.Scope
import org.scalatest.{WordSpec, BeforeAndAfterAll, AsyncWordSpec, Matchers, Assertion}

class ThreadLocalStorageSpec extends WordSpec with Matchers {
import java.util.concurrent.Executors
import scala.concurrent.{Promise, ExecutionContext, Future}
import scala.util.Try
import org.scalatest.concurrent.ScalaFutures._

class ThreadLocalStorageSpec extends WordSpec with Matchers with BeforeAndAfterAll {

private val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

"the Storage.ThreadLocal implementation of Context storage" should {
"return a empty context when no context has been set" in {
Expand Down Expand Up @@ -47,9 +55,64 @@ class ThreadLocalStorageSpec extends WordSpec with Matchers {

}

val TLS: Storage = new Storage.ThreadLocal
"the Storage.CrossThreadLocal implementation of Context storage" should {
"return a empty context when no context has been set" in {
CrossTLS.current() shouldBe Context.Empty
}

"return the empty value for keys that have not been set in the context" in {
CrossTLS.current().get(TestKey) shouldBe 42
CrossTLS.current().get(AnotherKey) shouldBe 99
CrossTLS.current().get(BroadcastKey) shouldBe "i travel around"

ScopeWithKey.get(TestKey) shouldBe 43
ScopeWithKey.get(AnotherKey) shouldBe 99
ScopeWithKey.get(BroadcastKey) shouldBe "i travel around"
}

"allow setting a context as current and remove it when closing the Scope" in {
CrossTLS.current() shouldBe Context.Empty

val scope = CrossTLS.store(ScopeWithKey)
CrossTLS.current() shouldBe theSameInstanceAs(ScopeWithKey)
scope.close()

CrossTLS.current() shouldBe Context.Empty
}

"Allow closing the scope in a different thread than the original" in {
var scope: Scope = null

val f1 = Future {
// previous context
CrossTLS.store(ContextWithAnotherKey)
scope = CrossTLS.store(ScopeWithKey)
Thread.sleep(10)
CrossTLS.current() shouldBe theSameInstanceAs(ScopeWithKey)
}(ec)

val f2 = Future {
while (scope == null) {} // wait for scope to be created in the other thread
CrossTLS.current() shouldBe Context.Empty
scope.close()
CrossTLS.current() shouldBe theSameInstanceAs(ContextWithAnotherKey)
}(ec)

f1.flatMap(_ => f2)(ec).futureValue
}

}

override protected def afterAll(): Unit = {
ec.shutdown()
super.afterAll()
}

val TLS: Storage = Storage.ThreadLocal()
val CrossTLS: Storage = Storage.CrossThreadLocal()
val TestKey = Context.key("test-key", 42)
val AnotherKey = Context.key("another-key", 99)
val BroadcastKey = Context.key("broadcast", "i travel around")
val ScopeWithKey = Context.of(TestKey, 43)
val ContextWithAnotherKey = Context.of(AnotherKey, 98)
}
15 changes: 12 additions & 3 deletions core/kamon-core/src/main/scala/kamon/ContextStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,18 @@ object ContextStorage {
* instrumentation follows them around.
*/
private val _contextStorage: Storage = {
if(sys.props("kamon.context.debug") == "true")
val storageTypeStr = Option(sys.props("kamon.context.storageType"))

if (sys.props("kamon.context.debug") == "true")
Storage.Debug()
else
Storage.ThreadLocal()
else {
storageTypeStr match {
case None => Storage.CrossThreadLocal()
case Some("debug") => Storage.Debug()
case Some("sameThreadScope") => Storage.ThreadLocal()
case Some("default") => Storage.CrossThreadLocal()
case Some(other) => throw new IllegalArgumentException(s"Unrecognized kamon.context.storageType value: $other")
}
}
}
}
39 changes: 36 additions & 3 deletions core/kamon-core/src/main/scala/kamon/context/Storage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,48 @@ object Storage {
def close(): Unit
}

/**
* A ThreadLocal context storage that allows the scope to be closed in a different
* thread than the thread where store(..) was called.
* This is roughly 25% slower than [[kamon.context.Storage.ThreadLocal]] but is required for certain
* library integrations such as cats-effect IO or Monix.
* Turn this on by setting the System Property "kamon.context.crossThread" to "true".
*/
class CrossThreadLocal extends Storage {
private val tls = new java.lang.ThreadLocal[Context]() {
override def initialValue(): Context = Context.Empty
}

override def current(): Context =
tls.get()

override def store(newContext: Context): Scope = {
val previousContext = tls.get()
tls.set(newContext)

new Scope {
override def context: Context = newContext
override def close(): Unit = tls.set(previousContext)
}
}
}

object CrossThreadLocal {
def apply(): Storage.CrossThreadLocal =
new Storage.CrossThreadLocal()
}

/**
* Wrapper that implements an optimized ThreadLocal access pattern ideal for heavily used ThreadLocals. It is faster
* to use a mutable holder object and always perform ThreadLocal.get() and never use ThreadLocal.set(), because the
* value is more likely to be found in the ThreadLocalMap direct hash slot and avoid the slow path of
* ThreadLocalMap.getEntryAfterMiss().
* WARNING: Closing of the returned Scope **MUST** be called in the same thread as store(..) was originally called.
*
* Credit to @trask from the FastThreadLocal in glowroot. One small change is that we don't use an kamon-defined
* holder object as that would prevent class unloading.
*/
// Named ThreadLocal for binary compatibility reasons, despite the fact that this isn't the default storage type
class ThreadLocal extends Storage {
private val tls = new java.lang.ThreadLocal[Array[AnyRef]]() {
override def initialValue(): Array[AnyRef] =
Expand Down Expand Up @@ -103,7 +136,6 @@ object Storage {
* This implementation is considerably less efficient than the default implementation since it is taking at least two
* different stack traces for every store/close operation pair. Do not use this for any reason other than debugging
* Context propagation issues (like, dirty Threads) in a controlled environment.
*
*/
class Debug extends Storage {
import Debug._
Expand Down Expand Up @@ -133,8 +165,9 @@ object Storage {
newContext

override def close(): Unit = {
ref.set(0, previousContext)
ref.set(2, stackTraceString())
val thisRef = _tls.get()
thisRef.set(0, previousContext)
thisRef.set(2, stackTraceString())
}
}
}
Expand Down

0 comments on commit ce27fb5

Please sign in to comment.