Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#349: Generalize graph write queries #1038

Merged
merged 31 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
45ec55e
Build ingest query
Aug 15, 2022
a641ad0
Linter
Aug 15, 2022
5b3c83c
Save cleanup query for another PR
Aug 15, 2022
71edc8b
Implement schema
Nov 23, 2022
41c9a24
Merge latest master
Nov 23, 2022
519cce9
bump mypy to 0.981 for python/mypy#13398
Nov 23, 2022
b900146
linter
Nov 23, 2022
05973fb
make load_graph_data interface make more sense
Nov 23, 2022
b6f3faf
fix comment
Nov 23, 2022
aafa38d
Docs and some better names
Nov 24, 2022
1293b2e
add a todo
Nov 24, 2022
b810d75
Doc updates, rename some fields
Nov 28, 2022
be30b3a
Fix pre-commit
Nov 28, 2022
7458af7
Merge branch 'master' into generalizequerywrites
Nov 28, 2022
8021520
Code commment suggestions
Dec 9, 2022
2e3dd8e
Merge branch 'master' into generalizequerywrites
Dec 9, 2022
d2e4794
Merge branch 'generalizequerywrites' of github.com:lyft/cartography i…
Dec 9, 2022
5bcb915
Stackoverflow comment for clarity)
Dec 9, 2022
52a1d29
Support ingesting only parts of a schema without breaking the others
Dec 12, 2022
b5875a9
Doc comment
Dec 12, 2022
fa40b09
Linter
Dec 12, 2022
80b6ff5
Support matching on one or more properties
Dec 13, 2022
346ad50
Correctly name test
Dec 13, 2022
2e65877
Change key_refs to TargetNodeMatcher to enforce it as a mandatory field
Dec 14, 2022
cb233c3
Remove use of hacky default_field()
Dec 14, 2022
3be472b
Support subset of schema relationships for query generation, test mul…
Dec 14, 2022
5dd60a2
Docstrings
Dec 14, 2022
f6fea75
Comments in tests
Dec 15, 2022
fea0308
Better comments
Dec 15, 2022
7b3a268
Test for exception conditions
Dec 15, 2022
ab0af7b
Remove irrelevant comment
Dec 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions cartography/client/core/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import neo4j

from cartography.util import batch


def read_list_of_values_tx(tx: neo4j.Transaction, query: str, **kwargs) -> List[Union[str, int]]:
"""
Expand Down Expand Up @@ -146,3 +148,65 @@ def read_single_dict_tx(tx: neo4j.Transaction, query: str, **kwargs) -> Dict[str

result.consume()
return value


def write_list_of_dicts_tx(
tx: neo4j.Transaction,
query: str,
**kwargs,
) -> None:
"""
Writes a list of dicts to Neo4j.

Example usage:
import neo4j
dict_list: List[Dict[Any, Any]] = [{...}, ...]

neo4j_driver = neo4j.driver(... args ...)
neo4j_session = neo4j_driver.Session(... args ...)

neo4j_session.write_transaction(
write_list_of_dicts_tx,
'''
UNWIND $DictList as data
MERGE (a:SomeNode{id: data.id})
SET
a.other_field = $other_field,
a.yet_another_kwarg_field = $yet_another_kwarg_field
...
''',
DictList=dict_list,
other_field='some extra value',
yet_another_kwarg_field=1234
)

:param tx: The neo4j write transaction.
:param query: The Neo4j write query to run.
:param kwargs: Keyword args to be supplied to the Neo4j query.
:return: None
"""
tx.run(query, kwargs)


def load_graph_data(
neo4j_session: neo4j.Session,
query: str,
dict_list: List[Dict[str, Any]],
**kwargs,
) -> None:
"""
Writes data to the graph.
:param neo4j_session: The Neo4j session
:param query: The Neo4j write query to run. This query is not meant to be handwritten, rather it should be generated
with cartography.graph.querybuilder.build_ingestion_query().
:param dict_list: The data to load to the graph represented as a list of dicts.
:param kwargs: Allows additional keyword args to be supplied to the Neo4j query.
:return: None
"""
for data_batch in batch(dict_list, size=10000):
neo4j_session.write_transaction(
write_list_of_dicts_tx,
query,
DictList=data_batch,
**kwargs,
)
255 changes: 255 additions & 0 deletions cartography/graph/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import abc
from dataclasses import dataclass
from dataclasses import field
from dataclasses import make_dataclass
from enum import auto
from enum import Enum
from typing import Dict
from typing import List
from typing import Optional


class LinkDirection(Enum):
"""
Each CartographyRelSchema has a LinkDirection that determines whether the relationship points toward the original
node ("INWARD") or away from the original node ("OUTWARD").

