diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/Endpoints.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/Endpoints.scala index 943dc2a5c766..c2d50135db55 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/Endpoints.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/Endpoints.scala @@ -17,6 +17,7 @@ import akka.http.scaladsl.model.headers.{ import akka.stream.Materializer import akka.stream.scaladsl.{Flow, Source} import akka.util.ByteString +import com.codahale.metrics.Timer import com.daml.lf import com.daml.http.ContractsService.SearchResult import com.daml.http.EndpointsCompanion._ @@ -66,64 +67,105 @@ class Endpoints( import util.ErrorOps._ import Uri.Path._ + // Parenthesis in the case matches below are required because otherwise scalafmt breaks. + //noinspection ScalaUnnecessaryParentheses def all(implicit lc: LoggingContextOf[InstanceUUID], metrics: Metrics, ): PartialFunction[HttpRequest, Http.IncomingConnection => Future[HttpResponse]] = { - val dispatch: PartialFunction[HttpRequest, LoggingContextOf[ - InstanceUUID with RequestID - ] => Future[HttpResponse]] = { - // Parenthesis are required because otherwise scalafmt breaks. - case req @ HttpRequest(POST, Uri.Path("/v1/create"), _, _, _) => - (implicit lc => httpResponse(create(req))) - case req @ HttpRequest(POST, Uri.Path("/v1/exercise"), _, _, _) => - (implicit lc => httpResponse(exercise(req))) - case req @ HttpRequest(POST, Uri.Path("/v1/create-and-exercise"), _, _, _) => - (implicit lc => httpResponse(createAndExercise(req))) - case req @ HttpRequest(POST, Uri.Path("/v1/fetch"), _, _, _) => - (implicit lc => httpResponse(fetch(req))) + val apiMetrics = metrics.daml.HttpJsonApi + type DispatchFun = + PartialFunction[HttpRequest, LoggingContextOf[InstanceUUID with RequestID] => Future[ + HttpResponse + ]] + def mkDispatchFunWithTimer(timer: Timer)(fun: DispatchFun): DispatchFun = + fun andThen (f => lc => Timed.future(timer, f(lc))) + val commandDispatch = + mkDispatchFunWithTimer(apiMetrics.commandSubmissionTimer) { + case req @ HttpRequest(POST, Uri.Path("/v1/create"), _, _, _) => + (implicit lc => httpResponse(create(req))) + case req @ HttpRequest(POST, Uri.Path("/v1/exercise"), _, _, _) => + (implicit lc => httpResponse(exercise(req))) + case req @ HttpRequest(POST, Uri.Path("/v1/create-and-exercise"), _, _, _) => + (implicit lc => httpResponse(createAndExercise(req))) + } + val queryAllDispatch = mkDispatchFunWithTimer(apiMetrics.queryAllTimer) { case req @ HttpRequest(GET, Uri.Path("/v1/query"), _, _, _) => (implicit lc => httpResponse(retrieveAll(req))) + } + val queryMatchingDispatch = mkDispatchFunWithTimer(apiMetrics.queryMatchingTimer) { case req @ HttpRequest(POST, Uri.Path("/v1/query"), _, _, _) => (implicit lc => httpResponse(query(req))) + } + val fetchDispatch: DispatchFun = { + case req @ HttpRequest(POST, Uri.Path("/v1/fetch"), _, _, _) => + (implicit lc => Timed.future(apiMetrics.fetchTimer, httpResponse(fetch(req)))) + } + val getPartyDispatch = mkDispatchFunWithTimer(apiMetrics.getPartyTimer) { case req @ HttpRequest(GET, Uri.Path("/v1/parties"), _, _, _) => (implicit lc => httpResponse(allParties(req))) case req @ HttpRequest(POST, Uri.Path("/v1/parties"), _, _, _) => (implicit lc => httpResponse(parties(req))) + } + val allocatePartyDispatch: DispatchFun = { case req @ HttpRequest(POST, Uri.Path("/v1/parties/allocate"), _, _, _) => - (implicit lc => httpResponse(allocateParty(req))) + ( + implicit lc => + Timed.future(apiMetrics.allocatePartyTimer, httpResponse(allocateParty(req))) + ) + } + val packageManagementDispatch: DispatchFun = { case req @ HttpRequest(GET, Uri.Path("/v1/packages"), _, _, _) => (implicit lc => httpResponse(listPackages(req))) + case req @ HttpRequest(POST, Uri.Path("/v1/packages"), _, _, _) => + ( + implicit lc => + Timed.future( + apiMetrics.uploadPackageTimer, + httpResponse(uploadDarFile(req)), + ) + ) // format: off - case req @ HttpRequest(GET, + case req @ HttpRequest(GET, Uri(_, _, Slash(Segment("v1", Slash(Segment("packages", Slash(Segment(packageId, Empty)))))), _, _), - _, _, _) => (implicit lc => downloadPackage(req, packageId)) - // format: on - case req @ HttpRequest(POST, Uri.Path("/v1/packages"), _, _, _) => - (implicit lc => httpResponse(uploadDarFile(req))) + _, _, _) => + (implicit lc => Timed.future(apiMetrics.downloadPackageTimer, downloadPackage(req, packageId))) + // format: on + } + val liveOrHealthDispatch: DispatchFun = { case HttpRequest(GET, Uri.Path("/livez"), _, _, _) => _ => Future.successful(HttpResponse(status = StatusCodes.OK)) case HttpRequest(GET, Uri.Path("/readyz"), _, _, _) => _ => healthService.ready().map(_.toHttpResponse) } import scalaz.std.partialFunction._, scalaz.syntax.arrow._ - (dispatch &&& { case r => r }) andThen { case (lcFhr, req) => + ((commandDispatch orElse + queryAllDispatch orElse + queryMatchingDispatch orElse + fetchDispatch orElse + getPartyDispatch orElse + allocatePartyDispatch orElse + packageManagementDispatch orElse + liveOrHealthDispatch) &&& { case r => r }) andThen { case (lcFhr, req) => (connection: Http.IncomingConnection) => extendWithRequestIdLogCtx(implicit lc => { val t0 = System.nanoTime logger.info(s"Incoming request on ${req.uri} from ${connection.remoteAddress}") metrics.daml.HttpJsonApi.httpRequestThroughput.mark() - Timed - .future(metrics.daml.HttpJsonApi.httpRequestTimer, lcFhr(lc)) - .map(res => { - logger.trace(s"Processed request after ${System.nanoTime() - t0}ns") - logger.info(s"Responding to client with HTTP ${res.status}") - res - }) + for { + res <- lcFhr(lc) + _ = logger.trace(s"Processed request after ${System.nanoTime() - t0}ns") + _ = logger.info(s"Responding to client with HTTP ${res.status}") + } yield res }) } } + def getParseAndDecodeTimerCtx()(implicit + metrics: Metrics + ): ET[Timer.Context] = + EitherT.pure(metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.time()) + def withJwtPayloadLoggingContext[A](jwtPayload: JwtPayloadG)( fn: LoggingContextOf[JwtPayloadTag with InstanceUUID with RequestID] => A )(implicit lc: LoggingContextOf[InstanceUUID with RequestID]): A = @@ -142,6 +184,7 @@ class Endpoints( Jwt, JwtWritePayload, JsValue, + Timer.Context, ) => LoggingContextOf[JwtPayloadTag with InstanceUUID with RequestID] => ET[ T[ApiValue] ] @@ -152,10 +195,13 @@ class Endpoints( metrics: Metrics, ): ET[domain.SyncResponse[JsValue]] = for { + parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx() _ <- EitherT.pure(metrics.daml.HttpJsonApi.commandSubmissionThroughput.mark()) t3 <- inputJsValAndJwtPayload(req): ET[(Jwt, JwtWritePayload, JsValue)] (jwt, jwtPayload, reqBody) = t3 - resp <- withJwtPayloadLoggingContext(jwtPayload)(fn(jwt, jwtPayload, reqBody)) + resp <- withJwtPayloadLoggingContext(jwtPayload)( + fn(jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) + ) jsVal <- either(SprayJson.encode1(resp).liftErr(ServerError)): ET[JsValue] } yield domain.OkResponse(jsVal) @@ -163,11 +209,12 @@ class Endpoints( lc: LoggingContextOf[InstanceUUID with RequestID], metrics: Metrics, ): ET[domain.SyncResponse[JsValue]] = - handleCommand(req) { (jwt, jwtPayload, reqBody) => implicit lc => + handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc => for { cmd <- either( decoder.decodeCreateCommand(reqBody).liftErr(InvalidUserInput) ): ET[domain.CreateCommand[ApiRecord, TemplateId.RequiredPkg]] + _ <- EitherT.pure(parseAndDecodeTimerCtx.close()) ac <- eitherT( handleFutureEitherFailure(commandService.create(jwt, jwtPayload, cmd)) @@ -179,12 +226,12 @@ class Endpoints( lc: LoggingContextOf[InstanceUUID with RequestID], metrics: Metrics, ): ET[domain.SyncResponse[JsValue]] = - handleCommand(req) { (jwt, jwtPayload, reqBody) => implicit lc => + handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc => for { cmd <- either( decoder.decodeExerciseCommand(reqBody).liftErr(InvalidUserInput) ): ET[domain.ExerciseCommand[LfValue, domain.ContractLocator[LfValue]]] - + _ <- EitherT.pure(parseAndDecodeTimerCtx.close()) resolvedRef <- eitherT( resolveReference(jwt, jwtPayload, cmd.reference) ): ET[domain.ResolvedContractRef[ApiValue]] @@ -206,11 +253,12 @@ class Endpoints( lc: LoggingContextOf[InstanceUUID with RequestID], metrics: Metrics, ): ET[domain.SyncResponse[JsValue]] = - handleCommand(req) { (jwt, jwtPayload, reqBody) => implicit lc => + handleCommand(req) { (jwt, jwtPayload, reqBody, parseAndDecodeTimerCtx) => implicit lc => for { cmd <- either( decoder.decodeCreateAndExerciseCommand(reqBody).liftErr(InvalidUserInput) ): ET[domain.CreateAndExerciseCommand[ApiRecord, ApiValue, TemplateId.RequiredPkg]] + _ <- EitherT.pure(parseAndDecodeTimerCtx.close()) resp <- eitherT( handleFutureEitherFailure( @@ -221,9 +269,11 @@ class Endpoints( } def fetch(req: HttpRequest)(implicit - lc: LoggingContextOf[InstanceUUID with RequestID] + lc: LoggingContextOf[InstanceUUID with RequestID], + metrics: Metrics, ): ET[domain.SyncResponse[JsValue]] = for { + parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx() input <- inputJsValAndJwtPayload(req): ET[(Jwt, JwtPayload, JsValue)] (jwt, jwtPayload, reqBody) = input @@ -235,7 +285,7 @@ class Endpoints( cl <- either( decoder.decodeContractLocator(reqBody).liftErr(InvalidUserInput) ): ET[domain.ContractLocator[LfValue]] - + _ <- EitherT.pure(parseAndDecodeTimerCtx.close()) _ = logger.debug(s"/v1/fetch cl: $cl") ac <- eitherT( @@ -251,10 +301,15 @@ class Endpoints( } yield domain.OkResponse(jsVal) def retrieveAll(req: HttpRequest)(implicit - lc: LoggingContextOf[InstanceUUID with RequestID] - ): Future[Error \/ SearchResult[Error \/ JsValue]] = - inputAndJwtPayload[JwtPayload](req).map { + lc: LoggingContextOf[InstanceUUID with RequestID], + metrics: Metrics, + ): Future[Error \/ SearchResult[Error \/ JsValue]] = for { + parseAndDecodeTimerCtx <- Future( + metrics.daml.HttpJsonApi.incomingJsonParsingAndValidationTimer.time() + ) + res <- inputAndJwtPayload[JwtPayload](req).map { _.map { case (jwt, jwtPayload, _) => + parseAndDecodeTimerCtx.close() withJwtPayloadLoggingContext(jwtPayload) { implicit lc => val result: SearchResult[ContractsService.Error \/ domain.ActiveContract[LfValue]] = contractsService.retrieveAll(jwt, jwtPayload) @@ -267,6 +322,7 @@ class Endpoints( } } } + } yield res def query(req: HttpRequest)(implicit lc: LoggingContextOf[InstanceUUID with RequestID] @@ -341,9 +397,10 @@ class Endpoints( metrics: Metrics, ): ET[domain.SyncResponse[Unit]] = for { + parseAndDecodeTimerCtx <- getParseAndDecodeTimerCtx() _ <- EitherT.pure(metrics.daml.HttpJsonApi.uploadPackagesThroughput.mark()) t2 <- either(inputSource(req)): ET[(Jwt, Source[ByteString, Any])] - + _ <- EitherT.pure(parseAndDecodeTimerCtx.close()) (jwt, source) = t2 _ <- eitherT( diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index b3e82c7af00c..82245558fe99 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -706,8 +706,25 @@ final class Metrics(val registry: MetricRegistry) { object HttpJsonApi { private val Prefix: MetricName = daml.Prefix :+ "http_json_api" - // Meters how long processing of a request takes - val httpRequestTimer: Timer = registry.timer(Prefix :+ "http_request_timing") + // Meters how long processing of a command submission request takes + val commandSubmissionTimer: Timer = registry.timer(Prefix :+ "command_submission_timing") + // Meters how long processing of a query GET request takes + val queryAllTimer: Timer = registry.timer(Prefix :+ "query_all_timing") + // Meters how long processing of a query POST request takes + val queryMatchingTimer: Timer = registry.timer(Prefix :+ "query_matching_timing") + // Meters how long processing of a fetch request takes + val fetchTimer: Timer = registry.timer(Prefix :+ "fetch_timing") + // Meters how long processing of a get party/parties request takes + val getPartyTimer: Timer = registry.timer(Prefix :+ "get_party_timing") + // Meters how long processing of a party management request takes + val allocatePartyTimer: Timer = registry.timer(Prefix :+ "allocate_party_timing") + // Meters how long processing of a package management request takes + val downloadPackageTimer: Timer = registry.timer(Prefix :+ "download_package_timing") + // Meters how long processing of a package upload request takes + val uploadPackageTimer: Timer = registry.timer(Prefix :+ "upload_package_timing") + // Meters how long parsing and decoding of an incoming json payload takes + val incomingJsonParsingAndValidationTimer: Timer = + registry.timer(Prefix :+ "incoming_json_parsing_and_validation_timing") // Meters http requests throughput val httpRequestThroughput: Meter = registry.meter(Prefix :+ "http_request_throughput") // Meters how many websocket connections are currently active