From 735c3090a3513dbb3f2c33e39797f1cecd013771 Mon Sep 17 00:00:00 2001 From: Marton Nagy Date: Wed, 29 Sep 2021 21:44:24 +0200 Subject: [PATCH] Switch from InputStream to Byte Array at the binary content JDBC transport (#11072) CHANGELOG_BEGIN CHANGELOG_END --- .../events/ContractStateEventsReader.scala | 10 +++---- .../events/ContractsReader.scala | 6 ++-- .../events/LfValueTranslation.scala | 6 ++-- .../store/appendonlydao/events/Raw.scala | 30 +++++++++---------- .../events/TransactionLogUpdatesReader.scala | 12 ++++---- .../store/backend/StorageBackend.scala | 23 +++++++------- .../CompletionStorageBackendTemplate.scala | 14 ++++----- .../ContractStorageBackendTemplate.scala | 15 ++++++---- .../common/EventStorageBackendTemplate.scala | 25 ++++++++-------- 9 files changed, 70 insertions(+), 71 deletions(-) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractStateEventsReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractStateEventsReader.scala index c582e92cc053..9068b109a534 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractStateEventsReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractStateEventsReader.scala @@ -3,7 +3,7 @@ package com.daml.platform.store.appendonlydao.events -import java.io.InputStream +import java.io.ByteArrayInputStream import com.daml.platform.store.appendonlydao.events import com.daml.platform.store.dao.events.ContractStateEvent @@ -69,7 +69,7 @@ object ContractStateEventsReader { private def getCachedOrDecompressContract( contractId: ContractId, templateId: events.Identifier, - createArgument: InputStream, + createArgument: Array[Byte], maybeCreateArgumentCompression: Option[Int], lfValueTranslation: LfValueTranslation, ): Contract = { @@ -88,7 +88,7 @@ object ContractStateEventsReader { private def decompressKey( templateId: events.Identifier, - maybeCreateKeyValue: Option[InputStream], + maybeCreateKeyValue: Option[Array[Byte]], maybeCreateKeyValueCompression: Option[Int], ): Option[Key] = for { @@ -99,8 +99,8 @@ object ContractStateEventsReader { keyValue = decompressAndDeserialize(createKeyValueCompression, createKeyValue) } yield Key.assertBuild(templateId, keyValue.value) - private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) = - ValueSerializer.deserializeValue(algorithm.decompress(value)) + private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) = + ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value))) case class CreateMissingError(field: String) extends NoStackTrace { override def getMessage: String = diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractsReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractsReader.scala index 42bfb08eadd5..608d95a8596e 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractsReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractsReader.scala @@ -3,7 +3,7 @@ package com.daml.platform.store.appendonlydao.events -import java.io.InputStream +import java.io.ByteArrayInputStream import java.time.Instant import com.codahale.metrics.Timer @@ -174,7 +174,7 @@ private[appendonlydao] object ContractsReader { private def toContract( contractId: ContractId, templateId: String, - createArgument: InputStream, + createArgument: Array[Byte], createArgumentCompression: Compression.Algorithm, decompressionTimer: Timer, deserializationTimer: Timer, @@ -182,7 +182,7 @@ private[appendonlydao] object ContractsReader { val decompressed = Timed.value( timer = decompressionTimer, - value = createArgumentCompression.decompress(createArgument), + value = createArgumentCompression.decompress(new ByteArrayInputStream(createArgument)), ) val deserialized = Timed.value( diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/LfValueTranslation.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/LfValueTranslation.scala index ae0c092e12b3..9ef884bebcaa 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/LfValueTranslation.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/LfValueTranslation.scala @@ -3,7 +3,7 @@ package com.daml.platform.store.appendonlydao.events -import java.io.InputStream +import java.io.ByteArrayInputStream import com.daml.ledger.api.v1.event.{CreatedEvent, ExercisedEvent} import com.daml.ledger.api.v1.value.{ @@ -255,8 +255,8 @@ final class LfValueTranslation( private def eventKey(s: String) = LfValueTranslationCache.EventCache.Key(EventId.assertFromString(s)) - private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) = - ValueSerializer.deserializeValue(algorithm.decompress(value)) + private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) = + ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value))) def enricher: ValueEnricher = { // Note: LfValueTranslation is used by JdbcLedgerDao for both serialization and deserialization. diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/Raw.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/Raw.scala index eea9ca064392..5d1e052a3595 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/Raw.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/Raw.scala @@ -3,8 +3,6 @@ package com.daml.platform.store.appendonlydao.events -import java.io.InputStream - import com.daml.ledger.api.v1.event.{ ArchivedEvent => PbArchivedEvent, CreatedEvent => PbCreatedEvent, @@ -51,9 +49,9 @@ object Raw { */ sealed abstract class Created[E]( val partial: PbCreatedEvent, - val createArgument: InputStream, + val createArgument: Array[Byte], val createArgumentCompression: Compression.Algorithm, - val createKeyValue: Option[InputStream], + val createKeyValue: Option[Array[Byte]], val createKeyValueCompression: Compression.Algorithm, ) extends Raw[E] { protected def wrapInEvent(event: PbCreatedEvent): E @@ -97,9 +95,9 @@ object Raw { final class Created private[Raw] ( raw: PbCreatedEvent, - createArgument: InputStream, + createArgument: Array[Byte], createArgumentCompression: Compression.Algorithm, - createKeyValue: Option[InputStream], + createKeyValue: Option[Array[Byte]], createKeyValueCompression: Compression.Algorithm, ) extends Raw.Created[PbFlatEvent]( raw, @@ -118,12 +116,12 @@ object Raw { eventId: String, contractId: String, templateId: Identifier, - createArgument: InputStream, + createArgument: Array[Byte], createArgumentCompression: Option[Int], createSignatories: ArraySeq[String], createObservers: ArraySeq[String], createAgreementText: Option[String], - createKeyValue: Option[InputStream], + createKeyValue: Option[Array[Byte]], createKeyValueCompression: Option[Int], eventWitnesses: ArraySeq[String], ): Raw.FlatEvent.Created = @@ -185,9 +183,9 @@ object Raw { final class Created( raw: PbCreatedEvent, - createArgument: InputStream, + createArgument: Array[Byte], createArgumentCompression: Compression.Algorithm, - createKeyValue: Option[InputStream], + createKeyValue: Option[Array[Byte]], createKeyValueCompression: Compression.Algorithm, ) extends Raw.Created[PbTreeEvent]( raw, @@ -206,12 +204,12 @@ object Raw { eventId: String, contractId: String, templateId: Identifier, - createArgument: InputStream, + createArgument: Array[Byte], createArgumentCompression: Option[Int], createSignatories: ArraySeq[String], createObservers: ArraySeq[String], createAgreementText: Option[String], - createKeyValue: Option[InputStream], + createKeyValue: Option[Array[Byte]], createKeyValueCompression: Option[Int], eventWitnesses: ArraySeq[String], ): Raw.TreeEvent.Created = @@ -234,9 +232,9 @@ object Raw { final class Exercised( val partial: PbExercisedEvent, - val exerciseArgument: InputStream, + val exerciseArgument: Array[Byte], val exerciseArgumentCompression: Compression.Algorithm, - val exerciseResult: Option[InputStream], + val exerciseResult: Option[Array[Byte]], val exerciseResultCompression: Compression.Algorithm, ) extends TreeEvent { override def applyDeserialization( @@ -259,9 +257,9 @@ object Raw { templateId: Identifier, exerciseConsuming: Boolean, exerciseChoice: String, - exerciseArgument: InputStream, + exerciseArgument: Array[Byte], exerciseArgumentCompression: Option[Int], - exerciseResult: Option[InputStream], + exerciseResult: Option[Array[Byte]], exerciseResultCompression: Option[Int], exerciseActors: ArraySeq[String], exerciseChildEventIds: ArraySeq[String], diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionLogUpdatesReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionLogUpdatesReader.scala index 64800392a226..71326e4690f2 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionLogUpdatesReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionLogUpdatesReader.scala @@ -3,7 +3,7 @@ package com.daml.platform.store.appendonlydao.events -import java.io.InputStream +import java.io.ByteArrayInputStream import com.daml.lf.data.Ref import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent @@ -50,14 +50,14 @@ object TransactionLogUpdatesReader { Compression.Algorithm .assertLookup(raw.exerciseArgumentCompression) .decompress( - raw.exerciseArgument.mandatory("exercise_argument") + new ByteArrayInputStream(raw.exerciseArgument.mandatory("exercise_argument")) ) ), - exerciseResult = raw.exerciseResult.map { inputStream => + exerciseResult = raw.exerciseResult.map { byteArray => ValueSerializer.deserializeValue( Compression.Algorithm .assertLookup(raw.exerciseResultCompression) - .decompress(inputStream) + .decompress(new ByteArrayInputStream(byteArray)) ) }, consuming = raw.eventKind == EventKind.ConsumingExercise, @@ -102,8 +102,8 @@ object TransactionLogUpdatesReader { throw InvalidEventKind(unknownKind) } - private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: InputStream) = - ValueSerializer.deserializeValue(algorithm.decompress(value)) + private def decompressAndDeserialize(algorithm: Compression.Algorithm, value: Array[Byte]) = + ValueSerializer.deserializeValue(algorithm.decompress(new ByteArrayInputStream(value))) final case class FieldMissingError(field: String) extends RuntimeException { override def getMessage: String = s"Missing mandatory field $field" diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala index 76197c7d7c94..b51852753977 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala @@ -3,7 +3,6 @@ package com.daml.platform.store.backend -import java.io.InputStream import java.sql.Connection import java.time.Instant @@ -347,16 +346,16 @@ object StorageBackend { case class RawContractState( templateId: Option[String], flatEventWitnesses: Set[Ref.Party], - createArgument: Option[InputStream], + createArgument: Option[Array[Byte]], createArgumentCompression: Option[Int], eventKind: Int, ledgerEffectiveTime: Option[Instant], ) - case class RawContract( - templateId: String, - createArgument: InputStream, - createArgumentCompression: Option[Int], + class RawContract( + val templateId: String, + val createArgument: Array[Byte], + val createArgumentCompression: Option[Int], ) case class RawContractStateEvent( @@ -364,9 +363,9 @@ object StorageBackend { contractId: ContractId, templateId: Option[Ref.Identifier], ledgerEffectiveTime: Option[Instant], - createKeyValue: Option[InputStream], + createKeyValue: Option[Array[Byte]], createKeyCompression: Option[Int], - createArgument: Option[InputStream], + createArgument: Option[Array[Byte]], createArgumentCompression: Option[Int], flatEventWitnesses: Set[Ref.Party], eventSequentialId: Long, @@ -386,17 +385,17 @@ object StorageBackend { createSignatories: Option[Array[String]], createObservers: Option[Array[String]], createAgreementText: Option[String], - createKeyValue: Option[InputStream], + createKeyValue: Option[Array[Byte]], createKeyCompression: Option[Int], - createArgument: Option[InputStream], + createArgument: Option[Array[Byte]], createArgumentCompression: Option[Int], treeEventWitnesses: Set[String], flatEventWitnesses: Set[String], submitters: Set[String], exerciseChoice: Option[String], - exerciseArgument: Option[InputStream], + exerciseArgument: Option[Array[Byte]], exerciseArgumentCompression: Option[Int], - exerciseResult: Option[InputStream], + exerciseResult: Option[Array[Byte]], exerciseResultCompression: Option[Int], exerciseActors: Option[Array[String]], exerciseChildEventIds: Option[Array[String]], diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala index 2458fcd779b5..a3749c698390 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CompletionStorageBackendTemplate.scala @@ -3,11 +3,10 @@ package com.daml.platform.store.backend.common -import java.io.InputStream import java.sql.Connection import java.time.Instant -import anorm.SqlParser.{binaryStream, int, long, str} +import anorm.SqlParser.{byteArray, int, long, str} import anorm.{Row, RowParser, SimpleSql, ~} import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.offset.Offset @@ -105,8 +104,8 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend { private val rejectionStatusCodeColumn: RowParser[Int] = int("rejection_status_code") private val rejectionStatusMessageColumn: RowParser[String] = str("rejection_status_message") - private val rejectionStatusDetailsColumn: RowParser[Option[InputStream]] = - binaryStream("rejection_status_details").? + private val rejectionStatusDetailsColumn: RowParser[Option[Array[Byte]]] = + byteArray("rejection_status_details").? private val rejectedCommandParser: RowParser[CompletionStreamResponse] = sharedColumns ~ @@ -140,7 +139,7 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend { private def buildStatusProto( rejectionStatusCode: Int, rejectionStatusMessage: String, - rejectionStatusDetails: Option[InputStream], + rejectionStatusDetails: Option[Array[Byte]], ): StatusProto = StatusProto.of( rejectionStatusCode, @@ -149,10 +148,11 @@ trait CompletionStorageBackendTemplate extends CompletionStorageBackend { ) private def parseRejectionStatusDetails( - rejectionStatusDetails: Option[InputStream] + rejectionStatusDetails: Option[Array[Byte]] ): Seq[any.Any] = rejectionStatusDetails - .map(stream => StatusDetails.parseFrom(stream).details) + .map(StatusDetails.parseFrom) + .map(_.details) .getOrElse(Seq.empty) def pruneCompletions( diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ContractStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ContractStorageBackendTemplate.scala index b6466604ff2c..1a89ab263d5e 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ContractStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ContractStorageBackendTemplate.scala @@ -5,7 +5,8 @@ package com.daml.platform.store.backend.common import java.sql.Connection import java.time.Instant -import anorm.SqlParser.{binaryStream, int, long, str} + +import anorm.SqlParser.{byteArray, int, long, str} import anorm.{ResultSetParser, Row, RowParser, SimpleSql, SqlParser, ~} import com.daml.lf.data.Ref import com.daml.platform.store.Conversions.{ @@ -138,7 +139,7 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend { private val fullDetailsContractRowParser: RowParser[StorageBackend.RawContractState] = (str("template_id").? ~ flatEventWitnessesColumn("flat_event_witnesses") - ~ binaryStream("create_argument").? + ~ byteArray("create_argument").? ~ int("create_argument_compression").? ~ int("event_kind") ~ instantFromMicros("ledger_effective_time").?) .map(SqlParser.flatten) @@ -171,9 +172,9 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend { contractId("contract_id") ~ identifier("template_id").? ~ instantFromMicros("ledger_effective_time").? ~ - binaryStream("create_key_value").? ~ + byteArray("create_key_value").? ~ int("create_key_value_compression").? ~ - binaryStream("create_argument").? ~ + byteArray("create_argument").? ~ int("create_argument_compression").? ~ long("event_sequential_id") ~ flatEventWitnessesColumn("flat_event_witnesses") ~ @@ -221,10 +222,12 @@ trait ContractStorageBackendTemplate extends ContractStorageBackend { private val contractRowParser: RowParser[StorageBackend.RawContract] = (str("template_id") - ~ binaryStream("create_argument") + ~ byteArray("create_argument") ~ int("create_argument_compression").?) .map(SqlParser.flatten) - .map(StorageBackend.RawContract.tupled) + .map { case (templateId, createArgument, createArgumentCompression) => + new StorageBackend.RawContract(templateId, createArgument, createArgumentCompression) + } protected def activeContractSqlLiteral( contractId: ContractId, diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala index 9f32c4742e87..1c7ae079e680 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala @@ -3,10 +3,9 @@ package com.daml.platform.store.backend.common -import java.io.InputStream import java.sql.Connection import java.time.Instant -import anorm.SqlParser.{array, binaryStream, bool, int, long, str} +import anorm.SqlParser.{array, bool, byteArray, int, long, str} import anorm.{Row, RowParser, SimpleSql, ~} import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref @@ -78,21 +77,21 @@ trait EventStorageBackendTemplate extends EventStorageBackend { array[String]("event_witnesses") private type CreatedEventRow = - SharedRow ~ InputStream ~ Option[Int] ~ Array[String] ~ Array[String] ~ Option[String] ~ - Option[InputStream] ~ Option[Int] + SharedRow ~ Array[Byte] ~ Option[Int] ~ Array[String] ~ Array[String] ~ Option[String] ~ + Option[Array[Byte]] ~ Option[Int] private val createdEventRow: RowParser[CreatedEventRow] = sharedRow ~ - binaryStream("create_argument") ~ + byteArray("create_argument") ~ int("create_argument_compression").? ~ array[String]("create_signatories") ~ array[String]("create_observers") ~ str("create_agreement_text").? ~ - binaryStream("create_key_value").? ~ + byteArray("create_key_value").? ~ int("create_key_value_compression").? private type ExercisedEventRow = - SharedRow ~ Boolean ~ String ~ InputStream ~ Option[Int] ~ Option[InputStream] ~ Option[Int] ~ + SharedRow ~ Boolean ~ String ~ Array[Byte] ~ Option[Int] ~ Option[Array[Byte]] ~ Option[Int] ~ Array[String] ~ Array[String] private val exercisedEventRow: RowParser[ExercisedEventRow] = { @@ -100,9 +99,9 @@ trait EventStorageBackendTemplate extends EventStorageBackend { sharedRow ~ bool("exercise_consuming") ~ str("exercise_choice") ~ - binaryStream("exercise_argument") ~ + byteArray("exercise_argument") ~ int("exercise_argument_compression").? ~ - binaryStream("exercise_result").? ~ + byteArray("exercise_result").? ~ int("exercise_result_compression").? ~ array[String]("exercise_actors") ~ array[String]("exercise_child_event_ids") @@ -521,17 +520,17 @@ trait EventStorageBackendTemplate extends EventStorageBackend { array[String]("create_signatories").? ~ array[String]("create_observers").? ~ str("create_agreement_text").? ~ - binaryStream("create_key_value").? ~ + byteArray("create_key_value").? ~ int("create_key_value_compression").? ~ - binaryStream("create_argument").? ~ + byteArray("create_argument").? ~ int("create_argument_compression").? ~ array[String]("tree_event_witnesses") ~ array[String]("flat_event_witnesses") ~ array[String]("submitters").? ~ str("exercise_choice").? ~ - binaryStream("exercise_argument").? ~ + byteArray("exercise_argument").? ~ int("exercise_argument_compression").? ~ - binaryStream("exercise_result").? ~ + byteArray("exercise_result").? ~ int("exercise_result_compression").? ~ array[String]("exercise_actors").? ~ array[String]("exercise_child_event_ids").? ~