Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Voxelytics routes #6416

Merged
merged 47 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
41dfe79
adds log routes
normanrz Aug 17, 2022
d961890
fixes log routes
normanrz Aug 17, 2022
1ba8730
fixes log routes
normanrz Aug 17, 2022
d73ecc6
moar routes
normanrz Aug 18, 2022
0eac34b
Create unified json format for ObjectId
fm3 Aug 18, 2022
bee7751
remove unused imports
fm3 Aug 18, 2022
0f3c73e
fixes
normanrz Aug 18, 2022
0d1175c
works
normanrz Aug 19, 2022
f78e15e
Merge branch 'master' into vx-routes
normanrz Aug 19, 2022
4cc3692
schema
normanrz Aug 19, 2022
6930bf1
schema
normanrz Aug 19, 2022
ba7ff09
lint
normanrz Aug 19, 2022
24bc6f9
refactoring
normanrz Aug 19, 2022
cab6637
Merge branch 'master' into objectid-json
leowe Aug 25, 2022
d3b23f3
pr feedback
normanrz Aug 27, 2022
bc93b9d
merge
normanrz Aug 27, 2022
7f0887d
pr feedback
normanrz Aug 27, 2022
97a392a
Merge remote-tracking branch 'origin/master' into vx-routes
normanrz Aug 27, 2022
b3818a6
rename tables
normanrz Aug 27, 2022
4c92291
fixes
normanrz Aug 27, 2022
f9b9d29
fixes
normanrz Aug 28, 2022
0564b58
messages and conf
normanrz Aug 31, 2022
9366ca2
Merge remote-tracking branch 'origin/master' into vx-routes
normanrz Aug 31, 2022
384b6d1
fixes Elasticsearch polling and setup
normanrz Sep 5, 2022
70c1555
lint
normanrz Sep 5, 2022
a293cb9
remove @Api annotation from VoxelyticsController
normanrz Sep 6, 2022
07bd91a
Merge branch 'master' into vx-routes
normanrz Sep 12, 2022
4dff1de
pr feedback
normanrz Sep 12, 2022
55aef88
update snapshots
fm3 Sep 12, 2022
1d8be25
pr feedback
normanrz Sep 13, 2022
fbe9f4c
pr feedback
normanrz Sep 13, 2022
ba807c8
revert blanket-cors header
normanrz Sep 16, 2022
0a9021d
no cors in vx routes
normanrz Sep 16, 2022
c5d9d1a
Merge remote-tracking branch 'origin/master' into vx-routes
normanrz Sep 16, 2022
8f19c9a
fixes log levels
normanrz Sep 16, 2022
95e974a
pr feedback
normanrz Sep 16, 2022
f55e3c4
format
normanrz Sep 16, 2022
6789790
move
normanrz Sep 16, 2022
bf28dbe
changelog
normanrz Sep 16, 2022
1b48a67
Update app/models/voxelytics/VoxelyticsLogLevel.scala
normanrz Sep 19, 2022
3d11180
Merge branch 'master' into vx-routes
normanrz Sep 19, 2022
7471fa3
Voxelytics frontend (#6460)
normanrz Sep 21, 2022
7e85ebb
styling
normanrz Sep 21, 2022
1849a4c
merge
normanrz Sep 21, 2022
1cf8ecc
evolutions
normanrz Sep 21, 2022
9aed1fe
Merge branch 'master' into vx-routes
normanrz Sep 22, 2022
87b0c15
Merge branch 'master' into vx-routes
normanrz Sep 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
- Added a context menu option to separate an agglomerate skeleton using Min-Cut. Activate the Proofreading tool, select the source node and open the context menu by right-clicking on the target node which you would like to separate through Min-Cut. [#6361](https://github.com/scalableminds/webknossos/pull/6361)
- Added a "clear" button to reset skeletons/meshes after successful mergers/split. [#6459](https://github.com/scalableminds/webknossos/pull/6459)
- The proofreading tool now supports merging and splitting (via min-cut) agglomerates by rightclicking a segment (and not a node). Note that there still has to be an active node so that both partners of the operation are defined. [#6464](https://github.com/scalableminds/webknossos/pull/6464)
- Added workflow reporting and logging features for Voxelytics into webKnossos. If activated, the workflows can be accessed from the `Administration` > `Voxelytics` menu item. [#6416](https://github.com/scalableminds/webknossos/pull/6416) [#6460](https://github.com/scalableminds/webknossos/pull/6460)

### Changed

Expand Down
3 changes: 3 additions & 0 deletions MIGRATIONS.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ User-facing changes are documented in the [changelog](CHANGELOG.released.md).
## Unreleased
[Commits](https://github.com/scalableminds/webknossos/compare/22.09.0...HEAD)

- To use the Voxelytics reporting features in webKnossos, the config field `features.voxelyticsEnabled` needs to be set to true. When also using the logging features, an Elasticsearch instance needs to be running and configured in the config field `voxelytics.elasticsearch.uri`.

### Postgres Evolutions:
- [089-voxelytics.sql](conf/evolutions/089-voxelytics.sql)
6 changes: 4 additions & 2 deletions app/WebKnossosModule.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import com.google.inject.AbstractModule
import controllers.InitialDataService
import models.analytics.AnalyticsSessionService
import models.job.{JobService, WorkerLivenessService}
import models.annotation.AnnotationStore
import models.binary.DataSetService
import models.job.{JobService, WorkerLivenessService}
import models.task.TaskService
import models.user.{UserCache, _}
import models.user.time.TimeSpanService
import models.user._
import models.voxelytics.ElasticsearchClient
import oxalis.files.TempFileService
import oxalis.mail.MailchimpTicker
import oxalis.telemetry.SlackNotificationService
Expand All @@ -33,5 +34,6 @@ class WebKnossosModule extends AbstractModule {
bind(classOf[SlackNotificationService]).asEagerSingleton()
bind(classOf[AnalyticsSessionService]).asEagerSingleton()
bind(classOf[WorkerLivenessService]).asEagerSingleton()
bind(classOf[ElasticsearchClient]).asEagerSingleton()
}
}
271 changes: 271 additions & 0 deletions app/controllers/VoxelyticsController.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
package controllers

import com.mohiva.play.silhouette.api.Silhouette
import com.mohiva.play.silhouette.api.actions.SecuredRequest
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import models.organization.OrganizationDAO
import models.voxelytics._
import oxalis.security.WkEnv
import play.api.libs.json._
import play.api.mvc._
import utils.{ObjectId, WkConf}

import javax.inject.Inject
import scala.concurrent.ExecutionContext

class VoxelyticsController @Inject()(
organizationDAO: OrganizationDAO,
voxelyticsDAO: VoxelyticsDAO,
voxelyticsService: VoxelyticsService,
elasticsearchClient: ElasticsearchClient,
wkConf: WkConf,
sil: Silhouette[WkEnv])(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers)
extends Controller
with FoxImplicits {

private lazy val conf = wkConf.Voxelytics

def storeWorkflow: Action[WorkflowDescription] =
sil.SecuredAction.async(validateJson[WorkflowDescription]) { implicit request =>
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
_ <- voxelyticsService.checkAuthForWorkflowCreation(request.body.run.name, request.identity) ?~> "voxelytics.workflowUserMismatch" ~> UNAUTHORIZED
_ <- voxelyticsDAO.upsertWorkflow(request.body.workflow.hash,
request.body.workflow.name,
request.identity._organization)
runId <- voxelyticsDAO.upsertRun(
request.identity._organization,
request.identity._id,
request.body.run.name,
request.body.run.user,
request.body.run.hostname,
request.body.run.voxelyticsVersion,
request.body.workflow.hash,
request.body.workflow.yamlContent,
request.body.config.asJsonWithoutTasks
)
_ <- Fox.combined(
request.body.config.tasks
.map(taskKV => {
val taskName = taskKV._1
val task = taskKV._2
voxelyticsService.upsertTaskWithArtifacts(runId, taskName, task, request.body.artifacts)
})
.toList)

} yield Ok
}

def listWorkflows(workflowHash: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
// Auth is implemented in `voxelyticsDAO.findRuns`
runs <- voxelyticsDAO.findRuns(request.identity, None, workflowHash, conf.staleTimeout)
result <- if (runs.nonEmpty) {
listWorkflowsWithRuns(request, runs)
} else {
Fox.successful(Json.arr())
}
} yield JsonOk(result)
}

private def listWorkflowsWithRuns(request: SecuredRequest[WkEnv, AnyContent], runs: List[RunEntry]): Fox[JsArray] =
for {
_ <- bool2Fox(runs.nonEmpty) // just asserting once more
taskRuns <- voxelyticsDAO.findTaskRuns(request.identity._organization, runs.map(_.id), conf.staleTimeout)
_ <- bool2Fox(taskRuns.nonEmpty) ?~> "voxelytics.noTaskFound" ~> NOT_FOUND
workflows <- voxelyticsDAO.findWorkflowsByHash(request.identity._organization, runs.map(_.workflow_hash).toSet)
_ <- bool2Fox(workflows.nonEmpty) ?~> "voxelytics.noWorkflowFound" ~> NOT_FOUND

workflowsAsJson = JsArray(workflows.flatMap(workflow => {
val workflowRuns = runs.filter(run => run.workflow_hash == workflow.hash)
if (workflowRuns.nonEmpty) {
val state, beginTime, endTime = voxelyticsService.aggregateBeginEndTime(workflowRuns)
Some(
Json.obj(
"name" -> workflow.name,
"hash" -> workflow.hash,
"beginTime" -> beginTime,
"endTime" -> endTime,
"state" -> state.toString(),
"runs" -> workflowRuns.map(run => {
val tasks = taskRuns.filter(taskRun => taskRun.runId == run.id)
voxelyticsService.runPublicWrites(run, tasks)
})
))
} else {
None
}
}))
} yield workflowsAsJson

def getWorkflow(workflowHash: String, runId: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
runIdValidatedOpt <- Fox.runOptional(runId)(ObjectId.fromString(_))
// Auth is implemented in `voxelyticsDAO.findRuns`
workflow <- voxelyticsDAO.findWorkflowByHash(request.identity._organization, workflowHash) ?~> "voxelytics.workflowNotFound" ~> NOT_FOUND

// Fetching all runs for this workflow or specified run
// If all runs are fetched, a combined version of the workflow report
// will be returned that contains the information of the most recent task runs
runs <- runIdValidatedOpt
.map(runIdValidated =>
normanrz marked this conversation as resolved.
Show resolved Hide resolved
voxelyticsDAO.findRuns(request.identity, Some(List(runIdValidated)), Some(workflowHash), conf.staleTimeout))
.getOrElse(voxelyticsDAO.findRuns(request.identity, None, Some(workflowHash), conf.staleTimeout))
_ <- bool2Fox(runs.nonEmpty) ?~> "voxelytics.runNotFound" ~> NOT_FOUND
sortedRuns = runs.sortBy(_.beginTime).reverse
// All workflows have at least one run, because they are created at the same time
mostRecentRun <- sortedRuns.headOption ?~> "voxelytics.zeroRunWorkflow"

// Fetch task runs for all runs
allTaskRuns <- voxelyticsDAO.findTaskRuns(request.identity._organization,
sortedRuns.map(_.id),
conf.staleTimeout)

// Select one representative "task run" for each task
// This will be the most recent run that is running or finished or the most recent run
combinedTaskRuns = voxelyticsService.combineTaskRuns(allTaskRuns, mostRecentRun.id)

// Fetch artifact data for selected/combined task runs
artifacts <- voxelyticsDAO.findArtifacts(combinedTaskRuns.map(_.taskId))
tasks <- voxelyticsDAO.findTasks(combinedTaskRuns)

// Assemble workflow report JSON
(state, beginTime, endTime) = voxelyticsService.aggregateBeginEndTime(runs)
result = Json.obj(
"config" -> voxelyticsService.workflowConfigPublicWrites(mostRecentRun.workflow_config, tasks),
"artifacts" -> voxelyticsService.artifactsPublicWrites(artifacts),
"run" -> voxelyticsService.runPublicWrites(
mostRecentRun.copy(state = state, beginTime = beginTime, endTime = endTime),
combinedTaskRuns),
"workflow" -> Json.obj(
"name" -> workflow.name,
"hash" -> workflowHash,
"yamlContent" -> mostRecentRun.workflow_yamlContent
)
)
} yield JsonOk(result)
}

def storeWorkflowEvents(workflowHash: String, runName: String): Action[List[WorkflowEvent]] =
sil.SecuredAction.async(validateJson[List[WorkflowEvent]]) { implicit request =>
def createWorkflowEvent(runId: ObjectId, event: WorkflowEvent): Fox[Unit] =
event match {
case ev: RunStateChangeEvent => voxelyticsDAO.upsertRunStateChangeEvent(runId, ev)

case ev: TaskStateChangeEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
_ <- voxelyticsDAO.upsertTaskStateChangeEvent(taskId, ev)
_ <- Fox.combined(
ev.artifacts
.map(artifactKV => {
val artifactName = artifactKV._1
val artifact = artifactKV._2
voxelyticsDAO.upsertArtifact(taskId,
artifactName,
artifact.path,
artifact.file_size,
artifact.inode_count,
artifact.version,
artifact.metadataAsJson)
})
.toList)
} yield ()

case ev: ChunkStateChangeEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
chunkId <- voxelyticsDAO.upsertChunk(taskId, ev.executionId, ev.chunkName)
_ <- voxelyticsDAO.upsertChunkStateChangeEvent(chunkId, ev)
} yield ()

case ev: RunHeartbeatEvent => voxelyticsDAO.upsertRunHeartbeatEvent(runId, ev)

case ev: ChunkProfilingEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
chunkId <- voxelyticsDAO.getChunkIdByName(taskId, ev.executionId, ev.chunkName)
_ <- voxelyticsDAO.upsertChunkProfilingEvent(chunkId, ev)
} yield ()

case ev: ArtifactFileChecksumEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
artifactId <- voxelyticsDAO.getArtifactIdByName(taskId, ev.artifactName)
_ <- voxelyticsDAO.upsertArtifactChecksumEvent(artifactId, ev)
} yield ()
}

