Skip to content

Commit

Permalink
PROD-2734 - initial partitioning support (user-defined windows) (#5325)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamsachs authored Oct 4, 2024
1 parent eb9ba5e commit c8f5b1f
Show file tree
Hide file tree
Showing 15 changed files with 649 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The types of changes are:

### Added
- Make all "Description" table columns expandable in Admin UI tables [#5340](https://github.com/ethyca/fides/pull/5340)
- Initial support for DSR requests against partitioned BigQuery tables [#5325](https://github.com/ethyca/fides/pull/5325)
- Added new RDS MySQL Connector [#5343](https://github.com/ethyca/fides/pull/5343)

### Developer Experience
Expand Down
16 changes: 16 additions & 0 deletions data/dataset/bigquery_example_test_dataset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,19 @@ dataset:
data_type: string
- name: last_visit
data_categories: [system.operations]
- name: visit_partitioned
fides_meta:
partitioning:
where_clauses: [
"`last_visit` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 500 DAY) AND `last_visit` <= CURRENT_TIMESTAMP()",
"`last_visit` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `last_visit` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 500 DAY)",
"`last_visit` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)",
]
fields:
- name: email
data_categories: [user.contact.email]
fides_meta:
identity: email
data_type: string
- name: last_visit
data_categories: [system.operations]
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ types-defusedxml==0.7.0.20240218
expandvars==0.9.0
fastapi[all]==0.111.0
fastapi-pagination[sqlalchemy]==0.12.25
fideslang==3.0.6
fideslang==3.0.7
fideslog==1.2.10
firebase-admin==5.3.0
GitPython==3.1.41
Expand Down
87 changes: 87 additions & 0 deletions src/fides/api/graph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from re import match, search
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union

from fideslang.models import MaskingStrategyOverride
Expand Down Expand Up @@ -443,6 +444,22 @@ class MaskingOverride:
length: Optional[int]


# For now, we only support BQ partitioning, so the clause pattern we expect is BQ-specific.
#
# Shape of an accepted clause:
#   `<field_1>` <op> <operand_1>[ AND `<field_2>` <op> <operand_2>]
# where <op> is one of <, <=, >, >=.
#
# Notes on the pattern:
# - the operator character class is `[<>]`; a `|` inside a character class is a literal pipe,
#   not alternation, so it must not appear there or `|`/`|=` would be accepted as operators
# - field names and operands use `+` so that an empty column name (``) or an empty operand
#   is rejected up front rather than producing invalid SQL at execution time
# - the named groups (`field_1`, `operand_1`, `field_2`, `operand_2`) are read by the
#   partitioning validator; `field_2`/`operand_2` are None when there is no AND sub-clause
BIGQUERY_PARTITION_CLAUSE_PATTERN = (
    r"^`(?P<field_1>[a-zA-Z0-9_]+)` ([<>][=]?) (?P<operand_1>[a-zA-Z0-9_\s(),\.\"\']+)"
    r"(\sAND `(?P<field_2>[a-zA-Z0-9_]+)` ([<>][=]?) (?P<operand_2>[a-zA-Z0-9_\s(),\.\"\']+))?$"
)
# protected keywords that are _not_ allowed in the operands, to avoid any potential malicious execution.
PROHIBITED_KEYWORDS = [
    "UNION",
    "INSERT",
    "UPDATE",
    "CREATE",
    "DROP",
    "SELECT",
    "CHAR",
    "HAVING",
    "EXEC",
]


class Collection(BaseModel):
"""A single grouping of individual data points that are accessed together"""

Expand All @@ -456,6 +473,7 @@ class Collection(BaseModel):
grouped_inputs: Set[str] = set()
data_categories: Set[FidesKey] = set()
masking_strategy_override: Optional[MaskingStrategyOverride] = None
partitioning: Optional[Dict] = None

@property
def field_dict(self) -> Dict[FieldPath, Field]:
Expand Down Expand Up @@ -613,6 +631,7 @@ def build_field(serialized_field: dict) -> Field:
CollectionAddress.from_string(addr_string)
for addr_string in data.get("erase_after", [])
}
data["partitioning"] = data.get("partitioning")

return Collection.model_validate(data)

Expand Down Expand Up @@ -647,6 +666,74 @@ def serialize_erase_after(self, erase_after: Set[CollectionAddress]) -> Set[str]
},
)

@field_validator("partitioning")
@classmethod
def validate_partitioning(
    cls, partitioning: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """
    Validates the `partitioning` dict field.

    The `partitioning` dict field is untyped in Fideslang, but here we enforce
    that it has the required and expected `where_clauses` key, whose value must be
    a non-empty list of strings.

    The string values are validated to ensure they match the expected syntax, which
    is strictly prescribed. The string values MUST be a valid SQL clause that defines
    a partition window, with the form:

    ```
    `column_1` >(=) [some value] AND `column_1` <(=) [some value]
    ```

    To be clear, some notable constraints on the input:
    - the clause string must begin by referencing a column name wrapped by backticks (`)
    - the clause string must compare that first column with a `<>(=)` operator, and may
    include at most one other conditional with a `<>(=)` operator that's joined to the first
    conditional via an AND operator
    - if the clause string contains a second conditional, it must reference the same column name
    as the first conditional, also wrapped by backticks
    - column names (wrapped by backticks) must always be on the _left_ side of the `<>(=)` operator
    in its conditional
    - none of the `PROHIBITED_KEYWORDS` may be used (case-insensitively) in the clause

    Returns the `partitioning` value unchanged when it is empty/None or when it passes validation.

    Raises `ValueError` if `where_clauses` is missing or empty, is not a list of strings,
    or if any clause has an unsupported format, references two different columns,
    or contains a prohibited keyword.
    """
    if not partitioning:
        return partitioning

    # NOTE: when we deprecate `where_clause` partitioning in favor of a more proper partitioning DSL,
    # we should be sure to still support the existing `where_clause` partition definition on
    # any in-progress DSRs so that they can run through to completion.
    where_clauses = partitioning.get("where_clauses")
    if not where_clauses:
        raise ValueError(
            "`where_clauses` must be specified in `partitioning` specification!"
        )

    if not isinstance(where_clauses, list) or not all(
        isinstance(where_clause, str) for where_clause in where_clauses
    ):
        raise ValueError("`where_clauses` must be a list of strings!")

    for partition_clause in where_clauses:
        matching = match(BIGQUERY_PARTITION_CLAUSE_PATTERN, partition_clause)
        if not matching:
            raise ValueError("Unsupported partition clause format")

        # check that if there are two field comparison sub-clauses, they reference the same field, e.g.:
        # "`my_field_1` > 5 AND `my_field_1` <= 10", not "`my_field_1` > 5 AND `my_field_2` <= 10"
        if matching["field_2"] is not None and (
            matching["field_1"] != matching["field_2"]
        ):
            raise ValueError(
                f"Partition clause must have matching fields. Identified non-matching field references '{matching['field_1']}' and '{matching['field_2']}'"
            )

        lowered_clause = partition_clause.lower()
        # Only the operands are inspected for whole-word keywords: column names are
        # backtick-quoted identifiers, so a column such as `update` is not a keyword use.
        lowered_operands = [
            operand.lower()
            for operand in (matching["operand_1"], matching["operand_2"])
            if operand
        ]
        for prohibited_keyword in PROHIBITED_KEYWORDS:
            keyword = prohibited_keyword.lower()
            # A keyword followed by whitespace anywhere in the clause is rejected.
            followed_by_whitespace = search(keyword + r"\s", lowered_clause)
            # A keyword does not need trailing whitespace to be usable, e.g. `CHAR(65)`
            # or `(SELECT(1))`, so also reject it as a standalone word in an operand.
            # The boundaries keep identifiers that merely contain a keyword
            # (e.g. `last_update,`) from being flagged by this check.
            whole_word_pattern = (
                r"(?<![a-z0-9_])" + keyword + r"(?![a-z0-9_])"
            )
            standalone_in_operand = any(
                search(whole_word_pattern, operand) for operand in lowered_operands
            )
            if followed_by_whitespace or standalone_in_operand:
                raise ValueError(
                    "Prohibited keyword referenced in partition clause"
                )

    return partitioning


class GraphDataset(BaseModel):
"""Master collection of collections that are accessed in a common way"""
Expand Down
5 changes: 5 additions & 0 deletions src/fides/api/models/datasetconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ def convert_dataset_to_graph(
if collection.fides_meta and collection.fides_meta.masking_strategy_override:
masking_override = collection.fides_meta.masking_strategy_override

collection_partitioning = None
if collection.fides_meta and collection.fides_meta.partitioning:
collection_partitioning = collection.fides_meta.partitioning

graph_collection = Collection(
name=collection.name,
fields=graph_fields,
Expand All @@ -339,6 +343,7 @@ def convert_dataset_to_graph(
data_categories=(
set(collection.data_categories) if collection.data_categories else set()
),
partitioning=collection_partitioning,
)
graph_collections.append(graph_collection)
logger.debug(
Expand Down
109 changes: 98 additions & 11 deletions src/fides/api/service/connectors/query_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ class QueryConfig(Generic[T], ABC):
def __init__(self, node: ExecutionNode):
self.node = node

@property
def partitioning(self) -> Optional[Dict]:  # pylint: disable=R1711
    """
    Partitioning specification for this query config; always None in this implementation.

    Partitioning support was de-scoped to BigQuery only as it grew more complex, but a
    generic stub is kept here. If the node's collection does carry a partitioning
    specification, a warning is logged and the specification is ignored.
    """
    configured_partitioning = self.node.collection.partitioning
    if configured_partitioning:
        logger.warning(
            "Partitioning is only supported on BigQuery connectors at this time!"
        )
    return None

def field_map(self) -> Dict[FieldPath, Field]:
"""Flattened FieldPaths of interest from this traversal_node."""
return self.node.collection.field_dict
Expand Down Expand Up @@ -707,7 +717,7 @@ def generate_raw_query(
data_vals = list(data)
query_data_keys: List[str] = []
for val in data_vals:
# appending "_in_stmt_generated_" (can be any arbitrary str) so that this name has less change of conflicting with pre-existing column in table
# appending "_in_stmt_generated_" (can be any arbitrary str) so that this name has lower chance of conflicting with pre-existing column in table
query_data_name = (
field_name + "_in_stmt_generated_" + str(data_vals.index(val))
)
Expand Down Expand Up @@ -837,6 +847,42 @@ class BigQueryQueryConfig(QueryStringWithoutTuplesOverrideQueryConfig):

namespace_meta_schema = BigQueryNamespaceMeta

@property
def partitioning(self) -> Optional[Dict]:
    """
    Partitioning specification taken from the node's collection.

    Overridden from the base implementation to allow for _only_ BQ partitioning, for now.
    """
    collection = self.node.collection
    return collection.partitioning

def get_partition_clauses(
    self,
) -> List[str]:
    """
    Return the WHERE clauses declared in this config's partitioning specification.

    Only where-clause based partitioning is currently supported. When there is no
    partitioning specification at all, an error is logged and an empty list is returned.
    A specification without (or with empty) `where_clauses` raises a `ValueError`.

    TODO: derive partitions from a start/end/interval specification

    NOTE: when we deprecate `where_clause` partitioning in favor of a more proper partitioning DSL,
    we should be sure to still support the existing `where_clause` partition definition on
    any in-progress DSRs so that they can run through to completion.
    """
    spec = self.partitioning
    if not spec:
        logger.error(
            "Partitioning clauses cannot be retrieved, no partitioning specification found"
        )
        return []

    clauses = spec.get("where_clauses")
    if not clauses:
        # TODO: implement more advanced partitioning support!
        raise ValueError(
            "`where_clauses` must be specified in partitioning specification!"
        )

    return clauses

def _generate_table_name(self) -> str:
"""
Prepends the dataset ID and project ID to the base table name
Expand All @@ -860,7 +906,6 @@ def get_formatted_query_string(
Returns a query string with backtick formatting for tables that have the same names as
BigQuery reserved words.
"""

return f'SELECT {field_list} FROM `{self._generate_table_name()}` WHERE {" OR ".join(clauses)}'

def generate_masking_stmt(
Expand All @@ -870,7 +915,7 @@ def generate_masking_stmt(
policy: Policy,
request: PrivacyRequest,
client: Engine,
) -> Union[Optional[Update], Optional[Delete]]:
) -> Union[List[Update], List[Delete]]:
"""
Generate a masking statement for BigQuery.
Expand All @@ -887,10 +932,15 @@ def generate_masking_stmt(

def generate_update(
self, row: Row, policy: Policy, request: PrivacyRequest, client: Engine
) -> Optional[Update]:
) -> List[Update]:
"""
Using TextClause to insert 'None' values into BigQuery throws an exception, so we use update clause instead.
Returns a SQLAlchemy Update object. Does not actually execute the update object.
Returns a List of SQLAlchemy Update object. Does not actually execute the update object.
A List of multiple Update objects are returned for partitioned tables; for a non-partitioned table,
a single Update object is returned in a List for consistent typing.
TODO: DRY up this method and `generate_delete` a bit
"""
update_value_map: Dict[str, Any] = self.update_value_map(row, policy, request)
non_empty_primary_keys: Dict[str, Field] = filter_nonempty_values(
Expand All @@ -907,19 +957,41 @@ def generate_update(
"There is not enough data to generate a valid update statement for {}",
self.node.address,
)
return None
return []

table = Table(self._generate_table_name(), MetaData(bind=client), autoload=True)
pk_clauses: List[ColumnElement] = [
getattr(table.c, k) == v for k, v in non_empty_primary_keys.items()
]
return table.update().where(*pk_clauses).values(**update_value_map)

def generate_delete(self, row: Row, client: Engine) -> Optional[Delete]:
"""Returns a SQLAlchemy DELETE statement for BigQuery. Does not actually execute the delete statement.
if self.partitioning:
partition_clauses = self.get_partition_clauses()
partitioned_queries = []
logger.info(
f"Generating {len(partition_clauses)} partition queries for node '{self.node.address}' in DSR execution"
)
for partition_clause in partition_clauses:
partitioned_queries.append(
table.update()
.where(*(pk_clauses + [text(partition_clause)]))
.values(**update_value_map)
)

return partitioned_queries

return [table.update().where(*pk_clauses).values(**update_value_map)]

def generate_delete(self, row: Row, client: Engine) -> List[Delete]:
"""Returns a List of SQLAlchemy DELETE statements for BigQuery. Does not actually execute the delete statement.
Used when a collection-level masking override is present and the masking strategy is DELETE.
A List of multiple DELETE statements are returned for partitioned tables; for a non-partitioned table,
a single DELETE statement is returned in a List for consistent typing.
TODO: DRY up this method and `generate_update` a bit
"""

non_empty_primary_keys: Dict[str, Field] = filter_nonempty_values(
{
fpath.string_path: fld.cast(row[fpath.string_path])
Expand All @@ -934,13 +1006,28 @@ def generate_delete(self, row: Row, client: Engine) -> Optional[Delete]:
"There is not enough data to generate a valid DELETE statement for {}",
self.node.address,
)
return None
return []

table = Table(self._generate_table_name(), MetaData(bind=client), autoload=True)
pk_clauses: List[ColumnElement] = [
getattr(table.c, k) == v for k, v in non_empty_primary_keys.items()
]
return table.delete().where(*pk_clauses)

if self.partitioning:
partition_clauses = self.get_partition_clauses()
partitioned_queries = []
logger.info(
f"Generating {len(partition_clauses)} partition queries for node '{self.node.address}' in DSR execution"
)

for partition_clause in partition_clauses:
partitioned_queries.append(
table.delete().where(*(pk_clauses + [text(partition_clause)]))
)

return partitioned_queries

return [table.delete().where(*pk_clauses)]


MongoStatement = Tuple[Dict[str, Any], Dict[str, Any]]
Expand Down
Loading

0 comments on commit c8f5b1f

Please sign in to comment.