Skip to content

Commit

Permalink
Always return 404 for Failures in Zarr Streaming (#6515)
Browse files Browse the repository at this point in the history
* Always return 404 for Failures in Zarr Streaming

* changelog

* use NOT_FUOND constant
  • Loading branch information
fm3 authored Sep 29, 2022
1 parent 68f68ad commit 7698a02
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 238 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
- Creating tasks in bulk now also supports referencing task types by their summary instead of id. [#6486](https://github.com/scalableminds/webknossos/pull/6486)

### Fixed
- Fixed a bug where some file requests replied with error 400 instead of 404, confusing some zarr clients. [#6515](https://github.com/scalableminds/webknossos/pull/6515)

### Removed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ import scala.concurrent.{ExecutionContext, Future}

trait BoxToResultHelpers extends I18nSupport with Formatter with RemoteOriginHelpers {

protected def defaultErrorCode: Int = BAD_REQUEST

def asResult[T <: Result](b: Box[T])(implicit messages: MessagesProvider): Result = {
val result = b match {
case Full(result) =>
result
case ParamFailure(msg, _, chain, statusCode: Int) =>
new JsonResult(statusCode)(Messages(msg), formatChainOpt(chain))
case ParamFailure(_, _, _, msgs: JsArray) =>
new JsonResult(BAD_REQUEST)(jsonMessages(msgs))
new JsonResult(defaultErrorCode)(jsonMessages(msgs))
case Failure(msg, _, chain) =>
new JsonResult(BAD_REQUEST)(Messages(msg), formatChainOpt(chain))
new JsonResult(defaultErrorCode)(Messages(msg), formatChainOpt(chain))
case Empty =>
new JsonResult(NOT_FOUND)("Couldn't find the requested resource.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class ZarrStreamingController @Inject()(
)(implicit ec: ExecutionContext)
extends Controller {

override def defaultErrorCode: Int = NOT_FOUND

val binaryDataService: BinaryDataService = binaryDataServiceHolder.binaryDataService

override def allowRemoteOrigin: Boolean = true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
package com.scalableminds.webknossos.tracingstore.controllers

import java.io.File
import java.nio.{ByteBuffer, ByteOrder}
import akka.stream.scaladsl.Source
import com.google.inject.Inject
import com.scalableminds.util.geometry.{BoundingBox, Vec3Double, Vec3Int}
import com.scalableminds.util.tools.ExtendedTypes.ExtendedString
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.VolumeTracing.{VolumeTracing, VolumeTracingOpt, VolumeTracings}
import com.scalableminds.webknossos.datastore.dataformats.MagLocator
import com.scalableminds.webknossos.datastore.dataformats.zarr.{ZarrCoordinatesParser, ZarrSegmentationLayer}
import com.scalableminds.webknossos.datastore.datareaders.jzarr.{OmeNgffGroupHeader, OmeNgffHeader, ZarrHeader}
import com.scalableminds.webknossos.datastore.datareaders.{ArrayOrder, AxisOrder}
import com.scalableminds.webknossos.datastore.helpers.ProtoGeometryImplicits
import com.scalableminds.webknossos.datastore.models.datasource.{DataLayer, ElementClass}
import com.scalableminds.webknossos.datastore.models.{WebKnossosDataRequest, WebKnossosIsosurfaceRequest}
import com.scalableminds.webknossos.datastore.rpc.RPC
import com.scalableminds.webknossos.datastore.services.UserAccessRequest
Expand Down Expand Up @@ -43,16 +36,18 @@ import play.api.libs.iteratee.streams.IterateeStreams
import play.api.libs.json.Json
import play.api.mvc.{Action, AnyContent, MultipartFormData, PlayBodyParsers}

import scala.concurrent.{ExecutionContext, Future}
import java.io.File
import java.nio.{ByteBuffer, ByteOrder}
import scala.concurrent.ExecutionContext

class VolumeTracingController @Inject()(
val tracingService: VolumeTracingService,
val config: TracingStoreConfig,
val remoteWebKnossosClient: TSRemoteWebKnossosClient,
val remoteDataStoreClient: TSRemoteDatastoreClient,
val accessTokenService: TracingStoreAccessTokenService,
editableMappingService: EditableMappingService,
val slackNotificationService: TSSlackNotificationService,
val remoteWebKnossosClient: TSRemoteWebKnossosClient,
val rpc: RPC)(implicit val ec: ExecutionContext, val bodyParsers: PlayBodyParsers)
extends TracingController[VolumeTracing, VolumeTracings]
with ProtoGeometryImplicits
Expand Down Expand Up @@ -253,222 +248,6 @@ class VolumeTracingController @Inject()(
}
}

def volumeTracingFolderContent(token: Option[String], tracingId: String): Action[AnyContent] =
Action.async { implicit request =>
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId), urlOrHeaderToken(token, request)) {
for {
tracing <- tracingService.find(tracingId) ?~> Messages("tracing.notFound") ~> NOT_FOUND
existingMags = tracing.resolutions.map(vec3IntFromProto)
} yield
Ok(
views.html.datastoreZarrDatasourceDir(
"Tracingstore",
"%s".format(tracingId),
List(".zattrs", ".zgroup") ++ existingMags.map(_.toMagLiteral(allowScalar = true))
)).withHeaders()
}
}

def volumeTracingFolderContentJson(token: Option[String], tracingId: String): Action[AnyContent] =
Action.async { implicit request =>
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId), urlOrHeaderToken(token, request)) {
for {
tracing <- tracingService.find(tracingId) ?~> Messages("tracing.notFound") ~> 404
existingMags = tracing.resolutions.map(vec3IntFromProto(_).toMagLiteral(allowScalar = true))
} yield Ok(Json.toJson(List(".zattrs", ".zgroup") ++ existingMags))
}
}

def volumeTracingMagFolderContent(token: Option[String], tracingId: String, mag: String): Action[AnyContent] =
Action.async { implicit request =>
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId), urlOrHeaderToken(token, request)) {
for {
tracing <- tracingService.find(tracingId) ?~> Messages("tracing.notFound") ~> NOT_FOUND

existingMags = tracing.resolutions.map(vec3IntFromProto)
magParsed <- Vec3Int.fromMagLiteral(mag, allowScalar = true) ?~> Messages("dataLayer.invalidMag", mag) ~> NOT_FOUND
_ <- bool2Fox(existingMags.contains(magParsed)) ?~> Messages("tracing.wrongMag", tracingId, mag) ~> NOT_FOUND
} yield
Ok(
views.html.datastoreZarrDatasourceDir(
"Tracingstore",
"%s".format(tracingId),
List(".zarray")
)).withHeaders()
}
}