for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
runId <- voxelyticsDAO.getRunIdByName(runName, request.identity._organization) ?~> "voxelytics.runNotFound" ~> NOT_FOUND
_ <- voxelyticsService.checkAuth(runId, request.identity) ~> UNAUTHORIZED
_ <- Fox.serialCombined(request.body)(event => createWorkflowEvent(runId, event))
} yield Ok
}

def getChunkStatistics(workflowHash: String, runId: String, taskName: String): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
{
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
runIdValidated <- ObjectId.fromString(runId)
_ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED
taskId <- voxelyticsDAO.getTaskIdByName(taskName, runIdValidated) ?~> "voxelytics.taskNotFound" ~> NOT_FOUND
results <- voxelyticsDAO.getChunkStatistics(taskId)
} yield JsonOk(Json.toJson(results))
}
}

def getArtifactChecksums(workflowHash: String,
runId: String,
taskName: String,
artifactName: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
{
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
runIdValidated <- ObjectId.fromString(runId)
_ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED
taskId <- voxelyticsDAO.getTaskIdByName(taskName, runIdValidated) ?~> "voxelytics.taskNotFound" ~> NOT_FOUND
results <- voxelyticsDAO.getArtifactChecksums(taskId, artifactName)
} yield JsonOk(Json.toJson(results))
}
}

def appendLogs: Action[List[JsObject]] =
sil.SecuredAction.async(validateJson[List[JsObject]]) { implicit request =>
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
organization <- organizationDAO.findOne(request.identity._organization)
logEntries = request.body.map(
entry =>
entry ++ Json.obj("vx" -> ((entry \ "vx").as[JsObject] ++ Json.obj("wk_org" -> organization.name,
"wk_user" -> request.identity._id.id))))
_ <- elasticsearchClient.bulkInsert(logEntries)
} yield Ok
}

def getLogs(runId: String, taskName: Option[String], minLevel: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
{
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
runIdValidated <- ObjectId.fromString(runId)
runName <- voxelyticsDAO.getRunNameById(runIdValidated, request.identity._organization)
_ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED
organization <- organizationDAO.findOne(request.identity._organization)
organizationName = organization.name
logEntries <- elasticsearchClient.queryLogs(
runName,
organizationName,
taskName,
minLevel.flatMap(VoxelyticsLogLevel.fromString).getOrElse(VoxelyticsLogLevel.INFO))
} yield JsonOk(logEntries)
}
}
}
Loading