Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add load-based throttling to search endpoints #116

Merged
merged 5 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chainweb-data.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ executable chainweb-data
, http-client-tls ^>=0.3
, http-types
, lens-aeson
, managed
, mtl
, optparse-applicative >=0.14 && <0.17
, pact >=4.3.1
Expand Down
102 changes: 74 additions & 28 deletions exec/Chainweb/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import Chainweb.Api.NodeInfo
import Control.Applicative
import Control.Concurrent
import Control.Error
import Control.Exception (bracket_)
import Control.Monad.Except
import qualified Control.Monad.Managed as M
import Control.Retry
import Data.Aeson hiding (Error)
import qualified Data.ByteString as BS
Expand Down Expand Up @@ -159,6 +161,17 @@ apiServer env senv = do
logg Info $ fromString $ show e
Right cutBS -> apiServerCut env senv cutBS

type ConnectionWithThrottling = (Connection, Double)

-- | Given the amount of contention on connections, calculate a factor between
-- 0 and 1 that should be used to scale down the amount of work done by request
-- handlers
throttlingFactor :: Integer -> Double
throttlingFactor load = if loadPerCap <= 1 then 1 else 1 / loadPerCap where
-- We're arbitrarily assuming that Postgres will handle 3 concurrent requests
-- without any slowdown
loadPerCap = fromInteger load / 3

apiServerCut :: Env -> ServerEnv -> ByteString -> IO ()
apiServerCut env senv cutBS = do
let curHeight = cutMaxHeight cutBS
Expand All @@ -174,13 +187,21 @@ apiServerCut env senv cutBS = do
_ <- forkIO $ scheduledUpdates env pool ssRef (_serverEnv_runFill senv) (_serverEnv_fillDelay senv)
_ <- forkIO $ retryingListener env ssRef
logg Info $ fromString "Starting chainweb-data server"
throttledPool <- do
loadedSrc <- mkLoadedSource $ M.managed $ P.withResource pool
return $ do
loadedRes <- loadedSrc
load <- M.liftIO (lrLoadRef loadedRes)
return (lrResource loadedRes, throttlingFactor load)

