Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge BlockSyncer with NetworkLayer #66

Merged
merged 2 commits into from
Mar 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module Main where

import Prelude

import Cardano.Wallet.BlockSyncer
import Cardano.NetworkLayer
( listen )
import Cardano.Wallet.Primitive
( Block )
Expand Down
2 changes: 0 additions & 2 deletions cardano-wallet.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ library
Cardano.Wallet.AddressDiscovery
Cardano.Wallet.Binary
Cardano.Wallet.Binary.Packfile
Cardano.Wallet.BlockSyncer
Cardano.WalletLayer
Cardano.Wallet.Mnemonic
Cardano.Wallet.Primitive
Expand Down Expand Up @@ -160,7 +159,6 @@ test-suite unit
Cardano.Wallet.AddressDiscoverySpec
Cardano.Wallet.Binary.PackfileSpec
Cardano.Wallet.BinarySpec
Cardano.Wallet.BlockSyncerSpec
Cardano.Wallet.MnemonicSpec
Cardano.Wallet.PrimitiveSpec
Cardano.WalletSpec
Expand Down
63 changes: 61 additions & 2 deletions src/Cardano/NetworkLayer.hs
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Cardano.NetworkLayer
( NetworkLayer (..)
, tick
, listen
) where

import Prelude

import Cardano.Wallet.Primitive
( Block, BlockHeader (..), Hash (..), SlotId )
( Block (..), BlockHeader (..), Hash (..), SlotId (..) )
import Control.Concurrent
( threadDelay )
import Control.Monad.Except
( ExceptT )
( ExceptT, runExceptT )
import Control.Monad.IO.Class
( MonadIO, liftIO )
import Data.Time.Units
( Millisecond, toMicroseconds )
import Fmt
( fmt, (+||), (||+) )
import System.Exit
( die )


data NetworkLayer m e0 e1 = NetworkLayer
Expand All @@ -21,3 +38,45 @@ data NetworkLayer m e0 e1 = NetworkLayer
, networkTip
:: ExceptT e1 m (Hash "BlockHeader", BlockHeader)
}

