Skip to content

Commit

Permalink
Properly handle repository URLs with auth in them
Browse files Browse the repository at this point in the history
  • Loading branch information
sigmavirus24 committed May 15, 2024
1 parent 8b85f4d commit 572aebb
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 29 deletions.
22 changes: 22 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,28 @@ def test_get_repository_config_missing(config_file):
assert utils.get_repository_from_config(config_file, "pypi") == exp


def test_get_repository_config_url_with_auth(config_file):
repository_url = "https://user:[email protected]/pypi"
exp = {
"repository": "https://notexisting.python.org/pypi",
"username": "user",
"password": "pass",
}
assert utils.get_repository_from_config(config_file, "foo", repository_url) == exp
assert utils.get_repository_from_config(config_file, "pypi", repository_url) == exp


@pytest.mark.parametrize(
"input_url, expected_url",
[
("https://upload.pypi.org/legacy/", "https://upload.pypi.org/legacy/"),
("https://user:[email protected]/legacy/", "https://********@upload.pypi.org/legacy/"),
],
)
def test_sanitize_url(input_url: str, expected_url: str) -> None:
assert utils.sanitize_url(input_url) == expected_url


@pytest.mark.parametrize(
"repo_url, message",
[
Expand Down
27 changes: 3 additions & 24 deletions twine/commands/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,27 +149,6 @@ def _split_inputs(
return Inputs(dists, signatures, attestations_by_dist)


def _sanitize_url(url: str) -> str:
"""Sanitize a URL.
Sanitize URLs, removing any user:password combinations and replacing them with
asterisks. Returns the original URL if the string is a non-matching pattern.
:param url:
str containing a URL to sanitize.
return:
str either sanitized or as entered depending on pattern match.
"""
pattern = r"(.*https?://)(\w+:\w+)@(\w+\..*)"
m = re.match(pattern, url)
if m:
newurl = f"{m.group(1)}*****:*****@{m.group(3)}"
return newurl
else:
return url


def upload(upload_settings: settings.Settings, dists: List[str]) -> None:
"""Upload one or more distributions to a repository, and display the progress.
Expand Down Expand Up @@ -211,7 +190,7 @@ def upload(upload_settings: settings.Settings, dists: List[str]) -> None:
# Determine if the user has passed in pre-signed distributions or any attestations.
uploads, signatures, attestations_by_dist = _split_inputs(dists)

print(f"Uploading distributions to {_sanitize_url(repository_url)}")
print(f"Uploading distributions to {utils.sanitize_url(repository_url)}")

packages_to_upload = [
_make_package(
Expand Down Expand Up @@ -272,8 +251,8 @@ def upload(upload_settings: settings.Settings, dists: List[str]) -> None:
# redirects as well.
if resp.is_redirect:
raise exceptions.RedirectDetected.from_args(
repository_url,
resp.headers["location"],
utils.sanitize_url(repository_url),
utils.sanitize_url(resp.headers["location"]),
)

if skip_upload(resp, upload_settings.skip_existing, package):
Expand Down
36 changes: 31 additions & 5 deletions twine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ def get_config(path: str) -> Dict[str, RepositoryConfig]:
return dict(config)


def sanitize_url(url: str) -> str:
"""Sanitize a URL.
Sanitize URLs, removing any user:password combinations and replacing them with
asterisks. Returns the original URL if the string is a non-matching pattern.
:param url:
str containing a URL to sanitize.
return:
str either sanitized or as entered depending on pattern match.
"""
uri = rfc3986.urlparse(url)
if uri.userinfo:
return uri.copy_with(userinfo="*" * 8).unsplit()
return url


def _validate_repository_url(repository_url: str) -> None:
"""Validate the given url for allowed schemes and components."""
# Allowed schemes are http and https, based on whether the repository
Expand All @@ -126,11 +144,7 @@ def get_repository_from_config(
# Prefer CLI `repository_url` over `repository` or .pypirc
if repository_url:
_validate_repository_url(repository_url)
return {
"repository": repository_url,
"username": None,
"password": None,
}
return _config_from_repository_url(repository_url)

try:
config = get_config(config_file)[repository]
Expand All @@ -154,6 +168,18 @@ def get_repository_from_config(
}


def _config_from_repository_url(url: str) -> RepositoryConfig:
parsed = urlparse(url)
config = {"repository": url, "username": None, "password": None}
if parsed.username:
config["username"] = parsed.username
config["password"] = parsed.password
config["repository"] = urlunparse((parsed.scheme, parsed.hostname) +
parsed[2:])
config["repository"] = normalize_repository_url(config["repository"])
return config


def normalize_repository_url(url: str) -> str:
parsed = urlparse(url)
if parsed.netloc in _HOSTNAMES:
Expand Down

0 comments on commit 572aebb

Please sign in to comment.