Skip to content

Commit

Permalink
Refactor validate_record (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
BlueGlassBlock authored Mar 2, 2023
1 parent b59c8fa commit 50ed1cc
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 138 deletions.
43 changes: 28 additions & 15 deletions src/installer/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,27 +188,29 @@ def read_dist_info(self, filename: str) -> str:
path = posixpath.join(self.dist_info_dir, filename)
return self._zipfile.read(path).decode("utf-8")

def validate_record(self, validate_file: bool = True) -> None:
def validate_record(self, *, validate_contents: bool = True) -> None:
"""Validate ``RECORD`` of the wheel.
This method should be called before :py:func:`install <installer.install>`
if validation is required.
File names will always be validated against ``RECORD``.
If ``validate_file`` is true, every file in the archive will be validated
against corresponding entry in ``RECORD``.
:param validate_file: Whether to validate content integrity.
If ``validate_contents`` is true, sizes and hashes of files
will also be validated against ``RECORD``.
:param validate_contents: Whether to validate content integrity.
"""
try:
record_lines = self.read_dist_info("RECORD").splitlines()
records = parse_record_file(record_lines)
record_mapping = {
record[0]: record for record in parse_record_file(record_lines)
}
except Exception as exc:
raise _WheelFileValidationError(
[f"Unable to retrieve `RECORD` from {self._zipfile.filename}: {exc!r}"]
) from exc

record_mapping = {record[0]: record for record in records}
issues: List[str] = []

for item in self._zipfile.infolist():
Expand All @@ -221,6 +223,11 @@ def validate_record(self, validate_file: bool = True) -> None:
[self.dist_info_dir, item.filename]
) and item.filename.split("/")[-1] in ("RECORD.p7s", "RECORD.jws"):
# both are for digital signatures, and not mentioned in RECORD
if record_args is not None:
# Incorrectly contained
issues.append(
f"In {self._zipfile.filename}, digital signature file {item.filename} is incorrectly contained in RECORD."
)
continue

if record_args is None:
Expand All @@ -229,18 +236,24 @@ def validate_record(self, validate_file: bool = True) -> None:
)
continue

record = RecordEntry.from_elements(*record_args)

if item.filename == f"{self.dist_info_dir}/RECORD":
# Skip record file
continue
if validate_file:
record = RecordEntry.from_elements(*record_args)
data = self._zipfile.read(item)
if record.hash_ is None or record.size is None:
# Report empty hash / size
# Assert that RECORD doesn't have size and hash.
if record.hash_ is not None or record.size is not None:
# Incorrectly contained hash / size
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} is not included in RECORD"
f"In {self._zipfile.filename}, RECORD file incorrectly contains hash / size."
)
elif not record.validate(data):
continue
if record.hash_ is None or record.size is None:
# Report empty hash / size
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} is not included in RECORD"
)
if validate_contents:
data = self._zipfile.read(item)
if not record.validate(data):
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} didn't match RECORD"
)
Expand Down
297 changes: 174 additions & 123 deletions tests/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,27 @@ def test_raises_not_implemented_error(self):
source.validate_record()


def replace_file_in_zip(path: str, filename: str, content: "str | None") -> None:
"""Helper function for replacing a file in the zip.
Exists because ZipFile doesn't support remove.
"""
files = {}
# Copy everything except `filename`, and replace it with `content`.
with zipfile.ZipFile(path) as archive:
for file in archive.namelist():
if file == filename:
if content is None:
continue # Remove the file
files[file] = content.encode()
else:
files[file] = archive.read(file)
# Replace original archive
with zipfile.ZipFile(path, mode="w") as archive:
for name, content in files.items():
archive.writestr(name, content)


class TestWheelFile:
def test_rejects_not_okay_name(self, tmp_path):
# Create an empty zipfile
Expand Down Expand Up @@ -116,135 +137,165 @@ def test_requires_dist_info_name_match(self, fancy_wheel):
with WheelFile.open(misnamed) as source:
source.dist_info_filenames

def test_rejects_no_record_on_validate(self, fancy_wheel):
# Remove RECORD
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=None,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error, match="Unable to retrieve `RECORD`"
):
source.validate_record(validate_contents=False)

def replace_file_in_zip(path: str, filename: str, content: "bytes | None") -> None:
"""Helper function for replacing a file in the zip.
Exists because ZipFile doesn't support remove.
"""
files = {}
# Copy everything except `filename`, and replace it with `content`.
with zipfile.ZipFile(path) as archive:
for file in archive.namelist():
if file == filename:
if content is None:
continue # Remove the file
files[file] = content
else:
files[file] = archive.read(file)
# Replace original archive
with zipfile.ZipFile(path, mode="w") as archive:
for name, content in files.items():
archive.writestr(name, content)
def test_rejects_invalid_record_entry(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content="\n".join(
line.replace("sha256=", "") for line in record_file_contents
),
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="Unable to retrieve `RECORD`",
):
source.validate_record()

