Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip typing data chunk arrays if no rechunking is needed #7370

Merged
merged 6 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
### Added

- Added social media link previews for links to datasets and annotations (only if they are public or if the links contain sharing tokens). [#7331](https://github.com/scalableminds/webknossos/pull/7331)
- Loading sharded zarr3 datasets is now significantly faster. [#7363](https://github.com/scalableminds/webknossos/pull/7363)
- Loading sharded zarr3 datasets is now significantly faster. [#7363](https://github.com/scalableminds/webknossos/pull/7363) and [#7370](https://github.com/scalableminds/webknossos/pull/7370)

### Changed
- Updated backend code to Scala 2.13, with upgraded Dependencies for optimized performance. [#7327](https://github.com/scalableminds/webknossos/pull/7327)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@ import scala.concurrent.ExecutionContext
class ChunkReader(header: DatasetHeader) {

private lazy val chunkTyper = ChunkTyper.createFromHeader(header)
private lazy val shortcutChunkTyper = new ShortcutChunkTyper(header)

def read(path: VaultPath, chunkShapeFromMetadata: Array[Int], range: Option[NumericRange[Long]])(
implicit ec: ExecutionContext): Fox[MultiArray] =
def read(path: VaultPath,
chunkShapeFromMetadata: Array[Int],
range: Option[NumericRange[Long]],
useSkipTypingShortcut: Boolean)(implicit ec: ExecutionContext): Fox[MultiArray] =
for {
chunkBytesAndShapeBox: Box[(Array[Byte], Option[Array[Int]])] <- readChunkBytesAndShape(path, range).futureBox
chunkShape: Array[Int] = chunkBytesAndShapeBox.toOption.flatMap(_._2).getOrElse(chunkShapeFromMetadata)
typed <- chunkBytesAndShapeBox.map(_._1) match {
case Full(chunkBytes) if useSkipTypingShortcut =>
shortcutChunkTyper.wrapAndType(chunkBytes, chunkShape).toFox ?~> "chunk.shortcutWrapAndType.failed"
case Empty if useSkipTypingShortcut =>
shortcutChunkTyper.createFromFillValue(chunkShape).toFox ?~> "chunk.shortcutCreateFromFillValue.failed"
case Full(chunkBytes) =>
chunkTyper.wrapAndType(chunkBytes, chunkShape).toFox ?~> "chunk.wrapAndType.failed"
case Empty =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.scalableminds.webknossos.datastore.datareaders

import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.Box
import net.liftweb.util.Helpers.tryo

Expand Down Expand Up @@ -57,7 +56,7 @@ class DoubleChunkTyper(val header: DatasetHeader) extends ChunkTyper {
}.get)
}

class ShortChunkTyper(val header: DatasetHeader) extends ChunkTyper with LazyLogging {
class ShortChunkTyper(val header: DatasetHeader) extends ChunkTyper {

val ma2DataType: MADataType = MADataType.SHORT

Expand Down Expand Up @@ -117,3 +116,19 @@ class FloatChunkTyper(val header: DatasetHeader) extends ChunkTyper {
MultiArray.factory(ma2DataType, chunkSizeOrdered(chunkShape), typedStorage)
}.get)
}

// In no-partial-copy shortcut, the MultiArray shape is never used, so it is just set to flat.
// type is always BYTE
class ShortcutChunkTyper(val header: DatasetHeader) extends ChunkTyper {
val ma2DataType: MADataType = MADataType.BYTE

def wrapAndType(bytes: Array[Byte], chunkShape: Array[Int]): Box[MultiArray] = tryo {
val flatShape = Array(bytes.length)
MultiArray.factory(ma2DataType, flatShape, bytes)
}

override def createFromFillValue(chunkShape: Array[Int]): Box[MultiArray] = {
val flatShape = Array(chunkShape.product * header.bytesPerElement)
MultiArrayUtils.createFilledArray(ma2DataType, flatShape, header.fillValueNumber)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,14 @@ class DatasetArray(vaultPath: VaultPath,
// returns byte array in fortran-order with little-endian values
private def readBytes(shape: Array[Int], offset: Array[Int])(implicit ec: ExecutionContext): Fox[Array[Byte]] =
for {
typedData <- readAsFortranOrder(shape, offset)
asBytes <- BytesConverter.toByteArray(typedData, header.resolvedDataType, ByteOrder.LITTLE_ENDIAN)
typedMultiArray <- readAsFortranOrder(shape, offset)
asBytes <- BytesConverter.toByteArray(typedMultiArray, header.resolvedDataType, ByteOrder.LITTLE_ENDIAN)
} yield asBytes

// Read from array. Note that shape and offset should be passed in XYZ order, left-padded with 0 and 1 respectively.
// This function will internally adapt to the array's axis order so that XYZ data in fortran-order is returned.
private def readAsFortranOrder(shape: Array[Int], offset: Array[Int])(implicit ec: ExecutionContext): Fox[Object] = {
private def readAsFortranOrder(shape: Array[Int], offset: Array[Int])(
implicit ec: ExecutionContext): Fox[MultiArray] = {
val totalOffset: Array[Int] = offset.zip(header.voxelOffset).map { case (o, v) => o - v }
val chunkIndices = ChunkUtils.computeChunkIndices(axisOrder.permuteIndicesReverse(header.datasetShape),
axisOrder.permuteIndicesReverse(header.chunkSize),
Expand All @@ -98,12 +99,13 @@ class DatasetArray(vaultPath: VaultPath,
if (partialCopyingIsNotNeeded(shape, totalOffset, chunkIndices)) {
for {
chunkIndex <- chunkIndices.headOption.toFox
sourceChunk: MultiArray <- getSourceChunkDataWithCache(axisOrder.permuteIndices(chunkIndex))
} yield sourceChunk.getStorage
sourceChunk: MultiArray <- getSourceChunkDataWithCache(axisOrder.permuteIndices(chunkIndex),
useSkipTypingShortcut = true)
} yield sourceChunk
} else {
val targetBuffer = MultiArrayUtils.createDataBuffer(header.resolvedDataType, shape)
val targetInCOrder: MultiArray =
MultiArrayUtils.orderFlippedView(MultiArrayUtils.createArrayWithGivenStorage(targetBuffer, shape.reverse))
val targetMultiArray = MultiArrayUtils.createArrayWithGivenStorage(targetBuffer, shape.reverse)
val targetInCOrder: MultiArray = MultiArrayUtils.orderFlippedView(targetMultiArray)
val copiedFuture = Fox.combined(chunkIndices.map { chunkIndex: Array[Int] =>
for {
sourceChunk: MultiArray <- getSourceChunkDataWithCache(axisOrder.permuteIndices(chunkIndex))
Expand All @@ -119,7 +121,7 @@ class DatasetArray(vaultPath: VaultPath,
})
for {
_ <- copiedFuture
} yield targetBuffer
} yield targetMultiArray
}
}

Expand All @@ -133,20 +135,23 @@ class DatasetArray(vaultPath: VaultPath,
private def chunkContentsCacheKey(chunkIndex: Array[Int]): String =
s"${dataSourceId}__${layerName}__${vaultPath}__chunk_${chunkIndex.mkString(",")}"

private def getSourceChunkDataWithCache(chunkIndex: Array[Int])(implicit ec: ExecutionContext): Fox[MultiArray] =
sharedChunkContentsCache.getOrLoad(chunkContentsCacheKey(chunkIndex), _ => readSourceChunkData(chunkIndex))
private def getSourceChunkDataWithCache(chunkIndex: Array[Int], useSkipTypingShortcut: Boolean = false)(
implicit ec: ExecutionContext): Fox[MultiArray] =
sharedChunkContentsCache.getOrLoad(chunkContentsCacheKey(chunkIndex),
_ => readSourceChunkData(chunkIndex, useSkipTypingShortcut))

private def readSourceChunkData(chunkIndex: Array[Int])(implicit ec: ExecutionContext): Fox[MultiArray] =
private def readSourceChunkData(chunkIndex: Array[Int], useSkipTypingShortcut: Boolean)(
implicit ec: ExecutionContext): Fox[MultiArray] =
if (header.isSharded) {
for {
(shardPath, chunkRange) <- getShardedChunkPathAndRange(chunkIndex) ?~> "chunk.getShardedPathAndRange.failed"
chunkShape = header.chunkSizeAtIndex(chunkIndex)
multiArray <- chunkReader.read(shardPath, chunkShape, Some(chunkRange))
multiArray <- chunkReader.read(shardPath, chunkShape, Some(chunkRange), useSkipTypingShortcut)
} yield multiArray
} else {
val chunkPath = vaultPath / getChunkFilename(chunkIndex)
val chunkShape = header.chunkSizeAtIndex(chunkIndex)
chunkReader.read(chunkPath, chunkShape, None)
chunkReader.read(chunkPath, chunkShape, None, useSkipTypingShortcut)
}

protected def getChunkFilename(chunkIndex: Array[Int]): String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,46 @@ import net.liftweb.common.Box
import net.liftweb.util.Helpers.tryo

import java.nio.{ByteBuffer, ByteOrder}
import ucar.ma2.{Array => MultiArray}

object BytesConverter {
def toByteArray(array: Object, dataType: ArrayDataType, byteOrder: ByteOrder): Box[Array[Byte]] = tryo {
def toByteArray(multiArray: MultiArray, dataType: ArrayDataType, byteOrder: ByteOrder): Box[Array[Byte]] = tryo {
val array = multiArray.getStorage
val bytesPerElement = bytesPerElementFor(dataType)
dataType match {
case ArrayDataType.u1 | ArrayDataType.i1 =>
array.asInstanceOf[Array[Byte]]
case ArrayDataType.u2 | ArrayDataType.i2 =>
val arrayTyped = array.asInstanceOf[Array[Short]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asShortBuffer().put(arrayTyped)
byteBuffer.array()
case ArrayDataType.u4 | ArrayDataType.i4 =>
val arrayTyped = array.asInstanceOf[Array[Int]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asIntBuffer().put(arrayTyped)
byteBuffer.array()
case ArrayDataType.i8 | ArrayDataType.u8 =>
val arrayTyped = array.asInstanceOf[Array[Long]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asLongBuffer().put(arrayTyped)
byteBuffer.array()
case ArrayDataType.f4 =>
val arrayTyped = array.asInstanceOf[Array[Float]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asFloatBuffer().put(arrayTyped)
byteBuffer.array()
case ArrayDataType.f8 =>
val arrayTyped = array.asInstanceOf[Array[Double]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asDoubleBuffer().put(arrayTyped)
byteBuffer.array()
// If the multiArray dtype size is 1, use the array directly.
// This may be happen due to the skipTyping shortcut even for non-uint8-datasets
if (multiArray.getDataType.getSize == 1) {
array.asInstanceOf[Array[Byte]]
} else {
dataType match {
case ArrayDataType.u1 | ArrayDataType.i1 =>
array.asInstanceOf[Array[Byte]]
case ArrayDataType.u2 | ArrayDataType.i2 =>
val arrayTyped = array.asInstanceOf[Array[Short]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asShortBuffer().put(arrayTyped)
byteBuffer.array()
case ArrayDataType.u4 | ArrayDataType.i4 =>
val arrayTyped = array.asInstanceOf[Array[Int]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asIntBuffer().put(arrayTyped)
byteBuffer.array()
case ArrayDataType.i8 | ArrayDataType.u8 =>
val arrayTyped = array.asInstanceOf[Array[Long]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asLongBuffer().put(arrayTyped)
byteBuffer.array()
case ArrayDataType.f4 =>
val arrayTyped = array.asInstanceOf[Array[Float]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asFloatBuffer().put(arrayTyped)
byteBuffer.array()
case ArrayDataType.f8 =>
val arrayTyped = array.asInstanceOf[Array[Double]]
val byteBuffer = makeByteBuffer(arrayTyped.length * bytesPerElement, byteOrder)
byteBuffer.asDoubleBuffer().put(arrayTyped)
byteBuffer.array()
}
}
}

Expand Down