Skip to content

Commit

Permalink
Move select DB logic to JVM [DPP-760] (#11777)
Browse files Browse the repository at this point in the history
* Move select logic which was previously added as select expressions to Scala calulation, and exposing needed data on the queries

changelog_begin
changelog_end
  • Loading branch information
nmarton-da authored Nov 18, 2021
1 parent 5fb5784 commit 4b59c57
Showing 1 changed file with 81 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ abstract class EventStorageBackendTemplate(
"create_agreement_text",
"create_key_value",
"create_key_value_compression",
"submitters",
)

private val selectColumnsForFlatTransactions =
Expand All @@ -65,7 +66,7 @@ abstract class EventStorageBackendTemplate(

private type SharedRow =
Offset ~ String ~ Int ~ Long ~ String ~ String ~ Timestamp ~ Int ~ Option[String] ~
Option[String] ~ Array[Int]
Option[String] ~ Array[Int] ~ Option[Array[Int]]

private val sharedRow: RowParser[SharedRow] =
offset("event_offset") ~
Expand All @@ -78,7 +79,8 @@ abstract class EventStorageBackendTemplate(
int("template_id") ~
str("command_id").? ~
str("workflow_id").? ~
array[Int]("event_witnesses")
array[Int]("event_witnesses") ~
array[Int]("submitters").?

private type CreatedEventRow =
SharedRow ~ Array[Byte] ~ Option[Int] ~ Array[Int] ~ Array[Int] ~ Option[String] ~
Expand Down Expand Up @@ -115,10 +117,12 @@ abstract class EventStorageBackendTemplate(

private val archivedEventRow: RowParser[ArchiveEventRow] = sharedRow

private val createdFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Created]] =
private def createdFlatEventParser(
allQueryingParties: Set[Int]
): RowParser[EventsTable.Entry[Raw.FlatEvent.Created]] =
createdEventRow map {
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~
templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createArgumentCompression ~
templateId ~ commandId ~ workflowId ~ eventWitnesses ~ submitters ~ createArgument ~ createArgumentCompression ~
createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue ~ createKeyValueCompression =>
// ArraySeq.unsafeWrapArray is safe here
// since we get the Array from parsing and don't let it escape anywhere.
Expand All @@ -128,7 +132,11 @@ abstract class EventStorageBackendTemplate(
nodeIndex = nodeIndex,
eventSequentialId = eventSequentialId,
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId.getOrElse(""),
commandId = commandId
.filter(commandId =>
commandId != "" && submitters.getOrElse(Array.empty).exists(allQueryingParties)
)
.getOrElse(""),
workflowId = workflowId.getOrElse(""),
event = Raw.FlatEvent.Created(
eventId = eventId,
Expand All @@ -146,15 +154,20 @@ abstract class EventStorageBackendTemplate(
createKeyValue = createKeyValue,
createKeyValueCompression = createKeyValueCompression,
eventWitnesses = ArraySeq.unsafeWrapArray(
eventWitnesses.map(stringInterning.party.unsafe.externalize)
eventWitnesses.view
.filter(allQueryingParties)
.map(stringInterning.party.unsafe.externalize)
.toArray
),
),
)
}

private val archivedFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Archived]] =
private def archivedFlatEventParser(
allQueryingParties: Set[Int]
): RowParser[EventsTable.Entry[Raw.FlatEvent.Archived]] =
archivedEventRow map {
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses =>
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ submitters =>
// ArraySeq.unsafeWrapArray is safe here
// since we get the Array from parsing and don't let it escape anywhere.
EventsTable.Entry(
Expand All @@ -163,25 +176,36 @@ abstract class EventStorageBackendTemplate(
nodeIndex = nodeIndex,
eventSequentialId = eventSequentialId,
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId.getOrElse(""),
commandId = commandId
.filter(commandId =>
commandId != "" && submitters.getOrElse(Array.empty).exists(allQueryingParties)
)
.getOrElse(""),
workflowId = workflowId.getOrElse(""),
event = Raw.FlatEvent.Archived(
eventId = eventId,
contractId = contractId,
templateId = stringInterning.templateId.externalize(templateId),
eventWitnesses = ArraySeq.unsafeWrapArray(
eventWitnesses.map(stringInterning.party.unsafe.externalize)
eventWitnesses.view
.filter(allQueryingParties)
.map(stringInterning.party.unsafe.externalize)
.toArray
),
),
)
}

private val rawFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent]] =
createdFlatEventParser | archivedFlatEventParser
private def rawFlatEventParser(
allQueryingParties: Set[Int]
): RowParser[EventsTable.Entry[Raw.FlatEvent]] =
createdFlatEventParser(allQueryingParties) | archivedFlatEventParser(allQueryingParties)

