Skip to content
This repository has been archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
Add epoch stakepool size indexer (#850)
Browse files Browse the repository at this point in the history
* Add suffixes to sqlite databases

* Add ledger state streaming, equivalent to foldBlocks defined in cardano-api

- Cardano.Streaming.Callbacks: Add primitive callback based versions
  of local chainsync to Cardano.Streaming. `blocksCallback` and
  `blocksCallbackPipelined` both take a callback when iterating over the
  received blocks from local chainsync connection

- Cardano.Streaming: versions built on the streaming package that
  stream blocks or ledger states. `foldLedgerState` does essentially
  what `foldBlocks` does in cardano-api:Cardano.Api.LedgerState, with
  the difference that it doesn't drop the connection once it is up to
  date with the node, but keeps listening.

* Add epoch stakepool size indexer

The `toEvents` and `getStakeMap` make up the core of the
implementation in Marconi.Index.EpochStakepoolSize.

* Refactor marconi test for reuse in epoch stakepool size indexer

* Add test for epoch stakepool size indexer

* Add changelog entry

* Disable test and add TODO

* Fix warnings

* Fix marconi.cabal
  • Loading branch information
eyeinsky authored Jan 17, 2023
1 parent 0d35e44 commit 6990714
Show file tree
Hide file tree
Showing 15 changed files with 1,287 additions and 208 deletions.
31 changes: 29 additions & 2 deletions cardano-streaming/cardano-streaming.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ common lang
library
import: lang
hs-source-dirs: src
exposed-modules: Cardano.Streaming
exposed-modules:
Cardano.Streaming
Cardano.Streaming.Callbacks
Cardano.Streaming.Helpers

--------------------------
-- Other IOG dependencies
Expand All @@ -42,9 +45,33 @@ library
-- Non-IOG dependencies
------------------------
build-depends:
, aeson
, async
, base >=4.9 && <5
, base >=4.9 && <5
, bytestring
, cardano-api
, cardano-binary
, cardano-crypto-class
, cardano-crypto-wrapper
, cardano-data
, cardano-ledger-alonzo
, cardano-ledger-byron
, cardano-ledger-core
, cardano-ledger-shelley
, cardano-protocol-tpraos
, cardano-slotting
, containers
, ouroboros-consensus
, ouroboros-consensus-byron
, ouroboros-consensus-cardano
, ouroboros-consensus-protocol
, ouroboros-consensus-shelley
, primitive
, small-steps
, streaming
, transformers
, typed-protocols
, vector

executable cardano-streaming-example-1
import: lang
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added

- Fold blocks into ledger states on the client side using the local chainsync protocol; both pipelined and non-pipelined versions are provided.
248 changes: 191 additions & 57 deletions cardano-streaming/src/Cardano/Streaming.hs
Original file line number Diff line number Diff line change
@@ -1,48 +1,62 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
module Cardano.Streaming
( withChainSyncEventStream,
ChainSyncEvent (..),
ChainSyncEventException (..),
( withChainSyncEventStream
, CS.ChainSyncEvent (..)
, CS.ChainSyncEventException (..)

--
, CS.mkConnectInfo
, CS.mkLocalNodeConnectInfo

-- * Stream blocks and ledger states
, blocks
, blocksPipelined
, ledgerStates
, ledgerStatesPipelined
, foldLedgerState
, getEnvAndInitialLedgerStateHistory
, CS.ignoreRollbacks
)
where

import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (ChainSyncClient), ChainTip,
ConsensusModeParams (CardanoModeParams), EpochSlots (EpochSlots),
LocalChainSyncClient (LocalChainSyncClient),
LocalNodeClientProtocols (LocalNodeClientProtocols, localChainSyncClient, localStateQueryClient, localTxMonitoringClient, localTxSubmissionClient),
LocalNodeConnectInfo (LocalNodeConnectInfo, localConsensusModeParams, localNodeNetworkId, localNodeSocketPath),
NetworkId, connectToLocalNode)
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext),
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent qualified as IO
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, SomeException (SomeException), catch, throw)
import GHC.Generics (Generic)
import Control.Exception (SomeException (SomeException), catch, throw)
import Control.Exception qualified as IO
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Except (runExceptT)
import Data.Foldable (forM_)
import Data.Function ((&))
import Data.Sequence (Seq)
import Data.Sequence qualified as Seq
import Data.Word (Word32)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S

data ChainSyncEvent a
= RollForward a ChainTip
| RollBackward ChainPoint ChainTip
deriving (Show, Functor, Generic)

data ChainSyncEventException
= NoIntersectionFound
deriving (Show)
import Cardano.Api qualified as C
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext),
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Cardano.Slotting.Slot (WithOrigin (At, Origin))

instance Exception ChainSyncEventException
import Cardano.Streaming.Callbacks qualified as CS
import Cardano.Streaming.Helpers qualified as CS

