Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19112][CORE] add codec for ZStandard #17303

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
Expand Down
25 changes: 23 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.io

import java.io._

import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.LZ4BlockOutputStream
import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
Expand Down Expand Up @@ -49,13 +50,14 @@ private[spark] object CompressionCodec {

private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
(codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
|| codec.isInstanceOf[LZ4CompressionCodec])
|| codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStandardCompressionCodec])
}

private val shortCompressionCodecNames = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
"lzf" -> classOf[LZFCompressionCodec].getName,
"snappy" -> classOf[SnappyCompressionCodec].getName)
"snappy" -> classOf[SnappyCompressionCodec].getName,
"zstd" -> classOf[ZStandardCompressionCodec].getName)

def getCodecName(conf: SparkConf): String = {
conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
Expand Down Expand Up @@ -215,3 +217,22 @@ private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends Ou
}
}
}

/**
* :: DeveloperApi ::
* ZStandard implementation of [[org.apache.spark.io.CompressionCodec]].
*
* @note The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
* application.
*/
@DeveloperApi
class ZStandardCompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val level = conf.getSizeAsBytes("spark.io.compression.zstandard.level", "3").toInt
Copy link

@Cyan4973 Cyan4973 May 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use cases which favor speed over size should prefer using level 1.
Compression speed difference can be fairly large.

new ZstdOutputStream(s, level)
}

override def compressedInputStream(s: InputStream): InputStream = new ZstdInputStream(s)
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,24 @@ class CompressionCodecSuite extends SparkFunSuite {
testConcatenationOfSerializedStreams(codec)
}

test("zstd compression codec") {
val codec = CompressionCodec.createCodec(conf, classOf[ZStandardCompressionCodec].getName)
assert(codec.getClass === classOf[ZStandardCompressionCodec])
testCodec(codec)
}

test("zstd compression codec short form") {
val codec = CompressionCodec.createCodec(conf, "zstd")
assert(codec.getClass === classOf[ZStandardCompressionCodec])
testCodec(codec)
}

test("zstd supports concatenation of serialized zstd") {
val codec = CompressionCodec.createCodec(conf, classOf[ZStandardCompressionCodec].getName)
assert(codec.getClass === classOf[ZStandardCompressionCodec])
testConcatenationOfSerializedStreams(codec)
}

test("bad compression codec") {
intercept[IllegalArgumentException] {
CompressionCodec.createCodec(conf, "foobar")
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,11 @@
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
Expand Down