kvutils: Use VersionedOffsetBuilder where possible, and introduce VersionedOffset. [KVL-1154] #11277

Merged Oct 18, 2021

Commits (17 total; the diff below shows changes from 11 commits)
e870f7d  kvutils: Remove `VersionedOffsetBuilder.apply`. (SamirTalwar, Oct 14, 2021)
225cedc  ledger-on-memory: Use `VersionedOffsetBuilder`. (SamirTalwar, Oct 14, 2021)
a5c3c49  indexer-benchmark: Use `VersionedOffsetBuilder`. (SamirTalwar, Oct 14, 2021)
3dd21ee  ledger-on-sql: Use `VersionedOffsetBuilder`. (SamirTalwar, Oct 14, 2021)
59486d7  kvutils: Use `VersionedOffsetBuilder` in tests. (SamirTalwar, Oct 14, 2021)
46696af  kvutils: Create a case class for `VersionedOffsetBuilder#split`. (SamirTalwar, Oct 15, 2021)
44eea4f  kvutils: Delete unused methods from the offset builders. (SamirTalwar, Oct 15, 2021)
e5a43e7  kvutils: Use `Bytes#startWith` to check the offset version. (SamirTalwar, Oct 15, 2021)
058f416  kvutils: Move offset splitting into `VersionedOffset`. (SamirTalwar, Oct 15, 2021)
3961aa1  kvutils: Extract out versioned offset generators. (SamirTalwar, Oct 18, 2021)
1e31a73  kvutils: Replace `OffsetBuilder` with `VersionedOffsetMutator`. (SamirTalwar, Oct 18, 2021)
d9e4794  kvutils: Randomize the offset version in testing. (SamirTalwar, Oct 18, 2021)
52611f1  kvutils: `Random.between` is not available in Scala 2.12. (SamirTalwar, Oct 18, 2021)
a68b990  Merge remote-tracking branch 'origin/main' into samir/kvutils/remove-… (SamirTalwar, Oct 18, 2021)
68e91e1  kvutils: Move offset mutation methods to `VersionedOffset`. (SamirTalwar, Oct 18, 2021)
6ce9ca6  kvutils: Move the versioned offset construction into `VersionedOffset`. (SamirTalwar, Oct 18, 2021)
6152a42  kvutils: Only vary the offset version in ledger-on-memory tests. (SamirTalwar, Oct 18, 2021)
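Commits d9e4794 and 52611f1 together hint at how the test-side randomization is built: the offset version byte is chosen at random per test run, but `Random.between` only exists from Scala 2.13, so the generator has to be written against the older API. Below is a minimal sketch of one Scala 2.12-compatible approach; the object and method names are invented here, and the generator actually extracted in commit 3961aa1 may be shaped differently.

```scala
import scala.util.Random

object OffsetVersionGenerators {
  // Scala 2.12 has no Random.between(min, max), so derive the byte from nextInt:
  // nextInt(256) yields 0..255, and subtracting 128 covers the full Byte range.
  def randomOffsetVersion(): Byte =
    (Random.nextInt(256) - 128).toByte
}
```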
Files changed
@@ -21,7 +21,7 @@ import com.daml.ledger.participant.state.kvutils.api.{
LedgerRecord,
}
import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter
import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw}
import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder}
import com.daml.ledger.participant.state.v2.Update
import com.daml.metrics.Metrics

@@ -38,14 +38,15 @@ object Main {
}
val importer = ProtobufBasedLedgerDataImporter(path)

val offsetBuilder = new VersionedOffsetBuilder(0)
Contributor:

Would it be possible to introduce a constant for this version number? It's hardcoded in multiple places.

Author:

It is only hardcoded in test code (and test-adjacent code, such as integrity checking and benchmarking).

In production, this value should most definitely not be hardcoded, and so I would prefer to not hardcode it anywhere, to ensure that any user thinks about it.

Contributor:

That seems sensible, thank you!
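To make the trade-off discussed in this thread concrete, here is a minimal sketch of the wiring pattern, assuming only the single-argument `VersionedOffsetBuilder` constructor visible in this diff; the `OffsetConfig` type and the object name are invented for illustration.

```scala
import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder

// Hypothetical configuration record; only VersionedOffsetBuilder comes from this PR.
final case class OffsetConfig(offsetVersion: Byte)

object OffsetBuilderWiring {
  // Production-style wiring: the version byte is threaded in from configuration,
  // so every deployment has to choose it deliberately.
  def forProduction(config: OffsetConfig): VersionedOffsetBuilder =
    new VersionedOffsetBuilder(config.offsetVersion)

  // Test-style wiring: a literal version is acceptable here, as argued above.
  def forTests(): VersionedOffsetBuilder =
    new VersionedOffsetBuilder(0)
}
```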

val dataSource: Source[LedgerRecord, NotUsed] = Source
.fromIterator(() => importer.read().iterator)
.statefulMapConcat { () =>
val nextOffset = new AtomicLong(0)

{ case (_, writeSet) =>
writeSet.map { case (key, value) =>
val offset = OffsetBuilder.fromLong(nextOffset.getAndIncrement())
val offset = offsetBuilder.of(nextOffset.getAndIncrement())
val logEntryId = Raw.LogEntryId(key.bytes) // `key` is of an unknown type.
LedgerRecord(offset, logEntryId, value)
}
@@ -31,6 +31,7 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state
readerWriter <- new InMemoryLedgerReaderWriter.Owner(
ledgerId = config.ledgerId,
participantId = participantConfig.participantId,
offsetVersion = 0,
keySerializationStrategy = DefaultStateKeySerializationStrategy,
metrics = metrics,
stateValueCache = caching.WeightedCache.from(
@@ -8,7 +8,7 @@ import akka.stream.scaladsl.Source
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.kvutils.OffsetBuilder
import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord}
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.akkastreams.dispatcher.Dispatcher
@@ -17,14 +17,15 @@ import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource
class InMemoryLedgerReader(
override val ledgerId: LedgerId,
dispatcher: Dispatcher[Index],
offsetBuilder: VersionedOffsetBuilder,
state: InMemoryState,
metrics: Metrics,
) extends LedgerReader {
override def events(startExclusive: Option[Offset]): Source[LedgerRecord, NotUsed] =
dispatcher
.startingAt(
startExclusive
.map(OffsetBuilder.highestIndex(_).toInt)
.map(offsetBuilder.highestIndex(_).toInt)
.getOrElse(StartIndex),
RangeSource((startExclusive, endInclusive) =>
Source.fromIterator(() => {
@@ -6,6 +6,7 @@ package com.daml.ledger.on.memory
import com.daml.api.util.TimeProvider
import com.daml.caching.Cache
import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder
import com.daml.ledger.participant.state.kvutils.api._
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.validator.StateKeySerializationStrategy
@@ -21,6 +22,7 @@ object InMemoryLedgerReaderWriter {
final class Owner(
ledgerId: LedgerId,
participantId: Ref.ParticipantId,
offsetVersion: Byte,
keySerializationStrategy: StateKeySerializationStrategy,
metrics: Metrics,
timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider,
@@ -31,7 +33,8 @@
committerExecutionContext: ExecutionContext,
) extends ResourceOwner[KeyValueLedger] {
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = {
val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)
val offsetBuilder = new VersionedOffsetBuilder(offsetVersion)
val reader = new InMemoryLedgerReader(ledgerId, dispatcher, offsetBuilder, state, metrics)
for {
writer <- new InMemoryLedgerWriter.Owner(
participantId,
Expand All @@ -40,6 +43,7 @@ object InMemoryLedgerReaderWriter {
timeProvider,
stateValueCache,
dispatcher,
offsetBuilder,
state,
engine,
committerExecutionContext,
@@ -3,6 +3,7 @@

package com.daml.ledger.on.memory

import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder
import com.daml.ledger.validator.{
LedgerStateAccess,
LedgerStateOperations,
@@ -13,15 +14,23 @@ import com.daml.metrics.Metrics

import scala.concurrent.{ExecutionContext, Future}

final class InMemoryLedgerStateAccess(state: InMemoryState, metrics: Metrics)
extends LedgerStateAccess[Index] {
final class InMemoryLedgerStateAccess(
offsetBuilder: VersionedOffsetBuilder,
state: InMemoryState,
metrics: Metrics,
) extends LedgerStateAccess[Index] {
override def inTransaction[T](
body: LedgerStateOperations[Index] => Future[T]
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[T] =
state.write { (log, state) =>
body(new TimedLedgerStateOperations(new InMemoryLedgerStateOperations(log, state), metrics))
body(
new TimedLedgerStateOperations(
new InMemoryLedgerStateOperations(offsetBuilder, log, state),
metrics,
)
)
}
}
@@ -4,20 +4,19 @@
package com.daml.ledger.on.memory

import com.daml.ledger.offset.Offset
import com.daml.ledger.on.memory.InMemoryLedgerStateOperations._
import com.daml.ledger.on.memory.InMemoryState.MutableLog
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw}
import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder}
import com.daml.ledger.validator.BatchingLedgerStateOperations
import com.daml.logging.LoggingContext

import scala.concurrent.{ExecutionContext, Future}

final class InMemoryLedgerStateOperations(
offsetBuilder: VersionedOffsetBuilder,
log: InMemoryState.MutableLog,
state: InMemoryState.MutableState,
) extends BatchingLedgerStateOperations[Index] {

override def readState(
keys: Iterable[Raw.StateKey]
)(implicit
@@ -39,16 +38,11 @@ final class InMemoryLedgerStateOperations(
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Index] =
Future.successful(appendEntry(log, LedgerRecord(_, key, value)))

}

object InMemoryLedgerStateOperations {

private[memory] def appendEntry(log: MutableLog, createEntry: Offset => LedgerRecord): Index = {
private def appendEntry(log: MutableLog, createEntry: Offset => LedgerRecord): Index = {
val entryAtIndex = log.size
val offset = OffsetBuilder.fromLong(entryAtIndex.toLong)
val offset = offsetBuilder.of(entryAtIndex.toLong)
val entry = createEntry(offset)
log += entry
entryAtIndex
}

}
@@ -12,7 +12,7 @@ import com.daml.ledger.on.memory.InMemoryLedgerWriter._
import com.daml.ledger.participant.state.kvutils.api.{CommitMetadata, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.store.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, Raw}
import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, Raw, VersionedOffsetBuilder}
import com.daml.ledger.participant.state.v2.SubmissionResult
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.validator.caching.{CachingStateReader, ImmutablesOnlyCacheUpdatePolicy}
@@ -39,6 +39,7 @@ import scala.util.Success
final class InMemoryLedgerWriter private[memory] (
override val participantId: Ref.ParticipantId,
dispatcher: Dispatcher[Index],
offsetBuilder: VersionedOffsetBuilder,
now: () => Instant,
state: InMemoryState,
committer: Committer,
@@ -57,7 +58,7 @@ final class InMemoryLedgerWriter private[memory] (
correlationId,
envelope,
exportRecordTime = now(),
ledgerStateAccess = new InMemoryLedgerStateAccess(state, metrics),
ledgerStateAccess = new InMemoryLedgerStateAccess(offsetBuilder, state, metrics),
)(committerExecutionContext)
.andThen { case Success(SubmissionResult.Acknowledged) =>
dispatcher.signalNewHead(state.newHeadSinceLastWrite())
@@ -85,6 +86,7 @@ object InMemoryLedgerWriter {
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: StateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
offsetBuilder: VersionedOffsetBuilder,
state: InMemoryState,
engine: Engine,
committerExecutionContext: ExecutionContext,
@@ -97,6 +99,7 @@
} yield new InMemoryLedgerWriter(
participantId,
dispatcher,
offsetBuilder,
now,
state,
newCommitter(ledgerDataExporter),
@@ -37,6 +37,7 @@ class InMemoryLedgerReaderWriterIntegrationSpec
readerWriter <- new InMemoryLedgerReaderWriter.Owner(
ledgerId = ledgerId,
participantId = participantId,
offsetVersion = 0,
keySerializationStrategy = StateKeySerializationStrategy.createDefault(),
metrics = metrics,
dispatcher = dispatcher,
@@ -7,7 +7,7 @@ import java.time.Instant

import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder}
import com.daml.ledger.participant.state.kvutils.api.CommitMetadata
import com.daml.ledger.participant.state.v2.SubmissionResult
import com.daml.ledger.validator.LedgerStateAccess
@@ -52,6 +52,7 @@ class InMemoryLedgerWriterSpec
val instance = new InMemoryLedgerWriter(
participantId = Ref.ParticipantId.assertFromString("participant ID"),
dispatcher = mockDispatcher,
offsetBuilder = new VersionedOffsetBuilder(0),
now = () => Instant.EPOCH,
state = InMemoryState.empty,
committer = mockCommitter,
@@ -16,14 +16,17 @@ import com.daml.metrics.{Metrics, Timed}
import com.daml.resources.ProgramResource.StartupException
import com.zaxxer.hikari.HikariDataSource
import javax.sql.DataSource

import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder
import org.flywaydb.core.Flyway
import scalaz.syntax.bind._

import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}

final class Database(
queries: Connection => Queries,
queries: QueriesFactory,
offsetBuilder: VersionedOffsetBuilder,
metrics: Metrics,
)(implicit
readerConnectionPool: ConnectionPool[Reader],
@@ -54,7 +57,7 @@ final class Database(
Timed.future(
metrics.daml.ledger.database.transactions.run(name),
Future[X] {
body(new TimedQueries(queries(connection), metrics))
body(new TimedQueries(queries(offsetBuilder, connection), metrics))
.andThen {
case Success(_) => connection.commit()
case Failure(_) => connection.rollback()
@@ -81,26 +84,28 @@ object Database {
// entries missing.
private val MaximumWriterConnectionPoolSize: Int = 1

def owner(jdbcUrl: String, metrics: Metrics)(implicit
loggingContext: LoggingContext
): ResourceOwner[UninitializedDatabase] =
def owner(
jdbcUrl: String,
offsetBuilder: VersionedOffsetBuilder,
metrics: Metrics,
)(implicit loggingContext: LoggingContext): ResourceOwner[UninitializedDatabase] =
(jdbcUrl match {
case "jdbc:h2:mem:" =>
throw new InvalidDatabaseException(
"Unnamed in-memory H2 databases are not supported. Please name the database using the format \"jdbc:h2:mem:NAME\"."
)
case url if url.startsWith("jdbc:h2:mem:") =>
SingleConnectionDatabase.owner(RDBMS.H2, jdbcUrl, metrics)
SingleConnectionDatabase.owner(RDBMS.H2, jdbcUrl, offsetBuilder, metrics)
case url if url.startsWith("jdbc:h2:") =>
MultipleConnectionDatabase.owner(RDBMS.H2, jdbcUrl, metrics)
MultipleConnectionDatabase.owner(RDBMS.H2, jdbcUrl, offsetBuilder, metrics)
case url if url.startsWith("jdbc:postgresql:") =>
MultipleConnectionDatabase.owner(RDBMS.PostgreSQL, jdbcUrl, metrics)
MultipleConnectionDatabase.owner(RDBMS.PostgreSQL, jdbcUrl, offsetBuilder, metrics)
case url if url.startsWith("jdbc:sqlite::memory:") =>
throw new InvalidDatabaseException(
"Unnamed in-memory SQLite databases are not supported. Please name the database using the format \"jdbc:sqlite:file:NAME?mode=memory&cache=shared\"."
)
case url if url.startsWith("jdbc:sqlite:") =>
SingleConnectionDatabase.owner(RDBMS.SQLite, jdbcUrl, metrics)
SingleConnectionDatabase.owner(RDBMS.SQLite, jdbcUrl, offsetBuilder, metrics)
case _ =>
throw new InvalidDatabaseException(s"Unknown database")
}).map { database =>
@@ -112,6 +117,7 @@ object Database {
def owner(
system: RDBMS,
jdbcUrl: String,
offsetBuilder: VersionedOffsetBuilder,
metrics: Metrics,
): ResourceOwner[UninitializedDatabase] =
for {
@@ -139,14 +145,15 @@ object Database {
new ConnectionPool(writerDataSource)
implicit val adminConnectionPool: ConnectionPool[Migrator] =
new ConnectionPool(adminDataSource)(DirectExecutionContext)
new UninitializedDatabase(system = system, metrics = metrics)
new UninitializedDatabase(system, offsetBuilder, metrics)
}
}

object SingleConnectionDatabase {
def owner(
system: RDBMS,
jdbcUrl: String,
offsetBuilder: VersionedOffsetBuilder,
metrics: Metrics,
): ResourceOwner[UninitializedDatabase] =
for {
@@ -164,7 +171,7 @@ object Database {
new ConnectionPool(readerWriterDataSource)
implicit val adminConnectionPool: ConnectionPool[Migrator] =
new ConnectionPool(adminDataSource)(DirectExecutionContext)
new UninitializedDatabase(system = system, metrics = metrics)
new UninitializedDatabase(system, offsetBuilder, metrics)
}
}

@@ -184,31 +191,34 @@ object Database {
sealed trait RDBMS {
val name: String

val queries: Connection => Queries
val queries: QueriesFactory
}

object RDBMS {
object H2 extends RDBMS {
override val name: String = "h2"

override val queries: Connection => Queries = H2Queries.apply
override val queries: QueriesFactory = H2Queries.apply
}

object PostgreSQL extends RDBMS {
override val name: String = "postgresql"

override val queries: Connection => Queries = PostgresqlQueries.apply
override val queries: QueriesFactory = PostgresqlQueries.apply
}

object SQLite extends RDBMS {
override val name: String = "sqlite"

override val queries: Connection => Queries = SqliteQueries.apply
override val queries: QueriesFactory = SqliteQueries.apply
}

}

class UninitializedDatabase(system: RDBMS, metrics: Metrics)(implicit
class UninitializedDatabase(
system: RDBMS,
offsetBuilder: VersionedOffsetBuilder,
metrics: Metrics,
)(implicit
readerConnectionPool: ConnectionPool[Reader],
writerConnectionPool: ConnectionPool[Writer],
adminConnectionPool: ConnectionPool[Migrator],
@@ -224,7 +234,7 @@ object Database {

def migrate(): Database = {
flyway.migrate()
new Database(queries = system.queries, metrics = metrics)
new Database(system.queries, offsetBuilder, metrics)
}

def migrateAndReset()(implicit
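The `QueriesFactory` alias that replaces `Connection => Queries` above is not defined in this hunk. From the call sites (`queries(offsetBuilder, connection)` and the `H2Queries.apply`-style assignments), it is presumably a two-argument function type along the following lines; this is an inference from the diff, not a copy of the real definition, and the stand-in `Queries` trait below only marks where ledger-on-sql's actual interface would be.

```scala
import java.sql.Connection
import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder

object QueriesFactorySketch {
  // Stand-in for ledger-on-sql's real Queries interface, which is not shown in this diff.
  trait Queries

  // Inferred shape of the alias: a factory that now also needs the offset builder.
  type QueriesFactory = (VersionedOffsetBuilder, Connection) => Queries
}
```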