Skip to content

Commit

Permalink
Core implementation for MySQL support (#175)
Browse files Browse the repository at this point in the history
* Core implementation for MySQL support

* Add documentation for mysql-specific configuration

* Addressing various PR comments

* Fix license headers

* Remove spurious projection dependency on mysql
  • Loading branch information
ptrdom authored Nov 15, 2024
1 parent e65b9c1 commit 719e9eb
Show file tree
Hide file tree
Showing 27 changed files with 780 additions and 80 deletions.
55 changes: 55 additions & 0 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,61 @@ jobs:
- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=yugabyte -Dpekko.projection.r2dbc.dialect=yugabyte ++${{ matrix.SCALA_VERSION }} test

test-mysql:
name: Run tests with MySQL
runs-on: ubuntu-latest
strategy:
matrix:
SCALA_VERSION: [ 2.12, 2.13, 3.3 ]
JAVA_VERSION: [ 11, 17, 21 ]
# only compiling on JDK 8, because certain tests depend on the higher timestamp precision added in JDK 9
include:
- JAVA_VERSION: 8
SCALA_VERSION: 2.12
COMPILE_ONLY: true
- JAVA_VERSION: 8
SCALA_VERSION: 2.13
COMPILE_ONLY: true
- JAVA_VERSION: 8
SCALA_VERSION: 3.3
COMPILE_ONLY: true
if: github.repository == 'apache/pekko-persistence-r2dbc'
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
fetch-tags: true

- name: Checkout GitHub merge
if: github.event.pull_request
run: |-
git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
git checkout scratch
- name: Setup Java ${{ matrix.JAVA_VERSION }}
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: ${{ matrix.JAVA_VERSION }}

- name: Install sbt
uses: sbt/setup-sbt@v1

- name: Cache Coursier cache
uses: coursier/cache-action@v6

- name: Enable jvm-opts
run: cp .jvmopts-ci .jvmopts

- name: Start DB
run: |-
docker compose -f docker/docker-compose-mysql.yml up -d --wait
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}

