Changing Oracle backend from using VARRAY to JSON array #9943

Merged · 4 commits · Jun 14, 2021
1 change: 1 addition & 0 deletions bazel-java-deps.bzl
@@ -63,6 +63,7 @@ def install_java_deps():
"commons-io:commons-io:2.5",
"com.oracle.database.jdbc:ojdbc8:19.8.0.0",
"com.sparkjava:spark-core:2.9.1",
"com.oracle.database.jdbc.debug:ojdbc8_g:19.8.0.0",
"com.squareup:javapoet:1.11.1",
"com.storm-enroute:scalameter_{}:0.19".format(scala_major_version),
"com.storm-enroute:scalameter-core_{}:0.19".format(scala_major_version),
4 changes: 1 addition & 3 deletions ledger/participant-integration-api/BUILD.bazel
@@ -61,7 +61,6 @@ compile_deps = [
"@maven//:io_opentelemetry_opentelemetry_context",
"@maven//:org_slf4j_slf4j_api",
# this oracle import is problematic for daml assistant build
"@maven//:com_oracle_database_jdbc_ojdbc8",
]

scala_compile_deps = [
@@ -73,6 +72,7 @@ scala_compile_deps = [
"@maven//:org_scala_lang_modules_scala_collection_compat",
"@maven//:org_scala_lang_modules_scala_java8_compat",
"@maven//:org_scalaz_scalaz_core",
"@maven//:io_spray_spray_json",
]

runtime_deps = [
@@ -148,7 +148,6 @@ da_scala_library(
visibility = ["//visibility:public"],
runtime_deps = [
"@maven//:com_h2database_h2",
"@maven//:com_oracle_database_jdbc_ojdbc8",
"@maven//:org_postgresql_postgresql",
],
deps = [
@@ -307,7 +306,6 @@ da_scala_test_suite(
],
tags = [] if oracle_testing else ["manual"],
runtime_deps = [
"@maven//:com_oracle_database_jdbc_ojdbc8",
],
deps = [
":participant-integration-api",
@@ -7,18 +7,6 @@

-- subsequently dropped by V30__

-- custom array of varchar2 type used by several columns across tables
-- declaring upfront so this type is defined globally
create type VARCHAR_ARRAY as VARRAY (32767) OF VARCHAR2(4000);
/
create type SMALLINT_ARRAY as VARRAY (32767) of SMALLINT;
/
create type BYTE_ARRAY_ARRAY as VARRAY (32767) of RAW(2000);
/
create type TIMESTAMP_ARRAY as VARRAY (32767) of TIMESTAMP;
/
create type BOOLEAN_ARRAY as VARRAY (32767) of NUMBER(1, 0);
/
CREATE TABLE parameters
-- this table is meant to have a single row storing all the parameters we have
(
@@ -193,15 +181,14 @@ CREATE TABLE participant_command_completions
record_time TIMESTAMP not null,

application_id NVARCHAR2(1000) not null,
submitters VARCHAR_ARRAY not null,
submitters CLOB NOT NULL CONSTRAINT ensure_json_submitters CHECK (submitters IS JSON),
command_id NVARCHAR2(1000) not null,

transaction_id NVARCHAR2(1000), -- null if the command was rejected and checkpoints
status_code INTEGER, -- null for successful command and checkpoints
status_message NVARCHAR2(1000) -- null for successful command and checkpoints
);

-- TODO https://github.com/digital-asset/daml/issues/9493
create index participant_command_completions_idx on participant_command_completions(completion_offset, application_id);

---------------------------------------------------------------------------------------------------
@@ -233,12 +220,12 @@ create table participant_events
command_id VARCHAR2(1000),
workflow_id VARCHAR2(1000), -- null unless provided by a Ledger API call
application_id VARCHAR2(1000),
submitters VARCHAR_ARRAY,
submitters CLOB NOT NULL CONSTRAINT ensure_json_participant_submitters CHECK (submitters IS JSON),

-- non-null iff this event is a create
create_argument BLOB,
create_signatories VARCHAR_ARRAY,
create_observers VARCHAR_ARRAY,
create_signatories CLOB NOT NULL CONSTRAINT ensure_json_participant_create_signatories CHECK (create_signatories IS JSON),
create_observers CLOB NOT NULL CONSTRAINT ensure_json_participant_create_observers CHECK (create_observers IS JSON),
create_agreement_text VARCHAR2(1000), -- null if agreement text is not provided
create_consumed_at VARCHAR2(4000), -- null if the contract created by this event is active
create_key_value BLOB, -- null if the contract created by this event has no key
@@ -248,11 +235,11 @@ create table participant_events
exercise_choice VARCHAR2(1000),
exercise_argument BLOB,
exercise_result BLOB,
exercise_actors VARCHAR_ARRAY,
exercise_child_event_ids VARCHAR_ARRAY, -- event identifiers of consequences of this exercise
exercise_actors CLOB NOT NULL CONSTRAINT ensure_json_participant_exercise_actors CHECK (exercise_actors IS JSON),
exercise_child_event_ids CLOB NOT NULL CONSTRAINT ensure_json_participant_exercise_child_event_ids CHECK (exercise_child_event_ids IS JSON), -- event identifiers of consequences of this exercise

flat_event_witnesses VARCHAR_ARRAY not null,
tree_event_witnesses VARCHAR_ARRAY not null,
flat_event_witnesses CLOB NOT NULL CONSTRAINT ensure_json_participant_flat_event_witnesses CHECK (flat_event_witnesses IS JSON),
tree_event_witnesses CLOB NOT NULL CONSTRAINT ensure_json_participant_tree_event_witnesses CHECK (tree_event_witnesses IS JSON),

event_sequential_id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY not null,
create_argument_compression NUMBER,
@@ -280,9 +267,8 @@ create index participant_events_event_sequential_id on participant_events (event
-- 5. we need this index to convert event_offset to event_sequential_id
create index participant_events_event_offset on participant_events (event_offset);

-- TODO https://github.com/digital-asset/daml/issues/9493
-- create index participant_events_flat_event_witnesses_idx on participant_events (flat_event_witnesses);
-- create index participant_events_tree_event_witnesses_idx on participant_events (tree_event_witnesses);
create index participant_events_flat_event_witnesses_idx on participant_events (JSON_ARRAY(flat_event_witnesses));
create index participant_events_tree_event_witnesses_idx on participant_events (JSON_ARRAY(tree_event_witnesses));


---------------------------------------------------------------------------------------------------
@@ -303,7 +289,7 @@ create table participant_contracts
create_argument BLOB not null,

-- the following fields are null for divulged contracts
create_stakeholders VARCHAR_ARRAY,
create_stakeholders CLOB NOT NULL CONSTRAINT ensure_json_create_stakeholders CHECK (create_stakeholders IS JSON),
create_key_hash VARCHAR2(4000),
create_ledger_effective_time TIMESTAMP,
create_argument_compression SMALLINT
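As a side note, a minimal sketch of what the new on-disk format means in practice (hypothetical helper and values, not part of this PR): party sets are rendered as compact JSON arrays and written into the CLOB columns, where the IS JSON check constraints reject anything that does not parse as JSON.

    import java.sql.Connection
    import anorm._
    import spray.json._
    import spray.json.DefaultJsonProtocol._

    // Hypothetical helper: List("Alice", "Bob") is rendered as the JSON text
    // ["Alice","Bob"] and stored in the submitters CLOB column.
    def insertCompletion(
        offset: String,
        applicationId: String,
        submitters: List[String],
        commandId: String,
    )(implicit conn: Connection): Boolean =
      SQL"""insert into participant_command_completions
              (completion_offset, record_time, application_id, submitters, command_id)
            values ($offset, CURRENT_TIMESTAMP, $applicationId,
              ${submitters.toJson.compactPrint}, $commandId)"""
        .execute()

    // A submitters value that is not valid JSON text would violate
    // ensure_json_submitters and fail with an Oracle check-constraint error.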
@@ -3,10 +3,9 @@

package com.daml.platform.store

import java.sql.{Connection, JDBCType, PreparedStatement, Timestamp, Types}
import java.sql.{PreparedStatement, Timestamp, Types}
import java.time.Instant
import java.util.Date

import anorm.Column.nonNull
import anorm._
import com.daml.ledger.EventId
@@ -17,14 +16,33 @@ import com.daml.lf.crypto.Hash
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.lf.value.Value
import com.zaxxer.hikari.pool.HikariProxyConnection
import io.grpc.Status.Code

import spray.json._
import DefaultJsonProtocol._
import java.io.BufferedReader
import scala.language.implicitConversions
import java.util.stream.Collectors

private[platform] object OracleArrayConversions {
import oracle.jdbc.OracleConnection
implicit object PartyJsonFormat extends RootJsonFormat[Party] {
def write(c: Party) =
JsString(c)

def read(value: JsValue) = value match {
case JsString(s) => s.asInstanceOf[Party]
case _ => deserializationError("Party expected")
}
}

implicit object LedgerStringJsonFormat extends RootJsonFormat[Ref.LedgerString] {
def write(c: Ref.LedgerString) =
JsString(c)

def read(value: JsValue) = value match {
case JsString(s) => s.asInstanceOf[Ref.LedgerString]
case _ => deserializationError("Ledger string expected")
}
}
implicit object StringArrayParameterMetadata extends ParameterMetaData[Array[String]] {
override def sqlType: String = "ARRAY"
override def jdbcType: Int = java.sql.Types.ARRAY
@@ -37,90 +55,6 @@ private[platform] object OracleArrayConversions {
val jdbcType = Types.INTEGER
}

@SuppressWarnings(Array("org.wartremover.warts.ArrayEquals"))
abstract sealed class ArrayToStatement[T](oracleTypeName: String)
extends ToStatement[Array[T]]
with NotNullGuard {
override def set(s: PreparedStatement, index: Int, v: Array[T]): Unit = {
if (v == (null: AnyRef)) {
s.setNull(index, JDBCType.ARRAY.getVendorTypeNumber, oracleTypeName)
} else {
s.setObject(
index,
unwrapConnection(s).createARRAY(oracleTypeName, v.asInstanceOf[Array[AnyRef]]),
JDBCType.ARRAY.getVendorTypeNumber,
)
}
}
}

implicit object ByteArrayArrayToStatement
extends ArrayToStatement[Array[Byte]]("BYTE_ARRAY_ARRAY")

implicit object TimestampArrayToStatement extends ArrayToStatement[Timestamp]("TIMESTAMP_ARRAY")

implicit object RefPartyArrayToStatement extends ArrayToStatement[Ref.Party]("VARCHAR_ARRAY")

implicit object CharArrayToStatement extends ArrayToStatement[String]("VARCHAR_ARRAY")

implicit object IntegerArrayToStatement extends ArrayToStatement[Integer]("SMALLINT_ARRAY")

implicit object BooleanArrayToStatement
extends ArrayToStatement[java.lang.Boolean]("BOOLEAN_ARRAY")

implicit object InstantArrayToStatement extends ToStatement[Array[Instant]] {
override def set(s: PreparedStatement, index: Int, v: Array[Instant]): Unit = {
s.setObject(
index,
unwrapConnection(s).createARRAY("TIMESTAMP_ARRAY", v.map(java.sql.Timestamp.from)),
JDBCType.ARRAY.getVendorTypeNumber,
)
}
}

@SuppressWarnings(Array("org.wartremover.warts.ArrayEquals"))
implicit object StringOptionArrayArrayToStatement extends ToStatement[Option[Array[String]]] {
override def set(s: PreparedStatement, index: Int, stringOpts: Option[Array[String]]): Unit = {
stringOpts match {
case None => s.setNull(index, JDBCType.ARRAY.getVendorTypeNumber, "VARCHAR_ARRAY")
case Some(arr) =>
s.setObject(
index,
unwrapConnection(s)
.createARRAY("VARCHAR_ARRAY", arr.asInstanceOf[Array[AnyRef]]),
JDBCType.ARRAY.getVendorTypeNumber,
)
}
}
}

object IntToSmallIntConversions {

implicit object IntOptionArrayArrayToStatement extends ToStatement[Array[Option[Int]]] {
override def set(s: PreparedStatement, index: Int, intOpts: Array[Option[Int]]): Unit = {
val intOrNullsArray = intOpts.map(_.map(new Integer(_)).orNull)
s.setObject(
index,
unwrapConnection(s)
.createARRAY("SMALLINT_ARRAY", intOrNullsArray.asInstanceOf[Array[AnyRef]]),
JDBCType.ARRAY.getVendorTypeNumber,
)
}
}
}

private def unwrapConnection[T](s: PreparedStatement): OracleConnection = {
s.getConnection match {
case hikari: HikariProxyConnection =>
hikari.unwrap(classOf[OracleConnection])
case oracle: OracleConnection =>
oracle
case c: Connection =>
sys.error(
s"Unsupported connection type for creating Oracle integer array: ${c.getClass.getSimpleName}"
)
}
}
}

private[platform] object JdbcArrayConversions {
Expand Down Expand Up @@ -216,6 +150,49 @@ private[platform] object Conversions {
}
}

object DefaultImplicitArrayColumn {
val default = Column.of[Array[String]]
}

object ArrayColumnToStringArray {
// This is used to allow us to convert Oracle CLOB fields storing JSON text into Array[String].
// We first summon the default Anorm column for an Array[String] and run that; this preserves
// the behavior PostgreSQL expects. If that fails, we then try our Oracle-specific
// deserialization strategies.

implicit val arrayColumnToStringArray: Column[Array[String]] = nonNull { (value, meta) =>
DefaultImplicitArrayColumn.default(value, meta) match {
case Right(value) => Right(value)
case Left(_) =>
val MetaDataItem(qualified, _, _) = meta
value match {
case jsonArrayString: String =>
Right(jsonArrayString.parseJson.convertTo[Array[String]])
case clob: java.sql.Clob =>
try {
val reader = clob.getCharacterStream
val br = new BufferedReader(reader)
val jsonArrayString = br.lines.collect(Collectors.joining)
reader.close
Right(jsonArrayString.parseJson.convertTo[Array[String]])
} catch {
case e: Throwable =>
Left(
TypeDoesNotMatch(
s"Cannot convert $value: received CLOB but failed to deserialize to " +
s"string array for column $qualified. Error message: ${e.getMessage}"
)
)
}
case _ =>
Left(
TypeDoesNotMatch(s"Cannot convert $value: to string array for column $qualified")
)
}
}
}
}

// PackageId

implicit val columnToPackageId: Column[Ref.PackageId] =
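Taken together, a quick round-trip sketch (hypothetical values) of the scheme these conversions implement: spray-json renders the array for the write path, and arrayColumnToStringArray parses the same text back when reading the CLOB.

    import spray.json._
    import spray.json.DefaultJsonProtocol._

    val parties = List("Alice", "Bob")
    // Write path: rendered to the JSON text ["Alice","Bob"] before insertion.
    val rendered: String = parties.toJson.compactPrint
    // Read path: the CLOB fallback above parses the same text back into an array.
    val recovered: Array[String] = rendered.parseJson.convertTo[Array[String]]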
@@ -64,6 +64,8 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.control.NonFatal
import spray.json._
import spray.json.DefaultJsonProtocol._

private final case class ParsedPartyData(
party: String,
@@ -1381,8 +1383,7 @@ private[platform] object JdbcLedgerDao {
recordTime: Instant,
): SimpleSql[Row] = {
import com.daml.platform.store.OracleArrayConversions._
SQL"insert into participant_command_completions(completion_offset, record_time, application_id, submitters, command_id, transaction_id) values ($offset, $recordTime, ${submitterInfo.applicationId}, ${submitterInfo.actAs
.toArray[String]}, ${submitterInfo.commandId}, $transactionId)"
SQL"insert into participant_command_completions(completion_offset, record_time, application_id, submitters, command_id, transaction_id) values ($offset, $recordTime, ${submitterInfo.applicationId}, ${submitterInfo.actAs.toJson.compactPrint}, ${submitterInfo.commandId}, $transactionId)"
}

override protected[JdbcLedgerDao] def prepareRejectionInsert(
@@ -1392,8 +1393,8 @@
reason: RejectionReason,
): SimpleSql[Row] = {
import com.daml.platform.store.OracleArrayConversions._
SQL"insert into participant_command_completions(completion_offset, record_time, application_id, submitters, command_id, status_code, status_message) values ($offset, $recordTime, ${submitterInfo.applicationId}, ${submitterInfo.actAs
.toArray[String]}, ${submitterInfo.commandId}, ${reason.code.value()}, ${reason.description})"
SQL"insert into participant_command_completions(completion_offset, record_time, application_id, submitters, command_id, status_code, status_message) values ($offset, $recordTime, ${submitterInfo.applicationId}, ${submitterInfo.actAs.toJson.compactPrint}, ${submitterInfo.commandId}, ${reason.code
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dasormeter note for going forward with append-only these parse to json and then render string should go to the batching phase, and the DTO for injection should have already the string so we avoid blocking ingestion threads with dataconversion

.value()}, ${reason.description})"
}

// spaces which are subsequently trimmed left only for readability
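To make the reviewer's suggestion above concrete, a hypothetical sketch (names invented for illustration, not part of this PR): render the JSON once during batching and carry the finished string in the DTO, so ingestion threads only bind parameters.

    import spray.json._
    import spray.json.DefaultJsonProtocol._

    // Hypothetical DTO: submittersJson already holds the rendered JSON text,
    // e.g. ["Alice","Bob"], produced before the row reaches an ingestion thread.
    final case class CompletionRow(
        completionOffset: String,
        applicationId: String,
        submittersJson: String,
        commandId: String,
    )

    def toRow(offset: String, applicationId: String, actAs: List[String], commandId: String): CompletionRow =
      CompletionRow(offset, applicationId, actAs.toJson.compactPrint, commandId)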
@@ -12,6 +12,8 @@ import com.daml.platform.store.Conversions._
import com.daml.platform.store.dao.events.ContractsTable.Executable
import com.daml.platform.store.serialization.Compression
import com.daml.platform.store.OracleArrayConversions._
import spray.json._
import spray.json.DefaultJsonProtocol._

object ContractsTableOracle extends ContractsTable {

@@ -43,7 +45,7 @@
"template_id" -> templateId,
"create_argument" -> createArgument,
"create_ledger_effective_time" -> ledgerEffectiveTime,
"create_stakeholders" -> stakeholders.toArray[String],
"create_stakeholders" -> stakeholders.toJson.compactPrint,
"create_key_hash" -> key.map(_.hash),
"create_argument_compression" -> createArgumentCompression.id,
)
@@ -6,7 +6,6 @@ package com.daml.platform.store.dao.events
import java.io.InputStream
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{array, binaryStream, bool, int, long, str}
import anorm.{RowParser, ~}
import com.daml.ledger.participant.state.v1.Offset
@@ -46,6 +45,8 @@ private[events] object EventsTable {
Offset ~ String ~ Int ~ Long ~ String ~ String ~ Instant ~ Identifier ~ Option[String] ~
Option[String] ~ Array[String]

import com.daml.platform.store.Conversions.ArrayColumnToStringArray.arrayColumnToStringArray

private val sharedRow: RowParser[SharedRow] =
offset("event_offset") ~
str("transaction_id") ~
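For reference, a minimal sketch (hypothetical column name) of what this import enables: anorm's array parser reads both a native PostgreSQL array and, through the fallback, an Oracle CLOB holding JSON text.

    import anorm.{RowParser, SqlParser}
    import com.daml.platform.store.Conversions.ArrayColumnToStringArray.arrayColumnToStringArray

    // Parses the submitters column whether the driver returns a SQL ARRAY
    // (PostgreSQL) or a CLOB containing JSON text such as ["Alice","Bob"].
    val submittersParser: RowParser[Array[String]] = SqlParser.array[String]("submitters")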