let unThrottledPool = fst <$> throttledPool
let serverApp req =
( ( recentTxsHandler ssRef
:<|> searchTxs logg pool req
:<|> evHandler logg pool req
:<|> txHandler logg pool
:<|> txsHandler logg pool
:<|> accountHandler logg pool req
:<|> searchTxs logg throttledPool req
:<|> evHandler logg throttledPool req
:<|> txHandler logg unThrottledPool
:<|> txsHandler logg unThrottledPool
:<|> accountHandler logg unThrottledPool req
)
:<|> statsHandler ssRef
:<|> coinsHandler ssRef
Expand Down Expand Up @@ -325,12 +346,9 @@ isBoundedStrategy req =
other -> Left $ toS $ "Unknown " <> fromString headerName <> ": " <> other
where headerName = "Chainweb-Execution-Strategy"

getExecutionStrategy :: Request -> Integer -> Handler ExecutionStrategy
getExecutionStrategy req scanLimit = do
isBounded <- either throw400 return $ isBoundedStrategy req
return $ if isBounded
then Bounded scanLimit
else Unbounded
isBoundedStrategyM :: Request -> Handler Bool
isBoundedStrategyM req = do
either throw400 return $ isBoundedStrategy req

mkContinuation :: MonadError ServerError m =>
(NextToken -> Maybe b) ->
Expand All @@ -347,7 +365,7 @@ mkContinuation readTkn mbOffset mbNext = case (mbNext, mbOffset) of

searchTxs
:: LogFunctionIO Text
-> P.Pool Connection
-> M.Managed ConnectionWithThrottling
-> Request
-> Maybe Limit
-> Maybe Offset
Expand All @@ -359,13 +377,16 @@ searchTxs logger pool req givenMbLim mbOffset (Just search) mbNext = do
liftIO $ logger Info $ fromString $ printf
"Transaction search from %s: %s" (show $ remoteHost req) (T.unpack search)
continuation <- mkContinuation readTxToken mbOffset mbNext
let
resultLimit = maybe 10 (min 100 . unLimit) givenMbLim
scanLimit = 20000

strategy <- getExecutionStrategy req scanLimit
isBounded <- isBoundedStrategyM req

liftIO $ M.with pool $ \(c, throttling) -> do
let
scanLimit = ceiling $ 50000 * throttling
maxResultLimit = ceiling $ 250 * throttling
resultLimit = min maxResultLimit $ maybe 10 unLimit givenMbLim
strategy = if isBounded then Bounded scanLimit else Unbounded

liftIO $ P.withResource pool $ \c ->
PG.withTransactionLevel PG.RepeatableRead c $ do
(mbCont, results) <- performBoundedScan strategy
(runBeamPostgresDebug (logger Debug . T.pack) c)
Expand Down Expand Up @@ -395,12 +416,12 @@ throw400 msg = throwError $ err400 { errBody = msg }

txHandler
:: LogFunctionIO Text
-> P.Pool Connection
-> M.Managed Connection
-> Maybe RequestKey
-> Handler TxDetail
txHandler _ _ Nothing = throw404 "You must specify a search string"
txHandler logger pool (Just (RequestKey rk)) =
may404 $ liftIO $ P.withResource pool $ \c ->
may404 $ liftIO $ M.with pool $ \c ->
runBeamPostgresDebug (logger Debug . T.pack) c $ do
r <- runSelectReturningOne $ select $ do
tx <- all_ (_cddb_transactions database)
Expand Down Expand Up @@ -451,12 +472,12 @@ txHandler logger pool (Just (RequestKey rk)) =

txsHandler
:: LogFunctionIO Text
-> P.Pool Connection
-> M.Managed Connection
-> Maybe RequestKey
-> Handler [TxDetail]
txsHandler _ _ Nothing = throw404 "You must specify a search string"
txsHandler logger pool (Just (RequestKey rk)) =
emptyList404 $ liftIO $ P.withResource pool $ \c ->
emptyList404 $ liftIO $ M.with pool $ \c ->
runBeamPostgresDebug (logger Debug . T.pack) c $ do
r <- runSelectReturningList $ select $ do
tx <- all_ (_cddb_transactions database)
Expand Down Expand Up @@ -518,7 +539,7 @@ mkToken contents = NextToken $ T.pack $

accountHandler
:: LogFunctionIO Text
-> P.Pool Connection
-> M.Managed Connection
-> Request
-> Text -- ^ account identifier
-> Maybe Text -- ^ token type
Expand All @@ -542,7 +563,7 @@ accountHandler logger pool req account token chain limit offset mbNext = do
where errMsg = toS (printf "the maximum allowed offset is 10,000. You requested %d" o :: String)
Nothing -> return 0
return $ AQSNewQuery boundedOffset
liftIO $ P.withResource pool $ \c -> do
liftIO $ M.with pool $ \c -> do
let boundedLimit = Limit $ maybe 20 (min 100 . unLimit) limit
r <- runBeamPostgresDebug (logger Debug . T.pack) c $
runSelectReturningList $
Expand Down Expand Up @@ -588,7 +609,7 @@ mkEventToken est = mkBSToken $ est <&> \c ->

evHandler
:: LogFunctionIO Text
-> P.Pool Connection
-> M.Managed ConnectionWithThrottling
-> Request
-> Maybe Limit
-> Maybe Offset
Expand All @@ -608,12 +629,15 @@ evHandler logger pool req limit mbOffset qSearch qParam qName qModuleName minhei
, espName = qName
, espModuleName = qModuleName
}
resultLimit = fromMaybe 100 $ limit <&> \(Limit l) -> min 100 l
scanLimit = 20000

strategy <- getExecutionStrategy req scanLimit
isBounded <- isBoundedStrategyM req

liftIO $ P.withResource pool $ \c ->
liftIO $ M.with pool $ \(c, throttling) -> do
let
scanLimit = ceiling $ 50000 * throttling
maxResultLimit = ceiling $ 250 * throttling
resultLimit = min maxResultLimit $ maybe 10 unLimit limit
strategy = if isBounded then Bounded scanLimit else Unbounded
PG.withTransactionLevel PG.RepeatableRead c $ do
(mbCont, results) <- performBoundedScan strategy
(runBeamPostgresDebug (logger Debug . T.pack) c)
Expand Down Expand Up @@ -689,3 +713,25 @@ addNewTransactions txs (RecentTxs s1) = RecentTxs s2

unPgJsonb :: PgJSONB a -> a
unPgJsonb (PgJSONB v) = v

-- | A "LoadedResource" is a resource along with a way to read an integer
-- quantity representing how much load there is on the resource currently
data LoadedResource resource = LoadedResource
{ lrResource :: resource
, lrLoadRef :: IO Integer
}

type LoadedSource resource = M.Managed (LoadedResource resource)

-- | Wrap a given "Managed" with a layer that keeps track of how many other
-- consumers there currently are using or waiting on the inner "Managed".
-- At any given moment, this number can be read through the "lrLoadRef" of the
-- provided "LoadedResource".
mkLoadedSource :: M.Managed resource -> IO (M.Managed (LoadedResource resource))
mkLoadedSource innerSource = do
loadRef <- newIORef 0
let modifyLoad f = atomicModifyIORef' loadRef $ \load -> (f load, ())
return $ M.managed $ \outerBorrower ->
bracket_ (modifyLoad succ) (modifyLoad pred) $
M.with innerSource $ \resource -> outerBorrower $
LoadedResource resource (readIORef loadRef)