From 930188001142654ced1db6492be3e9245f3f4ffe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Trifir=C3=B2?= Date: Mon, 30 May 2022 18:28:56 +0200 Subject: [PATCH] add support for git credential helpers Introduces `GitCredentialsHttpClient` that will retry http requests failing with 401/404 with authorization headers generated from credentials returned by git credentials helpers (if any). https://www.git-scm.com/docs/gitcredentials fixes #873 --- dulwich/client.py | 66 ++++++++- dulwich/credentials.py | 141 ++++++++++++++++++ dulwich/tests/__init__.py | 1 + dulwich/tests/test_client.py | 201 +++++++++++++++++++++++++ dulwich/tests/test_credentials.py | 238 ++++++++++++++++++++++++++++++ 5 files changed, 643 insertions(+), 4 deletions(-) create mode 100644 dulwich/credentials.py create mode 100644 dulwich/tests/test_credentials.py diff --git a/dulwich/client.py b/dulwich/client.py index 168e28f0d..99f58ff45 100644 --- a/dulwich/client.py +++ b/dulwich/client.py @@ -46,7 +46,8 @@ import socket import subprocess import sys -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, IO +from typing import (IO, Any, Callable, Dict, Generator, List, Optional, Set, + Tuple) from urllib.parse import ( quote as urlquote, @@ -59,7 +60,8 @@ import dulwich -from dulwich.config import get_xdg_config_home_path +from dulwich.config import Config, StackedConfig, get_xdg_config_home_path +from dulwich.credentials import CredentialHelper, CredentialNotFoundError from dulwich.errors import ( GitProtocolError, NotGitRepository, @@ -2165,7 +2167,7 @@ def __init__( base_url, dumb=None, pool_manager=None, - config=None, + config: Optional[Config] = None, username=None, password=None, **kwargs @@ -2236,7 +2238,63 @@ def _http_request(self, url, headers=None, data=None): return resp, resp.read -HttpGitClient = Urllib3HttpGitClient +class GitCredentialsHttpClient(Urllib3HttpGitClient): + """HTTP git client which uses credentials from git credential helpers""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if not self.config: + # Note that using the StackedConfig default backends does not include + # the repo's local git config, but this should only happen for + # clones (no local config yet) and for dulwich CLI commands + self.config = StackedConfig.default() + self.credentials = None + + def _http_request(self, url, *args, **kwargs): + while True: + try: + return super()._http_request(url, *args, **kwargs) + except ( + HTTPUnauthorized, + NotGitRepository, # some git servers returns 404 instead of 401 (github) + ) as exc: + if self.credentials is None: + self.credentials = self.get_credentials(url) + try: + credentials = next(self.credentials) + except StopIteration: + raise exc + + import base64 + + basic_auth = credentials[b"username"] + b":" + credentials[b"password"] + self.pool_manager.headers.update( + { + "authorization": f"Basic {base64.b64encode(basic_auth).decode('ascii')}" + } + ) + + def get_credentials(self, url: str) -> Generator[Dict[bytes, bytes], None, None]: + assert self.config + if isinstance(self.config, StackedConfig): + backends = self.config.backends + else: + backends = [self.config] + + for config in backends: + try: + helper = CredentialHelper.from_config(config, url) + except KeyError: + # no credential helpers in the given config + continue + + try: + yield helper.get(url) + except CredentialNotFoundError: + continue + + +HttpGitClient = GitCredentialsHttpClient def _win32_url_to_path(parsed) -> str: diff --git a/dulwich/credentials.py b/dulwich/credentials.py new file mode 100644 index 000000000..3a247a002 --- /dev/null +++ b/dulwich/credentials.py @@ -0,0 +1,141 @@ +# credentials.py -- support for git credential helpers + +# Copyright (C) 2022 Daniele Trifirò +# +# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU +# General Public License as public by the Free Software Foundation; version 2.0 +# or (at your option) any later version. You can redistribute it and/or +# modify it under the terms of either of these two licenses. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# You should have received a copy of the licenses; if not, see +# for a copy of the GNU General Public License +# and for a copy of the Apache +# License, Version 2.0. +# + +"""Support for git credential helpers + +https://git-scm.com/book/en/v2/Git-Tools-Credential-Storage + +Currently Dulwich supports only the `get` operation + +""" +import os +import shlex +import shutil +import subprocess +import sys +from typing import Any, Dict, List, Optional, Union + +from dulwich.config import ConfigDict + + +class CredentialNotFoundError(Exception): + pass + + +class CredentialHelper: + """Helper for retrieving credentials for http/https git remotes + + Usage: + >>> helper = CredentialHelper("store") # Use `git credential-store` + >>> credentials = helper.get("https://github.com/dtrifiro/aprivaterepo") + >>> username = credentials["username"] + >>> password = credentials["password"] + """ + + def __init__(self, command: str): + self._command = command + self._run_kwargs: Dict[str, Any] = {} + if self._command[0] == "!": + # On Windows this will only work in git-bash and/or WSL2 + self._run_kwargs["shell"] = True + + def _prepare_command(self) -> Union[str, List[str]]: + if self._command[0] == "!": + return self._command[1:] + + argv = shlex.split(self._command) + if sys.platform == "win32": + # Windows paths are mangled by shlex + argv[0] = self._command.split(maxsplit=1)[0] + + if os.path.isabs(argv[0]): + return argv + + executable = f"git-credential-{argv[0]}" + if not shutil.which(executable) and shutil.which("git"): + # If the helper cannot be found in PATH, it might be + # a C git helper in GIT_EXEC_PATH + git_exec_path = subprocess.check_output( + ("git", "--exec-path"), + universal_newlines=True, # TODO: replace universal_newlines with `text` when dropping 3.6 + ).strip() + if shutil.which(executable, path=git_exec_path): + executable = os.path.join(git_exec_path, executable) + + return [executable, *argv[1:]] + + def get(self, url: str) -> Dict[bytes, bytes]: + cmd = self._prepare_command() + if isinstance(cmd, str): + cmd += " get" + else: + cmd.append("get") + + helper_input = f"url={url}{os.linesep}".encode("ascii") + + try: + res = subprocess.run( # type: ignore # breaks on 3.6 + cmd, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + input=helper_input, + **self._run_kwargs, + ) + except subprocess.CalledProcessError as exc: + raise CredentialNotFoundError(exc.stderr) from exc + except FileNotFoundError as exc: + raise CredentialNotFoundError("Helper not found") from exc + + credentials = {} + for line in res.stdout.strip().splitlines(): + try: + key, value = line.split(b"=") + credentials[key] = value + except ValueError: + continue + + if not all( + (credentials, b"username" in credentials, b"password" in credentials) + ): + raise CredentialNotFoundError("Could not get credentials from helper") + + return credentials + + def store(self, *args, **kwargs): + """Store the credential, if applicable to the helper""" + raise NotImplementedError + + def erase(self, *args, **kwargs): + """Remove a matching credential, if any, from the helper’s storage""" + raise NotImplementedError + + @classmethod + def from_config( + cls, config: ConfigDict, url: Optional[str] = None + ) -> "CredentialHelper": + # We will try to get the url-specific credential section, in case that + # is not defined, config.get() will fallback to the generic section. + encoding = config.encoding or sys.getdefaultencoding() + section = (b"credential", url.encode(encoding)) if url else (b"credential",) + command = config.get(section, b"helper") + assert command and isinstance(command, bytes) + return cls(command.decode(encoding)) diff --git a/dulwich/tests/__init__.py b/dulwich/tests/__init__.py index 1fd839da6..eb3f65819 100644 --- a/dulwich/tests/__init__.py +++ b/dulwich/tests/__init__.py @@ -109,6 +109,7 @@ def self_test_suite(): "bundle", "client", "config", + "credentials", "diff_tree", "fastexport", "file", diff --git a/dulwich/tests/test_client.py b/dulwich/tests/test_client.py index 12f62d15e..ff250d839 100644 --- a/dulwich/tests/test_client.py +++ b/dulwich/tests/test_client.py @@ -44,6 +44,7 @@ TCPGitClient, SSHGitClient, HttpGitClient, + GitCredentialsHttpClient, FetchPackResult, ReportStatusParser, SendPackError, @@ -62,7 +63,9 @@ ) from dulwich.config import ( ConfigDict, + StackedConfig ) +from dulwich.credentials import CredentialNotFoundError from dulwich.tests import ( TestCase, ) @@ -1130,6 +1133,204 @@ def request(self, method, url, fields=None, headers=None, redirect=True, preload # check also the no redirection case self.assertEqual(processed_url, base_url) + @patch("dulwich.credentials.CredentialHelper.get") + def test_git_credentials(self, get_mock): + config = ConfigDict() + config.set(b"credential", b"helper", b"foo") + get_mock.return_value = {b"username": b"username", b"password": b"password"} + + from urllib3.response import HTTPResponse + + encoded_credentials = base64.b64encode(b"username:password") + + class PoolManagerMock: + def __init__(self): + self.headers = {} + + def request(self, method, url, headers, **kwargs): + if ( + "authorization" in headers + and headers["authorization"] + == f"Basic {encoded_credentials.decode('ascii')}" + ): + status = 200 + else: + status = 401 + return HTTPResponse( + headers={}, + request_method=method, + status=status, + ) + + base_url = "https://github.com/jelmer/dulwich" + pool_manager = PoolManagerMock() + http_client = HttpGitClient(base_url, config=config, pool_manager=pool_manager) + response, _ = http_client._http_request(base_url) + self.assertEqual(response.status, 200) + + @patch("dulwich.credentials.CredentialHelper.get") + def test_no_credentials(self, get_mock): + config = ConfigDict() + config.set(b"credential", b"helper", b"foo") + + def raise_not_found(*args, **kargs): + raise CredentialNotFoundError + + get_mock.side_effect = raise_not_found + + from urllib3.response import HTTPResponse + + base_url = "https://github.com/jelmer/dulwich" + for status_code, expected_exception in ( + (401, client.HTTPUnauthorized), + (404, client.NotGitRepository), + ): + class PoolManagerMock: + def __init__(self): + self.headers = {} + + def request(self, method, url, headers, **kwargs): + return HTTPResponse( + headers={}, + request_method=method, + status=status_code, + ) + + pool_manager = PoolManagerMock() + http_client = HttpGitClient(base_url, config=config, pool_manager=pool_manager) + with self.assertRaises(expected_exception): + http_client._http_request(base_url) + + @patch("dulwich.credentials.CredentialHelper.get") + def test_invalid_credentials(self, get_mock): + config = ConfigDict() + config.set(b"credential", b"helper", b"foo") + get_mock.return_value = {b"username": b"username", b"password": b"password"} + + from urllib3.response import HTTPResponse + + base_url = "https://github.com/jelmer/dulwich" + for status_code, expected_exception in ( + (401, client.HTTPUnauthorized), + (404, client.NotGitRepository), + ): + + class PoolManagerMock: + def __init__(self): + self.headers = {} + + def request(self, method, url, headers, **kwargs): + return HTTPResponse( + headers={}, + request_method=method, + status=status_code, + ) + + pool_manager = PoolManagerMock() + http_client = HttpGitClient( + base_url, config=config, pool_manager=pool_manager + ) + with self.assertRaises(expected_exception): + http_client._http_request(base_url) + + @patch("dulwich.client.HttpGitClient.get_credentials") + def test_invalid_and_valid_credentials(self, get_credentials_mock): + config = ConfigDict() + + def get_credentials(): + yield {b"username": b"username", b"password": b"invalid_password"} + yield {b"username": b"username", b"password": b"valid_password"} + + b64_credentials = base64.b64encode(b"username" + b":" + b"valid_password") + expected_basic_auth = f"Basic {b64_credentials.decode('ascii')}" + get_credentials_mock.return_value = get_credentials() + + from urllib3.response import HTTPResponse + + base_url = "https://github.com/jelmer/dulwich" + + class PoolManagerMock: + def __init__(self): + self.headers = {} + + def request(self, method, url, headers, **kwargs): + if ( + "authorization" in headers + and headers["authorization"] == expected_basic_auth + ): + status_code = 200 + else: + status_code = 401 + return HTTPResponse( + headers={}, + request_method=method, + status=status_code, + ) + + pool_manager = PoolManagerMock() + http_client = HttpGitClient(base_url, config=config, pool_manager=pool_manager) + response, _ = http_client._http_request(base_url) + self.assertEqual(response.status, 200) + + +class GitCredentialsHttpClientTests(TestCase): + def test_git_credentials_no_config(self): + http_client = GitCredentialsHttpClient("dummy") + self.assertIsInstance(http_client.config, StackedConfig) + self.assertEqual(http_client.config.backends, StackedConfig.default_backends()) + + @patch("dulwich.credentials.CredentialHelper.get") + def test_no_credentials(self, get_mock): + def raise_not_found(*args, **kargs): + raise CredentialNotFoundError + get_mock.side_effect = raise_not_found + + config = ConfigDict() + config.set(b"credential", b"helper", b"foo") + http_client = GitCredentialsHttpClient("dummy", config=config) + available_credentials = http_client.get_credentials("dummy") + with self.assertRaises(StopIteration): + next(available_credentials) + + @patch("dulwich.credentials.CredentialHelper.get") + def test_get_credentials(self, get_mock): + get_mock.return_value = {b"username": b"username", b"password": b"password"} + + config = ConfigDict() + config.set(b"credential", b"helper", b"foo") + + http_client = HttpGitClient("dummy", config=config) + all_credentials = http_client.get_credentials("dummy") + + credentials = next(all_credentials) + + self.assertEqual(credentials[b"username"], b"username") + self.assertEqual(credentials[b"password"], b"password") + + with self.assertRaises(StopIteration): + next(all_credentials) + + def test_get_credentials_no_configured_helper(self): + config = ConfigDict() + http_client = HttpGitClient("dummy", config=config) + available_credentials = http_client.get_credentials("dummy") + with self.assertRaises(StopIteration): + next(available_credentials) + + @patch("dulwich.credentials.CredentialHelper.get") + def test_get_credentials_multiple_configs(self, get): + config = ConfigDict() + config.set(b"credential", b"helper", "foo") + empty_config = ConfigDict() + config = StackedConfig([empty_config, config]) + + expected = {"dummy": "dummy"} + get.return_value = expected + http_client = HttpGitClient("dummy", config=config) + available_credentials = http_client.get_credentials("dummy") + credentials = next(available_credentials) + self.assertEqual(credentials, expected) + class TCPGitClientTests(TestCase): def test_get_url(self): diff --git a/dulwich/tests/test_credentials.py b/dulwich/tests/test_credentials.py new file mode 100644 index 000000000..1105b9e39 --- /dev/null +++ b/dulwich/tests/test_credentials.py @@ -0,0 +1,238 @@ +import os +import shutil +import subprocess +import sys +import tempfile +from unittest import mock, skipIf, skipUnless + +from dulwich.config import ConfigDict +from dulwich.credentials import CredentialHelper, CredentialNotFoundError +from dulwich.tests import TestCase + + +class CredentialHelperTests(TestCase): + def test_prepare_command_shell(self): + command = """!f() { echo foo}; f""" + + helper = CredentialHelper(command) + self.assertEqual(helper._prepare_command(), command[1:]) + + def test_prepare_command_abspath(self): + executable_path = os.path.join(os.sep, "path", "to", "executable") + + helper = CredentialHelper(executable_path) + self.assertEqual(helper._prepare_command(), [executable_path]) + + def test_prepare_command_abspath_extra_args(self): + executable_path = os.path.join(os.sep, "path", "to", "executable") + helper = CredentialHelper( + f'{executable_path} --foo bar --quz "arg with spaces"' + ) + self.assertEqual( + helper._prepare_command(), + [executable_path, "--foo", "bar", "--quz", "arg with spaces"], + ) + + @mock.patch("shutil.which") + def test_prepare_command_in_path(self, which): + which.return_value = True + + helper = CredentialHelper("foo") + self.assertEqual(helper._prepare_command(), ["git-credential-foo"]) + + @mock.patch("subprocess.check_output") + def test_prepare_command_cli_git_helpers(self, check_output): + git_exec_path = os.path.join(os.sep, "path", "to", "git-core") + check_output.return_value = git_exec_path + + def which_mock(arg, **kwargs): + if arg == "git" or "path" in kwargs: + return True + return False + + helper = CredentialHelper("foo") + expected = [os.path.join(git_exec_path, "git-credential-foo")] + + with mock.patch.object(shutil, "which", new=which_mock): + self.assertEqual(helper._prepare_command(), expected) + + @mock.patch("shutil.which") + def test_prepare_command_extra_args(self, which): + which.return_value = True + + helper = CredentialHelper('foo --bar baz --quz "arg with spaces"') + command = helper._prepare_command() + self.assertEqual( + command, + [ + "git-credential-foo", + "--bar", + "baz", + "--quz", + "arg with spaces", + ], + ) + + def test_get_nonexisting_executable(self): + helper = CredentialHelper("nonexisting") + with self.assertRaises(CredentialNotFoundError): + helper.get("dummy") + + def test_get_nonexisting_executable_abspath(self): + path = os.path.join(os.sep, "path", "to", "nonexisting") + helper = CredentialHelper(path) + with self.assertRaises(CredentialNotFoundError): + helper.get("dummy") + + @mock.patch("shutil.which") + @mock.patch("subprocess.run") + def test_get(self, run, which): + run.return_value.stdout = os.linesep.join( + ["username=username", "password=password", ""] + ).encode("UTF-8") + which.return_value = True + + helper = CredentialHelper("foo") + credentials = helper.get("https://example.com") + self.assertEqual(credentials[b"username"], b"username") + self.assertEqual(credentials[b"password"], b"password") + + @skipIf( + os.name == "nt", reason="On Windows, this only will work for git-bash or WSL2" + ) + def test_get_shell(self): + command = """!f() { printf "username=username\npassword=password"; }; f""" + helper = CredentialHelper(command) + credentials = helper.get("dummy") + self.assertEqual(credentials[b"username"], b"username") + self.assertEqual(credentials[b"password"], b"password") + + @mock.patch("subprocess.run") + def test_get_failing_command(self, run): + run.return_value.stderr = b"error message" + run.return_value.returncode = 1 + with self.assertRaises(CredentialNotFoundError, msg=b"error message"): + CredentialHelper("dummy").get("dummy") + + @mock.patch("shutil.which") + @mock.patch("subprocess.run") + def test_get_missing_username(self, run, which): + run.return_value.stdout = b"password=password" + which.return_value = True + with self.assertRaises(CredentialNotFoundError): + CredentialHelper("dummy").get("dummy") + + @mock.patch("shutil.which") + @mock.patch("subprocess.run") + def test_get_missing_password(self, run, which): + run.return_value.stdout = b"username=username" + which.return_value = True + with self.assertRaises(CredentialNotFoundError): + CredentialHelper("dummy").get("dummy") + + @mock.patch("shutil.which") + @mock.patch("subprocess.run") + def test_get_malformed_output(self, run, which): + run.return_value.stdout = os.linesep.join(["username", "password", ""]).encode( + "UTF-8" + ) + which.return_value = True + + with self.assertRaises(CredentialNotFoundError): + CredentialHelper("dummy").get("dummy") + + def test_store(self): + with self.assertRaises(NotImplementedError): + CredentialHelper("dummy").store() + + def test_erase(self): + with self.assertRaises(NotImplementedError): + CredentialHelper("dummy").erase() + + def test_from_config(self): + config = ConfigDict() + config.set(b"credential", b"helper", b"generichelper") + helper = CredentialHelper.from_config(config) + self.assertEqual(helper._command, "generichelper") + + def test_from_config_with_url(self): + config = ConfigDict() + config.set((b"credential", b"https://git.sr.ht"), b"helper", b"urlspecific") + helper = CredentialHelper.from_config(config, url="https://git.sr.ht") + self.assertEqual(helper._command, "urlspecific") + + def test_from_config_no_helper(self): + config = ConfigDict() + with self.assertRaises(KeyError): + CredentialHelper.from_config(config) + + def test_from_config_multiple_sections(self): + config = ConfigDict() + config.set(b"credential", b"helper", b"generichelper") + config.set((b"credential", b"https://git.sr.ht"), b"helper", b"urlspecific") + + helper = CredentialHelper.from_config(config) + self.assertEqual(helper._command, "generichelper") + helper = CredentialHelper.from_config(config, url="https://git.sr.ht") + self.assertEqual(helper._command, "urlspecific") + + +@skipUnless(shutil.which("git"), "requires git cli") +class CredentialHelperCredentialStore(TestCase): + """tests CredentialHandler with `git credential-store`""" + + def setUp(self): + super().setUp() + self.encoding = sys.getdefaultencoding() + self.store_path = os.path.join( + tempfile.gettempdir(), "dulwich-git-credential-store-test" + ) + self.git_exec_path = subprocess.check_output( + ["git", "--exec-path"], universal_newlines=True + ).strip() + + self.urls = ( + ("https://example.com", "username", "password"), + ("https://example1.com", "username1", "password1"), + ) + + for url, username, password in self.urls: + subprocess_in = os.linesep.join( + [f"url={url}", f"username={username}", f"password={password}", ""] + ).encode(self.encoding) + subprocess.run( + f"git credential-store --file {self.store_path} store".split(" "), + input=subprocess_in, + ) + + self.helper = CredentialHelper(f"store --file {self.store_path}") + + def tearDown(self): + super().tearDown() + os.unlink(self.store_path) + + def test_init(self): + expected = [ + os.path.join(self.git_exec_path, "git-credential-store"), + "--file", + self.store_path, + ] + self.assertEqual(self.helper._prepare_command(), expected) + + def test_get(self): + for url, username, password in self.urls: + credentials = self.helper.get(url) + self.assertEqual(credentials[b"username"], username.encode(self.encoding)) + self.assertEqual(credentials[b"password"], password.encode(self.encoding)) + + def test_missing(self): + with self.assertRaises(CredentialNotFoundError): + self.helper.get("https://dummy.com") + + def test_store(self): + with self.assertRaises(NotImplementedError): + self.helper.store() + + def test_erase(self): + with self.assertRaises(NotImplementedError): + self.helper.erase()