Add load-based throttling to search endpoints (#116)
This PR adds instrumentation around the connection pool used by the endpoint handlers to keep track of the number of requests currently pending or being served. The search endpoints then use this information to scale down the amount of work performed per request, making it easier to maintain a fair resource allocation across multiple clients under load.
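
In essence, each borrower of a pooled connection increments a shared counter while it holds (or waits on) the connection and decrements it on release, and handlers read that counter to estimate the current load. A distilled, self-contained sketch of the idea (the commit's full version is `mkLoadedSource` at the bottom of the diff below; `withLoad` is an illustrative name, not from the commit):

```haskell
import Control.Exception (bracket_)
import Data.IORef

-- Count an action as one unit of load on a shared resource while it runs.
withLoad :: IORef Integer -> IO a -> IO a
withLoad loadRef =
  bracket_ (atomicModifyIORef' loadRef (\n -> (n + 1, ())))
           (atomicModifyIORef' loadRef (\n -> (n - 1, ())))

main :: IO ()
main = do
  loadRef <- newIORef 0
  withLoad loadRef $ do
    inFlight <- readIORef loadRef
    putStrLn $ "requests in flight: " ++ show inFlight  -- prints 1
```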

This PR still doesn't stop a single client from making a large number of search requests in parallel; `chainweb-data` expects that to be handled via rate limiting at the API gateway layer. In theory, it could be argued that the gateway should handle load-based throttling as well, but in practice that seems harder to implement there.

This PR also increases the maximum `scanLimit` of the search endpoints from 20000 to 50000 (subject to load throttling) in order to reduce network roundtrips. Scanning 20000 rows used to take around 50-100ms, so each request should now take around 100-250ms, which is still low enough, and each request now scans around 10 hours' worth of blockchain history.
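
To illustrate how throttling scales this, here is the factor computation from the diff below applied to the new 50000-row scan limit (values assume the commit's cap of 3 unthrottled concurrent requests):

```haskell
-- Mirrors throttlingFactor and the scanLimit computation from the diff below.
throttlingFactor :: Integer -> Double
throttlingFactor load = if loadPerCap <= 1 then 1 else 1 / loadPerCap
  where loadPerCap = fromInteger load / 3

effectiveScanLimit :: Integer -> Integer
effectiveScanLimit load = ceiling (50000 * throttlingFactor load)

-- effectiveScanLimit 3  == 50000  (up to 3 concurrent requests, no throttling)
-- effectiveScanLimit 6  == 25000
-- effectiveScanLimit 30 == 5000
main :: IO ()
main = mapM_ (print . effectiveScanLimit) [3, 6, 30]
```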


* Increase the scanLimit

* Remove the fromheight parameter of /txs/accounts (#114)

This PR removes the as-yet-unreleased `fromheight` parameter of the `/txs/accounts` endpoint.

The original purpose of that parameter was to facilitate efficient pagination of the `/txs/accounts` results, but it has since been obsoleted for that purpose by the `Chainweb-Next`-token-based pagination workflow. The token-based pagination is superior because it doesn't require the client to figure out how to stitch together the result chunks produced by each request, and it is easier to reuse across endpoints regardless of whether specifying `height` limits is a reasonable or efficient way to paginate a given endpoint.
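
Concretely, the client-side workflow is just a fetch-and-follow loop: request a page, and if the response carries a `Chainweb-Next` token, pass it back to fetch the next chunk. A hedged sketch, where the `fetchPage` callback is hypothetical and stands in for the actual HTTP call (not shown in this commit):

```haskell
-- Generic token-following pagination loop; `fetchPage` is a hypothetical
-- stand-in that returns one page of results plus an optional next token.
paginate :: (Maybe token -> IO ([a], Maybe token)) -> IO [a]
paginate fetchPage = go Nothing
  where
    go tok = do
      (results, mbNext) <- fetchPage tok
      rest <- maybe (return []) (go . Just) mbNext
      return (results ++ rest)
```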

Admittedly, `fromheight` could still be useful independently of pagination, but I'm not convinced that adding query parameters to `chainweb-data` endpoints for hypothetical use cases is a good idea at this stage, especially considering the backward-compatibility burden that entails.

* Remove the fromheight parameter of /txs/accounts

* Update the chainweb-api pin

* Generate and serve OpenAPI specs (#106)

This PR adds a hidden `--serve-swagger-ui` CLI option to the `serve` subcommand. When `--serve-swagger-ui` is enabled, `chainweb-data` serves an OpenAPI v3 spec for the `chainweb-data` API.

The spec currently served is automatically generated from the Servant API definition using [`servant-openapi3`](https://hackage.haskell.org/package/servant-openapi3). This PR makes no effort to refine the generated documentation, which lacks explanations for the endpoints and data schemas and contains unhelpful details such as meaningless upper and lower bounds on integral values.

So, despite being helpful as it is, the currently generated spec is only rudimentary, which is why this PR hides the new `--serve-swagger-ui` argument from the CLI `--help` output, keeping it unofficial for now.
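
For reference, generating a spec with `servant-openapi3` boils down to a one-liner over the API type; a minimal, self-contained sketch (the toy `ExampleApi` is hypothetical, not chainweb-data's actual API):

```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeOperators #-}

import Data.Aeson (encode)
import qualified Data.ByteString.Lazy.Char8 as BL8
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Servant.API
import Servant.OpenApi (toOpenApi)

-- A toy stand-in for the real chainweb-data API.
type ExampleApi = "txs" :> "search"
  :> QueryParam "search" Text
  :> Get '[JSON] [Text]

-- Print the generated OpenAPI v3 spec as JSON.
main :: IO ()
main = BL8.putStrLn $ encode $ toOpenApi (Proxy :: Proxy ExampleApi)
```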

* Generate and serve OpenAPI specs

* Make swagger UI optional with a hidden CLI flag

* Remove unused declaration

* Add special 404 message when cwd-spec not enabled

* Apply search throttling when there is load
enobayram authored Jan 4, 2023
1 parent f505da8 commit d05b267
Showing 2 changed files with 75 additions and 28 deletions.
1 change: 1 addition & 0 deletions chainweb-data.cabal
@@ -122,6 +122,7 @@ executable chainweb-data
     , http-client-tls ^>=0.3
     , http-types
     , lens-aeson
+    , managed
     , mtl
     , optparse-applicative >=0.14 && <0.17
     , pact >=4.3.1
102 changes: 74 additions & 28 deletions exec/Chainweb/Server.hs
@@ -23,7 +23,9 @@ import Chainweb.Api.NodeInfo
 import Control.Applicative
 import Control.Concurrent
 import Control.Error
+import Control.Exception (bracket_)
 import Control.Monad.Except
+import qualified Control.Monad.Managed as M
 import Control.Retry
 import Data.Aeson hiding (Error)
 import qualified Data.ByteString as BS
@@ -159,6 +161,17 @@ apiServerCut env senv = do
       logg Info $ fromString $ show e
     Right cutBS -> apiServerCut env senv cutBS
 
+type ConnectionWithThrottling = (Connection, Double)
+
+-- | Given the amount of contention on connections, calculate a factor between
+-- 0 and 1 that should be used to scale down the amount of work done by request
+-- handlers
+throttlingFactor :: Integer -> Double
+throttlingFactor load = if loadPerCap <= 1 then 1 else 1 / loadPerCap where
+  -- We're arbitrarily assuming that Postgres will handle 3 concurrent requests
+  -- without any slowdown
+  loadPerCap = fromInteger load / 3
+
 apiServerCut :: Env -> ServerEnv -> ByteString -> IO ()
 apiServerCut env senv cutBS = do
   let curHeight = cutMaxHeight cutBS
@@ -174,13 +187,21 @@ apiServerCut env senv cutBS = do
   _ <- forkIO $ scheduledUpdates env pool ssRef (_serverEnv_runFill senv) (_serverEnv_fillDelay senv)
   _ <- forkIO $ retryingListener env ssRef
   logg Info $ fromString "Starting chainweb-data server"
+  throttledPool <- do
+    loadedSrc <- mkLoadedSource $ M.managed $ P.withResource pool
+    return $ do
+      loadedRes <- loadedSrc
+      load <- M.liftIO (lrLoadRef loadedRes)
+      return (lrResource loadedRes, throttlingFactor load)
+
+  let unThrottledPool = fst <$> throttledPool
   let serverApp req =
         ( ( recentTxsHandler ssRef
-            :<|> searchTxs logg pool req
-            :<|> evHandler logg pool req
-            :<|> txHandler logg pool
-            :<|> txsHandler logg pool
-            :<|> accountHandler logg pool req
+            :<|> searchTxs logg throttledPool req
+            :<|> evHandler logg throttledPool req
+            :<|> txHandler logg unThrottledPool
+            :<|> txsHandler logg unThrottledPool
+            :<|> accountHandler logg unThrottledPool req
           )
           :<|> statsHandler ssRef
           :<|> coinsHandler ssRef
@@ -325,12 +346,9 @@ isBoundedStrategy req =
       other -> Left $ toS $ "Unknown " <> fromString headerName <> ": " <> other
   where headerName = "Chainweb-Execution-Strategy"
 
-getExecutionStrategy :: Request -> Integer -> Handler ExecutionStrategy
-getExecutionStrategy req scanLimit = do
-  isBounded <- either throw400 return $ isBoundedStrategy req
-  return $ if isBounded
-    then Bounded scanLimit
-    else Unbounded
+isBoundedStrategyM :: Request -> Handler Bool
+isBoundedStrategyM req = do
+  either throw400 return $ isBoundedStrategy req
 
 mkContinuation :: MonadError ServerError m =>
   (NextToken -> Maybe b) ->
@@ -347,7 +365,7 @@ mkContinuation readTkn mbOffset mbNext = case (mbNext, mbOffset) of
 
 searchTxs
   :: LogFunctionIO Text
-  -> P.Pool Connection
+  -> M.Managed ConnectionWithThrottling
   -> Request
   -> Maybe Limit
   -> Maybe Offset
@@ -359,13 +377,16 @@ searchTxs logger pool req givenMbLim mbOffset (Just search) mbNext = do
   liftIO $ logger Info $ fromString $ printf
     "Transaction search from %s: %s" (show $ remoteHost req) (T.unpack search)
   continuation <- mkContinuation readTxToken mbOffset mbNext
-  let
-    resultLimit = maybe 10 (min 100 . unLimit) givenMbLim
-    scanLimit = 20000
 
-  strategy <- getExecutionStrategy req scanLimit
+  isBounded <- isBoundedStrategyM req
 
-  liftIO $ P.withResource pool $ \c ->
+  liftIO $ M.with pool $ \(c, throttling) -> do
+    let
+      scanLimit = ceiling $ 50000 * throttling
+      maxResultLimit = ceiling $ 250 * throttling
+      resultLimit = min maxResultLimit $ maybe 10 unLimit givenMbLim
+      strategy = if isBounded then Bounded scanLimit else Unbounded
+
     PG.withTransactionLevel PG.RepeatableRead c $ do
       (mbCont, results) <- performBoundedScan strategy
         (runBeamPostgresDebug (logger Debug . T.pack) c)
Expand Down Expand Up @@ -395,12 +416,12 @@ throw400 msg = throwError $ err400 { errBody = msg }

txHandler
:: LogFunctionIO Text
-> P.Pool Connection
-> M.Managed Connection
-> Maybe RequestKey
-> Handler TxDetail
txHandler _ _ Nothing = throw404 "You must specify a search string"
txHandler logger pool (Just (RequestKey rk)) =
may404 $ liftIO $ P.withResource pool $ \c ->
may404 $ liftIO $ M.with pool $ \c ->
runBeamPostgresDebug (logger Debug . T.pack) c $ do
r <- runSelectReturningOne $ select $ do
tx <- all_ (_cddb_transactions database)
@@ -451,12 +472,12 @@ txHandler logger pool (Just (RequestKey rk)) =
 
 txsHandler
   :: LogFunctionIO Text
-  -> P.Pool Connection
+  -> M.Managed Connection
   -> Maybe RequestKey
   -> Handler [TxDetail]
 txsHandler _ _ Nothing = throw404 "You must specify a search string"
 txsHandler logger pool (Just (RequestKey rk)) =
-  emptyList404 $ liftIO $ P.withResource pool $ \c ->
+  emptyList404 $ liftIO $ M.with pool $ \c ->
     runBeamPostgresDebug (logger Debug . T.pack) c $ do
       r <- runSelectReturningList $ select $ do
         tx <- all_ (_cddb_transactions database)
@@ -518,7 +539,7 @@ mkToken contents = NextToken $ T.pack $
 
 accountHandler
   :: LogFunctionIO Text
-  -> P.Pool Connection
+  -> M.Managed Connection
   -> Request
   -> Text -- ^ account identifier
   -> Maybe Text -- ^ token type
@@ -542,7 +563,7 @@ accountHandler logger pool req account token chain limit offset mbNext = do
           where errMsg = toS (printf "the maximum allowed offset is 10,000. You requested %d" o :: String)
         Nothing -> return 0
     return $ AQSNewQuery boundedOffset
-  liftIO $ P.withResource pool $ \c -> do
+  liftIO $ M.with pool $ \c -> do
     let boundedLimit = Limit $ maybe 20 (min 100 . unLimit) limit
     r <- runBeamPostgresDebug (logger Debug . T.pack) c $
       runSelectReturningList $
@@ -588,7 +609,7 @@ mkEventToken est = mkBSToken $ est <&> \c ->
 
 evHandler
   :: LogFunctionIO Text
-  -> P.Pool Connection
+  -> M.Managed ConnectionWithThrottling
   -> Request
   -> Maybe Limit
   -> Maybe Offset
@@ -608,12 +629,15 @@ evHandler logger pool req limit mbOffset qSearch qParam qName qModuleName minhei
           , espName = qName
           , espModuleName = qModuleName
           }
-    resultLimit = fromMaybe 100 $ limit <&> \(Limit l) -> min 100 l
-    scanLimit = 20000
 
-  strategy <- getExecutionStrategy req scanLimit
+  isBounded <- isBoundedStrategyM req
 
-  liftIO $ P.withResource pool $ \c ->
+  liftIO $ M.with pool $ \(c, throttling) -> do
+    let
+      scanLimit = ceiling $ 50000 * throttling
+      maxResultLimit = ceiling $ 250 * throttling
+      resultLimit = min maxResultLimit $ maybe 10 unLimit limit
+      strategy = if isBounded then Bounded scanLimit else Unbounded
     PG.withTransactionLevel PG.RepeatableRead c $ do
       (mbCont, results) <- performBoundedScan strategy
         (runBeamPostgresDebug (logger Debug . T.pack) c)
@@ -689,3 +713,25 @@ addNewTransactions txs (RecentTxs s1) = RecentTxs s2
 
 unPgJsonb :: PgJSONB a -> a
 unPgJsonb (PgJSONB v) = v
+
+-- | A "LoadedResource" is a resource along with a way to read an integer
+-- quantity representing how much load there is on the resource currently
+data LoadedResource resource = LoadedResource
+  { lrResource :: resource
+  , lrLoadRef :: IO Integer
+  }
+
+type LoadedSource resource = M.Managed (LoadedResource resource)
+
+-- | Wrap a given "Managed" with a layer that keeps track of how many other
+-- consumers there currently are using or waiting on the inner "Managed".
+-- At any given moment, this number can be read through the "lrLoadRef" of the
+-- provided "LoadedResource".
+mkLoadedSource :: M.Managed resource -> IO (M.Managed (LoadedResource resource))
+mkLoadedSource innerSource = do
+  loadRef <- newIORef 0
+  let modifyLoad f = atomicModifyIORef' loadRef $ \load -> (f load, ())
+  return $ M.managed $ \outerBorrower ->
+    bracket_ (modifyLoad succ) (modifyLoad pred) $
+      M.with innerSource $ \resource -> outerBorrower $
+        LoadedResource resource (readIORef loadRef)
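
As a usage sketch (hypothetical, not part of this commit), the wrapper composes with any `Managed` source; note that the load reading includes the current consumer, so it is at least 1 while the resource is held:

```haskell
-- Hypothetical demo of mkLoadedSource with a trivial Managed source.
demo :: IO ()
demo = do
  loadedSrc <- mkLoadedSource $ M.managed $ \use -> use "connection"
  M.with loadedSrc $ \lr -> do
    load <- lrLoadRef lr
    putStrLn $ lrResource lr ++ " held with load " ++ show load  -- load >= 1
```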
