Skip to content

Commit

Permalink
Merge #2750
Browse files Browse the repository at this point in the history
2750: Refactor chain following r=piotr-iohk a=Anviking

### Issue Number

Based on #2745 
ADP-871

### Motivation

Simplify and clean up the chain following code, in the hope of making the wallet more robust against node disconnects. In particular, reduce the number of threads used to run the ChainSync protocol.

### Overview

* The `chainSync` function now takes a record of callbacks, `ChainFollower`, as an argument. These callbacks are used to request the current intersection, as well as react to roll forward and roll backwards messages.
* Remove the old `Cursor` type.
* Reduce the number of threads used for implementing the above.

### Progress

- [x] Integration tests run without errors
- [ ] Clean up / review more closely
    - [x] Corner cases
    - [x] Logging 
        - [x] Aggregation of sync progress works again
        - [x] Ensure that all `ChainFollowLog` messages are traced or removed if redundant.
        - [x] `withFollowStatsMonitoring` runs in a separate thread so that it can compute statistics in regular time intervals.
    - [ ] Node disconnect errors
        - [x] … are thrown as exceptions in `connectTo` and are handled by `recoveringNodeConnection`
        - [ ] Check again at the end that the PR solves the bug
    - [x] Can we property-test this? (Probably no benefit.)
    - [x] Check for performance regressions (running the chain-sync and db operations on the same thread might make it slightly slower)
        - Actually seemed 10% faster running integration tests locally with -j 8.
            - ef3fdc3 -> 274s (n=2)
            - master -> 303s (n=1) <- but I only ran once
            - Speedup _could be_ related to chain-following now being able to rollback without necessarily re-negotiating the intersection.

### Comments


<!-- Additional comments or screenshots to attach if any -->

<!--
Don't forget to:

 ✓ Self-review your changes to make sure nothing unexpected slipped through
 ✓ Assign yourself to the PR
 ✓ Assign one or several reviewer(s)
 ✓ Jira will detect and link to this PR once created, but you can also link this PR in the description of the corresponding ticket
 ✓ Acknowledge any changes required to the Wiki
 ✓ Finally, in the PR description delete any empty sections and all text commented in <!--, so that this text does not appear in merge commit messages.
-->


Co-authored-by: Heinrich Apfelmus <[email protected]>
Co-authored-by: Johannes Lund <[email protected]>
  • Loading branch information
