From 316069d047e0c550e377e3044529910b6e81708d Mon Sep 17 00:00:00 2001 From: Marton Nagy Date: Thu, 3 Jun 2021 13:34:33 +0200 Subject: [PATCH] Postgres batching refinements (#9898) * Switch from shortcut to support nullable types to explicit Optional PGField-s * Add scaladoc to type parameters of PGField * Typo fix changelog_begin changelog_end --- .../store/backend/postgresql/PGField.scala | 71 +++++++++----- .../store/backend/postgresql/PGSchema.scala | 98 +++++++++---------- .../store/backend/postgresql/PGTable.scala | 10 +- 3 files changed, 102 insertions(+), 77 deletions(-) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGField.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGField.scala index 5ff4308cc597..ece2048cc0e8 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGField.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGField.scala @@ -9,10 +9,13 @@ import java.time.format.DateTimeFormatter import scala.reflect.ClassTag -/** Type parameters TO and CONVERTED need to comply to the following contract: - * - either both of them nullable (AnyRef) - * - or neither of them nullable (CONVERTED AnyVal, TO AnyVal or AnyRef which will be never null) - * This is important, since absence is automatically signaled with null references (and we still would want to support primitives) +/** @tparam FROM is an arbitrary type from which we can extract the data of interest for the particular column + * @tparam TO is the intermediary type of the result of the extraction. + * FROM => TO functionality is intended to be injected at Schema definition time. + * TO is not nullable, should express a clean Scala type + * @tparam CONVERTED is the (possibly primitive) type needed by the JDBC API + * TO => CONVERTED is intended to be injected at PGField definition time. + * CONVERTED might be nullable, primitive, boxed-type, whatever the JDBC API requires */ private[postgresql] sealed abstract class PGField[FROM, TO, CONVERTED](implicit classTag: ClassTag[CONVERTED] @@ -23,12 +26,7 @@ private[postgresql] sealed abstract class PGField[FROM, TO, CONVERTED](implicit final def toArray(input: Vector[FROM]): Array[CONVERTED] = input.view - .map(extract) - .map { - case null => - null.asInstanceOf[CONVERTED] // this is safe if clients comply with the contract above - case notNull => convert(notNull) - } + .map(extract andThen convert) .toArray(classTag) } @@ -37,31 +35,44 @@ private[postgresql] sealed abstract class TrivialPGField[FROM, TO](implicit clas override def convert: TO => TO = identity } -private[postgresql] final case class PGTimestamp[FROM](extract: FROM => Instant) - extends PGField[FROM, Instant, String] { +private[postgresql] sealed trait TrivialOptionalPGField[FROM, TO >: Null <: AnyRef] + extends PGField[FROM, Option[TO], TO] { + override def convert: Option[TO] => TO = _.orNull +} +private[postgresql] sealed trait PGTimestampBase[FROM, TO] extends PGField[FROM, TO, String] { override def selectFieldExpression(inputFieldName: String): String = s"$inputFieldName::timestamp" - override def convert: Instant => String = + private val PGTimestampFormat = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS") + + protected def convertBase: Instant => String = _.atZone(ZoneOffset.UTC).toLocalDateTime - .format(PGTimestamp.PGTimestampFormat) + .format(PGTimestampFormat) } -private[postgresql] object PGTimestamp { - private val PGTimestampFormat = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS") +private[postgresql] final case class PGTimestamp[FROM](extract: FROM => Instant) + extends PGTimestampBase[FROM, Instant] { + override def convert: Instant => String = convertBase +} + +private[postgresql] final case class PGTimestampOptional[FROM](extract: FROM => Option[Instant]) + extends PGTimestampBase[FROM, Option[Instant]] { + override def convert: Option[Instant] => String = _.map(convertBase).orNull } private[postgresql] final case class PGString[FROM](extract: FROM => String) extends TrivialPGField[FROM, String] -private[postgresql] final case class PGStringArray[FROM](extract: FROM => Iterable[String]) - extends PGField[FROM, Iterable[String], String] { +private[postgresql] final case class PGStringOptional[FROM](extract: FROM => Option[String]) + extends TrivialOptionalPGField[FROM, String] + +private[postgresql] sealed trait PGStringArrayBase[FROM, TO] extends PGField[FROM, TO, String] { override def selectFieldExpression(inputFieldName: String): String = s"string_to_array($inputFieldName, '|')" - override def convert: Iterable[String] => String = { in => + protected def convertBase: Iterable[String] => String = { in => assert( in.forall(!_.contains("|")), s"The following input string(s) contain the character '|', which is not expected: ${in.filter(_.contains("|")).mkString(", ")}", @@ -70,12 +81,26 @@ private[postgresql] final case class PGStringArray[FROM](extract: FROM => Iterab } } +private[postgresql] final case class PGStringArray[FROM](extract: FROM => Iterable[String]) + extends PGStringArrayBase[FROM, Iterable[String]] { + override def convert: Iterable[String] => String = convertBase +} + +private[postgresql] final case class PGStringArrayOptional[FROM]( + extract: FROM => Option[Iterable[String]] +) extends PGStringArrayBase[FROM, Option[Iterable[String]]] { + override def convert: Option[Iterable[String]] => String = _.map(convertBase).orNull +} + private[postgresql] final case class PGBytea[FROM](extract: FROM => Array[Byte]) extends TrivialPGField[FROM, Array[Byte]] +private[postgresql] final case class PGByteaOptional[FROM](extract: FROM => Option[Array[Byte]]) + extends TrivialOptionalPGField[FROM, Array[Byte]] + private[postgresql] final case class PGIntOptional[FROM](extract: FROM => Option[Int]) extends PGField[FROM, Option[Int], java.lang.Integer] { - override def convert: Option[Int] => Integer = _.map(x => x: java.lang.Integer).orNull + override def convert: Option[Int] => Integer = _.map(Int.box).orNull } private[postgresql] final case class PGBigint[FROM](extract: FROM => Long) @@ -86,7 +111,7 @@ private[postgresql] final case class PGSmallintOptional[FROM](extract: FROM => O override def selectFieldExpression(inputFieldName: String): String = s"$inputFieldName::smallint" - override def convert: Option[Int] => Integer = _.map(x => x: java.lang.Integer).orNull + override def convert: Option[Int] => Integer = _.map(Int.box).orNull } private[postgresql] final case class PGBoolean[FROM](extract: FROM => Boolean) @@ -94,5 +119,5 @@ private[postgresql] final case class PGBoolean[FROM](extract: FROM => Boolean) private[postgresql] final case class PGBooleanOptional[FROM](extract: FROM => Option[Boolean]) extends PGField[FROM, Option[Boolean], java.lang.Boolean] { - override def convert: Option[Boolean] => lang.Boolean = _.map(x => x: java.lang.Boolean).orNull + override def convert: Option[Boolean] => lang.Boolean = _.map(Boolean.box).orNull } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGSchema.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGSchema.scala index b7f220b08bed..8615b28a4357 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGSchema.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGSchema.scala @@ -7,39 +7,39 @@ import com.daml.platform.store.backend.DbDto private[postgresql] object PGSchema { val eventsDivulgence: PGTable[DbDto.EventDivulgence] = PGTable("participant_events_divulgence")( - "event_offset" -> PGString(_.event_offset.orNull), - "command_id" -> PGString(_.command_id.orNull), - "workflow_id" -> PGString(_.workflow_id.orNull), - "application_id" -> PGString(_.application_id.orNull), - "submitters" -> PGStringArray(_.submitters.orNull), + "event_offset" -> PGStringOptional(_.event_offset), + "command_id" -> PGStringOptional(_.command_id), + "workflow_id" -> PGStringOptional(_.workflow_id), + "application_id" -> PGStringOptional(_.application_id), + "submitters" -> PGStringArrayOptional(_.submitters), "contract_id" -> PGString(_.contract_id), - "template_id" -> PGString(_.template_id.orNull), + "template_id" -> PGStringOptional(_.template_id), "tree_event_witnesses" -> PGStringArray(_.tree_event_witnesses), - "create_argument" -> PGBytea(_.create_argument.orNull), + "create_argument" -> PGByteaOptional(_.create_argument), "event_sequential_id" -> PGBigint(_.event_sequential_id), "create_argument_compression" -> PGSmallintOptional(_.create_argument_compression), ) val eventsCreate: PGTable[DbDto.EventCreate] = PGTable("participant_events_create")( - "event_offset" -> PGString(_.event_offset.orNull), - "transaction_id" -> PGString(_.transaction_id.orNull), - "ledger_effective_time" -> PGTimestamp(_.ledger_effective_time.orNull), - "command_id" -> PGString(_.command_id.orNull), - "workflow_id" -> PGString(_.workflow_id.orNull), - "application_id" -> PGString(_.application_id.orNull), - "submitters" -> PGStringArray(_.submitters.orNull), + "event_offset" -> PGStringOptional(_.event_offset), + "transaction_id" -> PGStringOptional(_.transaction_id), + "ledger_effective_time" -> PGTimestampOptional(_.ledger_effective_time), + "command_id" -> PGStringOptional(_.command_id), + "workflow_id" -> PGStringOptional(_.workflow_id), + "application_id" -> PGStringOptional(_.application_id), + "submitters" -> PGStringArrayOptional(_.submitters), "node_index" -> PGIntOptional(_.node_index), - "event_id" -> PGString(_.event_id.orNull), + "event_id" -> PGStringOptional(_.event_id), "contract_id" -> PGString(_.contract_id), - "template_id" -> PGString(_.template_id.orNull), + "template_id" -> PGStringOptional(_.template_id), "flat_event_witnesses" -> PGStringArray(_.flat_event_witnesses), "tree_event_witnesses" -> PGStringArray(_.tree_event_witnesses), - "create_argument" -> PGBytea(_.create_argument.orNull), - "create_signatories" -> PGStringArray(_.create_signatories.orNull), - "create_observers" -> PGStringArray(_.create_observers.orNull), - "create_agreement_text" -> PGString(_.create_agreement_text.orNull), - "create_key_value" -> PGBytea(_.create_key_value.orNull), - "create_key_hash" -> PGString(_.create_key_hash.orNull), + "create_argument" -> PGByteaOptional(_.create_argument), + "create_signatories" -> PGStringArrayOptional(_.create_signatories), + "create_observers" -> PGStringArrayOptional(_.create_observers), + "create_agreement_text" -> PGStringOptional(_.create_agreement_text), + "create_key_value" -> PGByteaOptional(_.create_key_value), + "create_key_hash" -> PGStringOptional(_.create_key_hash), "event_sequential_id" -> PGBigint(_.event_sequential_id), "create_argument_compression" -> PGSmallintOptional(_.create_argument_compression), "create_key_value_compression" -> PGSmallintOptional(_.create_key_value_compression), @@ -47,23 +47,23 @@ private[postgresql] object PGSchema { val exerciseFields: Vector[(String, PGField[DbDto.EventExercise, _, _])] = Vector[(String, PGField[DbDto.EventExercise, _, _])]( - "event_id" -> PGString(_.event_id.orNull), - "event_offset" -> PGString(_.event_offset.orNull), + "event_id" -> PGStringOptional(_.event_id), + "event_offset" -> PGStringOptional(_.event_offset), "contract_id" -> PGString(_.contract_id), - "transaction_id" -> PGString(_.transaction_id.orNull), - "ledger_effective_time" -> PGTimestamp(_.ledger_effective_time.orNull), + "transaction_id" -> PGStringOptional(_.transaction_id), + "ledger_effective_time" -> PGTimestampOptional(_.ledger_effective_time), "node_index" -> PGIntOptional(_.node_index), - "command_id" -> PGString(_.command_id.orNull), - "workflow_id" -> PGString(_.workflow_id.orNull), - "application_id" -> PGString(_.application_id.orNull), - "submitters" -> PGStringArray(_.submitters.orNull), - "create_key_value" -> PGBytea(_.create_key_value.orNull), - "exercise_choice" -> PGString(_.exercise_choice.orNull), - "exercise_argument" -> PGBytea(_.exercise_argument.orNull), - "exercise_result" -> PGBytea(_.exercise_result.orNull), - "exercise_actors" -> PGStringArray(_.exercise_actors.orNull), - "exercise_child_event_ids" -> PGStringArray(_.exercise_child_event_ids.orNull), - "template_id" -> PGString(_.template_id.orNull), + "command_id" -> PGStringOptional(_.command_id), + "workflow_id" -> PGStringOptional(_.workflow_id), + "application_id" -> PGStringOptional(_.application_id), + "submitters" -> PGStringArrayOptional(_.submitters), + "create_key_value" -> PGByteaOptional(_.create_key_value), + "exercise_choice" -> PGStringOptional(_.exercise_choice), + "exercise_argument" -> PGByteaOptional(_.exercise_argument), + "exercise_result" -> PGByteaOptional(_.exercise_result), + "exercise_actors" -> PGStringArrayOptional(_.exercise_actors), + "exercise_child_event_ids" -> PGStringArrayOptional(_.exercise_child_event_ids), + "template_id" -> PGStringOptional(_.template_id), "flat_event_witnesses" -> PGStringArray(_.flat_event_witnesses), "tree_event_witnesses" -> PGStringArray(_.tree_event_witnesses), "event_sequential_id" -> PGBigint(_.event_sequential_id), @@ -84,15 +84,15 @@ private[postgresql] object PGSchema { "submission_id" -> PGString(_.submission_id), "typ" -> PGString(_.typ), "configuration" -> PGBytea(_.configuration), - "rejection_reason" -> PGString(_.rejection_reason.orNull), + "rejection_reason" -> PGStringOptional(_.rejection_reason), ) val packageEntries: PGTable[DbDto.PackageEntry] = PGTable("package_entries")( "ledger_offset" -> PGString(_.ledger_offset), "recorded_at" -> PGTimestamp(_.recorded_at), - "submission_id" -> PGString(_.submission_id.orNull), + "submission_id" -> PGStringOptional(_.submission_id), "typ" -> PGString(_.typ), - "rejection_reason" -> PGString(_.rejection_reason.orNull), + "rejection_reason" -> PGStringOptional(_.rejection_reason), ) val packages: PGTable[DbDto.Package] = PGTable( @@ -101,7 +101,7 @@ private[postgresql] object PGSchema { fields = Vector[(String, PGField[DbDto.Package, _, _])]( "package_id" -> PGString(_.package_id), "upload_id" -> PGString(_.upload_id), - "source_description" -> PGString(_.source_description.orNull), + "source_description" -> PGStringOptional(_.source_description), "size" -> PGBigint(_.size), "known_since" -> PGTimestamp(_.known_since), "ledger_offset" -> PGString(_.ledger_offset), @@ -112,19 +112,19 @@ private[postgresql] object PGSchema { val partyEntries: PGTable[DbDto.PartyEntry] = PGTable("party_entries")( "ledger_offset" -> PGString(_.ledger_offset), "recorded_at" -> PGTimestamp(_.recorded_at), - "submission_id" -> PGString(_.submission_id.orNull), - "party" -> PGString(_.party.orNull), - "display_name" -> PGString(_.display_name.orNull), + "submission_id" -> PGStringOptional(_.submission_id), + "party" -> PGStringOptional(_.party), + "display_name" -> PGStringOptional(_.display_name), "typ" -> PGString(_.typ), - "rejection_reason" -> PGString(_.rejection_reason.orNull), + "rejection_reason" -> PGStringOptional(_.rejection_reason), "is_local" -> PGBooleanOptional(_.is_local), ) val parties: PGTable[DbDto.Party] = PGTable("parties")( "party" -> PGString(_.party), - "display_name" -> PGString(_.display_name.orNull), + "display_name" -> PGStringOptional(_.display_name), "explicit" -> PGBoolean(_.explicit), - "ledger_offset" -> PGString(_.ledger_offset.orNull), + "ledger_offset" -> PGStringOptional(_.ledger_offset), "is_local" -> PGBoolean(_.is_local), ) @@ -135,8 +135,8 @@ private[postgresql] object PGSchema { "application_id" -> PGString(_.application_id), "submitters" -> PGStringArray(_.submitters), "command_id" -> PGString(_.command_id), - "transaction_id" -> PGString(_.transaction_id.orNull), + "transaction_id" -> PGStringOptional(_.transaction_id), "status_code" -> PGIntOptional(_.status_code), - "status_message" -> PGString(_.status_message.orNull), + "status_message" -> PGStringOptional(_.status_message), ) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGTable.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGTable.scala index f5201d7db899..45d9ff3f9b1b 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGTable.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PGTable.scala @@ -11,17 +11,17 @@ private[postgresql] case class PGTable[FROM]( insertSuffix: String = "", ) { val insertStatement: String = { - def comaSeparatedOf(extractor: ((String, PGField[FROM, _, _])) => String): String = + def commaSeparatedOf(extractor: ((String, PGField[FROM, _, _])) => String): String = fields.view .map(extractor) .mkString(",") def inputFieldName: String => String = fieldName => s"${fieldName}_in" - val tableFields = comaSeparatedOf(_._1) - val selectFields = comaSeparatedOf { case (fieldName, field) => + val tableFields = commaSeparatedOf(_._1) + val selectFields = commaSeparatedOf { case (fieldName, field) => field.selectFieldExpression(inputFieldName(fieldName)) } - val unnestFields = comaSeparatedOf(_ => "?") - val inputFields = comaSeparatedOf(fieldDef => inputFieldName(fieldDef._1)) + val unnestFields = commaSeparatedOf(_ => "?") + val inputFields = commaSeparatedOf(fieldDef => inputFieldName(fieldDef._1)) s""" |INSERT INTO $tableName | ($tableFields)