From 54f26f82394305fae029e9fee5745012ab5cb52b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Enis=20Bayramo=C4=9Flu?= Date: Sat, 24 Dec 2022 00:04:13 +0100 Subject: [PATCH 1/4] Increase the scanLimit --- exec/Chainweb/Server.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exec/Chainweb/Server.hs b/exec/Chainweb/Server.hs index 93ccaa0f..50462d40 100644 --- a/exec/Chainweb/Server.hs +++ b/exec/Chainweb/Server.hs @@ -349,7 +349,7 @@ searchTxs logger pool req givenMbLim mbOffset (Just search) mbNext = do continuation <- mkContinuation readTxToken mbOffset mbNext let resultLimit = maybe 10 (min 100 . unLimit) givenMbLim - scanLimit = 20000 + scanLimit = 50000 strategy <- getExecutionStrategy req scanLimit @@ -599,7 +599,7 @@ evHandler logger pool req limit mbOffset qSearch qParam qName qModuleName minhei , espModuleName = qModuleName } resultLimit = fromMaybe 100 $ limit <&> \(Limit l) -> min 100 l - scanLimit = 20000 + scanLimit = 50000 strategy <- getExecutionStrategy req scanLimit From c81a66a0512e88ef4eb94c902089c9a9fd04c52f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Enis=20Bayramo=C4=9Flu?= Date: Sat, 24 Dec 2022 16:31:41 +0100 Subject: [PATCH 2/4] Remove the fromheight parameter of /txs/accounts (#114) This PR removes the yet unreleased `fromheight` parameter of the `/txs/accounts` endpoint. The original purpose of that parameter was to facilitate efficient pagination of the `/txs/accounts` results, but it has since been obsoleted for that purpose by the `Chainweb-Next`-token-based pagination workflow. The `Chainweb-Next`-based pagination is superior, because it doesn't require the client to figure out how to stitch together the result chunks produced by each request and it is easier to reuse across endpoints independent of whether specifying `height` limits is a reasonable or efficient way to paginate for a given endpoint. Admittedly, `fromheight` could still be useful independent of pagination, but I'm not convinced that adding query parameters to `chainweb-data` endpoints for hypothetical use cases is a good idea at this stage, especially considering the backward compatibility burden that entails. * Remove the fromheight parameter of /txs/accounts * Update the chainweb-api pin --- cabal.project | 2 +- deps/chainweb-api/github.json | 4 ++-- exec/Chainweb/Server.hs | 14 ++++++-------- lib/ChainwebDb/Queries.hs | 5 ++--- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/cabal.project b/cabal.project index 767fa58e..83c3252a 100644 --- a/cabal.project +++ b/cabal.project @@ -16,7 +16,7 @@ source-repository-package source-repository-package type: git location: https://github.com/kadena-io/chainweb-api.git - tag: 6ffcf6265309bcd08034652523c940e4b71e9153 + tag: 953019e5eedc1d23833cb7b5b6c5f53627ff2b20 source-repository-package type: git diff --git a/deps/chainweb-api/github.json b/deps/chainweb-api/github.json index af933ac2..3abdc6af 100644 --- a/deps/chainweb-api/github.json +++ b/deps/chainweb-api/github.json @@ -3,6 +3,6 @@ "repo": "chainweb-api", "branch": "master", "private": false, - "rev": "6ffcf6265309bcd08034652523c940e4b71e9153", - "sha256": "1pagn20ss7w6llck75d2yvcjv1lji31knd39fppc6a8av64n6n8g" + "rev": "953019e5eedc1d23833cb7b5b6c5f53627ff2b20", + "sha256": "1q33fkigjk34z101jb7j827bfp3hhhglqip6rxvqpg09p26nc03p" } diff --git a/exec/Chainweb/Server.hs b/exec/Chainweb/Server.hs index 50462d40..b602fb00 100644 --- a/exec/Chainweb/Server.hs +++ b/exec/Chainweb/Server.hs @@ -511,27 +511,25 @@ accountHandler -> Text -- ^ account identifier -> Maybe Text -- ^ token type -> Maybe ChainId -- ^ chain identifier - -> Maybe BlockHeight -> Maybe Limit -> Maybe Offset -> Maybe NextToken -> Handler (NextHeaders [AccountDetail]) -accountHandler logger pool req account token chain fromHeight limit offset mbNext = do +accountHandler logger pool req account token chain limit offset mbNext = do liftIO $ logger Info $ fromString $ printf "Account search from %s for: %s %s %s" (show $ remoteHost req) (T.unpack account) (maybe "coin" T.unpack token) (maybe "" show chain) - queryStart <- case (mbNext, fromHeight, offset) of - (Just nextToken, Nothing, Nothing) -> case readToken nextToken of + queryStart <- case (mbNext, offset) of + (Just nextToken, Nothing) -> case readToken nextToken of Nothing -> throw400 $ toS $ "Invalid next token: " <> unNextToken nextToken Just ((hgt, reqkey, idx) :: AccountNextToken) -> return $ AQSContinue (fromIntegral hgt) (rkcbFromText reqkey) (fromIntegral idx) - (Just _, Just _, _) -> throw400 $ "next token query parameter not allowed with fromheight" - (Just _, _, Just _) -> throw400 $ "next token query parameter not allowed with offset" - (Nothing, _, _) -> do + (Just _, Just _) -> throw400 $ "next token query parameter not allowed with offset" + (Nothing, _) -> do boundedOffset <- Offset <$> case offset of Just (Offset o) -> if o >= 10000 then throw400 errMsg else return o where errMsg = toS (printf "the maximum allowed offset is 10,000. You requested %d" o :: String) Nothing -> return 0 - return $ AQSNewQuery fromHeight boundedOffset + return $ AQSNewQuery boundedOffset liftIO $ P.withResource pool $ \c -> do let boundedLimit = Limit $ maybe 20 (min 100 . unLimit) limit r <- runBeamPostgresDebug (logger Debug . T.pack) c $ diff --git a/lib/ChainwebDb/Queries.hs b/lib/ChainwebDb/Queries.hs index 15c3c843..98ccb226 100644 --- a/lib/ChainwebDb/Queries.hs +++ b/lib/ChainwebDb/Queries.hs @@ -167,7 +167,7 @@ _bytequery = \case SqlSelect s -> pgRenderSyntaxScript $ fromPgSelect s data AccountQueryStart - = AQSNewQuery (Maybe BlockHeight) Offset + = AQSNewQuery Offset | AQSContinue BlockHeight ReqKeyOrCoinbase Int accountQueryStmt @@ -202,8 +202,7 @@ accountQueryStmt (Limit limit) account token chain aqs = rowFilter tr return (tr,_block_creationTime blk) (Offset offset, rowFilter) = case aqs of - AQSNewQuery mbHeight ofst -> (,) ofst $ \tr -> - whenArg mbHeight $ \bh -> guard_ $ _tr_height tr <=. val_ (fromIntegral bh) + AQSNewQuery ofst -> (ofst, const $ return ()) AQSContinue height reqKey idx -> (,) (Offset 0) $ \tr -> guard_ $ tupleCmp (<.) [ _tr_height tr :<> fromIntegral height From 4d874fec7a3d80f7e0ccc6d4ae189434d06b9ad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Enis=20Bayramo=C4=9Flu?= Date: Sat, 24 Dec 2022 16:32:22 +0100 Subject: [PATCH 3/4] Generate and serve OpenAPI specs (#106) This PR adds a hidden `--serve-swagger-ui` CLI option to the `serve` subcommand. When `--serve-swagger-ui` is enabled, `chainweb-data` serves an OpenAPI v3 spec for the `chainweb-data` API. The spec that's currently served is automatically generated from the Servant API definition using [`servant-openapi3`](https://hackage.haskell.org/package/servant-openapi3) and this PR makes no effort to refine the documentation generated by `servant-openapi3` which lacks any explanations for the endpoints and the data schemas and it produces unhelpful details like integral values having meaningless upper and lower bounds. So, despite being helpful as it is, the currently generated spec is only rudimentary, that's why this PR hides the new `--serve-swagger-ui` argument from the CLI `--help` output keeping it unofficial for now. * Generate and serve OpenAPI specs * Make swagger UI optional with a hidden CLI flag * Remove unused declaration * Add special 404 message when cwd-spec not enabled --- chainweb-data.cabal | 4 +++ exec/Chainweb/Server.hs | 18 ++++++++-- lib/ChainwebData/Env.hs | 3 ++ lib/ChainwebData/Spec.hs | 72 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 lib/ChainwebData/Spec.hs diff --git a/chainweb-data.cabal b/chainweb-data.cabal index 2a2349d2..fbc3afa7 100644 --- a/chainweb-data.cabal +++ b/chainweb-data.cabal @@ -66,6 +66,7 @@ library ChainwebData.Genesis ChainwebData.Types ChainwebData.Env + ChainwebData.Spec ChainwebDb.BoundedScan ChainwebDb.Database ChainwebDb.Queries @@ -85,8 +86,10 @@ library , http-client ^>=0.6 , http-client-tls ^>=0.3 , http-types + , openapi3 , optparse-applicative >=0.14 && <0.17 , servant-client + , servant-openapi3 , yet-another-logger if flag(ghc-flags) @@ -131,6 +134,7 @@ executable chainweb-data , servant-client , servant-client-core , servant-server + , servant-swagger-ui , stm , stm-chans , streaming ^>=0.2 diff --git a/exec/Chainweb/Server.hs b/exec/Chainweb/Server.hs index b602fb00..f710dbaa 100644 --- a/exec/Chainweb/Server.hs +++ b/exec/Chainweb/Server.hs @@ -54,6 +54,7 @@ import Network.Wai.Handler.Warp import Network.Wai.Middleware.Cors import Servant.API import Servant.Server +import Servant.Swagger.UI import System.Directory import System.FilePath import System.Logger.Types hiding (logg) @@ -74,6 +75,7 @@ import ChainwebData.Api import ChainwebData.AccountDetail import ChainwebData.EventDetail import ChainwebData.AccountDetail () +import qualified ChainwebData.Spec as Spec import ChainwebData.Pagination import ChainwebData.TxDetail import ChainwebData.TxSummary @@ -138,8 +140,14 @@ type TxEndpoint = "tx" :> QueryParam "requestkey" Text :> Get '[JSON] TxDetail type TheApi = ChainwebDataApi :<|> RichlistEndpoint -theApi :: Proxy TheApi -theApi = Proxy + +type ApiWithSwaggerUI + = TheApi + :<|> SwaggerSchemaUI "cwd-spec" "cwd-spec.json" + +type ApiWithNoSwaggerUI + = TheApi + :<|> "cwd-spec" :> Get '[PlainText] Text -- Respond with 404 apiServer :: Env -> ServerEnv -> IO () apiServer env senv = do @@ -178,8 +186,12 @@ apiServerCut env senv cutBS = do :<|> coinsHandler ssRef ) :<|> richlistHandler + let swaggerServer = swaggerSchemaUIServer Spec.spec + noSwaggerServer = throw404 "Swagger UI server is not enabled on this instance" Network.Wai.Handler.Warp.run (_serverEnv_port senv) $ setCors $ \req f -> - serve theApi (serverApp req) req f + if _serverEnv_serveSwaggerUi senv + then serve (Proxy @ApiWithSwaggerUI) (serverApp req :<|> swaggerServer) req f + else serve (Proxy @ApiWithNoSwaggerUI) (serverApp req :<|> noSwaggerServer) req f retryingListener :: Env -> IORef ServerState -> IO () retryingListener env ssRef = do diff --git a/lib/ChainwebData/Env.hs b/lib/ChainwebData/Env.hs index b08f7b01..3a9647af 100644 --- a/lib/ChainwebData/Env.hs +++ b/lib/ChainwebData/Env.hs @@ -203,6 +203,7 @@ data ServerEnv = ServerEnv { _serverEnv_port :: Int , _serverEnv_runFill :: Bool , _serverEnv_fillDelay :: Maybe Int + , _serverEnv_serveSwaggerUi :: Bool } deriving (Eq,Ord,Show) envP :: Parser Args @@ -289,6 +290,8 @@ serverP = ServerEnv <$> option auto (long "port" <> metavar "INT" <> help "Port the server will listen on") <*> flag False True (long "run-fill" <> short 'f' <> help "Run fill operation once a day to fill gaps") <*> delayP + -- The OpenAPI spec is currently rudimentary and not official so we're hiding this option + <*> flag False True (long "serve-swagger-ui" <> internal) delayP :: Parser (Maybe Int) delayP = optional $ option auto (long "delay" <> metavar "DELAY_MICROS" <> help "Number of microseconds to delay between queries to the node") diff --git a/lib/ChainwebData/Spec.hs b/lib/ChainwebData/Spec.hs new file mode 100644 index 00000000..1c4845e7 --- /dev/null +++ b/lib/ChainwebData/Spec.hs @@ -0,0 +1,72 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} + +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE GeneralisedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeOperators #-} + +module ChainwebData.Spec where + +import ChainwebData.Api + +import Data.Proxy + +import Data.OpenApi.ParamSchema +import Data.OpenApi.Schema +import Servant.OpenApi +import ChainwebData.Pagination +import Chainweb.Api.ChainId +import ChainwebData.TxSummary +import Data.OpenApi + +import ChainwebData.EventDetail (EventDetail) +import ChainwebData.Util +import qualified Data.Aeson as A +import ChainwebData.TxDetail +import ChainwebData.AccountDetail (AccountDetail) + +instance ToSchema A.Value where + declareNamedSchema _ = pure $ NamedSchema (Just "AnyValue") mempty + +deriving newtype instance ToParamSchema Limit +deriving newtype instance ToParamSchema Offset +deriving newtype instance ToParamSchema EventParam +deriving newtype instance ToParamSchema EventName +deriving newtype instance ToParamSchema EventModuleName +deriving newtype instance ToParamSchema RequestKey +deriving newtype instance ToParamSchema ChainId +deriving newtype instance ToParamSchema NextToken + +instance ToSchema TxSummary where + declareNamedSchema = genericDeclareNamedSchema + defaultSchemaOptions{ fieldLabelModifier = drop 11 } + +deriving anyclass instance ToSchema TxResult + +instance ToSchema EventDetail where + declareNamedSchema = genericDeclareNamedSchema + defaultSchemaOptions{ fieldLabelModifier = lensyConstructorToNiceJson 10 } + +instance ToSchema TxDetail where + declareNamedSchema = genericDeclareNamedSchema + defaultSchemaOptions{ fieldLabelModifier = lensyConstructorToNiceJson 10 } + +instance ToSchema TxEvent where + declareNamedSchema = genericDeclareNamedSchema + defaultSchemaOptions{ fieldLabelModifier = lensyConstructorToNiceJson 9 } + +instance ToSchema AccountDetail where + declareNamedSchema = genericDeclareNamedSchema + defaultSchemaOptions{ fieldLabelModifier = lensyConstructorToNiceJson 10 } + +instance ToSchema ChainwebDataStats where + declareNamedSchema = genericDeclareNamedSchema + defaultSchemaOptions{ fieldLabelModifier = drop 5 } + +spec :: OpenApi +spec = toOpenApi (Proxy :: Proxy ChainwebDataApi) + From 4c1c75459bfd5e610226cd02df45d7a2085e5cdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Enis=20Bayramo=C4=9Flu?= Date: Wed, 4 Jan 2023 09:58:26 +0100 Subject: [PATCH 4/4] Apply search throttling when there is load --- chainweb-data.cabal | 1 + exec/Chainweb/Server.hs | 102 +++++++++++++++++++++++++++++----------- 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/chainweb-data.cabal b/chainweb-data.cabal index fbc3afa7..b4e85762 100644 --- a/chainweb-data.cabal +++ b/chainweb-data.cabal @@ -122,6 +122,7 @@ executable chainweb-data , http-client-tls ^>=0.3 , http-types , lens-aeson + , managed , mtl , optparse-applicative >=0.14 && <0.17 , pact >=4.3.1 diff --git a/exec/Chainweb/Server.hs b/exec/Chainweb/Server.hs index f710dbaa..5c6f6058 100644 --- a/exec/Chainweb/Server.hs +++ b/exec/Chainweb/Server.hs @@ -23,7 +23,9 @@ import Chainweb.Api.NodeInfo import Control.Applicative import Control.Concurrent import Control.Error +import Control.Exception (bracket_) import Control.Monad.Except +import qualified Control.Monad.Managed as M import Control.Retry import Data.Aeson hiding (Error) import qualified Data.ByteString as BS @@ -159,6 +161,17 @@ apiServer env senv = do logg Info $ fromString $ show e Right cutBS -> apiServerCut env senv cutBS +type ConnectionWithThrottling = (Connection, Double) + +-- | Given the amount of contention on connections, calculate a factor between +-- 0 and 1 that should be used to scale down the amount of work done by request +-- handlers +throttlingFactor :: Integer -> Double +throttlingFactor load = if loadPerCap <= 1 then 1 else 1 / loadPerCap where + -- We're arbitrarily assuming that Postgres will handle 3 concurrent requests + -- without any slowdown + loadPerCap = fromInteger load / 3 + apiServerCut :: Env -> ServerEnv -> ByteString -> IO () apiServerCut env senv cutBS = do let curHeight = cutMaxHeight cutBS @@ -174,13 +187,21 @@ apiServerCut env senv cutBS = do _ <- forkIO $ scheduledUpdates env pool ssRef (_serverEnv_runFill senv) (_serverEnv_fillDelay senv) _ <- forkIO $ retryingListener env ssRef logg Info $ fromString "Starting chainweb-data server" + throttledPool <- do + loadedSrc <- mkLoadedSource $ M.managed $ P.withResource pool + return $ do + loadedRes <- loadedSrc + load <- M.liftIO (lrLoadRef loadedRes) + return (lrResource loadedRes, throttlingFactor load) + + let unThrottledPool = fst <$> throttledPool let serverApp req = ( ( recentTxsHandler ssRef - :<|> searchTxs logg pool req - :<|> evHandler logg pool req - :<|> txHandler logg pool - :<|> txsHandler logg pool - :<|> accountHandler logg pool req + :<|> searchTxs logg throttledPool req + :<|> evHandler logg throttledPool req + :<|> txHandler logg unThrottledPool + :<|> txsHandler logg unThrottledPool + :<|> accountHandler logg unThrottledPool req ) :<|> statsHandler ssRef :<|> coinsHandler ssRef @@ -325,12 +346,9 @@ isBoundedStrategy req = other -> Left $ toS $ "Unknown " <> fromString headerName <> ": " <> other where headerName = "Chainweb-Execution-Strategy" -getExecutionStrategy :: Request -> Integer -> Handler ExecutionStrategy -getExecutionStrategy req scanLimit = do - isBounded <- either throw400 return $ isBoundedStrategy req - return $ if isBounded - then Bounded scanLimit - else Unbounded +isBoundedStrategyM :: Request -> Handler Bool +isBoundedStrategyM req = do + either throw400 return $ isBoundedStrategy req mkContinuation :: MonadError ServerError m => (NextToken -> Maybe b) -> @@ -347,7 +365,7 @@ mkContinuation readTkn mbOffset mbNext = case (mbNext, mbOffset) of searchTxs :: LogFunctionIO Text - -> P.Pool Connection + -> M.Managed ConnectionWithThrottling -> Request -> Maybe Limit -> Maybe Offset @@ -359,13 +377,16 @@ searchTxs logger pool req givenMbLim mbOffset (Just search) mbNext = do liftIO $ logger Info $ fromString $ printf "Transaction search from %s: %s" (show $ remoteHost req) (T.unpack search) continuation <- mkContinuation readTxToken mbOffset mbNext - let - resultLimit = maybe 10 (min 100 . unLimit) givenMbLim - scanLimit = 50000 - strategy <- getExecutionStrategy req scanLimit + isBounded <- isBoundedStrategyM req + + liftIO $ M.with pool $ \(c, throttling) -> do + let + scanLimit = ceiling $ 50000 * throttling + maxResultLimit = ceiling $ 250 * throttling + resultLimit = min maxResultLimit $ maybe 10 unLimit givenMbLim + strategy = if isBounded then Bounded scanLimit else Unbounded - liftIO $ P.withResource pool $ \c -> PG.withTransactionLevel PG.RepeatableRead c $ do (mbCont, results) <- performBoundedScan strategy (runBeamPostgresDebug (logger Debug . T.pack) c) @@ -395,12 +416,12 @@ throw400 msg = throwError $ err400 { errBody = msg } txHandler :: LogFunctionIO Text - -> P.Pool Connection + -> M.Managed Connection -> Maybe RequestKey -> Handler TxDetail txHandler _ _ Nothing = throw404 "You must specify a search string" txHandler logger pool (Just (RequestKey rk)) = - may404 $ liftIO $ P.withResource pool $ \c -> + may404 $ liftIO $ M.with pool $ \c -> runBeamPostgresDebug (logger Debug . T.pack) c $ do r <- runSelectReturningOne $ select $ do tx <- all_ (_cddb_transactions database) @@ -451,12 +472,12 @@ txHandler logger pool (Just (RequestKey rk)) = txsHandler :: LogFunctionIO Text - -> P.Pool Connection + -> M.Managed Connection -> Maybe RequestKey -> Handler [TxDetail] txsHandler _ _ Nothing = throw404 "You must specify a search string" txsHandler logger pool (Just (RequestKey rk)) = - emptyList404 $ liftIO $ P.withResource pool $ \c -> + emptyList404 $ liftIO $ M.with pool $ \c -> runBeamPostgresDebug (logger Debug . T.pack) c $ do r <- runSelectReturningList $ select $ do tx <- all_ (_cddb_transactions database) @@ -518,7 +539,7 @@ mkToken contents = NextToken $ T.pack $ accountHandler :: LogFunctionIO Text - -> P.Pool Connection + -> M.Managed Connection -> Request -> Text -- ^ account identifier -> Maybe Text -- ^ token type @@ -542,7 +563,7 @@ accountHandler logger pool req account token chain limit offset mbNext = do where errMsg = toS (printf "the maximum allowed offset is 10,000. You requested %d" o :: String) Nothing -> return 0 return $ AQSNewQuery boundedOffset - liftIO $ P.withResource pool $ \c -> do + liftIO $ M.with pool $ \c -> do let boundedLimit = Limit $ maybe 20 (min 100 . unLimit) limit r <- runBeamPostgresDebug (logger Debug . T.pack) c $ runSelectReturningList $ @@ -588,7 +609,7 @@ mkEventToken est = mkBSToken $ est <&> \c -> evHandler :: LogFunctionIO Text - -> P.Pool Connection + -> M.Managed ConnectionWithThrottling -> Request -> Maybe Limit -> Maybe Offset @@ -608,12 +629,15 @@ evHandler logger pool req limit mbOffset qSearch qParam qName qModuleName minhei , espName = qName , espModuleName = qModuleName } - resultLimit = fromMaybe 100 $ limit <&> \(Limit l) -> min 100 l - scanLimit = 50000 - strategy <- getExecutionStrategy req scanLimit + isBounded <- isBoundedStrategyM req - liftIO $ P.withResource pool $ \c -> + liftIO $ M.with pool $ \(c, throttling) -> do + let + scanLimit = ceiling $ 50000 * throttling + maxResultLimit = ceiling $ 250 * throttling + resultLimit = min maxResultLimit $ maybe 10 unLimit limit + strategy = if isBounded then Bounded scanLimit else Unbounded PG.withTransactionLevel PG.RepeatableRead c $ do (mbCont, results) <- performBoundedScan strategy (runBeamPostgresDebug (logger Debug . T.pack) c) @@ -689,3 +713,25 @@ addNewTransactions txs (RecentTxs s1) = RecentTxs s2 unPgJsonb :: PgJSONB a -> a unPgJsonb (PgJSONB v) = v + +-- | A "LoadedResource" is a resource along with a way to read an integer +-- quantity representing how much load there is on the resource currently +data LoadedResource resource = LoadedResource + { lrResource :: resource + , lrLoadRef :: IO Integer + } + +type LoadedSource resource = M.Managed (LoadedResource resource) + +-- | Wrap a given "Managed" with a layer that keeps track of how many other +-- consumers there currently are using or waiting on the inner "Managed". +-- At any given moment, this number can be read through the "lrLoadRef" of the +-- provided "LoadedResource". +mkLoadedSource :: M.Managed resource -> IO (M.Managed (LoadedResource resource)) +mkLoadedSource innerSource = do + loadRef <- newIORef 0 + let modifyLoad f = atomicModifyIORef' loadRef $ \load -> (f load, ()) + return $ M.managed $ \outerBorrower -> + bracket_ (modifyLoad succ) (modifyLoad pred) $ + M.with innerSource $ \resource -> outerBorrower $ + LoadedResource resource (readIORef loadRef)