From 8574e9815252a2120856fc79570708636d312a3c Mon Sep 17 00:00:00 2001 From: Rodney Lorrimar Date: Tue, 16 Jun 2020 23:35:31 +1000 Subject: [PATCH] Store delegation reward account balances in the database --- lib/byron/src/Cardano/Wallet/Byron.hs | 1 + lib/byron/src/Cardano/Wallet/Byron/Network.hs | 7 ++ lib/core/src/Cardano/Wallet.hs | 112 ++++++++++++------ lib/core/src/Cardano/Wallet/Api/Server.hs | 44 ++++--- lib/core/src/Cardano/Wallet/DB.hs | 20 +++- lib/core/src/Cardano/Wallet/DB/MVar.hs | 12 ++ lib/core/src/Cardano/Wallet/DB/Model.hs | 16 ++- lib/core/src/Cardano/Wallet/DB/Sqlite.hs | 18 +++ lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs | 10 ++ lib/core/src/Cardano/Wallet/Network.hs | 8 ++ .../unit/Cardano/Wallet/DB/StateMachine.hs | 18 +++ lib/core/test/unit/Cardano/WalletSpec.hs | 5 +- .../cardano-wallet-jormungandr.cabal | 3 + .../src/Cardano/Wallet/Jormungandr.hs | 1 + .../src/Cardano/Wallet/Jormungandr/Network.hs | 40 +++++-- .../Cardano/Wallet/Jormungandr/NetworkSpec.hs | 5 +- .../Cardano/Pool/Jormungandr/MetricsSpec.hs | 2 + .../Cardano/Wallet/Jormungandr/NetworkSpec.hs | 9 +- lib/shelley/cardano-wallet-shelley.cabal | 1 + lib/shelley/src/Cardano/Wallet/Shelley.hs | 11 +- .../src/Cardano/Wallet/Shelley/Network.hs | 98 +++++++++++++-- nix/.stack.nix/cardano-wallet-jormungandr.nix | 3 + nix/.stack.nix/cardano-wallet-shelley.nix | 1 + 23 files changed, 371 insertions(+), 74 deletions(-) diff --git a/lib/byron/src/Cardano/Wallet/Byron.hs b/lib/byron/src/Cardano/Wallet/Byron.hs index bf11d08f1c3..4210c109fd8 100644 --- a/lib/byron/src/Cardano/Wallet/Byron.hs +++ b/lib/byron/src/Cardano/Wallet/Byron.hs @@ -274,6 +274,7 @@ serveWallet (DefaultFieldValues $ getActiveSlotCoefficient gp) databaseDir Server.newApiLayer walletEngineTracer params nl' tl db + (\_ _ -> pure ()) where gp@GenesisParameters { getGenesisBlockHash diff --git a/lib/byron/src/Cardano/Wallet/Byron/Network.hs b/lib/byron/src/Cardano/Wallet/Byron/Network.hs index e64f93aaaff..49b98cf51d8 100644 --- a/lib/byron/src/Cardano/Wallet/Byron/Network.hs +++ b/lib/byron/src/Cardano/Wallet/Byron/Network.hs @@ -241,6 +241,7 @@ withNetworkLayer tr np addrInfo versionData action = do , postTx = _postTx localTxSubmissionQ , stakeDistribution = _stakeDistribution , getAccountBalance = _getAccountBalance + , watchNodeTip = _watchNodeTip } where gp@W.GenesisParameters @@ -295,9 +296,15 @@ withNetworkLayer tr np addrInfo versionData action = do case result of SubmitSuccess -> pure () SubmitFail err -> throwE $ ErrPostTxBadRequest $ T.pack (show err) + _stakeDistribution = notImplemented "stakeDistribution" + -- At the moment it's not necessasy to implement this method unless + -- monitoring a reward account. + _watchNodeTip = + notImplemented "stakeDistribution" + -- | Type representing a network client running two mini-protocols to sync -- from the chain and, submit transactions. type NetworkClient m = OuroborosApplication diff --git a/lib/core/src/Cardano/Wallet.hs b/lib/core/src/Cardano/Wallet.hs index 491198c071d..fa402a573d1 100644 --- a/lib/core/src/Cardano/Wallet.hs +++ b/lib/core/src/Cardano/Wallet.hs @@ -74,6 +74,7 @@ module Cardano.Wallet , updateWalletPassphrase , walletSyncProgress , fetchRewardBalance + , manageRewardBalance , rollbackBlocks , checkWalletIntegrity , ErrWalletAlreadyExists (..) @@ -209,8 +210,6 @@ import Cardano.Wallet.Primitive.AddressDerivation.Byron ( ByronKey ) import Cardano.Wallet.Primitive.AddressDerivation.Icarus ( IcarusKey ) -import Cardano.Wallet.Primitive.AddressDerivation.Shelley - ( ShelleyKey ) import Cardano.Wallet.Primitive.AddressDiscovery ( CompareDiscovery (..) , GenChange (..) @@ -355,7 +354,7 @@ import Data.Generics.Product.Typed import Data.List.NonEmpty ( NonEmpty ) import Data.Maybe - ( isJust, mapMaybe ) + ( mapMaybe ) import Data.Quantity ( Quantity (..) ) import Data.Set @@ -364,8 +363,6 @@ import Data.Text.Class ( ToText (..) ) import Data.Time.Clock ( UTCTime, getCurrentTime ) -import Data.Type.Equality - ( testEquality ) import Data.Vector.Shuffle ( shuffle ) import Data.Word @@ -380,8 +377,6 @@ import Safe ( lastMay ) import Statistics.Quantile ( medianUnbiased, quantiles ) -import Type.Reflection - ( Typeable, typeRep ) import qualified Cardano.Wallet.Primitive.AddressDiscovery.Random as Rnd import qualified Cardano.Wallet.Primitive.AddressDiscovery.Sequential as Seq @@ -866,41 +861,47 @@ deleteWallet ctx wid = db & \DBLayer{..} -> do where db = ctx ^. dbLayer @s @k --- | Fetch the reward balance of a given wallet. +-- | Fetch the cached reward balance of a given wallet from the database. +fetchRewardBalance + :: forall ctx s k. + ( HasDBLayer s k ctx + ) + => ctx + -> WalletId + -> IO (Quantity "lovelace" Word64) +fetchRewardBalance ctx wid = db & \DBLayer{..} -> + atomically $ readDelegationRewardBalance pk + where + pk = PrimaryKey wid + db = ctx ^. dbLayer @s @k + +-- | Query the node for the reward balance of a given wallet. -- -- Rather than force all callers of 'readWallet' to wait for fetching the -- account balance (via the 'NetworkLayer'), we expose this function for it. -fetchRewardBalance +queryRewardBalance :: forall ctx s t k. ( HasDBLayer s k ctx , HasNetworkLayer t ctx , HasRewardAccount s k - , HasLogger WalletLog ctx - , Typeable k ) => ctx -> WalletId -> ExceptT ErrFetchRewards IO (Quantity "lovelace" Word64) -fetchRewardBalance ctx wid = db & \DBLayer{..} -> do - -- FIXME: issue #1750 re-enable querying reward balance when it's faster - if isShelleyKey then do - lift $ traceWith tr MsgTemporaryDisableFetchReward - pure $ Quantity 0 - else do - let pk = PrimaryKey wid - cp <- withExceptT ErrFetchRewardsNoSuchWallet - . mapExceptT atomically - . withNoSuchWallet wid - $ readCheckpoint pk - mapExceptT (fmap handleErr) - . getAccountBalance nw - . toChimericAccount @s @k - . rewardAccountKey - $ getState cp +queryRewardBalance ctx wid = db & \DBLayer{..} -> do + cp <- withExceptT ErrFetchRewardsNoSuchWallet + . mapExceptT atomically + . withNoSuchWallet wid + $ readCheckpoint pk + mapExceptT (fmap handleErr) + . getAccountBalance nw + . toChimericAccount @s @k + . rewardAccountKey + $ getState cp where + pk = PrimaryKey wid db = ctx ^. dbLayer @s @k nw = ctx ^. networkLayer @t - tr = ctx ^. logger handleErr = \case Right x -> Right x Left (ErrGetAccountBalanceAccountNotFound _) -> @@ -908,9 +909,37 @@ fetchRewardBalance ctx wid = db & \DBLayer{..} -> do Left (ErrGetAccountBalanceNetworkUnreachable e) -> Left $ ErrFetchRewardsNetworkUnreachable e - isShelleyKey = isJust $ testEquality - (typeRep @(k 'RootK XPrv)) - (typeRep @(ShelleyKey 'RootK XPrv)) +manageRewardBalance + :: forall ctx s t k. + ( HasLogger WalletLog ctx + , HasNetworkLayer t ctx + , HasDBLayer s k ctx + , HasRewardAccount s k + , ctx ~ WalletLayer s t k + ) + => ctx + -> WalletId + -> IO () +manageRewardBalance ctx wid = db & \DBLayer{..} -> do + Left err <- runExceptT $ watchNodeTip $ \bh -> do + lift $ traceWith tr $ MsgRewardBalanceQuery bh + res <- lift $ runExceptT (queryRewardBalance @ctx @s @t @k ctx wid) + lift $ traceWith tr $ MsgRewardBalanceResult res + case res of + Right amt -> + withExceptT show $ + mapExceptT atomically $ putDelegationRewardBalance pk amt + Left _err -> + -- Occasionaly failing to query is generally not fatal. It will + -- just update the balance next time the tip changes. + pure () + traceWith tr $ MsgRewardBalanceFinish err + + where + pk = PrimaryKey wid + db = ctx ^. dbLayer @s @k + NetworkLayer{watchNodeTip} = ctx ^. networkLayer @t + tr = ctx ^. logger @WalletLog {------------------------------------------------------------------------------- Address @@ -1930,6 +1959,7 @@ data ErrQuitStakePool data ErrFetchRewards = ErrFetchRewardsNetworkUnreachable ErrNetworkUnavailable | ErrFetchRewardsNoSuchWallet ErrNoSuchWallet + deriving (Generic, Eq, Show) data ErrSelectForMigration = ErrSelectForMigrationNoSuchWallet ErrNoSuchWallet @@ -2031,7 +2061,9 @@ data WalletLog | MsgDelegationCoinSelection CoinSelection | MsgPaymentCoinSelection CoinSelection | MsgPaymentCoinSelectionAdjusted CoinSelection - | MsgTemporaryDisableFetchReward + | MsgRewardBalanceQuery BlockHeader + | MsgRewardBalanceResult (Either ErrFetchRewards (Quantity "lovelace" Word64)) + | MsgRewardBalanceFinish String deriving (Show, Eq) instance ToText WalletLog where @@ -2073,8 +2105,15 @@ instance ToText WalletLog where "Coins selected for payment: \n" <> pretty sel MsgPaymentCoinSelectionAdjusted sel -> "Coins after fee adjustment: \n" <> pretty sel - MsgTemporaryDisableFetchReward -> - "FIXME: (issue #1750) fetching reward temporarily disabled." + MsgRewardBalanceQuery bh -> + "Updating the reward balance for block " <> pretty bh + MsgRewardBalanceResult (Right amt) -> + "The reward balance is " <> pretty amt + MsgRewardBalanceResult (Left err) -> + "Problem fetching reward balance. Will try again on next chain update. " <> + T.pack (show err) + MsgRewardBalanceFinish err -> + "Reward balance worker has finished due to: " <> T.pack err instance HasPrivacyAnnotation WalletLog instance HasSeverityAnnotation WalletLog where @@ -2093,4 +2132,7 @@ instance HasSeverityAnnotation WalletLog where MsgDelegationCoinSelection _ -> Debug MsgPaymentCoinSelection _ -> Debug MsgPaymentCoinSelectionAdjusted _ -> Debug - MsgTemporaryDisableFetchReward -> Warning + MsgRewardBalanceQuery _ -> Debug + MsgRewardBalanceResult (Right _) -> Debug + MsgRewardBalanceResult (Left _) -> Notice + MsgRewardBalanceFinish _ -> Debug diff --git a/lib/core/src/Cardano/Wallet/Api/Server.hs b/lib/core/src/Cardano/Wallet/Api/Server.hs index 2309862d17b..cb04fa8268c 100644 --- a/lib/core/src/Cardano/Wallet/Api/Server.hs +++ b/lib/core/src/Cardano/Wallet/Api/Server.hs @@ -83,6 +83,9 @@ module Cardano.Wallet.Api.Server , rndStateChange , assignMigrationAddresses , withWorkerCtx + + -- * Workers + , manageRewardBalance ) where import Prelude @@ -130,6 +133,7 @@ import Cardano.Wallet , HasLogger , WalletLog , genesisData + , manageRewardBalance ) import Cardano.Wallet.Api ( ApiLayer (..) @@ -275,6 +279,8 @@ import Cardano.Wallet.Unsafe ( unsafeRunExceptT ) import Control.Arrow ( second ) +import Control.Concurrent.Async + ( concurrently_ ) import Control.Exception ( IOException, bracket, throwIO, tryJust ) import Control.Monad @@ -368,8 +374,6 @@ import System.IO.Error ) import System.Random ( getStdRandom, random ) -import Type.Reflection - ( Typeable ) import qualified Cardano.Wallet as W import qualified Cardano.Wallet.Network as NW @@ -512,7 +516,6 @@ postWallet , Bounded (Index (AddressIndexDerivationType k) 'AddressK) , HasDBFactory s k ctx , HasWorkerRegistry s k ctx - , Typeable k ) => ctx -> ((SomeMnemonic, Maybe SomeMnemonic) -> Passphrase "encryption" -> k 'RootK XPrv) @@ -535,7 +538,6 @@ postShelleyWallet , HasDBFactory s k ctx , HasWorkerRegistry s k ctx , HasRewardAccount s k - , Typeable k ) => ctx -> ((SomeMnemonic, Maybe SomeMnemonic) -> Passphrase "encryption" -> k 'RootK XPrv) @@ -546,6 +548,7 @@ postShelleyWallet ctx generateKey body = do void $ liftHandler $ initWorker @_ @s @k ctx wid (\wrk -> W.createWallet @(WorkerCtx ctx) @s @k wrk wid wName state) (\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @t @k wrk wid) + (\wrk -> W.manageRewardBalance @(WorkerCtx ctx) @s @t @k wrk wid) withWorkerCtx @_ @s @k ctx wid liftE liftE $ \wrk -> liftHandler $ W.attachPrivateKeyFromPwd @_ @s @k wrk wid (rootXPrv, pwd) fst <$> getWallet ctx (mkShelleyWallet @_ @s @t @k) (ApiT wid) @@ -579,6 +582,7 @@ postAccountWallet ctx mkWallet liftKey body = do void $ liftHandler $ initWorker @_ @s @k ctx wid (\wrk -> W.createWallet @(WorkerCtx ctx) @s @k wrk wid wName state) (\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @t @k wrk wid) + (\wrk -> W.manageRewardBalance @(WorkerCtx ctx) @s @t @k wrk wid) fst <$> getWallet ctx mkWallet (ApiT wid) where g = maybe defaultAddressPoolGap getApiT (body ^. #addressPoolGap) @@ -592,14 +596,14 @@ mkShelleyWallet ( ctx ~ ApiLayer s t k , s ~ SeqState n k , IsOurs s Address - , HasRewardAccount s k , HasWorkerRegistry s k ctx - , Typeable k ) => MkApiWallet ctx s ApiWallet mkShelleyWallet ctx wid cp meta pending progress = do - reward <- withWorkerCtx @_ @s @k ctx wid liftE liftE $ \wrk -> liftHandler $ - W.fetchRewardBalance @_ @s @t @k wrk wid + reward <- withWorkerCtx @_ @s @k ctx wid liftE liftE $ \wrk -> + -- never fails - returns zero if balance not found + Handler $ ExceptT $ Right <$> + W.fetchRewardBalance @_ @s @k wrk wid pure ApiWallet { addressPoolGap = ApiT $ getState cp ^. #externalPool . #gap , balance = ApiT $ WalletBalance @@ -665,6 +669,7 @@ postLegacyWallet postLegacyWallet ctx (rootXPrv, pwd) createWallet = do void $ liftHandler $ initWorker @_ @s @k ctx wid (`createWallet` wid) (\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @t @k wrk wid) + (\_ -> pure ()) withWorkerCtx ctx wid liftE liftE $ \wrk -> liftHandler $ W.attachPrivateKeyFromPwd wrk wid (rootXPrv, pwd) fst <$> getWallet ctx mkLegacyWallet (ApiT wid) @@ -757,6 +762,7 @@ postRandomWalletFromXPrv ctx body = do void $ liftHandler $ initWorker @_ @s @k ctx wid (\wrk -> W.createWallet @(WorkerCtx ctx) @s @k wrk wid wName s) (\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @t @k wrk wid) + (\_ -> pure ()) withWorkerCtx ctx wid liftE liftE $ \wrk -> liftHandler $ W.attachPrivateKeyFromPwdHash wrk wid (byronKey, pwd) fst <$> getWallet ctx mkLegacyWallet (ApiT wid) @@ -1454,8 +1460,10 @@ initWorker -- ^ Create action -> (WorkerCtx ctx -> ExceptT ErrNoSuchWallet IO ()) -- ^ Restore action + -> (WorkerCtx ctx -> IO ()) + -- ^ Action to run concurrently with restore -> ExceptT ErrCreateWallet IO WalletId -initWorker ctx wid createWallet restoreWallet = +initWorker ctx wid createWallet restoreWallet coworker = liftIO (Registry.lookup re wid) >>= \case Just _ -> throwE $ ErrCreateWalletAlreadyExists $ ErrWalletAlreadyExists wid @@ -1475,7 +1483,9 @@ initWorker ctx wid createWallet restoreWallet = , workerMain = \ctx' _ -> do -- FIXME: -- Review error handling here - unsafeRunExceptT $ restoreWallet ctx' + concurrently_ + (unsafeRunExceptT $ restoreWallet ctx') + (coworker ctx') , workerAfter = defaultWorkerAfter @@ -1591,11 +1601,13 @@ newApiLayer -> NetworkLayer IO t Block -> TransactionLayer t k -> DBFactory IO s k + -> (WorkerCtx ctx -> WalletId -> IO ()) + -- ^ Action to run concurrently with wallet restore -> IO ctx -newApiLayer tr g0 nw tl df = do +newApiLayer tr g0 nw tl df coworker = do re <- Registry.empty let ctx = ApiLayer tr g0 nw tl df re - listDatabases df >>= mapM_ (registerWorker ctx) + listDatabases df >>= mapM_ (registerWorker ctx coworker) return ctx -- | Register a restoration worker to the registry. @@ -1606,9 +1618,10 @@ registerWorker , IsOurs s Address ) => ApiLayer s t k + -> (WorkerCtx ctx -> WalletId -> IO ()) -> WalletId -> IO () -registerWorker ctx wid = +registerWorker ctx coworker wid = void $ Registry.register @_ @ctx re ctx wid config where (_, NetworkParameters gp _, _) = ctx ^. genesisData @@ -1622,8 +1635,9 @@ registerWorker ctx wid = , workerMain = \ctx' _ -> do -- FIXME: -- Review error handling here - unsafeRunExceptT $ - W.restoreWallet @(WorkerCtx ctx) @s @t ctx' wid + concurrently_ + (unsafeRunExceptT $ W.restoreWallet @(WorkerCtx ctx) @s @t ctx' wid) + (coworker ctx' wid) , workerAfter = defaultWorkerAfter diff --git a/lib/core/src/Cardano/Wallet/DB.hs b/lib/core/src/Cardano/Wallet/DB.hs index d21a6e942b8..c1b88515ef3 100644 --- a/lib/core/src/Cardano/Wallet/DB.hs +++ b/lib/core/src/Cardano/Wallet/DB.hs @@ -59,7 +59,7 @@ import Control.Monad.Trans.Except import Data.Quantity ( Quantity (..) ) import Data.Word - ( Word32 ) + ( Word32, Word64 ) import qualified Data.List as L @@ -184,6 +184,24 @@ data DBLayer m s k = forall stm. (MonadIO stm, MonadFail stm) => DBLayer -- 1. Stored on-chain. -- 2. Affected by rollbacks (or said differently, tied to a 'SlotId'). + , putDelegationRewardBalance + :: PrimaryKey WalletId + -> Quantity "lovelace" Word64 + -> ExceptT ErrNoSuchWallet stm () + -- ^ Store the latest known reward account balance. + -- + -- This is separate from checkpoints because the data corresponds to the + -- node tip. + -- This is separate from putWalletMeta because it's not meta data + + , readDelegationRewardBalance + :: PrimaryKey WalletId + -> stm (Quantity "lovelace" Word64) + -- ^ Get the reward account balance. + -- + -- Returns zero if the wallet isn't found or if wallet hasn't delegated + -- stake. + , putTxHistory :: PrimaryKey WalletId -> [(Tx, TxMeta)] diff --git a/lib/core/src/Cardano/Wallet/DB/MVar.hs b/lib/core/src/Cardano/Wallet/DB/MVar.hs index 728314d37f9..b9aad9b7868 100644 --- a/lib/core/src/Cardano/Wallet/DB/MVar.hs +++ b/lib/core/src/Cardano/Wallet/DB/MVar.hs @@ -39,11 +39,13 @@ import Cardano.Wallet.DB.Model , mListWallets , mPutCheckpoint , mPutDelegationCertificate + , mPutDelegationRewardBalance , mPutPrivateKey , mPutProtocolParameters , mPutTxHistory , mPutWalletMeta , mReadCheckpoint + , mReadDelegationRewardBalance , mReadPrivateKey , mReadProtocolParameters , mReadTxHistory @@ -162,6 +164,16 @@ newDBLayer = do , readProtocolParameters = readDB db . mReadProtocolParameters + {----------------------------------------------------------------------- + Delegation Rewards + -----------------------------------------------------------------------} + + , putDelegationRewardBalance = \pk amt -> ExceptT $ + alterDB errNoSuchWallet db (mPutDelegationRewardBalance pk amt) + + , readDelegationRewardBalance = + readDB db . mReadDelegationRewardBalance + {----------------------------------------------------------------------- Execution -----------------------------------------------------------------------} diff --git a/lib/core/src/Cardano/Wallet/DB/Model.hs b/lib/core/src/Cardano/Wallet/DB/Model.hs index 9a6170736e4..593283d9c8f 100644 --- a/lib/core/src/Cardano/Wallet/DB/Model.hs +++ b/lib/core/src/Cardano/Wallet/DB/Model.hs @@ -57,6 +57,8 @@ module Cardano.Wallet.DB.Model , mReadPrivateKey , mPutProtocolParameters , mReadProtocolParameters + , mPutDelegationRewardBalance + , mReadDelegationRewardBalance , mCheckWallet ) where @@ -107,7 +109,7 @@ import Data.Ord import Data.Quantity ( Quantity (..) ) import Data.Word - ( Word32 ) + ( Word32, Word64 ) import GHC.Generics ( Generic ) @@ -141,6 +143,7 @@ data WalletDatabase s xprv = WalletDatabase , txHistory :: !(Map (Hash "Tx") TxMeta) , xprv :: !(Maybe xprv) , protocolParameters :: !ProtocolParameters + , rewardAccountBalance :: !(Quantity "lovelace" Word64) } deriving (Show, Eq, Generic) -- | Shorthand for the putTxHistory argument type. @@ -204,6 +207,7 @@ mInitializeWallet wid cp meta txs0 pp db@Database{wallets,txs} , txHistory = history , xprv = Nothing , protocolParameters = pp + , rewardAccountBalance = minBound } txs' = Map.fromList $ (\(tx, _) -> (txId tx, tx)) <$> txs0 history = Map.fromList $ first txId <$> txs0 @@ -440,6 +444,16 @@ mReadProtocolParameters mReadProtocolParameters wid db@(Database wallets _) = (Right (protocolParameters <$> Map.lookup wid wallets), db) +mPutDelegationRewardBalance + :: Ord wid => wid -> Quantity "lovelace" Word64 -> ModelOp wid s xprv () +mPutDelegationRewardBalance wid amt = alterModel wid $ \wal -> + ((), wal { rewardAccountBalance = amt }) + +mReadDelegationRewardBalance + :: Ord wid => wid -> ModelOp wid s xprv (Quantity "lovelace" Word64) +mReadDelegationRewardBalance wid db@(Database wallets _) = + (Right (maybe minBound rewardAccountBalance $ Map.lookup wid wallets), db) + {------------------------------------------------------------------------------- Model function helpers -------------------------------------------------------------------------------} diff --git a/lib/core/src/Cardano/Wallet/DB/Sqlite.hs b/lib/core/src/Cardano/Wallet/DB/Sqlite.hs index 77314f730be..240c3a04ccc 100644 --- a/lib/core/src/Cardano/Wallet/DB/Sqlite.hs +++ b/lib/core/src/Cardano/Wallet/DB/Sqlite.hs @@ -66,6 +66,7 @@ import Cardano.Wallet.DB import Cardano.Wallet.DB.Sqlite.TH ( Checkpoint (..) , DelegationCertificate (..) + , DelegationReward (..) , EntityField (..) , Key (..) , PrivateKey (..) @@ -683,6 +684,23 @@ newDBLayer trace defaultFieldValues mDatabaseFile = do , readProtocolParameters = \(PrimaryKey wid) -> selectProtocolParameters wid + {----------------------------------------------------------------------- + Delegation Rewards + -----------------------------------------------------------------------} + + , putDelegationRewardBalance = + \(PrimaryKey wid) (Quantity amt) -> ExceptT $ do + selectWallet wid >>= \case + Nothing -> pure $ Left $ ErrNoSuchWallet wid + Just _ -> Right <$> repsert + (DelegationRewardKey wid) + (DelegationReward wid amt) + + , readDelegationRewardBalance = + \(PrimaryKey wid) -> + maybe minBound (Quantity . rewardAccountBalance . entityVal) <$> + selectFirst [RewardWalletId ==. wid] [] + {----------------------------------------------------------------------- ACID Execution -----------------------------------------------------------------------} diff --git a/lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs b/lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs index 226ad53e56a..007ae870d5b 100644 --- a/lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs +++ b/lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs @@ -165,6 +165,16 @@ DelegationCertificate Foreign Wallet delegationCertificate certWalletId ! ON DELETE CASCADE deriving Show Generic +-- Latest balance of the reward account associated with +-- the stake key of this wallet. +DelegationReward + rewardWalletId W.WalletId sql=wallet_id + rewardAccountBalance Word64 sql=account_balance + + Primary rewardWalletId + Foreign Wallet delegationReward rewardWalletId ! ON DELETE CASCADE + deriving Show Generic + -- The UTxO for a given wallet checkpoint is a one-to-one mapping from TxIn -> -- TxOut. This table does not need to refer to the TxIn or TxOut tables. All -- necessary information for the UTxO is in this table. diff --git a/lib/core/src/Cardano/Wallet/Network.hs b/lib/core/src/Cardano/Wallet/Network.hs index 0269b03414e..b86ab67943a 100644 --- a/lib/core/src/Cardano/Wallet/Network.hs +++ b/lib/core/src/Cardano/Wallet/Network.hs @@ -123,6 +123,14 @@ data NetworkLayer m target block = NetworkLayer :: ExceptT ErrCurrentNodeTip m BlockHeader -- ^ Get the current tip from the chain producer + , watchNodeTip + :: (BlockHeader -> ExceptT String m ()) + -> ExceptT String m () + -- ^ Register a callback for when the node tip changes. + -- If the given callback fails, the watcher stops. Otherwise, this + -- function never returns. + -- fixme: needs to be polymorphic on error type + , getProtocolParameters :: m ProtocolParameters diff --git a/lib/core/test/unit/Cardano/Wallet/DB/StateMachine.hs b/lib/core/test/unit/Cardano/Wallet/DB/StateMachine.hs index a0c76f503ac..247a7ed97af 100644 --- a/lib/core/test/unit/Cardano/Wallet/DB/StateMachine.hs +++ b/lib/core/test/unit/Cardano/Wallet/DB/StateMachine.hs @@ -75,11 +75,13 @@ import Cardano.Wallet.DB.Model , mListWallets , mPutCheckpoint , mPutDelegationCertificate + , mPutDelegationRewardBalance , mPutPrivateKey , mPutProtocolParameters , mPutTxHistory , mPutWalletMeta , mReadCheckpoint + , mReadDelegationRewardBalance , mReadPrivateKey , mReadProtocolParameters , mReadTxHistory @@ -162,6 +164,8 @@ import Data.Set ( Set ) import Data.TreeDiff ( ToExpr (..), defaultExprViaShow, genericToExpr ) +import Data.Word + ( Word64 ) import GHC.Generics ( Generic ) import System.Random @@ -294,6 +298,8 @@ data Cmd s wid | RollbackTo wid SlotId | RemovePendingTx wid (Hash "Tx") | PutDelegationCertificate wid DelegationCertificate SlotId + | PutDelegationRewardBalance wid (Quantity "lovelace" Word64) + | ReadDelegationRewardBalance wid deriving (Show, Functor, Foldable, Traversable) data Success s wid @@ -307,6 +313,7 @@ data Success s wid | ProtocolParams (Maybe ProtocolParameters) | BlockHeaders [BlockHeader] | Point SlotId + | DelegationRewardBalance (Quantity "lovelace" Word64) deriving (Show, Eq, Functor, Foldable, Traversable) newtype Resp s wid @@ -362,6 +369,10 @@ runMock = \case first (Resp . fmap Unit) . mPutProtocolParameters wid pp ReadProtocolParameters wid -> first (Resp . fmap ProtocolParams) . mReadProtocolParameters wid + PutDelegationRewardBalance wid amt -> + first (Resp . fmap Unit) . mPutDelegationRewardBalance wid amt + ReadDelegationRewardBalance wid -> + first (Resp . fmap DelegationRewardBalance) . mReadDelegationRewardBalance wid RollbackTo wid sl -> first (Resp . fmap Point) . mRollbackTo wid sl RemovePendingTx wid tid -> @@ -422,6 +433,10 @@ runIO db@DBLayer{..} = fmap Resp . go mapExceptT atomically $ putProtocolParameters (PrimaryKey wid) pp ReadProtocolParameters wid -> Right . ProtocolParams <$> atomically (readProtocolParameters $ PrimaryKey wid) + PutDelegationRewardBalance wid amt -> catchNoSuchWallet Unit $ + mapExceptT atomically $ putDelegationRewardBalance (PrimaryKey wid) amt + ReadDelegationRewardBalance wid -> Right . DelegationRewardBalance <$> + atomically (readDelegationRewardBalance $ PrimaryKey wid) RollbackTo wid sl -> catchNoSuchWallet Point $ mapExceptT atomically $ rollbackTo (PrimaryKey wid) sl @@ -692,6 +707,8 @@ instance CommandNames (At (Cmd s)) where cmdName (At ReadPrivateKey{}) = "ReadPrivateKey" cmdName (At PutProtocolParameters{}) = "PutProtocolParameters" cmdName (At ReadProtocolParameters{}) = "ReadProtocolParameters" + cmdName (At PutDelegationRewardBalance{}) = "PutDelegationRewardBalance" + cmdName (At ReadDelegationRewardBalance{}) = "ReadDelegationRewardBalance" cmdName (At RollbackTo{}) = "RollbackTo" cmdName (At RemovePendingTx{}) = "RemovePendingTx" cmdNames _ = @@ -702,6 +719,7 @@ instance CommandNames (At (Cmd s)) where , "PutTxHistory", "ReadTxHistory", "RemovePendingTx" , "PutPrivateKey", "ReadPrivateKey" , "PutProtocolParameters", "ReadProtocolParameters" + , "PutDelegationRewardBalance", "ReadDelegationRewardBalance" ] instance Functor f => Rank2.Functor (At f) where diff --git a/lib/core/test/unit/Cardano/WalletSpec.hs b/lib/core/test/unit/Cardano/WalletSpec.hs index aadbacefa2f..dd81ea4c075 100644 --- a/lib/core/test/unit/Cardano/WalletSpec.hs +++ b/lib/core/test/unit/Cardano/WalletSpec.hs @@ -659,7 +659,7 @@ dummyNetworkLayer = NetworkLayer , cursorSlotId = error "dummyNetworkLayer: cursorSlotId not implemented" , currentNodeTip = - pure $ BlockHeader (SlotId 0 0) (Quantity 0) dummyHash dummyHash + pure dummyTip , getProtocolParameters = error "dummyNetworkLayer: getProtocolParameters not implemented" , postTx = @@ -668,8 +668,11 @@ dummyNetworkLayer = NetworkLayer error "dummyNetworkLayer: stakeDistribution not implemented" , getAccountBalance = error "dummyNetworkLayer: getAccountBalance not implemented" + , watchNodeTip = + error "dummyNetworkLayer: watchNodeTip not implemented" } where + dummyTip = BlockHeader (SlotId 0 0) (Quantity 0) dummyHash dummyHash dummyHash = Hash "dummy hash" newtype DummyState diff --git a/lib/jormungandr/cardano-wallet-jormungandr.cabal b/lib/jormungandr/cardano-wallet-jormungandr.cabal index bf279f91103..dda678737ca 100644 --- a/lib/jormungandr/cardano-wallet-jormungandr.cabal +++ b/lib/jormungandr/cardano-wallet-jormungandr.cabal @@ -69,6 +69,7 @@ library , servant-client , servant-client-core , servant-server + , stm , temporary , text , text-class @@ -169,6 +170,7 @@ test-suite unit , safe , servant , servant-swagger + , stm , swagger2 , temporary , text @@ -243,6 +245,7 @@ test-suite jormungandr-integration , process , retry , servant + , stm , temporary , text , text-class diff --git a/lib/jormungandr/src/Cardano/Wallet/Jormungandr.hs b/lib/jormungandr/src/Cardano/Wallet/Jormungandr.hs index 89be77891f1..59e92b9fe14 100644 --- a/lib/jormungandr/src/Cardano/Wallet/Jormungandr.hs +++ b/lib/jormungandr/src/Cardano/Wallet/Jormungandr.hs @@ -287,6 +287,7 @@ serveWallet Tracers{..} sTolerance databaseDir hostPref listen backend beforeMai databaseDir Server.newApiLayer walletEngineTracer (toWLBlock block0, np, sTolerance) nl' tl db + (\_ _ -> pure ()) where nl' = toWLBlock <$> nl diff --git a/lib/jormungandr/src/Cardano/Wallet/Jormungandr/Network.hs b/lib/jormungandr/src/Cardano/Wallet/Jormungandr/Network.hs index 85d8e643f45..585466cc939 100644 --- a/lib/jormungandr/src/Cardano/Wallet/Jormungandr/Network.hs +++ b/lib/jormungandr/src/Cardano/Wallet/Jormungandr/Network.hs @@ -139,12 +139,16 @@ import Cardano.Wallet.Primitive.Types ) import Control.Concurrent.MVar.Lifted ( MVar, modifyMVar, newMVar, readMVar ) +import Control.Concurrent.STM + ( atomically ) +import Control.Concurrent.STM.TChan + ( TChan, dupTChan, newBroadcastTChanIO, readTChan, writeTChan ) import Control.Exception ( Exception ) import Control.Monad - ( void ) + ( forever, unless, void ) import Control.Monad.IO.Class - ( liftIO ) + ( MonadIO (..), liftIO ) import Control.Monad.Trans.Class ( lift ) import Control.Monad.Trans.Control @@ -157,6 +161,8 @@ import Data.ByteArray.Encoding ( Base (Base16), convertToBase ) import Data.Coerce ( coerce ) +import Data.IORef + ( newIORef, readIORef, writeIORef ) import Data.Map.Strict ( Map ) import Data.Quantity @@ -278,7 +284,8 @@ newNetworkLayer tr baseUrl block0H = do liftIO $ waitForService "Jörmungandr" tr' (Port $ baseUrlPort baseUrl) $ waitForNetwork (void $ getTipId jor) defaultRetryPolicy (block0, np) <- getInitialBlockchainParameters jor (coerce block0H) - return ((block0, np), mkRawNetworkLayer np 1000 st jor) + chan <- liftIO newBroadcastTChanIO + return ((block0, np), mkRawNetworkLayer np 1000 st chan jor) -- | Wrap a Jormungandr client into a 'NetworkLayer' common interface. -- @@ -287,6 +294,7 @@ newNetworkLayer tr baseUrl block0H = do mkRawNetworkLayer :: forall m t block. ( MonadBaseControl IO m + , MonadIO m , t ~ Jormungandr , block ~ J.Block ) @@ -294,9 +302,10 @@ mkRawNetworkLayer -> Word -- ^ Batch size when fetching blocks from Jörmungandr -> MVar BlockHeaders + -> TChan BlockHeader -> JormungandrClient m -> NetworkLayer m t block -mkRawNetworkLayer np batchSize st j = NetworkLayer +mkRawNetworkLayer np batchSize st tipNotify j = NetworkLayer { currentNodeTip = _currentNodeTip @@ -323,6 +332,9 @@ mkRawNetworkLayer np batchSize st j = NetworkLayer , getAccountBalance = _getAccountBalance + + , watchNodeTip = + _watchNodeTip } where -- security parameter, the maximum number of unstable blocks. @@ -343,9 +355,11 @@ mkRawNetworkLayer np batchSize st j = NetworkLayer _currentNodeTip = modifyMVar st $ \bs -> do let tip = withExceptT liftE $ getTipId j bs' <- withExceptT liftE $ updateUnstableBlocks k tip (getBlockHeader j) bs - ExceptT . pure $ case blockHeadersTip bs' of - Just t -> Right (bs', t) - Nothing -> Left ErrCurrentNodeTipNotFound + ExceptT $ case blockHeadersTip bs' of + Just t -> do + liftIO $ notifyWatchers t + pure $ Right (bs', t) + Nothing -> pure $ Left ErrCurrentNodeTipNotFound _getProtocolParameters :: m ProtocolParameters _getProtocolParameters = pure $ protocolParameters np @@ -459,6 +473,18 @@ mkRawNetworkLayer np batchSize st j = NetworkLayer _ -> RollBackward $ Cursor emptyBlockHeaders + _watchNodeTip cb = do + watcher <- liftIO . atomically $ dupTChan tipNotify + prevVar <- liftIO $ newIORef Nothing + forever $ do + bh <- liftIO . atomically $ readTChan watcher + prev <- liftIO $ readIORef prevVar + unless (Just bh == prev) $ do + cb bh + liftIO $ writeIORef prevVar (Just bh) + + notifyWatchers = atomically . writeTChan tipNotify + {------------------------------------------------------------------------------- Queries diff --git a/lib/jormungandr/test/integration/Cardano/Wallet/Jormungandr/NetworkSpec.hs b/lib/jormungandr/test/integration/Cardano/Wallet/Jormungandr/NetworkSpec.hs index d293c565fe8..ef2bcd56731 100644 --- a/lib/jormungandr/test/integration/Cardano/Wallet/Jormungandr/NetworkSpec.hs +++ b/lib/jormungandr/test/integration/Cardano/Wallet/Jormungandr/NetworkSpec.hs @@ -84,6 +84,8 @@ import Control.Concurrent ( threadDelay ) import Control.Concurrent.MVar ( newMVar ) +import Control.Concurrent.STM.TChan + ( newBroadcastTChanIO ) import Control.DeepSeq ( deepseq ) import Control.Exception @@ -198,9 +200,10 @@ spec = do newBrokenNetworkLayer baseUrl = do mgr <- newManager defaultManagerSettings st <- newMVar emptyBlockHeaders + chan <- newBroadcastTChanIO let jor = Jormungandr.mkJormungandrClient mgr baseUrl let g0 = error "GenesisParameters" - return (void $ mkRawNetworkLayer g0 1000 st jor) + return (void $ mkRawNetworkLayer g0 1000 st chan jor) let makeUnreachableNetworkLayer = do port <- head <$> randomUnusedTCPPorts 1 diff --git a/lib/jormungandr/test/unit/Cardano/Pool/Jormungandr/MetricsSpec.hs b/lib/jormungandr/test/unit/Cardano/Pool/Jormungandr/MetricsSpec.hs index 81f056aa7c3..4aae3fac9c7 100644 --- a/lib/jormungandr/test/unit/Cardano/Pool/Jormungandr/MetricsSpec.hs +++ b/lib/jormungandr/test/unit/Cardano/Pool/Jormungandr/MetricsSpec.hs @@ -340,6 +340,8 @@ mockNetworkLayer = NetworkLayer error "mockNetworkLayer: stakeDistribution" , getAccountBalance = \_ -> error "mockNetworkLayer: getAccountBalance" + , watchNodeTip = + \_ -> error "mockNetworkLayer: watchNodeTip" } header0 :: BlockHeader diff --git a/lib/jormungandr/test/unit/Cardano/Wallet/Jormungandr/NetworkSpec.hs b/lib/jormungandr/test/unit/Cardano/Wallet/Jormungandr/NetworkSpec.hs index 329c76eb4e7..aa75b7f00df 100644 --- a/lib/jormungandr/test/unit/Cardano/Wallet/Jormungandr/NetworkSpec.hs +++ b/lib/jormungandr/test/unit/Cardano/Wallet/Jormungandr/NetworkSpec.hs @@ -40,8 +40,12 @@ import Cardano.Wallet.Primitive.Types ) import Control.Concurrent.MVar.Lifted ( MVar, newMVar, readMVar ) +import Control.Concurrent.STM.TChan + ( newBroadcastTChanIO ) import Control.Monad.Fail ( MonadFail ) +import Control.Monad.IO.Class + ( MonadIO, liftIO ) import Control.Monad.Trans.Class ( lift ) import Control.Monad.Trans.Control @@ -337,17 +341,18 @@ type TestNetworkLayer m = -- | Instantiate new network layer with mock jormungandr. mockNetworkLayer - :: forall m. (MonadFail m, MonadBaseControl IO m) + :: forall m. (MonadFail m, MonadBaseControl IO m, MonadIO m) => (String -> StateT S m ()) -- ^ logger function -> StateT S m (TestNetworkLayer m, Cursor Jormungandr -> m (Maybe BlockHeader)) mockNetworkLayer logLine = do let jm = mockJormungandrClient logLine Quantity k <- gets mockNodeK st <- newMVar emptyBlockHeaders + chan <- liftIO newBroadcastTChanIO Right (_b0, np) <- runExceptT $ getInitialBlockchainParameters jm genesisHash pure - ( fromJBlock <$> mkRawNetworkLayer np (fromIntegral k) st jm + ( fromJBlock <$> mkRawNetworkLayer np (fromIntegral k) st chan jm , findIntersection st ) where diff --git a/lib/shelley/cardano-wallet-shelley.cabal b/lib/shelley/cardano-wallet-shelley.cabal index b13fa5cb086..7b01d62e636 100644 --- a/lib/shelley/cardano-wallet-shelley.cabal +++ b/lib/shelley/cardano-wallet-shelley.cabal @@ -54,6 +54,7 @@ library , cryptonite , directory , exceptions + , extra , filepath , fmt , generic-lens diff --git a/lib/shelley/src/Cardano/Wallet/Shelley.hs b/lib/shelley/src/Cardano/Wallet/Shelley.hs index 64f103267f9..ef535d9b3ba 100644 --- a/lib/shelley/src/Cardano/Wallet/Shelley.hs +++ b/lib/shelley/src/Cardano/Wallet/Shelley.hs @@ -101,7 +101,7 @@ import Cardano.Wallet.Primitive.Types , WalletId ) import Cardano.Wallet.Registry - ( WorkerLog (..), defaultWorkerAfter ) + ( HasWorkerCtx (..), WorkerLog (..), defaultWorkerAfter ) import Cardano.Wallet.Shelley.Api.Server ( server ) import Cardano.Wallet.Shelley.Compatibility @@ -238,8 +238,12 @@ serveWallet let gp = genesisParameters np let el = getEpochLength gp randomApi <- apiLayer (newTransactionLayer proxy pm el) nl + (\_ _ -> pure ()) icarusApi <- apiLayer (newTransactionLayer proxy pm el ) nl + (\_ _ -> pure ()) shelleyApi <- apiLayer (newTransactionLayer proxy pm el) nl + Server.manageRewardBalance + withPoolsMonitoring databaseDir gp nl $ \spl -> do startServer proxy @@ -309,14 +313,15 @@ serveWallet ) => TransactionLayer t k -> NetworkLayer IO t ShelleyBlock + -> (WorkerCtx (ApiLayer s t k) -> WalletId -> IO ()) -> IO (ApiLayer s t k) - apiLayer tl nl = do + apiLayer tl nl coworker = do let params = (block0, np, sTolerance) db <- Sqlite.newDBFactory walletDbTracer (DefaultFieldValues $ getActiveSlotCoefficient gp) databaseDir - Server.newApiLayer walletEngineTracer params nl' tl db + Server.newApiLayer walletEngineTracer params nl' tl db coworker where gp@GenesisParameters { getGenesisBlockHash diff --git a/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs b/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs index 1011e947460..9e5bf891cd6 100644 --- a/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs +++ b/lib/shelley/src/Cardano/Wallet/Shelley/Network.hs @@ -34,7 +34,6 @@ module Cardano.Wallet.Shelley.Network import Prelude - import Cardano.BM.Data.Severity ( Severity (..) ) import Cardano.BM.Data.Tracer @@ -70,11 +69,11 @@ import Cardano.Wallet.Shelley.Compatibility import Control.Concurrent ( ThreadId ) import Control.Concurrent.Async - ( Async, async, asyncThreadId, cancel, link ) + ( Async, async, asyncThreadId, cancel, forConcurrently, link ) import Control.Exception ( IOException ) import Control.Monad - ( forever, unless, (>=>) ) + ( forever, join, unless, (>=>) ) import Control.Monad.Catch ( Handler (..) ) import Control.Monad.Class.MonadAsync @@ -83,14 +82,20 @@ import Control.Monad.Class.MonadST ( MonadST ) import Control.Monad.Class.MonadSTM ( MonadSTM + , MonadSTMTx , TQueue + , TVar_ , atomically + , modifyTVar + , newEmptyTMVar , newTMVarM , newTQueue , newTVar , putTMVar + , readTQueue , readTVar , takeTMVar + , writeTQueue , writeTVar ) import Control.Monad.Class.MonadThrow @@ -100,7 +105,7 @@ import Control.Monad.Class.MonadTimer import Control.Monad.IO.Class ( liftIO ) import Control.Monad.Trans.Except - ( ExceptT (..), throwE, withExceptT ) + ( ExceptT (..), runExceptT, throwE, withExceptT ) import Control.Retry ( RetryPolicyM, RetryStatus (..), capDelay, fibonacciBackoff, recovering ) import Control.Tracer @@ -123,6 +128,8 @@ import Data.Text ( Text ) import Data.Text.Class ( ToText (..) ) +import Data.Tuple.Extra + ( fst3 ) import Data.Void ( Void ) import Data.Word @@ -242,13 +249,20 @@ withNetworkLayer tr np addrInfo versionData action = do -- doesn't rely on the intersection to be up-to-date. let handlers = retryOnConnectionLost tr - (nodeTipVar, protocolParamsVar, localTxSubmissionQ) <- + (nodeTipUpdate, protocolParamsVar, localTxSubmissionQ) <- connectNodeTipClient handlers queryRewardQ <- connectDelegationRewardsClient handlers + registerCallback <- setupWatchers (contramap MsgWatchTip tr) nodeTipUpdate + + nodeTipVar <- atomically $ newTVar TipGenesis + updateNodeTip <- registerCallback (fmap Right . atomically . writeTVar nodeTipVar) + async updateNodeTip >>= link + action $ NetworkLayer { currentNodeTip = liftIO $ _currentNodeTip nodeTipVar + , watchNodeTip = _watchNodeTip registerCallback , nextBlocks = _nextBlocks , initCursor = _initCursor , destroyCursor = _destroyCursor @@ -266,15 +280,16 @@ withNetworkLayer tr np addrInfo versionData action = do connectNodeTipClient handlers = do localTxSubmissionQ <- atomically newTQueue - nodeTipVar <- atomically $ newTVar TipGenesis + nodeTipQ <- atomically newTQueue protocolParamsVar <- atomically $ newTVar $ W.protocolParameters np nodeTipClient <- mkTipSyncClient tr np localTxSubmissionQ - (atomically . writeTVar nodeTipVar) + (atomically . writeTQueue nodeTipQ) (atomically . writeTVar protocolParamsVar) link =<< async (connectClient tr handlers nodeTipClient versionData addrInfo) - pure (nodeTipVar, protocolParamsVar, localTxSubmissionQ) + let nodeTipUpdate = atomically $ readTQueue nodeTipQ + pure (nodeTipUpdate, protocolParamsVar, localTxSubmissionQ) connectDelegationRewardsClient handlers = do cmdQ <- atomically newTQueue @@ -376,6 +391,10 @@ withNetworkLayer tr np addrInfo versionData action = do liftIO $ traceWith tr $ MsgFetchedNodePoolLsqData res return res + _watchNodeTip registerCallback cb = + ExceptT $ join $ registerCallback $ + runExceptT . cb . fromTip getGenesisBlockHash getEpochLength + type instance GetStakeDistribution (IO Shelley) m = (Point ShelleyBlock -> W.Coin @@ -678,6 +697,51 @@ handleMuxError tr onResourceVanished = pure . errorType >=> \case traceWith tr Nothing pure onResourceVanished +{------------------------------------------------------------------------------- + Tip Watcher +-------------------------------------------------------------------------------} + +setupWatchers + :: Tracer IO (WatcherLog e) + -> IO v + -> IO ((v -> IO (Either e ())) -> IO (IO (Either e a))) +setupWatchers tr nodeTipUpdate = do + watchersVar <- atomically $ newTVar (0 :: Int, []) + + let register cb = do + done <- atomically newEmptyTMVar + let finish = atomically . putTMVar done + i <- atomically $ stateTVar watchersVar $ \(nextId, watchers) -> + (nextId, (nextId + 1, ((nextId, cb, finish):watchers))) + traceWith tr $ MsgWatcherRegister i + pure $ atomically $ takeTMVar done + + worker <- async $ forever $ do + update <- nodeTipUpdate + watchers <- fmap snd $ atomically $ readTVar watchersVar + forConcurrently watchers $ \(i, cb, finish) -> do + traceWith tr $ MsgWatcherUpdate i + cb update >>= \case + Right () -> pure () + Left msg -> do + traceWith tr $ MsgWatcherDone i msg + -- unregister watcher + atomically $ modifyTVar watchersVar $ \(nextId, ws) -> + (nextId, filter ((/= i) . fst3) ws) + -- signal watcher to stop waiting + finish $ Left msg + link worker + + pure register + +stateTVar :: MonadSTMTx m => TVar_ m a -> (a -> (b, a)) -> m b +stateTVar var f = do + s <- readTVar var + let (a, s') = f s -- since we destructure this, we are strict in f + writeTVar var s' + return a +{-# INLINE stateTVar #-} + {------------------------------------------------------------------------------- Logging -------------------------------------------------------------------------------} @@ -707,6 +771,7 @@ data NetworkLayerLog Delegations RewardAccounts | MsgDestroyCursor ThreadId | MsgFetchedNodePoolLsqData NodePoolLsqData + | MsgWatchTip (WatcherLog String) data QueryClientName = TipSyncClient @@ -776,6 +841,7 @@ instance ToText NetworkLayerLog where ] MsgFetchedNodePoolLsqData d -> "Fetched pool data from node tip using LSQ: " <> pretty d + MsgWatchTip msg -> toText msg instance HasPrivacyAnnotation NetworkLayerLog instance HasSeverityAnnotation NetworkLayerLog where @@ -798,3 +864,19 @@ instance HasSeverityAnnotation NetworkLayerLog where MsgAccountDelegationAndRewards{} -> Info MsgDestroyCursor{} -> Notice MsgFetchedNodePoolLsqData{} -> Info + MsgWatchTip{} -> Debug + +data WatcherLog e + = MsgWatcherRegister Int + | MsgWatcherUpdate Int + | MsgWatcherDone Int e + deriving (Show, Eq) + +instance ToText e => ToText (WatcherLog e) where + toText = \case + MsgWatcherRegister n -> + "watcher register " <> toText n + MsgWatcherUpdate n -> + "watcher update " <> toText n + MsgWatcherDone n msg -> + "watcher done " <> toText n <> " " <> toText msg diff --git a/nix/.stack.nix/cardano-wallet-jormungandr.nix b/nix/.stack.nix/cardano-wallet-jormungandr.nix index 82c9908a28d..1b076b2cf4d 100644 --- a/nix/.stack.nix/cardano-wallet-jormungandr.nix +++ b/nix/.stack.nix/cardano-wallet-jormungandr.nix @@ -68,6 +68,7 @@ (hsPkgs."servant-client" or (errorHandler.buildDepError "servant-client")) (hsPkgs."servant-client-core" or (errorHandler.buildDepError "servant-client-core")) (hsPkgs."servant-server" or (errorHandler.buildDepError "servant-server")) + (hsPkgs."stm" or (errorHandler.buildDepError "stm")) (hsPkgs."temporary" or (errorHandler.buildDepError "temporary")) (hsPkgs."text" or (errorHandler.buildDepError "text")) (hsPkgs."text-class" or (errorHandler.buildDepError "text-class")) @@ -152,6 +153,7 @@ (hsPkgs."safe" or (errorHandler.buildDepError "safe")) (hsPkgs."servant" or (errorHandler.buildDepError "servant")) (hsPkgs."servant-swagger" or (errorHandler.buildDepError "servant-swagger")) + (hsPkgs."stm" or (errorHandler.buildDepError "stm")) (hsPkgs."swagger2" or (errorHandler.buildDepError "swagger2")) (hsPkgs."temporary" or (errorHandler.buildDepError "temporary")) (hsPkgs."text" or (errorHandler.buildDepError "text")) @@ -202,6 +204,7 @@ (hsPkgs."process" or (errorHandler.buildDepError "process")) (hsPkgs."retry" or (errorHandler.buildDepError "retry")) (hsPkgs."servant" or (errorHandler.buildDepError "servant")) + (hsPkgs."stm" or (errorHandler.buildDepError "stm")) (hsPkgs."temporary" or (errorHandler.buildDepError "temporary")) (hsPkgs."text" or (errorHandler.buildDepError "text")) (hsPkgs."text-class" or (errorHandler.buildDepError "text-class")) diff --git a/nix/.stack.nix/cardano-wallet-shelley.nix b/nix/.stack.nix/cardano-wallet-shelley.nix index d5c1e7821f1..f83cab221d1 100644 --- a/nix/.stack.nix/cardano-wallet-shelley.nix +++ b/nix/.stack.nix/cardano-wallet-shelley.nix @@ -51,6 +51,7 @@ (hsPkgs."cryptonite" or (errorHandler.buildDepError "cryptonite")) (hsPkgs."directory" or (errorHandler.buildDepError "directory")) (hsPkgs."exceptions" or (errorHandler.buildDepError "exceptions")) + (hsPkgs."extra" or (errorHandler.buildDepError "extra")) (hsPkgs."filepath" or (errorHandler.buildDepError "filepath")) (hsPkgs."fmt" or (errorHandler.buildDepError "fmt")) (hsPkgs."generic-lens" or (errorHandler.buildDepError "generic-lens"))