
Implement viewing sharded neuroglancer precomputed datasets #6920

Merged · 11 commits · Mar 27, 2023
app/models/binary/explore/PrecomputedExplorer.scala (1 change: 0 additions & 1 deletion)

@@ -30,7 +30,6 @@ class PrecomputedExplorer extends RemoteLayerExplorer {
for {
name <- guessNameFromPath(remotePath)
firstScale <- precomputedHeader.scales.headOption.toFox
-      _ <- bool2Fox(firstScale.sharding.isEmpty) ?~> "Failed to read dataset: sharding not supported"
boundingBox <- BoundingBox.fromSizeArray(firstScale.size).toFox
elementClass: ElementClass.Value <- elementClassFromPrecomputedDataType(precomputedHeader.data_type) ?~> "Unknown data type"
smallestResolution = firstScale.resolution
test/backend/CompressedMortonCodeTestSuite.scala (37 changes: 37 additions & 0 deletions)

@@ -0,0 +1,37 @@
package backend

import com.scalableminds.webknossos.datastore.datareaders.precomputed.CompressedMortonCode
import org.scalatestplus.play.PlaySpec

class CompressedMortonCodeTestSuite extends PlaySpec {

"Compressed Morton Code" when {
"Grid size = 10,10,10" should {
val grid_size = Array(10, 10, 10)
"encode 0,0,0" in {
assert(CompressedMortonCode.encode(Array(0, 0, 0), grid_size) == 0)
}
"encode 1,2,3" in {
assert(CompressedMortonCode.encode(Array(1, 2, 3), grid_size) == 53)
}
"encode 9,9,9" in {
assert(CompressedMortonCode.encode(Array(9, 9, 9), grid_size) == 3591)
}
"encode 10,10,10" in {
assert(CompressedMortonCode.encode(Array(10, 10, 10), grid_size) == 3640)
}
}
"Grid size = 2048,2048,1024" should {
val grid_size = Array(2048, 2048, 1024)
"encode 0,0,0" in {
assert(CompressedMortonCode.encode(Array(0, 0, 0), grid_size) == 0)
}
"encode 1,2,3" in {
assert(CompressedMortonCode.encode(Array(1, 2, 3), grid_size) == 53)
}
"encode 1024, 512, 684" in {
assert(CompressedMortonCode.encode(Array(1024, 512, 684), grid_size) == 1887570176)
}
}
}
}
@@ -63,6 +63,9 @@ case class Vec3Int(x: Int, y: Int, z: Int) {
} yield Vec3Int(x, y, z)

def product: Int = x * y * z

def alignWithGridFloor(gridCellSize: Vec3Int): Vec3Int =
this / gridCellSize * gridCellSize
}

object Vec3Int {
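The new alignWithGridFloor helper relies on element-wise integer division truncating toward zero, so for non-negative coordinates it snaps a position down to the nearest multiple of the grid cell size. A quick illustration, as a sketch using the existing element-wise / and * operators on Vec3Int:

    // (10 / 4) * 4 == 8, (11 / 4) * 4 == 8, (12 / 4) * 4 == 12
    Vec3Int(10, 11, 12).alignWithGridFloor(Vec3Int(4, 4, 4)) // == Vec3Int(8, 8, 12)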
@@ -33,6 +33,16 @@ class ChunkReader(val header: DatasetHeader, val vaultPath: VaultPath, val chunk
chunkTyper.wrapAndType(chunkBytesAndShape.map(_._1), chunkBytesAndShape.flatMap(_._2).getOrElse(chunkShape))
}

  def parseChunk(bytes: Array[Byte], chunkShape: Array[Int]): Future[MultiArray] = {
    val chunkBytes: Array[Byte] = Using.Manager { use =>
      val is = use(new ByteArrayInputStream(bytes))
      val os = use(new ByteArrayOutputStream())
      header.compressorImpl.uncompress(is, os)
      os.toByteArray
    }.get
    chunkTyper.wrapAndType(Some(chunkBytes), chunkShape)
  }

// Returns bytes (optional, None may later be replaced with fill value)
// and chunk shape (optional, only for data formats where each chunk reports its own shape, e.g. N5)
protected def readChunkBytesAndShape(path: String): Option[(Array[Byte], Option[Array[Int]])] =
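For context, the new parseChunk above funnels raw chunk bytes through header.compressorImpl before typing them. A minimal standalone sketch of the same decompress-to-bytes pattern, with java.util.zip gzip standing in for the header's compressor (an illustrative assumption, not the PR's code):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.util.zip.GZIPInputStream
    import scala.util.Using

    // Decompresses gzip bytes fully into memory; Using.Manager closes both streams.
    def uncompressGzip(compressed: Array[Byte]): Array[Byte] =
      Using.Manager { use =>
        val is = use(new GZIPInputStream(new ByteArrayInputStream(compressed)))
        val os = use(new ByteArrayOutputStream())
        is.transferTo(os) // Java 9+, copies the whole stream to the output
        os.toByteArray
      }.get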
@@ -23,7 +23,7 @@ object ChunkUtils extends LazyLogging {
}
if (numChunks < 0) {
logger.warn(
s"Failed to compute zarr chunk indices. array shape ${arrayShape.toList}, chunkShape: ${arrayChunkSize.toList}, requested ${selectedShape.toList} at ${selectedOffset.toList}")
s"Failed to compute chunk indices. array shape ${arrayShape.toList}, chunkShape: ${arrayChunkSize.toList}, requested ${selectedShape.toList} at ${selectedOffset.toList}")
}
val chunkIndices = new Array[Array[Int]](numChunks)
val currentIdx = start.clone
@@ -73,7 +73,7 @@ class DatasetArray(relativePath: DatasetPath,
val targetBuffer = MultiArrayUtils.createDataBuffer(header.resolvedDataType, shape)
val targetInCOrder: MultiArray =
MultiArrayUtils.orderFlippedView(MultiArrayUtils.createArrayWithGivenStorage(targetBuffer, shape.reverse))
-    val wasCopiedFox = Fox.serialCombined(chunkIndices) { chunkIndex: Array[Int] =>
+    val copiedFuture = Future.sequence(chunkIndices.map { chunkIndex: Array[Int] =>
for {
sourceChunk: MultiArray <- getSourceChunkDataWithCache(axisOrder.permuteIndices(chunkIndex))
offsetInChunk = computeOffsetInChunk(chunkIndex, offset)
@@ -82,21 +82,32 @@
flip = header.order != ArrayOrder.C)
_ = MultiArrayUtils.copyRange(offsetInChunk, sourceChunkInCOrder, targetInCOrder)
} yield ()
-      }
+      })
for {
-      _ <- wasCopiedFox
+      _ <- copiedFuture
} yield targetBuffer
}
}

-  private def getSourceChunkDataWithCache(chunkIndex: Array[Int]): Future[MultiArray] = {
-    val chunkFilename = getChunkFilename(chunkIndex)
-    val chunkFilePath = relativePath.resolve(chunkFilename)
-    val storeKey = chunkFilePath.storeKey
-    val chunkShape = header.chunkSizeAtIndex(chunkIndex)
-
-    chunkContentsCache.getOrLoad(storeKey, key => chunkReader.read(key, chunkShape))
-  }
+  protected def readShardedChunk(chunkIndex: Array[Int])(implicit ec: ExecutionContext): Future[Array[Byte]] = ???
+
+  private def getSourceChunkDataWithCache(chunkIndex: Array[Int])(implicit ec: ExecutionContext): Future[MultiArray] =
+    chunkContentsCache.getOrLoad(chunkIndex.mkString(","), key => readSourceChunkData(chunkIndex))
+
+  private def readSourceChunkData(chunkIndex: Array[Int])(implicit ec: ExecutionContext): Future[MultiArray] =
+    if (header.isSharded) {
+      for {
+        chunkData: Array[Byte] <- readShardedChunk(chunkIndex)
+        chunkShape = header.chunkSizeAtIndex(chunkIndex)
+        multiArray <- chunkReader.parseChunk(chunkData, chunkShape)
+      } yield multiArray
+    } else {
+      val chunkFilename = getChunkFilename(chunkIndex)
+      val chunkFilePath = relativePath.resolve(chunkFilename)
+      val storeKey = chunkFilePath.storeKey
+      val chunkShape = header.chunkSizeAtIndex(chunkIndex)
+      chunkReader.read(storeKey, chunkShape)
Review comment (Member): I'm wondering whether we can get the two branches here into one. As I understand it, both branches currently have their own version of reading from the store, decompressing, then typing. Could it be unified? Maybe the sharding implementation could just return the chunk path plus a byte range to be passed to the existing chunkReader? (With non-sharding returning None for the range.)

}
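A minimal sketch of the unification the reviewer suggests, assuming a hypothetical resolveChunkLocation helper (the name, signature, and lookup are illustrative, not part of this PR). Both branches would resolve to a store key plus an optional byte range, leaving a single read-decompress-type path in chunkReader:

    // Hypothetical sketch only; not code from this PR.
    protected def resolveChunkLocation(chunkIndex: Array[Int])(
        implicit ec: ExecutionContext): Future[(String, Option[NumericRange.Exclusive[Long]])] =
      if (header.isSharded) {
        // Sharded: locate the shard file and the chunk's byte range within it.
        ???
      } else {
        // Unsharded: one object per chunk, read the whole key (no byte range).
        Future.successful((relativePath.resolve(getChunkFilename(chunkIndex)).storeKey, None))
      }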

protected def getChunkFilename(chunkIndex: Array[Int]): String =
chunkIndex.mkString(header.dimension_separator.toString)
@@ -45,4 +45,6 @@ trait DatasetHeader {
lazy val rank: Int = datasetShape.length

def chunkSizeAtIndex(chunkIndex: Array[Int]): Array[Int] = chunkSize

def isSharded = false
}
@@ -0,0 +1,39 @@
package com.scalableminds.webknossos.datastore.datareaders.precomputed

import scala.math.log10

object CompressedMortonCode {

def log2(x: Double): Double = log10(x) / log10(2.0)

def encode(position: Array[Int], gridSize: Array[Int]): Long = {
/*
Computes the compressed morton code as per
https://github.com/google/neuroglancer/blob/master/src/neuroglancer/datasource/precomputed/volume.md#compressed-morton-code
https://github.com/google/neuroglancer/blob/162b698f703c86e0b3e92b8d8e0cacb0d3b098df/src/neuroglancer/util/zorder.ts#L72
*/
val bits = gridSize.map(log2(_).ceil.toInt)
val maxBits = bits.max
var outputBit = 0L
val one = 1L

var output = 0L
for (bit <- 0 to maxBits) {
if (bit < bits(0)) {
output |= (((position(0) >> bit) & one) << outputBit)
outputBit += 1
}
if (bit < bits(1)) {
output |= (((position(1) >> bit) & one) << outputBit)
outputBit += 1
}
if (bit < bits(2)) {
output |= (((position(2) >> bit) & one) << outputBit)
outputBit += 1
}
}

output
}

}
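A worked example matching the test suite above: a 10×10×10 grid needs ceil(log2(10)) = 4 bits per axis, and the x, y, z bits are interleaved starting from the least significant bit.

    // For position (1, 2, 3): x = 0b0001, y = 0b0010, z = 0b0011.
    //   round 0: x bit 0 = 1 -> output bit 0; y bit 0 = 0 -> bit 1; z bit 0 = 1 -> bit 2
    //   round 1: x bit 1 = 0 -> output bit 3; y bit 1 = 1 -> bit 4; z bit 1 = 1 -> bit 5
    //   rounds 2 and 3: all bits zero
    // Set output bits: 0, 2, 4, 5 -> 1 + 4 + 16 + 32 = 53
    assert(CompressedMortonCode.encode(Array(1, 2, 3), Array(10, 10, 10)) == 53L)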
@@ -1,12 +1,19 @@
package com.scalableminds.webknossos.datastore.datareaders.precomputed

import com.scalableminds.util.cache.AlfuFoxCache
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.datareaders.{AxisOrder, ChunkReader, DatasetArray, DatasetPath}
import com.scalableminds.webknossos.datastore.datavault.VaultPath
import com.typesafe.scalalogging.LazyLogging
import play.api.libs.json.{JsError, JsSuccess, Json}

import java.io.IOException
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets
import scala.collection.immutable.NumericRange
import scala.concurrent.{ExecutionContext, Future}

object PrecomputedArray extends LazyLogging {
@throws[IOException]
@@ -69,4 +76,182 @@ class PrecomputedArray(relativePath: DatasetPath,
.mkString(header.dimension_separator.toString)
}

// SHARDING
  // Implemented according to https://github.com/google/neuroglancer/blob/master/src/neuroglancer/datasource/precomputed/sharded.md,
  // directly adapted from https://github.com/scalableminds/webknossos-connect/blob/master/wkconnect/backends/neuroglancer/sharding.py.
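In the sharded format, many chunks are packed into a single shard file: a fixed-size shard index at the start of the file points to per-minishard indices, and each minishard index maps chunk IDs (here, compressed Morton codes) to byte ranges within the shard. The helpers below walk exactly that chain.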

private val shardIndexCache: AlfuFoxCache[VaultPath, Array[Byte]] =
AlfuFoxCache()

private val minishardIndexCache: AlfuFoxCache[(VaultPath, Int), Seq[(Long, Long, Long)]] =
AlfuFoxCache()

private def getHashForChunk(chunkIndex: Array[Int]): Long =
CompressedMortonCode.encode(chunkIndex, header.gridSize)

private lazy val minishardMask = {
header.precomputedScale.sharding match {
case Some(shardingSpec: ShardingSpecification) =>
if (shardingSpec.minishard_bits == 0) {
0
} else {
var minishardMask = 1L
for (_ <- 0 until shardingSpec.minishard_bits - 1) {
minishardMask <<= 1
minishardMask |= 1
}
minishardMask
}
case None => 0
}
}

private lazy val shardMask = {
header.precomputedScale.sharding match {
case Some(shardingSpec: ShardingSpecification) =>
        val oneMask = -1L // 0xFFFFFFFFFFFFFFFF, i.e. all 64 bits set (Long.MinValue would set only the sign bit)
val cursor = shardingSpec.minishard_bits + shardingSpec.shard_bits
val shardMask = ~((oneMask >> cursor) << cursor)
shardMask & (~minishardMask)
case None => 0
}
}

private lazy val minishardCount = 1 << header.precomputedScale.sharding.map(_.minishard_bits).getOrElse(0)

private lazy val shardIndexRange: NumericRange.Exclusive[Long] = {
    val end: Long = minishardCount * 16L
Range.Long(0, end, 1)
}

private def getShardIndex(shardPath: VaultPath)(implicit ec: ExecutionContext): Fox[Array[Byte]] =
shardIndexCache.getOrLoad(shardPath, readShardIndex)

private def readShardIndex(shardPath: VaultPath)(implicit ec: ExecutionContext): Fox[Array[Byte]] =
for {
bytes <- Fox.option2Fox(shardPath.readBytes(Some(shardIndexRange)))
} yield bytes
Review comment (Member): the for-comprehension can be collapsed; the following suggested change should be identical:

-    for {
-      bytes <- Fox.option2Fox(shardPath.readBytes(Some(shardIndexRange)))
-    } yield bytes
+    Fox.option2Fox(shardPath.readBytes(Some(shardIndexRange)))

private def parseShardIndex(index: Array[Byte]): Seq[(Long, Long)] =
// See https://github.com/google/neuroglancer/blob/master/src/neuroglancer/datasource/precomputed/sharded.md#shard-index-format
index
.grouped(16) // 16 Bytes: 2 uint64 numbers: start_offset, end_offset
.map((bytes: Array[Byte]) => {
(BigInt(bytes.take(8).reverse).toLong, BigInt(bytes.slice(8, 16).reverse).toLong) // bytes reversed because they are stored little endian
})
.toSeq
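The byte reversal above compensates for the little-endian storage. An equivalent decode of a single 16-byte entry via ByteBuffer, as an illustrative sketch:

    import java.nio.{ByteBuffer, ByteOrder}

    // Decodes one shard index entry into (start_offset, end_offset), little endian.
    def decodeShardIndexEntry(entry: Array[Byte]): (Long, Long) = {
      val buf = ByteBuffer.wrap(entry).order(ByteOrder.LITTLE_ENDIAN)
      (buf.getLong(0), buf.getLong(8))
    }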

private def getMinishardInfo(chunkHash: Long): (Long, Long) =
header.precomputedScale.sharding match {
case Some(shardingSpec: ShardingSpecification) =>
val rawChunkIdentifier = chunkHash >> shardingSpec.preshift_bits
val chunkIdentifier = shardingSpec.hashFunction(rawChunkIdentifier)
val minishardNumber = chunkIdentifier & minishardMask
val shardNumber = (chunkIdentifier & shardMask) >> shardingSpec.minishard_bits
(shardNumber, minishardNumber)
case None => (0, 0)
}
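A worked example of the mask arithmetic, assuming minishard_bits = 3 and shard_bits = 4:

    // minishardMask = 0b0000111 (low 3 bits), shardMask = 0b1111000 (next 4 bits)
    // For a hashed chunkIdentifier of 0b1011010 (= 90):
    //   minishardNumber = 90 & 0b0000111 = 0b010 = 2
    //   shardNumber     = (90 & 0b1111000) >> 3 = 0b1011 = 11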

private def getPathForShard(shardNumber: Long): VaultPath = {
val shardBits = header.precomputedScale.sharding.map(_.shard_bits.toFloat).getOrElse(0f)
if (shardBits == 0) {
vaultPath / relativePath.storeKey / "0.shard"
} else {
val shardString = String.format(s"%1$$${(shardBits / 4).ceil.toInt}s", shardNumber.toHexString).replace(' ', '0')
vaultPath / relativePath.storeKey / s"$shardString.shard"
}

}

private def getMinishardIndexRange(minishardNumber: Int,
parsedShardIndex: Seq[(Long, Long)]): NumericRange.Exclusive[Long] = {
val miniShardIndexStart: Long = (shardIndexRange.end).toLong + parsedShardIndex(minishardNumber)._1
val miniShardIndexEnd: Long = (shardIndexRange.end).toLong + parsedShardIndex(minishardNumber)._2
Range.Long(miniShardIndexStart, miniShardIndexEnd, 1)
}

private def parseMinishardIndex(bytes: Array[Byte]): Seq[(Long, Long, Long)] = {
// Because readBytes already decodes gzip, we don't need to decompress here
/*
From: https://github.com/google/neuroglancer/blob/master/src/neuroglancer/datasource/precomputed/sharded.md#minishard-index-format
The decoded "minishard index" is a binary string of 24*n bytes, specifying a contiguous C-order array of [3, n]
uint64le values.
*/
val n = bytes.length / 24
val buf = ByteBuffer.allocate(bytes.length)
buf.put(bytes)

val longArray = new Array[Long](n * 3)
buf.position(0)
buf.order(ByteOrder.LITTLE_ENDIAN)
buf.asLongBuffer().get(longArray)
// longArray is row major / C-order
/*
From: https://github.com/google/neuroglancer/blob/master/src/neuroglancer/datasource/precomputed/sharded.md#minishard-index-format
Values array[0, 0], ..., array[0, n-1] specify the chunk IDs in the minishard, and are delta encoded, such that
array[0, 0] is equal to the ID of the first chunk, and the ID of chunk i is equal to the sum
of array[0, 0], ..., array[0, i].
*/
    val chunkIds = new Array[Long](n)
    if (n > 0) chunkIds(0) = longArray(0) // guard: a minishard index may be empty
for (i <- 1 until n) {
chunkIds(i) = longArray(i) + chunkIds(i - 1)
}
/*
From: https://github.com/google/neuroglancer/blob/master/src/neuroglancer/datasource/precomputed/sharded.md#minishard-index-format
The size of the data for chunk i is stored as array[2, i].
Values array[1, 0], ..., array[1, n-1] specify the starting offsets in the shard file of the data corresponding to
each chunk, and are also delta encoded relative to the end of the prior chunk, such that the starting offset of the
first chunk is equal to shard_index_end + array[1, 0], and the starting offset of chunk i is the sum of
shard_index_end + array[1, 0], ..., array[1, i] and array[2, 0], ..., array[2, i-1].
*/
val chunkSizes = longArray.slice(2 * n, 3 * n)
    val chunkStartOffsets = new Array[Long](n)
    if (n > 0) chunkStartOffsets(0) = longArray(n)
for (i <- 1 until n) {
val startOffsetIndex = i + n
chunkStartOffsets(i) = chunkStartOffsets(i - 1) + longArray(startOffsetIndex) + chunkSizes(i - 1)
}
(chunkIds, chunkStartOffsets, chunkSizes).zipped.map((a, b, c) => (a, b, c))
}
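A standalone illustration of the delta decoding above, with made-up index values for n = 3:

    val n = 3
    // C-order [3, n] array: row 0 = delta-encoded chunk IDs,
    // row 1 = delta-encoded start offsets, row 2 = chunk sizes.
    val longArray = Array[Long](5, 2, 4, 0, 8, 4, 16, 32, 8)
    val chunkIds = longArray.take(n).scanLeft(0L)(_ + _).tail // Array(5, 7, 11)
    val chunkSizes = longArray.slice(2 * n, 3 * n)            // Array(16, 32, 8)
    val starts = new Array[Long](n)
    starts(0) = longArray(n)
    for (i <- 1 until n) starts(i) = starts(i - 1) + longArray(n + i) + chunkSizes(i - 1)
    // starts == Array(0, 24, 60), all relative to the end of the shard index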

private def getMinishardIndex(shardPath: VaultPath, minishardNumber: Int)(
implicit ec: ExecutionContext): Fox[Seq[(Long, Long, Long)]] =
minishardIndexCache.getOrLoad((shardPath, minishardNumber), readMinishardIndex)

private def readMinishardIndex(vaultPathAndMinishardNumber: (VaultPath, Int))(
implicit ec: ExecutionContext): Fox[Seq[(Long, Long, Long)]] = {
val (vaultPath, minishardNumber) = vaultPathAndMinishardNumber
for {
index <- getShardIndex(vaultPath)
parsedIndex = parseShardIndex(index)
minishardIndexRange = getMinishardIndexRange(minishardNumber, parsedIndex)
indexRaw <- vaultPath.readBytes(Some(minishardIndexRange))
} yield parseMinishardIndex(indexRaw)
}

private def getChunkRange(chunkId: Long,
minishardIndex: Seq[(Long, Long, Long)]): Option[NumericRange.Exclusive[Long]] =
for {
chunkSpecification <- minishardIndex.find(_._1 == chunkId)
chunkStart = (shardIndexRange.end).toLong + chunkSpecification._2
chunkEnd = (shardIndexRange.end).toLong + chunkSpecification._2 + chunkSpecification._3
} yield Range.Long(chunkStart, chunkEnd, 1)

override def readShardedChunk(chunkIndex: Array[Int])(implicit ec: ExecutionContext): Future[Array[Byte]] = {
val chunkIdentifier = getHashForChunk(chunkIndex)
val minishardInfo = getMinishardInfo(chunkIdentifier)
val shardPath = getPathForShard(minishardInfo._1)
for {
minishardIndex <- getMinishardIndex(shardPath, minishardInfo._2.toInt)
.toFutureOrThrowException("Could not get minishard index")
chunkRange: NumericRange.Exclusive[Long] <- Fox
.option2Fox(getChunkRange(chunkIdentifier, minishardIndex))
.toFutureOrThrowException("Chunk range not found in minishard index")
chunkData <- Fox
.option2Fox(shardPath.readBytes(Some(chunkRange)))
.toFutureOrThrowException(s"Could not read chunk data from path ${shardPath.toString}")
} yield chunkData
}

}
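Putting it together, readShardedChunk resolves a chunk in four steps: hash the chunk's compressed Morton code to obtain the shard and minishard numbers, fetch the (cached) shard index to locate the minishard index's byte range, look the chunk ID up in the (cached) minishard index to get the chunk's byte range, and finally read exactly that range from the shard file.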