From 162f868d12cd965c2c72b4fc440dbefab7f019f7 Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Thu, 2 Jun 2022 17:06:50 -0500 Subject: [PATCH 01/10] WIP Cache SQL queries for the manual connector for retrieve/update data. The manual connector is probably not a SQL database, but it's a pretty readable way to surface what needs to be performed manually. The actually format will probably change. - To the privacy request status endpoint, surface the stopped step, stopped collection, manual queries, and resume endpoint for paused or failed privacy requests. --- data/dataset/manual_dataset.yml | 4 +- .../v1/endpoints/privacy_request_endpoints.py | 49 +++++++++++++- src/fidesops/models/privacy_request.py | 26 ++++++- src/fidesops/schemas/privacy_request.py | 6 +- .../service/connectors/manual_connector.py | 67 ++++++++++++++++++- tests/integration_tests/test_manual_task.py | 66 +++++++++++++++--- tests/task/traversal_data.py | 11 ++- 7 files changed, 209 insertions(+), 20 deletions(-) diff --git a/data/dataset/manual_dataset.yml b/data/dataset/manual_dataset.yml index 91632b4b1..c278575d9 100644 --- a/data/dataset/manual_dataset.yml +++ b/data/dataset/manual_dataset.yml @@ -10,11 +10,11 @@ dataset: fidesops_meta: primary_key: true - name: authorized_user - data_categories: [ user.derived.identifiable ] + data_categories: [ user.provided.identifiable ] fidesops_meta: data_type: string - name: customer_id - data_categories: [ user.derived.identifiable.unique_id ] + data_categories: [ user.provided.identifiable.unique_id ] fidesops_meta: references: - dataset: postgres_example_test_dataset diff --git a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py index 80bb17ecd..8732f9e56 100644 --- a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py +++ b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py @@ -281,7 +281,6 @@ def execution_logs_by_dataset_name( def _filter_privacy_request_queryset( query: Query, - db: Session = Depends(deps.get_db), request_id: Optional[str] = None, status: Optional[PrivacyRequestStatus] = None, created_lt: Optional[datetime] = None, @@ -306,7 +305,7 @@ def _filter_privacy_request_queryset( for end, start, field_name in [ [created_lt, created_gt, "created"], [completed_lt, completed_gt, "completed"], - [errored_lt, errored_gt, "errorer"], + [errored_lt, errored_gt, "errored"], [started_lt, started_gt, "started"], ]: if end is None or start is None: @@ -362,6 +361,48 @@ def _filter_privacy_request_queryset( return query.order_by(PrivacyRequest.created_at.desc()) +def attach_resume_instructions(privacy_request: PrivacyRequest) -> None: + """ + Update a paused or errored privacy request with instructions about resuming. + """ + stopped_step: Optional[PausedStep] = None + stopped_collection: Optional[CollectionAddress] = None + resume_endpoint: Optional[str] = None + cached_queries: List[Dict[str, Any]] = [] + + if privacy_request.status == PrivacyRequestStatus.paused: + ( + stopped_step, + stopped_collection, + ) = privacy_request.get_paused_step_and_collection() + cached_queries = privacy_request.get_cached_queries( + stopped_step, stopped_collection + ) + + if stopped_step: + resume_endpoint = ( + PRIVACY_REQUEST_MANUAL_ERASURE + if stopped_step == PausedStep.erasure + else PRIVACY_REQUEST_MANUAL_INPUT + ) + else: + resume_endpoint = PRIVACY_REQUEST_RESUME + + elif privacy_request.status == PrivacyRequestStatus.error: + ( + stopped_step, + stopped_collection, + ) = privacy_request.get_failed_step_and_collection() + resume_endpoint = PRIVACY_REQUEST_RETRY + + privacy_request.stopped_step = stopped_step.value if stopped_step else None + privacy_request.stopped_collection = ( + stopped_collection.value if stopped_collection else None + ) + privacy_request.manual_queries = cached_queries + privacy_request.resume_endpoint = resume_endpoint + + @router.get( urls.PRIVACY_REQUESTS, dependencies=[Security(verify_oauth_client, scopes=[scopes.PRIVACY_REQUEST_READ])], @@ -401,7 +442,6 @@ def get_request_status( query = db.query(PrivacyRequest) query = _filter_privacy_request_queryset( query, - db, request_id, status, created_lt, @@ -436,6 +476,9 @@ def get_request_status( for item in paginated.items: item.identity = item.get_cached_identity_data() + for item in paginated.items: + attach_resume_instructions(item) + return paginated diff --git a/src/fidesops/models/privacy_request.py b/src/fidesops/models/privacy_request.py index ad9d14307..1d03e96e6 100644 --- a/src/fidesops/models/privacy_request.py +++ b/src/fidesops/models/privacy_request.py @@ -72,7 +72,7 @@ def generate_request_callback_jwe(webhook: PolicyPreWebhook) -> str: return generate_jwe(json.dumps(jwe.dict())) -class PrivacyRequest(Base): +class PrivacyRequest(Base): # pylint: disable=R0904 """ The DB ORM model to describe current and historic PrivacyRequests. A privacy request is a database record representing a data subject request's progression within the Fidesops system. @@ -318,6 +318,30 @@ def get_manual_erasure_count(self, collection: CollectionAddress) -> Optional[in ) return list(value_dict.values())[0] if value_dict else None + def cache_queries( + self, + paused_step: PausedStep, + paused_collection: CollectionAddress, + query_params: List[Dict[str, Any]], + ) -> None: + """Cache the list of queries to be performed manually for a given paused step and collection""" + cache: FidesopsRedis = get_cache() + cache.set_encoded_object( + f"PAUSED_QUERY__{paused_step.value}__{paused_collection}__{self.id}", + query_params, + ) + + def get_cached_queries( + self, paused_step: PausedStep, paused_collection: CollectionAddress + ) -> Optional[List[Dict[str, Any]]]: + """Get the list of queries to be performed manually for a given paused step and collection""" + cache: FidesopsRedis = get_cache() + prefix = f"PAUSED_QUERY__{paused_step.value}__{paused_collection}__{self.id}" + value_dict: Optional[ + Dict[str, Optional[List[Dict[str, Any]]]] + ] = cache.get_encoded_objects_by_prefix(prefix) + return list(value_dict.values())[0] if value_dict else None + def trigger_policy_webhook(self, webhook: WebhookTypes) -> None: """Trigger a request to a single customer-defined policy webhook. Raises an exception if webhook response should cause privacy request execution to stop. diff --git a/src/fidesops/schemas/privacy_request.py b/src/fidesops/schemas/privacy_request.py index b645558b0..3c8ce8106 100644 --- a/src/fidesops/schemas/privacy_request.py +++ b/src/fidesops/schemas/privacy_request.py @@ -1,6 +1,6 @@ from datetime import datetime from enum import Enum as EnumType -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from pydantic import Field, validator @@ -127,6 +127,10 @@ class PrivacyRequestResponse(BaseSchema): # creation. identity: Optional[Dict[str, str]] policy: PolicySchema + stopped_step: Optional[str] + stopped_collection: Optional[str] + manual_queries: List[Dict[str, Any]] = [] + resume_endpoint: Optional[str] class Config: """Set orm_mode and use_enum_values""" diff --git a/src/fidesops/service/connectors/manual_connector.py b/src/fidesops/service/connectors/manual_connector.py index 5013c1e44..1890a839b 100644 --- a/src/fidesops/service/connectors/manual_connector.py +++ b/src/fidesops/service/connectors/manual_connector.py @@ -1,17 +1,23 @@ +import logging from typing import Any, Dict, List, Optional +from sqlalchemy.sql.elements import TextClause + from fidesops.common_exceptions import PrivacyRequestPaused from fidesops.graph.traversal import TraversalNode from fidesops.models.policy import PausedStep, Policy from fidesops.models.privacy_request import PrivacyRequest from fidesops.service.connectors.base_connector import BaseConnector +from fidesops.service.connectors.query_config import SQLQueryConfig from fidesops.util.collection_util import Row +logger = logging.getLogger(__name__) + class ManualConnector(BaseConnector[None]): - def query_config(self, node: TraversalNode) -> None: + def query_config(self, node: TraversalNode) -> SQLQueryConfig: """No query_config for the Manual Connector""" - return None + return SQLQueryConfig(node) def create_client(self) -> None: """Not needed because this connector involves a human performing some lookup step""" @@ -34,6 +40,8 @@ def retrieve_data( ) -> Optional[List[Row]]: """ Returns manually added data for the given collection if it exists, otherwise pauses the Privacy Request. + + Caches a SQL query as manual instructions for the user to fulfill a manual request. """ cached_results: Optional[List[Row]] = privacy_request.get_manual_input( node.address @@ -47,6 +55,14 @@ def retrieve_data( privacy_request.cache_paused_step_and_collection( PausedStep.access, node.address ) + + query_config = self.query_config(node) + stmt: Optional[TextClause] = query_config.generate_query(input_data, policy) + cached_query: Optional[Dict[str, Any]] = format_cached_query(stmt) + privacy_request.cache_queries( + PausedStep.access, node.address, [cached_query] if cached_query else [] + ) + raise PrivacyRequestPaused( f"Collection '{node.address.value}' waiting on manual data for privacy request '{privacy_request.id}'" ) @@ -59,7 +75,10 @@ def mask_data( rows: List[Row], ) -> Optional[int]: """If erasure confirmation has been added to the manual cache, continue, otherwise, - pause and wait for manual input.""" + pause and wait for manual input. + + Caches a SQL query as manual instructions for the user to fulfill the manual erasure request. + """ manual_cached_count: Optional[int] = privacy_request.get_manual_erasure_count( node.address ) @@ -71,6 +90,48 @@ def mask_data( privacy_request.cache_paused_step_and_collection( PausedStep.erasure, node.address ) + + query_config = self.query_config(node) + cached_queries: List[Dict[str, Any]] = [] + for row in rows: + # Cache a manual update query to surface to the user + update_stmt: Optional[TextClause] = query_config.generate_update_stmt( + row, policy, privacy_request + ) + formatted_update_stmt: Optional[Dict[str, Any]] = format_cached_query( + update_stmt + ) + if formatted_update_stmt: + cached_queries.append(formatted_update_stmt) + + privacy_request.cache_queries(PausedStep.erasure, node.address, cached_queries) + raise PrivacyRequestPaused( f"Collection '{node.address.value}' waiting on manual erasure confirmation for privacy request '{privacy_request.id}'" ) + + +def format_cached_query(stmt: Optional[TextClause]) -> Optional[Dict[str, Any]]: + """ + Format the SQLAlchemy TextClause for caching in Redis, for returning to the user the manual actions required on their + end. Store the query and the parameters separately. + + For example: + { + 'query': 'UPDATE filing_cabinet SET authorized_user = :authorized_user WHERE id = :id', + 'parameters': { + 'authorized_user': None, + 'id': 121 + } + } + """ + if stmt is None: + return None + + query_elems: Dict[str, Any] = {"query": str(stmt), "parameters": {}} + for ( + param_name, + bind_parameters, + ) in stmt._bindparams.items(): # pylint: disable=W0212 + query_elems["parameters"][param_name] = bind_parameters.value + return query_elems diff --git a/tests/integration_tests/test_manual_task.py b/tests/integration_tests/test_manual_task.py index 3f662d840..ba5496cdb 100644 --- a/tests/integration_tests/test_manual_task.py +++ b/tests/integration_tests/test_manual_task.py @@ -44,6 +44,17 @@ def test_postgres_with_manual_input_access_request_task( assert paused_node.value == "manual_example:storage_unit" assert request_type == PausedStep.access + cached_queries = privacy_request.get_cached_queries( + PausedStep.access, CollectionAddress("manual_example", "storage_unit") + ) + # Assert instructional query we'll surface to the user + assert cached_queries == [ + { + "query": "SELECT box_id,email FROM storage_unit WHERE email = :email", + "parameters": {"email": ("customer-1@example.com",)}, + } + ] + # Mock user retrieving storage unit data by adding manual data to cache privacy_request.cache_manual_input( CollectionAddress.from_string("manual_example:storage_unit"), @@ -60,6 +71,21 @@ def test_postgres_with_manual_input_access_request_task( {"email": "customer-1@example.com"}, ) + request_type, paused_node = privacy_request.get_paused_step_and_collection() + assert paused_node.value == "manual_example:filing_cabinet" + assert request_type == PausedStep.access + + cached_queries = privacy_request.get_cached_queries( + PausedStep.access, CollectionAddress("manual_example", "filing_cabinet") + ) + assert cached_queries == [ + { + "query": "SELECT id,authorized_user,customer_id,payment_card_id FROM filing_cabinet WHERE customer_id = :customer_id", + "parameters": {"customer_id": (1,)}, + } + ], "Assert instructional query that we will surface to the user" + + # Add manual filing cabinet data from the user privacy_request.cache_manual_input( CollectionAddress.from_string("manual_example:filing_cabinet"), [{"id": 1, "authorized_user": "Jane Doe", "payment_card_id": "pay_bbb-bbb"}], @@ -271,14 +297,17 @@ def test_no_manual_input_found( @pytest.mark.integration_postgres @pytest.mark.integration def test_collections_with_manual_erasure_confirmation( - policy, + db, + erasure_policy, integration_postgres_config, integration_manual_config, + privacy_request, ) -> None: """Run an erasure privacy request with two manual nodes""" - privacy_request = PrivacyRequest( - id=f"test_postgres_access_request_task_{uuid.uuid4()}" - ) + privacy_request.policy = erasure_policy + rule = erasure_policy.rules[0] + target = rule.targets[0] + target.data_category = "user.provided.identifiable" cached_data_for_erasures = { "postgres_example:payment_card": [ @@ -349,11 +378,12 @@ def test_collections_with_manual_erasure_confirmation( ], } - # ATTEMPT 1 - erasure request will pause to wait for confirmation that data has been destroyed from the filing cabinet + # ATTEMPT 1 - erasure request will pause to wait for confirmation that data has been destroyed from + # the filing cabinet with pytest.raises(PrivacyRequestPaused): graph_task.run_erasure( privacy_request, - policy, + erasure_policy, postgres_and_manual_nodes("postgres_example", "manual_example"), [integration_postgres_config, integration_manual_config], {"email": "customer-1@example.com"}, @@ -364,6 +394,16 @@ def test_collections_with_manual_erasure_confirmation( assert paused_node.value == "manual_example:filing_cabinet" assert request_type == PausedStep.erasure + cached_manual_queries = privacy_request.get_cached_queries( + PausedStep.erasure, CollectionAddress("manual_example", "filing_cabinet") + ) + assert cached_manual_queries == [ + { + "query": "UPDATE filing_cabinet SET authorized_user = :authorized_user WHERE id = :id", + "parameters": {"authorized_user": None, "id": 1}, + } + ] + # Mock confirming from user that there was no data in the filing cabinet privacy_request.cache_manual_erasure_count( CollectionAddress.from_string("manual_example:filing_cabinet"), @@ -374,7 +414,7 @@ def test_collections_with_manual_erasure_confirmation( with pytest.raises(PrivacyRequestPaused): graph_task.run_erasure( privacy_request, - policy, + erasure_policy, postgres_and_manual_nodes("postgres_example", "manual_example"), [integration_postgres_config, integration_manual_config], {"email": "customer-1@example.com"}, @@ -385,6 +425,16 @@ def test_collections_with_manual_erasure_confirmation( assert paused_node.value == "manual_example:storage_unit" assert request_type == PausedStep.erasure + cached_manual_queries = privacy_request.get_cached_queries( + PausedStep.erasure, CollectionAddress("manual_example", "storage_unit") + ) + assert cached_manual_queries == [ + { + "query": "UPDATE storage_unit SET email = :email WHERE box_id = :box_id", + "parameters": {"email": None, "box_id": 5}, + } + ] + # Mock confirming from user that storage unit erasure is complete privacy_request.cache_manual_erasure_count( CollectionAddress.from_string("manual_example:storage_unit"), 1 @@ -393,7 +443,7 @@ def test_collections_with_manual_erasure_confirmation( # Attempt 3 - We've confirmed data has been removed for manual nodes so we can proceed with the rest of the erasure v = graph_task.run_erasure( privacy_request, - policy, + erasure_policy, postgres_and_manual_nodes("postgres_example", "manual_example"), [integration_postgres_config, integration_manual_config], {"email": "customer-1@example.com"}, diff --git a/tests/task/traversal_data.py b/tests/task/traversal_data.py index 000ea1f47..2d4a233bd 100644 --- a/tests/task/traversal_data.py +++ b/tests/task/traversal_data.py @@ -523,7 +523,11 @@ def manual_dataset(db_name: str, postgres_db_name) -> Dataset: name="filing_cabinet", fields=[ ScalarField(name="id", primary_key=True, data_type_converter=int_converter), - ScalarField(name="authorized_user", data_type_converter=str_converter), + ScalarField( + name="authorized_user", + data_type_converter=str_converter, + data_categories=["user.provided.identifiable"], + ), ScalarField( name="customer_id", references=[(FieldAddress(postgres_db_name, "customer", "id"), "from")], @@ -543,7 +547,10 @@ def manual_dataset(db_name: str, postgres_db_name) -> Dataset: name="box_id", primary_key=True, data_type_converter=int_converter ), ScalarField( - name="email", identity="email", data_type_converter=str_converter + name="email", + identity="email", + data_type_converter=str_converter, + data_categories=["user.provided.identifiable"], ), ], ) From 03e30681e80aa567e5c3f1765be3fa62c5a71441 Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Thu, 2 Jun 2022 18:38:55 -0500 Subject: [PATCH 02/10] Add unit tests asserting expected response for paused/failed privacy requests. --- data/dataset/manual_dataset.yml | 2 +- .../v1/endpoints/privacy_request_endpoints.py | 16 +- src/fidesops/models/privacy_request.py | 4 +- .../service/connectors/manual_connector.py | 4 +- .../test_privacy_request_endpoints.py | 137 ++++++++++++++++++ tests/integration_tests/test_manual_task.py | 19 +++ tests/models/test_privacy_request.py | 19 +++ 7 files changed, 191 insertions(+), 10 deletions(-) diff --git a/data/dataset/manual_dataset.yml b/data/dataset/manual_dataset.yml index c278575d9..16ac685db 100644 --- a/data/dataset/manual_dataset.yml +++ b/data/dataset/manual_dataset.yml @@ -14,7 +14,7 @@ dataset: fidesops_meta: data_type: string - name: customer_id - data_categories: [ user.provided.identifiable.unique_id ] + data_categories: [ user.provided.identifiable ] fidesops_meta: references: - dataset: postgres_example_test_dataset diff --git a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py index 8732f9e56..5c8ee1dee 100644 --- a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py +++ b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py @@ -363,7 +363,7 @@ def _filter_privacy_request_queryset( def attach_resume_instructions(privacy_request: PrivacyRequest) -> None: """ - Update a paused or errored privacy request with instructions about resuming. + Temporarily update a paused or errored privacy request object with instructions about how to resume manually. """ stopped_step: Optional[PausedStep] = None stopped_collection: Optional[CollectionAddress] = None @@ -375,17 +375,19 @@ def attach_resume_instructions(privacy_request: PrivacyRequest) -> None: stopped_step, stopped_collection, ) = privacy_request.get_paused_step_and_collection() - cached_queries = privacy_request.get_cached_queries( - stopped_step, stopped_collection - ) if stopped_step: + # Graph is paused on a specific collection + cached_queries = privacy_request.get_cached_queries( + stopped_step, stopped_collection + ) resume_endpoint = ( PRIVACY_REQUEST_MANUAL_ERASURE if stopped_step == PausedStep.erasure else PRIVACY_REQUEST_MANUAL_INPUT ) else: + # Graph is paused on a pre-processing webhook resume_endpoint = PRIVACY_REQUEST_RESUME elif privacy_request.status == PrivacyRequestStatus.error: @@ -400,7 +402,11 @@ def attach_resume_instructions(privacy_request: PrivacyRequest) -> None: stopped_collection.value if stopped_collection else None ) privacy_request.manual_queries = cached_queries - privacy_request.resume_endpoint = resume_endpoint + privacy_request.resume_endpoint = ( + resume_endpoint.format(privacy_request_id=privacy_request.id) + if resume_endpoint + else None + ) @router.get( diff --git a/src/fidesops/models/privacy_request.py b/src/fidesops/models/privacy_request.py index 1d03e96e6..efbf11526 100644 --- a/src/fidesops/models/privacy_request.py +++ b/src/fidesops/models/privacy_request.py @@ -322,13 +322,13 @@ def cache_queries( self, paused_step: PausedStep, paused_collection: CollectionAddress, - query_params: List[Dict[str, Any]], + query: List[Dict[str, Any]], ) -> None: """Cache the list of queries to be performed manually for a given paused step and collection""" cache: FidesopsRedis = get_cache() cache.set_encoded_object( f"PAUSED_QUERY__{paused_step.value}__{paused_collection}__{self.id}", - query_params, + query, ) def get_cached_queries( diff --git a/src/fidesops/service/connectors/manual_connector.py b/src/fidesops/service/connectors/manual_connector.py index 1890a839b..46da773ae 100644 --- a/src/fidesops/service/connectors/manual_connector.py +++ b/src/fidesops/service/connectors/manual_connector.py @@ -113,8 +113,8 @@ def mask_data( def format_cached_query(stmt: Optional[TextClause]) -> Optional[Dict[str, Any]]: """ - Format the SQLAlchemy TextClause for caching in Redis, for returning to the user the manual actions required on their - end. Store the query and the parameters separately. + Format the SQLAlchemy TextClause for caching in Redis, for describing to the user the manual actions required on their + part. Store the query and the parameters separately. For example: { diff --git a/tests/api/v1/endpoints/test_privacy_request_endpoints.py b/tests/api/v1/endpoints/test_privacy_request_endpoints.py index 199b3a4f6..61889e2cd 100644 --- a/tests/api/v1/endpoints/test_privacy_request_endpoints.py +++ b/tests/api/v1/endpoints/test_privacy_request_endpoints.py @@ -496,6 +496,10 @@ def test_get_privacy_requests_by_id( ).rules ], }, + "stopped_step": None, + "stopped_collection": None, + "manual_queries": [], + "resume_endpoint": None, } ], "total": 1, @@ -547,6 +551,10 @@ def test_get_privacy_requests_by_partial_id( ).rules ], }, + "stopped_step": None, + "stopped_collection": None, + "manual_queries": [], + "resume_endpoint": None, } ], "total": 1, @@ -850,6 +858,10 @@ def test_verbose_privacy_requests( ).rules ], }, + "stopped_step": None, + "stopped_collection": None, + "manual_queries": [], + "resume_endpoint": None, "results": { "my-mongo-db": [ { @@ -999,6 +1011,124 @@ def test_get_privacy_requests_csv_format( assert parse(first_row["Time approved/denied"], ignoretz=True) == reviewed_at assert first_row["Denial reason"] == "" + def test_get_paused_access_privacy_request_resume_info( + self, db, privacy_request, generate_auth_header, api_client, url + ): + # Mock the privacy request being in a paused state waiting for manual input to the "manual_collection" + privacy_request.status = PrivacyRequestStatus.paused + privacy_request.save(db) + paused_step = PausedStep.access + paused_collection = CollectionAddress("manual_dataset", "manual_collection") + privacy_request.cache_paused_step_and_collection(paused_step, paused_collection) + privacy_request.cache_queries( + paused_step, + paused_collection, + query=[ + { + "query": "SELECT authorized_user FROM manual_collection WHERE email = :email", + "parameters": {"email": ("customer-1@example.com",)}, + } + ], + ) + + auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_READ]) + response = api_client.get(url, headers=auth_header) + assert 200 == response.status_code + + data = response.json()["items"][0] + assert data["status"] == "paused" + assert data["stopped_step"] == "access" + assert data["stopped_collection"] == "manual_dataset:manual_collection" + assert data["manual_queries"] == [ + { + "query": "SELECT authorized_user FROM manual_collection WHERE email = :email", + "parameters": {"email": ["customer-1@example.com"]}, + } + ] + assert data["resume_endpoint"] == "/privacy-request/{}/manual_input".format( + privacy_request.id + ) + + def test_get_paused_erasure_privacy_request_resume_info( + self, db, privacy_request, generate_auth_header, api_client, url + ): + # Mock the privacy request being in a paused state waiting for manual erasure confirmation to the "another_collection" + privacy_request.status = PrivacyRequestStatus.paused + privacy_request.save(db) + paused_step = PausedStep.erasure + paused_collection = CollectionAddress("manual_dataset", "another_collection") + privacy_request.cache_paused_step_and_collection(paused_step, paused_collection) + privacy_request.cache_queries( + paused_step, + paused_collection, + query=[ + { + "query": "UPDATE manual_collection SET authorized_user = :authorized_user WHERE id = :id", + "parameters": {"authorized_user": "abcde_masked_user", "id": 32424}, + } + ], + ) + + auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_READ]) + response = api_client.get(url, headers=auth_header) + assert 200 == response.status_code + + data = response.json()["items"][0] + assert data["status"] == "paused" + assert data["stopped_step"] == "erasure" + assert data["stopped_collection"] == "manual_dataset:another_collection" + assert data["manual_queries"] == [ + { + "query": "UPDATE manual_collection SET authorized_user = :authorized_user WHERE id = :id", + "parameters": {"authorized_user": "abcde_masked_user", "id": 32424}, + } + ] + assert data["resume_endpoint"] == "/privacy-request/{}/erasure_confirm".format( + privacy_request.id + ) + + def test_get_paused_webhook_resume_info( + self, db, privacy_request, generate_auth_header, api_client, url + ): + privacy_request.status = PrivacyRequestStatus.paused + privacy_request.save(db) + + auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_READ]) + response = api_client.get(url, headers=auth_header) + assert 200 == response.status_code + + data = response.json()["items"][0] + assert data["status"] == "paused" + assert data["stopped_step"] is None + assert data["stopped_collection"] is None + assert data["manual_queries"] == [] # No manual data needed + assert data["resume_endpoint"] == "/privacy-request/{}/resume".format( + privacy_request.id + ) + + def test_get_failed_request_resume_info( + self, db, privacy_request, generate_auth_header, api_client, url + ): + # Mock the privacy request being in an errored state waiting for retry + privacy_request.status = PrivacyRequestStatus.error + privacy_request.save(db) + paused_step = PausedStep.erasure + paused_collection = CollectionAddress("manual_dataset", "another_collection") + privacy_request.cache_failed_step_and_collection(paused_step, paused_collection) + + auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_READ]) + response = api_client.get(url, headers=auth_header) + assert 200 == response.status_code + + data = response.json()["items"][0] + assert data["status"] == "error" + assert data["stopped_step"] == "erasure" + assert data["stopped_collection"] == "manual_dataset:another_collection" + assert data["manual_queries"] == [] # No manual data needed + assert data["resume_endpoint"] == "/privacy-request/{}/retry".format( + privacy_request.id + ) + class TestGetExecutionLogs: @pytest.fixture(scope="function") @@ -1639,6 +1769,10 @@ def test_resume_privacy_request( for rule in PolicyResponse.from_orm(privacy_request.policy).rules ], }, + "stopped_step": None, + "stopped_collection": None, + "manual_queries": [], + "resume_endpoint": None, } @@ -1987,3 +2121,6 @@ def test_restart_from_failure( assert privacy_request.status == PrivacyRequestStatus.in_processing submit_mock.assert_called_with(from_step=PausedStep.access) + + +# TODO when I get back add a test that caches data on a privacy request and make sure it's returned properly diff --git a/tests/integration_tests/test_manual_task.py b/tests/integration_tests/test_manual_task.py index ba5496cdb..0d94876f6 100644 --- a/tests/integration_tests/test_manual_task.py +++ b/tests/integration_tests/test_manual_task.py @@ -1,6 +1,8 @@ import uuid import pytest +from sqlalchemy import String, bindparam, text +from sqlalchemy.sql.elements import TextClause from fidesops.common_exceptions import PrivacyRequestPaused from fidesops.graph.config import CollectionAddress @@ -10,6 +12,7 @@ ExecutionLogStatus, PrivacyRequest, ) +from fidesops.service.connectors.manual_connector import format_cached_query from fidesops.task import graph_task from ..graph.graph_test_util import assert_rows_match @@ -458,3 +461,19 @@ def test_collections_with_manual_erasure_confirmation( "postgres_example:address": 0, "manual_example:filing_cabinet": 0, } + + +def test_format_cached_query(): + assert format_cached_query(None) is None + + stmt = text( + "SELECT id,street,state,phone from customer where customer_id=:customer_id" + ) + stmt = stmt.bindparams( + bindparam("customer_id", type_=String, value="12345"), + ) + + assert format_cached_query(stmt) == { + "query": "SELECT id,street,state,phone from customer where customer_id=:customer_id", + "parameters": {"customer_id": "12345"}, + } diff --git a/tests/models/test_privacy_request.py b/tests/models/test_privacy_request.py index ba0b124f0..5412422c2 100644 --- a/tests/models/test_privacy_request.py +++ b/tests/models/test_privacy_request.py @@ -456,3 +456,22 @@ def test_replace_failed_step_and_location(self, privacy_request): PausedStep.access, CollectionAddress("test", "test"), ) + + +class TestPrivacyRequestCacheQueries: + def test_cache_queries(self, privacy_request): + queries = { + "query": "UPDATE manual_collection SET authorized_user = :authorized_user WHERE id = :id", + "parameters": {"authorized_user": "abcde_masked_user", "id": 32424}, + } + privacy_request.cache_queries(PausedStep.access, paused_location, [queries]) + + assert privacy_request.get_cached_queries( + PausedStep.access, paused_location + ) == [queries] + + def test_get_queries_none_cached(self, privacy_request): + assert ( + privacy_request.get_cached_queries(PausedStep.access, paused_location) + is None + ) From 47f4ae79f71a5fb251a905ac1e5c18f23d88c1cd Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Mon, 6 Jun 2022 11:14:37 -0500 Subject: [PATCH 03/10] Refactor caching details about the collection that halted privacy request execution to store all details under the same key: the step, the collection, and any action needed to resume. - Add a ManualQueryConfig - Get rid of using the SQLQueryConfig to cache queries, instead opt to store these in a more generic way for later flexibility. --- .../v1/endpoints/privacy_request_endpoints.py | 63 ++++--- src/fidesops/models/privacy_request.py | 140 ++++++++------- src/fidesops/schemas/privacy_request.py | 17 +- .../service/connectors/http_connector.py | 13 +- .../service/connectors/manual_connector.py | 63 ++++--- .../service/connectors/query_config.py | 72 +++++++- src/fidesops/task/graph_task.py | 4 +- .../test_privacy_request_endpoints.py | 162 ++++++++++-------- .../test_integration_restart.py | 13 +- tests/integration_tests/test_manual_task.py | 124 +++++++------- tests/models/test_privacy_request.py | 82 ++++----- 11 files changed, 420 insertions(+), 333 deletions(-) diff --git a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py index 5c8ee1dee..6c61aaafb 100644 --- a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py +++ b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py @@ -67,6 +67,7 @@ PrivacyRequestVerboseResponse, ReviewPrivacyRequestIds, RowCountRequest, + StoppedCollection, ) from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner from fidesops.service.privacy_request.request_service import ( @@ -363,27 +364,20 @@ def _filter_privacy_request_queryset( def attach_resume_instructions(privacy_request: PrivacyRequest) -> None: """ - Temporarily update a paused or errored privacy request object with instructions about how to resume manually. + Temporarily update a paused or errored privacy request object with instructions from Redis cache + about how to resume manually if applicable. """ - stopped_step: Optional[PausedStep] = None - stopped_collection: Optional[CollectionAddress] = None resume_endpoint: Optional[str] = None - cached_queries: List[Dict[str, Any]] = [] + stopped_collection_details: Optional[StoppedCollection] = None if privacy_request.status == PrivacyRequestStatus.paused: - ( - stopped_step, - stopped_collection, - ) = privacy_request.get_paused_step_and_collection() + stopped_collection_details = privacy_request.get_paused_collection_details() - if stopped_step: + if stopped_collection_details: # Graph is paused on a specific collection - cached_queries = privacy_request.get_cached_queries( - stopped_step, stopped_collection - ) resume_endpoint = ( PRIVACY_REQUEST_MANUAL_ERASURE - if stopped_step == PausedStep.erasure + if stopped_collection_details.step == PausedStep.erasure else PRIVACY_REQUEST_MANUAL_INPUT ) else: @@ -391,17 +385,16 @@ def attach_resume_instructions(privacy_request: PrivacyRequest) -> None: resume_endpoint = PRIVACY_REQUEST_RESUME elif privacy_request.status == PrivacyRequestStatus.error: - ( - stopped_step, - stopped_collection, - ) = privacy_request.get_failed_step_and_collection() + stopped_collection_details = privacy_request.get_failed_collection_details() resume_endpoint = PRIVACY_REQUEST_RETRY - privacy_request.stopped_step = stopped_step.value if stopped_step else None - privacy_request.stopped_collection = ( - stopped_collection.value if stopped_collection else None - ) - privacy_request.manual_queries = cached_queries + if stopped_collection_details: + stopped_collection_details.step = stopped_collection_details.step.value + stopped_collection_details.collection = ( + stopped_collection_details.collection.value + ) + + privacy_request.stopped_collection_details = stopped_collection_details privacy_request.resume_endpoint = ( resume_endpoint.format(privacy_request_id=privacy_request.id) if resume_endpoint @@ -667,15 +660,18 @@ def resume_privacy_request_with_manual_input( f"status = {privacy_request.status.value}. Privacy request is not paused.", ) - paused_step: Optional[PausedStep] - paused_collection: Optional[CollectionAddress] - paused_step, paused_collection = privacy_request.get_paused_step_and_collection() - if not paused_collection or not paused_step: + paused_details: Optional[ + StoppedCollection + ] = privacy_request.get_paused_collection_details() + if not paused_details: raise HTTPException( status_code=HTTP_400_BAD_REQUEST, - detail=f"Cannot resume privacy request '{privacy_request.id}'; no paused collection or no paused step.", + detail=f"Cannot resume privacy request '{privacy_request.id}'; no paused details.", ) + paused_step: PausedStep = paused_details.step + paused_collection: CollectionAddress = paused_details.collection + if paused_step != expected_paused_step: raise HTTPException( status_code=HTTP_400_BAD_REQUEST, @@ -804,15 +800,18 @@ def restart_privacy_request_from_failure( detail=f"Cannot restart privacy request from failure: privacy request '{privacy_request.id}' status = {privacy_request.status.value}.", ) - failed_step: Optional[PausedStep] - failed_collection: Optional[CollectionAddress] - failed_step, failed_collection = privacy_request.get_failed_step_and_collection() - if not failed_step or not failed_collection: + failed_details: Optional[ + StoppedCollection + ] = privacy_request.get_failed_collection_details() + if not failed_details: raise HTTPException( status_code=HTTP_400_BAD_REQUEST, detail=f"Cannot restart privacy request from failure '{privacy_request.id}'; no failed step or collection.", ) + failed_step: Optional[PausedStep] = failed_details.step + failed_collection: Optional[CollectionAddress] = failed_details.collection + logger.info( f"Restarting failed privacy request '{privacy_request_id}' from '{failed_step} step, 'collection '{failed_collection}'" ) @@ -825,7 +824,7 @@ def restart_privacy_request_from_failure( privacy_request=privacy_request, ).submit(from_step=failed_step) - privacy_request.cache_failed_step_and_collection() # Reset failed step and collection to None + privacy_request.cache_failed_collection_details() # Reset failed step and collection to None return privacy_request diff --git a/src/fidesops/models/privacy_request.py b/src/fidesops/models/privacy_request.py index efbf11526..9f4b17b25 100644 --- a/src/fidesops/models/privacy_request.py +++ b/src/fidesops/models/privacy_request.py @@ -3,7 +3,7 @@ import logging from datetime import datetime from enum import Enum as EnumType -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional from sqlalchemy import Column, DateTime from sqlalchemy import Enum as EnumColumn @@ -27,6 +27,7 @@ WebhookDirection, WebhookTypes, ) +from fidesops.schemas.base_class import BaseSchema from fidesops.schemas.drp_privacy_request import DrpPrivacyRequestCreate from fidesops.schemas.external_https import ( SecondPartyRequestFormat, @@ -50,6 +51,30 @@ logger = logging.getLogger(__name__) +class ManualAction(BaseSchema): + """Class to surface how to manually retrieve or mask data in a database-agnostic way + + "locators" are similar to the SQL "WHERE" information. + "get" contains a list of fields that should be retrieved from the source + "update" is a dictionary of fields and the value it should be replaced with. + """ + + locators: Dict[str, Any] + get: Optional[List[str]] + update: Optional[Dict[str, Any]] + + +class StoppedCollection(BaseSchema): + """Class that contains details about the collection that halted privacy request execution""" + + step: PausedStep + collection: CollectionAddress + action_needed: Optional[List[ManualAction]] = None + + class Config: + arbitrary_types_allowed = True + + class PrivacyRequestStatus(str, EnumType): """Enum for privacy request statuses, reflecting where they are in the Privacy Request Lifecycle""" @@ -229,50 +254,58 @@ def get_results(self) -> Dict[str, Any]: result_prefix = f"{self.id}__*" return cache.get_encoded_objects_by_prefix(result_prefix) - def cache_paused_step_and_collection( + def cache_paused_collection_details( self, - paused_step: Optional[PausedStep] = None, - paused_collection: Optional[CollectionAddress] = None, + step: Optional[PausedStep] = None, + collection: Optional[CollectionAddress] = None, + action_needed: Optional[List[ManualAction]] = None, ) -> None: """ - Cache both paused step (access or erasure) and paused collection + Cache details about the paused step, paused collection, and any action needed to resume the privacy request. """ - cache_step_and_collection( - paused_step, paused_collection, cache_key=f"PAUSED_LOCATION__{self.id}" + cache_restart_details( + cache_key=f"PAUSED_LOCATION__{self.id}", + step=step, + collection=collection, + action_needed=action_needed, ) - def get_paused_step_and_collection( + def get_paused_collection_details( self, - ) -> Tuple[Optional[PausedStep], Optional[CollectionAddress]]: - """Get both the paused step (access or erasure) and collection awaiting manual input for the given privacy request. + ) -> Optional[StoppedCollection]: + """Return details about the paused step, paused collection, and any action needed to resume the paused privacy request. The paused step lets us know if we should resume privacy request execution from the "access" or the "erasure" portion of the privacy request flow, and the collection tells us where we should cache manual input data for later use, In other words, this manual data belongs to this collection. """ - return get_step_and_collection(cached_key=f"EN_PAUSED_LOCATION__{self.id}") + return get_restart_details(cached_key=f"EN_PAUSED_LOCATION__{self.id}") - def cache_failed_step_and_collection( + def cache_failed_collection_details( self, - failed_step: Optional[PausedStep] = None, - failed_collection: Optional[CollectionAddress] = None, + step: Optional[PausedStep] = None, + collection: Optional[CollectionAddress] = None, ) -> None: """ - Cache both the failed step (access or erasure) and failed collection. + Cache details about the failed step and failed collection details. No specific input data is required to surface + a failed request, so action_needed is None. """ - cache_step_and_collection( - failed_step, failed_collection, cache_key=f"FAILED_LOCATION__{self.id}" + cache_restart_details( + cache_key=f"FAILED_LOCATION__{self.id}", + step=step, + collection=collection, + action_needed=None, ) - def get_failed_step_and_collection( + def get_failed_collection_details( self, - ) -> Tuple[Optional[PausedStep], Optional[CollectionAddress]]: - """Get both the failed step (access or erasure) and collection that triggered failure. + ) -> Optional[StoppedCollection]: + """Get details about the failed step (access or erasure) and collection that triggered failure. The failed step lets us know if we should resume privacy request execution from the "access" or the "erasure" portion of the privacy request flow. """ - return get_step_and_collection(cached_key=f"EN_FAILED_LOCATION__{self.id}") + return get_restart_details(cached_key=f"EN_FAILED_LOCATION__{self.id}") def cache_manual_input( self, collection: CollectionAddress, manual_rows: Optional[List[Row]] @@ -318,30 +351,6 @@ def get_manual_erasure_count(self, collection: CollectionAddress) -> Optional[in ) return list(value_dict.values())[0] if value_dict else None - def cache_queries( - self, - paused_step: PausedStep, - paused_collection: CollectionAddress, - query: List[Dict[str, Any]], - ) -> None: - """Cache the list of queries to be performed manually for a given paused step and collection""" - cache: FidesopsRedis = get_cache() - cache.set_encoded_object( - f"PAUSED_QUERY__{paused_step.value}__{paused_collection}__{self.id}", - query, - ) - - def get_cached_queries( - self, paused_step: PausedStep, paused_collection: CollectionAddress - ) -> Optional[List[Dict[str, Any]]]: - """Get the list of queries to be performed manually for a given paused step and collection""" - cache: FidesopsRedis = get_cache() - prefix = f"PAUSED_QUERY__{paused_step.value}__{paused_collection}__{self.id}" - value_dict: Optional[ - Dict[str, Optional[List[Dict[str, Any]]]] - ] = cache.get_encoded_objects_by_prefix(prefix) - return list(value_dict.values())[0] if value_dict else None - def trigger_policy_webhook(self, webhook: WebhookTypes) -> None: """Trigger a request to a single customer-defined policy webhook. Raises an exception if webhook response should cause privacy request execution to stop. @@ -420,44 +429,47 @@ def error_processing(self, db: Session) -> None: PAUSED_SEPARATOR = "__fidesops_paused_sep__" -def cache_step_and_collection( - step: Optional[PausedStep], collection: Optional[CollectionAddress], cache_key: str +def cache_restart_details( + cache_key: str, + step: Optional[PausedStep] = None, + collection: Optional[CollectionAddress] = None, + action_needed: Optional[List[ManualAction]] = None, ) -> None: - """Generic method to cache which collection and which step (access/erasure) we're stopped on + """Generic method to cache which collection and step(access/erasure) halted PrivacyRequest execution, + as well as data (action_needed) to resume the privacy request if applicable. - For example, we might pause a privacy request at the erasure step of the postgres_example:address collection or - the request might have failed at the access step of the mongo_example:payment_card collection. + For example, we might pause a privacy request at the access step of the postgres_example:address collection. The + user might need to retrieve an "email" field and an "address" field where the customer_id is 22 to resume the request. The "step" is needed because we may build and execute multiple graphs as part of running a privacy request. An erasure request builds two graphs, one to access the data, and the second to mask it. We need to know if we should resume execution from the access or the erasure portion of the request. """ cache: FidesopsRedis = get_cache() + cache_stopped: Optional[StoppedCollection] = None + if collection and step: + cache_stopped = StoppedCollection( + step=step, collection=collection, action_needed=action_needed + ) - # Store both the step and collection under one value cache.set_encoded_object( cache_key, - f"{step.value}{PAUSED_SEPARATOR}{collection.value}" - if step and collection - else None, + cache_stopped.dict() if cache_stopped else None, ) -def get_step_and_collection( +def get_restart_details( cached_key: str, -) -> Tuple[Optional[PausedStep], Optional[CollectionAddress]]: - """Get both the stopped step (access or erasure) and collection where we've stopped the privacy request. +) -> Optional[StoppedCollection]: + """Get details about the collection that halted privacy request execution and details to resume if applicable. The "step" lets us know if we should resume privacy request execution from the "access" or the "erasure" - portion of the privacy request flow, and the collection lets us know which node we stopped on. + portion of the privacy request flow, and the collection lets us know which node we stopped on. "action_needed" + describes actions that need to be manually performed before resuming request. """ cache: FidesopsRedis = get_cache() - node_addr: str = cache.get_encoded_by_key(cached_key) - - if node_addr: - split_addr = node_addr.split(PAUSED_SEPARATOR) - return PausedStep(split_addr[0]), CollectionAddress.from_string(split_addr[1]) - return None, None # If no cached data, return a tuple of Nones + cached_stopped: Optional[StoppedCollection] = cache.get_encoded_by_key(cached_key) + return StoppedCollection.parse_obj(cached_stopped) if cached_stopped else None class ExecutionLogStatus(EnumType): diff --git a/src/fidesops/schemas/privacy_request.py b/src/fidesops/schemas/privacy_request.py index 3c8ce8106..30eec068c 100644 --- a/src/fidesops/schemas/privacy_request.py +++ b/src/fidesops/schemas/privacy_request.py @@ -1,12 +1,16 @@ from datetime import datetime from enum import Enum as EnumType -from typing import Any, Dict, List, Optional +from typing import Dict, List, Optional from pydantic import Field, validator from fidesops.core.config import config from fidesops.models.policy import ActionType -from fidesops.models.privacy_request import ExecutionLogStatus, PrivacyRequestStatus +from fidesops.models.privacy_request import ( + ExecutionLogStatus, + PrivacyRequestStatus, + StoppedCollection, +) from fidesops.schemas.api import BulkResponse, BulkUpdateFailed from fidesops.schemas.base_class import BaseSchema from fidesops.schemas.policy import PolicyResponse as PolicySchema @@ -109,6 +113,11 @@ class RowCountRequest(BaseSchema): row_count: int +class StoppedCollectionDetails(StoppedCollection): + + collection: Optional[str] = None + + class PrivacyRequestResponse(BaseSchema): """Schema to check the status of a PrivacyRequest""" @@ -127,9 +136,7 @@ class PrivacyRequestResponse(BaseSchema): # creation. identity: Optional[Dict[str, str]] policy: PolicySchema - stopped_step: Optional[str] - stopped_collection: Optional[str] - manual_queries: List[Dict[str, Any]] = [] + stopped_collection_details: Optional[StoppedCollectionDetails] = None resume_endpoint: Optional[str] class Config: diff --git a/src/fidesops/service/connectors/http_connector.py b/src/fidesops/service/connectors/http_connector.py index db169f710..157eb63b4 100644 --- a/src/fidesops/service/connectors/http_connector.py +++ b/src/fidesops/service/connectors/http_connector.py @@ -78,13 +78,7 @@ def retrieve_data( privacy_request: PrivacyRequest, input_data: Dict[str, List[Any]], ) -> List[Row]: - """Retrieve data in a connector dependent way based on input data. - - The input data is expected to include a key and list of values for - each input key that may be queried on. - - TODO: implement when HTTPS Connectors can be part of the traversal. - """ + """Currently not supported as webhooks are not called at the collection level""" def mask_data( self, @@ -93,10 +87,7 @@ def mask_data( privacy_request: PrivacyRequest, rows: List[Row], ) -> int: - """Execute a masking request. Return the number of rows that have been updated - - TODO: implement when HTTPS Connectors can be part of the traversal. - """ + """Currently not supported as webhooks are not called at the collection level""" def create_client(self) -> None: """Not required for this type""" diff --git a/src/fidesops/service/connectors/manual_connector.py b/src/fidesops/service/connectors/manual_connector.py index 46da773ae..930df39e6 100644 --- a/src/fidesops/service/connectors/manual_connector.py +++ b/src/fidesops/service/connectors/manual_connector.py @@ -6,18 +6,21 @@ from fidesops.common_exceptions import PrivacyRequestPaused from fidesops.graph.traversal import TraversalNode from fidesops.models.policy import PausedStep, Policy -from fidesops.models.privacy_request import PrivacyRequest +from fidesops.models.privacy_request import ManualAction, PrivacyRequest from fidesops.service.connectors.base_connector import BaseConnector -from fidesops.service.connectors.query_config import SQLQueryConfig +from fidesops.service.connectors.query_config import ManualQueryConfig from fidesops.util.collection_util import Row logger = logging.getLogger(__name__) class ManualConnector(BaseConnector[None]): - def query_config(self, node: TraversalNode) -> SQLQueryConfig: - """No query_config for the Manual Connector""" - return SQLQueryConfig(node) + def query_config(self, node: TraversalNode) -> ManualQueryConfig: + """ + The ManualQueryConfig generates instructions for the user to retrieve and mask + data manually. + """ + return ManualQueryConfig(node) def create_client(self) -> None: """Not needed because this connector involves a human performing some lookup step""" @@ -41,26 +44,25 @@ def retrieve_data( """ Returns manually added data for the given collection if it exists, otherwise pauses the Privacy Request. - Caches a SQL query as manual instructions for the user to fulfill a manual request. + On the event that we pause, caches the stopped step, stopped collection, and details needed to manually resume + the privacy request. """ cached_results: Optional[List[Row]] = privacy_request.get_manual_input( node.address ) if cached_results is not None: # None comparison intentional - privacy_request.cache_paused_step_and_collection() # Caches paused location as None + privacy_request.cache_paused_collection_details() # Resets paused details to None return cached_results - # Save the step (access) and collection where we're paused. - privacy_request.cache_paused_step_and_collection( - PausedStep.access, node.address - ) - query_config = self.query_config(node) - stmt: Optional[TextClause] = query_config.generate_query(input_data, policy) - cached_query: Optional[Dict[str, Any]] = format_cached_query(stmt) - privacy_request.cache_queries( - PausedStep.access, node.address, [cached_query] if cached_query else [] + action_needed: Optional[ManualAction] = query_config.generate_query( + input_data, policy + ) + privacy_request.cache_paused_collection_details( + step=PausedStep.access, + collection=node.address, + action_needed=[action_needed] if action_needed else None, ) raise PrivacyRequestPaused( @@ -77,34 +79,31 @@ def mask_data( """If erasure confirmation has been added to the manual cache, continue, otherwise, pause and wait for manual input. - Caches a SQL query as manual instructions for the user to fulfill the manual erasure request. + On the event that we pause, caches the stopped step, stopped collection, and details needed to manually resume + the privacy request. """ manual_cached_count: Optional[int] = privacy_request.get_manual_erasure_count( node.address ) if manual_cached_count is not None: # None comparison intentional - privacy_request.cache_paused_step_and_collection() # Caches paused location as None + privacy_request.cache_paused_collection_details() # Resets paused details to None return manual_cached_count - privacy_request.cache_paused_step_and_collection( - PausedStep.erasure, node.address - ) - - query_config = self.query_config(node) - cached_queries: List[Dict[str, Any]] = [] + query_config: ManualQueryConfig = self.query_config(node) + action_needed: List[ManualAction] = [] for row in rows: - # Cache a manual update query to surface to the user - update_stmt: Optional[TextClause] = query_config.generate_update_stmt( + action: Optional[ManualAction] = query_config.generate_update_stmt( row, policy, privacy_request ) - formatted_update_stmt: Optional[Dict[str, Any]] = format_cached_query( - update_stmt - ) - if formatted_update_stmt: - cached_queries.append(formatted_update_stmt) + if action: + action_needed.append(action) - privacy_request.cache_queries(PausedStep.erasure, node.address, cached_queries) + privacy_request.cache_paused_collection_details( + step=PausedStep.erasure, + collection=node.address, + action_needed=action_needed if action_needed else None, + ) raise PrivacyRequestPaused( f"Collection '{node.address.value}' waiting on manual erasure confirmation for privacy request '{privacy_request.id}'" diff --git a/src/fidesops/service/connectors/query_config.py b/src/fidesops/service/connectors/query_config.py index baeccef0c..bcbe933cb 100644 --- a/src/fidesops/service/connectors/query_config.py +++ b/src/fidesops/service/connectors/query_config.py @@ -18,7 +18,7 @@ ) from fidesops.graph.traversal import Row, TraversalNode from fidesops.models.policy import ActionType, Policy, Rule -from fidesops.models.privacy_request import PrivacyRequest +from fidesops.models.privacy_request import ManualAction, PrivacyRequest from fidesops.service.masking.strategy.masking_strategy import MaskingStrategy from fidesops.service.masking.strategy.masking_strategy_factory import ( MaskingStrategyFactory, @@ -256,6 +256,76 @@ def generate_update_stmt( returns None""" +class ManualQueryConfig(QueryConfig[Executable]): + def generate_query( + self, input_data: Dict[str, List[Any]], policy: Optional[Policy] + ) -> Optional[ManualAction]: + """Describe the details needed to manually retrieve data from the + current collection. + + Example: + { + "step": "access", + "collection": "manual_dataset:manual_collection", + "action_needed": [ + { + "locators": {'email': "customer-1@example.com"}, + "get": ["id", "box_id"] + "update": {} + } + ] + } + + """ + + locators: Dict[str, Any] = self.node.typed_filtered_values(input_data) + get: List[str] = [ + field_path.string_path + for field_path in self.node.node.collection.top_level_field_dict + ] + + if get and locators: + return ManualAction(locators=locators, get=get, update=None) + return None + + def query_to_str(self, t: T, input_data: Dict[str, List[Any]]) -> str: + """Convert query to string""" + + def dry_run_query(self) -> Optional[str]: + """dry run query for display""" + + def generate_update_stmt( + self, row: Row, policy: Policy, request: PrivacyRequest + ) -> Optional[ManualAction]: + """Describe the details needed to manually mask data in the + current collection. + + Example: + { + "step": "erasure", + "collection": "manual_dataset:manual_collection", + "action_needed": [ + { + "locators": {'id': 1}, + "get": [] + "update": {'authorized_user': None} + } + ] + } + """ + locators: Dict[str, Any] = filter_nonempty_values( + { + field_path.string_path: field.cast(row[field_path.string_path]) + for field_path, field in self.primary_key_field_paths.items() + } + ) + update_stmt: Dict[str, Any] = self.update_value_map(row, policy, request) + + if update_stmt and locators: + return ManualAction(locators=locators, get=None, update=update_stmt) + return None + + class SQLQueryConfig(QueryConfig[Executable]): """Query config that translates parameters into SQL statements.""" diff --git a/src/fidesops/task/graph_task.py b/src/fidesops/task/graph_task.py index b571e3503..6c974fa9b 100644 --- a/src/fidesops/task/graph_task.py +++ b/src/fidesops/task/graph_task.py @@ -90,8 +90,8 @@ def result(*args: Any, **kwargs: Any) -> List[Optional[Row]]: raised_ex = ex self.log_end(action_type, raised_ex) - self.resources.request.cache_failed_step_and_collection( - failed_step=action_type, failed_collection=self.traversal_node.address + self.resources.request.cache_failed_collection_details( + step=action_type, collection=self.traversal_node.address ) # Re-raise to stop privacy request execution on failure. raise raised_ex diff --git a/tests/api/v1/endpoints/test_privacy_request_endpoints.py b/tests/api/v1/endpoints/test_privacy_request_endpoints.py index 61889e2cd..150d5aa5e 100644 --- a/tests/api/v1/endpoints/test_privacy_request_endpoints.py +++ b/tests/api/v1/endpoints/test_privacy_request_endpoints.py @@ -45,8 +45,10 @@ from fidesops.models.privacy_request import ( ExecutionLog, ExecutionLogStatus, + ManualAction, PrivacyRequest, PrivacyRequestStatus, + StoppedCollection, ) from fidesops.schemas.dataset import DryRunDatasetResponse from fidesops.schemas.jwt import ( @@ -496,9 +498,7 @@ def test_get_privacy_requests_by_id( ).rules ], }, - "stopped_step": None, - "stopped_collection": None, - "manual_queries": [], + "stopped_collection_details": None, "resume_endpoint": None, } ], @@ -551,9 +551,7 @@ def test_get_privacy_requests_by_partial_id( ).rules ], }, - "stopped_step": None, - "stopped_collection": None, - "manual_queries": [], + "stopped_collection_details": None, "resume_endpoint": None, } ], @@ -858,9 +856,7 @@ def test_verbose_privacy_requests( ).rules ], }, - "stopped_step": None, - "stopped_collection": None, - "manual_queries": [], + "stopped_collection_details": None, "resume_endpoint": None, "results": { "my-mongo-db": [ @@ -1019,15 +1015,15 @@ def test_get_paused_access_privacy_request_resume_info( privacy_request.save(db) paused_step = PausedStep.access paused_collection = CollectionAddress("manual_dataset", "manual_collection") - privacy_request.cache_paused_step_and_collection(paused_step, paused_collection) - privacy_request.cache_queries( - paused_step, - paused_collection, - query=[ - { - "query": "SELECT authorized_user FROM manual_collection WHERE email = :email", - "parameters": {"email": ("customer-1@example.com",)}, - } + privacy_request.cache_paused_collection_details( + step=paused_step, + collection=paused_collection, + action_needed=[ + ManualAction( + locators={"email": ["customer-1@example.com"]}, + get=["authorized_user"], + update=None, + ) ], ) @@ -1037,14 +1033,17 @@ def test_get_paused_access_privacy_request_resume_info( data = response.json()["items"][0] assert data["status"] == "paused" - assert data["stopped_step"] == "access" - assert data["stopped_collection"] == "manual_dataset:manual_collection" - assert data["manual_queries"] == [ - { - "query": "SELECT authorized_user FROM manual_collection WHERE email = :email", - "parameters": {"email": ["customer-1@example.com"]}, - } - ] + assert data["stopped_collection_details"] == { + "step": "access", + "collection": "manual_dataset:manual_collection", + "action_needed": [ + { + "locators": {"email": ["customer-1@example.com"]}, + "get": ["authorized_user"], + "update": None, + } + ], + } assert data["resume_endpoint"] == "/privacy-request/{}/manual_input".format( privacy_request.id ) @@ -1057,15 +1056,15 @@ def test_get_paused_erasure_privacy_request_resume_info( privacy_request.save(db) paused_step = PausedStep.erasure paused_collection = CollectionAddress("manual_dataset", "another_collection") - privacy_request.cache_paused_step_and_collection(paused_step, paused_collection) - privacy_request.cache_queries( - paused_step, - paused_collection, - query=[ - { - "query": "UPDATE manual_collection SET authorized_user = :authorized_user WHERE id = :id", - "parameters": {"authorized_user": "abcde_masked_user", "id": 32424}, - } + privacy_request.cache_paused_collection_details( + step=paused_step, + collection=paused_collection, + action_needed=[ + ManualAction( + locators={"id": [32424]}, + get=None, + update={"authorized_user": "abcde_masked_user"}, + ) ], ) @@ -1075,14 +1074,17 @@ def test_get_paused_erasure_privacy_request_resume_info( data = response.json()["items"][0] assert data["status"] == "paused" - assert data["stopped_step"] == "erasure" - assert data["stopped_collection"] == "manual_dataset:another_collection" - assert data["manual_queries"] == [ - { - "query": "UPDATE manual_collection SET authorized_user = :authorized_user WHERE id = :id", - "parameters": {"authorized_user": "abcde_masked_user", "id": 32424}, - } - ] + assert data["stopped_collection_details"] == { + "step": "erasure", + "collection": "manual_dataset:another_collection", + "action_needed": [ + { + "locators": {"id": [32424]}, + "get": None, + "update": {"authorized_user": "abcde_masked_user"}, + } + ], + } assert data["resume_endpoint"] == "/privacy-request/{}/erasure_confirm".format( privacy_request.id ) @@ -1099,9 +1101,7 @@ def test_get_paused_webhook_resume_info( data = response.json()["items"][0] assert data["status"] == "paused" - assert data["stopped_step"] is None - assert data["stopped_collection"] is None - assert data["manual_queries"] == [] # No manual data needed + assert data["stopped_collection_details"] is None assert data["resume_endpoint"] == "/privacy-request/{}/resume".format( privacy_request.id ) @@ -1112,9 +1112,10 @@ def test_get_failed_request_resume_info( # Mock the privacy request being in an errored state waiting for retry privacy_request.status = PrivacyRequestStatus.error privacy_request.save(db) - paused_step = PausedStep.erasure - paused_collection = CollectionAddress("manual_dataset", "another_collection") - privacy_request.cache_failed_step_and_collection(paused_step, paused_collection) + privacy_request.cache_failed_collection_details( + step=PausedStep.erasure, + collection=CollectionAddress("manual_example", "another_collection"), + ) auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_READ]) response = api_client.get(url, headers=auth_header) @@ -1122,9 +1123,11 @@ def test_get_failed_request_resume_info( data = response.json()["items"][0] assert data["status"] == "error" - assert data["stopped_step"] == "erasure" - assert data["stopped_collection"] == "manual_dataset:another_collection" - assert data["manual_queries"] == [] # No manual data needed + assert data["stopped_collection_details"] == { + "step": "erasure", + "collection": "manual_example:another_collection", + "action_needed": None, + } assert data["resume_endpoint"] == "/privacy-request/{}/retry".format( privacy_request.id ) @@ -1769,9 +1772,7 @@ def test_resume_privacy_request( for rule in PolicyResponse.from_orm(privacy_request.policy).rules ], }, - "stopped_step": None, - "stopped_collection": None, - "manual_queries": [], + "stopped_collection_details": None, "resume_endpoint": None, } @@ -1815,7 +1816,7 @@ def test_manual_resume_privacy_request_no_paused_location( assert response.status_code == 400 assert ( response.json()["detail"] - == f"Cannot resume privacy request '{privacy_request.id}'; no paused collection or no paused step." + == f"Cannot resume privacy request '{privacy_request.id}'; no paused details." ) def test_resume_with_manual_input_collection_has_changed( @@ -1826,9 +1827,11 @@ def test_resume_with_manual_input_collection_has_changed( privacy_request.status = PrivacyRequestStatus.paused privacy_request.save(db) - privacy_request.cache_paused_step_and_collection( - PausedStep.access, CollectionAddress("manual_example", "filing_cabinet") + privacy_request.cache_paused_collection_details( + step=PausedStep.access, + collection=CollectionAddress("manual_example", "filing_cabinet"), ) + response = api_client.post(url, headers=auth_header, json=[{"mock": "row"}]) assert response.status_code == 422 assert ( @@ -1852,9 +1855,11 @@ def test_resume_with_manual_input_invalid_data( privacy_request.status = PrivacyRequestStatus.paused privacy_request.save(db) - privacy_request.cache_paused_step_and_collection( - PausedStep.access, CollectionAddress("manual_input", "filing_cabinet") + privacy_request.cache_paused_collection_details( + step=PausedStep.access, + collection=CollectionAddress("manual_input", "filing_cabinet"), ) + response = api_client.post(url, headers=auth_header, json=[{"mock": "row"}]) assert response.status_code == 422 assert ( @@ -1862,11 +1867,15 @@ def test_resume_with_manual_input_invalid_data( == "Cannot save manual rows. No 'mock' field defined on the 'manual_input:filing_cabinet' collection." ) + @mock.patch( + "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + ) @pytest.mark.usefixtures( "postgres_example_test_dataset_config", "manual_dataset_config" ) def test_resume_with_manual_input( self, + _, db, api_client, url, @@ -1877,9 +1886,11 @@ def test_resume_with_manual_input( privacy_request.status = PrivacyRequestStatus.paused privacy_request.save(db) - privacy_request.cache_paused_step_and_collection( - PausedStep.access, CollectionAddress("manual_input", "filing_cabinet") + privacy_request.cache_paused_collection_details( + step=PausedStep.access, + collection=CollectionAddress("manual_input", "filing_cabinet"), ) + response = api_client.post( url, headers=auth_header, @@ -1983,7 +1994,7 @@ def test_manual_resume_privacy_request_no_paused_location( assert response.status_code == 400 assert ( response.json()["detail"] - == f"Cannot resume privacy request '{privacy_request.id}'; no paused collection or no paused step." + == f"Cannot resume privacy request '{privacy_request.id}'; no paused details." ) def test_resume_with_manual_erasure_confirmation_collection_has_changed( @@ -1994,9 +2005,11 @@ def test_resume_with_manual_erasure_confirmation_collection_has_changed( privacy_request.status = PrivacyRequestStatus.paused privacy_request.save(db) - privacy_request.cache_paused_step_and_collection( - PausedStep.erasure, CollectionAddress("manual_example", "filing_cabinet") + privacy_request.cache_paused_collection_details( + step=PausedStep.erasure, + collection=CollectionAddress("manual_example", "filing_cabinet"), ) + response = api_client.post(url, headers=auth_header, json={"row_count": 0}) assert response.status_code == 422 assert ( @@ -2012,8 +2025,9 @@ def test_resume_still_paused_at_access_request( privacy_request.status = PrivacyRequestStatus.paused privacy_request.save(db) - privacy_request.cache_paused_step_and_collection( - PausedStep.access, CollectionAddress("manual_example", "filing_cabinet") + privacy_request.cache_paused_collection_details( + step=PausedStep.access, + collection=CollectionAddress("manual_example", "filing_cabinet"), ) response = api_client.post(url, headers=auth_header, json={"row_count": 0}) assert response.status_code == 400 @@ -2042,8 +2056,9 @@ def test_resume_with_manual_count( privacy_request.status = PrivacyRequestStatus.paused privacy_request.save(db) - privacy_request.cache_paused_step_and_collection( - PausedStep.erasure, CollectionAddress("manual_input", "filing_cabinet") + privacy_request.cache_paused_collection_details( + step=PausedStep.erasure, + collection=CollectionAddress("manual_input", "filing_cabinet"), ) response = api_client.post( url, @@ -2110,8 +2125,10 @@ def test_restart_from_failure( auth_header = generate_auth_header(scopes=[PRIVACY_REQUEST_CALLBACK_RESUME]) privacy_request.status = PrivacyRequestStatus.error privacy_request.save(db) - privacy_request.cache_failed_step_and_collection( - PausedStep.access, CollectionAddress("test_dataset", "test_collection") + + privacy_request.cache_failed_collection_details( + step=PausedStep.access, + collection=CollectionAddress("test_dataset", "test_collection"), ) response = api_client.post(url, headers=auth_header) @@ -2121,6 +2138,3 @@ def test_restart_from_failure( assert privacy_request.status == PrivacyRequestStatus.in_processing submit_mock.assert_called_with(from_step=PausedStep.access) - - -# TODO when I get back add a test that caches data on a privacy request and make sure it's returned properly diff --git a/tests/integration_tests/test_integration_restart.py b/tests/integration_tests/test_integration_restart.py index d792ec735..32d6837cf 100644 --- a/tests/integration_tests/test_integration_restart.py +++ b/tests/integration_tests/test_integration_restart.py @@ -7,7 +7,11 @@ from fidesops.graph.graph import DatasetGraph from fidesops.models.datasetconfig import convert_dataset_to_graph from fidesops.models.policy import PausedStep -from fidesops.models.privacy_request import ExecutionLog, PrivacyRequest +from fidesops.models.privacy_request import ( + ExecutionLog, + PrivacyRequest, + StoppedCollection, +) from fidesops.schemas.dataset import FidesopsDataset from fidesops.task import graph_task @@ -77,10 +81,9 @@ def test_restart_graph_from_failure( ("mongo_test:customer_details", "in_processing"), ("mongo_test:customer_details", "error"), ] - - assert privacy_request.get_failed_step_and_collection() == ( - PausedStep.access, - CollectionAddress("mongo_test", "customer_details"), + assert privacy_request.get_failed_collection_details() == StoppedCollection( + step=PausedStep.access, + collection=CollectionAddress("mongo_test", "customer_details"), ) # Reset secrets diff --git a/tests/integration_tests/test_manual_task.py b/tests/integration_tests/test_manual_task.py index 0d94876f6..5086993c6 100644 --- a/tests/integration_tests/test_manual_task.py +++ b/tests/integration_tests/test_manual_task.py @@ -2,7 +2,6 @@ import pytest from sqlalchemy import String, bindparam, text -from sqlalchemy.sql.elements import TextClause from fidesops.common_exceptions import PrivacyRequestPaused from fidesops.graph.config import CollectionAddress @@ -43,20 +42,19 @@ def test_postgres_with_manual_input_access_request_task( {"email": "customer-1@example.com"}, ) - request_type, paused_node = privacy_request.get_paused_step_and_collection() - assert paused_node.value == "manual_example:storage_unit" - assert request_type == PausedStep.access - - cached_queries = privacy_request.get_cached_queries( - PausedStep.access, CollectionAddress("manual_example", "storage_unit") + paused_details = privacy_request.get_paused_collection_details() + assert paused_details.collection == CollectionAddress( + "manual_example", "storage_unit" ) - # Assert instructional query we'll surface to the user - assert cached_queries == [ - { - "query": "SELECT box_id,email FROM storage_unit WHERE email = :email", - "parameters": {"email": ("customer-1@example.com",)}, - } - ] + assert paused_details.step == PausedStep.access + assert len(paused_details.action_needed) == 1 + + assert paused_details.action_needed[0].locators == { + "email": ["customer-1@example.com"] + } + + assert paused_details.action_needed[0].get == ["box_id", "email"] + assert paused_details.action_needed[0].update is None # Mock user retrieving storage unit data by adding manual data to cache privacy_request.cache_manual_input( @@ -74,19 +72,21 @@ def test_postgres_with_manual_input_access_request_task( {"email": "customer-1@example.com"}, ) - request_type, paused_node = privacy_request.get_paused_step_and_collection() - assert paused_node.value == "manual_example:filing_cabinet" - assert request_type == PausedStep.access - - cached_queries = privacy_request.get_cached_queries( - PausedStep.access, CollectionAddress("manual_example", "filing_cabinet") + paused_details = privacy_request.get_paused_collection_details() + assert paused_details.collection == CollectionAddress( + "manual_example", "filing_cabinet" ) - assert cached_queries == [ - { - "query": "SELECT id,authorized_user,customer_id,payment_card_id FROM filing_cabinet WHERE customer_id = :customer_id", - "parameters": {"customer_id": (1,)}, - } - ], "Assert instructional query that we will surface to the user" + assert paused_details.step == PausedStep.access + assert len(paused_details.action_needed) == 1 + assert paused_details.action_needed[0].locators == {"customer_id": [1]} + + assert paused_details.action_needed[0].get == [ + "id", + "authorized_user", + "customer_id", + "payment_card_id", + ] + assert paused_details.action_needed[0].update is None # Add manual filing cabinet data from the user privacy_request.cache_manual_input( @@ -142,10 +142,9 @@ def test_postgres_with_manual_input_access_request_task( keys=["city", "id", "state", "street", "zip"], ) - # Paused node removed from cache - request_type, paused_node = privacy_request.get_paused_step_and_collection() - assert request_type is None - assert paused_node is None + # Paused details removed from cache + paused_details = privacy_request.get_paused_collection_details() + assert paused_details is None execution_logs = db.query(ExecutionLog).filter_by( privacy_request_id=privacy_request.id @@ -237,9 +236,11 @@ def test_no_manual_input_found( {"email": "customer-1@example.com"}, ) - request_type, paused_node = privacy_request.get_paused_step_and_collection() - assert request_type == PausedStep.access - assert paused_node.value == "manual_example:storage_unit" + paused_details = privacy_request.get_paused_collection_details() + assert paused_details.collection == CollectionAddress( + "manual_example", "storage_unit" + ) + assert paused_details.step == PausedStep.access # Mock user retrieving storage unit data by adding manual data to cache, # In this case, no data was found in the storage unit, so we pass in an empty list. @@ -258,6 +259,12 @@ def test_no_manual_input_found( {"email": "customer-1@example.com"}, ) + paused_details = privacy_request.get_paused_collection_details() + assert paused_details.collection == CollectionAddress( + "manual_example", "filing_cabinet" + ) + assert paused_details.step == PausedStep.access + # No filing cabinet input found privacy_request.cache_manual_input( CollectionAddress.from_string("manual_example:filing_cabinet"), @@ -292,9 +299,8 @@ def test_no_manual_input_found( ) # Paused node removed from cache - request_type, paused_node = privacy_request.get_paused_step_and_collection() - assert request_type is None - assert paused_node is None + paused_details = privacy_request.get_paused_collection_details() + assert paused_details is None @pytest.mark.integration_postgres @@ -393,19 +399,17 @@ def test_collections_with_manual_erasure_confirmation( cached_data_for_erasures, ) - request_type, paused_node = privacy_request.get_paused_step_and_collection() - assert paused_node.value == "manual_example:filing_cabinet" - assert request_type == PausedStep.erasure - - cached_manual_queries = privacy_request.get_cached_queries( - PausedStep.erasure, CollectionAddress("manual_example", "filing_cabinet") + paused_details = privacy_request.get_paused_collection_details() + assert paused_details.collection == CollectionAddress( + "manual_example", "filing_cabinet" ) - assert cached_manual_queries == [ - { - "query": "UPDATE filing_cabinet SET authorized_user = :authorized_user WHERE id = :id", - "parameters": {"authorized_user": None, "id": 1}, - } - ] + assert paused_details.step == PausedStep.erasure + assert len(paused_details.action_needed) == 1 + + assert paused_details.action_needed[0].locators == {"id": 1} + + assert paused_details.action_needed[0].get is None + assert paused_details.action_needed[0].update == {"authorized_user": None} # Mock confirming from user that there was no data in the filing cabinet privacy_request.cache_manual_erasure_count( @@ -424,19 +428,17 @@ def test_collections_with_manual_erasure_confirmation( cached_data_for_erasures, ) - request_type, paused_node = privacy_request.get_paused_step_and_collection() - assert paused_node.value == "manual_example:storage_unit" - assert request_type == PausedStep.erasure - - cached_manual_queries = privacy_request.get_cached_queries( - PausedStep.erasure, CollectionAddress("manual_example", "storage_unit") + paused_details = privacy_request.get_paused_collection_details() + assert paused_details.collection == CollectionAddress( + "manual_example", "storage_unit" ) - assert cached_manual_queries == [ - { - "query": "UPDATE storage_unit SET email = :email WHERE box_id = :box_id", - "parameters": {"email": None, "box_id": 5}, - } - ] + assert paused_details.step == PausedStep.erasure + assert len(paused_details.action_needed) == 1 + + assert paused_details.action_needed[0].locators == {"box_id": 5} + + assert paused_details.action_needed[0].get is None + assert paused_details.action_needed[0].update == {"email": None} # Mock confirming from user that storage unit erasure is complete privacy_request.cache_manual_erasure_count( @@ -462,6 +464,8 @@ def test_collections_with_manual_erasure_confirmation( "manual_example:filing_cabinet": 0, } + assert privacy_request.get_paused_collection_details() is None + def test_format_cached_query(): assert format_cached_query(None) is None diff --git a/tests/models/test_privacy_request.py b/tests/models/test_privacy_request.py index 5412422c2..bbeaf827a 100644 --- a/tests/models/test_privacy_request.py +++ b/tests/models/test_privacy_request.py @@ -12,6 +12,7 @@ from fidesops.models.policy import PausedStep, Policy from fidesops.models.privacy_request import PrivacyRequest, PrivacyRequestStatus from fidesops.schemas.redis_cache import PrivacyRequestIdentity +from fidesops.service.connectors.manual_connector import ManualAction from fidesops.util.cache import FidesopsRedis, get_identity_cache_key paused_location = CollectionAddress("test_dataset", "test_collection") @@ -364,20 +365,36 @@ def test_two_way_validation_issues( class TestCachePausedLocation: def test_privacy_request_cache_paused_location(self, privacy_request): - assert privacy_request.get_paused_step_and_collection() == (None, None) + assert privacy_request.get_paused_collection_details() is None paused_step = PausedStep.erasure - privacy_request.cache_paused_step_and_collection(paused_step, paused_location) - - assert privacy_request.get_paused_step_and_collection() == ( - paused_step, - paused_location, + privacy_request.cache_paused_collection_details( + step=paused_step, + collection=paused_location, + action_needed=[ + ManualAction( + locators={"email": "customer-1@example.com"}, + get=["box_id", "associated_data"], + update=None, + ) + ], ) + paused_details = privacy_request.get_paused_collection_details() + assert paused_details.step == paused_step + assert paused_details.collection == paused_location + assert paused_details.action_needed == [ + ManualAction( + locators={"email": "customer-1@example.com"}, + get=["box_id", "associated_data"], + update=None, + ) + ] + def test_privacy_request_unpause(self, privacy_request): - privacy_request.cache_paused_step_and_collection() + privacy_request.cache_paused_collection_details() - assert privacy_request.get_paused_step_and_collection() == (None, None) + assert privacy_request.get_paused_collection_details() is None class TestCacheManualInput: @@ -431,47 +448,18 @@ def test_zero_cached(self, privacy_request): class TestPrivacyRequestCacheFailedStep: def test_cache_failed_step_and_collection(self, privacy_request): - privacy_request.cache_failed_step_and_collection( - PausedStep.erasure, paused_location - ) - - cached_data = privacy_request.get_failed_step_and_collection() - assert cached_data == (PausedStep.erasure, paused_location) - def test_cache_null_step_and_location(self, privacy_request): - privacy_request.cache_failed_step_and_collection() - - cached_data = privacy_request.get_failed_step_and_collection() - assert cached_data == (None, None) - - def test_replace_failed_step_and_location(self, privacy_request): - privacy_request.cache_failed_step_and_collection( - PausedStep.erasure, paused_location - ) - privacy_request.cache_failed_step_and_collection( - PausedStep.access, CollectionAddress("test", "test") - ) - - assert privacy_request.get_failed_step_and_collection() == ( - PausedStep.access, - CollectionAddress("test", "test"), + privacy_request.cache_failed_collection_details( + step=PausedStep.erasure, collection=paused_location ) + cached_data = privacy_request.get_failed_collection_details() + assert cached_data.step == PausedStep.erasure + assert cached_data.collection == paused_location + assert cached_data.action_needed is None -class TestPrivacyRequestCacheQueries: - def test_cache_queries(self, privacy_request): - queries = { - "query": "UPDATE manual_collection SET authorized_user = :authorized_user WHERE id = :id", - "parameters": {"authorized_user": "abcde_masked_user", "id": 32424}, - } - privacy_request.cache_queries(PausedStep.access, paused_location, [queries]) - - assert privacy_request.get_cached_queries( - PausedStep.access, paused_location - ) == [queries] + def test_cache_null_step_and_location(self, privacy_request): + privacy_request.cache_failed_collection_details() - def test_get_queries_none_cached(self, privacy_request): - assert ( - privacy_request.get_cached_queries(PausedStep.access, paused_location) - is None - ) + cached_data = privacy_request.get_failed_collection_details() + assert cached_data is None From 852cab263f2c31fbb76a591c0cfaaed4aec3e8a3 Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Mon, 6 Jun 2022 11:33:56 -0500 Subject: [PATCH 04/10] Get rid of elements that cache a SQLQuery from an earlier draft. We're now caching more generic components. --- .../v1/endpoints/privacy_request_endpoints.py | 4 +-- src/fidesops/models/privacy_request.py | 2 +- .../service/connectors/manual_connector.py | 28 ------------------- tests/integration_tests/test_manual_task.py | 16 ----------- 4 files changed, 3 insertions(+), 47 deletions(-) diff --git a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py index 6c61aaafb..ab908924d 100644 --- a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py +++ b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py @@ -809,8 +809,8 @@ def restart_privacy_request_from_failure( detail=f"Cannot restart privacy request from failure '{privacy_request.id}'; no failed step or collection.", ) - failed_step: Optional[PausedStep] = failed_details.step - failed_collection: Optional[CollectionAddress] = failed_details.collection + failed_step: PausedStep = failed_details.step + failed_collection: CollectionAddress = failed_details.collection logger.info( f"Restarting failed privacy request '{privacy_request_id}' from '{failed_step} step, 'collection '{failed_collection}'" diff --git a/src/fidesops/models/privacy_request.py b/src/fidesops/models/privacy_request.py index 9f4b17b25..355ddeafa 100644 --- a/src/fidesops/models/privacy_request.py +++ b/src/fidesops/models/privacy_request.py @@ -56,7 +56,7 @@ class ManualAction(BaseSchema): "locators" are similar to the SQL "WHERE" information. "get" contains a list of fields that should be retrieved from the source - "update" is a dictionary of fields and the value it should be replaced with. + "update" is a dictionary of fields and the value they should be replaced with. """ locators: Dict[str, Any] diff --git a/src/fidesops/service/connectors/manual_connector.py b/src/fidesops/service/connectors/manual_connector.py index 930df39e6..1bc4f51d1 100644 --- a/src/fidesops/service/connectors/manual_connector.py +++ b/src/fidesops/service/connectors/manual_connector.py @@ -1,8 +1,6 @@ import logging from typing import Any, Dict, List, Optional -from sqlalchemy.sql.elements import TextClause - from fidesops.common_exceptions import PrivacyRequestPaused from fidesops.graph.traversal import TraversalNode from fidesops.models.policy import PausedStep, Policy @@ -108,29 +106,3 @@ def mask_data( raise PrivacyRequestPaused( f"Collection '{node.address.value}' waiting on manual erasure confirmation for privacy request '{privacy_request.id}'" ) - - -def format_cached_query(stmt: Optional[TextClause]) -> Optional[Dict[str, Any]]: - """ - Format the SQLAlchemy TextClause for caching in Redis, for describing to the user the manual actions required on their - part. Store the query and the parameters separately. - - For example: - { - 'query': 'UPDATE filing_cabinet SET authorized_user = :authorized_user WHERE id = :id', - 'parameters': { - 'authorized_user': None, - 'id': 121 - } - } - """ - if stmt is None: - return None - - query_elems: Dict[str, Any] = {"query": str(stmt), "parameters": {}} - for ( - param_name, - bind_parameters, - ) in stmt._bindparams.items(): # pylint: disable=W0212 - query_elems["parameters"][param_name] = bind_parameters.value - return query_elems diff --git a/tests/integration_tests/test_manual_task.py b/tests/integration_tests/test_manual_task.py index 5086993c6..eb5f657df 100644 --- a/tests/integration_tests/test_manual_task.py +++ b/tests/integration_tests/test_manual_task.py @@ -465,19 +465,3 @@ def test_collections_with_manual_erasure_confirmation( } assert privacy_request.get_paused_collection_details() is None - - -def test_format_cached_query(): - assert format_cached_query(None) is None - - stmt = text( - "SELECT id,street,state,phone from customer where customer_id=:customer_id" - ) - stmt = stmt.bindparams( - bindparam("customer_id", type_=String, value="12345"), - ) - - assert format_cached_query(stmt) == { - "query": "SELECT id,street,state,phone from customer where customer_id=:customer_id", - "parameters": {"customer_id": "12345"}, - } From 96c11a5ff9e56b51882b0abfbd1b56c2b859bd91 Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Mon, 6 Jun 2022 15:26:13 -0500 Subject: [PATCH 05/10] Remove import of element that no longer exists. --- tests/integration_tests/test_manual_task.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration_tests/test_manual_task.py b/tests/integration_tests/test_manual_task.py index eb5f657df..f61c25fb2 100644 --- a/tests/integration_tests/test_manual_task.py +++ b/tests/integration_tests/test_manual_task.py @@ -11,7 +11,6 @@ ExecutionLogStatus, PrivacyRequest, ) -from fidesops.service.connectors.manual_connector import format_cached_query from fidesops.task import graph_task from ..graph.graph_test_util import assert_rows_match From 8d039d58c88532b2887d5bd35bcddad251004acc Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Mon, 6 Jun 2022 16:14:07 -0500 Subject: [PATCH 06/10] Add new paused_at field for when a request is paused by a webhook or a manual collection. - Start setting finished_processing_at on errored privacy requests that fail due to a collection issue. --- src/fidesops/models/privacy_request.py | 11 ++++++++ src/fidesops/schemas/privacy_request.py | 1 + .../privacy_request/request_runner_service.py | 10 +++---- .../27fe9da9d0f9_privacy_request_stopped.py | 26 +++++++++++++++++++ .../test_privacy_request_endpoints.py | 4 +++ .../request_runner_service_test.py | 5 ++++ 6 files changed, 50 insertions(+), 7 deletions(-) create mode 100644 src/migrations/versions/27fe9da9d0f9_privacy_request_stopped.py diff --git a/src/fidesops/models/privacy_request.py b/src/fidesops/models/privacy_request.py index 355ddeafa..14fb6c6fe 100644 --- a/src/fidesops/models/privacy_request.py +++ b/src/fidesops/models/privacy_request.py @@ -165,6 +165,7 @@ class PrivacyRequest(Base): # pylint: disable=R0904 reviewer = relationship( "FidesopsUser", backref=backref("privacy_request", passive_deletes=True) ) + paused_at = Column(DateTime(timezone=True), nullable=True) @classmethod def create(cls, db: Session, *, data: Dict[str, Any]) -> FidesopsBase: @@ -414,6 +415,16 @@ def start_processing(self, db: Session) -> None: self.status = PrivacyRequestStatus.in_processing self.save(db=db) + def pause_processing(self, db: Session) -> None: + """Mark privacy request as paused, and save paused_at""" + self.update( + db, + data={ + "status": PrivacyRequestStatus.paused, + "paused_at": datetime.utcnow(), + }, + ) + def error_processing(self, db: Session) -> None: """Mark privacy request as errored, and note time processing was finished""" self.update( diff --git a/src/fidesops/schemas/privacy_request.py b/src/fidesops/schemas/privacy_request.py index 30eec068c..093f62127 100644 --- a/src/fidesops/schemas/privacy_request.py +++ b/src/fidesops/schemas/privacy_request.py @@ -128,6 +128,7 @@ class PrivacyRequestResponse(BaseSchema): reviewed_by: Optional[str] reviewer: Optional[PrivacyRequestReviewer] finished_processing_at: Optional[datetime] + paused_at: Optional[datetime] status: PrivacyRequestStatus external_id: Optional[str] # This field intentionally doesn't use the PrivacyRequestIdentity schema diff --git a/src/fidesops/service/privacy_request/request_runner_service.py b/src/fidesops/service/privacy_request/request_runner_service.py index a9be34267..2f3537726 100644 --- a/src/fidesops/service/privacy_request/request_runner_service.py +++ b/src/fidesops/service/privacy_request/request_runner_service.py @@ -76,9 +76,7 @@ def run_webhooks_and_report_status( logging.info( f"Pausing execution of privacy request {privacy_request.id}. Halt instruction received from webhook {webhook.key}." ) - privacy_request.update( - db=db, data={"status": PrivacyRequestStatus.paused} - ) + privacy_request.pause_processing(db) initiate_paused_privacy_request_followup(privacy_request) return False except ClientUnsuccessfulException as exc: @@ -184,15 +182,13 @@ def run( ) except PrivacyRequestPaused as exc: - privacy_request.status = PrivacyRequestStatus.paused - privacy_request.save(db=session) + privacy_request.pause_processing(session) _log_warning(exc, config.dev_mode) session.close() return except BaseException as exc: # pylint: disable=broad-except - privacy_request.status = PrivacyRequestStatus.error - privacy_request.save(db=session) + privacy_request.error_processing(db=session) # If dev mode, log traceback _log_exception(exc, config.dev_mode) session.close() diff --git a/src/migrations/versions/27fe9da9d0f9_privacy_request_stopped.py b/src/migrations/versions/27fe9da9d0f9_privacy_request_stopped.py new file mode 100644 index 000000000..6b885484a --- /dev/null +++ b/src/migrations/versions/27fe9da9d0f9_privacy_request_stopped.py @@ -0,0 +1,26 @@ +"""privacy request stopped + +Revision ID: 27fe9da9d0f9 +Revises: 1ff88b7bd579 +Create Date: 2022-06-06 20:28:08.893135 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "27fe9da9d0f9" +down_revision = "1ff88b7bd579" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "privacyrequest", + sa.Column("paused_at", sa.DateTime(timezone=True), nullable=True), + ) + + +def downgrade(): + op.drop_column("privacyrequest", "paused_at") diff --git a/tests/api/v1/endpoints/test_privacy_request_endpoints.py b/tests/api/v1/endpoints/test_privacy_request_endpoints.py index 150d5aa5e..84bb5b855 100644 --- a/tests/api/v1/endpoints/test_privacy_request_endpoints.py +++ b/tests/api/v1/endpoints/test_privacy_request_endpoints.py @@ -486,6 +486,7 @@ def test_get_privacy_requests_by_id( "identity": None, "reviewed_at": None, "reviewed_by": None, + "paused_at": None, "reviewer": None, "policy": { "drp_action": None, @@ -539,6 +540,7 @@ def test_get_privacy_requests_by_partial_id( "identity": None, "reviewed_at": None, "reviewed_by": None, + "paused_at": None, "reviewer": None, "policy": { "drp_action": None, @@ -844,6 +846,7 @@ def test_verbose_privacy_requests( "identity": None, "reviewed_at": None, "reviewed_by": None, + "paused_at": None, "reviewer": None, "policy": { "drp_action": None, @@ -1763,6 +1766,7 @@ def test_resume_privacy_request( "reviewed_at": None, "reviewed_by": None, "reviewer": None, + "paused_at": None, "policy": { "drp_action": None, "key": privacy_request.policy.key, diff --git a/tests/service/privacy_request/request_runner_service_test.py b/tests/service/privacy_request/request_runner_service_test.py index 1a0491e55..1cb9ce44c 100644 --- a/tests/service/privacy_request/request_runner_service_test.py +++ b/tests/service/privacy_request/request_runner_service_test.py @@ -1203,6 +1203,7 @@ def test_run_webhooks_halt_received( assert not proceed assert privacy_request.finished_processing_at is None assert privacy_request.status == PrivacyRequestStatus.paused + assert privacy_request.paused_at is not None @mock.patch("fidesops.models.privacy_request.PrivacyRequest.trigger_policy_webhook") def test_run_webhooks_ap_scheduler_cleanup( @@ -1230,6 +1231,7 @@ def test_run_webhooks_ap_scheduler_cleanup( # Privacy request has been set to errored by ap scheduler, because it took too long for webhook to report back assert privacy_request.status == PrivacyRequestStatus.error assert privacy_request.finished_processing_at is not None + assert privacy_request.paused_at is not None @mock.patch("fidesops.models.privacy_request.PrivacyRequest.trigger_policy_webhook") def test_run_webhooks_client_error( @@ -1249,6 +1251,8 @@ def test_run_webhooks_client_error( ) assert not proceed assert privacy_request.status == PrivacyRequestStatus.error + assert privacy_request.finished_processing_at is not None + assert privacy_request.paused_at is None @mock.patch("fidesops.models.privacy_request.PrivacyRequest.trigger_policy_webhook") def test_run_webhooks_validation_error( @@ -1269,6 +1273,7 @@ def test_run_webhooks_validation_error( assert not proceed assert privacy_request.finished_processing_at is not None assert privacy_request.status == PrivacyRequestStatus.error + assert privacy_request.paused_at is None @mock.patch("fidesops.models.privacy_request.PrivacyRequest.trigger_policy_webhook") def test_run_webhooks( From c707cf2a34cfdc370eda59f89e7541e9f7aa369e Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Tue, 7 Jun 2022 11:33:23 -0500 Subject: [PATCH 07/10] Small docstring changes. --- src/fidesops/api/v1/endpoints/privacy_request_endpoints.py | 2 +- src/fidesops/models/privacy_request.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py index ab908924d..d1e145f87 100644 --- a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py +++ b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py @@ -364,7 +364,7 @@ def _filter_privacy_request_queryset( def attach_resume_instructions(privacy_request: PrivacyRequest) -> None: """ - Temporarily update a paused or errored privacy request object with instructions from Redis cache + Temporarily update a paused or errored privacy request object with instructions from the Redis cache about how to resume manually if applicable. """ resume_endpoint: Optional[str] = None diff --git a/src/fidesops/models/privacy_request.py b/src/fidesops/models/privacy_request.py index 14fb6c6fe..65e074848 100644 --- a/src/fidesops/models/privacy_request.py +++ b/src/fidesops/models/privacy_request.py @@ -52,7 +52,7 @@ class ManualAction(BaseSchema): - """Class to surface how to manually retrieve or mask data in a database-agnostic way + """Surface how to manually retrieve or mask data in a database-agnostic way "locators" are similar to the SQL "WHERE" information. "get" contains a list of fields that should be retrieved from the source @@ -288,7 +288,7 @@ def cache_failed_collection_details( collection: Optional[CollectionAddress] = None, ) -> None: """ - Cache details about the failed step and failed collection details. No specific input data is required to surface + Cache details about the failed step and failed collection details. No specific input data is required to resume a failed request, so action_needed is None. """ cache_restart_details( From ad5d797b912669ce836eb6d358627ef3ec129a44 Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Tue, 7 Jun 2022 11:56:14 -0500 Subject: [PATCH 08/10] Add changelog and docs. --- CHANGELOG.md | 2 +- docs/fidesops/docs/guides/reporting.md | 130 +++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9cfc750b..f29753e1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ The types of changes are: ### Added * Subject Request Details page [#563](https://github.com/ethyca/fidesops/pull/563) * Restart Graph from Failure [#578](https://github.com/ethyca/fidesops/pull/578) - +* Cache and Surface Resume/Restart Instructions [#591](https://github.com/ethyca/fidesops/pull/591) ## [1.5.2](https://github.com/ethyca/fidesops/compare/1.5.1...1.5.2) diff --git a/docs/fidesops/docs/guides/reporting.md b/docs/fidesops/docs/guides/reporting.md index eb20c1f26..b08964ea3 100644 --- a/docs/fidesops/docs/guides/reporting.md +++ b/docs/fidesops/docs/guides/reporting.md @@ -5,6 +5,7 @@ In this section we'll cover: - How to check the high-level status of your privacy requests - How to get more detailed execution logs of collections and fields that were potentially affected as part of your privacy request. - How to download all privacy requests as a CSV +- How to view details on resuming/restarting a request Take me directly to [API docs](/fidesops/api#operations-Privacy_Requests-get_request_status_api_v1_privacy_request_get). @@ -183,4 +184,133 @@ To get all privacy requests in CSV format, use the `download_csv` query param: ```csv Time received,Subject identity,Policy key,Request status,Reviewer,Time approved/denied 2022-03-14 16:53:28.869258+00:00,{'email': 'customer-1@example.com'},my_primary_policy,complete,fid_16ffde2f-613b-4f79-bbae-41420b0f836b,2022-03-14 16:54:08.804283+00:00 +``` + + +## Details to resume a privacy request + +Privacy requests may need to be resumed in the event the request was paused, or restarted, in the event a privacy request failed. +Some collections may require manual input from the user to proceed privacy request execution. The information +needed to proceed can be accessed from the `GET api/v1/privacy-request?request_id=` endpoint. + +### Paused Access Request Example + +The request below is in a `paused` state. If we look at the `stopped_collection_details` key, we can see that the request +paused execution during the `access` step of the `manual_key:filing_cabinet` collection. Looking at `action_needed.locators` field, we can +see that the user should find the record in the filing cabinet with a customer_id of 72909, and get the +"authorized_user", "customer_id", "id", and "payment_card_id" fields from that record. These values should be manually +uploaded to the `resume_endpoint`. See the [Manual Data](https://ethyca.github.io/fidesops/guides/manual_data/#resuming-a-paused-access-privacy-request) +guides for more information on resuming a paused access request. + + +```json +{ + "items": [ + { + "id": "pri_ed4a6b7d-deab-489a-9a9f-9c2b19cd0713", + "created_at": "2022-06-06T20:12:28.809815+00:00", + "started_processing_at": "2022-06-06T20:12:28.986462+00:00", + ..., + "stopped_collection_details": { + "step": "access", + "collection": "manual_key:filing_cabinet", + "action_needed": [ + { + "locators": { + "customer_id": [ + 72909 + ] + }, + "get": [ + "authorized_user", + "customer_id", + "id", + "payment_card_id" + ], + "update": null + } + ] + }, + "resume_endpoint": "/privacy-request/pri_ed4a6b7d-deab-489a-9a9f-9c2b19cd0713/manual_input" + } + ], + "total": 1, + "page": 1, + "size": 50 +} +``` + +### Paused Erasure Request Example + +The request below is in a `paused` state. The `stopped_collection_details` shows us that the request +paused execution during the `erasure` step of the `manual_key:filing_cabinet` collection. Looking at `action_needed.locators` field, we can +see that the user should find the record in the filing cabinet with an `id` of 2, and replace its `authorized_user` with None. +A confirmation of the rows masked should be uploaded to the `resume_endpoint` See the [Manual Data](https://ethyca.github.io/fidesops/guides/manual_data/#resuming-a-paused-erasure-privacy-request) +guides for more information on resuming a paused erasure request. + +```json +{ + "items": [ + { + "id": "pri_59ea0129-fc6d-4a12-a5bd-2ee647bf5cec", + "created_at": "2022-06-06T20:22:05.436361+00:00", + "started_processing_at": "2022-06-06T20:22:05.473280+00:00", + "finished_processing_at": null, + "status": "paused", + ..., + "stopped_collection_details": { + "step": "erasure", + "collection": "manual_key:filing_cabinet", + "action_needed": [ + { + "locators": { + "id": 2 + }, + "get": null, + "update": { + "authorized_user": null + } + } + ] + }, + "resume_endpoint": "/privacy-request/pri_59ea0129-fc6d-4a12-a5bd-2ee647bf5cec/erasure_confirm" + } + ], + "total": 1, + "page": 1, + "size": 50 +} + + +``` + +### Failed Request Example + +The below request is an `error` state. Under `stopped_collection_details` we can see that it failed at the +erasure step of the `postgres_dataset:payment_card` collection. After troubleshooting the issues there, for example, +your postgres password could be incorrect, you would resume the request with a POST to the `resume_endpoint`. + +```json +{ + "items": [ + { + "id": "pri_59ea0129-fc6d-4a12-a5bd-2ee647bf5cec", + "created_at": "2022-06-06T20:22:05.436361+00:00", + "started_processing_at": "2022-06-06T20:22:05.473280+00:00", + "finished_processing_at": null, + "status": "error", + ..., + "stopped_collection_details": { + "step": "erasure", + "collection": "postgres_dataset:payment_card", + "action_needed": null + }, + "resume_endpoint": "/privacy-request/pri_59ea0129-fc6d-4a12-a5bd-2ee647bf5cec/retry" + } + ], + "total": 1, + "page": 1, + "size": 50 +} + ``` \ No newline at end of file From dd6ba1f27a2d030f76db63fed4741286da714e23 Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Wed, 8 Jun 2022 19:45:33 -0500 Subject: [PATCH 09/10] Respond to CR - --- docs/fidesops/docs/guides/reporting.md | 3 +- .../v1/endpoints/privacy_request_endpoints.py | 8 ++- src/fidesops/models/privacy_request.py | 14 ++++ src/fidesops/schemas/dataset.py | 2 +- .../service/connectors/http_connector.py | 6 ++ .../service/connectors/query_config.py | 20 ++++-- .../test_privacy_request_endpoints.py | 26 ++++++- tests/integration_tests/test_manual_task.py | 1 - tests/service/connectors/test_queryconfig.py | 72 ++++++++----------- 9 files changed, 95 insertions(+), 57 deletions(-) diff --git a/docs/fidesops/docs/guides/reporting.md b/docs/fidesops/docs/guides/reporting.md index b08964ea3..0c6721cbf 100644 --- a/docs/fidesops/docs/guides/reporting.md +++ b/docs/fidesops/docs/guides/reporting.md @@ -186,7 +186,6 @@ Time received,Subject identity,Policy key,Request status,Reviewer,Time approved/ 2022-03-14 16:53:28.869258+00:00,{'email': 'customer-1@example.com'},my_primary_policy,complete,fid_16ffde2f-613b-4f79-bbae-41420b0f836b,2022-03-14 16:54:08.804283+00:00 ``` - ## Details to resume a privacy request Privacy requests may need to be resumed in the event the request was paused, or restarted, in the event a privacy request failed. @@ -244,7 +243,7 @@ guides for more information on resuming a paused access request. The request below is in a `paused` state. The `stopped_collection_details` shows us that the request paused execution during the `erasure` step of the `manual_key:filing_cabinet` collection. Looking at `action_needed.locators` field, we can -see that the user should find the record in the filing cabinet with an `id` of 2, and replace its `authorized_user` with None. +see that the user should find the record in the filing cabinet with an `id` of 2, and replace its `authorized_user` with `None`. A confirmation of the rows masked should be uploaded to the `resume_endpoint` See the [Manual Data](https://ethyca.github.io/fidesops/guides/manual_data/#resuming-a-paused-erasure-privacy-request) guides for more information on resuming a paused erasure request. diff --git a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py index d1e145f87..b4943ed46 100644 --- a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py +++ b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py @@ -395,6 +395,7 @@ def attach_resume_instructions(privacy_request: PrivacyRequest) -> None: ) privacy_request.stopped_collection_details = stopped_collection_details + # replaces the placeholder in the url with the privacy request id privacy_request.resume_endpoint = ( resume_endpoint.format(privacy_request_id=privacy_request.id) if resume_endpoint @@ -474,9 +475,10 @@ def get_request_status( # it is explicitly requested for item in paginated.items: item.identity = item.get_cached_identity_data() - - for item in paginated.items: - attach_resume_instructions(item) + attach_resume_instructions(item) + else: + for item in paginated.items: + attach_resume_instructions(item) return paginated diff --git a/src/fidesops/models/privacy_request.py b/src/fidesops/models/privacy_request.py index 65e074848..9c664cabc 100644 --- a/src/fidesops/models/privacy_request.py +++ b/src/fidesops/models/privacy_request.py @@ -5,6 +5,7 @@ from enum import Enum as EnumType from typing import Any, Dict, List, Optional +from pydantic import root_validator from sqlalchemy import Column, DateTime from sqlalchemy import Enum as EnumColumn from sqlalchemy import ForeignKey, String @@ -63,6 +64,19 @@ class ManualAction(BaseSchema): get: Optional[List[str]] update: Optional[Dict[str, Any]] + @root_validator + @classmethod + def get_or_update_details( + cls: BaseSchema, values: Dict[str, Any] + ) -> Dict[str, Any]: + """Validate that 'get' or 'update' details are supplied.""" + if values and not values.get("get") and not values.get("update"): + raise ValueError( + "A ManualAction requires either 'get' or 'update' instructions." + ) + + return values + class StoppedCollection(BaseSchema): """Class that contains details about the collection that halted privacy request execution""" diff --git a/src/fidesops/schemas/dataset.py b/src/fidesops/schemas/dataset.py index 970b23806..8f2417b92 100644 --- a/src/fidesops/schemas/dataset.py +++ b/src/fidesops/schemas/dataset.py @@ -254,4 +254,4 @@ class DryRunDatasetResponse(BaseSchema): """ collectionAddress: CollectionAddressResponse - query: Optional[str] + query: Any diff --git a/src/fidesops/service/connectors/http_connector.py b/src/fidesops/service/connectors/http_connector.py index 157eb63b4..907fd1dd5 100644 --- a/src/fidesops/service/connectors/http_connector.py +++ b/src/fidesops/service/connectors/http_connector.py @@ -79,6 +79,9 @@ def retrieve_data( input_data: Dict[str, List[Any]], ) -> List[Row]: """Currently not supported as webhooks are not called at the collection level""" + raise NotImplementedError( + "Currently not supported as webhooks are not yet called at the collection level" + ) def mask_data( self, @@ -88,6 +91,9 @@ def mask_data( rows: List[Row], ) -> int: """Currently not supported as webhooks are not called at the collection level""" + raise NotImplementedError( + "Currently not supported as webhooks are not yet called at the collection level" + ) def create_client(self) -> None: """Not required for this type""" diff --git a/src/fidesops/service/connectors/query_config.py b/src/fidesops/service/connectors/query_config.py index bcbe933cb..bdd90c98c 100644 --- a/src/fidesops/service/connectors/query_config.py +++ b/src/fidesops/service/connectors/query_config.py @@ -240,7 +240,7 @@ def generate_query( returns None""" @abstractmethod - def query_to_str(self, t: T, input_data: Dict[str, List[Any]]) -> str: + def query_to_str(self, t: T, input_data: Dict[str, List[Any]]) -> Optional[str]: """Convert query to string""" @abstractmethod @@ -288,11 +288,21 @@ def generate_query( return ManualAction(locators=locators, get=get, update=None) return None - def query_to_str(self, t: T, input_data: Dict[str, List[Any]]) -> str: - """Convert query to string""" + def query_to_str(self, t: T, input_data: Dict[str, List[Any]]) -> None: + """Not used for ManualQueryConfig, we output the dry run query as a dictionary instead of a string""" - def dry_run_query(self) -> Optional[str]: - """dry run query for display""" + def dry_run_query(self) -> Optional[ManualAction]: + """Displays the ManualAction needed with question marks instead of action data for the locators + as a dry run query""" + fake_data: Dict[str, Any] = self.display_query_data() + manual_query: Optional[ManualAction] = self.generate_query(fake_data, None) + if not manual_query: + return None + + for where_params in manual_query.locators.values(): + for i, _ in enumerate(where_params): + where_params[i] = "?" + return manual_query def generate_update_stmt( self, row: Row, policy: Policy, request: PrivacyRequest diff --git a/tests/api/v1/endpoints/test_privacy_request_endpoints.py b/tests/api/v1/endpoints/test_privacy_request_endpoints.py index 84bb5b855..eabb342de 100644 --- a/tests/api/v1/endpoints/test_privacy_request_endpoints.py +++ b/tests/api/v1/endpoints/test_privacy_request_endpoints.py @@ -1131,9 +1131,7 @@ def test_get_failed_request_resume_info( "collection": "manual_example:another_collection", "action_needed": None, } - assert data["resume_endpoint"] == "/privacy-request/{}/retry".format( - privacy_request.id - ) + assert data["resume_endpoint"] == f"/privacy-request/{privacy_request.id}/retry" class TestGetExecutionLogs: @@ -1334,6 +1332,9 @@ def test_request_preview_incorrect_body( def test_request_preview_all( self, dataset_config_preview, + manual_dataset_config, + integration_manual_config, + postgres_example_test_dataset_config, api_client: TestClient, url, generate_auth_header, @@ -1342,6 +1343,7 @@ def test_request_preview_all( response = api_client.put(url, headers=auth_header) assert response.status_code == 200 response_body: List[DryRunDatasetResponse] = json.loads(response.text) + assert ( next( response["query"] @@ -1352,6 +1354,24 @@ def test_request_preview_all( == "SELECT email,id FROM subscriptions WHERE email = ?" ) + assert next( + response["query"] + for response in response_body + if response["collectionAddress"]["dataset"] == "manual_input" + if response["collectionAddress"]["collection"] == "filing_cabinet" + ) == { + "locators": {"customer_id": ["?", "?"]}, + "get": ["authorized_user", "customer_id", "id", "payment_card_id"], + "update": None, + } + + assert next( + response["query"] + for response in response_body + if response["collectionAddress"]["dataset"] == "manual_input" + if response["collectionAddress"]["collection"] == "storage_unit" + ) == {"locators": {"email": ["?"]}, "get": ["box_id", "email"], "update": None} + class TestApprovePrivacyRequest: @pytest.fixture(scope="function") diff --git a/tests/integration_tests/test_manual_task.py b/tests/integration_tests/test_manual_task.py index f61c25fb2..c49e3a888 100644 --- a/tests/integration_tests/test_manual_task.py +++ b/tests/integration_tests/test_manual_task.py @@ -1,7 +1,6 @@ import uuid import pytest -from sqlalchemy import String, bindparam, text from fidesops.common_exceptions import PrivacyRequestPaused from fidesops.graph.config import CollectionAddress diff --git a/tests/service/connectors/test_queryconfig.py b/tests/service/connectors/test_queryconfig.py index 5e21ae28e..d41d91278 100644 --- a/tests/service/connectors/test_queryconfig.py +++ b/tests/service/connectors/test_queryconfig.py @@ -70,29 +70,23 @@ def found_query_keys(node: TraversalNode, values: Dict[str, Any]) -> Set[str]: } # values exist for all query keys - assert ( - found_query_keys( - payment_card_node, - { - "id": ["A"], - "customer_id": ["V"], - "ignore_me": ["X"], - }, - ) - == {"id", "customer_id"} - ) + assert found_query_keys( + payment_card_node, + { + "id": ["A"], + "customer_id": ["V"], + "ignore_me": ["X"], + }, + ) == {"id", "customer_id"} # with no values OR an empty set, these are omitted - assert ( - found_query_keys( - payment_card_node, - { - "id": ["A"], - "customer_id": [], - "ignore_me": ["X"], - }, - ) - == {"id"} - ) + assert found_query_keys( + payment_card_node, + { + "id": ["A"], + "customer_id": [], + "ignore_me": ["X"], + }, + ) == {"id"} assert found_query_keys( payment_card_node, {"id": ["A"], "ignore_me": ["X"]} ) == {"id"} @@ -100,27 +94,21 @@ def found_query_keys(node: TraversalNode, values: Dict[str, Any]) -> Set[str]: assert found_query_keys(payment_card_node, {}) == set() def test_typed_filtered_values(self): - assert ( - payment_card_node.typed_filtered_values( - { - "id": ["A"], - "customer_id": ["V"], - "ignore_me": ["X"], - } - ) - == {"id": ["A"], "customer_id": ["V"]} - ) + assert payment_card_node.typed_filtered_values( + { + "id": ["A"], + "customer_id": ["V"], + "ignore_me": ["X"], + } + ) == {"id": ["A"], "customer_id": ["V"]} - assert ( - payment_card_node.typed_filtered_values( - { - "id": ["A"], - "customer_id": [], - "ignore_me": ["X"], - } - ) - == {"id": ["A"]} - ) + assert payment_card_node.typed_filtered_values( + { + "id": ["A"], + "customer_id": [], + "ignore_me": ["X"], + } + ) == {"id": ["A"]} assert payment_card_node.typed_filtered_values( {"id": ["A"], "ignore_me": ["X"]} From 2e66b870be7deef8ef9028f138353939fd3aebd3 Mon Sep 17 00:00:00 2001 From: Dawn Pattison Date: Thu, 9 Jun 2022 09:22:02 -0500 Subject: [PATCH 10/10] Fix wording in docs. --- docs/fidesops/docs/guides/reporting.md | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/docs/fidesops/docs/guides/reporting.md b/docs/fidesops/docs/guides/reporting.md index 0c6721cbf..fe7aae749 100644 --- a/docs/fidesops/docs/guides/reporting.md +++ b/docs/fidesops/docs/guides/reporting.md @@ -188,17 +188,15 @@ Time received,Subject identity,Policy key,Request status,Reviewer,Time approved/ ## Details to resume a privacy request -Privacy requests may need to be resumed in the event the request was paused, or restarted, in the event a privacy request failed. -Some collections may require manual input from the user to proceed privacy request execution. The information -needed to proceed can be accessed from the `GET api/v1/privacy-request?request_id=` endpoint. +A privacy request may pause when manual input is needed from the user, or it might fail for various reason on a specific collection. +Details to resume or retry that privacy request can be accessed via the `GET api/v1/privacy-request?request_id=` endpoint. ### Paused Access Request Example -The request below is in a `paused` state. If we look at the `stopped_collection_details` key, we can see that the request -paused execution during the `access` step of the `manual_key:filing_cabinet` collection. Looking at `action_needed.locators` field, we can -see that the user should find the record in the filing cabinet with a customer_id of 72909, and get the -"authorized_user", "customer_id", "id", and "payment_card_id" fields from that record. These values should be manually -uploaded to the `resume_endpoint`. See the [Manual Data](https://ethyca.github.io/fidesops/guides/manual_data/#resuming-a-paused-access-privacy-request) +The request below is in a `paused` state because we're waiting on manual input from the user to proceed. If we look at the `stopped_collection_details` key, we can see that the request +paused execution during the `access` step of the `manual_key:filing_cabinet` collection. The `action_needed.locators` field shows the user they should +fetch the record in the filing cabinet with a `customer_id` of `72909`, and pull the `authorized_user`, `customer_id`, `id`, and `payment_card_id` fields +from that record. These values should be manually uploaded to the `resume_endpoint`. See the [Manual Data](https://ethyca.github.io/fidesops/guides/manual_data/#resuming-a-paused-access-privacy-request) guides for more information on resuming a paused access request. @@ -241,10 +239,10 @@ guides for more information on resuming a paused access request. ### Paused Erasure Request Example -The request below is in a `paused` state. The `stopped_collection_details` shows us that the request +The request below is in a `paused` state because we're waiting on the user to confirm they've masked the appropriate data before proceeding. The `stopped_collection_details` shows us that the request paused execution during the `erasure` step of the `manual_key:filing_cabinet` collection. Looking at `action_needed.locators` field, we can see that the user should find the record in the filing cabinet with an `id` of 2, and replace its `authorized_user` with `None`. -A confirmation of the rows masked should be uploaded to the `resume_endpoint` See the [Manual Data](https://ethyca.github.io/fidesops/guides/manual_data/#resuming-a-paused-erasure-privacy-request) +A confirmation of the masked records count should be uploaded to the `resume_endpoint` See the [Manual Data](https://ethyca.github.io/fidesops/guides/manual_data/#resuming-a-paused-erasure-privacy-request) guides for more information on resuming a paused erasure request. ```json @@ -285,9 +283,8 @@ guides for more information on resuming a paused erasure request. ### Failed Request Example -The below request is an `error` state. Under `stopped_collection_details` we can see that it failed at the -erasure step of the `postgres_dataset:payment_card` collection. After troubleshooting the issues there, for example, -your postgres password could be incorrect, you would resume the request with a POST to the `resume_endpoint`. +The below request is an `error` state because something failed in the `erasure` step of the `postgres_dataset:payment_card` collection. +After troubleshooting the issues with your postgres connection, you would resume the request with a POST to the `resume_endpoint`. ```json {