Skip to content

Commit

Permalink
Handle more variations in private index HTML to improve hash collection (#5898)
Browse files Browse the repository at this point in the history

* Handle more cases of hash collection

* add news fragment
  • Loading branch information
matteius authored Sep 1, 2023
1 parent 8caed47 commit 56d1e1c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
1 change: 1 addition & 0 deletions news/5898.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle more variations in private index html to improve hash collection.
51 changes: 40 additions & 11 deletions pipenv/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,37 +292,62 @@ def get_hashes_from_pypi(self, ireq, source):
return None

def get_hashes_from_remote_index_urls(self, ireq, source):
pkg_url = f"{source['url']}/{ireq.name}/"
normalized_name = normalize_name(ireq.name)
url_name = normalized_name.replace(".", "-")
pkg_url = f"{source['url']}/{url_name}/"
session = self.get_requests_session_for_source(source)

try:
collected_hashes = set()
# Grab the hashes from the new warehouse API.
response = session.get(pkg_url, timeout=10)
# Create an instance of the parser
parser = PackageIndexHTMLParser()
# Feed the HTML to the parser
parser.feed(response.text)
# Extract hrefs
hrefs = parser.urls

version = ""
if ireq.specifier:
spec = next(iter(s for s in ireq.specifier), None)
if spec:
version = spec.version

# We'll check if the href looks like a version-specific page (i.e., ends with '/')
for package_url in hrefs:
if version in parse.unquote(package_url):
parsed_url = parse.urlparse(package_url)
if version in parsed_url.path and parsed_url.path.endswith("/"):
# This might be a version-specific page. Fetch and parse it
version_url = urljoin(pkg_url, package_url)
version_response = session.get(version_url, timeout=10)
version_parser = PackageIndexHTMLParser()
version_parser.feed(version_response.text)
version_hrefs = version_parser.urls

# Process these new hrefs as potential wheels
for v_package_url in version_hrefs:
url_params = parse.urlparse(v_package_url).fragment
params_dict = parse.parse_qs(url_params)
if params_dict.get(FAVORITE_HASH):
collected_hashes.add(params_dict[FAVORITE_HASH][0])
else: # Fallback to downloading the file to obtain hash
v_package_full_url = urljoin(version_url, v_package_url)
link = Link(v_package_full_url)
file_hash = self.get_file_hash(session, link)
if file_hash:
collected_hashes.add(file_hash)
elif version in parse.unquote(package_url):
# Process the current href as a potential wheel from the main page
url_params = parse.urlparse(package_url).fragment
params_dict = parse.parse_qs(url_params)
if params_dict.get(FAVORITE_HASH):
collected_hashes.add(params_dict[FAVORITE_HASH][0])
else: # Fallback to downloading the file to obtain hash
package_url = urljoin(source["url"], package_url)
link = Link(package_url)
package_full_url = urljoin(pkg_url, package_url)
link = Link(package_full_url)
file_hash = self.get_file_hash(session, link)
if file_hash:
collected_hashes.add(file_hash)

return self.prepend_hash_types(collected_hashes, FAVORITE_HASH)

except (ValueError, KeyError, ConnectionError):
if self.s.is_verbose():
click.echo(
Expand Down Expand Up @@ -1198,8 +1223,12 @@ def add_pipfile_entry_to_pipfile(self, name, normalized_name, entry, category=No
return newly_added, category, normalized_name

def src_name_from_url(self, index_url):
name, _, tld_guess = urllib.parse.urlsplit(index_url).netloc.rpartition(".")
src_name = name.replace(".", "")
location = urllib.parse.urlsplit(index_url).netloc
if "." in location:
name, _, tld_guess = location.rpartition(".")
else:
name = location
src_name = name.replace(".", "").replace(":", "")
try:
self.get_source(name=src_name)
except SourceNotFound:
Expand All @@ -1221,7 +1250,7 @@ def add_index_to_pipfile(self, index, verify_ssl=True):
with contextlib.suppress(SourceNotFound):
source = self.get_source(name=index)

if source is not None:
if source is not None and source.get("name"):
return source["name"]
source = {"url": index, "verify_ssl": verify_ssl}
source["name"] = self.src_name_from_url(index)
Expand Down

0 comments on commit 56d1e1c

Please sign in to comment.