Skip to content

Commit

Permalink
Projection implementation for MySQL support (#177)
Browse files Browse the repository at this point in the history
* Projection implementation for MySQL support

* Convert R2dbcProjectionSettings from case class to class

---------

Co-authored-by: PJ Fanning <[email protected]>
  • Loading branch information
ptrdom and pjfanning authored Nov 30, 2024
1 parent 0f32bf3 commit beee0d1
Show file tree
Hide file tree
Showing 18 changed files with 393 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ jobs:
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}
run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} ${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}

test-docs:
name: Docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,7 @@ final class R2dbcSettings(config: Config) {

val durableStateAssertSingleWriter: Boolean = config.getBoolean("state.assert-single-writer")

val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match {
case "yugabyte" => Dialect.Yugabyte
case "postgres" => Dialect.Postgres
case "mysql" => Dialect.MySQL
case other =>
throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres].")
}
val dialect: Dialect = Dialect.fromString(config.getString("dialect"))

val querySettings = new QuerySettings(config.getConfig("query"))

Expand Down Expand Up @@ -96,6 +90,18 @@ object Dialect {

/** @since 1.1.0 */
case object MySQL extends Dialect

/** @since 1.1.0 */
def fromString(value: String): Dialect = {
toRootLowerCase(value) match {
case "yugabyte" => Dialect.Yugabyte
case "postgres" => Dialect.Postgres
case "mysql" => Dialect.MySQL
case other =>
throw new IllegalArgumentException(
s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres, mysql].")
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object Sql {
* INTERNAL API
*/
@InternalApi
private[r2dbc] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
private[pekko] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
def sql(args: Any*)(implicit dialect: Dialect): String =
dialect.replaceParameters(trimLineBreaks(sc.s(args: _*)))
}
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ object Dependencies {
r2dbcSpi,
r2dbcPool,
r2dbcPostgres % "provided,test",
r2dbcMysql % "provided,test",
pekkoProjectionCore,
TestDeps.pekkoProjectionEventSourced,
TestDeps.pekkoProjectionDurableState,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Converting case class to class
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.canEqual")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productArity")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productPrefix")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElement")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElementName")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElementNames")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productIterator")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$1")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$2")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$3")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$4")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$5")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$6")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$7")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$8")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$9")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$10")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._1")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._2")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._3")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._4")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._5")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._6")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._7")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._8")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._9")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._10")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings$")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.unapply")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.fromProduct")
2 changes: 1 addition & 1 deletion projection/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

