Skip to content

Commit

Permalink
Convert storage backend tests to AnyFlatSuite (#12278)
Browse files Browse the repository at this point in the history
changelog_begin
changelog_end
  • Loading branch information
rautenrieth-da authored Jan 12, 2022
1 parent 9513c64 commit 8e9f993
Show file tree
Hide file tree
Showing 19 changed files with 1,103 additions and 1,199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,67 +5,86 @@ package com.daml.platform.store.backend

import java.sql.Connection
import java.util.concurrent.atomic.AtomicInteger

import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.resources.{Resource, ResourceContext}
import com.daml.ledger.resources.ResourceContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.store.FlywayMigrations
import com.daml.platform.store.appendonlydao.DbDispatcher
import org.scalatest.{AsyncTestSuite, BeforeAndAfterEach}
import org.scalatest.{TestSuite, BeforeAndAfterAll, BeforeAndAfterEach}

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, Future}
import java.util.concurrent.Executors
import javax.sql.DataSource
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try

private[backend] trait StorageBackendSpec
extends AkkaBeforeAndAfterAll
extends StorageBackendProvider
with BeforeAndAfterEach
with StorageBackendProvider { this: AsyncTestSuite =>
with BeforeAndAfterAll { this: TestSuite =>

protected val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
implicit protected val loggingContext: LoggingContext = LoggingContext.ForTesting

private val connectionPoolSize: Int = 16
private val metrics = new Metrics(new MetricRegistry)
// Data source (initialized once)
private var dataSource: DataSource = _

// Default connection for database operations (initialized for each test, since connections are stateful)
private var defaultConnection: Connection = _

// Initialized in beforeAll()
private var dbDispatcherResource: Resource[DbDispatcher] = _
private var dbDispatcher: DbDispatcher = _
// Execution context with a fixed number of threads, used for running parallel queries (where each query is
// executed in its own thread).
private val connectionPoolSize = 16
private val connectionPoolExecutionContext = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(
connectionPoolSize
)
)

/** Runs the given database operations in parallel.
* Each operation will run in a separate thread and will use a separate database connection.
*/
protected def executeParallelSql[T](fs: Vector[Connection => T]): Vector[T] = {
require(fs.size <= connectionPoolSize)

val connections = Vector.fill(fs.size)(dataSource.getConnection())

implicit val ec: ExecutionContext = connectionPoolExecutionContext
val result = Try(
Await.result(
Future.sequence(
Vector.tabulate(fs.size)(i => Future(fs(i)(connections(i))))
),
60.seconds,
)
)

protected def executeSql[T](sql: Connection => T): Future[T] = {
dbDispatcher.executeSql(metrics.test.db)(sql)
connections.foreach(_.close())
result.get
}

/** Runs the given database operation */
protected def executeSql[T](f: Connection => T): T = f(defaultConnection)

override protected def beforeAll(): Unit = {
super.beforeAll()

implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher)
// Note: reusing the connection pool EC for initialization
implicit val ec: ExecutionContext = connectionPoolExecutionContext
implicit val resourceContext: ResourceContext = ResourceContext(ec)
implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
dbDispatcherResource = for {
_ <- Resource.fromFuture(
new FlywayMigrations(jdbcUrl).migrate()
)
dispatcher <- DbDispatcher
.owner(
dataSource = backend.dataSource.createDataSource(jdbcUrl),
serverRole = ServerRole.Testing(this.getClass),
connectionPoolSize = connectionPoolSize,
connectionTimeout = FiniteDuration(250, "millis"),
metrics = metrics,
)
.acquire()
} yield dispatcher

dbDispatcher = Await.result(dbDispatcherResource.asFuture, 60.seconds)

val dataSourceFuture = for {
_ <- new FlywayMigrations(jdbcUrl).migrate()
dataSource <- VerifiedDataSource(jdbcUrl)
} yield dataSource

dataSource = Await.result(dataSourceFuture, 60.seconds)

logger.info(
s"Finished setting up database $jdbcUrl for tests. You can now connect to this database to debug failed tests. Note that tables are truncated between each test."
)
}

override protected def afterAll(): Unit = {
Await.result(dbDispatcherResource.release(), 60.seconds)
super.afterAll()
}

Expand All @@ -77,23 +96,23 @@ private[backend] trait StorageBackendSpec
override protected def beforeEach(): Unit = {
super.beforeEach()

defaultConnection = dataSource.getConnection()

assert(
runningTests.incrementAndGet() == 1,
"StorageBackendSpec tests must not run in parallel, as they all run against the same database.",
)
Await.result(
executeSql { c =>
backend.reset.resetAll(c)
updateLedgerEndCache(c)
// Note: here we reset the MockStringInterning object to make sure each test starts with empty interning state.
// This is not strictly necessary, as tryInternalize() always succeeds in MockStringInterning - we don't have
// a problem where the interning would be affected by data left over by previous tests.
// To write tests that are sensitive to interning unknown data, we would have to use a custom storage backend
// implementation.
backend.stringInterningSupport.reset()
},
60.seconds,
)

// Reset the content of the index database
backend.reset.resetAll(defaultConnection)
updateLedgerEndCache(defaultConnection)

// Note: here we reset the MockStringInterning object to make sure each test starts with empty interning state.
// This is not strictly necessary, as tryInternalize() always succeeds in MockStringInterning - we don't have
// a problem where the interning would be affected by data left over by previous tests.
// To write tests that are sensitive to interning unknown data, we would have to use a custom storage backend
// implementation.
backend.stringInterningSupport.reset()
}

override protected def afterEach(): Unit = {
Expand All @@ -102,6 +121,8 @@ private[backend] trait StorageBackendSpec
"StorageBackendSpec tests must not run in parallel, as they all run against the same database.",
)

defaultConnection.close()

super.afterEach()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.daml.platform.store.backend

import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.flatspec.AnyFlatSpec

trait StorageBackendSuite
extends StorageBackendTestsInitialization
Expand All @@ -18,5 +18,5 @@ trait StorageBackendSuite
with StorageBackendTestsDeduplication
with StorageBackendTestsTimestamps
with StorageBackendTestsStringInterning {
this: AsyncFlatSpec =>
this: AnyFlatSpec =>
}
Loading

0 comments on commit 8e9f993

Please sign in to comment.