Skip to content

Commit

Permalink
add: unit tests for zipfile2 (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
metaist committed Sep 15, 2024
1 parent 0629e99 commit 73a6706
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 13 deletions.
27 changes: 14 additions & 13 deletions src/cosmofy/zipfile2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
class ZipFile2(ZipFile):
"""Extension of `zipfile.ZipFile` that allows removing members."""

_writing: bool

# NOTE: This function only works on python >= 3.11
# def add_dir(self, name: str, mode: int = 0o777, date: datetime = now) -> ZipFile2:
# """Add a directory to an archive with permissions."""
Expand Down Expand Up @@ -56,7 +58,7 @@ def remove(self, member: Union[str, ZipInfo]) -> ZipFile2:
raise RuntimeError("remove() requires mode 'a'")
if not self.fp:
raise ValueError("Attempt to write to ZIP archive that was already closed")
if self._writing: # type: ignore
if self._writing:
raise ValueError(
"Can't write to ZIP archive while an open writing handle exists."
)
Expand All @@ -76,32 +78,30 @@ def remove(self, member: Union[str, ZipInfo]) -> ZipFile2:
return self

def _remove_member(self, member: ZipInfo) -> ZipFile2:
# get a sorted filelist by header offset, in case the dir order
# doesn't match the actual entry order
"""Internal method to remove a member."""
fp = self.fp
assert fp

# sort by header_offset in case central dir has different order
entry_offset = 0
filelist = sorted(self.filelist, key=attrgetter("header_offset"))
for i in range(len(filelist)):
info = filelist[i]
# find the target member
if info.header_offset < member.header_offset:
last_index = len(filelist) - 1
for i, info in enumerate(filelist):
if info.header_offset < member.header_offset: # keep going until target
continue

# get the total size of the entry
entry_size = None
if i == len(filelist) - 1:
if i == last_index:
entry_size = self.start_dir - info.header_offset
else:
entry_size = filelist[i + 1].header_offset - info.header_offset

# found the member, set the entry offset
if member == info:
if member == info: # set the entry offset
entry_offset = entry_size
continue
# move all subsequent entries

# Move entry
# read the actual entry data
fp.seek(info.header_offset)
entry_data = fp.read(entry_size)
Expand All @@ -122,14 +122,15 @@ def _remove_member(self, member: ZipInfo) -> ZipFile2:

# seek to the start of the central dir
fp.seek(self.start_dir)

return self

def _write_end_record(self) -> None:
"""Write the end record to the file and truncate extra space."""
super()._write_end_record() # type: ignore
if self.fp and hasattr(self.fp, "truncate"):
self.fp.truncate()
else:
else: # pragma: no cover
# This is hard to test without messing up other things.
print(
"WARNING: truncate unimplemented, zip WILL be corrupted if you removed a member!"
)
66 changes: 66 additions & 0 deletions test/test_zipfile2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Test `zipfile2` edge cases."""

# std
from unittest.mock import MagicMock
from unittest.mock import patch
from zipfile import ZipInfo
import io

# lib
import pytest

# pkg
from cosmofy.zipfile2 import ZipFile2


def test_errors() -> None:
"""Run time errors."""
file = ZipFile2(io.BytesIO(), "a") # to keep from `BadZipFile`

file.mode = "r"
with pytest.raises(RuntimeError):
file.remove("fake")

file.mode = "a"
file._writing = True
with pytest.raises(ValueError):
file.remove("fake")

file.mode = "a"
file._writing = False
file.close()
with pytest.raises(ValueError):
file.remove("fake")


@patch("cosmofy.zipfile2.ZipFile2._remove_member")
@patch("cosmofy.zipfile2.ZipFile2.getinfo")
def test_remove(_getinfo: MagicMock, _remove_member: MagicMock) -> None:
"""Test removing a member."""
file = ZipFile2(io.BytesIO(), "a") # to keep from `BadZipFile`
info = ZipInfo("to_remove")

file.remove(info)
_remove_member.assert_called_once_with(info)

file.remove("to_remove")
assert _remove_member.called
assert _getinfo.called

file.remove("fake/*")
assert _remove_member.called


def test_remove_real() -> None:
"""Remove actual members."""
file = ZipFile2(io.BytesIO(), "a")

file.writestr("real/keep1", b"to be kept")
file.writestr("real/remove1", b"to be removed")
file.writestr("real/keep2", b"to be kept")
file.writestr("real/remove2", b"to be removed")
assert len(file.filelist) == 4

file.remove("real/remove2")
file.remove("real/r*")
assert len(file.filelist) == 2

0 comments on commit 73a6706

Please sign in to comment.