Skip to content

Commit

Permalink
Adds support to query Semgrep API to ingest SCA vulns (cartography-cn…
Browse files Browse the repository at this point in the history
…cf#1224)

Adds a new Schema and Intel Job to query Semgrep Enterprise API and
ingest Semgrep Supply Chain (SSC) findings.
The schema connects a Semgrep Deployment with an id specific to a
customer in Semgrep Enterprise to an SCA finding and location as a sub
resource relationships. It also connects to a Github repository to match
findings against where they were found. Each finding can have a location
with the specific lines of code where the vulnerable dependency is being
used.

![SemgrepCartographyfinal](https://github.com/lyft/cartography/assets/9236431/9a99ecdd-b40f-430e-bff5-fe950f4c713e)

---------

Co-authored-by: Alex Chantavy <[email protected]>
  • Loading branch information
2 people authored and chandan-cl committed Jun 26, 2024
1 parent 01b0aed commit 433aed1
Show file tree
Hide file tree
Showing 16 changed files with 901 additions and 0 deletions.
16 changes: 16 additions & 0 deletions cartography/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,15 @@ def _build_parser(self):
'The Duo api hostname'
),
)
parser.add_argument(
'--semgrep-app-token-env-var',
type=str,
default=None,
help=(
'The name of environment variable containing the Semgrep app token key. '
'Required if you are using the Semgrep intel module. Ignored otherwise.'
),
)
return parser

def main(self, argv: str) -> int:
Expand Down Expand Up @@ -669,6 +678,13 @@ def main(self, argv: str) -> int:
config.duo_api_key = None
config.duo_api_secret = None

# Semgrep config
if config.semgrep_app_token_env_var:
logger.debug(f"Reading Semgrep App Token from environment variable {config.semgrep_app_token_env_var}")
config.semgrep_app_token = os.environ.get(config.semgrep_app_token_env_var)
else:
config.semgrep_app_token = None