For example the following code creates the path `(:EMRCluster)<-[:RESOURCE]-(:AWSAccount)`:

class EMRCluster(CartographyNodeSchema):
label: str = "EMRCluster"
sub_resource_relationship: CartographyRelSchema = EMRClusterToAWSAccount()
# ...

class EMRClusterToAWSAccount(CartographyRelSchema):
target_node_label: str = "AWSAccount"
rel_label: str = "RESOURCE"
direction: LinkDirection = LinkDirection.INWARD
# ...

If `EMRClusterToAWSAccount.direction` was LinkDirection.OUTWARD, then the directionality of the relationship would
be `(:EMRCluster)-[:RESOURCE]->(:AWSAccount)` instead.
"""
INWARD = auto()
OUTWARD = auto()


class PropertyRef:
"""
PropertyRefs represent properties on cartography nodes and relationships.

cartography takes lists of Python dicts and loads them to Neo4j. PropertyRefs allow our dynamically generated Neo4j
ingestion queries to set values for a given node or relationship property from (A) a field on the dict being
processed (PropertyRef.set_in_kwargs=False; default), or (B) from a single variable provided by a keyword argument
(PropertyRef.set_in_kwargs=True).
"""

def __init__(self, name: str, set_in_kwargs=False):
"""
:param name: The name of the property
:param set_in_kwargs: Optional. If True, the property is not defined on the data dict, and we expect to find the
property in the kwargs.
If False, looks for the property in the data dict.
Defaults to False.
"""
self.name = name
self.set_in_kwargs = set_in_kwargs

def _parameterize_name(self) -> str:
return f"${self.name}"

def __repr__(self) -> str:
achantavy marked this conversation as resolved.
Show resolved Hide resolved
achantavy marked this conversation as resolved.
Show resolved Hide resolved
"""
`querybuilder.build_ingestion_query()`, generates a Neo4j batched ingestion query of the form
`UNWIND $DictList AS item [...]`.

If set_in_kwargs is False (default), we instruct the querybuilder to get the value for this given property from
the dict being processed. To do this, this function returns "item.<key on the dict>". This is used for loading
in lists of nodes.

On the other hand if set_in_kwargs is True, then the value will instead come from kwargs passed to
querybuilder.build_ingestion_query(). This is used for things like applying the same update tag to all nodes of
a given run.
"""
return f"item.{self.name}" if not self.set_in_kwargs else self._parameterize_name()


@dataclass(frozen=True)
class CartographyNodeProperties(abc.ABC):
"""
Abstract base dataclass that represents the properties on a CartographyNodeSchema. This class is abstract so that we
can enforce that all subclasses have an id and a lastupdated field.
"""
id: PropertyRef = field(init=False)
lastupdated: PropertyRef = field(init=False)

def __post_init__(self):
achantavy marked this conversation as resolved.
Show resolved Hide resolved
"""
Designed to prevent direct instantiation. This workaround is needed since this is a dataclass and an abstract
class without an abstract method defined.
See https://stackoverflow.com/q/60590442.
"""
if self.__class__ == CartographyNodeProperties:
raise TypeError("Cannot instantiate abstract class.")


@dataclass(frozen=True)
class CartographyRelProperties(abc.ABC):
"""
Abstract class that represents the properties on a CartographyRelSchema. This is intended to enforce that all
subclasses will have a lastupdated field defined on their resulting relationships.
"""
lastupdated: PropertyRef = field(init=False)

def __post_init__(self):
"""
Designed to prevent direct instantiation. This workaround is needed since this is a dataclass and an abstract
class without an abstract method defined.
"""
if self.__class__ == CartographyRelProperties:
raise TypeError("Cannot instantiate abstract class.")


@dataclass(frozen=True)
class TargetNodeMatcher:
"""
Dataclass used to encapsulate the following mapping:
Keys: one or more attribute names on the target_node_label used to uniquely identify what node to connect to.
Values: The value of the target_node_key used to uniquely identify what node to connect to. This is given as a
PropertyRef.
This is used to ensure dataclass immutability when composed as part of a CartographyNodeSchema object.
See `make_target_node_matcher()`.
"""
pass


@dataclass(frozen=True)
class CartographyRelSchema(abc.ABC):
"""
Abstract base dataclass that represents a cartography relationship.

