Skip to content

Commit

Permalink
add support for git credential helpers
Browse files Browse the repository at this point in the history
- add `CredentialHelper` class to run credential helper commands and
  retrieve values
- add `get_credentials_from_helper` to retrieve credentials from helpers
defined in provided config
- moved (unused) `get_credentials_from_store` to `credentials.py`

Also see https://www.git-scm.com/docs/gitcredentials

fixes jelmer#873
  • Loading branch information
dtrifiro committed Jan 26, 2023
1 parent 5ba74dd commit 8159185
Show file tree
Hide file tree
Showing 4 changed files with 597 additions and 47 deletions.
55 changes: 23 additions & 32 deletions dulwich/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@
import urllib3

import dulwich
from dulwich.config import get_xdg_config_home_path, Config, apply_instead_of
from dulwich.config import Config, StackedConfig, apply_instead_of
from dulwich.credentials import CredentialNotFoundError, get_credentials_from_helper
from dulwich.errors import (
GitProtocolError,
NotGitRepository,
Expand Down Expand Up @@ -2235,17 +2236,32 @@ def __init__(
else:
self.pool_manager = pool_manager

if username is not None:
self.config = config
if not self._username:
try:
self._username, self._password = get_credentials_from_helper(
base_url, config or StackedConfig.default()
)
except CredentialNotFoundError:
pass

if self._username:
# No escaping needed: ":" is not allowed in username:
# https://tools.ietf.org/html/rfc2617#section-2
credentials = f"{username}:{password or ''}"
import urllib3.util
if isinstance(self._username, str):
credentials = f"{self._username}:{self._password or ''}".encode("ascii")
elif isinstance(self._username, bytes):
credentials = self._username + b":" + self._password or b""
else:
raise TypeError
import base64

basic_auth = urllib3.util.make_headers(basic_auth=credentials)
encoded = base64.b64encode(credentials).decode('ascii')
basic_auth = {
"authorization": f"Basic {encoded}"
}
self.pool_manager.headers.update(basic_auth)

self.config = config

super().__init__(
base_url=base_url, dumb=dumb, **kwargs)

Expand Down Expand Up @@ -2431,28 +2447,3 @@ def get_transport_and_path(
return default_local_git_client_cls(**kwargs), location
else:
return SSHGitClient(hostname, username=username, **kwargs), path


DEFAULT_GIT_CREDENTIALS_PATHS = [
os.path.expanduser("~/.git-credentials"),
get_xdg_config_home_path("git", "credentials"),
]


def get_credentials_from_store(
scheme, hostname, username=None, fnames=DEFAULT_GIT_CREDENTIALS_PATHS
):
for fname in fnames:
try:
with open(fname, "rb") as f:
for line in f:
parsed_line = urlparse(line.strip())
if (
parsed_line.scheme == scheme
and parsed_line.hostname == hostname
and (username is None or parsed_line.username == username)
):
return parsed_line.username, parsed_line.password
except FileNotFoundError:
# If the file doesn't exist, try the next one.
continue
188 changes: 186 additions & 2 deletions dulwich/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,24 @@
https://git-scm.com/book/en/v2/Git-Tools-Credential-Storage
Currently Dulwich supports only the `get` operation
"""
import os
import shlex
import shutil
import subprocess
import sys
from typing import Iterator, Optional
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from urllib.parse import ParseResult, urlparse

from dulwich.config import ConfigDict, SectionLike
from dulwich.config import (ConfigDict, SectionLike, StackedConfig,
get_xdg_config_home_path)

DEFAULT_GIT_CREDENTIALS_PATHS = [
os.path.expanduser("~/.git-credentials"),
get_xdg_config_home_path("git", "credentials"),
]


def match_urls(url: ParseResult, url_prefix: ParseResult) -> bool:
Expand Down Expand Up @@ -87,3 +99,175 @@ def urlmatch_credential_sections(

if is_match:
yield config_section


class CredentialNotFoundError(Exception):
"""An error occurred while retrieving credentials or no credentials available."""


class CredentialHelper:
"""Helper for retrieving credentials for http/https git remotes
Usage:
>>> helper = CredentialHelper("store") # Use `git credential-store`
>>> credentials = helper.get("https://github.com/dtrifiro/aprivaterepo")
>>> username = credentials["username"]
>>> password = credentials["password"]
"""

def __init__(self, command: str):
self._command = command
self._run_kwargs: Dict[str, Any] = {}
if self._command[0] == "!":
# On Windows this will only work in git-bash and/or WSL2
self._run_kwargs["shell"] = True

def _prepare_command(self) -> Union[str, List[str]]:
if self._command[0] == "!":
return self._command[1:]

if sys.platform != "win32":
argv = shlex.split(self._command)
else:
# On windows, subprocess.run uses subprocess.list2cmdline() to
# join arguments when providing a list, so we can just split
# using whitespace.
argv = self._command.split()

if os.path.isabs(argv[0]):
return argv

executable = f"git-credential-{argv[0]}"
if not shutil.which(executable) and shutil.which("git"):
# If the helper cannot be found in PATH, it might be
# a C git helper in GIT_EXEC_PATH
git_exec_path = subprocess.check_output(
("git", "--exec-path"),
universal_newlines=True, # TODO: replace universal_newlines with `text` when dropping 3.6
).strip()
if shutil.which(executable, path=git_exec_path):
executable = os.path.join(git_exec_path, executable)

return [executable, *argv[1:]]

def get(
self,
*,
protocol: Optional[str] = None,
hostname: Optional[str] = None,
port: Optional[int] = None,
username: Optional[str] = None,
) -> Tuple[bytes, bytes]:
cmd = self._prepare_command()
if isinstance(cmd, str):
cmd += " get"
else:
cmd.append("get")

helper_input = []
if protocol:
helper_input.append(f"protocol={protocol}")
if hostname:
helper_input.append(
f"host={hostname}{':' + str(port) if port is not None else ''}"
)
if username:
helper_input.append(f"username={username}")

if not helper_input:
raise ValueError("One of protocol, hostname must be provided")

helper_input.append("")

try:
res = subprocess.run( # type: ignore # breaks on 3.6
cmd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
input=os.linesep.join(helper_input).encode("ascii"),
**self._run_kwargs,
)
except subprocess.CalledProcessError as exc:
raise CredentialNotFoundError(exc.stderr) from exc
except FileNotFoundError as exc:
raise CredentialNotFoundError("Helper not found") from exc

credentials = {}
for line in res.stdout.strip().splitlines():
try:
key, value = line.split(b"=")
credentials[key] = value
except ValueError:
continue

if not all(
(credentials, b"username" in credentials, b"password" in credentials)
):
raise CredentialNotFoundError("Could not get credentials from helper")

return credentials[b"username"], credentials[b"password"]

def store(self, *args, **kwargs):
"""Store the credential, if applicable to the helper"""
raise NotImplementedError

def erase(self, *args, **kwargs):
"""Remove a matching credential, if any, from the helper’s storage"""
raise NotImplementedError


def get_credentials_from_store(
scheme: bytes,
hostname: bytes,
username: Optional[bytes] = None,
fnames: List[str] = DEFAULT_GIT_CREDENTIALS_PATHS,
):
for fname in fnames:
try:
with open(fname, "rb") as f:
for line in f:
parsed_line = urlparse(line.strip())
if (
parsed_line.scheme == scheme
and parsed_line.hostname == hostname
and (username is None or parsed_line.username == username)
):
return parsed_line.username, parsed_line.password
except FileNotFoundError:
# If the file doesn't exist, try the next one.
continue


def get_credentials_from_helper(base_url: str, config) -> Tuple[bytes, bytes]:
"""Retrieves credentials for the given url from git credential helpers"""
if isinstance(config, StackedConfig):
backends = config.backends
else:
backends = [config]

for conf in backends:
# We will try to match credential sections' url with the given url,
# falling back to the generic section if there's no match
for section in urlmatch_credential_sections(conf, base_url):
try:
command = conf.get(section, "helper")
except KeyError:
# no helper configured
continue

helper = CredentialHelper(
command.decode(conf.encoding or sys.getdefaultencoding())
)
parsed = urlparse(base_url)
try:
return helper.get(
protocol=parsed.scheme,
hostname=parsed.hostname,
port=parsed.port,
username=parsed.username,
)
except CredentialNotFoundError:
continue

raise CredentialNotFoundError
Loading

0 comments on commit 8159185

Please sign in to comment.