More logging for long-running jobs (#5473)
* [WIP] more logging for long-running jobs

* send slack notification for successful job

* undo datastore config change

* remove unused config key
fm3 authored May 11, 2021
1 parent 3cc3acf commit 7f96874
Showing 17 changed files with 130 additions and 105 deletions.
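The heart of this commit is that the request-logging helper `log` now takes an optional notifier (`Option[String => Unit]`) and only reports non-successful responses, so job endpoints can forward failure messages to Slack while all other call sites simply change from `log { ... }` to `log() { ... }`. Below is a minimal, framework-free sketch of that pattern; the `Result` type, status check and message format are simplified stand-ins, not the Play/webKnossos originals.

```scala
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Simplified stand-in for Play's Result: just a status code and a body.
final case class Result(status: Int, body: String)

object RequestLoggingSketch {
  // Mirrors the new signature: optional notifier first, then the request block.
  def log(notifier: Option[String => Unit] = None)(block: => Future[Result])(
      implicit ec: ExecutionContext): Future[Result] =
    block.map { result =>
      // Only non-2xx responses are logged and forwarded to the notifier.
      if (result.status < 200 || result.status >= 300) {
        val msg = s"Answering ${result.status}: ${result.body.take(100)}"
        println(msg)             // stands in for logger.warn(msg)
        notifier.foreach(_(msg)) // e.g. a Slack notification
      }
      result
    }
}

object LogWrapperDemo {
  import RequestLoggingSketch.log
  implicit val ec: ExecutionContext = ExecutionContext.global

  def main(args: Array[String]): Unit = {
    // Plain call sites keep working, now written as log() { ... }
    Await.result(log() { Future.successful(Result(200, "ok")) }, 1.second)

    // Job endpoints pass a notifier that fires only when the request fails.
    val notifySlack: String => Unit = msg => println(s"[slack] $msg")
    Await.result(
      log(Some(notifySlack)) { Future.successful(Result(400, "job.couldNotRunCubing")) },
      1.second)
  }
}
```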
6 changes: 3 additions & 3 deletions app/controllers/AnnotationController.scala
@@ -56,7 +56,7 @@ class AnnotationController @Inject()(

def info(typ: String, id: String, timestamp: Long): Action[AnyContent] = sil.UserAwareAction.async {
implicit request =>
- log {
+ log() {
val notFoundMessage =
if (request.identity.isEmpty) "annotation.notFound.considerLoggingIn" else "annotation.notFound"
for {
@@ -211,7 +211,7 @@ class AnnotationController @Inject()(

def finish(typ: String, id: String, timestamp: Long): Action[AnyContent] = sil.SecuredAction.async {
implicit request =>
- log {
+ log() {
for {
(updated, message) <- finishAnnotation(typ, id, request.identity, timestamp) ?~> "annotation.finish.failed"
restrictions <- provider.restrictionsFor(typ, id)
@@ -222,7 +222,7 @@

def finishAll(typ: String, timestamp: Long): Action[JsValue] = sil.SecuredAction.async(parse.json) {
implicit request =>
- log {
+ log() {
withJsonAs[JsArray](request.body \ "annotations") { annotationIds =>
val results = Fox.serialSequence(annotationIds.value.toList) { jsValue =>
jsValue.asOpt[String].toFox.flatMap(id => finishAnnotation(typ, id, request.identity, timestamp))
2 changes: 1 addition & 1 deletion app/controllers/AnnotationIOController.scala
@@ -63,7 +63,7 @@ class AnnotationIOController @Inject()(nmlWriter: NmlWriter,

def upload: Action[MultipartFormData[TemporaryFile]] = sil.SecuredAction.async(parse.multipartFormData) {
implicit request =>
- log {
+ log() {
val shouldCreateGroupForEachFile: Boolean =
request.body.dataParts("createGroupForEachFile").headOption.contains("true")
val overwritingDataSetName: Option[String] =
2 changes: 1 addition & 1 deletion app/controllers/Application.scala
@@ -93,7 +93,7 @@ class Application @Inject()(multiUserDAO: MultiUserDAO,
val after = System.currentTimeMillis()
logger.info(s"Answering ok for wK health check, took ${after - before} ms$localStoresLabelWrapped")
}
- log {
+ log() {
for {
before <- Fox.successful(System.currentTimeMillis())
dataStoreDuration <- checkDatastoreHealthIfEnabled ?~> "dataStore.unavailable"
2 changes: 1 addition & 1 deletion app/controllers/DataSetController.scala
@@ -205,7 +205,7 @@ class DataSetController @Inject()(userService: UserService,

def read(organizationName: String, dataSetName: String, sharingToken: Option[String]): Action[AnyContent] =
sil.UserAwareAction.async { implicit request =>
- log {
+ log() {
val ctx = URLSharing.fallbackTokenAccessContext(sharingToken)
for {
organization <- organizationDAO.findOneByName(organizationName)(GlobalAccessContext) ?~> Messages(
106 changes: 61 additions & 45 deletions app/controllers/JobsController.scala
@@ -123,7 +123,7 @@ class JobService @Inject()(wkConf: WkConf,
celeryInfoJson <- flowerRpc("/api/tasks?offset=0").getWithJsonResponse[JsObject]
celeryInfoMap <- celeryInfoJson
.validate[Map[String, JsObject]] ?~> "Could not validate celery response as json map"
- _ = trackAllNewlyFailed(celeryInfoMap)
+ _ = trackAllNewlyDone(celeryInfoMap)
_ <- Fox.serialCombined(celeryInfoMap.keys.toList)(jobId =>
jobDAO.updateCeleryInfoByCeleryId(jobId, celeryInfoMap(jobId)))
} yield ()
@@ -134,37 +134,40 @@ class JobService @Inject()(wkConf: WkConf,
}
}

- private def trackAllNewlyFailed(celeryInfoMap: Map[String, JsObject]): Fox[Unit] =
+ private def trackAllNewlyDone(celeryInfoMap: Map[String, JsObject]): Fox[Unit] =
for {
oldJobs <- jobDAO.getAllByCeleryIds(celeryInfoMap.keys.toList)
- nowFailedJobInfos = filterFailed(celeryInfoMap: Map[String, JsObject])
- newlyFailedJobs = getNewlyFailedJobs(oldJobs, nowFailedJobInfos)
+ nowFailedJobInfos = filterByStatus(celeryInfoMap: Map[String, JsObject], "FAILURE")
+ newlyFailedJobs = getNewlyDoneJobs(oldJobs, nowFailedJobInfos)
_ = newlyFailedJobs.map(trackNewlyFailed)
+ nowSuccessfulJobInfos = filterByStatus(celeryInfoMap: Map[String, JsObject], "SUCCESS")
+ newlySuccessfulJobs = getNewlyDoneJobs(oldJobs, nowSuccessfulJobInfos)
+ _ = newlySuccessfulJobs.map(trackNewlySuccessful)
} yield ()

- private def filterFailed(celeryInfoMap: Map[String, JsObject]): Map[String, JsObject] =
+ private def filterByStatus(celeryInfoMap: Map[String, JsObject], statusToFilter: String): Map[String, JsObject] =
celeryInfoMap.filter(tuple => {
val statusOpt = (tuple._2 \ "state").validate[String]
statusOpt match {
case JsSuccess(status, _) =>
if (status == "FAILURE") true
if (status == statusToFilter) true
else false
case _ => false
}
})

- private def getNewlyFailedJobs(oldJobs: List[Job], nowFailedJobInfos: Map[String, JsObject]): List[Job] = {
- val failableStates = List("STARTED", "PENDING", "RETRY")
- val previouslyFailableJobs = oldJobs.filter(job => {
+ private def getNewlyDoneJobs(oldJobs: List[Job], nowDoneJobInfos: Map[String, JsObject]): List[Job] = {
+ val incompleteStates = List("STARTED", "PENDING", "RETRY")
+ val previouslyIncompleteJobs = oldJobs.filter(job => {
val oldSatusOpt = (job.celeryInfo \ "state").validate[String]
oldSatusOpt match {
- case JsSuccess(status, _) => failableStates.contains(status)
+ case JsSuccess(status, _) => incompleteStates.contains(status)
case _ => true
}
})
- val newlyFailedJobs = previouslyFailableJobs.filter(job => nowFailedJobInfos.contains(job.celeryJobId))
- newlyFailedJobs.map { job =>
- job.copy(celeryInfo = nowFailedJobInfos(job.celeryJobId))
+ val newlyDoneJobs = previouslyIncompleteJobs.filter(job => nowDoneJobInfos.contains(job.celeryJobId))
+ newlyDoneJobs.map { job =>
+ job.copy(celeryInfo = nowDoneJobInfos(job.celeryJobId))
}
}
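A note on the `trackAllNewlyDone` logic above: detecting newly finished jobs reduces to two filters — keep the celery entries whose current state matches the wanted status, then intersect with the jobs whose previously stored state was still incomplete. The sketch below shows that logic on plain maps; the `Job` shape and field names are simplified placeholders (the real code also treats jobs with an unparsable old state as incomplete).

```scala
object NewlyDoneSketch {
  // Placeholder for the persisted job row: celery id plus the last known state.
  final case class Job(celeryJobId: String, lastKnownState: String)

  private val incompleteStates = Set("STARTED", "PENDING", "RETRY")

  // Keep only the celery entries whose current state equals statusToFilter.
  def filterByStatus(celeryStates: Map[String, String], statusToFilter: String): Map[String, String] =
    celeryStates.filter { case (_, state) => state == statusToFilter }

  // A job is "newly done" if it was previously incomplete and now shows up as done.
  def getNewlyDoneJobs(oldJobs: List[Job], nowDone: Map[String, String]): List[Job] =
    oldJobs
      .filter(job => incompleteStates.contains(job.lastKnownState))
      .filter(job => nowDone.contains(job.celeryJobId))

  def main(args: Array[String]): Unit = {
    val oldJobs = List(Job("a", "STARTED"), Job("b", "SUCCESS"), Job("c", "PENDING"))
    val celeryStates = Map("a" -> "SUCCESS", "b" -> "SUCCESS", "c" -> "FAILURE")

    val newlySuccessful = getNewlyDoneJobs(oldJobs, filterByStatus(celeryStates, "SUCCESS"))
    val newlyFailed = getNewlyDoneJobs(oldJobs, filterByStatus(celeryStates, "FAILURE"))

    println(newlySuccessful) // List(Job(a,STARTED))  -> notify "Successful job"
    println(newlyFailed)     // List(Job(c,PENDING))  -> notify failed job
  }
}
```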

@@ -179,6 +182,12 @@ class JobService @Inject()(wkConf: WkConf,
()
}

+ private def trackNewlySuccessful(job: Job): Unit =
+ slackNotificationService.info(
+ "Successful job",
+ s"Job ${job._id} succeeded. Command ${job.command}, celery job id: ${job.celeryJobId}."
+ )

def publicWrites(job: Job): Fox[JsObject] =
Fox.successful(
Json.obj(
@@ -220,6 +229,7 @@ class JobService @Inject()(wkConf: WkConf,
class JobsController @Inject()(jobDAO: JobDAO,
sil: Silhouette[WkEnv],
jobService: JobService,
+ slackNotificationService: SlackNotificationService,
organizationDAO: OrganizationDAO)(implicit ec: ExecutionContext)
extends Controller {

@@ -241,16 +251,20 @@ class JobsController @Inject()(jobDAO: JobDAO,

def runConvertToWkwJob(organizationName: String, dataSetName: String, scale: String): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
- for {
- organization <- organizationDAO.findOneByName(organizationName) ?~> Messages("organization.notFound",
- organizationName)
- _ <- bool2Fox(request.identity._organization == organization._id) ~> FORBIDDEN
- command = "convert_to_wkw"
- commandArgs = Json.obj("organization_name" -> organizationName, "dataset_name" -> dataSetName, "scale" -> scale)
-
- job <- jobService.runJob(command, commandArgs, request.identity) ?~> "job.couldNotRunCubing"
- js <- jobService.publicWrites(job)
- } yield Ok(js)
+ log(Some(slackNotificationService.noticeFailedJobRequest)) {
+ for {
+ organization <- organizationDAO.findOneByName(organizationName) ?~> Messages("organization.notFound",
+ organizationName)
+ _ <- bool2Fox(request.identity._organization == organization._id) ~> FORBIDDEN
+ command = "convert_to_wkw"
+ commandArgs = Json.obj("organization_name" -> organizationName,
+ "dataset_name" -> dataSetName,
+ "scale" -> scale)
+
+ job <- jobService.runJob(command, commandArgs, request.identity) ?~> "job.couldNotRunCubing"
+ js <- jobService.publicWrites(job)
+ } yield Ok(js)
+ }
}

def runExportTiffJob(organizationName: String,
@@ -262,28 +276,30 @@
annotationId: Option[String],
annotationType: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
- for {
- organization <- organizationDAO.findOneByName(organizationName) ?~> Messages("organization.notFound",
- organizationName)
- _ <- bool2Fox(request.identity._organization == organization._id) ?~> "job.export.notAllowed.organization" ~> FORBIDDEN
- _ <- jobService.assertTiffExportBoundingBoxLimits(bbox)
- command = "export_tiff"
- exportFileName = s"${formatDateForFilename(new Date())}__${dataSetName}__${tracingId.map(_ => "volume").getOrElse(layerName.getOrElse(""))}.zip"
- commandArgs = Json.obj(
- "organization_name" -> organizationName,
- "dataset_name" -> dataSetName,
- "bbox" -> bbox,
- "webknossos_token" -> TracingStoreRpcClient.webKnossosToken,
- "export_file_name" -> exportFileName,
- "layer_name" -> layerName,
- "volume_tracing_id" -> tracingId,
- "volume_tracing_version" -> tracingVersion,
- "annotation_id" -> annotationId,
- "annotation_type" -> annotationType
- )
- job <- jobService.runJob(command, commandArgs, request.identity) ?~> "job.couldNotRunTiffExport"
- js <- jobService.publicWrites(job)
- } yield Ok(js)
+ log(Some(slackNotificationService.noticeFailedJobRequest)) {
+ for {
+ organization <- organizationDAO.findOneByName(organizationName) ?~> Messages("organization.notFound",
+ organizationName)
+ _ <- bool2Fox(request.identity._organization == organization._id) ?~> "job.export.notAllowed.organization" ~> FORBIDDEN
+ _ <- jobService.assertTiffExportBoundingBoxLimits(bbox)
+ command = "export_tiff"
+ exportFileName = s"${formatDateForFilename(new Date())}__${dataSetName}__${tracingId.map(_ => "volume").getOrElse(layerName.getOrElse(""))}.zip"
+ commandArgs = Json.obj(
+ "organization_name" -> organizationName,
+ "dataset_name" -> dataSetName,
+ "bbox" -> bbox,
+ "webknossos_token" -> TracingStoreRpcClient.webKnossosToken,
+ "export_file_name" -> exportFileName,
+ "layer_name" -> layerName,
+ "volume_tracing_id" -> tracingId,
+ "volume_tracing_version" -> tracingVersion,
+ "annotation_id" -> annotationId,
+ "annotation_type" -> annotationType
+ )
+ job <- jobService.runJob(command, commandArgs, request.identity) ?~> "job.couldNotRunTiffExport"
+ js <- jobService.publicWrites(job)
+ } yield Ok(js)
+ }
}

def downloadExport(jobId: String, exportFileName: String): Action[AnyContent] =
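One detail worth spelling out from the controller changes above: `slackNotificationService.noticeFailedJobRequest` is an ordinary method taking a `String`, and passing it inside `Some(...)` eta-expands it into the `String => Unit` function value the new `log` parameter expects. A tiny self-contained illustration of that mechanism (names here are illustrative, not the webKnossos API):

```scala
object EtaExpansionSketch {
  class Notifier {
    // A plain method, analogous to noticeFailedJobRequest(msg: String): Unit
    def noticeFailedJobRequest(msg: String): Unit =
      println(s"[slack][warn] Failed job request: $msg")
  }

  // The logging wrapper only cares about an optional String => Unit.
  def log(notifier: Option[String => Unit] = None)(msg: String): Unit =
    notifier.foreach(_(msg))

  def main(args: Array[String]): Unit = {
    val slack = new Notifier
    // The method is eta-expanded to a function value where a String => Unit is expected.
    log(Some(slack.noticeFailedJobRequest))("Answering 400 at /api/jobs/run/...")
    log()("this message is dropped: no notifier was supplied")
  }
}
```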
2 changes: 1 addition & 1 deletion app/controllers/TaskController.scala
@@ -147,7 +147,7 @@ class TaskController @Inject()(taskCreationService: TaskCreationService,
}

def request: Action[AnyContent] = sil.SecuredAction.async { implicit request =>
- log {
+ log() {
val user = request.identity
for {
teams <- taskService.getAllowedTeamsForNextTask(user)
4 changes: 2 additions & 2 deletions app/controllers/UserController.scala
@@ -36,15 +36,15 @@ class UserController @Inject()(userService: UserService,
private val DefaultAnnotationListLimit = 1000

def current: Action[AnyContent] = sil.SecuredAction.async { implicit request =>
- log {
+ log() {
for {
userJs <- userService.publicWrites(request.identity, request.identity)
} yield Ok(userJs)
}
}

def user(userId: String): Action[AnyContent] = sil.SecuredAction.async { implicit request =>
- log {
+ log() {
for {
userIdValidated <- ObjectId.parse(userId) ?~> "user.id.invalid"
user <- userDAO.findOne(userIdValidated) ?~> "user.notFound" ~> NOT_FOUND
14 changes: 4 additions & 10 deletions app/oxalis/security/UserAwareRequestLogging.scala
@@ -10,18 +10,12 @@ trait UserAwareRequestLogging extends AbstractRequestLogging {

case class RequesterIdOpt(id: Option[String]) //forcing implicit conversion

- def log(block: => Result)(implicit request: Request[_], requesterIdOpt: RequesterIdOpt): Result = {
- val result: Result = block
- logRequestFormatted(request, result, requesterIdOpt.id)
- result
- }
-
- def log(block: => Future[Result])(implicit request: Request[_],
- userIdOpt: RequesterIdOpt,
- ec: ExecutionContext): Future[Result] =
+ def log(notifier: Option[String => Unit] = None)(block: => Future[Result])(implicit request: Request[_],
+ requesterIdOpt: RequesterIdOpt,
+ ec: ExecutionContext): Future[Result] =
for {
result: Result <- block
- _ = logRequestFormatted(request, result, userIdOpt.id)
+ _ = logRequestFormatted(request, result, notifier, requesterIdOpt.id)
} yield result

implicit def userAwareRequestToRequesterIdOpt(implicit request: UserAwareRequest[WkEnv, _]): RequesterIdOpt =
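For context on the `RequesterIdOpt` wrapper kept above: wrapping the id in a dedicated case class lets the requester id be derived implicitly from the surrounding user-aware request without exposing a bare implicit `Option[String]`. A compact sketch of that wrapper-plus-implicit idiom, with simplified stand-ins for the request and environment types:

```scala
object RequesterIdSketch {
  // Simplified stand-in for Silhouette's UserAwareRequest.
  final case class UserAwareRequest(identityId: Option[String], uri: String)

  // Wrapping the Option in a dedicated case class forces a targeted implicit derivation
  // instead of putting a bare Option[String] into implicit scope.
  final case class RequesterIdOpt(id: Option[String])

  implicit def requestToRequesterIdOpt(implicit request: UserAwareRequest): RequesterIdOpt =
    RequesterIdOpt(request.identityId)

  def log(msg: String)(implicit request: UserAwareRequest, requesterIdOpt: RequesterIdOpt): Unit = {
    val userIdMsg = requesterIdOpt.id.map(id => s" for user $id").getOrElse("")
    println(s"Answering at ${request.uri}$userIdMsg: $msg")
  }

  def main(args: Array[String]): Unit = {
    implicit val request: UserAwareRequest =
      UserAwareRequest(Some("user-123"), "/api/annotations/info")
    log("annotation.notFound") // RequesterIdOpt is derived from the implicit request
  }
}
```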
12 changes: 12 additions & 0 deletions app/oxalis/telemetry/SlackNotificationService.scala
@@ -25,6 +25,18 @@ class SlackNotificationService @Inject()(rpc: RPC, config: WkConf) extends LazyL
msg = msg
)

+ def info(title: String, msg: String): Unit =
+ slackClient.info(
+ title = title,
+ msg = msg
+ )
+
+ def noticeFailedJobRequest(msg: String): Unit =
+ slackClient.warn(
+ title = "Failed job request",
+ msg = msg
+ )

def noticeBaseAnnotationTaskCreation(taskType: List[String], numberOfTasks: Int): Unit =
slackClient.info(
title = "Task creation with base",
1 change: 0 additions & 1 deletion app/utils/WkConf.scala
@@ -91,7 +91,6 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L
val user: String = get[String]("mail.smtp.user")
val pass: String = get[String]("mail.smtp.pass")
}
- val demoSender: String = get[String]("mail.demoSender")
val defaultSender: String = get[String]("mail.defaultSender")
object Mailchimp {
val host: String = get[String]("mail.mailchimp.host")
1 change: 0 additions & 1 deletion conf/application.conf
@@ -153,7 +153,6 @@ mail {
pass = ""
}
defaultSender = "webKnossos <[email protected]>"
demoSender = ""
reply = "webKnossos <[email protected]>"
mailchimp {
host = ""
@@ -1,32 +1,43 @@
package com.scalableminds.util.requestlogging

import com.typesafe.scalalogging.LazyLogging
- import play.api.http.HttpEntity
+ import play.api.http.{HttpEntity, Status}
import play.api.mvc.{Request, Result}

import scala.concurrent.{ExecutionContext, Future}

trait AbstractRequestLogging extends LazyLogging {

- def logRequestFormatted(request: Request[_], result: Result, userId: Option[String] = None): Unit = {
- val userIdMsg = userId.map(id => s" for user $id").getOrElse("")
+ def logRequestFormatted(request: Request[_],
+ result: Result,
+ notifier: Option[String => Unit],
+ requesterId: Option[String] = None): Unit = {
+
+ if (Status.isSuccessful(result.header.status)) return
+
+ val userIdMsg = requesterId.map(id => s" for user $id").getOrElse("")
+ val resultMsg = s": ${resultBody(result)}"
+ val msg = s"Answering ${result.header.status} at ${request.uri}$userIdMsg$resultMsg"
+ logger.warn(msg)
+ notifier.foreach(_(msg))
+ }
+
+ private def resultBody(result: Result): String =
result.body match {
- case HttpEntity.Strict(byteString, _) if result.header.status != 200 =>
- logger.warn(
- s"Answering ${result.header.status} at ${request.uri}$userIdMsg${byteString.take(20000).decodeString("utf-8")}")
- case _ => ()
+ case HttpEntity.Strict(byteString, _) => byteString.take(20000).decodeString("utf-8")
+ case _ => ""
}
- }

}

trait RequestLogging extends AbstractRequestLogging {
// Hint: within webKnossos itself, UserAwareRequestLogging is available, which additionally logs the requester user id

- def log(block: => Future[Result])(implicit request: Request[_], ec: ExecutionContext): Future[Result] =
+ def log(notifier: Option[String => Unit] = None)(block: => Future[Result])(implicit request: Request[_],
+ ec: ExecutionContext): Future[Result] =
for {
result: Result <- block
- _ = logRequestFormatted(request, result)
+ _ = logRequestFormatted(request, result, notifier)
} yield result

def logTime(notifier: String => Unit)(block: => Future[Result])(implicit request: Request[_],
@@ -43,14 +54,8 @@ trait RequestLogging extends AbstractRequestLogging {
for {
result: Result <- block
executionTime = System.nanoTime() - start
- _ = if (request.uri.contains("volume") && executionTime > 3e9) logTimeFormatted(executionTime, request, result)
+ _ = if (executionTime > 5e9) logTimeFormatted(executionTime, request, result)
} yield result
}

- def log(block: => Result)(implicit request: Request[_]): Result = {
- val result: Result = block
- logRequestFormatted(request, result)
- result
- }

}
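The second hunk above also loosens the slow-request reporting in `logTime`: instead of flagging volume requests above 3e9 ns, any request above 5e9 ns (5 seconds, measured with `System.nanoTime`) is reported. A small sketch of that timing check, with an assumed message format and notifier:

```scala
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object LogTimeSketch {
  // 5e9 nanoseconds == 5 seconds: requests slower than this get reported.
  private val slowRequestThresholdNanos: Double = 5e9

  def logTime[A](notifier: String => Unit)(block: => Future[A])(
      implicit ec: ExecutionContext): Future[A] = {
    val start = System.nanoTime()
    block.map { result =>
      val executionTimeNanos = System.nanoTime() - start
      if (executionTimeNanos > slowRequestThresholdNanos)
        notifier(f"Slow request: took ${executionTimeNanos / 1e9}%.1f s")
      result
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val result = Await.result(
      logTime(msg => println(s"[slack] $msg")) {
        Future { Thread.sleep(100); "done" } // fast enough, nothing is reported
      },
      5.seconds)
    println(result)
  }
}
```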
@@ -32,5 +32,5 @@ class TracingStoreConfig @Inject()(configuration: Configuration) extends ConfigR
val uri: String = get[String]("slackNotifications.uri")
val verboseLoggingEnabled: Boolean = get[Boolean]("slackNotifications.verboseLoggingEnabled")
}
- val children = List(Http, Tracingstore)
+ val children = List(Http, Tracingstore, SlackNotifications)
}
@@ -14,7 +14,7 @@ class Application @Inject()(tracingDataStore: TracingDataStore, redisClient: Red
extends Controller {

def health: Action[AnyContent] = Action.async { implicit request =>
- log {
+ log() {
AllowRemoteOrigin {
for {
before <- Fox.successful(System.currentTimeMillis())