Skip to content

Commit

Permalink
Avoid Java Temp Files in task creation (#6178)
Browse files Browse the repository at this point in the history
* Avoid Java Temp Files in task creation

* [wip] temp file service

* add clean up logic, refactor

* pr feedback, replace slash in temp files
  • Loading branch information
fm3 authored May 3, 2022
1 parent 3499faf commit 7a953f9
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
### Changed

### Fixed
- Fixed a bug in task creation where creating some tasks with initial volume data would fail. [#6178](https://github.com/scalableminds/webknossos/pull/6178)

### Removed

Expand Down
9 changes: 9 additions & 0 deletions app/Startup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import models.annotation.AnnotationDAO
import models.user.InviteService
import net.liftweb.common.{Failure, Full}
import oxalis.cleanup.CleanUpService
import oxalis.files.TempFileService
import oxalis.mail.{Mailer, MailerConfig}
import oxalis.security.WkSilhouetteEnvironment
import oxalis.telemetry.SlackNotificationService
Expand All @@ -27,6 +28,7 @@ class Startup @Inject()(actorSystem: ActorSystem,
annotationDAO: AnnotationDAO,
wkSilhouetteEnvironment: WkSilhouetteEnvironment,
lifecycle: ApplicationLifecycle,
tempFileService: TempFileService,
inviteService: InviteService,
sqlClient: SQLClient,
slackNotificationService: SlackNotificationService)
Expand Down Expand Up @@ -67,6 +69,13 @@ class Startup @Inject()(actorSystem: ActorSystem,
}
}

// On application shutdown, delete the directory in which TempFileService keeps its files.
lifecycle.addStopHook { () =>
  logger.info("Deleting temporary files")
  Future.successful(tempFileService.cleanUpAll())
}

private def ensurePostgresDatabase = {
logger.info("Running ensure_db.sh with POSTGRES_URL " + sys.env.get("POSTGRES_URL"))

Expand Down
2 changes: 2 additions & 0 deletions app/WebKnossosModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import models.binary.DataSetService
import models.task.TaskService
import models.user.{UserCache, _}
import models.user.time.TimeSpanService
import oxalis.files.TempFileService
import oxalis.telemetry.SlackNotificationService
import utils.SQLClient

Expand All @@ -25,6 +26,7 @@ class WebKnossosModule extends AbstractModule {
bind(classOf[AnnotationStore]).asEagerSingleton()
bind(classOf[DataSetService]).asEagerSingleton()
bind(classOf[TimeSpanService]).asEagerSingleton()
bind(classOf[TempFileService]).asEagerSingleton()
bind(classOf[JobService]).asEagerSingleton()
bind(classOf[SlackNotificationService]).asEagerSingleton()
bind(classOf[AnalyticsSessionService]).asEagerSingleton()
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/AnnotationIOController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Expects:

private def mergeAndSaveVolumeLayers(volumeLayersGrouped: Seq[List[UploadedVolumeLayer]],
client: WKRemoteTracingStoreClient,
otherFiles: Map[String, TemporaryFile]): Fox[List[AnnotationLayer]] = {
otherFiles: Map[String, File]): Fox[List[AnnotationLayer]] = {
if (volumeLayersGrouped.isEmpty) return Fox.successful(List())
if (volumeLayersGrouped.length > 1 && volumeLayersGrouped.exists(_.length > 1))
return Fox.failure("Cannot merge multiple annotations that each have multiple volume layers.")
Expand Down
23 changes: 13 additions & 10 deletions app/models/annotation/AnnotationUploadService.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package models.annotation

import java.io.{File, FileInputStream, InputStream}
import java.nio.file.{Files, StandardCopyOption}
import java.nio.file.{Files, Path, StandardCopyOption}

import com.scalableminds.util.io.ZipIO
import com.scalableminds.webknossos.datastore.SkeletonTracing.{SkeletonTracing, TreeGroup}
Expand All @@ -12,15 +12,18 @@ import models.annotation.nml.NmlResults._
import models.annotation.nml.{NmlParser, NmlResults}
import net.liftweb.common.{Box, Empty, Failure, Full}
import net.liftweb.util.Helpers.tryo
import oxalis.files.TempFileService
import play.api.i18n.MessagesProvider
import play.api.libs.Files.{TemporaryFile, TemporaryFileCreator}

import scala.concurrent.ExecutionContext

case class UploadedVolumeLayer(tracing: VolumeTracing, dataZipLocation: String, name: Option[String]) {
def getDataZipFrom(otherFiles: Map[String, TemporaryFile]): Option[File] =
otherFiles.get(dataZipLocation).map(_.path.toFile)
def getDataZipFrom(otherFiles: Map[String, File]): Option[File] =
otherFiles.get(dataZipLocation)
}

class AnnotationUploadService @Inject()(temporaryFileCreator: TemporaryFileCreator) extends LazyLogging {
class AnnotationUploadService @Inject()(tempFileService: TempFileService)(implicit ec: ExecutionContext)
extends LazyLogging {

private def extractFromNml(file: File, name: String, overwritingDataSetName: Option[String], isTaskUpload: Boolean)(
implicit m: MessagesProvider): NmlParseResult =
Expand Down Expand Up @@ -50,7 +53,7 @@ class AnnotationUploadService @Inject()(temporaryFileCreator: TemporaryFileCreat
overwritingDataSetName: Option[String],
isTaskUpload: Boolean)(implicit m: MessagesProvider): MultiNmlParseResult = {
val name = zipFileName getOrElse file.getName
var otherFiles = Map.empty[String, TemporaryFile]
var otherFiles = Map.empty[String, File]
var parseResults = List.empty[NmlParseResult]

ZipIO.withUnziped(file) { (filename, inputStream) =>
Expand All @@ -59,9 +62,9 @@ class AnnotationUploadService @Inject()(temporaryFileCreator: TemporaryFileCreat
extractFromNml(inputStream, filename.toString, overwritingDataSetName, isTaskUpload, Some(file.getPath))
parseResults ::= (if (useZipName) result.withName(name) else result)
} else {
val tempFile = temporaryFileCreator.create(filename.toString)
Files.copy(inputStream, tempFile.path, StandardCopyOption.REPLACE_EXISTING)
otherFiles += (file.getPath + filename.toString -> tempFile)
val tempFile: Path = tempFileService.create(file.getPath.replaceAll("/", "_") + filename.toString)
Files.copy(inputStream, tempFile, StandardCopyOption.REPLACE_EXISTING)
otherFiles += (file.getPath + filename.toString -> tempFile.toFile)
}
}
MultiNmlParseResult(parseResults, otherFiles)
Expand Down Expand Up @@ -123,7 +126,7 @@ class AnnotationUploadService @Inject()(temporaryFileCreator: TemporaryFileCreat
acc.combineWith(
extractFromFiles(
extractFromZip(file, Some(name), useZipName, overwritingDataSetName, isTaskUpload).otherFiles.toSeq
.map(tuple => (tuple._2.path.toFile, tuple._1)),
.map(tuple => (tuple._2, tuple._1)),
useZipName,
overwritingDataSetName,
isTaskUpload
Expand Down
6 changes: 2 additions & 4 deletions app/models/annotation/nml/NmlResults.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.scalableminds.webknossos.datastore.VolumeTracing.VolumeTracing
import com.typesafe.scalalogging.LazyLogging
import models.annotation.UploadedVolumeLayer
import net.liftweb.common.{Box, Empty, Failure, Full}
import play.api.libs.Files.TemporaryFile

object NmlResults extends LazyLogging {

Expand Down Expand Up @@ -50,8 +49,7 @@ object NmlResults extends LazyLogging {
def succeeded = false
}

case class MultiNmlParseResult(parseResults: List[NmlParseResult] = Nil,
otherFiles: Map[String, TemporaryFile] = Map.empty) {
case class MultiNmlParseResult(parseResults: List[NmlParseResult] = Nil, otherFiles: Map[String, File] = Map.empty) {

def combineWith(other: MultiNmlParseResult): MultiNmlParseResult =
MultiNmlParseResult(parseResults ::: other.parseResults, other.otherFiles ++ otherFiles)
Expand Down Expand Up @@ -82,7 +80,7 @@ object NmlResults extends LazyLogging {
case Full(success) if success.volumeLayers.length <= 1 =>
success.volumeLayers.headOption match {
case Some(UploadedVolumeLayer(tracing, dataZipLocation, _)) =>
Full((tracing, otherFiles.get(dataZipLocation).map(_.path.toFile)))
Full((tracing, otherFiles.get(dataZipLocation)))
case None => Empty
}
case Full(success) if success.volumeLayers.length > 1 =>
Expand Down
18 changes: 6 additions & 12 deletions app/models/task/TaskCreationService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package models.task
import java.io.File

import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext}
import com.scalableminds.util.geometry.{BoundingBox, Vec3Int, Vec3Double}
import com.scalableminds.util.geometry.{BoundingBox, Vec3Double, Vec3Int}
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.SkeletonTracing.{SkeletonTracing, SkeletonTracingOpt, SkeletonTracings}
import com.scalableminds.webknossos.datastore.VolumeTracing.VolumeTracing
Expand All @@ -12,15 +12,7 @@ import com.scalableminds.webknossos.tracingstore.tracings.TracingType
import com.scalableminds.webknossos.tracingstore.tracings.volume.ResolutionRestrictions
import javax.inject.Inject
import models.annotation.nml.NmlResults.TracingBoxContainer
import models.annotation.{
Annotation,
AnnotationDAO,
AnnotationService,
AnnotationState,
AnnotationType,
WKRemoteTracingStoreClient,
TracingStoreService
}
import models.annotation._
import models.binary.{DataSet, DataSetDAO, DataSetService}
import models.project.{Project, ProjectDAO}
import models.team.{Team, TeamDAO}
Expand Down Expand Up @@ -406,8 +398,10 @@ class TaskCreationService @Inject()(taskTypeService: TaskTypeService,
case _ => savedId
}
}
volumeTracingIds: List[Box[Option[String]]] <- Fox.sequence(
requestedTasks.map(requestedTask => saveVolumeTracingIfPresent(requestedTask, tracingStoreClient)))
// Note that volume tracings are saved sequentially to reduce server load
volumeTracingIds: List[Box[Option[String]]] <- Fox.serialSequenceBox(requestedTasks) { requestedTask =>
saveVolumeTracingIfPresent(requestedTask, tracingStoreClient)
}
skeletonTracingsIdsMerged = mergeTracingIds((requestedTasks.map(_.map(_._1)), skeletonTracingIds).zipped.toList,
isSkeletonId = true)
volumeTracingsIdsMerged = mergeTracingIds((requestedTasks.map(_.map(_._1)), volumeTracingIds).zipped.toList,
Expand Down
55 changes: 55 additions & 0 deletions app/oxalis/files/TempFileService.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package oxalis.files

import java.nio.file.{Files, Path, Paths}

import com.scalableminds.util.tools.Fox
import com.typesafe.scalalogging.LazyLogging
import javax.inject.Inject
import net.liftweb.common.Box.tryo
import org.apache.commons.io.FileUtils
import oxalis.cleanup.CleanUpService

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

/**
  * Creates temporary files below a dedicated directory inside `java.io.tmpdir` and keeps track
  * of them so that they can be deleted once their life time has passed.
  *
  * Avoiding Java TemporaryFiles because of a seeming OpenJDK regression,
  * see discussion at https://github.com/scalableminds/webknossos/issues/6173
  *
  * On construction, a clean up job is registered with the CleanUpService. All access to the
  * registry of active files is synchronized, since files may be created while a clean up is
  * running (assumption: the clean up job runs on a different thread than the callers of
  * create; the scheduling is not visible in this file).
  */
class TempFileService @Inject()(cleanUpService: CleanUpService)(implicit ec: ExecutionContext) extends LazyLogging {

  private val tmpDir: Path = Paths.get(System.getProperty("java.io.tmpdir")).resolve("webKnossosTempFiles")

  // Pairs of (file path, expiry time in epoch millis). Guarded by activeTempFiles.synchronized.
  private val activeTempFiles = scala.collection.mutable.Set[(Path, Long)]()

  cleanUpService.register("Clean up expired temporary files", 1.hour)(cleanUpExpiredFiles())

  // The directory may have been removed by cleanUpAll or externally, so re-create it on demand.
  private def ensureParent(): Path =
    Files.createDirectories(tmpDir)

  /**
    * Creates a new empty file in the temp directory and registers it for later clean up.
    *
    * @param prefix   first part of the file name. Path separators are replaced by underscores,
    *                 so that the file is always created directly inside the temp directory.
    * @param lifeTime duration after which the file becomes eligible for deletion
    * @return the path of the newly created file
    */
  def create(prefix: String = "tmpFile", lifeTime: FiniteDuration = 2.hours): Path = {
    ensureParent()
    // A separator in the prefix would point into a (non-existing) subdirectory or outside of tmpDir
    val safePrefix = prefix.replace('/', '_').replace('\\', '_')
    val path = tmpDir.resolve(f"$safePrefix-${Random.alphanumeric.take(15).mkString("")}")
    logger.info(f"Creating temp file at $path")
    Files.createFile(path)
    val expiryTime = System.currentTimeMillis() + lifeTime.toMillis
    activeTempFiles.synchronized {
      activeTempFiles.add((path, expiryTime))
    }
    path
  }

  /**
    * Deletes all registered files whose expiry time has passed and removes them from the registry.
    * Deletion is best-effort: a file that cannot be deleted is logged and no longer tracked.
    *
    * @return always a successful Fox
    */
  def cleanUpExpiredFiles(): Fox[Unit] = {
    val now = System.currentTimeMillis()
    // Determine the expired entries first, the set must not be modified while it is being iterated
    val expired = activeTempFiles.synchronized {
      val found = activeTempFiles.filter { case (_, expiryTime) => expiryTime < now }.toList
      activeTempFiles --= found
      found
    }
    expired.foreach {
      case (path, _) =>
        // deleteIfExists: the file may already be gone, e.g. after cleanUpAll
        if (tryo(Files.deleteIfExists(path)).isEmpty) {
          logger.warn(s"Could not delete expired temp file at $path")
        }
    }
    Fox.successful(())
  }

  /**
    * Deletes the whole temp directory with all files in it and empties the registry.
    */
  def cleanUpAll(): Unit = {
    activeTempFiles.synchronized {
      activeTempFiles.clear()
    }
    FileUtils.deleteDirectory(tmpDir.toFile)
  }

}
17 changes: 17 additions & 0 deletions util/src/main/scala/com/scalableminds/util/tools/Fox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ object Fox extends FoxImplicits {
implicit ec: ExecutionContext): Fox[Nothing] =
new Fox(Future.successful(Failure(message, ex, chain)))

// run serially, fail on the first failure
def serialSequence[A, B](l: List[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[List[B]] = {
def runNext(remaining: List[A], results: List[B]): Future[List[B]] =
remaining match {
Expand All @@ -71,6 +72,21 @@ object Fox extends FoxImplicits {
runNext(l, Nil)
}

// Applies f to the elements one after another (the next call starts only once the previous
// future has completed) and returns the Box of every element, in input order.
def serialSequenceBox[A, B](l: List[A])(f: A => Fox[B])(implicit ec: ExecutionContext): Future[List[Box[B]]] = {
  // collected is built in reverse (prepending) and flipped once nothing is pending anymore
  def step(pending: List[A], collected: List[Box[B]]): Future[List[Box[B]]] =
    if (pending.isEmpty) Future.successful(collected.reverse)
    else f(pending.head).futureBox.flatMap(boxed => step(pending.tail, boxed :: collected))

  step(l, Nil)
}

def sequence[T](l: List[Fox[T]])(implicit ec: ExecutionContext): Future[List[Box[T]]] =
Future.sequence(l.map(_.futureBox))

Expand Down Expand Up @@ -105,6 +121,7 @@ object Fox extends FoxImplicits {
new Fox(r)
}

// Run serially, fail on the first failure
def serialCombined[A, B](l: List[A])(f: A => Fox[B])(implicit ec: ExecutionContext): Fox[List[B]] = {
def runNext(remaining: List[A], results: List[B]): Fox[List[B]] =
remaining match {
Expand Down

0 comments on commit 7a953f9

Please sign in to comment.