3 people authored Nov 4, 2021
2 parents 159c80a + 99cfe7c commit 75c0f19
Show file tree
Hide file tree
Showing 21 changed files with 1,057 additions and 1,088 deletions.
151 changes: 120 additions & 31 deletions lib/core/src/Cardano/Wallet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,10 @@ import Cardano.Wallet.Logging
, unliftIOTracer
)
import Cardano.Wallet.Network
( ErrPostTx (..)
, FollowAction (..)
, FollowExceptionRecovery (..)
, FollowLog (..)
( ChainFollowLog (..)
, ChainFollower (..)
, ErrPostTx (..)
, NetworkLayer (..)
, follow
)
import Cardano.Wallet.Primitive.AddressDerivation
( DelegationAddress (..)
Expand Down Expand Up @@ -349,6 +347,7 @@ import Cardano.Wallet.Primitive.Types
( ActiveSlotCoefficient (..)
, Block (..)
, BlockHeader (..)
, ChainPoint (..)
, DelegationCertificate (..)
, FeePolicy (LinearFee)
, GenesisParameters (..)
Expand Down Expand Up @@ -535,7 +534,7 @@ import Statistics.Quantile
import Type.Reflection
( Typeable, typeRep )
import UnliftIO.Exception
( Exception )
( Exception, catch, throwIO )
import UnliftIO.MVar
( modifyMVar_, newMVar )

Expand Down Expand Up @@ -881,15 +880,17 @@ listUtxoStatistics ctx wid = do
let utxo = availableUTxO @s pending wal
pure $ computeUtxoStatistics log10 utxo

-- | Restore a wallet from its current tip up to the network tip.
-- | Restore a wallet from its current tip.
--
-- This function returns immediately, starting a worker thread in the
-- background that will fetch and apply remaining blocks until the
-- network tip is reached or until failure.
-- After the wallet has been restored,
-- this action will continue to fetch newly created blocks
-- and apply them, or roll back to a previous point whenever
-- the chain switches.
restoreWallet
:: forall ctx s k.
( HasNetworkLayer IO ctx
, HasDBLayer IO s k ctx
, HasGenesisData ctx
, HasLogger IO WalletWorkerLog ctx
, IsOurs s Address
, IsOurs s RewardAccount
Expand All @@ -898,31 +899,121 @@ restoreWallet
-> WalletId
-> ExceptT ErrNoSuchWallet IO ()
restoreWallet ctx wid = db & \DBLayer{..} -> do
let readCps = liftIO $ atomically $ listCheckpoints wid
let forward bs h innerTr = run $ do
restoreBlocks @ctx @s @k ctx innerTr wid bs h
let backward = runExceptT . rollbackBlocks @ctx @s @k ctx wid
liftIO $ follow nw tr readCps forward backward RetryOnExceptions (view #header)
catchFromIO $ chainSync nw (contramap MsgChainFollow tr) $ ChainFollower
{ readLocalTip = liftIO $ atomically $
map (toChainPoint block0) <$> listCheckpoints wid
, rollForward = \blocks tip -> throwInIO $
restoreBlocks @ctx @s @k
ctx (contramap MsgWalletFollow tr) wid blocks tip
, rollBackward =
throwInIO . rollbackBlocks @ctx @s @k ctx wid
}
where
db = ctx ^. dbLayer @IO @s @k
nw = ctx ^. networkLayer
tr = contramap MsgFollow (ctx ^. logger @_ @WalletWorkerLog)
nw = ctx ^. networkLayer @IO
tr = ctx ^. logger @_ @WalletWorkerLog
(block0, _, _) = ctx ^. genesisData

-- See Note [CheckedExceptionsAndCallbacks]
throwInIO :: ExceptT ErrNoSuchWallet IO a -> IO a
throwInIO x = runExceptT x >>= \case
Right a -> pure a
Left e -> throwIO $ UncheckErrNoSuchWallet e

run :: ExceptT ErrNoSuchWallet IO () -> IO (FollowAction ErrNoSuchWallet)
run = fmap (either ExitWith (const Continue)) . runExceptT
catchFromIO :: IO a -> ExceptT ErrNoSuchWallet IO a
catchFromIO m = ExceptT $
(Right <$> m) `catch` (\(UncheckErrNoSuchWallet e) -> pure $ Left e)

newtype UncheckErrNoSuchWallet = UncheckErrNoSuchWallet ErrNoSuchWallet
deriving (Eq, Show)
instance Exception UncheckErrNoSuchWallet

{- NOTE [CheckedExceptionsAndCallbacks]
Callback functions (such as the fields of 'ChainFollower')
may throw exceptions. Such exceptions typically cause the thread
(such as 'chainSync') which calls the callbacks to exit and
to return control to its parent.
Ideally, we would like these exceptions to be \"checked exceptions\",
which means that they are visible on the type level.
In our codebase, we (should) make sure that exceptions which are checked
cannot be instances of the 'Exception' class -- in this way,
it is statically guaranteed that they cannot be thrown in the 'IO' monad.
On the flip side, visibility on the type level does imply that
the calling thread (here 'chainSync') needs to be either polymorphic
in the checked exceptions or aware of them.
Making 'chainSync' aware of the checked exception is currently
not a good idea, because this function is used in different contexts,
which have different checked exceptions.
So, it would need to be polymorophic in the the undelrying monad,
but at present, 'chainSync' is restricted to 'IO' beause some
of its constituents are also restricted to 'IO'.
As a workaround / solution, we wrap the checked exception into a new type
which can be thrown in the 'IO' monad.
When the calling thread exits, we catch the exception again
and present it as a checked exception.
-}

-- | Rewind the UTxO snapshots, transaction history and other information to a
-- the earliest point in the past that is before or is the point of rollback.
rollbackBlocks
:: forall ctx s k. (HasDBLayer IO s k ctx)
:: forall ctx s k.
( HasDBLayer IO s k ctx
, HasGenesisData ctx
)
=> ctx
-> WalletId
-> SlotNo
-> ExceptT ErrNoSuchWallet IO SlotNo
-> ChainPoint
-> ExceptT ErrNoSuchWallet IO ChainPoint
rollbackBlocks ctx wid point = db & \DBLayer{..} -> do
mapExceptT atomically $ rollbackTo wid point
mapExceptT atomically $ (toChainPoint block0)
<$> rollbackTo wid (pseudoPointSlot point)
where
db = ctx ^. dbLayer @IO @s @k
(block0, _, _) = ctx ^. genesisData

-- See NOTE [PointSlotNo]
pseudoPointSlot :: ChainPoint -> SlotNo
pseudoPointSlot ChainPointAtGenesis = W.SlotNo 0
pseudoPointSlot (ChainPoint slot _) = slot

toChainPoint :: W.Block -> W.BlockHeader -> ChainPoint
toChainPoint genesisBlock (BlockHeader slot _ h _)
| slot == 0 && h == genesisHash = ChainPointAtGenesis
| otherwise = ChainPoint slot h
where
genesisHash = genesisBlock ^. (#header . #headerHash)

{- NOTE [PointSlotNo]
`SlotNo` cannot represent the genesis point `Origin`.
Historical hack. Our DB layers can't represent `Origin` when rolling
back, so we map `Origin` to `SlotNo 0`, which is wrong.
Rolling back to SlotNo 0 instead of Origin is fine for followers starting
from genesis (which should be the majority of cases). Other, non-trivial
rollbacks to genesis cannot occur on mainnet (genesis is years within
stable part, and there were no rollbacks in byron).
Could possibly be problematic in the beginning of a testnet without a
byron era. /Perhaps/ this is what is happening in the
>>> [cardano-wallet.pools-engine:Error:1293] [2020-11-24 10:02:04.00 UTC]
>>> Couldn't store production for given block before it conflicts with
>>> another block. Conflicting block header is:
>>> 5bde7e7b<-[f1b35b98-4290#2008]
errors observed in the integration tests.
FIXME: Fix should be relatively straight-forward, so we should probably
do it.
Heinrich: I have introduced the 'ChainPoint' type to represent points
on the chain. This type is already used in chain sync protocol,
but it still needs to be propagated to the database layer.
-}

-- | Apply the given blocks to the wallet and update the wallet state,
-- transaction history and corresponding metadata.
Expand Down Expand Up @@ -998,7 +1089,6 @@ restoreBlocks ctx tr wid blocks nodeTip = db & \DBLayer{..} -> mapExceptT atomic

liftIO $ do
traceWith tr $ MsgDiscoveredTxs txs
traceWith tr $ MsgBlocks blocks
traceWith tr $ MsgDiscoveredTxsContent txs
where
nl = ctx ^. networkLayer
Expand Down Expand Up @@ -3043,28 +3133,30 @@ guardQuit WalletDelegation{active,next} rewards = do
-- | Log messages for actions running within a wallet worker context.
data WalletWorkerLog
= MsgWallet WalletLog
| MsgFollow (FollowLog WalletFollowLog)
| MsgWalletFollow WalletFollowLog
| MsgChainFollow ChainFollowLog
deriving (Show, Eq)

instance ToText WalletWorkerLog where
toText = \case
MsgWallet msg -> toText msg
MsgFollow msg -> toText msg
MsgWalletFollow msg -> toText msg
MsgChainFollow msg -> toText msg

instance HasPrivacyAnnotation WalletWorkerLog

instance HasSeverityAnnotation WalletWorkerLog where
getSeverityAnnotation = \case
MsgWallet msg -> getSeverityAnnotation msg
MsgFollow msg -> getSeverityAnnotation msg
MsgWalletFollow msg -> getSeverityAnnotation msg
MsgChainFollow msg -> getSeverityAnnotation msg

-- | Log messages arising from the restore and follow process.
data WalletFollowLog
= MsgDiscoveredDelegationCert SlotNo DelegationCertificate
| MsgCheckpoint BlockHeader
| MsgDiscoveredTxs [(Tx, TxMeta)]
| MsgDiscoveredTxsContent [(Tx, TxMeta)]
| MsgBlocks (NonEmpty Block)
deriving (Show, Eq)

-- | Log messages from API server actions running in a wallet worker context.
Expand Down Expand Up @@ -3107,8 +3199,6 @@ instance ToText WalletFollowLog where
"discovered " <> pretty (length txs) <> " new transaction(s)"
MsgDiscoveredTxsContent txs ->
"transactions: " <> pretty (blockListF (snd <$> txs))
MsgBlocks blocks ->
"blocks: " <> pretty (NE.toList blocks)

instance ToText WalletLog where
toText = \case
Expand Down Expand Up @@ -3153,7 +3243,6 @@ instance HasSeverityAnnotation WalletFollowLog where
MsgDiscoveredTxs [] -> Debug
MsgDiscoveredTxs _ -> Info
MsgDiscoveredTxsContent _ -> Debug
MsgBlocks _ -> Debug -- Ideally move to FollowLog or remove

instance HasPrivacyAnnotation WalletLog
instance HasSeverityAnnotation WalletLog where
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/Cardano/Wallet/Api.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ data ApiLayer s (k :: Depth -> Type -> Type)
(Tracer IO TxSubmitLog)
(Tracer IO (WorkerLog WalletId WalletWorkerLog))
(Block, NetworkParameters, SyncTolerance)
(NetworkLayer IO (Block))
(NetworkLayer IO Block)
(TransactionLayer k SealedTx)
(DBFactory IO s k)
(WorkerRegistry WalletId (DBLayer IO s k))
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/Cardano/Wallet/DB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ data DBLayer m s k = forall stm. (MonadIO stm, MonadFail stm) => DBLayer
, rollbackTo
:: WalletId
-> SlotNo
-> ExceptT ErrNoSuchWallet stm SlotNo
-> ExceptT ErrNoSuchWallet stm BlockHeader
-- ^ Drops all checkpoints and transaction data after the given slot.
--
-- Returns the actual slot to which the database has rolled back. This
Expand Down
4 changes: 2 additions & 2 deletions lib/core/src/Cardano/Wallet/DB/Model.hs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ mRemovePendingOrExpiredTx wid tid = alterModelErr wid $ \wal ->
, submittedTxs = Map.delete tid (submittedTxs wal)
} )

mRollbackTo :: Ord wid => wid -> SlotNo -> ModelOp wid s xprv SlotNo
mRollbackTo :: Ord wid => wid -> SlotNo -> ModelOp wid s xprv BlockHeader
mRollbackTo wid requested db@(Database wallets txs) = case Map.lookup wid wallets of
Nothing ->
( Left (NoSuchWallet wid), db )
Expand All @@ -319,7 +319,7 @@ mRollbackTo wid requested db@(Database wallets txs) = case Map.lookup wid wallet
Map.mapMaybe (rescheduleOrForget point) (txHistory wal)
}
in
( Right point
( Right $ view #currentTip (checkpoints wal Map.! point)
, Database (Map.insert wid wal' wallets) txs
)
where
Expand Down
4 changes: 3 additions & 1 deletion lib/core/src/Cardano/Wallet/DB/Sqlite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,9 @@ newDBLayerWith cacheBehavior tr ti SqliteContext{runQuery} = do
[ StakeKeyCertSlot >. nearestPoint
]
refreshCache wid
pure (Right nearestPoint)
selectLatestCheckpointCached wid >>= \case
Nothing -> error "Sqlite.rollbackTo: impossible code path"
Just cp -> pure $ Right $ cp ^. #currentTip

, prune = \wid epochStability -> ExceptT $ do
selectLatestCheckpointCached wid >>= \case
Expand Down
11 changes: 11 additions & 0 deletions lib/core/src/Cardano/Wallet/Gen.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Cardano.Wallet.Gen
, shrinkPercentage
, genLegacyAddress
, genBlockHeader
, genChainPoint
, genActiveSlotCoefficient
, shrinkActiveSlotCoefficient
, genSlotNo
Expand Down Expand Up @@ -50,6 +51,7 @@ import Cardano.Wallet.Primitive.AddressDiscovery.Shared
import Cardano.Wallet.Primitive.Types
( ActiveSlotCoefficient (..)
, BlockHeader (..)
, ChainPoint (..)
, ProtocolMagic (..)
, SlotNo (..)
)
Expand Down Expand Up @@ -91,6 +93,7 @@ import Test.QuickCheck
, arbitrarySizedNatural
, choose
, elements
, frequency
, listOf
, listOf1
, oneof
Expand Down Expand Up @@ -167,6 +170,14 @@ genSlotNo = SlotNo . fromIntegral <$> arbitrary @Word32
shrinkSlotNo :: SlotNo -> [SlotNo]
shrinkSlotNo (SlotNo x) = map SlotNo $ shrink x

genChainPoint :: Gen ChainPoint
genChainPoint = frequency
[ ( 1, pure ChainPointAtGenesis) -- "common" but not "very common"
, (40, toChainPoint <$> (genBlockHeader =<< genSlotNo))
]
where
toChainPoint (BlockHeader slot _ h _) = ChainPoint slot h

genBlockHeader :: SlotNo -> Gen BlockHeader
genBlockHeader sl = do
BlockHeader sl (mockBlockHeight sl) <$> genHash <*> genHash
Expand Down
Loading

0 comments on commit 75c0f19

Please sign in to comment.