private val createdTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Created]] =
private def createdTreeEventParser(
allQueryingParties: Set[Int]
): RowParser[EventsTable.Entry[Raw.TreeEvent.Created]] =
createdEventRow map {
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createArgumentCompression ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue ~ createKeyValueCompression =>
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ submitters ~ createArgument ~ createArgumentCompression ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue ~ createKeyValueCompression =>
// ArraySeq.unsafeWrapArray is safe here
// since we get the Array from parsing and don't let it escape anywhere.
EventsTable.Entry(
Expand All @@ -190,7 +214,11 @@ abstract class EventStorageBackendTemplate(
nodeIndex = nodeIndex,
eventSequentialId = eventSequentialId,
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId.getOrElse(""),
commandId = commandId
.filter(commandId =>
commandId != "" && submitters.getOrElse(Array.empty).exists(allQueryingParties)
)
.getOrElse(""),
workflowId = workflowId.getOrElse(""),
event = Raw.TreeEvent.Created(
eventId = eventId,
Expand All @@ -208,15 +236,20 @@ abstract class EventStorageBackendTemplate(
createKeyValue = createKeyValue,
createKeyValueCompression = createKeyValueCompression,
eventWitnesses = ArraySeq.unsafeWrapArray(
eventWitnesses.map(stringInterning.party.unsafe.externalize)
eventWitnesses.view
.filter(allQueryingParties)
.map(stringInterning.party.unsafe.externalize)
.toArray
),
),
)
}

private val exercisedTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Exercised]] =
private def exercisedTreeEventParser(
allQueryingParties: Set[Int]
): RowParser[EventsTable.Entry[Raw.TreeEvent.Exercised]] =
exercisedEventRow map {
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ exerciseConsuming ~ exerciseChoice ~ exerciseArgument ~ exerciseArgumentCompression ~ exerciseResult ~ exerciseResultCompression ~ exerciseActors ~ exerciseChildEventIds =>
case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ submitters ~ exerciseConsuming ~ exerciseChoice ~ exerciseArgument ~ exerciseArgumentCompression ~ exerciseResult ~ exerciseResultCompression ~ exerciseActors ~ exerciseChildEventIds =>
// ArraySeq.unsafeWrapArray is safe here
// since we get the Array from parsing and don't let it escape anywhere.
EventsTable.Entry(
Expand All @@ -225,7 +258,11 @@ abstract class EventStorageBackendTemplate(
nodeIndex = nodeIndex,
eventSequentialId = eventSequentialId,
ledgerEffectiveTime = ledgerEffectiveTime,
commandId = commandId.getOrElse(""),
commandId = commandId
.filter(commandId =>
commandId.nonEmpty && submitters.getOrElse(Array.empty).exists(allQueryingParties)
)
.getOrElse(""),
workflowId = workflowId.getOrElse(""),
event = Raw.TreeEvent.Exercised(
eventId = eventId,
Expand All @@ -242,14 +279,19 @@ abstract class EventStorageBackendTemplate(
),
exerciseChildEventIds = ArraySeq.unsafeWrapArray(exerciseChildEventIds),
eventWitnesses = ArraySeq.unsafeWrapArray(
eventWitnesses.map(stringInterning.party.unsafe.externalize)
eventWitnesses.view
.filter(allQueryingParties)
.map(stringInterning.party.unsafe.externalize)
.toArray
),
),
)
}

private val rawTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent]] =
createdTreeEventParser | exercisedTreeEventParser
private def rawTreeEventParser(
allQueryingParties: Set[Int]
): RowParser[EventsTable.Entry[Raw.TreeEvent]] =
createdTreeEventParser(allQueryingParties) | exercisedTreeEventParser(allQueryingParties)

private val selectColumnsForTransactionTree = Seq(
"event_offset",
Expand All @@ -275,27 +317,30 @@ abstract class EventStorageBackendTemplate(
"exercise_result_compression",
"exercise_actors",
"exercise_child_event_ids",
"submitters",
).mkString(", ")

private def events[T](
columnPrefix: String,
joinClause: CompositeSql,
additionalAndClause: CompositeSql,
rowParser: RowParser[T],
rowParser: Set[Int] => RowParser[T],
selectColumns: String,
witnessesColumn: String,
)(
limit: Option[Int],
fetchSizeHint: Option[Int],
filterParams: FilterParams,
)(connection: Connection): Vector[T] = {
val parties = filterParams.wildCardParties ++ filterParams.partiesAndTemplates.flatMap(_._1)
val parties =
filterParams.wildCardParties.iterator
.++(filterParams.partiesAndTemplates.iterator.flatMap(_._1.iterator))
.map(stringInterning.party.tryInternalize)
.flatMap(_.iterator)
.toSet
SQL"""
SELECT
#$selectColumns, ${eventStrategy
.filteredEventWitnessesClause(witnessesColumn, parties, stringInterning)} as event_witnesses,
case when ${eventStrategy
.submittersArePartiesClause("submitters", parties, stringInterning)} then command_id else '' end as command_id
#$selectColumns, #$witnessesColumn as event_witnesses, command_id
FROM
participant_events #$columnPrefix $joinClause
WHERE
Expand All @@ -304,7 +349,7 @@ abstract class EventStorageBackendTemplate(
ORDER BY event_sequential_id
${queryStrategy.limitClause(limit)}"""
.withFetchSize(fetchSizeHint)
.asVectorOf(rowParser)(connection)
.asVectorOf(rowParser(parties))(connection)
}

override def activeContractEvents(
Expand Down Expand Up @@ -406,14 +451,15 @@ abstract class EventStorageBackendTemplate(
allFilterParties: Set[Ref.Party],
endInclusive: Long,
)(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = {
val eventWitnessesClause =
eventStrategy
.filteredEventWitnessesClause("flat_event_witnesses", allFilterParties, stringInterning)
val allInternedFilterParties = allFilterParties.iterator
.map(stringInterning.party.tryInternalize)
.flatMap(_.iterator)
.toSet
SQL"""
SELECT
#$selectColumnsForACSEvents,
$eventWitnessesClause as event_witnesses,
'' as command_id
flat_event_witnesses as event_witnesses,
'' AS command_id
FROM
participant_events_create create_evs
WHERE
Expand All @@ -428,7 +474,7 @@ abstract class EventStorageBackendTemplate(
ORDER BY
create_evs.event_sequential_id -- deliver in index order
"""
.asVectorOf(rawFlatEventParser)(connection)
.asVectorOf(rawFlatEventParser(allInternedFilterParties))(connection)
}

override def flatTransaction(
Expand Down

0 comments on commit 4b59c57

Please sign in to comment.