test-docs:
name: Docs
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pekko.persistence.r2dbc {
// #connection-settings
pekko.persistence.r2dbc {

# postgres or yugabyte
# postgres, yugabyte or mysql
dialect = "postgres"

# set this to your database schema if applicable, empty by default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,25 @@ import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }

import com.typesafe.config.Config
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.ConnectionPoolConfiguration
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
import io.r2dbc.postgresql.client.SSLMode
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions
import io.r2dbc.spi.Option
import org.apache.pekko
import pekko.Done
import pekko.actor.CoordinatedShutdown
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Extension
import pekko.actor.typed.ExtensionId
import pekko.persistence.r2dbc.ConnectionFactoryProvider.{ ConnectionFactoryOptionsCustomizer, NoopCustomizer }
import pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer
import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.util.ccompat.JavaConverters._
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.ConnectionPoolConfiguration
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
import io.r2dbc.postgresql.client.SSLMode
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions

object ConnectionFactoryProvider extends ExtensionId[ConnectionFactoryProvider] {
def createExtension(system: ActorSystem[_]): ConnectionFactoryProvider = new ConnectionFactoryProvider(system)
Expand Down Expand Up @@ -149,6 +150,12 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension {
builder.option(PostgresqlConnectionFactoryProvider.SSL_ROOT_CERT, settings.sslRootCert)
}

if (settings.driver == "mysql") {
// Either `connectionTimeZone = SERVER` or `forceConnectionTimeZoneToSession = true` need to be set for timezones to work correctly,
// likely caused by bug in https://github.com/asyncer-io/r2dbc-mysql/pull/240.
builder.option(Option.valueOf("connectionTimeZone"), "SERVER")
}

ConnectionFactories.get(customizer(builder, config).build())
}

Expand All @@ -158,7 +165,8 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension {
val connectionFactory = createConnectionFactory(settings, customizer, config)

val evictionInterval = {
import settings.{ maxIdleTime, maxLifeTime }
import settings.maxIdleTime
import settings.maxLifeTime
if (maxIdleTime <= Duration.Zero && maxLifeTime <= Duration.Zero) {
JDuration.ZERO
} else if (maxIdleTime <= Duration.Zero) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ final class R2dbcSettings(config: Config) {
val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match {
case "yugabyte" => Dialect.Yugabyte
case "postgres" => Dialect.Postgres
case "mysql" => Dialect.MySQL
case other =>
throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres].")
}
Expand Down Expand Up @@ -92,6 +93,9 @@ sealed trait Dialect
object Dialect {
case object Postgres extends Dialect
case object Yugabyte extends Dialect

/** @since 1.1.0 */
case object MySQL extends Dialect
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ class R2dbcExecutor(val connectionFactory: ConnectionFactory, log: Logger, logDb
connection.close().asFutureDone().map { _ =>
val durationMicros = durationInMicros(startTime)
if (durationMicros >= logDbCallsExceedingMicros)
log.info("{} - DB call completed [{}] in [{}] µs", logPrefix, r.toString, durationMicros: java.lang.Long)
log.info("{} - DB call completed [{}] in [{}] µs", logPrefix, Option(r).map(_.toString).orNull,
durationMicros: java.lang.Long)
r
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package org.apache.pekko.persistence.r2dbc.internal

import scala.annotation.varargs

import org.apache.pekko.annotation.InternalStableApi
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.annotation.InternalStableApi
import pekko.persistence.r2dbc.Dialect

/**
* INTERNAL API: Utility to format SQL strings. Replaces `?` with numbered `\$1`, `\$2` for bind parameters. Trims
Expand All @@ -24,6 +26,21 @@ import org.apache.pekko.annotation.InternalStableApi
@InternalStableApi
object Sql {

/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] implicit class DialectOps(dialect: Dialect) {
def replaceParameters(sql: String): String = {
dialect match {
case Dialect.Postgres | Dialect.Yugabyte =>
fillInParameterNumbers(sql)
case Dialect.MySQL =>
sql
}
}
}

/**
* Scala string interpolation with `sql` prefix. Replaces `?` with numbered `\$1`, `\$2` for bind parameters. Trims
* whitespace, including line breaks. Standard string interpolation arguments `$` can be used.
Expand All @@ -33,6 +50,15 @@ object Sql {
fillInParameterNumbers(trimLineBreaks(sc.s(args: _*)))
}

/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
def sql(args: Any*)(implicit dialect: Dialect): String =
dialect.replaceParameters(trimLineBreaks(sc.s(args: _*)))
}

/**
* Java API: Replaces `?` with numbered `\$1`, `\$2` for bind parameters. Trims whitespace, including line breaks. The
* arguments are used like in [[java.lang.String.format]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@ import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.typed.PersistenceId
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -71,6 +73,19 @@ private[r2dbc] object JournalDao {
}
}

def fromConfig(
journalSettings: R2dbcSettings,
sharedConfigPath: String
)(implicit system: ActorSystem[_], ec: ExecutionContext): JournalDao = {
val connectionFactory =
ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory")
journalSettings.dialect match {
case Dialect.Postgres | Dialect.Yugabyte =>
new JournalDao(journalSettings, connectionFactory)
case Dialect.MySQL =>
new MySQLJournalDao(journalSettings, connectionFactory)
}
}
}

/**
Expand All @@ -86,13 +101,16 @@ private[r2dbc] class JournalDao(journalSettings: R2dbcSettings, connectionFactor
import JournalDao.SerializedJournalRow
import JournalDao.log

implicit protected val dialect: Dialect = journalSettings.dialect
protected lazy val timestampSql: String = "transaction_timestamp()"

private val persistenceExt = Persistence(system)

private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, journalSettings.logDbCallsExceeding)(ec, system)

private val journalTable = journalSettings.journalTableWithSchema
protected val journalTable: String = journalSettings.journalTableWithSchema

private val (insertEventWithParameterTimestampSql, insertEventWithTransactionTimestampSql) = {
protected val (insertEventWithParameterTimestampSql: String, insertEventWithTransactionTimestampSql: String) = {
val baseSql =
s"INSERT INTO $journalTable " +
"(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " +
Expand Down Expand Up @@ -132,7 +150,7 @@ private[r2dbc] class JournalDao(journalSettings: R2dbcSettings, connectionFactor
private val insertDeleteMarkerSql = sql"""
INSERT INTO $journalTable
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
VALUES (?, ?, ?, ?, transaction_timestamp(), ?, ?, ?, ?, ?, ?)"""
VALUES (?, ?, ?, ?, $timestampSql, ?, ?, ?, ?, ?, ?)"""

/**
* All events must be for the same persistenceId.
Expand Down Expand Up @@ -217,20 +235,30 @@ private[r2dbc] class JournalDao(journalSettings: R2dbcSettings, connectionFactor
result.foreach { _ =>
log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId)
}
result
if (useTimestampFromDb) {
result
} else {
result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
}
} else {
val result = r2dbcExecutor.updateInBatchReturning(s"batch insert [$persistenceId], [$totalEvents] events")(
connection =>
events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) =>
stmt.add()
events.zipWithIndex.foldLeft(connection.createStatement(insertSql)) { case (stmt, (write, idx)) =>
if (idx != 0) {
stmt.add()
}
bind(stmt, write)
},
row => row.get(0, classOf[Instant]))
if (log.isDebugEnabled())
result.foreach { _ =>
log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId)
}
result.map(_.head)(ExecutionContexts.parasitic)
if (useTimestampFromDb) {
result.map(_.head)(ExecutionContexts.parasitic)
} else {
result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import pekko.persistence.PersistentRepr
import pekko.persistence.journal.AsyncWriteJournal
import pekko.persistence.journal.Tagged
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.internal.PubSub
import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata
Expand Down Expand Up @@ -97,10 +96,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends
private val serialization: Serialization = SerializationExtension(context.system)
private val journalSettings = R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath))

private val journalDao =
new JournalDao(
journalSettings,
ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory"))
private val journalDao = JournalDao.fromConfig(journalSettings, sharedConfigPath)
private val query = PersistenceQuery(system).readJournalFor[R2dbcReadJournal](sharedConfigPath + ".query")

private val pubSub: Option[PubSub] =
Expand Down
Loading

0 comments on commit 719e9eb

Please sign in to comment.