Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extend craft_application.git module #576

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions craft_application/git/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,39 @@

"""Git repository utilities."""

from ._consts import COMMIT_SHORT_SHA_LEN
from ._consts import (
NO_PUSH_URL,
COMMIT_SHA_LEN,
COMMIT_SHORT_SHA_LEN,
CRAFTGIT_BINARY_NAME,
GIT_FALLBACK_BINARY_NAME,
)
from ._errors import GitError
from ._models import GitType, short_commit_sha
from ._git_repo import GitRepo, get_git_repo_type, is_repo, parse_describe
from ._models import GitType, Commit, short_commit_sha

from ._git_repo import (
GitRepo,
get_git_repo_type,
is_repo,
is_commit,
is_short_commit,
parse_describe,
)

__all__ = [
"GitError",
"GitRepo",
"GitType",
"Commit",
"get_git_repo_type",
"is_repo",
"parse_describe",
"is_commit",
"is_short_commit",
"short_commit_sha",
"NO_PUSH_URL",
"COMMIT_SHA_LEN",
"COMMIT_SHORT_SHA_LEN",
"CRAFTGIT_BINARY_NAME",
"GIT_FALLBACK_BINARY_NAME",
]
8 changes: 8 additions & 0 deletions craft_application/git/_consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@

from typing import Final

NO_PUSH_URL: Final[str] = "no_push"

COMMIT_SHA_LEN: Final[int] = 40

COMMIT_SHORT_SHA_LEN: Final[int] = 7

CRAFTGIT_BINARY_NAME: Final[str] = "craft.git"

GIT_FALLBACK_BINARY_NAME: Final[str] = "git"
247 changes: 244 additions & 3 deletions craft_application/git/_git_repo.py
dariuszd21 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

import logging
import os
import re
import shutil
import subprocess
import time
from functools import lru_cache
from pathlib import Path
from shlex import quote
from typing import Final, cast

from craft_parts.utils import os_utils
from typing_extensions import Self
Expand All @@ -45,11 +49,25 @@
else:
del os.environ["SSL_CERT_DIR"]

from ._consts import CRAFTGIT_BINARY_NAME, GIT_FALLBACK_BINARY_NAME, NO_PUSH_URL
from ._errors import GitError
from ._models import GitType
from ._models import Commit, GitType, short_commit_sha

logger = logging.getLogger(__name__)

COMMIT_REGEX: Final[re.Pattern[str]] = re.compile("[0-9a-f]{40}")
SHORT_COMMIT_REGEX: Final[re.Pattern[str]] = re.compile("[0-9a-f]{7}")


def is_commit(ref: str) -> bool:
"""Check if given commit is a valid git commit sha."""
return bool(COMMIT_REGEX.fullmatch(ref))


def is_short_commit(ref: str) -> bool:
"""Check if given short commit is a valid git commit sha."""
return bool(SHORT_COMMIT_REGEX.fullmatch(ref))


def is_repo(path: Path) -> bool:
"""Check if a directory is a git repo.
Expand Down Expand Up @@ -196,6 +214,68 @@ def commit(self, message: str = "auto commit") -> str:
f"in {str(self.path)!r}."
) from error

def get_last_commit(self) -> Commit:
"""Get the last Commit on the current head."""
try:
last_commit = self._repo[self._repo.head.target]
except pygit2.GitError as error:
raise GitError("could not retrieve last commit") from error
else:
commit_message = cast(
str,
last_commit.message, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType]
)
return Commit(
sha=str(last_commit.id),
message=commit_message,
)
lengau marked this conversation as resolved.
Show resolved Hide resolved

def get_last_commit_on_branch_or_tag(
self,
branch_or_tag: str,
*,
remote: str | None = None,
fetch: bool = False,
) -> Commit:
"""Find last commit corresponding to given branch or tag."""
if fetch and remote is not None:
self.fetch(remote=remote, tags=True)
rev_list_output = [
self.get_git_command(),
"rev-list",
"-n",
"1",
branch_or_tag,
]
try:
rev_parse_output = subprocess.check_output(
rev_list_output,
text=True,
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as error:
error_details = (
f"cannot find ref: {branch_or_tag!r}.\nCommand output:\n{error.stdout}"
)
raise GitError(error_details) from error

commit_sha = rev_parse_output.strip()
try:
commit_obj = self._repo.get(commit_sha)
except (pygit2.GitError, ValueError) as error:
raise GitError(
f"cannot find commit: {short_commit_sha(commit_sha)!r}"
) from error
else:
commit_message = cast(
str,
commit_obj.message, # pyright: ignore[reportOptionalMemberAccess,reportAttributeAccessIssue,reportUnknownMemberType]
)
return Commit(
sha=commit_sha,
message=commit_message,
)

def is_clean(self) -> bool:
"""Check if the repo is clean.

Expand Down Expand Up @@ -286,6 +366,76 @@ def rename_remote(
f"cannot rename '{remote_name}' to '{new_remote_name}'"
) from error

def get_remote_url(self, remote_name: str) -> str:
"""Get URL associated with the given remote.

Equivalent of git remote get-url <name>


:param remote_name: the remote repository name

:raises GitError: if remote does not exist
"""
if not self.remote_exists(remote_name=remote_name):
raise GitError(f"cannot get URL for non-existing remote '{remote_name}'")
return cast(str, self._repo.remotes[remote_name].url)

def set_remote_url(self, remote_name: str, remote_url: str) -> None:
"""Set new URL for the existing remote.

Equivalent of git remote set-url <name> <url>


:param remote_name: the remote repository name
:param remote_url: URL to be associated with the given remote

:raises GitError: if remote does not exist
"""
if not self.remote_exists(remote_name=remote_name):
raise GitError(f"cannot set URL for non-existing remote: {remote_name!r}")
self._repo.remotes.set_url(remote_name, remote_url)

