Switch from InputStream to Byte Array at the binary content JDBC transport (#11072)

CHANGELOG_BEGIN
CHANGELOG_END
nmarton-da authored Sep 29, 2021
1 parent 6ae3afa commit 735c309
Showing 9 changed files with 70 additions and 71 deletions.
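The recurring change across these files is the same: binary columns that used to surface as java.io.InputStream now surface as Array[Byte], and the stream-based Compression.Algorithm.decompress API is fed a ByteArrayInputStream at the call site. A minimal sketch of that before/after shape, with Compression.Algorithm and ValueSerializer reduced to hypothetical stand-ins so the snippet is self-contained:

import java.io.{ByteArrayInputStream, InputStream}

object InputStreamToByteArraySketch {
  // Stand-ins for the daml Compression.Algorithm and ValueSerializer seen in the diff;
  // only the method shapes matter for this sketch.
  trait Algorithm { def decompress(in: InputStream): InputStream }
  object Uncompressed extends Algorithm { def decompress(in: InputStream): InputStream = in }
  object ValueSerializer {
    def deserializeValue(in: InputStream): Seq[Byte] = in.readAllBytes().toSeq
  }

  // Before: the JDBC row value arrived as a binary stream.
  def decompressAndDeserializeOld(algorithm: Algorithm, value: InputStream) =
    ValueSerializer.deserializeValue(algorithm.decompress(value))

  // After: the row value is a byte array; it is wrapped in a stream only where
  // the decompression API still requires one.
  def decompressAndDeserializeNew(algorithm: Algorithm, value: Array[Byte]) =
    ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value)))
}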
@@ -3,7 +3,7 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream
import java.io.ByteArrayInputStream

import com.daml.platform.store.appendonlydao.events
import com.daml.platform.store.dao.events.ContractStateEvent
@@ -69,7 +69,7 @@ object ContractStateEventsReader {
private def getCachedOrDecompressContract(
contractId: ContractId,
templateId: events.Identifier,
createArgument: InputStream,
createArgument: Array[Byte],
maybeCreateArgumentCompression: Option[Int],
lfValueTranslation: LfValueTranslation,
): Contract = {
@@ -88,7 +88,7 @@

private def decompressKey(
templateId: events.Identifier,
maybeCreateKeyValue: Option[InputStream],
maybeCreateKeyValue: Option[Array[Byte]],
maybeCreateKeyValueCompression: Option[Int],
): Option[Key] =
for {
@@ -99,8 +99,8 @@
keyValue = decompressAndDeserialize(createKeyValueCompression, createKeyValue)
} yield Key.assertBuild(templateId, keyValue.value)

private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) =
ValueSerializer.deserializeValue(algorithm.decompress(value))
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) =
ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value)))

case class CreateMissingError(field: String) extends NoStackTrace {
override def getMessage: String =
@@ -3,7 +3,7 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream
import java.io.ByteArrayInputStream
import java.time.Instant

import com.codahale.metrics.Timer
@@ -174,15 +174,15 @@ private[appendonlydao] object ContractsReader {
private def toContract(
contractId: ContractId,
templateId: String,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Compression.Algorithm,
decompressionTimer: Timer,
deserializationTimer: Timer,
): Contract = {
val decompressed =
Timed.value(
timer = decompressionTimer,
value = createArgumentCompression.decompress(createArgument),
value = createArgumentCompression.decompress(new ByteArrayInputStream(createArgument)),
)
val deserialized =
Timed.value(
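In ContractsReader the wrapping happens inside the timed decompression step, so the decompression timer now also covers the (cheap) ByteArrayInputStream allocation. A rough sketch of that measurement boundary using the codahale Timer API directly; the diff itself uses daml's Timed.value helper, and the timer name and decompression body here are made up:

import java.io.{ByteArrayInputStream, InputStream}
import com.codahale.metrics.{MetricRegistry, Timer}

object TimedDecompressionSketch {
  private val registry = new MetricRegistry
  private val decompressionTimer: Timer = registry.timer("contract.decompression")

  // Equivalent of Timed.value(timer, body): time the block with a codahale Timer.
  private def timed[T](timer: Timer)(body: => T): T = {
    val ctx = timer.time()
    try body
    finally ctx.stop()
  }

  def decompress(createArgument: Array[Byte]): InputStream =
    timed(decompressionTimer) {
      // placeholder for createArgumentCompression.decompress(...)
      new ByteArrayInputStream(createArgument)
    }
}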
@@ -3,7 +3,7 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream
import java.io.ByteArrayInputStream

import com.daml.ledger.api.v1.event.{CreatedEvent, ExercisedEvent}
import com.daml.ledger.api.v1.value.{
@@ -255,8 +255,8 @@ final class LfValueTranslation(
private def eventKey(s: String) =
LfValueTranslationCache.EventCache.Key(EventId.assertFromString(s))

private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) =
ValueSerializer.deserializeValue(algorithm.decompress(value))
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) =
ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value)))

def enricher: ValueEnricher = {
// Note: LfValueTranslation is used by JdbcLedgerDao for both serialization and deserialization.
@@ -3,8 +3,6 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream

import com.daml.ledger.api.v1.event.{
ArchivedEvent => PbArchivedEvent,
CreatedEvent => PbCreatedEvent,
@@ -51,9 +49,9 @@ object Raw {
*/
sealed abstract class Created[E](
val partial: PbCreatedEvent,
val createArgument: InputStream,
val createArgument: Array[Byte],
val createArgumentCompression: Compression.Algorithm,
val createKeyValue: Option[InputStream],
val createKeyValue: Option[Array[Byte]],
val createKeyValueCompression: Compression.Algorithm,
) extends Raw[E] {
protected def wrapInEvent(event: PbCreatedEvent): E
@@ -97,9 +95,9 @@

final class Created private[Raw] (
raw: PbCreatedEvent,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Compression.Algorithm,
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Compression.Algorithm,
) extends Raw.Created[PbFlatEvent](
raw,
@@ -118,12 +116,12 @@ object Raw {
eventId: String,
contractId: String,
templateId: Identifier,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Option[Int],
createSignatories: ArraySeq[String],
createObservers: ArraySeq[String],
createAgreementText: Option[String],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Option[Int],
eventWitnesses: ArraySeq[String],
): Raw.FlatEvent.Created =
@@ -185,9 +183,9 @@

final class Created(
raw: PbCreatedEvent,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Compression.Algorithm,
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Compression.Algorithm,
) extends Raw.Created[PbTreeEvent](
raw,
@@ -206,12 +204,12 @@ object Raw {
eventId: String,
contractId: String,
templateId: Identifier,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Option[Int],
createSignatories: ArraySeq[String],
createObservers: ArraySeq[String],
createAgreementText: Option[String],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Option[Int],
eventWitnesses: ArraySeq[String],
): Raw.TreeEvent.Created =
@@ -234,9 +232,9 @@

final class Exercised(
val partial: PbExercisedEvent,
val exerciseArgument: InputStream,
val exerciseArgument: Array[Byte],
val exerciseArgumentCompression: Compression.Algorithm,
val exerciseResult: Option[InputStream],
val exerciseResult: Option[Array[Byte]],
val exerciseResultCompression: Compression.Algorithm,
) extends TreeEvent {
override def applyDeserialization(
@@ -259,9 +257,9 @@
templateId: Identifier,
exerciseConsuming: Boolean,
exerciseChoice: String,
exerciseArgument: InputStream,
exerciseArgument: Array[Byte],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[InputStream],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: ArraySeq[String],
exerciseChildEventIds: ArraySeq[String],
@@ -3,7 +3,7 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream
import java.io.ByteArrayInputStream

import com.daml.lf.data.Ref
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
@@ -50,14 +50,14 @@ object TransactionLogUpdatesReader {
Compression.Algorithm
.assertLookup(raw.exerciseArgumentCompression)
.decompress(
raw.exerciseArgument.mandatory("exercise_argument")
new ByteArrayInputStream(raw.exerciseArgument.mandatory("exercise_argument"))
)
),
exerciseResult = raw.exerciseResult.map { inputStream =>
exerciseResult = raw.exerciseResult.map { byteArray =>
ValueSerializer.deserializeValue(
Compression.Algorithm
.assertLookup(raw.exerciseResultCompression)
.decompress(inputStream)
.decompress(new ByteArrayInputStream(byteArray))
)
},
consuming = raw.eventKind == EventKind.ConsumingExercise,
@@ -102,8 +102,8 @@
throw InvalidEventKind(unknownKind)
}

private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) =
ValueSerializer.deserializeValue(algorithm.decompress(value))
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) =
ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value)))

final case class FieldMissingError(field: String) extends RuntimeException {
override def getMessage: String = s"Missing mandatory field $field"
@@ -3,7 +3,6 @@

package com.daml.platform.store.backend

import java.io.InputStream
import java.sql.Connection
import java.time.Instant

@@ -347,26 +346,26 @@ object StorageBackend {
case class RawContractState(
templateId: Option[String],
flatEventWitnesses: Set[Ref.Party],
createArgument: Option[InputStream],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
eventKind: Int,
ledgerEffectiveTime: Option[Instant],
)

case class RawContract(
templateId: String,
createArgument: InputStream,
createArgumentCompression: Option[Int],
class RawContract(
val templateId: String,
val createArgument: Array[Byte],
val createArgumentCompression: Option[Int],
)

case class RawContractStateEvent(
eventKind: Int,
contractId: ContractId,
templateId: Option[Ref.Identifier],
ledgerEffectiveTime: Option[Instant],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[InputStream],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
flatEventWitnesses: Set[Ref.Party],
eventSequentialId: Long,
@@ -386,17 +385,17 @@
createSignatories: Option[Array[String]],
createObservers: Option[Array[String]],
createAgreementText: Option[String],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[InputStream],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
treeEventWitnesses: Set[String],
flatEventWitnesses: Set[String],
submitters: Set[String],
exerciseChoice: Option[String],
exerciseArgument: Option[InputStream],
exerciseArgument: Option[Array[Byte]],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[InputStream],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: Option[Array[String]],
exerciseChildEventIds: Option[Array[String]],
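RawContract also changes from a case class to a plain class in StorageBackend. One plausible reason is that case-class equality compares fields with ==, and Scala arrays (being JVM arrays) compare by reference, so once the payload is Array[Byte] the generated equals/hashCode would advertise structural equality they cannot deliver. A small illustration of the pitfall, with hypothetical class names:

object ArrayEqualitySketch extends App {
  final case class CaseCarrier(templateId: String, createArgument: Array[Byte])
  final class PlainCarrier(val templateId: String, val createArgument: Array[Byte])

  val a = CaseCarrier("tpl", Array[Byte](1, 2, 3))
  val b = CaseCarrier("tpl", Array[Byte](1, 2, 3))

  // Case-class equals delegates to the array's own equals, which is reference equality:
  println(a == b)                                          // false
  println(a.createArgument.sameElements(b.createArgument)) // true: the contents do match
}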
@@ -3,11 +3,10 @@

package com.daml.platform.store.backend.common

import java.io.InputStream
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{binaryStream, int, long, str}
import anorm.SqlParser.{byteArray, int, long, str}
import anorm.{Row, RowParser, SimpleSql, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.offset.Offset
@@ -105,8 +104,8 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {

private val rejectionStatusCodeColumn: RowParser[Int] = int("rejection_status_code")
private val rejectionStatusMessageColumn: RowParser[String] = str("rejection_status_message")
private val rejectionStatusDetailsColumn: RowParser[Option[InputStream]] =
binaryStream("rejection_status_details").?
private val rejectionStatusDetailsColumn: RowParser[Option[Array[Byte]]] =
byteArray("rejection_status_details").?

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~
@@ -140,7 +139,7 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
private def buildStatusProto(
rejectionStatusCode: Int,
rejectionStatusMessage: String,
rejectionStatusDetails: Option[InputStream],
rejectionStatusDetails: Option[Array[Byte]],
): StatusProto =
StatusProto.of(
rejectionStatusCode,
@@ -149,10 +148,11 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
)

private def parseRejectionStatusDetails(
rejectionStatusDetails: Option[InputStream]
rejectionStatusDetails: Option[Array[Byte]]
): Seq[any.Any] =
rejectionStatusDetails
.map(stream => StatusDetails.parseFrom(stream).details)
.map(StatusDetails.parseFrom)
.map(_.details)
.getOrElse(Seq.empty)

def pruneCompletions(
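The completion parser swaps anorm's binaryStream column parser for byteArray, and the rejection details protobuf is parsed with the Array[Byte] overload of parseFrom rather than the InputStream one. A sketch of the resulting parser shape, with the column names taken from the diff and the surrounding SQL omitted:

import anorm.SqlParser.{byteArray, int, str}
import anorm.{RowParser, ~}

object CompletionColumnsSketch {
  // Optional binary column read directly as a byte array rather than as a stream.
  val rejectionStatusDetails: RowParser[Option[Array[Byte]]] =
    byteArray("rejection_status_details").?

  // Combined parser for the three rejection columns shown in the diff.
  val rejectionColumns: RowParser[(Int, String, Option[Array[Byte]])] =
    (int("rejection_status_code") ~ str("rejection_status_message") ~ rejectionStatusDetails)
      .map { case code ~ message ~ details => (code, message, details) }
}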
@@ -5,7 +5,8 @@ package com.daml.platform.store.backend.common

import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{binaryStream, int, long, str}

import anorm.SqlParser.{byteArray, int, long, str}
import anorm.{ResultSetParser, Row, RowParser, SimpleSql, SqlParser, ~}
import com.daml.lf.data.Ref
import com.daml.platform.store.Conversions.{
@@ -138,7 +139,7 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
private val fullDetailsContractRowParser: RowParser[StorageBackend.RawContractState] =
(str("template_id").?
~ flatEventWitnessesColumn("flat_event_witnesses")
~ binaryStream("create_argument").?
~ byteArray("create_argument").?
~ int("create_argument_compression").?
~ int("event_kind") ~ instantFromMicros("ledger_effective_time").?)
.map(SqlParser.flatten)
@@ -171,9 +172,9 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
contractId("contract_id") ~
identifier("template_id").? ~
instantFromMicros("ledger_effective_time").? ~
binaryStream("create_key_value").? ~
byteArray("create_key_value").? ~
int("create_key_value_compression").? ~
binaryStream("create_argument").? ~
byteArray("create_argument").? ~
int("create_argument_compression").? ~
long("event_sequential_id") ~
flatEventWitnessesColumn("flat_event_witnesses") ~
@@ -221,10 +222,12 @@

private val contractRowParser: RowParser[StorageBackend.RawContract] =
(str("template_id")
~ binaryStream("create_argument")
~ byteArray("create_argument")
~ int("create_argument_compression").?)
.map(SqlParser.flatten)
.map(StorageBackend.RawContract.tupled)
.map { case (templateId, createArgument, createArgumentCompression) =>
new StorageBackend.RawContract(templateId, createArgument, createArgumentCompression)
}

protected def activeContractSqlLiteral(
contractId: ContractId,
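Because RawContract is no longer a case class (see the StorageBackend change above), its companion no longer extends FunctionN, so RawContract.tupled is gone and contractRowParser maps the flattened tuple explicitly with new. The same distinction in isolation, with hypothetical names:

object TupledVsExplicitSketch {
  final case class CaseRow(templateId: String, createArgument: Array[Byte], compression: Option[Int])
  final class PlainRow(val templateId: String, val createArgument: Array[Byte], val compression: Option[Int])

  // A case-class companion extends Function3, so `.tupled` is available (as the old parser used):
  val fromTuple: ((String, Array[Byte], Option[Int])) => CaseRow = CaseRow.tupled

  // A plain class has no generated apply, so construction is spelled out, as in the new parser:
  val build: ((String, Array[Byte], Option[Int])) => PlainRow = {
    case (templateId, createArgument, compression) => new PlainRow(templateId, createArgument, compression)
  }
}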