Skip to content

Commit

Permalink
#349: Generalize graph write queries (#1038)
Browse files Browse the repository at this point in the history
* Build ingest query

* Linter

* Save cleanup query for another PR

* Implement schema

* bump mypy to 0.981 for python/mypy#13398

* linter

* make load_graph_data interface make more sense

* fix comment

* Docs and some better names

* add a todo

* Doc updates, rename some fields

* Fix pre-commit

* Code commment suggestions

Co-authored-by: Ramon Petgrave <[email protected]>

* Stackoverflow comment for clarity)

* Support ingesting only parts of a schema without breaking the others

* Doc comment

* Linter

* Support matching on one or more properties

* Correctly name test

* Change key_refs to TargetNodeMatcher to enforce it as a mandatory field

* Remove use of hacky default_field()

* Support subset of schema relationships for query generation, test multiple node labels

* Docstrings

* Comments in tests

* Better comments

* Test for exception conditions

* Remove irrelevant comment

Co-authored-by: Ramon Petgrave <[email protected]>
  • Loading branch information
Alex Chantavy and ramonpetgrave64 authored Dec 16, 2022
1 parent 680e5c1 commit 9179f59
Show file tree
Hide file tree
Showing 24 changed files with 1,585 additions and 228 deletions.
64 changes: 64 additions & 0 deletions cartography/client/core/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import neo4j

from cartography.util import batch