def volumeTracingMagFolderContentJson(token: Option[String], tracingId: String, mag: String): Action[AnyContent] =
Action.async { implicit request =>
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId), urlOrHeaderToken(token, request)) {
for {
tracing <- tracingService.find(tracingId) ?~> Messages("tracing.notFound") ~> 404

existingMags = tracing.resolutions.map(vec3IntFromProto)
magParsed <- Vec3Int.fromMagLiteral(mag, allowScalar = true) ?~> Messages("dataLayer.invalidMag", mag) ~> 404
_ <- bool2Fox(existingMags.contains(magParsed)) ?~> Messages("tracing.wrongMag", tracingId, mag) ~> 404
} yield Ok(Json.toJson(List(".zarray")))
}
}

def zArray(token: Option[String], tracingId: String, mag: String): Action[AnyContent] = Action.async {
implicit request =>
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId), urlOrHeaderToken(token, request)) {
for {
tracing <- tracingService.find(tracingId) ?~> Messages("tracing.notFound") ~> NOT_FOUND

existingMags = tracing.resolutions.map(vec3IntFromProto)
magParsed <- Vec3Int
.fromMagLiteral(mag, allowScalar = true) ?~> Messages("dataLayer.invalidMag", mag) ~> NOT_FOUND
_ <- bool2Fox(existingMags.contains(magParsed)) ?~> Messages("tracing.wrongMag", tracingId, mag) ~> NOT_FOUND

cubeLength = DataLayer.bucketLength
(channels, dtype) = ElementClass.toChannelAndZarrString(tracing.elementClass)
// data request method always decompresses before sending
compressor = None

shape = Array(
channels,
// Zarr can't handle data sets that don't start at 0, so we extend shape to include "true" coords
(tracing.boundingBox.width + tracing.boundingBox.topLeft.x) / magParsed.x,
(tracing.boundingBox.height + tracing.boundingBox.topLeft.y) / magParsed.y,
(tracing.boundingBox.depth + tracing.boundingBox.topLeft.z) / magParsed.z
)

chunks = Array(channels, cubeLength, cubeLength, cubeLength)

zarrHeader = ZarrHeader(zarr_format = 2,
shape = shape,
chunks = chunks,
compressor = compressor,
dtype = dtype,
order = ArrayOrder.F)
} yield Ok(Json.toJson(zarrHeader))
}
}

def zGroup(token: Option[String], tracingId: String): Action[AnyContent] = Action.async { implicit request =>
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId), urlOrHeaderToken(token, request)) {
Future(Ok(Json.toJson(OmeNgffGroupHeader(zarr_format = 2))))
}
}

/**
* Handles a request for .zattrs file for a Volume Tracing via a HTTP GET.
* Uses the OME-NGFF standard (see https://ngff.openmicroscopy.org/latest/)
* Used by zarr-streaming.
*/
def zAttrs(
token: Option[String],
tracingId: String,
): Action[AnyContent] = Action.async { implicit request =>
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId), urlOrHeaderToken(token, request)) {
for {
tracing <- tracingService.find(tracingId) ?~> Messages("tracing.notFound") ~> NOT_FOUND

existingMags = tracing.resolutions.map(vec3IntFromProto)
dataSource <- remoteWebKnossosClient.getDataSource(tracing.organizationName, tracing.dataSetName) ~> NOT_FOUND

omeNgffHeader = OmeNgffHeader.fromNameScaleAndMags(tracingId,
dataSourceScale = dataSource.scale,
mags = existingMags.toList)
} yield Ok(Json.toJson(omeNgffHeader))
}
}

