Skip to content

Commit

Permalink
Fix cartography-cncf#1088: Refactor AWS Inspector to new model (carto…
Browse files Browse the repository at this point in the history
…graphy-cncf#1241)

Refactors AWS Inspector sync to new data model so that we minimize the
risk of write errors as seen in cartography-cncf#1088.

Also fixes a bug where we attempted to attach packages to Inspector findings
before those findings existed, because packages were ingested before the findings.

---------

Co-authored-by: Ramon Petgrave <[email protected]>
  • Loading branch information
2 people authored and chandan-cl committed Jun 26, 2024
1 parent 49ef2c0 commit 16a1882
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 284 deletions.
4 changes: 0 additions & 4 deletions cartography/data/indexes.cypher
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ CREATE INDEX IF NOT EXISTS FOR (n:AWSDNSZone) ON (n.zoneid);
CREATE INDEX IF NOT EXISTS FOR (n:AWSDNSZone) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSGroup) ON (n.arn);
CREATE INDEX IF NOT EXISTS FOR (n:AWSGroup) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInspectorFinding) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInspectorFinding) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInspectorPackage) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInspectorPackage) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInternetGateway) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInternetGateway) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSIpv4CidrBlock) ON (n.id);
Expand Down
35 changes: 0 additions & 35 deletions cartography/data/jobs/cleanup/aws_import_inspector_cleanup.json

This file was deleted.

186 changes: 44 additions & 142 deletions cartography/intel/aws/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import boto3
import neo4j

from cartography.client.core.tx import load
from cartography.graph.job import GraphJob
from cartography.models.aws.inspector.findings import AWSInspectorFindingSchema
from cartography.models.aws.inspector.packages import AWSInspectorPackageSchema
from cartography.util import aws_handle_regions
from cartography.util import aws_paginate
from cartography.util import batch
from cartography.util import run_cleanup_job
from cartography.util import timeit


