Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Voxelytics routes #6416

Merged
merged 47 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
41dfe79
adds log routes
normanrz Aug 17, 2022
d961890
fixes log routes
normanrz Aug 17, 2022
1ba8730
fixes log routes
normanrz Aug 17, 2022
d73ecc6
moar routes
normanrz Aug 18, 2022
0eac34b
Create unified json format for ObjectId
fm3 Aug 18, 2022
bee7751
remove unused imports
fm3 Aug 18, 2022
0f3c73e
fixes
normanrz Aug 18, 2022
0d1175c
works
normanrz Aug 19, 2022
f78e15e
Merge branch 'master' into vx-routes
normanrz Aug 19, 2022
4cc3692
schema
normanrz Aug 19, 2022
6930bf1
schema
normanrz Aug 19, 2022
ba7ff09
lint
normanrz Aug 19, 2022
24bc6f9
refactoring
normanrz Aug 19, 2022
cab6637
Merge branch 'master' into objectid-json
leowe Aug 25, 2022
d3b23f3
pr feedback
normanrz Aug 27, 2022
bc93b9d
merge
normanrz Aug 27, 2022
7f0887d
pr feedback
normanrz Aug 27, 2022
97a392a
Merge remote-tracking branch 'origin/master' into vx-routes
normanrz Aug 27, 2022
b3818a6
rename tables
normanrz Aug 27, 2022
4c92291
fixes
normanrz Aug 27, 2022
f9b9d29
fixes
normanrz Aug 28, 2022
0564b58
messages and conf
normanrz Aug 31, 2022
9366ca2
Merge remote-tracking branch 'origin/master' into vx-routes
normanrz Aug 31, 2022
384b6d1
fixes Elasticsearch polling and setup
normanrz Sep 5, 2022
70c1555
lint
normanrz Sep 5, 2022
a293cb9
remove @Api annotation from VoxelyticsController
normanrz Sep 6, 2022
07bd91a
Merge branch 'master' into vx-routes
normanrz Sep 12, 2022
4dff1de
pr feedback
normanrz Sep 12, 2022
55aef88
update snapshots
fm3 Sep 12, 2022
1d8be25
pr feedback
normanrz Sep 13, 2022
fbe9f4c
pr feedback
normanrz Sep 13, 2022
ba807c8
revert blanket-cors header
normanrz Sep 16, 2022
0a9021d
no cors in vx routes
normanrz Sep 16, 2022
c5d9d1a
Merge remote-tracking branch 'origin/master' into vx-routes
normanrz Sep 16, 2022
8f19c9a
fixes log levels
normanrz Sep 16, 2022
95e974a
pr feedback
normanrz Sep 16, 2022
f55e3c4
format
normanrz Sep 16, 2022
6789790
move
normanrz Sep 16, 2022
bf28dbe
changelog
normanrz Sep 16, 2022
1b48a67
Update app/models/voxelytics/VoxelyticsLogLevel.scala
normanrz Sep 19, 2022
3d11180
Merge branch 'master' into vx-routes
normanrz Sep 19, 2022
7471fa3
Voxelytics frontend (#6460)
normanrz Sep 21, 2022
7e85ebb
styling
normanrz Sep 21, 2022
1849a4c
merge
normanrz Sep 21, 2022
1cf8ecc
evolutions
normanrz Sep 21, 2022
9aed1fe
Merge branch 'master' into vx-routes
normanrz Sep 22, 2022
87b0c15
Merge branch 'master' into vx-routes
normanrz Sep 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions app/RequestHandler.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
import com.typesafe.scalalogging.LazyLogging
import controllers.{Assets, DemoProxyController, SitemapController}
import javax.inject.Inject
import play.api.OptionalDevContext
import play.api.http.{DefaultHttpRequestHandler, HttpConfiguration, HttpErrorHandler, HttpFilters}
import play.api.mvc.{Handler, InjectedController, RequestHeader}
import play.api.mvc.Results.Ok
import play.api.mvc.{Handler, InjectedController, RequestHeader, Result}
import play.api.routing.Router
import play.core.WebCommands
import utils.WkConf

import javax.inject.Inject

trait AdditionalHeaders {
def options(request: RequestHeader): Result =
Ok(":D").withHeaders(
"Access-Control-Allow-Origin" -> "*",
"Access-Control-Max-Age" -> "600",
"Access-Control-Allow-Methods" -> "POST, GET, DELETE, PUT, HEAD, PATCH, OPTIONS",
"Access-Control-Allow-Headers" -> request.headers.get("Access-Control-Request-Headers").getOrElse(""),
"Access-Control-Expose-Headers" -> "MISSING-BUCKETS"
)
}
class RequestHandler @Inject()(webCommands: WebCommands,
optionalDevContext: OptionalDevContext,
router: Router,
Expand All @@ -27,10 +39,15 @@ class RequestHandler @Inject()(webCommands: WebCommands,
filters
)
with InjectedController
with LazyLogging {
with LazyLogging
with AdditionalHeaders {

override def routeRequest(request: RequestHeader): Option[Handler] =
if (request.uri.matches("^(/api/|/data/|/tracings/|/swagger).*$")) {
if (request.method == "OPTIONS") {
Some(Action {
options(request)
normanrz marked this conversation as resolved.
Show resolved Hide resolved
})
} else if (request.uri.matches("^(/api/|/data/|/tracings/|/swagger).*$")) {
super.routeRequest(request)
} else if (request.uri.matches("^(/assets/).*$")) {
val path = request.path.replaceFirst("^(/assets/)", "")
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/ScriptController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ScriptController @Inject()(scriptDAO: ScriptDAO,
private val scriptPublicReads =
((__ \ 'name).read[String](minLength[String](2) or maxLength[String](50)) and
(__ \ 'gist).read[String] and
(__ \ 'owner).read[String](ObjectId.stringObjectIdReads("owner")))(Script.fromForm _)
(__ \ 'owner).read[ObjectId])(Script.fromForm _)

def create: Action[JsValue] = sil.SecuredAction.async(parse.json) { implicit request =>
withJsonBodyUsing(scriptPublicReads) { script =>
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/TaskTypeController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TaskTypeController @Inject()(taskTypeDAO: TaskTypeDAO,
private val taskTypePublicReads =
((__ \ 'summary).read[String](minLength[String](2) or maxLength[String](50)) and
(__ \ 'description).read[String] and
(__ \ 'teamId).read[String](ObjectId.stringObjectIdReads("teamId")) and
(__ \ 'teamId).read[ObjectId] and
(__ \ 'settings).read[AnnotationSettings] and
(__ \ 'recommendedConfiguration).readNullable[JsValue] and
(__ \ 'tracingType).read[TracingType.Value])(taskTypeService.fromForm _)
Expand Down
282 changes: 282 additions & 0 deletions app/controllers/VoxelyticsController.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
package controllers

import com.mohiva.play.silhouette.api.Silhouette
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import io.swagger.annotations._
import models.organization.OrganizationDAO
import models.voxelytics._
import oxalis.security.WkEnv
import play.api.libs.json._
import play.api.mvc._
import utils.{ObjectId, WkConf}

import javax.inject.Inject
import scala.concurrent.ExecutionContext

@Api
class VoxelyticsController @Inject()(
organizationDAO: OrganizationDAO,
voxelyticsDAO: VoxelyticsDAO,
voxelyticsService: VoxelyticsService,
elasticsearchClient: ElasticsearchClient,
wkConf: WkConf,
sil: Silhouette[WkEnv])(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers)
extends Controller
with FoxImplicits {

private lazy val conf = wkConf.Voxelytics

override def allowRemoteOrigin: Boolean = true

@ApiOperation(hidden = true, value = "")
normanrz marked this conversation as resolved.
Show resolved Hide resolved
def storeWorkflow: Action[WorkflowDescription] =
sil.SecuredAction.async(validateJson[WorkflowDescription]) { implicit request =>
for {
_ <- voxelyticsService.checkAuthForWorkflowCreation(request.body.run.name, request.identity) ?~> "Workflow run already exists by other user" ~> UNAUTHORIZED
_ <- voxelyticsDAO.upsertWorkflow(request.body.workflow.hash,
request.body.workflow.name,
request.identity._organization)
runId <- voxelyticsDAO.upsertRun(
request.identity._organization,
request.identity._id,
request.body.run.name,
request.body.run.user,
request.body.run.hostname,
request.body.run.voxelyticsVersion,
request.body.workflow.hash,
request.body.workflow.yamlContent,
request.body.config.asJsonWithoutTasks
)
_ <- Fox.combined(
request.body.config.tasks
.map(taskKV => {
val taskName = taskKV._1
val task = taskKV._2
voxelyticsService.upsertTask(runId, taskName, task, request.body.artifacts)
})
.toList)

} yield Ok
}

@ApiOperation(hidden = true, value = "")
def listWorkflows(workflowHash: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
for {
// Auth is implemented in `voxelyticsDAO.selectRuns`
normanrz marked this conversation as resolved.
Show resolved Hide resolved
runs <- voxelyticsDAO.findRuns(request.identity, None, workflowHash, conf.staleTimeout)
_ <- bool2Fox(runs.nonEmpty) ?~> "No run found" ~> NOT_FOUND
taskRuns <- voxelyticsDAO.findTaskRuns(request.identity._organization, runs.map(_.runId), conf.staleTimeout)
_ <- bool2Fox(taskRuns.nonEmpty) ?~> "No tasks found" ~> NOT_FOUND
workflows <- voxelyticsDAO.findWorkflowsByHash(request.identity._organization, runs.map(_.workflow_hash).toSet)
_ <- bool2Fox(workflows.nonEmpty) ?~> "No workflows found" ~> NOT_FOUND

result = JsArray(workflows.flatMap(workflow => {
val workflowRuns = runs.filter(run => run.workflow_hash == workflow.hash)
if (workflowRuns.nonEmpty) {
val state, beginTime, endTime = voxelyticsService.aggregateBeginEndTime(workflowRuns)
Some(
Json.obj(
"name" -> workflow.name,
"hash" -> workflow.hash,
"beginTime" -> beginTime,
"endTime" -> endTime,
"state" -> state.toString(),
"runs" -> workflowRuns.map(run => {
val tasks = taskRuns.filter(taskRun => taskRun.runId == run.runId)
voxelyticsService.runWrites(run, tasks)
})
))
} else {
None
}
}))
} yield JsonOk(result)
}

@ApiOperation(hidden = true, value = "")
def getWorkflow(workflowHash: String, runId: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
for {
runIdValidated <- Fox.runOptional(runId)(ObjectId.fromString(_))
// Auth is implemented in `voxelyticsDAO.selectRuns`
normanrz marked this conversation as resolved.
Show resolved Hide resolved
workflow <- voxelyticsDAO.findWorkflowByHash(request.identity._organization, workflowHash) ?~> "Workflow not found" ~> NOT_FOUND

// Fetching all runs for this workflow or specified run
// If all runs are fetched, a combined version of the workflow report
// will be returned that contains the information of the most recent task runs
runs <- runIdValidated
.map(runIdValidated =>
normanrz marked this conversation as resolved.
Show resolved Hide resolved
voxelyticsDAO.findRuns(request.identity, Some(List(runIdValidated)), Some(workflowHash), conf.staleTimeout))
.getOrElse(voxelyticsDAO.findRuns(request.identity, None, Some(workflowHash), conf.staleTimeout))
_ <- bool2Fox(runs.nonEmpty) ?~> "Runs not found" ~> NOT_FOUND
sortedRuns = runs.sortBy(_.beginTime).reverse
// All workflows have at least one run, because they are created at the same time
mostRecentRun = sortedRuns.head
normanrz marked this conversation as resolved.
Show resolved Hide resolved

// Fetch task runs for all runs
taskRuns <- voxelyticsDAO.findTaskRuns(request.identity._organization,
sortedRuns.map(_.runId),
conf.staleTimeout)

// Select one representative "task run" for each task
// This will be the most recent run that is running or finished or the most recent run
combinedTaskRuns = taskRuns
.filter(task => task.runId == mostRecentRun.runId)
normanrz marked this conversation as resolved.
Show resolved Hide resolved
.map(task => {
val thisTaskRuns = taskRuns.filter(t => t.taskName == task.taskName).sortBy(_.beginTime)
val nonWaitingTaskRuns = thisTaskRuns.filter(t => {
t.state == VoxelyticsRunState.RUNNING || t.state == VoxelyticsRunState.COMPLETE || t.state == VoxelyticsRunState.FAILED || t.state == VoxelyticsRunState.CANCELLED
})
if (nonWaitingTaskRuns.nonEmpty) {
nonWaitingTaskRuns.head
} else {
thisTaskRuns.head
}
})

// Fetch artifact data for selected/combined task runs
artifacts <- voxelyticsDAO.findArtifacts(combinedTaskRuns.map(_.taskId))
tasks <- voxelyticsDAO.findTasks(combinedTaskRuns)

// Assemble workflow report JSON
(state, beginTime, endTime) = voxelyticsService.aggregateBeginEndTime(runs)
result = Json.obj(
"config" -> voxelyticsService.workflowConfigWrites(mostRecentRun.workflow_config, tasks),
"artifacts" -> voxelyticsService.artifactsWrites(artifacts),
"run" -> voxelyticsService
.runWrites(mostRecentRun.copy(state = state, beginTime = beginTime, endTime = endTime), combinedTaskRuns),
"workflow" -> Json.obj(
"name" -> workflow.name,
"hash" -> workflowHash,
"yamlContent" -> mostRecentRun.workflow_yamlContent
)
)
} yield JsonOk(result)
}

@ApiOperation(hidden = true, value = "")
def storeWorkflowEvents(workflowHash: String, runName: String): Action[List[WorkflowEvent]] =
sil.SecuredAction.async(validateJson[List[WorkflowEvent]]) { implicit request =>
def createWorkflowEvent(runId: ObjectId, event: WorkflowEvent): Fox[Unit] =
event match {
case ev: RunStateChangeEvent =>
for {
_ <- voxelyticsDAO.upsertRunStateChangeEvent(runId, ev)
} yield ()
normanrz marked this conversation as resolved.
Show resolved Hide resolved

case ev: TaskStateChangeEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
_ <- voxelyticsDAO.upsertTaskStateChangeEvent(taskId, ev)
_ <- Fox.combined(
ev.artifacts
.map(artifactKV => {
val artifactName = artifactKV._1
val artifact = artifactKV._2
voxelyticsDAO.upsertArtifact(taskId,
artifactName,
artifact.path,
artifact.file_size,
artifact.inode_count,
artifact.version,
artifact.metadataAsJson)
})
.toList)
} yield ()

case ev: ChunkStateChangeEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
chunkId <- voxelyticsDAO.upsertChunk(taskId, ev.executionId, ev.chunkName)
_ <- voxelyticsDAO.upsertChunkStateChangeEvent(chunkId, ev)
} yield ()

case ev: RunHeartbeatEvent =>
for {
_ <- voxelyticsDAO.upsertRunHeartbeatEvent(runId, ev)
} yield ()
normanrz marked this conversation as resolved.
Show resolved Hide resolved

case ev: ChunkProfilingEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
chunkId <- voxelyticsDAO.getChunkIdByName(taskId, ev.executionId, ev.chunkName)
_ <- voxelyticsDAO.upsertChunkProfilingEvent(chunkId, ev)
} yield ()

case ev: ArtifactFileChecksumEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
artifactId <- voxelyticsDAO.getArtifactIdByName(taskId, ev.artifactName)
_ <- voxelyticsDAO.upsertArtifactChecksumEvent(artifactId, ev)
} yield ()
}

for {
runId <- voxelyticsDAO.getRunIdByName(runName, request.identity._organization) ?~> "Run not found" ~> NOT_FOUND
_ <- voxelyticsService.checkAuth(runId, request.identity) ~> UNAUTHORIZED
_ <- Fox.serialCombined(request.body)(event => createWorkflowEvent(runId, event))
} yield Ok
}

@ApiOperation(hidden = true, value = "")
def getChunkStatistics(workflowHash: String, runId: String, taskName: String): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
{
for {
runIdValidated <- ObjectId.fromString(runId)
_ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED
taskId <- voxelyticsDAO.getTaskIdByName(taskName, runIdValidated) ?~> "Task not found" ~> NOT_FOUND
results <- voxelyticsDAO.getChunkStatistics(taskId)
} yield JsonOk(Json.toJson(results))
}
}

@ApiOperation(hidden = true, value = "")
def getArtifactChecksums(workflowHash: String,
runId: String,
taskName: String,
artifactName: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
{
for {
runIdValidated <- ObjectId.fromString(runId)
_ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED
taskId <- voxelyticsDAO.getTaskIdByName(taskName, runIdValidated) ?~> "Task not found" ~> NOT_FOUND
results <- voxelyticsDAO.getArtifactChecksums(taskId, artifactName)
} yield JsonOk(Json.toJson(results))
}
}

@ApiOperation(hidden = true, value = "")
def appendLogs: Action[List[JsObject]] =
sil.SecuredAction.async(validateJson[List[JsObject]]) { implicit request =>
for {
organization <- organizationDAO.findOne(request.identity._organization)
logEntries = request.body.map(
entry =>
entry ++ Json.obj("vx" -> ((entry \ "vx").as[JsObject] ++ Json.obj("wk_org" -> organization.name,
"wk_user" -> request.identity._id.id))))
_ <- elasticsearchClient.bulkInsert(logEntries)
} yield Ok
}

@ApiOperation(hidden = true, value = "")
def getLogs(runId: String, taskName: Option[String], minLevel: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
{
for {
runIdValidated <- ObjectId.fromString(runId)
runName <- voxelyticsDAO.getRunNameById(runIdValidated, request.identity._organization)
_ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED
organization <- organizationDAO.findOne(request.identity._organization)
organizationName = organization.name
logEntries <- elasticsearchClient.queryLogs(
runName,
organizationName,
taskName,
minLevel.flatMap(VoxelyticsLogLevel.fromString).getOrElse(VoxelyticsLogLevel.INFO))
} yield JsonOk(logEntries)
}
}
}
8 changes: 2 additions & 6 deletions app/models/mesh/Mesh.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import com.scalableminds.util.accesscontext.DBAccessContext
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.schema.Tables._

import javax.inject.Inject
import play.api.libs.functional.syntax._
import play.api.libs.json.Json._
import play.api.libs.json._
import slick.jdbc.PostgresProfile.api._
Expand All @@ -30,11 +30,7 @@ case class MeshInfoParameters(
position: Vec3Int,
)
object MeshInfoParameters {
implicit val meshInfoParametersReads: Reads[MeshInfoParameters] =
((__ \ "annotationId").read[String](ObjectId.stringObjectIdReads("teamId")) and
(__ \ "description").read[String] and
(__ \ "position").read[Vec3Int])((annotationId, description, position) =>
MeshInfoParameters(ObjectId(annotationId), description, position))
implicit val jsonFormat: OFormat[MeshInfoParameters] = Json.format[MeshInfoParameters]
}

class MeshService @Inject()()(implicit ec: ExecutionContext) {
Expand Down
6 changes: 3 additions & 3 deletions app/models/project/Project.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ object Project {
// format: off
val projectPublicReads: Reads[Project] =
((__ \ 'name).read[String](Reads.minLength[String](3) keepAnd validateProjectName) and
(__ \ 'team).read[String](ObjectId.stringObjectIdReads("team")) and
(__ \ 'team).read[ObjectId] and
(__ \ 'priority).read[Int] and
(__ \ 'paused).readNullable[Boolean] and
(__ \ 'expectedTime).readNullable[Long] and
(__ \ 'owner).read[String](ObjectId.stringObjectIdReads("owner")) and
(__ \ 'owner).read[ObjectId] and
(__ \ 'isBlacklistedFromReport).read[Boolean]) (
(name, team, priority, paused, expectedTime, owner, isBlacklistedFromReport) =>
Project(ObjectId.generate, ObjectId(team), ObjectId(owner), name, priority, paused getOrElse false, expectedTime, isBlacklistedFromReport))
Project(ObjectId.generate, team, owner, name, priority, paused getOrElse false, expectedTime, isBlacklistedFromReport))
// format: on

}
Expand Down
4 changes: 2 additions & 2 deletions app/models/task/Script.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class ScriptService @Inject()(userDAO: UserDAO, userService: UserService) {
}

object Script {
def fromForm(name: String, gist: String, _owner: String): Script =
Script(ObjectId.generate, ObjectId(_owner), name, gist)
def fromForm(name: String, gist: String, _owner: ObjectId): Script =
Script(ObjectId.generate, _owner, name, gist)
}

class ScriptDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionContext)
Expand Down
Loading