diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index f5f8622a..4e205abe 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -169,6 +169,12 @@ pekko.persistence.r2dbc { # Enabling this has some performance overhead. # A fast query for Postgres is "SELECT 1" validation-query = "" + + # FQCN of a ConnectionFactoryOptionsCustomizer. If non-empty, it must be the fully + # qualified class name of a class implementing the trait ConnectionFactoryOptionsCustomizer. + # The class must have a constructor with a single parameter of type ActorSystem[_]. + # If this setting is empty, the default no-op customizer will be used. + connection-factory-options-customizer = "" } # If database timestamp is guaranteed to not move backwards for two subsequent diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala index d9485cd3..c8af66b3 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala @@ -18,13 +18,16 @@ import java.util.concurrent.ConcurrentHashMap import scala.concurrent.Future import scala.concurrent.duration.Duration +import scala.util.{ Failure, Success } +import com.typesafe.config.Config import org.apache.pekko import pekko.Done import pekko.actor.CoordinatedShutdown import pekko.actor.typed.ActorSystem import pekko.actor.typed.Extension import pekko.actor.typed.ExtensionId +import pekko.persistence.r2dbc.ConnectionFactoryProvider.{ ConnectionFactoryOptionsCustomizer, NoopCustomizer } import pekko.persistence.r2dbc.internal.R2dbcExecutor import pekko.util.ccompat.JavaConverters._ import io.r2dbc.pool.ConnectionPool @@ -40,6 +43,33 @@ object ConnectionFactoryProvider extends ExtensionId[ConnectionFactoryProvider] // Java API def get(system: ActorSystem[_]): ConnectionFactoryProvider = apply(system) + + /** + * Enables customization of [[ConnectionFactoryOptions]] right before the connection factory is created. + * This is particularly useful for setting options that support dynamically computed values rather than + * just plain constants. Classes implementing this trait must have a constructor with a single parameter + * of type [[ActorSystem]]. + * + * @since 1.1.0 + */ + trait ConnectionFactoryOptionsCustomizer { + + /** + * Customizes the [[ConnectionFactoryOptions.Builder]] instance based on the provided configuration. + * + * @param builder the options builder that has been pre-configured by the connection factory provider + * @param config the connection factory configuration + * @return the modified options builder with the applied customizations + * + * @since 1.1.0 + */ + def apply(builder: ConnectionFactoryOptions.Builder, config: Config): ConnectionFactoryOptions.Builder + } + + private object NoopCustomizer extends ConnectionFactoryOptionsCustomizer { + override def apply(builder: ConnectionFactoryOptions.Builder, config: Config): ConnectionFactoryOptions.Builder = + builder + } } class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension { @@ -62,13 +92,30 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension { configLocation => { val config = system.settings.config.getConfig(configLocation) val settings = new ConnectionFactorySettings(config) - createConnectionPoolFactory(settings) + val customizer = createConnectionFactoryOptionsCustomizer(settings) + createConnectionPoolFactory(settings, customizer, config) }) .asInstanceOf[ConnectionFactory] } - private def createConnectionFactory(settings: ConnectionFactorySettings): ConnectionFactory = { + private def createConnectionFactoryOptionsCustomizer( + settings: ConnectionFactorySettings): ConnectionFactoryOptionsCustomizer = { + settings.connectionFactoryOptionsCustomizer match { + case None => NoopCustomizer + case Some(fqcn) => + val args = List(classOf[ActorSystem[_]] -> system) + system.dynamicAccess.createInstanceFor[ConnectionFactoryOptionsCustomizer](fqcn, args) match { + case Success(customizer) => customizer + case Failure(cause) => + throw new IllegalArgumentException(s"Failed to create ConnectionFactoryOptionsCustomizer for class $fqcn", + cause) + } + } + } + private def createConnectionFactory(settings: ConnectionFactorySettings, + customizer: ConnectionFactoryOptionsCustomizer, + config: Config): ConnectionFactory = { val builder = settings.urlOption match { case Some(url) => @@ -102,11 +149,13 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension { builder.option(PostgresqlConnectionFactoryProvider.SSL_ROOT_CERT, settings.sslRootCert) } - ConnectionFactories.get(builder.build()) + ConnectionFactories.get(customizer(builder, config).build()) } - private def createConnectionPoolFactory(settings: ConnectionFactorySettings): ConnectionPool = { - val connectionFactory = createConnectionFactory(settings) + private def createConnectionPoolFactory(settings: ConnectionFactorySettings, + customizer: ConnectionFactoryOptionsCustomizer, + config: Config): ConnectionPool = { + val connectionFactory = createConnectionFactory(settings, customizer, config) val evictionInterval = { import settings.{ maxIdleTime, maxLifeTime } diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala index 46c7bb76..49e48603 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala @@ -142,4 +142,7 @@ final class ConnectionFactorySettings(config: Config) { val validationQuery: String = config.getString("validation-query") val statementCacheSize: Int = config.getInt("statement-cache-size") + + val connectionFactoryOptionsCustomizer: Option[String] = + Option(config.getString("connection-factory-options-customizer")).filter(_.trim.nonEmpty) } diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryOptionsCustomizerSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryOptionsCustomizerSpec.scala new file mode 100644 index 00000000..0e3ebbde --- /dev/null +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryOptionsCustomizerSpec.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.r2dbc + +import com.typesafe.config.{ Config, ConfigFactory } +import io.r2dbc.spi.ConnectionFactoryOptions +import org.apache.pekko.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.actor.typed.eventstream.EventStream +import org.apache.pekko.persistence.r2dbc.ConnectionFactoryOptionsCustomizerSpec.{ config, CustomizerCalled } +import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer +import org.scalatest.wordspec.AnyWordSpecLike + +class ConnectionFactoryOptionsCustomizerSpec extends ScalaTestWithActorTestKit(config) with AnyWordSpecLike { + "ConnectionFactoryProvider" should { + "instantiate and apply a custom ConnectionFactoryOptionsCustomizer when connection-factory-options-customizer settings is set" in { + val probe = TestProbe[CustomizerCalled.type]() + system.eventStream.tell(EventStream.Subscribe(probe.ref)) + + ConnectionFactoryProvider(system).connectionFactoryFor("pekko.persistence.r2dbc.connection-factory") + probe.expectMessage(CustomizerCalled) + } + } +} + +object ConnectionFactoryOptionsCustomizerSpec { + object CustomizerCalled + + class Customizer(system: ActorSystem[_]) extends ConnectionFactoryOptionsCustomizer { + override def apply(builder: ConnectionFactoryOptions.Builder, config: Config): ConnectionFactoryOptions.Builder = { + system.eventStream.tell(EventStream.Publish(CustomizerCalled)) + builder + } + } + + val config: Config = ConfigFactory.parseString(""" + pekko.persistence.r2dbc.connection-factory { + connection-factory-options-customizer = "org.apache.pekko.persistence.r2dbc.ConnectionFactoryOptionsCustomizerSpec$Customizer" + } + """).withFallback(TestConfig.config) +} diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala index 9317505f..324818e0 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala @@ -62,5 +62,13 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { settings.connectionFactorySettings.sslMode shouldBe "verify-full" SSLMode.fromValue(settings.connectionFactorySettings.sslMode) shouldBe SSLMode.VERIFY_FULL } + + "allow to specify ConnectionFactoryOptions customizer" in { + val config = ConfigFactory + .parseString("pekko.persistence.r2dbc.connection-factory.connection-factory-options-customizer=fqcn") + .withFallback(ConfigFactory.load()) + val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc")) + settings.connectionFactorySettings.connectionFactoryOptionsCustomizer shouldBe Some("fqcn") + } } }