From 50ed1ccbb3e9df1d443db3c7f6e59cbfd5cf25a3 Mon Sep 17 00:00:00 2001 From: Nyuan Zhang Date: Fri, 3 Mar 2023 06:09:18 +0800 Subject: [PATCH] Refactor `validate_record` (#167) --- src/installer/sources.py | 43 ++++-- tests/test_sources.py | 297 +++++++++++++++++++++++---------------- 2 files changed, 202 insertions(+), 138 deletions(-) diff --git a/src/installer/sources.py b/src/installer/sources.py index 0d70f103..e752b465 100644 --- a/src/installer/sources.py +++ b/src/installer/sources.py @@ -188,27 +188,29 @@ def read_dist_info(self, filename: str) -> str: path = posixpath.join(self.dist_info_dir, filename) return self._zipfile.read(path).decode("utf-8") - def validate_record(self, validate_file: bool = True) -> None: + def validate_record(self, *, validate_contents: bool = True) -> None: """Validate ``RECORD`` of the wheel. This method should be called before :py:func:`install ` if validation is required. File names will always be validated against ``RECORD``. - If ``validate_file`` is true, every file in the archive will be validated - against corresponding entry in ``RECORD``. - :param validate_file: Whether to validate content integrity. + If ``validate_contents`` is true, sizes and hashes of files + will also be validated against ``RECORD``. + + :param validate_contents: Whether to validate content integrity. """ try: record_lines = self.read_dist_info("RECORD").splitlines() - records = parse_record_file(record_lines) + record_mapping = { + record[0]: record for record in parse_record_file(record_lines) + } except Exception as exc: raise _WheelFileValidationError( [f"Unable to retrieve `RECORD` from {self._zipfile.filename}: {exc!r}"] ) from exc - record_mapping = {record[0]: record for record in records} issues: List[str] = [] for item in self._zipfile.infolist(): @@ -221,6 +223,11 @@ def validate_record(self, validate_file: bool = True) -> None: [self.dist_info_dir, item.filename] ) and item.filename.split("/")[-1] in ("RECORD.p7s", "RECORD.jws"): # both are for digital signatures, and not mentioned in RECORD + if record_args is not None: + # Incorrectly contained + issues.append( + f"In {self._zipfile.filename}, digital signature file {item.filename} is incorrectly contained in RECORD." + ) continue if record_args is None: @@ -229,18 +236,24 @@ def validate_record(self, validate_file: bool = True) -> None: ) continue + record = RecordEntry.from_elements(*record_args) + if item.filename == f"{self.dist_info_dir}/RECORD": - # Skip record file - continue - if validate_file: - record = RecordEntry.from_elements(*record_args) - data = self._zipfile.read(item) - if record.hash_ is None or record.size is None: - # Report empty hash / size + # Assert that RECORD doesn't have size and hash. + if record.hash_ is not None or record.size is not None: + # Incorrectly contained hash / size issues.append( - f"In {self._zipfile.filename}, hash / size of {item.filename} is not included in RECORD" + f"In {self._zipfile.filename}, RECORD file incorrectly contains hash / size." ) - elif not record.validate(data): + continue + if record.hash_ is None or record.size is None: + # Report empty hash / size + issues.append( + f"In {self._zipfile.filename}, hash / size of {item.filename} is not included in RECORD" + ) + if validate_contents: + data = self._zipfile.read(item) + if not record.validate(data): issues.append( f"In {self._zipfile.filename}, hash / size of {item.filename} didn't match RECORD" ) diff --git a/tests/test_sources.py b/tests/test_sources.py index ec71351f..4811c533 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -37,6 +37,27 @@ def test_raises_not_implemented_error(self): source.validate_record() +def replace_file_in_zip(path: str, filename: str, content: "str | None") -> None: + """Helper function for replacing a file in the zip. + + Exists because ZipFile doesn't support remove. + """ + files = {} + # Copy everything except `filename`, and replace it with `content`. + with zipfile.ZipFile(path) as archive: + for file in archive.namelist(): + if file == filename: + if content is None: + continue # Remove the file + files[file] = content.encode() + else: + files[file] = archive.read(file) + # Replace original archive + with zipfile.ZipFile(path, mode="w") as archive: + for name, content in files.items(): + archive.writestr(name, content) + + class TestWheelFile: def test_rejects_not_okay_name(self, tmp_path): # Create an empty zipfile @@ -116,135 +137,165 @@ def test_requires_dist_info_name_match(self, fancy_wheel): with WheelFile.open(misnamed) as source: source.dist_info_filenames + def test_rejects_no_record_on_validate(self, fancy_wheel): + # Remove RECORD + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content=None, + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises( + WheelFile.validation_error, match="Unable to retrieve `RECORD`" + ): + source.validate_record(validate_contents=False) -def replace_file_in_zip(path: str, filename: str, content: "bytes | None") -> None: - """Helper function for replacing a file in the zip. - - Exists because ZipFile doesn't support remove. - """ - files = {} - # Copy everything except `filename`, and replace it with `content`. - with zipfile.ZipFile(path) as archive: - for file in archive.namelist(): - if file == filename: - if content is None: - continue # Remove the file - files[file] = content - else: - files[file] = archive.read(file) - # Replace original archive - with zipfile.ZipFile(path, mode="w") as archive: - for name, content in files.items(): - archive.writestr(name, content) + def test_rejects_invalid_record_entry(self, fancy_wheel): + with WheelFile.open(fancy_wheel) as source: + record_file_contents = source.read_dist_info("RECORD") + + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content="\n".join( + line.replace("sha256=", "") for line in record_file_contents + ), + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises( + WheelFile.validation_error, + match="Unable to retrieve `RECORD`", + ): + source.validate_record() + def test_rejects_record_missing_file_on_validate(self, fancy_wheel): + with WheelFile.open(fancy_wheel) as source: + record_file_contents = source.read_dist_info("RECORD") + + # Remove the first two entries from the RECORD file + new_record_file_contents = "\n".join(record_file_contents.split("\n")[2:]) + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content=new_record_file_contents, + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises( + WheelFile.validation_error, match="not mentioned in RECORD" + ): + source.validate_record(validate_contents=False) -def test_rejects_no_record_on_validate(fancy_wheel): - # Remove RECORD - replace_file_in_zip( - fancy_wheel, - filename="fancy-1.0.0.dist-info/RECORD", - content=None, - ) - with WheelFile.open(fancy_wheel) as source: - with pytest.raises( - WheelFile.validation_error, match="Unable to retrieve `RECORD`" - ): - source.validate_record() + def test_rejects_record_missing_hash(self, fancy_wheel): + with WheelFile.open(fancy_wheel) as source: + record_file_contents = source.read_dist_info("RECORD") + new_record_file_contents = "\n".join( + line.split(",")[0] + ",," # file name with empty size and hash + for line in record_file_contents.split("\n") + ) + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content=new_record_file_contents, + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises( + WheelFile.validation_error, + match="hash / size of (.+) is not included in RECORD", + ): + source.validate_record(validate_contents=False) -def test_rejects_record_missing_file_on_validate(fancy_wheel): - with zipfile.ZipFile(fancy_wheel) as archive: - with archive.open("fancy-1.0.0.dist-info/RECORD") as f: - record_file_contents = f.read() - - # Remove the first two entries from the RECORD file - new_record_file_contents = b"\n".join(record_file_contents.split(b"\n")[2:]) - replace_file_in_zip( - fancy_wheel, - filename="fancy-1.0.0.dist-info/RECORD", - content=new_record_file_contents, - ) - with WheelFile.open(fancy_wheel) as source: - with pytest.raises(WheelFile.validation_error, match="not mentioned in RECORD"): + def test_accept_wheel_with_signature_file(self, fancy_wheel): + with WheelFile.open(fancy_wheel) as source: + record_file_contents = source.read_dist_info("RECORD") + hash_b64_nopad = ( + urlsafe_b64encode(sha256(record_file_contents.encode()).digest()) + .decode("utf-8") + .rstrip("=") + ) + jws_content = json.dumps({"hash": f"sha256={hash_b64_nopad}"}) + with zipfile.ZipFile(fancy_wheel, "a") as archive: + archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content) + with WheelFile.open(fancy_wheel) as source: source.validate_record() + def test_reject_signature_file_in_record(self, fancy_wheel): + with WheelFile.open(fancy_wheel) as source: + record_file_contents = source.read_dist_info("RECORD") + record_hash_nopad = ( + urlsafe_b64encode(sha256(record_file_contents.encode()).digest()) + .decode("utf-8") + .rstrip("=") + ) + jws_content = json.dumps({"hash": f"sha256={record_hash_nopad}"}) + with zipfile.ZipFile(fancy_wheel, "a") as archive: + archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content) + + # Add signature file to RECORD + jws_content = jws_content.encode() + jws_hash_nopad = ( + urlsafe_b64encode(sha256(jws_content).digest()).decode("utf-8").rstrip("=") + ) + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content=record_file_contents.rstrip("\n") + + f"\nfancy-1.0.0.dist-info/RECORD.jws,sha256={jws_hash_nopad},{len(jws_content)}\n", + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises( + WheelFile.validation_error, + match="digital signature file (.+) is incorrectly contained in RECORD.", + ): + source.validate_record(validate_contents=False) -def test_rejects_record_missing_hash(fancy_wheel): - with zipfile.ZipFile(fancy_wheel) as archive: - with archive.open("fancy-1.0.0.dist-info/RECORD") as f: - record_file_contents = f.read() - - new_record_file_contents = b"\n".join( - line.split(b",")[0] + b",," # file name with empty size and hash - for line in record_file_contents.split(b"\n") - ) - replace_file_in_zip( - fancy_wheel, - filename="fancy-1.0.0.dist-info/RECORD", - content=new_record_file_contents, - ) - with WheelFile.open(fancy_wheel) as source: - with pytest.raises( - WheelFile.validation_error, - match="hash / size of (.+) is not included in RECORD", - ): - source.validate_record() - + def test_rejects_record_contain_self_hash(self, fancy_wheel): + with WheelFile.open(fancy_wheel) as source: + record_file_contents = source.read_dist_info("RECORD") + + new_record_file_lines = [] + for line in record_file_contents.split("\n"): + if not line: + continue + filename, hash_, size = line.split(",") + if filename.split("/")[-1] == "RECORD": + hash_ = "sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A" + size = str(len(record_file_contents)) + new_record_file_lines.append(",".join((filename, hash_, size))) + + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content="\n".join(new_record_file_lines), + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises( + WheelFile.validation_error, + match="RECORD file incorrectly contains hash / size.", + ): + source.validate_record(validate_contents=False) -def test_accept_record_missing_hash_on_skip_validation(fancy_wheel): - with zipfile.ZipFile(fancy_wheel) as archive: - with archive.open("fancy-1.0.0.dist-info/RECORD") as f: - record_file_contents = f.read() - - new_record_file_contents = b"\n".join( - line.split(b",")[0] + b",," # file name with empty size and hash - for line in record_file_contents.split(b"\n") - ) - replace_file_in_zip( - fancy_wheel, - filename="fancy-1.0.0.dist-info/RECORD", - content=new_record_file_contents, - ) - with WheelFile.open(fancy_wheel) as source: - source.validate_record(validate_file=False) - - -def test_accept_wheel_with_signed_file(fancy_wheel): - with zipfile.ZipFile(fancy_wheel) as archive: - with archive.open("fancy-1.0.0.dist-info/RECORD") as f: - record_file_contents = f.read() - hash_b64_nopad = ( - urlsafe_b64encode(sha256(record_file_contents).digest()) - .decode("utf-8") - .rstrip("=") - ) - jws_content = json.dumps({"hash": f"sha256={hash_b64_nopad}"}) - with zipfile.ZipFile(fancy_wheel, "a") as archive: - archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content) - with WheelFile.open(fancy_wheel) as source: - source.validate_record() - - -def test_rejects_record_validation_failed(fancy_wheel): - with zipfile.ZipFile(fancy_wheel) as archive: - with archive.open("fancy-1.0.0.dist-info/RECORD") as f: - record_file_contents = f.read() - - new_record_file_contents = b"\n".join( - line.split(b",")[0] # Original filename - + b",sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A," - + line.split(b",")[2] # Original size - for line in record_file_contents.split(b"\n") - if line # ignore trail endline - ) - replace_file_in_zip( - fancy_wheel, - filename="fancy-1.0.0.dist-info/RECORD", - content=new_record_file_contents, - ) - with WheelFile.open(fancy_wheel) as source: - with pytest.raises( - WheelFile.validation_error, - match="hash / size of (.+) didn't match RECORD", - ): - source.validate_record() + def test_rejects_record_validation_failed(self, fancy_wheel): + with WheelFile.open(fancy_wheel) as source: + record_file_contents = source.read_dist_info("RECORD") + + new_record_file_lines = [] + for line in record_file_contents.split("\n"): + if not line: + continue + filename, hash_, size = line.split(",") + if filename.split("/")[-1] != "RECORD": + hash_ = "sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A" + new_record_file_lines.append(",".join((filename, hash_, size))) + + replace_file_in_zip( + fancy_wheel, + filename="fancy-1.0.0.dist-info/RECORD", + content="\n".join(new_record_file_lines), + ) + with WheelFile.open(fancy_wheel) as source: + with pytest.raises( + WheelFile.validation_error, + match="hash / size of (.+) didn't match RECORD", + ): + source.validate_record()