Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INVALID] Refactor validate_record #166

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions src/installer/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,27 +188,29 @@ def read_dist_info(self, filename: str) -> str:
path = posixpath.join(self.dist_info_dir, filename)
return self._zipfile.read(path).decode("utf-8")

def validate_record(self, validate_file: bool = True) -> None:
def validate_record(self, *, validate_contents: bool = True) -> None:
"""Validate ``RECORD`` of the wheel.

This method should be called before :py:func:`install <installer.install>`
if validation is required.

File names will always be validated against ``RECORD``.
If ``validate_file`` is true, every file in the archive will be validated
against corresponding entry in ``RECORD``.

:param validate_file: Whether to validate content integrity.
If ``validate_contents`` is true, sizes and hashes of files
will also be validated against ``RECORD``.

:param validate_contents: Whether to validate content integrity.
"""
try:
record_lines = self.read_dist_info("RECORD").splitlines()
records = parse_record_file(record_lines)
record_mapping = {
record[0]: record for record in parse_record_file(record_lines)
}
except Exception as exc:
raise _WheelFileValidationError(
[f"Unable to retrieve `RECORD` from {self._zipfile.filename}: {exc!r}"]
) from exc

record_mapping = {record[0]: record for record in records}
issues: List[str] = []

for item in self._zipfile.infolist():
Expand All @@ -221,6 +223,11 @@ def validate_record(self, validate_file: bool = True) -> None:
[self.dist_info_dir, item.filename]
) and item.filename.split("/")[-1] in ("RECORD.p7s", "RECORD.jws"):
# both are for digital signatures, and not mentioned in RECORD
if record_args is not None:
# Incorrectly contained
issues.append(
f"In {self._zipfile.filename}, digital signature file {item.filename} is incorrectly contained in RECORD."
)
continue

if record_args is None:
Expand All @@ -229,18 +236,24 @@ def validate_record(self, validate_file: bool = True) -> None:
)
continue

record = RecordEntry.from_elements(*record_args)

if item.filename == f"{self.dist_info_dir}/RECORD":
# Skip record file
continue
if validate_file:
record = RecordEntry.from_elements(*record_args)
data = self._zipfile.read(item)
if record.hash_ is None or record.size is None:
# Report empty hash / size
# Assert that RECORD doesn't have size and hash.
if record.hash_ is not None or record.size is not None:
# Incorrectly contained hash / size
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} is not included in RECORD"
f"In {self._zipfile.filename}, RECORD file incorrectly contains hash / size."
)
elif not record.validate(data):
continue
if record.hash_ is None or record.size is None:
# Report empty hash / size
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} is not included in RECORD"
)
if validate_contents:
data = self._zipfile.read(item)
if not record.validate(data):
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} didn't match RECORD"
)
Expand Down
301 changes: 177 additions & 124 deletions tests/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,134 +117,187 @@ def test_requires_dist_info_name_match(self, fancy_wheel):
source.dist_info_filenames


def replace_file_in_zip(path: str, filename: str, content: "bytes | None") -> None:
"""Helper function for replacing a file in the zip.

Exists because ZipFile doesn't support remove.
"""
files = {}
# Copy everything except `filename`, and replace it with `content`.
with zipfile.ZipFile(path) as archive:
for file in archive.namelist():
if file == filename:
if content is None:
continue # Remove the file
files[file] = content
else:
files[file] = archive.read(file)
# Replace original archive
with zipfile.ZipFile(path, mode="w") as archive:
for name, content in files.items():
archive.writestr(name, content)


def test_rejects_no_record_on_validate(fancy_wheel):
# Remove RECORD
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=None,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error, match="Unable to retrieve `RECORD`"
):
source.validate_record()
class TestRecordValidation:
@staticmethod
def replace_file_in_zip(path: str, filename: str, content: "str | None") -> None:
"""Helper function for replacing a file in the zip.

Exists because ZipFile doesn't support remove.
"""
files = {}
# Copy everything except `filename`, and replace it with `content`.
with zipfile.ZipFile(path) as archive:
for file in archive.namelist():
if file == filename:
if content is None:
continue # Remove the file
files[file] = content.encode()
else:
files[file] = archive.read(file)
# Replace original archive
with zipfile.ZipFile(path, mode="w") as archive:
for name, content in files.items():
archive.writestr(name, content)

def test_rejects_no_record_on_validate(self, fancy_wheel):
# Remove RECORD
self.replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=None,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error, match="Unable to retrieve `RECORD`"
):
source.validate_record(validate_contents=False)

