Skip to content

Commit

Permalink
destroy node's connection when they're no longer needed
Browse files Browse the repository at this point in the history
  • Loading branch information
KtorZ committed Jun 16, 2020
1 parent 1f17b4d commit 9ff29c0
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 22 deletions.
28 changes: 21 additions & 7 deletions lib/byron/src/Cardano/Wallet/Byron/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ import Cardano.Wallet.Byron.Compatibility
)
import Cardano.Wallet.Network
( Cursor, ErrPostTx (..), NetworkLayer (..), mapCursor )
import Control.Concurrent
( ThreadId )
import Control.Concurrent.Async
( async, link )
( Async, async, asyncThreadId, cancel, link )
import Control.Exception
( IOException )
import Control.Monad
Expand Down Expand Up @@ -192,6 +194,7 @@ import qualified Data.Text.Encoding as T
-- | Network layer cursor for Byron. Mostly useless since the protocol itself is
-- stateful and the node's keep track of the associated connection's cursor.
data instance Cursor (m Byron) = Cursor
(Async ())
(Point ByronBlock)
(TQueue m (ChainSyncCmd ByronBlock m))

Expand Down Expand Up @@ -232,6 +235,7 @@ withNetworkLayer tr np addrInfo versionData action = do
{ currentNodeTip = liftIO $ _currentNodeTip nodeTipVar
, nextBlocks = _nextBlocks
, initCursor = _initCursor
, destroyCursor = _destroyCursor
, cursorSlotId = _cursorSlotId
, getProtocolParameters = atomically $ readTVar protocolParamsVar
, postTx = _postTx localTxSubmissionQ
Expand All @@ -248,8 +252,8 @@ withNetworkLayer tr np addrInfo versionData action = do
chainSyncQ <- atomically newTQueue
client <- mkWalletClient gp chainSyncQ
let handlers = failOnConnectionLost tr
link =<< async
(connectClient tr handlers client versionData addrInfo)
thread <- async (connectClient tr handlers client versionData addrInfo)
link thread
let points = reverse $ genesisPoint :
(toPoint getGenesisBlockHash getEpochLength <$> headers)
let findIt = chainSyncQ `send` CmdFindIntersection points
Expand All @@ -261,18 +265,22 @@ withNetworkLayer tr np addrInfo versionData action = do
$ MsgIntersectionFound
$ fromChainHash getGenesisBlockHash
$ pointHash intersection
pure $ Cursor intersection chainSyncQ
pure $ Cursor thread intersection chainSyncQ
_ -> fail $ unwords
[ "initCursor: intersection not found? This can't happen"
, "because we always give at least the genesis point."
, "Here are the points we gave: " <> show headers
]

_nextBlocks (Cursor _ chainSyncQ) = do
let toCursor point = Cursor point chainSyncQ
_destroyCursor (Cursor thread _ _) = do
liftIO $ traceWith tr $ MsgDestroyCursor (asyncThreadId thread)
cancel thread

_nextBlocks (Cursor thread _ chainSyncQ) = do
let toCursor point = Cursor thread point chainSyncQ
liftIO $ mapCursor toCursor <$> chainSyncQ `send` CmdNextBlocks

_cursorSlotId (Cursor point _) = do
_cursorSlotId (Cursor _ point _) = do
fromSlotNo getEpochLength $ fromWithOrigin (SlotNo 0) $ pointSlot point

_getAccountBalance _ =
Expand Down Expand Up @@ -572,6 +580,7 @@ data NetworkLayerLog
| MsgNodeTip W.BlockHeader
| MsgProtocolParameters W.ProtocolParameters
| MsgLocalStateQueryError String
| MsgDestroyCursor ThreadId

type HandshakeTrace = TraceSendRecv (Handshake NodeToClientVersion CBOR.Term)

Expand Down Expand Up @@ -617,6 +626,10 @@ instance ToText NetworkLayerLog where
[ "Error when querying local state parameters:"
, T.pack e
]
MsgDestroyCursor threadId -> T.unwords
[ "Destroying cursor connection at"
, T.pack (show threadId)
]

instance HasPrivacyAnnotation NetworkLayerLog
instance HasSeverityAnnotation NetworkLayerLog where
Expand All @@ -635,3 +648,4 @@ instance HasSeverityAnnotation NetworkLayerLog where
MsgNodeTip{} -> Debug
MsgProtocolParameters{} -> Info
MsgLocalStateQueryError{} -> Error
MsgDestroyCursor{} -> Notice
22 changes: 14 additions & 8 deletions lib/core/src/Cardano/Wallet/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ import Control.Retry
( RetryPolicyM, constantDelay, limitRetriesByCumulativeDelay, retrying )
import Control.Tracer
( Tracer, traceWith )
import Data.Functor
( ($>) )
import Data.List.NonEmpty
( NonEmpty (..) )
import Data.Quantity
Expand Down Expand Up @@ -109,6 +111,10 @@ data NetworkLayer m target block = NetworkLayer
-- ^ Creates a cursor from the given block header so that 'nextBlocks'
-- can be used to fetch blocks.

, destroyCursor
:: Cursor target -> m ()
-- ^ Cleanup network connection once we're done with them.

, cursorSlotId
:: Cursor target -> SlotId
-- ^ Get the slot corresponding to a cursor.
Expand Down Expand Up @@ -340,18 +346,18 @@ follow nl tr cps yield header =
step delay cursor
where
retry (e :: SomeException) = case asyncExceptionFromException e of
Just ThreadKilled ->
return FollowInterrupted
Just UserInterrupt ->
return FollowInterrupted
Just ThreadKilled -> do
destroyCursor nl cursor $> FollowInterrupted
Just UserInterrupt -> do
destroyCursor nl cursor $> FollowInterrupted
Nothing | fromException e == Just AsyncCancelled -> do
return FollowInterrupted
destroyCursor nl cursor $> FollowInterrupted
Just _ -> do
traceWith tr $ MsgUnhandledException eT
return FollowFailure
destroyCursor nl cursor $> FollowFailure
_ -> do
traceWith tr $ MsgUnhandledException eT
return FollowFailure
destroyCursor nl cursor $> FollowFailure
where
eT = T.pack (show e)

Expand Down Expand Up @@ -388,7 +394,7 @@ follow nl tr cps yield header =
(sl, _:_) | sl == slotId (last cps) ->
step delay0 cursor'
(sl, _) ->
pure (FollowRollback sl)
destroyCursor nl cursor' $> FollowRollback sl
where
continueWith :: Cursor target -> FollowAction e -> IO FollowExit
continueWith cursor' = \case
Expand Down
3 changes: 3 additions & 0 deletions lib/jormungandr/src/Cardano/Wallet/Jormungandr/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ mkRawNetworkLayer np batchSize st j = NetworkLayer
, initCursor =
_initCursor

, destroyCursor =
const (pure ())

, cursorSlotId =
_cursorSlotId

Expand Down
28 changes: 21 additions & 7 deletions lib/shelley/src/Cardano/Wallet/Shelley/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ import Cardano.Wallet.Shelley.Compatibility
, toShelleyCoin
, toStakeCredential
)
import Control.Concurrent
( ThreadId )
import Control.Concurrent.Async
( async, link )
( Async, async, asyncThreadId, cancel, link )
import Control.Exception
( IOException )
import Control.Monad
Expand Down Expand Up @@ -214,6 +216,7 @@ import qualified Shelley.Spec.Ledger.PParams as SL
-- | Network layer cursor for Shelley. Mostly useless since the protocol itself is
-- stateful and the node's keep track of the associated connection's cursor.
data instance Cursor (m Shelley) = Cursor
(Async ())
(Point ShelleyBlock)
(TQueue m (ChainSyncCmd ShelleyBlock m))

