diff --git a/cartography/intel/github/repos.py b/cartography/intel/github/repos.py
index 87ae0c0a6..5bd12b91b 100644
--- a/cartography/intel/github/repos.py
+++ b/cartography/intel/github/repos.py
@@ -136,13 +136,50 @@
     """
 
 
+def _get_repo_collaborators_inner_func(
+    org: str,
+    api_url: str,
+    token: str,
+    repo_name: str,
+    affiliation: str,
+    collab_users: list[dict[str, Any]],
+    collab_permission: list[str],
+) -> None:
+    """
+    Fetch the collaborators of a single repo and append the results to the caller-supplied lists.
+    :param org: Forwarded to `_get_repo_collaborators`.
+    :param api_url: Forwarded to `_get_repo_collaborators`.
+    :param token: The Github API token as string.
+    :param repo_name: The repo whose collaborators are fetched; also used in the log message.
+    :param affiliation: The collaborator affiliation to fetch; forwarded to `_get_repo_collaborators`.
+    :param collab_users: Output list. Every collaborator node returned is appended to it.
+    :param collab_permission: Output list. The `permission` value of every returned edge is appended to it.
+    :return: None. Results are delivered through `collab_users` and `collab_permission`.
+    """
+    logger.info(f"Loading {affiliation} collaborators for repo {repo_name}.")
+    collaborators = _get_repo_collaborators(token, api_url, org, repo_name, affiliation)
+
+    # nodes and edges are expected to always be present given that we only call for them if totalCount is > 0
+    # however sometimes GitHub returns None, as in issue 1334 and 1404.
+    users = list(collaborators.nodes or [])
+    # The `or []` is because `.edges` can be None.
+    permissions = [edge['permission'] for edge in collaborators.edges or []]
+
+    # Extend the output lists only after both reads above have succeeded. The caller wraps this function in
+    # `retries_with_backoff(..., TypeError, ...)`; assuming a retry re-invokes it with the same list objects,
+    # appending item by item would leave partial results behind if a TypeError (e.g. a None edge) were
+    # raised midway, and the retry would then add those items a second time.
+    collab_users.extend(users)
+    collab_permission.extend(permissions)
+
+
 def _get_repo_collaborators_for_multiple_repos(
     repo_raw_data: list[dict[str, Any]],
     affiliation: str,
     org: str,
     api_url: str,
     token: str,
-) -> dict[str, List[UserAffiliationAndRepoPermission]]:
+) -> dict[str, list[UserAffiliationAndRepoPermission]]:
     """
     For every repo in the given list, retrieve the collaborators.
     :param repo_raw_data: A list of dicts representing repos. See tests.data.github.repos.GET_REPOS for data shape.
