diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/EventsTableFlatEventsRangeQueries.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/EventsTableFlatEventsRangeQueries.scala index 60f8dbefcd45..8fa9861ca75f 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/EventsTableFlatEventsRangeQueries.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/EventsTableFlatEventsRangeQueries.scala @@ -5,44 +5,16 @@ package com.daml.platform.store.appendonlydao.events import java.sql.Connection -import com.daml.lf.data.Ref.{Identifier => ApiIdentifier} import com.daml.platform.store.backend.EventStorageBackend +import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams} private[events] sealed abstract class EventsTableFlatEventsRangeQueries[Offset] { import EventsTableFlatEventsRangeQueries.QueryParts - protected def singleWildcardParty( + protected def query( offset: Offset, - party: Party, - ): QueryParts - - protected def singlePartyWithTemplates( - offset: Offset, - party: Party, - templateIds: Set[ApiIdentifier], - ): QueryParts - - protected def onlyWildcardParties( - offset: Offset, - parties: Set[Party], - ): QueryParts - - protected def sameTemplates( - offset: Offset, - parties: Set[Party], - templateIds: Set[ApiIdentifier], - ): QueryParts - - protected def mixedTemplates( - offset: Offset, - partiesAndTemplateIds: Set[(Party, ApiIdentifier)], - ): QueryParts - - protected def mixedTemplatesWithWildcardParties( - offset: Offset, - wildcardParties: Set[Party], - partiesAndTemplateIds: Set[(Party, ApiIdentifier)], + filterParams: FilterParams, ): QueryParts protected def offsetRange(offset: Offset): EventsRange[Long] @@ -55,55 +27,51 @@ private[events] sealed abstract class EventsTableFlatEventsRangeQueries[Offset] require(filter.nonEmpty, "The request must be issued by at least one party") // Route the request to the correct underlying query - val frqK = if (filter.size == 1) { + val filterParams = if (filter.size == 1) { val (party, templateIds) = filter.iterator.next() if (templateIds.isEmpty) { // Single-party request, no specific template identifier - singleWildcardParty(offset, party) + FilterParams( + wildCardParties = Set(party), + partiesAndTemplates = Set.empty, + ) } else { // Single-party request, restricted to a set of template identifiers - singlePartyWithTemplates(offset, party, templateIds) + FilterParams( + wildCardParties = Set.empty, + partiesAndTemplates = Set(Set(party) -> templateIds), + ) } } else { // Multi-party requests // If no party requests specific template identifiers val parties = filter.keySet if (filter.forall(_._2.isEmpty)) - onlyWildcardParties( - offset = offset, - parties = parties, + FilterParams( + wildCardParties = parties, + partiesAndTemplates = Set.empty, ) else { // If all parties request the same template identifier val templateIds = filter.valuesIterator.flatten.toSet if (filter.valuesIterator.forall(_ == templateIds)) { - sameTemplates( - offset, - parties = parties, - templateIds = templateIds, + FilterParams( + wildCardParties = Set.empty, + partiesAndTemplates = Set(parties -> templateIds), ) } else { - // If there are different template identifier but there are no wildcard parties - val partiesAndTemplateIds = Relation.flatten(filter).toSet - val wildcardParties = filter.filter(_._2.isEmpty).keySet - if (wildcardParties.isEmpty) { - mixedTemplates( - offset, - partiesAndTemplateIds = partiesAndTemplateIds, - ) - } else { - // If there are wildcard parties and different template identifiers - mixedTemplatesWithWildcardParties( - offset, - wildcardParties, - partiesAndTemplateIds, - ) - } + // The generic case: passing down in the same shape, collecting wildCardParties + FilterParams( + wildCardParties = filter.filter(_._2.isEmpty).keySet, + partiesAndTemplates = filter.iterator.collect { + case (party, templateIds) if templateIds.nonEmpty => Set(party) -> templateIds + }.toSet, + ) } } } - frqK match { + query(offset, filterParams) match { case QueryParts.ByArith(read) => EventsRange.readPage( read, @@ -140,96 +108,21 @@ private[events] object EventsTableFlatEventsRangeQueries { storageBackend: EventStorageBackend ) extends EventsTableFlatEventsRangeQueries[EventsRange[Long]] { - override protected def singleWildcardParty( - range: EventsRange[Long], - party: Party, + override protected def query( + offset: EventsRange[Long], + filterParams: FilterParams, ): QueryParts = QueryParts.ByArith((range, limit, fetchSizeHint) => - storageBackend.transactionsEventsSingleWildcardParty( - startExclusive = range.startExclusive, - endInclusive = range.endInclusive, - party = party, - limit = limit, - fetchSizeHint = fetchSizeHint, - ) - ) - - override protected def singlePartyWithTemplates( - range: EventsRange[Long], - party: Party, - templateIds: Set[ApiIdentifier], - ): QueryParts = - QueryParts.ByArith((range, limit, fetchSizeHint) => - storageBackend.transactionsEventsSinglePartyWithTemplates( - startExclusive = range.startExclusive, - endInclusive = range.endInclusive, - party = party, - templateIds = templateIds, - limit = limit, - fetchSizeHint = fetchSizeHint, - ) - ) - - override protected def onlyWildcardParties( - range: EventsRange[Long], - parties: Set[Party], - ): QueryParts = - QueryParts.ByArith((range, limit, fetchSizeHint) => - storageBackend.transactionsEventsOnlyWildcardParties( - startExclusive = range.startExclusive, - endInclusive = range.endInclusive, - parties = parties, - limit = limit, - fetchSizeHint = fetchSizeHint, - ) - ) - - override protected def sameTemplates( - range: EventsRange[Long], - parties: Set[Party], - templateIds: Set[ApiIdentifier], - ): QueryParts = - QueryParts.ByArith((range, limit, fetchSizeHint) => - storageBackend.transactionsEventsSameTemplates( - startExclusive = range.startExclusive, - endInclusive = range.endInclusive, - parties = parties, - templateIds = templateIds, - limit = limit, - fetchSizeHint = fetchSizeHint, - ) - ) - - override protected def mixedTemplates( - range: EventsRange[Long], - partiesAndTemplateIds: Set[(Party, ApiIdentifier)], - ): QueryParts = - QueryParts.ByArith((range, limit, fetchSizeHint) => - storageBackend.transactionsEventsMixedTemplates( - startExclusive = range.startExclusive, - endInclusive = range.endInclusive, - partiesAndTemplateIds = partiesAndTemplateIds, - limit = limit, - fetchSizeHint = fetchSizeHint, - ) - ) - - override protected def mixedTemplatesWithWildcardParties( - range: EventsRange[Long], - wildcardParties: Set[Party], - partiesAndTemplateIds: Set[(Party, ApiIdentifier)], - ): QueryParts = { - QueryParts.ByArith((range, limit, fetchSizeHint) => - storageBackend.transactionsEventsMixedTemplatesWithWildcardParties( - startExclusive = range.startExclusive, - endInclusive = range.endInclusive, - partiesAndTemplateIds = partiesAndTemplateIds, - wildcardParties = wildcardParties, - limit = limit, - fetchSizeHint = fetchSizeHint, + storageBackend.transactionEvents( + rangeParams = RangeParams( + startExclusive = range.startExclusive, + endInclusive = range.endInclusive, + limit = limit, + fetchSizeHint = fetchSizeHint, + ), + filterParams = filterParams, ) ) - } override protected def offsetRange(offset: EventsRange[Long]) = offset } @@ -238,99 +131,20 @@ private[events] object EventsTableFlatEventsRangeQueries { storageBackend: EventStorageBackend ) extends EventsTableFlatEventsRangeQueries[EventsRange[(Offset, Long)]] { - override protected def singleWildcardParty( - range: EventsRange[(Offset, Long)], - party: Party, - ): QueryParts = - QueryParts.ByLimit(limit => - storageBackend.activeContractsEventsSingleWildcardParty( - startExclusive = range.startExclusive._2, - endInclusiveSeq = range.endInclusive._2, - endInclusiveOffset = range.endInclusive._1, - party = party, - limit = Some(limit), - fetchSizeHint = Some(limit), - ) - ) - - override protected def singlePartyWithTemplates( - range: EventsRange[(Offset, Long)], - party: Party, - templateIds: Set[ApiIdentifier], - ): QueryParts = - QueryParts.ByLimit(limit => - storageBackend.activeContractsEventsSinglePartyWithTemplates( - startExclusive = range.startExclusive._2, - endInclusiveSeq = range.endInclusive._2, - endInclusiveOffset = range.endInclusive._1, - party = party, - templateIds = templateIds, - limit = Some(limit), - fetchSizeHint = Some(limit), - ) - ) - - override def onlyWildcardParties( - range: EventsRange[(Offset, Long)], - parties: Set[Party], - ): QueryParts = - QueryParts.ByLimit(limit => - storageBackend.activeContractsEventsOnlyWildcardParties( - startExclusive = range.startExclusive._2, - endInclusiveSeq = range.endInclusive._2, - endInclusiveOffset = range.endInclusive._1, - parties = parties, - limit = Some(limit), - fetchSizeHint = Some(limit), - ) - ) - - override def sameTemplates( - range: EventsRange[(Offset, Long)], - parties: Set[Party], - templateIds: Set[ApiIdentifier], - ): QueryParts = - QueryParts.ByLimit(limit => - storageBackend.activeContractsEventsSameTemplates( - startExclusive = range.startExclusive._2, - endInclusiveSeq = range.endInclusive._2, - endInclusiveOffset = range.endInclusive._1, - parties = parties, - templateIds = templateIds, - limit = Some(limit), - fetchSizeHint = Some(limit), - ) - ) - - override def mixedTemplates( - range: EventsRange[(Offset, Long)], - partiesAndTemplateIds: Set[(Party, ApiIdentifier)], - ): QueryParts = - QueryParts.ByLimit(limit => - storageBackend.activeContractsEventsMixedTemplates( - startExclusive = range.startExclusive._2, - endInclusiveSeq = range.endInclusive._2, - endInclusiveOffset = range.endInclusive._1, - partiesAndTemplateIds = partiesAndTemplateIds, - limit = Some(limit), - fetchSizeHint = Some(limit), - ) - ) - - override def mixedTemplatesWithWildcardParties( + override protected def query( range: EventsRange[(Offset, Long)], - wildcardParties: Set[Party], - partiesAndTemplateIds: Set[(Party, ApiIdentifier)], + filterParams: FilterParams, ): QueryParts = QueryParts.ByLimit(limit => - storageBackend.activeContractsEventsMixedTemplatesWithWildcardParties( - startExclusive = range.startExclusive._2, - endInclusiveSeq = range.endInclusive._2, + storageBackend.activeContractEvents( + rangeParams = RangeParams( + startExclusive = range.startExclusive._2, + endInclusive = range.endInclusive._2, + limit = Some(limit), + fetchSizeHint = Some(limit), + ), + filterParams = filterParams, endInclusiveOffset = range.endInclusive._1, - partiesAndTemplateIds = partiesAndTemplateIds, - wildcardParties = wildcardParties, - limit = Some(limit), - fetchSizeHint = Some(limit), ) ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala index 23cd144eb500..b9c602edd230 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala @@ -24,6 +24,7 @@ import com.daml.metrics._ import com.daml.nameof.NameOf.qualifiedNameOfCurrentFunc import com.daml.platform.ApiOffset import com.daml.platform.store.appendonlydao.{DbDispatcher, PaginatingAsyncStream} +import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams} import com.daml.platform.store.backend.StorageBackend import com.daml.platform.store.dao.LedgerDaoTransactionsReader import com.daml.platform.store.dao.events.ContractStateEvent @@ -148,9 +149,12 @@ private[appendonlydao] final class TransactionsReader( )(implicit loggingContext: LoggingContext): Future[Option[GetFlatTransactionResponse]] = dispatcher .executeSql(dbMetrics.lookupFlatTransactionById)( - route(requestingParties)( - single = storageBackend.flatTransactionSingleParty(transactionId, _) _, - multi = storageBackend.flatTransactionMultiParty(transactionId, _) _, + storageBackend.flatTransaction( + transactionId, + FilterParams( + wildCardParties = requestingParties, + partiesAndTemplates = Set.empty, + ), ) ) .flatMap(rawEvents => @@ -181,33 +185,22 @@ private[appendonlydao] final class TransactionsReader( implicit connection: Connection => logger.debug(s"getTransactionTrees query($range)") queryNonPruned.executeSql( - route(requestingParties)( - single = party => - EventsRange.readPage( - read = (range, limit, fetchSizeHint) => - storageBackend.transactionTreeEventsSingleParty( - startExclusive = range.startExclusive, - endInclusive = range.endInclusive, - requestingParty = party, - limit = limit, - fetchSizeHint = fetchSizeHint, - ), - range = EventsRange(range.startExclusive._2, range.endInclusive._2), - pageSize = pageSize, - ), - multi = parties => - EventsRange.readPage( - read = (range, limit, fetchSizeHint) => - storageBackend.transactionTreeEventsMultiParty( - startExclusive = range.startExclusive, - endInclusive = range.endInclusive, - requestingParties = parties, - limit = limit, - fetchSizeHint = fetchSizeHint, - ), - range = EventsRange(range.startExclusive._2, range.endInclusive._2), - pageSize = pageSize, + EventsRange.readPage( + read = (range, limit, fetchSizeHint) => + storageBackend.transactionTreeEvents( + rangeParams = RangeParams( + startExclusive = range.startExclusive, + endInclusive = range.endInclusive, + limit = limit, + fetchSizeHint = fetchSizeHint, + ), + filterParams = FilterParams( + wildCardParties = requestingParties, + partiesAndTemplates = Set.empty, + ), ), + range = EventsRange(range.startExclusive._2, range.endInclusive._2), + pageSize = pageSize, )(connection), range.startExclusive._1, pruned => @@ -251,9 +244,12 @@ private[appendonlydao] final class TransactionsReader( )(implicit loggingContext: LoggingContext): Future[Option[GetTransactionResponse]] = dispatcher .executeSql(dbMetrics.lookupTransactionTreeById)( - route(requestingParties)( - single = storageBackend.transactionTreeSingleParty(transactionId, _) _, - multi = storageBackend.transactionTreeMultiParty(transactionId, _) _, + storageBackend.transactionTree( + transactionId, + FilterParams( + wildCardParties = requestingParties, + partiesAndTemplates = Set.empty, + ), ) ) .flatMap(rawEvents => diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/package.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/package.scala index 62a2c4465cf2..ca04f28d9136 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/package.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/package.scala @@ -84,20 +84,6 @@ package object events { .fold(Vector.empty[A])(_ :+ _) .concatSubstreams - // Dispatches the call to either function based on the cardinality of the input - // This is mostly designed to route requests to queries specialized for single/multi-party subs - // Callers should ensure that the set is not empty, which in the usage this - // is designed for should be provided by the Ledger API validation layer - def route[A, B]( - set: Set[A] - )(single: A => B, multi: Set[A] => B): B = { - assume(set.nonEmpty, "Empty set, unable to dispatch to single/multi implementation") - set.size match { - case 1 => single(set.iterator.next()) - case n if n > 1 => multi(set) - } - } - def convert(template: Identifier, key: lftx.Node.KeyWithMaintainers[Value]): Key = Key.assertBuild(template, key.key.value) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala index 0750588bccff..ac9a2c87da5c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala @@ -6,6 +6,7 @@ package com.daml.platform.store.backend import java.io.InputStream import java.sql.Connection import java.time.Instant + import com.daml.ledger.{ApplicationId, TransactionId} import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails} import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse @@ -17,6 +18,7 @@ import com.daml.lf.ledger.EventId import com.daml.platform import com.daml.platform.store.DbType import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw} +import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams} import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent import com.daml.platform.store.backend.h2.H2StorageBackend import com.daml.platform.store.backend.oracle.OracleStorageBackend @@ -192,131 +194,26 @@ trait EventStorageBackend { /** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations */ def pruneEvents(pruneUpToInclusive: Offset)(connection: Connection): Unit - def transactionsEventsSingleWildcardParty( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def transactionsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def transactionsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusive: Long, - parties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def transactionsEventsSameTemplates( - startExclusive: Long, - endInclusive: Long, - parties: Set[Ref.Party], - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def transactionsEventsMixedTemplates( - startExclusive: Long, - endInclusive: Long, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def transactionsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusive: Long, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - wildcardParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def activeContractsEventsSingleWildcardParty( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def activeContractsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], + def transactionEvents( + rangeParams: RangeParams, + filterParams: FilterParams, )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def activeContractsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, + def activeContractEvents( + rangeParams: RangeParams, + filterParams: FilterParams, endInclusiveOffset: Offset, - parties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def activeContractsEventsSameTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - parties: Set[Ref.Party], - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def activeContractsEventsMixedTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def activeContractsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - wildcardParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def flatTransactionSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def flatTransactionMultiParty( + def flatTransaction( transactionId: TransactionId, - requestingParties: Set[Ref.Party], + filterParams: FilterParams, )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] - def transactionTreeSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, + def transactionTreeEvents( + rangeParams: RangeParams, + filterParams: FilterParams, )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] - def transactionTreeMultiParty( + def transactionTree( transactionId: TransactionId, - requestingParties: Set[Ref.Party], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] - def transactionTreeEventsSingleParty( - startExclusive: Long, - endInclusive: Long, - requestingParty: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] - def transactionTreeEventsMultiParty( - startExclusive: Long, - endInclusive: Long, - requestingParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], + filterParams: FilterParams, )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] def maxEventSeqIdForOffset(offset: Offset)(connection: Connection): Option[Long] def rawEvents(startExclusive: Long, endInclusive: Long)( @@ -324,6 +221,20 @@ trait EventStorageBackend { ): Vector[RawTransactionEvent] } +object EventStorageBackend { + case class RangeParams( + startExclusive: Long, + endInclusive: Long, + limit: Option[Int], + fetchSizeHint: Option[Int], + ) + + case class FilterParams( + wildCardParties: Set[Ref.Party], + partiesAndTemplates: Set[(Set[Ref.Party], Set[Ref.Identifier])], + ) +} + object StorageBackend { case class Params(ledgerEnd: Offset, eventSeqId: Long) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackend.scala index 608ea2765cda..fc49c71abf25 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackend.scala @@ -949,8 +949,8 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_ FROM participant_events WHERE - event_sequential_id > #$startExclusive - and event_sequential_id <= #$endInclusive + event_sequential_id > $startExclusive + and event_sequential_id <= $endInclusive and event_kind != 0 ORDER BY event_sequential_id ASC""" .asVectorOf(rawTransactionEventParser)(connection) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala new file mode 100644 index 000000000000..a4aef662f7d0 --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala @@ -0,0 +1,516 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.store.backend.common + +import java.io.InputStream +import java.sql.Connection +import java.time.Instant + +import anorm.SqlParser.{array, binaryStream, bool, int, long, str} +import anorm.{NamedParameter, RowParser, SQL, ~} +import com.daml.ledger.TransactionId +import com.daml.ledger.participant.state.v1.Offset +import com.daml.lf.data.Ref +import com.daml.platform.store.Conversions.{identifier, instant, offset} +import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf +import com.daml.platform.store.appendonlydao.events.{EventsTable, Identifier, Raw} +import com.daml.platform.store.backend.EventStorageBackend +import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams} + +import scala.collection.compat.immutable.ArraySeq + +trait EventStorageBackendTemplate extends EventStorageBackend { + import com.daml.platform.store.Conversions.ArrayColumnToStringArray.arrayColumnToStringArray + + def eventStrategy: EventStrategy + + private val selectColumnsForFlatTransactions = + Seq( + "event_offset", + "transaction_id", + "node_index", + "event_sequential_id", + "ledger_effective_time", + "workflow_id", + "event_id", + "contract_id", + "template_id", + "create_argument", + "create_argument_compression", + "create_signatories", + "create_observers", + "create_agreement_text", + "create_key_value", + "create_key_value_compression", + ).mkString(", ") + + private type SharedRow = + Offset ~ String ~ Int ~ Long ~ String ~ String ~ Instant ~ Identifier ~ Option[String] ~ + Option[String] ~ Array[String] + + private val sharedRow: RowParser[SharedRow] = + offset("event_offset") ~ + str("transaction_id") ~ + int("node_index") ~ + long("event_sequential_id") ~ + str("event_id") ~ + str("contract_id") ~ + instant("ledger_effective_time") ~ + identifier("template_id") ~ + str("command_id").? ~ + str("workflow_id").? ~ + array[String]("event_witnesses") + + private type CreatedEventRow = + SharedRow ~ InputStream ~ Option[Int] ~ Array[String] ~ Array[String] ~ Option[String] ~ + Option[InputStream] ~ Option[Int] + + private val createdEventRow: RowParser[CreatedEventRow] = + sharedRow ~ + binaryStream("create_argument") ~ + int("create_argument_compression").? ~ + array[String]("create_signatories") ~ + array[String]("create_observers") ~ + str("create_agreement_text").? ~ + binaryStream("create_key_value").? ~ + int("create_key_value_compression").? + + private type ExercisedEventRow = + SharedRow ~ Boolean ~ String ~ InputStream ~ Option[Int] ~ Option[InputStream] ~ Option[Int] ~ + Array[String] ~ Array[String] + + private val exercisedEventRow: RowParser[ExercisedEventRow] = { + import com.daml.platform.store.Conversions.bigDecimalColumnToBoolean + sharedRow ~ + bool("exercise_consuming") ~ + str("exercise_choice") ~ + binaryStream("exercise_argument") ~ + int("exercise_argument_compression").? ~ + binaryStream("exercise_result").? ~ + int("exercise_result_compression").? ~ + array[String]("exercise_actors") ~ + array[String]("exercise_child_event_ids") + } + + private type ArchiveEventRow = SharedRow + + private val archivedEventRow: RowParser[ArchiveEventRow] = sharedRow + + private val createdFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Created]] = + createdEventRow map { + case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ + templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createArgumentCompression ~ + createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue ~ createKeyValueCompression => + // ArraySeq.unsafeWrapArray is safe here + // since we get the Array from parsing and don't let it escape anywhere. + EventsTable.Entry( + eventOffset = eventOffset, + transactionId = transactionId, + nodeIndex = nodeIndex, + eventSequentialId = eventSequentialId, + ledgerEffectiveTime = ledgerEffectiveTime, + commandId = commandId.getOrElse(""), + workflowId = workflowId.getOrElse(""), + event = Raw.FlatEvent.Created( + eventId = eventId, + contractId = contractId, + templateId = templateId, + createArgument = createArgument, + createArgumentCompression = createArgumentCompression, + createSignatories = ArraySeq.unsafeWrapArray(createSignatories), + createObservers = ArraySeq.unsafeWrapArray(createObservers), + createAgreementText = createAgreementText, + createKeyValue = createKeyValue, + createKeyValueCompression = createKeyValueCompression, + eventWitnesses = ArraySeq.unsafeWrapArray(eventWitnesses), + ), + ) + } + + private val archivedFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Archived]] = + archivedEventRow map { + case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses => + // ArraySeq.unsafeWrapArray is safe here + // since we get the Array from parsing and don't let it escape anywhere. + EventsTable.Entry( + eventOffset = eventOffset, + transactionId = transactionId, + nodeIndex = nodeIndex, + eventSequentialId = eventSequentialId, + ledgerEffectiveTime = ledgerEffectiveTime, + commandId = commandId.getOrElse(""), + workflowId = workflowId.getOrElse(""), + event = Raw.FlatEvent.Archived( + eventId = eventId, + contractId = contractId, + templateId = templateId, + eventWitnesses = ArraySeq.unsafeWrapArray(eventWitnesses), + ), + ) + } + + private val rawFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent]] = + createdFlatEventParser | archivedFlatEventParser + + private val createdTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Created]] = + createdEventRow map { + case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createArgumentCompression ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue ~ createKeyValueCompression => + // ArraySeq.unsafeWrapArray is safe here + // since we get the Array from parsing and don't let it escape anywhere. + EventsTable.Entry( + eventOffset = eventOffset, + transactionId = transactionId, + nodeIndex = nodeIndex, + eventSequentialId = eventSequentialId, + ledgerEffectiveTime = ledgerEffectiveTime, + commandId = commandId.getOrElse(""), + workflowId = workflowId.getOrElse(""), + event = Raw.TreeEvent.Created( + eventId = eventId, + contractId = contractId, + templateId = templateId, + createArgument = createArgument, + createArgumentCompression = createArgumentCompression, + createSignatories = ArraySeq.unsafeWrapArray(createSignatories), + createObservers = ArraySeq.unsafeWrapArray(createObservers), + createAgreementText = createAgreementText, + createKeyValue = createKeyValue, + createKeyValueCompression = createKeyValueCompression, + eventWitnesses = ArraySeq.unsafeWrapArray(eventWitnesses), + ), + ) + } + + private val exercisedTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Exercised]] = + exercisedEventRow map { + case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ exerciseConsuming ~ exerciseChoice ~ exerciseArgument ~ exerciseArgumentCompression ~ exerciseResult ~ exerciseResultCompression ~ exerciseActors ~ exerciseChildEventIds => + // ArraySeq.unsafeWrapArray is safe here + // since we get the Array from parsing and don't let it escape anywhere. + EventsTable.Entry( + eventOffset = eventOffset, + transactionId = transactionId, + nodeIndex = nodeIndex, + eventSequentialId = eventSequentialId, + ledgerEffectiveTime = ledgerEffectiveTime, + commandId = commandId.getOrElse(""), + workflowId = workflowId.getOrElse(""), + event = Raw.TreeEvent.Exercised( + eventId = eventId, + contractId = contractId, + templateId = templateId, + exerciseConsuming = exerciseConsuming, + exerciseChoice = exerciseChoice, + exerciseArgument = exerciseArgument, + exerciseArgumentCompression = exerciseArgumentCompression, + exerciseResult = exerciseResult, + exerciseResultCompression = exerciseResultCompression, + exerciseActors = ArraySeq.unsafeWrapArray(exerciseActors), + exerciseChildEventIds = ArraySeq.unsafeWrapArray(exerciseChildEventIds), + eventWitnesses = ArraySeq.unsafeWrapArray(eventWitnesses), + ), + ) + } + + private val rawTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent]] = + createdTreeEventParser | exercisedTreeEventParser + + private val selectColumnsForTransactionTree = Seq( + "event_offset", + "transaction_id", + "node_index", + "event_sequential_id", + "participant_events.event_id", + "contract_id", + "ledger_effective_time", + "template_id", + "workflow_id", + "create_argument", + "create_argument_compression", + "create_signatories", + "create_observers", + "create_agreement_text", + "create_key_value", + "create_key_value_compression", + "exercise_choice", + "exercise_argument", + "exercise_argument_compression", + "exercise_result", + "exercise_result_compression", + "exercise_actors", + "exercise_child_event_ids", + ).mkString(", ") + + private def events[T]( + columnPrefix: String, + joinClause: String, + additionalAndClause: (String, List[NamedParameter]), + rowParser: RowParser[T], + selectColumns: String, + witnessesColumn: String, + )( + limit: Option[Int], + fetchSizeHint: Option[Int], + filterParams: FilterParams, + )(connection: Connection): Vector[T] = { + val parties = filterParams.wildCardParties ++ filterParams.partiesAndTemplates.flatMap(_._1) + val (filteredEventWitnessesClause, filteredEventWitnessesParams) = + eventStrategy.filteredEventWitnessesClause( + witnessesColumnName = witnessesColumn, + parties = parties, + ) + val (submittersArePartiesClause, submittersArePartiesParams) = + eventStrategy.submittersArePartiesClause( + submittersColumnName = "submitters", + parties = parties, + ) + val (witnessesWhereClause, witnessesWhereParams) = eventStrategy.witnessesWhereClause( + witnessesColumnName = witnessesColumn, + filterParams = filterParams, + ) + val (limitClause, limitParams) = eventStrategy.limitClause(limit) + val sql = + s"""SELECT + | $selectColumns, $filteredEventWitnessesClause as event_witnesses, + | case when $submittersArePartiesClause then command_id else '' end as command_id + |FROM + | participant_events $columnPrefix $joinClause + |WHERE + |${additionalAndClause._1} + | $witnessesWhereClause + |ORDER BY event_sequential_id + |$limitClause + |""".stripMargin + SQL(sql) + .on(additionalAndClause._2: _*) + .on(filteredEventWitnessesParams: _*) + .on(submittersArePartiesParams: _*) + .on(witnessesWhereParams: _*) + .on(limitParams: _*) + .withFetchSize(fetchSizeHint) + .asVectorOf(rowParser)(connection) + } + + override def activeContractEvents( + rangeParams: RangeParams, + filterParams: FilterParams, + endInclusiveOffset: Offset, + )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { + import com.daml.platform.store.Conversions.OffsetToStatement + events( + columnPrefix = "active_cs", + joinClause = "", + additionalAndClause = ( + """ event_sequential_id > {startExclusive} AND + | event_sequential_id <= {endInclusiveSeq} AND + | active_cs.event_kind = 10 AND -- create + | NOT EXISTS ( + | SELECT 1 + | FROM participant_events archived_cs + | WHERE + | archived_cs.contract_id = active_cs.contract_id AND + | archived_cs.event_kind = 20 AND -- consuming + | archived_cs.event_offset <= {endInclusiveOffset} + | ) AND""".stripMargin, + List( + "startExclusive" -> rangeParams.startExclusive, + "endInclusiveSeq" -> rangeParams.endInclusive, + "endInclusiveOffset" -> endInclusiveOffset, + ), + ), + rowParser = rawFlatEventParser, + selectColumns = selectColumnsForFlatTransactions, + witnessesColumn = "flat_event_witnesses", + )( + limit = rangeParams.limit, + fetchSizeHint = rangeParams.fetchSizeHint, + filterParams, + )(connection) + } + + override def transactionEvents( + rangeParams: RangeParams, + filterParams: FilterParams, + )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { + events( + columnPrefix = "", + joinClause = "", + additionalAndClause = ( + """ event_sequential_id > {startExclusive} AND + | event_sequential_id <= {endInclusiveSeq} AND""".stripMargin, + List( + "startExclusive" -> rangeParams.startExclusive, + "endInclusiveSeq" -> rangeParams.endInclusive, + ), + ), + rowParser = rawFlatEventParser, + selectColumns = selectColumnsForFlatTransactions, + witnessesColumn = "flat_event_witnesses", + )( + limit = rangeParams.limit, + fetchSizeHint = rangeParams.fetchSizeHint, + filterParams, + )(connection) + } + + override def flatTransaction( + transactionId: TransactionId, + filterParams: FilterParams, + )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { + import com.daml.platform.store.Conversions.ledgerStringToStatement + events( + columnPrefix = "", + joinClause = """JOIN parameters ON + | (participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive) + | AND event_offset <= ledger_end""".stripMargin, + additionalAndClause = ( + """ transaction_id = {transactionId} AND + | event_kind != 0 AND -- we do not want to fetch divulgence events""".stripMargin, + List( + "transactionId" -> transactionId + ), + ), + rowParser = rawFlatEventParser, + selectColumns = selectColumnsForFlatTransactions, + witnessesColumn = "flat_event_witnesses", + )( + limit = None, + fetchSizeHint = None, + filterParams = filterParams, + )(connection) + } + + override def transactionTreeEvents( + rangeParams: RangeParams, + filterParams: FilterParams, + )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = { + events( + columnPrefix = "", + joinClause = "", + additionalAndClause = ( + """ event_sequential_id > {startExclusive} AND + | event_sequential_id <= {endInclusiveSeq} AND + | event_kind != 0 AND -- we do not want to fetch divulgence events""".stripMargin, + List( + "startExclusive" -> rangeParams.startExclusive, + "endInclusiveSeq" -> rangeParams.endInclusive, + ), + ), + rowParser = rawTreeEventParser, + selectColumns = + s"$selectColumnsForTransactionTree, ${eventStrategy.columnEqualityBoolean("event_kind", "20")} as exercise_consuming", + witnessesColumn = "tree_event_witnesses", + )( + limit = rangeParams.limit, + fetchSizeHint = rangeParams.fetchSizeHint, + filterParams, + )(connection) + } + + override def transactionTree( + transactionId: TransactionId, + filterParams: FilterParams, + )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = { + import com.daml.platform.store.Conversions.ledgerStringToStatement + events( + columnPrefix = "", + joinClause = """JOIN parameters ON + | (participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive) + | AND event_offset <= ledger_end""".stripMargin, + additionalAndClause = ( + """ transaction_id = {transactionId} AND + | event_kind != 0 AND -- we do not want to fetch divulgence events""".stripMargin, + List( + "transactionId" -> transactionId + ), + ), + rowParser = rawTreeEventParser, + selectColumns = + s"$selectColumnsForTransactionTree, ${eventStrategy.columnEqualityBoolean("event_kind", "20")} as exercise_consuming", + witnessesColumn = "tree_event_witnesses", + )( + limit = None, + fetchSizeHint = None, + filterParams, + )(connection) + } + +} + +/** This encapsulates the moving part as composing various Events queries. + */ +trait EventStrategy { + + /** This populates the following part of the query: + * SELECT ..., [THIS PART] as event_witnesses + * Should boil down to an intersection between the set of the witnesses-column and the parties. + * + * @param witnessesColumnName name of the witnesses column in the query + * @param parties which is all the parties we are interested in in the resul + * @return a tuple: + * - first part is a plain SQL which fits the query template + * - second part is a list of NamedParameters + * Important note: the NamedParameters should confirm to the generated SQL expression (all names there should match exactly) + * Important note: implementor have to make sure that all used names are unique here in the scope of the whole query + */ + def filteredEventWitnessesClause( + witnessesColumnName: String, + parties: Set[Ref.Party], + ): (String, List[NamedParameter]) + + /** This populates the following part of the query: + * SELECT ...,case when [THIS PART] then command_id else "" end as command_id + * Should boil down to a do-intersect? query between the submittersColumName column and the parties + * + * @param submittersColumnName name of the Array column holding submitters + * @param parties which is all the parties we are interested in in the resul + * @return a tuple: + * - first part is a plain SQL which fits the query template + * - second part is a list of NamedParameters + * Important note: the NamedParameters should confirm to the generated SQL expression (all names there should match exactly) + * Important note: implementor have to make sure that all used names are unique here in the scope of the whole query + */ + def submittersArePartiesClause( + submittersColumnName: String, + parties: Set[Ref.Party], + ): (String, List[NamedParameter]) + + /** This populates the following part of the query: + * SELECT ... WHERE ... AND [THIS PART] + * This strategy is responsible to generate appropriate SQL cod based on the filterParams, so that results match the criteria + * + * @param witnessesColumnName name of the Array column holding witnesses + * @param filterParams the filtering criteria + * @return a tuple: + * - first part is a plain SQL which fits the query template + * - second part is a list of NamedParameters + * Important note: the NamedParameters should confirm to the generated SQL expression (all names there should match exactly) + * Important note: implementor have to make sure that all used names are unique here in the scope of the whole query + */ + def witnessesWhereClause( + witnessesColumnName: String, + filterParams: FilterParams, + ): (String, List[NamedParameter]) + + /** This populates the following part of the query: + * SELECT ... WHERE ... ORDER BY event_sequential_id [THIS PART] + * + * @param limit optional limit + * @return a tuple: + * - first part is a plain SQL which fits the query template + * - second part is a list of NamedParameters + * Important note: the NamedParameters should confirm to the generated SQL expression (all names there should match exactly) + * Important note: implementor have to make sure that all used names are unique here in the scope of the whole query + */ + def limitClause(limit: Option[Int]): (String, List[NamedParameter]) = + limit + .map(to => s"fetch next {tolc} rows only" -> List[NamedParameter]("tolc" -> to)) + .getOrElse("" -> Nil) + + /** An expression resulting to a boolean, to check equality between two SQL expressions + * + * @return plain SQL which fits the query template + */ + def columnEqualityBoolean(column: String, value: String): String = + s"""$column = $value""" +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/TemplatedStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/TemplatedStorageBackend.scala index cbe80b94c4ae..2733a6d68124 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/TemplatedStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/TemplatedStorageBackend.scala @@ -3,28 +3,22 @@ package com.daml.platform.store.backend.common -import java.io.InputStream import java.sql.Connection import java.time.Instant -import anorm.SqlParser.{array, binaryStream, bool, int, long, str} +import anorm.SqlParser.{binaryStream, int, str} import anorm.{RowParser, SqlParser, SqlStringInterpolation, ~} -import com.daml.ledger.{ApplicationId, TransactionId} +import com.daml.ledger.ApplicationId import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.participant.state.v1.Offset -import com.daml.lf.data.Ref import com.daml.platform.store.CompletionFromTransaction.toApiCheckpoint -import com.daml.platform.store.Conversions.{contractId, identifier, instant, offset} -import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Identifier, Key, Raw} +import com.daml.platform.store.Conversions.{contractId, instant, offset} +import com.daml.platform.store.appendonlydao.events.{ContractId, Key} import com.daml.platform.store.backend.StorageBackend import com.google.rpc.status.Status -import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf - -import scala.collection.compat.immutable.ArraySeq private[backend] object TemplatedStorageBackend { - import com.daml.platform.store.Conversions.ArrayColumnToStringArray.arrayColumnToStringArray private val sharedCompletionColumns: RowParser[Offset ~ Instant ~ String] = offset("completion_offset") ~ instant("record_time") ~ str("command_id") @@ -247,724 +241,4 @@ private[backend] object TemplatedStorageBackend { """.as(contractId("contract_id").singleOpt)(connection) } - private val selectColumnsForTransactions = - Seq( - "event_offset", - "transaction_id", - "node_index", - "event_sequential_id", - "ledger_effective_time", - "workflow_id", - "participant_events.event_id", - "contract_id", - "template_id", - "create_argument", - "create_argument_compression", - "create_signatories", - "create_observers", - "create_agreement_text", - "create_key_value", - "create_key_value_compression", - ).mkString(", ") - - private val selectColumnsForACS = - Seq( - "active_cs.event_offset", - "active_cs.transaction_id", - "active_cs.node_index", - "active_cs.event_sequential_id", - "active_cs.ledger_effective_time", - "active_cs.workflow_id", - "active_cs.event_id", - "active_cs.contract_id", - "active_cs.template_id", - "active_cs.create_argument", - "active_cs.create_argument_compression", - "active_cs.create_signatories", - "active_cs.create_observers", - "active_cs.create_agreement_text", - "active_cs.create_key_value", - "active_cs.create_key_value_compression", - ).mkString(", ") - - private type SharedRow = - Offset ~ String ~ Int ~ Long ~ String ~ String ~ Instant ~ Identifier ~ Option[String] ~ - Option[String] ~ Array[String] - - private val sharedRow: RowParser[SharedRow] = - offset("event_offset") ~ - str("transaction_id") ~ - int("node_index") ~ - long("event_sequential_id") ~ - str("event_id") ~ - str("contract_id") ~ - instant("ledger_effective_time") ~ - identifier("template_id") ~ - str("command_id").? ~ - str("workflow_id").? ~ - array[String]("event_witnesses") - - private type CreatedEventRow = - SharedRow ~ InputStream ~ Option[Int] ~ Array[String] ~ Array[String] ~ Option[String] ~ - Option[InputStream] ~ Option[Int] - - private val createdEventRow: RowParser[CreatedEventRow] = - sharedRow ~ - binaryStream("create_argument") ~ - int("create_argument_compression").? ~ - array[String]("create_signatories") ~ - array[String]("create_observers") ~ - str("create_agreement_text").? ~ - binaryStream("create_key_value").? ~ - int("create_key_value_compression").? - - private type ExercisedEventRow = - SharedRow ~ Boolean ~ String ~ InputStream ~ Option[Int] ~ Option[InputStream] ~ Option[Int] ~ - Array[String] ~ Array[String] - - private val exercisedEventRow: RowParser[ExercisedEventRow] = { - import com.daml.platform.store.Conversions.bigDecimalColumnToBoolean - sharedRow ~ - bool("exercise_consuming") ~ - str("exercise_choice") ~ - binaryStream("exercise_argument") ~ - int("exercise_argument_compression").? ~ - binaryStream("exercise_result").? ~ - int("exercise_result_compression").? ~ - array[String]("exercise_actors") ~ - array[String]("exercise_child_event_ids") - } - - private type ArchiveEventRow = SharedRow - - private val archivedEventRow: RowParser[ArchiveEventRow] = sharedRow - - private val createdFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Created]] = - createdEventRow map { - case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ - templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createArgumentCompression ~ - createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue ~ createKeyValueCompression => - // ArraySeq.unsafeWrapArray is safe here - // since we get the Array from parsing and don't let it escape anywhere. - EventsTable.Entry( - eventOffset = eventOffset, - transactionId = transactionId, - nodeIndex = nodeIndex, - eventSequentialId = eventSequentialId, - ledgerEffectiveTime = ledgerEffectiveTime, - commandId = commandId.getOrElse(""), - workflowId = workflowId.getOrElse(""), - event = Raw.FlatEvent.Created( - eventId = eventId, - contractId = contractId, - templateId = templateId, - createArgument = createArgument, - createArgumentCompression = createArgumentCompression, - createSignatories = ArraySeq.unsafeWrapArray(createSignatories), - createObservers = ArraySeq.unsafeWrapArray(createObservers), - createAgreementText = createAgreementText, - createKeyValue = createKeyValue, - createKeyValueCompression = createKeyValueCompression, - eventWitnesses = ArraySeq.unsafeWrapArray(eventWitnesses), - ), - ) - } - - private val archivedFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent.Archived]] = - archivedEventRow map { - case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses => - // ArraySeq.unsafeWrapArray is safe here - // since we get the Array from parsing and don't let it escape anywhere. - EventsTable.Entry( - eventOffset = eventOffset, - transactionId = transactionId, - nodeIndex = nodeIndex, - eventSequentialId = eventSequentialId, - ledgerEffectiveTime = ledgerEffectiveTime, - commandId = commandId.getOrElse(""), - workflowId = workflowId.getOrElse(""), - event = Raw.FlatEvent.Archived( - eventId = eventId, - contractId = contractId, - templateId = templateId, - eventWitnesses = ArraySeq.unsafeWrapArray(eventWitnesses), - ), - ) - } - - private val rawFlatEventParser: RowParser[EventsTable.Entry[Raw.FlatEvent]] = - createdFlatEventParser | archivedFlatEventParser - - def transactionsEventsSingleWildcardParty( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - partyArrayContext: (String, String), - witnessesWhereClause: String, - limitExpr: String, - fetchSizeHint: Option[Int], - submitterIsPartyClause: String => (String, String), - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.partyToStatement - val (submitterIsPartyClausePrefix, submitterIsPartyClausePostfix) = submitterIsPartyClause( - "submitters" - ) - SQL""" - select #$selectColumnsForTransactions, #${partyArrayContext._1}$party#${partyArrayContext._2} as event_witnesses, - case when #$submitterIsPartyClausePrefix$party#$submitterIsPartyClausePostfix then command_id else '' end as command_id - from participant_events - where event_sequential_id > $startExclusive - and event_sequential_id <= $endInclusive - and #$witnessesWhereClause - order by event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def transactionsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - partyArrayContext: (String, String), - witnessesWhereClause: String, - templateIds: Set[Ref.Identifier], - limitExpr: String, - fetchSizeHint: Option[Int], - submitterIsPartyClause: String => (String, String), - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.partyToStatement - import com.daml.platform.store.Conversions.IdentifierToStatement - val (submitterIsPartyClausePrefix, submitterIsPartyClausePostfix) = submitterIsPartyClause( - "submitters" - ) - SQL""" - select #$selectColumnsForTransactions, #${partyArrayContext._1}$party#${partyArrayContext._2} as event_witnesses, - case when #$submitterIsPartyClausePrefix$party#$submitterIsPartyClausePostfix then command_id else '' end as command_id - from participant_events - where event_sequential_id > $startExclusive - and event_sequential_id <= $endInclusive - and #$witnessesWhereClause - and template_id in ($templateIds) - order by event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def transactionsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusive: Long, - filteredWitnessesClause: String, - submittersInPartiesClause: String, - witnessesWhereClause: String, - limitExpr: String, - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - SQL""" - select #$selectColumnsForTransactions, #$filteredWitnessesClause as event_witnesses, - case when #$submittersInPartiesClause then command_id else '' end as command_id - from participant_events - where event_sequential_id > $startExclusive - and event_sequential_id <= $endInclusive - and #$witnessesWhereClause - order by event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def transactionsEventsSameTemplates( - startExclusive: Long, - endInclusive: Long, - filteredWitnessesClause: String, - submittersInPartiesClause: String, - witnessesWhereClause: String, - templateIds: Set[Ref.Identifier], - limitExpr: String, - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.IdentifierToStatement - SQL""" - select #$selectColumnsForTransactions, #$filteredWitnessesClause as event_witnesses, - case when #$submittersInPartiesClause then command_id else '' end as command_id - from participant_events - where event_sequential_id > $startExclusive - and event_sequential_id <= $endInclusive - and #$witnessesWhereClause - and template_id in ($templateIds) - order by event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def transactionsEventsMixedTemplates( - startExclusive: Long, - endInclusive: Long, - filteredWitnessesClause: String, - submittersInPartiesClause: String, - partiesAndTemplatesCondition: String, - limitExpr: String, - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - SQL""" - select #$selectColumnsForTransactions, #$filteredWitnessesClause as event_witnesses, - case when #$submittersInPartiesClause then command_id else '' end as command_id - from participant_events - where event_sequential_id > $startExclusive - and event_sequential_id <= $endInclusive - and #$partiesAndTemplatesCondition - order by event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def transactionsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusive: Long, - filteredWitnessesClause: String, - submittersInPartiesClause: String, - witnessesWhereClause: String, - partiesAndTemplatesCondition: String, - limitExpr: String, - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - SQL""" - select #$selectColumnsForTransactions, #$filteredWitnessesClause as event_witnesses, - case when #$submittersInPartiesClause then command_id else '' end as command_id - from participant_events - where event_sequential_id > $startExclusive - and event_sequential_id <= $endInclusive - and (#$witnessesWhereClause or #$partiesAndTemplatesCondition) - order by event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def activeContractsEventsSingleWildcardParty( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - partyArrayContext: (String, String), - witnessesWhereClause: String, - limitExpr: String, - fetchSizeHint: Option[Int], - submitterIsPartyClause: String => (String, String), - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.partyToStatement - import com.daml.platform.store.Conversions.OffsetToStatement - val (submitterIsPartyClausePrefix, submitterIsPartyClausePostfix) = submitterIsPartyClause( - "active_cs.submitters" - ) - SQL"""select #$selectColumnsForACS, #${partyArrayContext._1}$party#${partyArrayContext._2} as event_witnesses, - case when #$submitterIsPartyClausePrefix$party#$submitterIsPartyClausePostfix then command_id else '' end as command_id - from participant_events active_cs - where active_cs.event_kind = 10 -- create - and active_cs.event_sequential_id > $startExclusive - and active_cs.event_sequential_id <= $endInclusiveSeq - and not exists ( - select 1 - from participant_events archived_cs - where - archived_cs.contract_id = active_cs.contract_id and - archived_cs.event_kind = 20 and -- consuming - archived_cs.event_offset <= $endInclusiveOffset - ) - and #$witnessesWhereClause - order by active_cs.event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def activeContractsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - partyArrayContext: (String, String), - templateIds: Set[Ref.Identifier], - witnessesWhereClause: String, - limitExpr: String, - fetchSizeHint: Option[Int], - submitterIsPartyClause: String => (String, String), - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.partyToStatement - import com.daml.platform.store.Conversions.OffsetToStatement - import com.daml.platform.store.Conversions.IdentifierToStatement - val (submitterIsPartyClausePrefix, submitterIsPartyClausePostfix) = submitterIsPartyClause( - "active_cs.submitters" - ) - SQL"""select #$selectColumnsForACS, #${partyArrayContext._1}$party#${partyArrayContext._2} as event_witnesses, - case when #$submitterIsPartyClausePrefix$party#$submitterIsPartyClausePostfix then command_id else '' end as command_id - from participant_events active_cs - where active_cs.event_kind = 10 -- create - and active_cs.event_sequential_id > $startExclusive - and active_cs.event_sequential_id <= $endInclusiveSeq - and not exists ( - select 1 - from participant_events archived_cs - where - archived_cs.contract_id = active_cs.contract_id and - archived_cs.event_kind = 20 and -- consuming - archived_cs.event_offset <= $endInclusiveOffset - ) - and #$witnessesWhereClause - and active_cs.template_id in ($templateIds) - order by active_cs.event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def activeContractsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - filteredWitnessesClause: String, - submittersInPartiesClause: String, - witnessesWhereClause: String, - limitExpr: String, - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.OffsetToStatement - SQL"""select #$selectColumnsForACS, #$filteredWitnessesClause as event_witnesses, - case when #$submittersInPartiesClause then active_cs.command_id else '' end as command_id - from participant_events active_cs - where active_cs.event_kind = 10 -- create - and active_cs.event_sequential_id > $startExclusive - and active_cs.event_sequential_id <= $endInclusiveSeq - and not exists ( - select 1 - from participant_events archived_cs - where - archived_cs.contract_id = active_cs.contract_id and - archived_cs.event_kind = 20 and -- consuming - archived_cs.event_offset <= $endInclusiveOffset - ) - and #$witnessesWhereClause - order by active_cs.event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def activeContractsEventsSameTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - templateIds: Set[Ref.Identifier], - filteredWitnessesClause: String, - submittersInPartiesClause: String, - witnessesWhereClause: String, - limitExpr: String, - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.OffsetToStatement - import com.daml.platform.store.Conversions.IdentifierToStatement - SQL"""select #$selectColumnsForACS, #$filteredWitnessesClause as event_witnesses, - case when #$submittersInPartiesClause then active_cs.command_id else '' end as command_id - from participant_events active_cs - where active_cs.event_kind = 10 -- create - and active_cs.event_sequential_id > $startExclusive - and active_cs.event_sequential_id <= $endInclusiveSeq - and not exists ( - select 1 - from participant_events archived_cs - where - archived_cs.contract_id = active_cs.contract_id and - archived_cs.event_kind = 20 and -- consuming - archived_cs.event_offset <= $endInclusiveOffset - ) - and #$witnessesWhereClause - and active_cs.template_id in ($templateIds) - order by active_cs.event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def activeContractsEventsMixedTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - filteredWitnessesClause: String, - submittersInPartiesClause: String, - partiesAndTemplatesCondition: String, - limitExpr: String, - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.OffsetToStatement - SQL"""select #$selectColumnsForACS, #$filteredWitnessesClause as event_witnesses, - case when #$submittersInPartiesClause then active_cs.command_id else '' end as command_id - from participant_events active_cs - where active_cs.event_kind = 10 -- create - and active_cs.event_sequential_id > $startExclusive - and active_cs.event_sequential_id <= $endInclusiveSeq - and not exists ( - select 1 - from participant_events archived_cs - where - archived_cs.contract_id = active_cs.contract_id and - archived_cs.event_kind = 20 and -- consuming - archived_cs.event_offset <= $endInclusiveOffset - ) - and #$partiesAndTemplatesCondition - order by active_cs.event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def activeContractsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - filteredWitnessesClause: String, - submittersInPartiesClause: String, - witnessesWhereClause: String, - partiesAndTemplatesCondition: String, - limitExpr: String, - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.OffsetToStatement - SQL"""select #$selectColumnsForACS, #$filteredWitnessesClause as event_witnesses, - case when #$submittersInPartiesClause then active_cs.command_id else '' end as command_id - from participant_events active_cs - where active_cs.event_kind = 10 -- create - and active_cs.event_sequential_id > $startExclusive - and active_cs.event_sequential_id <= $endInclusiveSeq - and not exists ( - select 1 - from participant_events archived_cs - where - archived_cs.contract_id = active_cs.contract_id and - archived_cs.event_kind = 20 and -- consuming - archived_cs.event_offset <= $endInclusiveOffset - ) - and (#$witnessesWhereClause or #$partiesAndTemplatesCondition) - order by active_cs.event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawFlatEventParser)(connection) - } - - def flatTransactionSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, - partyArrayContext: (String, String), - witnessesWhereClause: String, - submitterIsPartyClause: String => (String, String), - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.partyToStatement - import com.daml.platform.store.Conversions.ledgerStringToStatement - val (submitterIsPartyClausePrefix, submitterIsPartyClausePostfix) = submitterIsPartyClause( - "submitters" - ) - SQL"""select #$selectColumnsForTransactions, #${partyArrayContext._1}$requestingParty#${partyArrayContext._2} as event_witnesses, - case when #$submitterIsPartyClausePrefix$requestingParty#$submitterIsPartyClausePostfix then command_id else '' end as command_id - from participant_events - join parameters on - (participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive) - and event_offset <= ledger_end - where transaction_id = $transactionId and #$witnessesWhereClause and event_kind != 0 -- we do not want to fetch divulgence events - order by event_sequential_id""" - .asVectorOf(rawFlatEventParser)(connection) - } - - def flatTransactionMultiParty( - transactionId: TransactionId, - witnessesWhereClause: String, - submittersInPartiesClause: String, - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - import com.daml.platform.store.Conversions.ledgerStringToStatement - SQL"""select #$selectColumnsForTransactions, flat_event_witnesses as event_witnesses, - case when #$submittersInPartiesClause then command_id else '' end as command_id - from participant_events - join parameters on - (participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive) - and event_offset <= ledger_end - where transaction_id = $transactionId and #$witnessesWhereClause and event_kind != 0 -- we do not want to fetch divulgence events - order by event_sequential_id""" - .asVectorOf(rawFlatEventParser)(connection) - } - - private val createdTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Created]] = - createdEventRow map { - case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ createArgument ~ createArgumentCompression ~ createSignatories ~ createObservers ~ createAgreementText ~ createKeyValue ~ createKeyValueCompression => - // ArraySeq.unsafeWrapArray is safe here - // since we get the Array from parsing and don't let it escape anywhere. - EventsTable.Entry( - eventOffset = eventOffset, - transactionId = transactionId, - nodeIndex = nodeIndex, - eventSequentialId = eventSequentialId, - ledgerEffectiveTime = ledgerEffectiveTime, - commandId = commandId.getOrElse(""), - workflowId = workflowId.getOrElse(""), - event = Raw.TreeEvent.Created( - eventId = eventId, - contractId = contractId, - templateId = templateId, - createArgument = createArgument, - createArgumentCompression = createArgumentCompression, - createSignatories = ArraySeq.unsafeWrapArray(createSignatories), - createObservers = ArraySeq.unsafeWrapArray(createObservers), - createAgreementText = createAgreementText, - createKeyValue = createKeyValue, - createKeyValueCompression = createKeyValueCompression, - eventWitnesses = ArraySeq.unsafeWrapArray(eventWitnesses), - ), - ) - } - - private val exercisedTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent.Exercised]] = - exercisedEventRow map { - case eventOffset ~ transactionId ~ nodeIndex ~ eventSequentialId ~ eventId ~ contractId ~ ledgerEffectiveTime ~ templateId ~ commandId ~ workflowId ~ eventWitnesses ~ exerciseConsuming ~ exerciseChoice ~ exerciseArgument ~ exerciseArgumentCompression ~ exerciseResult ~ exerciseResultCompression ~ exerciseActors ~ exerciseChildEventIds => - // ArraySeq.unsafeWrapArray is safe here - // since we get the Array from parsing and don't let it escape anywhere. - EventsTable.Entry( - eventOffset = eventOffset, - transactionId = transactionId, - nodeIndex = nodeIndex, - eventSequentialId = eventSequentialId, - ledgerEffectiveTime = ledgerEffectiveTime, - commandId = commandId.getOrElse(""), - workflowId = workflowId.getOrElse(""), - event = Raw.TreeEvent.Exercised( - eventId = eventId, - contractId = contractId, - templateId = templateId, - exerciseConsuming = exerciseConsuming, - exerciseChoice = exerciseChoice, - exerciseArgument = exerciseArgument, - exerciseArgumentCompression = exerciseArgumentCompression, - exerciseResult = exerciseResult, - exerciseResultCompression = exerciseResultCompression, - exerciseActors = ArraySeq.unsafeWrapArray(exerciseActors), - exerciseChildEventIds = ArraySeq.unsafeWrapArray(exerciseChildEventIds), - eventWitnesses = ArraySeq.unsafeWrapArray(eventWitnesses), - ), - ) - } - - private val rawTreeEventParser: RowParser[EventsTable.Entry[Raw.TreeEvent]] = - createdTreeEventParser | exercisedTreeEventParser - - private val selectColumnsForTransactionTree = Seq( - "event_offset", - "transaction_id", - "node_index", - "event_sequential_id", - "participant_events.event_id", - "contract_id", - "ledger_effective_time", - "template_id", - "workflow_id", - "create_argument", - "create_argument_compression", - "create_signatories", - "create_observers", - "create_agreement_text", - "create_key_value", - "create_key_value_compression", - "exercise_choice", - "exercise_argument", - "exercise_argument_compression", - "exercise_result", - "exercise_result_compression", - "exercise_actors", - "exercise_child_event_ids", - ).mkString(", ") - - def transactionTreeSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, - partyArrayContext: (String, String), - witnessesWhereClause: String, - createEventFilter: String, - submitterIsPartyClause: String => (String, String), - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = { - import com.daml.platform.store.Conversions.partyToStatement - import com.daml.platform.store.Conversions.ledgerStringToStatement - val (submitterIsPartyClausePrefix, submitterIsPartyClausePostfix) = submitterIsPartyClause( - "submitters" - ) - SQL"""select #$selectColumnsForTransactionTree, #${partyArrayContext._1}$requestingParty#${partyArrayContext._2} as event_witnesses, - #$createEventFilter as exercise_consuming, - case when #$submitterIsPartyClausePrefix$requestingParty#$submitterIsPartyClausePostfix then command_id else '' end as command_id - from participant_events - join parameters on - (participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive) - and event_offset <= ledger_end - where transaction_id = $transactionId and #$witnessesWhereClause and event_kind != 0 -- we do not want to fetch divulgence events - order by node_index asc""" - .asVectorOf(rawTreeEventParser)(connection) - } - - def transactionTreeMultiParty( - transactionId: TransactionId, - witnessesWhereClause: String, - submittersInPartiesClause: String, - filteredWitnessesClause: String, - createEventFilter: String, - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = { - import com.daml.platform.store.Conversions.ledgerStringToStatement - SQL"""select #$selectColumnsForTransactionTree, #$filteredWitnessesClause as event_witnesses, - #$createEventFilter as exercise_consuming, - case when #$submittersInPartiesClause then command_id else '' end as command_id - from participant_events - join parameters on - (participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive) - and event_offset <= ledger_end - where transaction_id = $transactionId and #$witnessesWhereClause and event_kind != 0 -- we do not want to fetch divulgence events - order by node_index asc""" - .asVectorOf(rawTreeEventParser)(connection) - } - - def transactionTreeEventsSingleParty( - startExclusive: Long, - endInclusive: Long, - requestingParty: Ref.Party, - partyArrayContext: (String, String), - witnessesWhereClause: String, - limitExpr: String, - fetchSizeHint: Option[Int], - createEventFilter: String, - submitterIsPartyClause: String => (String, String), - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = { - import com.daml.platform.store.Conversions.partyToStatement - val (submitterIsPartyClausePrefix, submitterIsPartyClausePostfix) = submitterIsPartyClause( - "submitters" - ) - SQL""" - select #$selectColumnsForTransactionTree, #${partyArrayContext._1}$requestingParty#${partyArrayContext._2} as event_witnesses, - #$createEventFilter as exercise_consuming, - case when #$submitterIsPartyClausePrefix$requestingParty#$submitterIsPartyClausePostfix then command_id else '' end as command_id - from participant_events - where event_sequential_id > $startExclusive - and event_sequential_id <= $endInclusive - and #$witnessesWhereClause - and event_kind != 0 -- we do not want to fetch divulgence events - order by event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawTreeEventParser)(connection) - } - - def transactionTreeEventsMultiParty( - startExclusive: Long, - endInclusive: Long, - witnessesWhereClause: String, - filteredWitnessesClause: String, - submittersInPartiesClause: String, - limitExpr: String, - fetchSizeHint: Option[Int], - createEventFilter: String, - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = { - SQL""" - select #$selectColumnsForTransactionTree, #$filteredWitnessesClause as event_witnesses, - #$createEventFilter as exercise_consuming, - case when #$submittersInPartiesClause then command_id else '' end as command_id - from participant_events - where event_sequential_id > $startExclusive - and event_sequential_id <= $endInclusive - and #$witnessesWhereClause - and event_kind != 0 -- we do not want to fetch divulgence events - order by event_sequential_id #$limitExpr""" - .withFetchSize(fetchSizeHint) - .asVectorOf(rawTreeEventParser)(connection) - } - } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala index b061d283481d..d49e300ade89 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala @@ -6,25 +6,27 @@ package com.daml.platform.store.backend.h2 import java.sql.Connection import java.time.Instant -import anorm.SQL -import anorm.SqlStringInterpolation +import anorm.{NamedParameter, SQL, SqlStringInterpolation} import anorm.SqlParser.get -import com.daml.ledger.{ApplicationId, TransactionId} +import com.daml.ledger.ApplicationId import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.participant.state.v1.Offset import com.daml.lf.data.Ref -import com.daml.lf.data.Ref.Party -import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw} +import com.daml.platform.store.appendonlydao.events.{ContractId, Key} +import com.daml.platform.store.backend.EventStorageBackend.FilterParams import com.daml.platform.store.backend.common.{ AppendOnlySchema, CommonStorageBackend, + EventStorageBackendTemplate, + EventStrategy, TemplatedStorageBackend, } -import com.daml.platform.store.backend.{DbDto, StorageBackend} +import com.daml.platform.store.backend.{DbDto, StorageBackend, common} private[backend] object H2StorageBackend extends StorageBackend[AppendOnlySchema.Batch] - with CommonStorageBackend[AppendOnlySchema.Batch] { + with CommonStorageBackend[AppendOnlySchema.Batch] + with EventStorageBackendTemplate { override def reset(connection: Connection): Unit = { SQL("""set referential_integrity false; @@ -114,349 +116,6 @@ private[backend] object H2StorageBackend key = key, )(connection) - def transactionsEventsSingleWildcardParty( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsSingleWildcardParty( - startExclusive = startExclusive, - endInclusive = endInclusive, - party = party, - partyArrayContext = partyArrayContext, - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", Set(party)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsSinglePartyWithTemplates( - startExclusive = startExclusive, - endInclusive = endInclusive, - party = party, - partyArrayContext = partyArrayContext, - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", Set(party)), - templateIds = templateIds, - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusive: Long, - parties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsOnlyWildcardParties( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", parties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def transactionsEventsSameTemplates( - startExclusive: Long, - endInclusive: Long, - parties: Set[Ref.Party], - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsSameTemplates( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", parties), - templateIds = templateIds, - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def transactionsEventsMixedTemplates( - startExclusive: Long, - endInclusive: Long, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.transactionsEventsMixedTemplates( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - partiesAndTemplatesCondition = - formatPartiesAndTemplatesWhereClause("flat_event_witnesses", partiesAndTemplateIds), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } - - def transactionsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusive: Long, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - wildcardParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = wildcardParties ++ partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.transactionsEventsMixedTemplatesWithWildcardParties( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", wildcardParties), - partiesAndTemplatesCondition = - formatPartiesAndTemplatesWhereClause("flat_event_witnesses", partiesAndTemplateIds), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } - - def activeContractsEventsSingleWildcardParty( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsSingleWildcardParty( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - party = party, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", Set(party)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def activeContractsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsSinglePartyWithTemplates( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - party = party, - partyArrayContext = partyArrayContext, - templateIds = templateIds, - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", Set(party)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def activeContractsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - parties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsOnlyWildcardParties( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", parties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def activeContractsEventsSameTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - parties: Set[Ref.Party], - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsSameTemplates( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - templateIds = templateIds, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", parties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def activeContractsEventsMixedTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.activeContractsEventsMixedTemplates( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - partiesAndTemplatesCondition = formatPartiesAndTemplatesWhereClause( - "active_cs.flat_event_witnesses", - partiesAndTemplateIds, - ), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } - - def activeContractsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - wildcardParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = wildcardParties ++ partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.activeContractsEventsMixedTemplatesWithWildcardParties( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", wildcardParties), - partiesAndTemplatesCondition = formatPartiesAndTemplatesWhereClause( - "active_cs.flat_event_witnesses", - partiesAndTemplateIds, - ), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } - - def flatTransactionSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.flatTransactionSingleParty( - transactionId = transactionId, - requestingParty = requestingParty, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("flat_event_witnesses", Set(requestingParty)), - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def flatTransactionMultiParty( - transactionId: TransactionId, - requestingParties: Set[Ref.Party], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.flatTransactionMultiParty( - transactionId = transactionId, - witnessesWhereClause = - arrayIntersectionWhereClause("flat_event_witnesses", requestingParties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", requestingParties), - )(connection) - - def transactionTreeSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeSingleParty( - transactionId = transactionId, - requestingParty = requestingParty, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", Set(requestingParty)), - createEventFilter = columnEqualityBoolean("event_kind", "20"), - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionTreeMultiParty( - transactionId: TransactionId, - requestingParties: Set[Ref.Party], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeMultiParty( - transactionId = transactionId, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", requestingParties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", requestingParties), - filteredWitnessesClause = arrayIntersectionValues("tree_event_witnesses", requestingParties), - createEventFilter = columnEqualityBoolean("event_kind", "20"), - )(connection) - - def transactionTreeEventsSingleParty( - startExclusive: Long, - endInclusive: Long, - requestingParty: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeEventsSingleParty( - startExclusive = startExclusive, - endInclusive = endInclusive, - requestingParty = requestingParty, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", Set(requestingParty)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - createEventFilter = columnEqualityBoolean("event_kind", "20"), - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionTreeEventsMultiParty( - startExclusive: Long, - endInclusive: Long, - requestingParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeEventsMultiParty( - startExclusive = startExclusive, - endInclusive = endInclusive, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", requestingParties), - filteredWitnessesClause = arrayIntersectionValues("tree_event_witnesses", requestingParties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", requestingParties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - createEventFilter = columnEqualityBoolean("event_kind", "20"), - )(connection) - // TODO FIXME: this is for postgres not for H2 def maxEventSeqIdForOffset(offset: Offset)(connection: Connection): Option[Long] = { import com.daml.platform.store.Conversions.OffsetToStatement @@ -467,35 +126,62 @@ private[backend] object H2StorageBackend .as(get[Long](1).singleOpt)(connection) } - private def format(parties: Set[Party]): String = parties.view.map(p => s"'$p'").mkString(",") + object H2EventStrategy extends EventStrategy { + override def filteredEventWitnessesClause( + witnessesColumnName: String, + parties: Set[Ref.Party], + ): (String, List[NamedParameter]) = + ( + s"array_intersection($witnessesColumnName, {partiesArrayfewc})", + List("partiesArrayfewc" -> parties.view.map(_.toString).toArray), + ) + + override def submittersArePartiesClause( + submittersColumnName: String, + parties: Set[Ref.Party], + ): (String, List[NamedParameter]) = + ( + s"(${arrayIntersectionWhereClause(submittersColumnName, parties)})", + Nil, + ) + + override def witnessesWhereClause( + witnessesColumnName: String, + filterParams: FilterParams, + ): (String, List[NamedParameter]) = { + val (wildCardClause, wildCardParams) = filterParams.wildCardParties match { + case wildCardParties if wildCardParties.isEmpty => (Nil, Nil) + case wildCardParties => + ( + List(s"(${arrayIntersectionWhereClause(witnessesColumnName, wildCardParties)})"), + Nil, + ) + } + val (partiesTemplatesClauses, partiesTemplatesParams) = + filterParams.partiesAndTemplates.iterator.zipWithIndex + .map { case ((parties, templateIds), index) => + ( + s"( (${arrayIntersectionWhereClause(witnessesColumnName, parties)}) AND (template_id = ANY({templateIdsArraywwc$index})) )", + List[NamedParameter]( + s"templateIdsArraywwc$index" -> templateIds.view.map(_.toString).toArray + ), + ) + } + .toList + .unzip + ( + (wildCardClause ::: partiesTemplatesClauses).mkString("(", " OR ", ")"), + wildCardParams ::: partiesTemplatesParams.flatten, + ) + } + } - // TODO append-only: this seems to be the same for all db backends, let's unify - private def limitClause(to: Option[Int]): String = - to.map(to => s"fetch next $to rows only").getOrElse("") + override def eventStrategy: common.EventStrategy = H2EventStrategy + // TODO append-only: remove as part of ContractStorageBackend consolidation, use the data-driven one private def arrayIntersectionWhereClause(arrayColumn: String, parties: Set[Ref.Party]): String = if (parties.isEmpty) "false" else parties.view.map(p => s"array_contains($arrayColumn, '$p')").mkString("(", " or ", ")") - - private def arrayIntersectionValues(arrayColumn: String, parties: Set[Party]): String = - s"array_intersection($arrayColumn, array[${format(parties)}])" - - private def formatPartiesAndTemplatesWhereClause( - witnessesAggregationColumn: String, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - ): String = - partiesAndTemplateIds.view - .map { case (p, i) => - s"(${arrayIntersectionWhereClause(witnessesAggregationColumn, Set(p))} and template_id = '$i')" - } - .mkString("(", " or ", ")") - - private val partyArrayContext = ("array[", "]") - - private def columnEqualityBoolean(column: String, value: String) = s"""$column = $value""" - - private def submittersIsPartyClause(submittersColumnName: String): (String, String) = - (s"$submittersColumnName = array[", "]") } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala index 0020bb4cd9e1..cc9119c58705 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala @@ -4,26 +4,29 @@ package com.daml.platform.store.backend.oracle import anorm.SqlParser.get -import anorm.{SQL, SqlStringInterpolation} +import anorm.{NamedParameter, SQL, SqlStringInterpolation} import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.participant.state.v1.Offset -import com.daml.ledger.{ApplicationId, TransactionId} +import com.daml.ledger.ApplicationId import com.daml.lf.data.Ref -import com.daml.lf.data.Ref.Party -import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw} +import com.daml.platform.store.appendonlydao.events.{ContractId, Key} import com.daml.platform.store.backend.common.{ AppendOnlySchema, CommonStorageBackend, + EventStorageBackendTemplate, + EventStrategy, TemplatedStorageBackend, } -import com.daml.platform.store.backend.{DbDto, StorageBackend} - +import com.daml.platform.store.backend.{DbDto, StorageBackend, common} import java.sql.Connection import java.time.Instant +import com.daml.platform.store.backend.EventStorageBackend.FilterParams + private[backend] object OracleStorageBackend extends StorageBackend[AppendOnlySchema.Batch] - with CommonStorageBackend[AppendOnlySchema.Batch] { + with CommonStorageBackend[AppendOnlySchema.Batch] + with EventStorageBackendTemplate { override def reset(connection: Connection): Unit = List( @@ -112,348 +115,83 @@ private[backend] object OracleStorageBackend key = key, )(connection) - def transactionsEventsSingleWildcardParty( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsSingleWildcardParty( - startExclusive = startExclusive, - endInclusive = endInclusive, - party = party, - partyArrayContext = partyArrayContext, - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", Set(party)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsSinglePartyWithTemplates( - startExclusive = startExclusive, - endInclusive = endInclusive, - party = party, - partyArrayContext = partyArrayContext, - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", Set(party)), - templateIds = templateIds, - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusive: Long, - parties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsOnlyWildcardParties( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", parties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def transactionsEventsSameTemplates( - startExclusive: Long, - endInclusive: Long, - parties: Set[Ref.Party], - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsSameTemplates( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", parties), - templateIds = templateIds, - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def transactionsEventsMixedTemplates( - startExclusive: Long, - endInclusive: Long, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.transactionsEventsMixedTemplates( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - partiesAndTemplatesCondition = - formatPartiesAndTemplatesWhereClause("flat_event_witnesses", partiesAndTemplateIds), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } - - def transactionsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusive: Long, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - wildcardParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = wildcardParties ++ partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.transactionsEventsMixedTemplatesWithWildcardParties( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", wildcardParties), - partiesAndTemplatesCondition = - formatPartiesAndTemplatesWhereClause("flat_event_witnesses", partiesAndTemplateIds), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } - - def activeContractsEventsSingleWildcardParty( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsSingleWildcardParty( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - party = party, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", Set(party)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def activeContractsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsSinglePartyWithTemplates( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - party = party, - partyArrayContext = partyArrayContext, - templateIds = templateIds, - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", Set(party)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) + object OracleEventStrategy extends EventStrategy { - def activeContractsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - parties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsOnlyWildcardParties( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", parties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) + def arrayIntersectionClause( + columnName: String, + parties: Set[Ref.Party], + paramNamePostfix: String, + ): (String, List[NamedParameter]) = + ( + s"EXISTS (SELECT 1 FROM JSON_TABLE($columnName, '$$[*]' columns (value PATH '$$')) WHERE value IN ({parties$paramNamePostfix}))", + List(s"parties$paramNamePostfix" -> parties.map(_.toString)), + ) - def activeContractsEventsSameTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - parties: Set[Ref.Party], - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsSameTemplates( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - templateIds = templateIds, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", parties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) + override def filteredEventWitnessesClause( + witnessesColumnName: String, + parties: Set[Ref.Party], + ): (String, List[NamedParameter]) = + if (parties.size == 1) + ( + "(json_array({singlePartyfewc}))", + List[NamedParameter]("singlePartyfewc" -> parties.head.toString), + ) + else + ( + s"""(select json_arrayagg(value) from (select value + |from json_table($witnessesColumnName, '$$[*]' columns (value PATH '$$')) + |where value IN ({partiesfewc}))) + |""".stripMargin, + List[NamedParameter]("partiesfewc" -> parties.map(_.toString)), + ) + + override def submittersArePartiesClause( + submittersColumnName: String, + parties: Set[Ref.Party], + ): (String, List[NamedParameter]) = { + val (clause, params) = arrayIntersectionClause(submittersColumnName, parties, "sapc") + (s"($clause)", params) + } - def activeContractsEventsMixedTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.activeContractsEventsMixedTemplates( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - partiesAndTemplatesCondition = formatPartiesAndTemplatesWhereClause( - "active_cs.flat_event_witnesses", - partiesAndTemplateIds, - ), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } + override def witnessesWhereClause( + witnessesColumnName: String, + filterParams: FilterParams, + ): (String, List[NamedParameter]) = { + val (wildCardClause, wildCardParams) = filterParams.wildCardParties match { + case wildCardParties if wildCardParties.isEmpty => (Nil, Nil) + case wildCardParties => + val (clause, params) = + arrayIntersectionClause(witnessesColumnName, wildCardParties, "wcwwc") + ( + List(s"($clause)"), + params, + ) + } + val (partiesTemplatesClauses, partiesTemplatesParams) = + filterParams.partiesAndTemplates.iterator.zipWithIndex + .map { case ((parties, templateIds), index) => + val (clause, params) = + arrayIntersectionClause(witnessesColumnName, parties, s"ptwwc$index") + ( + s"( ($clause) AND (template_id IN ({templateIdsArraywwc$index})) )", + List[NamedParameter]( + s"templateIdsArraywwc$index" -> templateIds.map(_.toString) + ) ::: params, + ) + } + .toList + .unzip + ( + (wildCardClause ::: partiesTemplatesClauses).mkString("(", " OR ", ")"), + wildCardParams ::: partiesTemplatesParams.flatten, + ) + } - def activeContractsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - wildcardParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = wildcardParties ++ partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.activeContractsEventsMixedTemplatesWithWildcardParties( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", wildcardParties), - partiesAndTemplatesCondition = formatPartiesAndTemplatesWhereClause( - "active_cs.flat_event_witnesses", - partiesAndTemplateIds, - ), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) + override def columnEqualityBoolean(column: String, value: String): String = + s"""case when ($column = $value) then 1 else 0 end""" } - def flatTransactionSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.flatTransactionSingleParty( - transactionId = transactionId, - requestingParty = requestingParty, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("flat_event_witnesses", Set(requestingParty)), - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def flatTransactionMultiParty( - transactionId: TransactionId, - requestingParties: Set[Ref.Party], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.flatTransactionMultiParty( - transactionId = transactionId, - witnessesWhereClause = - arrayIntersectionWhereClause("flat_event_witnesses", requestingParties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", requestingParties), - )(connection) - - def transactionTreeSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeSingleParty( - transactionId = transactionId, - requestingParty = requestingParty, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", Set(requestingParty)), - createEventFilter = columnEqualityBoolean("event_kind", "20"), - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionTreeMultiParty( - transactionId: TransactionId, - requestingParties: Set[Ref.Party], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeMultiParty( - transactionId = transactionId, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", requestingParties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", requestingParties), - filteredWitnessesClause = arrayIntersectionValues("tree_event_witnesses", requestingParties), - createEventFilter = columnEqualityBoolean("event_kind", "20"), - )(connection) - - def transactionTreeEventsSingleParty( - startExclusive: Long, - endInclusive: Long, - requestingParty: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeEventsSingleParty( - startExclusive = startExclusive, - endInclusive = endInclusive, - requestingParty = requestingParty, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", Set(requestingParty)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - createEventFilter = columnEqualityBoolean("event_kind", "20"), - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionTreeEventsMultiParty( - startExclusive: Long, - endInclusive: Long, - requestingParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeEventsMultiParty( - startExclusive = startExclusive, - endInclusive = endInclusive, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", requestingParties), - filteredWitnessesClause = arrayIntersectionValues("tree_event_witnesses", requestingParties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", requestingParties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - createEventFilter = columnEqualityBoolean("event_kind", "20"), - )(connection) + override def eventStrategy: common.EventStrategy = OracleEventStrategy // TODO FIXME: confirm this works for oracle def maxEventSeqIdForOffset(offset: Offset)(connection: Connection): Option[Long] = { @@ -469,6 +207,7 @@ private[backend] object OracleStorageBackend private def limitClause(to: Option[Int]): String = to.map(to => s"fetch next $to rows only").getOrElse("") + // TODO append-only: remove as part of ContractStorageBackend consolidation, use the data-driven one private def arrayIntersectionWhereClause(arrayColumn: String, parties: Set[Ref.Party]): String = if (parties.isEmpty) "false" @@ -498,27 +237,4 @@ private[backend] object OracleStorageBackend .mkString(" OR ") + ")" } - private def arrayIntersectionValues(arrayColumn: String, parties: Set[Party]): String = - s"""(select json_arrayagg(value) from (select value - |from json_table($arrayColumn, '$$[*]' columns (value PATH '$$')) - |where ${parties.map { party => s"value = '$party'" }.mkString(" or ")})) - |""".stripMargin - - private def formatPartiesAndTemplatesWhereClause( - witnessesAggregationColumn: String, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - ): String = - partiesAndTemplateIds.view - .map { case (p, i) => - s"(${arrayIntersectionWhereClause(witnessesAggregationColumn, Set(p))} and template_id = '$i')" - } - .mkString("(", " or ", ")") - - private val partyArrayContext = ("json_array(", ")") - - private def columnEqualityBoolean(column: String, value: String) = - s"""case when ($column = $value) then 1 else 0 end""" - - private def submittersIsPartyClause(submittersColumnName: String): (String, String) = - (s"json_equal(json_query($submittersColumnName, '$$'), json_array(", "))") } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala index 4f57ff7f73c3..71d8b72bd1e3 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala @@ -6,24 +6,27 @@ package com.daml.platform.store.backend.postgresql import java.sql.Connection import java.time.Instant -import anorm.SQL -import anorm.SqlStringInterpolation +import anorm.{NamedParameter, SQL, SqlStringInterpolation} import anorm.SqlParser.get -import com.daml.ledger.{ApplicationId, TransactionId} +import com.daml.ledger.ApplicationId import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.participant.state.v1.Offset import com.daml.lf.data.Ref -import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Party, Raw} +import com.daml.platform.store.appendonlydao.events.{ContractId, Key, Party} +import com.daml.platform.store.backend.EventStorageBackend.FilterParams import com.daml.platform.store.backend.common.{ AppendOnlySchema, CommonStorageBackend, + EventStorageBackendTemplate, + EventStrategy, TemplatedStorageBackend, } -import com.daml.platform.store.backend.{DbDto, StorageBackend} +import com.daml.platform.store.backend.{DbDto, StorageBackend, common} private[backend] object PostgresStorageBackend extends StorageBackend[AppendOnlySchema.Batch] - with CommonStorageBackend[AppendOnlySchema.Batch] { + with CommonStorageBackend[AppendOnlySchema.Batch] + with EventStorageBackendTemplate { override def insertBatch( connection: Connection, @@ -34,7 +37,7 @@ private[backend] object PostgresStorageBackend override def batch(dbDtos: Vector[DbDto]): AppendOnlySchema.Batch = PGSchema.schema.prepareData(dbDtos) - val SQL_INSERT_COMMAND: String = + private val SQL_INSERT_COMMAND: String = """insert into participant_command_submissions as pcs (deduplication_key, deduplicate_until) |values ({deduplicationKey}, {deduplicateUntil}) |on conflict (deduplication_key) @@ -42,7 +45,7 @@ private[backend] object PostgresStorageBackend | set deduplicate_until={deduplicateUntil} | where pcs.deduplicate_until < {submittedAt}""".stripMargin - def upsertDeduplicationEntry( + override def upsertDeduplicationEntry( key: String, submittedAt: Instant, deduplicateUntil: Instant, @@ -55,7 +58,7 @@ private[backend] object PostgresStorageBackend ) .executeUpdate()(connection) - def reset(connection: Connection): Unit = { + override def reset(connection: Connection): Unit = { SQL("""truncate table configuration_entries cascade; |truncate table package_entries cascade; |truncate table parameters cascade; @@ -72,7 +75,7 @@ private[backend] object PostgresStorageBackend () } - def enforceSynchronousCommit(connnection: Connection): Unit = { + override def enforceSynchronousCommit(connnection: Connection): Unit = { val statement = connnection.prepareStatement("SET LOCAL synchronous_commit = 'on'") try { @@ -83,9 +86,9 @@ private[backend] object PostgresStorageBackend } } - val duplicateKeyError: String = "duplicate key" + override val duplicateKeyError: String = "duplicate key" - def commandCompletions( + override def commandCompletions( startExclusive: Offset, endInclusive: Offset, applicationId: ApplicationId, @@ -98,7 +101,7 @@ private[backend] object PostgresStorageBackend submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), )(connection) - def activeContractWithArgument(readers: Set[Ref.Party], contractId: ContractId)( + override def activeContractWithArgument(readers: Set[Ref.Party], contractId: ContractId)( connection: Connection ): Option[StorageBackend.RawContract] = TemplatedStorageBackend.activeContractWithArgument( @@ -106,7 +109,7 @@ private[backend] object PostgresStorageBackend contractId = contractId, )(connection) - def activeContractWithoutArgument(readers: Set[Ref.Party], contractId: ContractId)( + override def activeContractWithoutArgument(readers: Set[Ref.Party], contractId: ContractId)( connection: Connection ): Option[String] = TemplatedStorageBackend.activeContractWithoutArgument( @@ -114,7 +117,7 @@ private[backend] object PostgresStorageBackend contractId = contractId, )(connection) - def contractKey(readers: Set[Ref.Party], key: Key)( + override def contractKey(readers: Set[Ref.Party], key: Key)( connection: Connection ): Option[ContractId] = TemplatedStorageBackend.contractKey( @@ -123,350 +126,68 @@ private[backend] object PostgresStorageBackend key = key, )(connection) - def transactionsEventsSingleWildcardParty( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsSingleWildcardParty( - startExclusive = startExclusive, - endInclusive = endInclusive, - party = party, - partyArrayContext = partyArrayContext, - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", Set(party)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusive: Long, - party: Ref.Party, - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsSinglePartyWithTemplates( - startExclusive = startExclusive, - endInclusive = endInclusive, - party = party, - partyArrayContext = partyArrayContext, - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", Set(party)), - templateIds = templateIds, - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusive: Long, - parties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsOnlyWildcardParties( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", parties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def transactionsEventsSameTemplates( - startExclusive: Long, - endInclusive: Long, - parties: Set[Ref.Party], - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.transactionsEventsSameTemplates( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", parties), - templateIds = templateIds, - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def transactionsEventsMixedTemplates( - startExclusive: Long, - endInclusive: Long, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.transactionsEventsMixedTemplates( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - partiesAndTemplatesCondition = - formatPartiesAndTemplatesWhereClause("flat_event_witnesses", partiesAndTemplateIds), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } - - def transactionsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusive: Long, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - wildcardParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = wildcardParties ++ partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.transactionsEventsMixedTemplatesWithWildcardParties( - startExclusive = startExclusive, - endInclusive = endInclusive, - filteredWitnessesClause = arrayIntersectionValues("flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", parties), - witnessesWhereClause = arrayIntersectionWhereClause("flat_event_witnesses", wildcardParties), - partiesAndTemplatesCondition = - formatPartiesAndTemplatesWhereClause("flat_event_witnesses", partiesAndTemplateIds), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } - - def activeContractsEventsSingleWildcardParty( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsSingleWildcardParty( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - party = party, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", Set(party)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def activeContractsEventsSinglePartyWithTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - party: Ref.Party, - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsSinglePartyWithTemplates( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - party = party, - partyArrayContext = partyArrayContext, - templateIds = templateIds, - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", Set(party)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def activeContractsEventsOnlyWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - parties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsOnlyWildcardParties( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", parties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def activeContractsEventsSameTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - parties: Set[Ref.Party], - templateIds: Set[Ref.Identifier], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.activeContractsEventsSameTemplates( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - templateIds = templateIds, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", parties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - - def activeContractsEventsMixedTemplates( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.activeContractsEventsMixedTemplates( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - partiesAndTemplatesCondition = formatPartiesAndTemplatesWhereClause( - "active_cs.flat_event_witnesses", - partiesAndTemplateIds, - ), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) - } + object PostgresEventStrategy extends EventStrategy { + override def filteredEventWitnessesClause( + witnessesColumnName: String, + parties: Set[Party], + ): (String, List[NamedParameter]) = + if (parties.size == 1) + ( + s"array[{singlePartyfewc}]::text[]", + List("singlePartyfewc" -> parties.head.toString), + ) + else + ( + s"array(select unnest($witnessesColumnName) intersect select unnest({partiesArrayfewc}::text[]))", + List("partiesArrayfewc" -> parties.view.map(_.toString).toArray), + ) + + override def submittersArePartiesClause( + submittersColumnName: String, + parties: Set[Party], + ): (String, List[NamedParameter]) = + ( + s"($submittersColumnName::text[] && {wildCardPartiesArraysapc}::text[])", + List("wildCardPartiesArraysapc" -> parties.view.map(_.toString).toArray), + ) - def activeContractsEventsMixedTemplatesWithWildcardParties( - startExclusive: Long, - endInclusiveSeq: Long, - endInclusiveOffset: Offset, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - wildcardParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = { - val parties = wildcardParties ++ partiesAndTemplateIds.map(_._1) - TemplatedStorageBackend.activeContractsEventsMixedTemplatesWithWildcardParties( - startExclusive = startExclusive, - endInclusiveSeq = endInclusiveSeq, - endInclusiveOffset = endInclusiveOffset, - filteredWitnessesClause = arrayIntersectionValues("active_cs.flat_event_witnesses", parties), - submittersInPartiesClause = arrayIntersectionWhereClause("active_cs.submitters", parties), - witnessesWhereClause = - arrayIntersectionWhereClause("active_cs.flat_event_witnesses", wildcardParties), - partiesAndTemplatesCondition = formatPartiesAndTemplatesWhereClause( - "active_cs.flat_event_witnesses", - partiesAndTemplateIds, - ), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - )(connection) + override def witnessesWhereClause( + witnessesColumnName: String, + filterParams: FilterParams, + ): (String, List[NamedParameter]) = { + val (wildCardClause, wildCardParams) = filterParams.wildCardParties match { + case wildCardParties if wildCardParties.isEmpty => (Nil, Nil) + case wildCardParties => + ( + List(s"($witnessesColumnName::text[] && {wildCardPartiesArraywwc}::text[])"), + List[NamedParameter]( + "wildCardPartiesArraywwc" -> wildCardParties.view.map(_.toString).toArray + ), + ) + } + val (partiesTemplatesClauses, partiesTemplatesParams) = + filterParams.partiesAndTemplates.iterator.zipWithIndex + .map { case ((parties, templateIds), index) => + ( + s"( ($witnessesColumnName::text[] && {partiesArraywwc$index}::text[]) AND (template_id = ANY({templateIdsArraywwc$index}::text[])) )", + List[NamedParameter]( + s"partiesArraywwc$index" -> parties.view.map(_.toString).toArray, + s"templateIdsArraywwc$index" -> templateIds.view.map(_.toString).toArray, + ), + ) + } + .toList + .unzip + ( + (wildCardClause ::: partiesTemplatesClauses).mkString("(", " OR ", ")"), + wildCardParams ::: partiesTemplatesParams.flatten, + ) + } } - def flatTransactionSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.flatTransactionSingleParty( - transactionId = transactionId, - requestingParty = requestingParty, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("flat_event_witnesses", Set(requestingParty)), - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def flatTransactionMultiParty( - transactionId: TransactionId, - requestingParties: Set[Ref.Party], - )(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = - TemplatedStorageBackend.flatTransactionMultiParty( - transactionId = transactionId, - witnessesWhereClause = - arrayIntersectionWhereClause("flat_event_witnesses", requestingParties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", requestingParties), - )(connection) - - def transactionTreeSingleParty( - transactionId: TransactionId, - requestingParty: Ref.Party, - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeSingleParty( - transactionId = transactionId, - requestingParty = requestingParty, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", Set(requestingParty)), - createEventFilter = columnEqualityBoolean("event_kind", "20"), - submitterIsPartyClause = submittersIsPartyClause, - )(connection) - - def transactionTreeMultiParty( - transactionId: TransactionId, - requestingParties: Set[Ref.Party], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeMultiParty( - transactionId = transactionId, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", requestingParties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", requestingParties), - filteredWitnessesClause = arrayIntersectionValues("tree_event_witnesses", requestingParties), - createEventFilter = columnEqualityBoolean("event_kind", "20"), - )(connection) - - def transactionTreeEventsSingleParty( - startExclusive: Long, - endInclusive: Long, - requestingParty: Ref.Party, - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeEventsSingleParty( - startExclusive = startExclusive, - endInclusive = endInclusive, - requestingParty = requestingParty, - partyArrayContext = partyArrayContext, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", Set(requestingParty)), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - createEventFilter = columnEqualityBoolean("event_kind", "20"), - submitterIsPartyClause = submittersIsPartyClause, - )(connection) + override def eventStrategy: common.EventStrategy = PostgresEventStrategy - def transactionTreeEventsMultiParty( - startExclusive: Long, - endInclusive: Long, - requestingParties: Set[Ref.Party], - limit: Option[Int], - fetchSizeHint: Option[Int], - )(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = - TemplatedStorageBackend.transactionTreeEventsMultiParty( - startExclusive = startExclusive, - endInclusive = endInclusive, - witnessesWhereClause = - arrayIntersectionWhereClause("tree_event_witnesses", requestingParties), - filteredWitnessesClause = arrayIntersectionValues("tree_event_witnesses", requestingParties), - submittersInPartiesClause = arrayIntersectionWhereClause("submitters", requestingParties), - limitExpr = limitClause(limit), - fetchSizeHint = fetchSizeHint, - createEventFilter = columnEqualityBoolean("event_kind", "20"), - )(connection) - - def maxEventSeqIdForOffset(offset: Offset)(connection: Connection): Option[Long] = { + override def maxEventSeqIdForOffset(offset: Offset)(connection: Connection): Option[Long] = { import com.daml.platform.store.Conversions.OffsetToStatement // This query could be: "select max(event_sequential_id) from participant_events where event_offset <= ${range.endInclusive}" // however tests using PostgreSQL 12 with tens of millions of events have shown that the index @@ -475,32 +196,10 @@ private[backend] object PostgresStorageBackend .as(get[Long](1).singleOpt)(connection) } + // TODO append-only: remove as part of ContractStorageBackend consolidation private def format(parties: Set[Party]): String = parties.view.map(p => s"'$p'").mkString(",") - // TODO append-only: this seems to be the same for all db backends, let's unify - private def limitClause(to: Option[Int]): String = - to.map(to => s"fetch next $to rows only").getOrElse("") - + // TODO append-only: remove as part of ContractStorageBackend consolidation private def arrayIntersectionWhereClause(arrayColumn: String, parties: Set[Ref.Party]): String = s"$arrayColumn::text[] && array[${format(parties)}]::text[]" - - private def arrayIntersectionValues(arrayColumn: String, parties: Set[Party]): String = - s"array(select unnest($arrayColumn) intersect select unnest(array[${format(parties)}]))" - - private def formatPartiesAndTemplatesWhereClause( - witnessesAggregationColumn: String, - partiesAndTemplateIds: Set[(Ref.Party, Ref.Identifier)], - ): String = - partiesAndTemplateIds.view - .map { case (p, i) => - s"(${arrayIntersectionWhereClause(witnessesAggregationColumn, Set(p))} and template_id = '$i')" - } - .mkString("(", " or ", ")") - - private val partyArrayContext: (String, String) = ("array[", "]::text[]") - - private def columnEqualityBoolean(column: String, value: String) = s"""$column = $value""" - - private def submittersIsPartyClause(submittersColumnName: String): (String, String) = - (s"$submittersColumnName = array[", "]::text[]") }