Skip to content

Commit

Permalink
Postgres batching refinements (#9898)
Browse files Browse the repository at this point in the history
* Switch from shortcut to support nullable types to explicit Optional PGField-s
* Add scaladoc to type parameters of PGField
* Typo fix

changelog_begin
changelog_end
  • Loading branch information
nmarton-da authored Jun 3, 2021
1 parent 7bc925e commit 316069d
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import java.time.format.DateTimeFormatter

import scala.reflect.ClassTag

/** Type parameters TO and CONVERTED need to comply to the following contract:
* - either both of them nullable (AnyRef)
* - or neither of them nullable (CONVERTED AnyVal, TO AnyVal or AnyRef which will be never null)
* This is important, since absence is automatically signaled with null references (and we still would want to support primitives)
/** @tparam FROM is an arbitrary type from which we can extract the data of interest for the particular column
* @tparam TO is the intermediary type of the result of the extraction.
* FROM => TO functionality is intended to be injected at Schema definition time.
* TO is not nullable, should express a clean Scala type
* @tparam CONVERTED is the (possibly primitive) type needed by the JDBC API
* TO => CONVERTED is intended to be injected at PGField definition time.
* CONVERTED might be nullable, primitive, boxed-type, whatever the JDBC API requires
*/
private[postgresql] sealed abstract class PGField[FROM, TO, CONVERTED](implicit
classTag: ClassTag[CONVERTED]
Expand All @@ -23,12 +26,7 @@ private[postgresql] sealed abstract class PGField[FROM, TO, CONVERTED](implicit

final def toArray(input: Vector[FROM]): Array[CONVERTED] =
input.view
.map(extract)
.map {
case null =>
null.asInstanceOf[CONVERTED] // this is safe if clients comply with the contract above
case notNull => convert(notNull)
}
.map(extract andThen convert)
.toArray(classTag)
}

Expand All @@ -37,31 +35,44 @@ private[postgresql] sealed abstract class TrivialPGField[FROM, TO](implicit clas
override def convert: TO => TO = identity
}

private[postgresql] final case class PGTimestamp[FROM](extract: FROM => Instant)
extends PGField[FROM, Instant, String] {
private[postgresql] sealed trait TrivialOptionalPGField[FROM, TO >: Null <: AnyRef]
extends PGField[FROM, Option[TO], TO] {
override def convert: Option[TO] => TO = _.orNull
}

private[postgresql] sealed trait PGTimestampBase[FROM, TO] extends PGField[FROM, TO, String] {
override def selectFieldExpression(inputFieldName: String): String =
s"$inputFieldName::timestamp"

override def convert: Instant => String =
private val PGTimestampFormat =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")

protected def convertBase: Instant => String =
_.atZone(ZoneOffset.UTC).toLocalDateTime
.format(PGTimestamp.PGTimestampFormat)
.format(PGTimestampFormat)
}

private[postgresql] object PGTimestamp {
private val PGTimestampFormat =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
private[postgresql] final case class PGTimestamp[FROM](extract: FROM => Instant)
extends PGTimestampBase[FROM, Instant] {
override def convert: Instant => String = convertBase
}

private[postgresql] final case class PGTimestampOptional[FROM](extract: FROM => Option[Instant])
extends PGTimestampBase[FROM, Option[Instant]] {
override def convert: Option[Instant] => String = _.map(convertBase).orNull
}

private[postgresql] final case class PGString[FROM](extract: FROM => String)
extends TrivialPGField[FROM, String]

private[postgresql] final case class PGStringArray[FROM](extract: FROM => Iterable[String])
extends PGField[FROM, Iterable[String], String] {
private[postgresql] final case class PGStringOptional[FROM](extract: FROM => Option[String])
extends TrivialOptionalPGField[FROM, String]

private[postgresql] sealed trait PGStringArrayBase[FROM, TO] extends PGField[FROM, TO, String] {
override def selectFieldExpression(inputFieldName: String): String =
s"string_to_array($inputFieldName, '|')"

override def convert: Iterable[String] => String = { in =>
protected def convertBase: Iterable[String] => String = { in =>
assert(
in.forall(!_.contains("|")),
s"The following input string(s) contain the character '|', which is not expected: ${in.filter(_.contains("|")).mkString(", ")}",
Expand All @@ -70,12 +81,26 @@ private[postgresql] final case class PGStringArray[FROM](extract: FROM => Iterab
}
}

private[postgresql] final case class PGStringArray[FROM](extract: FROM => Iterable[String])
extends PGStringArrayBase[FROM, Iterable[String]] {
override def convert: Iterable[String] => String = convertBase
}

private[postgresql] final case class PGStringArrayOptional[FROM](
extract: FROM => Option[Iterable[String]]
) extends PGStringArrayBase[FROM, Option[Iterable[String]]] {
override def convert: Option[Iterable[String]] => String = _.map(convertBase).orNull
}

private[postgresql] final case class PGBytea[FROM](extract: FROM => Array[Byte])
extends TrivialPGField[FROM, Array[Byte]]

private[postgresql] final case class PGByteaOptional[FROM](extract: FROM => Option[Array[Byte]])
extends TrivialOptionalPGField[FROM, Array[Byte]]

private[postgresql] final case class PGIntOptional[FROM](extract: FROM => Option[Int])
extends PGField[FROM, Option[Int], java.lang.Integer] {
override def convert: Option[Int] => Integer = _.map(x => x: java.lang.Integer).orNull
override def convert: Option[Int] => Integer = _.map(Int.box).orNull
}

private[postgresql] final case class PGBigint[FROM](extract: FROM => Long)
Expand All @@ -86,13 +111,13 @@ private[postgresql] final case class PGSmallintOptional[FROM](extract: FROM => O
override def selectFieldExpression(inputFieldName: String): String =
s"$inputFieldName::smallint"

override def convert: Option[Int] => Integer = _.map(x => x: java.lang.Integer).orNull
override def convert: Option[Int] => Integer = _.map(Int.box).orNull
}

private[postgresql] final case class PGBoolean[FROM](extract: FROM => Boolean)
extends TrivialPGField[FROM, Boolean]

private[postgresql] final case class PGBooleanOptional[FROM](extract: FROM => Option[Boolean])
extends PGField[FROM, Option[Boolean], java.lang.Boolean] {
override def convert: Option[Boolean] => lang.Boolean = _.map(x => x: java.lang.Boolean).orNull
override def convert: Option[Boolean] => lang.Boolean = _.map(Boolean.box).orNull
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,63 +7,63 @@ import com.daml.platform.store.backend.DbDto

private[postgresql] object PGSchema {
val eventsDivulgence: PGTable[DbDto.EventDivulgence] = PGTable("participant_events_divulgence")(
"event_offset" -> PGString(_.event_offset.orNull),
"command_id" -> PGString(_.command_id.orNull),
"workflow_id" -> PGString(_.workflow_id.orNull),
"application_id" -> PGString(_.application_id.orNull),
"submitters" -> PGStringArray(_.submitters.orNull),
"event_offset" -> PGStringOptional(_.event_offset),
"command_id" -> PGStringOptional(_.command_id),
"workflow_id" -> PGStringOptional(_.workflow_id),
"application_id" -> PGStringOptional(_.application_id),
"submitters" -> PGStringArrayOptional(_.submitters),
"contract_id" -> PGString(_.contract_id),
"template_id" -> PGString(_.template_id.orNull),
"template_id" -> PGStringOptional(_.template_id),
"tree_event_witnesses" -> PGStringArray(_.tree_event_witnesses),
"create_argument" -> PGBytea(_.create_argument.orNull),
"create_argument" -> PGByteaOptional(_.create_argument),
"event_sequential_id" -> PGBigint(_.event_sequential_id),
"create_argument_compression" -> PGSmallintOptional(_.create_argument_compression),
)

val eventsCreate: PGTable[DbDto.EventCreate] = PGTable("participant_events_create")(
"event_offset" -> PGString(_.event_offset.orNull),
"transaction_id" -> PGString(_.transaction_id.orNull),
"ledger_effective_time" -> PGTimestamp(_.ledger_effective_time.orNull),
"command_id" -> PGString(_.command_id.orNull),
"workflow_id" -> PGString(_.workflow_id.orNull),
"application_id" -> PGString(_.application_id.orNull),
"submitters" -> PGStringArray(_.submitters.orNull),
"event_offset" -> PGStringOptional(_.event_offset),
"transaction_id" -> PGStringOptional(_.transaction_id),
"ledger_effective_time" -> PGTimestampOptional(_.ledger_effective_time),
"command_id" -> PGStringOptional(_.command_id),
"workflow_id" -> PGStringOptional(_.workflow_id),
"application_id" -> PGStringOptional(_.application_id),
"submitters" -> PGStringArrayOptional(_.submitters),
"node_index" -> PGIntOptional(_.node_index),
"event_id" -> PGString(_.event_id.orNull),
"event_id" -> PGStringOptional(_.event_id),
"contract_id" -> PGString(_.contract_id),
"template_id" -> PGString(_.template_id.orNull),
"template_id" -> PGStringOptional(_.template_id),
"flat_event_witnesses" -> PGStringArray(_.flat_event_witnesses),
"tree_event_witnesses" -> PGStringArray(_.tree_event_witnesses),
"create_argument" -> PGBytea(_.create_argument.orNull),
"create_signatories" -> PGStringArray(_.create_signatories.orNull),
"create_observers" -> PGStringArray(_.create_observers.orNull),
"create_agreement_text" -> PGString(_.create_agreement_text.orNull),
"create_key_value" -> PGBytea(_.create_key_value.orNull),
"create_key_hash" -> PGString(_.create_key_hash.orNull),
"create_argument" -> PGByteaOptional(_.create_argument),
"create_signatories" -> PGStringArrayOptional(_.create_signatories),
"create_observers" -> PGStringArrayOptional(_.create_observers),
"create_agreement_text" -> PGStringOptional(_.create_agreement_text),
"create_key_value" -> PGByteaOptional(_.create_key_value),
"create_key_hash" -> PGStringOptional(_.create_key_hash),
"event_sequential_id" -> PGBigint(_.event_sequential_id),
"create_argument_compression" -> PGSmallintOptional(_.create_argument_compression),
"create_key_value_compression" -> PGSmallintOptional(_.create_key_value_compression),
)

val exerciseFields: Vector[(String, PGField[DbDto.EventExercise, _, _])] =
Vector[(String, PGField[DbDto.EventExercise, _, _])](
"event_id" -> PGString(_.event_id.orNull),
"event_offset" -> PGString(_.event_offset.orNull),
"event_id" -> PGStringOptional(_.event_id),
"event_offset" -> PGStringOptional(_.event_offset),
"contract_id" -> PGString(_.contract_id),
"transaction_id" -> PGString(_.transaction_id.orNull),
"ledger_effective_time" -> PGTimestamp(_.ledger_effective_time.orNull),
"transaction_id" -> PGStringOptional(_.transaction_id),
"ledger_effective_time" -> PGTimestampOptional(_.ledger_effective_time),
"node_index" -> PGIntOptional(_.node_index),
"command_id" -> PGString(_.command_id.orNull),
"workflow_id" -> PGString(_.workflow_id.orNull),
"application_id" -> PGString(_.application_id.orNull),
"submitters" -> PGStringArray(_.submitters.orNull),
"create_key_value" -> PGBytea(_.create_key_value.orNull),
"exercise_choice" -> PGString(_.exercise_choice.orNull),
"exercise_argument" -> PGBytea(_.exercise_argument.orNull),
"exercise_result" -> PGBytea(_.exercise_result.orNull),
"exercise_actors" -> PGStringArray(_.exercise_actors.orNull),
"exercise_child_event_ids" -> PGStringArray(_.exercise_child_event_ids.orNull),
"template_id" -> PGString(_.template_id.orNull),
"command_id" -> PGStringOptional(_.command_id),
"workflow_id" -> PGStringOptional(_.workflow_id),
"application_id" -> PGStringOptional(_.application_id),
"submitters" -> PGStringArrayOptional(_.submitters),
"create_key_value" -> PGByteaOptional(_.create_key_value),
"exercise_choice" -> PGStringOptional(_.exercise_choice),
"exercise_argument" -> PGByteaOptional(_.exercise_argument),
"exercise_result" -> PGByteaOptional(_.exercise_result),
"exercise_actors" -> PGStringArrayOptional(_.exercise_actors),
"exercise_child_event_ids" -> PGStringArrayOptional(_.exercise_child_event_ids),
"template_id" -> PGStringOptional(_.template_id),
"flat_event_witnesses" -> PGStringArray(_.flat_event_witnesses),
"tree_event_witnesses" -> PGStringArray(_.tree_event_witnesses),
"event_sequential_id" -> PGBigint(_.event_sequential_id),
Expand All @@ -84,15 +84,15 @@ private[postgresql] object PGSchema {
"submission_id" -> PGString(_.submission_id),
"typ" -> PGString(_.typ),
"configuration" -> PGBytea(_.configuration),
"rejection_reason" -> PGString(_.rejection_reason.orNull),
"rejection_reason" -> PGStringOptional(_.rejection_reason),
)

val packageEntries: PGTable[DbDto.PackageEntry] = PGTable("package_entries")(
"ledger_offset" -> PGString(_.ledger_offset),
"recorded_at" -> PGTimestamp(_.recorded_at),
"submission_id" -> PGString(_.submission_id.orNull),
"submission_id" -> PGStringOptional(_.submission_id),
"typ" -> PGString(_.typ),
"rejection_reason" -> PGString(_.rejection_reason.orNull),
"rejection_reason" -> PGStringOptional(_.rejection_reason),
)

val packages: PGTable[DbDto.Package] = PGTable(
Expand All @@ -101,7 +101,7 @@ private[postgresql] object PGSchema {
fields = Vector[(String, PGField[DbDto.Package, _, _])](
"package_id" -> PGString(_.package_id),
"upload_id" -> PGString(_.upload_id),
"source_description" -> PGString(_.source_description.orNull),
"source_description" -> PGStringOptional(_.source_description),
"size" -> PGBigint(_.size),
"known_since" -> PGTimestamp(_.known_since),
"ledger_offset" -> PGString(_.ledger_offset),
Expand All @@ -112,19 +112,19 @@ private[postgresql] object PGSchema {
val partyEntries: PGTable[DbDto.PartyEntry] = PGTable("party_entries")(
"ledger_offset" -> PGString(_.ledger_offset),
"recorded_at" -> PGTimestamp(_.recorded_at),
"submission_id" -> PGString(_.submission_id.orNull),
"party" -> PGString(_.party.orNull),
"display_name" -> PGString(_.display_name.orNull),
"submission_id" -> PGStringOptional(_.submission_id),
"party" -> PGStringOptional(_.party),
"display_name" -> PGStringOptional(_.display_name),
"typ" -> PGString(_.typ),
"rejection_reason" -> PGString(_.rejection_reason.orNull),
"rejection_reason" -> PGStringOptional(_.rejection_reason),
"is_local" -> PGBooleanOptional(_.is_local),
)

val parties: PGTable[DbDto.Party] = PGTable("parties")(
"party" -> PGString(_.party),
"display_name" -> PGString(_.display_name.orNull),
"display_name" -> PGStringOptional(_.display_name),
"explicit" -> PGBoolean(_.explicit),
"ledger_offset" -> PGString(_.ledger_offset.orNull),
"ledger_offset" -> PGStringOptional(_.ledger_offset),
"is_local" -> PGBoolean(_.is_local),
)

Expand All @@ -135,8 +135,8 @@ private[postgresql] object PGSchema {
"application_id" -> PGString(_.application_id),
"submitters" -> PGStringArray(_.submitters),
"command_id" -> PGString(_.command_id),
"transaction_id" -> PGString(_.transaction_id.orNull),
"transaction_id" -> PGStringOptional(_.transaction_id),
"status_code" -> PGIntOptional(_.status_code),
"status_message" -> PGString(_.status_message.orNull),
"status_message" -> PGStringOptional(_.status_message),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ private[postgresql] case class PGTable[FROM](
insertSuffix: String = "",
) {
val insertStatement: String = {
def comaSeparatedOf(extractor: ((String, PGField[FROM, _, _])) => String): String =
def commaSeparatedOf(extractor: ((String, PGField[FROM, _, _])) => String): String =
fields.view
.map(extractor)
.mkString(",")
def inputFieldName: String => String = fieldName => s"${fieldName}_in"
val tableFields = comaSeparatedOf(_._1)
val selectFields = comaSeparatedOf { case (fieldName, field) =>
val tableFields = commaSeparatedOf(_._1)
val selectFields = commaSeparatedOf { case (fieldName, field) =>
field.selectFieldExpression(inputFieldName(fieldName))
}
val unnestFields = comaSeparatedOf(_ => "?")
val inputFields = comaSeparatedOf(fieldDef => inputFieldName(fieldDef._1))
val unnestFields = commaSeparatedOf(_ => "?")
val inputFields = commaSeparatedOf(fieldDef => inputFieldName(fieldDef._1))
s"""
|INSERT INTO $tableName
| ($tableFields)
Expand Down

0 comments on commit 316069d

Please sign in to comment.