Skip to content

Commit

Permalink
rohmu: zstandard (zstd) compression support.
Browse files Browse the repository at this point in the history
File interface is currently broken in python-zstandard: upon closing the file writer is also closing the underlying file,
indygreg/python-zstandard#76
  • Loading branch information
Jani Jäppinen committed May 21, 2019
1 parent 1a82e60 commit 971e179
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 0 deletions.
16 changes: 16 additions & 0 deletions pghoard/rohmu/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
from .errors import InvalidConfigurationError
from .filewrap import Sink, Stream
from .snappyfile import SnappyFile
from .zstdfile import open as zstd_open
import lzma

try:
import snappy
except ImportError:
snappy = None

try:
import zstandard as zstd
except ImportError:
zstd = None


def CompressionFile(dst_fp, algorithm, level=0):
"""This looks like a class to users, but is actually a function that instantiates a class based on algorithm."""
Expand All @@ -24,6 +30,9 @@ def CompressionFile(dst_fp, algorithm, level=0):
if algorithm == "snappy":
return SnappyFile(dst_fp, "wb")

if algorithm == "zstd":
return zstd_open(dst_fp, "wb")

if algorithm:
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))

Expand All @@ -39,6 +48,8 @@ def __init__(self, src_fp, algorithm, level=0):
self._compressor = lzma.LZMACompressor(lzma.FORMAT_XZ, -1, level, None)
elif algorithm == "snappy":
self._compressor = snappy.StreamCompressor()
elif algorithm == "zstd":
self._compressor = zstd.ZstdCompressor(level=level).compressobj()
else:
InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))

Expand All @@ -57,6 +68,9 @@ def DecompressionFile(src_fp, algorithm):
if algorithm == "snappy":
return SnappyFile(src_fp, "rb")

if algorithm == "zstd":
return zstd_open(src_fp, "rb")

if algorithm:
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))

Expand All @@ -73,6 +87,8 @@ def _create_decompressor(self, alg):
return snappy.StreamDecompressor()
elif alg == "lzma":
return lzma.LZMADecompressor()
elif alg == "zstd":
return zstd.ZstdDecompressor().decompressobj()
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(alg))

def write(self, data):
Expand Down
87 changes: 87 additions & 0 deletions pghoard/rohmu/zstdfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
rohmu - file-like interface for zstd
Copyright (c) 2016 Ohmu Ltd
See LICENSE for details
"""

from . import IO_BLOCK_SIZE
from .filewrap import FileWrap
import io

try:
import zstandard as zstd
except ImportError:
zstd = None


class _ZstdFileWriter(FileWrap):

def __init__(self, next_fp, level):
self._zstd = zstd.ZstdCompressor(level=level).compressobj()
super().__init__(next_fp)

def close(self):
if self.closed:
return
data = self._zstd.flush() or b""
if data:
self.next_fp.write(data)
self.next_fp.flush()
super().close()

def write(self, data):
self._check_not_closed()
compressed_data = self._zstd.compress(data)
self.next_fp.write(compressed_data)
self.offset += len(data)
return len(data)

def writable(self):
return True


class _ZtsdFileReader(FileWrap):

def __init__(self, next_fp):
self._zstd = zstd.ZstdDecompressor().decompressobj()
super().__init__(next_fp)
self._done = False

def close(self):
if self.closed:
return
super().close()

def read(self, size=-1): # pylint: disable=unused-argument
# NOTE: size arg is ignored, random size output is returned
self._check_not_closed()
while not self._done:
compressed = self.next_fp.read(IO_BLOCK_SIZE)
if not compressed:
self._done = True
output = self._zstd.flush() or b""
else:
output = self._zstd.decompress(compressed)

if output:
self.offset += len(output)
return output

return b""

def readable(self):
return True


def open(fp, mode, level=0): # pylint: disable=redefined-builtin
if zstd is None:
raise io.UnsupportedOperation("zstd is not available")

if mode == "wb":
return _ZstdFileWriter(fp, level)

if mode == "rb":
return _ZtsdFileReader(fp)

raise io.UnsupportedOperation("unsupported mode for zstd")
2 changes: 2 additions & 0 deletions requirements-zstd.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-r requirements.txt
zstandard
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"python-dateutil",
"python-snappy >= 0.5",
"requests >= 1.2.0",
"zstandard >= 0.11.1",
],
extras_require={},
dependency_links=[],
Expand Down
15 changes: 15 additions & 0 deletions test/test_compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pghoard.compressor import CompressorThread
from pghoard.rohmu import compressor, IO_BLOCK_SIZE, rohmufile
from pghoard.rohmu.snappyfile import snappy, SnappyFile
from pghoard.rohmu.compressor import zstd
from queue import Queue
import io
import lzma
Expand Down Expand Up @@ -460,3 +461,17 @@ def test_snappy_read(self, tmpdir):

full = b"".join(out)
assert full == b"hello, world"


@pytest.mark.skipif(not zstd, reason="zstd not installed")
class TestZstdCompression(CompressionCase):
algorithm = "zstd"

def compress(self, data):
return zstd.ZstdCompressor().compress(data)

def decompress(self, data):
return zstd.ZstdDecompressor().decompressobj().decompress(data)

def make_compress_stream(self, src_fp):
return compressor.CompressionStream(src_fp, "zstd")

0 comments on commit 971e179

Please sign in to comment.