From f8f88072ee59d17a68541ed2b2a060112f1893bf Mon Sep 17 00:00:00 2001
From: Marton Nagy
Date: Wed, 10 Nov 2021 17:17:07 +0100
Subject: [PATCH] Activate interning write side [DPP-710] (#11614)

After this PR is merged, the write side of string interning is activated:
* The string interning table is populated as part of regular indexing
* A data migration populates the table from already existing data

The effect of populating string interning is not visible yet.

List of changes:
* Add schema and data migrations where applicable
* Patch up sandbox-classic and the parallel indexer to populate string_interning persistence, and initialize the write-side string interning view
* Add ledger_end_string_interning_id to track the need for updates at the view (plus aligning code)
* Remove lookupLedgerEndOffsetAndSequentialId
* Fix and extend unit tests

changelog_begin
changelog_end
---
 .../main/scala/com/daml/metrics/Metrics.scala | 6 +-
 .../V1__Append_only_schema.sha256 | 2 +-
 .../V1__Append_only_schema.sql | 1 +
 ...ing_interning_columnt_to_parameters.sha256 | 1 +
 ...string_interning_columnt_to_parameters.sql | 5 ++
 .../V113__enable_string_interning.sha256 | 1 +
 .../V113__enable_string_interning.sql | 69 +++++++++++++++++++
 .../ReadOnlySqlLedgerWithMutableCache.scala | 24 ++++---
 ...eadOnlySqlLedgerWithTranslationCache.scala | 12 ++--
 .../scala/platform/indexer/JdbcIndexer.scala | 2 +
 .../InitializeParallelIngestion.scala | 12 +++-
 .../parallel/ParallelIndexerFactory.scala | 20 +++++-
 .../ParallelIndexerSubscription.scala | 42 +++++++++--
 .../store/appendonlydao/JdbcLedgerDao.scala | 14 +---
 .../store/appendonlydao/LedgerDao.scala | 8 +--
 .../appendonlydao/MeteredLedgerDao.scala | 11 +--
 .../appendonlydao/SequentialWriteDao.scala | 30 +++++++-
 .../scala/platform/store/backend/DbDto.scala | 5 ++
 .../store/backend/StorageBackend.scala | 9 +--
 .../IngestionStorageBackendTemplate.scala | 2 +
 .../ParameterStorageBackendTemplate.scala | 25 +++++--
 .../backend/StorageBackendProvider.scala | 11 +++
 .../backend/StorageBackendTestValues.scala | 2 +-
 .../StorageBackendTestsCompletions.scala | 14 ++--
 .../StorageBackendTestsIngestion.scala | 6 +-
 .../StorageBackendTestsIntegrity.scala | 6 +-
 .../StorageBackendTestsMigrationPruning.scala | 2 +-
 .../backend/StorageBackendTestsPruning.scala | 10 +--
 .../StorageBackendTestsTimestamps.scala | 4 +-
 .../JdbcLedgerDaoActiveContractsSpec.scala | 17 ++---
 .../store/dao/JdbcLedgerDaoBackend.scala | 19 +++--
 .../dao/JdbcLedgerDaoCompletionsSpec.scala | 48 +++++++++----
 .../dao/JdbcLedgerDaoConfigurationSpec.scala | 2 +-
 ...dbcLedgerDaoContractEventsStreamSpec.scala | 8 +--
 ...JdbcLedgerDaoContractsAppendOnlySpec.scala | 19 ++---
 ...dbcLedgerDaoPostCommitValidationSpec.scala | 25 +++++--
 ...bcLedgerDaoTransactionLogUpdatesSpec.scala | 22 +++---
 .../JdbcLedgerDaoTransactionTreesSpec.scala | 14 ++--
 .../dao/JdbcLedgerDaoTransactionsSpec.scala | 34 ++++-----
 .../JdbcLedgerDaoTransactionsWriterSpec.scala | 4 +-
 .../ParallelIndexerSubscriptionSpec.scala | 32 +++++++--
 .../SequentialWriteDaoSpec.scala | 66 +++++++++++++++---
 .../RecoveringIndexerIntegrationSpec.scala | 4 +-
 .../sandbox/stores/ledger/sql/SqlLedger.scala | 23 ++++++-
 44 files changed, 498 insertions(+), 195 deletions(-)
 create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V4__add_ledger_end_string_interning_columnt_to_parameters.sha256
 create mode 100644
ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V4__add_ledger_end_string_interning_columnt_to_parameters.sql create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V113__enable_string_interning.sha256 create mode 100644 ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V113__enable_string_interning.sql diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index 4a651dccd750..8a0cdda53338 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -430,9 +430,6 @@ final class Metrics(val registry: MetricRegistry) { val getLedgerId: DatabaseMetrics = createDbMetrics("get_ledger_id") val getParticipantId: DatabaseMetrics = createDbMetrics("get_participant_id") val getLedgerEnd: DatabaseMetrics = createDbMetrics("get_ledger_end") - val getLedgerEndOffsetAndSequentialId: DatabaseMetrics = createDbMetrics( - "get_ledger_end_offset_and_sequential_id" - ) val getInitialLedgerEnd: DatabaseMetrics = createDbMetrics("get_initial_ledger_end") val initializeLedgerParameters: DatabaseMetrics = createDbMetrics( "initialize_ledger_parameters" @@ -524,6 +521,9 @@ final class Metrics(val registry: MetricRegistry) { val getContractStateEvents: DatabaseMetrics = createDbMetrics( "get_contract_state_events" ) + val loadStringInterningEntries: DatabaseMetrics = createDbMetrics( + "loadStringInterningEntries" + ) object translation { private val Prefix: MetricName = db.Prefix :+ "translation" diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 index d5901a60aa73..2308b70c55c3 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sha256 @@ -1 +1 @@ -e28b88accecead16dcd67becabf7ad230654b65f93c277e785ba77203c3db8b5 +bd9ddd16ae4ecc09923215635442c83b7894e95d23203baccde6df27cb0ce2cf diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql index 247a61261b16..ff8d2a22edcd 100644 --- a/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql +++ b/ledger/participant-integration-api/src/main/resources/db/migration/h2database-appendonly/V1__Append_only_schema.sql @@ -11,6 +11,7 @@ CREATE TABLE parameters ( participant_id VARCHAR NOT NULL, ledger_end VARCHAR, ledger_end_sequential_id BIGINT, + ledger_end_string_interning_id INTEGER, participant_pruned_up_to_inclusive VARCHAR, participant_all_divulged_contracts_pruned_up_to_inclusive VARCHAR ); diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V4__add_ledger_end_string_interning_columnt_to_parameters.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V4__add_ledger_end_string_interning_columnt_to_parameters.sha256 new file mode 100644 index 
000000000000..663c61212206 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V4__add_ledger_end_string_interning_columnt_to_parameters.sha256 @@ -0,0 +1 @@ +2131cf0ed208e236ebf0ae68c8153cb8080d3b133114902185393c2b34f0b45d diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V4__add_ledger_end_string_interning_columnt_to_parameters.sql b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V4__add_ledger_end_string_interning_columnt_to_parameters.sql new file mode 100644 index 000000000000..eb9ecef033d5 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/oracle-appendonly/V4__add_ledger_end_string_interning_columnt_to_parameters.sql @@ -0,0 +1,5 @@ +-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +ALTER TABLE parameters + ADD ledger_end_string_interning_id NUMBER; \ No newline at end of file diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V113__enable_string_interning.sha256 b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V113__enable_string_interning.sha256 new file mode 100644 index 000000000000..ec2c92e798c9 --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V113__enable_string_interning.sha256 @@ -0,0 +1 @@ +cc55310126bde9627d59698cee292506e4d80c8303f090ba8649c4f4c6e19fbc diff --git a/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V113__enable_string_interning.sql b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V113__enable_string_interning.sql new file mode 100644 index 000000000000..f42aefeba05f --- /dev/null +++ b/ledger/participant-integration-api/src/main/resources/db/migration/postgres-appendonly/V113__enable_string_interning.sql @@ -0,0 +1,69 @@ +-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. 
+-- SPDX-License-Identifier: Apache-2.0
+
+-- add string_interning ledger-end tracking column to parameters
+ALTER TABLE parameters
+    ADD COLUMN ledger_end_string_interning_id INTEGER;
+
+-- create temporary sequence to populate the string_interning table as migrating data
+CREATE SEQUENCE string_interning_seq_temp;
+-- add temporary index to aid string lookups as migrating data
+CREATE UNIQUE INDEX string_interning_external_string_temp_idx ON string_interning USING btree (external_string);
+
+-- add temporary migration function
+CREATE FUNCTION insert_to_string_interning(prefix TEXT, table_name TEXT, selector_expr TEXT)
+RETURNS void
+LANGUAGE plpgsql
+AS
+$$
+BEGIN
+  EXECUTE
+    'INSERT INTO string_interning(internal_id, external_string) ' ||
+    'SELECT nextval(''string_interning_seq_temp''), id ' ||
+    'FROM (SELECT DISTINCT ''' || prefix || ''' || ' || selector_expr || ' id FROM ' || table_name || ') distinct_ids ' ||
+    'WHERE NOT EXISTS (SELECT 1 FROM string_interning WHERE external_string = distinct_ids.id)' ||
+    'AND id IS NOT NULL';
+END
+$$;
+
+-- data migrations
+
+-- template_id
+SELECT insert_to_string_interning('t|', 'participant_events_create', 'template_id');
+SELECT insert_to_string_interning('t|', 'participant_events_divulgence', 'template_id');
+SELECT insert_to_string_interning('t|', 'participant_events_consuming_exercise', 'template_id');
+SELECT insert_to_string_interning('t|', 'participant_events_non_consuming_exercise', 'template_id');
+
+-- party
+SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(tree_event_witnesses)');
+SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(flat_event_witnesses)');
+SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(submitters)');
+SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(create_observers)');
+SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(create_signatories)');
+
+SELECT insert_to_string_interning('p|', 'participant_events_divulgence', 'unnest(tree_event_witnesses)');
+SELECT insert_to_string_interning('p|', 'participant_events_divulgence', 'unnest(submitters)');
+
+SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(tree_event_witnesses)');
+SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(flat_event_witnesses)');
+SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(submitters)');
+SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(exercise_actors)');
+
+SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(tree_event_witnesses)');
+SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(flat_event_witnesses)');
+SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(submitters)');
+SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(exercise_actors)');
+
+SELECT insert_to_string_interning('p|', 'participant_command_completions', 'unnest(submitters)');
+
+SELECT insert_to_string_interning('p|', 'party_entries', 'party');
+
+-- fill ledger-end
+UPDATE parameters
+SET ledger_end_string_interning_id = (SELECT max(internal_id) FROM string_interning);
+
+-- remove temporary SQL objects
+DROP SEQUENCE string_interning_seq_temp;
+DROP INDEX string_interning_external_string_temp_idx;
+DROP FUNCTION insert_to_string_interning(TEXT, TEXT, TEXT);
+
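-- For illustration only (editorial aside, not part of the migration file): a call such as
--   SELECT insert_to_string_interning('t|', 'participant_events_create', 'template_id');
-- builds and EXECUTEs the following statement (shown here with line breaks at the
-- fragment boundaries). The 't|' and 'p|' prefixes keep template-id and party strings
-- in distinct namespaces within the single table. Note that the last two fragments
-- concatenate without a space, yielding ")AND", which the parser still accepts
-- because ')' is self-delimiting:
--
--   INSERT INTO string_interning(internal_id, external_string)
--   SELECT nextval('string_interning_seq_temp'), id
--   FROM (SELECT DISTINCT 't|' || template_id id FROM participant_events_create) distinct_ids
--   WHERE NOT EXISTS (SELECT 1 FROM string_interning WHERE external_string = distinct_ids.id)AND id IS NOT NULL;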
diff --git a/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedgerWithMutableCache.scala b/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedgerWithMutableCache.scala index c89765bc305d..953b8cce6654 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedgerWithMutableCache.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedgerWithMutableCache.scala @@ -55,15 +55,15 @@ private[index] object ReadOnlySqlLedgerWithMutableCache { context: ResourceContext ): Resource[ReadOnlySqlLedgerWithMutableCache] = for { - (ledgerEndOffset, ledgerEndSequentialId) <- Resource.fromFuture( - ledgerDao.lookupLedgerEndOffsetAndSequentialId() + ledgerEnd <- Resource.fromFuture( + ledgerDao.lookupLedgerEnd() ) - _ = ledgerEndCache.set((ledgerEndOffset, ledgerEndSequentialId)) + _ = ledgerEndCache.set((ledgerEnd.lastOffset, ledgerEnd.lastEventSeqId)) prefetchingDispatcher <- dispatcherOffsetSeqIdOwner( - ledgerEndOffset, - ledgerEndSequentialId, + ledgerEnd.lastOffset, + ledgerEnd.lastEventSeqId, ).acquire() - generalDispatcher <- dispatcherOwner(ledgerEndOffset).acquire() + generalDispatcher <- dispatcherOwner(ledgerEnd.lastOffset).acquire() dispatcherLagMeter <- Resource.successful( new DispatcherLagMeter((offset, eventSeqId) => { ledgerEndCache.set((offset, eventSeqId)) @@ -79,7 +79,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache { prefetchingDispatcher, generalDispatcher, dispatcherLagMeter, - ledgerEndOffset -> ledgerEndSequentialId, + ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId, ).acquire() } yield ledger @@ -294,12 +294,14 @@ private final class ReadOnlySqlLedgerWithMutableCache( )(() => Source .tick(0.millis, 100.millis, ()) - .mapAsync(1)(_ => ledgerDao.lookupLedgerEndOffsetAndSequentialId()) + .mapAsync(1)(_ => ledgerDao.lookupLedgerEnd()) ) .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch]) - .toMat(Sink.foreach { case newLedgerHead @ (offset, _) => - dispatcherLagger.startTimer(offset) - contractStateEventsDispatcher.signalNewHead(newLedgerHead) + .toMat(Sink.foreach { case newLedgerHead => + dispatcherLagger.startTimer(newLedgerHead.lastOffset) + contractStateEventsDispatcher.signalNewHead( + newLedgerHead.lastOffset -> newLedgerHead.lastEventSeqId + ) })( Keep.both[UniqueKillSwitch, Future[Done]] ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedgerWithTranslationCache.scala b/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedgerWithTranslationCache.scala index b677b3e1ee30..6d10c3f43e34 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedgerWithTranslationCache.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedgerWithTranslationCache.scala @@ -34,9 +34,9 @@ private[index] object ReadOnlySqlLedgerWithTranslationCache { context: ResourceContext ): Resource[ReadOnlySqlLedgerWithTranslationCache] = for { - ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEndOffsetAndSequentialId()) - _ = ledgerEndCache.set(ledgerEnd) - dispatcher <- dispatcherOwner(ledgerEnd._1).acquire() + ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd()) + _ = ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId) + dispatcher <- dispatcherOwner(ledgerEnd.lastOffset).acquire() contractsStore <- contractStoreOwner() ledger <-
ledgerOwner(dispatcher, contractsStore).acquire() } yield ledger @@ -89,15 +89,15 @@ private final class ReadOnlySqlLedgerWithTranslationCache( )(() => Source .tick(0.millis, 100.millis, ()) - .mapAsync(1)(_ => ledgerDao.lookupLedgerEndOffsetAndSequentialId()) + .mapAsync(1)(_ => ledgerDao.lookupLedgerEnd()) ) .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch]) .toMat(Sink.foreach { ledgerEnd => - ledgerEndCache.set(ledgerEnd) + ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId) // the order here is very important: first we need to make data available for point-wise lookups // and SQL queries, and only then we can make it available on the streams. // (consider example: completion arrived on a stream, but the transaction cannot be looked up) - dispatcher.signalNewHead(ledgerEnd._1) + dispatcher.signalNewHead(ledgerEnd.lastOffset) })( Keep.both[UniqueKillSwitch, Future[Done]] ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala index 150e70268b72..3e13c6bf5996 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala @@ -47,6 +47,7 @@ object JdbcIndexer { val parameterStorageBackend = factory.createParameterStorageBackend val DBLockStorageBackend = factory.createDBLockStorageBackend val resetStorageBackend = factory.createResetStorageBackend + val stringInterningStorageBackend = factory.createStringInterningStorageBackend val indexer = ParallelIndexerFactory( jdbcUrl = config.jdbcUrl, inputMappingParallelism = config.inputMappingParallelism, @@ -97,6 +98,7 @@ object JdbcIndexer { batchWithinMillis = config.batchWithinMillis, metrics = metrics, ), + stringInterningStorageBackend = stringInterningStorageBackend, mat = materializer, readService = readService, ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/InitializeParallelIngestion.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/InitializeParallelIngestion.scala index a26e214e3cce..5f44de372558 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/InitializeParallelIngestion.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/InitializeParallelIngestion.scala @@ -12,9 +12,9 @@ import com.daml.ledger.participant.state.v2.{ReadService, Update} import com.daml.lf.data.Ref import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics -import com.daml.platform.store.EventSequentialId import com.daml.platform.store.appendonlydao.DbDispatcher import com.daml.platform.store.backend.{IngestionStorageBackend, ParameterStorageBackend} +import com.daml.platform.store.interning.UpdatingStringInterningView import scala.concurrent.{ExecutionContext, Future} @@ -29,6 +29,7 @@ private[platform] case class InitializeParallelIngestion( def apply( dbDispatcher: DbDispatcher, + updatingStringInterningView: UpdatingStringInterningView, readService: ReadService, ec: ExecutionContext, mat: Materializer, @@ -54,8 +55,14 @@ private[platform] case class InitializeParallelIngestion( _ <- dbDispatcher.executeSql(metrics.daml.parallelIndexer.initialization)( ingestionStorageBackend.deletePartiallyIngestedData(ledgerEnd) ) + _ <- ledgerEnd match { + case Some(ledgerEnd) => 
updatingStringInterningView.update(ledgerEnd.lastStringInterningId) + case None => Future.unit + } + ledgerEndOrBeforeBegin = ledgerEnd.getOrElse(ParameterStorageBackend.LedgerEndBeforeBegin) } yield InitializeParallelIngestion.Initialized( - initialEventSeqId = ledgerEnd.map(_.lastEventSeqId).getOrElse(EventSequentialId.beforeBegin), + initialEventSeqId = ledgerEndOrBeforeBegin.lastEventSeqId, + initialStringInterningId = ledgerEndOrBeforeBegin.lastStringInterningId, readServiceSource = readService.stateUpdates(beginAfter = ledgerEnd.map(_.lastOffset)), ) } @@ -66,6 +73,7 @@ object InitializeParallelIngestion { case class Initialized( initialEventSeqId: Long, + initialStringInterningId: Int, readServiceSource: Source[(Offset, Update), NotUsed], ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala index 5ea5f4654b72..779ae4a3b99a 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala @@ -17,7 +17,12 @@ import com.daml.platform.indexer.parallel.AsyncSupport._ import com.daml.platform.indexer.Indexer import com.daml.platform.store.appendonlydao.DbDispatcher import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig -import com.daml.platform.store.backend.{DBLockStorageBackend, DataSourceStorageBackend} +import com.daml.platform.store.backend.{ + DBLockStorageBackend, + DataSourceStorageBackend, + StringInterningStorageBackend, +} +import com.daml.platform.store.interning.StringInterningView import com.google.common.util.concurrent.ThreadFactoryBuilder import scala.concurrent.duration.FiniteDuration @@ -39,6 +44,7 @@ object ParallelIndexerFactory { dataSourceStorageBackend: DataSourceStorageBackend, initializeParallelIngestion: InitializeParallelIngestion, parallelIndexerSubscription: ParallelIndexerSubscription[_], + stringInterningStorageBackend: StringInterningStorageBackend, mat: Materializer, readService: ReadService, )(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] = @@ -101,8 +107,19 @@ object ParallelIndexerFactory { metrics = metrics, ) ) { dbDispatcher => + val stringInterningView = new StringInterningView( + loadPrefixedEntries = (fromExclusive, toInclusive) => + implicit loggingContext => + dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) { + stringInterningStorageBackend.loadStringInterningEntries( + fromExclusive, + toInclusive, + ) + } + ) initializeParallelIngestion( dbDispatcher = dbDispatcher, + updatingStringInterningView = stringInterningView, readService = readService, ec = ec, mat = mat, @@ -111,6 +128,7 @@ object ParallelIndexerFactory { inputMapperExecutor = inputMapperExecutor, batcherExecutor = batcherExecutor, dbDispatcher = dbDispatcher, + stringInterningView = stringInterningView, materializer = mat, ) ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerSubscription.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerSubscription.scala index 0846fedc4584..1d13805b2ffa 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerSubscription.scala +++ 
b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerSubscription.scala @@ -22,10 +22,12 @@ import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValu import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd import com.daml.platform.store.backend.{ DbDto, + DbDtoToStringsForInterning, IngestionStorageBackend, ParameterStorageBackend, UpdateToDbDto, } +import com.daml.platform.store.interning.{InternizingStringInterningView, StringInterning} import scala.concurrent.Future @@ -50,6 +52,7 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH]( inputMapperExecutor: Executor, batcherExecutor: Executor, dbDispatcher: DbDispatcher, + stringInterningView: StringInterning with InternizingStringInterningView, materializer: Materializer, )(implicit loggingContext: LoggingContext): InitializeParallelIngestion.Initialized => Handle = { initialized => @@ -67,8 +70,12 @@ private[platform] case class ParallelIndexerSubscription[DB_BATCH]( ), ) ), - seqMapperZero = seqMapperZero(initialized.initialEventSeqId), - seqMapper = seqMapper(metrics), + seqMapperZero = + seqMapperZero(initialized.initialEventSeqId, initialized.initialStringInterningId), + seqMapper = seqMapper( + dtos => stringInterningView.internize(DbDtoToStringsForInterning(dtos)), + metrics, + ), batchingParallelism = batchingParallelism, batcher = batcherExecutor.execute(batcher(ingestionStorageBackend.batch, metrics)), ingestingParallelism = ingestionParallelism, @@ -102,6 +109,7 @@ object ParallelIndexerSubscription { * * @param lastOffset The latest offset available in the batch. Needed for tail ingestion. * @param lastSeqEventId The latest sequential-event-id in the batch, or if none present there, then the latest from before. Needed for tail ingestion. + * @param lastStringInterningId The latest string interning id in the batch, or if none present there, then the latest from before. Needed for tail ingestion. * @param lastRecordTime The latest record time in the batch, in milliseconds since Epoch. Needed for metrics population. * @param batch The batch of variable type. * @param batchSize Size of the batch measured in number of updates. Needed for metrics population. 
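For orientation, a simplified, self-contained sketch of the interning hand-off performed by the new seqMapper (stand-in types and signatures, not the production code):

// Sketch: how the sequential step threads lastStringInterningId between batches.
// `internize` stands in for stringInterningView.internize composed with
// DbDtoToStringsForInterning; it returns only the newly assigned
// (internalId, externalString) pairs.
final case class StringInterningDto(internalId: Int, externalString: String)
final case class SketchBatch(lastStringInterningId: Int, batch: Vector[Any])

def seqStepSketch(internize: Vector[Any] => Vector[(Int, String)])(
    previous: SketchBatch,
    current: SketchBatch,
): SketchBatch =
  internize(current.batch).map { case (id, s) => StringInterningDto(id, s) } match {
    // no new strings in this batch: carry the previous watermark forward
    case noNew if noNew.isEmpty =>
      current.copy(lastStringInterningId = previous.lastStringInterningId)
    // new strings: advance the watermark and append the interning rows, so they
    // are ingested together with the events that reference them
    case newEntries =>
      current.copy(
        lastStringInterningId = newEntries.last.internalId,
        batch = current.batch ++ newEntries,
      )
  }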
@@ -110,6 +118,7 @@ object ParallelIndexerSubscription { case class Batch[+T]( lastOffset: Offset, lastSeqEventId: Long, + lastStringInterningId: Int, lastRecordTime: Long, batch: T, batchSize: Int, @@ -136,6 +145,7 @@ object ParallelIndexerSubscription { Batch( lastOffset = input.last._1._1, lastSeqEventId = 0, // will be filled later in the sequential step + lastStringInterningId = 0, // will be filled later in the sequential step lastRecordTime = input.last._1._2.recordTime.toInstant.toEpochMilli, batch = batch, batchSize = input.size, @@ -144,10 +154,15 @@ object ParallelIndexerSubscription { ) } - def seqMapperZero(initialSeqId: Long): Batch[Vector[DbDto]] = + def seqMapperZero( + initialSeqId: Long, + initialStringInterningId: Int, + ): Batch[Vector[DbDto]] = Batch( lastOffset = null, - lastSeqEventId = initialSeqId, // this is the only property of interest in the zero element + lastSeqEventId = initialSeqId, // this is a property of interest in the zero element + lastStringInterningId = + initialStringInterningId, // this is a property of interest in the zero element lastRecordTime = 0, batch = Vector.empty, batchSize = 0, @@ -155,7 +170,10 @@ object ParallelIndexerSubscription { offsets = Vector.empty, ) - def seqMapper(metrics: Metrics)( + def seqMapper( + internize: Iterable[DbDto] => Iterable[(Int, String)], + metrics: Metrics, + )( previous: Batch[Vector[DbDto]], current: Batch[Vector[DbDto]], ): Batch[Vector[DbDto]] = { @@ -175,6 +193,15 @@ object ParallelIndexerSubscription { case notEvent => notEvent } + + val (newLastStringInterningId, dbDtosWithStringInterning) = + internize(batchWithSeqIds) + .map(DbDto.StringInterningDto.from) match { + case noNewEntries if noNewEntries.isEmpty => + previous.lastStringInterningId -> batchWithSeqIds + case newEntries => newEntries.last.internalId -> (batchWithSeqIds ++ newEntries) + } + val nowNanos = System.nanoTime() metrics.daml.parallelIndexer.inputMapping.duration.update( (nowNanos - current.averageStartTime) / current.batchSize, @@ -182,7 +209,8 @@ object ParallelIndexerSubscription { ) current.copy( lastSeqEventId = eventSeqId, - batch = batchWithSeqIds, + lastStringInterningId = newLastStringInterningId, + batch = dbDtosWithStringInterning, averageStartTime = nowNanos, // setting start time to the start of the next stage ) } @@ -229,6 +257,7 @@ object ParallelIndexerSubscription { Batch[DB_BATCH]( lastOffset = curr.lastOffset, lastSeqEventId = curr.lastSeqEventId, + lastStringInterningId = curr.lastStringInterningId, lastRecordTime = curr.lastRecordTime, batch = zeroDbBatch, // not used anymore batchSize = 0, // not used anymore @@ -240,6 +269,7 @@ object ParallelIndexerSubscription { LedgerEnd( lastOffset = batch.lastOffset, lastEventSeqId = batch.lastSeqEventId, + lastStringInterningId = batch.lastStringInterningId, ) def ingestTail[DB_BATCH]( diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala index 355500fc7e00..0d2dc992ea36 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala @@ -33,6 +33,7 @@ import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.store.Conversions._ import com.daml.platform.store._ import com.daml.platform.store.appendonlydao.events._ +import
com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd import com.daml.platform.store.backend.{ DeduplicationStorageBackend, ParameterStorageBackend, @@ -92,23 +93,14 @@ private class JdbcLedgerDao( /** Defaults to Offset.begin if ledger_end is unset */ - override def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[Offset] = + override def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[LedgerEnd] = dbDispatcher .executeSql(metrics.daml.index.db.getLedgerEnd)( - parameterStorageBackend.ledgerEndOrBeforeBegin(_).lastOffset + parameterStorageBackend.ledgerEndOrBeforeBegin ) case class InvalidLedgerEnd(msg: String) extends RuntimeException(msg) - override def lookupLedgerEndOffsetAndSequentialId()(implicit - loggingContext: LoggingContext - ): Future[(Offset, Long)] = - dbDispatcher - .executeSql(metrics.daml.index.db.getLedgerEndOffsetAndSequentialId) { connection => - val end = parameterStorageBackend.ledgerEndOrBeforeBegin(connection) - end.lastOffset -> end.lastEventSeqId - } - override def lookupInitialLedgerEnd()(implicit loggingContext: LoggingContext ): Future[Option[Offset]] = diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala index 73339b8c8aa4..fe5cfc4528fa 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala @@ -25,6 +25,7 @@ import com.daml.lf.data.Time.Timestamp import com.daml.lf.transaction.{BlindingInfo, CommittedTransaction} import com.daml.logging.LoggingContext import com.daml.platform.store.appendonlydao.events.{ContractStateEvent, FilterRelation} +import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd import com.daml.platform.store.entries.{ ConfigurationEntry, LedgerEntry, @@ -116,12 +117,7 @@ private[platform] trait LedgerReadDao extends ReportsHealth { def lookupParticipantId()(implicit loggingContext: LoggingContext): Future[Option[ParticipantId]] /** Looks up the current ledger end */ - def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[Offset] - - /** Looks up the current ledger end as the offset and event sequential id */ - def lookupLedgerEndOffsetAndSequentialId()(implicit - loggingContext: LoggingContext - ): Future[(Offset, Long)] + def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[LedgerEnd] /** Looks up the current external ledger end offset */ def lookupInitialLedgerEnd()(implicit loggingContext: LoggingContext): Future[Option[Offset]] diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala index bc7d13df48fd..ea3f9441b6ea 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala @@ -17,6 +17,7 @@ import com.daml.lf.data.Time.Timestamp import com.daml.lf.transaction.{BlindingInfo, CommittedTransaction} import com.daml.logging.LoggingContext import com.daml.metrics.{Metrics, Timed} +import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd import com.daml.platform.store.entries.{ ConfigurationEntry, LedgerEntry, 
@@ -40,17 +41,9 @@ private[platform] class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: ): Future[Option[ParticipantId]] = Timed.future(metrics.daml.index.db.lookupParticipantId, ledgerDao.lookupParticipantId()) - override def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[Offset] = + override def lookupLedgerEnd()(implicit loggingContext: LoggingContext): Future[LedgerEnd] = Timed.future(metrics.daml.index.db.lookupLedgerEnd, ledgerDao.lookupLedgerEnd()) - def lookupLedgerEndOffsetAndSequentialId()(implicit - loggingContext: LoggingContext - ): Future[(Offset, Long)] = - Timed.future( - metrics.daml.index.db.lookupLedgerEndSequentialId, - ledgerDao.lookupLedgerEndOffsetAndSequentialId(), - ) - override def lookupInitialLedgerEnd()(implicit loggingContext: LoggingContext ): Future[Option[Offset]] = diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SequentialWriteDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SequentialWriteDao.scala index 6096fae14772..dff1f0c9c00e 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SequentialWriteDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/SequentialWriteDao.scala @@ -14,11 +14,17 @@ import com.daml.platform.store.LfValueTranslationCache import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation} import com.daml.platform.store.backend.{ DbDto, + DbDtoToStringsForInterning, IngestionStorageBackend, ParameterStorageBackend, UpdateToDbDto, } import com.daml.platform.store.cache.MutableLedgerEndCache +import com.daml.platform.store.interning.{ + DomainStringIterators, + InternizingStringInterningView, + StringInterning, +} import scala.concurrent.Future import scala.util.chaining.scalaUtilChainingOps @@ -34,6 +40,7 @@ object SequentialWriteDao { metrics: Metrics, compressionStrategy: CompressionStrategy, ledgerEndCache: MutableLedgerEndCache, + stringInterningView: StringInterning with InternizingStringInterningView, ingestionStorageBackend: IngestionStorageBackend[_], parameterStorageBackend: ParameterStorageBackend, ): SequentialWriteDao = @@ -51,6 +58,8 @@ object SequentialWriteDao { compressionStrategy = compressionStrategy, ), ledgerEndCache = ledgerEndCache, + stringInterningView = stringInterningView, + dbDtosToStringsForInterning = DbDtoToStringsForInterning(_), ) val noop: SequentialWriteDao = NoopSequentialWriteDao @@ -66,14 +75,19 @@ private[appendonlydao] case class SequentialWriteDaoImpl[DB_BATCH]( parameterStorageBackend: ParameterStorageBackend, updateToDbDtos: Offset => state.Update => Iterator[DbDto], ledgerEndCache: MutableLedgerEndCache, + stringInterningView: StringInterning with InternizingStringInterningView, + dbDtosToStringsForInterning: Iterable[DbDto] => DomainStringIterators, ) extends SequentialWriteDao { private var lastEventSeqId: Long = _ + private var lastStringInterningId: Int = _ private var lastEventSeqIdInitialized = false private def lazyInit(connection: Connection): Unit = if (!lastEventSeqIdInitialized) { - lastEventSeqId = parameterStorageBackend.ledgerEndOrBeforeBegin(connection).lastEventSeqId + val ledgerEnd = parameterStorageBackend.ledgerEndOrBeforeBegin(connection) + lastEventSeqId = ledgerEnd.lastEventSeqId + lastStringInterningId = ledgerEnd.lastStringInterningId lastEventSeqIdInitialized = true } @@ -99,7 +113,18 @@ private[appendonlydao] case class 
SequentialWriteDaoImpl[DB_BATCH]( .map(adaptEventSeqIds) .getOrElse(Vector.empty) - dbDtos + val dbDtosWithStringInterning = + dbDtos + .pipe(dbDtosToStringsForInterning) + .pipe(stringInterningView.internize) + .map(DbDto.StringInterningDto.from) match { + case noNewEntries if noNewEntries.isEmpty => dbDtos + case newEntries => + lastStringInterningId = newEntries.last.internalId + dbDtos ++ newEntries + } + + dbDtosWithStringInterning .pipe(ingestionStorageBackend.batch) .pipe(ingestionStorageBackend.insertBatch(connection, _)) @@ -107,6 +132,7 @@ private[appendonlydao] case class SequentialWriteDaoImpl[DB_BATCH]( ParameterStorageBackend.LedgerEnd( lastOffset = offset, lastEventSeqId = lastEventSeqId, + lastStringInterningId = lastStringInterningId, ) )(connection) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala index 9c032b366bdd..543613f95cac 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/DbDto.scala @@ -139,4 +139,9 @@ object DbDto { internalId: Int, externalString: String, ) extends DbDto + + object StringInterningDto { + def from(entry: (Int, String)): StringInterningDto = + StringInterningDto(entry._1, entry._2) + } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala index 09833231296a..d0cfdd13ff12 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala @@ -103,9 +103,7 @@ trait ParameterStorageBackend { * @return the current LedgerEnd, or a LedgerEnd that points to before the ledger begin if no ledger end exists */ final def ledgerEndOrBeforeBegin(connection: Connection): ParameterStorageBackend.LedgerEnd = - ledgerEnd(connection).getOrElse( - ParameterStorageBackend.LedgerEnd(Offset.beforeBegin, EventSequentialId.beforeBegin) - ) + ledgerEnd(connection).getOrElse(ParameterStorageBackend.LedgerEndBeforeBegin) /** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations */ @@ -136,8 +134,11 @@ trait ParameterStorageBackend { } object ParameterStorageBackend { - case class LedgerEnd(lastOffset: Offset, lastEventSeqId: Long) + case class LedgerEnd(lastOffset: Offset, lastEventSeqId: Long, lastStringInterningId: Int) case class IdentityParams(ledgerId: LedgerId, participantId: ParticipantId) + + final val LedgerEndBeforeBegin = + ParameterStorageBackend.LedgerEnd(Offset.beforeBegin, EventSequentialId.beforeBegin, 0) } trait ConfigurationStorageBackend { diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/IngestionStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/IngestionStorageBackendTemplate.scala index 9f9c6773ff1c..76cd3cd2bc3c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/IngestionStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/IngestionStorageBackendTemplate.scala @@ -24,6 +24,7 @@ private[backend] class IngestionStorageBackendTemplate(schema: 
Schema[DbDto]) "DELETE FROM participant_events_non_consuming_exercise WHERE event_offset > {ledger_offset}" ), SQL("DELETE FROM party_entries WHERE ledger_offset > {ledger_offset}"), + SQL("DELETE FROM string_interning WHERE internal_id > {last_string_interning_id}"), ) override def deletePartiallyIngestedData( @@ -34,6 +35,7 @@ private[backend] class IngestionStorageBackendTemplate(schema: Schema[DbDto]) import com.daml.platform.store.Conversions.OffsetToStatement query .on("ledger_offset" -> existingLedgerEnd.lastOffset) + .on("last_string_interning_id" -> existingLedgerEnd.lastStringInterningId) .execute()(connection) () } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala index 16ebbdd2ab35..84400029cf26 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ParameterStorageBackendTemplate.scala @@ -5,7 +5,7 @@ package com.daml.platform.store.backend.common import java.sql.Connection -import anorm.SqlParser.long +import anorm.SqlParser.{int, long} import anorm.{RowParser, SQL, ~} import com.daml.ledger.api.domain.{LedgerId, ParticipantId} import com.daml.ledger.offset.Offset @@ -28,7 +28,8 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage | parameters |SET | ledger_end = {ledger_end}, - | ledger_end_sequential_id = {ledger_end_sequential_id} + | ledger_end_sequential_id = {ledger_end_sequential_id}, + | ledger_end_string_interning_id = {ledger_end_string_interning_id} |""".stripMargin ) @@ -39,6 +40,7 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage SQL_UPDATE_LEDGER_END .on("ledger_end" -> ledgerEnd.lastOffset) .on("ledger_end_sequential_id" -> ledgerEnd.lastEventSeqId) + .on("ledger_end_string_interning_id" -> ledgerEnd.lastStringInterningId) .execute()(connection) () } @@ -47,7 +49,8 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage """ |SELECT | ledger_end, - | ledger_end_sequential_id + | ledger_end_sequential_id, + | ledger_end_string_interning_id |FROM | parameters | @@ -62,6 +65,7 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage private val ParticipantIdColumnName: String = "participant_id" private val LedgerEndColumnName: String = "ledger_end" private val LedgerEndSequentialIdColumnName: String = "ledger_end_sequential_id" + private val LedgerEndStringInterningIdColumnName: String = "ledger_end_string_interning_id" private val LedgerIdParser: RowParser[LedgerId] = ledgerString(LedgerIdColumnName).map(LedgerId(_)) @@ -75,15 +79,24 @@ private[backend] object ParameterStorageBackendTemplate extends ParameterStorage private val LedgerEndSequentialIdParser: RowParser[Option[Long]] = long(LedgerEndSequentialIdColumnName).? + private val LedgerEndStringInterningIdParser: RowParser[Option[Int]] = + int(LedgerEndStringInterningIdColumnName).? 
+ private val LedgerIdentityParser: RowParser[ParameterStorageBackend.IdentityParams] = LedgerIdParser ~ ParticipantIdParser map { case ledgerId ~ participantId => ParameterStorageBackend.IdentityParams(ledgerId, participantId) } private val LedgerEndParser: RowParser[Option[ParameterStorageBackend.LedgerEnd]] = - LedgerEndOffsetParser ~ LedgerEndSequentialIdParser map { - case Some(lastOffset) ~ Some(lastEventSequentialId) => - Some(ParameterStorageBackend.LedgerEnd(lastOffset, lastEventSequentialId)) + LedgerEndOffsetParser ~ LedgerEndSequentialIdParser ~ LedgerEndStringInterningIdParser map { + case Some(lastOffset) ~ Some(lastEventSequentialId) ~ Some(lastStringInterningId) => + Some( + ParameterStorageBackend.LedgerEnd( + lastOffset, + lastEventSequentialId, + lastStringInterningId, + ) + ) case _ => // Note: offset and event sequential id are always written together. // Cases where only one of them is defined are not handled here. diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala index b679196ff13f..a24dc04ff539 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala @@ -5,6 +5,7 @@ package com.daml.platform.store.backend import java.sql.Connection +import com.daml.ledger.offset.Offset import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd import com.daml.platform.store.backend.h2.H2StorageBackendFactory import com.daml.platform.store.backend.oracle.OracleStorageBackendFactory @@ -27,6 +28,16 @@ private[backend] trait StorageBackendProvider { typeBoundIngest(backend.ingestion) } + protected final def updateLedgerEnd( + ledgerEndOffset: Offset, + ledgerEndSequentialId: Long, + )(connection: Connection): Unit = { + backend.parameter.updateLedgerEnd(LedgerEnd(ledgerEndOffset, ledgerEndSequentialId, 0))( + connection + ) // we do not care about the stringInterningId here + updateLedgerEndCache(connection) + } + protected final def updateLedgerEnd(ledgerEnd: LedgerEnd)(connection: Connection): Unit = { backend.parameter.updateLedgerEnd(ledgerEnd)(connection) updateLedgerEndCache(connection) diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestValues.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestValues.scala index 1afd364fba92..d52247372dfb 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestValues.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestValues.scala @@ -24,7 +24,7 @@ private[backend] object StorageBackendTestValues { /** Produces offsets that are ordered the same as the input value */ def offset(x: Long): Offset = Offset.fromHexString(Ref.HexString.assertFromString(f"$x%08d")) def ledgerEnd(o: Long, e: Long): ParameterStorageBackend.LedgerEnd = - ParameterStorageBackend.LedgerEnd(offset(o), e) + ParameterStorageBackend.LedgerEnd(offset(o), e, 0) def transactionIdFromOffset(x: Offset): Ref.LedgerString = Ref.LedgerString.assertFromString(x.toHexString) diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsCompletions.scala 
b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsCompletions.scala index e7ba3954e1c1..7b12bc66af4c 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsCompletions.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsCompletions.scala @@ -34,7 +34,7 @@ private[backend] trait StorageBackendTestsCompletions _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(4), 3L)) + updateLedgerEnd(offset(4), 3L) ) completions0to3 <- executeSql( backend.completion.commandCompletions( @@ -74,7 +74,7 @@ private[backend] trait StorageBackendTestsCompletions _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)) + updateLedgerEnd(offset(2), 1L) ) completions <- executeSql( backend.completion.commandCompletions(offset(1), offset(2), applicationId, Set(party)) @@ -100,7 +100,7 @@ private[backend] trait StorageBackendTestsCompletions _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) + updateLedgerEnd(offset(3), 2L) ) completions <- executeSql( backend.completion.commandCompletions( @@ -138,7 +138,7 @@ private[backend] trait StorageBackendTestsCompletions _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) + updateLedgerEnd(offset(3), 2L) ) completions <- executeSql( backend.completion.commandCompletions( @@ -187,7 +187,7 @@ private[backend] trait StorageBackendTestsCompletions _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) + updateLedgerEnd(offset(3), 2L) ) completions <- executeSql( backend.completion.commandCompletions( @@ -233,7 +233,7 @@ private[backend] trait StorageBackendTestsCompletions _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos1, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)) + updateLedgerEnd(offset(2), 1L) ) result <- executeSql( backend.completion.commandCompletions( @@ -260,7 +260,7 @@ private[backend] trait StorageBackendTestsCompletions for { _ <- executeSql(ingest(dtos2, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) + updateLedgerEnd(offset(3), 2L) ) result <- executeSql( backend.completion.commandCompletions( diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIngestion.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIngestion.scala index 82e00fe615a7..d24238151acd 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIngestion.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIngestion.scala @@ -28,7 +28,7 @@ private[backend] trait StorageBackendTestsIngestion _ <- 
executeSql(ingest(dtos, _)) configBeforeLedgerEndUpdate <- executeSql(backend.configuration.ledgerConfiguration) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)) + updateLedgerEnd(someOffset, 0) ) configAfterLedgerEndUpdate <- executeSql(backend.configuration.ledgerConfiguration) } yield { @@ -57,7 +57,7 @@ private[backend] trait StorageBackendTestsIngestion _ <- executeSql(ingest(dtos, _)) packagesBeforeLedgerEndUpdate <- executeSql(backend.packageBackend.lfPackages) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)) + updateLedgerEnd(someOffset, 0) ) packagesAfterLedgerEndUpdate <- executeSql(backend.packageBackend.lfPackages) } yield { @@ -81,7 +81,7 @@ private[backend] trait StorageBackendTestsIngestion _ <- executeSql(ingest(dtos, _)) partiesBeforeLedgerEndUpdate <- executeSql(backend.party.knownParties) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)) + updateLedgerEnd(someOffset, 0) ) partiesAfterLedgerEndUpdate <- executeSql(backend.party.knownParties) } yield { diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIntegrity.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIntegrity.scala index 73383f9f6bb2..eb5c2cabf1a5 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIntegrity.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIntegrity.scala @@ -23,7 +23,7 @@ private[backend] trait StorageBackendTestsIntegrity extends Matchers with Storag _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(updates, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(7), 7L)) + updateLedgerEnd(offset(7), 7L) ) failure <- executeSql(backend.integrity.verifyIntegrity()).failed } yield { @@ -42,7 +42,7 @@ private[backend] trait StorageBackendTestsIntegrity extends Matchers with Storag _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(updates, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 3L)) + updateLedgerEnd(offset(3), 3L) ) failure <- executeSql(backend.integrity.verifyIntegrity()).failed } yield { @@ -63,7 +63,7 @@ private[backend] trait StorageBackendTestsIntegrity extends Matchers with Storag _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(updates, _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 2L)) + updateLedgerEnd(offset(2), 2L) ) _ <- executeSql(backend.integrity.verifyIntegrity()) } yield { diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMigrationPruning.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMigrationPruning.scala index 4f170651fb84..7409e543c261 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMigrationPruning.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMigrationPruning.scala @@ -31,7 +31,7 @@ private[backend] trait StorageBackendTestsMigrationPruning _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(Vector(create, 
divulgence, archive), _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 3L)) + updateLedgerEnd(offset(2), 3L) ) // Simulate that the archive happened after the migration to append-only schema _ <- executeSql(updateMigrationHistoryTable(ledgerSequentialIdBefore = 2)) diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsPruning.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsPruning.scala index 4cf9e764f7ee..39bc21b13727 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsPruning.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsPruning.scala @@ -103,7 +103,7 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB // Ingest a create and archive event _ <- executeSql(ingest(Vector(create, archive), _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 2L)) + updateLedgerEnd(offset(2), 2L) ) // Make sure the events are visible before1 <- executeSql(backend.event.transactionEvents(range, filter)) @@ -162,7 +162,7 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB // Ingest a create and archive event _ <- executeSql(ingest(Vector(partyEntry, create), _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)) + updateLedgerEnd(offset(2), 1L) ) // Make sure the events are visible before1 <- executeSql(backend.event.transactionEvents(range, filter)) @@ -246,7 +246,7 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB ) ) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(4), 4L)) + updateLedgerEnd(offset(4), 4L) ) contract1_beforePruning <- executeSql( backend.contract.activeContractWithoutArgument( @@ -336,7 +336,7 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB ) // Set the ledger end past the last ingested event so we can prune up to it inclusively _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(5), 5L)) + updateLedgerEnd(offset(5), 5L) ) contract1_beforePruning <- executeSql( backend.contract.activeContractWithoutArgument( @@ -394,7 +394,7 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB // Ingest a completion _ <- executeSql(ingest(Vector(completion), _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)) + updateLedgerEnd(offset(1), 1L) ) // Make sure the completion is visible before <- executeSql( diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsTimestamps.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsTimestamps.scala index 0140abfefb66..46162d1e36fd 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsTimestamps.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsTimestamps.scala @@ -33,7 +33,7 @@ private[backend] trait StorageBackendTestsTimestamps extends Matchers with Stora _ <- executeSql(ingest(Vector(create), _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)) + updateLedgerEnd(offset(1), 1L) ) let1 <- executeSql(backend.contract.maximumLedgerTime(Set(cid))) @@ 
-64,7 +64,7 @@ private[backend] trait StorageBackendTestsTimestamps extends Matchers with Stora _ <- executeSql(ingest(Vector(create), _)) _ <- executeSql( - updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)) + updateLedgerEnd(offset(1), 1L) ) events1 <- executeSql(backend.event.rawEvents(0L, 1L)) diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoActiveContractsSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoActiveContractsSpec.scala index df0ee7a72348..47934b35d477 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoActiveContractsSpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoActiveContractsSpec.scala @@ -39,7 +39,7 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec activeContractsBefore <- activeContractsOf( ledgerDao.transactionsReader .getActiveContracts( - activeAt = before, + activeAt = before.lastOffset, filter = Map(alice -> Set.empty, bob -> Set.empty, charlie -> Set.empty), verbose = true, ) @@ -47,7 +47,7 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec activeContractsAfter <- activeContractsOf( ledgerDao.transactionsReader .getActiveContracts( - activeAt = after, + activeAt = after.lastOffset, filter = Map(alice -> Set.empty, bob -> Set.empty, charlie -> Set.empty), verbose = true, ) @@ -65,7 +65,8 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec it should "serve a stable result based on the input offset" in { for { - offset <- ledgerDao.lookupLedgerEnd() + ledgerEnd <- ledgerDao.lookupLedgerEnd() + offset = ledgerEnd.lastOffset activeContractsBefore <- activeContractsOf( ledgerDao.transactionsReader .getActiveContracts( @@ -111,7 +112,7 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec result <- activeContractsOf( ledgerDao.transactionsReader .getActiveContracts( - activeAt = ledgerEnd, + activeAt = ledgerEnd.lastOffset, filter = Map(party1 -> Set(otherTemplateId)), verbose = true, ) @@ -141,7 +142,7 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec result <- activeContractsOf( ledgerDao.transactionsReader .getActiveContracts( - activeAt = ledgerEnd, + activeAt = ledgerEnd.lastOffset, filter = Map( party1 -> Set(otherTemplateId), party2 -> Set(otherTemplateId), @@ -181,7 +182,7 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec result <- activeContractsOf( ledgerDao.transactionsReader .getActiveContracts( - activeAt = ledgerEnd, + activeAt = ledgerEnd.lastOffset, filter = Map( party1 -> Set(someTemplateId), party2 -> Set(otherTemplateId), @@ -221,7 +222,7 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec result <- activeContractsOf( ledgerDao.transactionsReader .getActiveContracts( - activeAt = ledgerEnd, + activeAt = ledgerEnd.lastOffset, filter = Map( party1 -> Set(someTemplateId), party2 -> Set.empty, @@ -250,7 +251,7 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec end <- ledgerDao.lookupLedgerEnd() activeContracts <- ledgerDao.transactionsReader .getActiveContracts( - activeAt = end, + activeAt = end.lastOffset, filter = Map(alice -> Set.empty), verbose = true, ) diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoBackend.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoBackend.scala index 73b511e38ab7..4c60de9698fa 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoBackend.scala 
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoBackend.scala @@ -24,6 +24,7 @@ import com.daml.platform.store.appendonlydao.events.CompressionStrategy import com.daml.platform.store.backend.StorageBackendFactory import com.daml.platform.store.cache.MutableLedgerEndCache import com.daml.platform.store.dao.JdbcLedgerDaoBackend.{TestLedgerId, TestParticipantId} +import com.daml.platform.store.interning.StringInterningView import com.daml.platform.store.{DbType, FlywayMigrations, LfValueTranslationCache} import org.mockito.MockitoSugar import org.scalatest.AsyncTestSuite @@ -69,7 +70,16 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll { connectionTimeout = 250.millis, metrics = metrics, ) - .map(dbDispatcher => + .map { dbDispatcher => + val stringInterningStorageBackend = + storageBackendFactory.createStringInterningStorageBackend + val stringInterningView = new StringInterningView( + loadPrefixedEntries = (fromExclusive, toInclusive) => + implicit loggingContext => + dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) { + stringInterningStorageBackend.loadStringInterningEntries(fromExclusive, toInclusive) + } + ) JdbcLedgerDao.write( dbDispatcher = dbDispatcher, sequentialWriteDao = SequentialWriteDao( @@ -78,6 +88,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll { metrics = metrics, compressionStrategy = CompressionStrategy.none(metrics), ledgerEndCache = ledgerEndCache, + stringInterningView = stringInterningView, ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend, parameterStorageBackend = storageBackendFactory.createParameterStorageBackend, ), @@ -92,7 +103,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll { ledgerEndCache = ledgerEndCache, errorFactories = errorFactories, ) - ) + } } protected final var ledgerDao: LedgerDao = _ @@ -114,8 +125,8 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll { ) dao <- daoOwner(100, 4, errorFactories_mock).acquire() _ <- Resource.fromFuture(dao.initialize(TestLedgerId, TestParticipantId)) - initialLedgerEnd <- Resource.fromFuture(dao.lookupLedgerEndOffsetAndSequentialId()) - _ = ledgerEndCache.set(initialLedgerEnd) + initialLedgerEnd <- Resource.fromFuture(dao.lookupLedgerEnd()) + _ = ledgerEndCache.set(initialLedgerEnd.lastOffset -> initialLedgerEnd.lastEventSeqId) } yield dao } ledgerDao = Await.result(resource.asFuture, 30.seconds) diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoCompletionsSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoCompletionsSpec.scala index 0cde0cdf8b57..ef060dfe73f1 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoCompletionsSpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoCompletionsSpec.scala @@ -31,7 +31,7 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl (offset, tx) <- store(singleCreate) to <- ledgerDao.lookupLedgerEnd() (_, response) <- ledgerDao.completions - .getCommandCompletions(from, to, tx.applicationId.get, tx.actAs.toSet) + .getCommandCompletions(from.lastOffset, to.lastOffset, tx.applicationId.get, tx.actAs.toSet) .runWith(Sink.head) } yield { offsetOf(response) shouldBe offset @@ -51,15 +51,25 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl to <- 
ledgerDao.lookupLedgerEnd() // Response 1: querying as all submitters (_, response1) <- ledgerDao.completions - .getCommandCompletions(from, to, tx.applicationId.get, tx.actAs.toSet) + .getCommandCompletions(from.lastOffset, to.lastOffset, tx.applicationId.get, tx.actAs.toSet) .runWith(Sink.head) // Response 2: querying as a proper subset of all submitters (_, response2) <- ledgerDao.completions - .getCommandCompletions(from, to, tx.applicationId.get, Set(tx.actAs.head)) + .getCommandCompletions( + from.lastOffset, + to.lastOffset, + tx.applicationId.get, + Set(tx.actAs.head), + ) .runWith(Sink.head) // Response 3: querying as a proper superset of all submitters (_, response3) <- ledgerDao.completions - .getCommandCompletions(from, to, tx.applicationId.get, tx.actAs.toSet + "UNRELATED") + .getCommandCompletions( + from.lastOffset, + to.lastOffset, + tx.applicationId.get, + tx.actAs.toSet + "UNRELATED", + ) .runWith(Sink.head) } yield { response1.completions.loneElement.commandId shouldBe tx.commandId.get @@ -78,7 +88,7 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl offset <- storeRejection(rejection, expectedCmdId) to <- ledgerDao.lookupLedgerEnd() (_, response) <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId, parties) + .getCommandCompletions(from.lastOffset, to.lastOffset, applicationId, parties) .runWith(Sink.head) } yield { offsetOf(response) shouldBe offset @@ -102,15 +112,15 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl to <- ledgerDao.lookupLedgerEnd() // Response 1: querying as all submitters (_, response1) <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId, parties) + .getCommandCompletions(from.lastOffset, to.lastOffset, applicationId, parties) .runWith(Sink.head) // Response 2: querying as a proper subset of all submitters (_, response2) <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId, Set(parties.head)) + .getCommandCompletions(from.lastOffset, to.lastOffset, applicationId, Set(parties.head)) .runWith(Sink.head) // Response 3: querying as a proper superset of all submitters (_, response3) <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId, parties + "UNRELATED") + .getCommandCompletions(from.lastOffset, to.lastOffset, applicationId, parties + "UNRELATED") .runWith(Sink.head) } yield { response1.completions.loneElement.commandId shouldBe expectedCmdId @@ -128,7 +138,7 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl _ <- storeRejection(rejection) to <- ledgerDao.lookupLedgerEnd() response <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId = "WRONG", parties) + .getCommandCompletions(from.lastOffset, to.lastOffset, applicationId = "WRONG", parties) .runWith(Sink.seq) } yield { response shouldBe Seq.empty @@ -144,10 +154,15 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl _ <- storeRejection(rejection) to <- ledgerDao.lookupLedgerEnd() response1 <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId, Set("WRONG")) + .getCommandCompletions(from.lastOffset, to.lastOffset, applicationId, Set("WRONG")) .runWith(Sink.seq) response2 <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId, Set("WRONG1", "WRONG2", "WRONG3")) + .getCommandCompletions( + from.lastOffset, + to.lastOffset, + applicationId, + Set("WRONG1", "WRONG2", "WRONG3"), + ) .runWith(Sink.seq) } yield { response1 shouldBe 
Seq.empty @@ -164,10 +179,15 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl _ <- storeMultiPartyRejection(rejection) to <- ledgerDao.lookupLedgerEnd() response1 <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId, Set("WRONG")) + .getCommandCompletions(from.lastOffset, to.lastOffset, applicationId, Set("WRONG")) .runWith(Sink.seq) response2 <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId, Set("WRONG1", "WRONG2", "WRONG3")) + .getCommandCompletions( + from.lastOffset, + to.lastOffset, + applicationId, + Set("WRONG1", "WRONG2", "WRONG3"), + ) .runWith(Sink.seq) } yield { response1 shouldBe Seq.empty @@ -184,7 +204,7 @@ private[dao] trait JdbcLedgerDaoCompletionsSpec extends OptionValues with LoneEl _ <- storeMultiPartyRejection(rejection) to <- ledgerDao.lookupLedgerEnd() response1 <- ledgerDao.completions - .getCommandCompletions(from, to, applicationId, Set("WRONG")) + .getCommandCompletions(from.lastOffset, to.lastOffset, applicationId, Set("WRONG")) .runWith(Sink.seq) } yield { response1 shouldBe Seq.empty diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoConfigurationSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoConfigurationSpec.scala index 1b86fe2ed8f4..3dfbea7c6ad8 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoConfigurationSpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoConfigurationSpec.scala @@ -32,7 +32,7 @@ trait JdbcLedgerDaoConfigurationSpec { response shouldEqual PersistenceResponse.Ok startingConfig shouldEqual None optStoredConfig.map(_._2) shouldEqual Some(defaultConfig) - endingOffset should be > startingOffset + endingOffset.lastOffset should be > startingOffset.lastOffset } } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoContractEventsStreamSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoContractEventsStreamSpec.scala index 00b94e57a598..d4154ba2c6fd 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoContractEventsStreamSpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoContractEventsStreamSpec.scala @@ -42,7 +42,7 @@ trait JdbcLedgerDaoContractEventsStreamSpec extends LoneElement { ) for { - before <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() + before <- ledgerDao.lookupLedgerEnd() (offset1, t1) <- store( singleCreate(cid => create(absCid = cid, contractArgument = contractArg("t1"))) ) @@ -70,12 +70,12 @@ trait JdbcLedgerDaoContractEventsStreamSpec extends LoneElement { singleCreate(cid => create(absCid = cid, contractArgument = contractArg("t6"))) ) - after <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() + after <- ledgerDao.lookupLedgerEnd() contractStateEvents <- contractEventsOf( ledgerDao.transactionsReader.getContractStateEvents( - startExclusive = before, - endInclusive = after, + startExclusive = before.lastOffset -> before.lastEventSeqId, + endInclusive = after.lastOffset -> after.lastEventSeqId, ) ) } yield { diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoContractsAppendOnlySpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoContractsAppendOnlySpec.scala index 
fa3c9b967a5f..ce8155ef2b7c 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoContractsAppendOnlySpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoContractsAppendOnlySpec.scala @@ -26,13 +26,16 @@ private[dao] trait JdbcLedgerDaoContractsAppendOnlySpec extends LoneElement with (_, tx) <- store(singleCreate(create(_, signatories = Set(alice)))) contractId = nonTransient(tx).loneElement _ <- store(singleNonConsumingExercise(contractId)) - (_, eventSeqIdAtCreate) <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() + ledgerEndAtCreate <- ledgerDao.lookupLedgerEnd() _ <- store(txArchiveContract(alice, (contractId, None))) - eventSeqIdAfterArchive <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() - queryAfterCreate <- contractsReader.lookupContractState(contractId, eventSeqIdAtCreate) + ledgerEndAfterArchive <- ledgerDao.lookupLedgerEnd() + queryAfterCreate <- contractsReader.lookupContractState( + contractId, + ledgerEndAtCreate.lastEventSeqId, + ) queryAfterArchive <- contractsReader.lookupContractState( contractId, - eventSeqIdAfterArchive._2, + ledgerEndAfterArchive.lastEventSeqId, ) } yield { queryAfterCreate.value match { @@ -64,11 +67,11 @@ private[dao] trait JdbcLedgerDaoContractsAppendOnlySpec extends LoneElement with key = GlobalKey.assertBuild(someTemplateId, aTextValue) contractId = nonTransient(tx).loneElement _ <- store(singleNonConsumingExercise(contractId)) - (_, eventSeqIdAtCreate) <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() + ledgerEndAtCreate <- ledgerDao.lookupLedgerEnd() _ <- store(txArchiveContract(alice, (contractId, None))) - (_, eventSeqIdAfterArchive) <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() - queryAfterCreate <- contractsReader.lookupKeyState(key, eventSeqIdAtCreate) - queryAfterArchive <- contractsReader.lookupKeyState(key, eventSeqIdAfterArchive) + ledgerEndAfterArchive <- ledgerDao.lookupLedgerEnd() + queryAfterCreate <- contractsReader.lookupKeyState(key, ledgerEndAtCreate.lastEventSeqId) + queryAfterArchive <- contractsReader.lookupKeyState(key, ledgerEndAfterArchive.lastEventSeqId) } yield { queryAfterCreate match { case LedgerDaoContractsReader.KeyAssigned(fetchedContractId, stakeholders) => diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoPostCommitValidationSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoPostCommitValidationSpec.scala index 4b2be96a6f40..b7b70e4bbf5a 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoPostCommitValidationSpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoPostCommitValidationSpec.scala @@ -23,6 +23,7 @@ import com.daml.platform.store.appendonlydao.{ SequentialWriteDao, } import com.daml.platform.store.backend.StorageBackendFactory +import com.daml.platform.store.interning.StringInterningView import org.scalatest.LoneElement import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers @@ -51,7 +52,16 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement { connectionTimeout = 250.millis, metrics = metrics, ) - .map(dbDispatcher => + .map { dbDispatcher => + val stringInterningStorageBackend = + storageBackendFactory.createStringInterningStorageBackend + val stringInterningView = new StringInterningView( + loadPrefixedEntries = (fromExclusive, toInclusive) => + 
implicit loggingContext => + dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) { + stringInterningStorageBackend.loadStringInterningEntries(fromExclusive, toInclusive) + } + ) JdbcLedgerDao.validatingWrite( dbDispatcher = dbDispatcher, sequentialWriteDao = SequentialWriteDao( @@ -60,6 +70,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement { metrics = metrics, compressionStrategy = CompressionStrategy.none(metrics), ledgerEndCache = ledgerEndCache, + stringInterningView = stringInterningView, ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend, parameterStorageBackend = storageBackendFactory.createParameterStorageBackend, ), @@ -74,7 +85,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement { ledgerEndCache = ledgerEndCache, errorFactories = errorFactories, ) - ) + } } private val ok = io.grpc.Status.Code.OK.value() @@ -95,7 +106,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement { _ <- store(original) _ <- store(duplicate) to <- ledgerDao.lookupLedgerEnd() - completions <- getCompletions(from, to, defaultAppId, Set(alice)) + completions <- getCompletions(from.lastOffset, to.lastOffset, defaultAppId, Set(alice)) } yield { completions should contain.allOf( originalAttempt.commandId.get -> ok, @@ -115,7 +126,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement { (_, create) <- store(txCreateContractWithKey(alice, keyValue)) (_, lookup) <- store(txLookupByKey(alice, keyValue, None)) to <- ledgerDao.lookupLedgerEnd() - completions <- getCompletions(from, to, defaultAppId, Set(alice)) + completions <- getCompletions(from.lastOffset, to.lastOffset, defaultAppId, Set(alice)) } yield { completions should contain.allOf( create.commandId.get -> ok, @@ -137,7 +148,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement { (_, archive) <- store(txArchiveContract(alice, createdContractId -> Some(keyValue))) (_, lookup) <- store(txLookupByKey(alice, keyValue, Some(createdContractId))) to <- ledgerDao.lookupLedgerEnd() - completions <- getCompletions(from, to, defaultAppId, Set(alice)) + completions <- getCompletions(from.lastOffset, to.lastOffset, defaultAppId, Set(alice)) } yield { completions should contain.allOf( create.commandId.get -> ok, @@ -160,7 +171,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement { (_, archive) <- store(txArchiveContract(alice, createdContractId -> Some(keyValue))) (_, fetch) <- store(txFetch(alice, createdContractId)) to <- ledgerDao.lookupLedgerEnd() - completions <- getCompletions(from, to, defaultAppId, Set(alice)) + completions <- getCompletions(from.lastOffset, to.lastOffset, defaultAppId, Set(alice)) } yield { completions should contain.allOf( create.commandId.get -> ok, @@ -192,7 +203,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement { ) (_, fetch2) <- store(txFetch(alice, divulgedContractId)) to <- ledgerDao.lookupLedgerEnd() - completions <- getCompletions(from, to, defaultAppId, Set(alice)) + completions <- getCompletions(from.lastOffset, to.lastOffset, defaultAppId, Set(alice)) } yield { completions should contain.allOf( fetch1.commandId.get -> aborted, diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionLogUpdatesSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionLogUpdatesSpec.scala index 
3fc178142076..4117ad8ca7d7 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionLogUpdatesSpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionLogUpdatesSpec.scala @@ -30,17 +30,17 @@ private[dao] trait JdbcLedgerDaoTransactionLogUpdatesSpec it should "return only the ledger end marker if no new transactions in range" in { for { - ledgerEnd @ (offset, eventSequentialId) <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() + ledgerEnd <- ledgerDao.lookupLedgerEnd() result <- transactionsOf( ledgerDao.transactionsReader .getTransactionLogUpdates( - startExclusive = ledgerEnd, - endInclusive = ledgerEnd, + startExclusive = ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId, + endInclusive = ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId, ) ) } yield { result should contain theSameElementsAs Seq( - TransactionLogUpdate.LedgerEndMarker(offset, eventSequentialId) + TransactionLogUpdate.LedgerEndMarker(ledgerEnd.lastOffset, ledgerEnd.lastEventSeqId) ) Succeeded } @@ -48,7 +48,7 @@ private[dao] trait JdbcLedgerDaoTransactionLogUpdatesSpec it should "return the correct transaction log updates" in { for { - from <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() + from <- ledgerDao.lookupLedgerEnd() (createOffset, createTx) = singleCreate (offset1, t1) <- store(createOffset -> noSubmitterInfo(createTx)) (offset2, t2) <- store(txCreateContractWithKey(alice, "some-key")) @@ -61,12 +61,12 @@ private[dao] trait JdbcLedgerDaoTransactionLogUpdatesSpec (offset4, t4) <- store(fullyTransient()) (offset5, t5) <- store(singleCreate) (offset6, t6) <- store(singleNonConsumingExercise(nonTransient(t5).loneElement)) - to <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() + to <- ledgerDao.lookupLedgerEnd() result <- transactionsOf( ledgerDao.transactionsReader .getTransactionLogUpdates( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset -> from.lastEventSeqId, + endInclusive = to.lastOffset -> to.lastEventSeqId, ) ) } yield { @@ -86,7 +86,7 @@ private[dao] trait JdbcLedgerDaoTransactionLogUpdatesSpec t2.transaction.nodes.head._2.asInstanceOf[Node.Create].key.get.key val exercisedContractKey = Map(offset2 -> contractKey, offset3 -> contractKey) - val eventSequentialIdGen = new AtomicLong(from._2 + 1L) + val eventSequentialIdGen = new AtomicLong(from.lastEventSeqId + 1L) assertExpectedEquality(actualTx1, t1, offset1, exercisedContractKey, eventSequentialIdGen) assertExpectedEquality(actualTx2, t2, offset2, exercisedContractKey, eventSequentialIdGen) @@ -95,8 +95,8 @@ private[dao] trait JdbcLedgerDaoTransactionLogUpdatesSpec assertExpectedEquality(actualTx5, t5, offset5, exercisedContractKey, eventSequentialIdGen) assertExpectedEquality(actualTx6, t6, offset6, exercisedContractKey, eventSequentialIdGen) - endMarker.eventOffset shouldBe to._1 - endMarker.eventSequentialId shouldBe to._2 + endMarker.eventOffset shouldBe to.lastOffset + endMarker.eventSequentialId shouldBe to.lastEventSeqId } } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionTreesSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionTreesSpec.scala index 6e6a9c0156c0..631e97341689 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionTreesSpec.scala +++ 
b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionTreesSpec.scala @@ -248,8 +248,8 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec resultForAlice <- transactionsOf( ledgerDao.transactionsReader .getTransactionTrees( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, requestingParties = Set(alice), verbose = true, ) @@ -257,8 +257,8 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec resultForBob <- transactionsOf( ledgerDao.transactionsReader .getTransactionTrees( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, requestingParties = Set(bob), verbose = true, ) @@ -266,8 +266,8 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec resultForCharlie <- transactionsOf( ledgerDao.transactionsReader .getTransactionTrees( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, requestingParties = Set(charlie), verbose = true, ) @@ -287,7 +287,7 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec (_, t3) <- store(singleExercise(nonTransient(t2).loneElement)) (_, t4) <- store(fullyTransient()) to <- ledgerDao.lookupLedgerEnd() - } yield (from, to, Seq(t1, t2, t3, t4)) + } yield (from.lastOffset, to.lastOffset, Seq(t1, t2, t3, t4)) private def lookupIndividually( transactions: Seq[LedgerEntry.Transaction], diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionsSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionsSpec.scala index 5783c353bf61..0fdc4de19790 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionsSpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionsSpec.scala @@ -211,8 +211,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid resultForAlice <- transactionsOf( ledgerDao.transactionsReader .getFlatTransactions( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, filter = Map(alice -> Set.empty), verbose = true, ) @@ -220,8 +220,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid resultForBob <- transactionsOf( ledgerDao.transactionsReader .getFlatTransactions( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, filter = Map(bob -> Set.empty), verbose = true, ) @@ -229,8 +229,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid resultForCharlie <- transactionsOf( ledgerDao.transactionsReader .getFlatTransactions( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, filter = Map(charlie -> Set.empty), verbose = true, ) @@ -259,8 +259,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid result <- transactionsOf( ledgerDao.transactionsReader .getFlatTransactions( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, filter = Map(alice -> Set(otherTemplateId)), verbose = true, ) @@ -290,8 +290,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid result <- transactionsOf( ledgerDao.transactionsReader 
.getFlatTransactions( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, filter = Map( alice -> Set(otherTemplateId), bob -> Set(otherTemplateId), @@ -330,8 +330,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid result <- transactionsOf( ledgerDao.transactionsReader .getFlatTransactions( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, filter = Map( alice -> Set(someTemplateId), bob -> Set(otherTemplateId), @@ -370,8 +370,8 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid result <- transactionsOf( ledgerDao.transactionsReader .getFlatTransactions( - startExclusive = from, - endInclusive = to, + startExclusive = from.lastOffset, + endInclusive = to.lastOffset, filter = Map( alice -> Set(otherTemplateId), bob -> Set.empty, @@ -401,7 +401,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid (offset, exercise) <- store(exerciseWithChild(firstContractId)) result <- ledgerDao.transactionsReader .getFlatTransactions( - from, + from.lastOffset, offset, exercise.actAs.map(submitter => submitter -> Set.empty[Identifier]).toMap, verbose = true, @@ -555,7 +555,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid } to <- ledgerDao.lookupLedgerEnd() response <- ledgerDao.transactionsReader - .getFlatTransactions(from, to, cp.filter, verbose = false) + .getFlatTransactions(from.lastOffset, to.lastOffset, cp.filter, verbose = false) .runWith(Sink.seq) readOffsets = response flatMap { case (_, gtr) => gtr.transactions map (_.offset) } readCreates = extractAllTransactions(response) flatMap (_.events) @@ -600,7 +600,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid (_, t3) <- store(singleExercise(nonTransient(t2).loneElement)) (_, t4) <- store(fullyTransient()) to <- ledgerDao.lookupLedgerEnd() - } yield (from, to, Seq(t1, t2, t3, t4)) + } yield (from.lastOffset, to.lastOffset, Seq(t1, t2, t3, t4)) private def lookupIndividually( transactions: Seq[LedgerEntry.Transaction], diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionsWriterSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionsWriterSpec.scala index 7df6f40c8bdc..ce032a6d71a5 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionsWriterSpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoTransactionsWriterSpec.scala @@ -25,7 +25,7 @@ private[dao] trait JdbcLedgerDaoTransactionsWriterSpec extends LoneElement { createdContractId = nonTransient(create).loneElement (_, lookup) <- store(txLookupByKey(alice, keyValue, Some(createdContractId))) to <- ledgerDao.lookupLedgerEnd() - completions <- getCompletions(from, to, defaultAppId, Set(alice)) + completions <- getCompletions(from.lastOffset, to.lastOffset, defaultAppId, Set(alice)) } yield { completions should contain.allOf( create.commandId.get -> ok, @@ -43,7 +43,7 @@ private[dao] trait JdbcLedgerDaoTransactionsWriterSpec extends LoneElement { createdContractId = nonTransient(create).loneElement (_, fetch) <- store(txFetch(alice, createdContractId)) to <- ledgerDao.lookupLedgerEnd() - completions <- getCompletions(from, to, defaultAppId, Set(alice)) + completions <- 
getCompletions(from.lastOffset, to.lastOffset, defaultAppId, Set(alice)) } yield { completions should contain.allOf( create.commandId.get -> ok, diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala index 39daee6bc88d..0aa9dc97a805 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/parallel/ParallelIndexerSubscriptionSpec.scala @@ -145,6 +145,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { val expected = Batch[Vector[DbDto]]( lastOffset = offset("02"), lastSeqEventId = 0, + lastStringInterningId = 0, lastRecordTime = someTime.plusMillis(2).toEpochMilli, batch = Vector( someParty, @@ -164,9 +165,10 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { behavior of "seqMapperZero" it should "provide required Batch in happy path case" in { - ParallelIndexerSubscription.seqMapperZero(123) shouldBe Batch( + ParallelIndexerSubscription.seqMapperZero(123, 234) shouldBe Batch( lastOffset = null, lastSeqEventId = 123, + lastStringInterningId = 234, lastRecordTime = 0, batch = Vector.empty, batchSize = 0, @@ -177,12 +179,16 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { behavior of "seqMapper" - it should "assign sequence ids correctly in happy path case" in { - val result = ParallelIndexerSubscription.seqMapper(metrics)( - ParallelIndexerSubscription.seqMapperZero(15), + it should "assign sequence ids correctly, and populate string-interning entries correctly in happy path case" in { + val result = ParallelIndexerSubscription.seqMapper( + _.zipWithIndex.map(x => x._2 -> x._2.toString).take(2), + metrics, + )( + ParallelIndexerSubscription.seqMapperZero(15, 26), Batch( lastOffset = offset("02"), lastSeqEventId = 0, + lastStringInterningId = 0, lastRecordTime = someTime.toEpochMilli, batch = Vector( someParty, @@ -199,18 +205,24 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { ), ) result.lastSeqEventId shouldBe 18 + result.lastStringInterningId shouldBe 1 result.averageStartTime should be > System.nanoTime() - 1000000000 result.batch(1).asInstanceOf[DbDto.EventDivulgence].event_sequential_id shouldBe 16 result.batch(3).asInstanceOf[DbDto.EventCreate].event_sequential_id shouldBe 17 result.batch(5).asInstanceOf[DbDto.EventExercise].event_sequential_id shouldBe 18 + result.batch(7).asInstanceOf[DbDto.StringInterningDto].internalId shouldBe 0 + result.batch(7).asInstanceOf[DbDto.StringInterningDto].externalString shouldBe "0" + result.batch(8).asInstanceOf[DbDto.StringInterningDto].internalId shouldBe 1 + result.batch(8).asInstanceOf[DbDto.StringInterningDto].externalString shouldBe "1" } it should "preserve sequence id if nothing to assign" in { - val result = ParallelIndexerSubscription.seqMapper(metrics)( - ParallelIndexerSubscription.seqMapperZero(15), + val result = ParallelIndexerSubscription.seqMapper(_ => Nil, metrics)( + ParallelIndexerSubscription.seqMapperZero(15, 25), Batch( lastOffset = offset("02"), lastSeqEventId = 0, + lastStringInterningId = 0, lastRecordTime = someTime.toEpochMilli, batch = Vector( someParty, @@ -224,6 +236,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { ), ) 
result.lastSeqEventId shouldBe 15 + result.lastStringInterningId shouldBe 25 } behavior of "batcher" @@ -237,6 +250,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { lastOffset = offset("02"), lastSeqEventId = 0, lastRecordTime = someTime.toEpochMilli, + lastStringInterningId = 0, batch = Vector( someParty, someParty, @@ -251,6 +265,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { result shouldBe Batch( lastOffset = offset("02"), lastSeqEventId = 0, + lastStringInterningId = 0, lastRecordTime = someTime.toEpochMilli, batch = "bumm", batchSize = 3, @@ -267,6 +282,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { Batch( lastOffset = offset("02"), lastSeqEventId = 1000, + lastStringInterningId = 200, lastRecordTime = someTime.toEpochMilli - 1000, batch = "bumm1", batchSize = 3, @@ -276,6 +292,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { Batch( lastOffset = offset("05"), lastSeqEventId = 2000, + lastStringInterningId = 210, lastRecordTime = someTime.toEpochMilli, batch = "bumm2", batchSize = 3, @@ -285,6 +302,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { ) shouldBe Batch( lastOffset = offset("05"), lastSeqEventId = 2000, + lastStringInterningId = 210, lastRecordTime = someTime.toEpochMilli, batch = "zero", batchSize = 0, @@ -300,6 +318,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { Batch( lastOffset = offset("05"), lastSeqEventId = 2000, + lastStringInterningId = 300, lastRecordTime = someTime.toEpochMilli, batch = "zero", batchSize = 0, @@ -309,6 +328,7 @@ class ParallelIndexerSubscriptionSpec extends AnyFlatSpec with Matchers { ) shouldBe ParameterStorageBackend.LedgerEnd( lastOffset = offset("05"), lastEventSeqId = 2000, + lastStringInterningId = 300, ) } } diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala index c58987ba44e4..5adc6bec3f41 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/SequentialWriteDaoSpec.scala @@ -8,28 +8,38 @@ import java.sql.Connection import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.Party import com.daml.lf.data.Time.Timestamp import com.daml.logging.LoggingContext import com.daml.platform.store.appendonlydao.SequentialWriteDaoSpec._ import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd import com.daml.platform.store.backend.{DbDto, IngestionStorageBackend, ParameterStorageBackend} import com.daml.platform.store.cache.MutableLedgerEndCache +import com.daml.platform.store.interning.{ + DomainStringIterators, + InternizingStringInterningView, + StringInterning, + StringInterningDomain, +} import org.mockito.MockitoSugar.mock import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import sun.reflect.generics.reflectiveObjects.NotImplementedException class SequentialWriteDaoSpec extends AnyFlatSpec with Matchers { behavior of "SequentialWriteDaoImpl" it should "store correctly in a happy path case" in { - val storageBackendCaptor = new 
StorageBackendCaptor(Some(LedgerEnd(Offset.beforeBegin, 5))) + val storageBackendCaptor = new StorageBackendCaptor(Some(LedgerEnd(Offset.beforeBegin, 5, 1))) val ledgerEndCache = MutableLedgerEndCache() val testee = SequentialWriteDaoImpl( parameterStorageBackend = storageBackendCaptor, ingestionStorageBackend = storageBackendCaptor, updateToDbDtos = updateToDbDtoFixture, ledgerEndCache = ledgerEndCache, + stringInterningView = stringInterningViewFixture, + dbDtosToStringsForInterning = dbDtoToStringsForInterningFixture, ) testee.store(someConnection, offset("01"), singlePartyFixture) ledgerEndCache() shouldBe (offset("01") -> 5) @@ -41,7 +51,7 @@ class SequentialWriteDaoSpec extends AnyFlatSpec with Matchers { ledgerEndCache() shouldBe (offset("04") -> 9) storageBackendCaptor.captured(0) shouldBe someParty - storageBackendCaptor.captured(1) shouldBe LedgerEnd(offset("01"), 5) + storageBackendCaptor.captured(1) shouldBe LedgerEnd(offset("01"), 5, 1) storageBackendCaptor.captured(2).asInstanceOf[DbDto.EventCreate].event_sequential_id shouldBe 6 storageBackendCaptor .captured(3) @@ -51,12 +61,22 @@ class SequentialWriteDaoSpec extends AnyFlatSpec with Matchers { .captured(4) .asInstanceOf[DbDto.EventDivulgence] .event_sequential_id shouldBe 8 - storageBackendCaptor.captured(5) shouldBe LedgerEnd(offset("02"), 8) - storageBackendCaptor.captured(6) shouldBe LedgerEnd(offset("03"), 8) - storageBackendCaptor.captured(7) shouldBe someParty - storageBackendCaptor.captured(8).asInstanceOf[DbDto.EventCreate].event_sequential_id shouldBe 9 - storageBackendCaptor.captured(9) shouldBe LedgerEnd(offset("04"), 9) - storageBackendCaptor.captured should have size 10 + storageBackendCaptor.captured(5).asInstanceOf[DbDto.StringInterningDto].internalId shouldBe 1 + storageBackendCaptor + .captured(5) + .asInstanceOf[DbDto.StringInterningDto] + .externalString shouldBe "a" + storageBackendCaptor.captured(6).asInstanceOf[DbDto.StringInterningDto].internalId shouldBe 2 + storageBackendCaptor + .captured(6) + .asInstanceOf[DbDto.StringInterningDto] + .externalString shouldBe "b" + storageBackendCaptor.captured(7) shouldBe LedgerEnd(offset("02"), 8, 2) + storageBackendCaptor.captured(8) shouldBe LedgerEnd(offset("03"), 8, 2) + storageBackendCaptor.captured(9) shouldBe someParty + storageBackendCaptor.captured(10).asInstanceOf[DbDto.EventCreate].event_sequential_id shouldBe 9 + storageBackendCaptor.captured(11) shouldBe LedgerEnd(offset("04"), 9, 2) + storageBackendCaptor.captured should have size 12 } it should "start event_seq_id from 1" in { @@ -67,16 +87,18 @@ class SequentialWriteDaoSpec extends AnyFlatSpec with Matchers { ingestionStorageBackend = storageBackendCaptor, updateToDbDtos = updateToDbDtoFixture, ledgerEndCache = ledgerEndCache, + stringInterningView = stringInterningViewFixture, + dbDtosToStringsForInterning = dbDtoToStringsForInterningFixture, ) testee.store(someConnection, offset("03"), None) ledgerEndCache() shouldBe (offset("03") -> 0) testee.store(someConnection, offset("04"), partyAndCreateFixture) ledgerEndCache() shouldBe (offset("04") -> 1) - storageBackendCaptor.captured(0) shouldBe LedgerEnd(offset("03"), 0) + storageBackendCaptor.captured(0) shouldBe LedgerEnd(offset("03"), 0, 0) storageBackendCaptor.captured(1) shouldBe someParty storageBackendCaptor.captured(2).asInstanceOf[DbDto.EventCreate].event_sequential_id shouldBe 1 - storageBackendCaptor.captured(3) shouldBe LedgerEnd(offset("04"), 1) + storageBackendCaptor.captured(3) shouldBe LedgerEnd(offset("04"), 1, 0) 
storageBackendCaptor.captured should have size 4 } @@ -251,13 +273,35 @@ object SequentialWriteDaoSpec { ), ) - val updateToDbDtoFixture: Offset => state.Update => Iterator[DbDto] = + private val updateToDbDtoFixture: Offset => state.Update => Iterator[DbDto] = _ => { case r: state.Update.PublicPackageUploadRejected => someUpdateToDbDtoFixture(r.rejectionReason).iterator case _ => throw new Exception } + private val dbDtoToStringsForInterningFixture: Iterable[DbDto] => DomainStringIterators = { + case iterable if iterable.size == 3 => + new DomainStringIterators(Iterator.empty, List("1").iterator) + case _ => new DomainStringIterators(Iterator.empty, Iterator.empty) + } + + private val stringInterningViewFixture: StringInterning with InternizingStringInterningView = { + new StringInterning with InternizingStringInterningView { + override def templateId: StringInterningDomain[Ref.Identifier] = + throw new NotImplementedException + + override def party: StringInterningDomain[Party] = throw new NotImplementedException + + override def internize( + domainStringIterators: DomainStringIterators + ): Iterable[(Int, String)] = { + if (domainStringIterators.templateIds.isEmpty) Nil + else List(1 -> "a", 2 -> "b") + } + } + } + private val someConnection = mock[Connection] } diff --git a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala index dc1e446c0eda..8dd4a9c2d9ce 100644 --- a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -216,8 +216,8 @@ class RecoveringIndexerIntegrationSpec dao.use { case (ledgerDao, ledgerEndCache) => eventually { (_, _) => for { - ledgerEnd <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() - _ = ledgerEndCache.set(ledgerEnd) + ledgerEnd <- ledgerDao.lookupLedgerEnd() + _ = ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId) knownParties <- ledgerDao.listKnownParties() } yield { knownParties.map(_.displayName) shouldBe partyNames.map(Some(_)) diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala index dc530424ce77..6dd77521a457 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala @@ -48,6 +48,11 @@ import com.daml.platform.store.appendonlydao.{ } import com.daml.platform.store.backend.StorageBackendFactory import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLedgerEntry} +import com.daml.platform.store.interning.{ + InternizingStringInterningView, + StringInterning, + StringInterningView, +} import com.daml.platform.store.{BaseLedger, DbType, FlywayMigrations, LfValueTranslationCache} import com.google.rpc.status.{Status => RpcStatus} import io.grpc.Status @@ -119,10 +124,19 @@ private[sandbox] object SqlLedger { metrics = metrics, ) .acquire() + stringInterningStorageBackend = storageBackendFactory.createStringInterningStorageBackend + stringInterningView = new StringInterningView( + 
loadPrefixedEntries = (fromExclusive, toInclusive) => + implicit loggingContext => + dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) { + stringInterningStorageBackend.loadStringInterningEntries(fromExclusive, toInclusive) + } + ) dao = ledgerDaoOwner( dbDispatcher, storageBackendFactory, ledgerEndCache, + stringInterningView, servicesExecutionContext, errorFactories, ) @@ -135,10 +149,11 @@ private[sandbox] object SqlLedger { existingLedgerId <- Resource.fromFuture(dao.lookupLedgerId()) existingParticipantId <- Resource.fromFuture(dao.lookupParticipantId()) ledgerId <- Resource.fromFuture(initialize(dao, existingLedgerId, existingParticipantId)) - ledgerEnd <- Resource.fromFuture(dao.lookupLedgerEndOffsetAndSequentialId()) - _ = ledgerEndCache.set(ledgerEnd) + ledgerEnd <- Resource.fromFuture(dao.lookupLedgerEnd()) + _ = ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId) + _ <- Resource.fromFuture(stringInterningView.update(ledgerEnd.lastStringInterningId)) ledgerConfig <- Resource.fromFuture(dao.lookupLedgerConfiguration()) - dispatcher <- dispatcherOwner(ledgerEnd._1).acquire() + dispatcher <- dispatcherOwner(ledgerEnd.lastOffset).acquire() persistenceQueue <- new PersistenceQueueOwner(dispatcher).acquire() // Close the dispatcher before the persistence queue. _ <- Resource(Future.unit)(_ => Future.successful(dispatcher.close())) @@ -280,6 +295,7 @@ private[sandbox] object SqlLedger { dbDispatcher: DbDispatcher, storageBackendFactory: StorageBackendFactory, ledgerEndCache: MutableLedgerEndCache, + stringInterningView: StringInterning with InternizingStringInterningView, servicesExecutionContext: ExecutionContext, errorFactories: ErrorFactories, ): LedgerDao = { @@ -295,6 +311,7 @@ private[sandbox] object SqlLedger { metrics = metrics, compressionStrategy = compressionStrategy, ledgerEndCache = ledgerEndCache, + stringInterningView = stringInterningView, ingestionStorageBackend = storageBackendFactory.createIngestionStorageBackend, parameterStorageBackend = storageBackendFactory.createParameterStorageBackend, ),
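Editor's note: the patch above repeats one wiring pattern in three places (JdbcLedgerDaoBackend, JdbcLedgerDaoPostCommitValidationSpec, SqlLedger): build a StringInterningView whose loadPrefixedEntries delegates to the storage backend's loadStringInterningEntries, thread the view into SequentialWriteDao, and on startup advance it to the ledger end's new lastStringInterningId. The sketch below is a minimal, self-contained illustration of that pattern; LedgerEnd, StringInterningView, loadStringInterningEntries and lastStringInterningId are names taken from the patch, but every type here is a simplified stand-in (no DbDispatcher, Offset or LoggingContext), not the real daml API.

```scala
// Minimal sketch of the startup wiring this patch introduces.
object StringInterningWiringSketch extends App {

  // Stand-in for ParameterStorageBackend.LedgerEnd after this patch: the
  // ledger end now also records the highest persisted string-interning id.
  final case class LedgerEnd(
      lastOffset: String, // the real code uses Offset
      lastEventSeqId: Long,
      lastStringInterningId: Int, // new in this patch
  )

  // Stand-in for the persisted string_interning table.
  private val table = scala.collection.mutable.SortedMap[Int, String]()

  // Analogue of StringInterningStorageBackend.loadStringInterningEntries:
  // fetch entries in the (fromExclusive, toInclusive] id range.
  def loadStringInterningEntries(fromExclusive: Int, toInclusive: Int): Iterable[(Int, String)] =
    table.range(fromExclusive + 1, toInclusive + 1).toVector

  // Analogue of the write-side StringInterningView: an in-memory map that is
  // brought up to date from persistence via loadPrefixedEntries.
  final class StringInterningView(loadPrefixedEntries: (Int, Int) => Iterable[(Int, String)]) {
    private var entries = Map.empty[Int, String]
    private var lastId = 0

    def update(ledgerEndStringInterningId: Int): Unit =
      if (ledgerEndStringInterningId > lastId) {
        entries ++= loadPrefixedEntries(lastId, ledgerEndStringInterningId)
        lastId = ledgerEndStringInterningId
      }

    def lookup(id: Int): Option[String] = entries.get(id)
  }

  // Simulate what the indexer has persisted: interning rows plus a ledger end.
  table ++= Seq(1 -> "alice", 2 -> "bob")
  val ledgerEnd = LedgerEnd(lastOffset = "04", lastEventSeqId = 9, lastStringInterningId = 2)

  // Startup sequence mirroring SqlLedger after the patch: look up the ledger
  // end, then advance the view to its string-interning id.
  val view = new StringInterningView(loadStringInterningEntries)
  view.update(ledgerEnd.lastStringInterningId)

  assert(view.lookup(1).contains("alice"))
  assert(view.lookup(2).contains("bob"))
  println(s"view initialized up to interning id ${ledgerEnd.lastStringInterningId}")
}
```

This mirrors why the patch replaces lookupLedgerEndOffsetAndSequentialId with lookupLedgerEnd: the view's initialization needs the third component of the ledger end, so a single LedgerEnd value is threaded through instead of an (offset, seqId) pair.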
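The ParallelIndexerSubscriptionSpec hunks also pin down how interning ids are threaded between batches: seqMapperZero is seeded from the persisted ledger end, each batch's lastStringInterningId becomes the id of the last newly interned entry, and it carries over unchanged when nothing is interned. The toy model below reproduces only those assertions; Batch, seqMapperZero and seqMapper echo names from the patch, but the signatures are assumed simplifications (the real seqMapper assigns sequential ids only to event DbDtos and appends StringInterningDto rows to the batch).

```scala
// Toy model of the id threading asserted in ParallelIndexerSubscriptionSpec.
object SeqMapperThreadingSketch extends App {

  final case class Batch(
      lastSeqEventId: Long,
      lastStringInterningId: Int,
      batch: Vector[String], // the real code carries Vector[DbDto]
  )

  // Zero element, seeded from the ledger end, cf. seqMapperZero(15, 26).
  def seqMapperZero(initialSeqId: Long, initialStringInterningId: Int): Batch =
    Batch(initialSeqId, initialStringInterningId, Vector.empty)

  // internize yields the (internalId, externalString) pairs new to this batch;
  // if it yields nothing, the previous interning id carries over, matching
  // "preserve sequence id if nothing to assign".
  def seqMapper(internize: Vector[String] => Iterable[(Int, String)])(
      previous: Batch,
      current: Batch,
  ): Batch = {
    val newEntries = internize(current.batch).toVector
    current.copy(
      lastSeqEventId = previous.lastSeqEventId + current.batch.size,
      lastStringInterningId =
        newEntries.lastOption.fold(previous.lastStringInterningId)(_._1),
    )
  }

  val zero = seqMapperZero(15, 26)

  val withEntries =
    seqMapper(_ => List(0 -> "0", 1 -> "1"))(zero, Batch(0, 0, Vector("e1", "e2", "e3")))
  assert(withEntries.lastSeqEventId == 18) // 15 + 3, as in the spec
  assert(withEntries.lastStringInterningId == 1) // id of the last new entry

  val noEntries = seqMapper(_ => Nil)(zero, Batch(0, 0, Vector.empty))
  assert(noEntries.lastStringInterningId == 26) // carried over unchanged
  println("toy seqMapper threading matches the spec's assertions")
}
```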