Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor chain following #2750

Merged
merged 10 commits into from
Nov 4, 2021
151 changes: 120 additions & 31 deletions lib/core/src/Cardano/Wallet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,10 @@ import Cardano.Wallet.Logging
, unliftIOTracer
)
import Cardano.Wallet.Network
( ErrPostTx (..)
, FollowAction (..)
, FollowExceptionRecovery (..)
, FollowLog (..)
( ChainFollowLog (..)
, ChainFollower (..)
, ErrPostTx (..)
, NetworkLayer (..)
, follow
)
import Cardano.Wallet.Primitive.AddressDerivation
( DelegationAddress (..)
Expand Down Expand Up @@ -346,6 +344,7 @@ import Cardano.Wallet.Primitive.Types
( ActiveSlotCoefficient (..)
, Block (..)
, BlockHeader (..)
, ChainPoint (..)
, DelegationCertificate (..)
, FeePolicy (LinearFee)
, GenesisParameters (..)
Expand Down Expand Up @@ -532,7 +531,7 @@ import Statistics.Quantile
import Type.Reflection
( Typeable, typeRep )
import UnliftIO.Exception
( Exception )
( Exception, catch, throwIO )
import UnliftIO.MVar
( modifyMVar_, newMVar )

Expand Down Expand Up @@ -879,15 +878,17 @@ listUtxoStatistics ctx wid = do
let utxo = availableUTxO @s pending wal
pure $ computeUtxoStatistics log10 utxo

-- | Restore a wallet from its current tip up to the network tip.
-- | Restore a wallet from its current tip.
--
-- This function returns immediately, starting a worker thread in the
-- background that will fetch and apply remaining blocks until the
-- network tip is reached or until failure.
-- After the wallet has been restored,
-- this action will continue to fetch newly created blocks
-- and apply them, or roll back to a previous point whenever
-- the chain switches.
restoreWallet
:: forall ctx s k.
( HasNetworkLayer IO ctx
, HasDBLayer IO s k ctx
, HasGenesisData ctx
, HasLogger IO WalletWorkerLog ctx
, IsOurs s Address
, IsOurs s RewardAccount
Expand All @@ -896,31 +897,121 @@ restoreWallet
-> WalletId
-> ExceptT ErrNoSuchWallet IO ()
restoreWallet ctx wid = db & \DBLayer{..} -> do
let readCps = liftIO $ atomically $ listCheckpoints wid
let forward bs h innerTr = run $ do
restoreBlocks @ctx @s @k ctx innerTr wid bs h
let backward = runExceptT . rollbackBlocks @ctx @s @k ctx wid
liftIO $ follow nw tr readCps forward backward RetryOnExceptions (view #header)
catchFromIO $ chainSync nw (contramap MsgChainFollow tr) $ ChainFollower
{ readLocalTip = liftIO $ atomically $
map (toChainPoint block0) <$> listCheckpoints wid
, rollForward = \blocks tip -> throwInIO $
restoreBlocks @ctx @s @k
ctx (contramap MsgWalletFollow tr) wid blocks tip
, rollBackward =
throwInIO . rollbackBlocks @ctx @s @k ctx wid
}
where
db = ctx ^. dbLayer @IO @s @k
nw = ctx ^. networkLayer
tr = contramap MsgFollow (ctx ^. logger @_ @WalletWorkerLog)
nw = ctx ^. networkLayer @IO
tr = ctx ^. logger @_ @WalletWorkerLog
(block0, _, _) = ctx ^. genesisData

-- See Note [CheckedExceptionsAndCallbacks]
HeinrichApfelmus marked this conversation as resolved.
Show resolved Hide resolved
throwInIO :: ExceptT ErrNoSuchWallet IO a -> IO a
throwInIO x = runExceptT x >>= \case
Right a -> pure a
Left e -> throwIO $ UncheckErrNoSuchWallet e

run :: ExceptT ErrNoSuchWallet IO () -> IO (FollowAction ErrNoSuchWallet)
run = fmap (either ExitWith (const Continue)) . runExceptT
catchFromIO :: IO a -> ExceptT ErrNoSuchWallet IO a
catchFromIO m = ExceptT $
(Right <$> m) `catch` (\(UncheckErrNoSuchWallet e) -> pure $ Left e)

newtype UncheckErrNoSuchWallet = UncheckErrNoSuchWallet ErrNoSuchWallet
deriving (Eq, Show)
instance Exception UncheckErrNoSuchWallet

{- NOTE [CheckedExceptionsAndCallbacks]

Callback functions (such as the fields of 'ChainFollower')
may throw exceptions. Such exceptions typically cause the thread
(such as 'chainSync') which calls the callbacks to exit and
to return control to its parent.

Ideally, we would like these exceptions to be \"checked exceptions\",
which means that they are visible on the type level.
In our codebase, we (should) make sure that exceptions which are checked
cannot be instances of the 'Exception' class -- in this way,
it is statically guaranteed that they cannot be thrown in the 'IO' monad.

On the flip side, visibility on the type level does imply that
the calling thread (here 'chainSync') needs to be either polymorphic
in the checked exceptions or aware of them.
Making 'chainSync' aware of the checked exception is currently
not a good idea, because this function is used in different contexts,
which have different checked exceptions.
So, it would need to be polymorophic in the the undelrying monad,
but at present, 'chainSync' is restricted to 'IO' beause some
of its constituents are also restricted to 'IO'.

As a workaround / solution, we wrap the checked exception into a new type
which can be thrown in the 'IO' monad.
When the calling thread exits, we catch the exception again
and present it as a checked exception.

-}

-- | Rewind the UTxO snapshots, transaction history and other information to a
-- the earliest point in the past that is before or is the point of rollback.
rollbackBlocks
:: forall ctx s k. (HasDBLayer IO s k ctx)
:: forall ctx s k.
( HasDBLayer IO s k ctx
, HasGenesisData ctx
)
=> ctx
-> WalletId
-> SlotNo
-> ExceptT ErrNoSuchWallet IO SlotNo
-> ChainPoint
-> ExceptT ErrNoSuchWallet IO ChainPoint
rollbackBlocks ctx wid point = db & \DBLayer{..} -> do
mapExceptT atomically $ rollbackTo wid point
mapExceptT atomically $ (toChainPoint block0)
<$> rollbackTo wid (pseudoPointSlot point)
where
db = ctx ^. dbLayer @IO @s @k
(block0, _, _) = ctx ^. genesisData

-- See NOTE [PointSlotNo]
pseudoPointSlot :: ChainPoint -> SlotNo
pseudoPointSlot ChainPointAtGenesis = W.SlotNo 0
pseudoPointSlot (ChainPoint slot _) = slot

toChainPoint :: W.Block -> W.BlockHeader -> ChainPoint
Anviking marked this conversation as resolved.
Show resolved Hide resolved
toChainPoint genesisBlock (BlockHeader slot _ h _)
| slot == 0 && h == genesisHash = ChainPointAtGenesis
| otherwise = ChainPoint slot h
where
genesisHash = genesisBlock ^. (#header . #headerHash)

{- NOTE [PointSlotNo]

`SlotNo` cannot represent the genesis point `Origin`.

Historical hack. Our DB layers can't represent `Origin` when rolling
back, so we map `Origin` to `SlotNo 0`, which is wrong.

Rolling back to SlotNo 0 instead of Origin is fine for followers starting
from genesis (which should be the majority of cases). Other, non-trivial
rollbacks to genesis cannot occur on mainnet (genesis is years within
stable part, and there were no rollbacks in byron).

Could possibly be problematic in the beginning of a testnet without a
byron era. /Perhaps/ this is what is happening in the
>>> [cardano-wallet.pools-engine:Error:1293] [2020-11-24 10:02:04.00 UTC]
>>> Couldn't store production for given block before it conflicts with
>>> another block. Conflicting block header is:
>>> 5bde7e7b<-[f1b35b98-4290#2008]
errors observed in the integration tests.

FIXME: Fix should be relatively straight-forward, so we should probably
do it.
Heinrich: I have introduced the 'ChainPoint' type to represent points
on the chain. This type is already used in chain sync protocol,
but it still needs to be propagated to the database layer.
-}

-- | Apply the given blocks to the wallet and update the wallet state,
-- transaction history and corresponding metadata.
Expand Down Expand Up @@ -996,7 +1087,6 @@ restoreBlocks ctx tr wid blocks nodeTip = db & \DBLayer{..} -> mapExceptT atomic

liftIO $ do
traceWith tr $ MsgDiscoveredTxs txs
traceWith tr $ MsgBlocks blocks
traceWith tr $ MsgDiscoveredTxsContent txs
where
nl = ctx ^. networkLayer
Expand Down Expand Up @@ -2992,28 +3082,30 @@ guardQuit WalletDelegation{active,next} rewards = do
-- | Log messages for actions running within a wallet worker context.
data WalletWorkerLog
= MsgWallet WalletLog
| MsgFollow (FollowLog WalletFollowLog)
| MsgWalletFollow WalletFollowLog
| MsgChainFollow ChainFollowLog
HeinrichApfelmus marked this conversation as resolved.
Show resolved Hide resolved
deriving (Show, Eq)

instance ToText WalletWorkerLog where
toText = \case
MsgWallet msg -> toText msg
MsgFollow msg -> toText msg
MsgWalletFollow msg -> toText msg
MsgChainFollow msg -> toText msg

instance HasPrivacyAnnotation WalletWorkerLog

instance HasSeverityAnnotation WalletWorkerLog where
getSeverityAnnotation = \case
MsgWallet msg -> getSeverityAnnotation msg
MsgFollow msg -> getSeverityAnnotation msg
MsgWalletFollow msg -> getSeverityAnnotation msg
MsgChainFollow msg -> getSeverityAnnotation msg

-- | Log messages arising from the restore and follow process.
data WalletFollowLog
= MsgDiscoveredDelegationCert SlotNo DelegationCertificate
| MsgCheckpoint BlockHeader
| MsgDiscoveredTxs [(Tx, TxMeta)]
| MsgDiscoveredTxsContent [(Tx, TxMeta)]
| MsgBlocks (NonEmpty Block)
deriving (Show, Eq)

-- | Log messages from API server actions running in a wallet worker context.
Expand Down Expand Up @@ -3056,8 +3148,6 @@ instance ToText WalletFollowLog where
"discovered " <> pretty (length txs) <> " new transaction(s)"
MsgDiscoveredTxsContent txs ->
"transactions: " <> pretty (blockListF (snd <$> txs))
MsgBlocks blocks ->
"blocks: " <> pretty (NE.toList blocks)

instance ToText WalletLog where
toText = \case
Expand Down Expand Up @@ -3102,7 +3192,6 @@ instance HasSeverityAnnotation WalletFollowLog where
MsgDiscoveredTxs [] -> Debug
MsgDiscoveredTxs _ -> Info
MsgDiscoveredTxsContent _ -> Debug
MsgBlocks _ -> Debug -- Ideally move to FollowLog or remove

instance HasPrivacyAnnotation WalletLog
instance HasSeverityAnnotation WalletLog where
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/Cardano/Wallet/Api.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ data ApiLayer s (k :: Depth -> Type -> Type)
(Tracer IO TxSubmitLog)
(Tracer IO (WorkerLog WalletId WalletWorkerLog))
(Block, NetworkParameters, SyncTolerance)
(NetworkLayer IO (Block))
(NetworkLayer IO Block)
(TransactionLayer k SealedTx)
(DBFactory IO s k)
(WorkerRegistry WalletId (DBLayer IO s k))
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/Cardano/Wallet/DB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ data DBLayer m s k = forall stm. (MonadIO stm, MonadFail stm) => DBLayer
, rollbackTo
:: WalletId
-> SlotNo
-> ExceptT ErrNoSuchWallet stm SlotNo
-> ExceptT ErrNoSuchWallet stm BlockHeader
-- ^ Drops all checkpoints and transaction data after the given slot.
--
-- Returns the actual slot to which the database has rolled back. This
Expand Down
4 changes: 2 additions & 2 deletions lib/core/src/Cardano/Wallet/DB/Model.hs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ mRemovePendingOrExpiredTx wid tid = alterModelErr wid $ \wal ->
, submittedTxs = Map.delete tid (submittedTxs wal)
} )

mRollbackTo :: Ord wid => wid -> SlotNo -> ModelOp wid s xprv SlotNo
mRollbackTo :: Ord wid => wid -> SlotNo -> ModelOp wid s xprv BlockHeader
mRollbackTo wid requested db@(Database wallets txs) = case Map.lookup wid wallets of
Nothing ->
( Left (NoSuchWallet wid), db )
Expand All @@ -319,7 +319,7 @@ mRollbackTo wid requested db@(Database wallets txs) = case Map.lookup wid wallet
Map.mapMaybe (rescheduleOrForget point) (txHistory wal)
}
in
( Right point
( Right $ view #currentTip (checkpoints wal Map.! point)
, Database (Map.insert wid wal' wallets) txs
)
where
Expand Down
4 changes: 3 additions & 1 deletion lib/core/src/Cardano/Wallet/DB/Sqlite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,9 @@ newDBLayerWith cacheBehavior tr ti SqliteContext{runQuery} = do
[ StakeKeyCertSlot >. nearestPoint
]
refreshCache wid
pure (Right nearestPoint)
selectLatestCheckpointCached wid >>= \case
HeinrichApfelmus marked this conversation as resolved.
Show resolved Hide resolved
Nothing -> error "Sqlite.rollbackTo: impossible code path"
Just cp -> pure $ Right $ cp ^. #currentTip

, prune = \wid epochStability -> ExceptT $ do
selectLatestCheckpointCached wid >>= \case
Expand Down
11 changes: 11 additions & 0 deletions lib/core/src/Cardano/Wallet/Gen.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Cardano.Wallet.Gen
, shrinkPercentage
, genLegacyAddress
, genBlockHeader
, genChainPoint
, genActiveSlotCoefficient
, shrinkActiveSlotCoefficient
, genSlotNo
Expand Down Expand Up @@ -50,6 +51,7 @@ import Cardano.Wallet.Primitive.AddressDiscovery.Shared
import Cardano.Wallet.Primitive.Types
( ActiveSlotCoefficient (..)
, BlockHeader (..)
, ChainPoint (..)
, ProtocolMagic (..)
, SlotNo (..)
)
Expand Down Expand Up @@ -91,6 +93,7 @@ import Test.QuickCheck
, arbitrarySizedNatural
, choose
, elements
, frequency
, listOf
, listOf1
, oneof
Expand Down Expand Up @@ -167,6 +170,14 @@ genSlotNo = SlotNo . fromIntegral <$> arbitrary @Word32
shrinkSlotNo :: SlotNo -> [SlotNo]
shrinkSlotNo (SlotNo x) = map SlotNo $ shrink x

genChainPoint :: Gen ChainPoint
genChainPoint = frequency
[ ( 1, pure ChainPointAtGenesis) -- "common" but not "very common"
, (40, toChainPoint <$> (genBlockHeader =<< genSlotNo))
]
where
toChainPoint (BlockHeader slot _ h _) = ChainPoint slot h

genBlockHeader :: SlotNo -> Gen BlockHeader
genBlockHeader sl = do
BlockHeader sl (mockBlockHeight sl) <$> genHash <*> genHash
Expand Down
Loading