From 7a953f9aa818c4d4e80ba280a469965566d6af2f Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 3 May 2022 13:17:47 +0200 Subject: [PATCH] Avoid Java Temp Files in task creation (#6178) * Avoid Java Temp Files in task creation * [wip] temp file service * add clean up logic, refactor * pr feedback, replace slash in temp files --- CHANGELOG.unreleased.md | 1 + app/Startup.scala | 9 +++ app/WebKnossosModule.scala | 2 + app/controllers/AnnotationIOController.scala | 2 +- .../annotation/AnnotationUploadService.scala | 23 ++++---- app/models/annotation/nml/NmlResults.scala | 6 +- app/models/task/TaskCreationService.scala | 18 ++---- app/oxalis/files/TempFileService.scala | 55 +++++++++++++++++++ .../com/scalableminds/util/tools/Fox.scala | 17 ++++++ 9 files changed, 106 insertions(+), 27 deletions(-) create mode 100644 app/oxalis/files/TempFileService.scala diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index 28f28c0a8b1..e19755caf4c 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -15,6 +15,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released ### Changed ### Fixed +- Fixed a bug in task creation, where creation of some tasks with initial volume data would fail. 
[#6178](https://github.com/scalableminds/webknossos/pull/6178) ### Removed diff --git a/app/Startup.scala b/app/Startup.scala index 5ebb82f20e4..0e343e4e38c 100755 --- a/app/Startup.scala +++ b/app/Startup.scala @@ -9,6 +9,7 @@ import models.annotation.AnnotationDAO import models.user.InviteService import net.liftweb.common.{Failure, Full} import oxalis.cleanup.CleanUpService +import oxalis.files.TempFileService import oxalis.mail.{Mailer, MailerConfig} import oxalis.security.WkSilhouetteEnvironment import oxalis.telemetry.SlackNotificationService @@ -27,6 +28,7 @@ class Startup @Inject()(actorSystem: ActorSystem, annotationDAO: AnnotationDAO, wkSilhouetteEnvironment: WkSilhouetteEnvironment, lifecycle: ApplicationLifecycle, + tempFileService: TempFileService, inviteService: InviteService, sqlClient: SQLClient, slackNotificationService: SlackNotificationService) @@ -67,6 +69,13 @@ class Startup @Inject()(actorSystem: ActorSystem, } } + lifecycle.addStopHook { () => + Future.successful { + logger.info("Deleting temporary files") + tempFileService.cleanUpAll() + } + } + private def ensurePostgresDatabase = { logger.info("Running ensure_db.sh with POSTGRES_URL " + sys.env.get("POSTGRES_URL")) diff --git a/app/WebKnossosModule.scala b/app/WebKnossosModule.scala index dd06e7c6138..53d34ca341e 100644 --- a/app/WebKnossosModule.scala +++ b/app/WebKnossosModule.scala @@ -7,6 +7,7 @@ import models.binary.DataSetService import models.task.TaskService import models.user.{UserCache, _} import models.user.time.TimeSpanService +import oxalis.files.TempFileService import oxalis.telemetry.SlackNotificationService import utils.SQLClient @@ -25,6 +26,7 @@ class WebKnossosModule extends AbstractModule { bind(classOf[AnnotationStore]).asEagerSingleton() bind(classOf[DataSetService]).asEagerSingleton() bind(classOf[TimeSpanService]).asEagerSingleton() + bind(classOf[TempFileService]).asEagerSingleton() bind(classOf[JobService]).asEagerSingleton() 
bind(classOf[SlackNotificationService]).asEagerSingleton() bind(classOf[AnalyticsSessionService]).asEagerSingleton() diff --git a/app/controllers/AnnotationIOController.scala b/app/controllers/AnnotationIOController.scala index 4a8e07a3b6f..26e221b2cdd 100755 --- a/app/controllers/AnnotationIOController.scala +++ b/app/controllers/AnnotationIOController.scala @@ -139,7 +139,7 @@ Expects: private def mergeAndSaveVolumeLayers(volumeLayersGrouped: Seq[List[UploadedVolumeLayer]], client: WKRemoteTracingStoreClient, - otherFiles: Map[String, TemporaryFile]): Fox[List[AnnotationLayer]] = { + otherFiles: Map[String, File]): Fox[List[AnnotationLayer]] = { if (volumeLayersGrouped.isEmpty) return Fox.successful(List()) if (volumeLayersGrouped.length > 1 && volumeLayersGrouped.exists(_.length > 1)) return Fox.failure("Cannot merge multiple annotations that each have multiple volume layers.") diff --git a/app/models/annotation/AnnotationUploadService.scala b/app/models/annotation/AnnotationUploadService.scala index c709b9b4217..84f5bb44a59 100644 --- a/app/models/annotation/AnnotationUploadService.scala +++ b/app/models/annotation/AnnotationUploadService.scala @@ -1,7 +1,7 @@ package models.annotation import java.io.{File, FileInputStream, InputStream} -import java.nio.file.{Files, StandardCopyOption} +import java.nio.file.{Files, Path, StandardCopyOption} import com.scalableminds.util.io.ZipIO import com.scalableminds.webknossos.datastore.SkeletonTracing.{SkeletonTracing, TreeGroup} @@ -12,15 +12,18 @@ import models.annotation.nml.NmlResults._ import models.annotation.nml.{NmlParser, NmlResults} import net.liftweb.common.{Box, Empty, Failure, Full} import net.liftweb.util.Helpers.tryo +import oxalis.files.TempFileService import play.api.i18n.MessagesProvider -import play.api.libs.Files.{TemporaryFile, TemporaryFileCreator} + +import scala.concurrent.ExecutionContext case class UploadedVolumeLayer(tracing: VolumeTracing, dataZipLocation: String, name: Option[String]) { - def 
getDataZipFrom(otherFiles: Map[String, TemporaryFile]): Option[File] = - otherFiles.get(dataZipLocation).map(_.path.toFile) + def getDataZipFrom(otherFiles: Map[String, File]): Option[File] = + otherFiles.get(dataZipLocation) } -class AnnotationUploadService @Inject()(temporaryFileCreator: TemporaryFileCreator) extends LazyLogging { +class AnnotationUploadService @Inject()(tempFileService: TempFileService)(implicit ec: ExecutionContext) + extends LazyLogging { private def extractFromNml(file: File, name: String, overwritingDataSetName: Option[String], isTaskUpload: Boolean)( implicit m: MessagesProvider): NmlParseResult = @@ -50,7 +53,7 @@ class AnnotationUploadService @Inject()(temporaryFileCreator: TemporaryFileCreat overwritingDataSetName: Option[String], isTaskUpload: Boolean)(implicit m: MessagesProvider): MultiNmlParseResult = { val name = zipFileName getOrElse file.getName - var otherFiles = Map.empty[String, TemporaryFile] + var otherFiles = Map.empty[String, File] var parseResults = List.empty[NmlParseResult] ZipIO.withUnziped(file) { (filename, inputStream) => @@ -59,9 +62,9 @@ class AnnotationUploadService @Inject()(temporaryFileCreator: TemporaryFileCreat extractFromNml(inputStream, filename.toString, overwritingDataSetName, isTaskUpload, Some(file.getPath)) parseResults ::= (if (useZipName) result.withName(name) else result) } else { - val tempFile = temporaryFileCreator.create(filename.toString) - Files.copy(inputStream, tempFile.path, StandardCopyOption.REPLACE_EXISTING) - otherFiles += (file.getPath + filename.toString -> tempFile) + val tempFile: Path = tempFileService.create(file.getPath.replaceAll("/", "_") + filename.toString) + Files.copy(inputStream, tempFile, StandardCopyOption.REPLACE_EXISTING) + otherFiles += (file.getPath + filename.toString -> tempFile.toFile) } } MultiNmlParseResult(parseResults, otherFiles) @@ -123,7 +126,7 @@ class AnnotationUploadService @Inject()(temporaryFileCreator: TemporaryFileCreat acc.combineWith( 
extractFromFiles( extractFromZip(file, Some(name), useZipName, overwritingDataSetName, isTaskUpload).otherFiles.toSeq - .map(tuple => (tuple._2.path.toFile, tuple._1)), + .map(tuple => (tuple._2, tuple._1)), useZipName, overwritingDataSetName, isTaskUpload diff --git a/app/models/annotation/nml/NmlResults.scala b/app/models/annotation/nml/NmlResults.scala index a9f0d454770..e4fbf08966f 100644 --- a/app/models/annotation/nml/NmlResults.scala +++ b/app/models/annotation/nml/NmlResults.scala @@ -7,7 +7,6 @@ import com.scalableminds.webknossos.datastore.VolumeTracing.VolumeTracing import com.typesafe.scalalogging.LazyLogging import models.annotation.UploadedVolumeLayer import net.liftweb.common.{Box, Empty, Failure, Full} -import play.api.libs.Files.TemporaryFile object NmlResults extends LazyLogging { @@ -50,8 +49,7 @@ object NmlResults extends LazyLogging { def succeeded = false } - case class MultiNmlParseResult(parseResults: List[NmlParseResult] = Nil, - otherFiles: Map[String, TemporaryFile] = Map.empty) { + case class MultiNmlParseResult(parseResults: List[NmlParseResult] = Nil, otherFiles: Map[String, File] = Map.empty) { def combineWith(other: MultiNmlParseResult): MultiNmlParseResult = MultiNmlParseResult(parseResults ::: other.parseResults, other.otherFiles ++ otherFiles) @@ -82,7 +80,7 @@ object NmlResults extends LazyLogging { case Full(success) if success.volumeLayers.length <= 1 => success.volumeLayers.headOption match { case Some(UploadedVolumeLayer(tracing, dataZipLocation, _)) => - Full((tracing, otherFiles.get(dataZipLocation).map(_.path.toFile))) + Full((tracing, otherFiles.get(dataZipLocation))) case None => Empty } case Full(success) if success.volumeLayers.length > 1 => diff --git a/app/models/task/TaskCreationService.scala b/app/models/task/TaskCreationService.scala index 04db9d1d8cc..fda0f3601b2 100644 --- a/app/models/task/TaskCreationService.scala +++ b/app/models/task/TaskCreationService.scala @@ -3,7 +3,7 @@ package models.task import 
java.io.File import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext} -import com.scalableminds.util.geometry.{BoundingBox, Vec3Int, Vec3Double} +import com.scalableminds.util.geometry.{BoundingBox, Vec3Double, Vec3Int} import com.scalableminds.util.tools.{Fox, FoxImplicits} import com.scalableminds.webknossos.datastore.SkeletonTracing.{SkeletonTracing, SkeletonTracingOpt, SkeletonTracings} import com.scalableminds.webknossos.datastore.VolumeTracing.VolumeTracing @@ -12,15 +12,7 @@ import com.scalableminds.webknossos.tracingstore.tracings.TracingType import com.scalableminds.webknossos.tracingstore.tracings.volume.ResolutionRestrictions import javax.inject.Inject import models.annotation.nml.NmlResults.TracingBoxContainer -import models.annotation.{ - Annotation, - AnnotationDAO, - AnnotationService, - AnnotationState, - AnnotationType, - WKRemoteTracingStoreClient, - TracingStoreService -} +import models.annotation._ import models.binary.{DataSet, DataSetDAO, DataSetService} import models.project.{Project, ProjectDAO} import models.team.{Team, TeamDAO} @@ -406,8 +398,10 @@ class TaskCreationService @Inject()(taskTypeService: TaskTypeService, case _ => savedId } } - volumeTracingIds: List[Box[Option[String]]] <- Fox.sequence( - requestedTasks.map(requestedTask => saveVolumeTracingIfPresent(requestedTask, tracingStoreClient))) + // Note that volume tracings are saved sequentially to reduce server load + volumeTracingIds: List[Box[Option[String]]] <- Fox.serialSequenceBox(requestedTasks) { requestedTask => + saveVolumeTracingIfPresent(requestedTask, tracingStoreClient) + } skeletonTracingsIdsMerged = mergeTracingIds((requestedTasks.map(_.map(_._1)), skeletonTracingIds).zipped.toList, isSkeletonId = true) volumeTracingsIdsMerged = mergeTracingIds((requestedTasks.map(_.map(_._1)), volumeTracingIds).zipped.toList, diff --git a/app/oxalis/files/TempFileService.scala b/app/oxalis/files/TempFileService.scala new file mode 100644 index 
00000000000..ca63e85b5d8
--- /dev/null
+++ b/app/oxalis/files/TempFileService.scala
@@ -0,0 +1,83 @@
+package oxalis.files
+
+import java.nio.file.{Files, Path, Paths}
+
+import com.scalableminds.util.tools.Fox
+import com.typesafe.scalalogging.LazyLogging
+import javax.inject.Inject
+import net.liftweb.common.Box.tryo
+import net.liftweb.common.Failure
+import org.apache.commons.io.FileUtils
+import oxalis.cleanup.CleanUpService
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.util.Random
+
+/**
+  * Creates and tracks temporary files in a dedicated directory below java.io.tmpdir.
+  *
+  * Avoiding Java TemporaryFiles because of seeming openJDK regression,
+  * see discussion at https://github.com/scalableminds/webknossos/issues/6173
+  *
+  * Every created file is registered together with an expiry time. Expired files are deleted by a
+  * job registered with the CleanUpService; cleanUpAll removes the whole directory.
+  */
+class TempFileService @Inject()(cleanUpService: CleanUpService)(implicit ec: ExecutionContext) extends LazyLogging {
+
+  private val tmpDir: Path = Paths.get(System.getProperty("java.io.tmpdir")).resolve("webKnossosTempFiles")
+
+  // Maps each registered temp file to its expiry timestamp (epoch millis). TrieMap is a thread-safe map.
+  // NOTE(review): assumes create() and the clean-up job may run on different threads - confirm.
+  private val activeTempFiles = TrieMap.empty[Path, Long]
+
+  cleanUpService.register("Clean up expired temporary files", 1.hour)(cleanUpExpiredFiles())
+
+  private def ensureParent(): Path =
+    Files.createDirectories(tmpDir)
+
+  /**
+    * Creates an empty file in the temp directory and registers it for later clean-up.
+    *
+    * @param prefix   start of the file name; path separators are replaced by underscores so the
+    *                 file is always created directly inside the temp directory
+    * @param lifeTime time after which the file becomes eligible for deletion by cleanUpExpiredFiles
+    * @return path of the newly created file
+    */
+  def create(prefix: String = "tmpFile", lifeTime: FiniteDuration = 2.hours): Path = {
+    ensureParent()
+    // Prefixes can be derived from e.g. zip entry names, which may contain directory separators.
+    val safePrefix = prefix.replace('/', '_').replace('\\', '_')
+    val path = tmpDir.resolve(s"$safePrefix-${Random.alphanumeric.take(15).mkString}")
+    logger.info(s"Creating temp file at $path")
+    Files.createFile(path)
+    activeTempFiles.put(path, System.currentTimeMillis() + lifeTime.toMillis)
+    path
+  }
+
+  /**
+    * Deletes all registered files whose expiry time has passed and drops them from the registry.
+    * Deletion is best-effort: failures are logged and the entry is removed regardless.
+    */
+  def cleanUpExpiredFiles(): Fox[Unit] = {
+    val now = System.currentTimeMillis()
+    // Take a snapshot first so the registry is never modified while it is being traversed.
+    val expiredPaths = activeTempFiles.toList.collect { case (path, expiryTime) if expiryTime < now => path }
+    expiredPaths.foreach { path =>
+      tryo(Files.deleteIfExists(path)) match {
+        case Failure(msg, _, _) => logger.warn(s"Could not delete expired temp file $path: $msg")
+        case _                  => ()
+      }
+      activeTempFiles.remove(path)
+    }
+    Fox.successful(())
+  }
+
+  /** Deletes the whole temp directory with everything in it and empties the registry. */
+  def cleanUpAll(): Unit = {
+    FileUtils.deleteDirectory(tmpDir.toFile)
+    activeTempFiles.clear()
+  }
+
+}
diff --git a/util/src/main/scala/com/scalableminds/util/tools/Fox.scala
b/util/src/main/scala/com/scalableminds/util/tools/Fox.scala
index 10ad67d26c2..f862bf9f285 100644
--- a/util/src/main/scala/com/scalableminds/util/tools/Fox.scala
+++ b/util/src/main/scala/com/scalableminds/util/tools/Fox.scala
@@ -57,6 +57,7 @@ object Fox extends FoxImplicits {
       implicit ec: ExecutionContext): Fox[Nothing] =
     new Fox(Future.successful(Failure(message, ex, chain)))
 
+  // run serially, fail on the first failure
   def serialSequence[A, B](l: List[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[List[B]] = {
     def runNext(remaining: List[A], results: List[B]): Future[List[B]] =
       remaining match {
@@ -71,6 +72,21 @@ object Fox extends FoxImplicits {
     runNext(l, Nil)
   }
 
+  // Applies f to the elements of l one at a time, in order. Each resulting Box is collected as-is
+  // instead of aborting on a failed one; the list is returned in input order.
+  def serialSequenceBox[A, B](l: List[A])(f: A => Fox[B])(implicit ec: ExecutionContext): Future[List[Box[B]]] = {
+    // collected holds the boxes gathered so far, newest first; it is reversed once at the end
+    def step(pending: List[A], collected: List[Box[B]]): Future[List[Box[B]]] =
+      pending match {
+        case Nil =>
+          Future.successful(collected.reverse)
+        case current :: rest =>
+          // the next call to f only happens inside flatMap, i.e. after this future has completed
+          f(current).futureBox.flatMap(box => step(rest, box :: collected))
+      }
+    step(l, Nil)
+  }
+
   def sequence[T](l: List[Fox[T]])(implicit ec: ExecutionContext): Future[List[Box[T]]] =
     Future.sequence(l.map(_.futureBox))
 
@@ -105,6 +121,7 @@ object Fox extends FoxImplicits {
     new Fox(r)
   }
 
+  // Run serially, fail on the first failure
   def serialCombined[A, B](l: List[A])(f: A => Fox[B])(implicit ec: ExecutionContext): Fox[List[B]] = {
     def runNext(remaining: List[A], results: List[B]): Fox[List[B]] =
       remaining match {