Skip to content

Commit

Permalink
Add ConnectionFactoryOptionsCustomizer (#171)
Browse files Browse the repository at this point in the history
* Add OptionsCustomizer

* Rename ConnectionFactoryOptions customizer

* Fix header

* Specify full class names in imports

* Improve the error message when a customizer cannot be created

* Pass ConnectionFactoryOptions.Builder to the customizer

* Fix imports

* Add scaladoc

---------

Co-authored-by: Marcel Mojzis <[email protected]>
  • Loading branch information
marcelmojzis and Marcel Mojzis authored Nov 7, 2024
1 parent f9742b7 commit 319bc06
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 5 deletions.
6 changes: 6 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ pekko.persistence.r2dbc {
# Enabling this has some performance overhead.
# A fast query for Postgres is "SELECT 1"
validation-query = ""

# FQCN of a ConnectionFactoryOptionsCustomizer. If non-empty, it must be the fully
# qualified class name of a class implementing the trait ConnectionFactoryOptionsCustomizer.
# The class must have a constructor with a single parameter of type ActorSystem[_].
# If this setting is empty, the default no-op customizer will be used.
connection-factory-options-customizer = ""
}

# If database timestamp is guaranteed to not move backwards for two subsequent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }

import com.typesafe.config.Config
import org.apache.pekko
import pekko.Done
import pekko.actor.CoordinatedShutdown
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Extension
import pekko.actor.typed.ExtensionId
import pekko.persistence.r2dbc.ConnectionFactoryProvider.{ ConnectionFactoryOptionsCustomizer, NoopCustomizer }
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.util.ccompat.JavaConverters._
import io.r2dbc.pool.ConnectionPool
Expand All @@ -40,6 +43,33 @@ object ConnectionFactoryProvider extends ExtensionId[ConnectionFactoryProvider]

// Java API
def get(system: ActorSystem[_]): ConnectionFactoryProvider = apply(system)

/**
* Enables customization of [[ConnectionFactoryOptions]] right before the connection factory is created.
* This is particularly useful for setting options that support dynamically computed values rather than
* just plain constants. Classes implementing this trait must have a constructor with a single parameter
* of type [[ActorSystem]].
*
* @since 1.1.0
*/
trait ConnectionFactoryOptionsCustomizer {

/**
* Customizes the [[ConnectionFactoryOptions.Builder]] instance based on the provided configuration.
*
* @param builder the options builder that has been pre-configured by the connection factory provider
* @param config the connection factory configuration
* @return the modified options builder with the applied customizations
*
* @since 1.1.0
*/
def apply(builder: ConnectionFactoryOptions.Builder, config: Config): ConnectionFactoryOptions.Builder
}

private object NoopCustomizer extends ConnectionFactoryOptionsCustomizer {
override def apply(builder: ConnectionFactoryOptions.Builder, config: Config): ConnectionFactoryOptions.Builder =
builder
}
}

class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension {
Expand All @@ -62,13 +92,30 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension {
configLocation => {
val config = system.settings.config.getConfig(configLocation)
val settings = new ConnectionFactorySettings(config)
createConnectionPoolFactory(settings)
val customizer = createConnectionFactoryOptionsCustomizer(settings)
createConnectionPoolFactory(settings, customizer, config)
})
.asInstanceOf[ConnectionFactory]
}