-- | `withChainSyncEventStream` uses the chain-sync mini-protocol to
-- connect to a locally running node and fetch blocks from the given
-- starting point.
withChainSyncEventStream ::
-- | Path to the node socket
FilePath ->
NetworkId ->
C.NetworkId ->
-- | The point on the chain to start streaming from
[ChainPoint] ->
[C.ChainPoint] ->
-- | The stream consumer
(Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) ->
(Stream (Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r -> IO b) ->
IO b
withChainSyncEventStream socketPath networkId points consumer = do
-- The chain-sync client runs in a different thread passing the blocks it
Expand All @@ -65,29 +79,16 @@ withChainSyncEventStream socketPath networkId points consumer = do
nextBlockVar <- newEmptyMVar

let client = chainSyncStreamingClient points nextBlockVar

localNodeClientProtocols =
LocalNodeClientProtocols
{ localChainSyncClient = LocalChainSyncClient client,
localStateQueryClient = Nothing,
localTxMonitoringClient = Nothing,
localTxSubmissionClient = Nothing
}

connectInfo =
LocalNodeConnectInfo
{ localConsensusModeParams = CardanoModeParams epochSlots,
localNodeNetworkId = networkId,
localNodeSocketPath = socketPath
C.LocalNodeClientProtocols
{ C.localChainSyncClient = C.LocalChainSyncClient client,
C.localStateQueryClient = Nothing,
C.localTxMonitoringClient = Nothing,
C.localTxSubmissionClient = Nothing
}
connectInfo = CS.mkLocalNodeConnectInfo networkId socketPath

-- This is a parameter needed only for the Byron era. Since the Byron
-- era is over and the parameter has never changed it is ok to
-- hardcode this. See comment on `Cardano.Api.ConsensusModeParams` in
-- cardano-node.
epochSlots = EpochSlots 21600

withAsync (connectToLocalNode connectInfo localNodeClientProtocols) $ \a -> do
withAsync (C.connectToLocalNode connectInfo localNodeClientProtocols) $ \a -> do
-- Make sure all exceptions in the client thread are passed to the consumer thread
link a
-- Run the consumer
Expand All @@ -103,21 +104,21 @@ withChainSyncEventStream socketPath networkId points consumer = do
-- If the starting point is such that an intersection cannot be found, this
-- client will throw a NoIntersectionFound exception.
chainSyncStreamingClient ::
[ChainPoint] ->
MVar (ChainSyncEvent e) ->
ChainSyncClient e ChainPoint ChainTip IO ()
[C.ChainPoint] ->
MVar (CS.ChainSyncEvent e) ->
C.ChainSyncClient e C.ChainPoint C.ChainTip IO ()
chainSyncStreamingClient points nextChainEventVar =
ChainSyncClient $ pure $ SendMsgFindIntersect points onIntersect
C.ChainSyncClient $ pure $ SendMsgFindIntersect points onIntersect
where
onIntersect =
ClientStIntersect
{ recvMsgIntersectFound = \cp ct ->
ChainSyncClient $ do
putMVar nextChainEventVar (RollBackward cp ct)
C.ChainSyncClient $ do
putMVar nextChainEventVar (CS.RollBackward cp ct)
sendRequestNext,
recvMsgIntersectNotFound =
-- There is nothing we can do here
throw NoIntersectionFound
throw CS.NoIntersectionFound
}

sendRequestNext =
Expand All @@ -126,12 +127,145 @@ chainSyncStreamingClient points nextChainEventVar =
onNext =
ClientStNext
{ recvMsgRollForward = \bim ct ->
ChainSyncClient $ do
putMVar nextChainEventVar (RollForward bim ct)
C.ChainSyncClient $ do
putMVar nextChainEventVar (CS.RollForward bim ct)
sendRequestNext,
recvMsgRollBackward = \cp ct ->
ChainSyncClient $ do
putMVar nextChainEventVar (RollBackward cp ct)
C.ChainSyncClient $ do
putMVar nextChainEventVar (CS.RollBackward cp ct)
sendRequestNext
}

-- | Stream @ChainSyncEvent (BlockInMode CardanoMode)@ values obtained
-- over the connection @con@, starting from @chainPoint@.
--
-- 'CS.blocksCallback' is started through 'CS.linkedAsync' and writes
-- every event it receives into a channel; the returned stream reads
-- from that channel indefinitely. The channel is an unbounded
-- 'IO.Chan', so events accumulate if the consumer is slower than the
-- producer.
blocks
  :: C.LocalNodeConnectInfo C.CardanoMode
  -> C.ChainPoint
  -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
blocks con chainPoint = do
  events <- liftIO IO.newChan
  let producer = CS.blocksCallback con chainPoint (IO.writeChan events)
  _ <- liftIO (CS.linkedAsync producer)
  S.repeatM (IO.readChan events)

-- | Pipelined counterpart of the plain block stream: stream
-- @ChainSyncEvent (BlockInMode CardanoMode)@ values obtained over the
-- connection @con@, starting from @chainPoint@, passing @pipelineSize@
-- on to 'CS.blocksCallbackPipelined'.
--
-- The callback is started through 'CS.linkedAsync' and writes every
-- event into an unbounded 'IO.Chan'; the returned stream reads from
-- that channel indefinitely.
blocksPipelined
  :: Word32
  -> C.LocalNodeConnectInfo C.CardanoMode
  -> C.ChainPoint
  -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
blocksPipelined pipelineSize con chainPoint = do
  events <- liftIO IO.newChan
  let producer =
        CS.blocksCallbackPipelined pipelineSize con chainPoint (IO.writeChan events)
  _ <- liftIO (CS.linkedAsync producer)
  S.repeatM (IO.readChan events)

-- * Ledger states

-- | Stream ledger states computed from the chain, starting at genesis.
--
-- @config@ is the path handed to 'getEnvAndInitialLedgerStateHistory',
-- @socket@ is the node socket path given to 'CS.mkConnectInfo', and
-- @validationMode@ is forwarded to block application. Only the states
-- that 'foldLedgerState' yields are produced, i.e. those that have
-- dropped out of the rollback history.
ledgerStates :: FilePath -> FilePath -> C.ValidationMode -> S.Stream (S.Of C.LedgerState) IO r
ledgerStates config socket validationMode = do
  (env, history0) <- liftIO (getEnvAndInitialLedgerStateHistory config)
  let source = blocks (CS.mkConnectInfo env socket) C.ChainPointAtGenesis
  foldLedgerState env history0 validationMode source

-- | Stream ledger states computed from the chain, starting at genesis,
-- fetching blocks with 'blocksPipelined' using the given
-- @pipelineSize@.
--
-- @config@ is the path handed to 'getEnvAndInitialLedgerStateHistory',
-- @socket@ is the node socket path given to 'CS.mkConnectInfo', and
-- @validationMode@ is forwarded to block application.
ledgerStatesPipelined
  :: Word32 -> FilePath -> FilePath -> C.ValidationMode -> S.Stream (S.Of C.LedgerState) IO r
ledgerStatesPipelined pipelineSize config socket validationMode = do
  (env, history0) <- liftIO (getEnvAndInitialLedgerStateHistory config)
  let source =
        blocksPipelined pipelineSize (CS.mkConnectInfo env socket) C.ChainPointAtGenesis
  foldLedgerState env history0 validationMode source

-- * Apply block

-- | Turn a stream of chain sync events into a stream of ledger states.
--
-- Modelled on @foldBlocks@ from cardano-api:Cardano.Api.LedgerState,
-- except that this consumes whatever the source stream provides and
-- only finishes when the source does.
--
-- * On a roll forward the block is applied to the most recent ledger
--   state in the history and the result is pushed onto the history.
--   Entries that 'pushLedgerState' returns as no longer part of the
--   history are yielded downstream, skipping entries tagged 'Origin'.
-- * On a roll backward to genesis the history is reset to
--   @initialLedgerStateHistory@; otherwise it is trimmed with
--   'rollBackLedgerStateHist'.
--
-- A block that fails to apply raises an exception (see
-- 'applyBlockThrow').
foldLedgerState
  :: C.Env -> LedgerStateHistory -> C.ValidationMode
  -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
  -> S.Stream (S.Of C.LedgerState) IO r
foldLedgerState env initialLedgerStateHistory validationMode = go initialLedgerStateHistory
  where
    -- Pull one event at a time from the source, threading the history.
    go
      :: LedgerStateHistory
      -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
      -> S.Stream (S.Of C.LedgerState) IO r
    go history stream = do
      step <- lift (S.next stream)
      case step of
        Left r -> pure r
        Right (event, rest) -> do
          history' <- advance history event
          go history' rest

    -- Compute the next history for a single event, yielding any ledger
    -- states that fell out of the history along the way.
    advance
      :: LedgerStateHistory
      -> CS.ChainSyncEvent (C.BlockInMode C.CardanoMode)
      -> S.Stream (S.Of C.LedgerState) IO LedgerStateHistory
    advance history = \case
      CS.RollForward blockInMode@(C.BlockInMode block _) _tip -> do
        applied <- liftIO $
          applyBlockThrow env (getLastLedgerState history) validationMode block
        let (kept, committed) =
              pushLedgerState env history (CS.bimSlotNo blockInMode) applied blockInMode
        forM_ committed $ \(_, (ledgerState, _events), blockMay) ->
          case blockMay of
            Origin -> pure ()
            At _ -> S.yield ledgerState
        pure kept
      CS.RollBackward point _tip -> pure $ case point of
        C.ChainPointAtGenesis -> initialLedgerStateHistory
        C.ChainPoint slotNo _ -> rollBackLedgerStateHist history slotNo

-- | Build the environment and the initial ledger state history from the
-- configuration at @configPath@.
--
-- Runs 'C.initialLedgerState' and wraps the resulting ledger state with
-- 'singletonLedgerStateHistory'. If 'C.initialLedgerState' fails, its
-- error is raised as an exception in 'IO'.
getEnvAndInitialLedgerStateHistory :: FilePath -> IO (C.Env, LedgerStateHistory)
getEnvAndInitialLedgerStateHistory configPath = do
  result <- runExceptT (C.initialLedgerState configPath)
  case result of
    -- 'throwIO' rather than 'throw': the exception is raised exactly
    -- when this action is executed, sequenced with surrounding IO.
    Left err -> IO.throwIO err
    Right (env, initialLedgerState) ->
      pure (env, singletonLedgerStateHistory initialLedgerState)


-- | Apply @block@ to @ledgerState@ with 'C.applyBlock', returning the
-- new ledger state together with the ledger events it produced.
--
-- A 'Left' result from 'C.applyBlock' is raised as an exception in
-- 'IO'. 'IO.throwIO' is used (rather than 'IO.throw') so the exception
-- is raised exactly when this action runs, sequenced with other IO.
applyBlockThrow :: C.Env -> C.LedgerState -> C.ValidationMode -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent])
applyBlockThrow env ledgerState validationMode block =
  either IO.throwIO pure (C.applyBlock env ledgerState validationMode block)