def read_list_of_values_tx(tx: neo4j.Transaction, query: str, **kwargs) -> List[Union[str, int]]:
"""
Expand Down Expand Up @@ -146,3 +148,65 @@ def read_single_dict_tx(tx: neo4j.Transaction, query: str, **kwargs) -> Dict[str

result.consume()
return value


def write_list_of_dicts_tx(
tx: neo4j.Transaction,
query: str,
**kwargs,
) -> None:
"""
Writes a list of dicts to Neo4j.
Example usage:
import neo4j
dict_list: List[Dict[Any, Any]] = [{...}, ...]
neo4j_driver = neo4j.driver(... args ...)
neo4j_session = neo4j_driver.Session(... args ...)
neo4j_session.write_transaction(
write_list_of_dicts_tx,
'''
UNWIND $DictList as data
MERGE (a:SomeNode{id: data.id})
SET
a.other_field = $other_field,
a.yet_another_kwarg_field = $yet_another_kwarg_field
...
''',
DictList=dict_list,
other_field='some extra value',
yet_another_kwarg_field=1234
)
:param tx: The neo4j write transaction.
:param query: The Neo4j write query to run.
:param kwargs: Keyword args to be supplied to the Neo4j query.
:return: None
"""
tx.run(query, kwargs)


def load_graph_data(
neo4j_session: neo4j.Session,
query: str,
dict_list: List[Dict[str, Any]],
**kwargs,
) -> None:
"""
Writes data to the graph.
:param neo4j_session: The Neo4j session
:param query: The Neo4j write query to run. This query is not meant to be handwritten, rather it should be generated
with cartography.graph.querybuilder.build_ingestion_query().
:param dict_list: The data to load to the graph represented as a list of dicts.
:param kwargs: Allows additional keyword args to be supplied to the Neo4j query.
:return: None
"""
for data_batch in batch(dict_list, size=10000):
neo4j_session.write_transaction(
write_list_of_dicts_tx,
query,
DictList=data_batch,
**kwargs,
)
255 changes: 255 additions & 0 deletions cartography/graph/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import abc
from dataclasses import dataclass
from dataclasses import field
from dataclasses import make_dataclass
from enum import auto
from enum import Enum
from typing import Dict
from typing import List
from typing import Optional


class LinkDirection(Enum):
"""
Each CartographyRelSchema has a LinkDirection that determines whether the relationship points toward the original
node ("INWARD") or away from the original node ("OUTWARD").
For example the following code creates the path `(:EMRCluster)<-[:RESOURCE]-(:AWSAccount)`:
class EMRCluster(CartographyNodeSchema):
label: str = "EMRCluster"
sub_resource_relationship: CartographyRelSchema = EMRClusterToAWSAccount()
# ...
class EMRClusterToAWSAccount(CartographyRelSchema):
target_node_label: str = "AWSAccount"
rel_label: str = "RESOURCE"
direction: LinkDirection = LinkDirection.INWARD
# ...
If `EMRClusterToAWSAccount.direction` was LinkDirection.OUTWARD, then the directionality of the relationship would
be `(:EMRCluster)-[:RESOURCE]->(:AWSAccount)` instead.
"""
INWARD = auto()
OUTWARD = auto()


class PropertyRef:
"""
PropertyRefs represent properties on cartography nodes and relationships.
cartography takes lists of Python dicts and loads them to Neo4j. PropertyRefs allow our dynamically generated Neo4j
ingestion queries to set values for a given node or relationship property from (A) a field on the dict being
processed (PropertyRef.set_in_kwargs=False; default), or (B) from a single variable provided by a keyword argument
(PropertyRef.set_in_kwargs=True).
"""

def __init__(self, name: str, set_in_kwargs=False):
"""
:param name: The name of the property
:param set_in_kwargs: Optional. If True, the property is not defined on the data dict, and we expect to find the
property in the kwargs.
If False, looks for the property in the data dict.
Defaults to False.
"""
self.name = name
self.set_in_kwargs = set_in_kwargs

def _parameterize_name(self) -> str:
return f"${self.name}"

def __repr__(self) -> str:
"""
`querybuilder.build_ingestion_query()`, generates a Neo4j batched ingestion query of the form
`UNWIND $DictList AS item [...]`.
If set_in_kwargs is False (default), we instruct the querybuilder to get the value for this given property from
the dict being processed. To do this, this function returns "item.<key on the dict>". This is used for loading
in lists of nodes.
On the other hand if set_in_kwargs is True, then the value will instead come from kwargs passed to
querybuilder.build_ingestion_query(). This is used for things like applying the same update tag to all nodes of
a given run.
"""
return f"item.{self.name}" if not self.set_in_kwargs else self._parameterize_name()


@dataclass(frozen=True)
class CartographyNodeProperties(abc.ABC):
"""
Abstract base dataclass that represents the properties on a CartographyNodeSchema. This class is abstract so that we
can enforce that all subclasses have an id and a lastupdated field.
"""
id: PropertyRef = field(init=False)
lastupdated: PropertyRef = field(init=False)

def __post_init__(self):
"""
Designed to prevent direct instantiation. This workaround is needed since this is a dataclass and an abstract
class without an abstract method defined.
See https://stackoverflow.com/q/60590442.
"""
if self.__class__ == CartographyNodeProperties:
raise TypeError("Cannot instantiate abstract class.")


@dataclass(frozen=True)
class CartographyRelProperties(abc.ABC):
"""
Abstract class that represents the properties on a CartographyRelSchema. This is intended to enforce that all
subclasses will have a lastupdated field defined on their resulting relationships.
"""
lastupdated: PropertyRef = field(init=False)

def __post_init__(self):
"""
Designed to prevent direct instantiation. This workaround is needed since this is a dataclass and an abstract
class without an abstract method defined.
"""
if self.__class__ == CartographyRelProperties:
raise TypeError("Cannot instantiate abstract class.")


@dataclass(frozen=True)
class TargetNodeMatcher:
"""
Dataclass used to encapsulate the following mapping:
Keys: one or more attribute names on the target_node_label used to uniquely identify what node to connect to.
Values: The value of the target_node_key used to uniquely identify what node to connect to. This is given as a
PropertyRef.
This is used to ensure dataclass immutability when composed as part of a CartographyNodeSchema object.
See `make_target_node_matcher()`.
"""
pass


@dataclass(frozen=True)
class CartographyRelSchema(abc.ABC):
"""
Abstract base dataclass that represents a cartography relationship.
The CartographyRelSchema contains properties that make it possible to connect the CartographyNodeSchema to other
existing nodes in the graph.
"""
@property
@abc.abstractmethod
def properties(self) -> CartographyRelProperties:
"""
:return: The properties of the relationship.
"""
pass

@property
@abc.abstractmethod
def target_node_label(self) -> str:
"""
:return: The target node label that this relationship will connect to.
"""
pass

@property
@abc.abstractmethod
def target_node_matcher(self) -> TargetNodeMatcher:
"""
:return: A TargetNodeMatcher object used to find what node(s) to attach the relationship to.
"""
pass

@property
@abc.abstractmethod
def rel_label(self) -> str:
"""
:return: The string label of the relationship.
"""
pass

@property
@abc.abstractmethod
def direction(self) -> LinkDirection:
"""
:return: The LinkDirection of the query. Please see the `LinkDirection` docs for a detailed explanation.
"""
pass


@dataclass(frozen=True)
class OtherRelationships:
"""
Encapsulates a list of CartographyRelSchema. This is used to ensure dataclass immutability when composed as part of
a CartographyNodeSchema object.
"""
rels: List[CartographyRelSchema]


@dataclass(frozen=True)
class ExtraNodeLabels:
"""
Encapsulates a list of str representing additional labels for the CartographyNodeSchema that this is composed on.
This wrapping is used to ensure dataclass immutability for the CartographyNodeSchema.
"""
labels: List[str]


@dataclass(frozen=True)
class CartographyNodeSchema(abc.ABC):
"""
Abstract base dataclass that represents a graph node in cartography. This is used to dynamically generate graph
ingestion queries.
"""
@property
@abc.abstractmethod
def label(self) -> str:
"""
:return: The primary str label of the node.
"""
pass

@property
@abc.abstractmethod
def properties(self) -> CartographyNodeProperties:
"""
:return: The properties of the node.
"""
pass

@property
def sub_resource_relationship(self) -> Optional[CartographyRelSchema]:
"""
Optional.
Allows subclasses to specify a subresource relationship for the given node. "Sub resource" is a term we made up
best defined by examples:
- In the AWS module, the subresource is an AWSAccount
- In Azure, the subresource is a Subscription
- In GCP, the subresource is a GCPProject
- In Okta, the subresource is an OktaOrganization
... and so on and so forth.
:return:
"""
return None

@property
def other_relationships(self) -> Optional[OtherRelationships]:
"""
Optional.
Allows subclasses to specify additional cartography relationships on the node.
:return: None if not overriden. Else return the node's OtherRelationships.
"""
return None

@property
def extra_node_labels(self) -> Optional[ExtraNodeLabels]:
"""
Optional.
Allows specifying extra labels on the node.
:return: None if not overriden. Else return the ExtraNodeLabels specified on the node.
"""
return None


def make_target_node_matcher(key_ref_dict: Dict[str, PropertyRef]) -> TargetNodeMatcher:
"""
:param key_ref_dict: A Dict mapping keys present on the node to PropertyRef objects.
:return: A TargetNodeMatcher used for CartographyRelSchema to match with other nodes.
"""
fields = [(key, PropertyRef, field(default=prop_ref)) for key, prop_ref in key_ref_dict.items()]
return make_dataclass(TargetNodeMatcher.__name__, fields, frozen=True)()
Loading

0 comments on commit 9179f59

Please sign in to comment.