Skip to content

Commit

Permalink
kvutils: Use VersionedOffsetBuilder where possible, and introduce `…
Browse files Browse the repository at this point in the history
…VersionedOffsetMutator`. [KVL-1154] (#11277)

* kvutils: Remove `VersionedOffsetBuilder.apply`.

Doesn't do anything.

* ledger-on-memory: Use `VersionedOffsetBuilder`.

* indexer-benchmark: Use `VersionedOffsetBuilder`.

* ledger-on-sql: Use `VersionedOffsetBuilder`.

* kvutils: Use `VersionedOffsetBuilder` in tests.

* kvutils: Create a case class for `VersionedOffsetBuilder#split`.

* kvutils: Delete unused methods from the offset builders.

* kvutils: Use `Bytes#startWith` to check the offset version.

* kvutils: Move offset splitting into `VersionedOffset`.

* kvutils: Extract out versioned offset generators.

* kvutils: Replace `OffsetBuilder` with `VersionedOffsetMutator`.

This takes care of the last usages of `OffsetBuilder`, which were to
modify the lowest component of the offset.

CHANGELOG_BEGIN
CHANGELOG_END

* kvutils: Randomize the offset version in testing.

To make sure we don't use a hard-coded offset version anywhere.

* kvutils: `Random.between` is not available in Scala 2.12.

* kvutils: Move offset mutation methods to `VersionedOffset`.

* kvutils: Move the versioned offset construction into `VersionedOffset`.

The `VersionedOffsetBuilder` is still useful as it remembers the
version, meaning we only need to specify it once.
  • Loading branch information
SamirTalwar authored Oct 18, 2021
1 parent 81fde97 commit 8f94cff
Show file tree
Hide file tree
Showing 38 changed files with 475 additions and 473 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.daml.ledger.participant.state.kvutils.api.{
LedgerRecord,
}
import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter
import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw}
import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder}
import com.daml.ledger.participant.state.v2.Update
import com.daml.metrics.Metrics

Expand All @@ -38,14 +38,15 @@ object Main {
}
val importer = ProtobufBasedLedgerDataImporter(path)

