From caded066cfaa5ddad7ebe9fe9eec83ebde468c79 Mon Sep 17 00:00:00 2001 From: KtorZ Date: Thu, 14 Mar 2019 15:51:30 +0100 Subject: [PATCH 1/2] mock 'tick' and 'listen' to the NetworkLayer --- src/Cardano/NetworkLayer.hs | 63 +++++++++++- test/unit/Cardano/NetworkLayerSpec.hs | 142 +++++++++++++++++++++++++- 2 files changed, 200 insertions(+), 5 deletions(-) diff --git a/src/Cardano/NetworkLayer.hs b/src/Cardano/NetworkLayer.hs index d554cfd97b4..314104b2c3c 100644 --- a/src/Cardano/NetworkLayer.hs +++ b/src/Cardano/NetworkLayer.hs @@ -1,13 +1,30 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE DataKinds #-} +{-# LANGUAGE QuantifiedConstraints #-} +{-# LANGUAGE ScopedTypeVariables #-} module Cardano.NetworkLayer ( NetworkLayer (..) + , tick + , listen ) where +import Prelude + import Cardano.Wallet.Primitive - ( Block, BlockHeader (..), Hash (..), SlotId ) + ( Block (..), BlockHeader (..), Hash (..), SlotId (..) ) +import Control.Concurrent + ( threadDelay ) import Control.Monad.Except - ( ExceptT ) + ( ExceptT, runExceptT ) +import Control.Monad.IO.Class + ( MonadIO, liftIO ) +import Data.Time.Units + ( Millisecond, toMicroseconds ) +import Fmt + ( fmt, (+||), (||+) ) +import System.Exit + ( die ) data NetworkLayer m e0 e1 = NetworkLayer @@ -21,3 +38,45 @@ data NetworkLayer m e0 e1 = NetworkLayer , networkTip :: ExceptT e1 m (Hash "BlockHeader", BlockHeader) } + +-- | Every interval @delay@, fetches some data from a given source, and call +-- an action for each elements retrieved. +tick + :: forall st m b. (MonadIO m) + => (st -> m ([b], st)) + -- ^ A way to get a new elements + -> (b -> m ()) + -- ^ Action to be taken on new elements + -> Millisecond + -- ^ tick time + -> st + -> m () +tick next action delay !st = do + (bs, !st') <- next st + mapM_ action bs + liftIO $ threadDelay $ (fromIntegral . toMicroseconds) delay + tick next action delay st' + +-- | Retrieve blocks from a chain producer and execute some given action for +-- each block. +listen + :: forall e0 e1. (Show e0) + => NetworkLayer IO e0 e1 + -> (Block -> IO ()) + -> IO () +listen network action = do + tick getNextBlocks action 5000 (SlotId 0 0) + where + getNextBlocks :: SlotId -> IO ([Block], SlotId) + getNextBlocks current = do + res <- runExceptT $ nextBlocks network current + case res of + Left err -> + die $ fmt $ "Chain producer error: "+||err||+"" + Right [] -> + pure ([], current) + Right blocks -> + -- fixme: there are more blocks available, so we need not + -- wait for an interval to pass before getting more blocks. + let next = succ . slotId . header . last $ blocks + in pure (blocks, next) diff --git a/test/unit/Cardano/NetworkLayerSpec.hs b/test/unit/Cardano/NetworkLayerSpec.hs index 16af9aec9ae..9038123e815 100644 --- a/test/unit/Cardano/NetworkLayerSpec.hs +++ b/test/unit/Cardano/NetworkLayerSpec.hs @@ -1,14 +1,150 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeFamilies #-} + module Cardano.NetworkLayerSpec ( spec ) where + import Prelude import Cardano.NetworkLayer - () + ( tick ) +import Cardano.Wallet.Primitive + ( Block (..), BlockHeader (..), Hash (..), SlotId (..) ) +import Control.Concurrent + ( forkIO, killThread ) +import Control.Concurrent.MVar + ( MVar, modifyMVar_, newEmptyMVar, newMVar, putMVar, takeMVar ) +import Control.Monad + ( foldM ) +import Control.Monad.IO.Class + ( liftIO ) +import Data.Functor + ( ($>) ) +import Data.Time.Units + ( Millisecond, fromMicroseconds ) +import Data.Word + ( Word8 ) import Test.Hspec - ( Spec ) + ( Spec, describe, it, shouldReturn ) +import Test.QuickCheck + ( Arbitrary (..), Gen, Property, checkCoverage, choose, cover ) +import Test.QuickCheck.Monadic + ( monadicIO ) + +import qualified Data.ByteString.Char8 as B8 spec :: Spec -spec = return () +spec = do + describe "tick terminates an passes blocks from a source to a consumer" $ do + it "Check ticking function when blocks are sent" + (checkCoverage tickingFunctionTest) + + +{------------------------------------------------------------------------------- + Test Logic +-------------------------------------------------------------------------------} + +tickingFunctionTest + :: (TickingTime, Blocks) + -> Property +tickingFunctionTest (TickingTime tickTime, Blocks blocks) = + cover 80 (sum (length <$> blocks) /= 0) "Non empty blocks" prop + where + prop = monadicIO $ liftIO $ do + (readerChan, reader) <- mkReader + (writerChan, writer) <- mkWriter + waitFor writerChan $ tick writer reader tickTime blocks + takeMVar readerChan `shouldReturn` reverse (mconcat blocks) + +waitFor + :: MVar () + -> IO () + -> IO () +waitFor done action = do + threadId <- forkIO action + _ <- takeMVar done + killThread threadId + +mkWriter + :: st ~ [[a]] => IO (MVar (), st -> IO ([a], st)) +mkWriter = do + done <- newEmptyMVar + return + ( done + , \case + st@[] -> putMVar done () $> ([], st) + h:st -> return (h, st) + ) + +mkReader + :: IO (MVar [a], a -> IO ()) +mkReader = do + ref <- newMVar [] + return + ( ref + , \x -> modifyMVar_ ref $ return . (x :) + ) + +{------------------------------------------------------------------------------- + Arbitrary Instances +-------------------------------------------------------------------------------} + + +newtype TickingTime = TickingTime Millisecond + deriving (Show) + +instance Arbitrary TickingTime where + -- No shrinking + arbitrary = do + tickTime <- fromMicroseconds . (* 1000) <$> choose (1, 3) + return $ TickingTime tickTime + + +newtype Blocks = Blocks [[Block]] + deriving Show + +instance Arbitrary Blocks where + -- No Shrinking + arbitrary = do + n <- fromIntegral . (`mod` 42) <$> arbitrary @Word8 + let h0 = BlockHeader (SlotId 1 0) (Hash "initial block") + let b0 = (blockHeaderHash h0, Block h0 mempty) + Blocks <$> groups (map snd $ take n $ iterate next b0) + where + next :: (Hash "BlockHeader", Block) -> (Hash "BlockHeader", Block) + next (prev, b) = + let + slot = slotId (header b) + h = BlockHeader (succ slot) prev + in + (blockHeaderHash h, Block h mempty) + + blockHeaderHash :: BlockHeader -> Hash "BlockHeader" + blockHeaderHash (BlockHeader (SlotId e s) _) = + Hash (B8.pack (show e <> show s)) + +-- | Construct arbitrary groups of elements from a given list. +-- +-- >>> generate $ groups [0,1,2,3,4,5,6,7,8,9] +-- [[0,1],[2,3],[4,5,6],[7,8,9]] +-- +-- +-- >>> generate $ groups [0,1,2,3,4,5,6,7,8,9] +-- [[],[0],[1,2,3,4,5,6,7,8],[9]] +-- +groups :: [a] -> Gen [[a]] +groups = fmap reverse . foldM arbitraryGroup [[]] + where + arbitraryGroup :: [[a]] -> a -> Gen [[a]] + arbitraryGroup [] _ = return [] -- Can't happen with the given initial value + arbitraryGroup (grp:rest) a = do + choose (1 :: Int, 3) >>= \case + 1 -> return $ [a]:grp:rest + _ -> return $ (grp ++ [a]):rest From b9fad9effb83052837c58c88b5cd7048bc69086b Mon Sep 17 00:00:00 2001 From: KtorZ Date: Thu, 14 Mar 2019 15:53:30 +0100 Subject: [PATCH 2/2] remove now obsolete 'BlockSyncer' (merged with 'NetworkLayer') --- app/server/Main.hs | 2 +- cardano-wallet.cabal | 2 - src/Cardano/Wallet/BlockSyncer.hs | 84 ----------- test/unit/Cardano/Wallet/BlockSyncerSpec.hs | 150 -------------------- 4 files changed, 1 insertion(+), 237 deletions(-) delete mode 100644 src/Cardano/Wallet/BlockSyncer.hs delete mode 100644 test/unit/Cardano/Wallet/BlockSyncerSpec.hs diff --git a/app/server/Main.hs b/app/server/Main.hs index f023e07c850..412fa6a0023 100644 --- a/app/server/Main.hs +++ b/app/server/Main.hs @@ -11,7 +11,7 @@ module Main where import Prelude -import Cardano.Wallet.BlockSyncer +import Cardano.NetworkLayer ( listen ) import Cardano.Wallet.Primitive ( Block ) diff --git a/cardano-wallet.cabal b/cardano-wallet.cabal index 88b5ef27e37..81a88d24897 100644 --- a/cardano-wallet.cabal +++ b/cardano-wallet.cabal @@ -68,7 +68,6 @@ library Cardano.Wallet.AddressDiscovery Cardano.Wallet.Binary Cardano.Wallet.Binary.Packfile - Cardano.Wallet.BlockSyncer Cardano.WalletLayer Cardano.Wallet.Mnemonic Cardano.Wallet.Primitive @@ -160,7 +159,6 @@ test-suite unit Cardano.Wallet.AddressDiscoverySpec Cardano.Wallet.Binary.PackfileSpec Cardano.Wallet.BinarySpec - Cardano.Wallet.BlockSyncerSpec Cardano.Wallet.MnemonicSpec Cardano.Wallet.PrimitiveSpec Cardano.WalletSpec diff --git a/src/Cardano/Wallet/BlockSyncer.hs b/src/Cardano/Wallet/BlockSyncer.hs deleted file mode 100644 index 6d8ea39e063..00000000000 --- a/src/Cardano/Wallet/BlockSyncer.hs +++ /dev/null @@ -1,84 +0,0 @@ -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE QuantifiedConstraints #-} -{-# LANGUAGE ScopedTypeVariables #-} - --- | --- Copyright: © 2018-2019 IOHK --- License: MIT --- --- This module contains the ticking function that is responsible for invoking --- block acquisition functionality and executing it in periodic fashion. --- --- Known limitations: --- - Blocks are produced by the network layer. They are not expected to produce --- any duplicates and to rollback. --- --- - - -module Cardano.Wallet.BlockSyncer - ( tick - , listen - ) where - - -import Prelude - -import Cardano.NetworkLayer - ( NetworkLayer (..) ) -import Cardano.Wallet.Primitive - ( Block (..), BlockHeader (..), SlotId (..) ) -import Control.Concurrent - ( threadDelay ) -import Control.Monad.Except - ( runExceptT ) -import Control.Monad.IO.Class - ( MonadIO, liftIO ) -import Data.Time.Units - ( Millisecond, toMicroseconds ) -import Fmt - ( fmt, (+||), (||+) ) -import System.Exit - ( die ) - - --- | Every interval @delay@, fetches some data from a given source, and call --- an action for each elements retrieved. -tick - :: forall st m b. (MonadIO m) - => (st -> m ([b], st)) - -- ^ A way to get a new elements - -> (b -> m ()) - -- ^ Action to be taken on new elements - -> Millisecond - -- ^ tick time - -> st - -> m () -tick next action delay !st = do - (bs, !st') <- next st - mapM_ action bs - liftIO $ threadDelay $ (fromIntegral . toMicroseconds) delay - tick next action delay st' - --- | Retrieve blocks from a chain producer and execute some given action for --- each block. -listen - :: forall e0 e1. (Show e0) - => NetworkLayer IO e0 e1 - -> (Block -> IO ()) - -> IO () -listen network action = do - tick getNextBlocks action 5000 (SlotId 0 0) - where - getNextBlocks :: SlotId -> IO ([Block], SlotId) - getNextBlocks current = do - res <- runExceptT $ nextBlocks network current - case res of - Left err -> - die $ fmt $ "Chain producer error: "+||err||+"" - Right [] -> - pure ([], current) - Right blocks -> - -- fixme: there are more blocks available, so we need not - -- wait for an interval to pass before getting more blocks. - let next = succ . slotId . header . last $ blocks - in pure (blocks, next) diff --git a/test/unit/Cardano/Wallet/BlockSyncerSpec.hs b/test/unit/Cardano/Wallet/BlockSyncerSpec.hs deleted file mode 100644 index b5daf1285ab..00000000000 --- a/test/unit/Cardano/Wallet/BlockSyncerSpec.hs +++ /dev/null @@ -1,150 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE TypeApplications #-} -{-# LANGUAGE TypeFamilies #-} - -module Cardano.Wallet.BlockSyncerSpec - ( spec - ) where - - -import Prelude - -import Cardano.Wallet.BlockSyncer - ( tick ) -import Cardano.Wallet.Primitive - ( Block (..), BlockHeader (..), Hash (..), SlotId (..) ) -import Control.Concurrent - ( forkIO, killThread ) -import Control.Concurrent.MVar - ( MVar, modifyMVar_, newEmptyMVar, newMVar, putMVar, takeMVar ) -import Control.Monad - ( foldM ) -import Control.Monad.IO.Class - ( liftIO ) -import Data.Functor - ( ($>) ) -import Data.Time.Units - ( Millisecond, fromMicroseconds ) -import Data.Word - ( Word8 ) -import Test.Hspec - ( Spec, describe, it, shouldReturn ) -import Test.QuickCheck - ( Arbitrary (..), Gen, Property, checkCoverage, choose, cover ) -import Test.QuickCheck.Monadic - ( monadicIO ) - -import qualified Data.ByteString.Char8 as B8 - - -spec :: Spec -spec = do - describe "tick terminates an passes blocks from a source to a consumer" $ do - it "Check ticking function when blocks are sent" - (checkCoverage tickingFunctionTest) - - -{------------------------------------------------------------------------------- - Test Logic --------------------------------------------------------------------------------} - -tickingFunctionTest - :: (TickingTime, Blocks) - -> Property -tickingFunctionTest (TickingTime tickTime, Blocks blocks) = - cover 80 (sum (length <$> blocks) /= 0) "Non empty blocks" prop - where - prop = monadicIO $ liftIO $ do - (readerChan, reader) <- mkReader - (writerChan, writer) <- mkWriter - waitFor writerChan $ tick writer reader tickTime blocks - takeMVar readerChan `shouldReturn` reverse (mconcat blocks) - -waitFor - :: MVar () - -> IO () - -> IO () -waitFor done action = do - threadId <- forkIO action - _ <- takeMVar done - killThread threadId - -mkWriter - :: st ~ [[a]] => IO (MVar (), st -> IO ([a], st)) -mkWriter = do - done <- newEmptyMVar - return - ( done - , \case - st@[] -> putMVar done () $> ([], st) - h:st -> return (h, st) - ) - -mkReader - :: IO (MVar [a], a -> IO ()) -mkReader = do - ref <- newMVar [] - return - ( ref - , \x -> modifyMVar_ ref $ return . (x :) - ) - -{------------------------------------------------------------------------------- - Arbitrary Instances --------------------------------------------------------------------------------} - - -newtype TickingTime = TickingTime Millisecond - deriving (Show) - -instance Arbitrary TickingTime where - -- No shrinking - arbitrary = do - tickTime <- fromMicroseconds . (* 1000) <$> choose (1, 3) - return $ TickingTime tickTime - - -newtype Blocks = Blocks [[Block]] - deriving Show - -instance Arbitrary Blocks where - -- No Shrinking - arbitrary = do - n <- fromIntegral . (`mod` 42) <$> arbitrary @Word8 - let h0 = BlockHeader (SlotId 1 0) (Hash "initial block") - let b0 = (blockHeaderHash h0, Block h0 mempty) - Blocks <$> groups (map snd $ take n $ iterate next b0) - where - next :: (Hash "BlockHeader", Block) -> (Hash "BlockHeader", Block) - next (prev, b) = - let - slot = slotId (header b) - h = BlockHeader (succ slot) prev - in - (blockHeaderHash h, Block h mempty) - - blockHeaderHash :: BlockHeader -> Hash "BlockHeader" - blockHeaderHash (BlockHeader (SlotId e s) _) = - Hash (B8.pack (show e <> show s)) - --- | Construct arbitrary groups of elements from a given list. --- --- >>> generate $ groups [0,1,2,3,4,5,6,7,8,9] --- [[0,1],[2,3],[4,5,6],[7,8,9]] --- --- --- >>> generate $ groups [0,1,2,3,4,5,6,7,8,9] --- [[],[0],[1,2,3,4,5,6,7,8],[9]] --- -groups :: [a] -> Gen [[a]] -groups = fmap reverse . foldM arbitraryGroup [[]] - where - arbitraryGroup :: [[a]] -> a -> Gen [[a]] - arbitraryGroup [] _ = return [] -- Can't happen with the given initial value - arbitraryGroup (grp:rest) a = do - choose (1 :: Int, 3) >>= \case - 1 -> return $ [a]:grp:rest - _ -> return $ (grp ++ [a]):rest