-- | Every interval @delay@, fetches some data from a given source, and call
-- an action for each elements retrieved.
tick
:: forall st m b. (MonadIO m)
=> (st -> m ([b], st))
-- ^ A way to get a new elements
-> (b -> m ())
-- ^ Action to be taken on new elements
-> Millisecond
-- ^ tick time
-> st
-> m ()
tick next action delay !st = do
(bs, !st') <- next st
mapM_ action bs
liftIO $ threadDelay $ (fromIntegral . toMicroseconds) delay
tick next action delay st'

-- | Retrieve blocks from a chain producer and execute some given action for
-- each block.
listen
:: forall e0 e1. (Show e0)
=> NetworkLayer IO e0 e1
-> (Block -> IO ())
-> IO ()
listen network action = do
tick getNextBlocks action 5000 (SlotId 0 0)
where
getNextBlocks :: SlotId -> IO ([Block], SlotId)
getNextBlocks current = do
res <- runExceptT $ nextBlocks network current
case res of
Left err ->
die $ fmt $ "Chain producer error: "+||err||+""
Right [] ->
pure ([], current)
Right blocks ->
-- fixme: there are more blocks available, so we need not
-- wait for an interval to pass before getting more blocks.
let next = succ . slotId . header . last $ blocks
in pure (blocks, next)
84 changes: 0 additions & 84 deletions src/Cardano/Wallet/BlockSyncer.hs

This file was deleted.

142 changes: 139 additions & 3 deletions test/unit/Cardano/NetworkLayerSpec.hs
Original file line number Diff line number Diff line change
@@ -1,14 +1,150 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

module Cardano.NetworkLayerSpec
( spec
) where


import Prelude

import Cardano.NetworkLayer
()
( tick )
import Cardano.Wallet.Primitive
( Block (..), BlockHeader (..), Hash (..), SlotId (..) )
import Control.Concurrent
( forkIO, killThread )
import Control.Concurrent.MVar
( MVar, modifyMVar_, newEmptyMVar, newMVar, putMVar, takeMVar )
import Control.Monad
( foldM )
import Control.Monad.IO.Class
( liftIO )
import Data.Functor
( ($>) )
import Data.Time.Units
( Millisecond, fromMicroseconds )
import Data.Word
( Word8 )
import Test.Hspec
( Spec )
( Spec, describe, it, shouldReturn )
import Test.QuickCheck
( Arbitrary (..), Gen, Property, checkCoverage, choose, cover )
import Test.QuickCheck.Monadic
( monadicIO )

import qualified Data.ByteString.Char8 as B8


spec :: Spec
spec = return ()
spec = do
describe "tick terminates an passes blocks from a source to a consumer" $ do
it "Check ticking function when blocks are sent"
(checkCoverage tickingFunctionTest)


{-------------------------------------------------------------------------------
Test Logic
-------------------------------------------------------------------------------}

tickingFunctionTest
:: (TickingTime, Blocks)
-> Property
tickingFunctionTest (TickingTime tickTime, Blocks blocks) =
cover 80 (sum (length <$> blocks) /= 0) "Non empty blocks" prop
where
prop = monadicIO $ liftIO $ do
(readerChan, reader) <- mkReader
(writerChan, writer) <- mkWriter
waitFor writerChan $ tick writer reader tickTime blocks
takeMVar readerChan `shouldReturn` reverse (mconcat blocks)

waitFor
:: MVar ()
-> IO ()
-> IO ()
waitFor done action = do
threadId <- forkIO action
_ <- takeMVar done
killThread threadId

mkWriter
:: st ~ [[a]] => IO (MVar (), st -> IO ([a], st))
mkWriter = do
done <- newEmptyMVar
return
( done
, \case
st@[] -> putMVar done () $> ([], st)
h:st -> return (h, st)
)

mkReader
:: IO (MVar [a], a -> IO ())
mkReader = do
ref <- newMVar []
return
( ref
, \x -> modifyMVar_ ref $ return . (x :)
)

{-------------------------------------------------------------------------------
Arbitrary Instances
-------------------------------------------------------------------------------}


newtype TickingTime = TickingTime Millisecond
deriving (Show)

instance Arbitrary TickingTime where
-- No shrinking
arbitrary = do
tickTime <- fromMicroseconds . (* 1000) <$> choose (1, 3)
return $ TickingTime tickTime


newtype Blocks = Blocks [[Block]]
deriving Show

instance Arbitrary Blocks where
-- No Shrinking
arbitrary = do
n <- fromIntegral . (`mod` 42) <$> arbitrary @Word8
let h0 = BlockHeader (SlotId 1 0) (Hash "initial block")
let b0 = (blockHeaderHash h0, Block h0 mempty)
Blocks <$> groups (map snd $ take n $ iterate next b0)
where
next :: (Hash "BlockHeader", Block) -> (Hash "BlockHeader", Block)
next (prev, b) =
let
slot = slotId (header b)
h = BlockHeader (succ slot) prev
in
(blockHeaderHash h, Block h mempty)

blockHeaderHash :: BlockHeader -> Hash "BlockHeader"
blockHeaderHash (BlockHeader (SlotId e s) _) =
Hash (B8.pack (show e <> show s))

-- | Construct arbitrary groups of elements from a given list.
--
-- >>> generate $ groups [0,1,2,3,4,5,6,7,8,9]
-- [[0,1],[2,3],[4,5,6],[7,8,9]]
--
--
-- >>> generate $ groups [0,1,2,3,4,5,6,7,8,9]
-- [[],[0],[1,2,3,4,5,6,7,8],[9]]
--
groups :: [a] -> Gen [[a]]
groups = fmap reverse . foldM arbitraryGroup [[]]
where
arbitraryGroup :: [[a]] -> a -> Gen [[a]]
arbitraryGroup [] _ = return [] -- Can't happen with the given initial value
arbitraryGroup (grp:rest) a = do
choose (1 :: Int, 3) >>= \case
1 -> return $ [a]:grp:rest
_ -> return $ (grp ++ [a]):rest
Loading