diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index 0c3e0d8b28..ecf02d4ab0 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -17,6 +17,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released - Added a context menu option to separate an agglomerate skeleton using Min-Cut. Activate the Proofreading tool, select the source node and open the context menu by right-clicking on the target node which you would like to separate through Min-Cut. [#6361](https://github.com/scalableminds/webknossos/pull/6361) - Added a "clear" button to reset skeletons/meshes after successful merges/splits. [#6459](https://github.com/scalableminds/webknossos/pull/6459) - The proofreading tool now supports merging and splitting (via min-cut) agglomerates by right-clicking a segment (and not a node). Note that there still has to be an active node so that both partners of the operation are defined. [#6464](https://github.com/scalableminds/webknossos/pull/6464) +- Added workflow reporting and logging features for Voxelytics to webKnossos. If activated, the workflows can be accessed from the `Administration` > `Voxelytics` menu item. [#6416](https://github.com/scalableminds/webknossos/pull/6416) [#6460](https://github.com/scalableminds/webknossos/pull/6460) - The color of a segment can now be changed in the segments tab. Right-click a segment in the list and select "Change Color" to open a color picker. [#6372](https://github.com/scalableminds/webknossos/pull/6372) ### Changed diff --git a/MIGRATIONS.unreleased.md b/MIGRATIONS.unreleased.md index 2730361a00..9a3ca0eba8 100644 --- a/MIGRATIONS.unreleased.md +++ b/MIGRATIONS.unreleased.md @@ -8,5 +8,8 @@ User-facing changes are documented in the [changelog](CHANGELOG.released.md). ## Unreleased [Commits](https://github.com/scalableminds/webknossos/compare/22.09.0...HEAD) +- To use the Voxelytics reporting features in webKnossos, the config field `features.voxelyticsEnabled` needs to be set to true. To also use the logging features, a running Elasticsearch instance must be configured via the config field `voxelytics.elasticsearch.uri`.
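+ For reference, a minimal `conf/application.conf` sketch for these settings might look as follows (the key names are the ones `WkConf` reads in this release; the values are placeholders to adapt to your deployment):
+ ```
+ features {
+   voxelyticsEnabled = true
+ }
+ voxelytics {
+   staleTimeout = 1 hour             # placeholder
+   elasticsearch {
+     uri = "http://localhost:9200"   # placeholder
+     index = "voxelytics-logs"       # placeholder
+     startupTimeout = 1 minute       # placeholder
+   }
+ }
+ ```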
+ ### Postgres Evolutions: - [088-shortlinks.sql](conf/evolutions/088-shortlinks.sql) +- [089-voxelytics.sql](conf/evolutions/089-voxelytics.sql) diff --git a/app/WebKnossosModule.scala b/app/WebKnossosModule.scala index 7ce9a7101f..951a8858be 100644 --- a/app/WebKnossosModule.scala +++ b/app/WebKnossosModule.scala @@ -1,12 +1,13 @@ import com.google.inject.AbstractModule import controllers.InitialDataService import models.analytics.AnalyticsSessionService -import models.job.{JobService, WorkerLivenessService} import models.annotation.AnnotationStore import models.binary.DataSetService +import models.job.{JobService, WorkerLivenessService} import models.task.TaskService -import models.user.{UserCache, _} import models.user.time.TimeSpanService +import models.user._ +import models.voxelytics.ElasticsearchClient import oxalis.files.TempFileService import oxalis.mail.MailchimpTicker import oxalis.telemetry.SlackNotificationService @@ -33,5 +34,6 @@ class WebKnossosModule extends AbstractModule { bind(classOf[SlackNotificationService]).asEagerSingleton() bind(classOf[AnalyticsSessionService]).asEagerSingleton() bind(classOf[WorkerLivenessService]).asEagerSingleton() + bind(classOf[ElasticsearchClient]).asEagerSingleton() } } diff --git a/app/controllers/VoxelyticsController.scala b/app/controllers/VoxelyticsController.scala new file mode 100644 index 0000000000..92e2a26188 --- /dev/null +++ b/app/controllers/VoxelyticsController.scala @@ -0,0 +1,271 @@ +package controllers + +import com.mohiva.play.silhouette.api.Silhouette +import com.mohiva.play.silhouette.api.actions.SecuredRequest +import com.scalableminds.util.tools.{Fox, FoxImplicits} +import models.organization.OrganizationDAO +import models.voxelytics._ +import oxalis.security.WkEnv +import play.api.libs.json._ +import play.api.mvc._ +import utils.{ObjectId, WkConf} + +import javax.inject.Inject +import scala.concurrent.ExecutionContext + +class VoxelyticsController @Inject()( + organizationDAO: OrganizationDAO, + voxelyticsDAO: VoxelyticsDAO, + voxelyticsService: VoxelyticsService, + elasticsearchClient: ElasticsearchClient, + wkConf: WkConf, + sil: Silhouette[WkEnv])(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers) + extends Controller + with FoxImplicits { + + private lazy val conf = wkConf.Voxelytics + + def storeWorkflow: Action[WorkflowDescription] = + sil.SecuredAction.async(validateJson[WorkflowDescription]) { implicit request => + for { + _ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled" + _ <- voxelyticsService.checkAuthForWorkflowCreation(request.body.run.name, request.identity) ?~> "voxelytics.workflowUserMismatch" ~> UNAUTHORIZED + _ <- voxelyticsDAO.upsertWorkflow(request.body.workflow.hash, + request.body.workflow.name, + request.identity._organization) + runId <- voxelyticsDAO.upsertRun( + request.identity._organization, + request.identity._id, + request.body.run.name, + request.body.run.user, + request.body.run.hostname, + request.body.run.voxelyticsVersion, + request.body.workflow.hash, + request.body.workflow.yamlContent, + request.body.config.asJsonWithoutTasks + ) + _ <- Fox.combined( + request.body.config.tasks + .map(taskKV => { + val taskName = taskKV._1 + val task = taskKV._2 + voxelyticsService.upsertTaskWithArtifacts(runId, taskName, task, request.body.artifacts) + }) + .toList) + + } yield Ok + } + + def listWorkflows(workflowHash: Option[String]): Action[AnyContent] = + sil.SecuredAction.async { implicit request => + for { + _ <- 
bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled" + // Auth is implemented in `voxelyticsDAO.findRuns` + runs <- voxelyticsDAO.findRuns(request.identity, None, workflowHash, conf.staleTimeout) + result <- if (runs.nonEmpty) { + listWorkflowsWithRuns(request, runs) + } else { + Fox.successful(Json.arr()) + } + } yield JsonOk(result) + } + + private def listWorkflowsWithRuns(request: SecuredRequest[WkEnv, AnyContent], runs: List[RunEntry]): Fox[JsArray] = + for { + _ <- bool2Fox(runs.nonEmpty) // just asserting once more + taskRuns <- voxelyticsDAO.findTaskRuns(request.identity._organization, runs.map(_.id), conf.staleTimeout) + _ <- bool2Fox(taskRuns.nonEmpty) ?~> "voxelytics.noTaskFound" ~> NOT_FOUND + workflows <- voxelyticsDAO.findWorkflowsByHash(request.identity._organization, runs.map(_.workflow_hash).toSet) + _ <- bool2Fox(workflows.nonEmpty) ?~> "voxelytics.noWorkflowFound" ~> NOT_FOUND + + workflowsAsJson = JsArray(workflows.flatMap(workflow => { + val workflowRuns = runs.filter(run => run.workflow_hash == workflow.hash) + if (workflowRuns.nonEmpty) { + val (state, beginTime, endTime) = voxelyticsService.aggregateBeginEndTime(workflowRuns) + Some( + Json.obj( + "name" -> workflow.name, + "hash" -> workflow.hash, + "beginTime" -> beginTime, + "endTime" -> endTime, + "state" -> state.toString(), + "runs" -> workflowRuns.map(run => { + val tasks = taskRuns.filter(taskRun => taskRun.runId == run.id) + voxelyticsService.runPublicWrites(run, tasks) + }) + )) + } else { + None + } + })) + } yield workflowsAsJson + + def getWorkflow(workflowHash: String, runId: Option[String]): Action[AnyContent] = + sil.SecuredAction.async { implicit request => + for { + _ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled" + runIdValidatedOpt <- Fox.runOptional(runId)(ObjectId.fromString(_)) + // Auth is implemented in `voxelyticsDAO.findRuns` + workflow <- voxelyticsDAO.findWorkflowByHash(request.identity._organization, workflowHash) ?~> "voxelytics.workflowNotFound" ~> NOT_FOUND + + // Fetch either all runs of this workflow or only the specified run + // If all runs are fetched, a combined version of the workflow report + // will be returned that contains the information of the most recent task runs + runs <- runIdValidatedOpt + .map(runIdValidated => + voxelyticsDAO.findRuns(request.identity, Some(List(runIdValidated)), Some(workflowHash), conf.staleTimeout)) + .getOrElse(voxelyticsDAO.findRuns(request.identity, None, Some(workflowHash), conf.staleTimeout)) + _ <- bool2Fox(runs.nonEmpty) ?~> "voxelytics.runNotFound" ~> NOT_FOUND + sortedRuns = runs.sortBy(_.beginTime).reverse + // All workflows have at least one run, because they are created at the same time + mostRecentRun <- sortedRuns.headOption ?~> "voxelytics.zeroRunWorkflow" + + // Fetch task runs for all runs + allTaskRuns <- voxelyticsDAO.findTaskRuns(request.identity._organization, + sortedRuns.map(_.id), + conf.staleTimeout) + + // Select one representative "task run" for each task: + // the most recent one that is running or finished, or else simply the most recent one + combinedTaskRuns = voxelyticsService.combineTaskRuns(allTaskRuns, mostRecentRun.id) + + // Fetch artifact data for selected/combined task runs + artifacts <- voxelyticsDAO.findArtifacts(combinedTaskRuns.map(_.taskId)) + tasks <- voxelyticsDAO.findTasks(combinedTaskRuns) + + // Assemble workflow report JSON + (state, beginTime, endTime) = voxelyticsService.aggregateBeginEndTime(runs) + result = Json.obj( + "config" -> 
voxelyticsService.workflowConfigPublicWrites(mostRecentRun.workflow_config, tasks), + "artifacts" -> voxelyticsService.artifactsPublicWrites(artifacts), + "run" -> voxelyticsService.runPublicWrites( + mostRecentRun.copy(state = state, beginTime = beginTime, endTime = endTime), + combinedTaskRuns), + "workflow" -> Json.obj( + "name" -> workflow.name, + "hash" -> workflowHash, + "yamlContent" -> mostRecentRun.workflow_yamlContent + ) + ) + } yield JsonOk(result) + } + + def storeWorkflowEvents(workflowHash: String, runName: String): Action[List[WorkflowEvent]] = + sil.SecuredAction.async(validateJson[List[WorkflowEvent]]) { implicit request => + def createWorkflowEvent(runId: ObjectId, event: WorkflowEvent): Fox[Unit] = + event match { + case ev: RunStateChangeEvent => voxelyticsDAO.upsertRunStateChangeEvent(runId, ev) + + case ev: TaskStateChangeEvent => + for { + taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId) + _ <- voxelyticsDAO.upsertTaskStateChangeEvent(taskId, ev) + _ <- Fox.combined( + ev.artifacts + .map(artifactKV => { + val artifactName = artifactKV._1 + val artifact = artifactKV._2 + voxelyticsDAO.upsertArtifact(taskId, + artifactName, + artifact.path, + artifact.file_size, + artifact.inode_count, + artifact.version, + artifact.metadataAsJson) + }) + .toList) + } yield () + + case ev: ChunkStateChangeEvent => + for { + taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId) + chunkId <- voxelyticsDAO.upsertChunk(taskId, ev.executionId, ev.chunkName) + _ <- voxelyticsDAO.upsertChunkStateChangeEvent(chunkId, ev) + } yield () + + case ev: RunHeartbeatEvent => voxelyticsDAO.upsertRunHeartbeatEvent(runId, ev) + + case ev: ChunkProfilingEvent => + for { + taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId) + chunkId <- voxelyticsDAO.getChunkIdByName(taskId, ev.executionId, ev.chunkName) + _ <- voxelyticsDAO.upsertChunkProfilingEvent(chunkId, ev) + } yield () + + case ev: ArtifactFileChecksumEvent => + for { + taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId) + artifactId <- voxelyticsDAO.getArtifactIdByName(taskId, ev.artifactName) + _ <- voxelyticsDAO.upsertArtifactChecksumEvent(artifactId, ev) + } yield () + } + + for { + _ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled" + runId <- voxelyticsDAO.getRunIdByName(runName, request.identity._organization) ?~> "voxelytics.runNotFound" ~> NOT_FOUND + _ <- voxelyticsService.checkAuth(runId, request.identity) ~> UNAUTHORIZED + _ <- Fox.serialCombined(request.body)(event => createWorkflowEvent(runId, event)) + } yield Ok + } + + def getChunkStatistics(workflowHash: String, runId: String, taskName: String): Action[AnyContent] = + sil.SecuredAction.async { implicit request => + { + for { + _ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled" + runIdValidated <- ObjectId.fromString(runId) + _ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED + taskId <- voxelyticsDAO.getTaskIdByName(taskName, runIdValidated) ?~> "voxelytics.taskNotFound" ~> NOT_FOUND + results <- voxelyticsDAO.getChunkStatistics(taskId) + } yield JsonOk(Json.toJson(results)) + } + } + + def getArtifactChecksums(workflowHash: String, + runId: String, + taskName: String, + artifactName: Option[String]): Action[AnyContent] = + sil.SecuredAction.async { implicit request => + { + for { + _ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled" + runIdValidated <- ObjectId.fromString(runId) + _ <- voxelyticsService.checkAuth(runIdValidated, request.identity) 
~> UNAUTHORIZED + taskId <- voxelyticsDAO.getTaskIdByName(taskName, runIdValidated) ?~> "voxelytics.taskNotFound" ~> NOT_FOUND + results <- voxelyticsDAO.getArtifactChecksums(taskId, artifactName) + } yield JsonOk(Json.toJson(results)) + } + } + + def appendLogs: Action[List[JsObject]] = + sil.SecuredAction.async(validateJson[List[JsObject]]) { implicit request => + for { + _ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled" + organization <- organizationDAO.findOne(request.identity._organization) + logEntries = request.body.map( + entry => + entry ++ Json.obj("vx" -> ((entry \ "vx").as[JsObject] ++ Json.obj("wk_org" -> organization.name, + "wk_user" -> request.identity._id.id)))) + _ <- elasticsearchClient.bulkInsert(logEntries) + } yield Ok + } + + def getLogs(runId: String, taskName: Option[String], minLevel: Option[String]): Action[AnyContent] = + sil.SecuredAction.async { implicit request => + { + for { + _ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled" + runIdValidated <- ObjectId.fromString(runId) + runName <- voxelyticsDAO.getRunNameById(runIdValidated, request.identity._organization) + _ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED + organization <- organizationDAO.findOne(request.identity._organization) + organizationName = organization.name + logEntries <- elasticsearchClient.queryLogs( + runName, + organizationName, + taskName, + minLevel.flatMap(VoxelyticsLogLevel.fromString).getOrElse(VoxelyticsLogLevel.INFO)) + } yield JsonOk(logEntries) + } + } +} diff --git a/app/models/voxelytics/ElasticsearchClient.scala b/app/models/voxelytics/ElasticsearchClient.scala new file mode 100644 index 0000000000..1caef3e231 --- /dev/null +++ b/app/models/voxelytics/ElasticsearchClient.scala @@ -0,0 +1,191 @@ +package models.voxelytics + +import akka.actor.ActorSystem +import com.scalableminds.util.mvc.MimeTypes +import com.scalableminds.util.tools.Fox +import com.scalableminds.util.tools.Fox.{bool2Fox, box2Fox} +import com.scalableminds.webknossos.datastore.rpc.RPC +import com.typesafe.scalalogging.LazyLogging +import models.voxelytics.VoxelyticsLogLevel.VoxelyticsLogLevel +import net.liftweb.common.Full +import play.api.http.{HeaderNames, Status} +import play.api.libs.json.{JsArray, JsNumber, JsValue, Json} +import utils.WkConf + +import java.time.{Duration, LocalDateTime} +import java.util.UUID +import javax.inject.Inject +import scala.collection.mutable.ListBuffer +import scala.concurrent.duration.{FiniteDuration, SECONDS} +import scala.concurrent.{ExecutionContext, Future} + +class ElasticsearchClient @Inject()(wkConf: WkConf, rpc: RPC, val system: ActorSystem)(implicit ec: ExecutionContext) + extends LazyLogging + with MimeTypes { + + private lazy val conf = wkConf.Voxelytics.Elasticsearch + private lazy val enabled = wkConf.Features.voxelyticsEnabled && conf.uri.nonEmpty + + val SCROLL_SIZE = 10000 + val POLLING_INTERVAL = FiniteDuration(1, SECONDS) + + private lazy val elasticsearchSchema = Json.obj( + "settings" -> Json.obj(), + "mappings" -> Json.obj( + "properties" -> Json.obj( + "@timestamp" -> Json.obj("type" -> "date"), + "@version" -> Json.obj("type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", "ignore_above" -> 256))), + "level" -> Json.obj("type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", "ignore_above" -> 256))), + "message" -> Json.obj("type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", 
"ignore_above" -> 256))), + "vx" -> Json.obj("properties" -> Json.obj( + "hostname" -> Json.obj( + "type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", "ignore_above" -> 256))), + "logger_name" -> Json.obj( + "type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", "ignore_above" -> 256))), + "run_name" -> Json.obj( + "type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", "ignore_above" -> 256))), + "task_name" -> Json.obj( + "type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", "ignore_above" -> 256))), + "wk_org" -> Json.obj("type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", "ignore_above" -> 256))), + "wk_url" -> Json.obj("type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", "ignore_above" -> 256))), + "workflow_hash" -> Json.obj( + "type" -> "text", + "fields" -> Json.obj("keyword" -> Json.obj("type" -> "keyword", "ignore_above" -> 256))) + )) + )) + ) + + private lazy val serverStartupFuture: Fox[Unit] = { + for { + _ <- bool2Fox(enabled) ?~> "Elasticsearch is not enabled." + _ = logger.info("Waiting for Elasticsearch to become available.") + _ <- pollUntilServerStartedUp(LocalDateTime.now.plus(Duration.ofMillis(conf.startupTimeout.toMillis))) ~> 500 + _ <- bootstrapIndexOnServer + } yield () + } + + private def pollUntilServerStartedUp(until: LocalDateTime): Fox[Unit] = { + def waitAndRecurse(until: LocalDateTime): Fox[Unit] = + for { + _ <- akka.pattern.after(POLLING_INTERVAL, using = system.scheduler)(Future.successful(())) + _ <- bool2Fox(!LocalDateTime.now().isAfter(until)) ?~> s"Elasticsearch did not become ready within ${conf.startupTimeout}." + _ <- pollUntilServerStartedUp(until) + } yield () + + for { + isServerAvailableBox <- rpc(s"${conf.uri}/_cluster/health?wait_for_status=yellow&timeout=10s").request + .withMethod("GET") + .execute() + .flatMap(result => + if (Status.isSuccessful(result.status)) { + Fox.successful(true) + } else { + Fox.failure(s"Unexpected error code from Elasticsearch ${result.status}.") + }) + .recoverWith({ + case e: java.net.ConnectException => { + logger.debug(s"Elasticsearch connection exception: $e") + Fox.successful(false) + } + case e => + logger.error(s"Unexpected error $e") + Fox.failure("Unexpected error while trying to connect to Elasticsearch.", Full(e)) + }) + isServerAvailable <- isServerAvailableBox.toFox + _ <- if (!isServerAvailable) { + waitAndRecurse(until) + } else { + logger.info("Elasticsearch is available.") + Fox.successful(()) + } + } yield () + } + + private def bootstrapIndexOnServer: Fox[Unit] = + for { + // HEAD request will return 2xx if index exists or 404 if it doesn't exist + // Here, we convert that HTTP status code logic into a bool + indexExists <- rpc(s"${conf.uri}/${conf.index}").head.map(_ => true).getOrElse(false) + _ <- Fox.runIf(!indexExists) { + logger.info("Bootstrapping Elasticsearch index for Voxelytics.") + rpc(s"${conf.uri}/${conf.index}").put(elasticsearchSchema) + } + } yield () + + def queryLogs(runName: String, + organizationName: String, + taskName: Option[String], + minLevel: VoxelyticsLogLevel = VoxelyticsLogLevel.INFO): Fox[JsValue] = { + + val levels = VoxelyticsLogLevel.sortedValues.drop(VoxelyticsLogLevel.sortedValues.indexOf(minLevel)) + val queryStringParts = List( + Some(s"""vx.run_name:"$runName""""), + Some(s"""vx.wk_org:"$organizationName""""), + Some(s"level:(${levels.map(_.toString).mkString(" OR ")})"), + 
taskName.map(t => s"""vx.task_name:"$t"""") + ).flatten + + val scrollBody = Json.obj( + "size" -> JsNumber(SCROLL_SIZE), + "query" -> Json.obj("query_string" -> Json.obj("query" -> queryStringParts.mkString(" AND "))), + "sort" -> Json.arr(Json.obj("@timestamp" -> Json.obj("order" -> "asc"))) + ) + + val buffer = ListBuffer[JsValue]() + + for { + _ <- serverStartupFuture + scroll <- rpc(s"${conf.uri}/${conf.index}/_search?scroll=1m").silent + .postJsonWithJsonResponse[JsValue, JsValue](scrollBody) ~> "Could not fetch logs" + scrollId = (scroll \ "_scroll_id").as[String] + scrollHits = (scroll \ "hits" \ "hits").as[List[JsValue]] + _ = buffer ++= scrollHits + lastScrollId <- fetchBatchToBuffer(buffer, scrollId) + _ <- rpc(s"${conf.uri}/_search/scroll/$lastScrollId").delete() + } yield JsArray(buffer) + + } + + private def fetchBatchToBuffer(bufferMutable: ListBuffer[JsValue], scrollId: String): Fox[String] = + for { + batch <- rpc(s"${conf.uri}/_search/scroll").silent + .postJsonWithJsonResponse[JsValue, JsValue](Json.obj("scroll" -> "1m", "scroll_id" -> scrollId)) + batchScrollId = (batch \ "_scroll_id").as[String] + batchHits = (batch \ "hits" \ "hits").as[List[JsValue]] + _ = bufferMutable ++= batchHits + returnedScrollId <- if (batchHits.isEmpty) { + Fox.successful(scrollId) + } else { + fetchBatchToBuffer(bufferMutable, batchScrollId) + } + } yield returnedScrollId + + def bulkInsert(logEntries: List[JsValue]): Fox[Unit] = + if (logEntries.nonEmpty) { + for { + _ <- serverStartupFuture + bytes = logEntries + .flatMap( + entry => + List( + Json.toBytes(Json.obj("create" -> Json.obj("_index" -> conf.index, "_id" -> UUID.randomUUID.toString))), + Json.toBytes(entry))) + .fold(Array.emptyByteArray)((rest, entry) => rest ++ entry ++ "\n".getBytes) + res <- rpc(s"${conf.uri}/_bulk").silent + .addHttpHeaders(HeaderNames.CONTENT_TYPE -> jsonMimeType) + .postBytesWithJsonResponse[JsValue](bytes) + // Elasticsearch signals per-item failures via the boolean "errors" field of the bulk response + _ <- Fox.bool2Fox(!(res \ "errors").asOpt[Boolean].getOrElse(false)) + } yield () + } else { + Fox.successful(()) + } +} diff --git a/app/models/voxelytics/VoxelyticsDAO.scala b/app/models/voxelytics/VoxelyticsDAO.scala new file mode 100644 index 0000000000..7c1cfce617 --- /dev/null +++ b/app/models/voxelytics/VoxelyticsDAO.scala @@ -0,0 +1,672 @@ +package models.voxelytics + +import com.scalableminds.util.tools.Fox +import models.user.User +import play.api.libs.json._ +import slick.jdbc.PostgresProfile.api._ +import utils.{ObjectId, SQLClient, SimpleSQLDAO} + +import java.sql.Timestamp +import javax.inject.Inject +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.Duration + +class VoxelyticsDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionContext) extends SimpleSQLDAO(sqlClient) { + + def findArtifacts(taskIds: List[ObjectId]): Fox[List[ArtifactEntry]] = + for { + r <- run(sql""" + SELECT + a._id, + a._task, + a.name, + a.path, + a.fileSize, + a.inodeCount, + a.version, + a.metadata, + t.name AS taskName + FROM webknossos.voxelytics_artifacts a + JOIN webknossos.voxelytics_tasks t ON t._id = a._task + WHERE t."_id" IN #${writeEscapedTuple(taskIds.map(_.id))} + """.as[(String, String, String, String, Long, Long, String, String, String)]) + } yield + r.toList.map( + row => + ArtifactEntry(ObjectId(row._1), + ObjectId(row._2), + row._3, + row._4, + row._5, + row._6, + row._7, + Json.parse(row._8).as[JsObject], + row._9)) + + def findTasks(combinedTaskRuns: List[TaskRunEntry]): Fox[List[TaskEntry]] = + for { + r <- run(sql""" + SELECT + t._id, + t._run, + t.name, 
+ t.task, + t.config + FROM webknossos.voxelytics_tasks t + WHERE + ("_run", "name") IN (#${combinedTaskRuns + .map(t => s"(${escapeLiteral(t.runId.id)}, ${escapeLiteral(t.taskName)})") + .mkString(", ")}) + """.as[(String, String, String, String, String)]) + } yield + r.toList.map(row => + TaskEntry(ObjectId(row._1), ObjectId(row._2), row._3, row._4, Json.parse(row._5).as[JsObject])) + + def findWorkflowsByHash(organizationId: ObjectId, workflowHashes: Set[String]): Fox[List[WorkflowEntry]] = + for { + r <- run(sql""" + SELECT name, hash + FROM webknossos.voxelytics_workflows + WHERE hash IN #${writeEscapedTuple(workflowHashes.toList)} AND _organization = $organizationId + """.as[(String, String)]) + } yield r.toList.map(row => WorkflowEntry(row._1, row._2)) + + def findWorkflowByHash(organizationId: ObjectId, workflowHash: String): Fox[WorkflowEntry] = + for { + r <- run(sql""" + SELECT name, hash + FROM webknossos.voxelytics_workflows + WHERE hash = $workflowHash AND _organization = $organizationId + """.as[(String, String)]) + (name, hash) <- r.headOption + } yield WorkflowEntry(name, hash) + + def findTaskRuns(organizationId: ObjectId, runIds: List[ObjectId], staleTimeout: Duration): Fox[List[TaskRunEntry]] = + for { + r <- run(sql""" + WITH latest_chunk_states AS ( + SELECT DISTINCT ON (_chunk) _chunk, timestamp, state + FROM webknossos.voxelytics_chunkStateChangeEvents + ORDER BY _chunk, timestamp DESC + ) + SELECT + r.name AS runName, + r._id AS runId, + t._id AS taskId, + t.name AS taskName, + CASE + WHEN task_state.state = 'RUNNING' AND run_heartbeat.timestamp IS NOT NULL AND run_heartbeat.timestamp < NOW() - INTERVAL '#${staleTimeout.toSeconds} SECONDS' + THEN 'STALE' ELSE task_state.state END AS state, + task_begin.timestamp AS beginTime, + CASE + WHEN task_state.state = 'RUNNING' AND run_heartbeat.timestamp IS NOT NULL AND run_heartbeat.timestamp < NOW() - INTERVAL '#${staleTimeout.toSeconds} SECONDS' + THEN run_heartbeat.timestamp ELSE task_end.timestamp END AS endTime, + exec.executionId AS currentExecutionId, + COALESCE(chunks.total, 0) AS chunksTotal, + COALESCE(chunks.finished, 0) AS chunksFinished + FROM webknossos.voxelytics_runs r + JOIN webknossos.voxelytics_tasks t ON t._run = r._id + JOIN ( + SELECT DISTINCT ON (_task) _task, state + FROM webknossos.voxelytics_taskStateChangeEvents + ORDER BY _task, timestamp DESC + ) task_state + ON t._id = task_state._task + LEFT JOIN ( + SELECT DISTINCT ON (_task) _task, timestamp + FROM webknossos.voxelytics_taskStateChangeEvents + WHERE state = 'RUNNING' + ORDER BY _task, timestamp + ) task_begin + ON t._id = task_begin._task + LEFT JOIN ( + SELECT DISTINCT ON (_task) _task, timestamp + FROM webknossos.voxelytics_taskStateChangeEvents + WHERE state IN ('COMPLETE', 'FAILED', 'CANCELLED') + ORDER BY _task, timestamp DESC + ) task_end + ON t._id = task_end._task + LEFT JOIN ( + SELECT _run, timestamp + FROM webknossos.voxelytics_runHeartbeatEvents + ) run_heartbeat + ON r._id = run_heartbeat._run + LEFT JOIN ( + SELECT DISTINCT ON (c._task) c._task, c.executionId + FROM latest_chunk_states + JOIN webknossos.voxelytics_chunks c ON c._id = latest_chunk_states._chunk + WHERE latest_chunk_states.state = 'RUNNING' + ORDER BY c._task, latest_chunk_states.timestamp DESC + ) exec ON exec._task = t._id + LEFT JOIN ( + SELECT + count_all._task AS _task, + count_all.count AS total, + COALESCE(count_finished.count, 0) AS finished + FROM ( + SELECT _task, COUNT(_id) AS count + FROM webknossos.voxelytics_chunks + GROUP BY _task + ) count_all + 
LEFT JOIN ( + SELECT c._task, COUNT(_id) AS count + FROM latest_chunk_states + JOIN webknossos.voxelytics_chunks c ON c._id = latest_chunk_states._chunk + WHERE latest_chunk_states.state IN ('COMPLETE', 'FAILED', 'CANCELLED') + GROUP BY c._task + ) count_finished ON count_finished._task = count_all._task + ) chunks ON chunks._task = t._id + WHERE + r._organization = $organizationId AND + r._id IN #${writeEscapedTuple(runIds.map(_.id))} + """ + .as[(String, String, String, String, String, Option[Timestamp], Option[Timestamp], Option[String], Long, Long)]) + results <- Fox.combined( + r.toList.map( + row => + for { + state <- VoxelyticsRunState.fromString(row._5).toFox + } yield + TaskRunEntry( + row._1, + ObjectId(row._2), + ObjectId(row._3), + row._4, + state, + row._6.map(_.toInstant), + row._7.map(_.toInstant), + row._8, + row._9, + row._10 + ))) + } yield results + + def findRuns(currentUser: User, + runIds: Option[List[ObjectId]], + workflowHash: Option[String], + staleTimeout: Duration): Fox[List[RunEntry]] = { + val organizationId = currentUser._organization + val readAccessQ = if (currentUser.isAdmin) { "" } else { s" AND (r._user = ${escapeLiteral(currentUser._id.id)})" } + val runIdsQ = runIds.map(runIds => s" AND r._id IN ${writeEscapedTuple(runIds.map(_.id))}").getOrElse("") + val workflowHashQ = + workflowHash.map(workflowHash => s" AND r.workflow_hash = ${escapeLiteral(workflowHash)}").getOrElse("") + for { + r <- run(sql""" + SELECT + r._id, + r.name, + r.username, + r.hostname, + r.voxelyticsVersion, + r.workflow_hash, + r.workflow_yamlContent, + r.workflow_config, + CASE + WHEN run_state.state = 'RUNNING' AND run_heartbeat.timestamp IS NOT NULL AND run_heartbeat.timestamp < NOW() - INTERVAL '#${staleTimeout.toSeconds} SECONDS' + THEN 'STALE' ELSE run_state.state END AS state, + run_begin.timestamp AS beginTime, + CASE + WHEN run_state.state = 'RUNNING' AND run_heartbeat.timestamp IS NOT NULL AND run_heartbeat.timestamp < NOW() - INTERVAL '#${staleTimeout.toSeconds} SECONDS' + THEN run_heartbeat.timestamp ELSE run_end.timestamp END AS endTime + FROM webknossos.voxelytics_runs r + JOIN ( + SELECT DISTINCT ON (_run) _run, state + FROM webknossos.voxelytics_runStateChangeEvents + ORDER BY _run, timestamp DESC + ) run_state + ON r._id = run_state._run + JOIN ( + SELECT DISTINCT ON (_run) _run, timestamp + FROM webknossos.voxelytics_runStateChangeEvents + WHERE state = 'RUNNING' + ORDER BY _run, timestamp + ) run_begin + ON r._id = run_begin._run + LEFT JOIN ( + SELECT DISTINCT ON (_run) _run, timestamp + FROM webknossos.voxelytics_runStateChangeEvents + WHERE state IN ('COMPLETE', 'FAILED', 'CANCELLED') + ORDER BY _run, timestamp DESC + ) run_end + ON r._id = run_end._run + LEFT JOIN ( + SELECT _run, timestamp + FROM webknossos.voxelytics_runHeartbeatEvents + ) run_heartbeat + ON r._id = run_heartbeat._run + WHERE r._organization = $organizationId + #$runIdsQ + #$workflowHashQ + #$readAccessQ + """.as[(String, String, String, String, String, String, String, String, String, Timestamp, Option[Timestamp])]) + results <- Fox.combined( + r.toList.map( + row => + for { + state <- VoxelyticsRunState.fromString(row._9).toFox + } yield + RunEntry( + ObjectId(row._1), + row._2, + row._3, + row._4, + row._5, + row._6, + row._7, + Json.parse(row._8).as[JsObject], + state, + row._10.toInstant, + row._11.map(_.toInstant) + ))) + } yield results + + } + + def upsertArtifactChecksumEvent(artifactId: ObjectId, ev: ArtifactFileChecksumEvent): Fox[Unit] = + for { + _ <- run( + sqlu"""INSERT INTO 
webknossos.voxelytics_artifactFileChecksumEvents (_artifact, path, resolvedPath, checksumMethod, checksum, fileSize, lastModified, timestamp) + VALUES ($artifactId, ${ev.path}, ${ev.resolvedPath}, ${ev.checksumMethod}, ${ev.checksum}, ${ev.fileSize}, ${Timestamp + .from(ev.lastModified)}, ${Timestamp.from(ev.timestamp)}) + ON CONFLICT (_artifact, path, timestamp) + DO UPDATE SET + resolvedPath = EXCLUDED.resolvedPath, + checksumMethod = EXCLUDED.checksumMethod, + checksum = EXCLUDED.checksum, + fileSize = EXCLUDED.fileSize, + lastModified = EXCLUDED.lastModified + """) + } yield () + + def upsertChunkProfilingEvent(chunkId: ObjectId, ev: ChunkProfilingEvent): Fox[Unit] = + for { + _ <- run( + sqlu"""INSERT INTO webknossos.voxelytics_chunkProfilingEvents (_chunk, hostname, pid, memory, cpuUser, cpuSystem, timestamp) + VALUES ($chunkId, ${ev.hostname}, ${ev.pid}, ${ev.memory}, ${ev.cpuUser}, ${ev.cpuSystem}, ${Timestamp + .from(ev.timestamp)}) + ON CONFLICT (_chunk, timestamp) + DO UPDATE SET + hostname = EXCLUDED.hostname, + pid = EXCLUDED.pid, + memory = EXCLUDED.memory, + cpuUser = EXCLUDED.cpuUser, + cpuSystem = EXCLUDED.cpuSystem + """) + } yield () + + def upsertRunHeartbeatEvent(runId: ObjectId, ev: RunHeartbeatEvent): Fox[Unit] = + for { + _ <- run(sqlu"""INSERT INTO webknossos.voxelytics_runHeartbeatEvents (_run, timestamp) + VALUES ($runId, ${Timestamp.from(ev.timestamp)}) + ON CONFLICT (_run) + DO UPDATE SET timestamp = EXCLUDED.timestamp + """) + } yield () + + def upsertChunkStateChangeEvent(chunkId: ObjectId, ev: ChunkStateChangeEvent): Fox[Unit] = + for { + _ <- run(sqlu"""INSERT INTO webknossos.voxelytics_chunkStateChangeEvents (_chunk, timestamp, state) + VALUES ($chunkId, ${Timestamp + .from(ev.timestamp)}, ${ev.state.toString}::webknossos.VOXELYTICS_RUN_STATE) + ON CONFLICT (_chunk, timestamp) + DO UPDATE SET state = EXCLUDED.state + """) + } yield () + + def upsertTaskStateChangeEvent(taskId: ObjectId, ev: TaskStateChangeEvent): Fox[Unit] = + for { + _ <- run(sqlu"""INSERT INTO webknossos.voxelytics_taskStateChangeEvents (_task, timestamp, state) + VALUES ($taskId, ${Timestamp.from(ev.timestamp)}, ${ev.state.toString}::webknossos.VOXELYTICS_RUN_STATE) + ON CONFLICT (_task, timestamp) + DO UPDATE SET state = EXCLUDED.state + """) + } yield () + + def upsertRunStateChangeEvent(runId: ObjectId, ev: RunStateChangeEvent): Fox[Unit] = + for { + _ <- run(sqlu"""INSERT INTO webknossos.voxelytics_runStateChangeEvents (_run, timestamp, state) + VALUES ($runId, ${Timestamp.from(ev.timestamp)}, ${ev.state.toString}::webknossos.VOXELYTICS_RUN_STATE) + ON CONFLICT (_run, timestamp) + DO UPDATE SET state = EXCLUDED.state + """) + } yield () + + def upsertWorkflow(hash: String, name: String, organizationId: ObjectId): Fox[Unit] = + for { + _ <- run(sqlu"""INSERT INTO webknossos.voxelytics_workflows (hash, name, _organization) + VALUES ($hash, $name, $organizationId) + ON CONFLICT (_organization, hash) + DO UPDATE SET name = EXCLUDED.name + """) + } yield () + + def upsertRun(organizationId: ObjectId, + userId: ObjectId, + name: String, + username: String, + hostname: String, + voxelyticsVersion: String, + workflow_hash: String, + workflow_yamlContent: String, + workflow_config: JsValue): Fox[ObjectId] = + for { + _ <- run( + sqlu"""INSERT INTO webknossos.voxelytics_runs (_id, _organization, _user, name, username, hostname, voxelyticsVersion, workflow_hash, workflow_yamlContent, workflow_config) + VALUES (${ObjectId.generate}, $organizationId, $userId, $name, $username, $hostname, 
$voxelyticsVersion, $workflow_hash, $workflow_yamlContent, ${Json + .stringify(workflow_config)}::JSONB) + ON CONFLICT (_organization, name) + DO UPDATE SET + _user = EXCLUDED._user, + username = EXCLUDED.username, + hostname = EXCLUDED.hostname, + voxelyticsVersion = EXCLUDED.voxelyticsVersion, + workflow_hash = EXCLUDED.workflow_hash, + workflow_yamlContent = EXCLUDED.workflow_yamlContent, + workflow_config = EXCLUDED.workflow_config + """) + objectIdList <- run(sql"""SELECT _id + FROM webknossos.voxelytics_runs + WHERE _organization = $organizationId AND name = $name + """.as[String]) + objectId <- objectIdList.headOption + } yield ObjectId(objectId) + + def upsertTask(runId: ObjectId, name: String, task: String, config: JsValue): Fox[ObjectId] = + for { + _ <- run(sqlu"""INSERT INTO webknossos.voxelytics_tasks (_id, _run, name, task, config) + VALUES (${ObjectId.generate}, $runId, $name, $task, ${Json.stringify(config)}::JSONB) + ON CONFLICT (_run, name) + DO UPDATE SET + task = EXCLUDED.task, + config = EXCLUDED.config + """) + objectIdList <- run(sql"""SELECT _id + FROM webknossos.voxelytics_tasks + WHERE _run = $runId AND name = $name + """.as[String]) + objectId <- objectIdList.headOption + } yield ObjectId(objectId) + + def upsertChunk(taskId: ObjectId, executionId: String, chunkName: String): Fox[ObjectId] = + for { + _ <- run(sqlu"""INSERT INTO webknossos.voxelytics_chunks (_id, _task, executionId, chunkName) + VALUES (${ObjectId.generate}, $taskId, $executionId, $chunkName) + ON CONFLICT (_task, executionId, chunkName) DO NOTHING + """) + objectIdList <- run(sql"""SELECT _id + FROM webknossos.voxelytics_chunks + WHERE _task = $taskId AND executionId = $executionId AND chunkName = $chunkName + """.as[String]) + objectId <- objectIdList.headOption + } yield ObjectId(objectId) + + def upsertArtifact(taskId: ObjectId, + name: String, + path: String, + fileSize: Long, + inodeCount: Long, + version: String, + metadata: JsValue): Fox[ObjectId] = + for { + _ <- run( + sqlu"""INSERT INTO webknossos.voxelytics_artifacts (_id, _task, name, path, fileSize, inodeCount, version, metadata) + VALUES (${ObjectId.generate}, $taskId, $name, $path, $fileSize, $inodeCount, $version, ${Json.stringify( + metadata)}::JSONB) + ON CONFLICT (_task, name) + DO UPDATE SET + path = EXCLUDED.path, + fileSize = EXCLUDED.fileSize, + inodeCount = EXCLUDED.inodeCount, + version = EXCLUDED.version, + metadata = EXCLUDED.metadata + """) + objectIdList <- run(sql"""SELECT _id + FROM webknossos.voxelytics_artifacts + WHERE _task = $taskId AND name = $name + """.as[String]) + objectId <- objectIdList.headOption + } yield ObjectId(objectId) + + def getRunIdByName(runName: String, organizationId: ObjectId): Fox[ObjectId] = + for { + objectIdList <- run(sql""" + SELECT _id + FROM webknossos.voxelytics_runs + WHERE name = $runName AND _organization = $organizationId + """.as[String]) + objectId <- objectIdList.headOption + } yield ObjectId(objectId) + + def getRunNameById(runId: ObjectId, organizationId: ObjectId): Fox[String] = + for { + nameList <- run(sql"""SELECT name + FROM webknossos.voxelytics_runs + WHERE _id = $runId AND _organization = $organizationId + """.as[String]) + name <- nameList.headOption + } yield name + + def getUserIdForRun(runId: ObjectId): Fox[ObjectId] = + for { + userIdList <- run(sql""" + SELECT _user + FROM webknossos.voxelytics_runs + WHERE _id = $runId + """.as[String]) + userId <- userIdList.headOption + } yield ObjectId(userId) + + def getUserIdForRunOpt(runName: String, organizationId: 
ObjectId): Fox[Option[ObjectId]] = + for { + userId <- run(sql""" + SELECT _user + FROM webknossos.voxelytics_runs + WHERE name = $runName AND _organization = $organizationId + """.as[String]) + } yield userId.headOption.map(ObjectId(_)) + + def getTaskIdByName(taskName: String, runId: ObjectId): Fox[ObjectId] = + for { + objectIdList <- run(sql"""SELECT _id + FROM webknossos.voxelytics_tasks + WHERE _run = $runId AND name = $taskName + """.as[String]) + objectId <- objectIdList.headOption + } yield ObjectId(objectId) + + def getChunkIdByName(taskId: ObjectId, executionId: String, chunkName: String): Fox[ObjectId] = + for { + objectIdList <- run(sql"""SELECT _id + FROM webknossos.voxelytics_chunks + WHERE _task = $taskId AND executionId = $executionId AND chunkName = $chunkName + """.as[String]) + objectId <- objectIdList.headOption + } yield ObjectId(objectId) + + def getArtifactIdByName(taskId: ObjectId, artifactName: String): Fox[ObjectId] = + for { + objectIdList <- run(sql"""SELECT _id + FROM webknossos.voxelytics_artifacts + WHERE _task = $taskId AND name = $artifactName + """.as[String]) + objectId <- objectIdList.headOption + } yield ObjectId(objectId) + + def getChunkStatistics(taskId: ObjectId): Fox[List[ChunkStatisticsEntry]] = { + for { + r <- run( + sql""" + WITH latest_chunk_states AS ( + SELECT DISTINCT ON (_chunk) _chunk, timestamp, state + FROM webknossos.voxelytics_chunkStateChangeEvents + ORDER BY _chunk, timestamp DESC + ) + SELECT + exec.executionId AS executionId, + exec.countTotal AS countTotal, + exec.countFinished AS countFinished, + times.beginTime AS beginTime, + times.endTime AS endTime, + profiling.max_memory AS max_memory, + profiling.median_memory AS median_memory, + profiling.stddev_memory AS stddev_memory, + profiling.max_cpuUser AS max_cpuUser, + profiling.median_cpuUser AS median_cpuUser, + profiling.stddev_cpuUser AS stddev_cpuUser, + profiling.max_cpuSystem AS max_cpuSystem, + profiling.median_cpuSystem AS median_cpuSystem, + profiling.stddev_cpuSystem AS stddev_cpuSystem, + durations.max_duration AS max_duration, + durations.median_duration AS median_duration, + durations.stddev_duration AS stddev_duration, + durations.sum_duration AS sum_duration + FROM + ( -- Chunks grouped by task and executionId + SELECT + c._task, + c.executionId, + COUNT(c._id) AS countTotal, + COUNT(finished._chunk) AS countFinished + FROM webknossos.voxelytics_chunks c + LEFT JOIN ( + SELECT * + FROM latest_chunk_states + WHERE state IN ('COMPLETE', 'FAILED', 'CANCELLED') + ) finished ON finished._chunk = c._id + GROUP BY _task, executionId + ) exec + LEFT JOIN ( -- Begin and end time of task+executionId + SELECT + c._task, + c.executionId, + MIN(chunk_events.timestamp) AS beginTime, + MAX(chunk_events.timestamp) AS endTime + FROM webknossos.voxelytics_chunkStateChangeEvents chunk_events + JOIN webknossos.voxelytics_chunks c ON c._id = chunk_events._chunk + GROUP BY c._task, c.executionId + ) times ON times._task = exec._task AND times.executionId = exec.executionId + LEFT JOIN ( -- Profiling statistics (memory, cpu); grouped by task and executionId + SELECT + c._task AS _task, + c.executionId AS executionId, + MAX(cp.memory) AS max_memory, + PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cp.memory) AS median_memory, + STDDEV(cp.memory) AS stddev_memory, + MAX(cp.cpuUser) AS max_cpuUser, + PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cp.cpuUser) AS median_cpuUser, + STDDEV(cp.cpuUser) AS stddev_cpuUser, + MAX(cp.cpuSystem) AS max_cpuSystem, + PERCENTILE_CONT(0.5) WITHIN GROUP 
(ORDER BY cp.cpuSystem) AS median_cpuSystem, + STDDEV(cp.cpuSystem) AS stddev_cpuSystem + FROM + webknossos.voxelytics_chunkProfilingEvents cp, + webknossos.voxelytics_chunks c + WHERE + c._id = cp._chunk + GROUP BY c._task, c.executionId + ) profiling ON profiling._task = exec._task AND profiling.executionId = exec.executionId + LEFT JOIN ( -- Chunk duration statistics; grouped by task and executionId + SELECT + c._task AS _task, + c.executionId AS executionId, + PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY (EXTRACT(epoch FROM c_end.timestamp - c_begin.timestamp))) AS median_duration, + MAX(EXTRACT(epoch FROM c_end.timestamp - c_begin.timestamp)) AS max_duration, + STDDEV(EXTRACT(epoch FROM c_end.timestamp - c_begin.timestamp)) AS stddev_duration, + SUM(EXTRACT(epoch FROM c_end.timestamp - c_begin.timestamp)) AS sum_duration + FROM + ( + SELECT DISTINCT ON (_chunk) _chunk, timestamp + FROM webknossos.voxelytics_chunkStateChangeEvents + WHERE state = 'RUNNING' + ORDER BY _chunk, timestamp + ) c_begin, + ( + SELECT DISTINCT ON (_chunk) _chunk, timestamp + FROM webknossos.voxelytics_chunkStateChangeEvents + WHERE state = 'COMPLETE' + ORDER BY _chunk, timestamp + ) c_end, + webknossos.voxelytics_chunks c + WHERE + c_begin._chunk = c_end._chunk + AND c._id = c_begin._chunk + GROUP BY c._task, c.executionId + ) durations ON durations._task = exec._task AND durations.executionId = exec.executionId + JOIN webknossos.voxelytics_tasks t + ON t._id = exec._task + WHERE -- Limit to specified task + exec._task = $taskId + ORDER BY times.beginTime ASC NULLS LAST + """.as[ + (String, + Int, + Int, + Timestamp, + Timestamp, + Double, + Double, + Double, + Double, + Double, + Double, + Double, + Double, + Double, + Double, + Double, + Double, + Double)]) + } yield + r.toList.map( + row => + ChunkStatisticsEntry( + row._1, + row._2, + row._3, + row._4.toInstant, + row._5.toInstant, + StatisticsEntry(row._6, row._7, row._8), + StatisticsEntry(row._9, row._10, row._11), + StatisticsEntry(row._12, row._13, row._14), + StatisticsEntry(row._15, row._16, row._17, Some(row._18)) + )) + } + + def getArtifactChecksums(taskId: ObjectId, artifactName: Option[String]): Fox[List[ArtifactChecksumEntry]] = + for { + r <- run(sql""" + SELECT + t.name AS taskName, + a.name AS artifactName, + af.path AS path, + af.resolvedPath AS resolvedPath, + af.timestamp AS timestamp, + af.checksumMethod AS checksumMethod, + af.checksum AS checksum, + af.fileSize AS fileSize, + af.lastModified AS lastModified + FROM + ( + SELECT DISTINCT ON(_artifact, path) * + FROM webknossos.voxelytics_artifactFileChecksumEvents + ORDER BY _artifact, path, timestamp + ) af + JOIN webknossos.voxelytics_artifacts a ON a._id = af._artifact + JOIN webknossos.voxelytics_tasks t ON t._id = a._task + WHERE + a._task = $taskId #${artifactName.map(a => s"AND a.name = ${escapeLiteral(a)}").getOrElse("")} + ORDER BY af.path + """.as[(String, String, String, String, Timestamp, String, String, Long, Timestamp)]) + } yield + r.toList.map( + row => + ArtifactChecksumEntry(row._1, + row._2, + row._3, + row._4, + row._5.toInstant, + row._6, + row._7, + row._8, + row._9.toInstant)) +} diff --git a/app/models/voxelytics/VoxelyticsLogLevel.scala b/app/models/voxelytics/VoxelyticsLogLevel.scala new file mode 100644 index 0000000000..5c29bf5ba2 --- /dev/null +++ b/app/models/voxelytics/VoxelyticsLogLevel.scala @@ -0,0 +1,9 @@ +package models.voxelytics + +import com.scalableminds.util.enumeration.ExtendedEnumeration + +object VoxelyticsLogLevel extends ExtendedEnumeration { 
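+  // Declaration order defines ascending severity; `sortedValues` preserves it, and
+  // ElasticsearchClient.queryLogs relies on it to select all levels at or above the requested minimum.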
+ type VoxelyticsLogLevel = Value + val NOTSET, DEBUG, INFO, NOTICE, WARNING, ERROR, CRITICAL = Value + val sortedValues = values.toList.sortBy(_.id) +} diff --git a/app/models/voxelytics/VoxelyticsRunState.scala b/app/models/voxelytics/VoxelyticsRunState.scala new file mode 100644 index 0000000000..5e7a539257 --- /dev/null +++ b/app/models/voxelytics/VoxelyticsRunState.scala @@ -0,0 +1,10 @@ +package models.voxelytics + +import com.scalableminds.util.enumeration.ExtendedEnumeration + +object VoxelyticsRunState extends ExtendedEnumeration { + type VoxelyticsRunState = Value + val SKIPPED, PENDING, RUNNING, COMPLETE, FAILED, CANCELLED, STALE = Value + + def nonWaitingStates = List(RUNNING, COMPLETE, FAILED, CANCELLED, STALE) +} diff --git a/app/models/voxelytics/VoxelyticsService.scala b/app/models/voxelytics/VoxelyticsService.scala new file mode 100644 index 0000000000..609dada5f2 --- /dev/null +++ b/app/models/voxelytics/VoxelyticsService.scala @@ -0,0 +1,187 @@ +package models.voxelytics + +import com.scalableminds.util.tools.{Fox, FoxImplicits} +import models.user.User +import models.voxelytics.VoxelyticsRunState.VoxelyticsRunState +import play.api.libs.json.{JsObject, Json, OFormat} +import utils.ObjectId + +import java.time.Instant +import javax.inject.Inject +import scala.concurrent.ExecutionContext +import scala.util.Try + +case class RunEntry(id: ObjectId, + name: String, + username: String, + hostname: String, + voxelyticsVersion: String, + workflow_hash: String, + workflow_yamlContent: String, + workflow_config: JsObject, + state: VoxelyticsRunState, + beginTime: Instant, + endTime: Option[Instant]) + +object RunEntry { + implicit val jsonFormat: OFormat[RunEntry] = Json.format[RunEntry] +} + +case class TaskRunEntry(runName: String, + runId: ObjectId, + taskId: ObjectId, + taskName: String, + state: VoxelyticsRunState, + beginTime: Option[Instant], + endTime: Option[Instant], + currentExecutionId: Option[String], + chunksTotal: Long, + chunksFinished: Long) + +object TaskRunEntry { + implicit val jsonFormat: OFormat[TaskRunEntry] = Json.format[TaskRunEntry] +} + +case class WorkflowEntry( + name: String, + hash: String +) + +object WorkflowEntry { + implicit val jsonFormat: OFormat[WorkflowEntry] = Json.format[WorkflowEntry] +} + +case class ArtifactEntry(artifactId: ObjectId, + taskId: ObjectId, + name: String, + path: String, + fileSize: Long, + inodeCount: Long, + version: String, + metadata: JsObject, + taskName: String) + +object ArtifactEntry { + implicit val jsonFormat: OFormat[ArtifactEntry] = Json.format[ArtifactEntry] +} + +case class TaskEntry(taskId: ObjectId, runId: ObjectId, name: String, task: String, config: JsObject) + +object TaskEntry { + implicit val jsonFormat: OFormat[TaskEntry] = Json.format[TaskEntry] +} + +case class StatisticsEntry(max: Double, median: Double, stddev: Double, sum: Option[Double] = None) + +object StatisticsEntry { + implicit val jsonFormat: OFormat[StatisticsEntry] = Json.format[StatisticsEntry] +} + +case class ChunkStatisticsEntry(executionId: String, + countTotal: Long, + countFinished: Long, + beginTime: Instant, + endTime: Instant, + memory: StatisticsEntry, + cpuUser: StatisticsEntry, + cpuSystem: StatisticsEntry, + duration: StatisticsEntry) + +object ChunkStatisticsEntry { + implicit val jsonFormat: OFormat[ChunkStatisticsEntry] = Json.format[ChunkStatisticsEntry] +} + +case class ArtifactChecksumEntry(taskName: String, + artifactName: String, + path: String, + resolvedPath: String, + timestamp: Instant, + checksumMethod: 
String, + checksum: String, + fileSize: Long, + lastModified: Instant) + +object ArtifactChecksumEntry { + implicit val jsonFormat: OFormat[ArtifactChecksumEntry] = Json.format[ArtifactChecksumEntry] +} + +class VoxelyticsService @Inject()(voxelyticsDAO: VoxelyticsDAO)(implicit ec: ExecutionContext) extends FoxImplicits { + + def checkAuth(runId: ObjectId, user: User): Fox[Unit] = + for { + runUserId <- voxelyticsDAO.getUserIdForRun(runId) + _ <- bool2Fox(user.isAdmin || runUserId == user._id) + } yield () + + def checkAuthForWorkflowCreation(runName: String, user: User): Fox[Unit] = + for { + runUserId <- voxelyticsDAO.getUserIdForRunOpt(runName, user._organization) + _ <- bool2Fox(user.isAdmin || runUserId.forall(_ == user._id)) + } yield () + + def runPublicWrites(run: RunEntry, tasks: List[TaskRunEntry]): JsObject = + Json.toJson(run).as[JsObject] ++ Json.obj( + "tasks" -> tasks.map(Json.toJson(_)) + ) + + def artifactsPublicWrites(artifacts: List[ArtifactEntry]): JsObject = { + val artifactsByTask = artifacts.groupBy(_.taskName) + JsObject(artifactsByTask.map(artifactKV => { + val taskName = artifactKV._1 + val artifacts = artifactKV._2 + (taskName, JsObject(artifacts.map(artifact => (artifact.name, Json.toJson(artifact))))) + })) + } + + def workflowConfigPublicWrites(workflowConfig: JsObject, tasks: List[TaskEntry]): JsObject = + workflowConfig ++ + Json.obj("tasks" -> JsObject(tasks.map(t => (t.name, t.config ++ Json.obj("task" -> t.task))))) + + def aggregateBeginEndTime(runs: List[RunEntry]): (VoxelyticsRunState, Instant, Option[Instant]) = { + // The calling code needs to make sure that runs is non-empty, otherwise the next lines will throw exceptions + val state = runs.maxBy(_.beginTime).state + val beginTime = runs.map(_.beginTime).min + val endTime = Try(runs.flatMap(_.endTime).max).toOption + + (state, beginTime, endTime) + } + + def combineTaskRuns(allTaskRuns: List[TaskRunEntry], mostRecentRunId: ObjectId): List[TaskRunEntry] = + allTaskRuns + .filter(task => task.runId == mostRecentRunId) + .map(task => { + val thisTaskRuns = allTaskRuns.filter(t => t.taskName == task.taskName).sortBy(_.beginTime).reverse + val nonWaitingTaskRuns = thisTaskRuns.filter(t => VoxelyticsRunState.nonWaitingStates.contains(t.state)) + nonWaitingTaskRuns.headOption.getOrElse(thisTaskRuns.head) + }) + + def upsertTaskWithArtifacts(runId: ObjectId, + taskName: String, + task: WorkflowDescriptionTaskConfig, + artifacts: Map[String, Map[String, WorkflowDescriptionArtifact]]): Fox[Unit] = + for { + taskId <- voxelyticsDAO.upsertTask( + runId, + taskName, + task.task, + Json.obj("config" -> task.config, + "description" -> task.description, + "distribution" -> task.distribution, + "inputs" -> task.inputs, + "output_paths" -> task.output_paths) + ) + _ <- Fox.combined( + artifacts + .getOrElse(taskName, Map.empty) + .map(artifactKV => { + val artifactName = artifactKV._1 + val artifact = artifactKV._2 + voxelyticsDAO.upsertArtifact(taskId, + artifactName, + artifact.path, + artifact.file_size, + artifact.inode_count, + artifact.version, + artifact.metadataAsJson) + }) + .toList) + } yield () +} diff --git a/app/models/voxelytics/WorkflowDescription.scala b/app/models/voxelytics/WorkflowDescription.scala new file mode 100644 index 0000000000..0d687acb44 --- /dev/null +++ b/app/models/voxelytics/WorkflowDescription.scala @@ -0,0 +1,67 @@ +package models.voxelytics + +import play.api.libs.json.{JsObject, JsValue, Json, OFormat} + +case class WorkflowDescriptionTaskConfig(config: JsValue, + inputs: JsValue, + description: 
Option[String], + distribution: JsValue, + output_paths: JsValue, + task: String) + +case class WorkflowDescriptionConfig(global_parameters: Map[String, JsValue], + paths: List[String], + schema_version: Option[Long], + git_hash: Option[String], + tasks: Map[String, WorkflowDescriptionTaskConfig]) { + def asJsonWithoutTasks: JsValue = + Json.obj("global_parameters" -> global_parameters, "paths" -> paths, "schema_version" -> schema_version) +} + +case class WorkflowDescriptionArtifact(path: String, + file_size: Long, + inode_count: Long, + version: String, + attributes: JsValue, + iframes: JsValue, + links: JsValue) { + def metadataAsJson: JsObject = + Json.obj("attributes" -> attributes, "iframes" -> iframes, "links" -> links) +} + +case class WorkflowDescriptionRun(name: String, user: String, hostname: String, voxelyticsVersion: String) + +case class WorkflowDescriptionWorkflow(name: String, hash: String, yamlContent: String) + +case class WorkflowDescription(config: WorkflowDescriptionConfig, + artifacts: Map[String, Map[String, WorkflowDescriptionArtifact]], + run: WorkflowDescriptionRun, + workflow: WorkflowDescriptionWorkflow) + +object WorkflowDescriptionTaskConfig { + implicit val jsonFormat: OFormat[WorkflowDescriptionTaskConfig] = + Json.format[WorkflowDescriptionTaskConfig] +} + +object WorkflowDescriptionConfig { + implicit val jsonFormat: OFormat[WorkflowDescriptionConfig] = + Json.format[WorkflowDescriptionConfig] +} + +object WorkflowDescriptionArtifact { + implicit val jsonFormat: OFormat[WorkflowDescriptionArtifact] = + Json.format[WorkflowDescriptionArtifact] +} + +object WorkflowDescriptionRun { + implicit val jsonFormat: OFormat[WorkflowDescriptionRun] = Json.format[WorkflowDescriptionRun] +} + +object WorkflowDescriptionWorkflow { + implicit val jsonFormat: OFormat[WorkflowDescriptionWorkflow] = + Json.format[WorkflowDescriptionWorkflow] +} + +object WorkflowDescription { + implicit val jsonFormat: OFormat[WorkflowDescription] = Json.format[WorkflowDescription] +} diff --git a/app/models/voxelytics/WorkflowEvents.scala b/app/models/voxelytics/WorkflowEvents.scala new file mode 100644 index 0000000000..770a16a427 --- /dev/null +++ b/app/models/voxelytics/WorkflowEvents.scala @@ -0,0 +1,101 @@ +package models.voxelytics + +import models.voxelytics.VoxelyticsRunState.VoxelyticsRunState +import play.api.libs.json._ + +import java.time.Instant + +trait WorkflowEvent {} + +case class RunStateChangeEvent(state: VoxelyticsRunState, timestamp: Instant) extends WorkflowEvent + +case class TaskStateChangeEvent(taskName: String, + state: VoxelyticsRunState, + timestamp: Instant, + artifacts: Map[String, WorkflowDescriptionArtifact]) + extends WorkflowEvent + +case class ChunkStateChangeEvent(taskName: String, + executionId: String, + chunkName: String, + timestamp: Instant, + state: VoxelyticsRunState) + extends WorkflowEvent + +case class RunHeartbeatEvent(timestamp: Instant) extends WorkflowEvent + +case class ChunkProfilingEvent(taskName: String, + executionId: String, + chunkName: String, + hostname: String, + pid: Long, + memory: Double, + cpuUser: Double, + cpuSystem: Double, + timestamp: Instant) + extends WorkflowEvent + +case class ArtifactFileChecksumEvent(taskName: String, + artifactName: String, + path: String, + resolvedPath: String, + checksumMethod: String, + checksum: String, + fileSize: Long, + lastModified: Instant, + timestamp: Instant) + extends WorkflowEvent + +object RunStateChangeEvent { + implicit val jsonFormat: OFormat[RunStateChangeEvent] = 
Json.format[RunStateChangeEvent] +} + +object TaskStateChangeEvent { + implicit val jsonFormat: OFormat[TaskStateChangeEvent] = Json.format[TaskStateChangeEvent] +} + +object ChunkStateChangeEvent { + implicit val jsonFormat: OFormat[ChunkStateChangeEvent] = Json.format[ChunkStateChangeEvent] +} + +object RunHeartbeatEvent { + implicit val jsonFormat: OFormat[RunHeartbeatEvent] = Json.format[RunHeartbeatEvent] +} + +object ChunkProfilingEvent { + implicit val jsonFormat: OFormat[ChunkProfilingEvent] = Json.format[ChunkProfilingEvent] +} + +object ArtifactFileChecksumEvent { + implicit val jsonFormat: OFormat[ArtifactFileChecksumEvent] = Json.format[ArtifactFileChecksumEvent] +} + +object WorkflowEvent { + implicit object workflowEventFormat extends Format[WorkflowEvent] { + override def reads(json: JsValue): JsResult[WorkflowEvent] = + (json \ "type").as[String] match { + case "RUN_STATE_CHANGE" => json.validate[RunStateChangeEvent] + case "TASK_STATE_CHANGE" => json.validate[TaskStateChangeEvent] + case "CHUNK_STATE_CHANGE" => json.validate[ChunkStateChangeEvent] + case "RUN_HEARTBEAT" => json.validate[RunHeartbeatEvent] + case "CHUNK_PROFILING" => json.validate[ChunkProfilingEvent] + case "ARTIFACT_FILE_CHECKSUM" => json.validate[ArtifactFileChecksumEvent] + case eventType => JsError(s"Unknown workflow event type: $eventType") + } + + override def writes(a: WorkflowEvent): JsObject = a match { + case s: RunStateChangeEvent => + Json.obj("type" -> "RUN_STATE_CHANGE") ++ Json.toJson(s)(RunStateChangeEvent.jsonFormat).as[JsObject] + case s: TaskStateChangeEvent => + Json.obj("type" -> "TASK_STATE_CHANGE") ++ Json.toJson(s)(TaskStateChangeEvent.jsonFormat).as[JsObject] + case s: ChunkStateChangeEvent => + Json.obj("type" -> "CHUNK_STATE_CHANGE") ++ Json.toJson(s)(ChunkStateChangeEvent.jsonFormat).as[JsObject] + case s: RunHeartbeatEvent => + Json.obj("type" -> "RUN_HEARTBEAT") ++ Json.toJson(s)(RunHeartbeatEvent.jsonFormat).as[JsObject] + case s: ChunkProfilingEvent => + Json.obj("type" -> "CHUNK_PROFILING") ++ Json.toJson(s)(ChunkProfilingEvent.jsonFormat).as[JsObject] + case s: ArtifactFileChecksumEvent => + Json + .obj("type" -> "ARTIFACT_FILE_CHECKSUM") ++ Json.toJson(s)(ArtifactFileChecksumEvent.jsonFormat).as[JsObject] + } + } +} diff --git a/app/utils/SQLHelpers.scala b/app/utils/SQLHelpers.scala index 615b8dd33b..d3b3b4926a 100644 --- a/app/utils/SQLHelpers.scala +++ b/app/utils/SQLHelpers.scala @@ -104,6 +104,33 @@ class SimpleSQLDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionContext } } + def escapeLiteral(aString: String): String = { + // Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c + var hasBackslash = false + val escaped = new StringBuffer("'") + + aString.foreach { c => + if (c == '\'') { + escaped.append(c).append(c) + } else if (c == '\\') { + escaped.append(c).append(c) + hasBackslash = true + } else { + escaped.append(c) + } + } + escaped.append('\'') + + if (hasBackslash) { + "E" + escaped.toString + } else { + escaped.toString + } + } + + def writeEscapedTuple(seq: List[String]): String = + "(" + seq.map(escapeLiteral).mkString(", ") + ")" + def sanitize(aString: String): String = aString.replaceAll("'", "") // escape ' by doubling it, escape " with backslash, drop commas @@ -230,7 +257,7 @@ abstract class SQLDAO[C, R, X <: AbstractTable[R]] @Inject()(sqlClient: SQLClien def parseFirst(rowSeq: Seq[X#TableElementType], queryLabel: String): Fox[C] = for { firstRow <- rowSeq.headOption.toFox // No error chain here, as this should stay Fox.Empty - parsed <- parse(firstRow) ?~> s"Parsing failed for row in 
${collectionName} queried by $queryLabel" + parsed <- parse(firstRow) ?~> s"Parsing failed for row in $collectionName queried by $queryLabel" } yield parsed def parseAll(rowSeq: Seq[X#TableElementType]): Fox[List[C]] = diff --git a/app/utils/WkConf.scala b/app/utils/WkConf.scala index 8167cdcd84..6ccfe67614 100644 --- a/app/utils/WkConf.scala +++ b/app/utils/WkConf.scala @@ -3,8 +3,8 @@ package utils import com.scalableminds.util.tools.ConfigReader import com.typesafe.scalalogging.LazyLogging import play.api.Configuration -import javax.inject.Inject +import javax.inject.Inject import scala.concurrent.duration._ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with LazyLogging { @@ -20,36 +20,46 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L val enabled: Seq[String] = getList[String]("play.modules.enabled") val disabled: Seq[String] = getList[String]("play.modules.disabled") } + val children = List(Modules) } object WebKnossos { val tabTitle: String = get[String]("webKnossos.tabTitle") + object User { val timeTrackingPause: FiniteDuration = get[FiniteDuration]("webKnossos.user.timeTrackingPause") val inviteExpiry: Duration = get[Duration]("webKnossos.user.inviteExpiry") val ssoKey: String = get[String]("webKnossos.user.ssoKey") } + val newOrganizationMailingList: String = get[String]("webKnossos.newOrganizationMailingList") + object Tasks { val maxOpenPerUser: Int = get[Int]("webKnossos.tasks.maxOpenPerUser") } + object Cache { object User { val timeout: FiniteDuration = get[FiniteDuration]("webKnossos.cache.user.timeout") } + val children = List(User) } + object SampleOrganization { val enabled: Boolean = get[Boolean]("webKnossos.sampleOrganization.enabled") + object User { val email: String = get[String]("webKnossos.sampleOrganization.user.email") val password: String = get[String]("webKnossos.sampleOrganization.user.password") val token: String = get[String]("webKnossos.sampleOrganization.user.token") val isSuperUser: Boolean = get[Boolean]("webKnossos.sampleOrganization.user.isSuperUser") } + val children = List(User) } + val operatorData: String = get[String]("webKnossos.operatorData") val children = List(User, Tasks, Cache, SampleOrganization) } @@ -57,6 +67,7 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L object Features { val isDemoInstance: Boolean = get[Boolean]("features.isDemoInstance") val jobsEnabled: Boolean = get[Boolean]("features.jobsEnabled") + val voxelyticsEnabled: Boolean = get[Boolean]("features.voxelyticsEnabled") val taskReopenAllowed: FiniteDuration = get[Int]("features.taskReopenAllowedInSeconds") seconds val allowDeleteDatasets: Boolean = get[Boolean]("features.allowDeleteDatasets") val publicDemoDatasetUrl: String = get[String]("features.publicDemoDatasetUrl") @@ -83,6 +94,7 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L object Mail { val logToStdout: Boolean = get[Boolean]("mail.logToStdout") + object Smtp { val host: String = get[String]("mail.smtp.host") val port: Int = get[Int]("mail.smtp.port") @@ -91,13 +103,16 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L val user: String = get[String]("mail.smtp.user") val pass: String = get[String]("mail.smtp.pass") } + val defaultSender: String = get[String]("mail.defaultSender") + object Mailchimp { val host: String = get[String]("mail.mailchimp.host") val listId: String = get[String]("mail.mailchimp.listId") val user: String = 
get[String]("mail.mailchimp.user") val password: String = get[String]("mail.mailchimp.password") } + val children = List(Smtp, Mailchimp) } @@ -108,6 +123,7 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L val authenticatorExpiry: Duration = get[Duration]("silhouette.tokenAuthenticator.authenticatorExpiry") val authenticatorIdleTimeout: Duration = get[Duration]("silhouette.tokenAuthenticator.authenticatorIdleTimeout") } + object CookieAuthenticator { val cookieName: String = get[String]("silhouette.cookieAuthenticator.cookieName") val cookiePath: String = get[String]("silhouette.cookieAuthenticator.cookiePath") @@ -117,6 +133,7 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L val authenticatorExpiry: Duration = get[Duration]("silhouette.cookieAuthenticator.authenticatorExpiry") val cookieMaxAge: Duration = get[Duration]("silhouette.cookieAuthenticator.cookieMaxAge") } + val children = List(TokenAuthenticator, CookieAuthenticator) } @@ -156,6 +173,18 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L val verboseLoggingEnabled: Boolean = get[Boolean]("backendAnalytics.verboseLoggingEnabled") } + object Voxelytics { + val staleTimeout: FiniteDuration = get[FiniteDuration]("voxelytics.staleTimeout") + + object Elasticsearch { + val uri: String = get[String]("voxelytics.elasticsearch.uri") + val index: String = get[String]("voxelytics.elasticsearch.index") + val startupTimeout: FiniteDuration = get[FiniteDuration]("voxelytics.elasticsearch.startupTimeout") + } + + val children = List(Elasticsearch) + } + val children = List( Http, @@ -170,7 +199,8 @@ class WkConf @Inject()(configuration: Configuration) extends ConfigReader with L Braintracing, Airbrake, GoogleAnalytics, - BackendAnalytics + BackendAnalytics, + Voxelytics ) val removedConfigKeys = List( diff --git a/conf/application.conf b/conf/application.conf index 504df8a5ce..298405b334 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -82,6 +82,7 @@ features { allowDeleteDatasets = true # to enable jobs for local development, use "yarn enable-jobs" to also activate it in the database jobsEnabled = false + voxelyticsEnabled = true # For new users, the dashboard will show a banner which encourages the user to check out the following dataset. # If isDemoInstance == true, `/createExplorative/hybrid/true` is appended to the URL so that a new tracing is opened. 
# If isDemoInstance == false, `/view` is appended to the URL so that it's opened in view mode (since the user might not @@ -262,5 +263,14 @@ swagger.api.info { title = "webknossos" } +voxelytics { + staleTimeout = 20 minutes + elasticsearch { + uri = "http://localhost:9200" + index = "voxelytics-logs" + startupTimeout = 1 minute + } +} + # Avoid creation of a pid file pidfile.path = "/dev/null" diff --git a/conf/evolutions/089-voxelytics.sql b/conf/evolutions/089-voxelytics.sql new file mode 100644 index 0000000000..7b0b7862c3 --- /dev/null +++ b/conf/evolutions/089-voxelytics.sql @@ -0,0 +1,138 @@ +START TRANSACTION; + +CREATE TYPE webknossos.VOXELYTICS_RUN_STATE AS ENUM ('PENDING', 'SKIPPED', 'RUNNING', 'COMPLETE', 'FAILED', 'CANCELLED', 'STALE'); + +CREATE TABLE webknossos.voxelytics_artifacts ( + _id CHAR(24) NOT NULL, + _task CHAR(24) NOT NULL, + name VARCHAR(512) NOT NULL, + path TEXT NOT NULL, + fileSize INT8 NOT NULL, + inodeCount INT8 NOT NULL, + version TEXT NOT NULL DEFAULT '0', + metadata JSONB, + PRIMARY KEY (_id), + UNIQUE (_task, name), + CONSTRAINT metadataIsJsonObject CHECK(jsonb_typeof(metadata) = 'object') +); + +CREATE TABLE webknossos.voxelytics_runs ( + _id CHAR(24) NOT NULL, + _organization CHAR(24) NOT NULL, + _user CHAR(24) NOT NULL, + name VARCHAR(2048) NOT NULL, + username TEXT NOT NULL, + hostname TEXT NOT NULL, + voxelyticsVersion TEXT NOT NULL, + workflow_hash VARCHAR(512) NOT NULL, + workflow_yamlContent TEXT, + workflow_config JSONB, + PRIMARY KEY (_id), + UNIQUE (_organization, name), + CONSTRAINT workflowConfigIsJsonObject CHECK(jsonb_typeof(workflow_config) = 'object') +); + +CREATE TABLE webknossos.voxelytics_tasks ( + _id CHAR(24) NOT NULL, + _run CHAR(24) NOT NULL, + name VARCHAR(2048) NOT NULL, + task VARCHAR(512) NOT NULL, + config JSONB NOT NULL, + PRIMARY KEY (_id), + UNIQUE (_run, name), + CONSTRAINT configIsJsonObject CHECK(jsonb_typeof(config) = 'object') +); + +CREATE TABLE webknossos.voxelytics_chunks ( + _id CHAR(24) NOT NULL, + _task CHAR(24) NOT NULL, + executionId VARCHAR(2048) NOT NULL, + chunkName VARCHAR(2048) NOT NULL, + PRIMARY KEY (_id), + UNIQUE (_task, executionId, chunkName) +); + +CREATE TABLE webknossos.voxelytics_workflows ( + _organization CHAR(24) NOT NULL, + hash VARCHAR(512) NOT NULL, + name TEXT NOT NULL, + PRIMARY KEY (_organization, hash) +); + +CREATE TABLE webknossos.voxelytics_runStateChangeEvents ( + _run CHAR(24) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + state webknossos.VOXELYTICS_RUN_STATE NOT NULL, + PRIMARY KEY (_run, timestamp) +); + +CREATE TABLE webknossos.voxelytics_runHeartbeatEvents ( + _run CHAR(24) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + PRIMARY KEY (_run) +); + +CREATE TABLE webknossos.voxelytics_taskStateChangeEvents ( + _task CHAR(24) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + state webknossos.VOXELYTICS_RUN_STATE NOT NULL, + PRIMARY KEY (_task, timestamp) +); + +CREATE TABLE webknossos.voxelytics_chunkStateChangeEvents ( + _chunk CHAR(24) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + state webknossos.VOXELYTICS_RUN_STATE NOT NULL, + PRIMARY KEY (_chunk, timestamp) +); + +CREATE TABLE webknossos.voxelytics_chunkProfilingEvents ( + _chunk CHAR(24) NOT NULL, + hostname TEXT NOT NULL, + pid INT8 NOT NULL, + memory FLOAT NOT NULL, + cpuUser FLOAT NOT NULL, + cpuSystem FLOAT NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + PRIMARY KEY (_chunk, timestamp) +); + +CREATE TABLE webknossos.voxelytics_artifactFileChecksumEvents ( + _artifact CHAR(24) NOT NULL, + path TEXT NOT NULL, + resolvedPath TEXT
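+    -- assumption: resolvedPath is the symlink-resolved variant of path, mirroring the path/resolvedPath pair in ArtifactFileChecksumEvent above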
NOT NULL, + checksumMethod VARCHAR(512) NOT NULL, + checksum VARCHAR(512) NOT NULL, + fileSize INT8 NOT NULL, + lastModified TIMESTAMPTZ NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + PRIMARY KEY (_artifact, path, timestamp) +); + +ALTER TABLE webknossos.voxelytics_artifacts + ADD FOREIGN KEY (_task) REFERENCES webknossos.voxelytics_tasks(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_runs + ADD FOREIGN KEY (_organization) REFERENCES webknossos.organizations(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE, + ADD FOREIGN KEY (_organization, workflow_hash) REFERENCES webknossos.voxelytics_workflows(_organization, hash) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_tasks + ADD FOREIGN KEY (_run) REFERENCES webknossos.voxelytics_runs(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_chunks + ADD FOREIGN KEY (_task) REFERENCES webknossos.voxelytics_tasks(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_workflows + ADD FOREIGN KEY (_organization) REFERENCES webknossos.organizations(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_runStateChangeEvents + ADD FOREIGN KEY (_run) REFERENCES webknossos.voxelytics_runs(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_runHeartbeatEvents + ADD FOREIGN KEY (_run) REFERENCES webknossos.voxelytics_runs(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_taskStateChangeEvents + ADD FOREIGN KEY (_task) REFERENCES webknossos.voxelytics_tasks(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_chunkStateChangeEvents + ADD FOREIGN KEY (_chunk) REFERENCES webknossos.voxelytics_chunks(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_chunkProfilingEvents + ADD FOREIGN KEY (_chunk) REFERENCES webknossos.voxelytics_chunks(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; +ALTER TABLE webknossos.voxelytics_artifactFileChecksumEvents + ADD FOREIGN KEY (_artifact) REFERENCES webknossos.voxelytics_artifacts(_id) ON DELETE CASCADE ON UPDATE CASCADE DEFERRABLE; + +UPDATE webknossos.releaseInformation SET schemaVersion = 89; + +COMMIT TRANSACTION; diff --git a/conf/evolutions/reversions/089-voxelytics.sql b/conf/evolutions/reversions/089-voxelytics.sql new file mode 100644 index 0000000000..79e3536d1a --- /dev/null +++ b/conf/evolutions/reversions/089-voxelytics.sql @@ -0,0 +1,21 @@ +START TRANSACTION; + +DROP TABLE + webknossos.voxelytics_artifacts, + webknossos.voxelytics_runs, + webknossos.voxelytics_tasks, + webknossos.voxelytics_chunks, + webknossos.voxelytics_workflows, + webknossos.voxelytics_runStateChangeEvents, + webknossos.voxelytics_runHeartbeatEvents, + webknossos.voxelytics_taskStateChangeEvents, + webknossos.voxelytics_chunkStateChangeEvents, + webknossos.voxelytics_chunkProfilingEvents, + webknossos.voxelytics_artifactFileChecksumEvents + CASCADE; + +DROP TYPE webknossos.VOXELYTICS_RUN_STATE; + +UPDATE webknossos.releaseInformation SET schemaVersion = 88; + +COMMIT TRANSACTION; diff --git a/conf/messages b/conf/messages index 458a41dcf5..96600c88f5 100644 --- a/conf/messages +++ b/conf/messages @@ -367,6 +367,15 @@ job.meshFile.notAllowed.organization = Calculating mesh files is only allowed fo job.globalizeFloodfill.notAllowed.organization = Globalizing floodfills is only allowed for datasets of your own 
organization. job.applyMergerMode.notAllowed.organization = Applying merger mode tracings is only allowed for datasets of your own organization. +voxelytics.disabled = Voxelytics workflow reporting and logging are not enabled for this webKnossos instance. +voxelytics.runNotFound = Workflow run not found +voxelytics.taskNotFound = Task not found +voxelytics.workflowNotFound = Workflow not found +voxelytics.noTaskFound = No tasks found +voxelytics.noWorkflowFound = No workflows found +voxelytics.workflowUserMismatch = Workflow run already exists and belongs to a different user +voxelytics.zeroRunWorkflow = No runs found for this workflow + publication.notFound = Publication could not be found. agglomerateSkeleton.failed=Could not generate agglomerate skeleton. diff --git a/conf/webknossos.latest.routes b/conf/webknossos.latest.routes index 1064cccbbe..d6cd13ce94 100644 --- a/conf/webknossos.latest.routes +++ b/conf/webknossos.latest.routes @@ -241,3 +241,13 @@ GET /publications/:id # Shortlinks POST /shortLinks controllers.ShortLinkController.create GET /shortLinks/byKey/:key controllers.ShortLinkController.getByKey(key: String) + +# Voxelytics +POST /voxelytics/workflows controllers.VoxelyticsController.storeWorkflow +GET /voxelytics/workflows controllers.VoxelyticsController.listWorkflows(workflowHash: Option[String]) +GET /voxelytics/workflows/:workflowHash controllers.VoxelyticsController.getWorkflow(workflowHash: String, runId: Option[String]) +POST /voxelytics/workflows/:workflowHash/events controllers.VoxelyticsController.storeWorkflowEvents(workflowHash: String, runName: String) +GET /voxelytics/workflows/:workflowHash/chunkStatistics controllers.VoxelyticsController.getChunkStatistics(workflowHash: String, runId: String, taskName: String) +GET /voxelytics/workflows/:workflowHash/artifactChecksums controllers.VoxelyticsController.getArtifactChecksums(workflowHash: String, runId: String, taskName: String, artifactName: Option[String]) +POST /voxelytics/logs controllers.VoxelyticsController.appendLogs +GET /voxelytics/logs controllers.VoxelyticsController.getLogs(runId: String, taskName: Option[String], minLevel: Option[String]) diff --git a/frontend/javascripts/admin/admin_rest_api.ts b/frontend/javascripts/admin/admin_rest_api.ts index 6367741687..d194963557 100644 --- a/frontend/javascripts/admin/admin_rest_api.ts +++ b/frontend/javascripts/admin/admin_rest_api.ts @@ -59,6 +59,9 @@ import type { ServerEditableMapping, APICompoundType, ZarrPrivateLink, + VoxelyticsWorkflowInfo, + VoxelyticsWorkflowReport, + VoxelyticsChunkStatistics, ShortLink, } from "types/api_flow_types"; import { APIAnnotationTypeEnum } from "types/api_flow_types"; @@ -2296,7 +2299,6 @@ export async function getEdgesForAgglomerateMinCut( } // ### Short links - export const createShortLink = _.memoize( (longLink: string): Promise<ShortLink> => Request.sendJSONReceiveJSON("/api/shortLinks", { @@ -2312,3 +2314,61 @@ export const createShortLink = _.memoize( export function getShortLink(key: string): Promise<ShortLink> { return Request.receiveJSON(`/api/shortLinks/byKey/${key}`); } + +// ### Voxelytics +export function getVoxelyticsWorkflows(): Promise<Array<VoxelyticsWorkflowInfo>> { + return Request.receiveJSON("/api/voxelytics/workflows"); +} + +export function getVoxelyticsWorkflow( + workflowHash: string, + runId: string | null, +): Promise<VoxelyticsWorkflowReport> { + const params = new URLSearchParams(); + if (runId != null) { + params.append("runId", runId); + } + return Request.receiveJSON(`/api/voxelytics/workflows/${workflowHash}?${params}`); +} + +export function getVoxelyticsLogs( + runId: string,
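+  // taskName (below) is appended to the query string only when non-null; the backend route declares both taskName and minLevel as Option[String], so they act as optional filters.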
+ taskName: string | null, + minLevel: string, +): Promise<Array<any>> { + const params = new URLSearchParams({ runId, minLevel }); + if (taskName != null) { + params.append("taskName", taskName); + } + return Request.receiveJSON(`/api/voxelytics/logs?${params}`); +} + +export function getVoxelyticsChunkStatistics( + workflowHash: string, + runId: string, + taskName: string, +): Promise<Array<VoxelyticsChunkStatistics>> { + return Request.receiveJSON( + `/api/voxelytics/workflows/${workflowHash}/chunkStatistics?${new URLSearchParams({ + runId, + taskName, + })}`, + ); +} +export function getVoxelyticsArtifactChecksums( + workflowHash: string, + runId: string, + taskName: string, + artifactName?: string, +): Promise<Array<Record<string, any>>> { + const params = new URLSearchParams({ + runId, + taskName, + }); + if (artifactName != null) { + params.append("artifactName", artifactName); + } + return Request.receiveJSON( + `/api/voxelytics/workflows/${workflowHash}/artifactChecksums?${params}`, + ); +} diff --git a/frontend/javascripts/admin/voxelytics/artifacts_view.tsx b/frontend/javascripts/admin/voxelytics/artifacts_view.tsx new file mode 100644 index 0000000000..c2239244a1 --- /dev/null +++ b/frontend/javascripts/admin/voxelytics/artifacts_view.tsx @@ -0,0 +1,176 @@ +import React from "react"; +import { JSONTree } from "react-json-tree"; +import { Button, Card, message } from "antd"; +import { CopyOutlined } from "@ant-design/icons"; + +import { VoxelyticsArtifactConfig } from "types/api_flow_types"; +import { getVoxelyticsArtifactChecksums } from "admin/admin_rest_api"; +import { formatBytes } from "libs/format_utils"; +import { useTheme } from "./task_view"; + +function isObjectEmpty(obj: Record<string, any>) { + return Object.keys(obj).length === 0 && obj.constructor === Object; +} + +async function copyToClipboard(text: string) { + await navigator.clipboard.writeText(text); + message.success("Copied to clipboard"); +} + +function renderArtifactPath(artifact: VoxelyticsArtifactConfig) { + return ( + <span>
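+      {/* artifact path plus a copy button; copyToClipboard above uses the async Clipboard API */}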
+ {artifact.path} + <Button + icon={<CopyOutlined />} + onClick={() => { + copyToClipboard(artifact.path); + }} + /> + </span>
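+    // navigator.clipboard is only available in secure contexts (HTTPS or localhost)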
+ ); +} + +function renderIframes(iframes: VoxelyticsArtifactConfig["metadata"]["iframes"]) { + return ( + !isObjectEmpty(iframes) && + Object.entries(iframes).map(([iframeKey, iframeUrl]) => { + const isImage = iframeUrl.match(/\.(jpg|png|gif)$/); + return isImage ? ( + {iframeKey} + ) : ( +