diff --git a/lib/byron/src/Cardano/Wallet/Byron/Network.hs b/lib/byron/src/Cardano/Wallet/Byron/Network.hs index 4b30b239c55..e64f93aaaff 100644 --- a/lib/byron/src/Cardano/Wallet/Byron/Network.hs +++ b/lib/byron/src/Cardano/Wallet/Byron/Network.hs @@ -49,8 +49,10 @@ import Cardano.Wallet.Byron.Compatibility ) import Cardano.Wallet.Network ( Cursor, ErrPostTx (..), NetworkLayer (..), mapCursor ) +import Control.Concurrent + ( ThreadId ) import Control.Concurrent.Async - ( async, link ) + ( Async, async, asyncThreadId, cancel, link ) import Control.Exception ( IOException ) import Control.Monad @@ -192,6 +194,7 @@ import qualified Data.Text.Encoding as T -- | Network layer cursor for Byron. Mostly useless since the protocol itself is -- stateful and the node's keep track of the associated connection's cursor. data instance Cursor (m Byron) = Cursor + (Async ()) (Point ByronBlock) (TQueue m (ChainSyncCmd ByronBlock m)) @@ -232,6 +235,7 @@ withNetworkLayer tr np addrInfo versionData action = do { currentNodeTip = liftIO $ _currentNodeTip nodeTipVar , nextBlocks = _nextBlocks , initCursor = _initCursor + , destroyCursor = _destroyCursor , cursorSlotId = _cursorSlotId , getProtocolParameters = atomically $ readTVar protocolParamsVar , postTx = _postTx localTxSubmissionQ @@ -248,8 +252,8 @@ withNetworkLayer tr np addrInfo versionData action = do chainSyncQ <- atomically newTQueue client <- mkWalletClient gp chainSyncQ let handlers = failOnConnectionLost tr - link =<< async - (connectClient tr handlers client versionData addrInfo) + thread <- async (connectClient tr handlers client versionData addrInfo) + link thread let points = reverse $ genesisPoint : (toPoint getGenesisBlockHash getEpochLength <$> headers) let findIt = chainSyncQ `send` CmdFindIntersection points @@ -261,18 +265,22 @@ withNetworkLayer tr np addrInfo versionData action = do $ MsgIntersectionFound $ fromChainHash getGenesisBlockHash $ pointHash intersection - pure $ Cursor intersection chainSyncQ + pure $ Cursor thread intersection chainSyncQ _ -> fail $ unwords [ "initCursor: intersection not found? This can't happen" , "because we always give at least the genesis point." , "Here are the points we gave: " <> show headers ] - _nextBlocks (Cursor _ chainSyncQ) = do - let toCursor point = Cursor point chainSyncQ + _destroyCursor (Cursor thread _ _) = do + liftIO $ traceWith tr $ MsgDestroyCursor (asyncThreadId thread) + cancel thread + + _nextBlocks (Cursor thread _ chainSyncQ) = do + let toCursor point = Cursor thread point chainSyncQ liftIO $ mapCursor toCursor <$> chainSyncQ `send` CmdNextBlocks - _cursorSlotId (Cursor point _) = do + _cursorSlotId (Cursor _ point _) = do fromSlotNo getEpochLength $ fromWithOrigin (SlotNo 0) $ pointSlot point _getAccountBalance _ = @@ -572,6 +580,7 @@ data NetworkLayerLog | MsgNodeTip W.BlockHeader | MsgProtocolParameters W.ProtocolParameters | MsgLocalStateQueryError String + | MsgDestroyCursor ThreadId type HandshakeTrace = TraceSendRecv (Handshake NodeToClientVersion CBOR.Term) @@ -617,6 +626,10 @@ instance ToText NetworkLayerLog where [ "Error when querying local state parameters:" , T.pack e ] + MsgDestroyCursor threadId -> T.unwords + [ "Destroying cursor connection at" + , T.pack (show threadId) + ] instance HasPrivacyAnnotation NetworkLayerLog instance HasSeverityAnnotation NetworkLayerLog where @@ -635,3 +648,4 @@ instance HasSeverityAnnotation NetworkLayerLog where MsgNodeTip{} -> Debug MsgProtocolParameters{} -> Info MsgLocalStateQueryError{} -> Error + MsgDestroyCursor{} -> Notice diff --git a/lib/core/src/Cardano/Wallet/Network.hs b/lib/core/src/Cardano/Wallet/Network.hs index 662f9b91d9a..1a8decbc8d5 100644 --- a/lib/core/src/Cardano/Wallet/Network.hs +++ b/lib/core/src/Cardano/Wallet/Network.hs @@ -68,6 +68,8 @@ import Control.Retry ( RetryPolicyM, constantDelay, limitRetriesByCumulativeDelay, retrying ) import Control.Tracer ( Tracer, traceWith ) +import Data.Functor + ( ($>) ) import Data.List.NonEmpty ( NonEmpty (..) ) import Data.Quantity @@ -109,6 +111,10 @@ data NetworkLayer m target block = NetworkLayer -- ^ Creates a cursor from the given block header so that 'nextBlocks' -- can be used to fetch blocks. + , destroyCursor + :: Cursor target -> m () + -- ^ Cleanup network connection once we're done with them. + , cursorSlotId :: Cursor target -> SlotId -- ^ Get the slot corresponding to a cursor. @@ -340,18 +346,18 @@ follow nl tr cps yield header = step delay cursor where retry (e :: SomeException) = case asyncExceptionFromException e of - Just ThreadKilled -> - return FollowInterrupted - Just UserInterrupt -> - return FollowInterrupted + Just ThreadKilled -> do + destroyCursor nl cursor $> FollowInterrupted + Just UserInterrupt -> do + destroyCursor nl cursor $> FollowInterrupted Nothing | fromException e == Just AsyncCancelled -> do - return FollowInterrupted + destroyCursor nl cursor $> FollowInterrupted Just _ -> do traceWith tr $ MsgUnhandledException eT - return FollowFailure + destroyCursor nl cursor $> FollowFailure _ -> do traceWith tr $ MsgUnhandledException eT - return FollowFailure + destroyCursor nl cursor $> FollowFailure where eT = T.pack (show e) @@ -388,7 +394,7 @@ follow nl tr cps yield header = (sl, _:_) | sl == slotId (last cps) -> step delay0 cursor' (sl, _) -> - pure (FollowRollback sl) + destroyCursor nl cursor' $> FollowRollback sl where continueWith :: Cursor target -> FollowAction e -> IO FollowExit continueWith cursor' = \case diff --git a/lib/jormungandr/src/Cardano/Wallet/Jormungandr/Network.hs b/lib/jormungandr/src/Cardano/Wallet/Jormungandr/Network.hs index 816866628f7..e417d9a0694 100644 --- a/lib/jormungandr/src/Cardano/Wallet/Jormungandr/Network.hs +++ b/lib/jormungandr/src/Cardano/Wallet/Jormungandr/Network.hs @@ -308,6 +308,9 @@ mkRawNetworkLayer np batchSize st j = NetworkLayer , initCursor = _initCursor + , destroyCursor = + const (pure ()) + , cursorSlotId = _cursorSlotId diff --git a/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs b/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs index da0417f3234..edf6a82838e 100644 --- a/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs +++ b/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs @@ -67,8 +67,10 @@ import Cardano.Wallet.Shelley.Compatibility , toShelleyCoin , toStakeCredential ) +import Control.Concurrent + ( ThreadId ) import Control.Concurrent.Async - ( async, link ) + ( Async, async, asyncThreadId, cancel, link ) import Control.Exception ( IOException ) import Control.Monad @@ -214,6 +216,7 @@ import qualified Shelley.Spec.Ledger.PParams as SL -- | Network layer cursor for Shelley. Mostly useless since the protocol itself is -- stateful and the node's keep track of the associated connection's cursor. data instance Cursor (m Shelley) = Cursor + (Async ()) (Point ShelleyBlock) (TQueue m (ChainSyncCmd ShelleyBlock m)) @@ -248,6 +251,7 @@ withNetworkLayer tr np addrInfo versionData action = do { currentNodeTip = liftIO $ _currentNodeTip nodeTipVar , nextBlocks = _nextBlocks , initCursor = _initCursor + , destroyCursor = _destroyCursor , cursorSlotId = _cursorSlotId , getProtocolParameters = atomically $ readTVar protocolParamsVar , postTx = _postTx localTxSubmissionQ @@ -282,8 +286,8 @@ withNetworkLayer tr np addrInfo versionData action = do chainSyncQ <- atomically newTQueue client <- mkWalletClient gp chainSyncQ let handlers = failOnConnectionLost tr - link =<< async - (connectClient tr handlers client versionData addrInfo) + thread <- async (connectClient tr handlers client versionData addrInfo) + link thread let points = reverse $ genesisPoint : (toPoint getGenesisBlockHash getEpochLength <$> headers) let findIt = chainSyncQ `send` CmdFindIntersection points @@ -295,18 +299,22 @@ withNetworkLayer tr np addrInfo versionData action = do $ MsgIntersectionFound $ fromChainHash getGenesisBlockHash $ pointHash intersection - pure $ Cursor intersection chainSyncQ + pure $ Cursor thread intersection chainSyncQ _ -> fail $ unwords [ "initCursor: intersection not found? This can't happen" , "because we always give at least the genesis point." , "Here are the points we gave: " <> show headers ] - _nextBlocks (Cursor _ chainSyncQ) = do - let toCursor point = Cursor point chainSyncQ + _destroyCursor (Cursor thread _ _) = do + liftIO $ traceWith tr $ MsgDestroyCursor (asyncThreadId thread) + cancel thread + + _nextBlocks (Cursor thread _ chainSyncQ) = do + let toCursor point = Cursor thread point chainSyncQ liftIO $ mapCursor toCursor <$> chainSyncQ `send` CmdNextBlocks - _cursorSlotId (Cursor point _) = do + _cursorSlotId (Cursor _ point _) = do fromSlotNo getEpochLength $ fromWithOrigin (SlotNo 0) $ pointSlot point _getAccountBalance nodeTipVar queryRewardQ acct = do @@ -685,6 +693,7 @@ data NetworkLayerLog | MsgGetRewardAccountBalance W.BlockHeader W.ChimericAccount | MsgAccountDelegationAndRewards W.ChimericAccount Delegations RewardAccounts + | MsgDestroyCursor ThreadId type HandshakeTrace = TraceSendRecv (Handshake NodeToClientVersion CBOR.Term) @@ -741,6 +750,10 @@ instance ToText NetworkLayerLog where , " delegations = " <> T.pack (show delegations) , " rewards = " <> T.pack (show rewards) ] + MsgDestroyCursor threadId -> T.unwords + [ "Destroying cursor connection at" + , T.pack (show threadId) + ] instance HasPrivacyAnnotation NetworkLayerLog instance HasSeverityAnnotation NetworkLayerLog where @@ -761,3 +774,4 @@ instance HasSeverityAnnotation NetworkLayerLog where MsgLocalStateQueryError{} -> Error MsgGetRewardAccountBalance{} -> Info MsgAccountDelegationAndRewards{} -> Info + MsgDestroyCursor{} -> Notice