From 31df5d0abbcc89d0566e0fa0edbb342a0717718d Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Fri, 3 Jun 2022 13:19:43 +0300 Subject: [PATCH 1/3] Add disable-ledger option --- cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs | 1 + cardano-db-sync/app/cardano-db-sync.hs | 8 ++++++++ cardano-db-sync/src/Cardano/DbSync/Api.hs | 1 + cardano-db-sync/src/Cardano/DbSync/Config/Types.hs | 5 +++-- cardano-db-sync/src/Cardano/DbSync/Sync.hs | 6 +++--- 5 files changed, 16 insertions(+), 5 deletions(-) diff --git a/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs b/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs index 3f630f510..18c006149 100644 --- a/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs +++ b/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs @@ -213,6 +213,7 @@ mkSyncNodeParams staticDir mutableDir = do , enpPGPassSource = Db.PGPassCached pgconfig , enpExtended = True , enpHasCache = True + , enpHasLedger = True , enpMaybeRollback = Nothing } diff --git a/cardano-db-sync/app/cardano-db-sync.hs b/cardano-db-sync/app/cardano-db-sync.hs index 988595189..e09e67613 100644 --- a/cardano-db-sync/app/cardano-db-sync.hs +++ b/cardano-db-sync/app/cardano-db-sync.hs @@ -65,6 +65,7 @@ pRunDbSyncNode = <*> pPGPassSource <*> pExtended <*> pHasCache + <*> pHasLedger <*> optional pSlotNo pConfigFile :: Parser ConfigFile @@ -113,6 +114,13 @@ pHasCache = <> Opt.help "Disables the db-sync caches. Reduces memory usage but it takes longer to sync." ) +pHasLedger :: Parser Bool +pHasLedger = + Opt.flag True False + ( Opt.long "disable-ledger" + <> Opt.help "Disables the leger state. Drastically reduces memory usage and it syncs faster, but some data are missing." + ) + pSocketPath :: Parser SocketPath pSocketPath = SocketPath <$> Opt.strOption diff --git a/cardano-db-sync/src/Cardano/DbSync/Api.hs b/cardano-db-sync/src/Cardano/DbSync/Api.hs index 208a80820..9b49fe6c7 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Api.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Api.hs @@ -65,6 +65,7 @@ data SyncOptions = SyncOptions { soptExtended :: !Bool , soptAbortOnInvalid :: !Bool , soptCache :: !Bool + , soptLedger :: !Bool , snapshotEveryFollowing :: !Word64 , snapshotEveryLagging :: !Word64 } diff --git a/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs b/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs index e4efef84c..62c35233e 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs @@ -68,6 +68,7 @@ data SyncNodeParams = SyncNodeParams , enpPGPassSource :: !PGPassSource , enpExtended :: !Bool , enpHasCache :: !Bool + , enpHasLedger :: !Bool , enpMaybeRollback :: !(Maybe SlotNo) } @@ -156,8 +157,8 @@ pcNodeConfigFilePath = unNodeConfigFile . pcNodeConfigFile -- ------------------------------------------------------------------------------------------------- instance FromJSON SyncPreConfig where - parseJSON o = - Aeson.withObject "top-level" parseGenSyncNodeConfig o + parseJSON = + Aeson.withObject "top-level" parseGenSyncNodeConfig parseGenSyncNodeConfig :: Object -> Parser SyncPreConfig parseGenSyncNodeConfig o = diff --git a/cardano-db-sync/src/Cardano/DbSync/Sync.hs b/cardano-db-sync/src/Cardano/DbSync/Sync.hs index f690002df..68bdcd6b1 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Sync.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Sync.hs @@ -130,7 +130,7 @@ runSyncNode metricsSetters trce backend iomgr aop snEveryFollowing snEveryLaggin case genCfg of GenesisCardano {} -> do syncEnv <- ExceptT $ mkSyncEnvFromConfig trce backend - (SyncOptions (enpExtended enp) aop (enpHasCache enp) snEveryFollowing snEveryLagging) + (SyncOptions (enpExtended enp) aop (enpHasCache enp) (enpHasLedger enp) snEveryFollowing snEveryLagging) (enpLedgerStateDir enp) genCfg liftIO $ epochStartup syncEnv liftIO $ runSyncNodeClient metricsSetters syncEnv iomgr trce (enpSocketPath enp) @@ -298,8 +298,8 @@ chainSyncClient metricsSetters trce latestPoints currentTip actionQueue = do goTip :: MkPipelineDecision -> Nat n -> WithOrigin BlockNo -> Tip CardanoBlock -> Maybe [CardanoPoint] -> ClientPipelinedStIdle n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO () - goTip mkPipelineDecision n clientTip serverTip mPoint = - go mkPipelineDecision n clientTip (getTipBlockNo serverTip) mPoint + goTip mkPipelineDecision n clientTip serverTip = + go mkPipelineDecision n clientTip (getTipBlockNo serverTip) go :: MkPipelineDecision -> Nat n -> WithOrigin BlockNo -> WithOrigin BlockNo -> Maybe [CardanoPoint] -> ClientPipelinedStIdle n CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO () From d3da72535227b2e299bd420a8399ecc0a25f49c1 Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Fri, 3 Jun 2022 14:04:38 +0300 Subject: [PATCH 2/3] Remove from Ledger whatever doesn't belong there --- cardano-db-sync/src/Cardano/DbSync/Api.hs | 38 ++++++++----------- cardano-db-sync/src/Cardano/DbSync/Default.hs | 4 +- .../src/Cardano/DbSync/Era/Shelley/Insert.hs | 4 +- .../src/Cardano/DbSync/Era/Shelley/Offline.hs | 10 ++--- .../src/Cardano/DbSync/LedgerState.hs | 24 ++---------- cardano-db-sync/src/Cardano/DbSync/Sync.hs | 2 +- 6 files changed, 29 insertions(+), 53 deletions(-) diff --git a/cardano-db-sync/src/Cardano/DbSync/Api.hs b/cardano-db-sync/src/Cardano/DbSync/Api.hs index 9b49fe6c7..9f6a8e481 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Api.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Api.hs @@ -39,7 +39,9 @@ import Cardano.DbSync.Error import Cardano.DbSync.LedgerState import Cardano.DbSync.Types +import Control.Monad.Class.MonadSTM.Strict (StrictTVar, TBQueue, newTBQueueIO, newTVarIO) import Control.Monad.Trans.Maybe (MaybeT (..)) +import Data.Time.Clock (UTCTime, getCurrentTime) import Database.Persist.Sql (SqlBackend) @@ -58,6 +60,9 @@ data SyncEnv = SyncEnv , envBackend :: !SqlBackend , envOptions :: !SyncOptions , envCache :: !Cache + , envOfflineWorkQueue :: !(TBQueue IO PoolFetchRetry) + , envOfflineResultQueue :: !(TBQueue IO FetchResult) + , envEpochSyncTime :: !(StrictTVar IO UTCTime) , envLedger :: !LedgerEnv } @@ -119,12 +124,15 @@ getCurrentTipBlockNo env = do mkSyncEnv :: Trace IO Text -> SqlBackend -> SyncOptions -> ProtocolInfo IO CardanoBlock -> Ledger.Network - -> NetworkMagic -> SystemStart -> LedgerStateDir -> EpochSlot + -> NetworkMagic -> SystemStart -> LedgerStateDir -> IO SyncEnv -mkSyncEnv trce backend syncOptions protoInfo nw nwMagic systemStart dir stableEpochSlot = do - ledgerEnv <- mkLedgerEnv trce protoInfo dir nw stableEpochSlot systemStart (soptAbortOnInvalid syncOptions) +mkSyncEnv trce backend syncOptions protoInfo nw nwMagic systemStart dir = do + ledgerEnv <- mkLedgerEnv trce protoInfo dir nw systemStart (soptAbortOnInvalid syncOptions) (snapshotEveryFollowing syncOptions) (snapshotEveryLagging syncOptions) cache <- if soptCache syncOptions then newEmptyCache 100000 else pure uninitiatedCache + owq <- newTBQueueIO 100 + orq <- newTBQueueIO 100 + epochSyncTime <- newTVarIO =<< getCurrentTime pure $ SyncEnv { envProtocol = SyncProtocolCardano , envNetworkMagic = nwMagic @@ -132,6 +140,9 @@ mkSyncEnv trce backend syncOptions protoInfo nw nwMagic systemStart dir stableEp , envBackend = backend , envOptions = syncOptions , envCache = cache + , envOfflineWorkQueue = owq + , envOfflineResultQueue = orq + , envEpochSyncTime = epochSyncTime , envLedger = ledgerEnv } @@ -155,7 +166,7 @@ mkSyncEnvFromConfig trce backend syncOptions dir genCfg = Right <$> mkSyncEnv trce backend syncOptions (mkProtocolInfoCardano genCfg []) (Shelley.sgNetworkId $ scConfig sCfg) (NetworkMagic . unProtocolMagicId $ Byron.configProtocolMagicId bCfg) (SystemStart .Byron.gdStartTime $ Byron.configGenesisData bCfg) - dir (calculateStableEpochSlot $ scConfig sCfg) + dir getLatestPoints :: SyncEnv -> IO [CardanoPoint] @@ -181,22 +192,3 @@ verifyFilePoints env files = convertHashBlob :: ByteString -> Maybe (HeaderHash CardanoBlock) convertHashBlob = Just . fromRawHash (Proxy @CardanoBlock) - --- ------------------------------------------------------------------------------------------------- --- This is incredibly suboptimal. It should work, for now, but may break at some future time and --- when it is wrong then data in `db-sync` will simply be wrong and we do not have any way of --- detecting that it is wrong. --- --- An epoch is `10 k / f` long, and the stability window is `3 k / f` so the time from the start --- of the epoch to start of the stability window is `7 k / f`. --- --- Hopefully lower level libraries will be able to provide us with something better than this soon. -calculateStableEpochSlot :: Shelley.ShelleyGenesis era -> EpochSlot -calculateStableEpochSlot cfg = - EpochSlot $ ceiling (7.0 * secParam / actSlotCoeff) - where - secParam :: Double - secParam = fromIntegral $ Shelley.sgSecurityParam cfg - - actSlotCoeff :: Double - actSlotCoeff = fromRational (Ledger.unboundRational $ Shelley.sgActiveSlotsCoeff cfg) diff --git a/cardano-db-sync/src/Cardano/DbSync/Default.hs b/cardano-db-sync/src/Cardano/DbSync/Default.hs index 7f35b072c..729a04605 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Default.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Default.hs @@ -69,7 +69,7 @@ applyAndInsert env cblk = do insertLedgerEvents env (sdEpochNo details) (apEvents applyResult) insertEpoch details let firstBlockOfEpoch = hasEpochStartEvent (apEvents applyResult) - let isMember = \poolId -> Set.member poolId (apPoolsRegistered applyResult) + let isMember poolId = Set.member poolId (apPoolsRegistered applyResult) case cblk of BlockByron blk -> newExceptT $ insertByronBlock env firstBlockOfEpoch blk details @@ -129,7 +129,7 @@ insertLedgerEvents env currentEpochNo@(EpochNo curEpoch) = case ev of LedgerNewEpoch en ss -> do lift $ do - insertEpochSyncTime en (toSyncState ss) (leEpochSyncTime lenv) + insertEpochSyncTime en (toSyncState ss) (envEpochSyncTime env) sqlBackend <- lift ask persistantCacheSize <- liftIO $ statementCacheSize $ connStmtMap sqlBackend liftIO . logInfo tracer $ "Persistant SQL Statement Cache size is " <> textShow persistantCacheSize diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs index 418756e44..492428986 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs @@ -146,8 +146,8 @@ insertShelleyBlock env firstBlockOfEpoch blk details isMember mNewEpoch stakeSli when (unBlockNo (Generic.blkBlockNo blk) `mod` offlineModBase == 0) . lift $ do - insertOfflineResults tracer (leOfflineResultQueue lenv) - loadOfflineWorkQueue tracer (leOfflineWorkQueue lenv) + insertOfflineResults tracer (envOfflineResultQueue env) + loadOfflineWorkQueue tracer (envOfflineWorkQueue env) when (getSyncStatus details == SyncFollowing) $ -- Serializiing things during syncing can drastically slow down full sync diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Offline.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Offline.hs index 1e214334d..ccf6e0fd8 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Offline.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Offline.hs @@ -39,7 +39,7 @@ import qualified Cardano.Crypto.Hash.Class as Crypto import Cardano.Db import qualified Cardano.Db as DB -import Cardano.DbSync.LedgerState +import Cardano.DbSync.Api import Cardano.DbSync.Util import Database.Persist.Sql (SqlBackend) @@ -93,18 +93,18 @@ insertOfflineResults trce resultQueue = do ResultError {} -> True -runOfflineFetchThread :: Trace IO Text -> LedgerEnv -> IO () -runOfflineFetchThread trce lenv = do +runOfflineFetchThread :: Trace IO Text -> SyncEnv -> IO () +runOfflineFetchThread trce env = do logInfo trce "Running Offline fetch thread" forever $ do threadDelay 60_000_000 -- 60 second sleep - xs <- blockingFlushTBQueue (leOfflineWorkQueue lenv) + xs <- blockingFlushTBQueue (envOfflineWorkQueue env) manager <- Http.newManager tlsManagerSettings now <- liftIO Time.getPOSIXTime mapM_ (queueInsert <=< fetchOfflineData trce manager now) xs where queueInsert :: FetchResult -> IO () - queueInsert = atomically . writeTBQueue (leOfflineResultQueue lenv) + queueInsert = atomically . writeTBQueue (envOfflineResultQueue env) -- ------------------------------------------------------------------------------------------------- diff --git a/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs b/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs index 323e548a5..ab20c380d 100644 --- a/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs +++ b/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs @@ -57,8 +57,8 @@ import Cardano.Slotting.EpochInfo (EpochInfo, epochInfoEpoch) import Cardano.Slotting.Slot (EpochNo (..), SlotNo (..), WithOrigin (..), fromWithOrigin) import qualified Control.Exception as Exception -import Control.Monad.Class.MonadSTM.Strict (StrictTVar, TBQueue, atomically, newTBQueueIO, - newTVarIO, readTVar, writeTVar) +import Control.Monad.Class.MonadSTM.Strict (StrictTVar, atomically, newTVarIO, readTVar, + writeTVar) -- import Codec.CBOR.Write (toBuilder) import qualified Data.ByteString.Base16 as Base16 @@ -126,13 +126,6 @@ data LedgerEnv = LedgerEnv , leInterpreter :: !(StrictTVar IO (Strict.Maybe CardanoInterpreter)) , leStateVar :: !(StrictTVar IO (Strict.Maybe LedgerDB)) , leEventState :: !(StrictTVar IO LedgerEventState) - -- The following do not really have anything to do with maintaining ledger - -- state. They are here due to the ongoing headaches around the split between - -- `cardano-sync` and `cardano-db-sync`. - , leOfflineWorkQueue :: !(TBQueue IO PoolFetchRetry) - , leOfflineResultQueue :: !(TBQueue IO FetchResult) - , leEpochSyncTime :: !(StrictTVar IO UTCTime) - , leStableEpochSlot :: !EpochSlot } -- TODO this is unstable in terms of restarts and we should try to remove it. @@ -227,17 +220,12 @@ ledgerDbCurrent = either id id . AS.head . ledgerDbCheckpoints mkLedgerEnv :: Trace IO Text -> Consensus.ProtocolInfo IO CardanoBlock -> LedgerStateDir - -> Ledger.Network -> EpochSlot -> SystemStart -> Bool -> Word64 -> Word64 + -> Ledger.Network -> SystemStart -> Bool -> Word64 -> Word64 -> IO LedgerEnv -mkLedgerEnv trce protocolInfo dir nw stableEpochSlot systemStart aop snapshotEveryFollowing snapshotEveryLagging = do +mkLedgerEnv trce protocolInfo dir nw systemStart aop snapshotEveryFollowing snapshotEveryLagging = do svar <- newTVarIO Strict.Nothing evar <- newTVarIO initLedgerEventState intervar <- newTVarIO Strict.Nothing - -- 2.5 days worth of slots. If we try to stick more than this number of - -- items in the queue, bad things are likely to happen. - owq <- newTBQueueIO 100 - orq <- newTBQueueIO 100 - est <- newTVarIO =<< getCurrentTime pure LedgerEnv { leTrace = trce , leProtocolInfo = protocolInfo @@ -250,10 +238,6 @@ mkLedgerEnv trce protocolInfo dir nw stableEpochSlot systemStart aop snapshotEve , leInterpreter = intervar , leStateVar = svar , leEventState = evar - , leOfflineWorkQueue = owq - , leOfflineResultQueue = orq - , leEpochSyncTime = est - , leStableEpochSlot = stableEpochSlot } where initLedgerEventState :: LedgerEventState diff --git a/cardano-db-sync/src/Cardano/DbSync/Sync.hs b/cardano-db-sync/src/Cardano/DbSync/Sync.hs index 68bdcd6b1..0652514aa 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Sync.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Sync.hs @@ -226,7 +226,7 @@ dbSyncProtocols trce env metricsSetters _version codecs _connectionId = race_ (race (runDbThread env metricsSetters actionQueue) - (runOfflineFetchThread trce (envLedger env)) + (runOfflineFetchThread trce env) ) (runPipelinedPeer localChainSyncTracer From 354af2638ea862b530021ba5e9fc2fbcc58c42af Mon Sep 17 00:00:00 2001 From: Kostas Dermentzis Date: Fri, 3 Jun 2022 21:56:55 +0300 Subject: [PATCH 3/3] Implement a version that doesn't maintain the ledger state --- cardano-db-sync/cardano-db-sync.cabal | 1 + cardano-db-sync/src/Cardano/DbSync/Api.hs | 31 +++- .../src/Cardano/DbSync/Database.hs | 4 +- cardano-db-sync/src/Cardano/DbSync/Default.hs | 13 +- .../src/Cardano/DbSync/LedgerState.hs | 15 +- .../src/Cardano/DbSync/LocalStateQuery.hs | 149 ++++++++++++++++++ cardano-db-sync/src/Cardano/DbSync/Sync.hs | 8 +- cardano-db/src/Cardano/Db/Query.hs | 11 ++ 8 files changed, 217 insertions(+), 15 deletions(-) create mode 100644 cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs diff --git a/cardano-db-sync/cardano-db-sync.cabal b/cardano-db-sync/cardano-db-sync.cabal index 15414efe0..0927ff551 100644 --- a/cardano-db-sync/cardano-db-sync.cabal +++ b/cardano-db-sync/cardano-db-sync.cabal @@ -109,6 +109,7 @@ library Cardano.DbSync.Rollback + Cardano.DbSync.LocalStateQuery Cardano.DbSync.StateQuery Cardano.DbSync.Sync Cardano.DbSync.Tracing.ToObjectOrphans diff --git a/cardano-db-sync/src/Cardano/DbSync/Api.hs b/cardano-db-sync/src/Cardano/DbSync/Api.hs index 9f6a8e481..1949f45be 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Api.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Api.hs @@ -1,5 +1,6 @@ {-# LANGUAGE NoImplicitPrelude #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} module Cardano.DbSync.Api @@ -9,6 +10,7 @@ module Cardano.DbSync.Api , mkSyncEnvFromConfig , verifyFilePoints , getTrace + , hasLedgerState , getLatestPoints , getSlotHash , getDbLatestBlockInfo @@ -37,6 +39,7 @@ import Cardano.DbSync.Config.Shelley import Cardano.DbSync.Config.Types import Cardano.DbSync.Error import Cardano.DbSync.LedgerState +import Cardano.DbSync.LocalStateQuery import Cardano.DbSync.Types import Control.Monad.Class.MonadSTM.Strict (StrictTVar, TBQueue, newTBQueueIO, newTVarIO) @@ -63,6 +66,7 @@ data SyncEnv = SyncEnv , envOfflineWorkQueue :: !(TBQueue IO PoolFetchRetry) , envOfflineResultQueue :: !(TBQueue IO FetchResult) , envEpochSyncTime :: !(StrictTVar IO UTCTime) + , envNoLedgerEnv :: !NoLedgerStateEnv -- only used when configured without ledger state. , envLedger :: !LedgerEnv } @@ -81,6 +85,9 @@ getTrace = leTrace . envLedger getSlotHash :: SqlBackend -> SlotNo -> IO [(SlotNo, ByteString)] getSlotHash backend = DB.runDbIohkNoLogging backend . DB.querySlotHash +hasLedgerState :: SyncEnv -> Bool +hasLedgerState = soptLedger . envOptions + getDbLatestBlockInfo :: SqlBackend -> IO (Maybe TipInfo) getDbLatestBlockInfo backend = do runMaybeT $ do @@ -133,6 +140,7 @@ mkSyncEnv trce backend syncOptions protoInfo nw nwMagic systemStart dir = do owq <- newTBQueueIO 100 orq <- newTBQueueIO 100 epochSyncTime <- newTVarIO =<< getCurrentTime + noLegdState <- mkNoLedgerStateEnv trce systemStart pure $ SyncEnv { envProtocol = SyncProtocolCardano , envNetworkMagic = nwMagic @@ -143,6 +151,7 @@ mkSyncEnv trce backend syncOptions protoInfo nw nwMagic systemStart dir = do , envOfflineWorkQueue = owq , envOfflineResultQueue = orq , envEpochSyncTime = epochSyncTime + , envNoLedgerEnv = noLegdState , envLedger = ledgerEnv } @@ -171,8 +180,16 @@ mkSyncEnvFromConfig trce backend syncOptions dir genCfg = getLatestPoints :: SyncEnv -> IO [CardanoPoint] getLatestPoints env = do - files <- listLedgerStateFilesOrdered $ leDir (envLedger env) - verifyFilePoints env files + if hasLedgerState env then do + files <- listLedgerStateFilesOrdered $ leDir (envLedger env) + verifyFilePoints env files + else do + -- Brings the 5 latest. + lastPoints <- DB.runDbIohkNoLogging (envBackend env) DB.queryLatestPoints + pure $ mapMaybe convert' lastPoints + where + convert' (Nothing, _) = Nothing + convert' (Just slot, bs) = convert (SlotNo slot) bs verifyFilePoints :: SyncEnv -> [LedgerStateFile] -> IO [CardanoPoint] verifyFilePoints env files = @@ -183,12 +200,12 @@ verifyFilePoints env files = hashes <- getSlotHash (envBackend env) (lsfSlotNo lsf) let valid = find (\(_, h) -> lsfHash lsf == hashToAnnotation h) hashes case valid of - Just (slot, hash) | slot == lsfSlotNo lsf -> pure $ convert (slot, hash) + Just (slot, hash) | slot == lsfSlotNo lsf -> pure $ convert slot hash _ -> pure Nothing - convert :: (SlotNo, ByteString) -> Maybe CardanoPoint - convert (slot, hashBlob) = - Point . Point.block slot <$> convertHashBlob hashBlob - +convert :: SlotNo -> ByteString -> Maybe CardanoPoint +convert slot hashBlob = + Point . Point.block slot <$> convertHashBlob hashBlob + where convertHashBlob :: ByteString -> Maybe (HeaderHash CardanoBlock) convertHashBlob = Just . fromRawHash (Proxy @CardanoBlock) diff --git a/cardano-db-sync/src/Cardano/DbSync/Database.hs b/cardano-db-sync/src/Cardano/DbSync/Database.hs index 6b09b8aa0..8efefbb06 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Database.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Database.hs @@ -89,7 +89,9 @@ runActions env actions = do pure Done ([], DbRollBackToPoint pt resultVar : ys) -> do runRollbacksDB env pt - points <- lift $ rollbackLedger env pt + points <- if hasLedgerState env + then lift $ rollbackLedger env pt + else pure Nothing blockNo <- lift $ getDbTipBlockNo env lift $ atomically $ putTMVar resultVar (points, blockNo) dbAction Continue ys diff --git a/cardano-db-sync/src/Cardano/DbSync/Default.hs b/cardano-db-sync/src/Cardano/DbSync/Default.hs index 729a04605..daac16f72 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Default.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Default.hs @@ -27,7 +27,8 @@ import Cardano.DbSync.Era.Shelley.Insert.Epoch (insertPoolDepositRefun import Cardano.DbSync.Era.Shelley.Validate (validateEpochRewards) import Cardano.DbSync.Error import Cardano.DbSync.LedgerState (ApplyResult (..), LedgerEvent (..), - applyBlockAndSnapshot) + applyBlockAndSnapshot, defaultApplyResult) +import Cardano.DbSync.LocalStateQuery import Cardano.DbSync.Rollback (rollbackToPoint) import Cardano.DbSync.Types import Cardano.DbSync.Util @@ -64,7 +65,7 @@ insertListBlocks env blocks = do applyAndInsert :: SyncEnv -> CardanoBlock -> ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) () applyAndInsert env cblk = do - !applyResult <- liftIO $ applyBlockAndSnapshot (envLedger env) cblk + !applyResult <- liftIO mkApplyResult let !details = apSlotDetails applyResult insertLedgerEvents env (sdEpochNo details) (apEvents applyResult) insertEpoch details @@ -99,6 +100,14 @@ applyAndInsert env cblk = do Strict.Just pr -> pr Strict.Nothing -> Ledger.Prices minBound minBound + mkApplyResult :: IO ApplyResult + mkApplyResult = do + if hasLedgerState env then + applyBlockAndSnapshot (envLedger env) cblk + else do + slotDetails <- getSlotDetailsNode (envNoLedgerEnv env) (cardanoBlockSlotNo cblk) + pure $ defaultApplyResult slotDetails + -- ------------------------------------------------------------------------------------------------- insertLedgerEvents diff --git a/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs b/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs index ab20c380d..26de0168d 100644 --- a/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs +++ b/cardano-db-sync/src/Cardano/DbSync/LedgerState.hs @@ -15,6 +15,7 @@ module Cardano.DbSync.LedgerState , LedgerEvent (..) , ApplyResult (..) , LedgerStateFile (..) + , defaultApplyResult , mkLedgerEnv , applyBlockAndSnapshot , listLedgerStateFilesOrdered @@ -74,7 +75,7 @@ import qualified Data.Text as Text import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime) import Ouroboros.Consensus.Block (CodecConfig, WithOrigin (..), blockHash, blockIsEBB, - blockPoint, blockPrevHash, pointSlot) + blockPrevHash, pointSlot) import Ouroboros.Consensus.Block.Abstract (ConvertRawHash (..)) import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..)) import Ouroboros.Consensus.Cardano.Block (LedgerState (..), StandardCrypto) @@ -187,11 +188,20 @@ data ApplyResult = ApplyResult , apPoolsRegistered :: !(Set.Set PoolKeyHash) -- registered before the block application , apNewEpoch :: !(Strict.Maybe Generic.NewEpoch) -- Only Just for a single block at the epoch boundary , apSlotDetails :: !SlotDetails - , apPoint :: !CardanoPoint , apStakeSlice :: !Generic.StakeSliceRes , apEvents :: ![LedgerEvent] } +defaultApplyResult :: SlotDetails -> ApplyResult +defaultApplyResult slotDetails = ApplyResult + { apPrices = Strict.Nothing + , apPoolsRegistered = Set.empty + , apNewEpoch = Strict.Nothing + , apSlotDetails = slotDetails + , apStakeSlice = Generic.NoSlices + , apEvents = [] + } + newtype LedgerDB = LedgerDB { ledgerDbCheckpoints :: AnchoredSeq (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState } @@ -293,7 +303,6 @@ applyBlock env blk = do , apPoolsRegistered = getRegisteredPools oldState , apNewEpoch = maybeToStrict newEpoch , apSlotDetails = details - , apPoint = blockPoint blk , apStakeSlice = stakeSlice newState details , apEvents = sort $ events ++ ledgerEvents } diff --git a/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs b/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs new file mode 100644 index 000000000..7e46554c6 --- /dev/null +++ b/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs @@ -0,0 +1,149 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TypeFamilies #-} + +module Cardano.DbSync.LocalStateQuery + ( NoLedgerStateEnv (..) + , mkNoLedgerStateEnv + , getSlotDetailsNode + , localStateQueryHandler + , newStateQueryTMVar + ) where + +import Cardano.BM.Trace (Trace, logInfo) + +import Cardano.Prelude hiding (atomically, (.)) + +import Cardano.Slotting.Slot (SlotNo (..)) + +import Cardano.Ledger.Crypto (StandardCrypto) + +import Cardano.Db (textShow) + +import Cardano.DbSync.StateQuery +import Cardano.DbSync.Types + +import Control.Monad.Class.MonadSTM.Strict (StrictTMVar, StrictTVar, atomically, + newEmptyTMVarIO, newTVarIO, putTMVar, readTVar, takeTMVar, writeTVar) + +import qualified Data.Strict.Maybe as Strict +import Data.Time.Clock (getCurrentTime) + +import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..)) +import Ouroboros.Consensus.Cardano.Block (BlockQuery (QueryHardFork), CardanoEras) +import Ouroboros.Consensus.Cardano.Node () +import Ouroboros.Consensus.HardFork.Combinator.Ledger.Query + (QueryHardFork (GetInterpreter)) +import Ouroboros.Consensus.HardFork.History.Qry (Interpreter, PastHorizonException, + interpretQuery) +import Ouroboros.Consensus.Ledger.Query (Query (..)) + +import Ouroboros.Network.Block (Point (..)) +import Ouroboros.Network.Protocol.LocalStateQuery.Client (ClientStAcquired (..), + ClientStAcquiring (..), ClientStIdle (..), ClientStQuerying (..), + LocalStateQueryClient (..)) +import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as StateQuery +import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure) + +data NoLedgerStateEnv = NoLedgerStateEnv + { nlsTracer :: Trace IO Text + , nlsSystemStart :: !SystemStart + , nlsQueryVar :: StateQueryTMVar CardanoBlock CardanoInterpreter + , nlsHistoryInterpreterVar :: StrictTVar IO (Strict.Maybe CardanoInterpreter) + } + +newtype StateQueryTMVar blk result = StateQueryTMVar + { unStateQueryTMVar :: + StrictTMVar IO + ( Query blk result + , StrictTMVar IO (Either AcquireFailure result) + ) + } + +mkNoLedgerStateEnv :: Trace IO Text -> SystemStart -> IO NoLedgerStateEnv +mkNoLedgerStateEnv trce systemStart = do + qVar <- newStateQueryTMVar + interVar <- newTVarIO Strict.Nothing + pure $ NoLedgerStateEnv trce systemStart qVar interVar + +newStateQueryTMVar :: IO (StateQueryTMVar blk result) +newStateQueryTMVar = StateQueryTMVar <$> newEmptyTMVarIO + +-- Get the requested slot details using a history interpreter stashed in an IORef. +-- If the history interpreter does not exist, get one. +-- If the existing history interpreter returns an error, get a new one and try again. +getSlotDetailsNode + :: NoLedgerStateEnv + -> SlotNo + -> IO SlotDetails +getSlotDetailsNode env slot = do + einterp1 <- maybe (getHistoryInterpreter env) pure =<< atomically (fromStrictMaybe <$> readTVar interVar) + case evalSlotDetails einterp1 of + Right sd -> insertCurrentTime sd + Left _ -> do + einterp2 <- getHistoryInterpreter env + case evalSlotDetails einterp2 of + Left err -> panic $ "getSlotDetailsNode: " <> textShow err + Right sd -> insertCurrentTime sd + where + interVar = nlsHistoryInterpreterVar env + + evalSlotDetails :: Interpreter (CardanoEras StandardCrypto) -> Either PastHorizonException SlotDetails + evalSlotDetails interp = + interpretQuery interp (querySlotDetails (nlsSystemStart env) slot) + + insertCurrentTime :: SlotDetails -> IO SlotDetails + insertCurrentTime sd = do + time <- getCurrentTime + pure $ sd { sdCurrentTime = time } + + fromStrictMaybe :: Strict.Maybe a -> Maybe a + fromStrictMaybe (Strict.Just a) = Just a + fromStrictMaybe Strict.Nothing = Nothing + +getHistoryInterpreter + :: NoLedgerStateEnv + -> IO CardanoInterpreter +getHistoryInterpreter env = do + respVar <- newEmptyTMVarIO + atomically $ putTMVar reqVar (BlockQuery $ QueryHardFork GetInterpreter, respVar) + res <- atomically $ takeTMVar respVar + case res of + Left err -> + panic $ "getHistoryInterpreter: " <> textShow err + Right interp -> do + logInfo tracer "getHistoryInterpreter: acquired" + atomically $ writeTVar interVar $ Strict.Just interp + pure interp + where + reqVar = unStateQueryTMVar $ nlsQueryVar env + interVar = nlsHistoryInterpreterVar env + tracer = nlsTracer env + +-- This is called during the ChainSync setup and loops forever. Queries can be posted to +-- it and responses retrieved via a TVar. +localStateQueryHandler + :: forall a + . NoLedgerStateEnv -> LocalStateQueryClient CardanoBlock (Point CardanoBlock) (Query CardanoBlock) IO a +localStateQueryHandler env = + LocalStateQueryClient idleState + where + idleState :: IO (StateQuery.ClientStIdle CardanoBlock (Point CardanoBlock) (Query CardanoBlock) IO a) + idleState = do + (query, respVar) <- atomically $ takeTMVar reqVar + pure . + SendMsgAcquire Nothing $ + ClientStAcquiring + { recvMsgAcquired = + pure . SendMsgQuery query $ + ClientStQuerying + { recvMsgResult = \result -> do + atomically $ putTMVar respVar (Right result) + pure $ SendMsgRelease idleState + } + , recvMsgFailure = \failure -> do + atomically $ putTMVar respVar (Left failure) + idleState + } + + reqVar = unStateQueryTMVar $ nlsQueryVar env diff --git a/cardano-db-sync/src/Cardano/DbSync/Sync.hs b/cardano-db-sync/src/Cardano/DbSync/Sync.hs index 0652514aa..b11dff0ef 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Sync.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Sync.hs @@ -45,6 +45,7 @@ import Cardano.DbSync.DbAction import Cardano.DbSync.Epoch import Cardano.DbSync.Era import Cardano.DbSync.Error +import Cardano.DbSync.LocalStateQuery import Cardano.DbSync.Metrics import Cardano.DbSync.Tracing.ToObjectOrphans () import Cardano.DbSync.Types @@ -55,6 +56,8 @@ import qualified Codec.CBOR.Term as CBOR import Control.Monad.Trans.Except.Exit (orDie) import qualified Data.ByteString.Lazy as BSL +import Data.Functor.Contravariant (contramap) +import qualified Data.Text as Text import Database.Persist.Sql (SqlBackend) @@ -92,6 +95,7 @@ import Ouroboros.Network.Protocol.ChainSync.ClientPipelined import Ouroboros.Network.Protocol.ChainSync.PipelineDecision (MkPipelineDecision, PipelineDecision (..), pipelineDecisionLowHighMark, runPipelineDecision) import Ouroboros.Network.Protocol.ChainSync.Type (ChainSync) +import Ouroboros.Network.Protocol.LocalStateQuery.Client (localStateQueryClientPeer) import qualified Ouroboros.Network.Snocket as Snocket import Ouroboros.Network.Subscription (SubscriptionTrace) @@ -251,9 +255,9 @@ dbSyncProtocols trce env metricsSetters _version codecs _connectionId = localStateQuery :: RunMiniProtocol 'InitiatorMode BSL.ByteString IO () Void localStateQuery = InitiatorProtocolOnly $ MuxPeer - Logging.nullTracer + (if hasLedgerState env then Logging.nullTracer else contramap (Text.pack . show) . toLogObject $ appendName "local-state-query" trce) (cStateQueryCodec codecs) - localStateQueryPeerNull + (if hasLedgerState env then localStateQueryPeerNull else localStateQueryClientPeer (localStateQueryHandler (envNoLedgerEnv env))) -- | 'ChainSyncClient' which traces received blocks and ignores when it -- receives a request to rollbackwar. A real wallet client should: diff --git a/cardano-db/src/Cardano/Db/Query.hs b/cardano-db/src/Cardano/Db/Query.hs index e8c891303..a1d85953d 100644 --- a/cardano-db/src/Cardano/Db/Query.hs +++ b/cardano-db/src/Cardano/Db/Query.hs @@ -37,6 +37,7 @@ module Cardano.Db.Query , queryGenesisSupply , queryShelleyGenesisSupply , queryLatestBlock + , queryLatestPoints , queryLatestCachedEpochNo , queryLatestEpochNo , queryLatestBlockId @@ -552,6 +553,16 @@ queryLatestBlock = do pure blk pure $ fmap entityVal (listToMaybe res) +queryLatestPoints :: MonadIO m => ReaderT SqlBackend m [(Maybe Word64, ByteString)] +queryLatestPoints = do + res <- select $ do + blk <- from $ table @Block + where_ (isJust $ blk ^. BlockSlotNo) + orderBy [desc (blk ^. BlockSlotNo)] + limit 5 + pure (blk ^. BlockSlotNo, blk ^. BlockHash) + pure $ fmap unValue2 res + queryLatestCachedEpochNo :: MonadIO m => ReaderT SqlBackend m (Maybe Word64) queryLatestCachedEpochNo = do res <- select $ do