Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ledger integration #585

Merged
merged 22 commits into from
May 30, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions nix/.stack.nix/ouroboros-consensus.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 36 additions & 7 deletions ouroboros-consensus/demo-playground/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module CLI (
CLI(..)
, TopologyInfo(..)
, Command(..)
, Protocol(..)
, fromProtocol
, parseCLI
-- * Handy re-exports
, execParser
Expand All @@ -25,16 +27,39 @@ import Ouroboros.Consensus.Util
import Mock.TxSubmission (command', parseMockTx)
import Topology (TopologyInfo (..))

import qualified Test.Cardano.Chain.Genesis.Dummy as Dummy

data CLI = CLI {
systemStart :: SystemStart
, slotDuration :: SlotLength
, command :: Command
}

data Command =
SimpleNode TopologyInfo (Some DemoProtocol)
SimpleNode TopologyInfo Protocol
| TxSubmitter TopologyInfo Mock.Tx

data Protocol =
BFT
| Praos
| MockPBFT
| RealPBFT

fromProtocol :: Protocol -> IO (Some DemoProtocol)
fromProtocol BFT =
return $ Some $ DemoBFT defaultSecurityParam
fromProtocol Praos =
return $ Some $ DemoPraos defaultDemoPraosParams
fromProtocol MockPBFT =
return $ Some $ DemoMockPBFT (defaultDemoPBftParams genesisConfig)
where
-- TODO: This is nasty
genesisConfig = error "genesis config not needed when using mock ledger"
edsko marked this conversation as resolved.
Show resolved Hide resolved
fromProtocol RealPBFT = do
return $ Some $ DemoRealPBFT (defaultDemoPBftParams genesisConfig)
where
genesisConfig = Dummy.dummyConfig

parseCLI :: Parser CLI
parseCLI = CLI
<$> parseSystemStart
Expand All @@ -57,19 +82,23 @@ parseSlotDuration = option (mkSlotLength <$> auto) $ mconcat [
mkSlotLength :: Integer -> SlotLength
mkSlotLength = slotLengthFromMillisec . (* 1000)

parseProtocol :: Parser (Some DemoProtocol)
parseProtocol :: Parser Protocol
parseProtocol = asum [
flag' (Some (DemoBFT defaultSecurityParam)) $ mconcat [
flag' BFT $ mconcat [
long "bft"
, help "Use the BFT consensus algorithm"
]
, flag' (Some (DemoPraos defaultDemoPraosParams)) $ mconcat [
, flag' Praos $ mconcat [
long "praos"
, help "Use the Praos consensus algorithm"
]
, flag' (Some (DemoPBFT defaultDemoPBftParams)) $ mconcat [
long "pbft"
, help "Use the Permissive BFT consensus algorithm"
, flag' MockPBFT $ mconcat [
long "mock-pbft"
, help "Use the Permissive BFT consensus algorithm using a mock ledger"
]
, flag' RealPBFT $ mconcat [
long "real-pbft"
, help "Use the Permissive BFT consensus algorithm using the real ledger"
]
]

Expand Down
20 changes: 12 additions & 8 deletions ouroboros-consensus/demo-playground/Mock/TxSubmission.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
Expand All @@ -10,7 +12,7 @@ module Mock.TxSubmission (
, spawnMempoolListener
) where

import Codec.Serialise (hPutSerialise)
import Codec.Serialise (decode, hPutSerialise)
import qualified Control.Concurrent.Async as Async
import Control.Monad.Except
import Control.Tracer
Expand All @@ -22,6 +24,7 @@ import System.IO (IOMode (..))

import Ouroboros.Consensus.Crypto.Hash (ShortHash)
import qualified Ouroboros.Consensus.Crypto.Hash as H
import Ouroboros.Consensus.Demo
import qualified Ouroboros.Consensus.Ledger.Mock as Mock
import Ouroboros.Consensus.Mempool
import Ouroboros.Consensus.Node (NodeId (..), NodeKernel (..))
Expand Down Expand Up @@ -80,7 +83,6 @@ command' c descr p =
Main logic
-------------------------------------------------------------------------------}


handleTxSubmission :: TopologyInfo -> Mock.Tx -> IO ()
handleTxSubmission tinfo tx = do
topoE <- readTopologyFile (topologyFile tinfo)
Expand All @@ -97,21 +99,23 @@ submitTx n tx = do
putStrLn $ "The Id for this transaction is: " <> condense (H.hash @ShortHash tx)

