Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest Go dependencies using Semgrep API #1368

Merged
merged 30 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
defe865
Ingest dependencies using Semgrep API
hanzo Oct 15, 2024
eb79a6a
rm
hanzo Oct 15, 2024
dadaa2a
basic ingestion working
hanzo Oct 16, 2024
d549c02
add data models and parameterize functions
hanzo Oct 21, 2024
ed0573d
remove manual indices
hanzo Oct 21, 2024
0cc44a3
cleanup
hanzo Oct 21, 2024
6f995fc
Add CNCF to docs (#1369)
achantavy Oct 24, 2024
8cdc540
0.95.0rc1 (#1370)
achantavy Oct 25, 2024
cdbd8cf
refactor sync functions into separate files
hanzo Oct 29, 2024
6ba5000
Merge branch 'master' into semgrep-dependencies
hanzo Oct 29, 2024
681d7af
fix test
hanzo Oct 29, 2024
d71ed6c
test fix
hanzo Oct 29, 2024
6a7926e
move slug to common params
hanzo Oct 29, 2024
9000aaa
add sync_deployment method
hanzo Oct 30, 2024
55e4e04
better warnings
hanzo Oct 30, 2024
3250dd9
move deployment test to separate file
hanzo Oct 31, 2024
3164bcf
rm unused import
hanzo Oct 31, 2024
53c5e0d
undo test changes
hanzo Oct 31, 2024
e58ddaf
move test functions to common.py
hanzo Oct 31, 2024
61bffe8
refactor tests, start on deps tests
hanzo Oct 31, 2024
6a7985b
tests
hanzo Nov 1, 2024
feeab74
tweak test
hanzo Nov 1, 2024
4276faf
rename test
hanzo Nov 1, 2024
ea638fd
add back create_dependency_nodes
hanzo Nov 1, 2024
6f44fbd
fix test
hanzo Nov 1, 2024
0daf3ec
add specifier property
hanzo Nov 1, 2024
ce2c15a
update schema
hanzo Nov 1, 2024
c31b1d6
rename var
hanzo Nov 4, 2024
edf7c59
Merge branch 'master' into semgrep-dependencies
hanzo Nov 4, 2024
aa4a403
address review feedback
hanzo Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cartography/intel/semgrep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import neo4j

from cartography.config import Config
from cartography.intel.semgrep.findings import sync
from cartography.intel.semgrep.dependencies import sync_dependencies
from cartography.intel.semgrep.deployment import sync_deployment
from cartography.intel.semgrep.findings import sync_findings
from cartography.util import timeit


Expand All @@ -20,4 +22,9 @@ def start_semgrep_ingestion(
if not config.semgrep_app_token:
logger.info('Semgrep import is not configured - skipping this module. See docs to configure.')
return
sync(neo4j_session, config.semgrep_app_token, config.update_tag, common_job_parameters)

# sync_deployment must be called first since it populates common_job_parameters
# with the deployment ID and slug, which are required by the other sync functions
sync_deployment(neo4j_session, config.semgrep_app_token, config.update_tag, common_job_parameters)
sync_dependencies(neo4j_session, config.semgrep_app_token, config.update_tag, common_job_parameters)
sync_findings(neo4j_session, config.semgrep_app_token, config.update_tag, common_job_parameters)
201 changes: 201 additions & 0 deletions cartography/intel/semgrep/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import List

import neo4j
import requests
from requests.exceptions import HTTPError
from requests.exceptions import ReadTimeout

from cartography.client.core.tx import load
from cartography.graph.job import GraphJob
from cartography.models.semgrep.dependencies import SemgrepGoLibrarySchema
from cartography.stats import get_stats_client
from cartography.util import merge_module_sync_metadata
from cartography.util import timeit

logger = logging.getLogger(__name__)
stat_handler = get_stats_client(__name__)
_PAGE_SIZE = 10000
_TIMEOUT = (60, 60)
_MAX_RETRIES = 3


@timeit
def get_dependencies(semgrep_app_token: str, deployment_id: str, ecosystems: List[str]) -> List[Dict[str, Any]]:
"""
Gets all dependencies for the given ecosystems within the given Semgrep deployment ID.
param: semgrep_app_token: The Semgrep App token to use for authentication.
param: deployment_id: The Semgrep deployment ID to use for retrieving dependencies.
param: ecosystems: One or more ecosystems to import dependencies from, e.g. "gomod" or "pypi".
The list of supported ecosystems is defined here:
https://semgrep.dev/api/v1/docs/#tag/SupplyChainService/operation/semgrep_app.products.sca.handlers.dependency.list_dependencies_conexxion
"""
all_deps = []
deps_url = f"https://semgrep.dev/api/v1/deployments/{deployment_id}/dependencies"
has_more = True
page = 0
retries = 0
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {semgrep_app_token}",
}

request_data: dict[str, Any] = {
"pageSize": _PAGE_SIZE,
"dependencyFilter": {
"ecosystem": ecosystems,
},
}

logger.info(f"Retrieving Semgrep dependencies for deployment '{deployment_id}'.")
while has_more:
try:
response = requests.post(deps_url, json=request_data, headers=headers, timeout=_TIMEOUT)
response.raise_for_status()
data = response.json()
except (ReadTimeout, HTTPError) as e:
logger.warning(f"Failed to retrieve Semgrep dependencies for page {page}. Retrying...")
retries += 1
if retries >= _MAX_RETRIES:
raise e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering, why not just raise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only because this is copied from

I'll update both lines to raise

continue
deps = data.get("dependencies", [])
has_more = data.get("hasMore", False)
logger.info(f"Processed page {page} of Semgrep dependencies.")
all_deps.extend(deps)
retries = 0
page += 1
request_data["cursor"] = data.get("cursor")

logger.info(f"Retrieved {len(all_deps)} Semgrep dependencies in {page} pages.")
return all_deps


def transform_dependencies(raw_deps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Flattens the raw dependency records returned by the Semgrep API into a list of dicts
    that can be used to create the Dependency nodes.

    param: raw_deps: The dependency records as returned by the Semgrep API.
    return: One dict per record with the keys "id", "name", "specifier", "version",
    "repo_url", "ecosystem", "transitivity" and "url".

    sample raw_dep as of November 2024:
    {
        "repositoryId": "123456",
        "definedAt": {
            "path": "go.mod",
            "startLine": "6",
            "endLine": "6",
            "url": "https://github.com/org/repo-name/blob/00000000000000000000000000000000/go.mod#L6",
            "committedAt": "1970-01-01T00:00:00Z",
            "startCol": "0",
            "endCol": "0"
        },
        "transitivity": "DIRECT",
        "package": {
            "name": "github.com/foo/bar",
            "versionSpecifier": "1.2.3"
        },
        "ecosystem": "gomod",
        "licenses": [],
        "pathToTransitivity": []
    },
    """
    transformed: List[Dict[str, Any]] = []
    for raw_dep in raw_deps:
        # We could call a different endpoint to get all repo IDs and store a mapping of repo ID to URL,
        # but it's much simpler to just extract the URL from the definedAt field.
        # Cutting at the first "/blob/" stays correct even if the manifest lives in a directory
        # named "blob": the URL would look like https://github.com/org/repo/blob/<sha>/blob/go.mod#L112,
        # so everything before the first "/blob/" is still https://github.com/org/repo.
        repo_url, _, _ = raw_dep["definedAt"]["url"].partition("/blob/")

        package = raw_dep["package"]
        name = package["name"]
        version = package["versionSpecifier"]
        dependency_id = f"{name}|{version}"

        # As of November 2024, Semgrep does not import dependencies with version specifiers such as >, <, etc.
        # For now, hardcode the specifier to ==<version> to align with GitHub-sourced Python dependencies.
        # If Semgrep eventually supports version specifiers, update this line accordingly.
        specifier = f"=={version}"

        node: Dict[str, Any] = {
            # existing dependency properties:
            "id": dependency_id,
            "name": name,
            "specifier": specifier,
            "version": version,
            "repo_url": repo_url,

            # Semgrep-specific properties:
            "ecosystem": raw_dep["ecosystem"],
            "transitivity": raw_dep["transitivity"].lower(),
            "url": raw_dep["definedAt"]["url"],
        }
        transformed.append(node)

    return transformed


@timeit
def load_dependencies(
neo4j_session: neo4j.Session,
dependency_schema: Callable,
dependencies: List[Dict],
deployment_id: str,
update_tag: int,
) -> None:
logger.info(f"Loading {len(dependencies)} Semgrep dependencies into the graph.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[non-block] I'm not a huge fan of metaprogramming here, but I won't block on this.

That aside, if we decide to keep this bit of metaprogramming, it'd be good to log the label of the dependency_schema object so that the log message shows what asset is getting written to the graph.

Copy link
Contributor Author

@hanzo hanzo Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from

logger.info(f"Loading {len(vulns)} Semgrep SCA vulns info into the graph.")

I'll update all of these log lines to use the label of the schema object

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated logs:

INFO:cartography.intel.semgrep.dependencies:Retrieved X Semgrep dependencies in Y pages.
INFO:cartography.intel.semgrep.dependencies:Loading X GoLibrary objects into the graph.
INFO:cartography.intel.semgrep.dependencies:Running Semgrep Go Library cleanup job.

load(
neo4j_session,
dependency_schema(),
dependencies,
lastupdated=update_tag,
DEPLOYMENT_ID=deployment_id,
)


@timeit
def cleanup(
    neo4j_session: neo4j.Session,
    common_job_parameters: Dict[str, Any],
) -> None:
    """
    Builds the cleanup job for SemgrepGoLibrarySchema and runs it.
    param: neo4j_session: The Neo4j session the cleanup job is run against.
    param: common_job_parameters: The parameters handed to GraphJob.from_node_schema when building the job.
    """
    logger.info("Running Semgrep Go Library cleanup job.")
    GraphJob.from_node_schema(
        SemgrepGoLibrarySchema(),
        common_job_parameters,
    ).run(neo4j_session)


@timeit
def sync_dependencies(
    neo4j_session: neo4j.Session,
    semgrep_app_token: str,
    update_tag: int,
    common_job_parameters: Dict[str, Any],
) -> None:
    """
    Fetches, transforms and loads the Go ("gomod") dependencies of a Semgrep deployment,
    then runs the Go library cleanup job and records the sync metadata.
    param: neo4j_session: The Neo4j session used for loading, cleanup and sync metadata.
    param: semgrep_app_token: The Semgrep App token to use for authentication.
    param: update_tag: The update tag written as lastupdated on the loaded data.
    param: common_job_parameters: Must contain "DEPLOYMENT_ID" (set by sync_deployment());
    if it is missing or empty, a warning is logged and nothing is synced.
    """
    deployment_id = common_job_parameters.get("DEPLOYMENT_ID")
    if not deployment_id:
        # The trailing space matters: adjacent string literals are concatenated verbatim,
        # so without it the two sentences run together in the log output.
        logger.warning(
            "Missing Semgrep deployment ID, ensure that sync_deployment() has been called. "
            "Skipping Semgrep dependencies sync job.",
        )
        return

    logger.info("Running Semgrep dependencies sync job.")

    # fetch and load dependencies for the Go ecosystem
    raw_deps = get_dependencies(semgrep_app_token, deployment_id, ecosystems=["gomod"])
    deps = transform_dependencies(raw_deps)
    load_dependencies(neo4j_session, SemgrepGoLibrarySchema, deps, deployment_id, update_tag)

    cleanup(neo4j_session, common_job_parameters)

    merge_module_sync_metadata(
        neo4j_session=neo4j_session,
        group_type='Semgrep',
        group_id=deployment_id,
        synced_type='Dependency',  # TODO: should this be "SemgrepDependency"?
        update_tag=update_tag,
        stat_handler=stat_handler,
    )
67 changes: 67 additions & 0 deletions cartography/intel/semgrep/deployment.py
Copy link
Contributor Author

@hanzo hanzo Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I named this deployment.py instead of deployments.py to match the existing file models/deployment.py.

The contents of this file have been moved here from intel/semgrep/findings.py without changes

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging
from typing import Any
from typing import Dict

import neo4j
import requests

from cartography.client.core.tx import load
from cartography.models.semgrep.deployment import SemgrepDeploymentSchema
from cartography.stats import get_stats_client
from cartography.util import timeit

logger = logging.getLogger(__name__)
stat_handler = get_stats_client(__name__)
_TIMEOUT = (60, 60)
hanzo marked this conversation as resolved.
Show resolved Hide resolved


@timeit
def get_deployment(semgrep_app_token: str) -> Dict[str, Any]:
    """
    Gets the deployment associated with the passed Semgrep App token.
    Only the first entry of the "deployments" list in the response is used.
    param: semgrep_app_token: The Semgrep App token to use for authentication.
    return: A dict holding the "id", "name" and "slug" of that deployment.
    raises: requests.exceptions.HTTPError if the API answers with an error status.
    """
    response = requests.get(
        "https://semgrep.dev/api/v1/deployments",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {semgrep_app_token}",
        },
        timeout=_TIMEOUT,
    )
    response.raise_for_status()

    first_deployment = response.json()["deployments"][0]
    return {field: first_deployment[field] for field in ("id", "name", "slug")}


@timeit
def load_semgrep_deployment(
    neo4j_session: neo4j.Session, deployment: Dict[str, Any], update_tag: int,
) -> None:
    """
    Loads a single Semgrep deployment into the graph using SemgrepDeploymentSchema.
    param: neo4j_session: The Neo4j session used for the load.
    param: deployment: The deployment dict to load; it is passed to load() as a one-element list.
    param: update_tag: The update tag passed to load() as lastupdated.
    """
    logger.info(f"Loading Semgrep deployment info {deployment} into the graph...")
    deployment_schema = SemgrepDeploymentSchema()
    records = [deployment]
    load(neo4j_session, deployment_schema, records, lastupdated=update_tag)


@timeit
def sync_deployment(
    neo4j_session: neo4j.Session,
    semgrep_app_token: str,
    update_tag: int,
    common_job_parameters: Dict[str, Any],
) -> None:
    """
    Fetches the Semgrep deployment, loads it into the graph and publishes its ID and slug
    in common_job_parameters under "DEPLOYMENT_ID" and "DEPLOYMENT_SLUG".
    param: neo4j_session: The Neo4j session used for the load.
    param: semgrep_app_token: The Semgrep App token to use for authentication.
    param: update_tag: The update tag passed on to load_semgrep_deployment().
    param: common_job_parameters: Mutated in place with the deployment ID and slug.
    """
    deployment = get_deployment(semgrep_app_token)
    # Both fields are read before loading, so a missing key fails before anything is written.
    deployment_parameters = {
        "DEPLOYMENT_ID": deployment["id"],
        "DEPLOYMENT_SLUG": deployment["slug"],
    }
    load_semgrep_deployment(neo4j_session, deployment, update_tag)
    common_job_parameters.update(deployment_parameters)
67 changes: 18 additions & 49 deletions cartography/intel/semgrep/findings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from cartography.client.core.tx import load
from cartography.graph.job import GraphJob
from cartography.models.semgrep.deployment import SemgrepDeploymentSchema
from cartography.models.semgrep.findings import SemgrepSCAFindingSchema
from cartography.models.semgrep.locations import SemgrepSCALocationSchema
from cartography.stats import get_stats_client
Expand All @@ -26,29 +25,6 @@
_MAX_RETRIES = 3


@timeit
def get_deployment(semgrep_app_token: str) -> Dict[str, Any]:
    """
    Gets the deployment associated with the passed Semgrep App token.
    Only the first entry of the "deployments" list in the response is used.
    param: semgrep_app_token: The Semgrep App token to use for authentication.
    return: A dict holding the "id", "name" and "slug" of that deployment.
    raises: requests.exceptions.HTTPError if the API answers with an error status.
    """
    response = requests.get(
        "https://semgrep.dev/api/v1/deployments",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {semgrep_app_token}",
        },
        timeout=_TIMEOUT,
    )
    response.raise_for_status()

    first_deployment = response.json()["deployments"][0]
    return {field: first_deployment[field] for field in ("id", "name", "slug")}


@timeit
def get_sca_vulns(semgrep_app_token: str, deployment_slug: str) -> List[Dict[str, Any]]:
"""
Expand Down Expand Up @@ -201,19 +177,6 @@ def transform_sca_vulns(raw_vulns: List[Dict[str, Any]]) -> Tuple[List[Dict[str,
return vulns, usages


@timeit
def load_semgrep_deployment(
    neo4j_session: neo4j.Session, deployment: Dict[str, Any], update_tag: int,
) -> None:
    """
    Loads a single Semgrep deployment into the graph using SemgrepDeploymentSchema.
    param: neo4j_session: The Neo4j session used for the load.
    param: deployment: The deployment dict to load; it is passed to load() as a one-element list.
    param: update_tag: The update tag passed to load() as lastupdated.
    """
    logger.info(f"Loading Semgrep deployment info {deployment} into the graph...")
    deployment_schema = SemgrepDeploymentSchema()
    records = [deployment]
    load(neo4j_session, deployment_schema, records, lastupdated=update_tag)


@timeit
def load_semgrep_sca_vulns(
neo4j_session: neo4j.Session,
Expand Down Expand Up @@ -265,26 +228,32 @@ def cleanup(


@timeit
def sync(
neo4j_sesion: neo4j.Session,
def sync_findings(
neo4j_session: neo4j.Session,
semgrep_app_token: str,
update_tag: int,
common_job_parameters: Dict[str, Any],
) -> None:

deployment_id = common_job_parameters.get("DEPLOYMENT_ID")
deployment_slug = common_job_parameters.get("DEPLOYMENT_SLUG")
if not deployment_id or not deployment_slug:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this mechanism for getting the required parameters; I would love to hear suggestions for improvement.

logger.warning(
"Missing Semgrep deployment ID or slug, ensure that sync_deployment() has been called."
"Skipping SCA findings sync job.",
)
return

logger.info("Running Semgrep SCA findings sync job.")
semgrep_deployment = get_deployment(semgrep_app_token)
deployment_id = semgrep_deployment["id"]
deployment_slug = semgrep_deployment["slug"]
load_semgrep_deployment(neo4j_sesion, semgrep_deployment, update_tag)
common_job_parameters["DEPLOYMENT_ID"] = deployment_id
raw_vulns = get_sca_vulns(semgrep_app_token, deployment_slug)
vulns, usages = transform_sca_vulns(raw_vulns)
load_semgrep_sca_vulns(neo4j_sesion, vulns, deployment_id, update_tag)
load_semgrep_sca_usages(neo4j_sesion, usages, deployment_id, update_tag)
run_scoped_analysis_job('semgrep_sca_risk_analysis.json', neo4j_sesion, common_job_parameters)
cleanup(neo4j_sesion, common_job_parameters)
load_semgrep_sca_vulns(neo4j_session, vulns, deployment_id, update_tag)
load_semgrep_sca_usages(neo4j_session, usages, deployment_id, update_tag)
run_scoped_analysis_job('semgrep_sca_risk_analysis.json', neo4j_session, common_job_parameters)

cleanup(neo4j_session, common_job_parameters)
merge_module_sync_metadata(
neo4j_session=neo4j_sesion,
neo4j_session=neo4j_session,
group_type='Semgrep',
group_id=deployment_id,
synced_type='SCA',
Expand Down
Loading
Loading