Log data loading errors in conversion / mapping application (#8202)
frcroth authored Nov 27, 2024
1 parent b53752a commit 5d3d66d
Showing 6 changed files with 62 additions and 35 deletions.
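In substance, the diff below replaces ad-hoc `Failure` matching in `BinaryDataService` with a single helper, `DatasetErrorLoggingService.withErrorLogging`, through which both bucket loading and bucket-data conversion are routed. A minimal, runnable sketch of that pattern follows — `Try` stands in for webknossos's `Fox[Array[Byte]]`, and the names `ErrorLoggingSketch` and `"demo/dataset"` are illustrative, not from the codebase:

```scala
import scala.util.{Failure, Success, Try}

// Sketch only: Try stands in for the real Fox[Array[Byte]].
object ErrorLoggingSketch {

  // Analogue of DatasetErrorLoggingService.withErrorLogging: tag every
  // failure (and every suspicious zero-length result) with the dataset
  // and the step ("label") that produced it.
  def withErrorLogging(dataSourceId: String, label: String, result: Try[Array[Byte]]): Try[Array[Byte]] =
    result match {
      case Success(data) if data.isEmpty =>
        println(s"Zero-length array returned while $label for $dataSourceId")
        Failure(new RuntimeException(s"zero-length array while $label"))
      case success @ Success(_) => success
      case Failure(e) =>
        println(s"Error while $label for $dataSourceId: ${e.getMessage}")
        Failure(e)
    }

  // Analogue of the new convertIfNecessary: the conversion is wrapped in
  // error logging only when it actually runs.
  def convertIfNecessary(isNecessary: Boolean,
                         input: Array[Byte],
                         convert: Array[Byte] => Try[Array[Byte]],
                         dataSourceId: String): Try[Array[Byte]] =
    if (isNecessary) withErrorLogging(dataSourceId, "converting bucket data", convert(input))
    else Success(input)

  def main(args: Array[String]): Unit = {
    val halfByte: Array[Byte] => Try[Array[Byte]] = bytes => Try(bytes.map(b => (b & 0x0f).toByte))
    println(convertIfNecessary(isNecessary = true, Array[Byte](1, 2, 3), halfByte, "demo/dataset").map(_.toList))
  }
}
```

The payoff is that the zero-length check, internal-error reporting, and per-dataset log throttling now live in one place instead of being repeated at each call site.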
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
@@ -18,6 +18,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
- Improved error messages for starting jobs on datasets from other organizations. [#8181](https://github.com/scalableminds/webknossos/pull/8181)
- Terms of Service for Webknossos are now accepted at registration, not afterward. [#8193](https://github.com/scalableminds/webknossos/pull/8193)
- Removed bounding box size restriction for inferral jobs for super users. [#8200](https://github.com/scalableminds/webknossos/pull/8200)
+- Improved logging for errors when loading datasets and problems arise during a conversion step. [#8202](https://github.com/scalableminds/webknossos/pull/8202)

### Fixed
- Fix performance bottleneck when deleting a lot of trees at once. [#8176](https://github.com/scalableminds/webknossos/pull/8176)
BinaryDataService.scala
@@ -10,7 +10,7 @@ import com.scalableminds.webknossos.datastore.models.datasource.{Category, DataL
import com.scalableminds.webknossos.datastore.models.requests.{DataReadInstruction, DataServiceDataRequest}
import com.scalableminds.webknossos.datastore.storage._
import com.typesafe.scalalogging.LazyLogging
-import net.liftweb.common.{Box, Failure, Full}
+import net.liftweb.common.{Box, Full}
import ucar.ma2.{Array => MultiArray}
import net.liftweb.common.Box.tryo

@@ -20,7 +20,6 @@ import scala.concurrent.ExecutionContext
class BinaryDataService(val dataBaseDir: Path,
val agglomerateServiceOpt: Option[AgglomerateService],
remoteSourceDescriptorServiceOpt: Option[RemoteSourceDescriptorService],
-val applicationHealthService: Option[ApplicationHealthService],
sharedChunkContentsCache: Option[AlfuCache[String, MultiArray]],
datasetErrorLoggingService: Option[DatasetErrorLoggingService])(implicit ec: ExecutionContext)
extends FoxImplicits
@@ -55,22 +54,29 @@ class BinaryDataService(val dataBaseDir: Path,
def handleDataRequests(requests: List[DataServiceDataRequest]): Fox[(Array[Byte], List[Int])] = {
def convertIfNecessary(isNecessary: Boolean,
inputArray: Array[Byte],
-conversionFunc: Array[Byte] => Box[Array[Byte]]): Box[Array[Byte]] =
-if (isNecessary) conversionFunc(inputArray) else Full(inputArray)
+conversionFunc: Array[Byte] => Fox[Array[Byte]],
+request: DataServiceDataRequest): Fox[Array[Byte]] =
+if (isNecessary) datasetErrorLoggingService match {
+case Some(value) =>
+value.withErrorLogging(request.dataSource.id, "converting bucket data", conversionFunc(inputArray))
+case None => conversionFunc(inputArray)
+} else Full(inputArray)

val requestsCount = requests.length
val requestData = requests.zipWithIndex.map {
case (request, index) =>
for {
data <- handleDataRequest(request)
-mappedData <- agglomerateServiceOpt.map { agglomerateService =>
+mappedDataFox <- agglomerateServiceOpt.map { agglomerateService =>
convertIfNecessary(
request.settings.appliedAgglomerate.isDefined && request.dataLayer.category == Category.segmentation && request.cuboid.mag.maxDim <= MaxMagForAgglomerateMapping,
data,
-agglomerateService.applyAgglomerate(request)
+agglomerateService.applyAgglomerate(request),
+request
)
-}.getOrElse(Full(data)) ?~> "Failed to apply agglomerate mapping"
-resultData <- convertIfNecessary(request.settings.halfByte, mappedData, convertToHalfByte)
+}.fillEmpty(Fox.successful(data)) ?~> "Failed to apply agglomerate mapping"
+mappedData <- mappedDataFox
+resultData <- convertIfNecessary(request.settings.halfByte, mappedData, convertToHalfByte, request)
} yield (resultData, index)
}

@@ -91,28 +97,14 @@
val bucketProvider =
bucketProviderCache.getOrLoadAndPut((dataSourceId, request.dataLayer.bucketProviderCacheKey))(_ =>
request.dataLayer.bucketProvider(remoteSourceDescriptorServiceOpt, dataSourceId, sharedChunkContentsCache))
-bucketProvider.load(readInstruction).futureBox.flatMap {
-case Failure(msg, Full(e: InternalError), _) =>
-applicationHealthService.foreach(a => a.pushError(e))
-logger.error(
-s"Caught internal error: $msg while loading a bucket for layer ${request.dataLayer.name} of dataset ${request.dataSource.id}")
-Fox.failure(e.getMessage)
-case f: Failure =>
-if (datasetErrorLoggingService.exists(_.shouldLog(request.dataSource.id.team, request.dataSource.id.name))) {
-logger.error(
-s"Bucket loading for layer ${request.dataLayer.name} of dataset ${request.dataSource.id.team}/${request.dataSource.id.name} at ${readInstruction.bucket} failed: ${Fox
-.failureChainAsString(f, includeStackTraces = true)}")
-datasetErrorLoggingService.foreach(_.registerLogged(request.dataSource.id.team, request.dataSource.id.name))
-}
-f.toFox
-case Full(data) =>
-if (data.length == 0) {
-val msg =
-s"Bucket provider returned Full, but data is zero-length array. Layer ${request.dataLayer.name} of dataset ${request.dataSource.id}, ${request.cuboid}"
-logger.warn(msg)
-Fox.failure(msg)
-} else Fox.successful(data)
-case other => other.toFox
+datasetErrorLoggingService match {
+case Some(d) =>
+d.withErrorLogging(
+request.dataSource.id,
+s"loading bucket for layer ${request.dataLayer.name} at ${readInstruction.bucket}, cuboid: ${request.cuboid}",
+bucketProvider.load(readInstruction)
+)
+case None => bucketProvider.load(readInstruction)
}
} else Fox.empty

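Both hunks above share one shape: the logging service is optional (the tracingstore instances further down pass `None`), so the wrapper must degrade to a plain pass-through. A small sketch of that optional-decorator shape, again with `Try` standing in for `Fox` (an assumption for brevity; the names here are illustrative):

```scala
import scala.util.Try

object OptionalLoggingSketch {
  // If a logging service is configured, wrap the operation in it;
  // otherwise run the operation unwrapped (mirrors the
  // `datasetErrorLoggingService match { Some/None }` in both hunks).
  def maybeLogged(loggingServiceOpt: Option[(String, Try[Array[Byte]]) => Try[Array[Byte]]],
                  label: String,
                  op: => Try[Array[Byte]]): Try[Array[Byte]] =
    loggingServiceOpt match {
      case Some(withErrorLogging) => withErrorLogging(label, op)
      case None                   => op
    }
}
```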
@@ -197,5 +189,4 @@

(closedAgglomerateFileHandleCount, clearedBucketProviderCount, removedChunksCount)
}
-
}
BinaryDataServiceHolder.scala
@@ -22,7 +22,6 @@ import scala.concurrent.ExecutionContext
class BinaryDataServiceHolder @Inject()(
config: DataStoreConfig,
agglomerateService: AgglomerateService,
-applicationHealthService: ApplicationHealthService,
remoteSourceDescriptorService: RemoteSourceDescriptorService,
datasetErrorLoggingService: DatasetErrorLoggingService)(implicit ec: ExecutionContext)
extends LazyLogging {
@@ -46,7 +45,6 @@ class BinaryDataServiceHolder @Inject()(
Paths.get(config.Datastore.baseFolder),
Some(agglomerateService),
Some(remoteSourceDescriptorService),
-Some(applicationHealthService),
Some(sharedChunkContentsCache),
Some(datasetErrorLoggingService)
)
DatasetErrorLoggingService.scala
@@ -2,8 +2,12 @@ package com.scalableminds.webknossos.datastore.services

import org.apache.pekko.actor.ActorSystem
import com.google.inject.name.Named
+import com.scalableminds.util.tools.{Fox, TextUtils}
+import com.scalableminds.util.tools.Fox.box2Fox
import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler
+import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
import com.typesafe.scalalogging.LazyLogging
+import net.liftweb.common.{Empty, Failure, Full}
import play.api.inject.ApplicationLifecycle

import javax.inject.Inject
@@ -12,6 +16,7 @@ import scala.concurrent.duration._

class DatasetErrorLoggingService @Inject()(
val lifecycle: ApplicationLifecycle,
+val applicationHealthService: ApplicationHealthService,
@Named("webknossos-datastore") val system: ActorSystem)(implicit val ec: ExecutionContext)
extends IntervalScheduler
with LazyLogging {
@@ -41,4 +46,36 @@ class DatasetErrorLoggingService @Inject()(
recentErrors.remove((organizationId, datasetName))

override protected def tick(): Unit = recentErrors.clear()
+
+def withErrorLogging(dataSourceId: DataSourceId, label: String, resultFox: Fox[Array[Byte]]): Fox[Array[Byte]] =
+resultFox.futureBox.flatMap {
+case Full(data) =>
+if (data.length == 0) {
+val msg = s"Zero-length array returned while $label for $dataSourceId"
+if (shouldLog(dataSourceId.team, dataSourceId.name)) {
+logger.warn(msg)
+registerLogged(dataSourceId.team, dataSourceId.name)
+}
+Fox.failure(msg)
+} else {
+Fox.successful(data)
+}
+case Failure(msg, Full(e: InternalError), _) =>
+logger.error(s"Caught internal error while $label for $dataSourceId:", e)
+applicationHealthService.pushError(e)
+Fox.failure(msg, Full(e))
+case Failure(msg, Full(exception), _) =>
+if (shouldLog(dataSourceId.team, dataSourceId.name)) {
+logger.error(s"Error while $label for $dataSourceId Stack trace: ${TextUtils.stackTraceAsString(exception)} ")
+registerLogged(dataSourceId.team, dataSourceId.name)
+}
+Fox.failure(msg, Full(exception))
+case Failure(msg, Empty, _) =>
+if (shouldLog(dataSourceId.team, dataSourceId.name)) {
+logger.error(s"Error while $label for $dataSourceId, Empty failure")
+registerLogged(dataSourceId.team, dataSourceId.name)
+}
+Fox.failure(msg)
+case other => other.toFox
+}
}
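Note that `withErrorLogging` reuses the service's existing throttling: `shouldLog`/`registerLogged` suppress repeat log lines for the same dataset until the scheduled `tick()` clears `recentErrors`. A standalone sketch of that throttling idea (class and method names here are illustrative stand-ins, not the real service):

```scala
import scala.collection.mutable

// Illustrative sketch of the per-dataset log throttling: each dataset is
// logged at most once between scheduled tick() calls.
class ErrorLogThrottleSketch {
  private val recentErrors = mutable.Set[(String, String)]()

  def shouldLog(organizationId: String, datasetName: String): Boolean =
    !recentErrors.contains((organizationId, datasetName))

  def registerLogged(organizationId: String, datasetName: String): Unit = {
    recentErrors.add((organizationId, datasetName))
    ()
  }

  // In the real service this runs on an interval via IntervalScheduler.
  def tick(): Unit = recentErrors.clear()

  def logThrottled(organizationId: String, datasetName: String, msg: String): Unit =
    if (shouldLog(organizationId, datasetName)) {
      println(msg) // logger.error(...) in the real service
      registerLogged(organizationId, datasetName)
    }
}
```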
EditableMappingService.scala
@@ -103,7 +103,7 @@ class EditableMappingService @Inject()(

private def generateId: String = UUID.randomUUID.toString

-val binaryDataService = new BinaryDataService(Paths.get(""), None, None, None, None, None)
+val binaryDataService = new BinaryDataService(Paths.get(""), None, None, None, None)
adHocMeshServiceHolder.tracingStoreAdHocMeshConfig = (binaryDataService, 30 seconds, 1)
private val adHocMeshService: AdHocMeshService = adHocMeshServiceHolder.tracingStoreAdHocMeshService

VolumeTracingService.scala
@@ -88,7 +88,7 @@ class VolumeTracingService @Inject()(

/* We want to reuse the bucket loading methods from binaryDataService for the volume tracings, however, it does not
actually load anything from disk, unlike its “normal” instance in the datastore (only from the volume tracing store) */
-private val binaryDataService = new BinaryDataService(Paths.get(""), None, None, None, None, None)
+private val binaryDataService = new BinaryDataService(Paths.get(""), None, None, None, None)

adHocMeshServiceHolder.tracingStoreAdHocMeshConfig = (binaryDataService, 30 seconds, 1)
val adHocMeshService: AdHocMeshService = adHocMeshServiceHolder.tracingStoreAdHocMeshService
