Skip to content

Commit

Permalink
Merge #2034
Browse files Browse the repository at this point in the history
2034: Call getAccountBalance for all wallets at once on tip change r=KtorZ a=Anviking

# Issue Number

#2005 

# Overview

- [x] Only send the LSQ query `GetFilteredDelegationsAndRewardAccounts` when the tip changes
- [x] Query _all_ wallet reward accounts in _the same_ `GetFilteredDelegationsAndRewardAccounts`

Listing stake pools with 10 wallets now takes ~5 seconds instead of 2-3 minutes!

- [x] Make the implementation easier to follow and actually sane

# Comments

- [ ] Not completely sure this doesn't mess up some some intricate interactions between the reward balance state and utxo state. The integration tests pass. But maybe there are some rollback scenarios which could lead to the reward balance being incorrectly reported for a while… Would be nice to think through properly and synchronise them. 

I think this happens on tip changes:

1. Wallet worker updates UTxO
2. Tip worker re-feches balances and writes to TVar
3. (I think on next tip change) Wallet reads the balance from the TVar and writes it to Sqlite
4. New reward balance is queryable from API

With this PR, I wonder if it may not risk being out of date for one more tip-change
<!-- Additional comments or screenshots to attach if any -->

<!-- 
Don't forget to:

 ✓ Self-review your changes to make sure nothing unexpected slipped through
 ✓ Assign yourself to the PR
 ✓ Assign one or several reviewer(s)
 ✓ Once created, link this PR to its corresponding ticket
 ✓ Assign the PR to a corresponding milestone
 ✓ Acknowledge any changes required to the Wiki
-->


Co-authored-by: Johannes Lund <[email protected]>
  • Loading branch information
iohk-bors[bot] and Anviking authored Aug 24, 2020
2 parents f9288a4 + 03ed329 commit b5b292b
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 47 deletions.
2 changes: 2 additions & 0 deletions lib/shelley/cardano-wallet.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ test-suite unit
, cardano-wallet
, containers
, contra-tracer
, fmt
, iohk-monitoring
, hspec
, memory
, ouroboros-consensus-shelley
, ouroboros-network
, shelley-spec-ledger
, stm
, text
, text-class
, transformers
Expand Down
1 change: 1 addition & 0 deletions lib/shelley/src/Cardano/Wallet/Shelley/Compatibility.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ module Cardano.Wallet.Shelley.Compatibility
, toStakeKeyDeregCert
, toStakePoolDlgCert
, toStakeCredential
, fromStakeCredential
, toShelleyCoin
, fromShelleyCoin
, toHDPayloadAddress
Expand Down
232 changes: 189 additions & 43 deletions lib/shelley/src/Cardano/Wallet/Shelley/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

Expand All @@ -29,6 +30,10 @@ module Cardano.Wallet.Shelley.Network

, NodePoolLsqData (..)

, Observer (query,startObserving,stopObserving)
, newObserver
, ObserverLog (..)

