Skip to content

Commit

Permalink
Make ledger end non-optional (#12570)
Browse files Browse the repository at this point in the history
* Make ledger end non-optional

changelog_begin
changelog_end

* Add migrations

* Ledger end can't be not null

... because of Oracle

* Rename migrations

* Improve ledger end parsing
  • Loading branch information
rautenrieth-da authored Jan 26, 2022
1 parent 122a487 commit bc9fa28
Show file tree
Hide file tree
Showing 19 changed files with 115 additions and 103 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
45bbc1ae3921493c10a8fa5f61540ff242a4f93d9c14c3deb5a86c898873751f
5cfddb9932a7ef4ea3ba7439cf9861110181fdfd97709df0fd21af8a89e9ce76
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ CREATE ALIAS array_intersection FOR "com.daml.platform.store.backend.h2.H2Functi
CREATE TABLE parameters (
ledger_id VARCHAR NOT NULL,
participant_id VARCHAR NOT NULL,
ledger_end VARCHAR,
ledger_end_sequential_id BIGINT,
ledger_end_string_interning_id INTEGER,
ledger_end VARCHAR NOT NULL,
ledger_end_sequential_id BIGINT NOT NULL,
ledger_end_string_interning_id INTEGER NOT NULL,
participant_pruned_up_to_inclusive VARCHAR,
participant_all_divulged_contracts_pruned_up_to_inclusive VARCHAR
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
e849b962d982296efa4e17e846681beea7ada94815897132eda75fc1e511c29e
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0


-- Note: can't make the ledger_end column NOT NULL, as we need to be able to insert the empty string

UPDATE parameters SET ledger_end_sequential_id = 0 WHERE ledger_end_sequential_id IS NULL;
UPDATE parameters SET ledger_end_string_interning_id = 0 WHERE ledger_end_string_interning_id IS NULL;

ALTER TABLE parameters MODIFY ( ledger_end_sequential_id NOT NULL);
ALTER TABLE parameters MODIFY ( ledger_end_string_interning_id NOT NULL);

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a465f021d192ffc4249ea0dd1da55b53f7d6c014470673e7eb8a4b9ad4eac0a6
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

UPDATE parameters SET ledger_end = '' WHERE ledger_end IS NULL;
UPDATE parameters SET ledger_end_sequential_id = 0 WHERE ledger_end_sequential_id IS NULL;
UPDATE parameters SET ledger_end_string_interning_id = 0 WHERE ledger_end_string_interning_id IS NULL;

ALTER TABLE parameters ALTER COLUMN ledger_end SET NOT NULL;
ALTER TABLE parameters ALTER COLUMN ledger_end_sequential_id SET NOT NULL;
ALTER TABLE parameters ALTER COLUMN ledger_end_string_interning_id SET NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ private[platform] case class InitializeParallelIngestion(
_ <- dbDispatcher.executeSql(metrics.daml.parallelIndexer.initialization)(
ingestionStorageBackend.deletePartiallyIngestedData(ledgerEnd)
)
_ <- ledgerEnd match {
case Some(ledgerEnd) => updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
case None => Future.unit
}
ledgerEndOrBeforeBegin = ledgerEnd.getOrElse(ParameterStorageBackend.LedgerEndBeforeBegin)
_ <- updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
} yield InitializeParallelIngestion.Initialized(
initialEventSeqId = ledgerEndOrBeforeBegin.lastEventSeqId,
initialStringInterningId = ledgerEndOrBeforeBegin.lastStringInterningId,
readServiceSource = readService.stateUpdates(beginAfter = ledgerEnd.map(_.lastOffset)),
initialEventSeqId = ledgerEnd.lastEventSeqId,
initialStringInterningId = ledgerEnd.lastStringInterningId,
readServiceSource = readService.stateUpdates(beginAfter = ledgerEnd.lastOffsetOption),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ object IndexMetadata {
for {
ledgerId <- dao.lookupLedgerId()
participantId <- dao.lookupParticipantId()
ledgerEnd <- dao.lookupInitialLedgerEnd()
ledgerEnd <- ledgerId match {
case Some(_) => dao.lookupLedgerEnd().map(x => Some(x.lastOffset))
case None => Future.successful(None)
}
} yield metadata(ledgerId, participantId, ledgerEnd)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,11 @@ private class JdbcLedgerDao(
override def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[LedgerEnd] =
dbDispatcher
.executeSql(metrics.daml.index.db.getLedgerEnd)(
parameterStorageBackend.ledgerEndOrBeforeBegin
parameterStorageBackend.ledgerEnd
)

case class InvalidLedgerEnd(msg: String) extends RuntimeException(msg)

override def lookupInitialLedgerEnd()(implicit
loggingContext: LoggingContext
): Future[Option[Offset]] =
dbDispatcher
.executeSql(metrics.daml.index.db.getInitialLedgerEnd)(
parameterStorageBackend.ledgerEnd(_).map(_.lastOffset)
)

override def initialize(
ledgerId: LedgerId,
participantId: ParticipantId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ private[platform] trait LedgerReadDao extends ReportsHealth {
/** Looks up the current ledger end */
def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[LedgerEnd]

/** Looks up the current external ledger end offset */
def lookupInitialLedgerEnd()(implicit loggingContext: LoggingContext): Future[Option[Offset]]

/** Looks up the current ledger configuration, if it has been set. */
def lookupLedgerConfiguration()(implicit
loggingContext: LoggingContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ private[platform] class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics:
override def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[LedgerEnd] =
Timed.future(metrics.daml.index.db.lookupLedgerEnd, ledgerDao.lookupLedgerEnd())

override def lookupInitialLedgerEnd()(implicit
loggingContext: LoggingContext
): Future[Option[Offset]] =
Timed.future(metrics.daml.index.db.lookupLedgerEnd, ledgerDao.lookupInitialLedgerEnd())

override def transactionsReader: LedgerDaoTransactionsReader = ledgerDao.transactionsReader

override def contractsReader: LedgerDaoContractsReader = ledgerDao.contractsReader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private[appendonlydao] case class SequentialWriteDaoImpl[DB_BATCH](

private def lazyInit(connection: Connection): Unit =
if (!lastEventSeqIdInitialized) {
val ledgerEnd = parameterStorageBackend.ledgerEndOrBeforeBegin(connection)
val ledgerEnd = parameterStorageBackend.ledgerEnd(connection)
lastEventSeqId = ledgerEnd.lastEventSeqId
lastStringInterningId = ledgerEnd.lastStringInterningId
lastEventSeqIdInitialized = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ trait IngestionStorageBackend[DB_BATCH] {
* @param ledgerEnd the current ledger end, or None if no ledger end exists
* @param connection to be used when inserting the batch
*/
def deletePartiallyIngestedData(ledgerEnd: Option[ParameterStorageBackend.LedgerEnd])(
def deletePartiallyIngestedData(ledgerEnd: ParameterStorageBackend.LedgerEnd)(
connection: Connection
): Unit
}
Expand All @@ -86,19 +86,9 @@ trait ParameterStorageBackend {
* No significant CPU load, mostly blocking JDBC communication with the database backend.
*
* @param connection to be used to get the LedgerEnd
* @return the current LedgerEnd, or None if no ledger end exists
* @return the current LedgerEnd
*/
def ledgerEnd(connection: Connection): Option[ParameterStorageBackend.LedgerEnd]

/** Query the current ledger end, returning a value that points to a point before the ledger begin
* if no ledger end exists.
* No significant CPU load, mostly blocking JDBC communication with the database backend.
*
* @param connection to be used to get the LedgerEnd
* @return the current LedgerEnd, or a LedgerEnd that points to before the ledger begin if no ledger end exists
*/
final def ledgerEndOrBeforeBegin(connection: Connection): ParameterStorageBackend.LedgerEnd =
ledgerEnd(connection).getOrElse(ParameterStorageBackend.LedgerEndBeforeBegin)
def ledgerEnd(connection: Connection): ParameterStorageBackend.LedgerEnd

/** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations
*/
Expand Down Expand Up @@ -129,11 +119,15 @@ trait ParameterStorageBackend {
}

object ParameterStorageBackend {
case class LedgerEnd(lastOffset: Offset, lastEventSeqId: Long, lastStringInterningId: Int)
case class LedgerEnd(lastOffset: Offset, lastEventSeqId: Long, lastStringInterningId: Int) {
def lastOffsetOption: Option[Offset] =
if (lastOffset == Offset.beforeBegin) None else Some(lastOffset)
}
object LedgerEnd {
val beforeBegin: ParameterStorageBackend.LedgerEnd =
ParameterStorageBackend.LedgerEnd(Offset.beforeBegin, EventSequentialId.beforeBegin, 0)
}
case class IdentityParams(ledgerId: LedgerId, participantId: ParticipantId)

final val LedgerEndBeforeBegin =
ParameterStorageBackend.LedgerEnd(Offset.beforeBegin, EventSequentialId.beforeBegin, 0)
}

trait ConfigurationStorageBackend {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,29 @@ private[backend] class IngestionStorageBackendTemplate(schema: Schema[DbDto])
extends IngestionStorageBackend[AppendOnlySchema.Batch] {

override def deletePartiallyIngestedData(
ledgerEnd: Option[ParameterStorageBackend.LedgerEnd]
ledgerEnd: ParameterStorageBackend.LedgerEnd
)(connection: Connection): Unit = {
ledgerEnd.foreach { existingLedgerEnd =>
import com.daml.platform.store.Conversions.OffsetToStatement
val ledgerOffset = existingLedgerEnd.lastOffset
val lastStringInterningId = existingLedgerEnd.lastStringInterningId
val lastEventSequentialId = existingLedgerEnd.lastEventSeqId

List(
SQL"DELETE FROM configuration_entries WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM package_entries WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM packages WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM participant_command_completions WHERE completion_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_divulgence WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_create WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_consuming_exercise WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_non_consuming_exercise WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM party_entries WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM string_interning WHERE internal_id > $lastStringInterningId",
SQL"DELETE FROM participant_events_create_filter WHERE event_sequential_id > $lastEventSequentialId",
SQL"DELETE FROM transaction_metering WHERE ledger_offset > $ledgerOffset",
).map(_.execute()(connection))

()
}
import com.daml.platform.store.Conversions.OffsetToStatement
val ledgerOffset = ledgerEnd.lastOffset
val lastStringInterningId = ledgerEnd.lastStringInterningId
val lastEventSequentialId = ledgerEnd.lastEventSeqId

List(
SQL"DELETE FROM configuration_entries WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM package_entries WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM packages WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM participant_command_completions WHERE completion_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_divulgence WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_create WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_consuming_exercise WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM participant_events_non_consuming_exercise WHERE event_offset > $ledgerOffset",
SQL"DELETE FROM party_entries WHERE ledger_offset > $ledgerOffset",
SQL"DELETE FROM string_interning WHERE internal_id > $lastStringInterningId",
SQL"DELETE FROM participant_events_create_filter WHERE event_sequential_id > $lastEventSequentialId",
SQL"DELETE FROM transaction_metering WHERE ledger_offset > $ledgerOffset",
).map(_.execute()(connection))

()
}

override def insertBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage
parameters
"""

override def ledgerEnd(connection: Connection): Option[ParameterStorageBackend.LedgerEnd] =
SqlGetLedgerEnd.as(LedgerEndParser.singleOpt)(connection).flatten
override def ledgerEnd(connection: Connection): ParameterStorageBackend.LedgerEnd =
SqlGetLedgerEnd
.as(LedgerEndParser.singleOpt)(connection)
.getOrElse(ParameterStorageBackend.LedgerEnd.beforeBegin)

private val TableName: String = "parameters"
private val LedgerIdColumnName: String = "ledger_id"
Expand All @@ -64,34 +66,31 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage
private val ParticipantIdParser: RowParser[ParticipantId] =
Conversions.participantId(ParticipantIdColumnName).map(ParticipantId(_))

private val LedgerEndOffsetParser: RowParser[Option[Offset]] =
offset(LedgerEndColumnName).?
private val LedgerEndOffsetParser: RowParser[Offset] = {
// Note: the ledger_end is a non-optional column,
// however some databases treat Offset.beforeBegin (the empty string) as NULL
offset(LedgerEndColumnName).?.map(_.getOrElse(Offset.beforeBegin))
}

private val LedgerEndSequentialIdParser: RowParser[Option[Long]] =
long(LedgerEndSequentialIdColumnName).?
private val LedgerEndSequentialIdParser: RowParser[Long] =
long(LedgerEndSequentialIdColumnName)

private val LedgerEndStringInterningIdParser: RowParser[Option[Int]] =
int(LedgerEndStringInterningIdColumnName).?
private val LedgerEndStringInterningIdParser: RowParser[Int] =
int(LedgerEndStringInterningIdColumnName)

private val LedgerIdentityParser: RowParser[ParameterStorageBackend.IdentityParams] =
LedgerIdParser ~ ParticipantIdParser map { case ledgerId ~ participantId =>
ParameterStorageBackend.IdentityParams(ledgerId, participantId)
}

private val LedgerEndParser: RowParser[Option[ParameterStorageBackend.LedgerEnd]] =
private val LedgerEndParser: RowParser[ParameterStorageBackend.LedgerEnd] =
LedgerEndOffsetParser ~ LedgerEndSequentialIdParser ~ LedgerEndStringInterningIdParser map {
case Some(lastOffset) ~ Some(lastEventSequentialId) ~ Some(lastStringInterningId) =>
Some(
ParameterStorageBackend.LedgerEnd(
lastOffset,
lastEventSequentialId,
lastStringInterningId,
)
case lastOffset ~ lastEventSequentialId ~ lastStringInterningId =>
ParameterStorageBackend.LedgerEnd(
lastOffset,
lastEventSequentialId,
lastStringInterningId,
)
case _ =>
// Note: offset and event sequential id are always written together.
// Cases where only one of them is defined are not handled here.
None
}

override def initializeParameters(
Expand All @@ -106,8 +105,22 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage
logger.info(
s"Initializing new database for ledgerId '${params.ledgerId}' and participantId '${params.participantId}'"
)
import com.daml.platform.store.Conversions.OffsetToStatement
val ledgerEnd = ParameterStorageBackend.LedgerEnd.beforeBegin
discard(
SQL"insert into #$TableName(#$LedgerIdColumnName, #$ParticipantIdColumnName) values(${ledgerId.unwrap}, ${participantId.unwrap: String})"
SQL"""insert into #$TableName(
#$LedgerIdColumnName,
#$ParticipantIdColumnName,
#$LedgerEndColumnName,
#$LedgerEndSequentialIdColumnName,
#$LedgerEndStringInterningIdColumnName
) values(
${ledgerId.unwrap},
${participantId.unwrap: String},
${ledgerEnd.lastOffset},
${ledgerEnd.lastEventSeqId},
${ledgerEnd.lastStringInterningId}
)"""
.execute()(connection)
)
case Some(ParameterStorageBackend.IdentityParams(`ledgerId`, `participantId`)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ trait IndexerStabilitySpec
)
// Note: we don't know exactly at which ledger end the current indexer has started.
// We only observe that the ledger end is moving right now.
val initialLedgerEnd = parameterStorageBackend.ledgerEndOrBeforeBegin(connection)
val initialLedgerEnd = parameterStorageBackend.ledgerEnd(connection)
val minEvents = 2L
eventually {
val ledgerEnd = parameterStorageBackend.ledgerEndOrBeforeBegin(connection)
val ledgerEnd = parameterStorageBackend.ledgerEnd(connection)
assert(ledgerEnd.lastEventSeqId > initialLedgerEnd.lastEventSeqId + minEvents)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private[backend] trait StorageBackendProvider {
}

protected final def updateLedgerEndCache(connection: Connection): Unit = {
val ledgerEnd = backend.parameter.ledgerEndOrBeforeBegin(connection)
val ledgerEnd = backend.parameter.ledgerEnd(connection)
backend.ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private[backend] trait StorageBackendTestsReset extends Matchers with StorageBac
)

identity shouldBe None
end shouldBe None
end shouldBe ParameterStorageBackend.LedgerEnd.beforeBegin
parties shouldBe empty
packages shouldBe empty
events shouldBe empty
Expand Down Expand Up @@ -95,7 +95,7 @@ private[backend] trait StorageBackendTestsReset extends Matchers with StorageBac
)

identity shouldBe None
end shouldBe None
end shouldBe ParameterStorageBackend.LedgerEnd.beforeBegin
parties shouldBe empty
packages shouldBe empty // Note: resetAll() does delete packages
events shouldBe empty
Expand Down
Loading

0 comments on commit bc9fa28

Please sign in to comment.