From 8bee89aa4e76745fd5326c86e816a1d8e30989fc Mon Sep 17 00:00:00 2001 From: Rodney Lorrimar Date: Tue, 23 Mar 2021 17:56:47 +1000 Subject: [PATCH] Retry tx submission in wallet server --- lib/core/cardano-wallet-core.cabal | 2 +- lib/core/src/Cardano/Wallet.hs | 69 +++++++-- lib/core/src/Cardano/Wallet/Api/Server.hs | 138 +++++++++--------- lib/core/src/Cardano/Wallet/DB.hs | 23 ++- lib/core/src/Cardano/Wallet/DB/MVar.hs | 12 +- lib/core/src/Cardano/Wallet/DB/Model.hs | 46 +++++- lib/core/src/Cardano/Wallet/DB/Sqlite.hs | 37 ++++- lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs | 6 +- .../src/Cardano/Wallet/DB/Sqlite/Types.hs | 3 +- .../src/Cardano/Wallet/Primitive/Types/Tx.hs | 12 ++ .../unit/Cardano/Wallet/DB/StateMachine.hs | 4 + 11 files changed, 241 insertions(+), 111 deletions(-) diff --git a/lib/core/cardano-wallet-core.cabal b/lib/core/cardano-wallet-core.cabal index 83e23226847..9076a08ebb6 100644 --- a/lib/core/cardano-wallet-core.cabal +++ b/lib/core/cardano-wallet-core.cabal @@ -189,8 +189,8 @@ library Cardano.Wallet.Primitive.Types.UTxO Cardano.Wallet.Primitive.Types.UTxOIndex Cardano.Wallet.Primitive.Types.UTxOIndex.Internal - Cardano.Wallet.TokenMetadata.MockServer Cardano.Wallet.Registry + Cardano.Wallet.TokenMetadata.MockServer Cardano.Wallet.Transaction Cardano.Wallet.Unsafe Cardano.Wallet.Version diff --git a/lib/core/src/Cardano/Wallet.hs b/lib/core/src/Cardano/Wallet.hs index 7a6fcffcb53..f77f08be245 100644 --- a/lib/core/src/Cardano/Wallet.hs +++ b/lib/core/src/Cardano/Wallet.hs @@ -315,6 +315,7 @@ import Cardano.Wallet.Primitive.Types.TokenBundle ( TokenBundle ) import Cardano.Wallet.Primitive.Types.Tx ( Direction (..) + , LocalTxSubmissionStatus , SealedTx (..) , TransactionInfo (..) , Tx @@ -348,8 +349,10 @@ import Control.DeepSeq ( NFData ) import Control.Monad ( forM, forM_, forever, replicateM, unless, when ) -import Control.Monad.IO.Class - ( liftIO ) +import Control.Monad.Class.MonadTime + ( DiffTime, MonadMonotonicTime (..), diffTime, getCurrentTime ) +import Control.Monad.IO.Unlift + ( MonadUnliftIO, liftIO ) import Control.Monad.Trans.Class ( lift ) import Control.Monad.Trans.Except @@ -404,7 +407,7 @@ import Data.Set import Data.Text.Class ( ToText (..) ) import Data.Time.Clock - ( NominalDiffTime, UTCTime, getCurrentTime ) + ( NominalDiffTime, UTCTime ) import Data.Type.Equality ( (:~:) (..), testEquality ) import Data.Word @@ -423,6 +426,8 @@ import Type.Reflection ( Typeable, typeRep ) import UnliftIO.Exception ( Exception ) +import UnliftIO.MVar + ( modifyMVar_, newMVar ) import qualified Cardano.Crypto.Wallet as CC import qualified Cardano.Wallet.Primitive.AddressDiscovery.Random as Rnd @@ -1555,7 +1560,8 @@ submitTx ctx wid (tx, meta, binary) = db & \DBLayer{..} -> do withExceptT ErrSubmitTxNetwork $ traceWithExceptT tr $ postTx nw binary mapExceptT atomically $ do - lift $ putLocalTxSubmission (meta ^. #slotNo) (tx ^. #txId) binary + lift $ putLocalTxSubmission (PrimaryKey wid) + (tx ^. #txId) binary (meta ^. #slotNo) withExceptT ErrSubmitTxNoSuchWallet $ putTxHistory (PrimaryKey wid) [(tx, meta)] where @@ -1628,23 +1634,37 @@ getTxExpiry ti maybeTTL = do defaultTTL :: NominalDiffTime defaultTTL = 7200 -- that's 2 hours +-- | Given a LocalTxSubmission record, calculate the slot when it should be +-- retried next. +-- +-- The current implementation is really basic. Retry every 10 slots. +scheduleLocalTxSubmission :: LocalTxSubmissionStatus tx -> SlotNo +scheduleLocalTxSubmission st = (st ^. #latestSubmission) + 10 + -- | Retry submission of pending transactions. runLocalTxSubmissionPool - :: forall ctx s k a (n :: NetworkDiscriminant). + :: forall ctx s k a. ( HasLogger WalletLog ctx , HasNetworkLayer ctx , HasDBLayer s k ctx ) - => Proxy n - -> ctx + => ctx + -> WalletId -> IO a -runLocalTxSubmissionPool _ ctx = db & \DBLayer{..} -> forever - watchNodeTip $ \bh -> bracketTracer trBracket $ do - pending <- atomically $ getLocalTxSubmissionPending (bh ^. #slotNo) - -- re-submit those transactions, ignore errors - forM_ pending $ \(txId, sealedTx) -> do - res <- runExceptT $ postTx nw sealedTx - traceWith tr (MsgRetryPostTxResult txId res) +runLocalTxSubmissionPool ctx wid = db & \DBLayer{..} -> forever $ do + submitPending <- rateLimited $ \bh -> bracketTracer trBracket $ do + pending <- atomically $ getLocalTxSubmissionPending (PrimaryKey wid) + let sl = bh ^. #slotNo + -- Re-submit transactions due, ignore errors + forM_ (filter (isScheduled sl) pending) $ \st -> do + res <- runExceptT $ postTx nw (st ^. #submittedTx) + traceWith tr (MsgRetryPostTxResult (st ^. #txId) res) + atomically $ putLocalTxSubmission + (PrimaryKey wid) + (st ^. #txId) + (st ^. #submittedTx) + sl + watchNodeTip submitPending where nw = ctx ^. networkLayer db = ctx ^. dbLayer @s @k @@ -1652,6 +1672,25 @@ runLocalTxSubmissionPool _ ctx = db & \DBLayer{..} -> forever tr = contramap MsgTxSubmit (ctx ^. logger @WalletLog) trBracket = contramap MsgProcessPendingPool tr + isScheduled now = (<= now) . scheduleLocalTxSubmission + + -- Limit pool check frequency to every 1000ms at most. + rateLimited = throttle 1 + +-- | Return a function to run an action at most once every _interval_. +throttle + :: (MonadUnliftIO m, MonadMonotonicTime m) + => DiffTime + -> (a -> m ()) + -> m (a -> m ()) +throttle interval action = do + var <- newMVar Nothing + pure $ \arg -> modifyMVar_ var $ \prev -> do + now <- getMonotonicTime + if (maybe interval (diffTime now) prev >= interval) + then action arg $> Just now + else pure prev + -- | List all transactions and metadata from history for a given wallet. listTransactions :: forall ctx s k. @@ -2481,7 +2520,7 @@ instance ToText TxSubmitLog where Right _ -> "accepted again" Left _ -> "not accepted (this is expected)" MsgProcessPendingPool msg -> - "Processing the pending local tx submittion pool: " <> toText msg + "Processing the pending local tx submission pool: " <> toText msg instance HasPrivacyAnnotation TxSubmitLog instance HasSeverityAnnotation TxSubmitLog where diff --git a/lib/core/src/Cardano/Wallet/Api/Server.hs b/lib/core/src/Cardano/Wallet/Api/Server.hs index 73da3c40575..2763f539987 100644 --- a/lib/core/src/Cardano/Wallet/Api/Server.hs +++ b/lib/core/src/Cardano/Wallet/Api/Server.hs @@ -97,7 +97,6 @@ module Cardano.Wallet.Api.Server , manageRewardBalance , idleWorker - -- * Logging , WalletEngineLog (..) ) where @@ -148,7 +147,6 @@ import Cardano.Wallet , ErrWithdrawalNotWorth (..) , ErrWrongPassphrase (..) , FeeEstimation (..) - , HasLogger , HasNetworkLayer , TxSubmitLog , WalletLog @@ -674,8 +672,7 @@ postShelleyWallet ctx generateKey body = do let state = mkSeqStateFromRootXPrv (rootXPrv, pwd) purposeCIP1852 g void $ liftHandler $ initWorker @_ @s @k ctx wid (\wrk -> W.createWallet @(WorkerCtx ctx) @s @k wrk wid wName state) - (\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @k wrk wid) - (\wrk -> W.manageRewardBalance @(WorkerCtx ctx) @s @k (Proxy @n) wrk wid) + (\wrk _ -> W.manageRewardBalance @(WorkerCtx ctx) @s @k (Proxy @n) wrk wid) withWorkerCtx @_ @s @k ctx wid liftE liftE $ \wrk -> liftHandler $ W.attachPrivateKeyFromPwd @_ @s @k wrk wid (rootXPrv, pwd) fst <$> getWallet ctx (mkShelleyWallet @_ @s @k) (ApiT wid) @@ -711,8 +708,7 @@ postAccountWallet ctx mkWallet liftKey coworker body = do let state = mkSeqStateFromAccountXPub (liftKey accXPub) purposeCIP1852 g void $ liftHandler $ initWorker @_ @s @k ctx wid (\wrk -> W.createWallet @(WorkerCtx ctx) @s @k wrk wid wName state) - (\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @k wrk wid) - (`coworker` wid) + coworker fst <$> getWallet ctx mkWallet (ApiT wid) where g = maybe defaultAddressPoolGap getApiT (body ^. #addressPoolGap) @@ -823,8 +819,7 @@ postLegacyWallet -> Handler ApiByronWallet postLegacyWallet ctx (rootXPrv, pwd) createWallet = do void $ liftHandler $ initWorker @_ @s @k ctx wid (`createWallet` wid) - (\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @k wrk wid) - (`idleWorker` wid) + idleWorker withWorkerCtx ctx wid liftE liftE $ \wrk -> liftHandler $ W.attachPrivateKeyFromPwd wrk wid (rootXPrv, pwd) fst <$> getWallet ctx mkLegacyWallet (ApiT wid) @@ -929,8 +924,7 @@ postRandomWalletFromXPrv ctx body = do s <- liftIO $ mkRndState byronKey <$> getStdRandom random void $ liftHandler $ initWorker @_ @s @k ctx wid (\wrk -> W.createWallet @(WorkerCtx ctx) @s @k wrk wid wName s) - (\wrk -> W.restoreWallet @(WorkerCtx ctx) @s @k wrk wid) - (`idleWorker` wid) + idleWorker withWorkerCtx ctx wid liftE liftE $ \wrk -> liftHandler $ W.attachPrivateKeyFromPwdHash wrk wid (byronKey, pwd) fst <$> getWallet ctx mkLegacyWallet (ApiT wid) @@ -1964,58 +1958,6 @@ postAccountPublicKey ctx (ApiT wid) (ApiT ix) (ApiPostAccountKeyData (ApiT pwd) Helpers -------------------------------------------------------------------------------} --- | see 'Cardano.Wallet#createWallet' -initWorker - :: forall ctx s k. - ( HasWorkerRegistry s k ctx - , HasDBFactory s k ctx - , HasLogger (WorkerLog WalletId WalletLog) ctx - ) - => ctx - -- ^ Surrounding API context - -> WalletId - -- ^ Wallet Id - -> (WorkerCtx ctx -> ExceptT ErrWalletAlreadyExists IO WalletId) - -- ^ Create action - -> (WorkerCtx ctx -> ExceptT ErrNoSuchWallet IO ()) - -- ^ Restore action - -> (WorkerCtx ctx -> IO ()) - -- ^ Action to run concurrently with restore action - -> ExceptT ErrCreateWallet IO WalletId -initWorker ctx wid createWallet restoreWallet coworker = - liftIO (Registry.lookup re wid) >>= \case - Just _ -> - throwE $ ErrCreateWalletAlreadyExists $ ErrWalletAlreadyExists wid - Nothing -> - liftIO (Registry.register @_ @ctx re ctx wid config) >>= \case - Nothing -> - throwE ErrCreateWalletFailedToCreateWorker - Just _ -> - pure wid - where - config = MkWorker - { workerBefore = \ctx' _ -> do - void $ unsafeRunExceptT $ createWallet ctx' - - , workerMain = \ctx' _ -> do - race_ - (unsafeRunExceptT $ restoreWallet ctx') - (coworker ctx') - - , workerAfter = - defaultWorkerAfter - - , workerAcquire = - withDatabase df wid - } - re = ctx ^. workerRegistry @s @k - df = ctx ^. dbFactory @s @k - --- | Something to pass as the coworker action to 'newApiLayer', which does --- nothing, and never exits. -idleWorker :: ctx -> wid -> IO () -idleWorker _ _ = forever $ threadDelay maxBound - -- | Handler for fetching the 'ArgGenChange' for the 'RndState' (i.e. the root -- XPrv), necessary to derive new change addresses. rndStateChange @@ -2379,27 +2321,77 @@ registerWorker , IsOurs s RewardAccount , IsOurs s Address ) - => ApiLayer s k + => ctx -> (WorkerCtx ctx -> WalletId -> IO ()) + -- ^ Action to run concurrently with restore -> WalletId -> IO () -registerWorker ctx coworker wid = - void $ Registry.register @_ @ctx re ctx wid config +registerWorker ctx coworker = void . startWorker ctx before coworker where + before ctx' wid = + runExceptT (W.checkWalletIntegrity ctx' wid gp) + >>= either throwIO pure (_, NetworkParameters gp _ _, _) = ctx ^. genesisData - re = ctx ^. workerRegistry + +-- | see 'Cardano.Wallet#createWallet' +initWorker + :: forall ctx s k. + ( ctx ~ ApiLayer s k + , IsOurs s RewardAccount + , IsOurs s Address + ) + => ctx + -- ^ Surrounding API context + -> WalletId + -- ^ Wallet Id + -> (WorkerCtx ctx -> ExceptT ErrWalletAlreadyExists IO WalletId) + -- ^ Create action + -> (WorkerCtx ctx -> WalletId -> IO ()) + -- ^ Action to run concurrently with restore + -> ExceptT ErrCreateWallet IO WalletId +initWorker ctx wid createWallet coworker = + liftIO (Registry.lookup re wid) >>= \case + Just _ -> + throwE $ ErrCreateWalletAlreadyExists $ ErrWalletAlreadyExists wid + Nothing -> + (liftIO $ startWorker ctx before coworker wid) >>= \case + Nothing -> throwE ErrCreateWalletFailedToCreateWorker + Just _ -> pure wid + where + before ctx' _ = void $ unsafeRunExceptT $ createWallet ctx' + re = ctx ^. workerRegistry @s @k + +-- | Something to pass as the coworker action to 'newApiLayer', which does +-- nothing, and never exits. +idleWorker :: ctx -> wid -> IO () +idleWorker _ _ = forever $ threadDelay maxBound + +startWorker + :: forall ctx s k. + ( ctx ~ ApiLayer s k + , IsOurs s RewardAccount + , IsOurs s Address + ) + => ctx + -> (WorkerCtx ctx -> WalletId -> IO ()) + -> (WorkerCtx ctx -> WalletId -> IO ()) + -> WalletId + -> IO (Maybe ctx) +startWorker ctx before coworker wid = + fmap (const ctx) <$> Registry.register @_ @ctx re ctx wid config + where + re = ctx ^. workerRegistry @s @k df = ctx ^. dbFactory config = MkWorker - { workerBefore = \ctx' _ -> - runExceptT (W.checkWalletIntegrity ctx' wid gp) - >>= either throwIO pure + { workerBefore = before , workerMain = \ctx' _ -> do - -- FIXME: - -- Review error handling here + -- FIXME: ADP-641 Review error handling here race_ - (unsafeRunExceptT $ W.restoreWallet @(WorkerCtx ctx) @s ctx' wid) - (coworker ctx' wid) + (unsafeRunExceptT $ W.restoreWallet ctx' wid) + (race_ + (W.runLocalTxSubmissionPool ctx' wid) + (coworker ctx' wid)) , workerAfter = defaultWorkerAfter diff --git a/lib/core/src/Cardano/Wallet/DB.hs b/lib/core/src/Cardano/Wallet/DB.hs index c1b32e0652d..4fecbed0e2d 100644 --- a/lib/core/src/Cardano/Wallet/DB.hs +++ b/lib/core/src/Cardano/Wallet/DB.hs @@ -55,7 +55,13 @@ import Cardano.Wallet.Primitive.Types.Coin import Cardano.Wallet.Primitive.Types.Hash ( Hash ) import Cardano.Wallet.Primitive.Types.Tx - ( SealedTx, TransactionInfo, Tx (..), TxMeta, TxStatus ) + ( LocalTxSubmissionStatus + , SealedTx + , TransactionInfo + , Tx (..) + , TxMeta + , TxStatus + ) import Control.Monad.IO.Class ( MonadIO ) import Control.Monad.Trans.Except @@ -243,17 +249,20 @@ data DBLayer m s k = forall stm. (MonadIO stm, MonadFail stm) => DBLayer -- If the wallet doesn't exist, this operation returns an error. , putLocalTxSubmission - :: SlotNo + :: PrimaryKey WalletId -> Hash "Tx" -> SealedTx + -> SlotNo -> stm () - -- ^ Add or update a transaction in the local submission pool. + -- ^ Add or update a transaction in the local submission pool with the + -- most recent submission slot. , getLocalTxSubmissionPending - :: SlotNo - -> stm [(Hash "Tx", SealedTx)] + :: PrimaryKey WalletId + -> stm [LocalTxSubmissionStatus SealedTx] -- ^ List all transactions from the local submission pool which are - -- still pending as of the given slot. + -- still pending as of the latest checkpoint. The slot numbers for first + -- submission and most recent submission are included. , updatePendingTxForExpiry :: PrimaryKey WalletId @@ -343,7 +352,7 @@ newtype ErrWalletAlreadyExists -- functions like 'enqueueCheckpoint' needs to be associated to a corresponding -- wallet. Some other may not because they are information valid for all wallets -- (like for instance, the last known network tip). -newtype PrimaryKey key = PrimaryKey key +newtype PrimaryKey key = PrimaryKey { unPrimaryKey :: key } deriving (Show, Eq, Ord) -- | Clean a database by removing all wallets. diff --git a/lib/core/src/Cardano/Wallet/DB/MVar.hs b/lib/core/src/Cardano/Wallet/DB/MVar.hs index e5f34ab37ce..9f611b6b650 100644 --- a/lib/core/src/Cardano/Wallet/DB/MVar.hs +++ b/lib/core/src/Cardano/Wallet/DB/MVar.hs @@ -33,6 +33,7 @@ import Cardano.Wallet.DB.Model , ModelOp , emptyDatabase , mCheckWallet + , mGetLocalTxSubmissionPending , mInitializeWallet , mIsStakeKeyRegistered , mListCheckpoints @@ -40,6 +41,7 @@ import Cardano.Wallet.DB.Model , mPutCheckpoint , mPutDelegationCertificate , mPutDelegationRewardBalance + , mPutLocalTxSubmission , mPutPrivateKey , mPutTxHistory , mPutWalletMeta @@ -66,6 +68,8 @@ import Cardano.Wallet.Primitive.Types.Tx ( TransactionInfo (..) ) import Control.DeepSeq ( NFData, deepseq ) +import Control.Monad + ( void ) import Control.Monad.Trans.Except ( ExceptT (..) ) import Data.Functor.Identity @@ -177,9 +181,13 @@ newDBLayer timeInterpreter = do Pending Tx -----------------------------------------------------------------------} - , putLocalTxSubmission = \_slot _txId _tx -> pure () -- fixme: ADP-764 + , putLocalTxSubmission = \pk txid tx sl -> + void $ alterDB (const (Just ())) db $ + mPutLocalTxSubmission pk txid tx sl + + , getLocalTxSubmissionPending = + readDB db . mGetLocalTxSubmissionPending - , getLocalTxSubmissionPending = \_slot -> pure [] -- fixme: ADP-764 , updatePendingTxForExpiry = \pk tip -> ExceptT $ do alterDB errNoSuchWallet db (mUpdatePendingTxForExpiry pk tip) diff --git a/lib/core/src/Cardano/Wallet/DB/Model.hs b/lib/core/src/Cardano/Wallet/DB/Model.hs index 78fe20e8f1f..bc8e2e59e33 100644 --- a/lib/core/src/Cardano/Wallet/DB/Model.hs +++ b/lib/core/src/Cardano/Wallet/DB/Model.hs @@ -53,6 +53,8 @@ module Cardano.Wallet.DB.Model , mIsStakeKeyRegistered , mPutTxHistory , mReadTxHistory + , mPutLocalTxSubmission + , mGetLocalTxSubmissionPending , mUpdatePendingTxForExpiry , mRemovePendingOrExpiredTx , mPutPrivateKey @@ -92,6 +94,8 @@ import Cardano.Wallet.Primitive.Types.Hash ( Hash (..) ) import Cardano.Wallet.Primitive.Types.Tx ( Direction (..) + , LocalTxSubmissionStatus (..) + , SealedTx , TransactionInfo (..) , Tx (..) , TxMeta (..) @@ -108,7 +112,7 @@ import Data.Function import Data.Functor.Identity ( Identity (..) ) import Data.Generics.Internal.VL.Lens - ( (^.) ) + ( view, (^.) ) import Data.List ( sort, sortOn ) import Data.Map.Strict @@ -156,6 +160,7 @@ data WalletDatabase s xprv = WalletDatabase , xprv :: !(Maybe xprv) , genesisParameters :: !GenesisParameters , rewardAccountBalance :: !Coin + , submittedTxs :: !(Map (Hash "Tx") (SealedTx, SlotNo)) } deriving (Show, Eq, Generic) -- | Shorthand for the putTxHistory argument type. @@ -216,9 +221,10 @@ mInitializeWallet wid cp meta txs0 gp db@Database{wallets,txs} , xprv = Nothing , genesisParameters = gp , rewardAccountBalance = minBound + , submittedTxs = mempty } - txs' = Map.fromList $ (\(tx, _) -> (txId tx, tx)) <$> txs0 - history = Map.fromList $ first txId <$> txs0 + txs' = Map.fromList $ (\(tx, _) -> (view #txId tx, tx)) <$> txs0 + history = Map.fromList $ first (view #txId) <$> txs0 in (Right (), Database (Map.insert wid wal wallets) (txs <> txs')) @@ -259,6 +265,7 @@ mListCheckpoints wid db@(Database wallets _) = mUpdatePendingTxForExpiry :: Ord wid => wid -> SlotNo -> ModelOp wid s xprv () mUpdatePendingTxForExpiry wid currentTip = alterModel wid $ \wal -> + -- fixme: update submitted txs ((), wal { txHistory = setExpired <$> txHistory wal }) where setExpired :: TxMeta -> TxMeta @@ -270,6 +277,7 @@ mUpdatePendingTxForExpiry wid currentTip = alterModel wid $ \wal -> mRemovePendingOrExpiredTx :: Ord wid => wid -> Hash "Tx" -> ModelOp wid s xprv () mRemovePendingOrExpiredTx wid tid = alterModelErr wid $ \wal -> + -- fixme: update submitted txs case Map.lookup tid (txHistory wal) of Nothing -> ( Left (NoSuchTx wid tid), wal ) @@ -407,8 +415,8 @@ mPutTxHistory wid txList db@Database{wallets,txs} = ) where wal' = wal { txHistory = txHistory wal <> txHistory' } - txHistory' = Map.fromList $ first txId <$> txList - txs' = Map.fromList $ (\(tx, _) -> (txId tx, tx)) <$> txList + txHistory' = Map.fromList $ first (view #txId) <$> txList + txs' = Map.fromList $ (\(tx, _) -> (view #txId tx, tx)) <$> txList Nothing -> (Left (NoSuchWallet wid), db) mReadTxHistory @@ -442,7 +450,7 @@ mReadTxHistory ti wid minWithdrawal order range mstatus db@(Database wallets txs mkTransactionInfo cp (tx, meta) = TransactionInfo { txInfoId = - txId tx + view #txId tx , txInfoFee = fee tx , txInfoInputs = @@ -492,6 +500,30 @@ mReadDelegationRewardBalance mReadDelegationRewardBalance wid db@(Database wallets _) = (Right (maybe minBound rewardAccountBalance $ Map.lookup wid wallets), db) +mPutLocalTxSubmission :: Ord wid => wid -> Hash "Tx" -> SealedTx -> SlotNo -> ModelOp wid s xprv () +mPutLocalTxSubmission wid txid tx sl = alterModel wid $ \wal -> + ((), wal { submittedTxs = Map.insertWith upsert txid (tx, sl) (submittedTxs wal) }) + where + upsert (origTx, _) (_, newSl) = (origTx, newSl) + +mGetLocalTxSubmissionPending + :: Ord wid => wid -> ModelOp wid s xprv [LocalTxSubmissionStatus SealedTx] +mGetLocalTxSubmissionPending wid db@(Database wallets _) = + (Right (mapMaybe getSubmission pendings), db) + where + wal = Map.lookup wid wallets + + pendings = mapMaybe getPending $ Map.toList $ maybe mempty txHistory wal + getPending :: (Hash "Tx", TxMeta) -> Maybe (Hash "Tx", SlotNo) + getPending (txid, txMeta@TxMeta{status,slotNo}) + | status == Pending = Just (txid, slotNo) + | otherwise = Nothing + + submitted = maybe mempty submittedTxs wal + getSubmission (tid, sl0) = case Map.lookup tid submitted of + Just (tx, sl1) -> Just (LocalTxSubmissionStatus tid tx sl0 sl1) + Nothing -> Nothing + {------------------------------------------------------------------------------- Model function helpers -------------------------------------------------------------------------------} @@ -538,7 +570,7 @@ filterTxHistory minWithdrawal order range = . sortByTxId where sortBySlot = sortOn (Down . (slotNo :: TxMeta -> SlotNo) . snd) - sortByTxId = sortOn (txId . fst) + sortByTxId = sortOn (view #txId . fst) atLeast inf = not . Map.null . Map.filter (>= inf) filterWithdrawals = maybe (const True) diff --git a/lib/core/src/Cardano/Wallet/DB/Sqlite.hs b/lib/core/src/Cardano/Wallet/DB/Sqlite.hs index d6743be39a0..1eb6988c5c9 100644 --- a/lib/core/src/Cardano/Wallet/DB/Sqlite.hs +++ b/lib/core/src/Cardano/Wallet/DB/Sqlite.hs @@ -92,6 +92,7 @@ import Cardano.Wallet.DB.Sqlite.TH , DelegationReward (..) , EntityField (..) , Key (..) + , LocalTxSubmission (..) , PrivateKey (..) , RndState (..) , RndStateAddress (..) @@ -190,6 +191,7 @@ import Database.Persist.Sql ( Entity (..) , Filter , SelectOpt (..) + , Single (..) , Update (..) , deleteCascadeWhere , deleteWhere @@ -197,12 +199,14 @@ import Database.Persist.Sql , insertMany_ , insert_ , rawExecute + , rawSql , repsert , repsertMany , selectFirst , selectKeysList , selectList , updateWhere + , upsert , (/<-.) , (<-.) , (<.) @@ -1497,9 +1501,14 @@ newDBLayerWith cacheBehavior tr ti SqliteContext{runQuery} = do , (TxMetaStatus ==.) <$> status ] - , putLocalTxSubmission = \_slot _txId _tx -> pure () -- fixme: ADP-764 + , putLocalTxSubmission = \(PrimaryKey wid) txid tx sl -> + let record = LocalTxSubmission (TxId txid) wid sl tx + in void $ upsert record [ LocalTxSubmissionLastSlot =. sl ] - , getLocalTxSubmissionPending = \_slot -> pure [] -- fixme: ADP-764 + , getLocalTxSubmissionPending = + fmap (map localTxSubmissionFromEntity) + . listPendingLocalTxSubmissionQuery + . unPrimaryKey , updatePendingTxForExpiry = \(PrimaryKey wid) tip -> ExceptT $ do selectWallet wid >>= \case @@ -1778,7 +1787,7 @@ mkTxHistory wid txs = flatTxHistory , mkTxWithdrawals (txid, tx) ) | (tx, derived) <- txs - , let txid = W.txId tx + , let txid = view #txId tx ] where -- | Make flat lists of entities from the result of 'mkTxHistory'. @@ -2246,6 +2255,28 @@ deletePendingOrExpiredTx wid tid = do Nothing -> fromIntegral <$> deleteWhereCount ((TxMetaStatus <-. [W.Pending, W.Expired]):filt) +-- | Returns the initial submission slot and submission record for all pending +-- transactions in the wallet. +listPendingLocalTxSubmissionQuery + :: W.WalletId + -> SqlPersistT IO [(W.SlotNo, LocalTxSubmission)] +listPendingLocalTxSubmissionQuery wid = fmap unRaw <$> rawSql query params + where + query = + "SELECT tx_meta.slot,?? " <> + "FROM tx_meta INNER JOIN local_tx_submission " <> + "ON tx_meta.wallet_id=local_tx_submission.wallet_id " <> + " AND tx_meta.tx_id=local_tx_submission.tx_id " <> + "WHERE tx_meta.wallet_id=? AND tx_meta.status=?" + params = [toPersistValue wid, toPersistValue W.Pending] + unRaw (Single sl, Entity _ tx) = (sl, tx) + +localTxSubmissionFromEntity + :: (W.SlotNo, LocalTxSubmission) + -> W.LocalTxSubmissionStatus W.SealedTx +localTxSubmissionFromEntity (sl0, LocalTxSubmission (TxId txid) _ sl tx) = + W.LocalTxSubmissionStatus txid tx sl0 sl + deleteLocalTxSubmission :: W.WalletId -> W.Hash "Tx" diff --git a/lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs b/lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs index cadf6816e78..6dc1cd85b96 100644 --- a/lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs +++ b/lib/core/src/Cardano/Wallet/DB/Sqlite/TH.hs @@ -174,10 +174,12 @@ TxWithdrawal -- rolled back. LocalTxSubmission localTxSubmissionTxId TxId sql=tx_id - localTxSubmissionTx W.SealedTx sql=tx + localTxSubmissionWalletId W.WalletId sql=wallet_id localTxSubmissionLastSlot SlotNo sql=last_slot + localTxSubmissionTx W.SealedTx sql=tx - Primary localTxSubmissionTxId + UniqueLocalTxSubmission localTxSubmissionTxId localTxSubmissionWalletId + Primary localTxSubmissionTxId localTxSubmissionWalletId deriving Show Generic -- A checkpoint for a given wallet is referred to by (wallet_id, slot). diff --git a/lib/core/src/Cardano/Wallet/DB/Sqlite/Types.hs b/lib/core/src/Cardano/Wallet/DB/Sqlite/Types.hs index b943842067b..4767810edc4 100644 --- a/lib/core/src/Cardano/Wallet/DB/Sqlite/Types.hs +++ b/lib/core/src/Cardano/Wallet/DB/Sqlite/Types.hs @@ -242,7 +242,8 @@ instance PathPiece WalletId where ---------------------------------------------------------------------------- -- TxId --- Wraps Hash "Tx" because the persistent dsl doesn't like (Hash "Tx") +-- | Wraps 'Hash "Tx"' because the persistent entity syntax doesn't seem to +-- support parameterized types. newtype TxId = TxId { getTxId :: Hash "Tx" } deriving (Show, Eq, Ord, Generic) instance PersistField TxId where diff --git a/lib/core/src/Cardano/Wallet/Primitive/Types/Tx.hs b/lib/core/src/Cardano/Wallet/Primitive/Types/Tx.hs index bac302f8026..cbb21165221 100644 --- a/lib/core/src/Cardano/Wallet/Primitive/Types/Tx.hs +++ b/lib/core/src/Cardano/Wallet/Primitive/Types/Tx.hs @@ -28,6 +28,7 @@ module Cardano.Wallet.Primitive.Types.Tx , UnsignedTx (..) , TransactionInfo (..) , Direction (..) + , LocalTxSubmissionStatus (..) , TokenBundleSizeAssessor (..) , TokenBundleSizeAssessment (..) @@ -438,6 +439,17 @@ toTxHistory :: TransactionInfo -> (Tx, TxMeta) toTxHistory info = (fromTransactionInfo info, txInfoMeta info) +-- | Information about when a transaction was submitted to the local node. +-- This is used for scheduling resubmissions. +data LocalTxSubmissionStatus tx = LocalTxSubmissionStatus + { txId :: !(Hash "Tx") + , submittedTx :: !tx + , firstSubmission :: !SlotNo + -- ^ Time of first successful submission to the local node. + , latestSubmission :: !SlotNo + -- ^ Time of most recent resubmission attempt. + } deriving (Generic, Show, Eq) + -- | A function capable of assessing the size of a token bundle relative to the -- upper limit of what can be included in a single transaction output. -- diff --git a/lib/core/test/unit/Cardano/Wallet/DB/StateMachine.hs b/lib/core/test/unit/Cardano/Wallet/DB/StateMachine.hs index 394cedeab7f..3141bd765b1 100644 --- a/lib/core/test/unit/Cardano/Wallet/DB/StateMachine.hs +++ b/lib/core/test/unit/Cardano/Wallet/DB/StateMachine.hs @@ -139,6 +139,7 @@ import Cardano.Wallet.Primitive.Types.TokenQuantity ( TokenQuantity ) import Cardano.Wallet.Primitive.Types.Tx ( Direction (..) + , SealedTx , TransactionInfo (..) , Tx (..) , TxIn (..) @@ -929,6 +930,9 @@ instance ToExpr Address where instance ToExpr TxMeta where toExpr = genericToExpr +instance ToExpr SealedTx where + toExpr = genericToExpr + instance ToExpr Percentage where toExpr = genericToExpr