From fd4578e5d88a314db3cd9e6d593f13f9be351eca Mon Sep 17 00:00:00 2001 From: Jacob Wang Date: Sat, 20 Mar 2021 13:50:33 +0000 Subject: [PATCH 1/2] Add Storage.CrossThreadLocal This storage implements the "unoptimized" version of thread local storage which allows for the scope to be closed in another thread, as it re-resolves the thread local context in `close` --- .../bench/ThreadLocalStorageBenchmark.scala | 39 ++--------- .../context/ThreadLocalStorageSpec.scala | 69 ++++++++++++++++++- .../src/main/scala/kamon/ContextStorage.scala | 4 +- .../main/scala/kamon/context/Storage.scala | 36 +++++++++- 4 files changed, 111 insertions(+), 37 deletions(-) diff --git a/core/kamon-core-bench/src/main/scala/kamon/bench/ThreadLocalStorageBenchmark.scala b/core/kamon-core-bench/src/main/scala/kamon/bench/ThreadLocalStorageBenchmark.scala index 7dcc543a1..531227cb1 100644 --- a/core/kamon-core-bench/src/main/scala/kamon/bench/ThreadLocalStorageBenchmark.scala +++ b/core/kamon-core-bench/src/main/scala/kamon/bench/ThreadLocalStorageBenchmark.scala @@ -16,9 +16,9 @@ package kamon.bench -import java.util.concurrent.TimeUnit +import kamon.context.Storage.CrossThreadLocal -import kamon.context.Storage.Scope +import java.util.concurrent.TimeUnit import kamon.context.{Context, Storage} import org.openjdk.jmh.annotations._ @@ -28,7 +28,7 @@ class ThreadLocalStorageBenchmark { val TestKey: Context.Key[Int] = Context.key("test-key", 0) val ContextWithKey: Context = Context.of(TestKey, 43) - val TLS: Storage = new OldThreadLocal + val CrossTLS: Storage = new CrossThreadLocal val FTLS: Storage = new Storage.ThreadLocal @@ -36,11 +36,11 @@ class ThreadLocalStorageBenchmark { @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.NANOSECONDS) @Fork - def currentThreadLocal: Context = { - val scope = TLS.store(ContextWithKey) - TLS.current() + def crossThreadLocal: Context = { + val scope = CrossTLS.store(ContextWithKey) + CrossTLS.current() scope.close() - TLS.current() + CrossTLS.current() } @Benchmark @@ -54,28 +54,3 @@ class ThreadLocalStorageBenchmark { FTLS.current() } } - - -class OldThreadLocal extends Storage { - private val tls = new java.lang.ThreadLocal[Context]() { - override def initialValue(): Context = Context.Empty - } - - override def current(): Context = - tls.get() - - override def store(context: Context): Scope = { - val newContext = context - val previousContext = tls.get() - tls.set(newContext) - - new Scope { - override def context: Context = newContext - override def close(): Unit = tls.set(previousContext) - } - } -} - -object OldThreadLocal { - def apply(): OldThreadLocal = new OldThreadLocal() -} \ No newline at end of file diff --git a/core/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala b/core/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala index 8b94ac0c6..4723d0842 100644 --- a/core/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala +++ b/core/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala @@ -16,9 +16,17 @@ package kamon.context -import org.scalatest.{Matchers, WordSpec} +import kamon.context.Storage.Scope +import org.scalatest.{WordSpec, BeforeAndAfterAll, AsyncWordSpec, Matchers, Assertion} -class ThreadLocalStorageSpec extends WordSpec with Matchers { +import java.util.concurrent.Executors +import scala.concurrent.{Promise, ExecutionContext, Future} +import scala.util.Try +import org.scalatest.concurrent.ScalaFutures._ + +class ThreadLocalStorageSpec extends WordSpec with Matchers with BeforeAndAfterAll { + + private val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)) "the Storage.ThreadLocal implementation of Context storage" should { "return a empty context when no context has been set" in { @@ -47,9 +55,64 @@ class ThreadLocalStorageSpec extends WordSpec with Matchers { } - val TLS: Storage = new Storage.ThreadLocal + "the Storage.CrossThreadLocal implementation of Context storage" should { + "return a empty context when no context has been set" in { + CrossTLS.current() shouldBe Context.Empty + } + + "return the empty value for keys that have not been set in the context" in { + CrossTLS.current().get(TestKey) shouldBe 42 + CrossTLS.current().get(AnotherKey) shouldBe 99 + CrossTLS.current().get(BroadcastKey) shouldBe "i travel around" + + ScopeWithKey.get(TestKey) shouldBe 43 + ScopeWithKey.get(AnotherKey) shouldBe 99 + ScopeWithKey.get(BroadcastKey) shouldBe "i travel around" + } + + "allow setting a context as current and remove it when closing the Scope" in { + CrossTLS.current() shouldBe Context.Empty + + val scope = CrossTLS.store(ScopeWithKey) + CrossTLS.current() shouldBe theSameInstanceAs(ScopeWithKey) + scope.close() + + CrossTLS.current() shouldBe Context.Empty + } + + "Allow closing the scope in a different thread than the original" in { + var scope: Scope = null + + val f1 = Future { + // previous context + CrossTLS.store(ContextWithAnotherKey) + scope = CrossTLS.store(ScopeWithKey) + Thread.sleep(10) + CrossTLS.current() shouldBe theSameInstanceAs(ScopeWithKey) + }(ec) + + val f2 = Future { + while (scope == null) {} // wait for scope to be created in the other thread + CrossTLS.current() shouldBe Context.Empty + scope.close() + CrossTLS.current() shouldBe theSameInstanceAs(ContextWithAnotherKey) + }(ec) + + f1.flatMap(_ => f2)(ec).futureValue + } + + } + + override protected def afterAll(): Unit = { + ec.shutdown() + super.afterAll() + } + + val TLS: Storage = Storage.ThreadLocal() + val CrossTLS: Storage = Storage.CrossThreadLocal() val TestKey = Context.key("test-key", 42) val AnotherKey = Context.key("another-key", 99) val BroadcastKey = Context.key("broadcast", "i travel around") val ScopeWithKey = Context.of(TestKey, 43) + val ContextWithAnotherKey = Context.of(AnotherKey, 98) } diff --git a/core/kamon-core/src/main/scala/kamon/ContextStorage.scala b/core/kamon-core/src/main/scala/kamon/ContextStorage.scala index 7cebc33fc..5e9acbcab 100644 --- a/core/kamon-core/src/main/scala/kamon/ContextStorage.scala +++ b/core/kamon-core/src/main/scala/kamon/ContextStorage.scala @@ -141,8 +141,10 @@ object ContextStorage { * instrumentation follows them around. */ private val _contextStorage: Storage = { - if(sys.props("kamon.context.debug") == "true") + if (sys.props("kamon.context.debug") == "true") Storage.Debug() + else if (sys.props("kamon.context.crossThread") == "true") + Storage.CrossThreadLocal() else Storage.ThreadLocal() } diff --git a/core/kamon-core/src/main/scala/kamon/context/Storage.scala b/core/kamon-core/src/main/scala/kamon/context/Storage.scala index 23308b05a..c82e1456f 100644 --- a/core/kamon-core/src/main/scala/kamon/context/Storage.scala +++ b/core/kamon-core/src/main/scala/kamon/context/Storage.scala @@ -57,11 +57,43 @@ object Storage { def close(): Unit } + /** + * A ThreadLocal context storage that allows the scope to be closed in a different + * thread than the thread where store(..) was called. + * This is roughly 25% slower than [[kamon.context.Storage.ThreadLocal]] but is required for certain + * library integrations such as cats-effect IO or Monix. + * Turn this on by setting the System Property "kamon.context.crossThread" to "true". + */ + class CrossThreadLocal extends Storage { + private val tls = new java.lang.ThreadLocal[Context]() { + override def initialValue(): Context = Context.Empty + } + + override def current(): Context = + tls.get() + + override def store(newContext: Context): Scope = { + val previousContext = tls.get() + tls.set(newContext) + + new Scope { + override def context: Context = newContext + override def close(): Unit = tls.set(previousContext) + } + } + } + + object CrossThreadLocal { + def apply(): Storage.CrossThreadLocal = + new Storage.CrossThreadLocal() + } + /** * Wrapper that implements an optimized ThreadLocal access pattern ideal for heavily used ThreadLocals. It is faster * to use a mutable holder object and always perform ThreadLocal.get() and never use ThreadLocal.set(), because the * value is more likely to be found in the ThreadLocalMap direct hash slot and avoid the slow path of - * ThreadLocalMap.getEntryAfterMiss(). + * ThreadLocalMap.getEntryAfterMiss(). Closing of the returned Scope **MUST** be called in the same thread as + * store(..) was originally called. * * Credit to @trask from the FastThreadLocal in glowroot. One small change is that we don't use an kamon-defined * holder object as that would prevent class unloading. @@ -104,6 +136,8 @@ object Storage { * different stack traces for every store/close operation pair. Do not use this for any reason other than debugging * Context propagation issues (like, dirty Threads) in a controlled environment. * + * Note: Similar to the default ThreadLocal storage, the scope must be closed in the same thread as where store(..) + * was called, otherwise you are potentially modifying the context of another thread. */ class Debug extends Storage { import Debug._ From a8030974ee5b7fe9a62520d69b7b6aed1f00a384 Mon Sep 17 00:00:00 2001 From: Jacob Wang Date: Tue, 23 Mar 2021 23:19:50 +0000 Subject: [PATCH 2/2] Use CrossThreadLocal storage by default. - Fix Debug stroage to allow closing the scope in another thread without context contamination. - Add system property to choose a different context storage type --- .../src/main/scala/kamon/ContextStorage.scala | 15 +++++++++++---- .../src/main/scala/kamon/context/Storage.scala | 13 ++++++------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/core/kamon-core/src/main/scala/kamon/ContextStorage.scala b/core/kamon-core/src/main/scala/kamon/ContextStorage.scala index 5e9acbcab..7676a112f 100644 --- a/core/kamon-core/src/main/scala/kamon/ContextStorage.scala +++ b/core/kamon-core/src/main/scala/kamon/ContextStorage.scala @@ -141,11 +141,18 @@ object ContextStorage { * instrumentation follows them around. */ private val _contextStorage: Storage = { + val storageTypeStr = Option(sys.props("kamon.context.storageType")) + if (sys.props("kamon.context.debug") == "true") Storage.Debug() - else if (sys.props("kamon.context.crossThread") == "true") - Storage.CrossThreadLocal() - else - Storage.ThreadLocal() + else { + storageTypeStr match { + case None => Storage.CrossThreadLocal() + case Some("debug") => Storage.Debug() + case Some("sameThreadScope") => Storage.ThreadLocal() + case Some("default") => Storage.CrossThreadLocal() + case Some(other) => throw new IllegalArgumentException(s"Unrecognized kamon.context.storageType value: $other") + } + } } } diff --git a/core/kamon-core/src/main/scala/kamon/context/Storage.scala b/core/kamon-core/src/main/scala/kamon/context/Storage.scala index c82e1456f..9f46b1e58 100644 --- a/core/kamon-core/src/main/scala/kamon/context/Storage.scala +++ b/core/kamon-core/src/main/scala/kamon/context/Storage.scala @@ -92,12 +92,13 @@ object Storage { * Wrapper that implements an optimized ThreadLocal access pattern ideal for heavily used ThreadLocals. It is faster * to use a mutable holder object and always perform ThreadLocal.get() and never use ThreadLocal.set(), because the * value is more likely to be found in the ThreadLocalMap direct hash slot and avoid the slow path of - * ThreadLocalMap.getEntryAfterMiss(). Closing of the returned Scope **MUST** be called in the same thread as - * store(..) was originally called. + * ThreadLocalMap.getEntryAfterMiss(). + * WARNING: Closing of the returned Scope **MUST** be called in the same thread as store(..) was originally called. * * Credit to @trask from the FastThreadLocal in glowroot. One small change is that we don't use an kamon-defined * holder object as that would prevent class unloading. */ + // Named ThreadLocal for binary compatibility reasons, despite the fact that this isn't the default storage type class ThreadLocal extends Storage { private val tls = new java.lang.ThreadLocal[Array[AnyRef]]() { override def initialValue(): Array[AnyRef] = @@ -135,9 +136,6 @@ object Storage { * This implementation is considerably less efficient than the default implementation since it is taking at least two * different stack traces for every store/close operation pair. Do not use this for any reason other than debugging * Context propagation issues (like, dirty Threads) in a controlled environment. - * - * Note: Similar to the default ThreadLocal storage, the scope must be closed in the same thread as where store(..) - * was called, otherwise you are potentially modifying the context of another thread. */ class Debug extends Storage { import Debug._ @@ -167,8 +165,9 @@ object Storage { newContext override def close(): Unit = { - ref.set(0, previousContext) - ref.set(2, stackTraceString()) + val thisRef = _tls.get() + thisRef.set(0, previousContext) + thisRef.set(2, stackTraceString()) } } }