//#projection-config
pekko.projection.r2dbc {
# postgres or yugabyte
# postgres, yugabyte or mysql
dialect = ${pekko.persistence.r2dbc.dialect}

offset-store {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import java.time.{ Duration => JDuration }
import java.util.Locale

import scala.concurrent.duration._

import scala.util.hashing.MurmurHash3
import com.typesafe.config.Config
import org.apache.pekko
import pekko.util.JavaDurationConverters._
import pekko.actor.typed.ActorSystem
import com.typesafe.config.Config
import pekko.persistence.r2dbc.Dialect
import pekko.util.JavaDurationConverters._

object R2dbcProjectionSettings {

Expand All @@ -34,7 +35,8 @@ object R2dbcProjectionSettings {
case _ => config.getDuration("log-db-calls-exceeding").asScala
}

R2dbcProjectionSettings(
new R2dbcProjectionSettings(
dialect = Dialect.fromString(config.getString("dialect")),
schema = Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty),
offsetTable = config.getString("offset-store.offset-table"),
timestampOffsetTable = config.getString("offset-store.timestamp-offset-table"),
Expand All @@ -44,25 +46,149 @@ object R2dbcProjectionSettings {
keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"),
evictInterval = config.getDuration("offset-store.evict-interval"),
deleteInterval = config.getDuration("offset-store.delete-interval"),
logDbCallsExceeding)
logDbCallsExceeding
)
}

def apply(system: ActorSystem[_]): R2dbcProjectionSettings =
apply(system.settings.config.getConfig(DefaultConfigPath))

def apply(
schema: Option[String],
offsetTable: String,
timestampOffsetTable: String,
managementTable: String,
useConnectionFactory: String,
timeWindow: JDuration,
keepNumberOfEntries: Int,
evictInterval: JDuration,
deleteInterval: JDuration,
logDbCallsExceeding: FiniteDuration
): R2dbcProjectionSettings = new R2dbcProjectionSettings(
Dialect.Postgres,
schema,
offsetTable,
timestampOffsetTable,
managementTable,
useConnectionFactory,
timeWindow,
keepNumberOfEntries,
evictInterval,
deleteInterval,
logDbCallsExceeding
)
}

// FIXME remove case class, and add `with` methods
final case class R2dbcProjectionSettings(
schema: Option[String],
offsetTable: String,
timestampOffsetTable: String,
managementTable: String,
useConnectionFactory: String,
timeWindow: JDuration,
keepNumberOfEntries: Int,
evictInterval: JDuration,
deleteInterval: JDuration,
logDbCallsExceeding: FiniteDuration) {
final class R2dbcProjectionSettings private (
val dialect: Dialect,
val schema: Option[String],
val offsetTable: String,
val timestampOffsetTable: String,
val managementTable: String,
val useConnectionFactory: String,
val timeWindow: JDuration,
val keepNumberOfEntries: Int,
val evictInterval: JDuration,
val deleteInterval: JDuration,
val logDbCallsExceeding: FiniteDuration
) extends Serializable {

override def toString: String =
s"R2dbcProjectionSettings($dialect, $schema, $offsetTable, $timestampOffsetTable, $managementTable, " +
s"$useConnectionFactory, $timeWindow, $keepNumberOfEntries, $evictInterval, $deleteInterval, $logDbCallsExceeding)"

override def equals(other: Any): Boolean =
other match {
case that: R2dbcProjectionSettings =>
dialect == that.dialect && schema == that.schema &&
offsetTable == that.offsetTable && timestampOffsetTable == that.timestampOffsetTable &&
managementTable == that.managementTable && useConnectionFactory == that.useConnectionFactory &&
timeWindow == that.timeWindow && keepNumberOfEntries == that.keepNumberOfEntries &&
evictInterval == that.evictInterval && deleteInterval == that.deleteInterval &&
logDbCallsExceeding == that.logDbCallsExceeding
case _ => false
}

override def hashCode(): Int = {
val values = Seq(
dialect,
schema,
offsetTable,
timestampOffsetTable,
managementTable,
useConnectionFactory,
timeWindow,
keepNumberOfEntries,
evictInterval,
deleteInterval,
logDbCallsExceeding
)
val h = values.foldLeft(MurmurHash3.productSeed) { case (h, value) =>
MurmurHash3.mix(h, value.##)
}
MurmurHash3.finalizeHash(h, values.size)
}

private[this] def copy(
dialect: Dialect = dialect,
schema: Option[String] = schema,
offsetTable: String = offsetTable,
timestampOffsetTable: String = timestampOffsetTable,
managementTable: String = managementTable,
useConnectionFactory: String = useConnectionFactory,
timeWindow: JDuration = timeWindow,
keepNumberOfEntries: Int = keepNumberOfEntries,
evictInterval: JDuration = evictInterval,
deleteInterval: JDuration = deleteInterval,
logDbCallsExceeding: FiniteDuration = logDbCallsExceeding
): R2dbcProjectionSettings =
new R2dbcProjectionSettings(
dialect,
schema,
offsetTable,
timestampOffsetTable,
managementTable,
useConnectionFactory,
timeWindow,
keepNumberOfEntries,
evictInterval,
deleteInterval,
logDbCallsExceeding
)

def withDialect(dialect: Dialect): R2dbcProjectionSettings =
copy(dialect = dialect)

def withSchema(schema: Option[String]): R2dbcProjectionSettings =
copy(schema = schema)

def withOffsetTable(offsetTable: String): R2dbcProjectionSettings =
copy(offsetTable = offsetTable)

def withTimestampOffsetTable(timestampOffsetTable: String): R2dbcProjectionSettings =
copy(timestampOffsetTable = timestampOffsetTable)

def withManagementTable(managementTable: String): R2dbcProjectionSettings =
copy(managementTable = managementTable)

def withUseConnectionFactory(useConnectionFactory: String): R2dbcProjectionSettings =
copy(useConnectionFactory = useConnectionFactory)

def withTimeWindow(timeWindow: JDuration): R2dbcProjectionSettings =
copy(timeWindow = timeWindow)

def withKeepNumberOfEntries(keepNumberOfEntries: Int): R2dbcProjectionSettings =
copy(keepNumberOfEntries = keepNumberOfEntries)

def withEvictInterval(evictInterval: JDuration): R2dbcProjectionSettings =
copy(evictInterval = evictInterval)

def withDeleteInterval(deleteInterval: JDuration): R2dbcProjectionSettings =
copy(deleteInterval = deleteInterval)

def withLogDbCallsExceeding(logDbCallsExceeding: FiniteDuration): R2dbcProjectionSettings =
copy(logDbCallsExceeding = logDbCallsExceeding)

val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + offsetTable
val timestampOffsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + timestampOffsetTable
val managementTableWithSchema: String = schema.map(_ + ".").getOrElse("") + managementTable
Expand Down
Loading

0 comments on commit beee0d1

Please sign in to comment.