Skip to content

Commit

Permalink
Add retries to local state query caching
Browse files Browse the repository at this point in the history
Also use the new `Control.Cache` module.
  • Loading branch information
HeinrichApfelmus committed Aug 13, 2021
1 parent 6a9ff72 commit 2667eb6
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 57 deletions.
27 changes: 23 additions & 4 deletions lib/shelley/src/Cardano/Wallet/Shelley.hs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ import Cardano.Wallet.Api.Types
import Cardano.Wallet.DB.Sqlite
( DBFactoryLog, DefaultFieldValues (..), PersistState )
import Cardano.Wallet.Logging
( trMessageText )
( trMessageText, LoggedException (..) )
import Cardano.Wallet.Network
( FollowLog (..), NetworkLayer (..) )
import Cardano.Wallet.Primitive.AddressDerivation
Expand Down Expand Up @@ -138,7 +138,6 @@ import Cardano.Wallet.Shelley.Pools
, monitorMetadata
, monitorStakePools
, newStakePoolLayer
, runCacheWorker
)
import Cardano.Wallet.Shelley.Transaction
( newTransactionLayer )
Expand All @@ -148,6 +147,8 @@ import Cardano.Wallet.Transaction
( TransactionLayer )
import Control.Applicative
( Const (..) )
import Control.Cache
( CacheWorker (..), newCacheWorker, don'tCacheWorker, NominalDiffTime )
import Control.Monad
( forM_, void )
import Control.Tracer
Expand Down Expand Up @@ -192,6 +193,7 @@ import UnliftIO.STM
import qualified Cardano.Pool.DB.Sqlite as Pool
import qualified Cardano.Wallet.Api.Server as Server
import qualified Cardano.Wallet.DB.Sqlite as Sqlite
import qualified Control.Retry as Retry
import qualified Data.Text as T
import qualified Network.Wai.Handler.Warp as Warp

Expand Down Expand Up @@ -360,6 +362,7 @@ serveWallet
void $ forkFinally (monitorStakePools tr np nl db)
(traceAfterThread (contramap (MsgFollowLog . MsgExitMonitoring) poolsEngineTracer))

-- set up thread for querying and caching stake pool metadata
-- fixme: needs to be simplified as part of ADP-634
let startMetadataThread = forkIOWithUnmask $ \unmask ->
unmask $ monitorMetadata gcStatus tr sp db
Expand All @@ -368,10 +371,26 @@ serveWallet
killThread tid
startMetadataThread

(worker, spl) <- newStakePoolLayer gcStatus nl db restartMetadataThread
-- Set up caching for local state query (LSQ) of StakePoolSummary
-- TODO later:
-- I suppose that handling retries and logging exceptions
-- that occur while retrying may be a job for Cardano.Wallet.Registry
let policy = Retry.exponentialBackoff 30_000_000 <> Retry.limitRetries 3
traceEx = Retry.logRetries (\_ -> pure True) $ \_ e _ ->
traceWith tr $ MsgFollowLog $ MsgLocalStateQueryException $ LoggedException e
withRetries :: forall a. IO a -> IO a
withRetries maction = Retry.recovering policy
(Retry.skipAsyncExceptions ++ [traceEx]) (\_ -> maction)
mkCacheWorker
:: forall a. NominalDiffTime -> NominalDiffTime
-> IO a -> IO (CacheWorker, IO a)
mkCacheWorker t1 t2 = newCacheWorker t1 t2 . withRetries

(worker, spl) <-
newStakePoolLayer gcStatus nl db mkCacheWorker restartMetadataThread

void $ forkFinally (runCacheWorker worker)
(traceAfterThread (contramap (MsgFollowLog . MsgExitRewardCaching) poolsEngineTracer))
(traceAfterThread (contramap (MsgFollowLog . MsgExitLocalStateQueryCaching) tr))

action spl

Expand Down
73 changes: 20 additions & 53 deletions lib/shelley/src/Cardano/Wallet/Shelley/Pools.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
module Cardano.Wallet.Shelley.Pools
( StakePoolLayer (..)
, newStakePoolLayer
, CacheWorker (..)
, monitorStakePools
, monitorMetadata

Expand Down Expand Up @@ -58,6 +57,8 @@ import Cardano.Wallet.Api.Types
( ApiT (..), HealthCheckSMASH (..), toApiEpochInfo )
import Cardano.Wallet.Byron.Compatibility
( toByronBlockHeader )
import Cardano.Wallet.Logging
( LoggedException (..) )
import Cardano.Wallet.Network
( FollowAction (..)
, FollowExceptionRecovery (..)
Expand Down Expand Up @@ -114,6 +115,8 @@ import Cardano.Wallet.Shelley.Compatibility
)
import Cardano.Wallet.Unsafe
( unsafeMkPercentage )
import Control.Cache
( CacheWorker )
import Control.Monad
( forM, forM_, forever, void, when )
import Control.Monad.IO.Class
Expand Down Expand Up @@ -164,23 +167,18 @@ import Ouroboros.Consensus.Cardano.Block
( CardanoBlock, HardForkBlock (..) )
import System.Random
( RandomGen, randoms )
import UnliftIO
( MonadIO )
import UnliftIO.Concurrent
( forkFinally, killThread, threadDelay )
import UnliftIO.Exception
( finally )
( SomeException, finally )
import UnliftIO.IORef
( IORef, newIORef, readIORef, writeIORef )
import UnliftIO.STM
( TBQueue
, TVar
, newTBQueue
, newTVarIO
, readTBQueue
, readTVar
, readTVarIO
, retrySTM
, writeTBQueue
, writeTVar
)
Expand Down Expand Up @@ -227,14 +225,17 @@ data StakePoolLayer = StakePoolLayer
}

newStakePoolLayer
:: forall sc. ()
:: forall crypto. ()
=> TVar PoolMetadataGCStatus
-> NetworkLayer IO (CardanoBlock sc)
-> NetworkLayer IO (CardanoBlock crypto)
-> DBLayer IO
-> (forall a. NominalDiffTime -> NominalDiffTime -> IO a -> IO (CacheWorker, IO a))
-> IO ()
-> IO (CacheWorker, StakePoolLayer)
newStakePoolLayer gcStatus nl db@DBLayer {..} restartSyncThread = do
(worker, _stakeDistribution) <- nooooCacheWorker hour (stakeDistribution nl)
newStakePoolLayer gcStatus nl db@DBLayer {..} mkCacheWorker restartSyncThread = do
let ttl = 60 * 60 :: NominalDiffTime -- one hour
grace = 3 :: NominalDiffTime
(worker, _stakeDistribution) <- mkCacheWorker ttl grace (stakeDistribution nl)
pure (worker, StakePoolLayer
{ getPoolLifeCycleStatus = _getPoolLifeCycleStatus
, knownPools = _knownPools
Expand Down Expand Up @@ -319,44 +320,6 @@ sortRandomOn seed f
. L.sortOn (\(nonce,a) -> (f a, nonce))
. zip (randoms seed :: [Int])

-- | A worker (an action of type @IO ()@) that
-- runs a function periodically and caches the result.
newtype CacheWorker = CacheWorker { runCacheWorker :: IO () }

-- | For testing: Don't actually cache anything.
nooooCacheWorker :: NominalDiffTime -> IO a -> IO (CacheWorker, IO a)
nooooCacheWorker _ action = pure (CacheWorker $ pure (), action)

-- | Create a new cache and a worker to fill it.
newCacheWorker
:: NominalDiffTime -- ^ cache time to live (TTL)
-> IO a -- ^ action whose result we want to cache
-> IO (CacheWorker, IO a)
newCacheWorker ttl action = do
cache <- newTVarIO Nothing
let worker :: IO ()
worker = forever $ do
threadWait 3 -- 3 seconds grace period
a <- action
STM.atomically $ writeCache cache a
threadWait $ max 0 (ttl-3)
return (CacheWorker worker, STM.atomically $ readCache cache)
-- TODO: make cache worker more intelligent
where
readCache v = maybe retrySTM pure =<< readTVar v
writeCache v = writeTVar v . Just

-- | Variant of 'threadDelay' where the argument has type 'NominalDiffTime'.
--
-- The resolution for delaying threads is microseconds.
threadWait :: MonadIO m => NominalDiffTime -> m ()
threadWait s = threadDelay $ round (s / microsecond)
where microsecond = 1e-6 :: NominalDiffTime

-- | Number of seconds contained in one hour.
hour :: NominalDiffTime
hour = 60*60

{-------------------------------------------------------------------------------
Rewards and scoring
Expand Down Expand Up @@ -938,7 +901,8 @@ gcDelistedPools gcStatus tr DBLayer{..} fetchDelisted = forever $ do
-------------------------------------------------------------------------------}
data StakePoolLog
= MsgExitMonitoring AfterThreadLog
| MsgExitRewardCaching AfterThreadLog
| MsgExitLocalStateQueryCaching AfterThreadLog
| MsgLocalStateQueryException (LoggedException SomeException)
| MsgStakePoolGarbageCollection PoolGarbageCollectionInfo
| MsgStakePoolRegistration PoolRegistrationCertificate
| MsgStakePoolRetirement PoolRetirementCertificate
Expand All @@ -965,7 +929,8 @@ instance HasPrivacyAnnotation StakePoolLog
instance HasSeverityAnnotation StakePoolLog where
getSeverityAnnotation = \case
MsgExitMonitoring msg -> getSeverityAnnotation msg
MsgExitRewardCaching msg -> getSeverityAnnotation msg
MsgExitLocalStateQueryCaching msg -> getSeverityAnnotation msg
MsgLocalStateQueryException{} -> Warning
MsgStakePoolGarbageCollection{} -> Debug
MsgStakePoolRegistration{} -> Debug
MsgStakePoolRetirement{} -> Debug
Expand All @@ -980,8 +945,10 @@ instance ToText StakePoolLog where
toText = \case
MsgExitMonitoring msg ->
"Stake pool monitor exit: " <> toText msg
MsgExitRewardCaching msg ->
"Stake pool reward cache exit: " <> toText msg
MsgExitLocalStateQueryCaching msg ->
"Local state query cache exit: " <> toText msg
MsgLocalStateQueryException e ->
"Local state query exception: " <> toText e
MsgStakePoolGarbageCollection info -> mconcat
[ "Performing garbage collection of retired stake pools. "
, "Currently in epoch "
Expand Down

0 comments on commit 2667eb6

Please sign in to comment.