Skip to content

Commit

Permalink
use allDataBlocking and temporary files for explorative download (#6009)
Browse files Browse the repository at this point in the history
* use allDataBlocking and temporary files for explorative download

* changelog

* include name attribute in volume tag in nml

* explicit variable name
  • Loading branch information
fm3 authored Feb 3, 2022
1 parent 8a0ecac commit 50cef58
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
- Fixed volume-related bugs which could corrupt the volume data in certain scenarios. [#5955](https://github.com/scalableminds/webknossos/pull/5955)
- Fixed the placeholder resolution computation for anisotropic layers with missing base resolutions. [#5983](https://github.com/scalableminds/webknossos/pull/5983)
- Fixed a bug where ad-hoc meshes were computed for a mapping, although it was disabled. [#5982](https://github.com/scalableminds/webknossos/pull/5982)
- Fixed a bug where volume annotation downloads would sometimes contain truncated zips. [#6009](https://github.com/scalableminds/webknossos/pull/6009)


### Removed
Expand Down
115 changes: 66 additions & 49 deletions app/controllers/AnnotationIOController.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package controllers

import java.io.File
import java.io.{BufferedOutputStream, File, FileOutputStream}

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl._
import com.mohiva.play.silhouette.api.Silhouette
import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext}
import com.scalableminds.util.io.{NamedEnumeratorStream, ZipIO}
Expand All @@ -30,9 +29,7 @@ import models.task._
import models.user._
import oxalis.security.WkEnv
import play.api.i18n.{Messages, MessagesProvider}
import play.api.libs.Files.TemporaryFile
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.streams.IterateeStreams
import play.api.libs.Files.{TemporaryFile, TemporaryFileCreator}
import play.api.libs.json.Json
import play.api.mvc.{Action, AnyContent, MultipartFormData}
import utils.ObjectId
Expand All @@ -51,6 +48,7 @@ class AnnotationIOController @Inject()(
taskDAO: TaskDAO,
taskTypeDAO: TaskTypeDAO,
tracingStoreService: TracingStoreService,
temporaryFileCreator: TemporaryFileCreator,
annotationService: AnnotationService,
analyticsService: AnalyticsService,
sil: Silhouette[WkEnv],
Expand Down Expand Up @@ -268,30 +266,34 @@ Expects:
volumeVersion: Option[Long],
skipVolumeData: Boolean)(implicit ctx: DBAccessContext) = {

def skeletonToDownloadStream(dataSet: DataSet, annotation: Annotation, name: String, organizationName: String) =
def skeletonToTemporaryFile(dataSet: DataSet,
annotation: Annotation,
organizationName: String): Fox[TemporaryFile] =
for {
tracingStoreClient <- tracingStoreService.clientFor(dataSet)
fetchedAnnotationLayers <- Fox.serialCombined(annotation.skeletonAnnotationLayers)(
tracingStoreClient.getSkeletonTracing(_, skeletonVersion))
user <- userService.findOneById(annotation._user, useCache = true)
taskOpt <- Fox.runOptional(annotation._task)(taskDAO.findOne)
} yield {
(nmlWriter.toNmlStream(fetchedAnnotationLayers,
Some(annotation),
dataSet.scale,
None,
organizationName,
Some(user),
taskOpt),
name + ".nml")
}
nmlStream = nmlWriter.toNmlStream(fetchedAnnotationLayers,
Some(annotation),
dataSet.scale,
None,
organizationName,
Some(user),
taskOpt)
nmlTemporaryFile = temporaryFileCreator.create()
temporaryFileStream = new BufferedOutputStream(new FileOutputStream(nmlTemporaryFile))
_ <- NamedEnumeratorStream("", nmlStream).writeTo(temporaryFileStream)
_ = temporaryFileStream.close()
} yield nmlTemporaryFile

def volumeOrHybridToDownloadStream(dataSet: DataSet,
annotation: Annotation,
name: String,
organizationName: String): Fox[(Enumerator[Array[Byte]], String)] =
def volumeOrHybridToTemporaryFile(dataset: DataSet,
annotation: Annotation,
name: String,
organizationName: String): Fox[TemporaryFile] =
for {
tracingStoreClient <- tracingStoreService.clientFor(dataSet)
tracingStoreClient <- tracingStoreService.clientFor(dataset)
fetchedVolumeLayers: List[FetchedAnnotationLayer] <- Fox.serialCombined(annotation.volumeAnnotationLayers) {
volumeAnnotationLayer =>
tracingStoreClient.getVolumeTracing(volumeAnnotationLayer, volumeVersion, skipVolumeData)
Expand All @@ -302,46 +304,61 @@ Expects:
}
user <- userService.findOneById(annotation._user, useCache = true)
taskOpt <- Fox.runOptional(annotation._task)(taskDAO.findOne)
} yield {
val nmlStream = NamedEnumeratorStream(
name + ".nml",
nmlWriter.toNmlStream(fetchedSkeletonLayers ::: fetchedVolumeLayers,
Some(annotation),
dataSet.scale,
None,
organizationName,
Some(user),
taskOpt)
)
val dataStreams: List[NamedEnumeratorStream] =
fetchedVolumeLayers.zipWithIndex.flatMap {
case (volumeLayer, index) => volumeLayer.namedVolumeDataEnumerator(index, fetchedVolumeLayers.length == 1)
}
(Enumerator.outputStream { outputStream =>
ZipIO.zip(
nmlStream :: dataStreams,
outputStream
)
}, name + ".zip")
}
nmlStream = nmlWriter.toNmlStream(fetchedSkeletonLayers ::: fetchedVolumeLayers,
Some(annotation),
dataset.scale,
None,
organizationName,
Some(user),
taskOpt)
temporaryFile = temporaryFileCreator.create()
zipper = ZipIO.startZip(new BufferedOutputStream(new FileOutputStream(new File(temporaryFile.path.toString))))
_ <- zipper.addFileFromEnumerator(name + ".nml", nmlStream)
_ = fetchedVolumeLayers.zipWithIndex.map {
case (volumeLayer, index) =>
volumeLayer.volumeDataOpt.foreach { volumeData =>
val dataZipName = volumeLayer.volumeDataZipName(index, fetchedSkeletonLayers.length == 1)
zipper.addFileFromBytes(dataZipName, volumeData)
}
}
_ = zipper.close()
} yield temporaryFile

/** Serializes the annotation into a temporary file for download.
  * Dispatches on tracing type: skeleton-only annotations become a bare
  * NML file, anything containing volume layers (volume or hybrid) is
  * packaged as a zip together with its volume data.
  */
def annotationToTemporaryFile(dataSet: DataSet,
                              annotation: Annotation,
                              name: String,
                              organizationName: String): Fox[TemporaryFile] = {
  val isSkeletonOnly = annotation.tracingType == TracingType.skeleton
  if (isSkeletonOnly)
    skeletonToTemporaryFile(dataSet, annotation, organizationName)
  else
    volumeOrHybridToTemporaryFile(dataSet, annotation, name, organizationName)
}

/** File extension of the download artifact for this annotation:
  * ".nml" for skeleton-only tracings, ".zip" for everything else
  * (volume or hybrid, which bundle NML plus volume data).
  */
def exportExtensionForAnnotation(annotation: Annotation): String =
  annotation.tracingType match {
    case TracingType.skeleton => ".nml"
    case _                    => ".zip"
  }

def tracingToDownloadStream(dataSet: DataSet, annotation: Annotation, name: String, organizationName: String) =
def exportMimeTypeForAnnotation(annotation: Annotation): String =
if (annotation.tracingType == TracingType.skeleton)
skeletonToDownloadStream(dataSet, annotation, name, organizationName)
"application/xml"
else
volumeOrHybridToDownloadStream(dataSet, annotation, name, organizationName)
"application/zip"

for {
annotation <- provider.provideAnnotation(typ, annotationId, issuingUser) ~> NOT_FOUND
restrictions <- provider.restrictionsFor(typ, annotationId)
name <- provider.nameFor(annotation) ?~> "annotation.name.impossible"
fileExtension = exportExtensionForAnnotation(annotation)
fileName = name + fileExtension
mimeType = exportMimeTypeForAnnotation(annotation)
_ <- restrictions.allowDownload(issuingUser) ?~> "annotation.download.notAllowed" ~> FORBIDDEN
dataSet <- dataSetDAO.findOne(annotation._dataSet)(GlobalAccessContext) ?~> "dataSet.notFoundForAnnotation" ~> NOT_FOUND
organization <- organizationDAO.findOne(dataSet._organization)(GlobalAccessContext) ?~> "organization.notFound" ~> NOT_FOUND
(downloadStream, fileName) <- tracingToDownloadStream(dataSet, annotation, name, organization.name)
temporaryFile <- annotationToTemporaryFile(dataSet, annotation, name, organization.name)
} yield {
Ok.chunked(Source.fromPublisher(IterateeStreams.enumeratorToPublisher(downloadStream)))
.as(if (fileName.toLowerCase.endsWith(".zip")) "application/zip" else "application/xml")
Ok.sendFile(temporaryFile, inline = false)
.as(mimeType)
.withHeaders(CONTENT_DISPOSITION ->
s"attachment;filename=${'"'}$fileName${'"'}")
}
Expand Down
17 changes: 2 additions & 15 deletions app/models/annotation/AnnotationLayer.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package models.annotation

import akka.stream.Materializer
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import com.scalableminds.util.io.NamedEnumeratorStream
import com.scalableminds.util.tools.Fox.bool2Fox
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.SkeletonTracing.SkeletonTracing
import com.scalableminds.webknossos.datastore.VolumeTracing.VolumeTracing
import models.annotation.AnnotationLayerType.AnnotationLayerType
import play.api.libs.iteratee.Enumerator
import play.api.libs.json.{Json, OFormat}
import scalapb.GeneratedMessage

Expand Down Expand Up @@ -39,18 +34,10 @@ object AnnotationLayer extends FoxImplicits {
case class FetchedAnnotationLayer(tracingId: String,
name: Option[String],
tracing: Either[SkeletonTracing, VolumeTracing],
volumeDataOpt: Option[Source[ByteString, _]] = None) {
volumeDataOpt: Option[Array[Byte]] = None) {
def typ: AnnotationLayerType =
if (tracing.isLeft) AnnotationLayerType.Skeleton else AnnotationLayerType.Volume

def volumeDataEnumerator(implicit ec: ExecutionContext, materializer: Materializer): Option[Enumerator[Array[Byte]]] =
volumeDataOpt.map(d => Enumerator.fromStream(d.runWith(StreamConverters.asInputStream())))

def namedVolumeDataEnumerator(index: Int, isSingle: Boolean)(
implicit ec: ExecutionContext,
materializer: Materializer): Option[NamedEnumeratorStream] =
volumeDataEnumerator.map(enumerator => NamedEnumeratorStream(volumeDataZipName(index, isSingle), enumerator))

def volumeDataZipName(index: Int, isSingle: Boolean): String = {
val indexLabel = if (isSingle) "" else s"_$index"
val nameLabel = name.map(n => s"_$n").getOrElse("")
Expand All @@ -62,7 +49,7 @@ object FetchedAnnotationLayer {
def fromAnnotationLayer[T <: GeneratedMessage](
annotationLayer: AnnotationLayer,
tracing: Either[SkeletonTracing, VolumeTracing],
volumeDataOpt: Option[Source[ByteString, _]] = None)(implicit ec: ExecutionContext): Fox[FetchedAnnotationLayer] =
volumeDataOpt: Option[Array[Byte]] = None)(implicit ec: ExecutionContext): Fox[FetchedAnnotationLayer] =
for {
_ <- bool2Fox(
(annotationLayer.typ == AnnotationLayerType.Skeleton && tracing.isLeft) || annotationLayer.typ == AnnotationLayerType.Volume && tracing.isRight) ?~> "annotation.download.fetch.typeMismatch"
Expand Down
4 changes: 2 additions & 2 deletions app/models/annotation/WKRemoteTracingStoreClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ class WKRemoteTracingStoreClient(tracingStore: TracingStore, dataSet: DataSet, r
.addQueryStringOptional("version", version.map(_.toString))
.getWithProtoResponse[VolumeTracing](VolumeTracing)
data <- Fox.runIf(!skipVolumeData) {
rpc(s"${tracingStore.url}/tracings/volume/$tracingId/allData")
rpc(s"${tracingStore.url}/tracings/volume/$tracingId/allDataBlocking")
.addQueryString("token" -> RpcTokenHolder.webKnossosToken)
.addQueryStringOptional("version", version.map(_.toString))
.getStream
.getWithBytesResponse
}
fetchedAnnotationLayer <- FetchedAnnotationLayer.fromAnnotationLayer(annotationLayer, Right(tracing), data)
} yield fetchedAnnotationLayer
Expand Down
1 change: 1 addition & 0 deletions app/models/annotation/nml/NmlWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ class NmlWriter @Inject()(implicit ec: ExecutionContext) extends FoxImplicits {
Xml.withinElementSync("volume") {
writer.writeAttribute("id", index.toString)
writer.writeAttribute("location", volumeFilename.getOrElse(volumeLayer.volumeDataZipName(index, isSingle)))
volumeLayer.name.foreach(n => writer.writeAttribute("name", n))
volumeLayer.tracing match {
case Right(volumeTracing) => volumeTracing.fallbackLayer.foreach(writer.writeAttribute("fallbackLayer", _))
case _ => ()
Expand Down
4 changes: 2 additions & 2 deletions util/src/main/scala/com/scalableminds/util/io/FileIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ trait NamedStream {
}

case class NamedFunctionStream(name: String, writer: OutputStream => Future[Unit]) extends NamedStream {
def writeTo(out: OutputStream)(implicit ec: ExecutionContext) = writer(out)
def writeTo(out: OutputStream)(implicit ec: ExecutionContext): Future[Unit] = writer(out)
}

case class NamedEnumeratorStream(name: String, enumerator: Enumerator[Array[Byte]]) extends NamedStream {
def writeTo(out: OutputStream)(implicit ec: ExecutionContext) = {
def writeTo(out: OutputStream)(implicit ec: ExecutionContext): Future[Unit] = {
val iteratee = Iteratee.foreach[Array[Byte]] { bytes =>
out.write(bytes)
}
Expand Down
8 changes: 6 additions & 2 deletions util/src/main/scala/com/scalableminds/util/io/ZipIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ object ZipIO extends LazyLogging {
}

def addFileFromEnumerator(name: String, data: Enumerator[Array[Byte]])(
implicit ec: ExecutionContext): Future[Unit] =
addFileFromNamedEnumerator(NamedEnumeratorStream(name, data))

def addFileFromNamedEnumerator(namedEnumerator: NamedEnumeratorStream)(
implicit ec: ExecutionContext): Future[Unit] = {
stream.putNextEntry(new ZipEntry(name))
NamedEnumeratorStream("", data).writeTo(stream).map(_ => stream.closeEntry())
stream.putNextEntry(new ZipEntry(namedEnumerator.name))
namedEnumerator.writeTo(stream).map(_ => stream.closeEntry())
}

/**
Expand Down

0 comments on commit 50cef58

Please sign in to comment.