Log data loading errors in conversion / mapping application #8202

Merged · 10 commits · Nov 27, 2024
CHANGELOG.unreleased.md (1 addition, 0 deletions)
@@ -17,6 +17,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
 - Reading image files on datastore filesystem is now done asynchronously. [#8126](https://github.com/scalableminds/webknossos/pull/8126)
 - Improved error messages for starting jobs on datasets from other organizations. [#8181](https://github.com/scalableminds/webknossos/pull/8181)
 - Removed bounding box size restriction for inferral jobs for super users. [#8200](https://github.com/scalableminds/webknossos/pull/8200)
+- Improved logging for errors when loading datasets and problems arise during a conversion step. [#8202](https://github.com/scalableminds/webknossos/pull/8202)
 
 ### Fixed
 - Fix performance bottleneck when deleting a lot of trees at once. [#8176](https://github.com/scalableminds/webknossos/pull/8176)
BinaryDataService.scala
@@ -10,7 +10,7 @@ import com.scalableminds.webknossos.datastore.models.datasource.{Category, DataL
 import com.scalableminds.webknossos.datastore.models.requests.{DataReadInstruction, DataServiceDataRequest}
 import com.scalableminds.webknossos.datastore.storage._
 import com.typesafe.scalalogging.LazyLogging
-import net.liftweb.common.{Box, Failure, Full}
+import net.liftweb.common.{Box, Full}
 import ucar.ma2.{Array => MultiArray}
 import net.liftweb.common.Box.tryo
@@ -55,22 +55,28 @@ class BinaryDataService(val dataBaseDir: Path,
   def handleDataRequests(requests: List[DataServiceDataRequest]): Fox[(Array[Byte], List[Int])] = {
     def convertIfNecessary(isNecessary: Boolean,
                            inputArray: Array[Byte],
-                           conversionFunc: Array[Byte] => Box[Array[Byte]]): Box[Array[Byte]] =
-      if (isNecessary) conversionFunc(inputArray) else Full(inputArray)
+                           conversionFunc: Array[Byte] => Fox[Array[Byte]],
+                           request: DataServiceDataRequest): Fox[Array[Byte]] =
+      if (isNecessary) datasetErrorLoggingService match {
+        case Some(value) => value.withErrorLogging(request.dataSource.id, "Conversion", conversionFunc(inputArray))
+        case None        => conversionFunc(inputArray)
+      } else Full(inputArray)
 
     val requestsCount = requests.length
     val requestData = requests.zipWithIndex.map {
       case (request, index) =>
         for {
           data <- handleDataRequest(request)
-          mappedData <- agglomerateServiceOpt.map { agglomerateService =>
+          mappedDataFox <- agglomerateServiceOpt.map { agglomerateService =>
             convertIfNecessary(
               request.settings.appliedAgglomerate.isDefined && request.dataLayer.category == Category.segmentation && request.cuboid.mag.maxDim <= MaxMagForAgglomerateMapping,
               data,
-              agglomerateService.applyAgglomerate(request)
+              agglomerateService.applyAgglomerate(request),
+              request
             )
-          }.getOrElse(Full(data)) ?~> "Failed to apply agglomerate mapping"
-          resultData <- convertIfNecessary(request.settings.halfByte, mappedData, convertToHalfByte)
+          }.fillEmpty(Fox.successful(data)) ?~> "Failed to apply agglomerate mapping"
+          mappedData <- mappedDataFox
+          resultData <- convertIfNecessary(request.settings.halfByte, mappedData, convertToHalfByte, request)
         } yield (resultData, index)
     }
 
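Note on the hunk above: `convertIfNecessary` now returns a `Fox` and, when the optional `DatasetErrorLoggingService` is wired in, routes the conversion through its `withErrorLogging`. A minimal, self-contained sketch of this optional-wrapper pattern — plain Scala, with `Either` standing in for the internal `Fox` monad, and all names hypothetical:

```scala
// Sketch only: Either stands in for the internal Fox monad; ErrorLogger is a
// hypothetical stand-in for DatasetErrorLoggingService (names are illustrative).
object ConversionSketch {

  trait ErrorLogger {
    def withErrorLogging(id: String,
                         label: String,
                         result: Either[String, Array[Byte]]): Either[String, Array[Byte]] =
      result.left.map { msg =>
        println(s"[$id] $label failed: $msg") // the real service throttles per dataset
        msg
      }
  }

  // The conversion runs only when necessary, and only then is it wrapped in logging.
  def convertIfNecessary(isNecessary: Boolean,
                         input: Array[Byte],
                         convert: Array[Byte] => Either[String, Array[Byte]],
                         loggerOpt: Option[ErrorLogger],
                         id: String): Either[String, Array[Byte]] =
    if (!isNecessary) Right(input) // pass-through: no conversion, no logging
    else
      loggerOpt match {
        case Some(logger) => logger.withErrorLogging(id, "Conversion", convert(input))
        case None         => convert(input) // no logging service wired in
      }
}
```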
@@ -91,28 +97,15 @@ class BinaryDataService(val dataBaseDir: Path,
       val bucketProvider =
         bucketProviderCache.getOrLoadAndPut((dataSourceId, request.dataLayer.bucketProviderCacheKey))(_ =>
           request.dataLayer.bucketProvider(remoteSourceDescriptorServiceOpt, dataSourceId, sharedChunkContentsCache))
-      bucketProvider.load(readInstruction).futureBox.flatMap {
-        case Failure(msg, Full(e: InternalError), _) =>
-          applicationHealthService.foreach(a => a.pushError(e))
-          logger.error(
-            s"Caught internal error: $msg while loading a bucket for layer ${request.dataLayer.name} of dataset ${request.dataSource.id}")
-          Fox.failure(e.getMessage)
-        case f: Failure =>
-          if (datasetErrorLoggingService.exists(_.shouldLog(request.dataSource.id.team, request.dataSource.id.name))) {
-            logger.error(
-              s"Bucket loading for layer ${request.dataLayer.name} of dataset ${request.dataSource.id.team}/${request.dataSource.id.name} at ${readInstruction.bucket} failed: ${Fox
-                .failureChainAsString(f, includeStackTraces = true)}")
-            datasetErrorLoggingService.foreach(_.registerLogged(request.dataSource.id.team, request.dataSource.id.name))
-          }
-          f.toFox
-        case Full(data) =>
-          if (data.length == 0) {
-            val msg =
-              s"Bucket provider returned Full, but data is zero-length array. Layer ${request.dataLayer.name} of dataset ${request.dataSource.id}, ${request.cuboid}"
-            logger.warn(msg)
-            Fox.failure(msg)
-          } else Fox.successful(data)
-        case other => other.toFox
+      datasetErrorLoggingService match {
+        case Some(d) =>
+          d.withErrorLogging(
+            request.dataSource.id,
+            s"Bucket loading for layer ${request.dataLayer.name} at ${readInstruction.bucket}, cuboid: ${request.cuboid}",
+            bucketProvider.load(readInstruction),
+            e => applicationHealthService.foreach(a => a.pushError(e))
+          )
+        case None => bucketProvider.load(readInstruction)
       }
     } else Fox.empty
 
@@ -197,5 +190,4 @@ class BinaryDataService(val dataBaseDir: Path,
 
     (closedAgglomerateFileHandleCount, clearedBucketProviderCount, removedChunksCount)
   }
-
 }
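Note on the bucket-loading change: the inline `futureBox.flatMap` match is replaced by a delegation to `withErrorLogging`, and the `onInternalError` callback keeps health reporting (`applicationHealthService.pushError`) on the caller's side, so the logging service needs no dependency on the health service. A rough sketch of that callback design, again with `Either` in place of `Fox` and illustrative names only:

```scala
// Sketch of the callback design: the logging service reports internal errors
// back to the caller instead of depending on ApplicationHealthService itself.
// InternalError and all wiring here are illustrative stand-ins.
object HealthCallbackSketch {

  final class InternalError(message: String) extends Exception(message)

  def withErrorLogging(label: String,
                       result: Either[Throwable, Array[Byte]],
                       onInternalError: InternalError => Unit = _ => ()): Either[Throwable, Array[Byte]] =
    result match {
      case Left(e: InternalError) =>
        println(s"ERROR Caught internal error: $label") // always logged
        onInternalError(e) // caller decides what "push to the health service" means
        Left(e)
      case other => other
    }

  // Usage: the caller forwards internal errors to its own health service.
  def main(args: Array[String]): Unit = {
    val failed: Either[Throwable, Array[Byte]] = Left(new InternalError("corrupt chunk"))
    withErrorLogging("Bucket loading", failed, e => println(s"health: ${e.getMessage}"))
  }
}
```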
DatasetErrorLoggingService.scala
@@ -2,8 +2,12 @@ package com.scalableminds.webknossos.datastore.services
 
 import org.apache.pekko.actor.ActorSystem
 import com.google.inject.name.Named
+import com.scalableminds.util.tools.{Fox, TextUtils}
+import com.scalableminds.util.tools.Fox.box2Fox
 import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler
+import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
 import com.typesafe.scalalogging.LazyLogging
+import net.liftweb.common.{Empty, Failure, Full}
 import play.api.inject.ApplicationLifecycle
 
 import javax.inject.Inject
@@ -41,4 +45,39 @@ class DatasetErrorLoggingService @Inject()(
     recentErrors.remove((organizationId, datasetName))
 
   override protected def tick(): Unit = recentErrors.clear()
+
+  def withErrorLogging(dataSourceId: DataSourceId,
+                       label: String,
+                       resultFox: Fox[Array[Byte]],
+                       onInternalError: InternalError => Unit = _ => ()): Fox[Array[Byte]] =
+    resultFox.futureBox.flatMap {
+      case Full(data) =>
+        if (data.length == 0) {
+          val msg = s"Zero-length array returned while $label for $dataSourceId"
+          if (shouldLog(dataSourceId.team, dataSourceId.name)) {
+            logger.warn(msg)
+            registerLogged(dataSourceId.team, dataSourceId.name)
+          }
+          Fox.failure(msg)
+        } else {
+          Fox.successful(data)
+        }
+      case Failure(msg, Full(e: InternalError), _) =>
+        logger.error(s"Caught internal error: $label for $dataSourceId", e)
+        onInternalError(e)
+        Fox.failure(msg, Full(e))
+      case Failure(msg, Full(exception), _) =>
+        if (shouldLog(dataSourceId.team, dataSourceId.name)) {
+          logger.error(s"Error while $label for $dataSourceId Stack trace: ${TextUtils.stackTraceAsString(exception)} ")
+          registerLogged(dataSourceId.team, dataSourceId.name)
+        }
+        Fox.failure(msg, Full(exception))
+      case Failure(msg, Empty, _) =>
+        if (shouldLog(dataSourceId.team, dataSourceId.name)) {
+          logger.error(s"Error while $label for $dataSourceId, Empty failure")
+          registerLogged(dataSourceId.team, dataSourceId.name)
+        }
+        Fox.failure(msg)
+      case other => other.toFox
+    }
 }
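To summarize the dispatch the new method implements: `Full` with data succeeds, `Full` with a zero-length array becomes a failure, internal errors are always logged and forwarded to the callback, and ordinary failures are logged at most once per dataset per interval. A compact, runnable model of that logic — a toy `Box` ADT rather than liftweb's, with throttling reduced to a set that the real service clears in `tick()`; none of the names below are the production API:

```scala
import scala.collection.mutable

// Toy stand-ins for net.liftweb.common.Box and the service's throttling state.
sealed trait Box[+A]
final case class Full[A](value: A) extends Box[A]
final case class Failure(msg: String, exception: Option[Throwable]) extends Box[Nothing]
case object Empty extends Box[Nothing]

final class InternalError(msg: String) extends Exception(msg)

object ErrorLoggingSketch {
  private val recentlyLogged = mutable.Set.empty[String] // real service clears this on a timer
  private def shouldLog(id: String): Boolean = !recentlyLogged.contains(id)
  private def registerLogged(id: String): Unit = recentlyLogged += id

  def withErrorLogging(id: String,
                       label: String,
                       result: Box[Array[Byte]],
                       onInternalError: InternalError => Unit = _ => ()): Box[Array[Byte]] =
    result match {
      case Full(data) if data.isEmpty => // success shape, but no payload: treat as failure
        val msg = s"Zero-length array returned while $label for $id"
        if (shouldLog(id)) { println(s"WARN  $msg"); registerLogged(id) }
        Failure(msg, None)
      case full @ Full(_) => full
      case Failure(msg, Some(e: InternalError)) => // always logged, plus health callback
        println(s"ERROR Caught internal error: $label for $id")
        onInternalError(e)
        Failure(msg, Some(e))
      case Failure(msg, e) => // ordinary failure: rate-limited per dataset
        if (shouldLog(id)) { println(s"ERROR while $label for $id"); registerLogged(id) }
        Failure(msg, e)
      case Empty => Empty
    }
}
```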
}