Skip to content

Commit

Permalink
Remove ByteStrings from the cache and use ledger types instead
Browse files Browse the repository at this point in the history
This avoids many transformations from ledger types. But more importantly ledger types are backed by ShortByteString which is not pinned memory and doesn't cause heap fragmentation
  • Loading branch information
kderme committed May 24, 2022
1 parent c9a82cc commit 81cea00
Show file tree
Hide file tree
Showing 18 changed files with 274 additions and 369 deletions.
3 changes: 1 addition & 2 deletions cardano-db-sync/cardano-db-sync.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ library
Cardano.DbSync.Era.Shelley.Generic.EpochUpdate
Cardano.DbSync.Era.Shelley.Generic.ProtoParams
Cardano.DbSync.Era.Shelley.Generic.Rewards
Cardano.DbSync.Era.Shelley.Generic.StakeCred
Cardano.DbSync.Era.Shelley.Generic.StakeDist
Cardano.DbSync.Era.Shelley.Generic.StakePoolKeyHash
Cardano.DbSync.Era.Shelley.Generic.Metadata
Cardano.DbSync.Era.Shelley.Generic.ParamProposal
Cardano.DbSync.Era.Shelley.Generic.Tx
Expand Down Expand Up @@ -147,6 +145,7 @@ library
, containers
, contra-tracer
, directory
, either
, esqueleto
, extra
, filepath
Expand Down
99 changes: 62 additions & 37 deletions cardano-db-sync/src/Cardano/DbSync/Cache.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

