From 91c6289618497185dda11bfdd29cea0bc76c7de6 Mon Sep 17 00:00:00 2001
From: George Hickman
Date: Mon, 27 Nov 2023 16:52:00 +0000
Subject: [PATCH] Hoist the logic from `github backfill` to `github`

We're moving to replacing all the data every day, since that's what
we've ended up doing more often than not and it's already quite fast!
---
 metrics/github/api.py            | 135 +++++++++----------
 metrics/github/backfill.py       | 217 -------------------------------
 metrics/github/cli.py            | 118 +++++++++--------
 pyproject.toml                   |   2 +-
 tests/metrics/github/test_cli.py |  23 ++++
 5 files changed, 146 insertions(+), 349 deletions(-)
 delete mode 100644 metrics/github/backfill.py
 create mode 100644 tests/metrics/github/test_cli.py

diff --git a/metrics/github/api.py b/metrics/github/api.py
index 77e70d1f..5702b095 100644
--- a/metrics/github/api.py
+++ b/metrics/github/api.py
@@ -21,7 +21,7 @@
 }
 
 
-def _get_query_page(*, query, session, cursor, **kwargs):
+def get_query_page(*, query, session, cursor, **kwargs):
     """
     Get a page of the given query
 
@@ -65,109 +65,92 @@ def _get_query_page(*, query, session, cursor, **kwargs):
         )
         raise RuntimeError(msg)
 
-    return results["data"]["search"]
+    return results["data"]
 
 
-def _iter_query_results(query, **kwargs):
-    """
-    Get results from a GraphQL query
-
-    Given a GraphQL query, return all results across one or more pages as a
-    single generator. We currently assume all results live under
-
-        data.organization.team.repositories
-
-    GitHub's GraphQL API provides cursor-based pagination, so this function
-    wraps the actual API calls done in _get_query_page and tracks the cursor.
-    """
+def get_query(query, path, **kwargs):
+    def extract(data):
+        result = data
+        for key in path:
+            result = result[key]
+        return result
+
+    more_pages = True
     cursor = None
-    while True:
-        data = _get_query_page(
-            query=query,
-            session=session,
-            cursor=cursor,
-            **kwargs,
+    while more_pages:
+        page = extract(
+            get_query_page(query=query, session=session, cursor=cursor, **kwargs)
         )
+        yield from page["nodes"]
+        more_pages = page["pageInfo"]["hasNextPage"]
+        cursor = page["pageInfo"]["endCursor"]
 
-        for edge in data["edges"]:
-            yield edge["node"]
-
-        if not data["pageInfo"]["hasNextPage"]:
-            break
-
-        # update the cursor we pass into the GraphQL query
-        cursor = data["pageInfo"]["endCursor"]  # pragma: no cover
 
+def iter_repos(org):
+    query = """
+    query repos($cursor: String, $org: String!) {
+      organization(login: $org) {
+        repositories(first: 100, after: $cursor) {
+          nodes {
+            name
+          }
+          pageInfo {
+            endCursor
+            hasNextPage
+          }
+        }
+      }
+    }
+    """
+    for repo in get_query(query, path=["organization", "repositories"], org=org):
+        yield {
+            "name": repo["name"],
+        }
 
-def _iter_pull_requests(org, date_range):
-    # we can't seem to interpolate graphql variables into a string, so doing it
-    # here
-    search_query = f"is:pr draft:false org:{org} {date_range}"
-    log.debug(f"GitHub search query: {search_query}")
+
+def iter_repo_prs(org, repo):
     query = """
-    query getPRs($cursor: String, $searchQuery: String!){
-      search(query: $searchQuery, type:ISSUE, first: 100, after: $cursor) {
-        edges {
-          node {
-            ... on PullRequest {
-              createdAt
-              closedAt
-              mergedAt
+    query prs($cursor: String, $org: String!, $repo: String!) {
+      organization(login: $org) {
+        repository(name: $repo) {
+          pullRequests(first: 100, after: $cursor) {
+            nodes {
               author {
                 login
               }
-              repository {
-                name
-                owner {
-                  login
-                }
-              }
+              number
+              createdAt
+              closedAt
+              mergedAt
+            }
+            pageInfo {
+              endCursor
+              hasNextPage
             }
           }
         }
-        pageInfo {
-          endCursor
-          hasNextPage
-        }
       }
     }
     """
-    results = list(_iter_query_results(query, searchQuery=search_query))
-    for pr in results:
+    for pr in get_query(
+        query, path=["organization", "repository", "pullRequests"], org=org, repo=repo
+    ):
         yield {
+            "org": org,
+            "repo": repo,
+            "author": pr["author"]["login"],
             "created": date_from_iso(pr["createdAt"]),
             "closed": date_from_iso(pr["closedAt"]),
             "merged": date_from_iso(pr["mergedAt"]),
-            "author": pr["author"]["login"],
-            "repo": pr["repository"]["name"],
-            "org": pr["repository"]["owner"]["login"],
         }
 
 
-def prs_open_in_range(org, start, end):
-    start = start.isoformat()
-    end = end.isoformat()
-    date_range = f"created:<={start} closed:>={end}"
-
-    return list(_iter_pull_requests(org, date_range))
-
-
-def prs_merged_on_date(org, date):
-    query = f"merged:{date}"
-
-    return list(_iter_pull_requests(org, query))
-
-
-def prs_opened_on_date(org, date):
-    query = f"created:{date}"
-
-    return list(_iter_pull_requests(org, query))
+def iter_prs(org):
+    for r in iter_repos(org):
+        yield from iter_repo_prs(org, r["name"])
 
 
 if __name__ == "__main__":
     orgs = ["ebmdatalab", "opensafely-core"]
-    for pr in list(_iter_pull_requests(orgs[1], date(2023, 10, 24))):
+    for pr in list(iter_prs(orgs[1])):
         print(pr)
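
A note on the new pagination helper: `get_query` expects every query to surface a connection shaped like `nodes` plus `pageInfo { hasNextPage endCursor }`, and it follows `endCursor` until `hasNextPage` goes false. A minimal, self-contained sketch of that contract — the fake pages and the `iter_nodes` name are illustrative stand-ins for real `get_query_page` responses, not part of the codebase:

    # Standalone sketch of the paging contract get_query() relies on.
    # The dict of fake pages, keyed by cursor, stands in for the network.
    def iter_nodes(pages, path):
        more_pages = True
        cursor = None
        while more_pages:
            page = pages[cursor]
            for key in path:  # walk e.g. ["organization", "repositories"]
                page = page[key]
            yield from page["nodes"]
            more_pages = page["pageInfo"]["hasNextPage"]
            cursor = page["pageInfo"]["endCursor"]

    pages = {
        None: {"organization": {"repositories": {
            "nodes": [{"name": "metrics"}],
            "pageInfo": {"hasNextPage": True, "endCursor": "abc"},
        }}},
        "abc": {"organization": {"repositories": {
            "nodes": [{"name": "job-server"}],
            "pageInfo": {"hasNextPage": False, "endCursor": None},
        }}},
    }

    print(list(iter_nodes(pages, ["organization", "repositories"])))
    # [{'name': 'metrics'}, {'name': 'job-server'}]
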
diff --git a/metrics/github/backfill.py b/metrics/github/backfill.py
deleted file mode 100644
index d4df3e80..00000000
--- a/metrics/github/backfill.py
+++ /dev/null
@@ -1,217 +0,0 @@
-import json
-import textwrap
-from datetime import date, timedelta
-
-import click
-import structlog
-
-from ..logs import setup_logging
-from ..timescaledb import TimescaleDBWriter
-from ..timescaledb.tables import GitHubPullRequests
-from ..tools.dates import date_from_iso, iter_days, previous_weekday
-from .api import session
-from .prs import process_prs
-
-
-setup_logging()
-
-log = structlog.get_logger()
-
-
-def get_query_page(*, query, session, cursor, **kwargs):
-    """
-    Get a page of the given query
-
-    This uses the GraphQL API to avoid making O(N) calls to GitHub's (v3) REST
-    API. The passed cursor is a GraphQL cursor [1] allowing us to call this
-    function in a loop, passing in the responses cursor to advance our view of
-    the data.
-
-    [1]: https://graphql.org/learn/pagination/#end-of-list-counts-and-connections
-    """
-    # use GraphQL variables to avoid string interpolation
-    variables = {"cursor": cursor, **kwargs}
-    payload = {"query": query, "variables": variables}
-
-    log.debug(query=query, **variables)
-    r = session.post("https://api.github.com/graphql", json=payload)
-
-    if not r.ok:  # pragma: no cover
-        print(r.headers)
-        print(r.content)
-
-    r.raise_for_status()
-    results = r.json()
-
-    # In some cases graphql will return a 200 response when there are errors.
-    # https://sachee.medium.com/200-ok-error-handling-in-graphql-7ec869aec9bc
-    # Handling things robustly is complex and query specific, so here we simply
-    # take the absence of 'data' as an error, rather than the presence of
-    # 'errors' key.
-    if "data" not in results:
-        msg = textwrap.dedent(
-            f"""
-            graphql query failed
-
-            query:
-            {query}
-
-            response:
-            {json.dumps(results, indent=2)}
-            """
-        )
-        raise RuntimeError(msg)
-
-    return results["data"]
-
-
-def get_query(query, path, **kwargs):
-    def extract(data):
-        result = data
-        for key in path:
-            result = result[key]
-        return result
-
-    more_pages = True
-    cursor = None
-    while more_pages:
-        page = extract(
-            get_query_page(query=query, session=session, cursor=cursor, **kwargs)
-        )
-        yield from page["nodes"]
-        more_pages = page["pageInfo"]["hasNextPage"]
-        cursor = page["pageInfo"]["endCursor"]
-
-
-def iter_repos(org):
-    query = """
-    query repos($cursor: String, $org: String!) {
-      organization(login: $org) {
-        repositories(first: 100, after: $cursor) {
-          nodes {
-            name
-          }
-          pageInfo {
-            endCursor
-            hasNextPage
-          }
-        }
-      }
-    }
-    """
-    for repo in get_query(query, path=["organization", "repositories"], org=org):
-        yield {
-            "name": repo["name"],
-        }
-
-
-def iter_repo_prs(org, repo):
-    query = """
-    query prs($cursor: String, $org: String!, $repo: String!) {
-      organization(login: $org) {
-        repository(name: $repo) {
-          pullRequests(first: 100, after: $cursor) {
-            nodes {
-              author {
-                login
-              }
-              number
-              createdAt
-              closedAt
-              mergedAt
-            }
-            pageInfo {
-              endCursor
-              hasNextPage
-            }
-          }
-        }
-      }
-    }
-    """
-    for pr in get_query(
-        query, path=["organization", "repository", "pullRequests"], org=org, repo=repo
-    ):
-        yield {
-            "org": org,
-            "repo": repo,
-            "author": pr["author"]["login"],
-            "created": date_from_iso(pr["createdAt"]),
-            "closed": date_from_iso(pr["closedAt"]),
-            "merged": date_from_iso(pr["mergedAt"]),
-        }
-
-
-def iter_prs(org):
-    for r in iter_repos(org):
-        yield from iter_repo_prs(org, r["name"])
-
-
-def open_prs(prs, org, days_threshold):
-    earliest = min([pr["created"] for pr in prs])
-    start = previous_weekday(earliest, 0)  # Monday
-    mondays = list(iter_days(start, date.today(), step=timedelta(days=7)))
-
-    today = date.today()
-    threshold = timedelta(days=days_threshold)
-
-    def open_on_day(pr, start, end):
-        """
-        Filter function for PRs
-
-        Checks whether a PR is open today and if it's been open for greater or
-        equal to the threshold of days.
-        """
-        closed = pr["closed"] or today
-        opened = pr["created"]
-
-        open_today = (opened <= start) and (closed >= end)
-        if not open_today:
-            return False
-
-        return (closed - opened) >= threshold
-
-    with TimescaleDBWriter(GitHubPullRequests) as writer:
-        for start in mondays:
-            end = start + timedelta(days=6)
-            prs_on_day = [pr for pr in prs if open_on_day(pr, start, end)]
-
-            name = f"queue_older_than_{days_threshold}_days"
-
-            log.info(
-                "%s | %s | Processing %s PRs from week starting %s",
-                name,
-                org,
-                len(prs_on_day),
-                start,
-            )
-            process_prs(writer, prs_on_day, start, name=name)
-
-
-def pr_throughput(prs, org):
-    start = min([pr["created"] for pr in prs])
-    days = list(iter_days(start, date.today()))
-
-    with TimescaleDBWriter(GitHubPullRequests) as writer:
-        for day in days:
-            opened_prs = [pr for pr in prs if pr["created"] == day]
-            log.info("%s | %s | Processing %s opened PRs", day, org, len(opened_prs))
-            process_prs(writer, opened_prs, day, name="prs_opened")
-
-            merged_prs = [pr for pr in prs if pr["merged"] and pr["merged"] == day]
-            log.info("%s | %s | Processing %s merged PRs", day, org, len(merged_prs))
-            process_prs(writer, merged_prs, day, name="prs_merged")
-
-
-@click.command()
-@click.argument("org")
-@click.pass_context
-def backfill(ctx, org):
-    """Backfill GitHub data for the given GitHub ORG"""
-    prs = list(iter_prs(org))
-
-    org_prs = [pr for pr in prs if pr["org"] == org]
-    log.info("Backfilling with %s PRs for %s", len(org_prs), org)
-
-    open_prs(org_prs, org, days_threshold=7)
-    pr_throughput(org_prs, org)
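
Both the deleted `backfill` command and the hoisted code below lean on `previous_weekday` and `iter_days` from `metrics.tools.dates`. A rough sketch of their behaviour — these re-implementations are illustrative stand-ins inferred from how the callers use them, not the real helpers:

    from datetime import date, timedelta

    def previous_weekday(d, weekday):
        # step back to the most recent given weekday (0 = Monday)
        return d - timedelta(days=(d.weekday() - weekday) % 7)

    def iter_days(start, end, step=timedelta(days=1)):
        # yield start, start+step, ... up to and including end
        day = start
        while day <= end:
            yield day
            day += step

    monday = previous_weekday(date(2023, 11, 29), 0)  # date(2023, 11, 27)
    print(list(iter_days(monday, date(2023, 12, 11), step=timedelta(days=7))))
    # [datetime.date(2023, 11, 27), datetime.date(2023, 12, 4), datetime.date(2023, 12, 11)]
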
- if "data" not in results: - msg = textwrap.dedent( - f""" - graphql query failed - - query: - {query} - - response: - {json.dumps(results, indent=2)} - """ - ) - raise RuntimeError(msg) - - return results["data"] - - -def get_query(query, path, **kwargs): - def extract(data): - result = data - for key in path: - result = result[key] - return result - - more_pages = True - cursor = None - while more_pages: - page = extract( - get_query_page(query=query, session=session, cursor=cursor, **kwargs) - ) - yield from page["nodes"] - more_pages = page["pageInfo"]["hasNextPage"] - cursor = page["pageInfo"]["endCursor"] - - -def iter_repos(org): - query = """ - query repos($cursor: String, $org: String!) { - organization(login: $org) { - repositories(first: 100, after: $cursor) { - nodes { - name - } - pageInfo { - endCursor - hasNextPage - } - } - } - } - """ - for repo in get_query(query, path=["organization", "repositories"], org=org): - yield { - "name": repo["name"], - } - - -def iter_repo_prs(org, repo): - query = """ - query prs($cursor: String, $org: String!, $repo: String!) { - organization(login: $org) { - repository(name: $repo) { - pullRequests(first: 100, after: $cursor) { - nodes { - author { - login - } - number - createdAt - closedAt - mergedAt - } - pageInfo { - endCursor - hasNextPage - } - } - } - } - } - """ - for pr in get_query( - query, path=["organization", "repository", "pullRequests"], org=org, repo=repo - ): - yield { - "org": org, - "repo": repo, - "author": pr["author"]["login"], - "created": date_from_iso(pr["createdAt"]), - "closed": date_from_iso(pr["closedAt"]), - "merged": date_from_iso(pr["mergedAt"]), - } - - -def iter_prs(org): - for r in iter_repos(org): - yield from iter_repo_prs(org, r["name"]) - - -def open_prs(prs, org, days_threshold): - earliest = min([pr["created"] for pr in prs]) - start = previous_weekday(earliest, 0) # Monday - mondays = list(iter_days(start, date.today(), step=timedelta(days=7))) - - today = date.today() - threshold = timedelta(days=days_threshold) - - def open_on_day(pr, start, end): - """ - Filter function for PRs - - Checks whether a PR is open today and if it's been open for greater or - equal to the threshold of days. 
- """ - closed = pr["closed"] or today - opened = pr["created"] - - open_today = (opened <= start) and (closed >= end) - if not open_today: - return False - - return (closed - opened) >= threshold - - with TimescaleDBWriter(GitHubPullRequests) as writer: - for start in mondays: - end = start + timedelta(days=6) - prs_on_day = [pr for pr in prs if open_on_day(pr, start, end)] - - name = f"queue_older_than_{days_threshold}_days" - - log.info( - "%s | %s | Processing %s PRs from week starting %s", - name, - org, - len(prs_on_day), - start, - ) - process_prs(writer, prs_on_day, start, name=name) - - -def pr_throughput(prs, org): - start = min([pr["created"] for pr in prs]) - days = list(iter_days(start, date.today())) - - with TimescaleDBWriter(GitHubPullRequests) as writer: - for day in days: - opened_prs = [pr for pr in prs if pr["created"] == day] - log.info("%s | %s | Processing %s opened PRs", day, org, len(opened_prs)) - process_prs(writer, opened_prs, day, name="prs_opened") - - merged_prs = [pr for pr in prs if pr["merged"] and pr["merged"] == day] - log.info("%s | %s | Processing %s merged PRs", day, org, len(merged_prs)) - process_prs(writer, merged_prs, day, name="prs_merged") - - -@click.command() -@click.argument("org") -@click.pass_context -def backfill(ctx, org): - """Backfill GitHub data for the given GitHub ORG""" - prs = list(iter_prs(org)) - - org_prs = [pr for pr in prs if pr["org"] == org] - log.info("Backfilling with %s PRs for %s", len(org_prs), org) - - open_prs(org_prs, org, days_threshold=7) - pr_throughput(org_prs, org) diff --git a/metrics/github/cli.py b/metrics/github/cli.py index aaf7cb7b..f5524d3c 100644 --- a/metrics/github/cli.py +++ b/metrics/github/cli.py @@ -1,80 +1,88 @@ -from datetime import timedelta +from datetime import date, timedelta import click import structlog from ..timescaledb import TimescaleDBWriter from ..timescaledb.tables import GitHubPullRequests -from ..tools.dates import previous_weekday +from ..tools.dates import iter_days, previous_weekday from . import api -from .backfill import backfill from .prs import process_prs log = structlog.get_logger() -@click.group() -@click.option("--token", required=True, envvar="GITHUB_TOKEN") -@click.pass_context -def github(ctx, token): - ctx.ensure_object(dict) +def open_prs(prs, org, days_threshold): + earliest = min([pr["created"] for pr in prs]) + start = previous_weekday(earliest, 0) # Monday + mondays = list(iter_days(start, date.today(), step=timedelta(days=7))) - ctx.obj["TOKEN"] = token + today = date.today() + threshold = timedelta(days=days_threshold) + def open_on_day(pr, start, end): + """ + Filter function for PRs -@github.command() -@click.argument("org") -@click.argument("date", type=click.DateTime()) -@click.argument("--days-threshold", type=int, default=7) -@click.pass_context -def open_prs(ctx, org, date, days_threshold): - """ - How many open PRs were there this week? - - The number of PRs open for DAYS_THRESHOLD (defaults to 7 days) in the - previous week to the given date. - - Week here is defined as the dates covering the most recent Monday to Sunday - (inclusive) before the given date, eg if the given date is a Tuesday this - command will step back a week+1 day to collect a full weeks worth of data. 
- """ - date = date.date() - - end = previous_weekday(date, 6) # Most recent Sunday - start = end - timedelta(days=6) # Monday before that Sunday - prs = api.prs_open_in_range(org, start, end) - - # remove PRs which have been open = timedelta(days=days_threshold) - ] + Checks whether a PR is open today and if it's been open for greater or + equal to the threshold of days. + """ + closed = pr["closed"] or today + opened = pr["created"] + + open_today = (opened <= start) and (closed >= end) + if not open_today: + return False + + return (closed - opened) >= threshold - log.info("%s | %s | Processing %s PRs", date, org, len(open_prs)) with TimescaleDBWriter(GitHubPullRequests) as writer: - process_prs( - writer, open_prs, date, name=f"queue_older_than_{days_threshold}_days" - ) + for start in mondays: + end = start + timedelta(days=6) + prs_on_day = [pr for pr in prs if open_on_day(pr, start, end)] + name = f"queue_older_than_{days_threshold}_days" -@github.command() -@click.argument("org") -@click.argument("date", type=click.DateTime()) -@click.pass_context -def pr_throughput(ctx, org, date): - """PRs opened and PRs closed in the given day""" - date = date.date() + log.info( + "%s | %s | Processing %s PRs from week starting %s", + name, + org, + len(prs_on_day), + start, + ) + process_prs(writer, prs_on_day, start, name=name) + + +def pr_throughput(prs, org): + start = min([pr["created"] for pr in prs]) + days = list(iter_days(start, date.today())) with TimescaleDBWriter(GitHubPullRequests) as writer: - opened_prs = api.prs_opened_on_date(org, date) - log.info("%s | %s | Processing %s opened PRs", date, org, len(opened_prs)) - process_prs(writer, opened_prs, date, name="prs_opened") + for day in days: + opened_prs = [pr for pr in prs if pr["created"] == day] + log.info("%s | %s | Processing %s opened PRs", day, org, len(opened_prs)) + process_prs(writer, opened_prs, day, name="prs_opened") + + merged_prs = [pr for pr in prs if pr["merged"] and pr["merged"] == day] + log.info("%s | %s | Processing %s merged PRs", day, org, len(merged_prs)) + process_prs(writer, merged_prs, day, name="prs_merged") - merged_prs = api.prs_merged_on_date(org, date) - log.info("%s | %s | Processing %s merged PRs", date, org, len(merged_prs)) - process_prs(writer, merged_prs, date, name="prs_merged") +@click.command() +@click.option("--token", required=True, envvar="GITHUB_TOKEN") +@click.pass_context +def github(ctx, token): + ctx.ensure_object(dict) + ctx.obj["TOKEN"] = token + + orgs = [ + "ebmdatalab", + "opensafely-core", + ] + for org in orgs: + prs = list(api.iter_prs(org)) + log.info("Backfilling with %s PRs for %s", len(prs), org) -github.add_command(backfill) + open_prs(prs, org, days_threshold=7) + pr_throughput(prs, org) diff --git a/pyproject.toml b/pyproject.toml index e049cbd8..2e42c399 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ source = [ ] [tool.coverage.report] -fail_under = 67 +fail_under = 70 skip_covered = true show_missing = true diff --git a/tests/metrics/github/test_cli.py b/tests/metrics/github/test_cli.py new file mode 100644 index 00000000..07afb496 --- /dev/null +++ b/tests/metrics/github/test_cli.py @@ -0,0 +1,23 @@ +import textwrap + +from click.testing import CliRunner + +from metrics.cli import cli + + +def test_github(): + runner = CliRunner() + result = runner.invoke(cli, ["github", "--help"]) + + assert result.exit_code == 0 + + expected = textwrap.dedent( + """ + Usage: cli github [OPTIONS] + + Options: + --token TEXT [required] + --help Show this message 
+          --help        Show this message and exit.
+        """
+    ).lstrip()  # the leading newline keeps dedent working; lstrip then removes it
+    assert result.output == expected, result.output
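
One footnote on the test: the leading newline inside the triple-quoted string is what gives `textwrap.dedent` a common indent on every content line, and `lstrip()` then removes that newline so the string starts at `Usage:`, matching click's output. A tiny demonstration (the string is made up for illustration):

    import textwrap

    raw = """
        Usage: demo [OPTIONS]
        """
    print(repr(textwrap.dedent(raw).lstrip()))  # 'Usage: demo [OPTIONS]\n'
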