-- * Copy-paste code
--
-- The following is pasted in from cardano-api:Cardano.Api.LedgerState.
-- (`getLastLedgerState` and `singletonLedgerStateHistory` aren't a
-- direct copy-paste, but they are extracted from within `foldBlocks`)


-- | A history of k (security parameter) recent ledger states. The head is the
-- most recent item. Elements are:
--
-- * Slot number that a new block occurred
-- * The ledger state and events after applying the new block
-- * The new block
--
type LedgerStateHistory = History LedgerStateEvents
-- | A sequence of history entries carrying a payload of type @a@. Each
-- entry is a triple of slot number, payload, and the block associated
-- with the entry ('Origin' when there is no such block, as for the
-- initial entry built by 'singletonLedgerStateHistory').
type History a = Seq (C.SlotNo, a, WithOrigin (C.BlockInMode C.CardanoMode))

-- | A ledger state paired with a list of ledger events.
type LedgerStateEvents = (C.LedgerState, [C.LedgerEvent])

-- | Put a new item at the front of the history and cut the result
-- after @k + 1@ entries, where @k@ is the security parameter taken
-- from the environment.
pushLedgerState
  :: C.Env
  -- ^ Supplies the security parameter @k@ via 'C.envSecurityParam'.
  -> History a
  -- ^ The current history, most recent entry first.
  -> C.SlotNo
  -- ^ Slot number stored with the new item.
  -> a
  -- ^ The new item to record.
  -> C.BlockInMode C.CardanoMode
  -- ^ The block stored alongside the new item (wrapped in 'At').
  -> (History a, History a)
  -- ^ @(kept, dropped)@: the first @k + 1@ entries of the extended
  -- history, and the older entries beyond that prefix, which can no
  -- longer be rolled back.
