Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JSON-API] Add moar timing metrics #10045

Merged
merged 4 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import akka.http.scaladsl.model.headers.{
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import com.codahale.metrics.Timer
import com.daml.lf
import com.daml.http.ContractsService.SearchResult
import com.daml.http.EndpointsCompanion._
Expand Down Expand Up @@ -66,64 +67,105 @@ class Endpoints(
import util.ErrorOps._
import Uri.Path._

// Parenthesis in the case matches below are required because otherwise scalafmt breaks.
//noinspection ScalaUnnecessaryParentheses
def all(implicit
lc: LoggingContextOf[InstanceUUID],
metrics: Metrics,
): PartialFunction[HttpRequest, Http.IncomingConnection => Future[HttpResponse]] = {
val dispatch: PartialFunction[HttpRequest, LoggingContextOf[
InstanceUUID with RequestID
] => Future[HttpResponse]] = {
// Parenthesis are required because otherwise scalafmt breaks.
case req @ HttpRequest(POST, Uri.Path("/v1/create"), _, _, _) =>
(implicit lc => httpResponse(create(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/exercise"), _, _, _) =>
(implicit lc => httpResponse(exercise(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/create-and-exercise"), _, _, _) =>
(implicit lc => httpResponse(createAndExercise(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/fetch"), _, _, _) =>
(implicit lc => httpResponse(fetch(req)))
val apiMetrics = metrics.daml.HttpJsonApi
type DispatchFun =
PartialFunction[HttpRequest, LoggingContextOf[InstanceUUID with RequestID] => Future[
HttpResponse
]]
def mkDispatchFunWithTimer(timer: Timer)(fun: DispatchFun): DispatchFun =
fun andThen (f => lc => Timed.future(timer, f(lc)))
val commandDispatch =
mkDispatchFunWithTimer(apiMetrics.commandSubmissionTimer) {
case req @ HttpRequest(POST, Uri.Path("/v1/create"), _, _, _) =>
(implicit lc => httpResponse(create(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/exercise"), _, _, _) =>
(implicit lc => httpResponse(exercise(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/create-and-exercise"), _, _, _) =>
(implicit lc => httpResponse(createAndExercise(req)))
}
val queryAllDispatch = mkDispatchFunWithTimer(apiMetrics.queryAllTimer) {
case req @ HttpRequest(GET, Uri.Path("/v1/query"), _, _, _) =>
(implicit lc => httpResponse(retrieveAll(req)))
}
val queryMatchingDispatch = mkDispatchFunWithTimer(apiMetrics.queryMatchingTimer) {
case req @ HttpRequest(POST, Uri.Path("/v1/query"), _, _, _) =>
(implicit lc => httpResponse(query(req)))
}
val fetchDispatch: DispatchFun = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm going to merge the single case matches here into one because it's also weird to have an extra declaration for them then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on the other hand, mixing stuff together when it's split up that cleanly isn't looking that much better either.

case req @ HttpRequest(POST, Uri.Path("/v1/fetch"), _, _, _) =>
(implicit lc => Timed.future(apiMetrics.fetchTimer, httpResponse(fetch(req))))
}
val getPartyDispatch = mkDispatchFunWithTimer(apiMetrics.getPartyTimer) {
case req @ HttpRequest(GET, Uri.Path("/v1/parties"), _, _, _) =>
(implicit lc => httpResponse(allParties(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/parties"), _, _, _) =>
(implicit lc => httpResponse(parties(req)))
}
val allocatePartyDispatch: DispatchFun = {
case req @ HttpRequest(POST, Uri.Path("/v1/parties/allocate"), _, _, _) =>
(implicit lc => httpResponse(allocateParty(req)))
(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this weird formatting just because scalafmt needs these braces, otherwise it fails with a parser error -.-

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take weird formatting over formatting discussions any time. 🙂

implicit lc =>
Timed.future(apiMetrics.allocatePartyTimer, httpResponse(allocateParty(req)))
)
}
val packageManagementDispatch: DispatchFun = {
case req @ HttpRequest(GET, Uri.Path("/v1/packages"), _, _, _) =>
(implicit lc => httpResponse(listPackages(req)))
case req @ HttpRequest(POST, Uri.Path("/v1/packages"), _, _, _) =>
(
implicit lc =>
Timed.future(
apiMetrics.uploadPackageTimer,
httpResponse(uploadDarFile(req)),
)
)
// format: off
case req @ HttpRequest(GET,
case req @ HttpRequest(GET,
Uri(_, _, Slash(Segment("v1", Slash(Segment("packages", Slash(Segment(packageId, Empty)))))), _, _),
_, _, _) => (implicit lc => downloadPackage(req, packageId))
// format: on
case req @ HttpRequest(POST, Uri.Path("/v1/packages"), _, _, _) =>
(implicit lc => httpResponse(uploadDarFile(req)))
_, _, _) =>
(implicit lc => Timed.future(apiMetrics.downloadPackageTimer, downloadPackage(req, packageId)))
// format: on
}
val liveOrHealthDispatch: DispatchFun = {
case HttpRequest(GET, Uri.Path("/livez"), _, _, _) =>
_ => Future.successful(HttpResponse(status = StatusCodes.OK))
case HttpRequest(GET, Uri.Path("/readyz"), _, _, _) =>
_ => healthService.ready().map(_.toHttpResponse)
}
import scalaz.std.partialFunction._, scalaz.syntax.arrow._
(dispatch &&& { case r => r }) andThen { case (lcFhr, req) =>
((commandDispatch orElse
queryAllDispatch orElse
queryMatchingDispatch orElse
fetchDispatch orElse
getPartyDispatch orElse
allocatePartyDispatch orElse
packageManagementDispatch orElse
liveOrHealthDispatch) &&& { case r => r }) andThen { case (lcFhr, req) =>
(connection: Http.IncomingConnection) =>
extendWithRequestIdLogCtx(implicit lc => {
val t0 = System.nanoTime
logger.info(s"Incoming request on ${req.uri} from ${connection.remoteAddress}")
metrics.daml.HttpJsonApi.httpRequestThroughput.mark()
Timed
.future(metrics.daml.HttpJsonApi.httpRequestTimer, lcFhr(lc))
.map(res => {
logger.trace(s"Processed request after ${System.nanoTime() - t0}ns")
logger.info(s"Responding to client with HTTP ${res.status}")
res
})
for {
res <- lcFhr(lc)
_ = logger.trace(s"Processed request after ${System.nanoTime() - t0}ns")
_ = logger.info(s"Responding to client with HTTP ${res.status}")
} yield res
})
}
}

def getParseAndDecodeTimerCtx()(implicit
metrics: Metrics
): ET[Timer.Context] =
EitherT.pure(metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.time())

def withJwtPayloadLoggingContext[A](jwtPayload: JwtPayloadG)(
fn: LoggingContextOf[JwtPayloadTag with InstanceUUID with RequestID] => A
)(implicit lc: LoggingContextOf[InstanceUUID with RequestID]): A =
Expand All @@ -142,6 +184,7 @@ class Endpoints(
Jwt,
JwtWritePayload,
JsValue,
Timer.Context,
) => LoggingContextOf[JwtPayloadTag with InstanceUUID with RequestID] => ET[
T[ApiValue]
]
Expand All @@ -152,22 +195,26 @@ class Endpoints(
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
for {
parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx()
_ <- EitherT.pure(metrics.daml.HttpJsonApi.commandSubmissionThroughput.mark())
t3 <- inputJsValAndJwtPayload(req): ET[(Jwt, JwtWritePayload, JsValue)]
(jwt, jwtPayload, reqBody) = t3
resp <- withJwtPayloadLoggingContext(jwtPayload)(fn(jwt, jwtPayload, reqBody))
resp <- withJwtPayloadLoggingContext(jwtPayload)(
fn(jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx)
)
jsVal <- either(SprayJson.encode1(resp).liftErr(ServerError)): ET[JsValue]
} yield domain.OkResponse(jsVal)

def create(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
handleCommand(req) { (jwt, jwtPayload, reqBody) => implicit lc =>
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc =>
for {
cmd <- either(
decoder.decodeCreateCommand(reqBody).liftErr(InvalidUserInput)
): ET[domain.CreateCommand[ApiRecord, TemplateId.RequiredPkg]]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())

ac <- eitherT(
handleFutureEitherFailure(commandService.create(jwt, jwtPayload, cmd))
Expand All @@ -179,12 +226,12 @@ class Endpoints(
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
handleCommand(req) { (jwt, jwtPayload, reqBody) => implicit lc =>
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc =>
for {
cmd <- either(
decoder.decodeExerciseCommand(reqBody).liftErr(InvalidUserInput)
): ET[domain.ExerciseCommand[LfValue, domain.ContractLocator[LfValue]]]

_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
resolvedRef <- eitherT(
resolveReference(jwt, jwtPayload, cmd.reference)
): ET[domain.ResolvedContractRef[ApiValue]]
Expand All @@ -206,11 +253,12 @@ class Endpoints(
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
handleCommand(req) { (jwt, jwtPayload, reqBody) => implicit lc =>
handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc =>
for {
cmd <- either(
decoder.decodeCreateAndExerciseCommand(reqBody).liftErr(InvalidUserInput)
): ET[domain.CreateAndExerciseCommand[ApiRecord, ApiValue, TemplateId.RequiredPkg]]
_ <- EitherT.pure(parseAndDecodeTimerCtx.close())

resp <- eitherT(
handleFutureEitherFailure(
Expand All @@ -221,9 +269,11 @@ class Endpoints(
}

def fetch(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): ET[domain.SyncResponse[JsValue]] =
for {
parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx()
input <- inputJsValAndJwtPayload(req): ET[(Jwt, JwtPayload, JsValue)]

(jwt, jwtPayload, reqBody) = input
Expand All @@ -235,7 +285,7 @@ class Endpoints(
cl <- either(
decoder.decodeContractLocator(reqBody).liftErr(InvalidUserInput)
): ET[domain.ContractLocator[LfValue]]

_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
_ = logger.debug(s"/v1/fetch cl: $cl")

ac <- eitherT(
Expand All @@ -251,10 +301,15 @@ class Endpoints(
} yield domain.OkResponse(jsVal)

def retrieveAll(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
): Future[Error \/ SearchResult[Error \/ JsValue]] =
inputAndJwtPayload[JwtPayload](req).map {
lc: LoggingContextOf[InstanceUUID with RequestID],
metrics: Metrics,
): Future[Error \/ SearchResult[Error \/ JsValue]] = for {
parseAndDecodeTimerCtx <- Future(
metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.time()
)
res <- inputAndJwtPayload[JwtPayload](req).map {
_.map { case (jwt, jwtPayload, _) =>
parseAndDecodeTimerCtx.close()
withJwtPayloadLoggingContext(jwtPayload) { implicit lc =>
val result: SearchResult[ContractsService.Error \/ domain.ActiveContract[LfValue]] =
contractsService.retrieveAll(jwt, jwtPayload)
Expand All @@ -267,6 +322,7 @@ class Endpoints(
}
}
}
} yield res

def query(req: HttpRequest)(implicit
lc: LoggingContextOf[InstanceUUID with RequestID]
Expand Down Expand Up @@ -341,9 +397,10 @@ class Endpoints(
metrics: Metrics,
): ET[domain.SyncResponse[Unit]] =
for {
parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx()
_ <- EitherT.pure(metrics.daml.HttpJsonApi.uploadPackagesThroughput.mark())
t2 <- either(inputSource(req)): ET[(Jwt, Source[ByteString, Any])]

_ <- EitherT.pure(parseAndDecodeTimerCtx.close())
(jwt, source) = t2

_ <- eitherT(
Expand Down
21 changes: 19 additions & 2 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,25 @@ final class Metrics(val registry: MetricRegistry) {
object HttpJsonApi {
private val Prefix: MetricName = daml.Prefix :+ "http_json_api"

// Meters how long processing of a request takes
val httpRequestTimer: Timer = registry.timer(Prefix :+ "http_request_timing")
// Meters how long processing of a command submission request takes
val commandSubmissionTimer: Timer = registry.timer(Prefix :+ "command_submission_timing")
// Meters how long processing of a query GET request takes
val queryAllTimer: Timer = registry.timer(Prefix :+ "query_all_timing")
// Meters how long processing of a query POST request takes
val queryMatchingTimer: Timer = registry.timer(Prefix :+ "query_matching_timing")
// Meters how long processing of a fetch request takes
val fetchTimer: Timer = registry.timer(Prefix :+ "fetch_timing")
// Meters how long processing of a get party/parties request takes
val getPartyTimer: Timer = registry.timer(Prefix :+ "get_party_timing")
// Meters how long processing of a party management request takes
val allocatePartyTimer: Timer = registry.timer(Prefix :+ "allocate_party_timing")
// Meters how long processing of a package management request takes
val downloadPackageTimer: Timer = registry.timer(Prefix :+ "download_package_timing")
// Meters how long processing of a package upload request takes
val uploadPackageTimer: Timer = registry.timer(Prefix :+ "upload_package_timing")
// Meters how long parsing and decoding of an incoming json payload takes
val incomingJsonParsingAndValidationTimer: Timer =
registry.timer(Prefix :+ "incoming_json_parsing_and_validation_timing")
// Meters http requests throughput
val httpRequestThroughput: Meter = registry.meter(Prefix :+ "http_request_throughput")
// Meters how many websocket connections are currently active
Expand Down