From f789663891887d3e9e83d0d3a4b691ffe45dfe6d Mon Sep 17 00:00:00 2001 From: BlueGlassBlock Date: Sun, 13 Nov 2022 21:23:43 +0800 Subject: [PATCH 01/10] Add basic validation logic for `RECORD` --- src/installer/sources.py | 59 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/src/installer/sources.py b/src/installer/sources.py index fa0bc34c..0e81d101 100644 --- a/src/installer/sources.py +++ b/src/installer/sources.py @@ -13,7 +13,18 @@ WheelContentElement = Tuple[Tuple[str, str, str], BinaryIO, bool] -__all__ = ["WheelSource", "WheelFile"] +__all__ = ["WheelSource", "WheelFile", "WheelValidationError"] + + +class WheelValidationError(Exception): + """Raised when a wheel fails validation.""" + + def __init__(self, issues: List[str]) -> None: # noqa: D107 + super().__init__(", ".join(issues)) + self.issues = issues + + def __repr__(self) -> str: + return f"WheelValidationError(issues={self.issues!r})" class WheelSource: @@ -65,6 +76,14 @@ def read_dist_info(self, filename: str) -> str: """ raise NotImplementedError + def validate_record(self) -> None: + """Validate ``RECORD`` of the wheel. + + A ``ValidationError`` will be raised if any file in the wheel + is not both mentioned and hashed. + """ + raise NotImplementedError + def get_contents(self) -> Iterator[WheelContentElement]: """Sequential access to all contents of the wheel (including dist-info files). @@ -138,6 +157,44 @@ def read_dist_info(self, filename: str) -> str: path = posixpath.join(self.dist_info_dir, filename) return self._zipfile.read(path).decode("utf-8") + def validate_record(self) -> None: + """Validate ``RECORD`` of the wheel.""" + try: + record_lines = self.read_dist_info("RECORD").splitlines() + records = parse_record_file(record_lines) + except Exception as exc: + raise WheelValidationError( + [f"Unable to retrieve `RECORD` from {self._zipfile.filename}: {exc!r}"] + ) from exc + + record_mapping = {record[0]: record for record in records} + issues: List[str] = [] + + for item in self._zipfile.infolist(): + if item.filename[-1:] == "/": # looks like a directory + continue + + record = record_mapping.pop(item.filename, None) + + if self.dist_info_dir == posixpath.commonprefix( + [self.dist_info_dir, item.filename] + ) and item.filename.split("/")[-1] not in ("RECORD.p7s", "RECORD.jwt"): + # both are for digital signatures, and not mentioned in RECORD + continue + + if record is None: + issues.append( + f"In {self._zipfile.filename}, {item.filename} is not mentioned in RECORD" + ) + elif not record[1] and item.filename != f"{self.dist_info_dir}/RECORD": + # Empty hash, skip unless it's RECORD + issues.append( + f"In {self._zipfile.filename}, hash of {item.filename} is not included in RECORD" + ) + + if issues: + raise WheelValidationError(issues) + def get_contents(self) -> Iterator[WheelContentElement]: """Sequential access to all contents of the wheel (including dist-info files). From 926f7de7916f8cb017721b29152b692799278b3c Mon Sep 17 00:00:00 2001 From: BlueGlassBlock Date: Sun, 13 Nov 2022 21:24:58 +0800 Subject: [PATCH 02/10] Remove `RECORD` entry validation in `get_contents` --- src/installer/_core.py | 7 ++++++- src/installer/sources.py | 7 ++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/installer/_core.py b/src/installer/_core.py index 9a02728f..39a34082 100644 --- a/src/installer/_core.py +++ b/src/installer/_core.py @@ -65,6 +65,7 @@ def install( source: WheelSource, destination: WheelDestination, additional_metadata: Dict[str, bytes], + validate_record: bool = True, ) -> None: """Install wheel described by ``source`` into ``destination``. @@ -72,7 +73,7 @@ def install( :param destination: where to write the wheel. :param additional_metadata: additional metadata files to generate, usually generated by the caller. - + :param validate_record: whether to validate ``RECORD`` of the wheel. """ root_scheme = _process_WHEEL_file(source) @@ -80,6 +81,10 @@ def install( record_file_path = posixpath.join(source.dist_info_dir, "RECORD") written_records = [] + # Validate RECORD + if validate_record: + source.validate_record() + # Write the entry_points based scripts. if "entry_points.txt" in source.dist_info_filenames: entrypoints_text = source.read_dist_info("entry_points.txt") diff --git a/src/installer/sources.py b/src/installer/sources.py index 0e81d101..801964ab 100644 --- a/src/installer/sources.py +++ b/src/installer/sources.py @@ -211,11 +211,8 @@ def get_contents(self) -> Iterator[WheelContentElement]: if item.filename[-1:] == "/": # looks like a directory continue - record = record_mapping.pop(item.filename, None) - assert record is not None, "In {}, {} is not mentioned in RECORD".format( - self._zipfile.filename, - item.filename, - ) # should not happen for valid wheels + # Pop record with empty default, because validation is handled by `validate_record` + record = record_mapping.pop(item.filename, (item.filename, "", "")) # Borrowed from: # https://github.com/pypa/pip/blob/0f21fb92/src/pip/_internal/utils/unpacking.py#L96-L100 From cec4021658addd0e06ac3e3e778ce09bd2a3a22a Mon Sep 17 00:00:00 2001 From: BlueGlassBlock Date: Sun, 13 Nov 2022 22:48:55 +0800 Subject: [PATCH 03/10] Add corresponding tests --- src/installer/sources.py | 2 +- tests/conftest.py | 150 ++-- tests/test_core.py | 1814 +++++++++++++++++++------------------- tests/test_sources.py | 58 +- 4 files changed, 1042 insertions(+), 982 deletions(-) diff --git a/src/installer/sources.py b/src/installer/sources.py index 801964ab..458dd699 100644 --- a/src/installer/sources.py +++ b/src/installer/sources.py @@ -20,7 +20,7 @@ class WheelValidationError(Exception): """Raised when a wheel fails validation.""" def __init__(self, issues: List[str]) -> None: # noqa: D107 - super().__init__(", ".join(issues)) + super().__init__(repr(issues)) self.issues = issues def __repr__(self) -> str: diff --git a/tests/conftest.py b/tests/conftest.py index 029cb8f2..ec004311 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,75 +1,75 @@ -import textwrap -import zipfile - -import pytest - - -@pytest.fixture -def fancy_wheel(tmp_path): - path = tmp_path / "fancy-1.0.0-py2.py3-none-any.whl" - files = { - "fancy/": b"""""", - "fancy/__init__.py": b"""\ - def main(): - print("I'm fancy.") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - "fancy-1.0.0.data/data/fancy/": b"""""", - "fancy-1.0.0.data/data/fancy/data.py": b"""\ - # put me in data - """, - "fancy-1.0.0.dist-info/": b"""""", - "fancy-1.0.0.dist-info/top_level.txt": b"""\ - fancy - """, - "fancy-1.0.0.dist-info/entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "fancy-1.0.0.dist-info/WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "fancy-1.0.0.dist-info/METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - # The RECORD file is indirectly validated by the WheelFile, since it only - # provides the items that are a part of the wheel. - "fancy-1.0.0.dist-info/RECORD": b"""\ - fancy/__init__.py,, - fancy/__main__.py,, - fancy-1.0.0.data/data/fancy/data.py,, - fancy-1.0.0.dist-info/top_level.txt,, - fancy-1.0.0.dist-info/entry_points.txt,, - fancy-1.0.0.dist-info/WHEEL,, - fancy-1.0.0.dist-info/METADATA,, - fancy-1.0.0.dist-info/RECORD,, - """, - } - - with zipfile.ZipFile(path, "w") as archive: - for name, indented_content in files.items(): - archive.writestr( - name, - textwrap.dedent(indented_content.decode("utf-8")).encode("utf-8"), - ) - - return path +import textwrap +import zipfile + +import pytest + + +@pytest.fixture +def fancy_wheel(tmp_path): + path = tmp_path / "fancy-1.0.0-py2.py3-none-any.whl" + files = { + "fancy/": b"""""", + "fancy/__init__.py": b"""\ + def main(): + print("I'm fancy.") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + "fancy-1.0.0.data/data/fancy/": b"""""", + "fancy-1.0.0.data/data/fancy/data.py": b"""\ + # put me in data + """, + "fancy-1.0.0.dist-info/": b"""""", + "fancy-1.0.0.dist-info/top_level.txt": b"""\ + fancy + """, + "fancy-1.0.0.dist-info/entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "fancy-1.0.0.dist-info/WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "fancy-1.0.0.dist-info/METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + # The RECORD file is indirectly validated by the WheelFile, since it only + # provides the items that are a part of the wheel. + "fancy-1.0.0.dist-info/RECORD": b"""\ + fancy/__init__.py,sha256=qZ2qq7xVBAiUFQVv-QBHhdtCUF5p1NsWwSOiD7qdHN0,36 + fancy/__main__.py,sha256=Wd4SyWJOIMsHf_5-0oN6aNFwen8ehJnRo-erk2_K-eY,61 + fancy-1.0.0.data/data/fancy/data.py,sha256=nuFRUNQF5vP7FWE-v5ysyrrfpIaAvfzSiGOgfPpLOeI,17 + fancy-1.0.0.dist-info/top_level.txt,sha256=SW-yrrF_c8KlserorMw54inhLjZ3_YIuLz7fYT4f8ao,6 + fancy-1.0.0.dist-info/entry_points.txt,sha256=AxJl21_zgoNWjCfvSkC9u_rWSzGyCtCzhl84n979jCc,75 + fancy-1.0.0.dist-info/WHEEL,sha256=1DrXMF1THfnBjsdS5sZn-e7BKcmUn7jnMbShGeZomgc,84 + fancy-1.0.0.dist-info/METADATA,sha256=hRhZavK_Y6WqKurFFAABDnoVMjZFBH0NJRjwLOutnJI,236 + fancy-1.0.0.dist-info/RECORD,, + """, + } + + with zipfile.ZipFile(path, "w") as archive: + for name, indented_content in files.items(): + archive.writestr( + name, + textwrap.dedent(indented_content.decode("utf-8")).encode("utf-8"), + ) + + return path diff --git a/tests/test_core.py b/tests/test_core.py index 1f3a44e6..c918c03b 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,905 +1,909 @@ -import hashlib -import textwrap -from io import BytesIO -from unittest import mock - -import pytest - -from installer import install -from installer.exceptions import InvalidWheelSource -from installer.records import RecordEntry -from installer.sources import WheelSource - - -# -------------------------------------------------------------------------------------- -# Helpers -# -------------------------------------------------------------------------------------- -def hash_and_size(data): - return hashlib.sha256(data).hexdigest(), len(data) - - -@pytest.fixture -def mock_destination(): - retval = mock.Mock() - - # A hacky approach to making sure we got the right objects going in. - def custom_write_file(scheme, path, stream, is_executable): - assert isinstance(stream, BytesIO) - return (path, scheme, 0) - - def custom_write_script(name, module, attr, section): - return (name, module, attr, section) - - retval.write_file.side_effect = custom_write_file - retval.write_script.side_effect = custom_write_script - - return retval - - -class FakeWheelSource(WheelSource): - def __init__(self, *, distribution, version, regular_files, dist_info_files): - super().__init__(distribution, version) - - self.dist_info_files = { - file: textwrap.dedent(content.decode("utf-8")) - for file, content in dist_info_files.items() - } - self.regular_files = { - file: textwrap.dedent(content.decode("utf-8")).encode("utf-8") - for file, content in regular_files.items() - } - - # Compute RECORD file. - _records = [record for record, _, _ in self.get_contents()] - self.dist_info_files["RECORD"] = "\n".join( - sorted( - ",".join([file, "sha256=" + hash_, str(size)]) - for file, hash_, size in _records - ) - ) - - @property - def dist_info_filenames(self): - return list(self.dist_info_files) - - def read_dist_info(self, filename): - return self.dist_info_files[filename] - - def get_contents(self): - # Sort for deterministic behaviour for Python versions that do not preserve - # insertion order for dictionaries. - for file, content in sorted(self.regular_files.items()): - hashed, size = hash_and_size(content) - record = (file, f"sha256={hashed}", str(size)) - with BytesIO(content) as stream: - yield record, stream, False - - # Sort for deterministic behaviour for Python versions that do not preserve - # insertion order for dictionaries. - for file, text in sorted(self.dist_info_files.items()): - content = text.encode("utf-8") - hashed, size = hash_and_size(content) - record = ( - self.dist_info_dir + "/" + file, - f"sha256={hashed}", - str(size), - ) - with BytesIO(content) as stream: - yield record, stream, False - - -# -------------------------------------------------------------------------------------- -# Actual Tests -# -------------------------------------------------------------------------------------- -class TestInstall: - def test_calls_destination_correctly(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_script( - name="fancy", - module="fancy", - attr="main", - section="console", - ), - mock.call.write_script( - name="fancy-gui", - module="fancy", - attr="main", - section="gui", - ), - mock.call.write_file( - scheme="purelib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy/__main__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/WHEEL", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/entry_points.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/top_level.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/fun_file.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.finalize_installation( - scheme="purelib", - record_file_path="fancy-1.0.0.dist-info/RECORD", - records=[ - ("scripts", ("fancy", "fancy", "main", "console")), - ("scripts", ("fancy-gui", "fancy", "main", "gui")), - ("purelib", ("fancy/__init__.py", "purelib", 0)), - ("purelib", ("fancy/__main__.py", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), - ( - "purelib", - ("fancy-1.0.0.dist-info/entry_points.txt", "purelib", 0), - ), - ( - "purelib", - ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), - ), - ( - "purelib", - ("fancy-1.0.0.dist-info/fun_file.txt", "purelib", 0), - ), - ( - "purelib", - RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), - ), - ], - ), - ] - ) - - def test_no_entrypoints_is_ok(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_file( - scheme="purelib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy/__main__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/WHEEL", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/top_level.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/fun_file.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.finalize_installation( - scheme="purelib", - record_file_path="fancy-1.0.0.dist-info/RECORD", - records=[ - ("purelib", ("fancy/__init__.py", "purelib", 0)), - ("purelib", ("fancy/__main__.py", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), - ( - "purelib", - ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), - ), - ( - "purelib", - ("fancy-1.0.0.dist-info/fun_file.txt", "purelib", 0), - ), - ( - "purelib", - RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), - ), - ], - ), - ] - ) - - def test_handles_platlib(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: false - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_script( - name="fancy", - module="fancy", - attr="main", - section="console", - ), - mock.call.write_script( - name="fancy-gui", - module="fancy", - attr="main", - section="gui", - ), - mock.call.write_file( - scheme="platlib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy/__main__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/WHEEL", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/entry_points.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/top_level.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/fun_file.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.finalize_installation( - scheme="platlib", - record_file_path="fancy-1.0.0.dist-info/RECORD", - records=[ - ("scripts", ("fancy", "fancy", "main", "console")), - ("scripts", ("fancy-gui", "fancy", "main", "gui")), - ("platlib", ("fancy/__init__.py", "platlib", 0)), - ("platlib", ("fancy/__main__.py", "platlib", 0)), - ("platlib", ("fancy-1.0.0.dist-info/METADATA", "platlib", 0)), - ("platlib", ("fancy-1.0.0.dist-info/WHEEL", "platlib", 0)), - ( - "platlib", - ("fancy-1.0.0.dist-info/entry_points.txt", "platlib", 0), - ), - ( - "platlib", - ("fancy-1.0.0.dist-info/top_level.txt", "platlib", 0), - ), - ( - "platlib", - ("fancy-1.0.0.dist-info/fun_file.txt", "platlib", 0), - ), - ( - "platlib", - RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), - ), - ], - ), - ] - ) - - def test_accepts_newer_minor_wheel_versions(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.1 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - # no assertions necessary, since we want to make sure this test didn't - # raises errors. - assert True - - def test_rejects_newer_major_wheel_versions(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 2.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - with pytest.raises(InvalidWheelSource) as ctx: - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - assert "Incompatible Wheel-Version" in str(ctx.value) - - def test_handles_data_properly(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - # put me in purelib - """, - "fancy-1.0.0.data/purelib/fancy/purelib.py": b"""\ - # put me in purelib - """, - "fancy-1.0.0.data/platlib/fancy/platlib.py": b"""\ - # put me in platlib - """, - "fancy-1.0.0.data/scripts/fancy/scripts.py": b"""\ - # put me in scripts - """, - "fancy-1.0.0.data/headers/fancy/headers.py": b"""\ - # put me in headers - """, - "fancy-1.0.0.data/data/fancy/data.py": b"""\ - # put me in data - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={}, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_script( - name="fancy", - module="fancy", - attr="main", - section="console", - ), - mock.call.write_script( - name="fancy-gui", - module="fancy", - attr="main", - section="gui", - ), - mock.call.write_file( - scheme="data", - path="fancy/data.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="headers", - path="fancy/headers.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy/platlib.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy/purelib.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="scripts", - path="fancy/scripts.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/WHEEL", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/entry_points.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/top_level.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.finalize_installation( - scheme="purelib", - record_file_path="fancy-1.0.0.dist-info/RECORD", - records=[ - ("scripts", ("fancy", "fancy", "main", "console")), - ("scripts", ("fancy-gui", "fancy", "main", "gui")), - ("data", ("fancy/data.py", "data", 0)), - ("headers", ("fancy/headers.py", "headers", 0)), - ("platlib", ("fancy/platlib.py", "platlib", 0)), - ("purelib", ("fancy/purelib.py", "purelib", 0)), - ("scripts", ("fancy/scripts.py", "scripts", 0)), - ("purelib", ("fancy/__init__.py", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), - ( - "purelib", - ("fancy-1.0.0.dist-info/entry_points.txt", "purelib", 0), - ), - ( - "purelib", - ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), - ), - ( - "purelib", - RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), - ), - ], - ), - ] - ) - - def test_errors_out_when_given_invalid_scheme_in_data(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - # put me in purelib - """, - "fancy-1.0.0.data/purelib/fancy/purelib.py": b"""\ - # put me in purelib - """, - "fancy-1.0.0.data/invalid/fancy/invalid.py": b"""\ - # i am invalid - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - with pytest.raises(InvalidWheelSource) as ctx: - install( - source=source, - destination=mock_destination, - additional_metadata={}, - ) - - assert "fancy-1.0.0.data/invalid/fancy/invalid.py" in str(ctx.value) - - def test_ensure_non_executable_for_additional_metadata(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - # put me in purelib - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - all_contents = list(source.get_contents()) - source.get_contents = lambda: ( - (*contents, True) for (*contents, _) in all_contents - ) - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_file( - scheme="purelib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=True, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=True, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/fun_file.txt", - stream=mock.ANY, - is_executable=False, - ), - ], - any_order=True, - ) +import hashlib +import textwrap +from io import BytesIO +from unittest import mock + +import pytest + +from installer import install +from installer.exceptions import InvalidWheelSource +from installer.records import RecordEntry +from installer.sources import WheelSource + + +# -------------------------------------------------------------------------------------- +# Helpers +# -------------------------------------------------------------------------------------- +def hash_and_size(data): + return hashlib.sha256(data).hexdigest(), len(data) + + +@pytest.fixture +def mock_destination(): + retval = mock.Mock() + + # A hacky approach to making sure we got the right objects going in. + def custom_write_file(scheme, path, stream, is_executable): + assert isinstance(stream, BytesIO) + return (path, scheme, 0) + + def custom_write_script(name, module, attr, section): + return (name, module, attr, section) + + retval.write_file.side_effect = custom_write_file + retval.write_script.side_effect = custom_write_script + + return retval + + +class FakeWheelSource(WheelSource): + def __init__(self, *, distribution, version, regular_files, dist_info_files): + super().__init__(distribution, version) + + self.dist_info_files = { + file: textwrap.dedent(content.decode("utf-8")) + for file, content in dist_info_files.items() + } + self.regular_files = { + file: textwrap.dedent(content.decode("utf-8")).encode("utf-8") + for file, content in regular_files.items() + } + + # Compute RECORD file. + _records = [record for record, _, _ in self.get_contents()] + self.dist_info_files["RECORD"] = "\n".join( + sorted( + ",".join([file, "sha256=" + hash_, str(size)]) + for file, hash_, size in _records + ) + ) + + @property + def dist_info_filenames(self): + return list(self.dist_info_files) + + def read_dist_info(self, filename): + return self.dist_info_files[filename] + + def validate_record(self) -> None: + # Skip validation since the logic is different. + return + + def get_contents(self): + # Sort for deterministic behaviour for Python versions that do not preserve + # insertion order for dictionaries. + for file, content in sorted(self.regular_files.items()): + hashed, size = hash_and_size(content) + record = (file, f"sha256={hashed}", str(size)) + with BytesIO(content) as stream: + yield record, stream, False + + # Sort for deterministic behaviour for Python versions that do not preserve + # insertion order for dictionaries. + for file, text in sorted(self.dist_info_files.items()): + content = text.encode("utf-8") + hashed, size = hash_and_size(content) + record = ( + self.dist_info_dir + "/" + file, + f"sha256={hashed}", + str(size), + ) + with BytesIO(content) as stream: + yield record, stream, False + + +# -------------------------------------------------------------------------------------- +# Actual Tests +# -------------------------------------------------------------------------------------- +class TestInstall: + def test_calls_destination_correctly(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_script( + name="fancy", + module="fancy", + attr="main", + section="console", + ), + mock.call.write_script( + name="fancy-gui", + module="fancy", + attr="main", + section="gui", + ), + mock.call.write_file( + scheme="purelib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy/__main__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/WHEEL", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/entry_points.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/top_level.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/fun_file.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.finalize_installation( + scheme="purelib", + record_file_path="fancy-1.0.0.dist-info/RECORD", + records=[ + ("scripts", ("fancy", "fancy", "main", "console")), + ("scripts", ("fancy-gui", "fancy", "main", "gui")), + ("purelib", ("fancy/__init__.py", "purelib", 0)), + ("purelib", ("fancy/__main__.py", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), + ( + "purelib", + ("fancy-1.0.0.dist-info/entry_points.txt", "purelib", 0), + ), + ( + "purelib", + ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), + ), + ( + "purelib", + ("fancy-1.0.0.dist-info/fun_file.txt", "purelib", 0), + ), + ( + "purelib", + RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), + ), + ], + ), + ] + ) + + def test_no_entrypoints_is_ok(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_file( + scheme="purelib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy/__main__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/WHEEL", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/top_level.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/fun_file.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.finalize_installation( + scheme="purelib", + record_file_path="fancy-1.0.0.dist-info/RECORD", + records=[ + ("purelib", ("fancy/__init__.py", "purelib", 0)), + ("purelib", ("fancy/__main__.py", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), + ( + "purelib", + ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), + ), + ( + "purelib", + ("fancy-1.0.0.dist-info/fun_file.txt", "purelib", 0), + ), + ( + "purelib", + RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), + ), + ], + ), + ] + ) + + def test_handles_platlib(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: false + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_script( + name="fancy", + module="fancy", + attr="main", + section="console", + ), + mock.call.write_script( + name="fancy-gui", + module="fancy", + attr="main", + section="gui", + ), + mock.call.write_file( + scheme="platlib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy/__main__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/WHEEL", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/entry_points.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/top_level.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/fun_file.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.finalize_installation( + scheme="platlib", + record_file_path="fancy-1.0.0.dist-info/RECORD", + records=[ + ("scripts", ("fancy", "fancy", "main", "console")), + ("scripts", ("fancy-gui", "fancy", "main", "gui")), + ("platlib", ("fancy/__init__.py", "platlib", 0)), + ("platlib", ("fancy/__main__.py", "platlib", 0)), + ("platlib", ("fancy-1.0.0.dist-info/METADATA", "platlib", 0)), + ("platlib", ("fancy-1.0.0.dist-info/WHEEL", "platlib", 0)), + ( + "platlib", + ("fancy-1.0.0.dist-info/entry_points.txt", "platlib", 0), + ), + ( + "platlib", + ("fancy-1.0.0.dist-info/top_level.txt", "platlib", 0), + ), + ( + "platlib", + ("fancy-1.0.0.dist-info/fun_file.txt", "platlib", 0), + ), + ( + "platlib", + RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), + ), + ], + ), + ] + ) + + def test_accepts_newer_minor_wheel_versions(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.1 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + # no assertions necessary, since we want to make sure this test didn't + # raises errors. + assert True + + def test_rejects_newer_major_wheel_versions(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 2.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + with pytest.raises(InvalidWheelSource) as ctx: + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + assert "Incompatible Wheel-Version" in str(ctx.value) + + def test_handles_data_properly(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + # put me in purelib + """, + "fancy-1.0.0.data/purelib/fancy/purelib.py": b"""\ + # put me in purelib + """, + "fancy-1.0.0.data/platlib/fancy/platlib.py": b"""\ + # put me in platlib + """, + "fancy-1.0.0.data/scripts/fancy/scripts.py": b"""\ + # put me in scripts + """, + "fancy-1.0.0.data/headers/fancy/headers.py": b"""\ + # put me in headers + """, + "fancy-1.0.0.data/data/fancy/data.py": b"""\ + # put me in data + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={}, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_script( + name="fancy", + module="fancy", + attr="main", + section="console", + ), + mock.call.write_script( + name="fancy-gui", + module="fancy", + attr="main", + section="gui", + ), + mock.call.write_file( + scheme="data", + path="fancy/data.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="headers", + path="fancy/headers.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy/platlib.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy/purelib.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="scripts", + path="fancy/scripts.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/WHEEL", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/entry_points.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/top_level.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.finalize_installation( + scheme="purelib", + record_file_path="fancy-1.0.0.dist-info/RECORD", + records=[ + ("scripts", ("fancy", "fancy", "main", "console")), + ("scripts", ("fancy-gui", "fancy", "main", "gui")), + ("data", ("fancy/data.py", "data", 0)), + ("headers", ("fancy/headers.py", "headers", 0)), + ("platlib", ("fancy/platlib.py", "platlib", 0)), + ("purelib", ("fancy/purelib.py", "purelib", 0)), + ("scripts", ("fancy/scripts.py", "scripts", 0)), + ("purelib", ("fancy/__init__.py", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), + ( + "purelib", + ("fancy-1.0.0.dist-info/entry_points.txt", "purelib", 0), + ), + ( + "purelib", + ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), + ), + ( + "purelib", + RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), + ), + ], + ), + ] + ) + + def test_errors_out_when_given_invalid_scheme_in_data(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + # put me in purelib + """, + "fancy-1.0.0.data/purelib/fancy/purelib.py": b"""\ + # put me in purelib + """, + "fancy-1.0.0.data/invalid/fancy/invalid.py": b"""\ + # i am invalid + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + with pytest.raises(InvalidWheelSource) as ctx: + install( + source=source, + destination=mock_destination, + additional_metadata={}, + ) + + assert "fancy-1.0.0.data/invalid/fancy/invalid.py" in str(ctx.value) + + def test_ensure_non_executable_for_additional_metadata(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + # put me in purelib + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + all_contents = list(source.get_contents()) + source.get_contents = lambda: ( + (*contents, True) for (*contents, _) in all_contents + ) + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_file( + scheme="purelib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=True, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=True, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/fun_file.txt", + stream=mock.ANY, + is_executable=False, + ), + ], + any_order=True, + ) diff --git a/tests/test_sources.py b/tests/test_sources.py index a79cc24f..81094d11 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -4,7 +4,7 @@ import pytest from installer.records import parse_record_file -from installer.sources import WheelFile, WheelSource +from installer.sources import WheelFile, WheelSource, WheelValidationError class TestWheelSource: @@ -30,6 +30,9 @@ def test_raises_not_implemented_error(self): with pytest.raises(NotImplementedError): source.get_contents() + with pytest.raises(NotImplementedError): + source.validate_record() + class TestWheelFile: def test_rejects_not_okay_name(self, tmp_path): @@ -92,3 +95,56 @@ def test_provides_correct_contents(self, fancy_wheel): assert sorted(got_records) == sorted(expected_records) assert got_files == files + + def modify_wheel_record(self, fancy_wheel, manipulation_func): + """Helper function for modifying RECORD in the wheel file. + + Exists because ZipFile doesn't support remove. + """ + files = {} + # Read everything except RECORD and add it back immediately + with zipfile.ZipFile(fancy_wheel) as archive: + for file in archive.namelist(): + # Call manipulation function so that we can add RECORD back. + if file.endswith("RECORD"): + manipulation_func(files, file, archive.read(file)) + continue + + files[file] = archive.read(file) + # Replace original archive + with zipfile.ZipFile(fancy_wheel, mode="w") as archive: + for name, content in files.items(): + archive.writestr(name, content) + + def test_validation_error_no_record(self, fancy_wheel): + # Replace the wheel without adding RECORD + self.modify_wheel_record(fancy_wheel, lambda *_: None) + with WheelFile.open(fancy_wheel) as w: + with pytest.raises( + WheelValidationError, match="Unable to retrieve `RECORD`" + ): + w.validate_record() + + def test_validation_error_record_missing_file(self, fancy_wheel): + def modifier(file_dict, file_name, data): + # Throw away first two entries + file_dict[file_name] = b"\n".join(data.split(b"\n")[2:]) + + self.modify_wheel_record(fancy_wheel, modifier) + with WheelFile.open(fancy_wheel) as w: + with pytest.raises(WheelValidationError, match="not mentioned in RECORD"): + w.validate_record() + + def test_validation_error_record_missing_hash(self, fancy_wheel): + def modifier(file_dict, file_name, data): + # Extract filename and write back without hash or size + file_dict[file_name] = b"\n".join( + line.split(b",")[0] + b",," for line in data.split(b"\n")[2:] + ) + + self.modify_wheel_record(fancy_wheel, modifier) + with WheelFile.open(fancy_wheel) as w: + with pytest.raises( + WheelValidationError, match="hash of (.+) is not included in RECORD" + ): + w.validate_record() From 17f8c315e16854888bbc7f964132c0b555cc9a86 Mon Sep 17 00:00:00 2001 From: BlueGlassBlock Date: Sun, 13 Nov 2022 23:02:52 +0800 Subject: [PATCH 04/10] Fix line endings --- tests/conftest.py | 150 ++-- tests/test_core.py | 1818 ++++++++++++++++++++++---------------------- 2 files changed, 984 insertions(+), 984 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index ec004311..065bdfe9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,75 +1,75 @@ -import textwrap -import zipfile - -import pytest - - -@pytest.fixture -def fancy_wheel(tmp_path): - path = tmp_path / "fancy-1.0.0-py2.py3-none-any.whl" - files = { - "fancy/": b"""""", - "fancy/__init__.py": b"""\ - def main(): - print("I'm fancy.") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - "fancy-1.0.0.data/data/fancy/": b"""""", - "fancy-1.0.0.data/data/fancy/data.py": b"""\ - # put me in data - """, - "fancy-1.0.0.dist-info/": b"""""", - "fancy-1.0.0.dist-info/top_level.txt": b"""\ - fancy - """, - "fancy-1.0.0.dist-info/entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "fancy-1.0.0.dist-info/WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "fancy-1.0.0.dist-info/METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - # The RECORD file is indirectly validated by the WheelFile, since it only - # provides the items that are a part of the wheel. - "fancy-1.0.0.dist-info/RECORD": b"""\ - fancy/__init__.py,sha256=qZ2qq7xVBAiUFQVv-QBHhdtCUF5p1NsWwSOiD7qdHN0,36 - fancy/__main__.py,sha256=Wd4SyWJOIMsHf_5-0oN6aNFwen8ehJnRo-erk2_K-eY,61 - fancy-1.0.0.data/data/fancy/data.py,sha256=nuFRUNQF5vP7FWE-v5ysyrrfpIaAvfzSiGOgfPpLOeI,17 - fancy-1.0.0.dist-info/top_level.txt,sha256=SW-yrrF_c8KlserorMw54inhLjZ3_YIuLz7fYT4f8ao,6 - fancy-1.0.0.dist-info/entry_points.txt,sha256=AxJl21_zgoNWjCfvSkC9u_rWSzGyCtCzhl84n979jCc,75 - fancy-1.0.0.dist-info/WHEEL,sha256=1DrXMF1THfnBjsdS5sZn-e7BKcmUn7jnMbShGeZomgc,84 - fancy-1.0.0.dist-info/METADATA,sha256=hRhZavK_Y6WqKurFFAABDnoVMjZFBH0NJRjwLOutnJI,236 - fancy-1.0.0.dist-info/RECORD,, - """, - } - - with zipfile.ZipFile(path, "w") as archive: - for name, indented_content in files.items(): - archive.writestr( - name, - textwrap.dedent(indented_content.decode("utf-8")).encode("utf-8"), - ) - - return path +import textwrap +import zipfile + +import pytest + + +@pytest.fixture +def fancy_wheel(tmp_path): + path = tmp_path / "fancy-1.0.0-py2.py3-none-any.whl" + files = { + "fancy/": b"""""", + "fancy/__init__.py": b"""\ + def main(): + print("I'm fancy.") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + "fancy-1.0.0.data/data/fancy/": b"""""", + "fancy-1.0.0.data/data/fancy/data.py": b"""\ + # put me in data + """, + "fancy-1.0.0.dist-info/": b"""""", + "fancy-1.0.0.dist-info/top_level.txt": b"""\ + fancy + """, + "fancy-1.0.0.dist-info/entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "fancy-1.0.0.dist-info/WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "fancy-1.0.0.dist-info/METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + # The RECORD file is indirectly validated by the WheelFile, since it only + # provides the items that are a part of the wheel. + "fancy-1.0.0.dist-info/RECORD": b"""\ + fancy/__init__.py,sha256=qZ2qq7xVBAiUFQVv-QBHhdtCUF5p1NsWwSOiD7qdHN0,36 + fancy/__main__.py,sha256=Wd4SyWJOIMsHf_5-0oN6aNFwen8ehJnRo-erk2_K-eY,61 + fancy-1.0.0.data/data/fancy/data.py,sha256=nuFRUNQF5vP7FWE-v5ysyrrfpIaAvfzSiGOgfPpLOeI,17 + fancy-1.0.0.dist-info/top_level.txt,sha256=SW-yrrF_c8KlserorMw54inhLjZ3_YIuLz7fYT4f8ao,6 + fancy-1.0.0.dist-info/entry_points.txt,sha256=AxJl21_zgoNWjCfvSkC9u_rWSzGyCtCzhl84n979jCc,75 + fancy-1.0.0.dist-info/WHEEL,sha256=1DrXMF1THfnBjsdS5sZn-e7BKcmUn7jnMbShGeZomgc,84 + fancy-1.0.0.dist-info/METADATA,sha256=hRhZavK_Y6WqKurFFAABDnoVMjZFBH0NJRjwLOutnJI,236 + fancy-1.0.0.dist-info/RECORD,, + """, + } + + with zipfile.ZipFile(path, "w") as archive: + for name, indented_content in files.items(): + archive.writestr( + name, + textwrap.dedent(indented_content.decode("utf-8")).encode("utf-8"), + ) + + return path diff --git a/tests/test_core.py b/tests/test_core.py index c918c03b..a5674fd3 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,909 +1,909 @@ -import hashlib -import textwrap -from io import BytesIO -from unittest import mock - -import pytest - -from installer import install -from installer.exceptions import InvalidWheelSource -from installer.records import RecordEntry -from installer.sources import WheelSource - - -# -------------------------------------------------------------------------------------- -# Helpers -# -------------------------------------------------------------------------------------- -def hash_and_size(data): - return hashlib.sha256(data).hexdigest(), len(data) - - -@pytest.fixture -def mock_destination(): - retval = mock.Mock() - - # A hacky approach to making sure we got the right objects going in. - def custom_write_file(scheme, path, stream, is_executable): - assert isinstance(stream, BytesIO) - return (path, scheme, 0) - - def custom_write_script(name, module, attr, section): - return (name, module, attr, section) - - retval.write_file.side_effect = custom_write_file - retval.write_script.side_effect = custom_write_script - - return retval - - -class FakeWheelSource(WheelSource): - def __init__(self, *, distribution, version, regular_files, dist_info_files): - super().__init__(distribution, version) - - self.dist_info_files = { - file: textwrap.dedent(content.decode("utf-8")) - for file, content in dist_info_files.items() - } - self.regular_files = { - file: textwrap.dedent(content.decode("utf-8")).encode("utf-8") - for file, content in regular_files.items() - } - - # Compute RECORD file. - _records = [record for record, _, _ in self.get_contents()] - self.dist_info_files["RECORD"] = "\n".join( - sorted( - ",".join([file, "sha256=" + hash_, str(size)]) - for file, hash_, size in _records - ) - ) - - @property - def dist_info_filenames(self): - return list(self.dist_info_files) - - def read_dist_info(self, filename): - return self.dist_info_files[filename] - - def validate_record(self) -> None: - # Skip validation since the logic is different. - return - - def get_contents(self): - # Sort for deterministic behaviour for Python versions that do not preserve - # insertion order for dictionaries. - for file, content in sorted(self.regular_files.items()): - hashed, size = hash_and_size(content) - record = (file, f"sha256={hashed}", str(size)) - with BytesIO(content) as stream: - yield record, stream, False - - # Sort for deterministic behaviour for Python versions that do not preserve - # insertion order for dictionaries. - for file, text in sorted(self.dist_info_files.items()): - content = text.encode("utf-8") - hashed, size = hash_and_size(content) - record = ( - self.dist_info_dir + "/" + file, - f"sha256={hashed}", - str(size), - ) - with BytesIO(content) as stream: - yield record, stream, False - - -# -------------------------------------------------------------------------------------- -# Actual Tests -# -------------------------------------------------------------------------------------- -class TestInstall: - def test_calls_destination_correctly(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_script( - name="fancy", - module="fancy", - attr="main", - section="console", - ), - mock.call.write_script( - name="fancy-gui", - module="fancy", - attr="main", - section="gui", - ), - mock.call.write_file( - scheme="purelib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy/__main__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/WHEEL", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/entry_points.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/top_level.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/fun_file.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.finalize_installation( - scheme="purelib", - record_file_path="fancy-1.0.0.dist-info/RECORD", - records=[ - ("scripts", ("fancy", "fancy", "main", "console")), - ("scripts", ("fancy-gui", "fancy", "main", "gui")), - ("purelib", ("fancy/__init__.py", "purelib", 0)), - ("purelib", ("fancy/__main__.py", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), - ( - "purelib", - ("fancy-1.0.0.dist-info/entry_points.txt", "purelib", 0), - ), - ( - "purelib", - ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), - ), - ( - "purelib", - ("fancy-1.0.0.dist-info/fun_file.txt", "purelib", 0), - ), - ( - "purelib", - RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), - ), - ], - ), - ] - ) - - def test_no_entrypoints_is_ok(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_file( - scheme="purelib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy/__main__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/WHEEL", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/top_level.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/fun_file.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.finalize_installation( - scheme="purelib", - record_file_path="fancy-1.0.0.dist-info/RECORD", - records=[ - ("purelib", ("fancy/__init__.py", "purelib", 0)), - ("purelib", ("fancy/__main__.py", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), - ( - "purelib", - ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), - ), - ( - "purelib", - ("fancy-1.0.0.dist-info/fun_file.txt", "purelib", 0), - ), - ( - "purelib", - RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), - ), - ], - ), - ] - ) - - def test_handles_platlib(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: false - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_script( - name="fancy", - module="fancy", - attr="main", - section="console", - ), - mock.call.write_script( - name="fancy-gui", - module="fancy", - attr="main", - section="gui", - ), - mock.call.write_file( - scheme="platlib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy/__main__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/WHEEL", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/entry_points.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/top_level.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy-1.0.0.dist-info/fun_file.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.finalize_installation( - scheme="platlib", - record_file_path="fancy-1.0.0.dist-info/RECORD", - records=[ - ("scripts", ("fancy", "fancy", "main", "console")), - ("scripts", ("fancy-gui", "fancy", "main", "gui")), - ("platlib", ("fancy/__init__.py", "platlib", 0)), - ("platlib", ("fancy/__main__.py", "platlib", 0)), - ("platlib", ("fancy-1.0.0.dist-info/METADATA", "platlib", 0)), - ("platlib", ("fancy-1.0.0.dist-info/WHEEL", "platlib", 0)), - ( - "platlib", - ("fancy-1.0.0.dist-info/entry_points.txt", "platlib", 0), - ), - ( - "platlib", - ("fancy-1.0.0.dist-info/top_level.txt", "platlib", 0), - ), - ( - "platlib", - ("fancy-1.0.0.dist-info/fun_file.txt", "platlib", 0), - ), - ( - "platlib", - RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), - ), - ], - ), - ] - ) - - def test_accepts_newer_minor_wheel_versions(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.1 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - # no assertions necessary, since we want to make sure this test didn't - # raises errors. - assert True - - def test_rejects_newer_major_wheel_versions(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - def main(): - print("I'm a fancy package") - """, - "fancy/__main__.py": b"""\ - if __name__ == "__main__": - from . import main - main() - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 2.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - with pytest.raises(InvalidWheelSource) as ctx: - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - assert "Incompatible Wheel-Version" in str(ctx.value) - - def test_handles_data_properly(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - # put me in purelib - """, - "fancy-1.0.0.data/purelib/fancy/purelib.py": b"""\ - # put me in purelib - """, - "fancy-1.0.0.data/platlib/fancy/platlib.py": b"""\ - # put me in platlib - """, - "fancy-1.0.0.data/scripts/fancy/scripts.py": b"""\ - # put me in scripts - """, - "fancy-1.0.0.data/headers/fancy/headers.py": b"""\ - # put me in headers - """, - "fancy-1.0.0.data/data/fancy/data.py": b"""\ - # put me in data - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - install( - source=source, - destination=mock_destination, - additional_metadata={}, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_script( - name="fancy", - module="fancy", - attr="main", - section="console", - ), - mock.call.write_script( - name="fancy-gui", - module="fancy", - attr="main", - section="gui", - ), - mock.call.write_file( - scheme="data", - path="fancy/data.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="headers", - path="fancy/headers.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="platlib", - path="fancy/platlib.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy/purelib.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="scripts", - path="fancy/scripts.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/WHEEL", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/entry_points.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/top_level.txt", - stream=mock.ANY, - is_executable=False, - ), - mock.call.finalize_installation( - scheme="purelib", - record_file_path="fancy-1.0.0.dist-info/RECORD", - records=[ - ("scripts", ("fancy", "fancy", "main", "console")), - ("scripts", ("fancy-gui", "fancy", "main", "gui")), - ("data", ("fancy/data.py", "data", 0)), - ("headers", ("fancy/headers.py", "headers", 0)), - ("platlib", ("fancy/platlib.py", "platlib", 0)), - ("purelib", ("fancy/purelib.py", "purelib", 0)), - ("scripts", ("fancy/scripts.py", "scripts", 0)), - ("purelib", ("fancy/__init__.py", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), - ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), - ( - "purelib", - ("fancy-1.0.0.dist-info/entry_points.txt", "purelib", 0), - ), - ( - "purelib", - ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), - ), - ( - "purelib", - RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), - ), - ], - ), - ] - ) - - def test_errors_out_when_given_invalid_scheme_in_data(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - # put me in purelib - """, - "fancy-1.0.0.data/purelib/fancy/purelib.py": b"""\ - # put me in purelib - """, - "fancy-1.0.0.data/invalid/fancy/invalid.py": b"""\ - # i am invalid - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "entry_points.txt": b"""\ - [console_scripts] - fancy = fancy:main - - [gui_scripts] - fancy-gui = fancy:main - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - - with pytest.raises(InvalidWheelSource) as ctx: - install( - source=source, - destination=mock_destination, - additional_metadata={}, - ) - - assert "fancy-1.0.0.data/invalid/fancy/invalid.py" in str(ctx.value) - - def test_ensure_non_executable_for_additional_metadata(self, mock_destination): - # Create a fake wheel - source = FakeWheelSource( - distribution="fancy", - version="1.0.0", - regular_files={ - "fancy/__init__.py": b"""\ - # put me in purelib - """, - }, - dist_info_files={ - "top_level.txt": b"""\ - fancy - """, - "WHEEL": b"""\ - Wheel-Version: 1.0 - Generator: magic (1.0.0) - Root-Is-Purelib: true - Tag: py3-none-any - """, - "METADATA": b"""\ - Metadata-Version: 2.1 - Name: fancy - Version: 1.0.0 - Summary: A fancy package - Author: Agendaless Consulting - Author-email: nobody@example.com - License: MIT - Keywords: fancy amazing - Platform: UNKNOWN - Classifier: Intended Audience :: Developers - """, - }, - ) - all_contents = list(source.get_contents()) - source.get_contents = lambda: ( - (*contents, True) for (*contents, _) in all_contents - ) - install( - source=source, - destination=mock_destination, - additional_metadata={ - "fun_file.txt": b"this should be in dist-info!", - }, - ) - - mock_destination.assert_has_calls( - [ - mock.call.write_file( - scheme="purelib", - path="fancy/__init__.py", - stream=mock.ANY, - is_executable=True, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/METADATA", - stream=mock.ANY, - is_executable=True, - ), - mock.call.write_file( - scheme="purelib", - path="fancy-1.0.0.dist-info/fun_file.txt", - stream=mock.ANY, - is_executable=False, - ), - ], - any_order=True, - ) +import hashlib +import textwrap +from io import BytesIO +from unittest import mock + +import pytest + +from installer import install +from installer.exceptions import InvalidWheelSource +from installer.records import RecordEntry +from installer.sources import WheelSource + + +# -------------------------------------------------------------------------------------- +# Helpers +# -------------------------------------------------------------------------------------- +def hash_and_size(data): + return hashlib.sha256(data).hexdigest(), len(data) + + +@pytest.fixture +def mock_destination(): + retval = mock.Mock() + + # A hacky approach to making sure we got the right objects going in. + def custom_write_file(scheme, path, stream, is_executable): + assert isinstance(stream, BytesIO) + return (path, scheme, 0) + + def custom_write_script(name, module, attr, section): + return (name, module, attr, section) + + retval.write_file.side_effect = custom_write_file + retval.write_script.side_effect = custom_write_script + + return retval + + +class FakeWheelSource(WheelSource): + def __init__(self, *, distribution, version, regular_files, dist_info_files): + super().__init__(distribution, version) + + self.dist_info_files = { + file: textwrap.dedent(content.decode("utf-8")) + for file, content in dist_info_files.items() + } + self.regular_files = { + file: textwrap.dedent(content.decode("utf-8")).encode("utf-8") + for file, content in regular_files.items() + } + + # Compute RECORD file. + _records = [record for record, _, _ in self.get_contents()] + self.dist_info_files["RECORD"] = "\n".join( + sorted( + ",".join([file, "sha256=" + hash_, str(size)]) + for file, hash_, size in _records + ) + ) + + @property + def dist_info_filenames(self): + return list(self.dist_info_files) + + def read_dist_info(self, filename): + return self.dist_info_files[filename] + + def validate_record(self) -> None: + # Skip validation since the logic is different. + return + + def get_contents(self): + # Sort for deterministic behaviour for Python versions that do not preserve + # insertion order for dictionaries. + for file, content in sorted(self.regular_files.items()): + hashed, size = hash_and_size(content) + record = (file, f"sha256={hashed}", str(size)) + with BytesIO(content) as stream: + yield record, stream, False + + # Sort for deterministic behaviour for Python versions that do not preserve + # insertion order for dictionaries. + for file, text in sorted(self.dist_info_files.items()): + content = text.encode("utf-8") + hashed, size = hash_and_size(content) + record = ( + self.dist_info_dir + "/" + file, + f"sha256={hashed}", + str(size), + ) + with BytesIO(content) as stream: + yield record, stream, False + + +# -------------------------------------------------------------------------------------- +# Actual Tests +# -------------------------------------------------------------------------------------- +class TestInstall: + def test_calls_destination_correctly(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_script( + name="fancy", + module="fancy", + attr="main", + section="console", + ), + mock.call.write_script( + name="fancy-gui", + module="fancy", + attr="main", + section="gui", + ), + mock.call.write_file( + scheme="purelib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy/__main__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/WHEEL", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/entry_points.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/top_level.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/fun_file.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.finalize_installation( + scheme="purelib", + record_file_path="fancy-1.0.0.dist-info/RECORD", + records=[ + ("scripts", ("fancy", "fancy", "main", "console")), + ("scripts", ("fancy-gui", "fancy", "main", "gui")), + ("purelib", ("fancy/__init__.py", "purelib", 0)), + ("purelib", ("fancy/__main__.py", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), + ( + "purelib", + ("fancy-1.0.0.dist-info/entry_points.txt", "purelib", 0), + ), + ( + "purelib", + ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), + ), + ( + "purelib", + ("fancy-1.0.0.dist-info/fun_file.txt", "purelib", 0), + ), + ( + "purelib", + RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), + ), + ], + ), + ] + ) + + def test_no_entrypoints_is_ok(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_file( + scheme="purelib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy/__main__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/WHEEL", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/top_level.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/fun_file.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.finalize_installation( + scheme="purelib", + record_file_path="fancy-1.0.0.dist-info/RECORD", + records=[ + ("purelib", ("fancy/__init__.py", "purelib", 0)), + ("purelib", ("fancy/__main__.py", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), + ( + "purelib", + ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), + ), + ( + "purelib", + ("fancy-1.0.0.dist-info/fun_file.txt", "purelib", 0), + ), + ( + "purelib", + RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), + ), + ], + ), + ] + ) + + def test_handles_platlib(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: false + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_script( + name="fancy", + module="fancy", + attr="main", + section="console", + ), + mock.call.write_script( + name="fancy-gui", + module="fancy", + attr="main", + section="gui", + ), + mock.call.write_file( + scheme="platlib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy/__main__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/WHEEL", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/entry_points.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/top_level.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy-1.0.0.dist-info/fun_file.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.finalize_installation( + scheme="platlib", + record_file_path="fancy-1.0.0.dist-info/RECORD", + records=[ + ("scripts", ("fancy", "fancy", "main", "console")), + ("scripts", ("fancy-gui", "fancy", "main", "gui")), + ("platlib", ("fancy/__init__.py", "platlib", 0)), + ("platlib", ("fancy/__main__.py", "platlib", 0)), + ("platlib", ("fancy-1.0.0.dist-info/METADATA", "platlib", 0)), + ("platlib", ("fancy-1.0.0.dist-info/WHEEL", "platlib", 0)), + ( + "platlib", + ("fancy-1.0.0.dist-info/entry_points.txt", "platlib", 0), + ), + ( + "platlib", + ("fancy-1.0.0.dist-info/top_level.txt", "platlib", 0), + ), + ( + "platlib", + ("fancy-1.0.0.dist-info/fun_file.txt", "platlib", 0), + ), + ( + "platlib", + RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), + ), + ], + ), + ] + ) + + def test_accepts_newer_minor_wheel_versions(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.1 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + # no assertions necessary, since we want to make sure this test didn't + # raises errors. + assert True + + def test_rejects_newer_major_wheel_versions(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + def main(): + print("I'm a fancy package") + """, + "fancy/__main__.py": b"""\ + if __name__ == "__main__": + from . import main + main() + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 2.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + with pytest.raises(InvalidWheelSource) as ctx: + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + assert "Incompatible Wheel-Version" in str(ctx.value) + + def test_handles_data_properly(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + # put me in purelib + """, + "fancy-1.0.0.data/purelib/fancy/purelib.py": b"""\ + # put me in purelib + """, + "fancy-1.0.0.data/platlib/fancy/platlib.py": b"""\ + # put me in platlib + """, + "fancy-1.0.0.data/scripts/fancy/scripts.py": b"""\ + # put me in scripts + """, + "fancy-1.0.0.data/headers/fancy/headers.py": b"""\ + # put me in headers + """, + "fancy-1.0.0.data/data/fancy/data.py": b"""\ + # put me in data + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + install( + source=source, + destination=mock_destination, + additional_metadata={}, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_script( + name="fancy", + module="fancy", + attr="main", + section="console", + ), + mock.call.write_script( + name="fancy-gui", + module="fancy", + attr="main", + section="gui", + ), + mock.call.write_file( + scheme="data", + path="fancy/data.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="headers", + path="fancy/headers.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="platlib", + path="fancy/platlib.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy/purelib.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="scripts", + path="fancy/scripts.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/WHEEL", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/entry_points.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/top_level.txt", + stream=mock.ANY, + is_executable=False, + ), + mock.call.finalize_installation( + scheme="purelib", + record_file_path="fancy-1.0.0.dist-info/RECORD", + records=[ + ("scripts", ("fancy", "fancy", "main", "console")), + ("scripts", ("fancy-gui", "fancy", "main", "gui")), + ("data", ("fancy/data.py", "data", 0)), + ("headers", ("fancy/headers.py", "headers", 0)), + ("platlib", ("fancy/platlib.py", "platlib", 0)), + ("purelib", ("fancy/purelib.py", "purelib", 0)), + ("scripts", ("fancy/scripts.py", "scripts", 0)), + ("purelib", ("fancy/__init__.py", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/METADATA", "purelib", 0)), + ("purelib", ("fancy-1.0.0.dist-info/WHEEL", "purelib", 0)), + ( + "purelib", + ("fancy-1.0.0.dist-info/entry_points.txt", "purelib", 0), + ), + ( + "purelib", + ("fancy-1.0.0.dist-info/top_level.txt", "purelib", 0), + ), + ( + "purelib", + RecordEntry("fancy-1.0.0.dist-info/RECORD", None, None), + ), + ], + ), + ] + ) + + def test_errors_out_when_given_invalid_scheme_in_data(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + # put me in purelib + """, + "fancy-1.0.0.data/purelib/fancy/purelib.py": b"""\ + # put me in purelib + """, + "fancy-1.0.0.data/invalid/fancy/invalid.py": b"""\ + # i am invalid + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "entry_points.txt": b"""\ + [console_scripts] + fancy = fancy:main + + [gui_scripts] + fancy-gui = fancy:main + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + + with pytest.raises(InvalidWheelSource) as ctx: + install( + source=source, + destination=mock_destination, + additional_metadata={}, + ) + + assert "fancy-1.0.0.data/invalid/fancy/invalid.py" in str(ctx.value) + + def test_ensure_non_executable_for_additional_metadata(self, mock_destination): + # Create a fake wheel + source = FakeWheelSource( + distribution="fancy", + version="1.0.0", + regular_files={ + "fancy/__init__.py": b"""\ + # put me in purelib + """, + }, + dist_info_files={ + "top_level.txt": b"""\ + fancy + """, + "WHEEL": b"""\ + Wheel-Version: 1.0 + Generator: magic (1.0.0) + Root-Is-Purelib: true + Tag: py3-none-any + """, + "METADATA": b"""\ + Metadata-Version: 2.1 + Name: fancy + Version: 1.0.0 + Summary: A fancy package + Author: Agendaless Consulting + Author-email: nobody@example.com + License: MIT + Keywords: fancy amazing + Platform: UNKNOWN + Classifier: Intended Audience :: Developers + """, + }, + ) + all_contents = list(source.get_contents()) + source.get_contents = lambda: ( + (*contents, True) for (*contents, _) in all_contents + ) + install( + source=source, + destination=mock_destination, + additional_metadata={ + "fun_file.txt": b"this should be in dist-info!", + }, + ) + + mock_destination.assert_has_calls( + [ + mock.call.write_file( + scheme="purelib", + path="fancy/__init__.py", + stream=mock.ANY, + is_executable=True, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/METADATA", + stream=mock.ANY, + is_executable=True, + ), + mock.call.write_file( + scheme="purelib", + path="fancy-1.0.0.dist-info/fun_file.txt", + stream=mock.ANY, + is_executable=False, + ), + ], + any_order=True, + ) From b5976682212a932c2d0ff8592de85c55d9cd9436 Mon Sep 17 00:00:00 2001 From: BlueGlassBlock Date: Tue, 15 Nov 2022 17:52:51 +0800 Subject: [PATCH 05/10] Generate RECORD automatically in test --- tests/conftest.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 065bdfe9..9c582dd3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,7 @@ import textwrap import zipfile +from base64 import urlsafe_b64encode +from hashlib import sha256 import pytest @@ -51,25 +53,21 @@ def main(): Platform: UNKNOWN Classifier: Intended Audience :: Developers """, - # The RECORD file is indirectly validated by the WheelFile, since it only - # provides the items that are a part of the wheel. - "fancy-1.0.0.dist-info/RECORD": b"""\ - fancy/__init__.py,sha256=qZ2qq7xVBAiUFQVv-QBHhdtCUF5p1NsWwSOiD7qdHN0,36 - fancy/__main__.py,sha256=Wd4SyWJOIMsHf_5-0oN6aNFwen8ehJnRo-erk2_K-eY,61 - fancy-1.0.0.data/data/fancy/data.py,sha256=nuFRUNQF5vP7FWE-v5ysyrrfpIaAvfzSiGOgfPpLOeI,17 - fancy-1.0.0.dist-info/top_level.txt,sha256=SW-yrrF_c8KlserorMw54inhLjZ3_YIuLz7fYT4f8ao,6 - fancy-1.0.0.dist-info/entry_points.txt,sha256=AxJl21_zgoNWjCfvSkC9u_rWSzGyCtCzhl84n979jCc,75 - fancy-1.0.0.dist-info/WHEEL,sha256=1DrXMF1THfnBjsdS5sZn-e7BKcmUn7jnMbShGeZomgc,84 - fancy-1.0.0.dist-info/METADATA,sha256=hRhZavK_Y6WqKurFFAABDnoVMjZFBH0NJRjwLOutnJI,236 - fancy-1.0.0.dist-info/RECORD,, - """, } + record_name = "fancy-1.0.0.dist-info/RECORD" + record_lines = [] + with zipfile.ZipFile(path, "w") as archive: for name, indented_content in files.items(): - archive.writestr( - name, - textwrap.dedent(indented_content.decode("utf-8")).encode("utf-8"), - ) + data = textwrap.dedent(indented_content.decode("utf-8")).encode("utf-8") + archive.writestr(name, data) + if name[-1:] != "/": # Only files go into RECORD + digest = sha256(data).digest() + value = urlsafe_b64encode(digest).decode("ascii").rstrip("=") + record_lines.append(f"{name},sha256={value},{len(data)}") + + record_lines.append(f"{record_name},,") + archive.writestr(record_name, "\n".join(record_lines).encode("utf-8")) return path From cd43e4a2f5f04c2b695c552d397ab483e8156a25 Mon Sep 17 00:00:00 2001 From: BlueGlassBlock Date: Sun, 27 Nov 2022 07:57:48 +0800 Subject: [PATCH 06/10] Remove `validate_record` parameter from `install` Add corresponding doc to `WheelSource.validate` --- src/installer/_core.py | 6 ------ src/installer/sources.py | 6 ++++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/installer/_core.py b/src/installer/_core.py index 39a34082..0b55b369 100644 --- a/src/installer/_core.py +++ b/src/installer/_core.py @@ -65,7 +65,6 @@ def install( source: WheelSource, destination: WheelDestination, additional_metadata: Dict[str, bytes], - validate_record: bool = True, ) -> None: """Install wheel described by ``source`` into ``destination``. @@ -73,7 +72,6 @@ def install( :param destination: where to write the wheel. :param additional_metadata: additional metadata files to generate, usually generated by the caller. - :param validate_record: whether to validate ``RECORD`` of the wheel. """ root_scheme = _process_WHEEL_file(source) @@ -81,10 +79,6 @@ def install( record_file_path = posixpath.join(source.dist_info_dir, "RECORD") written_records = [] - # Validate RECORD - if validate_record: - source.validate_record() - # Write the entry_points based scripts. if "entry_points.txt" in source.dist_info_filenames: entrypoints_text = source.read_dist_info("entry_points.txt") diff --git a/src/installer/sources.py b/src/installer/sources.py index 458dd699..4a488b49 100644 --- a/src/installer/sources.py +++ b/src/installer/sources.py @@ -79,8 +79,10 @@ def read_dist_info(self, filename: str) -> str: def validate_record(self) -> None: """Validate ``RECORD`` of the wheel. + This method should be called before :py:func:`install ` + if validation is required. A ``ValidationError`` will be raised if any file in the wheel - is not both mentioned and hashed. + is not both mentioned and properly hashed. """ raise NotImplementedError @@ -187,7 +189,7 @@ def validate_record(self) -> None: f"In {self._zipfile.filename}, {item.filename} is not mentioned in RECORD" ) elif not record[1] and item.filename != f"{self.dist_info_dir}/RECORD": - # Empty hash, skip unless it's RECORD + # Empty hash, report unless it's RECORD issues.append( f"In {self._zipfile.filename}, hash of {item.filename} is not included in RECORD" ) From 040eccb62f8fab8c4fbb4fcf9262e61b7807c715 Mon Sep 17 00:00:00 2001 From: BlueGlassBlock Date: Sun, 27 Nov 2022 08:39:53 +0800 Subject: [PATCH 07/10] chore: modify test function names --- tests/test_sources.py | 119 ++++++++++++++++++++++++------------------ 1 file changed, 69 insertions(+), 50 deletions(-) diff --git a/tests/test_sources.py b/tests/test_sources.py index 81094d11..2cccc45f 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -96,55 +96,74 @@ def test_provides_correct_contents(self, fancy_wheel): assert sorted(got_records) == sorted(expected_records) assert got_files == files - def modify_wheel_record(self, fancy_wheel, manipulation_func): - """Helper function for modifying RECORD in the wheel file. - - Exists because ZipFile doesn't support remove. - """ - files = {} - # Read everything except RECORD and add it back immediately - with zipfile.ZipFile(fancy_wheel) as archive: - for file in archive.namelist(): - # Call manipulation function so that we can add RECORD back. - if file.endswith("RECORD"): - manipulation_func(files, file, archive.read(file)) - continue +def replace_file_in_zip(path: str, filename: str, content: "bytes | None") -> None: + """Helper function for replacing a file in the zip. + + Exists because ZipFile doesn't support remove. + """ + files = {} + # Copy everything except `filename`, and replace it with `content`. + with zipfile.ZipFile(path) as archive: + for file in archive.namelist(): + if file == filename: + if content is None: + continue # Remove the file + files[file] = content + else: files[file] = archive.read(file) - # Replace original archive - with zipfile.ZipFile(fancy_wheel, mode="w") as archive: - for name, content in files.items(): - archive.writestr(name, content) - - def test_validation_error_no_record(self, fancy_wheel): - # Replace the wheel without adding RECORD - self.modify_wheel_record(fancy_wheel, lambda *_: None) - with WheelFile.open(fancy_wheel) as w: - with pytest.raises( - WheelValidationError, match="Unable to retrieve `RECORD`" - ): - w.validate_record() - - def test_validation_error_record_missing_file(self, fancy_wheel): - def modifier(file_dict, file_name, data): - # Throw away first two entries - file_dict[file_name] = b"\n".join(data.split(b"\n")[2:]) - - self.modify_wheel_record(fancy_wheel, modifier) - with WheelFile.open(fancy_wheel) as w: - with pytest.raises(WheelValidationError, match="not mentioned in RECORD"): - w.validate_record() - - def test_validation_error_record_missing_hash(self, fancy_wheel): - def modifier(file_dict, file_name, data): - # Extract filename and write back without hash or size - file_dict[file_name] = b"\n".join( - line.split(b",")[0] + b",," for line in data.split(b"\n")[2:] - ) - - self.modify_wheel_record(fancy_wheel, modifier) - with WheelFile.open(fancy_wheel) as w: - with pytest.raises( - WheelValidationError, match="hash of (.+) is not included in RECORD" - ): - w.validate_record() + # Replace original archive + with zipfile.ZipFile(path, mode="w") as archive: + for name, content in files.items(): + archive.writestr(name, content) + + +def test_rejects_no_record_on_validate(fancy_wheel): + # Remove RECORD + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content=None, + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises(WheelValidationError, match="Unable to retrieve `RECORD`"): + source.validate_record() + + +def test_rejects_record_missing_file_on_validate(fancy_wheel): + with zipfile.ZipFile(fancy_wheel) as archive: + with archive.open("fancy-1.0.0.dist-info/RECORD") as f: + record_file_contents = f.read() + + # Remove the first two entries from the RECORD file + new_record_file_contents = b"\n".join(record_file_contents.split(b"\n")[2:]) + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content=new_record_file_contents, + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises(WheelValidationError, match="not mentioned in RECORD"): + source.validate_record() + + +def test_validation_error_record_missing_hash(fancy_wheel): + with zipfile.ZipFile(fancy_wheel) as archive: + with archive.open("fancy-1.0.0.dist-info/RECORD") as f: + record_file_contents = f.read() + + # Remove the first two entries from the RECORD file + new_record_file_contents = b"\n".join( + line.split(b",")[0] + b",," # file name with empty size and hash + for line in record_file_contents.split(b"\n") + ) + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content=new_record_file_contents, + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises( + WheelValidationError, match="hash of (.+) is not included in RECORD" + ): + source.validate_record() From 7db618b422d105101239679dead705121f60e02a Mon Sep 17 00:00:00 2001 From: BlueGlassBlock Date: Sun, 27 Nov 2022 08:43:33 +0800 Subject: [PATCH 08/10] revert b597668 --- tests/conftest.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9c582dd3..b74770db 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -53,21 +53,23 @@ def main(): Platform: UNKNOWN Classifier: Intended Audience :: Developers """, + "fancy-1.0.0.dist-info/RECORD": b"""\ + fancy/__init__.py,sha256=qZ2qq7xVBAiUFQVv-QBHhdtCUF5p1NsWwSOiD7qdHN0,36 + fancy/__main__.py,sha256=Wd4SyWJOIMsHf_5-0oN6aNFwen8ehJnRo-erk2_K-eY,61 + fancy-1.0.0.data/data/fancy/data.py,sha256=nuFRUNQF5vP7FWE-v5ysyrrfpIaAvfzSiGOgfPpLOeI,17 + fancy-1.0.0.dist-info/top_level.txt,sha256=SW-yrrF_c8KlserorMw54inhLjZ3_YIuLz7fYT4f8ao,6 + fancy-1.0.0.dist-info/entry_points.txt,sha256=AxJl21_zgoNWjCfvSkC9u_rWSzGyCtCzhl84n979jCc,75 + fancy-1.0.0.dist-info/WHEEL,sha256=1DrXMF1THfnBjsdS5sZn-e7BKcmUn7jnMbShGeZomgc,84 + fancy-1.0.0.dist-info/METADATA,sha256=hRhZavK_Y6WqKurFFAABDnoVMjZFBH0NJRjwLOutnJI,236 + fancy-1.0.0.dist-info/RECORD,, + """, } - record_name = "fancy-1.0.0.dist-info/RECORD" - record_lines = [] - with zipfile.ZipFile(path, "w") as archive: for name, indented_content in files.items(): - data = textwrap.dedent(indented_content.decode("utf-8")).encode("utf-8") - archive.writestr(name, data) - if name[-1:] != "/": # Only files go into RECORD - digest = sha256(data).digest() - value = urlsafe_b64encode(digest).decode("ascii").rstrip("=") - record_lines.append(f"{name},sha256={value},{len(data)}") - - record_lines.append(f"{record_name},,") - archive.writestr(record_name, "\n".join(record_lines).encode("utf-8")) + archive.writestr( + name, + textwrap.dedent(indented_content.decode("utf-8")).encode("utf-8"), + ) return path From 784fec4466c4d6025385aed4e6dc1000bfc80e99 Mon Sep 17 00:00:00 2001 From: BlueGlassBlock Date: Sun, 27 Nov 2022 13:22:34 +0800 Subject: [PATCH 09/10] Add `validate_file` arg switch to `validation_error` property Fix `RECORD.jws` --- src/installer/sources.py | 80 ++++++++++++++++++++++++++-------------- tests/conftest.py | 2 - tests/test_sources.py | 76 +++++++++++++++++++++++++++++++++++--- 3 files changed, 122 insertions(+), 36 deletions(-) diff --git a/src/installer/sources.py b/src/installer/sources.py index 4a488b49..8bddcfce 100644 --- a/src/installer/sources.py +++ b/src/installer/sources.py @@ -5,26 +5,15 @@ import stat import zipfile from contextlib import contextmanager -from typing import BinaryIO, Iterator, List, Tuple, cast +from typing import BinaryIO, ClassVar, Iterator, List, Tuple, Type, cast -from installer.records import parse_record_file +from installer.records import RecordEntry, parse_record_file from installer.utils import parse_wheel_filename WheelContentElement = Tuple[Tuple[str, str, str], BinaryIO, bool] -__all__ = ["WheelSource", "WheelFile", "WheelValidationError"] - - -class WheelValidationError(Exception): - """Raised when a wheel fails validation.""" - - def __init__(self, issues: List[str]) -> None: # noqa: D107 - super().__init__(repr(issues)) - self.issues = issues - - def __repr__(self) -> str: - return f"WheelValidationError(issues={self.issues!r})" +__all__ = ["WheelSource", "WheelFile"] class WheelSource: @@ -33,6 +22,8 @@ class WheelSource: This is an abstract class, whose methods have to be implemented by subclasses. """ + validation_error: ClassVar[Type[Exception]] = ValueError + def __init__(self, distribution: str, version: str) -> None: """Initialize a WheelSource object. @@ -81,8 +72,6 @@ def validate_record(self) -> None: This method should be called before :py:func:`install ` if validation is required. - A ``ValidationError`` will be raised if any file in the wheel - is not both mentioned and properly hashed. """ raise NotImplementedError @@ -112,6 +101,17 @@ def get_contents(self) -> Iterator[WheelContentElement]: raise NotImplementedError +class _WheelFileValidationError(ValueError): + """Raised when a wheel file fails validation.""" + + def __init__(self, issues: List[str]) -> None: # noqa: D107 + super().__init__(repr(issues)) + self.issues = issues + + def __repr__(self) -> str: + return f"WheelFileValidationError(issues={self.issues!r})" + + class WheelFile(WheelSource): """Implements `WheelSource`, for an existing file from the filesystem. @@ -121,6 +121,8 @@ class WheelFile(WheelSource): ... installer.install(source, destination) """ + validation_error = _WheelFileValidationError + def __init__(self, f: zipfile.ZipFile) -> None: """Initialize a WheelFile object. @@ -159,13 +161,23 @@ def read_dist_info(self, filename: str) -> str: path = posixpath.join(self.dist_info_dir, filename) return self._zipfile.read(path).decode("utf-8") - def validate_record(self) -> None: - """Validate ``RECORD`` of the wheel.""" + def validate_record(self, validate_file: bool = True) -> None: + """Validate ``RECORD`` of the wheel. + + This method should be called before :py:func:`install ` + if validation is required. + + File names will always be validated against ``RECORD``. + If ``validate_file`` is true, every file in the archive will be validated + against corresponding entry in ``RECORD``. + + :param validate_file: Whether to validate content integrity. + """ try: record_lines = self.read_dist_info("RECORD").splitlines() records = parse_record_file(record_lines) except Exception as exc: - raise WheelValidationError( + raise _WheelFileValidationError( [f"Unable to retrieve `RECORD` from {self._zipfile.filename}: {exc!r}"] ) from exc @@ -176,26 +188,38 @@ def validate_record(self) -> None: if item.filename[-1:] == "/": # looks like a directory continue - record = record_mapping.pop(item.filename, None) + record_args = record_mapping.pop(item.filename, None) if self.dist_info_dir == posixpath.commonprefix( [self.dist_info_dir, item.filename] - ) and item.filename.split("/")[-1] not in ("RECORD.p7s", "RECORD.jwt"): + ) and item.filename.split("/")[-1] in ("RECORD.p7s", "RECORD.jws"): # both are for digital signatures, and not mentioned in RECORD continue - if record is None: + if record_args is None: issues.append( f"In {self._zipfile.filename}, {item.filename} is not mentioned in RECORD" ) - elif not record[1] and item.filename != f"{self.dist_info_dir}/RECORD": - # Empty hash, report unless it's RECORD - issues.append( - f"In {self._zipfile.filename}, hash of {item.filename} is not included in RECORD" - ) + continue + + if item.filename == f"{self.dist_info_dir}/RECORD": + # Skip record file + continue + if validate_file: + record = RecordEntry.from_elements(*record_args) + data = self._zipfile.read(item) + if record.hash_ is None or record.size is None: + # Report empty hash / size + issues.append( + f"In {self._zipfile.filename}, hash / size of {item.filename} is not included in RECORD" + ) + elif not record.validate(data): + issues.append( + f"In {self._zipfile.filename}, hash / size of {item.filename} didn't match RECORD" + ) if issues: - raise WheelValidationError(issues) + raise _WheelFileValidationError(issues) def get_contents(self) -> Iterator[WheelContentElement]: """Sequential access to all contents of the wheel (including dist-info files). diff --git a/tests/conftest.py b/tests/conftest.py index b74770db..3a258e04 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,5 @@ import textwrap import zipfile -from base64 import urlsafe_b64encode -from hashlib import sha256 import pytest diff --git a/tests/test_sources.py b/tests/test_sources.py index 2cccc45f..4ae4fb47 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -1,10 +1,13 @@ +import json import posixpath import zipfile +from base64 import urlsafe_b64encode +from hashlib import sha256 import pytest from installer.records import parse_record_file -from installer.sources import WheelFile, WheelSource, WheelValidationError +from installer.sources import WheelFile, WheelSource class TestWheelSource: @@ -126,7 +129,9 @@ def test_rejects_no_record_on_validate(fancy_wheel): content=None, ) with WheelFile.open(fancy_wheel) as source: - with pytest.raises(WheelValidationError, match="Unable to retrieve `RECORD`"): + with pytest.raises( + WheelFile.validation_error, match="Unable to retrieve `RECORD`" + ): source.validate_record() @@ -143,16 +148,37 @@ def test_rejects_record_missing_file_on_validate(fancy_wheel): content=new_record_file_contents, ) with WheelFile.open(fancy_wheel) as source: - with pytest.raises(WheelValidationError, match="not mentioned in RECORD"): + with pytest.raises(WheelFile.validation_error, match="not mentioned in RECORD"): source.validate_record() -def test_validation_error_record_missing_hash(fancy_wheel): +def test_rejects_record_missing_hash(fancy_wheel): + with zipfile.ZipFile(fancy_wheel) as archive: + with archive.open("fancy-1.0.0.dist-info/RECORD") as f: + record_file_contents = f.read() + + new_record_file_contents = b"\n".join( + line.split(b",")[0] + b",," # file name with empty size and hash + for line in record_file_contents.split(b"\n") + ) + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content=new_record_file_contents, + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises( + WheelFile.validation_error, + match="hash / size of (.+) is not included in RECORD", + ): + source.validate_record() + + +def test_accept_record_missing_hash_on_skip_validation(fancy_wheel): with zipfile.ZipFile(fancy_wheel) as archive: with archive.open("fancy-1.0.0.dist-info/RECORD") as f: record_file_contents = f.read() - # Remove the first two entries from the RECORD file new_record_file_contents = b"\n".join( line.split(b",")[0] + b",," # file name with empty size and hash for line in record_file_contents.split(b"\n") @@ -162,8 +188,46 @@ def test_validation_error_record_missing_hash(fancy_wheel): filename="fancy-1.0.0.dist-info/RECORD", content=new_record_file_contents, ) + with WheelFile.open(fancy_wheel) as source: + source.validate_record(validate_file=False) + + +def test_accept_wheel_with_signed_file(fancy_wheel): + with zipfile.ZipFile(fancy_wheel) as archive: + with archive.open("fancy-1.0.0.dist-info/RECORD") as f: + record_file_contents = f.read() + hash_b64_nopad = ( + urlsafe_b64encode(sha256(record_file_contents).digest()) + .decode("utf-8") + .rstrip("=") + ) + jws_content = json.dumps({"hash": f"sha256={hash_b64_nopad}"}) + with zipfile.ZipFile(fancy_wheel, "a") as archive: + archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content) + with WheelFile.open(fancy_wheel) as source: + source.validate_record() + + +def test_rejects_record_validation_failed(fancy_wheel): + with zipfile.ZipFile(fancy_wheel) as archive: + with archive.open("fancy-1.0.0.dist-info/RECORD") as f: + record_file_contents = f.read() + + new_record_file_contents = b"\n".join( + line.split(b",")[0] # Original filename + + b",sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A," + + line.split(b",")[2] # Original size + for line in record_file_contents.split(b"\n") + if line # ignore trail endline + ) + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content=new_record_file_contents, + ) with WheelFile.open(fancy_wheel) as source: with pytest.raises( - WheelValidationError, match="hash of (.+) is not included in RECORD" + WheelFile.validation_error, + match="hash / size of (.+) didn't match RECORD", ): source.validate_record() From 38288f0739229d28cd767c9e97f600df031a0bc5 Mon Sep 17 00:00:00 2001 From: Nyuan Zhang Date: Mon, 28 Nov 2022 08:41:37 +0800 Subject: [PATCH 10/10] Update _core.py --- src/installer/_core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/installer/_core.py b/src/installer/_core.py index 0b55b369..9a02728f 100644 --- a/src/installer/_core.py +++ b/src/installer/_core.py @@ -72,6 +72,7 @@ def install( :param destination: where to write the wheel. :param additional_metadata: additional metadata files to generate, usually generated by the caller. + """ root_scheme = _process_WHEEL_file(source)