Skip to content

Commit

Permalink
Merge #2432
Browse files Browse the repository at this point in the history
2432: [ADP-634] Run SMASH metadata fetching in batches of 15 concurrently r=rvl a=hasufell

# Issue Number

ADP-634

# Overview

All requests already share the same [Manager](https://hackage.haskell.org/package/http-client-0.7.3/docs/Network-HTTP-Client.html#t:Manager). I'm not sure if that implicitly leads to proper HTTP pipelining when used with `forConcurrently`, but it seems to work: the sync time on testnet goes from 3 minutes down to ~30 seconds.

# Comments

## Open questions

1. do we also want to add a way for daedalus to check the sync-status? If so, how to define it? Syncing is continuous.
2. is this safe? We don't want to DoS SMASH
3. do we want to make the batch size a settings? We already have a database table for settings...

Co-authored-by: KtorZ <[email protected]>
  • Loading branch information
iohk-bors[bot] and KtorZ authored Feb 16, 2021
2 parents a37c985 + 7ea46a4 commit 5ff6c51
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 36 deletions.
23 changes: 11 additions & 12 deletions lib/core/src/Cardano/Pool/Metadata.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ module Cardano.Pool.Metadata
, HealthStatusSMASH (..)

-- * Construct URLs
, UrlBuilder
, identityUrlBuilder
, registryUrlBuilder

-- * re-exports
, Manager
, newManager
, defaultManagerSettings

Expand Down Expand Up @@ -164,12 +166,16 @@ defaultManagerSettings =
newManager :: MonadIO m => ManagerSettings -> m Manager
newManager = HTTPS.newTlsManagerWith

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: PoolId
-- | A type-alias to ease signatures
type UrlBuilder
= PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI

-- | Simply return a pool metadata url, unchanged
identityUrlBuilder
:: UrlBuilder
identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
maybe (Left e) Right $ parseURI (T.unpack url)
where
Expand All @@ -178,10 +184,7 @@ identityUrlBuilder _ (StakePoolMetadataUrl url) _ =
-- | Build a URL from a metadata hash compatible with an aggregation registry
registryUrlBuilder
:: URI
-> PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
-> UrlBuilder
registryUrlBuilder baseUrl pid _ hash =
Right $ baseUrl
{ uriPath = "/" <> metadaFetchEp pid hash
Expand Down Expand Up @@ -278,11 +281,7 @@ fetchDelistedPools tr uri manager = runExceptTLog $ do
-- TODO: refactor/simplify this
fetchFromRemote
:: Tracer IO StakePoolMetadataFetchLog
-> [ PoolId
-> StakePoolMetadataUrl
-> StakePoolMetadataHash
-> Either HttpException URI
]
-> [UrlBuilder]
-> Manager
-> PoolId
-> StakePoolMetadataUrl
Expand Down
1 change: 1 addition & 0 deletions lib/shelley/src/Cardano/Wallet/Shelley/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,7 @@ newRewardBalanceFetcher tr readNodeTip queryRewardQ = do
:: Tip (CardanoBlock StandardCrypto)
-> Set W.RewardAccount
-> IO (Maybe (Map W.RewardAccount W.Coin))
fetch _tip accounts | Set.null accounts = pure (Just mempty)
fetch _tip accounts = do
-- NOTE: We no longer need the tip to run LSQ queries. The local state
-- query client will automatically acquire the latest tip.
Expand Down
82 changes: 58 additions & 24 deletions lib/shelley/src/Cardano/Wallet/Shelley/Pools.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import Cardano.BM.Data.Tracer
import Cardano.Pool.DB
( DBLayer (..), ErrPointAlreadyExists (..), readPoolLifeCycleStatus )
import Cardano.Pool.Metadata
( StakePoolMetadataFetchLog
( Manager
, StakePoolMetadataFetchLog
, UrlBuilder
, defaultManagerSettings
, fetchDelistedPools
, fetchFromRemote
Expand Down Expand Up @@ -129,6 +131,8 @@ import Data.Function
( (&) )
import Data.Generics.Internal.VL.Lens
( view )
import Data.List
( nub, (\\) )
import Data.List.NonEmpty
( NonEmpty (..) )
import Data.Map
Expand All @@ -151,6 +155,8 @@ import Data.Time.Clock.POSIX
( getPOSIXTime, posixDayLength )
import Data.Tuple.Extra
( dupe )
import Data.Void
( Void )
import Data.Word
( Word64 )
import Fmt
Expand All @@ -168,7 +174,14 @@ import UnliftIO.Exception
import UnliftIO.IORef
( IORef, newIORef, readIORef, writeIORef )
import UnliftIO.STM
( TVar, readTVarIO, writeTVar )
( TBQueue
, TVar
, newTBQueue
, readTBQueue
, readTVarIO
, writeTBQueue
, writeTVar
)

import qualified Cardano.Wallet.Api.Types as Api
import qualified Data.List as L
Expand Down Expand Up @@ -723,43 +736,64 @@ monitorMetadata gcStatus tr sp db@(DBLayer{..}) = do
_ -> pure NoSmashConfigured

if | health == Available || health == NoSmashConfigured -> do
let fetcher fetchStrategies = fetchFromRemote trFetch fetchStrategies manager
loop getPoolMetadata = forever $ do
(refs, successes) <- getPoolMetadata
when (null refs || null successes) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency

case poolMetadataSource settings of
FetchNone -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (pure ([], [])) -- TODO: exit loop?

FetchDirect -> do
STM.atomically $ writeTVar gcStatus NotApplicable
loop (fetchThem $ fetcher [identityUrlBuilder])
void $ fetchMetadata manager [identityUrlBuilder]

FetchSMASH (unSmashServer -> uri) -> do
STM.atomically $ writeTVar gcStatus NotStarted
let getDelistedPools =
fetchDelistedPools trFetch uri manager
tid <- forkFinally
(gcDelistedPools gcStatus tr db getDelistedPools)
onExit
flip finally (killThread tid) $
loop (fetchThem $ fetcher [registryUrlBuilder uri])
| otherwise -> traceWith tr MsgSMASHUnreachable
void $ fetchMetadata manager [registryUrlBuilder uri]
`finally` killThread tid

| otherwise ->
traceWith tr MsgSMASHUnreachable
where
trFetch = contramap MsgFetchPoolMetadata tr
fetchThem fetchMetadata = do
refs <- atomically (unfetchedPoolMetadataRefs 100)
successes <- fmap catMaybes $ forM refs $ \(pid, url, hash) -> do
fetchMetadata pid url hash >>= \case
Nothing -> Nothing <$ do
atomically $ putFetchAttempt (url, hash)

Just meta -> Just hash <$ do
atomically $ putPoolMetadata hash meta
pure (refs, successes)

fetchMetadata
:: Manager
-> [UrlBuilder]
-> IO Void
fetchMetadata manager strategies = do
inFlights <- STM.atomically $ newTBQueue maxInFlight
endlessly [] $ \keys -> do
refs <- nub . (\\ keys) <$> atomically (unfetchedPoolMetadataRefs limit)
when (null refs) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency
forM refs $ \k@(pid, url, hash) -> k <$ withAvailableSeat inFlights (do
fetchFromRemote trFetch strategies manager pid url hash >>= \case
Nothing ->
atomically $ putFetchAttempt (url, hash)
Just meta -> do
atomically $ putPoolMetadata hash meta
)
where
-- Twice 'maxInFlight' so that, when removing keys currently in flight,
-- we are left with at least 'maxInFlight' keys.
limit = fromIntegral (2 * maxInFlight)
maxInFlight = 10

endlessly :: Monad m => a -> (a -> m a) -> m Void
endlessly zero action = action zero >>= (`endlessly` action)

-- | Run an action asyncronously only when there's an available seat.
-- Seats are materialized by a bounded queue. If the queue is full,
-- then there's no seat.
withAvailableSeat :: TBQueue () -> IO a -> IO ()
withAvailableSeat q action = do
STM.atomically $ writeTBQueue q ()
void $ action `forkFinally` const (STM.atomically $ readTBQueue q)

-- NOTE
-- If there's no metadata, we typically need not to retry sooner than the
-- next block. So waiting for a delay that is roughly the same order of
Expand Down

0 comments on commit 5ff6c51

Please sign in to comment.