Skip to content

Commit

Permalink
use 'Chan' instead of 'TQueue' for node's tip, and simplify the watch…
Browse files Browse the repository at this point in the history
…er logic

Chans have the nice property of being 'duplicable', so we can rely on this instead of re-implementing our own fan-out logic. I believe the original error that made workers crash could have been a race condition where the watcher would exit unexpectedly, but I wasn't able to clearly identify the cause. Using 'Chan' definitely looks simpler.

I've also removed the possibility for workers to exit with a failure. This wasn't actually used and only added to the complexity. If that's needed, we can work it out a bit later, but for now a simpler API makes it easier to debug.

Integration scenarios are no longer 'blowing up' unexpectedly; however, the 'reward accumulate and stop' scenario still does not pass.
  • Loading branch information
KtorZ committed Jun 27, 2020
1 parent c372802 commit f42de76
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 111 deletions.
36 changes: 23 additions & 13 deletions lib/core/src/Cardano/Wallet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -921,19 +921,24 @@ manageRewardBalance
-> WalletId
-> IO ()
manageRewardBalance ctx wid = db & \DBLayer{..} -> do
Left err <- runExceptT $ watchNodeTip $ \bh -> do
lift $ traceWith tr $ MsgRewardBalanceQuery bh
res <- lift $ runExceptT (queryRewardBalance @ctx @s @t @k ctx wid)
lift $ traceWith tr $ MsgRewardBalanceResult res
case res of
Right amt ->
withExceptT show $
mapExceptT atomically $ putDelegationRewardBalance pk amt
watchNodeTip $ \bh -> do
traceWith tr $ MsgRewardBalanceQuery bh
query <- runExceptT $ queryRewardBalance @ctx @s @t @k ctx wid
traceWith tr $ MsgRewardBalanceResult query
case query of
Right amt -> do
res <- atomically $ runExceptT $ putDelegationRewardBalance pk amt
-- It can happen that the wallet doesn't exist _yet_, whereas we
-- already have a reward balance. If that's the case, we log and
-- move on.
case res of
Left err -> traceWith tr $ MsgRewardBalanceNoSuchWallet err
Right () -> pure ()
Left _err ->
-- Occasionally failing to query is generally not fatal. It will
-- just update the balance next time the tip changes.
pure ()
traceWith tr $ MsgRewardBalanceFinish err
traceWith tr MsgRewardBalanceExited

where
pk = PrimaryKey wid
Expand Down Expand Up @@ -2090,7 +2095,8 @@ data WalletLog
| MsgPaymentCoinSelectionAdjusted CoinSelection
| MsgRewardBalanceQuery BlockHeader
| MsgRewardBalanceResult (Either ErrFetchRewards (Quantity "lovelace" Word64))
| MsgRewardBalanceFinish String
| MsgRewardBalanceNoSuchWallet ErrNoSuchWallet
| MsgRewardBalanceExited
deriving (Show, Eq)

instance ToText WalletLog where
Expand Down Expand Up @@ -2136,11 +2142,14 @@ instance ToText WalletLog where
"Updating the reward balance for block " <> pretty bh
MsgRewardBalanceResult (Right amt) ->
"The reward balance is " <> pretty amt
MsgRewardBalanceNoSuchWallet err ->
"Trying to store a balance for a wallet that doesn't exist (yet?): " <>
T.pack (show err)
MsgRewardBalanceResult (Left err) ->
"Problem fetching reward balance. Will try again on next chain update. " <>
T.pack (show err)
MsgRewardBalanceFinish err ->
"Reward balance worker has finished due to: " <> T.pack err
MsgRewardBalanceExited ->
"Reward balance worker has exited."

