Skip to content

Commit

Permalink
Add zstd support for legacy records and ensure no variable is referenced …
Browse files Browse the repository at this point in the history
…before definition (dpkp#138)

* fix if statement logic and add zstd check

* fix if statement logic and add zstd uncompress

* fix imports

* avoid variables being used before definition

* Remove unused import from legacy_records.py

---------

Co-authored-by: Alexandre Souza <[email protected]>
  • Loading branch information
2 people authored and bradenneal1 committed May 16, 2024
1 parent 03898de commit 0331ddc
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
4 changes: 4 additions & 0 deletions kafka/record/default_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ def _assert_has_codec(self, compression_type: int) -> None:
checker, name = codecs.has_lz4, "lz4"
elif compression_type == self.CODEC_ZSTD:
checker, name = codecs.has_zstd, "zstd"
else:
checker, name = lambda: False, "Unknown"
if not checker():
raise UnsupportedCodecError(
f"Libraries for {name} compression codec not found")
Expand Down Expand Up @@ -525,6 +527,8 @@ def _maybe_compress(self) -> bool:
compressed = lz4_encode(data)
elif self._compression_type == self.CODEC_ZSTD:
compressed = zstd_encode(data)
else:
compressed = '' # unknown
compressed_size = len(compressed)
if len(data) <= compressed_size:
# We did not get any benefit from compression, lets send
Expand Down
13 changes: 11 additions & 2 deletions kafka/record/legacy_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
from kafka.record.util import calc_crc32

from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka,
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka, zstd_encode,
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka, zstd_decode
)
import kafka.codec as codecs
from kafka.errors import CorruptRecordException, UnsupportedCodecError
Expand Down Expand Up @@ -110,6 +110,7 @@ class LegacyRecordBase:
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
CODEC_ZSTD = 0x04
TIMESTAMP_TYPE_MASK = 0x08

LOG_APPEND_TIME = 1
Expand All @@ -124,6 +125,10 @@ def _assert_has_codec(self, compression_type: int) -> None:
checker, name = codecs.has_snappy, "snappy"
elif compression_type == self.CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
elif compression_type == self.CODEC_ZSTD:
checker, name = codecs.has_zstd, "zstd"
else:
checker, name = lambda: False, "Unknown"
if not checker():
raise UnsupportedCodecError(
f"Libraries for {name} compression codec not found")
Expand Down Expand Up @@ -195,6 +200,10 @@ def _decompress(self, key_offset: int) -> bytes:
uncompressed = lz4_decode_old_kafka(data.tobytes())
else:
uncompressed = lz4_decode(data.tobytes())
elif compression_type == self.CODEC_ZSTD:
uncompressed = zstd_decode(data)
else:
raise ValueError("Unknown Compression Type - %s" % compression_type)
return uncompressed

def _read_header(self, pos: int) -> Union[Tuple[int, int, int, int, int, None], Tuple[int, int, int, int, int, int]]:
Expand Down

0 comments on commit 0331ddc

Please sign in to comment.