-- | Auxiliary to 'spawnMempoolListener'
readIncomingTx :: Tracer IO String
-> NodeKernel IO NodeId (Mock.SimpleBlock p c) (Mock.SimpleHeader p c)
readIncomingTx :: RunDemo p
=> Tracer IO String
-> NodeKernel IO NodeId (Block p) (Header p)
-> Decoder IO
-> IO ()
readIncomingTx tracer kernel Decoder{..} = forever $ do
newTx :: Mock.Tx <- decodeNext
rejected <- addTxs (getMempool kernel) [newTx]
newTx :: Mock.Tx <- decodeNext decode
rejected <- addTxs (getMempool kernel) [demoMockTx (getNodeConfig kernel) newTx]
traceWith tracer $
(if null rejected then "Accepted" else "Rejected") <>
" transaction: " <> show newTx

-- | Listen for transactions coming a named pipe and add them to the mempool
spawnMempoolListener :: Tracer IO String
spawnMempoolListener :: RunDemo p
=> Tracer IO String
-> NodeId
-> NodeKernel IO NodeId (Mock.SimpleBlock p c) (Mock.SimpleHeader p c)
-> NodeKernel IO NodeId (Block p) (Header p)
-> IO (Async.Async ())
spawnMempoolListener tracer myNodeId kernel = do
Async.async $ do
Expand Down
102 changes: 66 additions & 36 deletions ouroboros-consensus/demo-playground/Run.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand All @@ -9,17 +10,20 @@ module Run (
runNode
) where

import Codec.Serialise (decode, encode)
import Codec.CBOR.Decoding (Decoder)
import Codec.CBOR.Encoding (Encoding)
import qualified Control.Concurrent.Async as Async
import Control.Monad
import Control.Tracer
import Crypto.Random
import Data.Functor.Contravariant (contramap)
import qualified Data.Map.Strict as M
import Data.Maybe
import Data.Semigroup ((<>))

import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block
import qualified Ouroboros.Network.Block as Block
import Ouroboros.Network.Chain (genesisPoint, pointHash)
import qualified Ouroboros.Network.Chain as Chain
import Ouroboros.Network.Protocol.BlockFetch.Codec
Expand All @@ -28,7 +32,6 @@ import Ouroboros.Network.Protocol.ChainSync.Codec
import Ouroboros.Consensus.BlockchainTime
import Ouroboros.Consensus.ChainSyncClient (ClockSkew (..))
import Ouroboros.Consensus.Demo
import qualified Ouroboros.Consensus.Ledger.Mock as Mock
import Ouroboros.Consensus.Node
import Ouroboros.Consensus.Util
import Ouroboros.Consensus.Util.Condense
Expand All @@ -53,15 +56,15 @@ runNode cli@CLI{..} = do
case command of
TxSubmitter topology tx ->
handleTxSubmission topology tx
SimpleNode topology protocol ->
case protocol of
Some p -> case demoProtocolConstraints p of
Dict -> handleSimpleNode p cli topology
SimpleNode topology protocol -> do
Some p <- fromProtocol protocol
case runDemo p of
Dict -> handleSimpleNode p cli topology

-- | Sets up a simple node, which will run the chain sync protocol and block
-- fetch protocol, and, if core, will also look at the mempool when trying to
-- create a new block.
handleSimpleNode :: forall p. DemoProtocolConstraints p
handleSimpleNode :: forall p. RunDemo p
=> DemoProtocol p -> CLI -> TopologyInfo -> IO ()
handleSimpleNode p CLI{..} (TopologyInfo myNodeId topologyFile) = do
putStrLn $ "System started at " <> show systemStart
Expand All @@ -77,59 +80,61 @@ handleSimpleNode p CLI{..} (TopologyInfo myNodeId topologyFile) = do
putStrLn $ "My producers are " <> show (producers nodeSetup)
putStrLn $ "**************************************"

let ProtocolInfo{..} = protocolInfo
p
(NumCoreNodes (length nodeSetups))
(CoreNodeId nid)
let pInfo@ProtocolInfo{..} =
protocolInfo p (NumCoreNodes (length nodeSetups)) (CoreNodeId nid)

withThreadRegistry $ \registry -> do

let callbacks :: NodeCallbacks IO (Block p)
callbacks = NodeCallbacks {
produceDRG = drgNew
, produceBlock = \proof _l slot prevPoint prevBlockNo txs -> do
let curNo :: BlockNo
curNo = succ prevBlockNo
let curNo :: BlockNo
curNo = succ prevBlockNo

prevHash :: ChainHash (Header p)
prevHash = castHash (pointHash prevPoint)
prevHash :: ChainHash (Header p)
prevHash = castHash (pointHash prevPoint)

-- The transactions we get are consistent; the only reason not
-- to include all of them would be maximum block size, which
-- we ignore for now.
Mock.forgeBlock pInfoConfig
slot
curNo
prevHash
txs
proof
demoForgeBlock pInfoConfig
slot
curNo
prevHash
txs
proof
}