def get_remote_push_url(self, remote_name: str) -> str:
"""Get push-only URL associated with the given remote.

Equivalent of git remote get-url --push <name>

:param remote_name: the remote repository name

:raises GitError: if remote does not exist
"""
if not self.remote_exists(remote_name=remote_name):
raise GitError(
f"cannot get push URL for non-existing remote: {remote_name!r}"
)
return cast(str, self._repo.remotes[remote_name].push_url)

def set_remote_push_url(self, remote_name: str, remote_push_url: str) -> None:
"""Set new push-only URL for the existing remote.

Equivalent of git remote set-url --push <name> <url>


:param remote_name: the remote repository name
:param remote_url: push URL to be associated with the given remote

:raises GitError: if remote does not exist
"""
if not self.remote_exists(remote_name=remote_name):
raise GitError(
f"cannot set push URL for non-existing remote: {remote_name!r}"
)
self._repo.remotes.set_push_url(remote_name, remote_push_url)

def set_no_push(self, remote_name: str) -> None:
"""Disable pushing to the selected remote.

:param remote_name: the remote repository name

:raises GitError: if remote does not exist
"""
self.set_remote_push_url(remote_name, NO_PUSH_URL)

def push_url( # noqa: PLR0912 (too-many-branches)
self,
remote_url: str,
Expand Down Expand Up @@ -321,7 +471,14 @@ def push_url( # noqa: PLR0912 (too-many-branches)
# Force push in case this repository already exists. The repository is always
# going to exist solely for remote builds, so the only potential issue here is a
# race condition with multiple remote builds on the same machine.
cmd: list[str] = ["git", "push", "--force", remote_url, refspec, "--progress"]
cmd: list[str] = [
self.get_git_command(),
"push",
"--force",
remote_url,
refspec,
"--progress",
]
if push_tags:
cmd.append("--tags")

Expand Down Expand Up @@ -382,6 +539,72 @@ def push_url( # noqa: PLR0912 (too-many-branches)
f"for the git repository in {str(self.path)!r}."
)

def fetch(
self,
*,
remote: str,
tags: bool = False,
ref: str | None = None,
depth: int | None = None,
lengau marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""Fetch the contents of the given remote.

:param remote: The name of the remote.
:param tags: Whether to fetch tags.
:param ref: Optional reference to the specific object to fetch.
:param depth: Maximum number of commits to fetch (all by default).
"""
fetch_command = [self.get_git_command(), "fetch"]

if not self.remote_exists(remote):
raise GitError(f"cannot fetch undefined remote: {remote!r}")

if tags:
fetch_command.append("--tags")
if depth is not None:
fetch_command.extend(["--depth", f"{depth}"])

fetch_command.append(remote)
if ref is not None:
fetch_command.append(ref)

try:
os_utils.process_run(fetch_command, logger.debug)
except FileNotFoundError as error:
raise GitError("git command not found in the system") from error
except subprocess.CalledProcessError as error:
raise GitError(f"cannot fetch remote: {remote!r}") from error

def remote_contains(
self,
*,
remote: str,
commit_sha: str,
) -> bool:
"""Check if the given commit is pushed to the remote repository."""
logger.debug(
"Checking if %r was pushed to %r", short_commit_sha(commit_sha), remote
)
checking_command = [
self.get_git_command(),
"branch",
"--remotes",
"--contains",
commit_sha,
]
try:
remotes_that_has_given_commit = subprocess.check_output(
checking_command,
text=True,
)
except subprocess.CalledProcessError as error:
raise GitError("incorrect commit provided, cannot check") from error
else:
for line in remotes_that_has_given_commit.splitlines():
if line.strip().startswith(f"{remote}/"):
return True
return False

def describe(
self,
*,
Expand Down Expand Up @@ -461,7 +684,7 @@ def clone_repository(
raise GitError("Cannot clone to existing repository")

logger.debug("Cloning %s to %s", url, path)
clone_cmd = ["git", "clone"]
clone_cmd = [cls.get_git_command(), "clone"]
if checkout_branch is not None:
logger.debug("Checking out to branch: %s", checkout_branch)
clone_cmd.extend(["--branch", quote(checkout_branch)])
Expand All @@ -482,3 +705,21 @@ def clone_repository(
f"cannot clone repository: {url} to {str(path)!r}"
) from error
return cls(path)

@classmethod
@lru_cache(maxsize=1)
def get_git_command(cls) -> str:
"""Get name of the git executable that may be used in subprocesses.

Fallback to the previous behavior in case of non-snap / local installation or
if snap does not provide expected binary.
"""
craftgit_binary = CRAFTGIT_BINARY_NAME
if shutil.which(craftgit_binary):
return craftgit_binary
logger.warning(
"Cannot find craftgit binary: %r. Is it a part of snap package?",
craftgit_binary,
)
logger.warning("Falling back to: %r", GIT_FALLBACK_BINARY_NAME)
return GIT_FALLBACK_BINARY_NAME
14 changes: 14 additions & 0 deletions craft_application/git/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

"""Git repository models."""

from dataclasses import dataclass
from enum import Enum

from ._consts import COMMIT_SHORT_SHA_LEN
Expand All @@ -30,3 +31,16 @@ class GitType(Enum):
INVALID = 0
NORMAL = 1
SHALLOW = 2


@dataclass
class Commit:
"""Model representing a commit."""

sha: str
message: str

@property
def short_sha(self) -> str:
"""Get short commit sha."""
return short_commit_sha(self.sha)
Loading
Loading