module Cardano.DbSync.Cache
( Cache
Expand All @@ -14,7 +15,10 @@ module Cardano.DbSync.Cache
, rollbackCache
, queryPoolKeyWithCache
, insertPoolKeyWithCache
, queryRewardAccountWithCache
, queryRewardAccountWithCacheRetBs
, queryStakeAddrWithCache
, queryStakeAddrWithCacheRetBs
, queryMAWithCache
, queryPrevBlockWithCache
, insertBlockAndCache
Expand All @@ -30,29 +34,30 @@ import Cardano.Prelude
import Control.Monad.Class.MonadSTM.Strict (StrictTVar, modifyTVar, newTVarIO, readTVarIO,
writeTVar)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Either.Combinators
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set

import Cardano.Ledger.Keys (KeyHash (..), KeyRole (..))
import qualified Cardano.Ledger.Address as Ledger
import Cardano.Ledger.BaseTypes (Network)
import Cardano.Ledger.Mary.Value

import qualified Cardano.Db as DB

import Cardano.DbSync.Cache.LRU (LRUCache)
import qualified Cardano.DbSync.Cache.LRU as LRU
import Cardano.DbSync.Era.Shelley.Generic
import qualified Cardano.DbSync.Era.Shelley.Generic.StakePoolKeyHash as Generic
import qualified Cardano.DbSync.Era.Shelley.Generic.Util as Generic
import Cardano.DbSync.Era.Shelley.Query
import Cardano.DbSync.Era.Util
import Cardano.DbSync.Error
import Cardano.DbSync.Types

import Ouroboros.Consensus.Cardano.Block (StandardCrypto)

import Database.Persist.Postgresql (SqlBackend)

type StakeAddrCache = Map StakeCred DB.StakeAddressId
type StakePoolCache = Map StakePoolKeyHash DB.PoolHashId
type StakePoolCache = Map PoolKeyHash DB.PoolHashId

-- The 'UninitiatedCache' makes it possible to call functions in this module
-- without having actually initiated the cache yet. It is used by genesis
Expand All @@ -70,7 +75,7 @@ data CacheNew
data CacheInternal = CacheInternal
{ cStakeCreds :: !(StrictTVar IO StakeAddrCache)
, cPools :: !(StrictTVar IO StakePoolCache)
, cMultiAssets :: !(StrictTVar IO (LRUCache (ByteString, AssetName) DB.MultiAssetId))
, cMultiAssets :: !(StrictTVar IO (LRUCache (PolicyID StandardCrypto, AssetName) DB.MultiAssetId))
, cPrevBlock :: !(StrictTVar IO (Maybe (DB.BlockId, ByteString)))
, cStats :: !(StrictTVar IO CacheStatistics)
}
Expand Down Expand Up @@ -216,47 +221,68 @@ rollbackStakeAddr ci mBlockNo nBlocks = do
let !mp = Map.filter (`Set.member` stakeAddrIdsSet) initMp
liftIO $ atomically $ writeTVar (cStakeCreds ci) mp

queryRewardAccountWithCache
:: forall m. MonadIO m => Cache -> CacheNew -> Ledger.RewardAcnt StandardCrypto
-> ReaderT SqlBackend m (Either DB.LookupFail DB.StakeAddressId)
queryRewardAccountWithCache cache cacheNew rwdAcc =
mapLeft fst <$> queryRewardAccountWithCacheRetBs cache cacheNew rwdAcc

queryRewardAccountWithCacheRetBs
:: forall m. MonadIO m => Cache -> CacheNew -> Ledger.RewardAcnt StandardCrypto
-> ReaderT SqlBackend m (Either (DB.LookupFail, ByteString) DB.StakeAddressId)
queryRewardAccountWithCacheRetBs cache cacheNew rwdAcc =
queryStakeAddrWithCacheRetBs cache cacheNew (Ledger.getRwdNetwork rwdAcc) (Ledger.getRwdCred rwdAcc)

queryStakeAddrWithCache
:: forall m. MonadIO m => Cache -> CacheNew -> StakeCred
:: forall m. MonadIO m => Cache -> CacheNew -> Network -> StakeCred
-> ReaderT SqlBackend m (Either DB.LookupFail DB.StakeAddressId)
queryStakeAddrWithCache cache cacheNew cred = do
queryStakeAddrWithCache cache cacheNew nw cred =
mapLeft fst <$> queryStakeAddrWithCacheRetBs cache cacheNew nw cred

queryStakeAddrWithCacheRetBs
:: forall m. MonadIO m => Cache -> CacheNew -> Network -> StakeCred
-> ReaderT SqlBackend m (Either (DB.LookupFail, ByteString) DB.StakeAddressId)
queryStakeAddrWithCacheRetBs cache cacheNew nw cred = do
case cache of
UninitiatedCache -> queryStakeAddress (unStakeCred cred)
UninitiatedCache -> do
let !bs = Ledger.serialiseRewardAcnt (Ledger.RewardAcnt nw cred)
mapLeft (,bs) <$> queryStakeAddress bs
Cache ci -> do
mp <- liftIO $ readTVarIO (cStakeCreds ci)
(mAddrId, mp') <- queryStakeAddrAux cacheNew mp (cStats ci) cred
(mAddrId, mp') <- queryStakeAddrAux cacheNew mp (cStats ci) nw cred
liftIO $ atomically $ writeTVar (cStakeCreds ci) mp'
pure mAddrId

queryStakeAddrAux
:: MonadIO m
=> CacheNew -> StakeAddrCache -> StrictTVar IO CacheStatistics -> StakeCred
-> ReaderT SqlBackend m (Either DB.LookupFail DB.StakeAddressId, StakeAddrCache)
queryStakeAddrAux cacheNew mp sts hsh =
case Map.lookup hsh mp of
=> CacheNew -> StakeAddrCache -> StrictTVar IO CacheStatistics -> Network -> StakeCred
-> ReaderT SqlBackend m (Either (DB.LookupFail, ByteString) DB.StakeAddressId, StakeAddrCache)
queryStakeAddrAux cacheNew mp sts nw cred =
case Map.lookup cred mp of
Just addrId -> do
liftIO $ hitCreds sts
case cacheNew of
EvictAndReturn -> pure (Right addrId, Map.delete hsh mp)
EvictAndReturn -> pure (Right addrId, Map.delete cred mp)
_ -> pure (Right addrId, mp)
Nothing -> do
liftIO $ missCreds sts
mAddrId <- queryStakeAddress (unStakeCred hsh)
let !bs = Ledger.serialiseRewardAcnt (Ledger.RewardAcnt nw cred)
mAddrId <- mapLeft (,bs) <$> queryStakeAddress bs
case (mAddrId, cacheNew) of
(Right addrId, CacheNew) -> pure (Right addrId, Map.insert hsh addrId mp)
(Right addrId, CacheNew) -> pure (Right addrId, Map.insert cred addrId mp)
(Right addrId, _) -> pure (Right addrId, mp)
(err, _) -> pure (err, mp)

queryPoolKeyWithCache
:: MonadIO m
=> Cache -> CacheNew -> StakePoolKeyHash
=> Cache -> CacheNew -> PoolKeyHash
-> ReaderT SqlBackend m (Either DB.LookupFail DB.PoolHashId)
queryPoolKeyWithCache cache cacheNew hsh =
case cache of
UninitiatedCache -> do
mPhId <- queryPoolHashId (unStakePoolKeyHash hsh)
mPhId <- queryPoolHashId (Generic.unKeyHashRaw hsh)
case mPhId of
Nothing -> pure $ Left (DB.DbLookupMessage "StakePoolKeyHash")
Nothing -> pure $ Left (DB.DbLookupMessage "PoolKeyHash")
Just phId -> pure $ Right phId
Cache ci -> do
mp <- liftIO $ readTVarIO (cPools ci)
Expand All @@ -269,17 +295,17 @@ queryPoolKeyWithCache cache cacheNew hsh =
pure $ Right phId
Nothing -> do
liftIO $ missPools (cStats ci)
mPhId <- queryPoolHashId (unStakePoolKeyHash hsh)
mPhId <- queryPoolHashId (Generic.unKeyHashRaw hsh)
case mPhId of
Nothing -> pure $ Left (DB.DbLookupMessage "StakePoolKeyHash")
Nothing -> pure $ Left (DB.DbLookupMessage "PoolKeyHash")
Just phId -> do
-- missed so we can't evict even with 'EvictAndReturn'
when (cacheNew == CacheNew) $
liftIO $ atomically $ modifyTVar (cPools ci) $ Map.insert hsh phId
pure $ Right phId

insertPoolKeyWithCache
:: (MonadBaseControl IO m, MonadIO m) => Cache -> CacheNew -> KeyHash 'StakePool StandardCrypto
:: (MonadBaseControl IO m, MonadIO m) => Cache -> CacheNew -> PoolKeyHash
-> ReaderT SqlBackend m DB.PoolHashId
insertPoolKeyWithCache cache cacheNew pHash =
case cache of
Expand All @@ -291,12 +317,11 @@ insertPoolKeyWithCache cache cacheNew pHash =
}
Cache ci -> do
mp <- liftIO $ readTVarIO (cPools ci)
let !keyHash = Generic.toStakePoolKeyHash pHash
case Map.lookup keyHash mp of
case Map.lookup pHash mp of
Just phId -> do
liftIO $ hitPools (cStats ci)
when (cacheNew == EvictAndReturn) $
liftIO $ atomically $ modifyTVar (cPools ci) $ Map.delete keyHash
liftIO $ atomically $ modifyTVar (cPools ci) $ Map.delete pHash
pure phId
Nothing -> do
liftIO $ missPools (cStats ci)
Expand All @@ -306,31 +331,31 @@ insertPoolKeyWithCache cache cacheNew pHash =
, DB.poolHashView = Generic.unKeyHashView pHash
}
when (cacheNew == CacheNew) $
liftIO $ atomically $ modifyTVar (cPools ci) $ Map.insert keyHash phId
liftIO $ atomically $ modifyTVar (cPools ci) $ Map.insert pHash phId
pure phId

queryMAWithCache :: MonadIO m => Cache -> ByteString -> AssetName
-> ReaderT SqlBackend m (Maybe DB.MultiAssetId)
queryMAWithCache :: MonadIO m => Cache -> PolicyID StandardCrypto -> AssetName
-> ReaderT SqlBackend m (Either ByteString DB.MultiAssetId)
queryMAWithCache cache policyId asset =
case cache of
UninitiatedCache -> DB.queryMultiAssetId policyId (unAssetName asset)
UninitiatedCache -> do
let !bs = Generic.unScriptHash $ policyID policyId
fmap (maybe (Left bs) Right) $ DB.queryMultiAssetId bs (unAssetName asset)
Cache ci -> do
mp <- liftIO $ readTVarIO (cMultiAssets ci)
case LRU.lookup (policyId, asset) mp of
Just (maId, mp') -> do
liftIO $ hitMAssets (cStats ci)
liftIO $ atomically $ writeTVar (cMultiAssets ci) mp'
pure $ Just maId
pure $ Right maId
Nothing -> do
liftIO $ missMAssets (cStats ci)
-- miss. The lookup doesn't change the cache on a miss.
maId <- DB.queryMultiAssetId policyId (unAssetName asset)
case maId of
Nothing -> do
pure Nothing
Just mId -> do
liftIO $ atomically $ modifyTVar (cMultiAssets ci) $ LRU.insert (policyId, asset) mId
pure maId
let !bs = Generic.unScriptHash $ policyID policyId
maId <- fmap (maybe (Left bs) Right) $ DB.queryMultiAssetId bs (unAssetName asset)
whenRight maId $
liftIO . atomically . modifyTVar (cMultiAssets ci) . LRU.insert (policyId, asset)
pure maId

queryPrevBlockWithCache :: MonadIO m => Text -> Cache -> ByteString
-> ExceptT SyncNodeError (ReaderT SqlBackend m) DB.BlockId
Expand Down
39 changes: 8 additions & 31 deletions cardano-db-sync/src/Cardano/DbSync/Default.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ import Cardano.DbSync.Types
import Cardano.DbSync.Util

import qualified Cardano.Ledger.Alonzo.Scripts as Ledger
import Cardano.Ledger.BaseTypes (Network)
import Cardano.Ledger.Coin (Coin (..))
import Cardano.Ledger.Credential (StakeCredential)
import Cardano.Ledger.Crypto (StandardCrypto)
import Cardano.Ledger.Keys (KeyHash (..), KeyRole (..))

import Cardano.Slotting.Slot (EpochNo (..))

Expand Down Expand Up @@ -144,44 +139,26 @@ insertLedgerEvents env currentEpochNo@(EpochNo curEpoch) =
-- This is different from the previous case in that the db-sync started
-- in this epoch, for example after a restart, instead of after an epoch boundary.
liftIO . logInfo tracer $ "Starting at epoch " <> textShow (unEpochNo en)
LedgerDeltaRewards rwd -> do
LedgerDeltaRewards _e rwd -> do
let rewards = Map.toList $ Generic.rwdRewards rwd
insertRewards (subFromCurrentEpoch 2) currentEpochNo cache (Map.toList $ Generic.rwdRewards rwd)
insertRewards ntw (subFromCurrentEpoch 2) currentEpochNo cache (Map.toList $ Generic.rwdRewards rwd)
-- This event is only created when it's not empty, so we don't need to check for null here.
liftIO . logInfo tracer $ "Inserted " <> show (length rewards) <> " Delta rewards"
LedgerIncrementalRewards rwd -> do
LedgerIncrementalRewards _ rwd -> do
let rewards = Map.toList $ Generic.rwdRewards rwd
insertRewards (subFromCurrentEpoch 1) (EpochNo $ curEpoch + 1) cache rewards
insertRewards ntw (subFromCurrentEpoch 1) (EpochNo $ curEpoch + 1) cache rewards
LedgerRestrainedRewards e rwd creds -> do
lift $ adjustEpochRewards tracer cache e rwd creds
lift $ adjustEpochRewards tracer ntw cache e rwd creds
LedgerTotalRewards _e rwd -> do
lift $ validateEpochRewards tracer ntw (subFromCurrentEpoch 2) currentEpochNo rwd
LedgerMirDist rwd -> do
unless (Map.null rwd) $ do
let rewards = Map.toList rwd
insertRewards (subFromCurrentEpoch 1) currentEpochNo cache rewards
insertRewards ntw (subFromCurrentEpoch 1) currentEpochNo cache rewards
liftIO . logInfo tracer $ "Inserted " <> show (length rewards) <> " Mir rewards"
LedgerPoolReap en drs -> do
unless (Map.null drs) $ do
insertPoolDepositRefunds env (Generic.Rewards en $ convertPoolDepositReunds (leNetwork lenv) drs)

convertPoolDepositReunds
:: Network -> Map (StakeCredential StandardCrypto) (Map (KeyHash 'StakePool StandardCrypto) Coin)
-> Map Generic.StakeCred (Set Generic.Reward)
convertPoolDepositReunds nw =
mapBimap (Generic.toStakeCred nw) (Set.fromList . map convert . Map.toList)
where
convert :: (KeyHash 'StakePool StandardCrypto, Coin) -> Generic.Reward
convert (kh, coin) =
Generic.Reward
{ Generic.rewardSource = DB.RwdDepositRefund
, Generic.rewardPool = Strict.Just (Generic.toStakePoolKeyHash kh)
, Generic.rewardAmount = coin
}

mapBimap :: Ord k2 => (k1 -> k2) -> (a1 -> a2) -> Map k1 a1 -> Map k2 a2
mapBimap fk fa = Map.fromAscList . map (bimap fk fa) . Map.toAscList

unless (Map.null $ Generic.rwdRewards drs) $ do
insertPoolDepositRefunds env en drs

hasEpochStartEvent :: [LedgerEvent] -> Bool
hasEpochStartEvent = any isNewEpoch
Expand Down
18 changes: 10 additions & 8 deletions cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Adjust.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import qualified Cardano.Db as Db

import Cardano.DbSync.Cache
import qualified Cardano.DbSync.Era.Shelley.Generic.Rewards as Generic
import Cardano.DbSync.Era.Shelley.Generic.StakeCred
import Cardano.DbSync.Types

import Cardano.Ledger.BaseTypes (Network)

import Cardano.Slotting.Slot (EpochNo (..))

Expand All @@ -41,26 +43,26 @@ import Database.Esqueleto.Experimental (SqlBackend, delete, from, in_,

adjustEpochRewards
:: (MonadBaseControl IO m, MonadIO m)
=> Trace IO Text -> Cache -> EpochNo -> Generic.Rewards
=> Trace IO Text -> Network -> Cache -> EpochNo -> Generic.Rewards
-> Set StakeCred
-> ReaderT SqlBackend m ()
adjustEpochRewards tracer cache epochNo rwds creds = do
adjustEpochRewards tracer nw cache epochNo rwds creds = do
let eraIgnored = Map.toList $ Generic.rwdRewards rwds
liftIO . logInfo tracer $ mconcat
[ "Removing ", if null eraIgnored then "" else Db.textShow (length eraIgnored) <> " rewards and "
, show (length creds), " orphaned rewards"]
forM_ eraIgnored $ \(cred, rewards)->
forM_ (Set.toList rewards) $ \rwd ->
deleteReward cache epochNo (cred, rwd)
crds <- rights <$> forM (Set.toList creds) (queryStakeAddrWithCache cache DontCacheNew)
deleteReward nw cache epochNo (cred, rwd)
crds <- rights <$> forM (Set.toList creds) (queryStakeAddrWithCache cache DontCacheNew nw)
deleteOrphanedRewards epochNo crds

deleteReward
:: (MonadBaseControl IO m, MonadIO m)
=> Cache -> EpochNo -> (StakeCred, Generic.Reward)
=> Network -> Cache -> EpochNo -> (StakeCred, Generic.Reward)
-> ReaderT SqlBackend m ()
deleteReward cache epochNo (cred, rwd) = do
mAddrId <- queryStakeAddrWithCache cache DontCacheNew cred
deleteReward nw cache epochNo (cred, rwd) = do
mAddrId <- queryStakeAddrWithCache cache DontCacheNew nw cred
eiPoolId <- case Generic.rewardPool rwd of
Strict.Nothing -> pure $ Left $ Db.DbLookupMessage "deleteReward.queryPoolKeyWithCache"
Strict.Just poolHash -> queryPoolKeyWithCache cache DontCacheNew poolHash
Expand Down
2 changes: 0 additions & 2 deletions cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Generic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@ import Cardano.DbSync.Era.Shelley.Generic.Util as X
import Cardano.DbSync.Era.Shelley.Generic.EpochUpdate as X
import Cardano.DbSync.Era.Shelley.Generic.ProtoParams as X
import Cardano.DbSync.Era.Shelley.Generic.Rewards as X
import Cardano.DbSync.Era.Shelley.Generic.StakeCred as X
import Cardano.DbSync.Era.Shelley.Generic.StakeDist as X
import Cardano.DbSync.Era.Shelley.Generic.StakePoolKeyHash as X
Loading

0 comments on commit 81cea00

Please sign in to comment.