chainDB <- ChainDB.openDB encode pInfoConfig pInfoInitLedger Mock.simpleHeader
chainDB :: ChainDB IO (Block p) (Header p) <- ChainDB.openDB
(demoEncodePreHeader pInfoConfig) pInfoConfig pInfoInitLedger
demoGetHeader

btime <- realBlockchainTime registry slotDuration systemStart
let tracer = contramap ((show myNodeId <> " | ") <>) stdoutTracer
nodeParams = NodeParams
{ tracer
{ encoder = demoEncodePreHeader pInfoConfig
, tracer = tracer
, threadRegistry = registry
, maxClockSkew = ClockSkew 1
, cfg = pInfoConfig
, initState = pInfoInitState
, btime
, chainDB
, callbacks
, blockFetchSize = Mock.headerBlockSize . Mock.headerPreHeader
, blockMatchesHeader = Mock.blockMatchesHeader
, blockFetchSize = demoBlockFetchSize
, blockMatchesHeader = demoBlockMatchesHeader
}

kernel <- nodeKernel nodeParams

watchChain registry tracer chainDB

-- Spawn the thread which listens to the mempool.
mempoolThread <- spawnMempoolListener tracer myNodeId kernel

forM_ (producers nodeSetup) (addUpstream' kernel)
forM_ (consumers nodeSetup) (addDownstream' kernel)
forM_ (producers nodeSetup) (addUpstream' pInfo kernel)
forM_ (consumers nodeSetup) (addDownstream' pInfo kernel)

Async.wait mempoolThread
where
Expand All @@ -156,35 +161,60 @@ handleSimpleNode p CLI{..} (TopologyInfo myNodeId topologyFile) = do
-- We need to make sure that both nodes read from the same file
-- We therefore use the convention to distinguish between
-- upstream and downstream from the perspective of the "lower numbered" node
addUpstream' :: NodeKernel IO NodeId (Block p) (Header p)
addUpstream' :: ProtocolInfo p
-> NodeKernel IO NodeId (Block p) (Header p)
-> NodeId
-> IO ()
addUpstream' kernel producerNodeId =
addUpstream' pInfo@ProtocolInfo{..} kernel producerNodeId =
addUpstream kernel producerNodeId nodeCommsCS nodeCommsBF
where
direction = Upstream (producerNodeId :==>: myNodeId)
nodeCommsCS = NodeComms {
ncCodec = codecChainSync encode decode encode decode
ncCodec = codecChainSync
(demoEncodeHeader pInfoConfig)
(demoDecodeHeader pInfoConfig)
(encodePoint' pInfo)
(decodePoint' pInfo)
, ncWithChan = NamedPipe.withPipeChannel "chain-sync" direction
}
nodeCommsBF = NodeComms {
ncCodec = codecBlockFetch encode encode decode decode
ncCodec = codecBlockFetch
(demoEncodeBlock pInfoConfig)
(demoEncodeHeaderHash pInfoConfig)
(demoDecodeBlock pInfoConfig)
(demoDecodeHeaderHash pInfoConfig)
, ncWithChan = NamedPipe.withPipeChannel "block-fetch" direction
}


addDownstream' :: NodeKernel IO NodeId (Block p) (Header p)
addDownstream' :: ProtocolInfo p
-> NodeKernel IO NodeId (Block p) (Header p)
-> NodeId
-> IO ()
addDownstream' kernel consumerNodeId =
addDownstream' pInfo@ProtocolInfo{..} kernel consumerNodeId =
addDownstream kernel nodeCommsCS nodeCommsBF
where
direction = Downstream (myNodeId :==>: consumerNodeId)
nodeCommsCS = NodeComms {
ncCodec = codecChainSync encode decode encode decode
ncCodec = codecChainSync
(demoEncodeHeader pInfoConfig)
(demoDecodeHeader pInfoConfig)
(encodePoint' pInfo)
(decodePoint' pInfo)
, ncWithChan = NamedPipe.withPipeChannel "chain-sync" direction
}
nodeCommsBF = NodeComms {
ncCodec = codecBlockFetch encode encode decode decode
ncCodec = codecBlockFetch
(demoEncodeBlock pInfoConfig)
(demoEncodeHeaderHash pInfoConfig)
(demoDecodeBlock pInfoConfig)
(demoDecodeHeaderHash pInfoConfig)
, ncWithChan = NamedPipe.withPipeChannel "block-fetch" direction
}

encodePoint' :: ProtocolInfo p -> Point (Header p) -> Encoding
encodePoint' ProtocolInfo{..} =
Block.encodePoint $ Block.encodeChainHash (demoEncodeHeaderHash pInfoConfig)

decodePoint' :: forall s. ProtocolInfo p -> Decoder s (Point (Header p))
decodePoint' ProtocolInfo{..} =
Block.decodePoint $ Block.decodeChainHash (demoDecodeHeaderHash pInfoConfig)
Loading