-- * Logging
, NetworkLayerLog (..)
) where
Expand All @@ -45,7 +50,6 @@ import Cardano.Wallet.Logging
( BracketLog, bracketTracer )
import Cardano.Wallet.Network
( Cursor
, ErrGetAccountBalance (..)
, ErrNetworkUnavailable (..)
, ErrPostTx (..)
, GetStakeDistribution
Expand All @@ -61,7 +65,9 @@ import Cardano.Wallet.Shelley.Compatibility
, fromChainHash
, fromNonMyopicMemberRewards
, fromPoolDistr
, fromShelleyCoin
, fromShelleyPParams
, fromStakeCredential
, fromTip
, fromTip'
, optimumNumberOfPools
Expand All @@ -79,7 +85,7 @@ import Control.Concurrent.Chan
import Control.Exception
( IOException, throwIO )
import Control.Monad
( forever, unless, void, (>=>) )
( forever, unless, void, when, (>=>) )
import Control.Monad.Catch
( Handler (..) )
import Control.Monad.Class.MonadAsync
Expand All @@ -93,6 +99,7 @@ import Control.Monad.Class.MonadSTM
, TVar
, atomically
, isEmptyTMVar
, modifyTVar'
, newEmptyTMVar
, newTMVarM
, newTQueue
Expand Down Expand Up @@ -134,6 +141,8 @@ import Data.Proxy
( Proxy (..) )
import Data.Quantity
( Percentage, Quantity (..) )
import Data.Set
( Set )
import Data.Text
( Text )
import Data.Text.Class
Expand All @@ -145,7 +154,7 @@ import Data.Void
import Data.Word
( Word64 )
import Fmt
( Buildable (..), listF', mapF, pretty )
( Buildable (..), fmt, listF, listF', mapF, pretty )
import GHC.Stack
( HasCallStack )
import Network.Mux
Expand Down Expand Up @@ -253,7 +262,6 @@ import qualified Data.Text.Encoding as T
import qualified Ouroboros.Consensus.Byron.Ledger as Byron
import qualified Ouroboros.Consensus.Shelley.Ledger as Shelley
import qualified Ouroboros.Network.Point as Point
import qualified Shelley.Spec.Ledger.Coin as SL
import qualified Shelley.Spec.Ledger.Credential as SL
import qualified Shelley.Spec.Ledger.Keys as SL
import qualified Shelley.Spec.Ledger.LedgerState as SL
Expand Down Expand Up @@ -290,13 +298,19 @@ withNetworkLayer tr np addrInfo versionData action = do
-- doesn't rely on the intersection to be up-to-date.
let handlers = retryOnConnectionLost tr

queryRewardQ <- connectDelegationRewardsClient handlers

(nodeTipChan, protocolParamsVar, interpreterVar, localTxSubmissionQ) <-
connectNodeTipClient handlers

queryRewardQ <- connectDelegationRewardsClient handlers
(rewardsObserver,refreshRewards) <- newRewardBalanceFetcher tr gp queryRewardQ

nodeTipVar <- atomically $ newTVar TipGenesis
let updateNodeTip = readChan nodeTipChan >>= (atomically . writeTVar nodeTipVar)
let updateNodeTip = do
tip <- readChan nodeTipChan
atomically $ writeTVar nodeTipVar tip
refreshRewards tip

link =<< async (forever updateNodeTip)

action $ NetworkLayer
Expand All @@ -309,10 +323,17 @@ withNetworkLayer tr np addrInfo versionData action = do
, getProtocolParameters = atomically $ readTVar protocolParamsVar
, postTx = _postSealedTx localTxSubmissionQ
, stakeDistribution = _stakeDistribution queryRewardQ
, getAccountBalance = _getAccountBalance nodeTipVar queryRewardQ
, getAccountBalance = \k -> liftIO $ do
-- TODO(#2042): Make wallets call manually, with matching
-- stopObserving.
startObserving rewardsObserver k
coinToQuantity . fromMaybe (W.Coin 0)
<$> query rewardsObserver k
, timeInterpreter = _timeInterpreterQuery interpreterVar
}
where
coinToQuantity (W.Coin x) = Quantity $ fromIntegral x

gp@W.GenesisParameters
{ getGenesisBlockHash
, getGenesisBlockDate
Expand Down Expand Up @@ -396,33 +417,6 @@ withNetworkLayer tr np addrInfo versionData action = do
_cursorSlotNo (Cursor _ point _) = do
fromWithOrigin (SlotNo 0) $ pointSlot point

_getAccountBalance nodeTipVar queryRewardQ acct = do
tip <- liftIO . atomically $ readTVar nodeTipVar
let bh = fromTip' gp tip
liftIO $ traceWith tr $ MsgGetRewardAccountBalance bh acct
let cred = toStakeCredential acct
let q = QueryIfCurrentShelley (Shelley.GetFilteredDelegationsAndRewardAccounts (Set.singleton cred))
let cmd = CmdQueryLocalState (getTipPoint tip) q
res <- liftIO . timeQryAndLog "getAccountBalance" tr $
queryRewardQ `send` cmd
case res of
Right (Right (deleg, rewardAccounts)) -> do
liftIO $ traceWith tr $
MsgAccountDelegationAndRewards acct deleg rewardAccounts
case Map.elems rewardAccounts of
[SL.Coin amt] -> pure (Quantity (fromIntegral amt))
_ -> throwE $ ErrGetAccountBalanceAccountNotFound acct
Right (Left _) -> pure minBound -- wrong era
Left acqFail -> do
-- NOTE: this could possibly happen in rare circumstances when
-- the chain is switched and the local state query is made
-- before the node tip variable is updated.
liftIO $ traceWith tr $
MsgLocalStateQueryError DelegationRewardsClient $
show acqFail
throwE $ ErrGetAccountBalanceNetworkUnreachable $
ErrNetworkUnreachable $
T.pack $ "Unexpected " ++ show acqFail

_currentNodeTip nodeTipVar =
fromTip getGenesisBlockHash <$>
Expand Down Expand Up @@ -770,6 +764,151 @@ mkTipSyncClient tr np localTxSubmissionQ onTipUpdate onPParamsUpdate onInterpret
})
NodeToClientV_3

-- Reward Account Balances

-- | Monitors values for keys, and allows clients to @query@ them.
--
-- Designed to be used for observing reward balances, where we want to cache the
-- balances of /all/ the wallets' accounts on tip change, and allow wallet
-- workers to @query@ the cache later, often, and whenever they want.
--
-- NOTE: One could imagine replacing @query@ getter with a push-based approach.
data Observer m key value = Observer
{ startObserving :: key -> m ()
, stopObserving :: key -> m ()
, query :: key -> m (Maybe value)
}

newRewardBalanceFetcher
:: forall sc. Tracer IO (NetworkLayerLog sc)
-> W.GenesisParameters
-- ^ Used to convert tips for logging
-> TQueue IO (LocalStateQueryCmd (CardanoBlock sc) IO)
-> IO ( Observer IO W.ChimericAccount W.Coin
, Tip (CardanoBlock sc) -> IO ()
-- Call on tip-change to refresh
)
newRewardBalanceFetcher tr gp queryRewardQ =
newObserver (contramap MsgObserverLog tr) fetch
where
fetch
:: Tip (CardanoBlock sc)
-> Set W.ChimericAccount
-> IO (Maybe (Map W.ChimericAccount W.Coin))
fetch tip accounts = do
liftIO $ traceWith tr $
MsgGetRewardAccountBalance (fromTip' gp tip) accounts
let creds = Set.map toStakeCredential accounts
let q = QueryIfCurrentShelley (Shelley.GetFilteredDelegationsAndRewardAccounts creds)
let cmd = CmdQueryLocalState (getTipPoint tip) q

res <- liftIO . timeQryAndLog "getAccountBalance" tr $
queryRewardQ `send` cmd
case res of
Right (Right (deleg, rewardAccounts)) -> do
liftIO $ traceWith tr $ MsgAccountDelegationAndRewards deleg rewardAccounts
return $ Just $ Map.mapKeys fromStakeCredential
$ Map.map fromShelleyCoin rewardAccounts

Right (Left _) -> -- wrong era
return
. Just
. Map.fromList
. map (, minBound)
$ Set.toList accounts
Left acqFail -> do
-- NOTE: this could possibly happen in rare circumstances when
-- the chain is switched and the local state query is made
-- before the node tip variable is updated.
liftIO $ traceWith tr $
MsgLocalStateQueryError DelegationRewardsClient $
show acqFail
return Nothing

data ObserverLog key value
= MsgWillFetch (Set key)
| MsgDidFetch (Map key value)
| MsgAddedObserver key
| MsgRemovedObserver key
deriving (Eq, Show)

instance (Ord key, Buildable key, Buildable value)
=> ToText (ObserverLog key value) where
toText (MsgWillFetch keys) = mconcat
[ "Will fetch values for keys "
, fmt $ listF keys
]
toText (MsgDidFetch m) = mconcat
[ "Did fetch values "
, fmt $ mapF m
]
toText (MsgAddedObserver key) = mconcat
[ "Started observing values for key "
, pretty key
]
toText (MsgRemovedObserver key) = mconcat
[ "Stopped observing values for key "
, pretty key
]

-- | Given a way to fetch values for a set of keys, create:
-- 1. An @Observer@ for consuming values
-- 2. A refresh action
--
-- The @env@ parameter can be used to pass in information needed for refreshing,
-- like the current tip when fetching rewards.
--
-- If the given @fetch@ function returns @Nothing@ the the cache will not be
-- updated.
--
-- If it returns @Just values@, the cache will be set to @values@.
newObserver
:: forall m key value env. (MonadSTM m, Ord key)
=> Tracer m (ObserverLog key value)
-> (env -> Set key -> m (Maybe (Map key value)))
-> m (Observer m key value, env -> m ())
newObserver tr fetch = do
cacheVar <- atomically $ newTVar Map.empty
toBeObservedVar <- atomically $ newTVar Set.empty
return (observer cacheVar toBeObservedVar, refresh cacheVar toBeObservedVar)
where
observer
:: TVar m (Map key value)
-> TVar m (Set key)
-> Observer m key value
observer cacheVar observedKeysVar =
Observer
{ startObserving = \k -> do
wasAdded <- atomically $ do
notAlreadyThere <- Set.notMember k <$> readTVar observedKeysVar
modifyTVar' observedKeysVar (Set.insert k)
return notAlreadyThere
when wasAdded $ traceWith tr $ MsgAddedObserver k
, stopObserving = \k -> do
atomically $ do
modifyTVar' observedKeysVar (Set.delete k)
modifyTVar' cacheVar (Map.delete k)
traceWith tr $ MsgRemovedObserver k
, query = \k -> do
m <- atomically (readTVar cacheVar)
return $ Map.lookup k m
}

refresh
:: TVar m (Map key value)
-> TVar m (Set key)
-> env
-> m ()
refresh cacheVar observedKeysVar env = do
keys <- atomically $ readTVar observedKeysVar
traceWith tr $ MsgWillFetch keys
mvalues <- fetch env keys

case mvalues of
Nothing -> pure ()
Just values -> do
traceWith tr $ MsgDidFetch values
atomically $ writeTVar cacheVar values

-- | Return a function to run an action only if its single parameter has changed
-- since the previous time it was called.
Expand Down Expand Up @@ -938,10 +1077,12 @@ data NetworkLayerLog sc where
MsgProtocolParameters :: W.ProtocolParameters -> NetworkLayerLog sc
MsgLocalStateQueryError :: QueryClientName -> String -> NetworkLayerLog sc
MsgLocalStateQueryEraMismatch :: MismatchEraInfo (CardanoEras sc) -> NetworkLayerLog sc
MsgGetRewardAccountBalance :: W.BlockHeader -> W.ChimericAccount -> NetworkLayerLog sc
MsgGetRewardAccountBalance
:: W.BlockHeader
-> Set W.ChimericAccount
-> NetworkLayerLog sc
MsgAccountDelegationAndRewards
:: W.ChimericAccount
-> (Map (SL.Credential 'SL.Staking sc) (SL.KeyHash 'SL.StakePool sc))
:: (Map (SL.Credential 'SL.Staking sc) (SL.KeyHash 'SL.StakePool sc))
-> SL.RewardAccounts sc
-> NetworkLayerLog sc
MsgDestroyCursor :: ThreadId -> NetworkLayerLog sc
Expand All @@ -955,6 +1096,9 @@ data NetworkLayerLog sc where
MsgInterpreter :: CardanoInterpreter sc -> NetworkLayerLog sc
MsgInterpreterPastHorizon :: PastHorizonException -> NetworkLayerLog sc
MsgQueryTime :: String -> NominalDiffTime -> NetworkLayerLog sc
MsgObserverLog
:: ObserverLog W.ChimericAccount W.Coin
-> NetworkLayerLog sc

data QueryClientName
= TipSyncClient
Expand Down Expand Up @@ -1014,15 +1158,14 @@ instance TPraosCrypto sc => ToText (NetworkLayerLog sc) where
MsgLocalStateQueryEraMismatch mismatch ->
"Local state query for the wrong era - this is fine. " <>
T.pack (show mismatch)
MsgGetRewardAccountBalance bh acct -> T.unwords
MsgGetRewardAccountBalance tip accts -> T.unwords
[ "Querying the reward account balance for"
, pretty acct
, fmt $ listF accts
, "at"
, pretty bh
, pretty tip
]
MsgAccountDelegationAndRewards acct delegations rewards -> T.unlines
[ "Result for account " <> pretty acct <> ":"
, " delegations = " <> T.pack (show delegations)
MsgAccountDelegationAndRewards delegations rewards -> T.unlines
[ " delegations = " <> T.pack (show delegations)
, " rewards = " <> T.pack (show rewards)
]
MsgDestroyCursor threadId -> T.unwords
Expand Down Expand Up @@ -1051,6 +1194,8 @@ instance TPraosCrypto sc => ToText (NetworkLayerLog sc) where
MsgInterpreterPastHorizon e ->
"Time interpreter queried past the horizon: " <> T.pack (show e)

MsgObserverLog msg -> toText msg

instance HasPrivacyAnnotation (NetworkLayerLog b)
instance HasSeverityAnnotation (NetworkLayerLog b) where
getSeverityAnnotation = \case
Expand Down Expand Up @@ -1081,3 +1226,4 @@ instance HasSeverityAnnotation (NetworkLayerLog b) where
MsgInterpreter{} -> Debug
MsgQueryTime{} -> Info
MsgInterpreterPastHorizon{} -> Error
MsgObserverLog{} -> Debug
Loading

0 comments on commit b5b292b

Please sign in to comment.