val offsetBuilder = new VersionedOffsetBuilder(0)
val dataSource: Source[LedgerRecord, NotUsed] = Source
.fromIterator(() => importer.read().iterator)
.statefulMapConcat { () =>
val nextOffset = new AtomicLong(0)

{ case (_, writeSet) =>
writeSet.map { case (key, value) =>
val offset = OffsetBuilder.fromLong(nextOffset.getAndIncrement())
val offset = offsetBuilder.of(nextOffset.getAndIncrement())
val logEntryId = Raw.LogEntryId(key.bytes) // `key` is of an unknown type.
LedgerRecord(offset, logEntryId, value)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state
readerWriter <- new InMemoryLedgerReaderWriter.Owner(
ledgerId = config.ledgerId,
participantId = participantConfig.participantId,
offsetVersion = 0,
keySerializationStrategy = DefaultStateKeySerializationStrategy,
metrics = metrics,
stateValueCache = caching.WeightedCache.from(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.stream.scaladsl.Source
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.kvutils.OffsetBuilder
import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder
import com.daml.ledger.participant.state.kvutils.api.{LedgerReader, LedgerRecord}
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.akkastreams.dispatcher.Dispatcher
Expand All @@ -17,14 +17,15 @@ import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource
class InMemoryLedgerReader(
override val ledgerId: LedgerId,
dispatcher: Dispatcher[Index],
offsetBuilder: VersionedOffsetBuilder,
state: InMemoryState,
metrics: Metrics,
) extends LedgerReader {
override def events(startExclusive: Option[Offset]): Source[LedgerRecord, NotUsed] =
dispatcher
.startingAt(
startExclusive
.map(OffsetBuilder.highestIndex(_).toInt)
.map(offsetBuilder.highestIndex(_).toInt)
.getOrElse(StartIndex),
RangeSource((startExclusive, endInclusive) =>
Source.fromIterator(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package com.daml.ledger.on.memory
import com.daml.api.util.TimeProvider
import com.daml.caching.Cache
import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder
import com.daml.ledger.participant.state.kvutils.api._
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.validator.StateKeySerializationStrategy
Expand All @@ -21,6 +22,7 @@ object InMemoryLedgerReaderWriter {
final class Owner(
ledgerId: LedgerId,
participantId: Ref.ParticipantId,
offsetVersion: Byte,
keySerializationStrategy: StateKeySerializationStrategy,
metrics: Metrics,
timeProvider: TimeProvider = InMemoryLedgerWriter.DefaultTimeProvider,
Expand All @@ -31,7 +33,8 @@ object InMemoryLedgerReaderWriter {
committerExecutionContext: ExecutionContext,
) extends ResourceOwner[KeyValueLedger] {
override def acquire()(implicit context: ResourceContext): Resource[KeyValueLedger] = {
val reader = new InMemoryLedgerReader(ledgerId, dispatcher, state, metrics)
val offsetBuilder = new VersionedOffsetBuilder(offsetVersion)
val reader = new InMemoryLedgerReader(ledgerId, dispatcher, offsetBuilder, state, metrics)
for {
writer <- new InMemoryLedgerWriter.Owner(
participantId,
Expand All @@ -40,6 +43,7 @@ object InMemoryLedgerReaderWriter {
timeProvider,
stateValueCache,
dispatcher,
offsetBuilder,
state,
engine,
committerExecutionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.ledger.on.memory

import com.daml.ledger.participant.state.kvutils.VersionedOffsetBuilder
import com.daml.ledger.validator.{
LedgerStateAccess,
LedgerStateOperations,
Expand All @@ -13,15 +14,23 @@ import com.daml.metrics.Metrics

import scala.concurrent.{ExecutionContext, Future}

final class InMemoryLedgerStateAccess(state: InMemoryState, metrics: Metrics)
extends LedgerStateAccess[Index] {
final class InMemoryLedgerStateAccess(
offsetBuilder: VersionedOffsetBuilder,
state: InMemoryState,
metrics: Metrics,
) extends LedgerStateAccess[Index] {
override def inTransaction[T](
body: LedgerStateOperations[Index] => Future[T]
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[T] =
state.write { (log, state) =>
body(new TimedLedgerStateOperations(new InMemoryLedgerStateOperations(log, state), metrics))
body(
new TimedLedgerStateOperations(
new InMemoryLedgerStateOperations(offsetBuilder, log, state),
metrics,
)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
package com.daml.ledger.on.memory

import com.daml.ledger.offset.Offset
import com.daml.ledger.on.memory.InMemoryLedgerStateOperations._
import com.daml.ledger.on.memory.InMemoryState.MutableLog
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw}
import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder}
import com.daml.ledger.validator.BatchingLedgerStateOperations
import com.daml.logging.LoggingContext

import scala.concurrent.{ExecutionContext, Future}

final class InMemoryLedgerStateOperations(
offsetBuilder: VersionedOffsetBuilder,
log: InMemoryState.MutableLog,
state: InMemoryState.MutableState,
) extends BatchingLedgerStateOperations[Index] {

override def readState(
keys: Iterable[Raw.StateKey]
)(implicit
Expand All @@ -39,16 +38,11 @@ final class InMemoryLedgerStateOperations(
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Index] =
Future.successful(appendEntry(log, LedgerRecord(_, key, value)))

}

object InMemoryLedgerStateOperations {

private[memory] def appendEntry(log: MutableLog, createEntry: Offset => LedgerRecord): Index = {
private def appendEntry(log: MutableLog, createEntry: Offset => LedgerRecord): Index = {
val entryAtIndex = log.size
val offset = OffsetBuilder.fromLong(entryAtIndex.toLong)
val offset = offsetBuilder.of(entryAtIndex.toLong)
val entry = createEntry(offset)
log += entry
entryAtIndex
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.daml.ledger.on.memory.InMemoryLedgerWriter._
import com.daml.ledger.participant.state.kvutils.api.{CommitMetadata, LedgerWriter}
import com.daml.ledger.participant.state.kvutils.export.LedgerDataExporter
import com.daml.ledger.participant.state.kvutils.store.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, Raw}
import com.daml.ledger.participant.state.kvutils.{KeyValueCommitting, Raw, VersionedOffsetBuilder}
import com.daml.ledger.participant.state.v2.SubmissionResult
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.validator.caching.{CachingStateReader, ImmutablesOnlyCacheUpdatePolicy}
Expand All @@ -39,6 +39,7 @@ import scala.util.Success
final class InMemoryLedgerWriter private[memory] (
override val participantId: Ref.ParticipantId,
dispatcher: Dispatcher[Index],
offsetBuilder: VersionedOffsetBuilder,
now: () => Instant,
state: InMemoryState,
committer: Committer,
Expand All @@ -57,7 +58,7 @@ final class InMemoryLedgerWriter private[memory] (
correlationId,
envelope,
exportRecordTime = now(),
ledgerStateAccess = new InMemoryLedgerStateAccess(state, metrics),
ledgerStateAccess = new InMemoryLedgerStateAccess(offsetBuilder, state, metrics),
)(committerExecutionContext)
.andThen { case Success(SubmissionResult.Acknowledged) =>
dispatcher.signalNewHead(state.newHeadSinceLastWrite())
Expand Down Expand Up @@ -85,6 +86,7 @@ object InMemoryLedgerWriter {
timeProvider: TimeProvider = DefaultTimeProvider,
stateValueCache: StateValueCache = Cache.none,
dispatcher: Dispatcher[Index],
offsetBuilder: VersionedOffsetBuilder,
state: InMemoryState,
engine: Engine,
committerExecutionContext: ExecutionContext,
Expand All @@ -97,6 +99,7 @@ object InMemoryLedgerWriter {
} yield new InMemoryLedgerWriter(
participantId,
dispatcher,
offsetBuilder,
now,
state,
newCommitter(ledgerDataExporter),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,27 @@ import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics

import scala.concurrent.ExecutionContext
import scala.util.Random

class InMemoryLedgerReaderWriterIntegrationSpec
extends ParticipantStateIntegrationSpecBase(s"In-memory ledger/participant") {

override val isPersistent: Boolean = false

// Select a random offset version to ensure that this works with any possible value.
override val offsetVersion: Byte = {
val bytes = Array[Byte](Byte.MinValue)
while (bytes.head < 0) {
Random.nextBytes(bytes)
}
bytes.head
}

override def participantStateFactory(
ledgerId: LedgerId,
participantId: Ref.ParticipantId,
testId: String,
offsetVersion: Byte,
metrics: Metrics,
)(implicit loggingContext: LoggingContext): ResourceOwner[ParticipantState] =
for {
Expand All @@ -37,6 +48,7 @@ class InMemoryLedgerReaderWriterIntegrationSpec
readerWriter <- new InMemoryLedgerReaderWriter.Owner(
ledgerId = ledgerId,
participantId = participantId,
offsetVersion = offsetVersion,
keySerializationStrategy = StateKeySerializationStrategy.createDefault(),
metrics = metrics,
dispatcher = dispatcher,
Expand All @@ -45,5 +57,4 @@ class InMemoryLedgerReaderWriterIntegrationSpec
committerExecutionContext = committerExecutionContext,
)
} yield new KeyValueParticipantState(readerWriter, readerWriter, metrics)

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.time.Instant

import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.participant.state.kvutils.{Raw, VersionedOffsetBuilder}
import com.daml.ledger.participant.state.kvutils.api.CommitMetadata
import com.daml.ledger.participant.state.v2.SubmissionResult
import com.daml.ledger.validator.LedgerStateAccess
Expand Down Expand Up @@ -52,6 +52,7 @@ class InMemoryLedgerWriterSpec
val instance = new InMemoryLedgerWriter(
participantId = Ref.ParticipantId.assertFromString("participant ID"),
dispatcher = mockDispatcher,
offsetBuilder = new VersionedOffsetBuilder(0),
now = () => Instant.EPOCH,
state = InMemoryState.empty,
committer = mockCommitter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
engine = engine,
jdbcUrl = jdbcUrl,
resetOnStartup = false,
offsetVersion = 0,
logEntryIdAllocator = RandomLogEntryIdAllocator,
stateValueCache = caching.WeightedCache.from(
configuration = config.stateValueCache,
Expand Down
Loading

0 comments on commit 8f94cff

Please sign in to comment.