This repository has been archived by the owner on Nov 30, 2022. It is now read-only.

136 mysql query execution #168

Merged
merged 6 commits on Jan 25, 2022
Changes from 4 commits
28 changes: 27 additions & 1 deletion src/fidesops/service/connectors/sql_connector.py
@@ -11,8 +11,8 @@
LegacyCursorResult,
Connection,
)
from sqlalchemy.sql.elements import TextClause
from sqlalchemy.exc import OperationalError, InternalError
from sqlalchemy.sql.elements import TextClause
from snowflake.sqlalchemy import URL as Snowflake_URL

from fidesops.common_exceptions import ConnectionException
@@ -183,6 +183,32 @@ def create_client(self) -> Engine:
echo=not self.hide_parameters,
)

# Overrides BaseConnector.cursor_result_to_rows
@staticmethod
def cursor_result_to_rows(results: LegacyCursorResult) -> List[Row]:
"""Convert SQLAlchemy results to a list of dictionaries"""
columns: List[Column] = results.cursor.description
num_columns = len(columns)
rows = []
for row_tuple in results:
    # Map each positional value in the tuple to its column name from cursor.description
    rows.append({columns[i][0]: row_tuple[i] for i in range(num_columns)})
return rows

def retrieve_data(
self, node: TraversalNode, policy: Policy, input_data: Dict[str, List[Any]]
) -> List[Row]:
"""Retrieve sql data"""
query_config = self.query_config(node)
client = self.client()
stmt: Optional[TextClause] = query_config.generate_query(input_data, policy)
if stmt is None:
return []
logger.info(f"Starting data retrieval for {node.address}")
with client.connect() as connection:
# fixme: update the mssql connector's result type to LegacyCursorResult as well
results: LegacyCursorResult = connection.execute(stmt)
return MySQLConnector.cursor_result_to_rows(results)


class RedshiftConnector(SQLConnector):
"""Connector specific to Amazon Redshift"""
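The row mapping in cursor_result_to_rows relies only on the DB-API guarantee that each entry in cursor.description begins with the column name. A minimal standalone sketch of the same logic, runnable without a database (the names below are illustrative, not fidesops code):

from typing import Any, Dict, List, Tuple

def rows_from_cursor(
    description: List[Tuple[Any, ...]],
    row_tuples: List[Tuple[Any, ...]],
) -> List[Dict[str, Any]]:
    """Map positional row tuples to dicts keyed by column name.

    DB-API cursor.description entries are 7-item sequences whose first
    element is the column name, which is all this mapping relies on.
    """
    names = [col[0] for col in description]
    return [dict(zip(names, row)) for row in row_tuples]

# Two columns, two rows:
description = [
    ("id", None, None, None, None, None, None),
    ("email", None, None, None, None, None, None),
]
rows = rows_from_cursor(description, [(1, "[email protected]"), (2, "[email protected]")])
assert rows == [
    {"id": 1, "email": "[email protected]"},
    {"id": 2, "email": "[email protected]"},
]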
19 changes: 16 additions & 3 deletions tests/api/v1/endpoints/test_dataset_endpoints.py
@@ -46,6 +46,8 @@ def test_example_datasets(example_datasets):
assert len(example_datasets[3]["collections"]) == 11
assert example_datasets[4]["fides_key"] == "mssql_example_test_dataset"
assert len(example_datasets[4]["collections"]) == 11
assert example_datasets[5]["fides_key"] == "mysql_example_test_dataset"
assert len(example_datasets[5]["collections"]) == 11


class TestValidateDataset:
@@ -440,7 +442,7 @@ def test_patch_datasets_bulk_create(

assert response.status_code == 200
response_body = json.loads(response.text)
assert len(response_body["succeeded"]) == 5
assert len(response_body["succeeded"]) == 6
assert len(response_body["failed"]) == 0

# Confirm that postgres dataset matches the values we provided
@@ -476,6 +478,17 @@ def test_patch_datasets_bulk_create(
assert "Example of a Microsoft SQLServer dataset" in mssql_dataset["description"]
assert len(mssql_dataset["collections"]) == 11

# check the mysql dataset
mysql_dataset = response_body["succeeded"][5]
mysql_config = DatasetConfig.get_by(
db=db, field="fides_key", value="mysql_example_test_dataset"
)
assert mysql_config is not None
assert mysql_dataset["fides_key"] == "mysql_example_test_dataset"
assert mysql_dataset["name"] == "MySQL Example Test Dataset"
assert "Example of a MySQL dataset" in mysql_dataset["description"]
assert len(mysql_dataset["collections"]) == 11

postgres_config.delete(db)
mongo_config.delete(db)
mssql_config.delete(db)
@@ -527,7 +540,7 @@ def test_patch_datasets_bulk_update(

assert response.status_code == 200
response_body = json.loads(response.text)
assert len(response_body["succeeded"]) == 5
assert len(response_body["succeeded"]) == 6
assert len(response_body["failed"]) == 0

# test postgres
@@ -600,7 +613,7 @@ def test_patch_datasets_failed_response(
assert response.status_code == 200 # Returns 200 regardless
response_body = json.loads(response.text)
assert len(response_body["succeeded"]) == 0
assert len(response_body["failed"]) == 5
assert len(response_body["failed"]) == 6

for failed_response in response_body["failed"]:
assert "Dataset create/update failed" in failed_response["message"]
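The succeeded/failed counts asserted above come from per-dataset bookkeeping in the bulk PATCH handler; a hypothetical sketch of that pattern, inferred from the assertions alone (not the actual endpoint code):

from typing import Any, Callable, Dict, List

def bulk_patch_datasets(
    datasets: List[Dict[str, Any]],
    upsert: Callable[[Dict[str, Any]], None],
) -> Dict[str, List[Dict[str, Any]]]:
    """Apply each dataset individually, collecting per-item outcomes so a
    single failure does not fail the whole request (hence the 200 status)."""
    succeeded: List[Dict[str, Any]] = []
    failed: List[Dict[str, Any]] = []
    for dataset in datasets:
        try:
            upsert(dataset)
            succeeded.append(dataset)
        except Exception:
            failed.append(
                {"message": "Dataset create/update failed", "data": dataset}
            )
    return {"succeeded": succeeded, "failed": failed}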
6 changes: 3 additions & 3 deletions tests/fixtures.py
@@ -981,7 +981,8 @@ def example_datasets() -> List[Dict]:
"data/dataset/mongo_example_test_dataset.yml",
"data/dataset/snowflake_example_test_dataset.yml",
"data/dataset/redshift_example_test_dataset.yml",
"data/dataset/mssql_example_test_dataset.yml"
"data/dataset/mssql_example_test_dataset.yml",
"data/dataset/mysql_example_test_dataset.yml"
]
for filename in example_filenames:
example_datasets += load_dataset(filename)
@@ -1100,8 +1101,7 @@ def mysql_example_test_dataset_config(
db: Session,
example_datasets: List[Dict],
) -> Generator:
# todo: this references the incorrect dataset
mysql_dataset = example_datasets[0]
mysql_dataset = example_datasets[5]
fides_key = mysql_dataset["fides_key"]
connection_config_mysql.name = fides_key
connection_config_mysql.key = fides_key
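With the fixture fix above, the index now matches the position of the MySQL YAML in example_filenames. A quick check of that assumption (assuming load_dataset yields one dataset per file, and that the postgres file sits at index 0 just above the visible hunk):

example_order = [
    "postgres_example_test_dataset",   # example_datasets[0]
    "mongo_example_test_dataset",      # example_datasets[1]
    "snowflake_example_test_dataset",  # example_datasets[2]
    "redshift_example_test_dataset",   # example_datasets[3]
    "mssql_example_test_dataset",      # example_datasets[4]
    "mysql_example_test_dataset",      # example_datasets[5] -- the fixed index
]
assert example_order.index("mysql_example_test_dataset") == 5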
87 changes: 85 additions & 2 deletions tests/integration_tests/test_sql_task.py
@@ -253,10 +253,10 @@ def test_sql_erasure_task(db, postgres_inserts, integration_postgres_config):


@pytest.mark.integration
def test_sql_access_request_task(db, policy, integration_postgres_config) -> None:
def test_postgres_access_request_task(db, policy, integration_postgres_config) -> None:

privacy_request = PrivacyRequest(
id=f"test_sql_access_request_task_{random.randint(0, 1000)}"
id=f"test_postgres_access_request_task_{random.randint(0, 1000)}"
)

v = graph_task.run_access_request(
@@ -333,6 +333,89 @@ def test_sql_access_request_task(db, policy, integration_postgres_config) -> None:
> 0
)

# todo: add an mssql access request test

@pytest.mark.integration
def test_mysql_access_request_task(db, policy, connection_config_mysql) -> None:

privacy_request = PrivacyRequest(
id=f"test_mysql_access_request_task_{random.randint(0, 1000)}"
)

v = graph_task.run_access_request(
privacy_request,
policy,
integration_db_graph("my_mysql_db_1"),
[connection_config_mysql],
{"email": "[email protected]"},
)

assert_rows_match(
v["my_mysql_db_1:address"],
min_size=2,
keys=["id", "street", "city", "state", "zip"],
)
assert_rows_match(
v["my_mysql_db_1:orders"],
min_size=3,
keys=["id", "customer_id", "shipping_address_id", "payment_card_id"],
)
assert_rows_match(
v["my_mysql_db_1:payment_card"],
min_size=2,
keys=["id", "name", "ccn", "customer_id", "billing_address_id"],
)
assert_rows_match(
v["my_mysql_db_1:customer"],
min_size=1,
keys=["id", "name", "email", "address_id"],
)

# links
assert v["my_mysql_db_1:customer"][0]["email"] == "[email protected]"

logs = (
ExecutionLog.query(db=db)
.filter(ExecutionLog.privacy_request_id == privacy_request.id)
.all()
)

logs = [log.__dict__ for log in logs]
assert (
len(
records_matching_fields(
logs, dataset_name="my_mysql_db_1", collection_name="customer"
)
)
> 0
)
assert (
len(
records_matching_fields(
logs, dataset_name="my_mysql_db_1", collection_name="address"
)
)
> 0
)
assert (
len(
records_matching_fields(
logs, dataset_name="my_mysql_db_1", collection_name="orders"
)
)
> 0
)
assert (
len(
records_matching_fields(
logs,
dataset_name="my_mysql_db_1",
collection_name="payment_card",
)
)
> 0
)


@pytest.mark.integration
def test_filter_on_data_categories(
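assert_rows_match comes from the shared test helpers and is not part of this diff; a plausible reading of its contract, inferred from the min_size/keys call sites above (a hypothetical reconstruction, not the repo's helper):

from typing import Any, Dict, List

def assert_rows_match(
    rows: List[Dict[str, Any]], min_size: int, keys: List[str]
) -> None:
    # At least min_size rows were returned, and every row carries
    # every expected column (hypothetical reading of the contract).
    assert len(rows) >= min_size
    for row in rows:
        for key in keys:
            assert key in row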
82 changes: 81 additions & 1 deletion tests/service/privacy_request/request_runner_service_test.py
@@ -29,7 +29,9 @@
from fidesops.service.connectors import PostgreSQLConnector
from fidesops.service.connectors.sql_connector import (
SnowflakeConnector,
RedshiftConnector, MicrosoftSQLServerConnector,
RedshiftConnector,
MicrosoftSQLServerConnector,
MySQLConnector,
)
from fidesops.service.masking.strategy.masking_strategy_factory import get_strategy
from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner
@@ -212,6 +214,45 @@ def test_create_and_process_access_request_mssql(
pr.delete(db=db)


@pytest.mark.integration
@mock.patch("fidesops.models.privacy_request.PrivacyRequest.trigger_policy_webhook")
def test_create_and_process_access_request_mysql(
trigger_webhook_mock,
mysql_example_test_dataset_config,
db,
cache,
policy,
policy_pre_execution_webhooks,
policy_post_execution_webhooks,
):

customer_email = "[email protected]"
data = {
"requested_at": "2021-08-30T16:09:37.359Z",
"policy_key": policy.key,
"identity": {"email": customer_email},
}

pr = get_privacy_request_results(db, policy, cache, data)

results = pr.get_results()
assert len(results.keys()) == 11

for key in results.keys():
assert results[key] is not None
assert results[key] != {}

result_key_prefix = f"EN_{pr.id}__access_request__mysql_example_test_dataset:"
customer_key = result_key_prefix + "customer"
assert results[customer_key][0]["email"] == customer_email

visit_key = result_key_prefix + "visit"
assert results[visit_key][0]["email"] == customer_email
# Both pre-execution webhooks and both post-execution webhooks were called
assert trigger_webhook_mock.call_count == 4
pr.delete(db=db)


@pytest.mark.integration_erasure
def test_create_and_process_erasure_request_specific_category(
postgres_example_test_dataset_config,
@@ -290,6 +331,45 @@ def test_create_and_process_erasure_request_specific_category_mssql(
assert customer_found


@pytest.mark.integration_erasure
def test_create_and_process_erasure_request_specific_category_mysql(
mysql_example_test_dataset_config,
cache,
db,
generate_auth_header,
erasure_policy,
connection_config_mysql,
):
customer_email = "[email protected]"
customer_id = 1
data = {
"requested_at": "2021-08-30T16:09:37.359Z",
"policy_key": erasure_policy.key,
"identity": {"email": customer_email},
}

pr = get_privacy_request_results(db, erasure_policy, cache, data)
pr.delete(db=db)

example_mysql_uri = MySQLConnector(connection_config_mysql).build_uri()
engine = get_db_engine(database_uri=example_mysql_uri)
SessionLocal = get_db_session(engine=engine)
integration_db = SessionLocal()
stmt = select(
column("id"),
column("name"),
).select_from(table("customer"))
res = integration_db.execute(stmt).all()

customer_found = False
for row in res:
if customer_id in row:
customer_found = True
# Check that the `name` field is `None`
assert row.name is None
assert customer_found


@pytest.mark.integration_erasure
def test_create_and_process_erasure_request_generic_category(
postgres_example_test_dataset_config,
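For reference, the result_key_prefix in the MySQL access test above implies a cache-key convention of the form EN_<privacy_request_id>__access_request__<dataset>:<collection>; a tiny sketch of that composition as inferred from the test's f-string (an assumption, not a documented API):

def access_result_key(privacy_request_id: str, dataset: str, collection: str) -> str:
    # Key shape inferred from the test above; treat as an assumption.
    return f"EN_{privacy_request_id}__access_request__{dataset}:{collection}"

assert (
    access_result_key("pr_123", "mysql_example_test_dataset", "customer")
    == "EN_pr_123__access_request__mysql_example_test_dataset:customer"
)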