Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADP-634] Run SMASH metadata fetching in batches of 15 concurrently #2432

Merged
merged 3 commits into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions lib/core/src/Cardano/Pool/Metadata.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ module Cardano.Pool.Metadata
, HealthStatusSMASH (..)

-- * Construct URLs
, UrlBuilder
, identityUrlBuilder
, registryUrlBuilder

-- * re-exports
, Manager
, newManager
, defaultManagerSettings

Expand Down Expand Up @@ -164,12 +166,16 @@ defaultManagerSettings =
newManager :: MonadIO m => ManagerSettings -> m Manager
newManager = HTTPS.newTlsManagerWith

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: PoolId
-- | A type-alias to ease signatures
type UrlBuilder
= PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: UrlBuilder
identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
maybe (Left e) Right $ parseURI (T.unpack url)
where
Expand All @@ -178,10 +184,7 @@ identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
-- | Build a URL from a metadata hash compatible with an aggregation registry
registryUrlBuilder
:: URI
-> PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
-> UrlBuilder
registryUrlBuilder baseUrl pid _ hash =
Right $ baseUrl
{ uriPath = "/" <> metadaFetchEp pid hash
Expand Down Expand Up @@ -278,11 +281,7 @@ fetchDelistedPools tr uri manager = runExceptTLog $ do
-- TODO: refactor/simplify this
fetchFromRemote
:: Tracer IO StakePoolMetadataFetchLog
-> [ PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
]
-> [UrlBuilder]
-> Manager
-> PoolId
-> StakePoolMetadataUrl
Expand Down
1 change: 1 addition & 0 deletions lib/shelley/src/Cardano/Wallet/Shelley/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,7 @@ newRewardBalanceFetcher tr readNodeTip queryRewardQ = do
:: Tip (CardanoBlock StandardCrypto)
-> Set W.RewardAccount
-> IO (Maybe (Map W.RewardAccount W.Coin))
fetch _tip accounts | Set.null accounts = pure (Just mempty)
fetch _tip accounts = do
-- NOTE: We no longer need the tip to run LSQ queries. The local state
-- query client will automatically acquire the latest tip.
Expand Down
76 changes: 51 additions & 25 deletions lib/shelley/src/Cardano/Wallet/Shelley/Pools.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import Cardano.BM.Data.Tracer
import Cardano.Pool.DB
( DBLayer (..), ErrPointAlreadyExists (..), readPoolLifeCycleStatus )
import Cardano.Pool.Metadata
( StakePoolMetadataFetchLog
( Manager
, StakePoolMetadataFetchLog
, UrlBuilder
, defaultManagerSettings
, fetchDelistedPools
, fetchFromRemote
Expand Down Expand Up @@ -112,7 +114,7 @@ import Cardano.Wallet.Unsafe
import Control.Exception.Base
( AsyncException (..), asyncExceptionFromException )
import Control.Monad
( forM, forM_, forever, void, when )
( forM_, forever, void, when )
import Control.Monad.IO.Class
( liftIO )
import Control.Monad.Trans.Except
Expand Down Expand Up @@ -157,6 +159,8 @@ import Fmt
( fixedF, pretty )
import GHC.Generics
( Generic )
import Numeric.Natural
( Natural )
import Ouroboros.Consensus.Cardano.Block
( CardanoBlock, HardForkBlock (..) )
import System.Random
Expand All @@ -168,7 +172,14 @@ import UnliftIO.Exception
import UnliftIO.IORef
( IORef, newIORef, readIORef, writeIORef )
import UnliftIO.STM
( TVar, readTVarIO, writeTVar )
( TBQueue
, TVar
, newTBQueue
, readTBQueue
, readTVarIO
, writeTBQueue
, writeTVar
)

import qualified Cardano.Wallet.Api.Types as Api
import qualified Data.List as L
Expand Down Expand Up @@ -723,43 +734,58 @@ monitorMetadata gcStatus tr sp db@(DBLayer{..}) = do
_ -> pure NoSmashConfigured

if | health == Available || health == NoSmashConfigured -> do
let fetcher fetchStrategies = fetchFromRemote trFetch fetchStrategies manager
loop getPoolMetadata = forever $ do
(refs, successes) <- getPoolMetadata
when (null refs || null successes) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency

case poolMetadataSource settings of
FetchNone -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (pure ([], [])) -- TODO: exit loop?

FetchDirect -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (fetchThem $ fetcher [identityUrlBuilder])
fetchMetadata manager [identityUrlBuilder]

FetchSMASH (unSmashServer -> uri) -> do
STM.atomically $ writeTVar gcStatus NotStarted
let getDelistedPools =
fetchDelistedPools trFetch uri manager
tid <- forkFinally
(gcDelistedPools gcStatus tr db getDelistedPools)
onExit
flip finally (killThread tid) $
loop (fetchThem $ fetcher [registryUrlBuilder uri])
| otherwise -> traceWith tr MsgSMASHUnreachable
fetchMetadata manager [registryUrlBuilder uri]
`finally` killThread tid

| otherwise ->
traceWith tr MsgSMASHUnreachable
where
trFetch = contramap MsgFetchPoolMetadata tr
fetchThem fetchMetadata = do
refs <- atomically (unfetchedPoolMetadataRefs 100)
successes <- fmap catMaybes $ forM refs $ \(pid, url, hash) -> do
fetchMetadata pid url hash >>= \case
Nothing -> Nothing <$ do
atomically $ putFetchAttempt (url, hash)

Just meta -> Just hash <$ do
atomically $ putPoolMetadata hash meta
pure (refs, successes)

fetchMetadata
:: Manager
-> [UrlBuilder]
-> IO ()
fetchMetadata manager strategies = do
inFlights <- STM.atomically $ newTBQueue maxInFlight
forever $ do
refs <- atomically (unfetchedPoolMetadataRefs $ fromIntegral maxInFlight)
forM_ refs $ \(pid, url, hash) -> withAvailableSeat inFlights $ do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pretty neat pattern. Unrelatedly, I'm wondering if this is achievable with streamly or conduit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually thought of using a conduit here, to stream things directly from the database and then, process them at the right path. Yet, that's much more work on the db side to do, and persistent API is quite unsatisfactory here (we use a custom raw SQL query behind the scene, so most of the persistent API is just unusable). Plus, we would need a "refreshable" stream, because after requests are processed, the set of metadata refs to fetch do actually change.

fetchFromRemote trFetch strategies manager pid url hash >>= \case
Nothing ->
atomically $ putFetchAttempt (url, hash)
Just meta -> do
KtorZ marked this conversation as resolved.
Show resolved Hide resolved
atomically $ putPoolMetadata hash meta
when (null refs) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency
where
maxInFlight :: Natural
maxInFlight = 20
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably doesn't matter for merging the PR, but did someone check other values? I'm not questioning that 20 is a reasonable default, but maybe there's more room to optimize? At which point will performance degrade or SMASH give up on us?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried: 10, 20 and 50. There seem to be almost no difference and they all oscillate between 40s and 60s for a entire refresh 🤷‍♂️ .. I went for 10 in the end.


-- | Run an action asyncronously only when there's an available seat.
-- Seats are materialized by a bounded queue. If the queue is full,
-- then there's no seat.
withAvailableSeat :: TBQueue () -> IO a -> IO ()
withAvailableSeat q action = do
STM.atomically $ writeTBQueue q ()
void $ action `forkFinally` const (STM.atomically $ readTBQueue q)

-- NOTE
-- If there's no metadata, we typically need not to retry sooner than the
-- next block. So waiting for a delay that is roughly the same order of
Expand Down