Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest Go dependencies using Semgrep API #1368

Merged
merged 30 commits into from
Nov 5, 2024
Merged
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cartography/intel/semgrep/__init__.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,9 @@
import neo4j

from cartography.config import Config
from cartography.intel.semgrep.findings import sync
from cartography.intel.semgrep.dependencies import sync_dependencies
from cartography.intel.semgrep.deployment import sync_deployment
from cartography.intel.semgrep.findings import sync_findings
from cartography.util import timeit


@@ -20,4 +22,9 @@ def start_semgrep_ingestion(
if not config.semgrep_app_token:
logger.info('Semgrep import is not configured - skipping this module. See docs to configure.')
return
sync(neo4j_session, config.semgrep_app_token, config.update_tag, common_job_parameters)

# sync_deployment must be called first since it populates common_job_parameters
# with the deployment ID and slug, which are required by the other sync functions
sync_deployment(neo4j_session, config.semgrep_app_token, config.update_tag, common_job_parameters)
sync_dependencies(neo4j_session, config.semgrep_app_token, config.update_tag, common_job_parameters)
sync_findings(neo4j_session, config.semgrep_app_token, config.update_tag, common_job_parameters)
201 changes: 201 additions & 0 deletions cartography/intel/semgrep/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import List

import neo4j
import requests
from requests.exceptions import HTTPError
from requests.exceptions import ReadTimeout

from cartography.client.core.tx import load
from cartography.graph.job import GraphJob
from cartography.models.semgrep.dependencies import SemgrepGoLibrarySchema
from cartography.stats import get_stats_client
from cartography.util import merge_module_sync_metadata
from cartography.util import timeit

logger = logging.getLogger(__name__)
stat_handler = get_stats_client(__name__)
_PAGE_SIZE = 10000
_TIMEOUT = (60, 60)
_MAX_RETRIES = 3


@timeit
def get_dependencies(semgrep_app_token: str, deployment_id: str, ecosystems: List[str]) -> List[Dict[str, Any]]:
    """
    Gets all dependencies for the given ecosystems within the given Semgrep deployment ID.
    param: semgrep_app_token: The Semgrep App token to use for authentication.
    param: deployment_id: The Semgrep deployment ID to use for retrieving dependencies.
    param: ecosystems: One or more ecosystems to import dependencies from, e.g. "gomod" or "pypi".
    The list of supported ecosystems is defined here:
    https://semgrep.dev/api/v1/docs/#tag/SupplyChainService/operation/semgrep_app.products.sca.handlers.dependency.list_dependencies_conexxion
    return: The concatenated "dependencies" entries of every page returned by the API.
    raises: ReadTimeout or HTTPError once the same page has failed _MAX_RETRIES times in a row.
    """
    all_deps: List[Dict[str, Any]] = []
    deps_url = f"https://semgrep.dev/api/v1/deployments/{deployment_id}/dependencies"
    has_more = True
    page = 0
    retries = 0
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {semgrep_app_token}",
    }

    request_data: Dict[str, Any] = {
        "pageSize": _PAGE_SIZE,
        "dependencyFilter": {
            "ecosystem": ecosystems,
        },
    }

    logger.info(f"Retrieving Semgrep dependencies for deployment '{deployment_id}'.")
    while has_more:
        try:
            response = requests.post(deps_url, json=request_data, headers=headers, timeout=_TIMEOUT)
            response.raise_for_status()
            data = response.json()
        except (ReadTimeout, HTTPError):
            logger.warning(f"Failed to retrieve Semgrep dependencies for page {page}. Retrying...")
            retries += 1
            if retries >= _MAX_RETRIES:
                # Bare raise re-raises the exception currently being handled unchanged.
                raise
            # request_data is untouched on failure, so the same page is requested again.
            continue
        deps = data.get("dependencies", [])
        has_more = data.get("hasMore", False)
        logger.info(f"Processed page {page} of Semgrep dependencies.")
        all_deps.extend(deps)
        # A successful page resets the retry budget for the next page.
        retries = 0
        page += 1
        # The cursor returned with this page is sent back to request the next one.
        request_data["cursor"] = data.get("cursor")

    logger.info(f"Retrieved {len(all_deps)} Semgrep dependencies in {page} pages.")
    return all_deps


