
Parallel Ingestion renames #9894

Merged 1 commit on Jun 2, 2021

ParallelIndexerFactory.scala
@@ -19,7 +19,7 @@ import com.daml.platform.indexer.{IndexFeedHandle, Indexer}
import com.daml.platform.store.{DbType, backend}
import com.daml.platform.store.appendonlydao.DbDispatcher
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
-import com.daml.platform.store.backend.{DBDTOV1, StorageBackend}
+import com.daml.platform.store.backend.{DbDto, StorageBackend}
import com.daml.resources

import scala.concurrent.Future
@@ -69,7 +69,7 @@ object ParallelIndexerFactory {
metrics = metrics,
connectionAsyncCommitMode = DbType.AsynchronousCommit,
)
-toDbDto = backend.UpdateToDBDTOV1(
+toDbDto = backend.UpdateToDbDto(
participantId = participantId,
translation = translation,
compressionStrategy = compressionStrategy,
@@ -136,10 +136,10 @@ object ParallelIndexerFactory {

def inputMapper(
metrics: Metrics,
-toDbDto: Offset => Update => Iterator[DBDTOV1],
+toDbDto: Offset => Update => Iterator[DbDto],
)(implicit
loggingContext: LoggingContext
-): Iterable[((Offset, Update), Long)] => Batch[Vector[DBDTOV1]] = { input =>
+): Iterable[((Offset, Update), Long)] => Batch[Vector[DbDto]] = { input =>
metrics.daml.parallelIndexer.inputMapping.batchSize.update(input.size)
input.foreach { case ((offset, update), _) =>
LoggingContext.withEnrichedLoggingContext(
@@ -162,7 +162,7 @@
)
}

-def seqMapperZero(initialSeqId: Long): Batch[Vector[DBDTOV1]] =
+def seqMapperZero(initialSeqId: Long): Batch[Vector[DbDto]] =
Batch(
lastOffset = null,
lastSeqEventId = initialSeqId, // this is the only property of interest in the zero element
@@ -174,20 +174,20 @@
)

def seqMapper(metrics: Metrics)(
-previous: Batch[Vector[DBDTOV1]],
-current: Batch[Vector[DBDTOV1]],
-): Batch[Vector[DBDTOV1]] = {
+previous: Batch[Vector[DbDto]],
+current: Batch[Vector[DbDto]],
+): Batch[Vector[DbDto]] = {
var eventSeqId = previous.lastSeqEventId
val batchWithSeqIds = current.batch.map {
-case dbDto: DBDTOV1.EventCreate =>
+case dbDto: DbDto.EventCreate =>
eventSeqId += 1
dbDto.copy(event_sequential_id = eventSeqId)

-case dbDto: DBDTOV1.EventExercise =>
+case dbDto: DbDto.EventExercise =>
eventSeqId += 1
dbDto.copy(event_sequential_id = eventSeqId)

-case dbDto: DBDTOV1.EventDivulgence =>
+case dbDto: DbDto.EventDivulgence =>
eventSeqId += 1
dbDto.copy(event_sequential_id = eventSeqId)

@@ -206,9 +206,9 @@
}

def batcher[DB_BATCH](
-batchF: Vector[DBDTOV1] => DB_BATCH,
+batchF: Vector[DbDto] => DB_BATCH,
metrics: Metrics,
-): Batch[Vector[DBDTOV1]] => Batch[DB_BATCH] = { inBatch =>
+): Batch[Vector[DbDto]] => Batch[DB_BATCH] = { inBatch =>
val dbBatch = batchF(inBatch.batch)
val nowNanos = System.nanoTime()
metrics.daml.parallelIndexer.batching.duration.update(
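
Note on the logic being renamed here: seqMapperZero seeds only lastSeqEventId, and each seqMapper call continues numbering from previous.lastSeqEventId, so event_sequential_id stays dense and monotonic across parallel batches. A minimal REPL-style sketch of that pattern, with Row and SimpleBatch as illustrative stand-ins for DbDto and Batch (not the production types):

final case class Row(eventSequentialId: Long)
final case class SimpleBatch(lastSeqEventId: Long, rows: Vector[Row])

// Zero element: only lastSeqEventId matters, mirroring seqMapperZero.
def zero(initialSeqId: Long): SimpleBatch =
  SimpleBatch(initialSeqId, Vector.empty)

// Mirrors seqMapper: continue numbering where the previous batch stopped.
def assignSeqIds(previous: SimpleBatch, current: SimpleBatch): SimpleBatch = {
  var seqId = previous.lastSeqEventId
  val numbered = current.rows.map { row =>
    seqId += 1
    row.copy(eventSequentialId = seqId)
  }
  current.copy(lastSeqEventId = seqId, rows = numbered)
}

// Folding batches through the mapper yields gap-free IDs: 101, 102, then 103.
val batches = Vector(
  SimpleBatch(0L, Vector(Row(0L), Row(0L))),
  SimpleBatch(0L, Vector(Row(0L))),
)
val result = batches.scanLeft(zero(100L))(assignSeqIds).drop(1)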

JdbcLedgerDao.scala
@@ -46,7 +46,7 @@ import com.daml.platform.store.appendonlydao.events.{
PostCommitValidation,
TransactionsReader,
}
-import com.daml.platform.store.backend.{StorageBackend, UpdateToDBDTOV1}
+import com.daml.platform.store.backend.{StorageBackend, UpdateToDbDto}
import com.daml.platform.store.dao.ParametersTable.LedgerEndUpdateError
import com.daml.platform.store.dao.events.TransactionsWriter.PreparedInsert
import com.daml.platform.store.dao.{
@@ -298,7 +298,7 @@ private class JdbcLedgerDao(
recordTime = Time.Timestamp.assertFromInstant(recordedAt),
submissionId = SubmissionId.assertFromString(submissionId),
participantId =
v1.ParticipantId.assertFromString("1"), // not used for DBDTO generation
v1.ParticipantId.assertFromString("1"), // not used for DbDto generation
newConfiguration = configuration,
)

@@ -307,7 +307,7 @@
recordTime = Time.Timestamp.assertFromInstant(recordedAt),
submissionId = SubmissionId.assertFromString(submissionId),
participantId =
v1.ParticipantId.assertFromString("1"), // not used for DBDTO generation
v1.ParticipantId.assertFromString("1"), // not used for DbDto generation
proposedConfiguration = configuration,
rejectionReason = reason,
)
@@ -574,11 +574,11 @@
ledgerEffectiveTime =
Time.Timestamp.assertFromInstant(tx.ledgerEffectiveTime),
workflowId = tx.workflowId,
-submissionTime = null, // not used for DBDTO generation
-submissionSeed = null, // not used for DBDTO generation
-optUsedPackages = None, // not used for DBDTO generation
-optNodeSeeds = None, // not used for DBDTO generation
-optByKeyNodes = None, // not used for DBDTO generation
+submissionTime = null, // not used for DbDto generation
+submissionSeed = null, // not used for DbDto generation
+optUsedPackages = None, // not used for DbDto generation
+optNodeSeeds = None, // not used for DbDto generation
+optByKeyNodes = None, // not used for DbDto generation
),
transaction = tx.transaction,
transactionId = tx.transactionId,
@@ -960,11 +960,11 @@
transactionMeta = TransactionMeta(
ledgerEffectiveTime = Time.Timestamp.assertFromInstant(ledgerEffectiveTime),
workflowId = workflowId,
-submissionTime = null, // not used for DBDTO generation
-submissionSeed = null, // not used for DBDTO generation
-optUsedPackages = None, // not used for DBDTO generation
-optNodeSeeds = None, // not used for DBDTO generation
-optByKeyNodes = None, // not used for DBDTO generation
+submissionTime = null, // not used for DbDto generation
+submissionSeed = null, // not used for DbDto generation
+optUsedPackages = None, // not used for DbDto generation
+optNodeSeeds = None, // not used for DbDto generation
+optByKeyNodes = None, // not used for DbDto generation
),
transaction = transaction,
transactionId = transactionId,
@@ -1116,7 +1116,7 @@ private[platform] object JdbcLedgerDao {
): SequentialWriteDao =
SequentialWriteDaoImpl(
storageBackend = StorageBackend.of(dbType),
-updateToDbDtos = UpdateToDBDTOV1(
+updateToDbDtos = UpdateToDbDto(
participantId = participantId,
translation = new LfValueTranslation(
cache = lfValueTranslationCache,
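
Both call sites construct the renamed UpdateToDbDto the same way. Going only by the signatures visible in this diff (toDbDto: Offset => Update => Iterator[DbDto]), a hedged usage sketch, assuming participantId, translation, and compressionStrategy are in scope:

// Sketch only: wiring as implied by the two call sites in this diff.
val toDbDto: Offset => Update => Iterator[DbDto] =
  UpdateToDbDto(
    participantId = participantId,
    translation = translation,
    compressionStrategy = compressionStrategy,
  )

// A single ledger Update fans out into zero or more DbDto rows.
def rowsFor(offset: Offset, update: Update): Vector[DbDto] =
  toDbDto(offset)(update).toVector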

SequentialWriteDao.scala
@@ -6,7 +6,7 @@ package com.daml.platform.store.appendonlydao
import java.sql.Connection

import com.daml.ledger.participant.state.v1.{Offset, Update}
-import com.daml.platform.store.backend.{DBDTOV1, StorageBackend}
+import com.daml.platform.store.backend.{DbDto, StorageBackend}

import scala.util.chaining.scalaUtilChainingOps

@@ -16,7 +16,7 @@ trait SequentialWriteDao {

case class SequentialWriteDaoImpl[DB_BATCH](
storageBackend: StorageBackend[DB_BATCH],
-updateToDbDtos: Offset => Update => Iterator[DBDTOV1],
+updateToDbDtos: Offset => Update => Iterator[DbDto],
) extends SequentialWriteDao {

private var lastEventSeqId: Long = _
@@ -33,11 +33,11 @@
lastEventSeqId
}

-private def adaptEventSeqIds(dbDtos: Iterator[DBDTOV1]): Vector[DBDTOV1] =
+private def adaptEventSeqIds(dbDtos: Iterator[DbDto]): Vector[DbDto] =
dbDtos.map {
-case e: DBDTOV1.EventCreate => e.copy(event_sequential_id = nextEventSeqId)
-case e: DBDTOV1.EventDivulgence => e.copy(event_sequential_id = nextEventSeqId)
-case e: DBDTOV1.EventExercise => e.copy(event_sequential_id = nextEventSeqId)
+case e: DbDto.EventCreate => e.copy(event_sequential_id = nextEventSeqId)
+case e: DbDto.EventDivulgence => e.copy(event_sequential_id = nextEventSeqId)
+case e: DbDto.EventExercise => e.copy(event_sequential_id = nextEventSeqId)
case notEvent => notEvent
}.toVector

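The nextEventSeqId helper is collapsed out of the hunk above; judging from its call sites it is presumably a mutable counter over lastEventSeqId, along these hypothetical lines:

// Hypothetical reconstruction, inferred from the call sites above:
// each call advances and returns the next event_sequential_id.
private def nextEventSeqId: Long = {
  lastEventSeqId += 1
  lastEventSeqId
}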

DbDto.scala
@@ -7,12 +7,12 @@ import java.time.Instant

import com.daml.scalautil.NeverEqualsOverride

-sealed trait DBDTOV1
+sealed trait DbDto
extends NeverEqualsOverride
with Product
with Serializable // to aid type inference for case class implementors

-object DBDTOV1 {
+object DbDto {

final case class EventDivulgence(
event_offset: Option[String],
@@ -26,7 +26,7 @@
create_argument: Option[Array[Byte]],
create_argument_compression: Option[Int],
event_sequential_id: Long,
-) extends DBDTOV1
+) extends DbDto

final case class EventCreate(
event_offset: Option[String],
@@ -51,7 +51,7 @@
create_argument_compression: Option[Int],
create_key_value_compression: Option[Int],
event_sequential_id: Long,
-) extends DBDTOV1
+) extends DbDto

final case class EventExercise(
consuming: Boolean,
@@ -78,7 +78,7 @@
exercise_argument_compression: Option[Int],
exercise_result_compression: Option[Int],
event_sequential_id: Long,
-) extends DBDTOV1
+) extends DbDto

final case class ConfigurationEntry(
ledger_offset: String,
@@ -87,15 +87,15 @@
typ: String,
configuration: Array[Byte],
rejection_reason: Option[String],
-) extends DBDTOV1
+) extends DbDto

final case class PackageEntry(
ledger_offset: String,
recorded_at: Instant,
submission_id: Option[String],
typ: String,
rejection_reason: Option[String],
-) extends DBDTOV1
+) extends DbDto

final case class Package(
package_id: String,
@@ -105,7 +105,7 @@
known_since: Instant,
ledger_offset: String,
_package: Array[Byte],
-) extends DBDTOV1
+) extends DbDto

final case class PartyEntry(
ledger_offset: String,
@@ -116,15 +116,15 @@
typ: String,
rejection_reason: Option[String],
is_local: Option[Boolean],
-) extends DBDTOV1
+) extends DbDto

final case class Party(
party: String,
display_name: Option[String],
explicit: Boolean,
ledger_offset: Option[String],
is_local: Boolean,
-) extends DBDTOV1
+) extends DbDto

final case class CommandCompletion(
completion_offset: String,
@@ -135,8 +135,8 @@
transaction_id: Option[String],
status_code: Option[Int],
status_message: Option[String],
-) extends DBDTOV1
+) extends DbDto

-final case class CommandDeduplication(deduplication_key: String) extends DBDTOV1
+final case class CommandDeduplication(deduplication_key: String) extends DbDto

}
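
Because DbDto is sealed, consumers get compiler-checked exhaustive matches over every row type declared above. A hedged dispatch sketch (the table names are illustrative placeholders, not the actual participant schema):

// Illustrative only: exhaustive dispatch over the sealed DbDto hierarchy.
def targetTable(dto: DbDto): String = dto match {
  case _: DbDto.EventDivulgence      => "events_divulgence"
  case _: DbDto.EventCreate          => "events_create"
  case _: DbDto.EventExercise        => "events_exercise"
  case _: DbDto.ConfigurationEntry   => "configuration_entries"
  case _: DbDto.PackageEntry         => "package_entries"
  case _: DbDto.Package              => "packages"
  case _: DbDto.PartyEntry           => "party_entries"
  case _: DbDto.Party                => "parties"
  case _: DbDto.CommandCompletion    => "command_completions"
  case _: DbDto.CommandDeduplication => "command_submissions"
}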

StorageBackend.scala
@@ -18,10 +18,10 @@ trait StorageBackend[DB_BATCH] {
/** The CPU intensive batching operation hides the batching logic, and the mapping to the database specific representation of the inserted data.
* This should be pure CPU logic without IO.
*
-* @param dbDtos is a collection of DBDTOV1 from which the batch is formed
+* @param dbDtos is a collection of DbDto from which the batch is formed
* @return the database-specific batch DTO, which can be inserted via insertBatch
*/
-def batch(dbDtos: Vector[DBDTOV1]): DB_BATCH
+def batch(dbDtos: Vector[DbDto]): DB_BATCH

/** Using a JDBC connection, a batch will be inserted into the database.
* No significant CPU load, mostly blocking JDBC communication with the database backend.
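
The trait splits the CPU-bound mapping from the blocking JDBC write, which is what lets the parallel indexer keep batching off the hot database path. A minimal usage sketch, assuming an insertBatch(connection, batch) method on the trait (its exact signature is collapsed out of this diff):

import java.sql.Connection

// Sketch: CPU-bound batching first, blocking JDBC insertion second,
// as the doc comments above describe.
def ingest[DB_BATCH](
    backend: StorageBackend[DB_BATCH],
    dtos: Vector[DbDto],
    connection: Connection,
): Unit = {
  val dbBatch: DB_BATCH = backend.batch(dtos) // pure CPU, no IO
  backend.insertBatch(connection, dbBatch)    // assumed: blocking JDBC write
}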