Expand Down Expand Up @@ -248,6 +251,7 @@ withNetworkLayer tr np addrInfo versionData action = do
{ currentNodeTip = liftIO $ _currentNodeTip nodeTipVar
, nextBlocks = _nextBlocks
, initCursor = _initCursor
, destroyCursor = _destroyCursor
, cursorSlotId = _cursorSlotId
, getProtocolParameters = atomically $ readTVar protocolParamsVar
, postTx = _postTx localTxSubmissionQ
Expand Down Expand Up @@ -282,8 +286,8 @@ withNetworkLayer tr np addrInfo versionData action = do
chainSyncQ <- atomically newTQueue
client <- mkWalletClient gp chainSyncQ
let handlers = failOnConnectionLost tr
link =<< async
(connectClient tr handlers client versionData addrInfo)
thread <- async (connectClient tr handlers client versionData addrInfo)
link thread
let points = reverse $ genesisPoint :
(toPoint getGenesisBlockHash getEpochLength <$> headers)
let findIt = chainSyncQ `send` CmdFindIntersection points
Expand All @@ -295,18 +299,22 @@ withNetworkLayer tr np addrInfo versionData action = do
$ MsgIntersectionFound
$ fromChainHash getGenesisBlockHash
$ pointHash intersection
pure $ Cursor intersection chainSyncQ
pure $ Cursor thread intersection chainSyncQ
_ -> fail $ unwords
[ "initCursor: intersection not found? This can't happen"
, "because we always give at least the genesis point."
, "Here are the points we gave: " <> show headers
]

_nextBlocks (Cursor _ chainSyncQ) = do
let toCursor point = Cursor point chainSyncQ
_destroyCursor (Cursor thread _ _) = do
liftIO $ traceWith tr $ MsgDestroyCursor (asyncThreadId thread)
cancel thread

_nextBlocks (Cursor thread _ chainSyncQ) = do
let toCursor point = Cursor thread point chainSyncQ
liftIO $ mapCursor toCursor <$> chainSyncQ `send` CmdNextBlocks

_cursorSlotId (Cursor point _) = do
_cursorSlotId (Cursor _ point _) = do
fromSlotNo getEpochLength $ fromWithOrigin (SlotNo 0) $ pointSlot point

_getAccountBalance nodeTipVar queryRewardQ acct = do
Expand Down Expand Up @@ -685,6 +693,7 @@ data NetworkLayerLog
| MsgGetRewardAccountBalance W.BlockHeader W.ChimericAccount
| MsgAccountDelegationAndRewards W.ChimericAccount
Delegations RewardAccounts
| MsgDestroyCursor ThreadId

type HandshakeTrace = TraceSendRecv (Handshake NodeToClientVersion CBOR.Term)

Expand Down Expand Up @@ -741,6 +750,10 @@ instance ToText NetworkLayerLog where
, " delegations = " <> T.pack (show delegations)
, " rewards = " <> T.pack (show rewards)
]
MsgDestroyCursor threadId -> T.unwords
[ "Destroying cursor connection at"
, T.pack (show threadId)
]

instance HasPrivacyAnnotation NetworkLayerLog
instance HasSeverityAnnotation NetworkLayerLog where
Expand All @@ -761,3 +774,4 @@ instance HasSeverityAnnotation NetworkLayerLog where
MsgLocalStateQueryError{} -> Error
MsgGetRewardAccountBalance{} -> Info
MsgAccountDelegationAndRewards{} -> Info
MsgDestroyCursor{} -> Notice

0 comments on commit 9ff29c0

Please sign in to comment.