Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache latest checkpoint in-memory to avoid deserializing and reserializing it too often #2161

Merged
merged 2 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 105 additions & 75 deletions lib/core/src/Cardano/Wallet/DB/Sqlite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ import Data.Either
( isRight )
import Data.Generics.Internal.VL.Lens
( (^.) )
import Data.IORef
( modifyIORef', newIORef, readIORef )
import Data.List
( nub, sortOn, unzip3 )
import Data.List.Split
Expand Down Expand Up @@ -598,6 +600,61 @@ newDBLayer trace defaultFieldValues mDatabaseFile timeInterpreter = do
migrateAll
trace
mDatabaseFile

-- NOTE1
-- We cache the latest checkpoint for read operation such that we prevent
-- needless marshalling and unmarshalling with the database. Many handlers
-- dealing with the database are actually in the form of:
--
-- - read latest CP
-- - write latest CP
--
-- When chaining them, we end up paying the cost of unmarshalling data from
-- the database one extra time. That doesn't matter much for small wallets
-- because the time needed to unmarshall data is relatively negligible. For
-- large wallets however, this has a massive performance impact.
--
-- Instead, the cache now retains the Haskell data-types in-memory to
-- short-circuit the most frequent database lookups.
--
-- NOTE2
-- We use an IORef here without fearing race-conditions because every
-- database query can only be run within calls to `atomically` which
-- enforces that there's only a single thread executing a given
-- `SqlPersistT`.
cache <- newIORef Map.empty

let readCache :: W.WalletId -> SqlPersistT IO (Maybe (W.Wallet s))
readCache wid = Map.lookup wid <$> liftIO (readIORef cache)

let clearCache :: W.WalletId -> SqlPersistT IO ()
clearCache wid = liftIO $ modifyIORef' cache $ Map.delete wid

let writeCache :: W.WalletId -> W.Wallet s -> SqlPersistT IO ()
writeCache wid cp = liftIO $ modifyIORef' cache $ Map.alter alter wid
where
tip = cp ^. #currentTip . #blockHeight
alter = \case
Just old | tip < old ^. #currentTip . #blockHeight -> Just old
_ -> Just cp

let selectLatestCheckpoint
:: W.WalletId
-> SqlPersistT IO (Maybe (W.Wallet s))
selectLatestCheckpoint wid = do
readCache wid >>= maybe fromDatabase (pure . Just)
where
fromDatabase = do
mcp <- fmap entityVal <$> selectFirst
[ CheckpointWalletId ==. wid ]
[ LimitTo 1, Desc CheckpointSlot ]
case mcp of
Nothing -> pure Nothing
Just cp -> do
utxo <- selectUTxO cp
s <- selectState (checkpointId cp)
pure (checkpointFromEntity @s cp utxo <$> s)

return (ctx, DBLayer

{-----------------------------------------------------------------------
Expand All @@ -608,7 +665,7 @@ newDBLayer trace defaultFieldValues mDatabaseFile timeInterpreter = do
res <- handleConstraint (ErrWalletAlreadyExists wid) $
insert_ (mkWalletEntity wid meta)
when (isRight res) $ do
insertCheckpoint wid cp
insertCheckpoint wid cp <* writeCache wid cp
let (metas, txins, txouts, ws) = mkTxHistory wid txs
putTxs metas txins txouts ws
insert_ (mkProtocolParametersEntity wid pp)
Expand All @@ -620,6 +677,7 @@ newDBLayer trace defaultFieldValues mDatabaseFile timeInterpreter = do
Just _ -> Right <$> do
deleteCascadeWhere [WalId ==. wid]
deleteLooseTransactions
clearCache wid

, listWallets =
map (PrimaryKey . unWalletKey) <$> selectKeysList [] [Asc WalId]
Expand All @@ -631,15 +689,10 @@ newDBLayer trace defaultFieldValues mDatabaseFile timeInterpreter = do
, putCheckpoint = \(PrimaryKey wid) cp -> ExceptT $ do
selectWallet wid >>= \case
Nothing -> pure $ Left $ ErrNoSuchWallet wid
Just _ -> Right <$> insertCheckpoint wid cp
Just _ -> Right <$> (insertCheckpoint wid cp <* writeCache wid cp)

, readCheckpoint = \(PrimaryKey wid) -> do
selectLatestCheckpoint wid >>= \case
Nothing -> pure Nothing
Just cp -> do
utxo <- selectUTxO cp
s <- selectState (checkpointId cp)
pure (checkpointFromEntity @s cp utxo <$> s)
selectLatestCheckpoint wid

, listCheckpoints = \(PrimaryKey wid) -> do
map (blockHeaderFromEntity . entityVal) <$> selectList
Expand All @@ -648,7 +701,7 @@ newDBLayer trace defaultFieldValues mDatabaseFile timeInterpreter = do

, rollbackTo = \(PrimaryKey wid) requestedPoint -> ExceptT $ do
findNearestPoint wid requestedPoint >>= \case
Nothing -> selectLatestCheckpoint wid >>= \case
Nothing -> selectWallet wid >>= \case
Nothing ->
pure $ Left $ ErrNoSuchWallet wid
Just _ ->
Expand All @@ -674,6 +727,7 @@ newDBLayer trace defaultFieldValues mDatabaseFile timeInterpreter = do
deleteStakeKeyCerts wid
[ StakeKeyCertSlot >. nearestPoint
]
clearCache wid
pure (Right nearestPoint)

, prune = \(PrimaryKey wid) -> ExceptT $ do
Expand All @@ -700,7 +754,7 @@ newDBLayer trace defaultFieldValues mDatabaseFile timeInterpreter = do
Nothing -> pure Nothing
Just cp -> do
currentEpoch <- liftIO $
timeInterpreter (epochOf $ cp ^. #checkpointSlot)
timeInterpreter (epochOf $ cp ^. #currentTip . #slotNo)
readWalletDelegation timeInterpreter wid currentEpoch
>>= readWalletMetadata wid

Expand Down Expand Up @@ -749,12 +803,14 @@ newDBLayer trace defaultFieldValues mDatabaseFile timeInterpreter = do
pure $ Right ()

, readTxHistory = \(PrimaryKey wid) minWithdrawal order range status -> do
selectTxHistory
timeInterpreter wid minWithdrawal order $ catMaybes
[ (TxMetaSlot >=.) <$> W.inclusiveLowerBound range
, (TxMetaSlot <=.) <$> W.inclusiveUpperBound range
, (TxMetaStatus ==.) <$> status
]
selectLatestCheckpoint wid >>= \case
Nothing -> pure []
Just cp -> selectTxHistory cp
timeInterpreter wid minWithdrawal order $ catMaybes
[ (TxMetaSlot >=.) <$> W.inclusiveLowerBound range
, (TxMetaSlot <=.) <$> W.inclusiveUpperBound range
, (TxMetaStatus ==.) <$> status
]

, removePendingTx = \(PrimaryKey wid) tid -> ExceptT $ do
let errNoSuchWallet =
Expand All @@ -776,10 +832,10 @@ newDBLayer trace defaultFieldValues mDatabaseFile timeInterpreter = do
_ -> pure errNoMorePending

, getTx = \(PrimaryKey wid) tid -> ExceptT $ do
selectWallet wid >>= \case
selectLatestCheckpoint wid >>= \case
Nothing -> pure $ Left $ ErrNoSuchWallet wid
Just _ -> do
metas <- selectTxHistory
Just cp -> do
metas <- selectTxHistory cp
timeInterpreter wid Nothing W.Descending
[ TxMetaTxId ==. TxId tid ]
case metas of
Expand Down Expand Up @@ -1226,11 +1282,11 @@ deleteCheckpoints wid filters = do
-- | Prune checkpoints in the database to keep it tidy
pruneCheckpoints
:: W.WalletId
-> Checkpoint
-> W.Wallet s
-> SqlPersistT IO ()
pruneCheckpoints wid cp = do
let height = Quantity $ fromIntegral $ checkpointBlockHeight cp
let epochStability = Quantity $ checkpointEpochStability cp
let height = cp ^. #currentTip . #blockHeight
let epochStability = cp ^. #blockchainParameters . #getEpochStability
let cfg = defaultSparseCheckpointsConfig epochStability
let cps = sparseCheckpoints cfg height
deleteCheckpoints wid [ CheckpointBlockHeight /<-. cps ]
Expand Down Expand Up @@ -1301,14 +1357,6 @@ deleteDelegationCertificates
deleteDelegationCertificates wid filters = do
deleteCascadeWhere ((CertWalletId ==. wid) : filters)

selectLatestCheckpoint
:: W.WalletId
-> SqlPersistT IO (Maybe Checkpoint)
selectLatestCheckpoint wid = fmap entityVal <$>
selectFirst
[ CheckpointWalletId ==. wid
] [ LimitTo 1, Desc CheckpointSlot ]

selectUTxO
:: Checkpoint
-> SqlPersistT IO [UTxO]
Expand Down Expand Up @@ -1379,40 +1427,37 @@ combineChunked :: [a] -> ([a] -> SqlPersistT IO [b]) -> SqlPersistT IO [b]
combineChunked xs f = concatMapM f $ chunksOf chunkSize xs

selectTxHistory
:: TimeInterpreter IO
:: W.Wallet s
-> TimeInterpreter IO
-> W.WalletId
-> Maybe (Quantity "lovelace" Natural)
-> W.SortOrder
-> [Filter TxMeta]
-> SqlPersistT IO [W.TransactionInfo]
selectTxHistory ti wid minWithdrawal order conditions = do
selectLatestCheckpoint wid >>= \case
Nothing -> pure []
Just cp -> do
let txMetaFilter = (TxMetaWalletId ==. wid):conditions
metas <- case minWithdrawal of
Nothing -> fmap entityVal <$> selectList txMetaFilter sortOpt
Just inf -> do
let coin = W.Coin $ fromIntegral $ getQuantity inf
txids <- fmap (txWithdrawalTxId . entityVal)
<$> selectList [ TxWithdrawalAmount >=. coin ] []
ms <- combineChunked (nub txids) (\chunk -> selectList
((TxMetaTxId <-. chunk):txMetaFilter) [])
let sortTxId = case order of
W.Ascending -> sortOn (Down . txMetaTxId)
W.Descending -> sortOn txMetaTxId
let sortSlot = case order of
W.Ascending -> sortOn txMetaSlot
W.Descending -> sortOn (Down . txMetaSlot)
pure $ sortSlot $ sortTxId $ fmap entityVal ms

let txids = map txMetaTxId metas
(ins, outs, ws) <- selectTxs txids

let wal = checkpointFromEntity cp [] ()
let tip = W.currentTip wal

liftIO $ txHistoryFromEntity ti tip metas ins outs ws
selectTxHistory cp ti wid minWithdrawal order conditions = do
let txMetaFilter = (TxMetaWalletId ==. wid):conditions
metas <- case minWithdrawal of
Nothing -> fmap entityVal <$> selectList txMetaFilter sortOpt
Just inf -> do
let coin = W.Coin $ fromIntegral $ getQuantity inf
txids <- fmap (txWithdrawalTxId . entityVal)
<$> selectList [ TxWithdrawalAmount >=. coin ] []
ms <- combineChunked (nub txids) (\chunk -> selectList
((TxMetaTxId <-. chunk):txMetaFilter) [])
let sortTxId = case order of
W.Ascending -> sortOn (Down . txMetaTxId)
W.Descending -> sortOn txMetaTxId
let sortSlot = case order of
W.Ascending -> sortOn txMetaSlot
W.Descending -> sortOn (Down . txMetaSlot)
pure $ sortSlot $ sortTxId $ fmap entityVal ms

let txids = map txMetaTxId metas
(ins, outs, ws) <- selectTxs txids

let tip = W.currentTip cp

liftIO $ txHistoryFromEntity ti tip metas ins outs ws
where
-- Note: there are sorted indices on these columns.
-- The secondary sort by TxId is to make the ordering stable
Expand Down Expand Up @@ -1623,22 +1668,7 @@ instance PersistState (Rnd.RndState t) where
pendingAddresses <- lift $ selectRndStatePending wid
pure $ Rnd.RndState
{ hdPassphrase = pwd
, accountIndex =
-- FIXME
-- In the early days when Daedalus Flight was shipped, the
-- wallet backend was generating addresses indexes across the
-- whole domain which was causing a great deal of issues with
-- the legacy cardano-sl:wallet ...
--
-- We later changed that to instead use "hardened indexes". Yet,
-- for the few wallets which were already created, we revert
-- this dynamically by replacing the index here.
--
-- This ugly hack could / should be removed eventually, in a few
-- releases from 2020-04-06.
if ix == 0
then minBound
else W.Index ix
, accountIndex = W.Index ix
, addresses = addresses
, pendingAddresses = pendingAddresses
, gen = gen
Expand Down
30 changes: 2 additions & 28 deletions lib/core/test/unit/Cardano/Wallet/DB/SqliteSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import Cardano.Wallet.DB
, cleanDB
)
import Cardano.Wallet.DB.Arbitrary
( InitialCheckpoint (..), KeyValPairs (..) )
( KeyValPairs (..) )
import Cardano.Wallet.DB.Properties
( properties, withDB )
import Cardano.Wallet.DB.Sqlite
Expand All @@ -79,8 +79,6 @@ import Cardano.Wallet.Logging
( trMessageText )
import Cardano.Wallet.Primitive.AddressDerivation
( Depth (..)
, DerivationType (..)
, Index (..)
, NetworkDiscriminant (..)
, Passphrase (..)
, PersistPrivateKey
Expand All @@ -106,7 +104,6 @@ import Cardano.Wallet.Primitive.Model
, currentTip
, getState
, initWallet
, updateState
)
import Cardano.Wallet.Primitive.Types
( ActiveSlotCoefficient (..)
Expand Down Expand Up @@ -208,7 +205,7 @@ import Test.Hspec
, xit
)
import Test.QuickCheck
( Property, arbitrary, generate, property, (==>) )
( Property, generate, property, (==>) )
import Test.QuickCheck.Monadic
( monadicIO )
import Test.Utils.Paths
Expand Down Expand Up @@ -245,33 +242,10 @@ sqliteSpecSeq = withDB newMemoryDBLayer $ do

sqliteSpecRnd :: Spec
sqliteSpecRnd = withDB newMemoryDBLayer $ do
describe "Sqlite (RndState)" $ do
it "insertState . selectState (regression account index)"
testRegressionInsertSelectRndState
describe "Sqlite State machine (RndState)" $ do
it "Sequential state machine tests"
(prop_sequential :: TestDBRnd -> Property)

testRegressionInsertSelectRndState
:: DBLayer IO (RndState 'Mainnet) ByronKey
-> IO ()
testRegressionInsertSelectRndState db = do
-- NOTE Abusing the index type here, for the sake of testing.
old <- (\s -> s { accountIndex = Index 0 }) <$> generate arbitraryRndState
wid <- generate arbitrary
cp <- getInitialCheckpoint <$> generate arbitrary
meta <- generate arbitrary

new <- db & \DBLayer{..} -> atomically $ do
unsafeRunExceptT $ initializeWallet wid cp meta mempty pp
unsafeRunExceptT $ putCheckpoint wid (updateState old cp)
(fmap getState) <$> readCheckpoint wid

(accountIndex <$> new) `shouldBe` Just (minBound :: Index 'Hardened 'AccountK)

where
arbitraryRndState = arbitrary @(RndState 'Mainnet)

testMigrationPassphraseScheme
:: forall s k. (k ~ ShelleyKey, s ~ SeqState 'Mainnet k)
=> IO ()
Expand Down