def test_rejects_record_missing_file_on_validate(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

# Remove the first two entries from the RECORD file
new_record_file_contents = b"\n".join(record_file_contents.split(b"\n")[2:])
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(WheelFile.validation_error, match="not mentioned in RECORD"):
source.validate_record()
def test_record_invalid_record_entry(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

self.replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content="\n".join(
line.replace("sha256=", "") for line in record_file_contents
),
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="Unable to retrieve `RECORD`",
):
source.validate_record()

def test_rejects_record_missing_file_on_validate(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

# Remove the first two entries from the RECORD file
new_record_file_contents = "\n".join(record_file_contents.split("\n")[2:])
self.replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error, match="not mentioned in RECORD"
):
source.validate_record(validate_contents=False)

def test_rejects_record_missing_hash(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] + b",," # file name with empty size and hash
for line in record_file_contents.split(b"\n")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) is not included in RECORD",
):
source.validate_record()
def test_rejects_record_missing_hash(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

new_record_file_contents = "\n".join(
line.split(",")[0] + ",," # file name with empty size and hash
for line in record_file_contents.split("\n")
)
self.replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) is not included in RECORD",
):
source.validate_record(validate_contents=False)

def test_accept_record_missing_hash_on_skip_validation(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] + b",," # file name with empty size and hash
for line in record_file_contents.split(b"\n")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
source.validate_record(validate_file=False)


def test_accept_wheel_with_signed_file(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()
hash_b64_nopad = (
urlsafe_b64encode(sha256(record_file_contents).digest())
.decode("utf-8")
.rstrip("=")
)
jws_content = json.dumps({"hash": f"sha256={hash_b64_nopad}"})
with zipfile.ZipFile(fancy_wheel, "a") as archive:
archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content)
with WheelFile.open(fancy_wheel) as source:
source.validate_record()


def test_rejects_record_validation_failed(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] # Original filename
+ b",sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A,"
+ line.split(b",")[2] # Original size
for line in record_file_contents.split(b"\n")
if line # ignore trail endline
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) didn't match RECORD",
):
def test_accept_wheel_with_signature_file(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")
hash_b64_nopad = (
urlsafe_b64encode(sha256(record_file_contents.encode()).digest())
.decode("utf-8")
.rstrip("=")
)
jws_content = json.dumps({"hash": f"sha256={hash_b64_nopad}"})
with zipfile.ZipFile(fancy_wheel, "a") as archive:
archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content)
with WheelFile.open(fancy_wheel) as source:
source.validate_record()

def test_reject_signature_file_in_record(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")
record_hash_nopad = (
urlsafe_b64encode(sha256(record_file_contents.encode()).digest())
.decode("utf-8")
.rstrip("=")
)
jws_content = json.dumps({"hash": f"sha256={record_hash_nopad}"})
with zipfile.ZipFile(fancy_wheel, "a") as archive:
archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content)

# Add signature file to RECORD
jws_content = jws_content.encode()
jws_hash_nopad = (
urlsafe_b64encode(sha256(jws_content).digest()).decode("utf-8").rstrip("=")
)
self.replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=record_file_contents.rstrip("\n")
+ f"\nfancy-1.0.0.dist-info/RECORD.jws,sha256={jws_hash_nopad},{len(jws_content)}\n",
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="digital signature file (.+) is incorrectly contained in RECORD.",
):
source.validate_record(validate_contents=False)

def test_record_contain_self_hash(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

new_record_file_lines = []
for line in record_file_contents.split("\n"):
if not line:
continue
filename, hash_, size = line.split(",")
if filename.split("/")[-1] == "RECORD":
hash_ = "sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A"
size = str(len(record_file_contents))
new_record_file_lines.append(",".join((filename, hash_, size)))

self.replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content="\n".join(new_record_file_lines),
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="RECORD file incorrectly contains hash / size.",
):
source.validate_record(validate_contents=False)

def test_rejects_record_validation_failed(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

new_record_file_lines = []
for line in record_file_contents.split("\n"):
if not line:
continue
filename, hash_, size = line.split(",")
if filename.split("/")[-1] != "RECORD":
hash_ = "sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A"
new_record_file_lines.append(",".join((filename, hash_, size)))

self.replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content="\n".join(new_record_file_lines),
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) didn't match RECORD",
):
source.validate_record()