Skip to content

Commit

Permalink
Set outside_collaborators new func argument as optional to avoid brea…
Browse files Browse the repository at this point in the history
…king changes
  • Loading branch information
cmm-lyft committed Dec 11, 2024
1 parent de67d9c commit 427ed7e
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 1 deletion.
72 changes: 72 additions & 0 deletions cartography/intel/aws/image_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import boto3
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)

class ECRCrossCheck:
def __init__(self, aws_region: str):
self.ecr_client = boto3.client('ecr', region_name=aws_region)
self.ecs_client = boto3.client('ecs', region_name=aws_region)
self.eks_client = boto3.client('eks', region_name=aws_region)
self.ec2_client = boto3.client('ec2', region_name=aws_region)

def get_ecr_images(self, repository_name: str) -> List[str]:
response = self.ecr_client.list_images(repositoryName=repository_name)
image_ids = response['imageIds']
return [image['imageDigest'] for image in image_ids]

def check_image_in_ecs(self, image_digest: str) -> bool:
clusters = self.ecs_client.list_clusters()['clusterArns']
for cluster in clusters:
tasks = self.ecs_client.list_tasks(cluster=cluster)['taskArns']
for task in tasks:
task_desc = self.ecs_client.describe_tasks(cluster=cluster, tasks=[task])['tasks']
for container in task_desc[0]['containers']:
if image_digest in container['image']:
return True
return False

def check_image_in_eks(self, image_digest: str) -> bool:
clusters = self.eks_client.list_clusters()['clusters']
for cluster in clusters:
nodegroups = self.eks_client.list_nodegroups(clusterName=cluster)['nodegroups']
for nodegroup in nodegroups:
nodegroup_desc = self.eks_client.describe_nodegroup(clusterName=cluster, nodegroupName=nodegroup)['nodegroup']
for image in nodegroup_desc['amiType']:
if image_digest in image:
return True
return False

def check_image_in_ec2(self, image_digest: str) -> bool:
instances = self.ec2_client.describe_instances()['Reservations']
for reservation in instances:
for instance in reservation['Instances']:
if 'ImageId' in instance and image_digest in instance['ImageId']:
return True
return False

def cross_check_images(self, repository_name: str) -> Dict[str, bool]:
image_digests = self.get_ecr_images(repository_name)
results = {}
for image_digest in image_digests:
in_ecs = self.check_image_in_ecs(image_digest)
in_eks = self.check_image_in_eks(image_digest)
in_ec2 = self.check_image_in_ec2(image_digest)
results[image_digest] = in_ecs or in_eks or in_ec2
logger.info(f'Image {image_digest} in use: {results[image_digest]}')
return results

def main():
aws_region = 'us-west-2'
repository_name = 'your-ecr-repository'
cross_checker = ECRCrossCheck(aws_region)
results = cross_checker.cross_check_images(repository_name)
for image_digest, in_use in results.items():
if in_use:
logger.info(f'Image {image_digest} is in use.')
else:
logger.warning(f'Image {image_digest} is not in use.')

if __name__ == '__main__':
main()
55 changes: 55 additions & 0 deletions cartography/intel/github/pullrequest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import requests
from typing import Dict, Any

GITHUB_API_URL = 'https://api.github.com'

class GitHubPullRequestClient:
def __init__(self, token: str):
self.token = token
self.headers = {'Authorization': f"token {token}"}

def _handle_rate_limit(self):
response = requests.get(f'{GITHUB_API_URL}/rate_limit', headers=self.headers)
response.raise_for_status()
rate_limit_obj = response.json()['resources']['core']
remaining = rate_limit_obj['remaining']
if remaining == 0:
reset_at = datetime.fromtimestamp(rate_limit_obj['reset'], tz=tz.utc)
now = datetime.now(tz.utc)
sleep_duration = reset_at - now + timedelta(minutes=1)
logger.warning(
f'GitHub API rate limit has {remaining} remaining, sleeping until reset at {reset_at} for {sleep_duration}',
)
time.sleep(sleep_duration.seconds)

def get_pull_request_data(self, repo: str, pr_id: int) -> Dict[str, Any]:
self._handle_rate_limit()
response = requests.get(f'{GITHUB_API_URL}/repos/{repo}/pulls/{pr_id}', headers=self.headers)
response.raise_for_status()
return response.json()

def get_pull_request_comments(self, repo: str, pr_id: int) -> Dict[str, Any]:
self._handle_rate_limit()
response = requests.get(f'{GITHUB_API_URL}/repos/{repo}/issues/{pr_id}/comments', headers=self.headers)
response.raise_for_status()
return response.json()

def get_pull_request_info(token: str, repo: str, pr_id: int) -> Dict[str, Any]:
client = GitHubPullRequestClient(token)
pr_data = client.get_pull_request_data(repo, pr_id)
pr_comments = client.get_pull_request_comments(repo, pr_id)
return {
'pull_request': pr_data,
'comments': pr_comments
}

if __name__ == '__main__':
import os
import json

token = os.getenv('GITHUB_TOKEN')
repo = 'OWNER/REPO' # Replace with the actual repository
pr_id = 1 # Replace with the actual pull request ID

pr_info = get_pull_request_info(token, repo, pr_id)
print(json.dumps(pr_info, indent=4))
2 changes: 1 addition & 1 deletion cartography/intel/github/repos.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def get(token: str, api_url: str, organization: str) -> List[Dict]:


def transform(
repos_json: List[Dict], direct_collaborators: dict[str, List[UserAffiliationAndRepoPermission]],
repos_json: List[Dict], direct_collaborators: dict[str, List[UserAffiliationAndRepoPermission]] | None = None,
outside_collaborators: dict[str, List[UserAffiliationAndRepoPermission]] | None = None,
) -> Dict:
"""
Expand Down

0 comments on commit 427ed7e

Please sign in to comment.