Merge #2416
2416: Use a single-striped connection pool for each database layer instead of a single shared connection r=KtorZ a=KtorZ

# Issue Number


ADP-586

# Overview


- 73df15f
  📍 **use a single-striped connection pool for each database layer**

  It is rather common practice to use a pool of database connections when
  dealing with databases. So far, we've been using a single shared
  connection per wallet worker, with a lock in front of each connection
  preventing concurrent access to the database. The lock is only necessary
  because of the way persistent handles query statements internally; in
  principle, SQLite handles concurrent database accesses just fine.

  For basic wallets, this change makes little difference. But for larger
  wallets, like those handled by exchanges, we've observed very slow
  response times due to contention on the database lock. Indeed, some
  requests may hold the lock for 10 or 20 seconds, preventing any other
  request from going through. However, most requests are read-only and
  could be executed in parallel, at the discretion of the SQLite engine.
  I hope that the introduction of a connection pool will improve the
  overall experience for large wallets by better serving concurrent
  requests on the database (see the sketch below). Fingers crossed.
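
  For illustration only, here is a minimal sketch of the pooling pattern,
  using `resource-pool` together with persistent-sqlite's low-level
  `Database.Sqlite` module. This is not the code from this PR; the file
  name, query, and pool parameters are made up.

  ```haskell
  -- Illustrative sketch only: a single-striped connection pool in front
  -- of SQLite. The database file and query below are hypothetical.
  {-# LANGUAGE OverloadedStrings #-}

  import Data.Pool (createPool, destroyAllResources, withResource)
  import qualified Database.Sqlite as Sqlite

  main :: IO ()
  main = do
      -- One stripe, idle connections kept for 600 seconds, at most 10 open
      -- connections: the same shape as 'createSqlitePool' in this change.
      pool <- createPool (Sqlite.open "wallet.sqlite") Sqlite.close 1 600 10

      -- Each caller borrows a connection for the duration of its query, so
      -- read-only requests from different threads no longer queue behind a
      -- single lock-protected handle.
      withResource pool $ \conn -> do
          stmt <- Sqlite.prepare conn "SELECT count(*) FROM sqlite_master"
          _ <- Sqlite.step stmt
          Sqlite.finalize stmt

      destroyAllResources pool
  ```

  Note that the actual change caps the pool at a single connection for
  in-memory databases (see `createSqlitePool` in the diff), since a
  `:memory:` SQLite database is visible only to the connection that
  opened it.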




Co-authored-by: KtorZ <[email protected]>
Co-authored-by: IOHK <[email protected]>
3 people authored Jan 4, 2021
2 parents a256c3e + e559e43 commit a026785
Showing 8 changed files with 125 additions and 85 deletions.
1 change: 1 addition & 0 deletions lib/core/cardano-wallet-core.cabal
@@ -82,6 +82,7 @@ library
, quiet
, random
, random-shuffle
, resource-pool
, retry
, safe
, scientific
163 changes: 101 additions & 62 deletions lib/core/src/Cardano/DB/Sqlite.hs
@@ -73,12 +73,16 @@ import Data.List.Split
( chunksOf )
import Data.Maybe
( fromMaybe )
import Data.Pool
( Pool, createPool, destroyAllResources, withResource )
import Data.Proxy
( Proxy (..) )
import Data.Text
( Text )
import Data.Text.Class
( ToText (..) )
import Data.Time.Clock
( NominalDiffTime )
import Database.Persist.Sql
( DBName (..)
, EntityField
@@ -108,9 +112,7 @@ import System.Log.FastLogger
import UnliftIO.Compat
( handleIf, mkRetryHandler )
import UnliftIO.Exception
( Exception, bracket_, handleJust, tryJust )
import UnliftIO.MVar
( newMVar, withMVarMasked )
( Exception, bracket_, handleJust, mask_, tryJust )

import qualified Data.Aeson as Aeson
import qualified Data.ByteString.Char8 as B8
@@ -125,14 +127,12 @@ import qualified Database.Sqlite as Sqlite

-- | Context for the SQLite 'DBLayer'.
data SqliteContext = SqliteContext
{ getSqlBackend :: SqlBackend
{ connectionPool :: Pool (SqlBackend, Sqlite.Connection)
-- ^ A handle to the Persistent SQL backend.
, runQuery :: forall a. SqlPersistT IO a -> IO a
-- ^ 'safely' run a query with logging and lock-protection
, dbFile :: Maybe FilePath
-- ^ The actual database file, if any. If none, runs in-memory
, trace :: Tracer IO DBLog
-- ^ A 'Tracer' for logging
}

-- | Error type for when migrations go wrong after opening a database.
@@ -167,44 +167,16 @@ queryLogFunc tr _loc _source level str = traceWith tr (MsgQuery msg sev)
handleConstraint :: MonadUnliftIO m => e -> m a -> m (Either e a)
handleConstraint e = handleJust select handler . fmap Right
where
select (SqliteException ErrorConstraint _ _) = Just ()
select _ = Nothing
handler = const . pure . Left $ e
select (SqliteException ErrorConstraint _ _) = Just ()
select _ = Nothing
handler = const . pure . Left $ e

-- | Finalize database statements and close the database connection.
--
-- If the database connection is still in use, it will retry for up to a minute,
-- to let other threads finish up.
-- | Free all allocated database connections. See also 'destroySqliteBackend'
--
-- This function is idempotent: if the database connection has already been
-- closed, calling this function will exit without doing anything.
--
destroyDBLayer :: SqliteContext -> IO ()
destroyDBLayer (SqliteContext {getSqlBackend, trace, dbFile}) = do
traceWith trace (MsgClosing dbFile)
recovering pol (mkRetryHandler isBusy) (const $ close' getSqlBackend)
& handleIf isAlreadyClosed
(traceWith trace . MsgIsAlreadyClosed . showT)
& handleIf statementAlreadyFinalized
(traceWith trace . MsgStatementAlreadyFinalized . showT)
where
isAlreadyClosed = \case
-- Thrown when an attempt is made to close a connection that is already
-- in the closed state:
Sqlite.SqliteException Sqlite.ErrorMisuse _ _ -> True
Sqlite.SqliteException {} -> False

statementAlreadyFinalized = \case
-- Thrown
Persist.StatementAlreadyFinalized{} -> True
Persist.Couldn'tGetSQLConnection{} -> False

showT :: Show a => a -> Text
showT = T.pack . show

isBusy (SqliteException name _ _) = pure (name == Sqlite.ErrorBusy)
pol = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms)
ms = 1000 -- microseconds in a millisecond
destroyDBLayer :: Tracer IO DBLog -> SqliteContext -> IO ()
destroyDBLayer tr SqliteContext{connectionPool,dbFile} = do
traceWith tr (MsgDestroyConnectionPool dbFile)
destroyAllResources connectionPool

{-------------------------------------------------------------------------------
Internal / Database Setup
Expand All @@ -219,32 +191,75 @@ startSqliteBackend
-> Maybe FilePath
-> IO (Either MigrationError SqliteContext)
startSqliteBackend manualMigration autoMigration tr fp = do
(unsafeBackend, connection) <-
createSqliteBackend tr fp manualMigration (queryLogFunc tr)
lock <- newMVar unsafeBackend
pool <- createSqlitePool tr fp manualMigration (queryLogFunc tr)
let observe :: IO a -> IO a
observe = bracketTracer (contramap MsgRun tr)
-- runSqlConn is guarded with a lock because it's not threadsafe in general.
-- It is also masked, so that the SqlBackend state is not corrupted if a
-- thread gets cancelled while running a query.
-- See: https://github.com/yesodweb/persistent/issues/981
--
-- Note that `withResource` does already mask async exception but only for
-- dealing with the pool resource acquisition. The action is then ran
-- unmasked with the acquired resource. If an asynchronous exception occurs,
-- the resource is NOT placed back in the pool.
let runQuery :: SqlPersistT IO a -> IO a
runQuery cmd = withMVarMasked lock $ \backend ->
observe $ runSqlConn cmd backend
autoMigrationResult <-
runQuery cmd = withResource pool $ \(backend, _) ->
observe $ mask_ $ runSqlConn cmd backend

autoMigrationResult <- withResource pool $ \(backend, connection) -> do
withForeignKeysDisabled tr connection
$ runQuery (runMigrationQuiet autoMigration)
$ mask_ (runSqlConn (runMigrationQuiet autoMigration) backend)
& tryJust (matchMigrationError @PersistException)
& tryJust (matchMigrationError @SqliteException)
& fmap join
traceWith tr $ MsgMigrations $ fmap length autoMigrationResult
let ctx = SqliteContext unsafeBackend runQuery fp tr
let ctx = SqliteContext pool runQuery fp
case autoMigrationResult of
Left e -> do
destroyDBLayer ctx
destroyDBLayer tr ctx
pure $ Left e
Right _ -> pure $ Right ctx

-- | Finalize database statements and close the database connection.
--
-- If the database connection is still in use, it will retry for up to a minute,
-- to let other threads finish up.
--
-- This function is idempotent: if the database connection has already been
-- closed, calling this function will exit without doing anything.
destroySqliteBackend
:: Tracer IO DBLog
-> SqlBackend
-> Maybe FilePath
-> IO ()
destroySqliteBackend tr sqlBackend dbFile = do
traceWith tr (MsgCloseSingleConnection dbFile)
recovering pol (mkRetryHandler isBusy) (const $ close' sqlBackend)
& handleIf isAlreadyClosed
(traceWith tr . MsgIsAlreadyClosed . showT)
& handleIf statementAlreadyFinalized
(traceWith tr . MsgStatementAlreadyFinalized . showT)
where
isAlreadyClosed = \case
-- Thrown when an attempt is made to close a connection that is already
-- in the closed state:
Sqlite.SqliteException Sqlite.ErrorMisuse _ _ -> True
Sqlite.SqliteException {} -> False

statementAlreadyFinalized = \case
-- Thrown
Persist.StatementAlreadyFinalized{} -> True
Persist.Couldn'tGetSQLConnection{} -> False

showT :: Show a => a -> Text
showT = T.pack . show

isBusy (SqliteException name _ _) = pure (name == Sqlite.ErrorBusy)
pol = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms)
ms = 1000 -- microseconds in a millisecond


-- | Run the given task in a context where foreign key constraints are
-- /temporarily disabled/, before re-enabling them.
--
@@ -345,19 +360,38 @@ instance MatchMigrationError SqliteException where
newtype ManualMigration = ManualMigration
{ executeManualMigration :: Sqlite.Connection -> IO () }

createSqliteBackend
createSqlitePool
:: Tracer IO DBLog
-> Maybe FilePath
-> ManualMigration
-> LogFunc
-> IO (SqlBackend, Sqlite.Connection)
createSqliteBackend trace fp migration logFunc = do
-> IO (Pool (SqlBackend, Sqlite.Connection))
createSqlitePool tr fp migration logFunc = do
let connStr = sqliteConnStr fp
traceWith trace $ MsgConnStr connStr
conn <- Sqlite.open connStr
executeManualMigration migration conn
backend <- wrapConnectionInfo (mkSqliteConnectionInfo connStr) conn logFunc
pure (backend, conn)
traceWith tr $ MsgConnStr connStr

let createConnection = do
let info = mkSqliteConnectionInfo connStr
conn <- Sqlite.open connStr
executeManualMigration migration conn
backend <- wrapConnectionInfo info conn logFunc
pure (backend, conn)

let destroyConnection = \(backend, _) -> do
destroySqliteBackend tr backend fp

createPool
createConnection
destroyConnection
numberOfStripes
timeToLive
maximumConnections
where
numberOfStripes = 1
timeToLive = 600 :: NominalDiffTime
-- When running in :memory:, we want a single connection that does not get
-- cleaned up.
maximumConnections = maybe 1 (const 10) fp

sqliteConnStr :: Maybe FilePath -> Text
sqliteConnStr = maybe ":memory:" T.pack
Expand All @@ -371,7 +405,8 @@ data DBLog
| MsgQuery Text Severity
| MsgRun BracketLog
| MsgConnStr Text
| MsgClosing (Maybe FilePath)
| MsgCloseSingleConnection (Maybe FilePath)
| MsgDestroyConnectionPool (Maybe FilePath)
| MsgWillOpenDB (Maybe FilePath)
| MsgDatabaseReset
| MsgIsAlreadyClosed Text
@@ -446,7 +481,8 @@ instance HasSeverityAnnotation DBLog where
MsgQuery _ sev -> sev
MsgRun _ -> Debug
MsgConnStr _ -> Debug
MsgClosing _ -> Debug
MsgCloseSingleConnection _ -> Debug
MsgDestroyConnectionPool _ -> Debug
MsgWillOpenDB _ -> Info
MsgDatabaseReset -> Notice
MsgIsAlreadyClosed _ -> Warning
Expand All @@ -473,7 +509,10 @@ instance ToText DBLog where
MsgRun b -> "Running database action - " <> toText b
MsgWillOpenDB fp -> "Will open db at " <> (maybe "in-memory" T.pack fp)
MsgConnStr connStr -> "Using connection string: " <> connStr
MsgClosing fp -> "Closing database ("+|fromMaybe "in-memory" fp|+")"
MsgCloseSingleConnection fp ->
"Closing single database connection ("+|fromMaybe "in-memory" fp|+")"
MsgDestroyConnectionPool fp ->
"Destroy database connection pool ("+|fromMaybe "in-memory" fp|+")"
MsgDatabaseReset ->
"Non backward compatible database found. Removing old database \
\and re-creating it from scratch. Ignore the previous error."
8 changes: 4 additions & 4 deletions lib/core/src/Cardano/Pool/DB/Sqlite.hs
@@ -200,12 +200,12 @@ withDecoratedDBLayer
-> (DBLayer IO -> IO a)
-- ^ Action to run.
-> IO a
withDecoratedDBLayer dbDecorator trace fp ti action = do
traceWith trace (MsgGeneric $ MsgWillOpenDB fp)
withDecoratedDBLayer dbDecorator tr fp ti action = do
traceWith tr (MsgGeneric $ MsgWillOpenDB fp)
bracket before after (action . decorateDBLayer dbDecorator . snd)
where
before = newDBLayer trace fp ti
after = destroyDBLayer . fst
before = newDBLayer tr fp ti
after = destroyDBLayer (contramap MsgGeneric tr) . fst

-- | Sets up a connection to the SQLite database.
--
6 changes: 3 additions & 3 deletions lib/core/src/Cardano/Wallet/DB/Sqlite.hs
@@ -253,11 +253,11 @@ withDBLayer
-> ((SqliteContext, DBLayer IO s k) -> IO a)
-- ^ Action to run.
-> IO a
withDBLayer trace defaultFieldValues mDatabaseDir ti =
withDBLayer tr defaultFieldValues mDatabaseDir ti =
bracket before after
where
before = newDBLayer trace defaultFieldValues mDatabaseDir ti
after = destroyDBLayer . fst
before = newDBLayer tr defaultFieldValues mDatabaseDir ti
after = destroyDBLayer tr . fst

-- | Instantiate a 'DBFactory' from a given directory
newDBFactory
6 changes: 3 additions & 3 deletions lib/core/test/bench/db/Main.hs
@@ -49,7 +49,7 @@ import Cardano.BM.Data.Severity
import Cardano.BM.Data.Trace
( Trace )
import Cardano.BM.Data.Tracer
( Tracer, filterSeverity )
( Tracer, filterSeverity, nullTracer )
import Cardano.BM.Setup
( setupTrace_, shutdown )
import Cardano.DB.Sqlite
@@ -628,7 +628,7 @@ defaultFieldValues = DefaultFieldValues

cleanupDB :: (FilePath, SqliteContext, DBLayer IO s k) -> IO ()
cleanupDB (db, ctx, _) = do
handle (\SqliteException{} -> pure ()) $ destroyDBLayer ctx
handle (\SqliteException{} -> pure ()) $ destroyDBLayer nullTracer ctx
mapM_ remove [db, db <> "-shm", db <> "-wal"]
where
remove f = doesFileExist f >>= \case
@@ -722,7 +722,7 @@ benchDiskSize :: Tracer IO DBLog -> (DBLayerBench -> IO ()) -> IO ()
benchDiskSize tr action = bracket (setupDB tr) cleanupDB $ \(f, ctx, db) -> do
action db
mapM_ (printFileSize "") [f, f <> "-shm", f <> "-wal"]
destroyDBLayer ctx
destroyDBLayer nullTracer ctx
printFileSize " (closed)" f
putStrLn ""
where
