Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log data loading errors in conversion / mapping application #8202

Merged
merged 10 commits into from
Nov 27, 2024
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
- Improved error messages for starting jobs on datasets from other organizations. [#8181](https://github.com/scalableminds/webknossos/pull/8181)
- Terms of Service for Webknossos are now accepted at registration, not afterward. [#8193](https://github.com/scalableminds/webknossos/pull/8193)
- Removed bounding box size restriction for inferral jobs for super users. [#8200](https://github.com/scalableminds/webknossos/pull/8200)
- Improved logging for errors when loading datasets and problems arise during a conversion step. [#8202](https://github.com/scalableminds/webknossos/pull/8202)

### Fixed
- Fix performance bottleneck when deleting a lot of trees at once. [#8176](https://github.com/scalableminds/webknossos/pull/8176)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.scalableminds.webknossos.datastore.models.datasource.{Category, DataL
import com.scalableminds.webknossos.datastore.models.requests.{DataReadInstruction, DataServiceDataRequest}
import com.scalableminds.webknossos.datastore.storage._
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Box, Failure, Full}
import net.liftweb.common.{Box, Full}
import ucar.ma2.{Array => MultiArray}
import net.liftweb.common.Box.tryo

Expand All @@ -20,7 +20,6 @@ import scala.concurrent.ExecutionContext
class BinaryDataService(val dataBaseDir: Path,
val agglomerateServiceOpt: Option[AgglomerateService],
remoteSourceDescriptorServiceOpt: Option[RemoteSourceDescriptorService],
val applicationHealthService: Option[ApplicationHealthService],
sharedChunkContentsCache: Option[AlfuCache[String, MultiArray]],
datasetErrorLoggingService: Option[DatasetErrorLoggingService])(implicit ec: ExecutionContext)
extends FoxImplicits
Expand Down Expand Up @@ -55,22 +54,29 @@ class BinaryDataService(val dataBaseDir: Path,
def handleDataRequests(requests: List[DataServiceDataRequest]): Fox[(Array[Byte], List[Int])] = {
def convertIfNecessary(isNecessary: Boolean,
inputArray: Array[Byte],
conversionFunc: Array[Byte] => Box[Array[Byte]]): Box[Array[Byte]] =
if (isNecessary) conversionFunc(inputArray) else Full(inputArray)
conversionFunc: Array[Byte] => Fox[Array[Byte]],
request: DataServiceDataRequest): Fox[Array[Byte]] =
if (isNecessary) datasetErrorLoggingService match {
case Some(value) =>
value.withErrorLogging(request.dataSource.id, "converting bucket data", conversionFunc(inputArray))
case None => conversionFunc(inputArray)
} else Full(inputArray)

val requestsCount = requests.length
val requestData = requests.zipWithIndex.map {
case (request, index) =>
for {
data <- handleDataRequest(request)
mappedData <- agglomerateServiceOpt.map { agglomerateService =>
mappedDataFox <- agglomerateServiceOpt.map { agglomerateService =>
convertIfNecessary(
request.settings.appliedAgglomerate.isDefined && request.dataLayer.category == Category.segmentation && request.cuboid.mag.maxDim <= MaxMagForAgglomerateMapping,
data,
agglomerateService.applyAgglomerate(request)
agglomerateService.applyAgglomerate(request),
request
)
}.getOrElse(Full(data)) ?~> "Failed to apply agglomerate mapping"
resultData <- convertIfNecessary(request.settings.halfByte, mappedData, convertToHalfByte)
}.fillEmpty(Fox.successful(data)) ?~> "Failed to apply agglomerate mapping"
mappedData <- mappedDataFox
resultData <- convertIfNecessary(request.settings.halfByte, mappedData, convertToHalfByte, request)
} yield (resultData, index)
}

Expand All @@ -91,28 +97,14 @@ class BinaryDataService(val dataBaseDir: Path,
val bucketProvider =
bucketProviderCache.getOrLoadAndPut((dataSourceId, request.dataLayer.bucketProviderCacheKey))(_ =>
request.dataLayer.bucketProvider(remoteSourceDescriptorServiceOpt, dataSourceId, sharedChunkContentsCache))
bucketProvider.load(readInstruction).futureBox.flatMap {
case Failure(msg, Full(e: InternalError), _) =>
applicationHealthService.foreach(a => a.pushError(e))
logger.error(
s"Caught internal error: $msg while loading a bucket for layer ${request.dataLayer.name} of dataset ${request.dataSource.id}")
Fox.failure(e.getMessage)
case f: Failure =>
if (datasetErrorLoggingService.exists(_.shouldLog(request.dataSource.id.team, request.dataSource.id.name))) {
logger.error(
s"Bucket loading for layer ${request.dataLayer.name} of dataset ${request.dataSource.id.team}/${request.dataSource.id.name} at ${readInstruction.bucket} failed: ${Fox
.failureChainAsString(f, includeStackTraces = true)}")
datasetErrorLoggingService.foreach(_.registerLogged(request.dataSource.id.team, request.dataSource.id.name))
}
f.toFox
case Full(data) =>
if (data.length == 0) {
val msg =
s"Bucket provider returned Full, but data is zero-length array. Layer ${request.dataLayer.name} of dataset ${request.dataSource.id}, ${request.cuboid}"
logger.warn(msg)
Fox.failure(msg)
} else Fox.successful(data)
case other => other.toFox
datasetErrorLoggingService match {
case Some(d) =>
d.withErrorLogging(
request.dataSource.id,
s"loading bucket for layer ${request.dataLayer.name} at ${readInstruction.bucket}, cuboid: ${request.cuboid}",
bucketProvider.load(readInstruction)
)
case None => bucketProvider.load(readInstruction)
}
} else Fox.empty

Expand Down Expand Up @@ -197,5 +189,4 @@ class BinaryDataService(val dataBaseDir: Path,

(closedAgglomerateFileHandleCount, clearedBucketProviderCount, removedChunksCount)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import scala.concurrent.ExecutionContext
class BinaryDataServiceHolder @Inject()(
config: DataStoreConfig,
agglomerateService: AgglomerateService,
applicationHealthService: ApplicationHealthService,
remoteSourceDescriptorService: RemoteSourceDescriptorService,
datasetErrorLoggingService: DatasetErrorLoggingService)(implicit ec: ExecutionContext)
extends LazyLogging {
Expand All @@ -46,7 +45,6 @@ class BinaryDataServiceHolder @Inject()(
Paths.get(config.Datastore.baseFolder),
Some(agglomerateService),
Some(remoteSourceDescriptorService),
Some(applicationHealthService),
Some(sharedChunkContentsCache),
Some(datasetErrorLoggingService)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package com.scalableminds.webknossos.datastore.services

import org.apache.pekko.actor.ActorSystem
import com.google.inject.name.Named
import com.scalableminds.util.tools.{Fox, TextUtils}
import com.scalableminds.util.tools.Fox.box2Fox
import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler
import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Empty, Failure, Full}
import play.api.inject.ApplicationLifecycle

import javax.inject.Inject
Expand All @@ -12,6 +16,7 @@ import scala.concurrent.duration._

class DatasetErrorLoggingService @Inject()(
val lifecycle: ApplicationLifecycle,
val applicationHealthService: ApplicationHealthService,
@Named("webknossos-datastore") val system: ActorSystem)(implicit val ec: ExecutionContext)
extends IntervalScheduler
with LazyLogging {
Expand Down Expand Up @@ -41,4 +46,36 @@ class DatasetErrorLoggingService @Inject()(
recentErrors.remove((organizationId, datasetName))

override protected def tick(): Unit = recentErrors.clear()

def withErrorLogging(dataSourceId: DataSourceId, label: String, resultFox: Fox[Array[Byte]]): Fox[Array[Byte]] =
resultFox.futureBox.flatMap {
case Full(data) =>
if (data.length == 0) {
val msg = s"Zero-length array returned while $label for $dataSourceId"
if (shouldLog(dataSourceId.team, dataSourceId.name)) {
logger.warn(msg)
registerLogged(dataSourceId.team, dataSourceId.name)
}
Fox.failure(msg)
} else {
Fox.successful(data)
}
case Failure(msg, Full(e: InternalError), _) =>
logger.error(s"Caught internal error while $label for $dataSourceId:", e)
applicationHealthService.pushError(e)
Fox.failure(msg, Full(e))
case Failure(msg, Full(exception), _) =>
if (shouldLog(dataSourceId.team, dataSourceId.name)) {
logger.error(s"Error while $label for $dataSourceId Stack trace: ${TextUtils.stackTraceAsString(exception)} ")
registerLogged(dataSourceId.team, dataSourceId.name)
}
Fox.failure(msg, Full(exception))
frcroth marked this conversation as resolved.
Show resolved Hide resolved
case Failure(msg, Empty, _) =>
if (shouldLog(dataSourceId.team, dataSourceId.name)) {
logger.error(s"Error while $label for $dataSourceId, Empty failure")
registerLogged(dataSourceId.team, dataSourceId.name)
}
Fox.failure(msg)
case other => other.toFox
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class EditableMappingService @Inject()(

private def generateId: String = UUID.randomUUID.toString

val binaryDataService = new BinaryDataService(Paths.get(""), None, None, None, None, None)
val binaryDataService = new BinaryDataService(Paths.get(""), None, None, None, None)
adHocMeshServiceHolder.tracingStoreAdHocMeshConfig = (binaryDataService, 30 seconds, 1)
private val adHocMeshService: AdHocMeshService = adHocMeshServiceHolder.tracingStoreAdHocMeshService

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class VolumeTracingService @Inject()(

/* We want to reuse the bucket loading methods from binaryDataService for the volume tracings, however, it does not
actually load anything from disk, unlike its “normal” instance in the datastore (only from the volume tracing store) */
private val binaryDataService = new BinaryDataService(Paths.get(""), None, None, None, None, None)
private val binaryDataService = new BinaryDataService(Paths.get(""), None, None, None, None)

adHocMeshServiceHolder.tracingStoreAdHocMeshConfig = (binaryDataService, 30 seconds, 1)
val adHocMeshService: AdHocMeshService = adHocMeshServiceHolder.tracingStoreAdHocMeshService
Expand Down