From 8d1fa23e7db24034cf10c966344d1612c6bce8dc Mon Sep 17 00:00:00 2001 From: KtorZ Date: Thu, 31 Dec 2020 16:43:11 +0100 Subject: [PATCH 1/3] use a single-striped connection pool for each database layer It is a rather common practice to use a pool of database connection when dealing with databases. So far, we've been using a single shared connection per wallet worker with, in front of each connection a lock preventing concurrent access to the database. The lock is only necessary because of the way persistent handles query statements internally, in principle, SQLite handles concurrent database accesses just well. For basic wallets, this is a relatively useless change. But for larger wallets like those manipulated by exchanges, we've observed very slow response time due to concurrent access of the database lock. Indeed, some requests may grab the lock for 10 or 20 seconds, preventing any requests from going throug. However, most requests are read-only requests and could be executed in parallel, at the discretion of the SQLite engine. I hope that the introduction of a connection pool will improve the overall experience for large wallets by better serving concurrent requests on the database. Finger crossed. --- lib/core/cardano-wallet-core.cabal | 1 + lib/core/src/Cardano/DB/Sqlite.hs | 163 +++++++++++------- lib/core/src/Cardano/Pool/DB/Sqlite.hs | 8 +- lib/core/src/Cardano/Wallet/DB/Sqlite.hs | 6 +- lib/core/test/bench/db/Main.hs | 6 +- .../test/unit/Cardano/Wallet/DB/SqliteSpec.hs | 23 ++- lib/shelley/bench/Restore.hs | 2 +- 7 files changed, 124 insertions(+), 85 deletions(-) diff --git a/lib/core/cardano-wallet-core.cabal b/lib/core/cardano-wallet-core.cabal index 8ebd1a87726..747f1de568d 100644 --- a/lib/core/cardano-wallet-core.cabal +++ b/lib/core/cardano-wallet-core.cabal @@ -82,6 +82,7 @@ library , quiet , random , random-shuffle + , resource-pool , retry , safe , scientific diff --git a/lib/core/src/Cardano/DB/Sqlite.hs b/lib/core/src/Cardano/DB/Sqlite.hs index 0bfd30ef05c..926f4b72b26 100644 --- a/lib/core/src/Cardano/DB/Sqlite.hs +++ b/lib/core/src/Cardano/DB/Sqlite.hs @@ -73,12 +73,16 @@ import Data.List.Split ( chunksOf ) import Data.Maybe ( fromMaybe ) +import Data.Pool + ( Pool, createPool, destroyAllResources, withResource ) import Data.Proxy ( Proxy (..) ) import Data.Text ( Text ) import Data.Text.Class ( ToText (..) ) +import Data.Time.Clock + ( NominalDiffTime ) import Database.Persist.Sql ( DBName (..) , EntityField @@ -108,9 +112,7 @@ import System.Log.FastLogger import UnliftIO.Compat ( handleIf, mkRetryHandler ) import UnliftIO.Exception - ( Exception, bracket_, handleJust, tryJust ) -import UnliftIO.MVar - ( newMVar, withMVarMasked ) + ( Exception, bracket_, handleJust, mask_, tryJust ) import qualified Data.Aeson as Aeson import qualified Data.ByteString.Char8 as B8 @@ -125,14 +127,12 @@ import qualified Database.Sqlite as Sqlite -- | Context for the SQLite 'DBLayer'. data SqliteContext = SqliteContext - { getSqlBackend :: SqlBackend + { connectionPool :: Pool (SqlBackend, Sqlite.Connection) -- ^ A handle to the Persistent SQL backend. , runQuery :: forall a. SqlPersistT IO a -> IO a -- ^ 'safely' run a query with logging and lock-protection , dbFile :: Maybe FilePath -- ^ The actual database file, if any. If none, runs in-memory - , trace :: Tracer IO DBLog - -- ^ A 'Tracer' for logging } -- | Error type for when migrations go wrong after opening a database. @@ -167,44 +167,16 @@ queryLogFunc tr _loc _source level str = traceWith tr (MsgQuery msg sev) handleConstraint :: MonadUnliftIO m => e -> m a -> m (Either e a) handleConstraint e = handleJust select handler . fmap Right where - select (SqliteException ErrorConstraint _ _) = Just () - select _ = Nothing - handler = const . pure . Left $ e + select (SqliteException ErrorConstraint _ _) = Just () + select _ = Nothing + handler = const . pure . Left $ e --- | Finalize database statements and close the database connection. --- --- If the database connection is still in use, it will retry for up to a minute, --- to let other threads finish up. +-- | Free all allocated database connections. See also 'destroySqliteBackend' -- --- This function is idempotent: if the database connection has already been --- closed, calling this function will exit without doing anything. --- -destroyDBLayer :: SqliteContext -> IO () -destroyDBLayer (SqliteContext {getSqlBackend, trace, dbFile}) = do - traceWith trace (MsgClosing dbFile) - recovering pol (mkRetryHandler isBusy) (const $ close' getSqlBackend) - & handleIf isAlreadyClosed - (traceWith trace . MsgIsAlreadyClosed . showT) - & handleIf statementAlreadyFinalized - (traceWith trace . MsgStatementAlreadyFinalized . showT) - where - isAlreadyClosed = \case - -- Thrown when an attempt is made to close a connection that is already - -- in the closed state: - Sqlite.SqliteException Sqlite.ErrorMisuse _ _ -> True - Sqlite.SqliteException {} -> False - - statementAlreadyFinalized = \case - -- Thrown - Persist.StatementAlreadyFinalized{} -> True - Persist.Couldn'tGetSQLConnection{} -> False - - showT :: Show a => a -> Text - showT = T.pack . show - - isBusy (SqliteException name _ _) = pure (name == Sqlite.ErrorBusy) - pol = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms) - ms = 1000 -- microseconds in a millisecond +destroyDBLayer :: Tracer IO DBLog -> SqliteContext -> IO () +destroyDBLayer tr SqliteContext{connectionPool,dbFile} = do + traceWith tr (MsgDestroyConnectionPool dbFile) + destroyAllResources connectionPool {------------------------------------------------------------------------------- Internal / Database Setup @@ -219,32 +191,75 @@ startSqliteBackend -> Maybe FilePath -> IO (Either MigrationError SqliteContext) startSqliteBackend manualMigration autoMigration tr fp = do - (unsafeBackend, connection) <- - createSqliteBackend tr fp manualMigration (queryLogFunc tr) - lock <- newMVar unsafeBackend + pool <- createSqlitePool tr fp manualMigration (queryLogFunc tr) let observe :: IO a -> IO a observe = bracketTracer (contramap MsgRun tr) -- runSqlConn is guarded with a lock because it's not threadsafe in general. -- It is also masked, so that the SqlBackend state is not corrupted if a -- thread gets cancelled while running a query. -- See: https://github.com/yesodweb/persistent/issues/981 + -- + -- Note that `withResource` does already mask async exception but only for + -- dealing with the pool resource acquisition. The action is then ran + -- unmasked with the acquired resource. If an asynchronous exception occurs, + -- the resource is NOT placed back in the pool. let runQuery :: SqlPersistT IO a -> IO a - runQuery cmd = withMVarMasked lock $ \backend -> - observe $ runSqlConn cmd backend - autoMigrationResult <- + runQuery cmd = withResource pool $ \(backend, _) -> + observe $ mask_ $ runSqlConn cmd backend + + autoMigrationResult <- withResource pool $ \(backend, connection) -> do withForeignKeysDisabled tr connection - $ runQuery (runMigrationQuiet autoMigration) + $ mask_ (runSqlConn (runMigrationQuiet autoMigration) backend) & tryJust (matchMigrationError @PersistException) & tryJust (matchMigrationError @SqliteException) & fmap join traceWith tr $ MsgMigrations $ fmap length autoMigrationResult - let ctx = SqliteContext unsafeBackend runQuery fp tr + let ctx = SqliteContext pool runQuery fp case autoMigrationResult of Left e -> do - destroyDBLayer ctx + destroyDBLayer tr ctx pure $ Left e Right _ -> pure $ Right ctx +-- | Finalize database statements and close the database connection. +-- +-- If the database connection is still in use, it will retry for up to a minute, +-- to let other threads finish up. +-- +-- This function is idempotent: if the database connection has already been +-- closed, calling this function will exit without doing anything. +destroySqliteBackend + :: Tracer IO DBLog + -> SqlBackend + -> Maybe FilePath + -> IO () +destroySqliteBackend tr sqlBackend dbFile = do + traceWith tr (MsgCloseSingleConnection dbFile) + recovering pol (mkRetryHandler isBusy) (const $ close' sqlBackend) + & handleIf isAlreadyClosed + (traceWith tr . MsgIsAlreadyClosed . showT) + & handleIf statementAlreadyFinalized + (traceWith tr . MsgStatementAlreadyFinalized . showT) + where + isAlreadyClosed = \case + -- Thrown when an attempt is made to close a connection that is already + -- in the closed state: + Sqlite.SqliteException Sqlite.ErrorMisuse _ _ -> True + Sqlite.SqliteException {} -> False + + statementAlreadyFinalized = \case + -- Thrown + Persist.StatementAlreadyFinalized{} -> True + Persist.Couldn'tGetSQLConnection{} -> False + + showT :: Show a => a -> Text + showT = T.pack . show + + isBusy (SqliteException name _ _) = pure (name == Sqlite.ErrorBusy) + pol = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms) + ms = 1000 -- microseconds in a millisecond + + -- | Run the given task in a context where foreign key constraints are -- /temporarily disabled/, before re-enabling them. -- @@ -345,19 +360,38 @@ instance MatchMigrationError SqliteException where newtype ManualMigration = ManualMigration { executeManualMigration :: Sqlite.Connection -> IO () } -createSqliteBackend +createSqlitePool :: Tracer IO DBLog -> Maybe FilePath -> ManualMigration -> LogFunc - -> IO (SqlBackend, Sqlite.Connection) -createSqliteBackend trace fp migration logFunc = do + -> IO (Pool (SqlBackend, Sqlite.Connection)) +createSqlitePool tr fp migration logFunc = do let connStr = sqliteConnStr fp - traceWith trace $ MsgConnStr connStr - conn <- Sqlite.open connStr - executeManualMigration migration conn - backend <- wrapConnectionInfo (mkSqliteConnectionInfo connStr) conn logFunc - pure (backend, conn) + traceWith tr $ MsgConnStr connStr + + let createConnection = do + let info = mkSqliteConnectionInfo connStr + conn <- Sqlite.open connStr + executeManualMigration migration conn + backend <- wrapConnectionInfo info conn logFunc + pure (backend, conn) + + let destroyConnection = \(backend, _) -> do + destroySqliteBackend tr backend fp + + createPool + createConnection + destroyConnection + numberOfStripes + timeToLive + maximumConnections + where + numberOfStripes = 1 + timeToLive = 600 :: NominalDiffTime + -- When running in :memory:, we want a single connection that does not get + -- cleaned up. + maximumConnections = maybe 1 (const 10) fp sqliteConnStr :: Maybe FilePath -> Text sqliteConnStr = maybe ":memory:" T.pack @@ -371,7 +405,8 @@ data DBLog | MsgQuery Text Severity | MsgRun BracketLog | MsgConnStr Text - | MsgClosing (Maybe FilePath) + | MsgCloseSingleConnection (Maybe FilePath) + | MsgDestroyConnectionPool (Maybe FilePath) | MsgWillOpenDB (Maybe FilePath) | MsgDatabaseReset | MsgIsAlreadyClosed Text @@ -446,7 +481,8 @@ instance HasSeverityAnnotation DBLog where MsgQuery _ sev -> sev MsgRun _ -> Debug MsgConnStr _ -> Debug - MsgClosing _ -> Debug + MsgCloseSingleConnection _ -> Debug + MsgDestroyConnectionPool _ -> Debug MsgWillOpenDB _ -> Info MsgDatabaseReset -> Notice MsgIsAlreadyClosed _ -> Warning @@ -473,7 +509,10 @@ instance ToText DBLog where MsgRun b -> "Running database action - " <> toText b MsgWillOpenDB fp -> "Will open db at " <> (maybe "in-memory" T.pack fp) MsgConnStr connStr -> "Using connection string: " <> connStr - MsgClosing fp -> "Closing database ("+|fromMaybe "in-memory" fp|+")" + MsgCloseSingleConnection fp -> + "Closing single database connection ("+|fromMaybe "in-memory" fp|+")" + MsgDestroyConnectionPool fp -> + "Destroy database connection pool ("+|fromMaybe "in-memory" fp|+")" MsgDatabaseReset -> "Non backward compatible database found. Removing old database \ \and re-creating it from scratch. Ignore the previous error." diff --git a/lib/core/src/Cardano/Pool/DB/Sqlite.hs b/lib/core/src/Cardano/Pool/DB/Sqlite.hs index 41adfd2a6c5..9236932fdaa 100644 --- a/lib/core/src/Cardano/Pool/DB/Sqlite.hs +++ b/lib/core/src/Cardano/Pool/DB/Sqlite.hs @@ -200,12 +200,12 @@ withDecoratedDBLayer -> (DBLayer IO -> IO a) -- ^ Action to run. -> IO a -withDecoratedDBLayer dbDecorator trace fp ti action = do - traceWith trace (MsgGeneric $ MsgWillOpenDB fp) +withDecoratedDBLayer dbDecorator tr fp ti action = do + traceWith tr (MsgGeneric $ MsgWillOpenDB fp) bracket before after (action . decorateDBLayer dbDecorator . snd) where - before = newDBLayer trace fp ti - after = destroyDBLayer . fst + before = newDBLayer tr fp ti + after = destroyDBLayer (contramap MsgGeneric tr) . fst -- | Sets up a connection to the SQLite database. -- diff --git a/lib/core/src/Cardano/Wallet/DB/Sqlite.hs b/lib/core/src/Cardano/Wallet/DB/Sqlite.hs index 2a18a06ee9e..b40ad27560f 100644 --- a/lib/core/src/Cardano/Wallet/DB/Sqlite.hs +++ b/lib/core/src/Cardano/Wallet/DB/Sqlite.hs @@ -253,11 +253,11 @@ withDBLayer -> ((SqliteContext, DBLayer IO s k) -> IO a) -- ^ Action to run. -> IO a -withDBLayer trace defaultFieldValues mDatabaseDir ti = +withDBLayer tr defaultFieldValues mDatabaseDir ti = bracket before after where - before = newDBLayer trace defaultFieldValues mDatabaseDir ti - after = destroyDBLayer . fst + before = newDBLayer tr defaultFieldValues mDatabaseDir ti + after = destroyDBLayer tr . fst -- | Instantiate a 'DBFactory' from a given directory newDBFactory diff --git a/lib/core/test/bench/db/Main.hs b/lib/core/test/bench/db/Main.hs index a8d91c6427f..2d9aee36631 100644 --- a/lib/core/test/bench/db/Main.hs +++ b/lib/core/test/bench/db/Main.hs @@ -49,7 +49,7 @@ import Cardano.BM.Data.Severity import Cardano.BM.Data.Trace ( Trace ) import Cardano.BM.Data.Tracer - ( Tracer, filterSeverity ) + ( Tracer, filterSeverity, nullTracer ) import Cardano.BM.Setup ( setupTrace_, shutdown ) import Cardano.DB.Sqlite @@ -628,7 +628,7 @@ defaultFieldValues = DefaultFieldValues cleanupDB :: (FilePath, SqliteContext, DBLayer IO s k) -> IO () cleanupDB (db, ctx, _) = do - handle (\SqliteException{} -> pure ()) $ destroyDBLayer ctx + handle (\SqliteException{} -> pure ()) $ destroyDBLayer nullTracer ctx mapM_ remove [db, db <> "-shm", db <> "-wal"] where remove f = doesFileExist f >>= \case @@ -722,7 +722,7 @@ benchDiskSize :: Tracer IO DBLog -> (DBLayerBench -> IO ()) -> IO () benchDiskSize tr action = bracket (setupDB tr) cleanupDB $ \(f, ctx, db) -> do action db mapM_ (printFileSize "") [f, f <> "-shm", f <> "-wal"] - destroyDBLayer ctx + destroyDBLayer nullTracer ctx printFileSize " (closed)" f putStrLn "" where diff --git a/lib/core/test/unit/Cardano/Wallet/DB/SqliteSpec.hs b/lib/core/test/unit/Cardano/Wallet/DB/SqliteSpec.hs index f8b5670df5c..4599c5dc221 100644 --- a/lib/core/test/unit/Cardano/Wallet/DB/SqliteSpec.hs +++ b/lib/core/test/unit/Cardano/Wallet/DB/SqliteSpec.hs @@ -799,7 +799,7 @@ fileModeSpec = do replicateM_ 25 $ do db <- Just <$> temporaryDBFile (ctx, _) <- newDBLayer' @(SeqState 'Mainnet ShelleyKey) db - destroyDBLayer ctx + destroyDBLayer nullTracer ctx describe "DBFactory" $ do let ti = dummyTimeInterpreter @@ -879,7 +879,7 @@ fileModeSpec = do (ctx, DBLayer{..}) <- newDBLayer' (Just f) atomically $ unsafeRunExceptT $ initializeWallet testPk testCp testMetadata mempty gp pp - destroyDBLayer ctx + destroyDBLayer nullTracer ctx testOpeningCleaning f listWallets' [testPk] [] it "create and get meta works" $ \f -> do @@ -889,7 +889,7 @@ fileModeSpec = do { passphraseInfo = Just $ WalletPassphraseInfo now EncryptWithPBKDF2 } atomically $ unsafeRunExceptT $ initializeWallet testPk testCp meta mempty gp pp - destroyDBLayer ctx + destroyDBLayer nullTracer ctx testOpeningCleaning f (`readWalletMeta'` testPk) (Just meta) Nothing it "create and get private key" $ \f-> do @@ -897,7 +897,7 @@ fileModeSpec = do atomically $ unsafeRunExceptT $ initializeWallet testPk testCp testMetadata mempty gp pp (k, h) <- unsafeRunExceptT $ attachPrivateKey db testPk - destroyDBLayer ctx + destroyDBLayer nullTracer ctx testOpeningCleaning f (`readPrivateKey'` testPk) (Just (k, h)) Nothing it "put and read tx history (Ascending)" $ \f -> do @@ -906,7 +906,7 @@ fileModeSpec = do unsafeRunExceptT $ initializeWallet testPk testCp testMetadata mempty gp pp unsafeRunExceptT $ putTxHistory testPk testTxs - destroyDBLayer ctx + destroyDBLayer nullTracer ctx testOpeningCleaning f (\db' -> readTxHistory' db' testPk Ascending wholeRange Nothing) @@ -919,7 +919,7 @@ fileModeSpec = do unsafeRunExceptT $ initializeWallet testPk testCp testMetadata mempty gp pp unsafeRunExceptT $ putTxHistory testPk testTxs - destroyDBLayer ctx + destroyDBLayer nullTracer ctx testOpeningCleaning f (\db' -> readTxHistory' db' testPk Descending wholeRange Nothing) @@ -932,7 +932,7 @@ fileModeSpec = do unsafeRunExceptT $ initializeWallet testPk testCp testMetadata mempty gp pp unsafeRunExceptT $ putCheckpoint testPk testCp - destroyDBLayer ctx + destroyDBLayer nullTracer ctx testOpeningCleaning f (`readCheckpoint'` testPk) (Just testCp) Nothing describe "Golden rollback scenarios" $ do @@ -1029,9 +1029,9 @@ prop_randomOpChunks (KeyValPairs pairs) = cutRandomly pairs >>= mapM_ (\chunk -> do (ctx, db) <- newDBLayer' (Just filepath) forM_ chunk (insertPair db) - destroyDBLayer ctx) + destroyDBLayer nullTracer ctx) dbF `shouldBeConsistentWith` dbM - destroyDBLayer ctxF *> destroyDBLayer ctxM + destroyDBLayer nullTracer ctxF *> destroyDBLayer nullTracer ctxM insertPair :: DBLayer IO s k @@ -1077,11 +1077,10 @@ testOpeningCleaning filepath call expectedAfterOpen expectedAfterClean = do call db1 `shouldReturn` expectedAfterOpen _ <- cleanDB db1 call db1 `shouldReturn` expectedAfterClean - destroyDBLayer ctx1 + destroyDBLayer nullTracer ctx1 (ctx2,db2) <- newDBLayer' (Just filepath) call db2 `shouldReturn` expectedAfterClean - destroyDBLayer ctx2 - + destroyDBLayer nullTracer ctx2 -- | Run a test action inside withDBLayer, then check assertions. withTestDBFile diff --git a/lib/shelley/bench/Restore.hs b/lib/shelley/bench/Restore.hs index e2fa866cc0d..000693400e0 100644 --- a/lib/shelley/bench/Restore.hs +++ b/lib/shelley/bench/Restore.hs @@ -675,7 +675,7 @@ withBenchDBLayer withBenchDBLayer ti action = withSystemTempFile "bench.db" $ \dbFile _ -> do let before = newDBLayer nullTracer migrationDefaultValues (Just dbFile) ti - let after = destroyDBLayer . fst + let after = destroyDBLayer nullTracer . fst bracket before after $ \(_ctx, db) -> action db where migrationDefaultValues = Sqlite.DefaultFieldValues From e78f0ef89bd69780382d46f02ac62f62fe569d82 Mon Sep 17 00:00:00 2001 From: KtorZ Date: Tue, 5 Jan 2021 09:23:01 +0100 Subject: [PATCH 2/3] Properly handle 'SQLITE_BUSY' in the context of a connection pool I ran into quite a few issues with the integration tests since the unliftio merge and rebase (I think, as I am pretty I did observe unit and integration tests doing just fine with the resource pool at least once). I've been investigating this for most of the day, and found a few interesting cases: (a) SQLite may return 'SQLITE_BUSY' on pretty much any requests if two concurrent write queries hit the engine; though we currently only catch this kind of exception when we try closing the database so I generalized a bit our error handling here. (b) It seems that calling destroyAllResources from resource-pool does not prevent new threads from acquiring new resources. And there's no way with the resource-pool library itself to prevent the creation of new resources after a certain point. So it may happen that while the database layer is being destroyed, new database connections are created and start causing conflicts between each others. --- lib/core/cardano-wallet-core.cabal | 1 + lib/core/src/Cardano/DB/Sqlite.hs | 181 ++++++++++++++++------- lib/core/src/Cardano/Pool/DB/Sqlite.hs | 21 +-- lib/core/src/Cardano/Wallet/DB/Sqlite.hs | 60 ++++---- 4 files changed, 163 insertions(+), 100 deletions(-) diff --git a/lib/core/cardano-wallet-core.cabal b/lib/core/cardano-wallet-core.cabal index 747f1de568d..ef7fc4c5647 100644 --- a/lib/core/cardano-wallet-core.cabal +++ b/lib/core/cardano-wallet-core.cabal @@ -92,6 +92,7 @@ library , servant-server , split , statistics + , stm , streaming-commons , strict-non-empty-containers , string-interpolate diff --git a/lib/core/src/Cardano/DB/Sqlite.hs b/lib/core/src/Cardano/DB/Sqlite.hs index 926f4b72b26..21550dab17b 100644 --- a/lib/core/src/Cardano/DB/Sqlite.hs +++ b/lib/core/src/Cardano/DB/Sqlite.hs @@ -9,6 +9,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE UndecidableInstances #-} @@ -28,7 +29,7 @@ module Cardano.DB.Sqlite , dbChunked' , destroyDBLayer , handleConstraint - , startSqliteBackend + , newSqliteContext , unsafeRunQuery -- * Manual Migration @@ -53,6 +54,8 @@ import Cardano.DB.Sqlite.Delete ( DeleteSqliteDatabaseLog ) import Cardano.Wallet.Logging ( BracketLog, bracketTracer ) +import Control.Concurrent.STM.TVar + ( TVar, newTVarIO, readTVarIO, writeTVar ) import Control.Monad ( join, mapM_, when ) import Control.Monad.IO.Unlift @@ -60,13 +63,19 @@ import Control.Monad.IO.Unlift import Control.Monad.Logger ( LogLevel (..) ) import Control.Retry - ( constantDelay, limitRetriesByCumulativeDelay, recovering ) + ( RetryStatus (..) + , constantDelay + , limitRetriesByCumulativeDelay + , recovering + ) import Control.Tracer ( Tracer, contramap, traceWith ) import Data.Aeson ( ToJSON (..) ) import Data.Function ( (&) ) +import Data.Functor + ( ($>), (<&>) ) import Data.List ( isInfixOf ) import Data.List.Split @@ -104,7 +113,7 @@ import Database.Persist.Sqlite import Database.Sqlite ( Error (ErrorConstraint), SqliteException (SqliteException) ) import Fmt - ( fmt, (+|), (+||), (|+), (||+) ) + ( fmt, ordinalF, (+|), (+||), (|+), (||+) ) import GHC.Generics ( Generic ) import System.Log.FastLogger @@ -112,8 +121,9 @@ import System.Log.FastLogger import UnliftIO.Compat ( handleIf, mkRetryHandler ) import UnliftIO.Exception - ( Exception, bracket_, handleJust, mask_, tryJust ) + ( Exception, bracket_, handleJust, mask_, throwIO, tryJust ) +import qualified Control.Concurrent.STM as STM import qualified Data.Aeson as Aeson import qualified Data.ByteString.Char8 as B8 import qualified Data.Text as T @@ -129,12 +139,24 @@ import qualified Database.Sqlite as Sqlite data SqliteContext = SqliteContext { connectionPool :: Pool (SqlBackend, Sqlite.Connection) -- ^ A handle to the Persistent SQL backend. + , isDatabaseActive :: TVar Bool + -- ^ A mutable reference to know whether the database is 'active'. This is + -- useful to prevent new requests from being accepted when we're trying to + -- shutdown the database. It is actually crucial with the connection pool + -- since, even though we can purge the pool of all existing resources, we + -- can't easily prevent the creation of new resources. This TVar must + -- therefore be used to guard any call to 'withResource'; if 'False', then + -- 'withResource' mustn't be called. , runQuery :: forall a. SqlPersistT IO a -> IO a -- ^ 'safely' run a query with logging and lock-protection , dbFile :: Maybe FilePath -- ^ The actual database file, if any. If none, runs in-memory } +data DatabaseIsShuttingDownError = DatabaseIsShuttingDownError deriving Show + +instance Exception DatabaseIsShuttingDownError + -- | Error type for when migrations go wrong after opening a database. newtype MigrationError = MigrationError { getMigrationErrorMessage :: Text } @@ -174,7 +196,8 @@ handleConstraint e = handleJust select handler . fmap Right -- | Free all allocated database connections. See also 'destroySqliteBackend' -- destroyDBLayer :: Tracer IO DBLog -> SqliteContext -> IO () -destroyDBLayer tr SqliteContext{connectionPool,dbFile} = do +destroyDBLayer tr SqliteContext{connectionPool,isDatabaseActive,dbFile} = do + STM.atomically $ writeTVar isDatabaseActive False traceWith tr (MsgDestroyConnectionPool dbFile) destroyAllResources connectionPool @@ -182,44 +205,45 @@ destroyDBLayer tr SqliteContext{connectionPool,dbFile} = do Internal / Database Setup -------------------------------------------------------------------------------} --- | Opens the SQLite database connection, sets up query logging and timing, +-- | Opens the SQLite database connection pool, sets up query logging and timing, -- runs schema migrations if necessary. -startSqliteBackend - :: ManualMigration +newSqliteContext + :: [ManualMigration] -> Migration -> Tracer IO DBLog -> Maybe FilePath -> IO (Either MigrationError SqliteContext) -startSqliteBackend manualMigration autoMigration tr fp = do - pool <- createSqlitePool tr fp manualMigration (queryLogFunc tr) - let observe :: IO a -> IO a - observe = bracketTracer (contramap MsgRun tr) - -- runSqlConn is guarded with a lock because it's not threadsafe in general. - -- It is also masked, so that the SqlBackend state is not corrupted if a - -- thread gets cancelled while running a query. - -- See: https://github.com/yesodweb/persistent/issues/981 - -- - -- Note that `withResource` does already mask async exception but only for - -- dealing with the pool resource acquisition. The action is then ran - -- unmasked with the acquired resource. If an asynchronous exception occurs, - -- the resource is NOT placed back in the pool. - let runQuery :: SqlPersistT IO a -> IO a - runQuery cmd = withResource pool $ \(backend, _) -> - observe $ mask_ $ runSqlConn cmd backend - - autoMigrationResult <- withResource pool $ \(backend, connection) -> do - withForeignKeysDisabled tr connection - $ mask_ (runSqlConn (runMigrationQuiet autoMigration) backend) - & tryJust (matchMigrationError @PersistException) - & tryJust (matchMigrationError @SqliteException) - & fmap join - traceWith tr $ MsgMigrations $ fmap length autoMigrationResult - let ctx = SqliteContext pool runQuery fp - case autoMigrationResult of - Left e -> do - destroyDBLayer tr ctx - pure $ Left e - Right _ -> pure $ Right ctx +newSqliteContext manualMigrations autoMigration tr dbFile = do + isDatabaseActive <- newTVarIO True + createSqlitePool tr dbFile manualMigrations autoMigration <&> \case + Left e -> Left e + Right connectionPool -> + let observe :: IO a -> IO a + observe = bracketTracer (contramap MsgRun tr) + + -- runSqlConn is guarded with a lock because it's not threadsafe in + -- general.It is also masked, so that the SqlBackend state is not + -- corrupted if a thread gets cancelled while running a query. + -- See: https://github.com/yesodweb/persistent/issues/981 + -- + -- Note that `withResource` does already mask async exception but + -- only for dealing with the pool resource acquisition. The action + -- is then ran unmasked with the acquired resource. If an + -- asynchronous exception occurs (or actually any exception), the + -- resource is NOT placed back in the pool. + runQuery :: SqlPersistT IO a -> IO a + runQuery cmd = do + readTVarIO isDatabaseActive >>= \case + False -> throwIO DatabaseIsShuttingDownError + True -> withResource connectionPool $ + mask_ . observe . retryOnBusy tr . runSqlConn cmd . fst + + in Right $ SqliteContext + { connectionPool + , isDatabaseActive + , runQuery + , dbFile + } -- | Finalize database statements and close the database connection. -- @@ -235,7 +259,7 @@ destroySqliteBackend -> IO () destroySqliteBackend tr sqlBackend dbFile = do traceWith tr (MsgCloseSingleConnection dbFile) - recovering pol (mkRetryHandler isBusy) (const $ close' sqlBackend) + retryOnBusy tr (close' sqlBackend) & handleIf isAlreadyClosed (traceWith tr . MsgIsAlreadyClosed . showT) & handleIf statementAlreadyFinalized @@ -255,11 +279,33 @@ destroySqliteBackend tr sqlBackend dbFile = do showT :: Show a => a -> Text showT = T.pack . show +-- | Retry an action if the database yields an 'SQLITE_BUSY' error. +-- +-- From +-- +-- The SQLITE_BUSY result code indicates that the database file could not be +-- written (or in some cases read) because of concurrent activity by some +-- other database connection, usually a database connection in a separate +-- process. +-- +-- For example, if process A is in the middle of a large write transaction +-- and at the same time process B attempts to start a new write transaction, +-- process B will get back an SQLITE_BUSY result because SQLite only supports +-- one writer at a time. Process B will need to wait for process A to finish +-- its transaction before starting a new transaction. The sqlite3_busy_timeout() +-- and sqlite3_busy_handler() interfaces and the busy_timeout pragma are +-- available to process B to help it deal with SQLITE_BUSY errors. +-- +retryOnBusy :: Tracer IO DBLog -> IO a -> IO a +retryOnBusy tr action = + recovering policy (mkRetryHandler isBusy) $ \RetryStatus{rsIterNumber} -> do + when (rsIterNumber > 0) $ traceWith tr (MsgRetryOnBusy rsIterNumber) + action + where isBusy (SqliteException name _ _) = pure (name == Sqlite.ErrorBusy) - pol = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms) + policy = limitRetriesByCumulativeDelay (60000*ms) $ constantDelay (25*ms) ms = 1000 -- microseconds in a millisecond - -- | Run the given task in a context where foreign key constraints are -- /temporarily disabled/, before re-enabling them. -- @@ -363,35 +409,53 @@ newtype ManualMigration = ManualMigration createSqlitePool :: Tracer IO DBLog -> Maybe FilePath - -> ManualMigration - -> LogFunc - -> IO (Pool (SqlBackend, Sqlite.Connection)) -createSqlitePool tr fp migration logFunc = do + -> [ManualMigration] + -> Migration + -> IO (Either MigrationError (Pool (SqlBackend, Sqlite.Connection))) +createSqlitePool tr fp migrations autoMigration = do let connStr = sqliteConnStr fp + let info = mkSqliteConnectionInfo connStr traceWith tr $ MsgConnStr connStr let createConnection = do - let info = mkSqliteConnectionInfo connStr conn <- Sqlite.open connStr - executeManualMigration migration conn - backend <- wrapConnectionInfo info conn logFunc - pure (backend, conn) + (,conn) <$> wrapConnectionInfo info conn (queryLogFunc tr) let destroyConnection = \(backend, _) -> do destroySqliteBackend tr backend fp - createPool + pool <- createPool createConnection destroyConnection numberOfStripes timeToLive maximumConnections + + -- Run migrations BEFORE making the pool widely accessible to other threads. + -- This works fine for the :memory: case because there's a single connection + -- in the pool, so the next 'withResource' will get exactly this + -- connection. + migrationResult <- withResource pool $ \(backend, conn) -> mask_ $ do + let executeAutoMigration = runSqlConn (runMigrationQuiet autoMigration) backend + migrationResult <- withForeignKeysDisabled tr conn $ do + mapM_ (`executeManualMigration` conn) migrations + executeAutoMigration + & tryJust (matchMigrationError @PersistException) + & tryJust (matchMigrationError @SqliteException) + & fmap join + traceWith tr $ MsgMigrations $ fmap length migrationResult + return migrationResult + + case migrationResult of + Left e -> destroyAllResources pool $> Left e + Right{} -> return (Right pool) where numberOfStripes = 1 - timeToLive = 600 :: NominalDiffTime -- When running in :memory:, we want a single connection that does not get - -- cleaned up. + -- cleaned up. Indeed, the pool will regularly remove connections, destroying + -- our :memory: database regularly otherwise. maximumConnections = maybe 1 (const 10) fp + timeToLive = maybe 31536000 {- one year -} (const 600) {- 10 minutes -} fp :: NominalDiffTime sqliteConnStr :: Maybe FilePath -> Text sqliteConnStr = maybe ":memory:" T.pack @@ -420,6 +484,7 @@ data DBLog | MsgUpdatingForeignKeysSetting ForeignKeysSetting | MsgFoundDatabase FilePath Text | MsgUnknownDBFile FilePath + | MsgRetryOnBusy Int deriving (Generic, Show, Eq, ToJSON) {------------------------------------------------------------------------------- @@ -480,9 +545,9 @@ instance HasSeverityAnnotation DBLog where MsgMigrations (Left _) -> Error MsgQuery _ sev -> sev MsgRun _ -> Debug - MsgConnStr _ -> Debug - MsgCloseSingleConnection _ -> Debug - MsgDestroyConnectionPool _ -> Debug + MsgConnStr _ -> Notice + MsgCloseSingleConnection _ -> Info + MsgDestroyConnectionPool _ -> Notice MsgWillOpenDB _ -> Info MsgDatabaseReset -> Notice MsgIsAlreadyClosed _ -> Warning @@ -496,6 +561,9 @@ instance HasSeverityAnnotation DBLog where MsgUpdatingForeignKeysSetting{} -> Debug MsgFoundDatabase _ _ -> Info MsgUnknownDBFile _ -> Notice + MsgRetryOnBusy n | n <= 1 -> Debug + MsgRetryOnBusy n | n <= 3 -> Notice + MsgRetryOnBusy _ -> Warning instance ToText DBLog where toText = \case @@ -557,6 +625,9 @@ instance ToText DBLog where [ "Found something other than a database file in " , "the database folder: ", T.pack file ] + MsgRetryOnBusy n -> + let nF = ordinalF n in + "Retrying db query because db was busy for the " +| nF |+ " time." {------------------------------------------------------------------------------- Extra DB Helpers diff --git a/lib/core/src/Cardano/Pool/DB/Sqlite.hs b/lib/core/src/Cardano/Pool/DB/Sqlite.hs index 9236932fdaa..3cc8f3e9ddf 100644 --- a/lib/core/src/Cardano/Pool/DB/Sqlite.hs +++ b/lib/core/src/Cardano/Pool/DB/Sqlite.hs @@ -43,7 +43,7 @@ import Cardano.DB.Sqlite , destroyDBLayer , fieldName , handleConstraint - , startSqliteBackend + , newSqliteContext , tableName ) import Cardano.Pool.DB @@ -225,7 +225,7 @@ newDBLayer -> TimeInterpreter IO -> IO (SqliteContext, DBLayer IO) newDBLayer trace fp ti = do - let io = startSqliteBackend + let io = newSqliteContext (migrateManually trace) migrateAll (contramap MsgGeneric trace) @@ -686,13 +686,14 @@ runRawQuery trace q = do migrateManually :: Tracer IO PoolDbLog - -> ManualMigration + -> [ManualMigration] migrateManually _tr = - ManualMigration $ \conn -> do - createView conn activePoolLifeCycleData - createView conn activePoolOwners - createView conn activePoolRegistrations - createView conn activePoolRetirements + ManualMigration <$> + [ createView activePoolLifeCycleData + , createView activePoolOwners + , createView activePoolRegistrations + , createView activePoolRetirements + ] -- | Represents a database view. -- @@ -705,8 +706,8 @@ data DatabaseView = DatabaseView -- | Creates the specified database view, if it does not already exist. -- -createView :: Sqlite.Connection -> DatabaseView -> IO () -createView conn (DatabaseView name definition) = do +createView :: DatabaseView -> Sqlite.Connection -> IO () +createView (DatabaseView name definition) conn = do deleteQuery <- Sqlite.prepare conn deleteQueryString Sqlite.step deleteQuery *> Sqlite.finalize deleteQuery createQuery <- Sqlite.prepare conn createQueryString diff --git a/lib/core/src/Cardano/Wallet/DB/Sqlite.hs b/lib/core/src/Cardano/Wallet/DB/Sqlite.hs index b40ad27560f..9467c55ebbe 100644 --- a/lib/core/src/Cardano/Wallet/DB/Sqlite.hs +++ b/lib/core/src/Cardano/Wallet/DB/Sqlite.hs @@ -54,7 +54,7 @@ import Cardano.DB.Sqlite , fieldName , fieldType , handleConstraint - , startSqliteBackend + , newSqliteContext , tableName ) import Cardano.DB.Sqlite.Delete @@ -371,40 +371,30 @@ migrateManually => Tracer IO DBLog -> Proxy k -> DefaultFieldValues - -> ManualMigration + -> [ManualMigration] migrateManually tr proxy defaultFieldValues = - ManualMigration $ \conn -> do - cleanupCheckpointTable conn - - assignDefaultPassphraseScheme conn - - addDesiredPoolNumberIfMissing conn - - addMinimumUTxOValueIfMissing conn - - addHardforkEpochIfMissing conn - - -- FIXME - -- Temporary migration to fix Daedalus flight wallets. This should - -- really be removed as soon as we have a fix for the cardano-sl:wallet - -- currently in production. - removeSoftRndAddresses conn - - removeOldTxParametersTable conn - - addAddressStateIfMissing conn - - addSeqStateDerivationPrefixIfMissing conn - - renameRoleColumn conn - - renameRoleFields conn - - addScriptAddressGapIfMissing conn - - updateFeeValueAndAddKeyDeposit conn - - addFeeToTransaction conn + ManualMigration <$> + [ cleanupCheckpointTable + , assignDefaultPassphraseScheme + , addDesiredPoolNumberIfMissing + , addMinimumUTxOValueIfMissing + , addHardforkEpochIfMissing + + -- FIXME + -- Temporary migration to fix Daedalus flight wallets. This should + -- really be removed as soon as we have a fix for the cardano-sl:wallet + -- currently in production. + , removeSoftRndAddresses + + , removeOldTxParametersTable + , addAddressStateIfMissing + , addSeqStateDerivationPrefixIfMissing + , renameRoleColumn + , renameRoleFields + , addScriptAddressGapIfMissing + , updateFeeValueAndAddKeyDeposit + , addFeeToTransaction + ] where -- NOTE -- We originally stored protocol parameters in the 'Checkpoint' table, and @@ -1017,7 +1007,7 @@ newDBLayer newDBLayer trace defaultFieldValues mDatabaseFile ti = do ctx@SqliteContext{runQuery} <- either throwIO pure =<< - startSqliteBackend + newSqliteContext (migrateManually trace (Proxy @k) defaultFieldValues) migrateAll trace From 1b088a471dfb1ef8257103c0a5b78daa602776d6 Mon Sep 17 00:00:00 2001 From: IOHK Date: Thu, 31 Dec 2020 16:00:30 +0000 Subject: [PATCH 3/3] Regenerate nix --- nix/.stack.nix/cardano-wallet-core.nix | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nix/.stack.nix/cardano-wallet-core.nix b/nix/.stack.nix/cardano-wallet-core.nix index 1047d244cc6..7fab06b56c1 100644 --- a/nix/.stack.nix/cardano-wallet-core.nix +++ b/nix/.stack.nix/cardano-wallet-core.nix @@ -78,6 +78,7 @@ (hsPkgs."quiet" or (errorHandler.buildDepError "quiet")) (hsPkgs."random" or (errorHandler.buildDepError "random")) (hsPkgs."random-shuffle" or (errorHandler.buildDepError "random-shuffle")) + (hsPkgs."resource-pool" or (errorHandler.buildDepError "resource-pool")) (hsPkgs."retry" or (errorHandler.buildDepError "retry")) (hsPkgs."safe" or (errorHandler.buildDepError "safe")) (hsPkgs."scientific" or (errorHandler.buildDepError "scientific")) @@ -87,6 +88,7 @@ (hsPkgs."servant-server" or (errorHandler.buildDepError "servant-server")) (hsPkgs."split" or (errorHandler.buildDepError "split")) (hsPkgs."statistics" or (errorHandler.buildDepError "statistics")) + (hsPkgs."stm" or (errorHandler.buildDepError "stm")) (hsPkgs."streaming-commons" or (errorHandler.buildDepError "streaming-commons")) (hsPkgs."strict-non-empty-containers" or (errorHandler.buildDepError "strict-non-empty-containers")) (hsPkgs."string-interpolate" or (errorHandler.buildDepError "string-interpolate"))