Skip to content

Commit

Permalink
Retry tx submission in wallet server
Browse files Browse the repository at this point in the history
  • Loading branch information
rvl committed Mar 29, 2021
1 parent ea78c5b commit 8bee89a
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 111 deletions.
2 changes: 1 addition & 1 deletion lib/core/cardano-wallet-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ library
Cardano.Wallet.Primitive.Types.UTxO
Cardano.Wallet.Primitive.Types.UTxOIndex
Cardano.Wallet.Primitive.Types.UTxOIndex.Internal
Cardano.Wallet.TokenMetadata.MockServer
Cardano.Wallet.Registry
Cardano.Wallet.TokenMetadata.MockServer
Cardano.Wallet.Transaction
Cardano.Wallet.Unsafe
Cardano.Wallet.Version
Expand Down
69 changes: 54 additions & 15 deletions lib/core/src/Cardano/Wallet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ import Cardano.Wallet.Primitive.Types.TokenBundle
( TokenBundle )
import Cardano.Wallet.Primitive.Types.Tx
( Direction (..)
, LocalTxSubmissionStatus
, SealedTx (..)
, TransactionInfo (..)
, Tx
Expand Down Expand Up @@ -348,8 +349,10 @@ import Control.DeepSeq
( NFData )
import Control.Monad
( forM, forM_, forever, replicateM, unless, when )
import Control.Monad.IO.Class
( liftIO )
import Control.Monad.Class.MonadTime
( DiffTime, MonadMonotonicTime (..), diffTime, getCurrentTime )
import Control.Monad.IO.Unlift
( MonadUnliftIO, liftIO )
import Control.Monad.Trans.Class
( lift )
import Control.Monad.Trans.Except
Expand Down Expand Up @@ -404,7 +407,7 @@ import Data.Set
import Data.Text.Class
( ToText (..) )
import Data.Time.Clock
( NominalDiffTime, UTCTime, getCurrentTime )
( NominalDiffTime, UTCTime )
import Data.Type.Equality
( (:~:) (..), testEquality )
import Data.Word
Expand All @@ -423,6 +426,8 @@ import Type.Reflection
( Typeable, typeRep )
import UnliftIO.Exception
( Exception )
import UnliftIO.MVar
( modifyMVar_, newMVar )

import qualified Cardano.Crypto.Wallet as CC
import qualified Cardano.Wallet.Primitive.AddressDiscovery.Random as Rnd
Expand Down Expand Up @@ -1555,7 +1560,8 @@ submitTx ctx wid (tx, meta, binary) = db & \DBLayer{..} -> do
withExceptT ErrSubmitTxNetwork $ traceWithExceptT tr $
postTx nw binary
mapExceptT atomically $ do
lift $ putLocalTxSubmission (meta ^. #slotNo) (tx ^. #txId) binary
lift $ putLocalTxSubmission (PrimaryKey wid)
(tx ^. #txId) binary (meta ^. #slotNo)
withExceptT ErrSubmitTxNoSuchWallet $
putTxHistory (PrimaryKey wid) [(tx, meta)]
where
Expand Down Expand Up @@ -1628,30 +1634,63 @@ getTxExpiry ti maybeTTL = do
defaultTTL :: NominalDiffTime
defaultTTL = 7200 -- that's 2 hours

-- | Given a LocalTxSubmission record, calculate the slot when it should be
-- retried next.
--
-- The current implementation is really basic. Retry every 10 slots.
scheduleLocalTxSubmission :: LocalTxSubmissionStatus tx -> SlotNo
scheduleLocalTxSubmission st = (st ^. #latestSubmission) + 10

-- | Retry submission of pending transactions.
runLocalTxSubmissionPool
:: forall ctx s k a (n :: NetworkDiscriminant).
:: forall ctx s k a.
( HasLogger WalletLog ctx
, HasNetworkLayer ctx
, HasDBLayer s k ctx
)
=> Proxy n
-> ctx
=> ctx
-> WalletId
-> IO a
runLocalTxSubmissionPool _ ctx = db & \DBLayer{..} -> forever
watchNodeTip $ \bh -> bracketTracer trBracket $ do
pending <- atomically $ getLocalTxSubmissionPending (bh ^. #slotNo)
-- re-submit those transactions, ignore errors
forM_ pending $ \(txId, sealedTx) -> do
res <- runExceptT $ postTx nw sealedTx
traceWith tr (MsgRetryPostTxResult txId res)
runLocalTxSubmissionPool ctx wid = db & \DBLayer{..} -> forever $ do
submitPending <- rateLimited $ \bh -> bracketTracer trBracket $ do
pending <- atomically $ getLocalTxSubmissionPending (PrimaryKey wid)
let sl = bh ^. #slotNo
-- Re-submit transactions due, ignore errors
forM_ (filter (isScheduled sl) pending) $ \st -> do
res <- runExceptT $ postTx nw (st ^. #submittedTx)
traceWith tr (MsgRetryPostTxResult (st ^. #txId) res)
atomically $ putLocalTxSubmission
(PrimaryKey wid)
(st ^. #txId)
(st ^. #submittedTx)
sl
watchNodeTip submitPending
where
nw = ctx ^. networkLayer
db = ctx ^. dbLayer @s @k
NetworkLayer{watchNodeTip} = ctx ^. networkLayer
tr = contramap MsgTxSubmit (ctx ^. logger @WalletLog)
trBracket = contramap MsgProcessPendingPool tr

isScheduled now = (<= now) . scheduleLocalTxSubmission

-- Limit pool check frequency to every 1000ms at most.
rateLimited = throttle 1

-- | Return a function to run an action at most once every _interval_.
throttle
:: (MonadUnliftIO m, MonadMonotonicTime m)
=> DiffTime
-> (a -> m ())
-> m (a -> m ())
throttle interval action = do
var <- newMVar Nothing
pure $ \arg -> modifyMVar_ var $ \prev -> do
now <- getMonotonicTime
if (maybe interval (diffTime now) prev >= interval)
then action arg $> Just now
else pure prev

-- | List all transactions and metadata from history for a given wallet.
listTransactions
:: forall ctx s k.
Expand Down Expand Up @@ -2481,7 +2520,7 @@ instance ToText TxSubmitLog where
Right _ -> "accepted again"
Left _ -> "not accepted (this is expected)"
MsgProcessPendingPool msg ->
"Processing the pending local tx submittion pool: " <> toText msg
"Processing the pending local tx submission pool: " <> toText msg

instance HasPrivacyAnnotation TxSubmitLog
instance HasSeverityAnnotation TxSubmitLog where
Expand Down
138 changes: 65 additions & 73 deletions lib/core/src/Cardano/Wallet/Api/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ module Cardano.Wallet.Api.Server
, manageRewardBalance
, idleWorker


-- * Logging
, WalletEngineLog (..)
) where
Expand Down Expand Up @@ -148,7 +147,6 @@ import Cardano.Wallet
, ErrWithdrawalNotWorth (..)
, ErrWrongPassphrase (..)
, FeeEstimation (..)
, HasLogger
, HasNetworkLayer
, TxSubmitLog
, WalletLog
Expand Down Expand Up @@ -674,8 +672,7 @@ postShelleyWallet ctx generateKey body = do
let state = mkSeqStateFromRootXPrv (rootXPrv, pwd) purposeCIP1852 g
void $ liftHandler $ initWorker @_ @s @k ctx wid
(\wrk -> W.createWallet @(WorkerCtx ctx) @s @k wrk wid wName state)
(\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @k wrk wid)
(\wrk -> W.manageRewardBalance @(WorkerCtx ctx) @s @k (Proxy @n) wrk wid)
(\wrk _ -> W.manageRewardBalance @(WorkerCtx ctx) @s @k (Proxy @n) wrk wid)
withWorkerCtx @_ @s @k ctx wid liftE liftE $ \wrk -> liftHandler $
W.attachPrivateKeyFromPwd @_ @s @k wrk wid (rootXPrv, pwd)
fst <$> getWallet ctx (mkShelleyWallet @_ @s @k) (ApiT wid)
Expand Down Expand Up @@ -711,8 +708,7 @@ postAccountWallet ctx mkWallet liftKey coworker body = do
let state = mkSeqStateFromAccountXPub (liftKey accXPub) purposeCIP1852 g
void $ liftHandler $ initWorker @_ @s @k ctx wid
(\wrk -> W.createWallet @(WorkerCtx ctx) @s @k wrk wid wName state)
(\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @k wrk wid)
(`coworker` wid)
coworker
fst <$> getWallet ctx mkWallet (ApiT wid)
where
g = maybe defaultAddressPoolGap getApiT (body ^. #addressPoolGap)
Expand Down Expand Up @@ -823,8 +819,7 @@ postLegacyWallet
-> Handler ApiByronWallet
postLegacyWallet ctx (rootXPrv, pwd) createWallet = do
void $ liftHandler $ initWorker @_ @s @k ctx wid (`createWallet` wid)
(\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @k wrk wid)
(`idleWorker` wid)
idleWorker
withWorkerCtx ctx wid liftE liftE $ \wrk -> liftHandler $
W.attachPrivateKeyFromPwd wrk wid (rootXPrv, pwd)
fst <$> getWallet ctx mkLegacyWallet (ApiT wid)
Expand Down Expand Up @@ -929,8 +924,7 @@ postRandomWalletFromXPrv ctx body = do
s <- liftIO $ mkRndState byronKey <$> getStdRandom random
void $ liftHandler $ initWorker @_ @s @k ctx wid
(\wrk -> W.createWallet @(WorkerCtx ctx) @s @k wrk wid wName s)
(\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @k wrk wid)
(`idleWorker` wid)
idleWorker
withWorkerCtx ctx wid liftE liftE $ \wrk -> liftHandler $
W.attachPrivateKeyFromPwdHash wrk wid (byronKey, pwd)
fst <$> getWallet ctx mkLegacyWallet (ApiT wid)
Expand Down Expand Up @@ -1964,58 +1958,6 @@ postAccountPublicKey ctx (ApiT wid) (ApiT ix) (ApiPostAccountKeyData (ApiT pwd)
Helpers
-------------------------------------------------------------------------------}

-- | see 'Cardano.Wallet#createWallet'
initWorker
:: forall ctx s k.
( HasWorkerRegistry s k ctx
, HasDBFactory s k ctx
, HasLogger (WorkerLog WalletId WalletLog) ctx
)
=> ctx
-- ^ Surrounding API context
-> WalletId
-- ^ Wallet Id
-> (WorkerCtx ctx -> ExceptT ErrWalletAlreadyExists IO WalletId)
-- ^ Create action
-> (WorkerCtx ctx -> ExceptT ErrNoSuchWallet IO ())
-- ^ Restore action
-> (WorkerCtx ctx -> IO ())
-- ^ Action to run concurrently with restore action
-> ExceptT ErrCreateWallet IO WalletId
initWorker ctx wid createWallet restoreWallet coworker =
liftIO (Registry.lookup re wid) >>= \case
Just _ ->
throwE $ ErrCreateWalletAlreadyExists $ ErrWalletAlreadyExists wid
Nothing ->
liftIO (Registry.register @_ @ctx re ctx wid config) >>= \case
Nothing ->
throwE ErrCreateWalletFailedToCreateWorker
Just _ ->
pure wid
where
config = MkWorker
{ workerBefore = \ctx' _ -> do
void $ unsafeRunExceptT $ createWallet ctx'

, workerMain = \ctx' _ -> do
race_
(unsafeRunExceptT $ restoreWallet ctx')
(coworker ctx')

, workerAfter =
defaultWorkerAfter

, workerAcquire =
withDatabase df wid
}
re = ctx ^. workerRegistry @s @k
df = ctx ^. dbFactory @s @k

-- | Something to pass as the coworker action to 'newApiLayer', which does
-- nothing, and never exits.
idleWorker :: ctx -> wid -> IO ()
idleWorker _ _ = forever $ threadDelay maxBound

-- | Handler for fetching the 'ArgGenChange' for the 'RndState' (i.e. the root
-- XPrv), necessary to derive new change addresses.
rndStateChange
Expand Down Expand Up @@ -2379,27 +2321,77 @@ registerWorker
, IsOurs s RewardAccount
, IsOurs s Address
)
=> ApiLayer s k
=> ctx
-> (WorkerCtx ctx -> WalletId -> IO ())
-- ^ Action to run concurrently with restore
-> WalletId
-> IO ()
registerWorker ctx coworker wid =
void $ Registry.register @_ @ctx re ctx wid config
registerWorker ctx coworker = void . startWorker ctx before coworker
where
before ctx' wid =
runExceptT (W.checkWalletIntegrity ctx' wid gp)
>>= either throwIO pure
(_, NetworkParameters gp _ _, _) = ctx ^. genesisData
re = ctx ^. workerRegistry

-- | see 'Cardano.Wallet#createWallet'
initWorker
:: forall ctx s k.
( ctx ~ ApiLayer s k
, IsOurs s RewardAccount
, IsOurs s Address
)
=> ctx
-- ^ Surrounding API context
-> WalletId
-- ^ Wallet Id
-> (WorkerCtx ctx -> ExceptT ErrWalletAlreadyExists IO WalletId)
-- ^ Create action
-> (WorkerCtx ctx -> WalletId -> IO ())
-- ^ Action to run concurrently with restore
-> ExceptT ErrCreateWallet IO WalletId
initWorker ctx wid createWallet coworker =
liftIO (Registry.lookup re wid) >>= \case
Just _ ->
throwE $ ErrCreateWalletAlreadyExists $ ErrWalletAlreadyExists wid
Nothing ->
(liftIO $ startWorker ctx before coworker wid) >>= \case
Nothing -> throwE ErrCreateWalletFailedToCreateWorker
Just _ -> pure wid
where
before ctx' _ = void $ unsafeRunExceptT $ createWallet ctx'
re = ctx ^. workerRegistry @s @k

-- | Something to pass as the coworker action to 'newApiLayer', which does
-- nothing, and never exits.
idleWorker :: ctx -> wid -> IO ()
idleWorker _ _ = forever $ threadDelay maxBound

startWorker
:: forall ctx s k.
( ctx ~ ApiLayer s k
, IsOurs s RewardAccount
, IsOurs s Address
)
=> ctx
-> (WorkerCtx ctx -> WalletId -> IO ())
-> (WorkerCtx ctx -> WalletId -> IO ())
-> WalletId
-> IO (Maybe ctx)
startWorker ctx before coworker wid =
fmap (const ctx) <$> Registry.register @_ @ctx re ctx wid config
where
re = ctx ^. workerRegistry @s @k
df = ctx ^. dbFactory
config = MkWorker
{ workerBefore = \ctx' _ ->
runExceptT (W.checkWalletIntegrity ctx' wid gp)
>>= either throwIO pure
{ workerBefore = before

, workerMain = \ctx' _ -> do
-- FIXME:
-- Review error handling here
-- FIXME: ADP-641 Review error handling here
race_
(unsafeRunExceptT $ W.restoreWallet @(WorkerCtx ctx) @s ctx' wid)
(coworker ctx' wid)
(unsafeRunExceptT $ W.restoreWallet ctx' wid)
(race_
(W.runLocalTxSubmissionPool ctx' wid)
(coworker ctx' wid))

, workerAfter =
defaultWorkerAfter
Expand Down
23 changes: 16 additions & 7 deletions lib/core/src/Cardano/Wallet/DB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ import Cardano.Wallet.Primitive.Types.Coin
import Cardano.Wallet.Primitive.Types.Hash
( Hash )
import Cardano.Wallet.Primitive.Types.Tx
( SealedTx, TransactionInfo, Tx (..), TxMeta, TxStatus )
( LocalTxSubmissionStatus
, SealedTx
, TransactionInfo
, Tx (..)
, TxMeta
, TxStatus
)
import Control.Monad.IO.Class
( MonadIO )
import Control.Monad.Trans.Except
Expand Down Expand Up @@ -243,17 +249,20 @@ data DBLayer m s k = forall stm. (MonadIO stm, MonadFail stm) => DBLayer
-- If the wallet doesn't exist, this operation returns an error.

, putLocalTxSubmission
:: SlotNo
:: PrimaryKey WalletId
-> Hash "Tx"
-> SealedTx
-> SlotNo
-> stm ()
-- ^ Add or update a transaction in the local submission pool.
-- ^ Add or update a transaction in the local submission pool with the
-- most recent submission slot.

, getLocalTxSubmissionPending
:: SlotNo
-> stm [(Hash "Tx", SealedTx)]
:: PrimaryKey WalletId
-> stm [LocalTxSubmissionStatus SealedTx]
-- ^ List all transactions from the local submission pool which are
-- still pending as of the given slot.
-- still pending as of the latest checkpoint. The slot numbers for first
-- submission and most recent submission are included.

, updatePendingTxForExpiry
:: PrimaryKey WalletId
Expand Down Expand Up @@ -343,7 +352,7 @@ newtype ErrWalletAlreadyExists
-- functions like 'enqueueCheckpoint' needs to be associated to a corresponding
-- wallet. Some other may not because they are information valid for all wallets
-- (like for instance, the last known network tip).
newtype PrimaryKey key = PrimaryKey key
newtype PrimaryKey key = PrimaryKey { unPrimaryKey :: key }
deriving (Show, Eq, Ord)

-- | Clean a database by removing all wallets.
Expand Down
Loading

0 comments on commit 8bee89a

Please sign in to comment.