Expand All @@ -20,17 +22,17 @@
@timeit
@aws_handle_regions
def get_inspector_findings(
session: boto3.session.Session,
region: str,
current_aws_account_id: str,
) -> List[Dict]:
'''
session: boto3.session.Session,
region: str,
current_aws_account_id: str,
) -> List[Dict[str, Any]]:
"""
We must list_findings by filtering the request, otherwise the request could time out.
First, we filter by account_id. And since there may be millions of CLOSED findings that may never go away,
we will only fetch those in ACTIVE or SUPPRESSED statuses.
list_members will get us all the accounts that
have delegated access to the account specified by current_aws_account_id.
'''
"""
client = session.client('inspector2', region_name=region)

members = aws_paginate(client, 'list_members', 'members')
Expand Down Expand Up @@ -61,7 +63,7 @@ def get_inspector_findings(
return findings


def transform_inspector_findings(results: List[Dict]) -> Tuple[List, List]:
def transform_inspector_findings(results: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
findings_list: List[Dict] = []
packages: Dict[str, Any] = {}

Expand Down Expand Up @@ -110,7 +112,7 @@ def transform_inspector_findings(results: List[Dict]) -> Tuple[List, List]:
return findings_list, packages_list


def transform_inspector_packages(packages: Dict[str, Any]) -> List[Dict]:
def transform_inspector_packages(packages: Dict[str, Any]) -> List[Dict[str, Any]]:
packages_list: List[Dict] = []
for package_id in packages.keys():
packages_list.append(packages[package_id])
Expand Down Expand Up @@ -146,153 +148,53 @@ def _process_packages(package_details: Dict[str, Any], aws_account_id: str, find
return packages


def _port_range_string(details: Dict) -> str:
def _port_range_string(details: Dict[str, Any]) -> str:
begin = details['openPortRange']['begin']
end = details['openPortRange']['end']
return f"{begin}-{end}"


def _load_findings_tx(
tx: neo4j.Transaction,
findings: List[Dict],
region: str,
aws_update_tag: int,
) -> None:
query = """
UNWIND $Findings as new_finding
MERGE (finding:AWSInspectorFinding{id: new_finding.id})
ON CREATE SET finding.firstseen = timestamp(),
finding.arn = new_finding.arn,
finding.region = $Region,
finding.awsaccount = new_finding.awsaccount
SET finding.lastupdated = $UpdateTag,
finding.name = new_finding.title,
finding.instanceid = new_finding.instanceid,
finding.ecrimageid = new_finding.ecrimageid,
finding.ecrrepositoryid = new_finding.ecrrepositoryid,
finding.severity = new_finding.severity,
finding.firstobservedat = new_finding.firstobservedat,
finding.updatedat = new_finding.updatedat,
finding.description = new_finding.description,
finding.type = new_finding.type,
finding.cvssscore = new_finding.cvssscore,
finding.protocol = new_finding.protocol,
finding.portrange = new_finding.portrange,
finding.portrangebegin = new_finding.portrangebegin,
finding.portrangeend = new_finding.portrangeend,
finding.vulnerabilityid = new_finding.vulnerabilityid,
finding.referenceurls = new_finding.referenceurls,
finding.relatedvulnerabilities = new_finding.relatedvulnerabilities,
finding.source = new_finding.source,
finding.sourceurl = new_finding.sourceurl,
finding.status = new_finding.status,
finding.vendorcreatedat = new_finding.vendorcreatedat,
finding.vendorseverity = new_finding.vendorseverity,
finding.vendorupdatedat = new_finding.vendorupdatedat,
finding.vulnerablepackageids = new_finding.vulnerablepackageids,
finding:Risk
WITH finding
MATCH (account:AWSAccount{id: finding.awsaccount})
MERGE (account)-[r:RESOURCE]->(finding)
ON CREATE SET r.firstseen = timestamp()
SET r.lastupdated = $UpdateTag
WITH finding
MATCH (instance:EC2Instance{id: finding.instanceid})
MERGE (instance)<-[r2:AFFECTS]-(finding)
ON CREATE SET r2.firstseen = timestamp()
SET r2.lastupdated = $UpdateTag
WITH finding
MATCH (repo:ECRRepository{id: finding.ecrrepositoryid})
MERGE (repo)<-[r3:AFFECTS]-(finding)
ON CREATE SET r3.firstseen = timestamp()
SET r3.lastupdated = $UpdateTag
WITH finding
MATCH (image:ECRImage{id: finding.ecrimageid})
MERGE (image)<-[r4:AFFECTS]-(finding)
ON CREATE SET r4.firstseen = timestamp()
SET r4.lastupdated = $UpdateTag
"""

tx.run(
query,
Findings=findings,
UpdateTag=aws_update_tag,
Region=region,
)


@timeit
def load_inspector_findings(
neo4j_session: neo4j.Session, findings: List[Dict], region: str,
aws_update_tag: int,
neo4j_session: neo4j.Session,
findings: List[Dict[str, Any]],
region: str,
aws_update_tag: int,
current_aws_account_id: str,
) -> None:
for i, findings_batch in enumerate(batch(findings), start=1):
logger.info(f'Loading batch number {i}')
neo4j_session.write_transaction(
_load_findings_tx,
findings=findings_batch,
region=region,
aws_update_tag=aws_update_tag,
)


def _load_packages_tx(
tx: neo4j.Transaction,
packages: List[Dict],
region: str,
aws_update_tag: int,
) -> None:
query = """
UNWIND $Packages as new_package
MERGE (package:AWSInspectorPackage{id: new_package.id})
ON CREATE SET package.firstseen = timestamp(),
package.region = $Region,
package.awsaccount = new_package.awsaccount,
package.findingarn = new_package.findingarn
SET package.lastupdated = $UpdateTag,
package.name = new_package.name,
package.arch = new_package.arch,
package.version = new_package.version,
package.release = new_package.release,
package.epoch = new_package.epoch,
package.manager = new_package.packageManager,
package.filepath = new_package.filePath,
package.fixedinversion = new_package.fixedInVersion,
package.sourcelayerhash = new_package.sourceLayerHash
WITH package
MATCH (finding:AWSInspectorFinding{id: package.findingarn})
MERGE (finding)-[r:HAS]->(package)
WITH package
MATCH (account:AWSAccount{id: package.awsaccount})
MERGE (account)-[r:RESOURCE]->(package)
ON CREATE SET r.firstseen = timestamp()
SET r.lastupdated = $UpdateTag
"""

tx.run(
query,
Packages=packages,
UpdateTag=aws_update_tag,
load(
neo4j_session,
AWSInspectorFindingSchema(),
findings,
Region=region,
AWS_ID=current_aws_account_id,
lastupdated=aws_update_tag,
)


@timeit
def load_inspector_packages(
neo4j_session: neo4j.Session, packages: List[Dict], region: str,
aws_update_tag: int,
neo4j_session: neo4j.Session,
packages: List[Dict[str, Any]],
region: str,
aws_update_tag: int,
current_aws_account_id: str,
) -> None:
neo4j_session.write_transaction(
_load_packages_tx,
packages=packages,
region=region,
aws_update_tag=aws_update_tag,
load(
neo4j_session,
AWSInspectorPackageSchema(),
packages,
Region=region,
AWS_ID=current_aws_account_id,
lastupdated=aws_update_tag,
)


@timeit
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None:
run_cleanup_job('aws_import_inspector_cleanup.json', neo4j_session, common_job_parameters)
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict[str, Any]) -> None:
logger.info("Running AWS Inspector cleanup")
GraphJob.from_node_schema(AWSInspectorFindingSchema(), common_job_parameters).run(neo4j_session)
GraphJob.from_node_schema(AWSInspectorPackageSchema(), common_job_parameters).run(neo4j_session)


@timeit
Expand All @@ -302,14 +204,14 @@ def sync(
regions: List[str],
current_aws_account_id: str,
update_tag: int,
common_job_parameters: Dict,
common_job_parameters: Dict[str, Any],
) -> None:
for region in regions:
logger.info(f"Syncing AWS Inspector findings for account {current_aws_account_id} and region {region}")
findings = get_inspector_findings(boto3_session, region, current_aws_account_id)
finding_data, package_data = transform_inspector_findings(findings)
logger.info(f"Loading {len(package_data)} packages")
load_inspector_packages(neo4j_session, package_data, region, update_tag)
logger.info(f"Loading {len(finding_data)} findings")
load_inspector_findings(neo4j_session, finding_data, region, update_tag)
load_inspector_findings(neo4j_session, finding_data, region, update_tag, current_aws_account_id)
logger.info(f"Loading {len(package_data)} packages")
load_inspector_packages(neo4j_session, package_data, region, update_tag, current_aws_account_id)
cleanup(neo4j_session, common_job_parameters)
Empty file.
Loading

0 comments on commit 16a1882

Please sign in to comment.