Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Zarr 3 sharding index codecs and zstd #7305

Merged
merged 14 commits into from
Sep 5, 2023
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
- OpenID Connect authorization is now compatible with Providers that send the user information in an id_token. [#7294](https://github.com/scalableminds/webknossos/pull/7294)

### Changed
- Adapted Zarr 3 implementations to recent changes in the specification (index codecs, zstd codec). [#7305](https://github.com/scalableminds/webknossos/pull/7305)

### Fixed

Expand Down
4 changes: 3 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ object Dependencies {
private val brotli4jLinuxX86 = brotli4j.withName("native-linux-x86_64")
private val brotli4cOsXX86 = brotli4j.withName("native-osx-x86_64")
private val brotli4cOsXArm = brotli4j.withName("native-osx-aarch64")
private val zstdJni = "com.github.luben" % "zstd-jni" % "1.5.5-5"

private val sql = Seq(
"com.typesafe.slick" %% "slick" % "3.3.3",
Expand Down Expand Up @@ -112,7 +113,8 @@ object Dependencies {
brotli4j,
brotli4jLinuxX86,
brotli4cOsXX86,
brotli4cOsXArm
brotli4cOsXArm,
zstdJni
)

val webknossosTracingstoreDependencies: Seq[ModuleID] = Seq(
Expand Down
23 changes: 23 additions & 0 deletions test/backend/CompressorTestSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package backend

import com.scalableminds.webknossos.datastore.datareaders.ZstdCompressor
import org.scalatestplus.play.PlaySpec

import java.security.SecureRandom

/** Round-trip tests for the byte-level compressors used by the zarr readers. */
class CompressorTestSuite extends PlaySpec {

  "Zstd compressor" when {
    "compressing and decompressing" should {

      val compressor = new ZstdCompressor(0, true)

      "return original data" in {
        val bytes = new Array[Byte](20)
        // new SecureRandom() instead of SecureRandom.getInstanceStrong: the strong
        // instance may block indefinitely waiting for entropy (e.g. on headless CI);
        // cryptographic strength is irrelevant for random test payloads.
        new SecureRandom().nextBytes(bytes)
        val decompressed = compressor.decompress(compressor.compress(bytes))
        assert(bytes.sameElements(decompressed))
      }

      "return original data for empty input" in {
        val bytes = Array.emptyByteArray
        val decompressed = compressor.decompress(compressor.compress(bytes))
        assert(bytes.sameElements(decompressed))
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ trait ByteUtils {
byte == 0
}

/**
*
* @param l a 64 bit number
* @return l as array of 8 bytes, little endian
*/
protected def longToBytes(l: Long): Array[Byte] = {
var w = l
val result = new Array[Byte](8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.apache.commons.compress.compressors.gzip.{
GzipCompressorOutputStream,
GzipParameters
}
import org.apache.commons.compress.compressors.zstandard.{ZstdCompressorInputStream, ZstdCompressorOutputStream}
import org.blosc.{BufferSizes, IBloscDll, JBlosc}
import play.api.libs.json.{Format, JsResult, JsValue, Json}

Expand Down Expand Up @@ -339,6 +340,30 @@ class CompressedSegmentationCompressor(dataType: PrecomputedDataType, volumeSize
override def compress(input: Array[Byte]): Array[Byte] = ???
}

class ZstdCompressor(level: Int, checksum: Boolean) extends Compressor {

  override def getId: String = "zstd"

  override def toString: String = s"compressor=$getId/level=$level/checksum=$checksum"

  /** Compresses input into a zstd frame at the configured level, optionally with a frame checksum. */
  override def compress(input: Array[Byte]): Array[Byte] = {
    val is = new ByteArrayInputStream(input)
    val os = new ByteArrayOutputStream()
    // closeFrameOnFlush = true so the frame is complete before os is read out.
    val zstd = new ZstdCompressorOutputStream(os, level, true, checksum)
    // zstd is a val assigned before the try; if its constructor throws we never reach
    // the finally, so the original `if (zstd != null)` guard was dead code.
    try passThrough(is, zstd)
    finally zstd.close()
    os.toByteArray
  }

  /** Decompresses a zstd frame (the stream validates the frame checksum if present). */
  override def decompress(input: Array[Byte]): Array[Byte] = {
    val is = new ByteArrayInputStream(input)
    val os = new ByteArrayOutputStream()
    val zstd = new ZstdCompressorInputStream(is)
    try passThrough(zstd, os)
    finally zstd.close()
    os.toByteArray
  }
}

class ChainedCompressor(compressors: Seq[Compressor]) extends Compressor {
override def getId: String = "chainedcompressor"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package com.scalableminds.webknossos.datastore.datareaders.zarr3

import com.scalableminds.util.tools.ByteUtils
import com.scalableminds.webknossos.datastore.datareaders.{
BloscCompressor,
BoolCompressionSetting,
CompressionSetting,
GzipCompressor,
IntCompressionSetting,
StringCompressionSetting
StringCompressionSetting,
ZstdCompressor
}
import com.scalableminds.webknossos.datastore.helpers.JsonImplicits
import play.api.libs.json.{Format, JsResult, JsValue, Json, OFormat}
import com.typesafe.scalalogging.LazyLogging
import play.api.libs.json.{Format, JsObject, JsResult, JsSuccess, JsValue, Json, OFormat, Reads, Writes}
import play.api.libs.json.Json.WithDefaultValues
import ucar.ma2.{Array => MultiArray}

import java.util.zip.CRC32C

trait Codec

/*
Expand All @@ -34,7 +39,7 @@ trait BytesToBytesCodec extends Codec {
def decode(bytes: Array[Byte]): Array[Byte]
}

class EndianCodec(val endian: String) extends ArrayToBytesCodec {
class EndianCodec(val endian: Option[String]) extends ArrayToBytesCodec {

/*
https://zarr-specs.readthedocs.io/en/latest/v3/codecs/endian/v1.0.html
Expand Down Expand Up @@ -103,7 +108,49 @@ class GzipCodec(level: Int) extends BytesToBytesCodec {
override def decode(bytes: Array[Byte]): Array[Byte] = compressor.decompress(bytes)
}

class ShardingCodec(val chunk_shape: Array[Int], val codecs: Seq[CodecConfiguration]) extends ArrayToBytesCodec {
class ZstdCodec(level: Int, checksum: Boolean) extends BytesToBytesCodec {

  // Zarr 3 zstd codec, spec proposal: https://github.com/zarr-developers/zarr-specs/pull/256

  /** Underlying zstd compressor, created on first use. */
  lazy val compressor = new ZstdCompressor(level, checksum)

  /** Zstd-compresses the given bytes. */
  override def encode(bytes: Array[Byte]): Array[Byte] =
    compressor.compress(bytes)

  /** Zstd-decompresses the given bytes. */
  override def decode(bytes: Array[Byte]): Array[Byte] =
    compressor.decompress(bytes)
}

class Crc32CCodec extends BytesToBytesCodec with ByteUtils with LazyLogging {

  // https://zarr-specs.readthedocs.io/en/latest/v3/codecs/crc32c/v1.0.html
  // Appends (encode) / verifies and strips (decode) a 4-byte little-endian CRC32C
  // checksum over the payload.

  private val crc32ByteLength: Int = 4

  private class CRC32CChecksumInvalidException(message: String) extends Exception(message)

  // CRC32C of data as 4 bytes, little endian. longToBytes is little endian and the
  // 32-bit CRC value occupies the low bytes of the long, so take(4) yields the checksum.
  private def checksumBytes(data: Array[Byte]): Array[Byte] = {
    val crc = new CRC32C()
    crc.update(data)
    longToBytes(crc.getValue).take(crc32ByteLength)
  }

  /** Returns bytes with their CRC32C checksum appended. */
  override def encode(bytes: Array[Byte]): Array[Byte] =
    bytes ++ checksumBytes(bytes)

  /** Verifies the trailing checksum and returns the payload without it; throws if invalid. */
  override def decode(bytes: Array[Byte]): Array[Byte] = {
    if (bytes.length < crc32ByteLength)
      throw new CRC32CChecksumInvalidException(
        s"Input of ${bytes.length} bytes is too short to contain a CRC32C checksum")
    val (dataPart, crcPart) = bytes.splitAt(bytes.length - crc32ByteLength)
    if (!checksumBytes(dataPart).sameElements(crcPart))
      throw new CRC32CChecksumInvalidException("CRC32C checksum does not match data")
    dataPart
  }
}

class ShardingCodec(val chunk_shape: Array[Int],
val codecs: Seq[CodecConfiguration],
val index_codecs: Seq[CodecConfiguration])
extends ArrayToBytesCodec {

// https://zarr-specs.readthedocs.io/en/latest/v3/codecs/sharding-indexed/v1.0.html
// encode, decode not implemented as sharding is done in Zarr3Array
Expand All @@ -114,11 +161,12 @@ class ShardingCodec(val chunk_shape: Array[Int], val codecs: Seq[CodecConfigurat

sealed trait CodecConfiguration

final case class EndianCodecConfiguration(endian: String) extends CodecConfiguration
final case class EndianCodecConfiguration(endian: Option[String]) extends CodecConfiguration

object EndianCodecConfiguration {
implicit val jsonFormat: OFormat[EndianCodecConfiguration] = Json.format[EndianCodecConfiguration]
val name = "endian"
val legacyName = "endian"
val name = "bytes"
}
final case class TransposeCodecConfiguration(order: String) extends CodecConfiguration // Should also support other parameters

Expand All @@ -145,6 +193,24 @@ object GzipCodecConfiguration {
val name = "gzip"
}

/** Parameters of the zstd codec: compression level and whether frames carry a checksum. */
final case class ZstdCodecConfiguration(level: Int, checksum: Boolean) extends CodecConfiguration

object ZstdCodecConfiguration {
  // Codec identifier as it appears in zarr array metadata.
  val name = "zstd"

  implicit val jsonFormat: OFormat[ZstdCodecConfiguration] = Json.format[ZstdCodecConfiguration]
}

/** Configuration of the crc32c codec. The codec takes no parameters, hence a singleton. */
case object Crc32CCodecConfiguration extends CodecConfiguration {
  val name = "crc32c"

  // Reading: any configuration payload maps to the singleton (there is nothing to parse).
  implicit object Crc32CodecConfigurationReads extends Reads[Crc32CCodecConfiguration.type] {
    override def reads(json: JsValue): JsResult[Crc32CCodecConfiguration.type] =
      JsSuccess(Crc32CCodecConfiguration)
  }

  // Writing: serialized as an empty JSON object, since there are no parameters.
  implicit object Crc32CodecConfigurationWrites extends Writes[Crc32CCodecConfiguration.type] {
    override def writes(o: Crc32CCodecConfiguration.type): JsValue = Json.obj()
  }
}

object CodecConfiguration extends JsonImplicits {
implicit object CodecSpecificationFormat extends Format[CodecConfiguration] {
override def reads(json: JsValue): JsResult[CodecConfiguration] =
Expand All @@ -160,7 +226,9 @@ object CodecSpecification {
implicit val jsonFormat: OFormat[CodecSpecification] = Json.format[CodecSpecification]
}

final case class ShardingCodecConfiguration(chunk_shape: Array[Int], codecs: Seq[CodecConfiguration])
/** Sharding codec parameters: the inner chunk shape, the codecs applied to each inner
  * chunk, and the codecs applied to the binary shard index (cf. the Zarr v3
  * sharding-indexed codec spec referenced in ShardingCodec). */
final case class ShardingCodecConfiguration(chunk_shape: Array[Int],
                                            codecs: Seq[CodecConfiguration],
                                            index_codecs: Seq[CodecConfiguration])
    extends CodecConfiguration

object ShardingCodecConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
import com.scalableminds.webknossos.datastore.models.datasource.AdditionalAxis
import com.typesafe.scalalogging.LazyLogging
import com.scalableminds.util.tools.Fox.box2Fox

import scala.collection.immutable.NumericRange
import scala.concurrent.ExecutionContext

Expand Down Expand Up @@ -56,16 +57,20 @@ class Zarr3Array(vaultPath: VaultPath,
override protected def getChunkFilename(chunkIndex: Array[Int]): String =
s"c${header.dimension_separator.toString}${super.getChunkFilename(chunkIndex)}"

lazy val (shardingCodec: Option[ShardingCodec], codecs: Seq[Codec]) = initializeCodecs(header.codecs)
lazy val (shardingCodec: Option[ShardingCodec], codecs: Seq[Codec], indexCodecs: Seq[Codec]) = initializeCodecs(
header.codecs)

private def initializeCodecs(codecSpecs: Seq[CodecConfiguration]): (Option[ShardingCodec], Seq[Codec]) = {
private def initializeCodecs(codecSpecs: Seq[CodecConfiguration]): (Option[ShardingCodec], Seq[Codec], Seq[Codec]) = {
val outerCodecs = codecSpecs.map {
case EndianCodecConfiguration(endian) => new EndianCodec(endian)
case TransposeCodecConfiguration(order) => new TransposeCodec(order)
case BloscCodecConfiguration(cname, clevel, shuffle, typesize, blocksize) =>
new BloscCodec(cname, clevel, shuffle, typesize, blocksize)
case GzipCodecConfiguration(level) => new GzipCodec(level)
case ShardingCodecConfiguration(chunk_shape, codecs) => new ShardingCodec(chunk_shape, codecs)
case GzipCodecConfiguration(level) => new GzipCodec(level)
case ZstdCodecConfiguration(level, checksum) => new ZstdCodec(level, checksum)
case Crc32CCodecConfiguration => new Crc32CCodec
case ShardingCodecConfiguration(chunk_shape, codecs, index_codecs) =>
new ShardingCodec(chunk_shape, codecs, index_codecs)
}
val shardingCodecOpt: Option[ShardingCodec] = outerCodecs.flatMap {
case codec: ShardingCodec => Some(codec)
Expand All @@ -74,8 +79,10 @@ class Zarr3Array(vaultPath: VaultPath,

shardingCodecOpt match {
case Some(shardingCodec: ShardingCodec) =>
(Some(shardingCodec), initializeCodecs(shardingCodec.codecs)._2)
case None => (None, outerCodecs)
(Some(shardingCodec),
initializeCodecs(shardingCodec.codecs)._2,
initializeCodecs(shardingCodec.index_codecs)._2)
case None => (None, outerCodecs, Seq())
}
}

Expand Down Expand Up @@ -111,9 +118,16 @@ class Zarr3Array(vaultPath: VaultPath,
shardPath.readLastBytes(getShardIndexSize)

private def parseShardIndex(index: Array[Byte]): Seq[(Long, Long)] = {
val _ = index.takeRight(4) // checksum: not checked for now
val indexProper = index.dropRight(4)
indexProper
val decodedIndex = shardingCodec match {
case Some(shardingCodec: ShardingCodec) =>
indexCodecs.foldRight(index)((c, bytes) =>
c match {
case codec: BytesToBytesCodec => codec.decode(bytes)
case _ => bytes
})
case None => ???
}
decodedIndex
.grouped(shardIndexEntryLength)
.map((bytes: Array[Byte]) => {
// BigInt constructor is big endian, sharding index stores values little endian, thus reverse is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ case class Zarr3ArrayHeader(

private def getChunkSize: Array[Int] = {
val shardingCodecInnerChunkSize = codecs.flatMap {
case ShardingCodecConfiguration(chunk_shape, _) => Some(chunk_shape)
case _ => None
case ShardingCodecConfiguration(chunk_shape, _, _) => Some(chunk_shape)
case _ => None
}.headOption
shardingCodecInnerChunkSize.getOrElse(outerChunkSize)
}
Expand Down Expand Up @@ -191,7 +191,8 @@ object Zarr3ArrayHeader extends JsonImplicits {
for {
chunk_shape <- config("chunk_shape").validate[Array[Int]]
codecs = readCodecs(config("codecs"))
} yield ShardingCodecConfiguration(chunk_shape, codecs)
index_codecs = readCodecs(config("index_codecs"))
} yield ShardingCodecConfiguration(chunk_shape, codecs, index_codecs)

private def readCodecs(value: JsValue): Seq[CodecConfiguration] = {
val rawCodecSpecs: Seq[JsValue] = value match {
Expand All @@ -202,13 +203,17 @@ object Zarr3ArrayHeader extends JsonImplicits {
val codecSpecs = rawCodecSpecs.map(c => {
for {
spec: CodecConfiguration <- c("name") match {
case JsString(EndianCodecConfiguration.name) => c(configurationKey).validate[EndianCodecConfiguration]
case JsString(TransposeCodecConfiguration.name) => c(configurationKey).validate[TransposeCodecConfiguration]
case JsString(GzipCodecConfiguration.name) => c(configurationKey).validate[GzipCodecConfiguration]
case JsString(BloscCodecConfiguration.name) => c(configurationKey).validate[BloscCodecConfiguration]
case JsString(ShardingCodecConfiguration.name) => readShardingCodecConfiguration(c(configurationKey))
case JsString(name) => throw new UnsupportedOperationException(s"Codec $name is not supported.")
case _ => throw new IllegalArgumentException()
case JsString(EndianCodecConfiguration.name) => c(configurationKey).validate[EndianCodecConfiguration]
case JsString(EndianCodecConfiguration.legacyName) => c(configurationKey).validate[EndianCodecConfiguration]
case JsString(TransposeCodecConfiguration.name) => c(configurationKey).validate[TransposeCodecConfiguration]
case JsString(GzipCodecConfiguration.name) => c(configurationKey).validate[GzipCodecConfiguration]
case JsString(BloscCodecConfiguration.name) => c(configurationKey).validate[BloscCodecConfiguration]
case JsString(ZstdCodecConfiguration.name) => c(configurationKey).validate[ZstdCodecConfiguration]
case JsString(Crc32CCodecConfiguration.name) =>
JsSuccess(Crc32CCodecConfiguration) // Crc32 codec has no configuration
case JsString(ShardingCodecConfiguration.name) => readShardingCodecConfiguration(c(configurationKey))
case JsString(name) => throw new UnsupportedOperationException(s"Codec $name is not supported.")
case _ => throw new IllegalArgumentException()
}
} yield spec
})
Expand Down