Skip to content

Commit

Permalink
DPP-553 Consistently use anorm string interpolation (#12266)
Browse files Browse the repository at this point in the history
* Consistently use anorm string interpolation

changelog_begin
changelog_end

* Use list of queries

* Remove redundant braces

* Consistently use OffsetToStatement

* Fix rebase error
  • Loading branch information
rautenrieth-da authored Jan 20, 2022
1 parent f03fa6a commit 072d57a
Show file tree
Hide file tree
Showing 14 changed files with 266 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,20 @@ package com.daml.platform.store.backend.common
import java.sql.Connection

import anorm.SqlParser.{byteArray, flatten, str}
import anorm.{RowParser, SQL}
import anorm.RowParser
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.platform.store.Conversions.offset
import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf
import com.daml.platform.store.appendonlydao.JdbcLedgerDao.{acceptType, rejectType}
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.backend.ConfigurationStorageBackend
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.entries.ConfigurationEntry

private[backend] class ConfigurationStorageBackendTemplate(ledgerEndCache: LedgerEndCache)
extends ConfigurationStorageBackend {

private val SQL_GET_CONFIGURATION_ENTRIES = SQL(
"""select
| configuration_entries.ledger_offset,
| configuration_entries.recorded_at,
| configuration_entries.submission_id,
| configuration_entries.typ,
| configuration_entries.configuration,
| configuration_entries.rejection_reason
| from
| configuration_entries
| where
| ({startExclusive} is null or ledger_offset>{startExclusive}) and
| ledger_offset <= {endInclusive}
| order by ledger_offset asc
| offset {queryOffset} rows
| fetch next {pageSize} rows only
| """.stripMargin
)

private val SQL_GET_LATEST_CONFIGURATION_ENTRY = SQL(
s"""select
| configuration_entries.ledger_offset,
| configuration_entries.recorded_at,
| configuration_entries.submission_id,
| configuration_entries.typ,
| configuration_entries.configuration,
| configuration_entries.rejection_reason
| from
| configuration_entries
| where
| configuration_entries.typ = '$acceptType' and
| {ledger_end_offset} >= ledger_offset
| order by ledger_offset desc
| fetch next 1 row only""".stripMargin
)

private val configurationEntryParser: RowParser[(Offset, ConfigurationEntry)] =
(offset("ledger_offset") ~
str("typ") ~
Expand Down Expand Up @@ -85,13 +50,30 @@ private[backend] class ConfigurationStorageBackendTemplate(ledgerEndCache: Ledge
})
}

def ledgerConfiguration(connection: Connection): Option[(Offset, Configuration)] =
SQL_GET_LATEST_CONFIGURATION_ENTRY
.on("ledger_end_offset" -> ledgerEndCache()._1.toHexString.toString)
def ledgerConfiguration(connection: Connection): Option[(Offset, Configuration)] = {
import com.daml.platform.store.Conversions.OffsetToStatement
val ledgerEndOffset = ledgerEndCache()._1
SQL"""
select
configuration_entries.ledger_offset,
configuration_entries.recorded_at,
configuration_entries.submission_id,
configuration_entries.typ,
configuration_entries.configuration,
configuration_entries.rejection_reason
from
configuration_entries
where
configuration_entries.typ = '#$acceptType' and
$ledgerEndOffset >= ledger_offset
order by ledger_offset desc
fetch next 1 row only
"""
.asVectorOf(configurationEntryParser)(connection)
.collectFirst { case (offset, ConfigurationEntry.Accepted(_, configuration)) =>
offset -> configuration
}
}

def configurationEntries(
startExclusive: Offset,
Expand All @@ -100,13 +82,23 @@ private[backend] class ConfigurationStorageBackendTemplate(ledgerEndCache: Ledge
queryOffset: Long,
)(connection: Connection): Vector[(Offset, ConfigurationEntry)] = {
import com.daml.platform.store.Conversions.OffsetToStatement
SQL_GET_CONFIGURATION_ENTRIES
.on(
"startExclusive" -> startExclusive,
"endInclusive" -> endInclusive,
"pageSize" -> pageSize,
"queryOffset" -> queryOffset,
)
SQL"""
select
configuration_entries.ledger_offset,
configuration_entries.recorded_at,
configuration_entries.submission_id,
configuration_entries.typ,
configuration_entries.configuration,
configuration_entries.rejection_reason
from
configuration_entries
where
($startExclusive is null or ledger_offset>$startExclusive) and
ledger_offset <= $endInclusive
order by ledger_offset asc
offset $queryOffset rows
fetch next $pageSize rows only
"""
.asVectorOf(configurationEntryParser)(connection)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,44 @@
package com.daml.platform.store.backend.common

import java.sql.Connection
import anorm.{RowParser, SQL}
import anorm.RowParser
import com.daml.lf.data.Time.Timestamp
import com.daml.platform.store.Conversions.timestampFromMicros
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.backend.DeduplicationStorageBackend

private[backend] trait DeduplicationStorageBackendTemplate extends DeduplicationStorageBackend {

private val SQL_SELECT_COMMAND = SQL("""
|select deduplicate_until
|from participant_command_submissions
|where deduplication_key = {deduplicationKey}
""".stripMargin)

private case class ParsedCommandData(deduplicateUntil: Timestamp)

private val CommandDataParser: RowParser[ParsedCommandData] =
timestampFromMicros("deduplicate_until")
.map(ParsedCommandData)

override def deduplicatedUntil(deduplicationKey: String)(connection: Connection): Timestamp =
SQL_SELECT_COMMAND
.on("deduplicationKey" -> deduplicationKey)
SQL"""
select deduplicate_until
from participant_command_submissions
where deduplication_key = $deduplicationKey
"""
.as(CommandDataParser.single)(connection)
.deduplicateUntil

private val SQL_DELETE_EXPIRED_COMMANDS = SQL("""
|delete from participant_command_submissions
|where deduplicate_until < {currentTime}
""".stripMargin)

override def removeExpiredDeduplicationData(
currentTime: Timestamp
)(connection: Connection): Unit = {
SQL_DELETE_EXPIRED_COMMANDS
.on("currentTime" -> currentTime.micros)
SQL"""
delete from participant_command_submissions
where deduplicate_until < ${currentTime.micros}
"""
.execute()(connection)
()
}

private val SQL_DELETE_COMMAND = SQL("""
|delete from participant_command_submissions
|where deduplication_key = {deduplicationKey}
""".stripMargin)

override def stopDeduplicatingCommand(deduplicationKey: String)(connection: Connection): Unit = {
SQL_DELETE_COMMAND
.on("deduplicationKey" -> deduplicationKey)
SQL"""
delete from participant_command_submissions
where deduplication_key = $deduplicationKey
"""
.execute()(connection)
()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,13 @@ abstract class EventStorageBackendTemplate(
filterParams: FilterParams,
)(connection: Connection): Vector[EventsTable.Entry[Raw.FlatEvent]] = {
import com.daml.platform.store.Conversions.ledgerStringToStatement
import com.daml.platform.store.Conversions.OffsetToStatement
val ledgerEndOffset = ledgerEndCache()._1
events(
columnPrefix = "",
joinClause = cSQL"""JOIN parameters ON
(participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive)
AND event_offset <= ${ledgerEndCache()._1.toHexString.toString}""",
AND event_offset <= $ledgerEndOffset""",
additionalAndClause = cSQL"""
transaction_id = $transactionId AND
event_kind != 0 AND -- we do not want to fetch divulgence events""",
Expand Down Expand Up @@ -529,11 +531,13 @@ abstract class EventStorageBackendTemplate(
filterParams: FilterParams,
)(connection: Connection): Vector[EventsTable.Entry[Raw.TreeEvent]] = {
import com.daml.platform.store.Conversions.ledgerStringToStatement
import com.daml.platform.store.Conversions.OffsetToStatement
val ledgerEndOffset = ledgerEndCache()._1
events(
columnPrefix = "",
joinClause = cSQL"""JOIN parameters ON
(participant_pruned_up_to_inclusive is null or event_offset > participant_pruned_up_to_inclusive)
AND event_offset <= ${ledgerEndCache()._1.toHexString.toString}""",
AND event_offset <= $ledgerEndOffset""",
additionalAndClause = cSQL"""
transaction_id = $transactionId AND
event_kind != 0 AND -- we do not want to fetch divulgence events""",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,37 @@ package com.daml.platform.store.backend.common

import java.sql.Connection

import anorm.{SQL, SqlQuery}
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.backend.{DbDto, IngestionStorageBackend, ParameterStorageBackend}
import com.daml.platform.store.interning.StringInterning

private[backend] class IngestionStorageBackendTemplate(schema: Schema[DbDto])
extends IngestionStorageBackend[AppendOnlySchema.Batch] {

private val SQL_DELETE_OVERSPILL_ENTRIES: List[SqlQuery] =
List(
SQL("DELETE FROM configuration_entries WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM package_entries WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM packages WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM participant_command_completions WHERE completion_offset > {ledger_offset}"),
SQL("DELETE FROM participant_events_divulgence WHERE event_offset > {ledger_offset}"),
SQL("DELETE FROM participant_events_create WHERE event_offset > {ledger_offset}"),
SQL("DELETE FROM participant_events_consuming_exercise WHERE event_offset > {ledger_offset}"),
SQL(
"DELETE FROM participant_events_non_consuming_exercise WHERE event_offset > {ledger_offset}"
),
SQL("DELETE FROM party_entries WHERE ledger_offset > {ledger_offset}"),
SQL("DELETE FROM string_interning WHERE internal_id > {last_string_interning_id}"),
SQL(
"DELETE FROM participant_events_create_filter WHERE event_sequential_id > {last_event_sequential_id}"
),
)

override def deletePartiallyIngestedData(
ledgerEnd: Option[ParameterStorageBackend.LedgerEnd]
)(connection: Connection): Unit = {
ledgerEnd.foreach { existingLedgerEnd =>
SQL_DELETE_OVERSPILL_ENTRIES.foreach { query =>
import com.daml.platform.store.Conversions.OffsetToStatement
query
.on("ledger_offset" -> existingLedgerEnd.lastOffset)
.on("last_string_interning_id" -> existingLedgerEnd.lastStringInterningId)
.on("last_event_sequential_id" -> existingLedgerEnd.lastEventSeqId)
.execute()(connection)
()
}
import com.daml.platform.store.Conversions.OffsetToStatement
val ledgerOffset = existingLedgerEnd.lastOffset
val lastStringInterningId = existingLedgerEnd.lastStringInterningId
val lastEventSequentialId = existingLedgerEnd.lastEventSeqId

List(
SQL"DELETE FROM configuration_entries WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM package_entries WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM packages WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM participant_command_completions WHERE completion_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_divulgence WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_create WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_consuming_exercise WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_non_consuming_exercise WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM party_entries WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM string_interning WHERE internal_id > $lastStringInterningId",
SQL"DELETE FROM participant_events_create_filter WHERE event_sequential_id > $lastEventSequentialId",
).map(_.execute()(connection))

()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

package com.daml.platform.store.backend.common

import anorm.{RowParser, SQL}
import anorm.RowParser

import java.sql.Connection
import anorm.SqlParser.{long, str}
import anorm.~
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.backend.IntegrityStorageBackend

private[backend] object IntegrityStorageBackendTemplate extends IntegrityStorageBackend {
Expand All @@ -23,25 +24,25 @@ private[backend] object IntegrityStorageBackendTemplate extends IntegrityStorage
|SELECT event_sequential_id FROM participant_events_non_consuming_exercise
|""".stripMargin

private val SQL_EVENT_SEQUENTIAL_IDS_SUMMARY = SQL(s"""
|WITH sequential_ids AS ($allSequentialIds)
|SELECT min(event_sequential_id) as min, max(event_sequential_id) as max, count(event_sequential_id) as count
|FROM sequential_ids, parameters
|WHERE event_sequential_id <= parameters.ledger_end_sequential_id
|""".stripMargin)
private val SqlEventSequentialIdsSummary = SQL"""
WITH sequential_ids AS (#$allSequentialIds)
SELECT min(event_sequential_id) as min, max(event_sequential_id) as max, count(event_sequential_id) as count
FROM sequential_ids, parameters
WHERE event_sequential_id <= parameters.ledger_end_sequential_id
"""

// Don't fetch an unbounded number of rows
private val maxReportedDuplicates = 100

private val SQL_DUPLICATE_EVENT_SEQUENTIAL_IDS = SQL(s"""
|WITH sequential_ids AS ($allSequentialIds)
|SELECT event_sequential_id as id, count(*) as count
|FROM sequential_ids, parameters
|WHERE event_sequential_id <= parameters.ledger_end_sequential_id
|GROUP BY event_sequential_id
|HAVING count(*) > 1
|FETCH NEXT $maxReportedDuplicates ROWS ONLY
|""".stripMargin)
private val SqlDuplicateEventSequentialIds = SQL"""
WITH sequential_ids AS (#$allSequentialIds)
SELECT event_sequential_id as id, count(*) as count
FROM sequential_ids, parameters
WHERE event_sequential_id <= parameters.ledger_end_sequential_id
GROUP BY event_sequential_id
HAVING count(*) > 1
FETCH NEXT #$maxReportedDuplicates ROWS ONLY
"""

private val allEventIds: String =
s"""
Expand All @@ -52,15 +53,15 @@ private[backend] object IntegrityStorageBackendTemplate extends IntegrityStorage
|SELECT event_offset, node_index FROM participant_events_non_consuming_exercise
|""".stripMargin

private val SQL_DUPLICATE_OFFSETS = SQL(s"""
|WITH event_ids AS ($allEventIds)
|SELECT event_offset, node_index, count(*) as count
|FROM event_ids, parameters
|WHERE event_offset <= parameters.ledger_end
|GROUP BY event_offset, node_index
|HAVING count(*) > 1
|FETCH NEXT $maxReportedDuplicates ROWS ONLY
|""".stripMargin)
private val SqlDuplicateOffsets = SQL"""
WITH event_ids AS (#$allEventIds)
SELECT event_offset, node_index, count(*) as count
FROM event_ids, parameters
WHERE event_offset <= parameters.ledger_end
GROUP BY event_offset, node_index
HAVING count(*) > 1
FETCH NEXT #$maxReportedDuplicates ROWS ONLY
"""

case class EventSequentialIdsRow(min: Long, max: Long, count: Long)

Expand All @@ -72,11 +73,11 @@ private[backend] object IntegrityStorageBackendTemplate extends IntegrityStorage
}

override def verifyIntegrity()(connection: Connection): Unit = {
val duplicateSeqIds = SQL_DUPLICATE_EVENT_SEQUENTIAL_IDS
val duplicateSeqIds = SqlDuplicateEventSequentialIds
.as(long("id").*)(connection)
val duplicateOffsets = SQL_DUPLICATE_OFFSETS
val duplicateOffsets = SqlDuplicateOffsets
.as(str("event_offset").*)(connection)
val summary = SQL_EVENT_SEQUENTIAL_IDS_SUMMARY
val summary = SqlEventSequentialIdsSummary
.as(eventSequantialIdsParser.single)(connection)

// Verify that there are no duplicate offsets (events with the same offset and node index).
Expand Down
Loading

0 comments on commit 072d57a

Please sign in to comment.