@@ -153,7 +190,7 @@ def _get_repo_collaborators_for_multiple_repos(
     :param token: The Github API token as string.
     :return: A dictionary of repo URL to list of UserAffiliationAndRepoPermission
     """
-    result: dict[str, List[UserAffiliationAndRepoPermission]] = {}
+    result: dict[str, list[UserAffiliationAndRepoPermission]] = {}
     for repo in repo_raw_data:
         repo_name = repo['name']
         repo_url = repo['url']
@@ -167,27 +204,8 @@ def _get_repo_collaborators_for_multiple_repos(
         collab_users: List[dict[str, Any]] = []
         collab_permission: List[str] = []
 
-        def get_repo_collaborators_inner_func(
-            org: str,
-            api_url: str,
-            token: str,
-            repo_name: str,
-            affiliation: str,
-            collab_users: List[dict[str, Any]],
-            collab_permission: List[str],
-        ) -> None:
-            logger.info(f"Loading {affiliation} collaborators for repo {repo_name}.")
-            collaborators = _get_repo_collaborators(token, api_url, org, repo_name, affiliation)
-            # nodes and edges are expected to always be present given that we only call for them if totalCount is > 0
-            # however sometimes GitHub returns None, as in issue 1334 and 1404.
-            for collab in collaborators.nodes or []:
-                collab_users.append(collab)
-            # The `or []` is because `.edges` can be None.
-            for perm in collaborators.edges or []:
-                collab_permission.append(perm['permission'])
-
         retries_with_backoff(
-            get_repo_collaborators_inner_func,
+            _get_repo_collaborators_inner_func,
             TypeError,
             5,
             backoff_handler,
diff --git a/cartography/intel/github/teams.py b/cartography/intel/github/teams.py
index b32ad3a80..4bd619f9d 100644
--- a/cartography/intel/github/teams.py
+++ b/cartography/intel/github/teams.py
@@ -65,6 +65,41 @@ def get_teams(org: str, api_url: str, token: str) -> Tuple[PaginatedGraphqlData,
     return fetch_all(token, api_url, org, org_teams_gql, 'teams')
 
 
+def _get_teams_repos_inner_func(
+    org: str,
+    api_url: str,
+    token: str,
+    team_name: str,
+    repo_urls: list[str],
+    repo_permissions: list[str],
+) -> None:
+    """
+    Fetch the repos of a single team and append the results to the caller-supplied lists.
+    :param org: Forwarded to `_get_team_repos`.
+    :param api_url: Forwarded to `_get_team_repos`.
+    :param token: Forwarded to `_get_team_repos`.
+    :param team_name: The team whose repos are fetched; also used in the log message.
+    :param repo_urls: Output list. The `url` of every returned repo node is appended to it.
+    :param repo_permissions: Output list. The `permission` value of every returned edge is appended to it.
+    :return: None. Results are delivered through `repo_urls` and `repo_permissions`.
+    """
+    logger.info(f"Loading team repos for {team_name}.")
+    team_repos = _get_team_repos(org, api_url, token, team_name)
+
+    # The `or []` is because `.nodes` can be None. See:
+    # https://docs.github.com/en/graphql/reference/objects#teamrepositoryconnection
+    urls = [repo['url'] for repo in team_repos.nodes or []]
+    # The `or []` is because `.edges` can be None.
+    permissions = [edge['permission'] for edge in team_repos.edges or []]
+
+    # Extend the output lists only after both reads above have succeeded. The caller wraps this function in
+    # `retries_with_backoff(..., TypeError, ...)`; assuming a retry re-invokes it with the same list objects,
+    # appending item by item would leave partial results behind if a TypeError (e.g. a None node) were
+    # raised midway, and the retry would then add those items a second time.
+    repo_urls.extend(urls)
+    repo_permissions.extend(permissions)
+
+
 @timeit
 def _get_team_repos_for_multiple_teams(
     team_raw_data: list[dict[str, Any]],
@@ -85,23 +120,18 @@ def _get_team_repos_for_multiple_teams(
         repo_urls: List[str] = []
         repo_permissions: List[str] = []
 
-        def get_teams_repos_inner_func(
-            org: str, api_url: str, token: str, team_name: str,
-            repo_urls: List[str], repo_permissions: List[str],
-        ) -> None:
-            logger.info(f"Loading team repos for {team_name}.")
-            team_repos = _get_team_repos(org, api_url, token, team_name)
-            # The `or []` is because `.nodes` can be None. See:
-            # https://docs.github.com/en/graphql/reference/objects#teamrepositoryconnection
-            for repo in team_repos.nodes or []:
-                repo_urls.append(repo['url'])
-            # The `or []` is because `.edges` can be None.
-            for edge in team_repos.edges or []:
-                repo_permissions.append(edge['permission'])
-
-        retries_with_backoff(get_teams_repos_inner_func, TypeError, 5, backoff_handler)(
-            org=org, api_url=api_url, token=token, team_name=team_name,
-            repo_urls=repo_urls, repo_permissions=repo_permissions,
+        retries_with_backoff(
+            _get_teams_repos_inner_func,
+            TypeError,
+            5,
+            backoff_handler,
+        )(
+            org=org,
+            api_url=api_url,
+            token=token,
+            team_name=team_name,
+            repo_urls=repo_urls,
+            repo_permissions=repo_permissions,
         )
         # Shape = [(repo_url, 'WRITE'), ...]]
         result[team_name] = [RepoPermission(url, perm) for url, perm in zip(repo_urls, repo_permissions)]