instance HasPrivacyAnnotation WalletLog
instance HasSeverityAnnotation WalletLog where
Expand All @@ -2162,4 +2171,5 @@ instance HasSeverityAnnotation WalletLog where
MsgRewardBalanceQuery _ -> Debug
MsgRewardBalanceResult (Right _) -> Debug
MsgRewardBalanceResult (Left _) -> Notice
MsgRewardBalanceFinish _ -> Debug
MsgRewardBalanceNoSuchWallet{} -> Warning
MsgRewardBalanceExited -> Notice
6 changes: 3 additions & 3 deletions lib/core/src/Cardano/Wallet/Api/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ import Cardano.Wallet.Unsafe
import Control.Arrow
( second )
import Control.Concurrent.Async
( concurrently_ )
( race_ )
import Control.Exception
( IOException, bracket, throwIO, tryJust )
import Control.Monad
Expand Down Expand Up @@ -1483,7 +1483,7 @@ initWorker ctx wid createWallet restoreWallet coworker =
, workerMain = \ctx' _ -> do
-- FIXME:
-- Review error handling here
concurrently_
race_
(unsafeRunExceptT $ restoreWallet ctx')
(coworker ctx')

Expand Down Expand Up @@ -1635,7 +1635,7 @@ registerWorker ctx coworker wid =
, workerMain = \ctx' _ -> do
-- FIXME:
-- Review error handling here
concurrently_
race_
(unsafeRunExceptT $ W.restoreWallet @(WorkerCtx ctx) @s @t ctx' wid)
(coworker ctx' wid)

Expand Down
8 changes: 3 additions & 5 deletions lib/core/src/Cardano/Wallet/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,10 @@ data NetworkLayer m target block = NetworkLayer
-- ^ Get the current tip from the chain producer

, watchNodeTip
:: (BlockHeader -> ExceptT String m ())
-> ExceptT String m ()
:: (BlockHeader -> m ())
-> m ()
-- ^ Register a callback for when the node tip changes.
-- If the given callback fails, the watcher stops. Otherwise, this
-- function never returns.
-- fixme: needs to be polymorphic on error type
-- The callback isn't allowed to fail.

, getProtocolParameters
:: m ProtocolParameters
Expand Down
1 change: 0 additions & 1 deletion lib/shelley/cardano-wallet-shelley.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ library
, cryptonite
, directory
, exceptions
, extra
, filepath
, fmt
, generic-lens
Expand Down
113 changes: 24 additions & 89 deletions lib/shelley/src/Cardano/Wallet/Shelley/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ import Cardano.Wallet.Shelley.Compatibility
import Control.Concurrent
( ThreadId )
import Control.Concurrent.Async
( Async, async, asyncThreadId, cancel, forConcurrently, link )
( Async, async, asyncThreadId, cancel, link )
import Control.Concurrent.Chan
( dupChan, newChan, readChan, writeChan )
import Control.Exception
( IOException )
import Control.Monad
( forever, join, unless, (>=>) )
( forever, unless, (>=>) )
import Control.Monad.Catch
( Handler (..) )
import Control.Monad.Class.MonadAsync
Expand All @@ -82,20 +84,14 @@ import Control.Monad.Class.MonadST
( MonadST )
import Control.Monad.Class.MonadSTM
( MonadSTM
, MonadSTMTx
, TQueue
, TVar_
, atomically
, modifyTVar
, newEmptyTMVar
, newTMVarM
, newTQueue
, newTVar
, putTMVar
, readTQueue
, readTVar
, takeTMVar
, writeTQueue
, writeTVar
)
import Control.Monad.Class.MonadThrow
Expand All @@ -105,7 +101,7 @@ import Control.Monad.Class.MonadTimer
import Control.Monad.IO.Class
( liftIO )
import Control.Monad.Trans.Except
( ExceptT (..), runExceptT, throwE, withExceptT )
( ExceptT (..), throwE, withExceptT )
import Control.Retry
( RetryPolicyM, RetryStatus (..), capDelay, fibonacciBackoff, recovering )
import Control.Tracer
Expand All @@ -128,8 +124,6 @@ import Data.Text
( Text )
import Data.Text.Class
( ToText (..) )
import Data.Tuple.Extra
( fst3 )
import Data.Void
( Void )
import Data.Word
Expand Down Expand Up @@ -249,20 +243,18 @@ withNetworkLayer tr np addrInfo versionData action = do
-- doesn't rely on the intersection to be up-to-date.
let handlers = retryOnConnectionLost tr

(nodeTipUpdate, protocolParamsVar, localTxSubmissionQ) <-
(nodeTipQ, protocolParamsVar, localTxSubmissionQ) <-
connectNodeTipClient handlers

queryRewardQ <- connectDelegationRewardsClient handlers

registerCallback <- setupWatchers (contramap MsgWatchTip tr) nodeTipUpdate

nodeTipVar <- atomically $ newTVar TipGenesis
updateNodeTip <- registerCallback (fmap Right . atomically . writeTVar nodeTipVar)
async updateNodeTip >>= link
let updateNodeTip = readChan nodeTipQ >>= (atomically . writeTVar nodeTipVar)
link =<< async (forever updateNodeTip)

action $ NetworkLayer
{ currentNodeTip = liftIO $ _currentNodeTip nodeTipVar
, watchNodeTip = _watchNodeTip registerCallback
, watchNodeTip = _watchNodeTip nodeTipQ
, nextBlocks = _nextBlocks
, initCursor = _initCursor
, destroyCursor = _destroyCursor
Expand All @@ -280,16 +272,14 @@ withNetworkLayer tr np addrInfo versionData action = do

connectNodeTipClient handlers = do
localTxSubmissionQ <- atomically newTQueue
nodeTipQ <- atomically newTQueue
nodeTipQ <- newChan
protocolParamsVar <- atomically $ newTVar $ W.protocolParameters np
nodeTipClient <- mkTipSyncClient tr np
localTxSubmissionQ
(atomically . writeTQueue nodeTipQ)
(writeChan nodeTipQ)
(atomically . writeTVar protocolParamsVar)
link =<< async
(connectClient tr handlers nodeTipClient versionData addrInfo)
let nodeTipUpdate = atomically $ readTQueue nodeTipQ
pure (nodeTipUpdate, protocolParamsVar, localTxSubmissionQ)
link =<< async (connectClient tr handlers nodeTipClient versionData addrInfo)
pure (nodeTipQ, protocolParamsVar, localTxSubmissionQ)

connectDelegationRewardsClient handlers = do
cmdQ <- atomically newTQueue
Expand Down Expand Up @@ -391,9 +381,13 @@ withNetworkLayer tr np addrInfo versionData action = do
liftIO $ traceWith tr $ MsgFetchedNodePoolLsqData res
return res

_watchNodeTip registerCallback cb =
ExceptT $ join $ registerCallback $
runExceptT . cb . fromTip getGenesisBlockHash getEpochLength
_watchNodeTip nodeTipQ cb = do
chan <- dupChan nodeTipQ
let toBlockHeader = fromTip getGenesisBlockHash getEpochLength
forever $ do
header <- toBlockHeader <$> readChan chan
traceWith tr (MsgWatcherUpdate header)
cb header

type instance GetStakeDistribution (IO Shelley) m
= (Point ShelleyBlock
Expand Down Expand Up @@ -697,51 +691,6 @@ handleMuxError tr onResourceVanished = pure . errorType >=> \case
traceWith tr Nothing
pure onResourceVanished

{-------------------------------------------------------------------------------
Tip Watcher
-------------------------------------------------------------------------------}

setupWatchers
:: Tracer IO (WatcherLog e)
-> IO v
-> IO ((v -> IO (Either e ())) -> IO (IO (Either e a)))
setupWatchers tr nodeTipUpdate = do
watchersVar <- atomically $ newTVar (0 :: Int, [])

let register cb = do
done <- atomically newEmptyTMVar
let finish = atomically . putTMVar done
i <- atomically $ stateTVar watchersVar $ \(nextId, watchers) ->
(nextId, (nextId + 1, ((nextId, cb, finish):watchers)))
traceWith tr $ MsgWatcherRegister i
pure $ atomically $ takeTMVar done

worker <- async $ forever $ do
update <- nodeTipUpdate
watchers <- fmap snd $ atomically $ readTVar watchersVar
forConcurrently watchers $ \(i, cb, finish) -> do
traceWith tr $ MsgWatcherUpdate i
cb update >>= \case
Right () -> pure ()
Left msg -> do
traceWith tr $ MsgWatcherDone i msg
-- unregister watcher
atomically $ modifyTVar watchersVar $ \(nextId, ws) ->
(nextId, filter ((/= i) . fst3) ws)
-- signal watcher to stop waiting
finish $ Left msg
link worker

pure register

stateTVar :: MonadSTMTx m => TVar_ m a -> (a -> (b, a)) -> m b
stateTVar var f = do
s <- readTVar var
let (a, s') = f s -- since we destructure this, we are strict in f
writeTVar var s'
return a
{-# INLINE stateTVar #-}

{-------------------------------------------------------------------------------
Logging
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -771,7 +720,7 @@ data NetworkLayerLog
Delegations RewardAccounts
| MsgDestroyCursor ThreadId
| MsgFetchedNodePoolLsqData NodePoolLsqData
| MsgWatchTip (WatcherLog String)
| MsgWatcherUpdate W.BlockHeader

data QueryClientName
= TipSyncClient
Expand Down Expand Up @@ -841,7 +790,8 @@ instance ToText NetworkLayerLog where
]
MsgFetchedNodePoolLsqData d ->
"Fetched pool data from node tip using LSQ: " <> pretty d
MsgWatchTip msg -> toText msg
MsgWatcherUpdate tip ->
"Update watcher with tip: " <> pretty tip

instance HasPrivacyAnnotation NetworkLayerLog
instance HasSeverityAnnotation NetworkLayerLog where
Expand All @@ -864,19 +814,4 @@ instance HasSeverityAnnotation NetworkLayerLog where
MsgAccountDelegationAndRewards{} -> Info
MsgDestroyCursor{} -> Notice
MsgFetchedNodePoolLsqData{} -> Info
MsgWatchTip{} -> Debug

data WatcherLog e
= MsgWatcherRegister Int
| MsgWatcherUpdate Int
| MsgWatcherDone Int e
deriving (Show, Eq)

instance ToText e => ToText (WatcherLog e) where
toText = \case
MsgWatcherRegister n ->
"watcher register " <> toText n
MsgWatcherUpdate n ->
"watcher update " <> toText n
MsgWatcherDone n msg ->
"watcher done " <> toText n <> " " <> toText msg
MsgWatcherUpdate{} -> Debug

0 comments on commit f42de76

Please sign in to comment.