Activate interning write side [DPP-710] (#11614)
After this PR is merged, the write side of string interning is activated:
* The string interning table is populated as part of regular indexing
* A data migration populates the table from already existing data
The effect of populating the string interning table is not visible yet.

List of changes
* Add schema and data migrations where applicable
* Patch up sandbox-classic and the parallel indexer to populate string_interning persistence, and to initialize the write-side string interning view
* Add ledger_end_string_interning_id to track the need for updates at the view (plus align the surrounding code)
* Remove lookupLedgerEndOffsetAndSequentialId
* Fix and extend unit tests

changelog_begin
changelog_end
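
For context, string interning maps recurring external strings (template ids, party names) to compact integer ids, so later stages can store and compare integers instead of strings. A minimal sketch of a write-side interning view (hypothetical and heavily simplified; the real interfaces are StringInterningView / UpdatingStringInterningView in com.daml.platform.store.interning):

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical, simplified write-side interning view.
final class WriteSideInterningSketch {
  @volatile private var byString: Map[String, Int] = Map.empty
  @volatile private var lastId: Int = 0

  // Indexing path: allocate ids for unseen strings; the caller persists the
  // returned (id, string) pairs to the string_interning table.
  def internize(strings: Iterable[String]): Vector[(Int, String)] =
    synchronized {
      strings.toVector.distinct.collect {
        case s if !byString.contains(s) =>
          lastId += 1
          byString += s -> lastId
          lastId -> s
      }
    }

  // Initialization path: catch up from persistence to the id recorded in
  // parameters.ledger_end_string_interning_id.
  def update(lastPersistedId: Int)(
      load: (Int, Int) => Future[Vector[(Int, String)]]
  )(implicit ec: ExecutionContext): Future[Unit] =
    if (lastPersistedId <= lastId) Future.unit
    else
      load(lastId, lastPersistedId).map(_.foreach { case (id, s) =>
        synchronized { byString += s -> id; lastId = lastId max id }
      })
}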
nmarton-da authored Nov 10, 2021
1 parent 93c07f4 commit f8f8807
Showing 44 changed files with 498 additions and 195 deletions.
6 changes: 3 additions & 3 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
@@ -430,9 +430,6 @@ final class Metrics(val registry: MetricRegistry) {
       val getLedgerId: DatabaseMetrics = createDbMetrics("get_ledger_id")
       val getParticipantId: DatabaseMetrics = createDbMetrics("get_participant_id")
       val getLedgerEnd: DatabaseMetrics = createDbMetrics("get_ledger_end")
-      val getLedgerEndOffsetAndSequentialId: DatabaseMetrics = createDbMetrics(
-        "get_ledger_end_offset_and_sequential_id"
-      )
       val getInitialLedgerEnd: DatabaseMetrics = createDbMetrics("get_initial_ledger_end")
       val initializeLedgerParameters: DatabaseMetrics = createDbMetrics(
         "initialize_ledger_parameters"
@@ -524,6 +521,9 @@ final class Metrics(val registry: MetricRegistry) {
       val getContractStateEvents: DatabaseMetrics = createDbMetrics(
         "get_contract_state_events"
       )
+      val loadStringInterningEntries: DatabaseMetrics = createDbMetrics(
+        "loadStringInterningEntries"
+      )

       object translation {
         private val Prefix: MetricName = db.Prefix :+ "translation"
@@ -1 +1 @@
-e28b88accecead16dcd67becabf7ad230654b65f93c277e785ba77203c3db8b5
+bd9ddd16ae4ecc09923215635442c83b7894e95d23203baccde6df27cb0ce2cf
@@ -11,6 +11,7 @@ CREATE TABLE parameters (
     participant_id VARCHAR NOT NULL,
     ledger_end VARCHAR,
     ledger_end_sequential_id BIGINT,
+    ledger_end_string_interning_id INTEGER,
     participant_pruned_up_to_inclusive VARCHAR,
     participant_all_divulged_contracts_pruned_up_to_inclusive VARCHAR
 );
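
The new ledger_end_string_interning_id column records how far the persisted string_interning table has been populated, so a restarting participant knows which interning entries are already covered by the indexed data. It is read at startup together with the rest of the ledger end, along the lines of this illustrative query (not part of this PR):

SELECT ledger_end, ledger_end_sequential_id, ledger_end_string_interning_id
FROM parameters;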
@@ -0,0 +1 @@
2131cf0ed208e236ebf0ae68c8153cb8080d3b133114902185393c2b34f0b45d
@@ -0,0 +1,5 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

ALTER TABLE parameters
ADD ledger_end_string_interning_id NUMBER;
@@ -0,0 +1 @@
cc55310126bde9627d59698cee292506e4d80c8303f090ba8649c4f4c6e19fbc
@@ -0,0 +1,69 @@
-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- add string_interning ledger-end tracking column to parameters
ALTER TABLE parameters
ADD COLUMN ledger_end_string_interning_id INTEGER;

-- create temporary sequence to populate the string_interning table as migrating data
CREATE SEQUENCE string_interning_seq_temp;
-- add temporary index to aid string lookups as migrating data
CREATE UNIQUE INDEX string_interning_external_string_temp_idx ON string_interning USING btree (external_string);

-- add temporary migration function
CREATE FUNCTION insert_to_string_interning(prefix TEXT, table_name TEXT, selector_expr TEXT)
RETURNS void
LANGUAGE plpgsql
AS
$$
BEGIN
EXECUTE
'INSERT INTO string_interning(internal_id, external_string) ' ||
'SELECT nextval(''string_interning_seq_temp''), id ' ||
'FROM (SELECT DISTINCT ''' || prefix || ''' || ' || selector_expr || ' id FROM ' || table_name || ') distinct_ids ' ||
'WHERE NOT EXISTS (SELECT 1 FROM string_interning WHERE external_string = distinct_ids.id)' ||
'AND id IS NOT NULL';
END
$$;

-- data migrations

-- template_id
SELECT insert_to_string_interning('t|', 'participant_events_create', 'template_id');
SELECT insert_to_string_interning('t|', 'participant_events_divulgence', 'template_id');
SELECT insert_to_string_interning('t|', 'participant_events_consuming_exercise', 'template_id');
SELECT insert_to_string_interning('t|', 'participant_events_non_consuming_exercise', 'template_id');

-- party
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(tree_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(flat_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(submitters)');
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(create_observers)');
SELECT insert_to_string_interning('p|', 'participant_events_create', 'unnest(create_signatories)');

SELECT insert_to_string_interning('p|', 'participant_events_divulgence', 'unnest(tree_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_divulgence', 'unnest(submitters)');

SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(tree_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(flat_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(submitters)');
SELECT insert_to_string_interning('p|', 'participant_events_consuming_exercise', 'unnest(exercise_actors)');

SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(tree_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(flat_event_witnesses)');
SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(submitters)');
SELECT insert_to_string_interning('p|', 'participant_events_non_consuming_exercise', 'unnest(exercise_actors)');

SELECT insert_to_string_interning('p|', 'participant_command_completions', 'unnest(submitters)');

SELECT insert_to_string_interning('p|', 'party_entries', 'party');

-- fill ledger-end
UPDATE parameters
SET ledger_end_string_interning_id = (SELECT max(internal_id) FROM string_interning);

-- remove temporary SQL objects
DROP SEQUENCE string_interning_seq_temp;
DROP INDEX string_interning_external_string_temp_idx;
DROP FUNCTION insert_to_string_interning(TEXT, TEXT, TEXT);
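
For illustration: the 't|' / 'p|' prefixes keep template ids and party names disjoint in the shared table, and the first template_id call above executes dynamic SQL roughly equivalent to the following (hand-expanded here, not part of the migration):

INSERT INTO string_interning(internal_id, external_string)
SELECT nextval('string_interning_seq_temp'), id
FROM (SELECT DISTINCT 't|' || template_id id FROM participant_events_create) distinct_ids
WHERE NOT EXISTS (SELECT 1 FROM string_interning WHERE external_string = distinct_ids.id)
  AND id IS NOT NULL;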

@@ -55,15 +55,15 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
       context: ResourceContext
   ): Resource[ReadOnlySqlLedgerWithMutableCache] =
     for {
-      (ledgerEndOffset, ledgerEndSequentialId) <- Resource.fromFuture(
-        ledgerDao.lookupLedgerEndOffsetAndSequentialId()
+      ledgerEnd <- Resource.fromFuture(
+        ledgerDao.lookupLedgerEnd()
       )
-      _ = ledgerEndCache.set((ledgerEndOffset, ledgerEndSequentialId))
+      _ = ledgerEndCache.set((ledgerEnd.lastOffset, ledgerEnd.lastEventSeqId))
       prefetchingDispatcher <- dispatcherOffsetSeqIdOwner(
-        ledgerEndOffset,
-        ledgerEndSequentialId,
+        ledgerEnd.lastOffset,
+        ledgerEnd.lastEventSeqId,
       ).acquire()
-      generalDispatcher <- dispatcherOwner(ledgerEndOffset).acquire()
+      generalDispatcher <- dispatcherOwner(ledgerEnd.lastOffset).acquire()
       dispatcherLagMeter <- Resource.successful(
         new DispatcherLagMeter((offset, eventSeqId) => {
           ledgerEndCache.set((offset, eventSeqId))
@@ -79,7 +79,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
         prefetchingDispatcher,
         generalDispatcher,
         dispatcherLagMeter,
-        ledgerEndOffset -> ledgerEndSequentialId,
+        ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId,
       ).acquire()
     } yield ledger

@@ -294,12 +294,14 @@ private final class ReadOnlySqlLedgerWithMutableCache(
       )(() =>
         Source
           .tick(0.millis, 100.millis, ())
-          .mapAsync(1)(_ => ledgerDao.lookupLedgerEndOffsetAndSequentialId())
+          .mapAsync(1)(_ => ledgerDao.lookupLedgerEnd())
       )
       .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
-      .toMat(Sink.foreach { case newLedgerHead @ (offset, _) =>
-        dispatcherLagger.startTimer(offset)
-        contractStateEventsDispatcher.signalNewHead(newLedgerHead)
+      .toMat(Sink.foreach { case newLedgerHead =>
+        dispatcherLagger.startTimer(newLedgerHead.lastOffset)
+        contractStateEventsDispatcher.signalNewHead(
+          newLedgerHead.lastOffset -> newLedgerHead.lastEventSeqId
+        )
       })(
         Keep.both[UniqueKillSwitch, Future[Done]]
       )
@@ -34,9 +34,9 @@ private[index] object ReadOnlySqlLedgerWithTranslationCache {
       context: ResourceContext
   ): Resource[ReadOnlySqlLedgerWithTranslationCache] =
     for {
-      ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEndOffsetAndSequentialId())
-      _ = ledgerEndCache.set(ledgerEnd)
-      dispatcher <- dispatcherOwner(ledgerEnd._1).acquire()
+      ledgerEnd <- Resource.fromFuture(ledgerDao.lookupLedgerEnd())
+      _ = ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId)
+      dispatcher <- dispatcherOwner(ledgerEnd.lastOffset).acquire()
       contractsStore <- contractStoreOwner()
       ledger <- ledgerOwner(dispatcher, contractsStore).acquire()
     } yield ledger
@@ -89,15 +89,15 @@ private final class ReadOnlySqlLedgerWithTranslationCache(
       )(() =>
         Source
           .tick(0.millis, 100.millis, ())
-          .mapAsync(1)(_ => ledgerDao.lookupLedgerEndOffsetAndSequentialId())
+          .mapAsync(1)(_ => ledgerDao.lookupLedgerEnd())
       )
       .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
       .toMat(Sink.foreach { ledgerEnd =>
-        ledgerEndCache.set(ledgerEnd)
+        ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId)
         // the order here is very important: first we need to make data available for point-wise lookups
         // and SQL queries, and only then we can make it available on the streams.
         // (consider example: completion arrived on a stream, but the transaction cannot be looked up)
-        dispatcher.signalNewHead(ledgerEnd._1)
+        dispatcher.signalNewHead(ledgerEnd.lastOffset)
       })(
         Keep.both[UniqueKillSwitch, Future[Done]]
       )
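
Both read-only ledgers above poll the ledger end on a fixed 100ms tick and only then signal their dispatchers, so point-wise lookups are served before the streams advance. A self-contained sketch of the polling pattern (assumed simplification: stand-in lookupLedgerEnd, no kill switch, cache, or metrics):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.concurrent.duration._

object PollLedgerEndSketch extends App {
  implicit val system: ActorSystem = ActorSystem("poll-ledger-end")
  import system.dispatcher

  // Stand-in for ledgerDao.lookupLedgerEnd().
  def lookupLedgerEnd(): Future[Long] = Future(System.nanoTime())

  Source
    .tick(0.millis, 100.millis, ())
    .mapAsync(1)(_ => lookupLedgerEnd()) // parallelism 1: lookups never overlap
    .runWith(Sink.foreach(ledgerEnd => println(s"new ledger end: $ledgerEnd")))
}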
@@ -47,6 +47,7 @@ object JdbcIndexer {
       val parameterStorageBackend = factory.createParameterStorageBackend
       val DBLockStorageBackend = factory.createDBLockStorageBackend
       val resetStorageBackend = factory.createResetStorageBackend
+      val stringInterningStorageBackend = factory.createStringInterningStorageBackend
       val indexer = ParallelIndexerFactory(
         jdbcUrl = config.jdbcUrl,
         inputMappingParallelism = config.inputMappingParallelism,
@@ -97,6 +98,7 @@ object JdbcIndexer {
           batchWithinMillis = config.batchWithinMillis,
           metrics = metrics,
         ),
+        stringInterningStorageBackend = stringInterningStorageBackend,
         mat = materializer,
         readService = readService,
       )
@@ -12,9 +12,9 @@ import com.daml.ledger.participant.state.v2.{ReadService, Update}
 import com.daml.lf.data.Ref
 import com.daml.logging.{ContextualizedLogger, LoggingContext}
 import com.daml.metrics.Metrics
-import com.daml.platform.store.EventSequentialId
 import com.daml.platform.store.appendonlydao.DbDispatcher
 import com.daml.platform.store.backend.{IngestionStorageBackend, ParameterStorageBackend}
+import com.daml.platform.store.interning.UpdatingStringInterningView

 import scala.concurrent.{ExecutionContext, Future}
@@ -29,6 +29,7 @@ private[platform] case class InitializeParallelIngestion(

   def apply(
       dbDispatcher: DbDispatcher,
+      updatingStringInterningView: UpdatingStringInterningView,
       readService: ReadService,
       ec: ExecutionContext,
       mat: Materializer,
@@ -54,8 +55,14 @@ private[platform] case class InitializeParallelIngestion(
       _ <- dbDispatcher.executeSql(metrics.daml.parallelIndexer.initialization)(
         ingestionStorageBackend.deletePartiallyIngestedData(ledgerEnd)
       )
+      _ <- ledgerEnd match {
+        case Some(ledgerEnd) => updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
+        case None => Future.unit
+      }
+      ledgerEndOrBeforeBegin = ledgerEnd.getOrElse(ParameterStorageBackend.LedgerEndBeforeBegin)
     } yield InitializeParallelIngestion.Initialized(
-      initialEventSeqId = ledgerEnd.map(_.lastEventSeqId).getOrElse(EventSequentialId.beforeBegin),
+      initialEventSeqId = ledgerEndOrBeforeBegin.lastEventSeqId,
+      initialStringInterningId = ledgerEndOrBeforeBegin.lastStringInterningId,
       readServiceSource = readService.stateUpdates(beginAfter = ledgerEnd.map(_.lastOffset)),
     )
   }
@@ -66,6 +73,7 @@ object InitializeParallelIngestion {

   case class Initialized(
       initialEventSeqId: Long,
+      initialStringInterningId: Int,
       readServiceSource: Source[(Offset, Update), NotUsed],
   )

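
The LedgerEnd record consulted above now carries the interning high-water mark next to the offset and the event sequential id. A sketch of its shape as implied by the diff (field names follow the usages; the actual definition lives in ParameterStorageBackend):

object LedgerEndSketch {
  type Offset = String // stand-in; the real Offset is a dedicated type

  final case class LedgerEnd(
      lastOffset: Offset, // last indexed ledger offset
      lastEventSeqId: Long, // last ingested event sequential id
      lastStringInterningId: Int, // highest persisted string_interning.internal_id
  )

  // Neutral element used when the index database is still empty.
  val LedgerEndBeforeBegin: LedgerEnd =
    LedgerEnd(lastOffset = "", lastEventSeqId = 0L, lastStringInterningId = 0)
}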
@@ -17,7 +17,12 @@ import com.daml.platform.indexer.parallel.AsyncSupport._
 import com.daml.platform.indexer.Indexer
 import com.daml.platform.store.appendonlydao.DbDispatcher
 import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
-import com.daml.platform.store.backend.{DBLockStorageBackend, DataSourceStorageBackend}
+import com.daml.platform.store.backend.{
+  DBLockStorageBackend,
+  DataSourceStorageBackend,
+  StringInterningStorageBackend,
+}
+import com.daml.platform.store.interning.StringInterningView
 import com.google.common.util.concurrent.ThreadFactoryBuilder

 import scala.concurrent.duration.FiniteDuration
@@ -39,6 +44,7 @@ object ParallelIndexerFactory {
       dataSourceStorageBackend: DataSourceStorageBackend,
       initializeParallelIngestion: InitializeParallelIngestion,
       parallelIndexerSubscription: ParallelIndexerSubscription[_],
+      stringInterningStorageBackend: StringInterningStorageBackend,
       mat: Materializer,
       readService: ReadService,
   )(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] =
@@ -101,8 +107,19 @@ object ParallelIndexerFactory {
             metrics = metrics,
           )
         ) { dbDispatcher =>
+          val stringInterningView = new StringInterningView(
+            loadPrefixedEntries = (fromExclusive, toInclusive) =>
+              implicit loggingContext =>
+                dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) {
+                  stringInterningStorageBackend.loadStringInterningEntries(
+                    fromExclusive,
+                    toInclusive,
+                  )
+                }
+          )
           initializeParallelIngestion(
             dbDispatcher = dbDispatcher,
+            updatingStringInterningView = stringInterningView,
             readService = readService,
             ec = ec,
             mat = mat,
@@ -111,6 +128,7 @@ object ParallelIndexerFactory {
           inputMapperExecutor = inputMapperExecutor,
           batcherExecutor = batcherExecutor,
           dbDispatcher = dbDispatcher,
+          stringInterningView = stringInterningView,
           materializer = mat,
         )
       )
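
The loadPrefixedEntries function wired into StringInterningView above loads the interning entries with ids in (fromExclusive, toInclusive], letting the in-memory view catch up to what is already persisted. A plausible shape for the backend query it delegates to (sketch only, not the actual StorageBackend implementation):

import java.sql.Connection

object StringInterningQueriesSketch {
  // Load persisted interning entries with internal_id in (fromExclusive, toInclusive].
  def loadStringInterningEntries(fromExclusive: Int, toInclusive: Int)(
      connection: Connection
  ): Vector[(Int, String)] = {
    val statement = connection.prepareStatement(
      "SELECT internal_id, external_string FROM string_interning " +
        "WHERE internal_id > ? AND internal_id <= ? ORDER BY internal_id"
    )
    try {
      statement.setInt(1, fromExclusive)
      statement.setInt(2, toInclusive)
      val resultSet = statement.executeQuery()
      val entries = Vector.newBuilder[(Int, String)]
      while (resultSet.next())
        entries += resultSet.getInt("internal_id") -> resultSet.getString("external_string")
      entries.result()
    } finally statement.close()
  }
}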