Skip to content

Commit

Permalink
Switch from InputStream to Byte Array at the binary content JDBC tran…
Browse files Browse the repository at this point in the history
…sport

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
nmarton-da committed Sep 29, 2021
1 parent 6d8cf70 commit 2fe6d17
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private[migration] class V3__Recompute_Key_Hash extends BaseJavaMigration {
qualifiedName = Ref.QualifiedName.assertFromString(rows.getString("template_name")),
)
val key = ValueSerializer
.deserializeValue(rows.getBinaryStream("contract_key"))
.deserializeValue(rows.getBytes("contract_key"))
.assertNoCid(coid => s"Found contract ID $coid in contract key")

hasNext = rows.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream

import com.daml.platform.store.appendonlydao.events
import com.daml.platform.store.dao.events.ContractStateEvent
import com.daml.platform.store.serialization.{Compression, ValueSerializer}
Expand Down Expand Up @@ -69,7 +67,7 @@ object ContractStateEventsReader {
private def getCachedOrDecompressContract(
contractId: ContractId,
templateId: events.Identifier,
createArgument: InputStream,
createArgument: Array[Byte],
maybeCreateArgumentCompression: Option[Int],
lfValueTranslation: LfValueTranslation,
): Contract = {
Expand All @@ -88,7 +86,7 @@ object ContractStateEventsReader {

private def decompressKey(
templateId: events.Identifier,
maybeCreateKeyValue: Option[InputStream],
maybeCreateKeyValue: Option[Array[Byte]],
maybeCreateKeyValueCompression: Option[Int],
): Option[Key] =
for {
Expand All @@ -99,7 +97,7 @@ object ContractStateEventsReader {
keyValue = decompressAndDeserialize(createKeyValueCompression, createKeyValue)
} yield Key.assertBuild(templateId, keyValue.value)

private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) =
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) =
ValueSerializer.deserializeValue(algorithm.decompress(value))

case class CreateMissingError(field: String) extends NoStackTrace {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream
import java.time.Instant

import com.codahale.metrics.Timer
Expand Down Expand Up @@ -174,7 +173,7 @@ private[appendonlydao] object ContractsReader {
private def toContract(
contractId: ContractId,
templateId: String,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Compression.Algorithm,
decompressionTimer: Timer,
deserializationTimer: Timer,
Expand All @@ -188,7 +187,7 @@ private[appendonlydao] object ContractsReader {
Timed.value(
timer = deserializationTimer,
value = ValueSerializer.deserializeValue(
stream = decompressed,
byteArray = decompressed,
errorContext = s"Failed to deserialize create argument for contract ${contractId.coid}",
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream

import com.daml.ledger.api.v1.event.{CreatedEvent, ExercisedEvent}
import com.daml.ledger.api.v1.value.{
Identifier => ApiIdentifier,
Expand Down Expand Up @@ -255,7 +253,7 @@ final class LfValueTranslation(
private def eventKey(s: String) =
LfValueTranslationCache.EventCache.Key(EventId.assertFromString(s))

private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) =
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) =
ValueSerializer.deserializeValue(algorithm.decompress(value))

def enricher: ValueEnricher = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream

import com.daml.ledger.api.v1.event.{
ArchivedEvent => PbArchivedEvent,
CreatedEvent => PbCreatedEvent,
Expand Down Expand Up @@ -51,9 +49,9 @@ object Raw {
*/
sealed abstract class Created[E](
val partial: PbCreatedEvent,
val createArgument: InputStream,
val createArgument: Array[Byte],
val createArgumentCompression: Compression.Algorithm,
val createKeyValue: Option[InputStream],
val createKeyValue: Option[Array[Byte]],
val createKeyValueCompression: Compression.Algorithm,
) extends Raw[E] {
protected def wrapInEvent(event: PbCreatedEvent): E
Expand Down Expand Up @@ -97,9 +95,9 @@ object Raw {

final class Created private[Raw] (
raw: PbCreatedEvent,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Compression.Algorithm,
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Compression.Algorithm,
) extends Raw.Created[PbFlatEvent](
raw,
Expand All @@ -118,12 +116,12 @@ object Raw {
eventId: String,
contractId: String,
templateId: Identifier,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Option[Int],
createSignatories: ArraySeq[String],
createObservers: ArraySeq[String],
createAgreementText: Option[String],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Option[Int],
eventWitnesses: ArraySeq[String],
): Raw.FlatEvent.Created =
Expand Down Expand Up @@ -185,9 +183,9 @@ object Raw {

final class Created(
raw: PbCreatedEvent,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Compression.Algorithm,
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Compression.Algorithm,
) extends Raw.Created[PbTreeEvent](
raw,
Expand All @@ -206,12 +204,12 @@ object Raw {
eventId: String,
contractId: String,
templateId: Identifier,
createArgument: InputStream,
createArgument: Array[Byte],
createArgumentCompression: Option[Int],
createSignatories: ArraySeq[String],
createObservers: ArraySeq[String],
createAgreementText: Option[String],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyValueCompression: Option[Int],
eventWitnesses: ArraySeq[String],
): Raw.TreeEvent.Created =
Expand All @@ -234,9 +232,9 @@ object Raw {

final class Exercised(
val partial: PbExercisedEvent,
val exerciseArgument: InputStream,
val exerciseArgument: Array[Byte],
val exerciseArgumentCompression: Compression.Algorithm,
val exerciseResult: Option[InputStream],
val exerciseResult: Option[Array[Byte]],
val exerciseResultCompression: Compression.Algorithm,
) extends TreeEvent {
override def applyDeserialization(
Expand All @@ -259,9 +257,9 @@ object Raw {
templateId: Identifier,
exerciseConsuming: Boolean,
exerciseChoice: String,
exerciseArgument: InputStream,
exerciseArgument: Array[Byte],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[InputStream],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: ArraySeq[String],
exerciseChildEventIds: ArraySeq[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.daml.platform.store.appendonlydao.events

import java.io.InputStream

import com.daml.lf.data.Ref
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
import com.daml.platform.store.interfaces.TransactionLogUpdate
Expand Down Expand Up @@ -53,11 +51,11 @@ object TransactionLogUpdatesReader {
raw.exerciseArgument.mandatory("exercise_argument")
)
),
exerciseResult = raw.exerciseResult.map { inputStream =>
exerciseResult = raw.exerciseResult.map { byteArray =>
ValueSerializer.deserializeValue(
Compression.Algorithm
.assertLookup(raw.exerciseResultCompression)
.decompress(inputStream)
.decompress(byteArray)
)
},
consuming = raw.eventKind == EventKind.ConsumingExercise,
Expand Down Expand Up @@ -102,7 +100,7 @@ object TransactionLogUpdatesReader {
throw InvalidEventKind(unknownKind)
}

private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) =
private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) =
ValueSerializer.deserializeValue(algorithm.decompress(value))

final case class FieldMissingError(field: String) extends RuntimeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.daml.platform.store.backend

import java.io.InputStream
import java.sql.Connection
import java.time.Instant

Expand Down Expand Up @@ -347,26 +346,26 @@ object StorageBackend {
case class RawContractState(
templateId: Option[String],
flatEventWitnesses: Set[Ref.Party],
createArgument: Option[InputStream],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
eventKind: Int,
ledgerEffectiveTime: Option[Instant],
)

case class RawContract(
templateId: String,
createArgument: InputStream,
createArgumentCompression: Option[Int],
class RawContract(
val templateId: String,
val createArgument: Array[Byte],
val createArgumentCompression: Option[Int],
)

case class RawContractStateEvent(
eventKind: Int,
contractId: ContractId,
templateId: Option[Ref.Identifier],
ledgerEffectiveTime: Option[Instant],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[InputStream],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
flatEventWitnesses: Set[Ref.Party],
eventSequentialId: Long,
Expand All @@ -386,17 +385,17 @@ object StorageBackend {
createSignatories: Option[Array[String]],
createObservers: Option[Array[String]],
createAgreementText: Option[String],
createKeyValue: Option[InputStream],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[InputStream],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
treeEventWitnesses: Set[String],
flatEventWitnesses: Set[String],
submitters: Set[String],
exerciseChoice: Option[String],
exerciseArgument: Option[InputStream],
exerciseArgument: Option[Array[Byte]],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[InputStream],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: Option[Array[String]],
exerciseChildEventIds: Option[Array[String]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@

package com.daml.platform.store.backend.common

import java.io.InputStream
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{binaryStream, int, long, str}
import anorm.SqlParser.{byteArray, int, long, str}
import anorm.{Row, RowParser, SimpleSql, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
import com.daml.ledger.offset.Offset
Expand Down Expand Up @@ -105,8 +104,8 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {

private val rejectionStatusCodeColumn: RowParser[Int] = int("rejection_status_code")
private val rejectionStatusMessageColumn: RowParser[String] = str("rejection_status_message")
private val rejectionStatusDetailsColumn: RowParser[Option[InputStream]] =
binaryStream("rejection_status_details").?
private val rejectionStatusDetailsColumn: RowParser[Option[Array[Byte]]] =
byteArray("rejection_status_details").?

private val rejectedCommandParser: RowParser[CompletionStreamResponse] =
sharedColumns ~
Expand Down Expand Up @@ -140,7 +139,7 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
private def buildStatusProto(
rejectionStatusCode: Int,
rejectionStatusMessage: String,
rejectionStatusDetails: Option[InputStream],
rejectionStatusDetails: Option[Array[Byte]],
): StatusProto =
StatusProto.of(
rejectionStatusCode,
Expand All @@ -149,10 +148,11 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend {
)

private def parseRejectionStatusDetails(
rejectionStatusDetails: Option[InputStream]
rejectionStatusDetails: Option[Array[Byte]]
): Seq[any.Any] =
rejectionStatusDetails
.map(stream => StatusDetails.parseFrom(stream).details)
.map(StatusDetails.parseFrom)
.map(_.details)
.getOrElse(Seq.empty)

def pruneCompletions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ package com.daml.platform.store.backend.common

import java.sql.Connection
import java.time.Instant
import anorm.SqlParser.{binaryStream, int, long, str}

import anorm.SqlParser.{byteArray, int, long, str}
import anorm.{ResultSetParser, Row, RowParser, SimpleSql, SqlParser, ~}
import com.daml.lf.data.Ref
import com.daml.platform.store.Conversions.{
Expand Down Expand Up @@ -138,7 +139,7 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
private val fullDetailsContractRowParser: RowParser[StorageBackend.RawContractState] =
(str("template_id").?
~ flatEventWitnessesColumn("flat_event_witnesses")
~ binaryStream("create_argument").?
~ byteArray("create_argument").?
~ int("create_argument_compression").?
~ int("event_kind") ~ instantFromMicros("ledger_effective_time").?)
.map(SqlParser.flatten)
Expand Down Expand Up @@ -171,9 +172,9 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {
contractId("contract_id") ~
identifier("template_id").? ~
instantFromMicros("ledger_effective_time").? ~
binaryStream("create_key_value").? ~
byteArray("create_key_value").? ~
int("create_key_value_compression").? ~
binaryStream("create_argument").? ~
byteArray("create_argument").? ~
int("create_argument_compression").? ~
long("event_sequential_id") ~
flatEventWitnessesColumn("flat_event_witnesses") ~
Expand Down Expand Up @@ -221,10 +222,12 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend {

private val contractRowParser: RowParser[StorageBackend.RawContract] =
(str("template_id")
~ binaryStream("create_argument")
~ byteArray("create_argument")
~ int("create_argument_compression").?)
.map(SqlParser.flatten)
.map(StorageBackend.RawContract.tupled)
.map { case (templateId, createArgument, createArgumentCompression) =>
new StorageBackend.RawContract(templateId, createArgument, createArgumentCompression)
}

protected def activeContractSqlLiteral(
contractId: ContractId,
Expand Down
Loading

0 comments on commit 2fe6d17

Please sign in to comment.