From 0736a8d78b1a423421be4e189a10f29fc9cc5a60 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 14 Mar 2024 20:14:43 +0100 Subject: [PATCH 1/4] support config for jackson buffer recycler pool --- .../src/main/resources/reference.conf | 11 +++++++++++ .../jackson/JacksonObjectMapperProvider.scala | 15 +++++++++++++++ .../jackson/JacksonFactorySpec.scala | 17 +++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/serialization-jackson/src/main/resources/reference.conf b/serialization-jackson/src/main/resources/reference.conf index a9a1b3f1886..083e254727b 100644 --- a/serialization-jackson/src/main/resources/reference.conf +++ b/serialization-jackson/src/main/resources/reference.conf @@ -35,6 +35,17 @@ pekko.serialization.jackson { migrations { } + # Controls the Buffer Recycler Pool implementation used by Jackson. + # https://javadoc.io/static/com.fasterxml.jackson.core/jackson-core/2.16.2/com/fasterxml/jackson/core/util/JsonRecyclerPools.html + # The default is "thread-local" which is the same as the default in Jackson 2.16. + buffer-recycler { + # the supported values are "thread-local", "lock-free", "shared-lock-free", "concurrent-deque", + # "shared-concurrent-deque", "bounded" + pool-instance = "thread-local" + # the maximum size of bounded recycler pools - must be >=1 or an IllegalArgumentException will occur + # only applies to pool-instance type "bounded" + bounded-pool-size = 100 + } } #//#stream-read-constraints diff --git a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala index a7a79f84f5c..deae791ae66 100644 --- a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala +++ b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala @@ -30,6 +30,7 @@ import com.fasterxml.jackson.core.{ StreamWriteFeature } import com.fasterxml.jackson.core.json.{ JsonReadFeature, JsonWriteFeature } +import com.fasterxml.jackson.core.util.{ BufferRecycler, JsonRecyclerPools, RecyclerPool } import com.fasterxml.jackson.databind.{ DeserializationFeature, MapperFeature, @@ -103,10 +104,12 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid // instead of using JsonFactoryBuilder (new in Jackson 2.10.0). factory.setStreamReadConstraints(streamReadConstraints) factory.setStreamWriteConstraints(streamWriteConstraints) + factory.setRecyclerPool(getBufferRecyclerPool(config)) case None => new JsonFactoryBuilder() .streamReadConstraints(streamReadConstraints) .streamWriteConstraints(streamWriteConstraints) + .recyclerPool(getBufferRecyclerPool(config)) .build() } @@ -153,6 +156,18 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid jsonFactory } + private def getBufferRecyclerPool(cfg: Config): RecyclerPool[BufferRecycler] = { + cfg.getString("buffer-recycler.pool-instance") match { + case "thread-local" => JsonRecyclerPools.threadLocalPool() + case "lock-free" => JsonRecyclerPools.newLockFreePool() + case "shared-lock-free" => JsonRecyclerPools.sharedLockFreePool() + case "concurrent-deque" => JsonRecyclerPools.newConcurrentDequePool() + case "shared-concurrent-deque" => JsonRecyclerPools.sharedConcurrentDequePool() + case "bounded" => JsonRecyclerPools.newBoundedPool(cfg.getInt("buffer-recycler.bounded-pool-size")) + case other => throw new IllegalArgumentException(s"Unknown recycler-pool: $other") + } + } + @nowarn("msg=deprecated") private def configureObjectMapperFeatures( bindingName: String, diff --git a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala index 272afadae44..8613f419064 100644 --- a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala +++ b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala @@ -17,6 +17,7 @@ package org.apache.pekko.serialization.jackson +import com.fasterxml.jackson.core.util.JsonRecyclerPools.BoundedPool import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers @@ -72,5 +73,21 @@ class JacksonFactorySpec extends TestKit(ActorSystem("JacksonFactorySpec")) val streamWriteConstraints = mapper.getFactory.streamWriteConstraints() streamWriteConstraints.getMaxNestingDepth shouldEqual maxNestingDepth } + "support BufferRecycler" in { + val bindingName = "testJackson" + val poolInstance = "bounded" + val boundedPoolSize = 1234 + val config = ConfigFactory.parseString( + s"""pekko.serialization.jackson.buffer-recycler.pool-instance=$poolInstance + |pekko.serialization.jackson.buffer-recycler.bounded-pool-size=$boundedPoolSize + |""".stripMargin) + .withFallback(defaultConfig) + val jacksonConfig = JacksonObjectMapperProvider.configForBinding(bindingName, config) + val mapper = JacksonObjectMapperProvider.createObjectMapper( + bindingName, None, objectMapperFactory, jacksonConfig, dynamicAccess, None) + val recyclerPool = mapper.getFactory._getRecyclerPool() + recyclerPool.getClass.getSimpleName shouldEqual "BoundedPool" + recyclerPool.asInstanceOf[BoundedPool].capacity() shouldEqual boundedPoolSize + } } } From 6b8183ca05cc641d6cb3c9bda712431fbc7c23f8 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 14 Mar 2024 20:24:28 +0100 Subject: [PATCH 2/4] Update JacksonFactorySpec.scala --- .../serialization/jackson/JacksonFactorySpec.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala index 8613f419064..10ddfc41740 100644 --- a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala +++ b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala @@ -73,7 +73,17 @@ class JacksonFactorySpec extends TestKit(ActorSystem("JacksonFactorySpec")) val streamWriteConstraints = mapper.getFactory.streamWriteConstraints() streamWriteConstraints.getMaxNestingDepth shouldEqual maxNestingDepth } - "support BufferRecycler" in { + "support BufferRecycler (default)" in { + val bindingName = "testJackson" + val poolInstance = "bounded" + val boundedPoolSize = 1234 + val jacksonConfig = JacksonObjectMapperProvider.configForBinding(bindingName, defaultConfig) + val mapper = JacksonObjectMapperProvider.createObjectMapper( + bindingName, None, objectMapperFactory, jacksonConfig, dynamicAccess, None) + val recyclerPool = mapper.getFactory._getRecyclerPool() + recyclerPool.getClass.getSimpleName shouldEqual "ThreadLocalPool" + } + "support BufferRecycler with config override" in { val bindingName = "testJackson" val poolInstance = "bounded" val boundedPoolSize = 1234 From 660863ecfc70bb325f32c3940c20edc8bd2680a5 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 14 Mar 2024 20:43:25 +0100 Subject: [PATCH 3/4] Update JacksonFactorySpec.scala --- .../apache/pekko/serialization/jackson/JacksonFactorySpec.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala index 10ddfc41740..317889c5811 100644 --- a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala +++ b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala @@ -75,8 +75,6 @@ class JacksonFactorySpec extends TestKit(ActorSystem("JacksonFactorySpec")) } "support BufferRecycler (default)" in { val bindingName = "testJackson" - val poolInstance = "bounded" - val boundedPoolSize = 1234 val jacksonConfig = JacksonObjectMapperProvider.configForBinding(bindingName, defaultConfig) val mapper = JacksonObjectMapperProvider.createObjectMapper( bindingName, None, objectMapperFactory, jacksonConfig, dynamicAccess, None) From 798645462e6041e3c15dcb0cd6e72d9ad8519f40 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 15 Mar 2024 14:45:57 +0100 Subject: [PATCH 4/4] add blank lines --- .../pekko/serialization/jackson/JacksonFactorySpec.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala index 317889c5811..090b4080379 100644 --- a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala +++ b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala @@ -61,6 +61,7 @@ class JacksonFactorySpec extends TestKit(ActorSystem("JacksonFactorySpec")) streamReadConstraints.getMaxDocumentLength shouldEqual maxDocLen streamReadConstraints.getMaxNestingDepth shouldEqual maxNestingDepth } + "support StreamWriteConstraints" in { val bindingName = "testJackson" val maxNestingDepth = 54321 @@ -73,6 +74,7 @@ class JacksonFactorySpec extends TestKit(ActorSystem("JacksonFactorySpec")) val streamWriteConstraints = mapper.getFactory.streamWriteConstraints() streamWriteConstraints.getMaxNestingDepth shouldEqual maxNestingDepth } + "support BufferRecycler (default)" in { val bindingName = "testJackson" val jacksonConfig = JacksonObjectMapperProvider.configForBinding(bindingName, defaultConfig) @@ -81,6 +83,7 @@ class JacksonFactorySpec extends TestKit(ActorSystem("JacksonFactorySpec")) val recyclerPool = mapper.getFactory._getRecyclerPool() recyclerPool.getClass.getSimpleName shouldEqual "ThreadLocalPool" } + "support BufferRecycler with config override" in { val bindingName = "testJackson" val poolInstance = "bounded"