private def createConnectionFactory(settings: ConnectionFactorySettings): ConnectionFactory = {
private def createConnectionFactoryOptionsCustomizer(
settings: ConnectionFactorySettings): ConnectionFactoryOptionsCustomizer = {
settings.connectionFactoryOptionsCustomizer match {
case None => NoopCustomizer
case Some(fqcn) =>
val args = List(classOf[ActorSystem[_]] -> system)
system.dynamicAccess.createInstanceFor[ConnectionFactoryOptionsCustomizer](fqcn, args) match {
case Success(customizer) => customizer
case Failure(cause) =>
throw new IllegalArgumentException(s"Failed to create ConnectionFactoryOptionsCustomizer for class $fqcn",
cause)
}
}
}

private def createConnectionFactory(settings: ConnectionFactorySettings,
customizer: ConnectionFactoryOptionsCustomizer,
config: Config): ConnectionFactory = {
val builder =
settings.urlOption match {
case Some(url) =>
Expand Down Expand Up @@ -102,11 +149,13 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension {
builder.option(PostgresqlConnectionFactoryProvider.SSL_ROOT_CERT, settings.sslRootCert)
}

ConnectionFactories.get(builder.build())
ConnectionFactories.get(customizer(builder, config).build())
}

private def createConnectionPoolFactory(settings: ConnectionFactorySettings): ConnectionPool = {
val connectionFactory = createConnectionFactory(settings)
private def createConnectionPoolFactory(settings: ConnectionFactorySettings,
customizer: ConnectionFactoryOptionsCustomizer,
config: Config): ConnectionPool = {
val connectionFactory = createConnectionFactory(settings, customizer, config)

val evictionInterval = {
import settings.{ maxIdleTime, maxLifeTime }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,7 @@ final class ConnectionFactorySettings(config: Config) {
val validationQuery: String = config.getString("validation-query")

val statementCacheSize: Int = config.getInt("statement-cache-size")

val connectionFactoryOptionsCustomizer: Option[String] =
Option(config.getString("connection-factory-options-customizer")).filter(_.trim.nonEmpty)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.r2dbc

import com.typesafe.config.{ Config, ConfigFactory }
import io.r2dbc.spi.ConnectionFactoryOptions
import org.apache.pekko.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.eventstream.EventStream
import org.apache.pekko.persistence.r2dbc.ConnectionFactoryOptionsCustomizerSpec.{ config, CustomizerCalled }
import org.apache.pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer
import org.scalatest.wordspec.AnyWordSpecLike

class ConnectionFactoryOptionsCustomizerSpec extends ScalaTestWithActorTestKit(config) with AnyWordSpecLike {
"ConnectionFactoryProvider" should {
"instantiate and apply a custom ConnectionFactoryOptionsCustomizer when connection-factory-options-customizer settings is set" in {
val probe = TestProbe[CustomizerCalled.type]()
system.eventStream.tell(EventStream.Subscribe(probe.ref))

ConnectionFactoryProvider(system).connectionFactoryFor("pekko.persistence.r2dbc.connection-factory")
probe.expectMessage(CustomizerCalled)
}
}
}

object ConnectionFactoryOptionsCustomizerSpec {
object CustomizerCalled

class Customizer(system: ActorSystem[_]) extends ConnectionFactoryOptionsCustomizer {
override def apply(builder: ConnectionFactoryOptions.Builder, config: Config): ConnectionFactoryOptions.Builder = {
system.eventStream.tell(EventStream.Publish(CustomizerCalled))
builder
}
}

val config: Config = ConfigFactory.parseString("""
pekko.persistence.r2dbc.connection-factory {
connection-factory-options-customizer = "org.apache.pekko.persistence.r2dbc.ConnectionFactoryOptionsCustomizerSpec$Customizer"
}
""").withFallback(TestConfig.config)
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,13 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers {
settings.connectionFactorySettings.sslMode shouldBe "verify-full"
SSLMode.fromValue(settings.connectionFactorySettings.sslMode) shouldBe SSLMode.VERIFY_FULL
}

"allow to specify ConnectionFactoryOptions customizer" in {
val config = ConfigFactory
.parseString("pekko.persistence.r2dbc.connection-factory.connection-factory-options-customizer=fqcn")
.withFallback(ConfigFactory.load())
val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
settings.connectionFactorySettings.connectionFactoryOptionsCustomizer shouldBe Some("fqcn")
}
}
}

0 comments on commit 319bc06

Please sign in to comment.