Skip to content

Commit

Permalink
Support finding dist scripts in wheel files.
Browse files Browse the repository at this point in the history
This will be needed when building PEXes that use raw `.whl` file dists.
  • Loading branch information
jsirois committed Dec 13, 2023
1 parent 1c29f5a commit 1ee1fe5
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 48 deletions.
47 changes: 38 additions & 9 deletions pex/finders.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import ast
import os

from pex.common import is_python_script
from pex.dist_metadata import Distribution, EntryPoint
from pex.common import is_python_script, open_zip, safe_mkdtemp
from pex.dist_metadata import Distribution, DistributionType, EntryPoint
from pex.pep_376 import InstalledWheel
from pex.pep_427 import Wheel
from pex.pep_503 import ProjectName
from pex.typing import TYPE_CHECKING, cast

Expand All @@ -29,29 +30,57 @@ def find(
name, # type: str
):
# type: (...) -> Optional[DistributionScript]
script_path = InstalledWheel.load(dist.location).stashed_path("bin", name)
return cls(dist=dist, path=script_path) if os.path.isfile(script_path) else None
if dist.type is DistributionType.WHEEL:
script_path = Wheel.load(dist.location).data_path("scripts", name)
with open_zip(dist.location) as zfp:
try:
zfp.getinfo(script_path)
except KeyError:
return None
elif dist.type is DistributionType.INSTALLED:
script_path = InstalledWheel.load(dist.location).stashed_path("bin", name)
if not os.path.isfile(script_path):
return None
else:
raise ValueError(
"Can only probe .whl files and installed wheel chroots for scripts; "
"given sdist: {sdist}".format(sdist=dist.location)
)
return cls(dist=dist, path=script_path)

dist = attr.ib() # type: Distribution
path = attr.ib() # type: str

def read_contents(self):
# type: () -> bytes
with open(self.path, "rb") as fp:
def read_contents(self, path_hint=None):
# type: (Optional[str]) -> bytes
path = path_hint or self._maybe_extract()
with open(path, "rb") as fp:
return fp.read()

def python_script(self):
# type: () -> Optional[ast.AST]
if not is_python_script(self.path):
path = self._maybe_extract()
if not is_python_script(path):
return None

try:
return cast(
ast.AST, compile(self.read_contents(), self.path, "exec", flags=0, dont_inherit=1)
ast.AST,
compile(self.read_contents(path_hint=path), path, "exec", flags=0, dont_inherit=1),
)
except (SyntaxError, TypeError):
return None

def _maybe_extract(self):
# type: () -> str
if self.dist.type is not DistributionType.WHEEL:
return self.path

with open_zip(self.dist.location) as zfp:
chroot = safe_mkdtemp()
zfp.extract(self.path, chroot)
return os.path.join(chroot, self.path)