# Run cartography
try:
return cartography.sync.run_with_config(self.sync, config)
Expand Down
4 changes: 4 additions & 0 deletions cartography/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ class Config:
:param duo_api_key: The Duo api secret. Optional.
:type duo_api_hostname: str
:param duo_api_hostname: The Duo api hostname, e.g. "api-abc123.duosecurity.com". Optional.
:param semgrep_app_token: The Semgrep api token. Optional.
:type semgrep_app_token: str
"""

def __init__(
Expand Down Expand Up @@ -157,6 +159,7 @@ def __init__(
duo_api_key=None,
duo_api_secret=None,
duo_api_hostname=None,
semgrep_app_token=None,
):
self.neo4j_uri = neo4j_uri
self.neo4j_user = neo4j_user
Expand Down Expand Up @@ -208,3 +211,4 @@ def __init__(
self.duo_api_key = duo_api_key
self.duo_api_secret = duo_api_secret
self.duo_api_hostname = duo_api_hostname
self.semgrep_app_token = semgrep_app_token
23 changes: 23 additions & 0 deletions cartography/intel/semgrep/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging

import neo4j

from cartography.config import Config
from cartography.intel.semgrep.findings import sync
from cartography.util import timeit


logger = logging.getLogger(__name__)


@timeit
def start_semgrep_ingestion(
neo4j_session: neo4j.Session, config: Config,
) -> None:
common_job_parameters = {
"UPDATE_TAG": config.update_tag,
}
if not config.semgrep_app_token:
logger.info('Semgrep import is not configured - skipping this module. See docs to configure.')
return
sync(neo4j_session, config.semgrep_app_token, config.update_tag, common_job_parameters)
217 changes: 217 additions & 0 deletions cartography/intel/semgrep/findings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
import logging
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple

import neo4j
import requests

from cartography.client.core.tx import load
from cartography.graph.job import GraphJob
from cartography.models.semgrep.deployment import SemgrepDeploymentSchema
from cartography.models.semgrep.findings import SemgrepSCAFindingSchema
from cartography.models.semgrep.locations import SemgrepSCALocationSchema
from cartography.stats import get_stats_client
from cartography.util import merge_module_sync_metadata
from cartography.util import timeit

logger = logging.getLogger(__name__)
stat_handler = get_stats_client(__name__)
_TIMEOUT = (60, 60)


@timeit
def get_deployment(semgrep_app_token: str) -> Dict[str, Any]:
"""
Gets the deployment associated with the passed Semgrep App token.
param: semgrep_app_token: The Semgrep App token to use for authentication.
"""
deployment = {}
deployment_url = "https://semgrep.dev/api/v1/deployments"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {semgrep_app_token}",
}
response = requests.get(deployment_url, headers=headers, timeout=_TIMEOUT)
response.raise_for_status()

data = response.json()
deployment["id"] = data["deployments"][0]["id"]
deployment["name"] = data["deployments"][0]["name"]
deployment["slug"] = data["deployments"][0]["slug"]

return deployment


@timeit
def get_sca_vulns(semgrep_app_token: str, deployment_id: str) -> List[Dict[str, Any]]:
"""
Gets the SCA vulns associated with the passed Semgrep App token and deployment id.
param: semgrep_app_token: The Semgrep App token to use for authentication.
param: deployment_id: The Semgrep deployment id to use for retrieving SCA vulns.
"""
all_vulns = []
sca_url = f"https://semgrep.dev/api/sca/deployments/{deployment_id}/vulns"
has_more = True
cursor = ""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {semgrep_app_token}",
}

while has_more:
params = {}
if cursor:
params = {"cursor": cursor}

response = requests.get(sca_url, params=params, headers=headers, timeout=_TIMEOUT)
response.raise_for_status()
data = response.json()
vulns = data["vulns"]
cursor = data.get("cursor")
has_more = data.get("hasMore", False)
all_vulns.extend(vulns)

return all_vulns


def transform_sca_vulns(raw_vulns: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]]]:
"""
Transforms the raw SCA vulns response from Semgrep API into a list of dicts
that can be used to create the SemgrepSCAFinding nodes.
"""
vulns = []
usages = []
for vuln in raw_vulns:
sca_vuln: Dict[str, Any] = {}
# Mandatory fields
unique_id = f"{vuln['repositoryName']}|{vuln['advisory']['ruleId']}"
sca_vuln["id"] = unique_id
sca_vuln["repositoryName"] = vuln["repositoryName"]
sca_vuln["ruleId"] = vuln["advisory"]["ruleId"]
sca_vuln["title"] = vuln["advisory"]["title"]
sca_vuln["description"] = vuln["advisory"]["description"]
sca_vuln["ecosystem"] = vuln["advisory"]["ecosystem"]
sca_vuln["severity"] = vuln["advisory"]["severity"]
sca_vuln["reachability"] = vuln["advisory"]["reachability"]
sca_vuln["reachableIf"] = vuln["advisory"]["reachableIf"]
sca_vuln["exposureType"] = vuln["exposureType"]
dependency = f"{vuln['matchedDependency']['name']}|{vuln['matchedDependency']['versionSpecifier']}"
sca_vuln["matchedDependency"] = dependency
sca_vuln["dependencyFileLocation_path"] = vuln["dependencyFileLocation"]["path"]
sca_vuln["dependencyFileLocation_url"] = vuln["dependencyFileLocation"]["url"]
# Optional fields
sca_vuln["transitivity"] = vuln.get("transitivity", None)
cves = vuln.get("advisory", {}).get("references", {}).get("cveIds")
if len(cves) > 0:
# Take the first CVE
sca_vuln["cveId"] = vuln["advisory"]["references"]["cveIds"][0]
if vuln.get('closestSafeDependency'):
dep_fix = f"{vuln['closestSafeDependency']['name']}|{vuln['closestSafeDependency']['versionSpecifier']}"
sca_vuln["closestSafeDependency"] = dep_fix
if vuln["advisory"].get("references", {}).get("urls", []):
sca_vuln["ref_urls"] = vuln["advisory"].get("references", {}).get("urls", [])
sca_vuln["openedAt"] = vuln.get("openedAt", None)
for usage in vuln.get("usages", []):
usage_dict = {}
usage_dict["SCA_ID"] = unique_id
usage_dict["findingId"] = usage["findingId"]
usage_dict["path"] = usage["location"]["path"]
usage_dict["startLine"] = usage["location"]["startLine"]
usage_dict["startCol"] = usage["location"]["startCol"]
usage_dict["endLine"] = usage["location"]["endLine"]
usage_dict["endCol"] = usage["location"]["endCol"]
usage_dict["url"] = usage["location"]["url"]
usages.append(usage_dict)
vulns.append(sca_vuln)
return vulns, usages


@timeit
def load_semgrep_deployment(
neo4j_session: neo4j.Session, deployment: Dict[str, Any], update_tag: int,
) -> None:
logger.info(f"Loading Semgrep deployment info {deployment} into the graph...")
load(
neo4j_session,
SemgrepDeploymentSchema(),
[deployment],
lastupdated=update_tag,
)


@timeit
def load_semgrep_sca_vulns(
neo4j_session: neo4j.Session,
vulns: List[Dict[str, Any]],
deployment_id: str,
update_tag: int,
) -> None:
logger.info(f"Loading {len(vulns)} Semgrep SCA vulns info into the graph.")
load(
neo4j_session,
SemgrepSCAFindingSchema(),
vulns,
lastupdated=update_tag,
DEPLOYMENT_ID=deployment_id,
)


@timeit
def load_semgrep_sca_usages(
neo4j_session: neo4j.Session,
usages: List[Dict[str, Any]],
deployment_id: str,
update_tag: int,
) -> None:
logger.info(f"Loading {len(usages)} Semgrep SCA usages info into the graph.")
load(
neo4j_session,
SemgrepSCALocationSchema(),
usages,
lastupdated=update_tag,
DEPLOYMENT_ID=deployment_id,
)


@timeit
def cleanup(
neo4j_session: neo4j.Session, common_job_parameters: Dict[str, Any],
) -> None:
logger.info("Running Semgrep SCA findings cleanup job.")
findings_cleanup_job = GraphJob.from_node_schema(
SemgrepSCAFindingSchema(), common_job_parameters,
)
findings_cleanup_job.run(neo4j_session)
logger.info("Running Semgrep SCA Locations cleanup job.")
locations_cleanup_job = GraphJob.from_node_schema(
SemgrepSCALocationSchema(), common_job_parameters,
)
locations_cleanup_job.run(neo4j_session)


@timeit
def sync(
neo4j_sesion: neo4j.Session,
semgrep_app_token: str,
update_tag: int,
common_job_parameters: Dict[str, Any],
) -> None:
logger.info("Running Semgrep SCA findings sync job.")
semgrep_deployment = get_deployment(semgrep_app_token)
load_semgrep_deployment(neo4j_sesion, semgrep_deployment, update_tag)
common_job_parameters["DEPLOYMENT_ID"] = semgrep_deployment["id"]
raw_vulns = get_sca_vulns(semgrep_app_token, semgrep_deployment["id"])
vulns, usages = transform_sca_vulns(raw_vulns)
load_semgrep_sca_vulns(neo4j_sesion, vulns, semgrep_deployment["id"], update_tag)
load_semgrep_sca_usages(neo4j_sesion, usages, semgrep_deployment["id"], update_tag)
cleanup(neo4j_sesion, common_job_parameters)
merge_module_sync_metadata(
neo4j_session=neo4j_sesion,
group_type='Semgrep',
group_id=common_job_parameters["DEPLOYMENT_ID"],
synced_type='SCA',
update_tag=update_tag,
stat_handler=stat_handler,
)
Empty file.
19 changes: 19 additions & 0 deletions cartography/models/semgrep/deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dataclasses import dataclass

from cartography.models.core.common import PropertyRef
from cartography.models.core.nodes import CartographyNodeProperties
from cartography.models.core.nodes import CartographyNodeSchema


@dataclass(frozen=True)
class SemgrepDeploymentProperties(CartographyNodeProperties):
id: PropertyRef = PropertyRef('id')
lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)
name: PropertyRef = PropertyRef('name', extra_index=True)
slug: PropertyRef = PropertyRef('slug', extra_index=True)


@dataclass(frozen=True)
class SemgrepDeploymentSchema(CartographyNodeSchema):
label: str = 'SemgrepDeployment'
properties: SemgrepDeploymentProperties = SemgrepDeploymentProperties()
80 changes: 80 additions & 0 deletions cartography/models/semgrep/findings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from dataclasses import dataclass

from cartography.models.core.common import PropertyRef
from cartography.models.core.nodes import CartographyNodeProperties
from cartography.models.core.nodes import CartographyNodeSchema
from cartography.models.core.relationships import CartographyRelProperties
from cartography.models.core.relationships import CartographyRelSchema
from cartography.models.core.relationships import LinkDirection
from cartography.models.core.relationships import make_target_node_matcher
from cartography.models.core.relationships import OtherRelationships
from cartography.models.core.relationships import TargetNodeMatcher


@dataclass(frozen=True)
class SemgrepSCAFindingNodeProperties(CartographyNodeProperties):
id: PropertyRef = PropertyRef('id')
lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)
rule_id: PropertyRef = PropertyRef('ruleId', extra_index=True)
repository: PropertyRef = PropertyRef('repositoryName', extra_index=True)
summary: PropertyRef = PropertyRef('title', extra_index=True)
description: PropertyRef = PropertyRef('description')
package_manager: PropertyRef = PropertyRef('ecosystem')
severity: PropertyRef = PropertyRef('severity')
cve_id: PropertyRef = PropertyRef('cveId', extra_index=True)
reachability_check: PropertyRef = PropertyRef('reachability')
reachability_condition: PropertyRef = PropertyRef('reachableIf')
reachability: PropertyRef = PropertyRef('exposureType')
transitivity: PropertyRef = PropertyRef('transitivity')
dependency: PropertyRef = PropertyRef('matchedDependency')
dependency_fix: PropertyRef = PropertyRef('closestSafeDependency')
ref_urls: PropertyRef = PropertyRef('ref_urls')
dependency_file: PropertyRef = PropertyRef('dependencyFileLocation_path', extra_index=True)
dependency_file_url: PropertyRef = PropertyRef('dependencyFileLocation_url', extra_index=True)
scan_time: PropertyRef = PropertyRef('openedAt')


@dataclass(frozen=True)
class SemgrepSCAFindingToSemgrepDeploymentRelProperties(CartographyRelProperties):
lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)


@dataclass(frozen=True)
# (:SemgrepSCAFinding)<-[:RESOURCE]-(:SemgrepDeployment)
class SemgrepSCAFindingToSemgrepDeploymentSchema(CartographyRelSchema):
target_node_label: str = 'SemgrepDeployment'
target_node_matcher: TargetNodeMatcher = make_target_node_matcher(
{'id': PropertyRef('DEPLOYMENT_ID', set_in_kwargs=True)},
)
direction: LinkDirection = LinkDirection.INWARD
rel_label: str = "RESOURCE"
properties: SemgrepSCAFindingToSemgrepDeploymentRelProperties = SemgrepSCAFindingToSemgrepDeploymentRelProperties()


@dataclass(frozen=True)
class SemgrepSCAFindingToGithubRepoRelProperties(CartographyRelProperties):
lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)


@dataclass(frozen=True)
# (:SemgrepSCAFinding)-[:FOUND_IN]->(:GitHubRepository)
class SemgrepSCAFindingToGithubRepoRel(CartographyRelSchema):
target_node_label: str = 'GitHubRepository'
target_node_matcher: TargetNodeMatcher = make_target_node_matcher(
{'fullname': PropertyRef('repositoryName')},
)
direction: LinkDirection = LinkDirection.OUTWARD
rel_label: str = "FOUND_IN"
properties: SemgrepSCAFindingToGithubRepoRelProperties = SemgrepSCAFindingToGithubRepoRelProperties()


@dataclass(frozen=True)
class SemgrepSCAFindingSchema(CartographyNodeSchema):
label: str = 'SemgrepSCAFinding'
properties: SemgrepSCAFindingNodeProperties = SemgrepSCAFindingNodeProperties()
sub_resource_relationship: SemgrepSCAFindingToSemgrepDeploymentSchema = SemgrepSCAFindingToSemgrepDeploymentSchema()
other_relationships: OtherRelationships = OtherRelationships(
[
SemgrepSCAFindingToGithubRepoRel(),
],
)
Loading

0 comments on commit 433aed1

Please sign in to comment.