added ghapi support #455

Merged: 15 commits, Apr 7, 2021

goth/runner/download/__init__.py (148 changes: 40 additions & 108 deletions)
@@ -3,12 +3,14 @@
 from abc import ABC
 import logging
 import os
+import json
 from pathlib import Path
-import re
 import shutil
 import tempfile
-from typing import Any, Callable, Optional
+from typing import Optional
 
+from ghapi.all import GhApi, paged
+from fastcore.utils import obj2dict
 import requests
 
 logging.basicConfig(
@@ -18,8 +20,6 @@

 ASSET_CACHE_DIR = Path(tempfile.gettempdir()) / "goth_asset_cache"
 
-BASE_URL = "https://api.github.com/repos"
-
 ENV_API_TOKEN = "GITHUB_API_TOKEN"
 ENV_YAGNA_BRANCH = "YAGNA_BRANCH"
 ENV_YAGNA_COMMIT = "YAGNA_COMMIT_HASH"
@@ -41,8 +41,9 @@ class AssetNotFound(Exception):
 class GithubDownloader(ABC):
     """Base class for downloading assets using GitHub's REST API."""
 
-    repo_url: str
-    """Repo URL to be used as base in API requests."""
+    gh_api: GhApi
+    """GitHub REST API client."""
+
     session: requests.Session
     """Session object for making HTTP requests."""
 
@@ -65,7 +66,7 @@ def __init__(
         if purge_cache:
             shutil.rmtree(ASSET_CACHE_DIR)
 
-        self.repo_url = BASE_URL + f"/{owner}/{repo}"
+        self.gh_api = GhApi(owner=owner, repo=repo, token=token)
         self.session = requests.Session()
         self.session.headers["Authorization"] = f"token {token}"

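For context on the hunk above: `GhApi` exposes each REST endpoint as an attribute path on the client, so the manual URL building against `BASE_URL` becomes a method call. A minimal sketch of the pattern (the owner/repo values and the token lookup are illustrative, not taken from this PR):

    import os

    from ghapi.all import GhApi

    # Token sourced from the environment, mirroring ENV_API_TOKEN above.
    token = os.environ["GITHUB_API_TOKEN"]

    # One client per repository; endpoint groups hang off attributes.
    api = GhApi(owner="some-owner", repo="some-repo", token=token)

    # Equivalent of GET /repos/{owner}/{repo}/actions/workflows
    workflows = api.actions.list_repo_workflows().workflows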
@@ -82,107 +83,45 @@ def _create_cache_dir(self, asset_id: str) -> Path:
         asset_path.mkdir(exist_ok=True, parents=True)
         return asset_path
 
-    def _parse_link_header(self, header_value: str) -> dict:
-        """Parse URLs and their relative positions from a `Link` header value.
-
-        The value of a `Link` header consists of comma-separated tuples, where each
-        tuple has a pagination URL and its `rel` attribute.
-        `rel` describes its URL's relation to the request the header originates from.
-        The value of the `rel` attribute is one of the following:
-        `first`, `prev`, `next`, `last`.
-        """
-        relation_to_url = {}
-        links = [link.strip() for link in header_value.split(",")]
-
-        for link in links:
-            result = re.search(r'<(\S+)>; rel="(\S+)"', link)
-            if not result:
-                raise LookupError
-
-            url = result.group(1)
-            relation = result.group(2)
-            relation_to_url[relation] = url
-
-        return relation_to_url
-
-    def _search_with_pagination(
-        self,
-        initial_request: requests.PreparedRequest,
-        selector: Callable[[requests.Response], Any],
-    ):
-        """Search response data with `Link` header pagination support.
-
-        First request is made using `initial_request`. Consecutive requests are made
-        based on the `Link` header until the last page is reached
-        (i.e. no `next` URL is present). The `selector` function is called for each
-        response received. If the result from `selector` is non-null, this function
-        exits early returning that result.
-        """
-        response = self.session.send(initial_request)
-        logger.debug("_search_with_pagination. initial_url=%s", response.url)
-
-        while True:
-            response.raise_for_status()
-
-            result = selector(response)
-            if result:
-                logger.debug("_search_with_pagination. result=%s", result)
-                return result
-
-            relation_to_url = self._parse_link_header(response.headers["Link"])
-            logger.debug("_search_with_pagination. relation_to_url=%s", relation_to_url)
-            next_url = relation_to_url.get("next")
-            if next_url:
-                logger.debug("_search_with_pagination. next_url=%s", next_url)
-                response = self.session.get(next_url)
-            else:
-                return None
-
 
 class ArtifactDownloader(GithubDownloader):
     """Downloader for GitHub Actions artifacts using GitHub's REST API."""
 
     def _get_workflow(self, workflow_name: str) -> dict:
         """Query the workflow on GitHub Actions."""
-        url = f"{self.repo_url}/actions/workflows"
-        logger.debug("Fetching workflows. url=%s", url)
-        response = self.session.get(url)
-        response.raise_for_status()
-
-        workflows = response.json()["workflows"]
-        logger.debug("workflows=%s", workflows)
+        logger.debug("Fetching workflows. name=%s", workflow_name)
+        workflows = self.gh_api.actions.list_repo_workflows().workflows
         workflow = next(filter(lambda w: w["name"] == workflow_name, workflows))
-        logger.debug("workflow=%s", workflow)
+        logger.debug("workflow=%s", json.dumps(obj2dict(workflow)))
 
         return workflow
 
     def _get_latest_run(
         self, workflow: dict, branch: str, commit: Optional[str] = None
     ) -> dict:
         """Filter out the latest workflow run."""
         workflow_id = workflow["id"]
-        url = f"{self.repo_url}/actions/workflows/{workflow_id}/runs"
-        params = {"status": "completed"}
-        if not commit:
-            params["branch"] = branch
+        logger.debug("Fetching workflow runs. workflow_id=%s", workflow_id)
 
-        request = self.session.prepare_request(
-            requests.Request("GET", url, params=params)
-        )
-        logger.debug("Fetching workflow runs. url=%s", request.url)
+        if commit:
+            paged_workflow_runs = paged(
+                self.gh_api.actions.list_workflow_runs, workflow_id, status="completed"
+            )
 
-        def _filter_workflows(response: requests.Response) -> Optional[dict]:
-            workflow_runs = response.json()["workflow_runs"]
-            if commit:
-                return next(
-                    filter(lambda r: r["head_sha"].startswith(commit), workflow_runs),
+            for page in paged_workflow_runs:
+                workflow_runs = next(
+                    filter(
+                        lambda r: r["head_sha"].startswith(commit), page.workflow_runs
+                    ),
                     None,
                 )
-            else:
-                return workflow_runs[0]
 
-        workflow_run = self._search_with_pagination(request, _filter_workflows)
-        logger.debug("workflow_run=%s", workflow_run)
-        return workflow_run
+                if workflow_runs:
+                    return workflow_runs
+
+        return self.gh_api.actions.list_workflow_runs(
+            workflow_id, branch=branch, status="completed"
+        ).workflow_runs[0]
 
     def _get_artifact(self, artifact_name: str, workflow_run: dict) -> Optional[dict]:
         artifacts_url = workflow_run["artifacts_url"]
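The hunk above is the core of this PR: the hand-rolled `Link`-header pagination (parsing header values shaped like `<https://api.github.com/...&page=2>; rel="next"` with a regex) is replaced by `ghapi`'s `paged` helper, which re-invokes the bound endpoint with an incrementing page number and yields each page of results. A rough sketch of the pattern, assuming the `api` client from the earlier example, a known `workflow_id`, and a placeholder commit prefix:

    from ghapi.all import paged

    # Extra kwargs such as status are forwarded on every paged request.
    pages = paged(api.actions.list_workflow_runs, workflow_id, status="completed")

    for page in pages:
        # Each page mirrors the JSON response; workflow_runs is a list of runs.
        run = next(
            (r for r in page.workflow_runs if r["head_sha"].startswith("abc123")),
            None,
        )
        if run:
            break

One readability nit in the new code: the value bound to `workflow_runs` is the result of `next(...)`, i.e. a single run or `None`, so a singular name such as `workflow_run` would read better.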
@@ -201,20 +140,19 @@ def _download_artifact(self, artifact: dict) -> Path:

         Return path to the extracted artifact.
         """
-        artifact_id = str(artifact["id"])
         archive_url = artifact["archive_download_url"]
 
         logger.info("Downloading artifact. url=%s", archive_url)
         with self.session.get(archive_url) as response:
             response.raise_for_status()
 
-            with tempfile.NamedTemporaryFile() as fd:
-                fd.write(response.content)
-                logger.debug("Extracting zip archive. path=%s", fd.name)
-                cache_dir = self._create_cache_dir(artifact_id)
-                shutil.unpack_archive(fd.name, format="zip", extract_dir=str(cache_dir))
-                logger.debug("Extracted package. path=%s", cache_dir)
-                logger.info("Downloaded artifact. url=%s", archive_url)
+        with tempfile.NamedTemporaryFile() as fd:
+            fd.write(response.content)
+            logger.debug("Extracting zip archive. path=%s", fd.name)
+            cache_dir = self._create_cache_dir(str(artifact["id"]))
+            shutil.unpack_archive(fd.name, format="zip", extract_dir=str(cache_dir))
+            logger.debug("Extracted package. path=%s", cache_dir)
+            logger.info("Downloaded artifact. url=%s", archive_url)
 
         return cache_dir

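Note that the PR keeps the token-authenticated `requests.Session` alongside `gh_api`: artifact archives are raw zip payloads, which are simpler to fetch with `requests` than through the JSON-oriented `ghapi` client. A condensed sketch of the download-and-unpack step (the `token`, `archive_url`, and extract directory are placeholders):

    import shutil
    import tempfile

    import requests

    session = requests.Session()
    session.headers["Authorization"] = f"token {token}"

    # archive_download_url comes from the artifact object returned by the API.
    with session.get(archive_url) as response:
        response.raise_for_status()
        data = response.content  # the whole zip is held in memory

    # The temp file has no .zip suffix, so the archive format is passed explicitly.
    with tempfile.NamedTemporaryFile() as fd:
        fd.write(data)
        shutil.unpack_archive(fd.name, format="zip", extract_dir="artifact_dir")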
@@ -286,13 +224,8 @@ def _get_latest_release(
         Only the versions with `tag_name` that contains `self.tag_substring`
         as a substring are considered.
         """
-        url = f"{self.repo_url}/releases"
-        logger.debug("Fetching releases. url=%s", url)
-        response = self.session.get(url)
-        response.raise_for_status()
-
-        all_releases = response.json()
-        logger.debug("releases=%s", all_releases)
+        all_releases = self.gh_api.repos.list_releases()
+        logger.debug("releases=%s", json.dumps(obj2dict(all_releases)))
 
         def release_filter(release: dict, tag_substring: str) -> bool:
             has_matching_asset = any(
@@ -310,7 +243,7 @@ def _get_asset(
         self, release: dict, content_type: str, asset_name: Optional[str] = None
     ) -> Optional[dict]:
         assets = release["assets"]
-        logger.debug("assets=%s", assets)
+        logger.debug("assets=%s", json.dumps(obj2dict(assets)))
 
         content_assets = filter(lambda a: a["content_type"] == content_type, assets)
         if content_assets and asset_name:
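The `json.dumps(obj2dict(...))` pattern recurring in these hunks exists because `ghapi` returns fastcore attribute-dict objects and list-like containers rather than plain `dict`/`list`, which `json.dumps` cannot serialize directly; `obj2dict` converts them to plain structures first. A tiny sketch, again assuming the `api` client from earlier:

    import json

    from fastcore.utils import obj2dict

    releases = api.repos.list_releases()  # fastcore containers, not plain JSON types
    print(json.dumps(obj2dict(releases)))  # convert, then serialize for the log

Separately, a pre-existing quirk visible in the context lines above: `content_assets` is a `filter` object, which is always truthy, so the `if content_assets and asset_name:` guard never rejects an empty result.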
@@ -320,12 +253,11 @@ def _download_asset(self, asset: dict) -> Path:
     def _download_asset(self, asset: dict) -> Path:
         """Download an asset from a specific GitHub release."""
         download_url = asset["browser_download_url"]
-        asset_id = str(asset["id"])
 
         logger.info("Downloading asset. url=%s", download_url)
         with self.session.get(download_url) as response:
             response.raise_for_status()
-            cache_file = self._create_cache_dir(asset_id) / asset["name"]
+            cache_file = self._create_cache_dir(str(asset["id"])) / asset["name"]
             with cache_file.open(mode="wb") as fd:
                 fd.write(response.content)
             logger.info("Downloaded asset. path=%s", str(cache_file))
@@ -364,7 +296,7 @@ def download(
         )
 
         logger.debug("Found matching asset. name=%s", asset["name"])
-        logger.debug("asset=%s", asset)
+        logger.debug("asset=%s", json.dumps(asset))
 
         asset_id = str(asset["id"])
         cache_path = self._cache_get(asset_id)
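Taken together, a hypothetical end-to-end use of the reworked downloader could look as follows. The constructor arguments are inferred from the `GhApi(owner=owner, repo=repo, token=token)` call and the env-var constants above, while the `download` keyword names are guesses based on the helper methods shown, so treat the exact signature as an assumption rather than the PR's public API:

    import os

    from goth.runner.download import ArtifactDownloader

    downloader = ArtifactDownloader(
        owner="some-owner",  # illustrative repository coordinates
        repo="some-repo",
        token=os.environ["GITHUB_API_TOKEN"],
    )

    # Fetch the newest completed-run artifact for a workflow on a branch.
    path = downloader.download(
        artifact_name="some-artifact",
        workflow_name="CI",
        branch=os.environ.get("YAGNA_BRANCH", "master"),
    )
    print(f"Artifact extracted to {path}")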