diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index a4696d69..6e7d7cdd 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -132,6 +132,61 @@ jobs: - name: test run: sbt -Dpekko.persistence.r2dbc.dialect=yugabyte -Dpekko.projection.r2dbc.dialect=yugabyte ++${{ matrix.SCALA_VERSION }} test + test-mysql: + name: Run tests with MySQL + runs-on: ubuntu-latest + strategy: + matrix: + SCALA_VERSION: [ 2.12, 2.13, 3.3 ] + JAVA_VERSION: [ 11, 17, 21 ] + # only compiling on JDK 8, because certain tests depend on the higher timestamp precision added in JDK 9 + include: + - JAVA_VERSION: 8 + SCALA_VERSION: 2.12 + COMPILE_ONLY: true + - JAVA_VERSION: 8 + SCALA_VERSION: 2.13 + COMPILE_ONLY: true + - JAVA_VERSION: 8 + SCALA_VERSION: 3.3 + COMPILE_ONLY: true + if: github.repository == 'apache/pekko-persistence-r2dbc' + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + fetch-tags: true + + - name: Checkout GitHub merge + if: github.event.pull_request + run: |- + git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch + git checkout scratch + + - name: Setup Java ${{ matrix.JAVA_VERSION }} + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: ${{ matrix.JAVA_VERSION }} + + - name: Install sbt + uses: sbt/setup-sbt@v1 + + - name: Cache Coursier cache + uses: coursier/cache-action@v6 + + - name: Enable jvm-opts + run: cp .jvmopts-ci .jvmopts + + - name: Start DB + run: |- + docker compose -f docker/docker-compose-mysql.yml up -d --wait + docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=mysql < ddl-scripts/create_tables_mysql.sql + + - name: test + run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }} + test-docs: name: Docs runs-on: ubuntu-latest diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 4e205abe..1419a80e 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -101,7 +101,7 @@ pekko.persistence.r2dbc { // #connection-settings pekko.persistence.r2dbc { - # postgres or yugabyte + # postgres, yugabyte or mysql dialect = "postgres" # set this to your database schema if applicable, empty by default diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala index c8af66b3..612e7650 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala @@ -19,24 +19,25 @@ import java.util.concurrent.ConcurrentHashMap import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.util.{ Failure, Success } - import com.typesafe.config.Config +import io.r2dbc.pool.ConnectionPool +import io.r2dbc.pool.ConnectionPoolConfiguration +import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider +import io.r2dbc.postgresql.client.SSLMode +import io.r2dbc.spi.ConnectionFactories +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.ConnectionFactoryOptions +import io.r2dbc.spi.Option import org.apache.pekko import pekko.Done import pekko.actor.CoordinatedShutdown import pekko.actor.typed.ActorSystem import pekko.actor.typed.Extension import pekko.actor.typed.ExtensionId -import 
pekko.persistence.r2dbc.ConnectionFactoryProvider.{ ConnectionFactoryOptionsCustomizer, NoopCustomizer } +import pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer +import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer import pekko.persistence.r2dbc.internal.R2dbcExecutor import pekko.util.ccompat.JavaConverters._ -import io.r2dbc.pool.ConnectionPool -import io.r2dbc.pool.ConnectionPoolConfiguration -import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider -import io.r2dbc.postgresql.client.SSLMode -import io.r2dbc.spi.ConnectionFactories -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.ConnectionFactoryOptions object ConnectionFactoryProvider extends ExtensionId[ConnectionFactoryProvider] { def createExtension(system: ActorSystem[_]): ConnectionFactoryProvider = new ConnectionFactoryProvider(system) @@ -149,6 +150,12 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension { builder.option(PostgresqlConnectionFactoryProvider.SSL_ROOT_CERT, settings.sslRootCert) } + if (settings.driver == "mysql") { + // Either `connectionTimeZone = SERVER` or `forceConnectionTimeZoneToSession = true` need to be set for timezones to work correctly, + // likely caused by bug in https://github.com/asyncer-io/r2dbc-mysql/pull/240. + builder.option(Option.valueOf("connectionTimeZone"), "SERVER") + } + ConnectionFactories.get(customizer(builder, config).build()) } @@ -158,7 +165,8 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension { val connectionFactory = createConnectionFactory(settings, customizer, config) val evictionInterval = { - import settings.{ maxIdleTime, maxLifeTime } + import settings.maxIdleTime + import settings.maxLifeTime if (maxIdleTime <= Duration.Zero && maxLifeTime <= Duration.Zero) { JDuration.ZERO } else if (maxIdleTime <= Duration.Zero) { diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala index 49e48603..dd8856f5 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala @@ -56,6 +56,7 @@ final class R2dbcSettings(config: Config) { val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match { case "yugabyte" => Dialect.Yugabyte case "postgres" => Dialect.Postgres + case "mysql" => Dialect.MySQL case other => throw new IllegalArgumentException(s"Unknown dialect [$other]. 
Supported dialects are [yugabyte, postgres, mysql].")
   }
@@ -92,6 +93,9 @@ sealed trait Dialect
 object Dialect {
   case object Postgres extends Dialect
   case object Yugabyte extends Dialect
+
+  /** @since 1.1.0 */
+  case object MySQL extends Dialect
 }

 /**
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
index b757efed..894b8fa5 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
@@ -325,7 +325,8 @@ class R2dbcExecutor(val connectionFactory: ConnectionFactory, log: Logger, logDb
       connection.close().asFutureDone().map { _ =>
         val durationMicros = durationInMicros(startTime)
         if (durationMicros >= logDbCallsExceedingMicros)
-          log.info("{} - DB call completed [{}] in [{}] µs", logPrefix, r.toString, durationMicros: java.lang.Long)
+          log.info("{} - DB call completed [{}] in [{}] µs", logPrefix, Option(r).map(_.toString).orNull,
+            durationMicros: java.lang.Long)
         r
       }
     }
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
index f1884f40..affe56f1 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
@@ -14,8 +14,10 @@ package org.apache.pekko.persistence.r2dbc.internal

 import scala.annotation.varargs
-
-import org.apache.pekko.annotation.InternalStableApi
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.annotation.InternalStableApi
+import pekko.persistence.r2dbc.Dialect

 /**
  * INTERNAL API: Utility to format SQL strings. Replaces `?` with numbered `\$1`, `\$2` for bind parameters. Trims
@@ -24,6 +26,21 @@ import org.apache.pekko.annotation.InternalStableApi
 @InternalStableApi
 object Sql {

+  /**
+   * INTERNAL API
+   */
+  @InternalApi
+  private[r2dbc] implicit class DialectOps(dialect: Dialect) {
+    def replaceParameters(sql: String): String = {
+      dialect match {
+        case Dialect.Postgres | Dialect.Yugabyte =>
+          fillInParameterNumbers(sql)
+        case Dialect.MySQL =>
+          sql
+      }
+    }
+  }
+
   /**
    * Scala string interpolation with `sql` prefix. Replaces `?` with numbered `\$1`, `\$2` for bind parameters. Trims
    * whitespace, including line breaks. Standard string interpolation arguments `$` can be used.
@@ -33,6 +50,15 @@ object Sql {
       fillInParameterNumbers(trimLineBreaks(sc.s(args: _*)))
   }

+  /**
+   * INTERNAL API
+   */
+  @InternalApi
+  private[r2dbc] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
+    def sql(args: Any*)(implicit dialect: Dialect): String =
+      dialect.replaceParameters(trimLineBreaks(sc.s(args: _*)))
+  }
+
   /**
    * Java API: Replaces `?` with numbered `\$1`, `\$2` for bind parameters. Trims whitespace, including line breaks. The
    * arguments are used like in [[java.lang.String.format]].
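The `DialectInterpolation` above is what lets all DAOs share one set of SQL templates: for Postgres and Yugabyte the `?` markers are rewritten into numbered `$1`, `$2` placeholders, while r2dbc-mysql binds positional `?` markers directly, so for MySQL the string is passed through unchanged. A minimal standalone sketch of that behavior (illustrative only, not the plugin's internal implementation; `fillInParameterNumbers` here is a simplified stand-in):

```scala
object DialectSqlSketch extends App {
  sealed trait Dialect
  case object Postgres extends Dialect
  case object Yugabyte extends Dialect
  case object MySQL extends Dialect

  // Simplified stand-in for Sql.fillInParameterNumbers: rewrite each `?` to $1, $2, ...
  def fillInParameterNumbers(sql: String): String = {
    val sb = new StringBuilder(sql.length + 8)
    var n = 0
    sql.foreach {
      case '?' => n += 1; sb.append('$').append(n)
      case c   => sb.append(c)
    }
    sb.toString
  }

  def replaceParameters(dialect: Dialect, sql: String): String =
    dialect match {
      case Postgres | Yugabyte => fillInParameterNumbers(sql) // numbered placeholders
      case MySQL               => sql // r2dbc-mysql binds positional `?` directly
    }

  val query = "SELECT * FROM event_journal WHERE persistence_id = ? AND seq_nr >= ?"
  println(replaceParameters(Postgres, query)) // ... persistence_id = $1 AND seq_nr >= $2
  println(replaceParameters(MySQL, query))    // unchanged
}
```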
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala index 8ca01987..4434cfd4 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala @@ -17,20 +17,22 @@ import java.time.Instant import scala.concurrent.ExecutionContext import scala.concurrent.Future - +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.Row +import io.r2dbc.spi.Statement import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence +import pekko.persistence.r2dbc.ConnectionFactoryProvider +import pekko.persistence.r2dbc.Dialect import pekko.persistence.r2dbc.R2dbcSettings -import pekko.persistence.r2dbc.internal.Sql.Interpolation import pekko.persistence.r2dbc.internal.BySliceQuery import pekko.persistence.r2dbc.internal.R2dbcExecutor +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation +import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao import pekko.persistence.typed.PersistenceId -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.Row -import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -71,6 +73,19 @@ private[r2dbc] object JournalDao { } } + def fromConfig( + journalSettings: R2dbcSettings, + sharedConfigPath: String + )(implicit system: ActorSystem[_], ec: ExecutionContext): JournalDao = { + val connectionFactory = + ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory") + journalSettings.dialect match { + case Dialect.Postgres | Dialect.Yugabyte => + new JournalDao(journalSettings, connectionFactory) + case Dialect.MySQL => + new MySQLJournalDao(journalSettings, connectionFactory) + } + } } /** @@ -86,13 +101,16 @@ private[r2dbc] class JournalDao(journalSettings: R2dbcSettings, connectionFactor import JournalDao.SerializedJournalRow import JournalDao.log + implicit protected val dialect: Dialect = journalSettings.dialect + protected lazy val timestampSql: String = "transaction_timestamp()" + private val persistenceExt = Persistence(system) private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, journalSettings.logDbCallsExceeding)(ec, system) - private val journalTable = journalSettings.journalTableWithSchema + protected val journalTable: String = journalSettings.journalTableWithSchema - private val (insertEventWithParameterTimestampSql, insertEventWithTransactionTimestampSql) = { + protected val (insertEventWithParameterTimestampSql: String, insertEventWithTransactionTimestampSql: String) = { val baseSql = s"INSERT INTO $journalTable " + "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " + @@ -132,7 +150,7 @@ private[r2dbc] class JournalDao(journalSettings: R2dbcSettings, connectionFactor private val insertDeleteMarkerSql = sql""" INSERT INTO $journalTable (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted) - VALUES (?, ?, ?, ?, transaction_timestamp(), ?, ?, ?, ?, ?, ?)""" + VALUES (?, ?, ?, ?, $timestampSql, ?, ?, ?, ?, ?, ?)""" /** * All events must be for the same persistenceId. 
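A note on the refactoring pattern in `JournalDao` above: the timestamp function is pulled out into an overridable `protected lazy val timestampSql`, so `MySQLJournalDao` can substitute `NOW(6)` for Postgres' `transaction_timestamp()` without duplicating the statement templates. A toy sketch of why `lazy` matters here (class and member names are illustrative, not the plugin's API):

```scala
// Toy sketch: the base class builds SQL from an overridable timestamp expression.
class BaseDao {
  // `lazy` so the subclass override is picked up even though this member is
  // referenced while the base class constructor initializes insertDeleteMarkerSql;
  // a strict `val` would still be uninitialized (null) at that point.
  protected lazy val timestampSql: String = "transaction_timestamp()"

  val insertDeleteMarkerSql: String =
    "INSERT INTO event_journal (persistence_id, seq_nr, db_timestamp, deleted) " +
      s"VALUES (?, ?, $timestampSql, ?)"
}

class MySqlDao extends BaseDao {
  override lazy val timestampSql: String = "NOW(6)"
}

object TimestampOverrideSketch extends App {
  println(new BaseDao().insertDeleteMarkerSql)  // ... transaction_timestamp() ...
  println(new MySqlDao().insertDeleteMarkerSql) // ... NOW(6) ...
}
```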
@@ -217,12 +235,18 @@ private[r2dbc] class JournalDao(journalSettings: R2dbcSettings, connectionFactor result.foreach { _ => log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId) } - result + if (useTimestampFromDb) { + result + } else { + result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) + } } else { val result = r2dbcExecutor.updateInBatchReturning(s"batch insert [$persistenceId], [$totalEvents] events")( connection => - events.foldLeft(connection.createStatement(insertSql)) { (stmt, write) => - stmt.add() + events.zipWithIndex.foldLeft(connection.createStatement(insertSql)) { case (stmt, (write, idx)) => + if (idx != 0) { + stmt.add() + } bind(stmt, write) }, row => row.get(0, classOf[Instant])) @@ -230,7 +254,11 @@ private[r2dbc] class JournalDao(journalSettings: R2dbcSettings, connectionFactor result.foreach { _ => log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId) } - result.map(_.head)(ExecutionContexts.parasitic) + if (useTimestampFromDb) { + result.map(_.head)(ExecutionContexts.parasitic) + } else { + result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) + } } } diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala index eb111cf7..d141ebd1 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala @@ -36,7 +36,6 @@ import pekko.persistence.PersistentRepr import pekko.persistence.journal.AsyncWriteJournal import pekko.persistence.journal.Tagged import pekko.persistence.query.PersistenceQuery -import pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.persistence.r2dbc.R2dbcSettings import pekko.persistence.r2dbc.internal.PubSub import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata @@ -97,10 +96,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends private val serialization: Serialization = SerializationExtension(context.system) private val journalSettings = R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath)) - private val journalDao = - new JournalDao( - journalSettings, - ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory")) + private val journalDao = JournalDao.fromConfig(journalSettings, sharedConfigPath) private val query = PersistenceQuery(system).readJournalFor[R2dbcReadJournal](sharedConfigPath + ".query") private val pubSub: Option[PubSub] = diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala new file mode 100644 index 00000000..9adba882 --- /dev/null +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.journal.mysql
+
+import scala.concurrent.ExecutionContext
+import io.r2dbc.spi.ConnectionFactory
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.JournalDao
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] object MySQLJournalDao {
+  def settingRequirements(journalSettings: R2dbcSettings): Unit = {
+    // Application timestamps are used because MySQL does not have transaction_timestamp() like Postgres. Emulating it
+    // could be attempted in future releases, but the benefit is questionable: no matter where the timestamps are
+    // generated, the risk of clock skew remains.
+    require(journalSettings.useAppTimestamp,
+      "use-app-timestamp config must be on for MySQL support")
+    // Supporting non-monotonically increasing timestamps, by incrementing the timestamp within the insert queries
+    // based on the latest row in the database, seems to cause deadlocks when running tests like PersistTimestampSpec.
+    // Possibly this could be fixed.
+    require(journalSettings.dbTimestampMonotonicIncreasing,
+      "db-timestamp-monotonic-increasing config must be on for MySQL support")
+    // Also, the missing RETURNING support makes retrieving the database-generated timestamp less efficient; this
+    // applies to both of the requirements above.
+ } +} + +/** + * INTERNAL API + */ +@InternalApi +private[r2dbc] class MySQLJournalDao( + journalSettings: R2dbcSettings, + connectionFactory: ConnectionFactory)( + implicit ec: ExecutionContext, system: ActorSystem[_] +) extends JournalDao(journalSettings, connectionFactory) { + MySQLJournalDao.settingRequirements(journalSettings) + + override lazy val timestampSql: String = "NOW(6)" + + override val insertEventWithParameterTimestampSql: String = + sql"INSERT INTO $journalTable " + + "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, " + + "event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " + + s"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" +} diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala index 0fff8797..46e59b0e 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala @@ -19,28 +19,42 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration - +import io.r2dbc.spi.ConnectionFactory import org.apache.pekko import pekko.NotUsed import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi -import pekko.persistence.Persistence +import pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.persistence.r2dbc.Dialect import pekko.persistence.r2dbc.R2dbcSettings -import pekko.persistence.r2dbc.internal.Sql.Interpolation import pekko.persistence.r2dbc.internal.BySliceQuery import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import pekko.persistence.r2dbc.internal.R2dbcExecutor +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.journal.JournalDao import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow +import pekko.persistence.r2dbc.query.scaladsl.mysql.MySQLQueryDao import pekko.stream.scaladsl.Source -import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory object QueryDao { val log: Logger = LoggerFactory.getLogger(classOf[QueryDao]) + + def fromConfig( + journalSettings: R2dbcSettings, + sharedConfigPath: String + )(implicit system: ActorSystem[_], ec: ExecutionContext): QueryDao = { + val connectionFactory = + ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory") + journalSettings.dialect match { + case Dialect.Postgres | Dialect.Yugabyte => + new QueryDao(journalSettings, connectionFactory) + case Dialect.MySQL => + new MySQLQueryDao(journalSettings, connectionFactory) + } + } } /** @@ -54,12 +68,15 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, connectionFactory: Connec import JournalDao.readMetadata import QueryDao.log - private val journalTable = settings.journalTableWithSchema + implicit protected val dialect: Dialect = settings.dialect + protected lazy val statementTimestampSql: String = "statement_timestamp()" + + protected val journalTable = settings.journalTableWithSchema private val currentDbTimestampSql = "SELECT transaction_timestamp() AS db_timestamp" - private def eventsBySlicesRangeSql( + protected def eventsBySlicesRangeSql( toDbTimestampParam: Boolean, 
behindCurrentTime: FiniteDuration, backtracking: Boolean, @@ -96,10 +113,11 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, connectionFactory: Connec settings.dialect match { case Dialect.Yugabyte => s"slice BETWEEN $minSlice AND $maxSlice" case Dialect.Postgres => s"slice in (${(minSlice to maxSlice).mkString(",")})" + case unhandled => throw new IllegalArgumentException(s"Unable to handle dialect [$unhandled]") } } - private def selectBucketsSql(minSlice: Int, maxSlice: Int): String = { + protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = { sql""" SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count FROM $journalTable @@ -116,12 +134,12 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, connectionFactory: Connec WHERE persistence_id = ? AND seq_nr = ? AND deleted = false""" private val selectOneEventSql = sql""" - SELECT slice, entity_type, db_timestamp, statement_timestamp() AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload + SELECT slice, entity_type, db_timestamp, $statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload FROM $journalTable WHERE persistence_id = ? AND seq_nr = ? AND deleted = false""" private val selectEventsSql = sql""" - SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, statement_timestamp() AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload + SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload from $journalTable WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ? 
AND deleted = false diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 91ea310d..4280f113 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -19,10 +19,11 @@ import scala.collection.immutable import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration - +import com.typesafe.config.Config import org.apache.pekko import pekko.NotUsed import pekko.actor.ExtendedActorSystem +import pekko.actor.typed.ActorSystem import pekko.actor.typed.pubsub.Topic import pekko.actor.typed.scaladsl.adapter._ import pekko.annotation.InternalApi @@ -48,7 +49,6 @@ import pekko.serialization.SerializationExtension import pekko.stream.OverflowStrategy import pekko.stream.scaladsl.Flow import pekko.stream.scaladsl.Source -import com.typesafe.config.Config import org.slf4j.LoggerFactory object R2dbcReadJournal { @@ -76,14 +76,13 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat private val sharedConfigPath = cfgPath.replaceAll("""\.query$""", "") private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) - private val typedSystem = system.toTyped + private implicit val typedSystem: ActorSystem[_] = system.toTyped import typedSystem.executionContext private val serialization = SerializationExtension(system) private val persistenceExt = Persistence(system) private val connectionFactory = ConnectionFactoryProvider(typedSystem) .connectionFactoryFor(sharedConfigPath + ".connection-factory") - private val queryDao = - new QueryDao(settings, connectionFactory)(typedSystem.executionContext, typedSystem) + private val queryDao = QueryDao.fromConfig(settings, sharedConfigPath) private val _bySlice: BySliceQuery[SerializedJournalRow, EventEnvelope[Any]] = { val createEnvelope: (TimestampOffset, SerializedJournalRow) => EventEnvelope[Any] = (offset, row) => { @@ -108,7 +107,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat private def bySlice[Event]: BySliceQuery[SerializedJournalRow, EventEnvelope[Event]] = _bySlice.asInstanceOf[BySliceQuery[SerializedJournalRow, EventEnvelope[Event]]] - private val journalDao = new JournalDao(settings, connectionFactory)(typedSystem.executionContext, typedSystem) + private val journalDao = JournalDao.fromConfig(settings, sharedConfigPath) def extractEntityTypeFromPersistenceId(persistenceId: String): String = PersistenceId.extractEntityType(persistenceId) diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala new file mode 100644 index 00000000..8cbc5b66 --- /dev/null +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pekko.persistence.r2dbc.query.scaladsl.mysql + +import java.time.Instant + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration +import io.r2dbc.spi.ConnectionFactory +import org.apache.pekko +import pekko.actor.typed.ActorSystem +import pekko.annotation.InternalApi +import pekko.persistence.r2dbc.R2dbcSettings +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation +import pekko.persistence.r2dbc.query.scaladsl.QueryDao + +/** + * INTERNAL API + */ +@InternalApi +private[r2dbc] class MySQLQueryDao( + journalSettings: R2dbcSettings, + connectionFactory: ConnectionFactory +)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends QueryDao(journalSettings, connectionFactory) { + + override lazy val statementTimestampSql: String = "NOW(6)" + + override def eventsBySlicesRangeSql( + toDbTimestampParam: Boolean, + behindCurrentTime: FiniteDuration, + backtracking: Boolean, + minSlice: Int, + maxSlice: Int): String = { + + def toDbTimestampParamCondition = + if (toDbTimestampParam) "AND db_timestamp <= ?" else "" + + def behindCurrentTimeIntervalCondition = + if (behindCurrentTime > Duration.Zero) + s"AND db_timestamp < DATE_SUB($statementTimestampSql, INTERVAL '${behindCurrentTime.toMicros}' MICROSECOND)" + else "" + + val selectColumns = { + if (backtracking) + s"SELECT slice, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp " + else + s"SELECT slice, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload " + } + + sql""" + $selectColumns + FROM $journalTable + WHERE entity_type = ? + AND slice BETWEEN $minSlice AND $maxSlice + AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition + AND deleted = false + ORDER BY db_timestamp, seq_nr + LIMIT ?""" + } + + override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = { + sql""" + SELECT CAST(UNIX_TIMESTAMP(db_timestamp) AS SIGNED) / 10 AS bucket, count(*) AS count + FROM $journalTable + WHERE entity_type = ? + AND slice BETWEEN $minSlice AND $maxSlice + AND db_timestamp >= ? AND db_timestamp <= ? + AND deleted = false + GROUP BY bucket ORDER BY bucket LIMIT ? 
+ """ + } + + override def currentDbTimestamp(): Future[Instant] = Future.successful(Instant.now()) +} diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala index 9402583d..7d637ad1 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala @@ -17,7 +17,7 @@ import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.actor.typed.scaladsl.adapter._ import pekko.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria } -import pekko.persistence.r2dbc.{ ConnectionFactoryProvider, R2dbcSettings } +import pekko.persistence.r2dbc.R2dbcSettings import pekko.persistence.snapshot.SnapshotStore import pekko.serialization.{ Serialization, SerializationExtension } import com.typesafe.config.Config @@ -59,9 +59,7 @@ private[r2dbc] final class R2dbcSnapshotStore(cfg: Config, cfgPath: String) exte private val dao = { val sharedConfigPath = cfgPath.replaceAll("""\.snapshot$""", "") val settings = R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath)) - new SnapshotDao( - settings, - ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory")) + SnapshotDao.fromConfig(settings, sharedConfigPath) } def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala index 16968b3a..043ff6e1 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala @@ -15,19 +15,21 @@ package org.apache.pekko.persistence.r2dbc.snapshot import scala.concurrent.ExecutionContext import scala.concurrent.Future - +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.Row import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence import pekko.persistence.SnapshotSelectionCriteria +import pekko.persistence.r2dbc.ConnectionFactoryProvider +import pekko.persistence.r2dbc.Dialect import pekko.persistence.r2dbc.R2dbcSettings -import pekko.persistence.r2dbc.internal.Sql.Interpolation import pekko.persistence.r2dbc.internal.R2dbcExecutor +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation +import pekko.persistence.r2dbc.snapshot.mysql.MySQLSnapshotDao import pekko.persistence.typed.PersistenceId -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.Row import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -66,6 +68,19 @@ private[r2dbc] object SnapshotDao { row.get("meta_ser_manifest", classOf[String]))) }) + def fromConfig( + journalSettings: R2dbcSettings, + sharedConfigPath: String + )(implicit system: ActorSystem[_], ec: ExecutionContext): SnapshotDao = { + val connectionFactory = + ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory") + journalSettings.dialect match { + case Dialect.Postgres | Dialect.Yugabyte => + new SnapshotDao(journalSettings, connectionFactory) + case Dialect.MySQL => + new MySQLSnapshotDao(journalSettings, connectionFactory) + } 
+ } } /** @@ -74,16 +89,18 @@ private[r2dbc] object SnapshotDao { * Class for doing db interaction outside of an actor to avoid mistakes in future callbacks */ @InternalApi -private[r2dbc] final class SnapshotDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit +private[r2dbc] class SnapshotDao(settings: R2dbcSettings, connectionFactory: ConnectionFactory)(implicit ec: ExecutionContext, system: ActorSystem[_]) { import SnapshotDao._ - private val snapshotTable = settings.snapshotsTableWithSchema + implicit protected val dialect: Dialect = settings.dialect + + protected val snapshotTable: String = settings.snapshotsTableWithSchema private val persistenceExt = Persistence(system) private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(ec, system) - private val upsertSql = sql""" + protected val upsertSql = sql""" INSERT INTO $snapshotTable (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala new file mode 100644 index 00000000..e725168c --- /dev/null +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pekko.persistence.r2dbc.snapshot.mysql + +import scala.concurrent.ExecutionContext +import io.r2dbc.spi.ConnectionFactory +import org.apache.pekko +import pekko.actor.typed.ActorSystem +import pekko.annotation.InternalApi +import pekko.persistence.r2dbc.R2dbcSettings +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation +import pekko.persistence.r2dbc.snapshot.SnapshotDao + +/** + * INTERNAL API + */ +@InternalApi +private[r2dbc] class MySQLSnapshotDao( + settings: R2dbcSettings, connectionFactory: ConnectionFactory +)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends SnapshotDao(settings, connectionFactory) { + + override val upsertSql = sql""" + INSERT INTO $snapshotTable + (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
AS excluded + ON DUPLICATE KEY UPDATE + seq_nr = excluded.seq_nr, + write_timestamp = excluded.write_timestamp, + snapshot = excluded.snapshot, + ser_id = excluded.ser_id, + ser_manifest = excluded.ser_manifest, + meta_payload = excluded.meta_payload, + meta_ser_id = excluded.meta_ser_id, + meta_ser_manifest = excluded.meta_ser_manifest""" +} diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala index 03cdc889..80ef95bd 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala @@ -19,7 +19,9 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration - +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.R2dbcDataIntegrityViolationException +import io.r2dbc.spi.Statement import org.apache.pekko import pekko.Done import pekko.NotUsed @@ -27,18 +29,17 @@ import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence +import pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.persistence.r2dbc.Dialect import pekko.persistence.r2dbc.R2dbcSettings -import pekko.persistence.r2dbc.internal.Sql.Interpolation import pekko.persistence.r2dbc.internal.BySliceQuery import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import pekko.persistence.r2dbc.internal.R2dbcExecutor +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation +import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao import pekko.persistence.typed.PersistenceId import pekko.stream.scaladsl.Source -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.R2dbcDataIntegrityViolationException -import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -61,6 +62,20 @@ import org.slf4j.LoggerFactory extends BySliceQuery.SerializedRow { override def seqNr: Long = revision } + + def fromConfig( + journalSettings: R2dbcSettings, + sharedConfigPath: String + )(implicit system: ActorSystem[_], ec: ExecutionContext): DurableStateDao = { + val connectionFactory = + ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory") + journalSettings.dialect match { + case Dialect.Postgres | Dialect.Yugabyte => + new DurableStateDao(journalSettings, connectionFactory) + case Dialect.MySQL => + new MySQLDurableStateDao(journalSettings, connectionFactory) + } + } } /** @@ -75,16 +90,19 @@ private[r2dbc] class DurableStateDao(settings: R2dbcSettings, connectionFactory: extends BySliceQuery.Dao[DurableStateDao.SerializedStateRow] { import DurableStateDao._ + implicit protected val dialect: Dialect = settings.dialect + protected lazy val transactionTimestampSql: String = "transaction_timestamp()" + private val persistenceExt = Persistence(system) private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(ec, system) - private val stateTable = settings.durableStateTableWithSchema + protected val stateTable = settings.durableStateTableWithSchema private val selectStateSql: String = sql""" SELECT revision, state_ser_id, state_ser_manifest, state_payload, db_timestamp 
FROM $stateTable WHERE persistence_id = ?""" - private def selectBucketsSql(minSlice: Int, maxSlice: Int): String = { + protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = { sql""" SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count FROM $stateTable @@ -99,20 +117,21 @@ private[r2dbc] class DurableStateDao(settings: R2dbcSettings, connectionFactory: settings.dialect match { case Dialect.Yugabyte => s"slice BETWEEN $minSlice AND $maxSlice" case Dialect.Postgres => s"slice in (${(minSlice to maxSlice).mkString(",")})" + case unhandled => throw new IllegalArgumentException(s"Unable to handle dialect [$unhandled]") } } private val insertStateSql: String = sql""" INSERT INTO $stateTable (slice, entity_type, persistence_id, revision, state_ser_id, state_ser_manifest, state_payload, tags, db_timestamp) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, transaction_timestamp())""" + VALUES (?, ?, ?, ?, ?, ?, ?, ?, $transactionTimestampSql)""" private val updateStateSql: String = { val timestamp = if (settings.dbTimestampMonotonicIncreasing) - "transaction_timestamp()" + s"$transactionTimestampSql" else - "GREATEST(transaction_timestamp(), " + + s"GREATEST($transactionTimestampSql, " + s"(SELECT db_timestamp + '1 microsecond'::interval FROM $stateTable WHERE persistence_id = ? AND revision = ?))" val revisionCondition = @@ -141,7 +160,7 @@ private[r2dbc] class DurableStateDao(settings: R2dbcSettings, connectionFactory: private val allPersistenceIdsAfterSql = sql"SELECT persistence_id from $stateTable WHERE persistence_id > ? ORDER BY persistence_id LIMIT ?" - private def stateBySlicesRangeSql( + protected def stateBySlicesRangeSql( maxDbTimestampParam: Boolean, behindCurrentTime: FiniteDuration, backtracking: Boolean, diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index 546f9545..4d7de654 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -16,11 +16,12 @@ package org.apache.pekko.persistence.r2dbc.state.scaladsl import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future - +import com.typesafe.config.Config import org.apache.pekko import pekko.Done import pekko.NotUsed import pekko.actor.ExtendedActorSystem +import pekko.actor.typed.ActorSystem import pekko.actor.typed.scaladsl.adapter._ import pekko.dispatch.ExecutionContexts import pekko.persistence.Persistence @@ -30,7 +31,6 @@ import pekko.persistence.query.TimestampOffset import pekko.persistence.query.UpdatedDurableState import pekko.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery import pekko.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery -import pekko.persistence.r2dbc.ConnectionFactoryProvider import pekko.persistence.r2dbc.R2dbcSettings import pekko.persistence.r2dbc.internal.BySliceQuery import pekko.persistence.r2dbc.internal.ContinuousQuery @@ -40,7 +40,6 @@ import pekko.persistence.state.scaladsl.GetObjectResult import pekko.serialization.SerializationExtension import pekko.serialization.Serializers import pekko.stream.scaladsl.Source -import com.typesafe.config.Config import org.slf4j.LoggerFactory object R2dbcDurableStateStore { @@ -59,15 +58,11 @@ class R2dbcDurableStateStore[A](system: 
ExtendedActorSystem, config: Config, cfg private val sharedConfigPath = cfgPath.replaceAll("""\.state$""", "") private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath)) - private val typedSystem = system.toTyped + private implicit val typedSystem: ActorSystem[_] = system.toTyped + implicit val ec: ExecutionContext = system.dispatcher private val serialization = SerializationExtension(system) private val persistenceExt = Persistence(system) - private val stateDao = - new DurableStateDao( - settings, - ConnectionFactoryProvider(typedSystem).connectionFactoryFor(sharedConfigPath + ".connection-factory"))( - typedSystem.executionContext, - typedSystem) + private val stateDao = DurableStateDao.fromConfig(settings, sharedConfigPath) private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] = { val createEnvelope: (TimestampOffset, SerializedStateRow) => DurableStateChange[A] = (offset, row) => { diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala new file mode 100644 index 00000000..385dc218 --- /dev/null +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pekko.persistence.r2dbc.state.scaladsl.mysql + +import java.time.Instant + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration +import io.r2dbc.spi.ConnectionFactory +import org.apache.pekko +import pekko.actor.typed.ActorSystem +import pekko.annotation.InternalApi +import pekko.persistence.r2dbc.R2dbcSettings +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation +import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao +import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao + +/** + * INTERNAL API + */ +@InternalApi +private[r2dbc] class MySQLDurableStateDao( + settings: R2dbcSettings, + connectionFactory: ConnectionFactory +)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends DurableStateDao(settings, connectionFactory) { + MySQLJournalDao.settingRequirements(settings) + + override lazy val transactionTimestampSql: String = "NOW(6)" + + override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = { + sql""" + SELECT CAST(UNIX_TIMESTAMP(db_timestamp) AS SIGNED) / 10 AS bucket, count(*) AS count + FROM $stateTable + WHERE entity_type = ? + AND slice BETWEEN $minSlice AND $maxSlice + AND db_timestamp >= ? AND db_timestamp <= ? + GROUP BY bucket ORDER BY bucket LIMIT ? 
+ """ + } + + override def stateBySlicesRangeSql( + maxDbTimestampParam: Boolean, + behindCurrentTime: FiniteDuration, + backtracking: Boolean, + minSlice: Int, + maxSlice: Int): String = { + + def maxDbTimestampParamCondition = + if (maxDbTimestampParam) s"AND db_timestamp < ?" else "" + + def behindCurrentTimeIntervalCondition = + if (behindCurrentTime > Duration.Zero) + s"AND db_timestamp < DATE_SUB(NOW(6), INTERVAL '${behindCurrentTime.toMicros}' MICROSECOND)" + else "" + + val selectColumns = + if (backtracking) + "SELECT persistence_id, revision, db_timestamp, NOW(6) AS read_db_timestamp " + else + "SELECT persistence_id, revision, db_timestamp, NOW(6) AS read_db_timestamp, state_ser_id, state_ser_manifest, state_payload " + + sql""" + $selectColumns + FROM $stateTable + WHERE entity_type = ? + AND slice BETWEEN $minSlice AND $maxSlice + AND db_timestamp >= ? $maxDbTimestampParamCondition $behindCurrentTimeIntervalCondition + ORDER BY db_timestamp, revision + LIMIT ?""" + } + + override def currentDbTimestamp(): Future[Instant] = Future.successful(Instant.now()) +} diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala index e54fd8bf..161262ea 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala @@ -44,6 +44,21 @@ object TestConfig { database = "yugabyte" } """) + case "mysql" => + ConfigFactory.parseString(""" + pekko.persistence.r2dbc{ + connection-factory { + driver = "mysql" + host = "localhost" + port = 3306 + user = "root" + password = "root" + database = "mysql" + } + db-timestamp-monotonic-increasing = on + use-app-timestamp = on + } + """) } // using load here so that connection-factory can be overridden diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala index 7a490ed6..d0c6fcce 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala @@ -40,7 +40,18 @@ class PersistTagsSpec case class Row(pid: String, seqNr: Long, tags: Set[String]) + private lazy val dialect = system.settings.config.getString("pekko.persistence.r2dbc.dialect") + + private lazy val testEnabled: Boolean = { + // tags are not implemented for MySQL + dialect != "mysql" + } + "Persist tags" should { + if (!testEnabled) { + info(s"PersistTagsSpec not enabled for $dialect") + pending + } "be the same for events stored in same transaction" in { val numberOfEntities = 9 diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index a88010fd..d980798e 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -16,7 +16,6 @@ package org.apache.pekko.persistence.r2dbc.query import java.time.Instant import scala.concurrent.duration._ - import org.apache.pekko import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -24,11 +23,12 @@ import pekko.actor.typed.ActorSystem import 
pekko.persistence.query.NoOffset import pekko.persistence.query.PersistenceQuery import pekko.persistence.query.typed.EventEnvelope +import pekko.persistence.r2dbc.Dialect import pekko.persistence.r2dbc.R2dbcSettings -import pekko.persistence.r2dbc.internal.Sql.Interpolation import pekko.persistence.r2dbc.TestConfig import pekko.persistence.r2dbc.TestData import pekko.persistence.r2dbc.TestDbLifecycle +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import pekko.persistence.typed.PersistenceId import pekko.serialization.SerializationExtension @@ -55,6 +55,7 @@ class EventsBySliceBacktrackingSpec // to be able to store events with specific timestamps private def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = { log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr: java.lang.Long, event, timestamp) + implicit val dialect: Dialect = settings.dialect val insertEventSql = sql""" INSERT INTO ${settings.journalTableWithSchema} (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload) diff --git a/ddl-scripts/create_tables_mysql.sql b/ddl-scripts/create_tables_mysql.sql new file mode 100644 index 00000000..67554e76 --- /dev/null +++ b/ddl-scripts/create_tables_mysql.sql @@ -0,0 +1,112 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TABLE IF NOT EXISTS event_journal( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + db_timestamp TIMESTAMP(6) NOT NULL, + + event_ser_id INTEGER NOT NULL, + event_ser_manifest VARCHAR(255) NOT NULL, + event_payload BLOB NOT NULL, + + deleted BOOLEAN DEFAULT FALSE NOT NULL, + writer VARCHAR(255) NOT NULL, + adapter_manifest VARCHAR(255), + tags TEXT, -- FIXME no array type, is this the best option? 
+ + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BLOB, + + PRIMARY KEY(persistence_id, seq_nr) +); + +-- `event_journal_slice_idx` is only needed if the slice based queries are used +CREATE INDEX event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr); + +CREATE TABLE IF NOT EXISTS snapshot( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + write_timestamp BIGINT NOT NULL, + ser_id INTEGER NOT NULL, + ser_manifest VARCHAR(255) NOT NULL, + snapshot BLOB NOT NULL, + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BLOB, + + PRIMARY KEY(persistence_id) +); + +CREATE TABLE IF NOT EXISTS durable_state ( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + revision BIGINT NOT NULL, + db_timestamp TIMESTAMP(6) NOT NULL, + + state_ser_id INTEGER NOT NULL, + state_ser_manifest VARCHAR(255), + state_payload BLOB NOT NULL, + tags TEXT, -- FIXME no array type, is this the best option? + + PRIMARY KEY(persistence_id, revision) +); + +-- `durable_state_slice_idx` is only needed if the slice based queries are used +CREATE INDEX durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision); + +-- Primitive offset types are stored in this table. +-- If only timestamp based offsets are used this table is optional. +-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table is not created. +CREATE TABLE IF NOT EXISTS projection_offset_store ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + current_offset VARCHAR(255) NOT NULL, + manifest VARCHAR(32) NOT NULL, + mergeable BOOLEAN NOT NULL, + last_updated BIGINT NOT NULL, + PRIMARY KEY(projection_name, projection_key) +); + +-- Timestamp based offsets are stored in this table. +CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + slice INT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + -- timestamp_offset is the db_timestamp of the original event + timestamp_offset TIMESTAMP(6) NOT NULL, + -- timestamp_consumed is when the offset was stored + -- the consumer lag is timestamp_consumed - timestamp_offset + timestamp_consumed TIMESTAMP(6) NOT NULL, + PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) +); + +CREATE TABLE IF NOT EXISTS projection_management ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + paused BOOLEAN NOT NULL, + last_updated BIGINT NOT NULL, + PRIMARY KEY(projection_name, projection_key) +); diff --git a/ddl-scripts/drop_tables_mysql.sql b/ddl-scripts/drop_tables_mysql.sql new file mode 100644 index 00000000..ecef5734 --- /dev/null +++ b/ddl-scripts/drop_tables_mysql.sql @@ -0,0 +1,23 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+DROP TABLE IF EXISTS event_journal;
+DROP TABLE IF EXISTS snapshot;
+DROP TABLE IF EXISTS durable_state;
+DROP TABLE IF EXISTS projection_offset_store;
+DROP TABLE IF EXISTS projection_timestamp_offset_store;
+DROP TABLE IF EXISTS projection_management;
diff --git a/docker/docker-compose-mysql.yml b/docker/docker-compose-mysql.yml
new file mode 100644
index 00000000..3a7c0697
--- /dev/null
+++ b/docker/docker-compose-mysql.yml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+  mysql-db:
+    image: mysql:9.1.0
+    container_name: docker-mysql-db-1
+    ports:
+      - 3306:3306
+    environment:
+      MYSQL_ROOT_PASSWORD: root
+    healthcheck:
+      test: [ "CMD", "mysqladmin", "--password=root", "ping", "-h", "127.0.0.1" ]
+      interval: 1s
+      timeout: 1s
+      retries: 60
diff --git a/docs/src/main/paradox/connection-config.md b/docs/src/main/paradox/connection-config.md
index 8f6dc82e..c96a6b6f 100644
--- a/docs/src/main/paradox/connection-config.md
+++ b/docs/src/main/paradox/connection-config.md
@@ -9,6 +9,9 @@ Postgres:
 Yugabyte:
 : @@snip [application.conf](/docs/src/test/resources/application-yugabyte.conf) { #connection-settings }

+MySQL:
+: @@snip [application.conf](/docs/src/test/resources/application-mysql.conf) { #connection-settings }
+
 ## Reference configuration

 The following can be overridden in your `application.conf` for the connection settings:
diff --git a/docs/src/test/resources/application-mysql.conf b/docs/src/test/resources/application-mysql.conf
new file mode 100644
index 00000000..e645b4af
--- /dev/null
+++ b/docs/src/test/resources/application-mysql.conf
@@ -0,0 +1,34 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko.persistence.journal.plugin = "pekko.persistence.r2dbc.journal"
+pekko.persistence.snapshot-store.plugin = "pekko.persistence.r2dbc.snapshot"
+pekko.persistence.state.plugin = "pekko.persistence.r2dbc.state"
+
+// #connection-settings
+pekko.persistence.r2dbc {
+  dialect = "mysql"
+
+  # required settings for the MySQL dialect
+  db-timestamp-monotonic-increasing = on
+  use-app-timestamp = on
+
+  connection-factory {
+    driver = "mysql"
+    host = "localhost"
+    host = ${?DB_HOST}
+    port = 3306
+    database = "mysql"
+    database = ${?DB_NAME}
+    user = "root"
+    user = ${?DB_USER}
+    password = "root"
+    password = ${?DB_PASSWORD}
+
+    # ssl {
+    #   enabled = on
+    #   mode = "VERIFY_CA"
+    #   root-cert = "/path/db_root.crt"
+    # }
+  }
+}
+// #connection-settings
diff 
--git a/project/Dependencies.scala b/project/Dependencies.scala index c01be9b2..b0818d4d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -36,6 +36,7 @@ object Dependencies { val r2dbcPool = "io.r2dbc" % "r2dbc-pool" % "1.0.2.RELEASE" val r2dbcPostgres = Seq( "org.postgresql" % "r2dbc-postgresql" % "1.0.7.RELEASE") + val r2dbcMysql = "io.asyncer" % "r2dbc-mysql" % "1.3.0" } object TestDeps { @@ -75,6 +76,7 @@ object Dependencies { pekkoPersistenceQuery, r2dbcSpi, r2dbcPool, + r2dbcMysql % "provided,test", TestDeps.pekkoPersistenceTck, TestDeps.pekkoStreamTestkit, TestDeps.pekkoActorTestkitTyped,
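Since `r2dbc-mysql` is added above in `provided,test` scope, the driver is on the classpath for the plugin's own tests but is not a transitive dependency of the published artifact. An application that enables the MySQL dialect therefore has to declare the driver itself, along these lines (coordinates and version taken from `Dependencies.scala` above):

```scala
// build.sbt of an application using dialect = "mysql"
libraryDependencies += "io.asyncer" % "r2dbc-mysql" % "1.3.0"
```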