pushLedgerState env hist ix st block = Seq.splitAt windowSize extended
  where
    windowSize = fromIntegral (C.envSecurityParam env + 1)
    extended = (ix, st, At block) Seq.<| hist

-- | Roll the history back to slot @maxInc@ (inclusive): drop entries
-- from the front of the history while their slot number is greater
-- than @maxInc@.
rollBackLedgerStateHist :: History a -> C.SlotNo -> History a
rollBackLedgerStateHist hist maxInc = Seq.dropWhileL isAfterTarget hist
  where
    isAfterTarget (slotNo, _, _) = slotNo > maxInc

-- | Return the ledger state stored in the first (most recent) entry of
-- the history. Calls 'error' if the history is empty.
getLastLedgerState :: LedgerStateHistory -> C.LedgerState
getLastLedgerState ledgerStates' =
  case Seq.viewl ledgerStates' of
    (_, (ledgerState, _), _) Seq.:< _ -> ledgerState
    Seq.EmptyL -> error "Impossible! Missing Ledger state"

-- | Build a one-entry history from a ledger state: slot number 0, no
-- ledger events, and 'Origin' in place of a block.
singletonLedgerStateHistory :: C.LedgerState -> LedgerStateHistory
singletonLedgerStateHistory ledgerState = Seq.singleton initialEntry
  where
    initialEntry = (0, (ledgerState, []), Origin)
Loading

0 comments on commit 6990714

Please sign in to comment.