Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JSON-API] Backport of connection pool for 1.11.x , configurable db options needs forward porting #11615

Merged
merged 4 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ trait JsonApiFixture
)
.flatMap({
case -\/(e) => Future.failed(new IllegalStateException(e.toString))
case \/-(a) => Future.successful(a)
case \/-(a) => Future.successful(a._1)
})
}((binding: ServerBinding) => binding.unbind().map(_ => ()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.http

import com.daml.http.dbbackend.ConnectionPool
import org.scalatest.Inside
import org.scalatest.AsyncTestSuite
import org.scalatest.matchers.should.Matchers
Expand All @@ -22,6 +23,7 @@ trait HttpServiceOracleInt extends AbstractHttpServiceIntegrationTestFuns {
url = s"jdbc:oracle:thin:@//localhost:$oraclePort/XEPDB1",
user = oracleUsername,
password = oraclePwd,
poolSize = ConnectionPool.PoolSize.Integration,
createSchema = true,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package com.daml.http.perf

import java.io.File

import akka.actor.ActorSystem
import akka.stream.Materializer
import com.daml.gatling.stats.{SimulationLog, SimulationLogSyntax}
import com.daml.grpc.adapter.{AkkaExecutionSequencerPool, ExecutionSequencerFactory}
import com.daml.http.HttpServiceTestFixture.{withLedger, withHttpService}
import com.daml.http.HttpServiceTestFixture.{withHttpService, withLedger}
import com.daml.http.dbbackend.ConnectionPool
import com.daml.http.domain.{JwtPayload, LedgerId}
import com.daml.http.perf.scenario.SimulationConfig
import com.daml.http.util.FutureUtil._
Expand Down Expand Up @@ -162,6 +162,7 @@ object Main extends StrictLogging {
url = c.url,
user = "test",
password = "",
poolSize = ConnectionPool.PoolSize.Integration,
createSchema = true,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package com.daml.http

import java.io.File
import java.time.Instant

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
Expand Down Expand Up @@ -79,7 +78,7 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {

val contractDaoF: Future[Option[ContractDao]] = jdbcConfig.map(c => initializeDb(c)).sequence

val httpServiceF: Future[ServerBinding] = for {
val httpServiceF: Future[(ServerBinding, Option[ContractDao])] = for {
contractDao <- contractDaoF
config = Config(
ledgerHost = "localhost",
Expand Down Expand Up @@ -116,7 +115,7 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
} yield codecs

val fa: Future[A] = for {
httpService <- httpServiceF
(httpService, _) <- httpServiceF
address = httpService.localAddress
uri = Uri.from(scheme = "http", host = address.getHostName, port = address.getPort)
(encoder, decoder) <- codecsF
Expand All @@ -125,12 +124,13 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
} yield a

fa.transformWith { ta =>
Future
.sequence(
Seq(
httpServiceF.flatMap(_.unbind())
) map (_ fallbackTo Future.unit)
)
httpServiceF
.flatMap { case (serv, dao) =>
logger.info("Shutting down http service")
dao.foreach(_.close())
serv.unbind()
}
.fallbackTo(Future.unit)
.transform(_ => ta)
}
}
Expand Down Expand Up @@ -222,8 +222,8 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
}

private def stripLeft(
fa: Future[HttpService.Error \/ ServerBinding]
)(implicit ec: ExecutionContext): Future[ServerBinding] =
fa: Future[HttpService.Error \/ (ServerBinding, Option[ContractDao])]
)(implicit ec: ExecutionContext): Future[(ServerBinding, Option[ContractDao])] =
fa.flatMap {
case -\/(e) =>
Future.failed(new IllegalStateException(s"Cannot start HTTP Service: ${e.message}"))
Expand All @@ -233,7 +233,7 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {

private def initializeDb(c: JdbcConfig)(implicit ec: ExecutionContext): Future[ContractDao] =
for {
dao <- Future(ContractDao(c.driver, c.url, c.user, c.password))
dao <- Future(ContractDao(c))
_ <- {
import dao.{logHandler, jdbcDriver}
dao.transact(ContractDao.initialize).unsafeToFuture(): Future[Unit]
Expand Down
1 change: 1 addition & 0 deletions ledger-service/http-json/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ da_scala_library(
"//libs-scala/doobie-slf4j",
"//libs-scala/ports",
"//libs-scala/scala-utils",
"@maven//:com_zaxxer_HikariCP",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package com.daml.http

import akka.http.scaladsl.model.Uri
import com.daml.bazeltools.BazelRunfiles
import com.daml.http.dbbackend.ConnectionPool
import com.daml.http.json.{DomainJsonDecoder, DomainJsonEncoder}
import com.daml.ledger.client.LedgerClient
import com.daml.ports.LockedFreePort
import com.daml.testing.postgresql.PostgresAroundAll

import java.net.InetAddress
import org.scalatest.Suite

Expand Down Expand Up @@ -38,6 +40,7 @@ trait HttpFailureTestFixture extends ToxicSandboxFixture with PostgresAroundAll
s"jdbc:postgresql://${postgresDatabase.hostName}:$dbProxyPort/${postgresDatabase.databaseName}?user=${postgresDatabase.userName}&password=${postgresDatabase.password}",
user = "test",
password = "",
poolSize = ConnectionPool.PoolSize.Integration,
createSchema = true,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.http

import com.daml.http.dbbackend.ConnectionPool
import com.daml.testing.postgresql.PostgresAroundAll
import org.scalatest.Inside
import org.scalatest.AsyncTestSuite
Expand All @@ -13,12 +14,21 @@ trait HttpServicePostgresInt extends AbstractHttpServiceIntegrationTestFuns with

override final def jdbcConfig: Option[JdbcConfig] = Some(jdbcConfig_)

// has to be lazy because jdbcConfig_ is NOT initialized yet
protected lazy val dao = dbbackend.ContractDao(jdbcConfig_)

// has to be lazy because postgresFixture is NOT initialized yet
protected[this] lazy val jdbcConfig_ = JdbcConfig(
driver = "org.postgresql.Driver",
url = postgresDatabase.url,
user = "test",
password = "",
poolSize = ConnectionPool.PoolSize.Integration,
createSchema = true,
)

override protected def afterAll(): Unit = {
dao.close()
super.afterAll()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@ class HttpServiceWithPostgresIntTest

override def wsConfig: Option[WebsocketConfig] = None

// has to be lazy because jdbcConfig_ is NOT initialized yet
private lazy val dao = dbbackend.ContractDao(
jdbcDriver = jdbcConfig_.driver,
jdbcUrl = jdbcConfig_.url,
username = jdbcConfig_.user,
password = jdbcConfig_.password,
)

"query persists all active contracts" in withHttpService { (uri, encoder, _) =>
searchExpectOk(
searchDataSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package com.daml.http
import java.io.File
import java.nio.file.Path
import java.util.concurrent.TimeUnit

import akka.stream.ThrottleMode
import com.daml.http.dbbackend.ConnectionPool
import com.daml.util.ExceptionOps._
import com.daml.ledger.api.tls.TlsConfiguration
import scalaz.std.option._
Expand Down Expand Up @@ -76,6 +76,9 @@ private[http] abstract class ConfigCompanion[A](name: String) {
protected def optionalLongField(m: Map[String, String])(k: String): Either[String, Option[Long]] =
m.get(k).traverse(v => parseLong(k)(v)).toEither

protected def optionalIntField(m: Map[String, String])(k: String): Either[String, Option[Int]] =
m.get(k).traverse(v => parseInt(k)(v)).toEither

import scalaz.syntax.std.string._

protected def parseBoolean(k: String)(v: String): String \/ Boolean =
Expand All @@ -84,6 +87,9 @@ private[http] abstract class ConfigCompanion[A](name: String) {
protected def parseLong(k: String)(v: String): String \/ Long =
v.parseLong.leftMap(e => s"$k=$v must be a int value: ${e.description}").disjunction

protected def parseInt(k: String)(v: String): String \/ Int =
v.parseInt.leftMap(e => s"$k=$v must be a int value: ${e.description}").disjunction

protected def requiredDirectoryField(m: Map[String, String])(k: String): Either[String, File] =
requiredField(m)(k).flatMap(directory)

Expand All @@ -101,16 +107,28 @@ private[http] final case class JdbcConfig(
url: String,
user: String,
password: String,
poolSize: Int,
createSchema: Boolean = false,
minIdle: Int = JdbcConfig.MinIdle,
akshayshirahatti-da marked this conversation as resolved.
Show resolved Hide resolved
connectionTimeout: Long = JdbcConfig.ConnectionTimeout,
idleTimeout: Long = JdbcConfig.IdleTimeout,
)

private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig") {

import ConnectionPool.PoolSize.Production
final val MinIdle = 8
final val IdleTimeout = 10000L // ms, minimum according to log, defaults to 600s
final val ConnectionTimeout = 5000L

import dbbackend.ContractDao.supportedJdbcDriverNames

private[this] def supportedDriversHelp = supportedJdbcDriverNames mkString ", "

implicit val showInstance: Show[JdbcConfig] = Show.shows(a =>
s"JdbcConfig(driver=${a.driver}, url=${a.url}, user=${a.user}, createSchema=${a.createSchema})"
s"JdbcConfig(driver=${a.driver}, url=${a.url}, user=${a.user}, createSchema=${a.createSchema}, " +
s"poolSize=${a.poolSize}), minIdle=${a.minIdle}, connectionTimeout=${a.connectionTimeout}, " +
s"idleTimeout=${a.idleTimeout}"
)

lazy val help: String =
Expand All @@ -120,12 +138,20 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig"
s"${indent}user -- database user name,\n" +
s"${indent}password -- database user password,\n" +
s"${indent}createSchema -- boolean flag, if set to true, the process will re-create database schema and terminate immediately.\n" +
s"${indent}poolSize -- int value, specifies the max pool size for the database connection pool.\n" +
s"${indent}minIdle -- int value, specifies the min idle connections for database connection pool.\n" +
s"${indent}connectionTimeout -- long value, specifies the connection timeout for database connection pool.\n" +
s"${indent}idleTimeout -- long value, specifies the idle timeout for the database connection pool.\n" +
s"${indent}Example: " + helpString(
"org.postgresql.Driver",
"jdbc:postgresql://localhost:5432/test?&ssl=true",
"postgres",
"password",
"false",
Production.toString,
MinIdle.toString,
ConnectionTimeout.toString,
IdleTimeout.toString,
)

lazy val usage: String = helpString(
Expand All @@ -134,6 +160,10 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig"
"<user>",
"<password>",
"<true|false>",
"<poolSize>",
akshayshirahatti-da marked this conversation as resolved.
Show resolved Hide resolved
"<minIdle>",
"<connectionTimeout>",
"<idleTimeout>",
)

override def create(x: Map[String, String]): Either[String, JdbcConfig] =
Expand All @@ -148,12 +178,20 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig"
user <- requiredField(x)("user")
password <- requiredField(x)("password")
createSchema <- optionalBooleanField(x)("createSchema")
maxPoolSize <- optionalIntField(x)("poolSize")
minIdle <- optionalIntField(x)("minIdle")
connTimeout <- optionalLongField(x)("connectionTimeout")
idleTimeout <- optionalLongField(x)("idleTimeout")
} yield JdbcConfig(
driver = driver,
url = url,
user = user,
password = password,
poolSize = maxPoolSize.getOrElse(ConnectionPool.PoolSize.Production),
createSchema = createSchema.getOrElse(false),
minIdle = minIdle.getOrElse(MinIdle),
connectionTimeout = connTimeout.getOrElse(ConnectionTimeout),
idleTimeout = idleTimeout.getOrElse(IdleTimeout),
)

private def helpString(
Expand All @@ -162,8 +200,13 @@ private[http] object JdbcConfig extends ConfigCompanion[JdbcConfig]("JdbcConfig"
user: String,
password: String,
createSchema: String,
poolSize: String,
minIdle: String,
connectionTimeout: String,
idleTimeout: String,
): String =
s"""\"driver=$driver,url=$url,user=$user,password=$password,createSchema=$createSchema\""""
s"""\"driver=$driver,url=$url,user=$user,password=$password,createSchema=$createSchema,poolSize=$poolSize,
|minIdle=$minIdle, connectionTimeout=$connectionTimeout,idleTimeout=$idleTimeout\"""".stripMargin
}

// It is public for DABL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ object HttpService extends StrictLogging {
mat: Materializer,
aesf: ExecutionSequencerFactory,
ec: ExecutionContext,
): Future[Error \/ ServerBinding] = {
): Future[Error \/ (ServerBinding, Option[ContractDao])] = {
import startSettings._

implicit val settings: ServerSettings = ServerSettings(asys).withTransparentHeadRequests(true)
Expand All @@ -113,7 +113,7 @@ object HttpService extends StrictLogging {
maxInboundMessageSize = maxInboundMessageSize,
)

val bindingEt: EitherT[Future, Error, ServerBinding] = for {
val bindingEt: EitherT[Future, Error, (ServerBinding, Option[ContractDao])] = for {
client <- eitherT(
ledgerClient(ledgerHost, ledgerPort, clientConfig)
): ET[LedgerClient]
Expand Down Expand Up @@ -222,9 +222,9 @@ object HttpService extends StrictLogging {

_ <- either(portFile.cata(f => createPortFile(f, binding), \/-(()))): ET[Unit]

} yield binding
} yield (binding, contractDao)

bindingEt.run: Future[Error \/ ServerBinding]
bindingEt.run: Future[Error \/ (ServerBinding, Option[ContractDao])]
}

private[http] def refreshToken(
Expand Down Expand Up @@ -267,9 +267,14 @@ object HttpService extends StrictLogging {
} yield ()
}

def stop(f: Future[Error \/ ServerBinding])(implicit ec: ExecutionContext): Future[Unit] = {
def stop(
f: Future[Error \/ (ServerBinding, Option[ContractDao])]
)(implicit ec: ExecutionContext): Future[Unit] = {
logger.info("Stopping server...")
f.collect { case \/-(a) => a.unbind().void }.join
f.collect { case \/-((a, dao)) =>
dao.foreach(_.close())
a.unbind().void
}.join
}

// Decode JWT without any validation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object Main extends StrictLogging {

def terminate(): Unit = discard { Await.result(asys.terminate(), 10.seconds) }

val contractDao = config.jdbcConfig.map(c => ContractDao(c.driver, c.url, c.user, c.password))
val contractDao = config.jdbcConfig.map(c => ContractDao(c))

(contractDao, config.jdbcConfig) match {
case (Some(dao), Some(c)) if c.createSchema =>
Expand All @@ -82,7 +82,7 @@ object Main extends StrictLogging {
case _ =>
}

val serviceF: Future[HttpService.Error \/ ServerBinding] =
val serviceF: Future[HttpService.Error \/ (ServerBinding, Option[ContractDao])] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe worth having an httpservice class that contains the server binding & optional dao and abstracts away the details of closing? Not as part of the backport but separately

HttpService.start(
startSettings = config,
contractDao = contractDao,
Expand Down
Loading