def get_script_from_distributions(
name, # type: str
Expand Down
98 changes: 73 additions & 25 deletions pex/pep_427.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import subprocess
import sys
from contextlib import closing
from email.message import Message
from fileinput import FileInput
from textwrap import dedent

Expand Down Expand Up @@ -145,6 +146,70 @@ def install_wheel_interpreter(
)


@attr.s(frozen=True)
class Wheel(object):
@classmethod
def load(cls, wheel_path):
# type: (str) -> Wheel

metadata_files = load_metadata(wheel_path, restrict_types_to=(MetadataType.DIST_INFO,))
if not metadata_files:
raise ValueError("Could not find any metadata in {wheel}.".format(wheel=wheel_path))

metadata_path = metadata_files.metadata_file_rel_path("WHEEL")
metadata_bytes = metadata_files.read("WHEEL")
if not metadata_path or not metadata_bytes:
raise ValueError("Could not find WHEEL metadata in {wheel}.".format(wheel=wheel_path))
wheel_metadata_dir = os.path.dirname(metadata_path)
if not wheel_metadata_dir.endswith(".dist-info"):
raise ValueError(
"Expected WHEEL metadata for {wheel} to be housed in a .dist-info directory, but was "
"found at {wheel_metadata_path}.".format(
wheel=wheel_path, wheel_metadata_path=metadata_path
)
)
# Although not crisply defined, all PEPs lead to PEP-508 which restricts project names
# to ASCII: https://peps.python.org/pep-0508/#names. Likewise, version numbers are also
# restricted to ASCII: https://peps.python.org/pep-0440/. Since the `.dist-info` dir
# path is defined as `<project name>-<version>.dist-info` in
# https://peps.python.org/pep-0427/, we are safe in assuming ASCII overall for the wheel
# metadata dir path.
metadata_dir = str(wheel_metadata_dir)
metadata = parse_message(metadata_bytes)

data_dir = re.sub(r"\.dist-info$", ".data", metadata_dir)

return cls(
location=wheel_path,
metadata_dir=metadata_dir,
metadata_files=metadata_files,
metadata=metadata,
data_dir=data_dir,
)

location = attr.ib() # type: str
metadata_dir = attr.ib() # type: str
metadata_files = attr.ib() # type: MetadataFiles
metadata = attr.ib() # type: Message
data_dir = attr.ib() # type: str

@property
def purelib(self):
# type: () -> bool
return cast(bool, "true" == self.metadata.get("Root-Is-Purelib"))

def dist_metadata(self):
return DistMetadata.from_metadata_files(self.metadata_files)

def metadata_path(self, *components):
# typ: (*str) -> str
return os.path.join(self.metadata_dir, *components)

def data_path(self, *components):
# typ: (*str) -> str
return os.path.join(self.data_dir, *components)


def install_wheel(
wheel_path, # type: str
install_paths, # type: InstallPaths
Expand All @@ -155,30 +220,13 @@ def install_wheel(
# type: (...) -> MetadataFiles

# See: https://packaging.python.org/en/latest/specifications/binary-distribution-format/#installing-a-wheel-distribution-1-0-py32-none-any-whl
metadata_files = load_metadata(wheel_path, restrict_types_to=(MetadataType.DIST_INFO,))
if not metadata_files:
raise ValueError("Could not find any metadata in {wheel}.".format(wheel=wheel_path))

wheel_metadata_path = metadata_files.metadata_file_rel_path("WHEEL")
wheel_metadata = metadata_files.read("WHEEL")
if not wheel_metadata_path or not wheel_metadata:
raise ValueError("Could not find WHEEL metadata in {wheel}.".format(wheel=wheel_path))
wheel_metadata_dir = os.path.dirname(wheel_metadata_path)
if not wheel_metadata_dir.endswith(".dist-info"):
raise ValueError(
"Expected WHEEL metadata for {wheel} to be housed in a .dist-info directory, but was "
"found at {wheel_metadata_path}.".format(
wheel=wheel_path, wheel_metadata_path=wheel_metadata_path
)
)

purelib = "true" == parse_message(wheel_metadata).get("Root-Is-Purelib")
dest = install_paths.purelib if purelib else install_paths.platlib
wheel = Wheel.load(wheel_path)
dest = install_paths.purelib if wheel.purelib else install_paths.platlib

record_relpath = os.path.join(wheel_metadata_dir, "RECORD")
record_relpath = wheel.metadata_path("RECORD")
record_abspath = os.path.join(dest, record_relpath)

data_rel_path = re.sub(r"\.dist-info$", ".data", wheel_metadata_dir)
data_rel_path = wheel.data_dir
data_path = os.path.join(dest, data_rel_path)

installed_files = [] # type: List[InstalledFile]
Expand Down Expand Up @@ -284,7 +332,7 @@ def record_files(
file = InstalledFile(path=os.path.relpath(os.path.join(root, f), dest))
installed_files.append(file)

dist = Distribution(location=dest, metadata=DistMetadata.from_metadata_files(metadata_files))
dist = Distribution(location=dest, metadata=wheel.dist_metadata())
entry_points = dist.get_entry_map()
for entry_point in itertools.chain.from_iterable(
entry_points.get(key, {}).values() for key in ("console_scripts", "gui_scripts")
Expand Down Expand Up @@ -319,17 +367,17 @@ def record_files(
InstalledWheel.create_installed_file(path=script_abspath, dest_dir=dest)
)

with safe_open(os.path.join(dest, wheel_metadata_dir, "INSTALLER"), "w") as fp:
with safe_open(os.path.join(dest, wheel.metadata_path("INSTALLER")), "w") as fp:
print("pex", file=fp)
installed_files.append(InstalledWheel.create_installed_file(path=fp.name, dest_dir=dest))

if requested:
requested_path = os.path.join(dest, wheel_metadata_dir, "REQUESTED")
requested_path = os.path.join(dest, wheel.metadata_path("REQUESTED"))
touch(requested_path)
installed_files.append(
InstalledWheel.create_installed_file(path=requested_path, dest_dir=dest)
)

installed_files.append(InstalledFile(path=record_relpath, hash=None, size=None))
Record.write(dst=record_abspath, installed_files=installed_files)
return metadata_files
return wheel.metadata_files
39 changes: 25 additions & 14 deletions tests/test_finders.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
import pytest

from pex.dist_metadata import CallableEntryPoint, DistMetadata, Distribution, EntryPoint
from pex.finders import get_entry_point_from_console_script, get_script_from_distributions
from pex.finders import (
DistributionScript,
get_entry_point_from_console_script,
get_script_from_distributions,
)
from pex.pep_376 import InstalledWheel
from pex.pep_440 import Version
from pex.pep_503 import ProjectName
from pex.pip.installation import get_pip
from pex.typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any, Dict, Text
from typing import Any, Dict, Text, Tuple

import attr # vendor:skip
else:
Expand All @@ -26,22 +30,29 @@
# https://github.com/pantsbuild/pex/issues/551
def test_get_script_from_distributions(tmpdir):
# type: (Any) -> None

def assert_script(dist):
# type: (Distribution) -> Tuple[Distribution, DistributionScript]
assert "aws-cfn-bootstrap" == dist.project_name

dist_script = get_script_from_distributions("cfn-signal", [dist])
assert dist_script is not None
assert dist_script.dist is dist
assert dist_script.read_contents().startswith(
b"#!"
), "Expected a `scripts`-style script w/shebang."

assert None is get_script_from_distributions("non_existent_script", [dist])
return dist, dist_script

whl_path = "./tests/example_packages/aws_cfn_bootstrap-1.4-py2-none-any.whl"
_, dist_script = assert_script(Distribution.load(whl_path))
assert "aws_cfn_bootstrap-1.4.data/scripts/cfn-signal" == dist_script.path

install_dir = os.path.join(str(tmpdir), os.path.basename(whl_path))
get_pip().spawn_install_wheel(wheel=whl_path, install_dir=install_dir).wait()

dist = Distribution.load(install_dir)
assert "aws-cfn-bootstrap" == dist.project_name

dist_script = get_script_from_distributions("cfn-signal", [dist])
assert dist_script is not None
assert dist_script.dist is dist
installed_wheel_dist, dist_script = assert_script(Distribution.load(install_dir))
assert InstalledWheel.load(install_dir).stashed_path("bin/cfn-signal") == dist_script.path
assert dist_script.read_contents().startswith(
b"#!"
), "Expected a `scripts`-style script w/shebang."

assert None is get_script_from_distributions("non_existent_script", [dist])


def create_dist(
Expand Down

0 comments on commit 1ee1fe5

Please sign in to comment.