diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index 169573353399..a579d91aedea 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -21,7 +21,7 @@ import com.daml.ledger.participant.state.kvutils.api.{ LedgerRecord, } import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter -import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw} +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import com.daml.ledger.participant.state.v2.Update import com.daml.metrics.Metrics @@ -38,6 +38,7 @@ object Main { } val importer = ProtobufBasedLedgerDataImporter(path) + val offsetBuilder = new VersionedOffsetBuilder(0) val dataSource: Source[LedgerRecord, NotUsed] = Source .fromIterator(() => importer.read().iterator) .statefulMapConcat { () => @@ -45,7 +46,7 @@ object Main { { case (_, writeSet) => writeSet.map { case (key, value) => - val offset = OffsetBuilder.fromLong(nextOffset.getAndIncrement()) + val offset = offsetBuilder.of(nextOffset.getAndIncrement()) val logEntryId = Raw.LogEntryId(key.bytes) // `key` is of an unknown type. LedgerRecord(offset, logEntryId, value) } diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala index 35f5e2df0dd1..3ab11615b74a 100644 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala +++ b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala @@ -31,6 +31,7 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state readerWriter <- new InMemoryLedgerReaderWriter.Owner( ledgerId = config.ledgerId, participantId = participantConfig.participantId, + offsetVersion = 0, keySerializationStrategy = DefaultStateKeySerializationStrategy, metrics = metrics, stateValueCache = caching.WeightedCache.from( diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReader.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReader.scala index e5464f4f65c9..d69a47027091 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReader.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReader.scala @@ -8,7 +8,7 @@ import akka.stream.scaladsl.Source import com.daml.ledger.api.health.{HealthStatus, Healthy} import com.daml.ledger.configuration.LedgerId import com.daml.ledger.offset.Offset -import com.daml.ledger.participant.state.kvutils.OffsetBuilder +import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord} import com.daml.metrics.{Metrics, Timed} import com.daml.platform.akkastreams.dispatcher.Dispatcher @@ -17,6 +17,7 @@ import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource class InMemoryLedgerReader( override val ledgerId: LedgerId, dispatcher: Dispatcher[Index], + offsetBuilder: VersionedOffsetBuilder, state: InMemoryState, metrics: Metrics, ) extends LedgerReader { @@ -24,7 +25,7 @@ class InMemoryLedgerReader( dispatcher .startingAt( startExclusive - .map(OffsetBuilder.highestIndex(_).toInt) + .map(offsetBuilder.highestIndex(_).toInt) .getOrElse(StartIndex), RangeSource((startExclusive, endInclusive) => Source.fromIterator(() => { diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala index af4c2ee08b87..3043e5bf154c 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriter.scala @@ -6,6 +6,7 @@ package com.daml.ledger.on.memory import com.daml.api.util.TimeProvider import com.daml.caching.Cache import com.daml.ledger.configuration.LedgerId +import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder import com.daml.ledger.participant.state.kvutils.api._ import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.ledger.validator.StateKeySerializationStrategy @@ -21,6 +22,7 @@ object InMemoryLedgerReaderWriter { final class Owner( ledgerId: LedgerId, participantId: Ref.ParticipantId, + offsetVersion: Byte, keySerializationStrategy: StateKeySerializationStrategy, metrics: Metrics, timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider, @@ -31,7 +33,8 @@ object InMemoryLedgerReaderWriter { committerExecutionContext: ExecutionContext, ) extends ResourceOwner[KeyValueLedger] { override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = { - val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics) + val offsetBuilder = new VersionedOffsetBuilder(offsetVersion) + val reader = new InMemoryLedgerReader(ledgerId, dispatcher, offsetBuilder, state, metrics) for { writer <- new InMemoryLedgerWriter.Owner( participantId, @@ -40,6 +43,7 @@ object InMemoryLedgerReaderWriter { timeProvider, stateValueCache, dispatcher, + offsetBuilder, state, engine, committerExecutionContext, diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerStateAccess.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerStateAccess.scala index c05aacaa06cb..b6fb5f066337 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerStateAccess.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerStateAccess.scala @@ -3,6 +3,7 @@ package com.daml.ledger.on.memory +import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder import com.daml.ledger.validator.{ LedgerStateAccess, LedgerStateOperations, @@ -13,8 +14,11 @@ import com.daml.metrics.Metrics import scala.concurrent.{ExecutionContext, Future} -final class InMemoryLedgerStateAccess(state: InMemoryState, metrics: Metrics) - extends LedgerStateAccess[Index] { +final class InMemoryLedgerStateAccess( + offsetBuilder: VersionedOffsetBuilder, + state: InMemoryState, + metrics: Metrics, +) extends LedgerStateAccess[Index] { override def inTransaction[T]( body: LedgerStateOperations[Index] => Future[T] )(implicit @@ -22,6 +26,11 @@ final class InMemoryLedgerStateAccess(state: InMemoryState, metrics: Metrics) loggingContext: LoggingContext, ): Future[T] = state.write { (log, state) => - body(new TimedLedgerStateOperations(new InMemoryLedgerStateOperations(log, state), metrics)) + body( + new TimedLedgerStateOperations( + new InMemoryLedgerStateOperations(offsetBuilder, log, state), + metrics, + ) + ) } } diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerStateOperations.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerStateOperations.scala index 472f02872798..a7f42cd8cc41 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerStateOperations.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerStateOperations.scala @@ -4,20 +4,19 @@ package com.daml.ledger.on.memory import com.daml.ledger.offset.Offset -import com.daml.ledger.on.memory.InMemoryLedgerStateOperations._ import com.daml.ledger.on.memory.InMemoryState.MutableLog import com.daml.ledger.participant.state.kvutils.api.LedgerRecord -import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw} +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import com.daml.ledger.validator.BatchingLedgerStateOperations import com.daml.logging.LoggingContext import scala.concurrent.{ExecutionContext, Future} final class InMemoryLedgerStateOperations( + offsetBuilder: VersionedOffsetBuilder, log: InMemoryState.MutableLog, state: InMemoryState.MutableState, ) extends BatchingLedgerStateOperations[Index] { - override def readState( keys: Iterable[Raw.StateKey] )(implicit @@ -39,16 +38,11 @@ final class InMemoryLedgerStateOperations( )(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Index] = Future.successful(appendEntry(log, LedgerRecord(_, key, value))) -} - -object InMemoryLedgerStateOperations { - - private[memory] def appendEntry(log: MutableLog, createEntry: Offset => LedgerRecord): Index = { + private def appendEntry(log: MutableLog, createEntry: Offset => LedgerRecord): Index = { val entryAtIndex = log.size - val offset = OffsetBuilder.fromLong(entryAtIndex.toLong) + val offset = offsetBuilder.of(entryAtIndex.toLong) val entry = createEntry(offset) log += entry entryAtIndex } - } diff --git a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala index 104918b134fe..b6c11e74258e 100644 --- a/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala +++ b/ledger/ledger-on-memory/src/main/scala/com/daml/ledger/on/memory/InMemoryLedgerWriter.scala @@ -12,7 +12,7 @@ import com.daml.ledger.on.memory.InMemoryLedgerWriter._ import com.daml.ledger.participant.state.kvutils.api.{CommitMetadata, LedgerWriter} import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter import com.daml.ledger.participant.state.kvutils.store.{DamlStateKey, DamlStateValue} -import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, Raw} +import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, Raw, VersionedOffsetBuilder} import com.daml.ledger.participant.state.v2.SubmissionResult import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.ledger.validator.caching.{CachingStateReader, ImmutablesOnlyCacheUpdatePolicy} @@ -39,6 +39,7 @@ import scala.util.Success final class InMemoryLedgerWriter private[memory] ( override val participantId: Ref.ParticipantId, dispatcher: Dispatcher[Index], + offsetBuilder: VersionedOffsetBuilder, now: () => Instant, state: InMemoryState, committer: Committer, @@ -57,7 +58,7 @@ final class InMemoryLedgerWriter private[memory] ( correlationId, envelope, exportRecordTime = now(), - ledgerStateAccess = new InMemoryLedgerStateAccess(state, metrics), + ledgerStateAccess = new InMemoryLedgerStateAccess(offsetBuilder, state, metrics), )(committerExecutionContext) .andThen { case Success(SubmissionResult.Acknowledged) => dispatcher.signalNewHead(state.newHeadSinceLastWrite()) @@ -85,6 +86,7 @@ object InMemoryLedgerWriter { timeProvider: TimeProvider = DefaultTimeProvider, stateValueCache: StateValueCache = Cache.none, dispatcher: Dispatcher[Index], + offsetBuilder: VersionedOffsetBuilder, state: InMemoryState, engine: Engine, committerExecutionContext: ExecutionContext, @@ -97,6 +99,7 @@ object InMemoryLedgerWriter { } yield new InMemoryLedgerWriter( participantId, dispatcher, + offsetBuilder, now, state, newCommitter(ledgerDataExporter), diff --git a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala index 8be303743b4e..6d99341eb62c 100644 --- a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala +++ b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala @@ -17,16 +17,27 @@ import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import scala.concurrent.ExecutionContext +import scala.util.Random class InMemoryLedgerReaderWriterIntegrationSpec extends ParticipantStateIntegrationSpecBase(s"In-memory ledger/participant") { override val isPersistent: Boolean = false + // Select a random offset version to ensure that this works with any possible value. + override val offsetVersion: Byte = { + val bytes = Array[Byte](Byte.MinValue) + while (bytes.head < 0) { + Random.nextBytes(bytes) + } + bytes.head + } + override def participantStateFactory( ledgerId: LedgerId, participantId: Ref.ParticipantId, testId: String, + offsetVersion: Byte, metrics: Metrics, )(implicit loggingContext: LoggingContext): ResourceOwner[ParticipantState] = for { @@ -37,6 +48,7 @@ class InMemoryLedgerReaderWriterIntegrationSpec readerWriter <- new InMemoryLedgerReaderWriter.Owner( ledgerId = ledgerId, participantId = participantId, + offsetVersion = offsetVersion, keySerializationStrategy = StateKeySerializationStrategy.createDefault(), metrics = metrics, dispatcher = dispatcher, @@ -45,5 +57,4 @@ class InMemoryLedgerReaderWriterIntegrationSpec committerExecutionContext = committerExecutionContext, ) } yield new KeyValueParticipantState(readerWriter, readerWriter, metrics) - } diff --git a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerWriterSpec.scala b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerWriterSpec.scala index 272ec63b1c8f..462131d22ba0 100644 --- a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerWriterSpec.scala +++ b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerWriterSpec.scala @@ -7,7 +7,7 @@ import java.time.Instant import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll -import com.daml.ledger.participant.state.kvutils.Raw +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import com.daml.ledger.participant.state.kvutils.api.CommitMetadata import com.daml.ledger.participant.state.v2.SubmissionResult import com.daml.ledger.validator.LedgerStateAccess @@ -52,6 +52,7 @@ class InMemoryLedgerWriterSpec val instance = new InMemoryLedgerWriter( participantId = Ref.ParticipantId.assertFromString("participant ID"), dispatcher = mockDispatcher, + offsetBuilder = new VersionedOffsetBuilder(0), now = () => Instant.EPOCH, state = InMemoryState.empty, committer = mockCommitter, diff --git a/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala b/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala index 872f03c4ec9f..982c81af5a30 100644 --- a/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala +++ b/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala @@ -71,6 +71,7 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] { engine = engine, jdbcUrl = jdbcUrl, resetOnStartup = false, + offsetVersion = 0, logEntryIdAllocator = RandomLogEntryIdAllocator, stateValueCache = caching.WeightedCache.from( configuration = config.stateValueCache, diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala index 37b10afc5aae..9ef9ae03a373 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/Database.scala @@ -16,6 +16,8 @@ import com.daml.metrics.{Metrics, Timed} import com.daml.resources.ProgramResource.StartupException import com.zaxxer.hikari.HikariDataSource import javax.sql.DataSource + +import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder import org.flywaydb.core.Flyway import scalaz.syntax.bind._ @@ -23,7 +25,8 @@ import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success} final class Database( - queries: Connection => Queries, + queries: QueriesFactory, + offsetBuilder: VersionedOffsetBuilder, metrics: Metrics, )(implicit readerConnectionPool: ConnectionPool[Reader], @@ -54,7 +57,7 @@ final class Database( Timed.future( metrics.daml.ledger.database.transactions.run(name), Future[X] { - body(new TimedQueries(queries(connection), metrics)) + body(new TimedQueries(queries(offsetBuilder, connection), metrics)) .andThen { case Success(_) => connection.commit() case Failure(_) => connection.rollback() @@ -81,26 +84,28 @@ object Database { // entries missing. private val MaximumWriterConnectionPoolSize: Int = 1 - def owner(jdbcUrl: String, metrics: Metrics)(implicit - loggingContext: LoggingContext - ): ResourceOwner[UninitializedDatabase] = + def owner( + jdbcUrl: String, + offsetBuilder: VersionedOffsetBuilder, + metrics: Metrics, + )(implicit loggingContext: LoggingContext): ResourceOwner[UninitializedDatabase] = (jdbcUrl match { case "jdbc:h2:mem:" => throw new InvalidDatabaseException( "Unnamed in-memory H2 databases are not supported. Please name the database using the format \"jdbc:h2:mem:NAME\"." ) case url if url.startsWith("jdbc:h2:mem:") => - SingleConnectionDatabase.owner(RDBMS.H2, jdbcUrl, metrics) + SingleConnectionDatabase.owner(RDBMS.H2, jdbcUrl, offsetBuilder, metrics) case url if url.startsWith("jdbc:h2:") => - MultipleConnectionDatabase.owner(RDBMS.H2, jdbcUrl, metrics) + MultipleConnectionDatabase.owner(RDBMS.H2, jdbcUrl, offsetBuilder, metrics) case url if url.startsWith("jdbc:postgresql:") => - MultipleConnectionDatabase.owner(RDBMS.PostgreSQL, jdbcUrl, metrics) + MultipleConnectionDatabase.owner(RDBMS.PostgreSQL, jdbcUrl, offsetBuilder, metrics) case url if url.startsWith("jdbc:sqlite::memory:") => throw new InvalidDatabaseException( "Unnamed in-memory SQLite databases are not supported. Please name the database using the format \"jdbc:sqlite:file:NAME?mode=memory&cache=shared\"." ) case url if url.startsWith("jdbc:sqlite:") => - SingleConnectionDatabase.owner(RDBMS.SQLite, jdbcUrl, metrics) + SingleConnectionDatabase.owner(RDBMS.SQLite, jdbcUrl, offsetBuilder, metrics) case _ => throw new InvalidDatabaseException(s"Unknown database") }).map { database => @@ -112,6 +117,7 @@ object Database { def owner( system: RDBMS, jdbcUrl: String, + offsetBuilder: VersionedOffsetBuilder, metrics: Metrics, ): ResourceOwner[UninitializedDatabase] = for { @@ -139,7 +145,7 @@ object Database { new ConnectionPool(writerDataSource) implicit val adminConnectionPool: ConnectionPool[Migrator] = new ConnectionPool(adminDataSource)(DirectExecutionContext) - new UninitializedDatabase(system = system, metrics = metrics) + new UninitializedDatabase(system, offsetBuilder, metrics) } } @@ -147,6 +153,7 @@ object Database { def owner( system: RDBMS, jdbcUrl: String, + offsetBuilder: VersionedOffsetBuilder, metrics: Metrics, ): ResourceOwner[UninitializedDatabase] = for { @@ -164,7 +171,7 @@ object Database { new ConnectionPool(readerWriterDataSource) implicit val adminConnectionPool: ConnectionPool[Migrator] = new ConnectionPool(adminDataSource)(DirectExecutionContext) - new UninitializedDatabase(system = system, metrics = metrics) + new UninitializedDatabase(system, offsetBuilder, metrics) } } @@ -184,31 +191,34 @@ object Database { sealed trait RDBMS { val name: String - val queries: Connection => Queries + val queries: QueriesFactory } object RDBMS { object H2 extends RDBMS { override val name: String = "h2" - override val queries: Connection => Queries = H2Queries.apply + override val queries: QueriesFactory = H2Queries.apply } object PostgreSQL extends RDBMS { override val name: String = "postgresql" - override val queries: Connection => Queries = PostgresqlQueries.apply + override val queries: QueriesFactory = PostgresqlQueries.apply } object SQLite extends RDBMS { override val name: String = "sqlite" - override val queries: Connection => Queries = SqliteQueries.apply + override val queries: QueriesFactory = SqliteQueries.apply } - } - class UninitializedDatabase(system: RDBMS, metrics: Metrics)(implicit + class UninitializedDatabase( + system: RDBMS, + offsetBuilder: VersionedOffsetBuilder, + metrics: Metrics, + )(implicit readerConnectionPool: ConnectionPool[Reader], writerConnectionPool: ConnectionPool[Writer], adminConnectionPool: ConnectionPool[Migrator], @@ -224,7 +234,7 @@ object Database { def migrate(): Database = { flyway.migrate() - new Database(queries = system.queries, metrics = metrics) + new Database(system.queries, offsetBuilder, metrics) } def migrateAndReset()(implicit diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala index 6d1601bdde7c..641ebbd6e32f 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriter.scala @@ -15,7 +15,6 @@ import com.daml.ledger.api.domain import com.daml.ledger.api.health.{HealthStatus, Healthy} import com.daml.ledger.configuration.LedgerId import com.daml.ledger.offset.Offset -import com.daml.ledger.on.sql.SqlLedgerReaderWriter._ import com.daml.ledger.on.sql.queries.Queries import com.daml.ledger.participant.state.kvutils.api.{ CommitMetadata, @@ -23,7 +22,7 @@ import com.daml.ledger.participant.state.kvutils.api.{ LedgerRecord, LedgerWriter, } -import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw} +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import com.daml.ledger.participant.state.v2.SubmissionResult import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.ledger.validator._ @@ -43,6 +42,7 @@ final class SqlLedgerReaderWriter( override val ledgerId: LedgerId = Ref.LedgerString.assertFromString(UUID.randomUUID.toString), val participantId: Ref.ParticipantId, metrics: Metrics, + offsetBuilder: VersionedOffsetBuilder, database: Database, dispatcher: Dispatcher[Index], committer: ValidatingCommitter[Index], @@ -50,12 +50,14 @@ final class SqlLedgerReaderWriter( ) extends LedgerWriter with LedgerReader { + private val startOffset: Offset = offsetBuilder.of(StartIndex) + override def currentHealth(): HealthStatus = Healthy override def events(startExclusive: Option[Offset]): Source[LedgerRecord, NotUsed] = dispatcher .startingAt( - OffsetBuilder.highestIndex(startExclusive.getOrElse(StartOffset)), + offsetBuilder.highestIndex(startExclusive.getOrElse(startOffset)), RangeSource((startExclusive, endInclusive) => Source .future( @@ -83,8 +85,6 @@ final class SqlLedgerReaderWriter( } object SqlLedgerReaderWriter { - private val StartOffset: Offset = OffsetBuilder.fromLong(StartIndex) - val DefaultTimeProvider: TimeProvider = TimeProvider.UTC final class Owner( @@ -94,6 +94,7 @@ object SqlLedgerReaderWriter { engine: Engine, jdbcUrl: String, resetOnStartup: Boolean, + offsetVersion: Byte, logEntryIdAllocator: LogEntryIdAllocator, stateValueCache: StateValueCache = Cache.none, timeProvider: TimeProvider = DefaultTimeProvider, @@ -102,8 +103,9 @@ object SqlLedgerReaderWriter { override def acquire()(implicit context: ResourceContext): Resource[SqlLedgerReaderWriter] = { implicit val migratorExecutionContext: ExecutionContext[Database.Migrator] = ExecutionContext(context.executionContext) + val offsetBuilder = new VersionedOffsetBuilder(offsetVersion) for { - uninitializedDatabase <- Database.owner(jdbcUrl, metrics).acquire() + uninitializedDatabase <- Database.owner(jdbcUrl, offsetBuilder, metrics).acquire() database <- Resource.fromFuture( if (resetOnStartup) uninitializedDatabase.migrateAndReset().removeExecutionContext else sc.Future(uninitializedDatabase.migrate()) @@ -132,6 +134,7 @@ object SqlLedgerReaderWriter { ledgerId, participantId, metrics, + offsetBuilder, database, dispatcher, committer, diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/CommonQueries.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/CommonQueries.scala index 47ed3004d4b7..aaf321ab13d8 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/CommonQueries.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/CommonQueries.scala @@ -10,15 +10,14 @@ import anorm._ import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.queries.Queries._ import com.daml.ledger.participant.state.kvutils.api.LedgerRecord -import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw} +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import scala.collection.compat._ import scala.collection.immutable import scala.util.Try -trait CommonQueries extends Queries { - protected implicit val connection: Connection - +abstract class CommonQueries(offsetBuilder: VersionedOffsetBuilder)(implicit connection: Connection) + extends Queries { override final def selectLatestLogEntryId(): Try[Option[Index]] = Try { SQL"SELECT MAX(sequence_no) max_sequence_no FROM #$LogTable" .as(get[Option[Long]]("max_sequence_no").singleOpt) @@ -32,7 +31,7 @@ trait CommonQueries extends Queries { SQL"SELECT sequence_no, entry_id, envelope FROM #$LogTable WHERE sequence_no > $startExclusive AND sequence_no <= $endInclusive ORDER BY sequence_no" .as((long("sequence_no") ~ rawLogEntryId("entry_id") ~ rawEnvelope("envelope")).map { case index ~ entryId ~ envelope => - index -> LedgerRecord(OffsetBuilder.fromLong(index), entryId, envelope) + index -> LedgerRecord(offsetBuilder.of(index), entryId, envelope) }.*) } diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/H2Queries.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/H2Queries.scala index 2e3bc4e358a7..1dab9690c682 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/H2Queries.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/H2Queries.scala @@ -10,13 +10,12 @@ import anorm._ import com.daml.ledger.configuration.LedgerId import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.queries.Queries._ -import com.daml.ledger.participant.state.kvutils.Raw +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import scala.util.Try -final class H2Queries(override protected implicit val connection: Connection) - extends Queries - with CommonQueries { +final class H2Queries(offsetBuilder: VersionedOffsetBuilder)(implicit connection: Connection) + extends CommonQueries(offsetBuilder) { override def updateOrRetrieveLedgerId(providedLedgerId: LedgerId): Try[LedgerId] = Try { SQL"MERGE INTO #$MetaTable USING DUAL ON table_key = $MetaTableKey WHEN NOT MATCHED THEN INSERT (table_key, ledger_id) VALUES ($MetaTableKey, $providedLedgerId)" .executeInsert() @@ -47,9 +46,9 @@ final class H2Queries(override protected implicit val connection: Connection) } } -object H2Queries { - def apply(connection: Connection): Queries = { +object H2Queries extends QueriesFactory { + override def apply(offsetBuilder: VersionedOffsetBuilder, connection: Connection): Queries = { implicit val conn: Connection = connection - new H2Queries + new H2Queries(offsetBuilder) } } diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/PostgresqlQueries.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/PostgresqlQueries.scala index 8e17f1897c17..5d0f278a4c41 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/PostgresqlQueries.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/PostgresqlQueries.scala @@ -10,13 +10,14 @@ import anorm._ import com.daml.ledger.configuration.LedgerId import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.queries.Queries._ -import com.daml.ledger.participant.state.kvutils.Raw +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import scala.util.Try -final class PostgresqlQueries(override protected implicit val connection: Connection) - extends Queries - with CommonQueries { +final class PostgresqlQueries( + offsetBuilder: VersionedOffsetBuilder +)(implicit connection: Connection) + extends CommonQueries(offsetBuilder) { override def updateOrRetrieveLedgerId(providedLedgerId: LedgerId): Try[LedgerId] = Try { SQL"INSERT INTO #$MetaTable (table_key, ledger_id) VALUES ($MetaTableKey, $providedLedgerId) ON CONFLICT DO NOTHING" .executeInsert() @@ -40,9 +41,9 @@ final class PostgresqlQueries(override protected implicit val connection: Connec } } -object PostgresqlQueries { - def apply(connection: Connection): Queries = { +object PostgresqlQueries extends QueriesFactory { + override def apply(offsetBuilder: VersionedOffsetBuilder, connection: Connection): Queries = { implicit val conn: Connection = connection - new PostgresqlQueries + new PostgresqlQueries(offsetBuilder) } } diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/QueriesFactory.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/QueriesFactory.scala new file mode 100644 index 000000000000..568a95dfe706 --- /dev/null +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/QueriesFactory.scala @@ -0,0 +1,12 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.on.sql.queries + +import java.sql.Connection + +import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder + +trait QueriesFactory { + def apply(offsetBuilder: VersionedOffsetBuilder, connection: Connection): Queries +} diff --git a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/SqliteQueries.scala b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/SqliteQueries.scala index 5b84287c881e..646f8b3a8b6b 100644 --- a/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/SqliteQueries.scala +++ b/ledger/ledger-on-sql/src/main/scala/com/daml/ledger/on/sql/queries/SqliteQueries.scala @@ -10,13 +10,12 @@ import anorm._ import com.daml.ledger.configuration.LedgerId import com.daml.ledger.on.sql.Index import com.daml.ledger.on.sql.queries.Queries._ -import com.daml.ledger.participant.state.kvutils.Raw +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import scala.util.Try -final class SqliteQueries(override protected implicit val connection: Connection) - extends Queries - with CommonQueries { +final class SqliteQueries(offsetBuilder: VersionedOffsetBuilder)(implicit connection: Connection) + extends CommonQueries(offsetBuilder) { override def updateOrRetrieveLedgerId(providedLedgerId: LedgerId): Try[LedgerId] = Try { SQL"INSERT INTO #$MetaTable (table_key, ledger_id) VALUES ($MetaTableKey, $providedLedgerId) ON CONFLICT DO NOTHING" .executeInsert() @@ -47,9 +46,9 @@ final class SqliteQueries(override protected implicit val connection: Connection } } -object SqliteQueries { - def apply(connection: Connection): Queries = { +object SqliteQueries extends QueriesFactory { + override def apply(offsetBuilder: VersionedOffsetBuilder, connection: Connection): Queries = { implicit val conn: Connection = connection - new SqliteQueries + new SqliteQueries(offsetBuilder) } } diff --git a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala index 73eabe3a7e9d..0182eb4247d6 100644 --- a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala +++ b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala @@ -23,6 +23,7 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri ledgerId: LedgerId, participantId: Ref.ParticipantId, testId: String, + offsetVersion: Byte, metrics: Metrics, )(implicit loggingContext: LoggingContext): ResourceOwner[ParticipantState] = new SqlLedgerReaderWriter.Owner( @@ -32,6 +33,7 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri engine = Engine.DevEngine(), jdbcUrl = jdbcUrl(testId), resetOnStartup = false, + offsetVersion = offsetVersion, logEntryIdAllocator = RandomLogEntryIdAllocator, ).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics)) } diff --git a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/queries/QueryBehaviors.scala b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/queries/QueryBehaviors.scala index 6b07f33b1767..b1419d8b075a 100644 --- a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/queries/QueryBehaviors.scala +++ b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/queries/QueryBehaviors.scala @@ -7,7 +7,7 @@ import com.codahale.metrics.MetricRegistry import com.daml.concurrent.{Future => DamlFuture} import com.daml.ledger.on.sql.Database import com.daml.ledger.on.sql.Database.RDBMS -import com.daml.ledger.participant.state.kvutils.Raw +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import com.daml.ledger.resources.ResourceContext import com.daml.metrics.Metrics import com.google.protobuf.ByteString @@ -19,9 +19,10 @@ trait QueryBehaviors { this: AsyncFlatSpec => def queriesOnInsertion(rdbms: RDBMS, jdbcUrl: => String): Unit = { it should "insert the first log entry with an ID of 1" in { + val offsetBuilder = new VersionedOffsetBuilder(0) val metrics = new Metrics(new MetricRegistry) Database.SingleConnectionDatabase - .owner(rdbms, jdbcUrl, metrics) + .owner(rdbms, jdbcUrl, offsetBuilder, metrics) .map(_.migrate()) .use { database => database diff --git a/ledger/ledger-on-sql/src/test/suite/scala/com/daml/ledger/on/sql/DatabaseSpec.scala b/ledger/ledger-on-sql/src/test/suite/scala/com/daml/ledger/on/sql/DatabaseSpec.scala index aedbe50e2243..74ba40a152eb 100644 --- a/ledger/ledger-on-sql/src/test/suite/scala/com/daml/ledger/on/sql/DatabaseSpec.scala +++ b/ledger/ledger-on-sql/src/test/suite/scala/com/daml/ledger/on/sql/DatabaseSpec.scala @@ -5,6 +5,7 @@ package com.daml.ledger.on.sql import com.codahale.metrics.MetricRegistry import com.daml.ledger.on.sql.Database.InvalidDatabaseException +import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.Metrics import org.scalatest.matchers.should.Matchers @@ -15,14 +16,22 @@ class DatabaseSpec extends AsyncWordSpec with Matchers { "not accept unnamed H2 database URLs" in { newLoggingContext { implicit loggingContext => an[InvalidDatabaseException] should be thrownBy - Database.owner("jdbc:h2:mem:", new Metrics(new MetricRegistry)) + Database.owner( + jdbcUrl = "jdbc:h2:mem:", + offsetBuilder = new VersionedOffsetBuilder(0), + metrics = new Metrics(new MetricRegistry), + ) } } "not accept unnamed SQLite database URLs" in { newLoggingContext { implicit loggingContext => an[InvalidDatabaseException] should be thrownBy - Database.owner("jdbc:sqlite::memory:", new Metrics(new MetricRegistry)) + Database.owner( + jdbcUrl = "jdbc:sqlite::memory:", + offsetBuilder = new VersionedOffsetBuilder(0), + metrics = new Metrics(new MetricRegistry), + ) } } } diff --git a/ledger/participant-state/kvutils/BUILD.bazel b/ledger/participant-state/kvutils/BUILD.bazel index 7417b41e9750..b4bff4e264fd 100644 --- a/ledger/participant-state/kvutils/BUILD.bazel +++ b/ledger/participant-state/kvutils/BUILD.bazel @@ -91,6 +91,7 @@ da_scala_library( "@maven//:org_mockito_mockito_scala", "@maven//:org_scala_lang_modules_scala_collection_compat", "@maven//:org_scala_lang_modules_scala_java8_compat", + "@maven//:org_scalacheck_scalacheck", "@maven//:org_scalactic_scalactic", "@maven//:org_scalatest_scalatest_core", "@maven//:org_scalatest_scalatest_matchers_core", @@ -186,7 +187,6 @@ da_scala_test_suite( "//ledger-api/rs-grpc-bridge", "//ledger-api/testing-utils", "//ledger/caching", - "//ledger/ledger-api-common", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", "//ledger/ledger-configuration", diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/OffsetBuilder.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/OffsetBuilder.scala deleted file mode 100644 index 3d9d19b13951..000000000000 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/OffsetBuilder.scala +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.participant.state.kvutils - -import com.daml.ledger.offset.Offset - -/** Helper functions for generating 16 byte [[Offset]]s from integers. - * The created offset will look as follows: - * | zeros (8 bits) | highest index (56 bits) | middle index (32 bits) | lowest index (32 bits) | - * Leading zeros will be retained when generating the resulting offset bytes. - * - * Example usage: - * - * - If you have one record per block then use [[OffsetBuilder.fromLong]] with a single argument, the block ID. - * - If you may have multiple records per block then use [[OffsetBuilder.fromLong]] with the index within the block as the second argument. - * - * @see com.daml.ledger.offset.Offset - * @see com.daml.ledger.offset.VersionedOffsetBuilder - * @see com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReader - */ -object OffsetBuilder { - private[kvutils] val highestStart = 0 - private[kvutils] val middleStart = 8 - private[kvutils] val lowestStart = 12 - private[kvutils] val end = 16 - - private val delegate = VersionedOffsetBuilder(version = 0) - - def onlyKeepHighestIndex(offset: Offset): Offset = delegate.onlyKeepHighestIndex(offset) - - def dropLowestIndex(offset: Offset): Offset = delegate.dropLowestIndex(offset) - - def setMiddleIndex(offset: Offset, middle: Int): Offset = delegate.setMiddleIndex(offset, middle) - - def setLowestIndex(offset: Offset, lowest: Int): Offset = delegate.setLowestIndex(offset, lowest) - - def fromLong(first: Long, second: Int = 0, third: Int = 0): Offset = - delegate.of(first, second, third) - - def highestIndex(offset: Offset): Long = delegate.highestIndex(offset) - - def middleIndex(offset: Offset): Int = delegate.middleIndex(offset) - - def lowestIndex(offset: Offset): Int = delegate.lowestIndex(offset) - - private[kvutils] def split(offset: Offset): (Long, Int, Int) = { - val (highest, middle, lowest) = delegate.split(offset) - (highest, middle, lowest) - } -} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffset.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffset.scala new file mode 100644 index 000000000000..07273891da70 --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffset.scala @@ -0,0 +1,59 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils + +import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream} +import java.nio.ByteBuffer + +import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.kvutils.VersionedOffset._ + +private[kvutils] final case class VersionedOffset(offset: Offset) { + lazy val (version, highest, middle, lowest) = { + val stream = new DataInputStream(offset.toInputStream) + val versionAndHighest = stream.readLong() + val version = ((versionAndHighest & VersionMask) >> 56).toByte + val highest = versionAndHighest & HighestMask + val middle = stream.readInt() + val lowest = stream.readInt() + (version, highest, middle, lowest) + } + + def zeroLowest: VersionedOffset = + setLowest(0) + + def setLowest(newLowest: Int): VersionedOffset = + VersionedOffset(new VersionedOffsetBuilder(version).of(highest, middle, newLowest)) +} + +object VersionedOffset { + val MaxHighest: Long = (1L << 56) - 1 + val VersionMask: Long = 0xff00000000000000L + val HighestMask: Long = 0x00ffffffffffffffL + private val HighestStartByte = 1 + private val HighestSizeBytes = 7 + + def of(version: Byte, highest: Long, middle: Int, lowest: Int): VersionedOffset = { + require( + highest >= 0 && highest <= MaxHighest, + s"highest ($highest) is out of range [0, $MaxHighest]", + ) + require(middle >= 0, s"middle ($middle) is lower than 0") + require(lowest >= 0, s"lowest ($lowest) is lower than 0") + + val bytes = new ByteArrayOutputStream + val stream = new DataOutputStream(bytes) + stream.writeByte(version.toInt) + writeHighest(highest, stream) + stream.writeInt(middle) + stream.writeInt(lowest) + VersionedOffset(Offset.fromByteArray(bytes.toByteArray)) + } + + private def writeHighest(highest: Long, stream: DataOutputStream): Unit = { + val buffer = ByteBuffer.allocate(8) + buffer.putLong(highest) + stream.write(buffer.array(), HighestStartByte, HighestSizeBytes) + } +} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetBuilder.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetBuilder.scala index 907ae29445c3..e4417582877e 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetBuilder.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetBuilder.scala @@ -3,10 +3,10 @@ package com.daml.ledger.participant.state.kvutils -import com.daml.ledger.offset.Offset +import java.io.DataInputStream -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} -import java.nio.ByteBuffer +import com.daml.ledger.offset.Offset +import com.daml.lf.data.Bytes /** Helper functions for generating versioned 16 byte [[Offset]]s from integers. * The created offset will look as follows: @@ -24,48 +24,15 @@ class VersionedOffsetBuilder(version: Byte) { import VersionedOffsetBuilder._ - def onlyKeepHighestIndex(offset: Offset): Offset = { - val highest = highestIndex(offset) - of(highest) - } - - def dropLowestIndex(offset: Offset): Offset = { - val (highest, middle, _) = split(offset) - of(highest, middle) - } - - def setMiddleIndex(offset: Offset, middle: Int): Offset = { - val (highest, _, lowest) = split(offset) - of(highest, middle, lowest) - } - - def setLowestIndex(offset: Offset, lowest: Int): Offset = { - val (highest, middle, _) = split(offset) - of(highest, middle, lowest) - } - - def of(highest: Long, middle: Int = 0, lowest: Int = 0): Offset = { - require( - highest >= 0 && highest <= MaxHighest, - s"highest ($highest) is out of range [0, $MaxHighest]", - ) - require(middle >= 0, s"middle ($middle) is lower than 0") - require(lowest >= 0, s"lowest ($lowest) is lower than 0") + private val versionBytes = Bytes.fromByteArray(Array(version)) - val bytes = new ByteArrayOutputStream - val stream = new DataOutputStream(bytes) - stream.writeByte(version.toInt) - writeHighest(highest, stream) - stream.writeInt(middle) - stream.writeInt(lowest) - Offset.fromByteArray(bytes.toByteArray) - } + def of(highest: Long, middle: Int = 0, lowest: Int = 0): Offset = + VersionedOffset.of(version, highest, middle, lowest).offset def version(offset: Offset): Byte = { + validateVersion(offset) val stream = toDataInputStream(offset) - val extractedVersion = stream.readByte() - validateVersion(extractedVersion) - extractedVersion + stream.readByte() } def matchesVersionOf(offset: Offset): Boolean = { @@ -76,53 +43,31 @@ class VersionedOffsetBuilder(version: Byte) { // `highestIndex` is used a lot, so it's worth optimizing a little rather than reusing `split`. def highestIndex(offset: Offset): Long = { + validateVersion(offset) val stream = toDataInputStream(offset) - val (extractedVersion, highest) = readVersionAndHighest(stream) - validateVersion(extractedVersion) - highest + readHighest(stream) } - def middleIndex(offset: Offset): Int = split(offset)._2 - - def lowestIndex(offset: Offset): Int = split(offset)._3 - - private[kvutils] def split(offset: Offset): (Long, Int, Int) = { - val stream = toDataInputStream(offset) - val (extractedVersion, highest) = readVersionAndHighest(stream) - validateVersion(extractedVersion) - val middle = stream.readInt() - val lowest = stream.readInt() - (highest, middle, lowest) + private[kvutils] def split(offset: Offset): VersionedOffset = { + validateVersion(offset) + VersionedOffset(offset) } - private def validateVersion(extractedVersion: Byte): Unit = - require(version == extractedVersion, s"wrong version $extractedVersion, should be $version") + private def validateVersion(offset: Offset): Unit = + require( + offset.bytes.startsWith(versionBytes), { + val extractedVersion = toDataInputStream(offset).readByte() + s"wrong version $extractedVersion, should be $version" + }, + ) } object VersionedOffsetBuilder { - val MaxHighest: Long = (1L << 56) - 1 - - private val highestStartByte = 1 - private val highestSizeBytes = 7 - private val versionMask = 0xff00000000000000L - private val highestMask = 0x00ffffffffffffffL - - def apply(version: Byte): VersionedOffsetBuilder = new VersionedOffsetBuilder(version) + private def toDataInputStream(offset: Offset) = + new DataInputStream(offset.toInputStream) - private def toDataInputStream(offset: Offset) = new DataInputStream( - new ByteArrayInputStream(offset.toByteArray) - ) - - private def readVersionAndHighest(stream: DataInputStream): (Byte, Long) = { + private def readHighest(stream: DataInputStream): Long = { val versionAndHighest = stream.readLong() - val version = ((versionAndHighest & versionMask) >> 56).toByte - val highest = versionAndHighest & highestMask - version -> highest - } - - private def writeHighest(highest: Long, stream: DataOutputStream): Unit = { - val buffer = ByteBuffer.allocate(8) - buffer.putLong(highest) - stream.write(buffer.array(), highestStartByte, highestSizeBytes) + versionAndHighest & VersionedOffset.HighestMask } } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetMutator.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetMutator.scala new file mode 100644 index 000000000000..7cf4fe2ce449 --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetMutator.scala @@ -0,0 +1,19 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils + +import com.daml.ledger.offset.Offset + +/** Modifies specific offset fields while preserving all other fields. + */ +object VersionedOffsetMutator { + def zeroLowest(offset: Offset): Offset = + setLowest(offset, 0) + + def setLowest(offset: Offset, newLowest: Int): Offset = { + val versionedOffset = VersionedOffset(offset) + new VersionedOffsetBuilder(versionedOffset.version) + .of(versionedOffset.highest, versionedOffset.middle, newLowest) + } +} diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala index c589de415089..0dfb9933bdff 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala @@ -9,7 +9,7 @@ import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.configuration.LedgerInitialConditions import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlLogEntryId} -import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueConsumption, OffsetBuilder} +import com.daml.ledger.participant.state.kvutils.{Envelope, KeyValueConsumption, VersionedOffset} import com.daml.ledger.participant.state.v2._ import com.daml.ledger.validator.preexecution.TimeUpdatesProvider import com.daml.lf.data.Time @@ -18,13 +18,11 @@ import com.daml.metrics.{Metrics, Timed} /** Adapts a [[LedgerReader]] instance to [[ReadService]]. * Performs translation between the offsets required by the underlying reader and [[ReadService]]: - * * a 3 component integer offset is exposed to [[ReadService]] (see [[OffsetBuilder.fromLong]]), - * * a max. 2 component integer offset is expected from the underlying [[LedgerReader]], and - * * the third (lowest index) component is generated as the index of the update in case more than - * 1 has been generated by [[KeyValueConsumption.logEntryToUpdate]], - * * otherwise the offset is passed on to [[ReadService]] as-is. * - * @see com.daml.ledger.participant.state.kvutils.OffsetBuilder + * - a 3 component integer offset is exposed to [[ReadService]] (see [[com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder]]), + * - a max. 2 component integer offset is expected from the underlying [[LedgerReader]], and + * - the third (lowest index) component is generated as the index of the update in case more than 1 has been generated by [[KeyValueConsumption.logEntryToUpdate]], + * - otherwise the offset is passed on to [[ReadService]] as-is. */ class KeyValueParticipantStateReader private[api] ( reader: LedgerReader, @@ -33,6 +31,7 @@ class KeyValueParticipantStateReader private[api] ( timeUpdatesProvider: TimeUpdatesProvider, failOnUnexpectedEvent: Boolean, ) extends ReadService { + import KeyValueParticipantStateReader._ override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = @@ -40,7 +39,7 @@ class KeyValueParticipantStateReader private[api] ( override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = { Source - .single(beginAfter.map(OffsetBuilder.dropLowestIndex)) + .single(beginAfter.map(offset => VersionedOffset(offset).zeroLowest.offset)) .flatMapConcat(reader.events) .flatMapConcat { case LedgerRecord(offset, entryId, envelope) => Timed @@ -103,7 +102,7 @@ object KeyValueParticipantStateReader { totalUpdates: Int, ): Offset = if (totalUpdates > 1) { - OffsetBuilder.setLowestIndex(offsetFromRecord, index) + VersionedOffset(offsetFromRecord).setLowest(index).offset } else { offsetFromRecord } diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/OffsetGen.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/OffsetGen.scala new file mode 100644 index 000000000000..4adf8f7bbb55 --- /dev/null +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/OffsetGen.scala @@ -0,0 +1,29 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils + +import org.scalacheck.Arbitrary.arbitrary +import org.scalacheck.Gen + +object OffsetGen { + val genVersion: Gen[Byte] = arbitrary[Byte] + + val genDifferentVersions: Gen[(Byte, Byte)] = for { + version1 <- genVersion + version2 <- genVersion.suchThat(_ != version1) + } yield (version1, version2) + + val genHighest: Gen[Long] = Gen.chooseNum(0L, VersionedOffset.MaxHighest) + + val genOutOfRangeHighest: Gen[Long] = + Gen.oneOf(Gen.negNum[Long], Gen.chooseNum(VersionedOffset.MaxHighest + 1, Long.MaxValue)) + + val genMiddle: Gen[Int] = Gen.chooseNum(0, Int.MaxValue, 0, 1, Int.MaxValue) + + val genOutOfRangeMiddle: Gen[Int] = Gen.negNum[Int] + + val genLowest: Gen[Int] = Gen.chooseNum(0, Int.MaxValue, 0, 1, Int.MaxValue) + + val genOutOfRangeLowest: Gen[Int] = Gen.negNum[Int] +} diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala index fb5038c867ad..86a2b3edea2f 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala @@ -14,7 +14,6 @@ import com.daml.ledger.api.DeduplicationPeriod import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.ledger.configuration.{Configuration, LedgerId, LedgerTimeModel} import com.daml.ledger.offset.Offset -import com.daml.ledger.participant.state.kvutils.OffsetBuilder.{fromLong => toOffset} import com.daml.ledger.participant.state.kvutils.ParticipantStateIntegrationSpecBase._ import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason import com.daml.ledger.participant.state.v2.Update._ @@ -69,10 +68,16 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i // This can be overridden by tests for in-memory or otherwise ephemeral ledgers. protected val isPersistent: Boolean = true + // This can be overridden by tests which use a different offset version. + protected val offsetVersion: Byte = 0 + + private lazy val offsetBuilder = new VersionedOffsetBuilder(offsetVersion) + protected def participantStateFactory( ledgerId: LedgerId, participantId: Ref.ParticipantId, testId: String, + offsetVersion: Byte, metrics: Metrics, )(implicit loggingContext: LoggingContext): ResourceOwner[ParticipantState] @@ -83,7 +88,13 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i ledgerId: LedgerId ): ResourceOwner[ParticipantState] = newLoggingContext { implicit loggingContext => - participantStateFactory(ledgerId, participantId, testId, new Metrics(new MetricRegistry)) + participantStateFactory( + ledgerId, + participantId, + testId, + offsetVersion, + new Metrics(new MetricRegistry), + ) } override protected def beforeEach(): Unit = { @@ -120,7 +131,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i .idleTimeout(IdleTimeout) .runWith(Sink.head) } yield { - offset should be(toOffset(1)) + offset should be(offsetBuilder.of(1)) update.recordTime should be >= rt matchPackageUpload(update, submissionId, List(anArchive)) } @@ -136,7 +147,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i .idleTimeout(IdleTimeout) .runWith(Sink.head) } yield { - offset should be(toOffset(1)) + offset should be(offsetBuilder.of(1)) update.recordTime should be >= rt matchPackageUpload(update, submissionId, archives) } @@ -159,11 +170,11 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i } yield { all(updates.map(_.recordTime)) should be >= rt // first upload arrives as head update: - offset1 should be(toOffset(1)) + offset1 should be(offsetBuilder.of(1)) matchPackageUpload(update1, subId1, List(anArchive)) - offset2 should be(toOffset(2)) + offset2 should be(offsetBuilder.of(2)) matchPackageUpload(update2, subId2, List()) - offset3 should be(toOffset(3)) + offset3 should be(offsetBuilder.of(3)) matchPackageUpload(update3, subId3, List(anotherArchive)) } } @@ -183,7 +194,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i .idleTimeout(IdleTimeout) .runWith(Sink.head) } yield { - offset should be(toOffset(1)) + offset should be(offsetBuilder.of(1)) update.recordTime should be >= rt inside(update) { case PublicPackageUploadRejected(actualSubmissionId, _, _) => actualSubmissionId should be(submissionId) @@ -206,7 +217,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i results = Seq(result1, result2, result3) _ = all(results) should be(SubmissionResult.Acknowledged) } yield { - offset2 should be(toOffset(3)) + offset2 should be(offsetBuilder.of(3)) update2.recordTime should be >= rt inside(update2) { case PublicPackageUpload(_, _, _, Some(submissionId)) => submissionId should be(submissionIds._2) @@ -227,7 +238,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i _ = result should be(SubmissionResult.Acknowledged) (offset, update) <- waitForNextUpdate(ps, None) } yield { - offset should be(toOffset(1)) + offset should be(offsetBuilder.of(1)) update.recordTime should be >= rt inside(update) { case PartyAddedToParticipant(party, actualDisplayName, actualParticipantId, _, _) => @@ -246,7 +257,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i _ = result should be(SubmissionResult.Acknowledged) (offset, update) <- waitForNextUpdate(ps, None) } yield { - offset should be(toOffset(1)) + offset should be(offsetBuilder.of(1)) update.recordTime should be >= rt inside(update) { case PartyAddedToParticipant(party, actualDisplayName, actualParticipantId, _, _) => @@ -273,7 +284,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i results = Seq(result1, result2, result3) _ = all(results) should be(SubmissionResult.Acknowledged) } yield { - offset2 should be(toOffset(3)) + offset2 should be(offsetBuilder.of(3)) update2.recordTime should be >= rt inside(update2) { case PartyAddedToParticipant(_, displayName, _, _, Some(submissionId)) => @@ -294,7 +305,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i results = Seq(result1, result2) _ = all(results) should be(SubmissionResult.Acknowledged) } yield { - offset2 should be(toOffset(2)) + offset2 should be(offsetBuilder.of(2)) update2.recordTime should be >= rt inside(update2) { case PartyAllocationRejected(_, _, _, rejectionReason) => rejectionReason should be("Party already exists") @@ -318,7 +329,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i .toScala (offset2, _) <- waitForNextUpdate(ps, Some(offset1)) } yield { - offset2 should be(toOffset(2)) + offset2 should be(offsetBuilder.of(2)) } } @@ -362,18 +373,18 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i } yield { all(updates.map(_.recordTime)) should be >= rt - offset1 should be(toOffset(1)) + offset1 should be(offsetBuilder.of(1)) update1 should be(a[PartyAddedToParticipant]) - offset2 should be(toOffset(2)) + offset2 should be(offsetBuilder.of(2)) matchTransaction(update2, firstCommandId) - offset3 should be(toOffset(3)) + offset3 should be(offsetBuilder.of(3)) inside(update3) { case CommandRejected(_, _, FinalReason(status)) => status.code should be(Code.ALREADY_EXISTS.value) } - offset4 should be(toOffset(4)) + offset4 should be(offsetBuilder.of(4)) matchTransaction(update4, secondCommandId) } } @@ -405,7 +416,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i results = Seq(result1, result2, result3) _ = all(results) should be(SubmissionResult.Acknowledged) } yield { - offset3 should be(toOffset(3)) + offset3 should be(offsetBuilder.of(3)) update3.recordTime should be >= rt update3 should be(a[TransactionAccepted]) } @@ -450,7 +461,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i //get the new party off state updates newParty <- ps - .stateUpdates(beginAfter = Some(toOffset(2))) + .stateUpdates(beginAfter = Some(offsetBuilder.of(2))) .idleTimeout(IdleTimeout) .runWith(Sink.head) .map(_._2.asInstanceOf[PartyAddedToParticipant].party) @@ -468,18 +479,18 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i } yield { all(updates.map(_.recordTime)) should be >= rt - offset1 should be(toOffset(1)) + offset1 should be(offsetBuilder.of(1)) update1 should be(a[ConfigurationChanged]) - offset2 should be(toOffset(2)) + offset2 should be(offsetBuilder.of(2)) inside(update2) { case CommandRejected(_, _, FinalReason(status)) => status.code should be(Code.INVALID_ARGUMENT.value) } - offset3 should be(toOffset(3)) + offset3 should be(offsetBuilder.of(3)) update3 should be(a[PartyAddedToParticipant]) - offset4 should be(toOffset(4)) + offset4 should be(offsetBuilder.of(4)) update4 should be(a[TransactionAccepted]) } } @@ -569,7 +580,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i results = Seq(result1, result2, result3) _ = all(results) should be(SubmissionResult.Acknowledged) } yield { - offset2 should be(toOffset(3)) + offset2 should be(offsetBuilder.of(3)) update2.recordTime should be >= rt inside(update2) { case ConfigurationChanged(_, submissionId, _, _) => submissionId should be(submissionIds._2) @@ -587,7 +598,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i .map(i => Ref.Party.assertFromString(s"party-%0${partyIdDigits}d".format(i))) .to(SortedSet) - val expectedOffsets = partyIds.map(i => toOffset(i)).to(SortedSet) + val expectedOffsets = partyIds.map(i => offsetBuilder.of(i)).to(SortedSet) val updates = mutable.Buffer.empty[(Offset, Update)] val stateUpdatesF = ps diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/OffsetBuilderSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/OffsetBuilderSpec.scala deleted file mode 100644 index 33845ef79120..000000000000 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/OffsetBuilderSpec.scala +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.participant.state.kvutils - -import com.daml.ledger.offset.Offset -import com.daml.lf.data.Bytes -import org.scalacheck.Gen -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec -import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks - -class OffsetBuilderSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPropertyChecks { - import OffsetBuilderSpec._ - - "OffsetBuilder" should { - "return all zeroes for the zeroth offset" in { - val offset = OffsetBuilder.fromLong(0) - - offset should be(zeroOffset) - } - - "always return an offset of the same length" in { - forAll(genHighest, Gen.posNum[Int], Gen.posNum[Int]) { (highest, middle, lowest) => - val offset = OffsetBuilder.fromLong(highest, middle, lowest) - offset.bytes.length should be(OffsetBuilder.end) - } - } - - "construct and extract" in { - forAll(genHighest, Gen.posNum[Int], Gen.posNum[Int]) { (highest, middle, lowest) => - val offset = OffsetBuilder.fromLong(highest, middle, lowest) - - OffsetBuilder.highestIndex(offset) should be(highest) - OffsetBuilder.middleIndex(offset) should be(middle) - OffsetBuilder.lowestIndex(offset) should be(lowest) - OffsetBuilder.split(offset) should be((highest, middle, lowest)) - } - } - - "set the middle index" in { - forAll(genHighest, Gen.posNum[Int], Gen.posNum[Int], Gen.posNum[Int]) { - (highest, middle, lowest, newMiddle) => - val offset = OffsetBuilder.fromLong(highest, middle, lowest) - val modifiedOffset = OffsetBuilder.setMiddleIndex(offset, newMiddle) - - OffsetBuilder.split(modifiedOffset) should be((highest, newMiddle, lowest)) - } - } - - "only change individual indexes" in { - forAll(genHighest, Gen.posNum[Int], Gen.posNum[Int], Gen.posNum[Int]) { - (highest, middle, lowest, newLowest) => - val offset = OffsetBuilder.fromLong(highest, middle, lowest) - val modifiedOffset = OffsetBuilder.setLowestIndex(offset, newLowest) - - OffsetBuilder.split(modifiedOffset) should be((highest, middle, newLowest)) - } - } - - "zero out the middle and lowest index" in { - forAll(genHighest, Gen.posNum[Int], Gen.posNum[Int]) { (highest, middle, lowest) => - val offset = OffsetBuilder.fromLong(highest, middle, lowest) - val modifiedOffset = OffsetBuilder.onlyKeepHighestIndex(offset) - - OffsetBuilder.split(modifiedOffset) should be((highest, 0, 0)) - } - } - - "zero out the lowest index" in { - forAll(genHighest, Gen.posNum[Int], Gen.posNum[Int]) { (highest, middle, lowest) => - val offset = OffsetBuilder.fromLong(highest, middle, lowest) - val modifiedOffset = OffsetBuilder.dropLowestIndex(offset) - - OffsetBuilder.split(modifiedOffset) should be((highest, middle, 0)) - } - } - - "retain leading zeros" in { - val offset = OffsetBuilder.fromLong(1, 2, 3) - val highest = offset.toByteArray.slice(OffsetBuilder.highestStart, OffsetBuilder.middleStart) - val middle = offset.toByteArray.slice(OffsetBuilder.middleStart, OffsetBuilder.lowestStart) - val lowest = offset.toByteArray.slice(OffsetBuilder.lowestStart, OffsetBuilder.end) - - val highestZeros = highest.dropRight(1).toSeq - all(highestZeros) should be(0) - highest.takeRight(1) should be(Array[Byte](1)) - - val middleZeros = middle.dropRight(1).toSeq - all(middleZeros) should be(0) - middle.takeRight(1) should be(Array[Byte](2)) - - val lowestZeros = lowest.dropRight(1).toSeq - all(lowestZeros) should be(0) - lowest.takeRight(1) should be(Array[Byte](3)) - } - } -} - -object OffsetBuilderSpec { - private val zeroOffset = Offset(Bytes.fromByteArray(Array.fill(16)(0: Byte))) - - private val genHighest = Gen.chooseNum(0L, VersionedOffsetBuilder.MaxHighest) -} diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetBuilderSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetBuilderSpec.scala index a713a237b083..d743beba7970 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetBuilderSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetBuilderSpec.scala @@ -3,114 +3,72 @@ package com.daml.ledger.participant.state.kvutils -import org.scalacheck.Arbitrary.arbitrary -import org.scalacheck.Gen import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks -/** Other test cases are covered by [[OffsetBuilderSpec]] */ class VersionedOffsetBuilderSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPropertyChecks { - import VersionedOffsetBuilderSpec._ + import OffsetGen._ "VersionedOffsetBuilder" should { "construct and extract" in { - forAll(arbitrary[Byte], genHighest, Gen.posNum[Int], Gen.posNum[Int]) { - (version, highest, middle, lowest) => - val builder = VersionedOffsetBuilder(version) - val offset = builder.of(highest, middle, lowest) - - builder.version(offset) should be(version) - builder.highestIndex(offset) should be(highest) - builder.middleIndex(offset) should be(middle) - builder.lowestIndex(offset) should be(lowest) - builder.split(offset) should be((highest, middle, lowest)) + forAll(genVersion, genHighest, genMiddle, genLowest) { (version, highest, middle, lowest) => + val builder = new VersionedOffsetBuilder(version) + val offset = builder.of(highest, middle, lowest) + + val splitOffset = VersionedOffset(offset) + splitOffset.version should be(version) + splitOffset.highest should be(highest) + splitOffset.middle should be(middle) + splitOffset.lowest should be(lowest) } } - "fail on a highest that is out of range" in { - forAll(arbitrary[Byte], genOutOfRangeHighest, Gen.posNum[Int], Gen.posNum[Int]) { - (version, highest, middle, lowest) => - the[IllegalArgumentException] thrownBy VersionedOffsetBuilder(version).of( - highest, - middle, - lowest, - ) should have message s"requirement failed: highest ($highest) is out of range [0, ${VersionedOffsetBuilder.MaxHighest}]" - } - } - - "fail on a negative middle index" in { - forAll(arbitrary[Byte], genHighest, Gen.negNum[Int], Gen.posNum[Int]) { - (version, highest, middle, lowest) => - the[IllegalArgumentException] thrownBy VersionedOffsetBuilder(version).of( - highest, - middle, - lowest, - ) should have message s"requirement failed: middle ($middle) is lower than 0" - } - } + "extract the highest index" in { + forAll(genVersion, genHighest, genMiddle, genLowest) { (version, highest, middle, lowest) => + val builder = new VersionedOffsetBuilder(version) + val offset = builder.of(highest, middle, lowest) - "fail on a negative lowest index" in { - forAll(arbitrary[Byte], genHighest, Gen.posNum[Int], Gen.negNum[Int]) { - (version, highest, middle, lowest) => - the[IllegalArgumentException] thrownBy VersionedOffsetBuilder(version).of( - highest, - middle, - lowest, - ) should have message s"requirement failed: lowest ($lowest) is lower than 0" + builder.highestIndex(offset) should be(highest) } } "fail on a wrong version" in { - forAll(genHighest, Gen.posNum[Int], Gen.posNum[Int], genDifferentVersions) { - (highest, middle, lowest, versions) => - val offset = VersionedOffsetBuilder(versions._1).of(highest, middle, lowest) - val offsetBuilder = VersionedOffsetBuilder(versions._2) + forAll(genDifferentVersions, genHighest, genMiddle, genLowest) { + case ((versionA, versionB), highest, middle, lowest) => + val offset = new VersionedOffsetBuilder(versionA).of(highest, middle, lowest) + val offsetBuilder = new VersionedOffsetBuilder(versionB) val testedMethods = List(offsetBuilder.version(_), offsetBuilder.highestIndex(_), offsetBuilder.split(_)) testedMethods.foreach { method => - the[IllegalArgumentException] thrownBy method( - offset - ) should have message s"requirement failed: wrong version ${versions._1}, should be ${versions._2}" + (the[IllegalArgumentException] thrownBy method(offset) + should have message s"requirement failed: wrong version $versionA, should be $versionB") } } } "test the version of the offset, returning `true` on a match" in { - forAll(genHighest, Gen.posNum[Int], Gen.posNum[Int], arbitrary[Byte]) { - (highest, middle, lowest, version) => - val offsetBuilder = VersionedOffsetBuilder(version) - val offset = offsetBuilder.of(highest, middle, lowest) + forAll(genVersion, genHighest, genMiddle, genLowest) { (version, highest, middle, lowest) => + val offsetBuilder = new VersionedOffsetBuilder(version) + val offset = offsetBuilder.of(highest, middle, lowest) - offsetBuilder.matchesVersionOf(offset) should be(true) + offsetBuilder.matchesVersionOf(offset) should be(true) } } "test the version of the offset, returning `false` on a mismatch" in { - forAll(genHighest, Gen.posNum[Int], Gen.posNum[Int], genDifferentVersions) { - (highest, middle, lowest, versions) => - val offset = VersionedOffsetBuilder(versions._1).of(highest, middle, lowest) - val offsetBuilder = VersionedOffsetBuilder(versions._2) + forAll(genDifferentVersions, genHighest, genMiddle, genLowest) { + case ((versionA, versionB), highest, middle, lowest) => + val offset = new VersionedOffsetBuilder(versionA).of(highest, middle, lowest) + val offsetBuilder = new VersionedOffsetBuilder(versionB) offsetBuilder.matchesVersionOf(offset) should be(false) } } } } - -object VersionedOffsetBuilderSpec { - private val genHighest = Gen.chooseNum(0L, VersionedOffsetBuilder.MaxHighest) - - private val genOutOfRangeHighest = - Gen.oneOf(Gen.negNum[Long], Gen.chooseNum(VersionedOffsetBuilder.MaxHighest + 1, Long.MaxValue)) - - private val genDifferentVersions = for { - version1 <- arbitrary[Byte] - version2 <- arbitrary[Byte].suchThat(_ != version1) - } yield (version1, version2) -} diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetSpec.scala new file mode 100644 index 000000000000..0f663b8fb7fa --- /dev/null +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/VersionedOffsetSpec.scala @@ -0,0 +1,71 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks + +class VersionedOffsetSpec extends AnyWordSpec with Matchers with ScalaCheckDrivenPropertyChecks { + + import OffsetGen._ + + "VersionedOffset" when { + "constructing" should { + "construct and split up an offset" in { + forAll(genVersion, genHighest, genMiddle, genLowest) { (version, highest, middle, lowest) => + val offset = VersionedOffset.of(version, highest, middle, lowest) + + offset.version should be(version) + offset.highest should be(highest) + offset.middle should be(middle) + offset.lowest should be(lowest) + } + } + + "fail on a highest that is out of range" in { + forAll(genVersion, genOutOfRangeHighest, genMiddle, genLowest) { + (version, highest, middle, lowest) => + val builder = new VersionedOffsetBuilder(version) + (the[IllegalArgumentException] thrownBy builder.of(highest, middle, lowest) + should have message s"requirement failed: highest ($highest) is out of range [0, ${VersionedOffset.MaxHighest}]") + } + } + + "fail on a negative middle index" in { + forAll(genVersion, genHighest, genOutOfRangeMiddle, genLowest) { + (version, highest, middle, lowest) => + val builder = new VersionedOffsetBuilder(version) + (the[IllegalArgumentException] thrownBy builder.of(highest, middle, lowest) + should have message s"requirement failed: middle ($middle) is lower than 0") + } + } + + "fail on a negative lowest index" in { + forAll(genVersion, genHighest, genMiddle, genOutOfRangeLowest) { + (version, highest, middle, lowest) => + val builder = new VersionedOffsetBuilder(version) + (the[IllegalArgumentException] thrownBy builder.of(highest, middle, lowest) + should have message s"requirement failed: lowest ($lowest) is lower than 0") + } + } + } + + "mutating" should { + "only change individual indexes" in { + forAll(genVersion, genHighest, genMiddle, genLowest, genLowest) { + (version, highest, middle, lowest, newLowest) => + val offset = VersionedOffset.of(version, highest, middle, lowest) + + val modifiedOffset = offset.setLowest(newLowest) + + modifiedOffset.version should be(version) + modifiedOffset.highest should be(highest) + modifiedOffset.middle should be(middle) + modifiedOffset.lowest should be(newLowest) + } + } + } + } +} diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala index 501af8433012..8eb22aaa0ee2 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala @@ -17,7 +17,7 @@ import com.daml.ledger.participant.state.kvutils.store.{ DamlPartyAllocation, DamlStateValue, } -import com.daml.ledger.participant.state.kvutils.{Envelope, OffsetBuilder, Raw} +import com.daml.ledger.participant.state.kvutils.{Envelope, Raw, VersionedOffsetBuilder} import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp @@ -35,15 +35,15 @@ class KeyValueParticipantStateReaderSpec with Matchers with AkkaBeforeAndAfterAll { - import OffsetBuilder.{fromLong => toOffset} + private val offsetBuilder = new VersionedOffsetBuilder(0) "participant state reader" should { "stream offsets from the start" in { val reader = readerStreamingFrom( offset = None, - LedgerRecord(toOffset(1), aLogEntryId(1), aWrappedLogEntry), - LedgerRecord(toOffset(2), aLogEntryId(2), aWrappedLogEntry), - LedgerRecord(toOffset(3), aLogEntryId(3), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(1), aLogEntryId(1), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(2), aLogEntryId(2), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(3), aLogEntryId(3), aWrappedLogEntry), ) val instance = createInstance(reader) val stream = instance.stateUpdates(None) @@ -51,54 +51,54 @@ class KeyValueParticipantStateReaderSpec offsetsFrom(stream).map { actual => actual should have size 3 actual shouldBe Seq( - toOffset(1), - toOffset(2), - toOffset(3), + offsetBuilder.of(1), + offsetBuilder.of(2), + offsetBuilder.of(3), ) } } "stream offsets from a given 1 component offset" in { val reader = readerStreamingFrom( - offset = Some(toOffset(4)), - LedgerRecord(toOffset(5), aLogEntryId(5), aWrappedLogEntry), - LedgerRecord(toOffset(6), aLogEntryId(6), aWrappedLogEntry), - LedgerRecord(toOffset(7), aLogEntryId(7), aWrappedLogEntry), - LedgerRecord(toOffset(8), aLogEntryId(8), aWrappedLogEntry), + offset = Some(offsetBuilder.of(4)), + LedgerRecord(offsetBuilder.of(5), aLogEntryId(5), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(6), aLogEntryId(6), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(7), aLogEntryId(7), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(8), aLogEntryId(8), aWrappedLogEntry), ) val instance = createInstance(reader) - val stream = instance.stateUpdates(Some(toOffset(4))) + val stream = instance.stateUpdates(Some(offsetBuilder.of(4))) offsetsFrom(stream).map { actual => actual should have size 4 actual shouldBe Seq( - toOffset(5), - toOffset(6), - toOffset(7), - toOffset(8), + offsetBuilder.of(5), + offsetBuilder.of(6), + offsetBuilder.of(7), + offsetBuilder.of(8), ) } } "remove third component of input offset when streaming from underlying reader" in { val reader = readerStreamingFrom( - offset = Some(toOffset(1, 2)), - LedgerRecord(toOffset(2), aLogEntryId(2), aWrappedLogEntry), + offset = Some(offsetBuilder.of(1, 2)), + LedgerRecord(offsetBuilder.of(2), aLogEntryId(2), aWrappedLogEntry), ) val instance = createInstance(reader) - val stream = instance.stateUpdates(Some(toOffset(1, 2, 3))) + val stream = instance.stateUpdates(Some(offsetBuilder.of(1, 2, 3))) offsetsFrom(stream).map { actual => actual should have size 1 - actual shouldBe Seq(toOffset(2)) + actual shouldBe Seq(offsetBuilder.of(2)) } } "do not append index to underlying reader's offset in case of no more than 1 update" in { val reader = readerStreamingFrom( offset = None, - LedgerRecord(toOffset(1), aLogEntryId(1), aWrappedLogEntry), - LedgerRecord(toOffset(2), aLogEntryId(2), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(1), aLogEntryId(1), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(2), aLogEntryId(2), aWrappedLogEntry), ) for (updateGenerator <- Seq(zeroUpdateGenerator, singleUpdateGenerator)) { val instance = createInstance(reader, updateGenerator) @@ -106,7 +106,7 @@ class KeyValueParticipantStateReaderSpec offsetsFrom(stream).map { actual => actual should have size 2 - actual shouldBe Seq(toOffset(1), toOffset(2)) + actual shouldBe Seq(offsetBuilder.of(1), offsetBuilder.of(2)) } } succeed @@ -115,8 +115,8 @@ class KeyValueParticipantStateReaderSpec "append index to underlying reader's offset in case of more than 1 update" in { val reader = readerStreamingFrom( offset = None, - LedgerRecord(toOffset(1, 11), aLogEntryId(1), aWrappedLogEntry), - LedgerRecord(toOffset(2, 22), aLogEntryId(2), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(1, 11), aLogEntryId(1), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(2, 22), aLogEntryId(2), aWrappedLogEntry), ) val instance = createInstance(reader, twoUpdatesGenerator) val stream = instance.stateUpdates(None) @@ -124,19 +124,19 @@ class KeyValueParticipantStateReaderSpec offsetsFrom(stream).map { actual => actual should have size 4 actual shouldBe Seq( - toOffset(1, 11, 0), - toOffset(1, 11, 1), - toOffset(2, 22, 0), - toOffset(2, 22, 1), + offsetBuilder.of(1, 11, 0), + offsetBuilder.of(1, 11, 1), + offsetBuilder.of(2, 22, 0), + offsetBuilder.of(2, 22, 1), ) } } "skip events before specified offset" in { val records = List( - LedgerRecord(toOffset(1), aLogEntryId(1), aWrappedLogEntry), - LedgerRecord(toOffset(2), aLogEntryId(2), aWrappedLogEntry), - LedgerRecord(toOffset(3), aLogEntryId(3), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(1), aLogEntryId(1), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(2), aLogEntryId(2), aWrappedLogEntry), + LedgerRecord(offsetBuilder.of(3), aLogEntryId(3), aWrappedLogEntry), ) def getInstance( @@ -153,7 +153,7 @@ class KeyValueParticipantStateReaderSpec Future .sequence( - Seq(None, Some(toOffset(1)), Some(toOffset(2)), Some(toOffset(3))) + Seq(None, Some(offsetBuilder.of(1)), Some(offsetBuilder.of(2)), Some(offsetBuilder.of(3))) .map(offset => offsetsFrom(instances(offset).stateUpdates(offset))) ) .map { case Seq(all, afterFirst, beforeLast, afterLast) => @@ -168,7 +168,7 @@ class KeyValueParticipantStateReaderSpec val anInvalidEnvelope = Raw.Envelope(ByteString.copyFrom(Array[Byte](0, 1, 2))) val reader = readerStreamingFrom( offset = None, - LedgerRecord(toOffset(0), aLogEntryId(0), anInvalidEnvelope), + LedgerRecord(offsetBuilder.of(0), aLogEntryId(0), anInvalidEnvelope), ) val instance = createInstance(reader) @@ -181,7 +181,7 @@ class KeyValueParticipantStateReaderSpec val anInvalidEnvelopeMessage = Envelope.enclose(aStateValue) val reader = readerStreamingFrom( offset = None, - LedgerRecord(toOffset(0), aLogEntryId(0), anInvalidEnvelopeMessage), + LedgerRecord(offsetBuilder.of(0), aLogEntryId(0), anInvalidEnvelopeMessage), ) val instance = createInstance(reader) @@ -194,7 +194,7 @@ class KeyValueParticipantStateReaderSpec val anInvalidEnvelopeMessage = Envelope.enclose(aStateValue) val reader = readerStreamingFrom( offset = None, - LedgerRecord(toOffset(0), aLogEntryId(0), anInvalidEnvelopeMessage), + LedgerRecord(offsetBuilder.of(0), aLogEntryId(0), anInvalidEnvelopeMessage), ) val instance = createInstance(reader, failOnUnexpectedEvent = false) @@ -206,19 +206,16 @@ class KeyValueParticipantStateReaderSpec "offsetForUpdate" should { "not overwrite middle offset from record in case of 2 updates" in { - val offsetFromRecord = OffsetBuilder.fromLong(1, 2) + val offsetFromRecord = offsetBuilder.of(1, 2) for (subOffset <- Seq(0, 1)) { - offsetForUpdate(offsetFromRecord, subOffset, 2) shouldBe OffsetBuilder.fromLong( - 1, - 2, - subOffset, - ) + (offsetForUpdate(offsetFromRecord, subOffset, 2) + shouldBe offsetBuilder.of(1, 2, subOffset)) } succeed } "use original offset in case less than 2 updates" in { - val expectedOffset = OffsetBuilder.fromLong(1, 2, 3) + val expectedOffset = offsetBuilder.of(1, 2, 3) for (totalUpdates <- Seq(0, 1)) { for (i <- 0 until totalUpdates) { offsetForUpdate(expectedOffset, i, totalUpdates) shouldBe expectedOffset diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala index aa68d27465a0..f981baffed00 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala @@ -5,7 +5,7 @@ package com.daml.ledger.participant.state.kvutils.tools.integritycheck import akka.stream.Materializer import com.daml.ledger.on.memory.{InMemoryLedgerStateAccess, InMemoryState, Index} -import com.daml.ledger.participant.state.kvutils.KeyValueCommitting +import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, VersionedOffsetBuilder} import com.daml.ledger.participant.state.kvutils.export.{ NoOpLedgerDataExporter, SubmissionInfo, @@ -28,6 +28,7 @@ final class LogAppendingCommitStrategySupport( metrics: Metrics )(implicit executionContext: ExecutionContext) extends CommitStrategySupport[Index] { + private val offsetBuilder = new VersionedOffsetBuilder(0) private val state = InMemoryState.empty private val serializationStrategy = StateKeySerializationStrategy.createDefault() @@ -48,7 +49,9 @@ final class LogAppendingCommitStrategySupport( override def commit( submissionInfo: SubmissionInfo )(implicit materializer: Materializer, loggingContext: LoggingContext): Future[WriteSet] = { - val access = new WriteRecordingLedgerStateAccess(new InMemoryLedgerStateAccess(state, metrics)) + val access = new WriteRecordingLedgerStateAccess( + new InMemoryLedgerStateAccess(offsetBuilder, state, metrics) + ) access.inTransaction { operations => val (ledgerStateReader, commitStrategy) = BatchedSubmissionValidatorFactory.readerAndCommitStrategyFrom( @@ -69,7 +72,7 @@ final class LogAppendingCommitStrategySupport( } override def newReadServiceFactory(): ReplayingReadServiceFactory = - new LogAppendingReadServiceFactory(metrics) + new LogAppendingReadServiceFactory(offsetBuilder, metrics) override val writeSetComparison: WriteSetComparison = new RawWriteSetComparison(serializationStrategy) diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala index 14c5d2f76a28..87f32c800fdf 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala @@ -15,7 +15,7 @@ import com.daml.ledger.participant.state.kvutils.api.{ LedgerRecord, } import com.daml.ledger.participant.state.kvutils.export.{SubmissionInfo, WriteSet} -import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw} +import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder} import com.daml.ledger.participant.state.v2.Update import com.daml.metrics.Metrics @@ -23,14 +23,15 @@ import scala.collection.immutable import scala.collection.mutable.ListBuffer final class LogAppendingReadServiceFactory( - metrics: Metrics + offsetBuilder: VersionedOffsetBuilder, + metrics: Metrics, ) extends ReplayingReadServiceFactory { private val recordedBlocks = ListBuffer.empty[LedgerRecord] override def appendBlock(submissionInfo: SubmissionInfo, writeSet: WriteSet): Unit = this.synchronized { writeSet.foreach { case (key, value) => - val offset = OffsetBuilder.fromLong(recordedBlocks.length.toLong) + val offset = offsetBuilder.of(recordedBlocks.length.toLong) val logEntryId = Raw.LogEntryId(key.bytes) // `key` is of an unknown type. recordedBlocks.append(LedgerRecord(offset, logEntryId, value)) } diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala index c52c9a1a9c74..2690e4ec248e 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala @@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference import akka.stream.Materializer import com.daml.ledger.on.memory.{InMemoryLedgerStateAccess, InMemoryState, Index} -import com.daml.ledger.participant.state.kvutils.KeyValueCommitting +import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, VersionedOffsetBuilder} import com.daml.ledger.participant.state.kvutils.export.{ NoOpLedgerDataExporter, SubmissionInfo, @@ -38,8 +38,9 @@ final class RawPreExecutingCommitStrategySupport( override val stateKeySerializationStrategy: StateKeySerializationStrategy = StateKeySerializationStrategy.createDefault() + private val offsetBuilder = new VersionedOffsetBuilder(0) private val state = InMemoryState.empty - private val ledgerStateAccess = new InMemoryLedgerStateAccess(state, metrics) + private val ledgerStateAccess = new InMemoryLedgerStateAccess(offsetBuilder, state, metrics) // To mimic the original pre-execution as closely as possible, we use the original submission // record time as the current time. This effectively means that the committer thinks the @@ -86,7 +87,7 @@ final class RawPreExecutingCommitStrategySupport( } override def newReadServiceFactory(): ReplayingReadServiceFactory = - new LogAppendingReadServiceFactory(metrics) + new LogAppendingReadServiceFactory(offsetBuilder, metrics) override val writeSetComparison: WriteSetComparison = new RawWriteSetComparison(stateKeySerializationStrategy) diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala index 27f31e50fdc7..40843f76df9d 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala @@ -13,7 +13,7 @@ import com.daml.ledger.participant.state.kvutils.export.SubmissionInfo import com.daml.ledger.participant.state.kvutils.store.events.DamlPartyAllocationEntry import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlLogEntryId} import com.daml.ledger.participant.state.kvutils.tools.integritycheck.LogAppendingReadServiceFactorySpec._ -import com.daml.ledger.participant.state.kvutils.{Envelope, Raw} +import com.daml.ledger.participant.state.kvutils.{Envelope, Raw, VersionedOffsetBuilder} import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp @@ -72,8 +72,9 @@ final class LogAppendingReadServiceFactorySpec object LogAppendingReadServiceFactorySpec { private def createFactory() = { + val offsetBuilder = new VersionedOffsetBuilder(0) val metrics = new Metrics(new MetricRegistry) - new LogAppendingReadServiceFactory(metrics) + new LogAppendingReadServiceFactory(offsetBuilder, metrics) } private val anEntryId = "AnEntryId" diff --git a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala index b3fb43314adf..b31e89e4d589 100644 --- a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -269,6 +269,7 @@ object RecoveringIndexerIntegrationSpec { readerWriter <- new memory.InMemoryLedgerReaderWriter.Owner( ledgerId = ledgerId, participantId = participantId, + offsetVersion = 0, keySerializationStrategy = StateKeySerializationStrategy.createDefault(), metrics = metrics, dispatcher = dispatcher, diff --git a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala index e249c326dfaa..7724a82a78f1 100644 --- a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala +++ b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala @@ -159,6 +159,7 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { engine = engine, jdbcUrl = ledgerJdbcUrl, resetOnStartup = isReset, + offsetVersion = 0, logEntryIdAllocator = new SeedServiceLogEntryIdAllocator(SeedService(config.seeding.get)), stateValueCache = caching.WeightedCache.from(