def test_rejects_record_missing_file_on_validate(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

# Remove the first two entries from the RECORD file
new_record_file_contents = "\n".join(record_file_contents.split("\n")[2:])
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error, match="not mentioned in RECORD"
):
source.validate_record(validate_contents=False)

def test_rejects_no_record_on_validate(fancy_wheel):
# Remove RECORD
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=None,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error, match="Unable to retrieve `RECORD`"
):
source.validate_record()
def test_rejects_record_missing_hash(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

new_record_file_contents = "\n".join(
line.split(",")[0] + ",," # file name with empty size and hash
for line in record_file_contents.split("\n")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) is not included in RECORD",
):
source.validate_record(validate_contents=False)

def test_rejects_record_missing_file_on_validate(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

# Remove the first two entries from the RECORD file
new_record_file_contents = b"\n".join(record_file_contents.split(b"\n")[2:])
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(WheelFile.validation_error, match="not mentioned in RECORD"):
def test_accept_wheel_with_signature_file(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")
hash_b64_nopad = (
urlsafe_b64encode(sha256(record_file_contents.encode()).digest())
.decode("utf-8")
.rstrip("=")
)
jws_content = json.dumps({"hash": f"sha256={hash_b64_nopad}"})
with zipfile.ZipFile(fancy_wheel, "a") as archive:
archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content)
with WheelFile.open(fancy_wheel) as source:
source.validate_record()

def test_reject_signature_file_in_record(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")
record_hash_nopad = (
urlsafe_b64encode(sha256(record_file_contents.encode()).digest())
.decode("utf-8")
.rstrip("=")
)
jws_content = json.dumps({"hash": f"sha256={record_hash_nopad}"})
with zipfile.ZipFile(fancy_wheel, "a") as archive:
archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content)

# Add signature file to RECORD
jws_content = jws_content.encode()
jws_hash_nopad = (
urlsafe_b64encode(sha256(jws_content).digest()).decode("utf-8").rstrip("=")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=record_file_contents.rstrip("\n")
+ f"\nfancy-1.0.0.dist-info/RECORD.jws,sha256={jws_hash_nopad},{len(jws_content)}\n",
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="digital signature file (.+) is incorrectly contained in RECORD.",
):
source.validate_record(validate_contents=False)

def test_rejects_record_missing_hash(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] + b",," # file name with empty size and hash
for line in record_file_contents.split(b"\n")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) is not included in RECORD",
):
source.validate_record()

def test_rejects_record_contain_self_hash(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

new_record_file_lines = []
for line in record_file_contents.split("\n"):
if not line:
continue
filename, hash_, size = line.split(",")
if filename.split("/")[-1] == "RECORD":
hash_ = "sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A"
size = str(len(record_file_contents))
new_record_file_lines.append(",".join((filename, hash_, size)))

replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content="\n".join(new_record_file_lines),
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="RECORD file incorrectly contains hash / size.",
):
source.validate_record(validate_contents=False)

def test_accept_record_missing_hash_on_skip_validation(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] + b",," # file name with empty size and hash
for line in record_file_contents.split(b"\n")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
source.validate_record(validate_file=False)


def test_accept_wheel_with_signed_file(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()
hash_b64_nopad = (
urlsafe_b64encode(sha256(record_file_contents).digest())
.decode("utf-8")
.rstrip("=")
)
jws_content = json.dumps({"hash": f"sha256={hash_b64_nopad}"})
with zipfile.ZipFile(fancy_wheel, "a") as archive:
archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content)
with WheelFile.open(fancy_wheel) as source:
source.validate_record()


def test_rejects_record_validation_failed(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] # Original filename
+ b",sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A,"
+ line.split(b",")[2] # Original size
for line in record_file_contents.split(b"\n")
if line # ignore trail endline
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) didn't match RECORD",
):
source.validate_record()
def test_rejects_record_validation_failed(self, fancy_wheel):
with WheelFile.open(fancy_wheel) as source:
record_file_contents = source.read_dist_info("RECORD")

new_record_file_lines = []
for line in record_file_contents.split("\n"):
if not line:
continue
filename, hash_, size = line.split(",")
if filename.split("/")[-1] != "RECORD":
hash_ = "sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A"
new_record_file_lines.append(",".join((filename, hash_, size)))

replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content="\n".join(new_record_file_lines),
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) didn't match RECORD",
):
source.validate_record()

0 comments on commit 50ed1cc

Please sign in to comment.