def transform_dependencies(raw_deps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Transforms the raw dependencies response from Semgrep API into a list of dicts
    that can be used to create the Dependency nodes.

    param: raw_deps: The raw dependency dicts as returned by the Semgrep dependencies endpoint.
    return: One dict per raw dependency with the keys id, name, specifier, version, repo_url,
    ecosystem, transitivity and url.
    raises: KeyError if a raw dependency lacks "definedAt"/"url", "package"/"name",
    "package"/"versionSpecifier", "ecosystem" or "transitivity".

    Sample raw_dep as of November 2024:
    {
        "repositoryId": "123456",
        "definedAt": {
            "path": "go.mod",
            "startLine": "6",
            "endLine": "6",
            "url": "https://github.com/org/repo-name/blob/00000000000000000000000000000000/go.mod#L6",
            "committedAt": "1970-01-01T00:00:00Z",
            "startCol": "0",
            "endCol": "0"
        },
        "transitivity": "DIRECT",
        "package": {
            "name": "github.com/foo/bar",
            "versionSpecifier": "1.2.3"
        },
        "ecosystem": "gomod",
        "licenses": [],
        "pathToTransitivity": []
    },
    """
    deps = []
    for raw_dep in raw_deps:
        defined_at_url = raw_dep["definedAt"]["url"]

        # We could call a different endpoint to get all repo IDs and store a mapping of repo ID to URL,
        # but it's much simpler to just extract the URL from the definedAt field.
        # Splitting on the FIRST "/blob/" keeps this correct even if the manifest lives in a
        # directory named "blob": https://github.com/org/repo/blob/sha/blob/go.mod#L112 still
        # yields https://github.com/org/repo.
        repo_url = defined_at_url.split("/blob/", 1)[0]

        name = raw_dep["package"]["name"]
        version = raw_dep["package"]["versionSpecifier"]
        # Named dep_id rather than id so the builtin id() is not shadowed.
        dep_id = f"{name}|{version}"

        # As of November 2024, Semgrep does not import dependencies with version specifiers such as >, <, etc.
        # For now, hardcode the specifier to ==<version> to align with GitHub-sourced Python dependencies.
        # If Semgrep eventually supports version specifiers, update this line accordingly.
        specifier = f"=={version}"

        deps.append({
            # existing dependency properties:
            "id": dep_id,
            "name": name,
            "specifier": specifier,
            "version": version,
            "repo_url": repo_url,

            # Semgrep-specific properties:
            "ecosystem": raw_dep["ecosystem"],
            "transitivity": raw_dep["transitivity"].lower(),
            "url": defined_at_url,
        })

    return deps


@timeit
def load_dependencies(
    neo4j_session: neo4j.Session,
    dependency_schema: Callable,
    dependencies: List[Dict],
    deployment_id: str,
    update_tag: int,
) -> None:
    """
    Loads the given dependencies into the graph using the given node schema.
    param: neo4j_session: The Neo4j session passed through to load().
    param: dependency_schema: A callable (e.g. SemgrepGoLibrarySchema) that returns the node schema to load with.
    param: dependencies: The transformed dependency dicts to write.
    param: deployment_id: The Semgrep deployment ID, passed to load() as DEPLOYMENT_ID.
    param: update_tag: The update tag, passed to load() as lastupdated.
    """
    # Instantiate the schema once so the log line and the load call use the same object.
    schema = dependency_schema()
    # Log which asset is being written; fall back to the class name in case the
    # schema object exposes no label attribute.
    schema_label = getattr(schema, "label", type(schema).__name__)
    logger.info(f"Loading {len(dependencies)} {schema_label} objects into the graph.")
    load(
        neo4j_session,
        schema,
        dependencies,
        lastupdated=update_tag,
        DEPLOYMENT_ID=deployment_id,
    )


@timeit
def cleanup(
    neo4j_session: neo4j.Session,
    common_job_parameters: Dict[str, Any],
) -> None:
    """
    Builds the cleanup GraphJob for SemgrepGoLibrarySchema from the given job parameters
    and runs it against the given Neo4j session.
    """
    logger.info("Running Semgrep Go Library cleanup job.")
    GraphJob.from_node_schema(SemgrepGoLibrarySchema(), common_job_parameters).run(neo4j_session)


@timeit
def sync_dependencies(
    neo4j_session: neo4j.Session,
    semgrep_app_token: str,
    update_tag: int,
    common_job_parameters: Dict[str, Any],
) -> None:
    """
    Fetches, transforms and loads Semgrep Go dependencies, then runs cleanup and records sync metadata.
    param: neo4j_session: The Neo4j session to write to.
    param: semgrep_app_token: The Semgrep App token to use for authentication.
    param: update_tag: The update tag applied to loaded nodes and to the sync metadata.
    param: common_job_parameters: Must contain "DEPLOYMENT_ID"; when it is missing or empty
    the sync is skipped with a warning.
    """
    deployment_id = common_job_parameters.get("DEPLOYMENT_ID")
    if not deployment_id:
        # The two literals are concatenated implicitly, so the first one must end with a space.
        logger.warning(
            "Missing Semgrep deployment ID, ensure that sync_deployment() has been called. "
            "Skipping Semgrep dependencies sync job.",
        )
        return

    logger.info("Running Semgrep dependencies sync job.")

    # fetch and load dependencies for the Go ecosystem
    raw_go_deps = get_dependencies(semgrep_app_token, deployment_id, ecosystems=["gomod"])
    go_deps = transform_dependencies(raw_go_deps)
    load_dependencies(neo4j_session, SemgrepGoLibrarySchema, go_deps, deployment_id, update_tag)

    cleanup(neo4j_session, common_job_parameters)

    merge_module_sync_metadata(
        neo4j_session=neo4j_session,
        group_type='Semgrep',
        group_id=deployment_id,
        synced_type='Dependency',  # TODO: should this be "SemgrepDependency"?
        update_tag=update_tag,
        stat_handler=stat_handler,
    )
67 changes: 67 additions & 0 deletions cartography/intel/semgrep/deployment.py
Copy link
Contributor Author

@hanzo hanzo Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I named this deployment.py instead of deployments.py to match the existing file models/deployment.py.

The contents of this file have been moved here from intel/semgrep/findings.py without changes

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging
from typing import Any
from typing import Dict

import neo4j
import requests

from cartography.client.core.tx import load
from cartography.models.semgrep.deployment import SemgrepDeploymentSchema
from cartography.stats import get_stats_client
from cartography.util import timeit

logger = logging.getLogger(__name__)
stat_handler = get_stats_client(__name__)
_TIMEOUT = (60, 60)
hanzo marked this conversation as resolved.
Show resolved Hide resolved


@timeit
def get_deployment(semgrep_app_token: str) -> Dict[str, Any]:
    """
    Gets the deployment associated with the passed Semgrep App token.
    param: semgrep_app_token: The Semgrep App token to use for authentication.
    return: A dict with the "id", "name" and "slug" of the first deployment in the response.
    """
    response = requests.get(
        "https://semgrep.dev/api/v1/deployments",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {semgrep_app_token}",
        },
        timeout=_TIMEOUT,
    )
    response.raise_for_status()

    # Only the first deployment listed in the response is used.
    first_deployment = response.json()["deployments"][0]
    return {
        "id": first_deployment["id"],
        "name": first_deployment["name"],
        "slug": first_deployment["slug"],
    }


@timeit
def load_semgrep_deployment(
    neo4j_session: neo4j.Session, deployment: Dict[str, Any], update_tag: int,
) -> None:
    """
    Loads a single Semgrep deployment dict into the graph using SemgrepDeploymentSchema.
    param: neo4j_session: The Neo4j session passed through to load().
    param: deployment: The deployment dict to write.
    param: update_tag: The update tag, passed to load() as lastupdated.
    """
    logger.info(f"Loading Semgrep deployment info {deployment} into the graph...")
    deployment_schema = SemgrepDeploymentSchema()
    # load() is given a list of records, so wrap the single deployment.
    records = [deployment]
    load(neo4j_session, deployment_schema, records, lastupdated=update_tag)


@timeit
def sync_deployment(
    neo4j_session: neo4j.Session,
    semgrep_app_token: str,
    update_tag: int,
    common_job_parameters: Dict[str, Any],
) -> None:
    """
    Fetches the Semgrep deployment, loads it into the graph, and stores its ID and slug
    in common_job_parameters under "DEPLOYMENT_ID" and "DEPLOYMENT_SLUG".
    """
    deployment = get_deployment(semgrep_app_token)
    # Read both values before loading, so a missing key fails before anything is written.
    job_parameters = {
        "DEPLOYMENT_ID": deployment["id"],
        "DEPLOYMENT_SLUG": deployment["slug"],
    }
    load_semgrep_deployment(neo4j_session, deployment, update_tag)
    common_job_parameters.update(job_parameters)
67 changes: 18 additions & 49 deletions cartography/intel/semgrep/findings.py
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@

from cartography.client.core.tx import load
from cartography.graph.job import GraphJob
from cartography.models.semgrep.deployment import SemgrepDeploymentSchema
from cartography.models.semgrep.findings import SemgrepSCAFindingSchema
from cartography.models.semgrep.locations import SemgrepSCALocationSchema
from cartography.stats import get_stats_client
@@ -26,29 +25,6 @@
_MAX_RETRIES = 3


@timeit
def get_deployment(semgrep_app_token: str) -> Dict[str, Any]:
"""
Gets the deployment associated with the passed Semgrep App token.
param: semgrep_app_token: The Semgrep App token to use for authentication.
"""
deployment = {}
deployment_url = "https://semgrep.dev/api/v1/deployments"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {semgrep_app_token}",
}
response = requests.get(deployment_url, headers=headers, timeout=_TIMEOUT)
response.raise_for_status()

data = response.json()
deployment["id"] = data["deployments"][0]["id"]
deployment["name"] = data["deployments"][0]["name"]
deployment["slug"] = data["deployments"][0]["slug"]

return deployment


@timeit
def get_sca_vulns(semgrep_app_token: str, deployment_slug: str) -> List[Dict[str, Any]]:
"""
@@ -201,19 +177,6 @@ def transform_sca_vulns(raw_vulns: List[Dict[str, Any]]) -> Tuple[List[Dict[str,
return vulns, usages


@timeit
def load_semgrep_deployment(
neo4j_session: neo4j.Session, deployment: Dict[str, Any], update_tag: int,
) -> None:
logger.info(f"Loading Semgrep deployment info {deployment} into the graph...")
load(
neo4j_session,
SemgrepDeploymentSchema(),
[deployment],
lastupdated=update_tag,
)


@timeit
def load_semgrep_sca_vulns(
neo4j_session: neo4j.Session,
@@ -265,26 +228,32 @@ def cleanup(


@timeit
def sync(
neo4j_sesion: neo4j.Session,
def sync_findings(
neo4j_session: neo4j.Session,
semgrep_app_token: str,
update_tag: int,
common_job_parameters: Dict[str, Any],
) -> None:

deployment_id = common_job_parameters.get("DEPLOYMENT_ID")
deployment_slug = common_job_parameters.get("DEPLOYMENT_SLUG")
if not deployment_id or not deployment_slug:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this mechanism for getting the required parameters; I would love to hear suggestions for improvement.

logger.warning(
"Missing Semgrep deployment ID or slug, ensure that sync_deployment() has been called."
"Skipping SCA findings sync job.",
)
return

logger.info("Running Semgrep SCA findings sync job.")
semgrep_deployment = get_deployment(semgrep_app_token)
deployment_id = semgrep_deployment["id"]
deployment_slug = semgrep_deployment["slug"]
load_semgrep_deployment(neo4j_sesion, semgrep_deployment, update_tag)
common_job_parameters["DEPLOYMENT_ID"] = deployment_id
raw_vulns = get_sca_vulns(semgrep_app_token, deployment_slug)
vulns, usages = transform_sca_vulns(raw_vulns)
load_semgrep_sca_vulns(neo4j_sesion, vulns, deployment_id, update_tag)
load_semgrep_sca_usages(neo4j_sesion, usages, deployment_id, update_tag)
run_scoped_analysis_job('semgrep_sca_risk_analysis.json', neo4j_sesion, common_job_parameters)
cleanup(neo4j_sesion, common_job_parameters)
load_semgrep_sca_vulns(neo4j_session, vulns, deployment_id, update_tag)
load_semgrep_sca_usages(neo4j_session, usages, deployment_id, update_tag)
run_scoped_analysis_job('semgrep_sca_risk_analysis.json', neo4j_session, common_job_parameters)

cleanup(neo4j_session, common_job_parameters)
merge_module_sync_metadata(
neo4j_session=neo4j_sesion,
neo4j_session=neo4j_session,
group_type='Semgrep',
group_id=deployment_id,
synced_type='SCA',
Loading