-
Notifications
You must be signed in to change notification settings - Fork 347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EBSVolume => New data model; Allow node attr updates from multiple intel modules #1154
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,9 @@ | |
import boto3 | ||
import neo4j | ||
|
||
from cartography.client.core.tx import load | ||
from cartography.graph.job import GraphJob | ||
from cartography.intel.aws.util.arns import build_arn | ||
from cartography.models.aws.ec2.volumes import EBSVolumeSchema | ||
from cartography.util import aws_handle_regions | ||
from cartography.util import timeit | ||
|
@@ -16,7 +18,7 @@ | |
|
||
@timeit | ||
@aws_handle_regions | ||
def get_volumes(boto3_session: boto3.session.Session, region: str) -> List[Dict]: | ||
def get_volumes(boto3_session: boto3.session.Session, region: str) -> List[Dict[str, Any]]: | ||
client = boto3_session.client('ec2', region_name=region) | ||
paginator = client.get_paginator('describe_volumes') | ||
volumes: List[Dict] = [] | ||
|
@@ -26,90 +28,76 @@ def get_volumes(boto3_session: boto3.session.Session, region: str) -> List[Dict] | |
|
||
|
||
def transform_volumes(volumes: List[Dict[str, Any]], region: str, current_aws_account_id: str) -> List[Dict[str, Any]]: | ||
result = [] | ||
for volume in volumes: | ||
volume['VolumeArn'] = f"arn:aws:ec2:{region}:{current_aws_account_id}:volume/{volume['VolumeId']}" | ||
volume['CreateTime'] = str(volume['CreateTime']) | ||
return volumes | ||
attachments = volume.get('Attachments', []) | ||
active_attachments = [a for a in attachments if a['State'] == 'attached'] | ||
|
||
volume_id = volume['VolumeId'] | ||
raw_vol = { | ||
'Arn': build_arn('ec2', current_aws_account_id, 'volume', volume_id, region), | ||
'AvailabilityZone': volume['AvailabilityZone'], | ||
'CreateTime': volume['CreateTime'], | ||
'Encrypted': volume['Encrypted'], | ||
'Size': volume['Size'], | ||
'State': volume['State'], | ||
'OutpostArn': volume['OutpostArn'], | ||
'SnapshotId': volume['SnapshotId'], | ||
'Iops': volume['Iops'], | ||
'FastRestored': volume['FastRestored'], | ||
'MultiAttachEnabled': volume['MultiAttachEnabled'], | ||
'VolumeType': volume['VolumeType'], | ||
'VolumeId': volume_id, | ||
'KmsKeyId': volume['KmsKeyId'], | ||
} | ||
|
||
if not active_attachments: | ||
result.append(raw_vol) | ||
continue | ||
|
||
for attachment in active_attachments: | ||
vol_with_attachment = raw_vol.copy() | ||
vol_with_attachment['InstanceId'] = attachment['InstanceId'] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do this because volumes can be attached to multiple instances? |
||
result.append(vol_with_attachment) | ||
|
||
return result | ||
|
||
|
||
@timeit | ||
def load_volumes( | ||
neo4j_session: neo4j.Session, data: List[Dict], region: str, current_aws_account_id: str, update_tag: int, | ||
neo4j_session: neo4j.Session, | ||
ebs_data: List[Dict[str, Any]], | ||
region: str, | ||
current_aws_account_id: str, | ||
update_tag: int, | ||
) -> None: | ||
ingest_volumes = """ | ||
UNWIND $volumes_list as volume | ||
MERGE (vol:EBSVolume{id: volume.VolumeId}) | ||
ON CREATE SET vol.firstseen = timestamp() | ||
SET vol.arn = volume.VolumeArn, | ||
vol.lastupdated = $update_tag, | ||
vol.availabilityzone = volume.AvailabilityZone, | ||
vol.createtime = volume.CreateTime, | ||
vol.encrypted = volume.Encrypted, | ||
vol.size = volume.Size, | ||
vol.state = volume.State, | ||
vol.outpostarn = volume.OutpostArn, | ||
vol.snapshotid = volume.SnapshotId, | ||
vol.iops = volume.Iops, | ||
vol.fastrestored = volume.FastRestored, | ||
vol.multiattachenabled = volume.MultiAttachEnabled, | ||
vol.type = volume.VolumeType, | ||
vol.kmskeyid = volume.KmsKeyId, | ||
vol.region=$Region | ||
WITH vol | ||
MATCH (aa:AWSAccount{id: $AWS_ACCOUNT_ID}) | ||
MERGE (aa)-[r:RESOURCE]->(vol) | ||
ON CREATE SET r.firstseen = timestamp() | ||
SET r.lastupdated = $update_tag | ||
""" | ||
|
||
neo4j_session.run( | ||
ingest_volumes, | ||
volumes_list=data, | ||
AWS_ACCOUNT_ID=current_aws_account_id, | ||
load( | ||
neo4j_session, | ||
EBSVolumeSchema(), | ||
ebs_data, | ||
Region=region, | ||
update_tag=update_tag, | ||
AWS_ID=current_aws_account_id, | ||
lastupdated=update_tag, | ||
) | ||
|
||
|
||
def load_volume_relationships( | ||
neo4j_session: neo4j.Session, | ||
volumes: List[Dict[str, Any]], | ||
aws_update_tag: int, | ||
) -> None: | ||
add_relationship_query = """ | ||
MATCH (volume:EBSVolume{arn: $VolumeArn}) | ||
WITH volume | ||
MATCH (instance:EC2Instance{instanceid: $InstanceId}) | ||
MERGE (volume)-[r:ATTACHED_TO_EC2_INSTANCE]->(instance) | ||
ON CREATE SET r.firstseen = timestamp() | ||
SET r.lastupdated = $aws_update_tag | ||
""" | ||
for volume in volumes: | ||
for attachment in volume.get('Attachments', []): | ||
if attachment['State'] != 'attached': | ||
continue | ||
neo4j_session.run( | ||
add_relationship_query, | ||
VolumeArn=volume['VolumeArn'], | ||
InstanceId=attachment['InstanceId'], | ||
aws_update_tag=aws_update_tag, | ||
) | ||
|
||
|
||
@timeit | ||
def cleanup_volumes(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None: | ||
def cleanup_volumes(neo4j_session: neo4j.Session, common_job_parameters: Dict[str, Any]) -> None: | ||
GraphJob.from_node_schema(EBSVolumeSchema(), common_job_parameters).run(neo4j_session) | ||
|
||
|
||
@timeit | ||
def sync_ebs_volumes( | ||
neo4j_session: neo4j.Session, boto3_session: boto3.session.Session, regions: List[str], | ||
current_aws_account_id: str, update_tag: int, common_job_parameters: Dict, | ||
neo4j_session: neo4j.Session, | ||
boto3_session: boto3.session.Session, | ||
regions: List[str], | ||
current_aws_account_id: str, | ||
update_tag: int, | ||
common_job_parameters: Dict[str, Any], | ||
) -> None: | ||
for region in regions: | ||
logger.debug("Syncing volumes for region '%s' in account '%s'.", region, current_aws_account_id) | ||
data = get_volumes(boto3_session, region) | ||
transformed_data = transform_volumes(data, region, current_aws_account_id) | ||
load_volumes(neo4j_session, transformed_data, region, current_aws_account_id, update_tag) | ||
load_volume_relationships(neo4j_session, transformed_data, update_tag) | ||
cleanup_volumes(neo4j_session, common_job_parameters) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from typing import Optional | ||
|
||
|
||
def build_arn( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
resource: str, | ||
account: str, | ||
typename: str, | ||
name: str, | ||
region: Optional[str] = None, | ||
partition: Optional[str] = None, | ||
) -> str: | ||
if not partition: | ||
# TODO: support partitions from others. Please file an issue on this if needed, would love to hear from you | ||
partition = 'aws' | ||
if not region: | ||
# Some resources are present in all regions, e.g. IAM policies | ||
region = "" | ||
return f"arn:{partition}:{resource}:{region}:{account}:{typename}/{name}" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,41 @@ | ||
class PropertyRef: | ||
""" | ||
PropertyRefs represent properties on cartography nodes and relationships. | ||
PropertyRefs represent properties on cartography nodes and relationships. Here is how they are used: | ||
|
||
cartography takes lists of Python dicts and loads them to Neo4j. PropertyRefs allow our dynamically generated Neo4j | ||
ingestion queries to set values for a given node or relationship property from (A) a field on the dict being | ||
processed (PropertyRef.set_in_kwargs=False; default), or (B) from a single variable provided by a keyword argument | ||
(PropertyRef.set_in_kwargs=True). | ||
(1) cartography takes lists of Python dicts and loads them to Neo4j. PropertyRefs allow our dynamically generated | ||
Neo4j ingestion queries to set values for a given node or relationship property from | ||
(A) a field on the dict being processed (PropertyRef.set_in_kwargs=False; default), or | ||
(B) from a single variable provided by a keyword argument (PropertyRef.set_in_kwargs=True). | ||
|
||
(2) PropertyRefs defined on CartographyNodeProperties objects can specify the `extra_index=True` field to ensure | ||
that an index is created for that given node on the given property. | ||
|
||
(3) When we need to create a relationship from node A to node B based on the value of one or more of node B's | ||
properties, cartography uses PropertyRefs on TargetNodeMatcher objects to generate an appropriate Neo4j `MATCH` | ||
clause. This clause may need special handling, such as when we need to match on one of node B's properties in a | ||
case-insensitive way - see the constructor docs for more details. | ||
""" | ||
|
||
def __init__(self, name: str, set_in_kwargs=False, extra_index=False, ignore_case=False): | ||
""" | ||
:param name: The name of the property | ||
:param set_in_kwargs: Optional. If True, the property is not defined on the data dict, and we expect to find the | ||
property in the kwargs. | ||
If False, looks for the property in the data dict. | ||
Defaults to False. | ||
:param extra_index: If True, make sure that we create an index for this property name. | ||
:param set_in_kwargs: Optional. This param only has effect if the PropertyRef is part of | ||
CartographyNodeProperties or CartographyRelProperties objects. | ||
If True, instructs the querybuilder to find the value for this property name in the kwargs passed to it. | ||
This is used for things like applying the same update tag to all nodes of a given run. | ||
If False (default), we instruct the querybuilder to get the value for this given property from | ||
the current dict being processed. | ||
:param extra_index: This param only has effect if the PropertyRef is part of a CartographyNodeProperties object. | ||
If True, make sure that we create an index for this property name. | ||
Notes: | ||
- extra_index is available for the case where you anticipate a property will be queried frequently. | ||
- The `id` and `lastupdated` properties will always have indexes created for them automatically by | ||
`ensure_indexes()`. | ||
- All properties included in target node matchers will always have indexes created for them. | ||
Defaults to False. | ||
:param ignore_case: If True, performs a case-insensitive match when comparing the value of this property during | ||
relationship creation. Defaults to False. This only has effect as part of a TargetNodeMatcher, and this is not | ||
supported for the sub resource relationship. | ||
:param ignore_case: This param only has effect as part of a TargetNodeMatcher, and this is not | ||
supported for the sub resource relationship. If True, performs a case-insensitive match when comparing the value | ||
of this property during relationship creation. Defaults to False. | ||
Example on why you would set this to True: | ||
GitHub usernames can have both uppercase and lowercase characters, but GitHub itself treats usernames as | ||
case-insensitive. Suppose your company's internal personnel database stores GitHub usernames all as | ||
|
@@ -34,25 +45,51 @@ def __init__(self, name: str, set_in_kwargs=False, extra_index=False, ignore_cas | |
that points to the GitHubUser node's name field, otherwise if one of your employees' GitHub usernames | ||
contains capital letters, you would not be able to map them properly to a GitHubUser node in your graph. | ||
""" | ||
self.name = name | ||
self.set_in_kwargs = set_in_kwargs | ||
self.extra_index = extra_index | ||
self.ignore_case = ignore_case | ||
self.name: str = name | ||
self.set_in_kwargs: bool = set_in_kwargs | ||
self.extra_index: bool = extra_index | ||
self.ignore_case: bool = ignore_case | ||
|
||
def _parameterize_name(self) -> str: | ||
""" | ||
Private function. Turns self.name into a Neo4j query parameter so that the Neo4j ingestion query can access the | ||
value on the dictionary being processed. This is used when parameters are supplied to the querybuilder via | ||
kwargs. | ||
Relevant Neo4j references: https://neo4j.com/docs/python-manual/current/query-simple/#_write_to_the_database and | ||
https://neo4j.com/docs/cypher-manual/current/syntax/parameters/. | ||
""" | ||
return f"${self.name}" | ||
|
||
def __repr__(self) -> str: | ||
""" | ||
`querybuilder.build_ingestion_query()`, generates a Neo4j batched ingestion query of the form | ||
`UNWIND $DictList AS item [...]`. | ||
Returns string representation of the property so that it can be rendered in the querybuilder based on the | ||
various inputs passed to the constructor. | ||
""" | ||
if self.set_in_kwargs: | ||
return self._parameterize_name() | ||
|
||
If set_in_kwargs is False (default), we instruct the querybuilder to get the value for this given property from | ||
the dict being processed. To do this, this function returns "item.<key on the dict>". This is used for loading | ||
in lists of nodes. | ||
if self.name.lower() == 'id' or self.ignore_case: | ||
# Don't do coalesce() on caseinsensitive attr match. | ||
return f"item.{self.name}" | ||
|
||
On the other hand if set_in_kwargs is True, then the value will instead come from kwargs passed to | ||
querybuilder.build_ingestion_query(). This is used for things like applying the same update tag to all nodes of | ||
a given run. | ||
""" | ||
return f"item.{self.name}" if not self.set_in_kwargs else self._parameterize_name() | ||
# Attention: This implementation detail is quirky and deserves the following essay to explain. | ||
# This function ensures that the Neo4j query sets the value of this node/relationship property by checking the | ||
# following sources in order: | ||
# 1. the current dict being processed. That is, we check if dict `item` contains a key called `self.name` with | ||
# a non-None value. If so, we use that value. Else continue and check the next source. | ||
# 2. An existing value on the node itself. That is, we convert self.name to lowercase and check if that is non | ||
# null on the Neo4j node `i` already. If None, then the property is not set (this is a Neo4j driver behavior). | ||
# This means we make an ASSUMPTION that the property name set on the node is the lowercase version of self.name. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might need to enforce that PropertyRef variables on NodeSchemas are all lowercase. And also enforce a standard between the variable names and the PropertyRef inputs.
instead of
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. Or we need to find another way to allow different intel modules to update the same node types using the same data schema objects. I don't know, can you think of a better way to solve this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now, another way would be to use subclasses. The ec2/volumes module can have a base version of
This way only the ec2/instances module can overwrite the One problem with that: What if we wanted the subclass to have fewer props than the base class? Then we would need something that could make the |
||
# | ||
# We do this because not all fields defined on a given CartographyNodeSchema may be present on the dict being | ||
# processed (this is by design because we encourage multiple intel modules to update the same nodes as they | ||
# may enrich the node with different properties), and when a dict is processed by the Neo4j driver, fields that | ||
# are not defined on the dict are treated as None values. | ||
# | ||
# As an example, the EC2 instance sync loads in :EC2Instances with :EBSVolumes attached based on the output of | ||
# `describe-ec2-instances`. This data includes a field called `deleteontermination` on the :EBSVolume. | ||
# `deleteontermination` is only set during the EC2 instance sync. When the EBS Volume sync runs, it enriches | ||
# existing :EBSVolumes with additional fields from data retrieved with `describe-ebs-volumes` but because this | ||
# API call does not include `deleteontermination`, we will overwrite the previous values of | ||
# `deleteontermination` with `None`, thus removing the property from all :EBSVolumes. | ||
return f"COALESCE(item.{self.name}, i.{self.name.lower()})" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except for
Arn
, should we bother being explicit about the rest of the props? We could do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I don't have a strong opinion here. I think being explicit is nice because it lets a reader see what fields are being processed.