def zarrSource(token: Option[String], tracingId: String, tracingName: Option[String]): Action[AnyContent] =
Action.async { implicit request =>
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId), urlOrHeaderToken(token, request)) {
for {
tracing <- tracingService.find(tracingId) ?~> Messages("tracing.notFound") ~> 404

zarrLayer = ZarrSegmentationLayer(
name = tracingName.getOrElse(tracingId),
largestSegmentId = tracing.largestSegmentId,
boundingBox = tracing.boundingBox,
elementClass = tracing.elementClass,
mags = tracing.resolutions.toList.map(x => MagLocator(x, None, None, Some(AxisOrder.cxyz))),
mappings = None,
numChannels = Some(if (tracing.elementClass.isuint24) 3 else 1)
)
} yield Ok(Json.toJson(zarrLayer))
}
}

def rawZarrCube(token: Option[String], tracingId: String, mag: String, cxyz: String): Action[AnyContent] =
Action.async { implicit request =>
{
accessTokenService.validateAccess(UserAccessRequest.readTracing(tracingId), urlOrHeaderToken(token, request)) {
for {
tracing <- tracingService.find(tracingId) ?~> Messages("tracing.notFound") ~> NOT_FOUND

existingMags = tracing.resolutions.map(vec3IntFromProto)
magParsed <- Vec3Int.fromMagLiteral(mag, allowScalar = true) ?~> Messages("dataLayer.invalidMag", mag) ~> NOT_FOUND
_ <- bool2Fox(existingMags.contains(magParsed)) ?~> Messages("tracing.wrongMag", tracingId, mag) ~> NOT_FOUND

(c, x, y, z) <- ZarrCoordinatesParser.parseDotCoordinates(cxyz) ?~> Messages("zarr.invalidChunkCoordinates") ~> NOT_FOUND
_ <- bool2Fox(c == 0) ~> Messages("zarr.invalidFirstChunkCoord") ~> NOT_FOUND
cubeSize = DataLayer.bucketLength
wkRequest = WebKnossosDataRequest(
position = Vec3Int(x, y, z) * cubeSize * magParsed,
mag = magParsed,
cubeSize = cubeSize,
fourBit = Some(false),
applyAgglomerate = None,
version = None
)
(data, missingBucketIndices) <- if (tracing.getMappingIsEditable)
editableMappingService.volumeData(tracing, List(wkRequest), urlOrHeaderToken(token, request))
else tracingService.data(tracingId, tracing, List(wkRequest))
dataWithFallback <- getFallbackLayerDataIfEmpty(tracing,
data,
missingBucketIndices,
magParsed,
Vec3Int(x, y, z),
cubeSize,
urlOrHeaderToken(token, request)) ~> NOT_FOUND
} yield Ok(dataWithFallback)
}
}
}

private def getFallbackLayerDataIfEmpty(tracing: VolumeTracing,
data: Array[Byte],
missingBucketIndices: List[Int],
mag: Vec3Int,
position: Vec3Int,
cubeSize: Int,
urlToken: Option[String]): Fox[Array[Byte]] = {
def fallbackLayerData(fallbackLayerName: String): Fox[Array[Byte]] = {
val request = WebKnossosDataRequest(
position = position * mag * cubeSize,
mag = mag,
cubeSize = cubeSize,
fourBit = Some(false),
applyAgglomerate = tracing.mappingName,
version = None
)
for {
organizationName <- tracing.organizationName ?~> "Zarr streaming not supported for legacy volume annotations (organizationName is not set)"
remoteFallbackLayer = RemoteFallbackLayer(organizationName,
tracing.dataSetName,
fallbackLayerName,
tracing.elementClass)
(fallbackData, fallbackMissingBucketIndices) <- remoteDataStoreClient.getData(remoteFallbackLayer,
List(request),
urlToken)
_ <- bool2Fox(fallbackMissingBucketIndices.isEmpty) ?~> "No data at coordinations in fallback layer"
} yield fallbackData
}

if (missingBucketIndices.nonEmpty) {
for {
fallbackLayer <- tracing.fallbackLayer.toFox ?~> "No data at coordinates, no fallback layer defined"
data <- fallbackLayerData(fallbackLayer) ?~> "No data at coordinates, no fallback layer data at coordinates."
} yield data
} else Fox.successful(data)
}

private def getNeighborIndices(neighbors: List[Int]) =
List("NEIGHBORS" -> formatNeighborList(neighbors), "Access-Control-Expose-Headers" -> "NEIGHBORS")

Expand Down
Loading

0 comments on commit 7698a02

Please sign in to comment.