The CartographyRelSchema contains properties that make it possible to connect the CartographyNodeSchema to other
existing nodes in the graph.
"""
@property
@abc.abstractmethod
def properties(self) -> CartographyRelProperties:
"""
:return: The properties of the relationship.
"""
pass

@property
@abc.abstractmethod
def target_node_label(self) -> str:
"""
:return: The target node label that this relationship will connect to.
"""
pass

@property
@abc.abstractmethod
def target_node_matcher(self) -> TargetNodeMatcher:
"""
:return: A TargetNodeMatcher object used to find what node(s) to attach the relationship to.
"""
pass

@property
@abc.abstractmethod
def rel_label(self) -> str:
"""
:return: The string label of the relationship.
"""
pass

@property
@abc.abstractmethod
def direction(self) -> LinkDirection:
"""
:return: The LinkDirection of the query. Please see the `LinkDirection` docs for a detailed explanation.
"""
pass


@dataclass(frozen=True)
class OtherRelationships:
"""
Encapsulates a list of CartographyRelSchema. This is used to ensure dataclass immutability when composed as part of
a CartographyNodeSchema object.
"""
rels: List[CartographyRelSchema]


@dataclass(frozen=True)
class ExtraNodeLabels:
"""
Encapsulates a list of str representing additional labels for the CartographyNodeSchema that this is composed on.
This wrapping is used to ensure dataclass immutability for the CartographyNodeSchema.
"""
labels: List[str]


@dataclass(frozen=True)
class CartographyNodeSchema(abc.ABC):
"""
Abstract base dataclass that represents a graph node in cartography. This is used to dynamically generate graph
ingestion queries.
"""
@property
@abc.abstractmethod
def label(self) -> str:
"""
:return: The primary str label of the node.
"""
pass

@property
@abc.abstractmethod
def properties(self) -> CartographyNodeProperties:
"""
:return: The properties of the node.
"""
pass

@property
def sub_resource_relationship(self) -> Optional[CartographyRelSchema]:
"""
Optional.
Allows subclasses to specify a subresource relationship for the given node. "Sub resource" is a term we made up
best defined by examples:
- In the AWS module, the subresource is an AWSAccount
- In Azure, the subresource is a Subscription
- In GCP, the subresource is a GCPProject
- In Okta, the subresource is an OktaOrganization
... and so on and so forth.
:return:
"""
return None

@property
def other_relationships(self) -> Optional[OtherRelationships]:
"""
Optional.
Allows subclasses to specify additional cartography relationships on the node.
:return: None if not overriden. Else return the node's OtherRelationships.
"""
return None

@property
def extra_node_labels(self) -> Optional[ExtraNodeLabels]:
"""
Optional.
Allows specifying extra labels on the node.
:return: None if not overriden. Else return the ExtraNodeLabels specified on the node.
"""
return None


def make_target_node_matcher(key_ref_dict: Dict[str, PropertyRef]) -> TargetNodeMatcher:
"""
:param key_ref_dict: A Dict mapping keys present on the node to PropertyRef objects.
:return: A TargetNodeMatcher used for CartographyRelSchema to match with other nodes.
"""
fields = [(key, PropertyRef, field(default=prop_ref)) for key, prop_ref in key_ref_dict.items()]
return make_dataclass(TargetNodeMatcher.__name__, fields, frozen=True)()
Loading