diff --git a/CHANGELOG.md b/CHANGELOG.md index 040b69f33..f366b7284 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,8 +49,9 @@ The types of changes are: * Bumped Python to version 3.9.13 in the `Dockerfile` [#630](https://github.com/ethyca/fidesops/pull/630) * Matched the path to the migrations in the mypy settings with the new location [#634](https://github.com/ethyca/fidesops/pull/634) * Sort ConnectionConfig by name ascending [#668](https://github.com/ethyca/fidesops/pull/672) -* Install MSSQL By Default [#664] (https://github.com/ethyca/fidesops/pull/664) +* Install MSSQL By Default [#664](https://github.com/ethyca/fidesops/pull/664) * [Admin UI] Change "Policy Name" to "Request Type" on SR list page.[#546](https://github.com/ethyca/fidesops/pull/696) +* Queue PrivacyRequests into a Celery queue for execution [#621](https://github.com/ethyca/fidesops/pull/621) ### Developer Experience @@ -70,6 +71,9 @@ The types of changes are: * The `[package]` config section no longer exists [#620](https://github.com/ethyca/fidesops/pull/620) +### Changed +* Process privacy requests as Celery tasks and not background processes [#621](https://github.com/ethyca/fidesops/pull/621) + ## [1.5.2](https://github.com/ethyca/fidesops/compare/1.5.1...1.5.2) ### Added diff --git a/data/config/fidesops.toml b/data/config/fidesops.toml index dad790080..551a2d0e1 100644 --- a/data/config/fidesops.toml +++ b/data/config/fidesops.toml @@ -26,8 +26,6 @@ DRP_JWT_SECRET = "testdrpsecret" LOG_LEVEL = "DEBUG" [execution] -CELERY_BROKER_URL = "redis://:testpassword@redis:6379" -CELERY_RESULT_BACKEND = "redis://:testpassword@redis:6379" TASK_RETRY_COUNT = 0 TASK_RETRY_DELAY = 1 TASK_RETRY_BACKOFF = 1 diff --git a/docker-compose.yml b/docker-compose.yml index 015b265e9..d6b6f27d4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,7 @@ services: context: . dockerfile: Dockerfile depends_on: + - celery - db - redis expose: @@ -62,7 +63,7 @@ services: build: context: . dockerfile: Dockerfile - command: celery -A fidesops.tasks beat -l info --logfile=/var/log/celery.log + command: celery -A fidesops.tasks.celery_app worker --loglevel=info depends_on: redis: condition: service_started diff --git a/fidesops.toml b/fidesops.toml index b9bc95abd..d9c17c75c 100644 --- a/fidesops.toml +++ b/fidesops.toml @@ -29,8 +29,6 @@ DRP_JWT_SECRET = "secret" LOG_LEVEL = "INFO" [execution] -CELERY_BROKER_URL = "redis://:testpassword@redis:6379" -CELERY_RESULT_BACKEND = "redis://:testpassword@redis:6379" MASKING_STRICT = true REQUIRE_MANUAL_REQUEST_APPROVAL = false TASK_RETRY_COUNT = 0 diff --git a/src/fidesops/api/v1/endpoints/drp_endpoints.py b/src/fidesops/api/v1/endpoints/drp_endpoints.py index ac375e912..736c75017 100644 --- a/src/fidesops/api/v1/endpoints/drp_endpoints.py +++ b/src/fidesops/api/v1/endpoints/drp_endpoints.py @@ -28,7 +28,9 @@ from fidesops.schemas.privacy_request import PrivacyRequestDRPStatusResponse from fidesops.schemas.redis_cache import PrivacyRequestIdentity from fidesops.service.drp.drp_fidesops_mapper import DrpFidesopsMapper -from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner +from fidesops.service.privacy_request.request_runner_service import ( + queue_privacy_request, +) from fidesops.service.privacy_request.request_service import ( build_required_privacy_request_kwargs, cache_data, @@ -98,10 +100,7 @@ def create_drp_privacy_request( cache_data(privacy_request, policy, mapped_identity, None, data) - PrivacyRequestRunner( - cache=cache, - privacy_request=privacy_request, - ).submit() + queue_privacy_request(privacy_request.id) return PrivacyRequestDRPStatusResponse( request_id=privacy_request.id, diff --git a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py index 3bb29ad93..303222d9a 100644 --- a/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py +++ b/src/fidesops/api/v1/endpoints/privacy_request_endpoints.py @@ -40,7 +40,11 @@ PRIVACY_REQUEST_RETRY, REQUEST_PREVIEW, ) -from fidesops.common_exceptions import TraversalError, ValidationError +from fidesops.common_exceptions import ( + FunctionalityNotConfigured, + TraversalError, + ValidationError, +) from fidesops.core.config import config from fidesops.graph.config import CollectionAddress from fidesops.graph.graph import DatasetGraph, Node @@ -69,7 +73,9 @@ RowCountRequest, StoppedCollection, ) -from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner +from fidesops.service.privacy_request.request_runner_service import ( + queue_privacy_request, +) from fidesops.service.privacy_request.request_service import ( build_required_privacy_request_kwargs, cache_data, @@ -109,7 +115,6 @@ def get_privacy_request_or_error( ) def create_privacy_request( *, - cache: FidesopsRedis = Depends(deps.get_cache), db: Session = Depends(deps.get_db), data: conlist(PrivacyRequestCreate, max_items=50) = Body(...), # type: ignore ) -> BulkPostPrivacyRequests: @@ -119,6 +124,11 @@ def create_privacy_request( You cannot update privacy requests after they've been created. """ + if not config.redis.ENABLED: + raise FunctionalityNotConfigured( + "Application redis cache required, but it is currently disabled! Please update your application configuration to enable integration with a redis cache." + ) + created = [] failed = [] # Optional fields to validate here are those that are both nullable in the DB, and exist @@ -177,10 +187,7 @@ def create_privacy_request( ) if not config.execution.REQUIRE_MANUAL_REQUEST_APPROVAL: - PrivacyRequestRunner( - cache=cache, - privacy_request=privacy_request, - ).submit() + queue_privacy_request(privacy_request.id) except common_exceptions.RedisConnectionError as exc: logger.error("RedisConnectionError: %s", exc) @@ -615,11 +622,10 @@ def resume_privacy_request( privacy_request.status = PrivacyRequestStatus.in_processing privacy_request.save(db=db) - PrivacyRequestRunner( - cache=cache, - privacy_request=privacy_request, - ).submit(from_webhook=webhook) - + queue_privacy_request( + privacy_request_id=privacy_request.id, + from_webhook_id=webhook.id, + ) return privacy_request @@ -713,10 +719,10 @@ def resume_privacy_request_with_manual_input( privacy_request.status = PrivacyRequestStatus.in_processing privacy_request.save(db=db) - PrivacyRequestRunner( - cache=cache, - privacy_request=privacy_request, - ).submit(from_step=paused_step) + queue_privacy_request( + privacy_request_id=privacy_request.id, + from_step=paused_step.value, + ) return privacy_request @@ -820,11 +826,10 @@ def restart_privacy_request_from_failure( privacy_request.status = PrivacyRequestStatus.in_processing privacy_request.save(db=db) - - PrivacyRequestRunner( - cache=cache, - privacy_request=privacy_request, - ).submit(from_step=failed_step) + queue_privacy_request( + privacy_request_id=privacy_request.id, + from_step=failed_step.value, + ) privacy_request.cache_failed_collection_details() # Reset failed step and collection to None @@ -832,7 +837,9 @@ def restart_privacy_request_from_failure( def review_privacy_request( - db: Session, cache: FidesopsRedis, request_ids: List[str], process_request: Callable + db: Session, + request_ids: List[str], + process_request_function: Callable, ) -> BulkReviewResponse: """Helper method shared between the approve and deny privacy request endpoints""" succeeded: List[PrivacyRequest] = [] @@ -859,7 +866,7 @@ def review_privacy_request( continue try: - process_request(privacy_request, cache) + process_request_function(privacy_request) except Exception: failure = { "message": "Privacy request could not be updated", @@ -883,7 +890,6 @@ def review_privacy_request( def approve_privacy_request( *, db: Session = Depends(deps.get_db), - cache: FidesopsRedis = Depends(deps.get_cache), client: ClientDetail = Security( verify_oauth_client, scopes=[PRIVACY_REQUEST_REVIEW], @@ -893,20 +899,19 @@ def approve_privacy_request( """Approve and dispatch a list of privacy requests and/or report failure""" user_id = client.user_id - def _process_request(privacy_request: PrivacyRequest, cache: FidesopsRedis) -> None: + def _approve_request(privacy_request: PrivacyRequest) -> None: """Method for how to process requests - approved""" privacy_request.status = PrivacyRequestStatus.approved privacy_request.reviewed_at = datetime.utcnow() privacy_request.reviewed_by = user_id privacy_request.save(db=db) - PrivacyRequestRunner( - cache=cache, - privacy_request=privacy_request, - ).submit() + queue_privacy_request(privacy_request_id=privacy_request.id) return review_privacy_request( - db, cache, privacy_requests.request_ids, _process_request + db=db, + request_ids=privacy_requests.request_ids, + process_request_function=_approve_request, ) @@ -918,7 +923,6 @@ def _process_request(privacy_request: PrivacyRequest, cache: FidesopsRedis) -> N def deny_privacy_request( *, db: Session = Depends(deps.get_db), - cache: FidesopsRedis = Depends(deps.get_cache), client: ClientDetail = Security( verify_oauth_client, scopes=[PRIVACY_REQUEST_REVIEW], @@ -928,8 +932,8 @@ def deny_privacy_request( """Deny a list of privacy requests and/or report failure""" user_id = client.user_id - def _process_denial_request( - privacy_request: PrivacyRequest, _: FidesopsRedis + def _deny_request( + privacy_request: PrivacyRequest, ) -> None: """Method for how to process requests - denied""" @@ -948,5 +952,7 @@ def _process_denial_request( privacy_request.save(db=db) return review_privacy_request( - db, cache, privacy_requests.request_ids, _process_denial_request + db=db, + request_ids=privacy_requests.request_ids, + process_request_function=_deny_request, ) diff --git a/src/fidesops/core/config.py b/src/fidesops/core/config.py index 8006b2da4..7eecb1a57 100644 --- a/src/fidesops/core/config.py +++ b/src/fidesops/core/config.py @@ -39,8 +39,8 @@ class ExecutionSettings(FidesSettings): TASK_RETRY_BACKOFF: int REQUIRE_MANUAL_REQUEST_APPROVAL: bool = False MASKING_STRICT: bool = True - CELERY_BROKER_URL: str - CELERY_RESULT_BACKEND: str + CELERY_BROKER_URL: str = "redis://:testpassword@redis:6379/1" + CELERY_RESULT_BACKEND: str = "redis://:testpassword@redis:6379/1" class Config: env_prefix = "FIDESOPS__EXECUTION__" diff --git a/src/fidesops/models/privacy_request.py b/src/fidesops/models/privacy_request.py index b025c4158..2fbf6369b 100644 --- a/src/fidesops/models/privacy_request.py +++ b/src/fidesops/models/privacy_request.py @@ -5,6 +5,7 @@ from enum import Enum as EnumType from typing import Any, Dict, List, Optional +from celery.result import AsyncResult from pydantic import root_validator from sqlalchemy import Column, DateTime from sqlalchemy import Enum as EnumColumn @@ -40,6 +41,7 @@ from fidesops.util.cache import ( FidesopsRedis, get_all_cache_keys_for_privacy_request, + get_async_task_tracking_cache_key, get_cache, get_drp_request_body_cache_key, get_encryption_cache_key, @@ -213,6 +215,26 @@ def cache_identity(self, identity: PrivacyRequestIdentity) -> None: value, ) + def cache_task_id(self, task_id: str) -> None: + """Sets a task_id for this privacy request's asynchronous execution.""" + cache: FidesopsRedis = get_cache() + cache.set( + get_async_task_tracking_cache_key(self.id), + task_id, + ) + + def get_cached_task_id(self) -> Optional[str]: + """Gets the cached task ID for this privacy request.""" + cache: FidesopsRedis = get_cache() + task_id = cache.get(get_async_task_tracking_cache_key(self.id)) + return task_id + + def get_async_execution_task(self) -> Optional[AsyncResult]: + """Returns a task reflecting the state of this privacy request's asynchronous execution.""" + task_id = self.get_cached_task_id() + res: AsyncResult = AsyncResult(task_id) + return res + def cache_drp_request_body(self, drp_request_body: DrpPrivacyRequestCreate) -> None: """Sets the identity's values at their specific locations in the Fidesops app cache""" cache: FidesopsRedis = get_cache() diff --git a/src/fidesops/service/privacy_request/onetrust_service.py b/src/fidesops/service/privacy_request/onetrust_service.py index f17b95af2..df38d346c 100644 --- a/src/fidesops/service/privacy_request/onetrust_service.py +++ b/src/fidesops/service/privacy_request/onetrust_service.py @@ -31,8 +31,9 @@ ONETRUST_GET_SUBTASKS_BY_REF_ID, ONETRUST_PUT_SUBTASK_STATUS, ) -from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner -from fidesops.util.cache import get_cache +from fidesops.service.privacy_request.request_runner_service import ( + queue_privacy_request, +) from fidesops.util.storage_authenticator import get_onetrust_access_token logger = logging.getLogger(__name__) @@ -153,10 +154,7 @@ def _create_privacy_request( # pylint: disable=R0913 privacy_request: PrivacyRequest = PrivacyRequest.create(db=db, data=kwargs) privacy_request.cache_identity(identity) try: - PrivacyRequestRunner( - cache=get_cache(), - privacy_request=privacy_request, - ).submit() + queue_privacy_request(privacy_request_id=privacy_request.id) request_status = OneTrustSubtaskStatus.COMPLETED except BaseException: # pylint: disable=W0703 request_status = OneTrustSubtaskStatus.FAILED diff --git a/src/fidesops/service/privacy_request/request_runner_service.py b/src/fidesops/service/privacy_request/request_runner_service.py index 2f3537726..e373987a3 100644 --- a/src/fidesops/service/privacy_request/request_runner_service.py +++ b/src/fidesops/service/privacy_request/request_runner_service.py @@ -1,8 +1,10 @@ import logging from datetime import datetime, timedelta -from typing import Awaitable, Dict, List, Optional, Set +from typing import Dict, List, Optional, Set +from celery.utils.log import get_task_logger from pydantic import ValidationError +from redis.exceptions import DataError from sqlalchemy.orm import Session from fidesops import common_exceptions @@ -28,231 +30,239 @@ run_access_request, run_erasure, ) +from fidesops.tasks import celery_app from fidesops.tasks.scheduled.scheduler import scheduler -from fidesops.util.async_util import run_async -from fidesops.util.cache import FidesopsRedis +from fidesops.util.cache import ( + FidesopsRedis, + get_async_task_tracking_cache_key, + get_cache, +) from fidesops.util.collection_util import Row from fidesops.util.logger import _log_exception, _log_warning -logger = logging.getLogger(__name__) - - -class PrivacyRequestRunner: - """The class responsible for dispatching PrivacyRequests into the execution layer""" - - def __init__( - self, - cache: FidesopsRedis, - privacy_request: PrivacyRequest, - ): - self.cache = cache - self.privacy_request = privacy_request - - @staticmethod - def run_webhooks_and_report_status( - db: Session, - privacy_request: PrivacyRequest, - webhook_cls: WebhookTypes, - after_webhook_id: str = None, - ) -> bool: - """ - Runs a series of webhooks either pre- or post- privacy request execution, if any are configured. - Updates privacy request status if execution is paused/errored. - Returns True if execution should proceed. - """ - webhooks = db.query(webhook_cls).filter_by(policy_id=privacy_request.policy.id) - - if after_webhook_id: - # Only run webhooks configured to run after this Pre-Execution webhook - pre_webhook = PolicyPreWebhook.get(db=db, id=after_webhook_id) - webhooks = webhooks.filter( - webhook_cls.order > pre_webhook.order, +logger = get_task_logger(__name__) + + +def run_webhooks_and_report_status( + db: Session, + privacy_request: PrivacyRequest, + webhook_cls: WebhookTypes, + after_webhook_id: str = None, +) -> bool: + """ + Runs a series of webhooks either pre- or post- privacy request execution, if any are configured. + Updates privacy request status if execution is paused/errored. + Returns True if execution should proceed. + """ + webhooks = db.query(webhook_cls).filter_by(policy_id=privacy_request.policy.id) + + if after_webhook_id: + # Only run webhooks configured to run after this Pre-Execution webhook + pre_webhook = PolicyPreWebhook.get(db=db, id=after_webhook_id) + webhooks = webhooks.filter( + webhook_cls.order > pre_webhook.order, + ) + + for webhook in webhooks.order_by(webhook_cls.order): + try: + privacy_request.trigger_policy_webhook(webhook) + except PrivacyRequestPaused: + logging.info( + f"Pausing execution of privacy request {privacy_request.id}. Halt instruction received from webhook {webhook.key}." ) + privacy_request.pause_processing(db) + initiate_paused_privacy_request_followup(privacy_request) + return False + except ClientUnsuccessfulException as exc: + logging.error( + f"Privacy Request '{privacy_request.id}' exited after response from webhook '{webhook.key}': {exc.args[0]}." + ) + privacy_request.error_processing(db) + return False + except ValidationError: + logging.error( + f"Privacy Request '{privacy_request.id}' errored due to response validation error from webhook '{webhook.key}'." + ) + privacy_request.error_processing(db) + return False - for webhook in webhooks.order_by(webhook_cls.order): - try: - privacy_request.trigger_policy_webhook(webhook) - except PrivacyRequestPaused: - logging.info( - f"Pausing execution of privacy request {privacy_request.id}. Halt instruction received from webhook {webhook.key}." - ) - privacy_request.pause_processing(db) - initiate_paused_privacy_request_followup(privacy_request) - return False - except ClientUnsuccessfulException as exc: - logging.error( - f"Privacy Request '{privacy_request.id}' exited after response from webhook '{webhook.key}': {exc.args[0]}." - ) - privacy_request.error_processing(db) - return False - except ValidationError: - logging.error( - f"Privacy Request '{privacy_request.id}' errored due to response validation error from webhook '{webhook.key}'." - ) - privacy_request.error_processing(db) - return False - - return True - - def submit( - self, - from_webhook: Optional[PolicyPreWebhook] = None, - from_step: Optional[PausedStep] = None, - ) -> Awaitable[None]: - """Run this privacy request in a separate thread.""" - from_webhook_id = from_webhook.id if from_webhook else None - return run_async(self.run, self.privacy_request.id, from_webhook_id, from_step) - - def run( - self, - privacy_request_id: str, - from_webhook_id: Optional[str] = None, - from_step: Optional[PausedStep] = None, - ) -> None: - # pylint: disable=too-many-locals - """ - Dispatch a privacy_request into the execution layer by: - 1. Generate a graph from all the currently configured datasets - 2. Take the provided identity data - 3. Start the access request / erasure request execution - 4. When finished, upload the results to the configured storage destination if applicable - """ - SessionLocal = get_db_session() - with SessionLocal() as session: - - privacy_request = PrivacyRequest.get(db=session, id=privacy_request_id) - logging.info(f"Dispatching privacy request {privacy_request.id}") - privacy_request.start_processing(session) - - if not from_step: # Skip if we're resuming from the access or erasure step. - # Run pre-execution webhooks - proceed = self.run_webhooks_and_report_status( - session, - privacy_request=privacy_request, - webhook_cls=PolicyPreWebhook, - after_webhook_id=from_webhook_id, - ) - if not proceed: - session.close() - return - - policy = privacy_request.policy - try: - policy.rules[0] - except IndexError: - raise common_exceptions.MisconfiguredPolicyException( - f"Policy with key {policy.key} must contain at least one Rule." - ) + return True - try: - datasets = DatasetConfig.all(db=session) - dataset_graphs = [ - dataset_config.get_graph() for dataset_config in datasets - ] - dataset_graph = DatasetGraph(*dataset_graphs) - identity_data = privacy_request.get_cached_identity_data() - connection_configs = ConnectionConfig.all(db=session) - - if ( - not from_step == PausedStep.erasure - ): # Skip if we're resuming from erasure step - access_result: Dict[str, List[Row]] = run_access_request( - privacy_request=privacy_request, - policy=policy, - graph=dataset_graph, - connection_configs=connection_configs, - identity=identity_data, - ) - - self.upload_access_results( - session, policy, access_result, dataset_graph, privacy_request - ) - - if policy.get_rules_for_action(action_type=ActionType.erasure): - # We only need to run the erasure once until masking strategies are handled - run_erasure( - privacy_request=privacy_request, - policy=policy, - graph=dataset_graph, - connection_configs=connection_configs, - identity=identity_data, - access_request_data=get_cached_data_for_erasures( - privacy_request.id - ), - ) - - except PrivacyRequestPaused as exc: - privacy_request.pause_processing(session) - _log_warning(exc, config.dev_mode) - session.close() - return - except BaseException as exc: # pylint: disable=broad-except - privacy_request.error_processing(db=session) - # If dev mode, log traceback - _log_exception(exc, config.dev_mode) - session.close() - return +def upload_access_results( + session: Session, + policy: Policy, + access_result: Dict[str, List[Row]], + dataset_graph: DatasetGraph, + privacy_request: PrivacyRequest, +) -> None: + """Process the data uploads after the access portion of the privacy request has completed""" + if not access_result: + logging.info(f"No results returned for access request {privacy_request.id}") - # Run post-execution webhooks - proceed = self.run_webhooks_and_report_status( + for rule in policy.get_rules_for_action(action_type=ActionType.access): + if not rule.storage_destination: + raise common_exceptions.RuleValidationError( + f"No storage destination configured on rule {rule.key}" + ) + target_categories: Set[str] = {target.data_category for target in rule.targets} + filtered_results = filter_data_categories( + access_result, + target_categories, + dataset_graph.data_category_field_mapping, + ) + logging.info( + f"Starting access request upload for rule {rule.key} for privacy request {privacy_request.id}" + ) + try: + upload( db=session, + request_id=privacy_request.id, + data=filtered_results, + storage_key=rule.storage_destination.key, + ) + except common_exceptions.StorageUploadError as exc: + logging.error( + f"Error uploading subject access data for rule {rule.key} on policy {policy.key} and privacy request {privacy_request.id} : {exc}" + ) + privacy_request.status = PrivacyRequestStatus.error + + +def queue_privacy_request( + privacy_request_id: str, + from_webhook_id: Optional[str] = None, + from_step: Optional[str] = None, +) -> str: + cache: FidesopsRedis = get_cache() + task = run_privacy_request.delay( + privacy_request_id=privacy_request_id, + from_webhook_id=from_webhook_id, + from_step=from_step, + ) + try: + cache.set( + get_async_task_tracking_cache_key(privacy_request_id), + task.task_id, + ) + except DataError: + logger.debug(f"Error tracking task_id for request with id {privacy_request_id}") + + return task.task_id + + +@celery_app.task() +def run_privacy_request( + privacy_request_id: str, + from_webhook_id: Optional[str] = None, + from_step: Optional[str] = None, +) -> None: + # pylint: disable=too-many-locals + """ + Dispatch a privacy_request into the execution layer by: + 1. Generate a graph from all the currently configured datasets + 2. Take the provided identity data + 3. Start the access request / erasure request execution + 4. When finished, upload the results to the configured storage destination if applicable + """ + if from_step is not None: + # Re-cast `from_step` into an Enum to enforce the validation since unserializable objects + # can't be passed into and between tasks + from_step = PausedStep(from_step) + + SessionLocal = get_db_session() + with SessionLocal() as session: + + privacy_request = PrivacyRequest.get(db=session, id=privacy_request_id) + logging.info(f"Dispatching privacy request {privacy_request.id}") + privacy_request.start_processing(session) + + if not from_step: # Skip if we're resuming from the access or erasure step. + # Run pre-execution webhooks + proceed = run_webhooks_and_report_status( + session, privacy_request=privacy_request, - webhook_cls=PolicyPostWebhook, + webhook_cls=PolicyPreWebhook, + after_webhook_id=from_webhook_id, ) if not proceed: session.close() return - privacy_request.finished_processing_at = datetime.utcnow() - privacy_request.status = PrivacyRequestStatus.complete - privacy_request.save(db=session) - logging.info(f"Privacy request {privacy_request.id} run completed.") - session.close() + policy = privacy_request.policy + try: + policy.rules[0] + except IndexError: + raise common_exceptions.MisconfiguredPolicyException( + f"Policy with key {policy.key} must contain at least one Rule." + ) - @staticmethod - def upload_access_results( - session: Session, - policy: Policy, - access_result: Dict[str, List[Row]], - dataset_graph: DatasetGraph, - privacy_request: PrivacyRequest, - ) -> None: - """Process the data uploads after the access portion of the privacy request has completed""" - if not access_result: - logging.info(f"No results returned for access request {privacy_request.id}") - - for rule in policy.get_rules_for_action(action_type=ActionType.access): - if not rule.storage_destination: - raise common_exceptions.RuleValidationError( - f"No storage destination configured on rule {rule.key}" + try: + datasets = DatasetConfig.all(db=session) + dataset_graphs = [dataset_config.get_graph() for dataset_config in datasets] + dataset_graph = DatasetGraph(*dataset_graphs) + identity_data = privacy_request.get_cached_identity_data() + connection_configs = ConnectionConfig.all(db=session) + + if ( + not from_step == PausedStep.erasure + ): # Skip if we're resuming from erasure step + access_result: Dict[str, List[Row]] = run_access_request( + privacy_request=privacy_request, + policy=policy, + graph=dataset_graph, + connection_configs=connection_configs, + identity=identity_data, ) - target_categories: Set[str] = { - target.data_category for target in rule.targets - } - filtered_results = filter_data_categories( - access_result, - target_categories, - dataset_graph.data_category_field_mapping, - ) - logging.info( - f"Starting access request upload for rule {rule.key} for privacy request {privacy_request.id}" - ) - try: - upload( - db=session, - request_id=privacy_request.id, - data=filtered_results, - storage_key=rule.storage_destination.key, + + upload_access_results( + session, + policy, + access_result, + dataset_graph, + privacy_request, ) - except common_exceptions.StorageUploadError as exc: - logging.error( - f"Error uploading subject access data for rule {rule.key} on policy {policy.key} and privacy request {privacy_request.id} : {exc}" + + if policy.get_rules_for_action(action_type=ActionType.erasure): + # We only need to run the erasure once until masking strategies are handled + run_erasure( + privacy_request=privacy_request, + policy=policy, + graph=dataset_graph, + connection_configs=connection_configs, + identity=identity_data, + access_request_data=get_cached_data_for_erasures( + privacy_request.id + ), ) - privacy_request.status = PrivacyRequestStatus.error - def dry_run(self, privacy_request: PrivacyRequest) -> None: - """Pretend to dispatch privacy_request into the execution layer, return the query plan""" + except PrivacyRequestPaused as exc: + privacy_request.pause_processing(session) + _log_warning(exc, config.dev_mode) + session.close() + return + + except BaseException as exc: # pylint: disable=broad-except + privacy_request.error_processing(db=session) + # If dev mode, log traceback + _log_exception(exc, config.dev_mode) + session.close() + return + + # Run post-execution webhooks + proceed = run_webhooks_and_report_status( + db=session, + privacy_request=privacy_request, + webhook_cls=PolicyPostWebhook, + ) + if not proceed: + session.close() + return + + privacy_request.finished_processing_at = datetime.utcnow() + privacy_request.status = PrivacyRequestStatus.complete + privacy_request.save(db=session) + logging.info(f"Privacy request {privacy_request.id} run completed.") + session.close() def initiate_paused_privacy_request_followup(privacy_request: PrivacyRequest) -> None: diff --git a/src/fidesops/tasks/__init__.py b/src/fidesops/tasks/__init__.py index 08d60f0b9..3dc16da95 100644 --- a/src/fidesops/tasks/__init__.py +++ b/src/fidesops/tasks/__init__.py @@ -1,9 +1,33 @@ from celery import Celery +from celery.utils.log import get_task_logger -app = Celery("tasks") -app.config_from_object("fidesops.core.config", namespace="EXECUTION") -app.autodiscover_tasks(["fidesops.tasks", "fidesops.tasks.scheduled"]) +from fidesops.core.config import config + +logger = get_task_logger(__name__) + + +def _create_celery() -> Celery: + """ + Returns a configured version of the Celery application + """ + logger.info("Creating Celery app...") + app = Celery(__name__) + app.conf.update(broker_url=config.execution.CELERY_BROKER_URL) + app.conf.update(result_backend=config.execution.CELERY_RESULT_BACKEND) + logger.info("Autodiscovering tasks...") + app.autodiscover_tasks( + [ + "fidesops.tasks", + "fidesops.tasks.scheduled", + "fidesops.service.privacy_request", + ] + ) + return app + + +celery_app = _create_celery() if __name__ == "__main__": - app.worker_main() + logger.info("Running Celery worker...") + celery_app.worker_main() diff --git a/src/fidesops/util/async_util.py b/src/fidesops/util/async_util.py deleted file mode 100644 index 7d52793a6..000000000 --- a/src/fidesops/util/async_util.py +++ /dev/null @@ -1,30 +0,0 @@ -import asyncio -import logging -from asyncio import AbstractEventLoop -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Awaitable, Callable, Optional, TypeVar - -logger = logging.getLogger(__name__) -T = TypeVar("T") - - -executor = ThreadPoolExecutor(max_workers=2) - - -def _loop() -> AbstractEventLoop: - """Return the event loop""" - asyncio.set_event_loop(asyncio.SelectorEventLoop()) - return asyncio.get_event_loop() - - -def run_async(task: Callable[[Any], T], *args: Any) -> Awaitable[T]: - """Run a callable async""" - if not callable(task): - raise TypeError("Task must be a callable") - return _loop().run_in_executor(executor, task, *args) - - -def wait_for(t: Awaitable[T]) -> Optional[T]: - """Wait for the return of a callable. This is mostly intended - to be used for testing async tasks.""" - return asyncio.get_event_loop().run_until_complete(t) diff --git a/src/fidesops/util/cache.py b/src/fidesops/util/cache.py index 735214c6c..4cfc5fa91 100644 --- a/src/fidesops/util/cache.py +++ b/src/fidesops/util/cache.py @@ -147,3 +147,7 @@ def get_all_cache_keys_for_privacy_request(privacy_request_id: str) -> List[Any] return cache.keys(f"{privacy_request_id}-*") + cache.keys( f"id-{privacy_request_id}-*" ) + + +def get_async_task_tracking_cache_key(privacy_request_id: str) -> str: + return f"id-{privacy_request_id}-async-execution" diff --git a/tests/api/v1/endpoints/test_drp_endpoints.py b/tests/api/v1/endpoints/test_drp_endpoints.py index f339c3195..eb6aa648b 100644 --- a/tests/api/v1/endpoints/test_drp_endpoints.py +++ b/tests/api/v1/endpoints/test_drp_endpoints.py @@ -30,7 +30,7 @@ def url(self) -> str: return V1_URL_PREFIX + DRP_EXERCISE @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_drp_privacy_request( self, @@ -93,7 +93,7 @@ def test_create_drp_privacy_request( assert run_access_request_mock.called @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_drp_privacy_request_unsupported_identity_props( self, diff --git a/tests/api/v1/endpoints/test_privacy_request_endpoints.py b/tests/api/v1/endpoints/test_privacy_request_endpoints.py index eabb342de..5128b9098 100644 --- a/tests/api/v1/endpoints/test_privacy_request_endpoints.py +++ b/tests/api/v1/endpoints/test_privacy_request_endpoints.py @@ -78,7 +78,7 @@ def url(self, oauth_client: ClientDetail, policy) -> str: return V1_URL_PREFIX + PRIVACY_REQUESTS @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_privacy_request( self, @@ -104,7 +104,7 @@ def test_create_privacy_request( assert run_access_request_mock.called @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_privacy_request_require_manual_approval( self, @@ -135,7 +135,7 @@ def test_create_privacy_request_require_manual_approval( config.execution.REQUIRE_MANUAL_REQUEST_APPROVAL = False @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_privacy_request_with_masking_configuration( self, @@ -190,11 +190,11 @@ def test_create_privacy_request_limit_exceeded( ) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_privacy_request_starts_processing( self, - start_processing_mock, + run_privacy_request_mock, url, api_client: TestClient, db, @@ -208,15 +208,14 @@ def test_create_privacy_request_starts_processing( } ] resp = api_client.post(url, json=data) + assert run_privacy_request_mock.called assert resp.status_code == 200 - assert start_processing_mock.called - response_data = resp.json()["succeeded"] pr = PrivacyRequest.get(db=db, id=response_data[0]["id"]) pr.delete(db=db) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_privacy_request_with_external_id( self, @@ -235,7 +234,7 @@ def test_create_privacy_request_with_external_id( "identity": {"email": "test@example.com"}, } ] - resp = api_client.post(V1_URL_PREFIX + PRIVACY_REQUESTS, json=data) + resp = api_client.post(url, json=data) assert resp.status_code == 200 response_data = resp.json()["succeeded"] assert len(response_data) == 1 @@ -246,7 +245,7 @@ def test_create_privacy_request_with_external_id( assert run_access_request_mock.called @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_privacy_request_caches_identity( self, @@ -279,7 +278,7 @@ def test_create_privacy_request_caches_identity( assert run_access_request_mock.called @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_privacy_request_caches_masking_secrets( self, @@ -328,7 +327,7 @@ def test_create_privacy_request_invalid_encryption_values( assert resp.json()["detail"][0]["msg"] == "Encryption key must be 16 bytes long" @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_create_privacy_request_caches_encryption_keys( self, @@ -382,6 +381,29 @@ def test_create_privacy_request_no_identities( response_data = resp.json()["failed"] assert len(response_data) == 1 + def test_create_privacy_request_registers_async_task( + self, + db, + url, + api_client, + policy, + ): + data = [ + { + "requested_at": "2021-08-30T16:09:37.359Z", + "policy_key": policy.key, + "identity": {"email": "test@example.com"}, + } + ] + resp = api_client.post(url, json=data) + assert resp.status_code == 200 + response_data = resp.json()["succeeded"] + assert len(response_data) == 1 + pr = PrivacyRequest.get(db=db, id=response_data[0]["id"]) + assert pr.get_cached_task_id() is not None + assert pr.get_async_execution_task() is not None + pr.delete(db=db) + class TestGetPrivacyRequests: @pytest.fixture(scope="function") @@ -1390,7 +1412,7 @@ def test_approve_privacy_request_bad_scopes( assert response.status_code == 403 @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_approve_privacy_request_does_not_exist( self, submit_mock, db, url, api_client, generate_auth_header, privacy_request @@ -1411,7 +1433,7 @@ def test_approve_privacy_request_does_not_exist( assert not submit_mock.called @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_approve_completed_privacy_request( self, submit_mock, db, url, api_client, generate_auth_header, privacy_request @@ -1432,7 +1454,7 @@ def test_approve_completed_privacy_request( assert not submit_mock.called @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_approve_privacy_request_no_user_on_client( self, @@ -1463,7 +1485,7 @@ def test_approve_privacy_request_no_user_on_client( assert submit_mock.called @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_approve_privacy_request( self, @@ -1517,7 +1539,7 @@ def test_deny_privacy_request_bad_scopes( assert response.status_code == 403 @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_deny_privacy_request_does_not_exist( self, submit_mock, db, url, api_client, generate_auth_header, privacy_request @@ -1538,7 +1560,7 @@ def test_deny_privacy_request_does_not_exist( assert not submit_mock.called @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_deny_completed_privacy_request( self, submit_mock, db, url, api_client, generate_auth_header, privacy_request @@ -1559,7 +1581,7 @@ def test_deny_completed_privacy_request( assert not submit_mock.called @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_deny_privacy_request_without_denial_reason( self, @@ -1605,7 +1627,7 @@ def test_deny_privacy_request_without_denial_reason( assert not submit_mock.called # Shouldn't run! Privacy request was denied @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_deny_privacy_request_with_denial_reason( self, @@ -1750,7 +1772,7 @@ def test_resume_privacy_request_not_paused( assert response.status_code == 400 @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_resume_privacy_request( self, @@ -1892,7 +1914,7 @@ def test_resume_with_manual_input_invalid_data( ) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) @pytest.mark.usefixtures( "postgres_example_test_dataset_config", "manual_dataset_config" @@ -2065,7 +2087,7 @@ def test_resume_still_paused_at_access_request( "postgres_example_test_dataset_config", "manual_dataset_config" ) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_resume_with_manual_count( self, @@ -2141,7 +2163,7 @@ def test_restart_from_failure_no_stopped_collection( ) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.submit" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_restart_from_failure( self, submit_mock, api_client, url, generate_auth_header, db, privacy_request @@ -2161,4 +2183,8 @@ def test_restart_from_failure( db.refresh(privacy_request) assert privacy_request.status == PrivacyRequestStatus.in_processing - submit_mock.assert_called_with(from_step=PausedStep.access) + submit_mock.assert_called_with( + privacy_request_id=privacy_request.id, + from_step=PausedStep.access.value, + from_webhook_id=None, + ) diff --git a/tests/conftest.py b/tests/conftest.py index d84440d02..c1f7589a0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -165,6 +165,27 @@ def celery_enable_logging(): return True +@pytest.fixture(autouse=True, scope="session") +def celery_use_virtual_worker(celery_session_worker): + """ + This is a catch-all fixture that forces all of our + tests to use a virtual celery worker if a registered + task is executed within the scope of the test. + """ + yield celery_session_worker + + +@pytest.fixture(scope="session") +def run_privacy_request_task(celery_session_app): + """ + This fixture is the version of the run_privacy_request task that is + registered to the `celery_app` fixture which uses the virtualised `celery_worker` + """ + yield celery_session_app.tasks[ + "fidesops.service.privacy_request.request_runner_service.run_privacy_request" + ] + + @pytest.fixture(autouse=True, scope="session") def analytics_opt_out(): """Disable sending analytics when running tests.""" diff --git a/tests/fixtures/application_fixtures.py b/tests/fixtures/application_fixtures.py index b113d1ad2..db16e3f62 100644 --- a/tests/fixtures/application_fixtures.py +++ b/tests/fixtures/application_fixtures.py @@ -45,8 +45,6 @@ from fidesops.service.masking.strategy.masking_strategy_string_rewrite import ( STRING_REWRITE_STRATEGY_NAME, ) -from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner -from fidesops.util.cache import FidesopsRedis from fidesops.util.data_category import DataCategory logging.getLogger("faker").setLevel(logging.ERROR) @@ -1017,17 +1015,6 @@ def example_invalid_yaml_dataset() -> str: return load_dataset_as_string(example_filename) -@pytest.fixture -def privacy_request_runner( - cache: FidesopsRedis, - privacy_request: PrivacyRequest, -) -> Generator: - yield PrivacyRequestRunner( - cache=cache, - privacy_request=privacy_request, - ) - - @pytest.fixture(scope="function") def sample_data(): return { diff --git a/tests/integration_tests/test_execution.py b/tests/integration_tests/test_execution.py index 103ab67c1..c9864ca7a 100644 --- a/tests/integration_tests/test_execution.py +++ b/tests/integration_tests/test_execution.py @@ -65,8 +65,8 @@ def test_delete_collection_before_new_request( self, db, policy, - cache, read_connection_config, + run_privacy_request_task, ) -> None: """Delete the connection config before execution starts which also deletes its dataset config. The graph is built with nothing in it, and no results are returned. @@ -79,11 +79,21 @@ def test_delete_collection_before_new_request( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) assert pr.get_results() != {} read_connection_config.delete(db) - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) assert pr.get_results() == {} @mock.patch("fidesops.task.graph_task.GraphTask.log_start") @@ -180,6 +190,7 @@ def test_collection_omitted_on_restart_from_failure( integration_mongodb_config, mongo_postgres_dataset_graph, example_datasets, + run_privacy_request_task, ) -> None: """Remove secrets to make privacy request fail, then delete the connection config. Build a graph that does not contain the deleted dataset config and re-run.""" @@ -272,8 +283,8 @@ def test_delete_connection_config_on_completed_request( self, db, policy, - cache, read_connection_config, + run_privacy_request_task, ) -> None: """Delete the connection config on a completed request leaves execution logs untouched""" customer_email = "customer-1@example.com" @@ -283,7 +294,12 @@ def test_delete_connection_config_on_completed_request( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) assert pr.get_results() != {} logs = get_sorted_execution_logs(db, pr) assert len(logs) == 22 @@ -530,8 +546,8 @@ def test_disable_connection_config_on_completed_request( self, db, policy, - cache, read_connection_config, + run_privacy_request_task, ) -> None: """Disabling the connection config on a completed request leaves execution logs untouched""" customer_email = "customer-1@example.com" @@ -541,7 +557,12 @@ def test_disable_connection_config_on_completed_request( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) assert pr.get_results() != {} logs = get_sorted_execution_logs(db, pr) assert len(logs) == 22 diff --git a/tests/service/privacy_request/onetrust_test.py b/tests/service/privacy_request/onetrust_test.py index fdcc4ceaa..fdc520fc6 100644 --- a/tests/service/privacy_request/onetrust_test.py +++ b/tests/service/privacy_request/onetrust_test.py @@ -40,7 +40,7 @@ "fidesops.service.privacy_request.onetrust_service.OneTrustService._get_all_subtasks" ) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.run" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_intake_onetrust_requests_success( finish_processing_mock: Mock, @@ -133,7 +133,7 @@ def test_intake_onetrust_requests_success( "fidesops.service.privacy_request.onetrust_service.OneTrustService._get_all_subtasks" ) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.run" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_intake_onetrust_requests_no_config( finish_processing_mock: Mock, @@ -190,7 +190,7 @@ def test_intake_onetrust_requests_no_config( "fidesops.service.privacy_request.onetrust_service.OneTrustService._get_all_subtasks" ) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.run" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_intake_onetrust_requests_no_policy( finish_processing_mock: Mock, @@ -249,7 +249,7 @@ def test_intake_onetrust_requests_no_policy( "fidesops.service.privacy_request.onetrust_service.OneTrustService._get_all_subtasks" ) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.run" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_intake_onetrust_requests_auth_fail( finish_processing_mock: Mock, @@ -310,7 +310,7 @@ def test_intake_onetrust_requests_auth_fail( "fidesops.service.privacy_request.onetrust_service.OneTrustService._get_all_subtasks" ) @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.run" + "fidesops.service.privacy_request.request_runner_service.run_privacy_request.delay" ) def test_intake_onetrust_requests_no_fides_tasks( finish_processing_mock: Mock, diff --git a/tests/service/privacy_request/request_runner_service_test.py b/tests/service/privacy_request/request_runner_service_test.py index 1cb9ce44c..48beeecf7 100644 --- a/tests/service/privacy_request/request_runner_service_test.py +++ b/tests/service/privacy_request/request_runner_service_test.py @@ -38,49 +38,69 @@ MaskingStrategyFactory, ) from fidesops.service.masking.strategy.masking_strategy_hmac import HmacMaskingStrategy -from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner -from fidesops.util.async_util import wait_for +from fidesops.service.privacy_request.request_runner_service import ( + run_privacy_request, + run_webhooks_and_report_status, +) from fidesops.util.data_category import DataCategory +PRIVACY_REQUEST_TASK_TIMEOUT = 2 +# External services take much longer to return +PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL = 15 + @mock.patch("fidesops.service.privacy_request.request_runner_service.upload") def test_policy_upload_called( upload_mock: Mock, - db: Session, - privacy_request: PrivacyRequest, - privacy_request_runner: PrivacyRequestRunner, + privacy_request_status_pending: PrivacyRequest, + run_privacy_request_task, ) -> None: - wait_for(privacy_request_runner.submit()) + run_privacy_request_task.delay(privacy_request_status_pending.id).get( + timeout=PRIVACY_REQUEST_TASK_TIMEOUT + ) assert upload_mock.called def test_start_processing_sets_started_processing_at( db: Session, - privacy_request: PrivacyRequest, - privacy_request_runner: PrivacyRequestRunner, + privacy_request_status_pending: PrivacyRequest, + run_privacy_request_task, ) -> None: - privacy_request.started_processing_at = None - wait_for(privacy_request_runner.submit()) - + assert privacy_request_status_pending.started_processing_at is None + run_privacy_request_task.delay(privacy_request_status_pending.id).get( + timeout=PRIVACY_REQUEST_TASK_TIMEOUT + ) _sessionmaker = get_db_session() db = _sessionmaker() - privacy_request = PrivacyRequest.get(db=db, id=privacy_request.id) - assert privacy_request.started_processing_at is not None + assert ( + PrivacyRequest.get( + db=db, id=privacy_request_status_pending.id + ).started_processing_at + is not None + ) def test_start_processing_doesnt_overwrite_started_processing_at( db: Session, privacy_request: PrivacyRequest, - privacy_request_runner: PrivacyRequestRunner, + run_privacy_request_task, ) -> None: before = privacy_request.started_processing_at - wait_for(privacy_request_runner.submit()) + assert before is not None + + run_privacy_request_task.delay(privacy_request.id).get( + timeout=PRIVACY_REQUEST_TASK_TIMEOUT + ) + + _sessionmaker = get_db_session() + db = _sessionmaker() + privacy_request = PrivacyRequest.get(db=db, id=privacy_request.id) assert privacy_request.started_processing_at == before @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.run_webhooks_and_report_status", + "fidesops.service.privacy_request.request_runner_service.run_webhooks_and_report_status", ) @mock.patch( "fidesops.service.privacy_request.request_runner_service.run_access_request" @@ -92,15 +112,17 @@ def test_from_graph_resume_does_not_run_pre_webhooks( run_webhooks, db: Session, privacy_request: PrivacyRequest, - privacy_request_runner: PrivacyRequestRunner, + run_privacy_request_task, erasure_policy, ) -> None: privacy_request.started_processing_at = None privacy_request.policy = erasure_policy privacy_request.save(db) - privacy_request.started_processing_at = None - wait_for(privacy_request_runner.submit(from_step=PausedStep.access)) + run_privacy_request_task.delay( + privacy_request_id=privacy_request.id, + from_step=PausedStep.access.value, + ).get(timeout=PRIVACY_REQUEST_TASK_TIMEOUT) _sessionmaker = get_db_session() db = _sessionmaker() @@ -117,7 +139,7 @@ def test_from_graph_resume_does_not_run_pre_webhooks( @mock.patch( - "fidesops.service.privacy_request.request_runner_service.PrivacyRequestRunner.run_webhooks_and_report_status", + "fidesops.service.privacy_request.request_runner_service.run_webhooks_and_report_status", ) @mock.patch( "fidesops.service.privacy_request.request_runner_service.run_access_request" @@ -129,14 +151,17 @@ def test_resume_privacy_request_from_erasure( run_webhooks, db: Session, privacy_request: PrivacyRequest, - privacy_request_runner: PrivacyRequestRunner, + run_privacy_request_task, erasure_policy, ) -> None: privacy_request.started_processing_at = None privacy_request.policy = erasure_policy privacy_request.save(db) - wait_for(privacy_request_runner.submit(from_step=PausedStep.erasure)) + run_privacy_request_task.delay( + privacy_request_id=privacy_request.id, + from_step=PausedStep.erasure.value, + ).get(timeout=PRIVACY_REQUEST_TASK_TIMEOUT) _sessionmaker = get_db_session() db = _sessionmaker() @@ -155,8 +180,9 @@ def test_resume_privacy_request_from_erasure( def get_privacy_request_results( db, policy, - cache, + run_privacy_request_task, privacy_request_data: Dict[str, Any], + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT, ) -> PrivacyRequest: """Utility method to run a privacy request and return results after waiting for the returned future.""" @@ -198,11 +224,8 @@ def get_privacy_request_results( for masking_secret in masking_secrets: privacy_request.cache_masking_secret(masking_secret) - wait_for( - PrivacyRequestRunner( - cache=cache, - privacy_request=privacy_request, - ).submit() + run_privacy_request_task.delay(privacy_request.id).get( + timeout=task_timeout, ) return PrivacyRequest.get(db=db, id=privacy_request.id) @@ -220,6 +243,7 @@ def test_create_and_process_access_request( policy, policy_pre_execution_webhooks, policy_post_execution_webhooks, + run_privacy_request_task, ): customer_email = "customer-1@example.com" data = { @@ -228,7 +252,12 @@ def test_create_and_process_access_request( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) results = pr.get_results() assert len(results.keys()) == 11 @@ -271,6 +300,7 @@ def test_create_and_process_access_request_mssql( policy, policy_pre_execution_webhooks, policy_post_execution_webhooks, + run_privacy_request_task, ): customer_email = "customer-1@example.com" @@ -280,7 +310,12 @@ def test_create_and_process_access_request_mssql( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) results = pr.get_results() assert len(results.keys()) == 11 @@ -311,6 +346,7 @@ def test_create_and_process_access_request_mysql( policy, policy_pre_execution_webhooks, policy_post_execution_webhooks, + run_privacy_request_task, ): customer_email = "customer-1@example.com" @@ -320,7 +356,12 @@ def test_create_and_process_access_request_mysql( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) results = pr.get_results() assert len(results.keys()) == 11 @@ -352,6 +393,7 @@ def test_create_and_process_access_request_mariadb( policy, policy_pre_execution_webhooks, policy_post_execution_webhooks, + run_privacy_request_task, ): customer_email = "customer-1@example.com" @@ -361,7 +403,12 @@ def test_create_and_process_access_request_mariadb( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) results = pr.get_results() assert len(results.keys()) == 11 @@ -394,6 +441,7 @@ def test_create_and_process_access_request_saas_mailchimp( policy_pre_execution_webhooks, policy_post_execution_webhooks, mailchimp_identity_email, + run_privacy_request_task, ): customer_email = mailchimp_identity_email data = { @@ -402,7 +450,13 @@ def test_create_and_process_access_request_saas_mailchimp( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) results = pr.get_results() assert len(results.keys()) == 3 @@ -433,6 +487,7 @@ def test_create_and_process_erasure_request_saas( generate_auth_header, mailchimp_identity_email, reset_mailchimp_data, + run_privacy_request_task, ): customer_email = mailchimp_identity_email data = { @@ -441,7 +496,13 @@ def test_create_and_process_erasure_request_saas( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, erasure_policy_hmac, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy_hmac, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) connector = SaaSConnector(mailchimp_connection_config) request: SaaSRequestParams = SaaSRequestParams( @@ -485,6 +546,7 @@ def test_create_and_process_access_request_saas_hubspot( policy_pre_execution_webhooks, policy_post_execution_webhooks, hubspot_identity_email, + run_privacy_request_task, ): customer_email = hubspot_identity_email data = { @@ -493,7 +555,13 @@ def test_create_and_process_access_request_saas_hubspot( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) results = pr.get_results() assert len(results.keys()) == 3 @@ -521,6 +589,7 @@ def test_create_and_process_erasure_request_specific_category_postgres( generate_auth_header, erasure_policy, read_connection_config, + run_privacy_request_task, ): customer_email = "customer-1@example.com" customer_id = 1 @@ -533,7 +602,12 @@ def test_create_and_process_erasure_request_specific_category_postgres( stmt = select("*").select_from(table("customer")) res = postgres_integration_db.execute(stmt).all() - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + ) pr.delete(db=db) stmt = select( @@ -560,6 +634,7 @@ def test_create_and_process_erasure_request_specific_category_mssql( db, generate_auth_header, erasure_policy, + run_privacy_request_task, ): customer_email = "customer-1@example.com" customer_id = 1 @@ -569,7 +644,12 @@ def test_create_and_process_erasure_request_specific_category_mssql( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + ) pr.delete(db=db) stmt = select( @@ -596,6 +676,7 @@ def test_create_and_process_erasure_request_specific_category_mysql( db, generate_auth_header, erasure_policy, + run_privacy_request_task, ): customer_email = "customer-1@example.com" customer_id = 1 @@ -605,7 +686,12 @@ def test_create_and_process_erasure_request_specific_category_mysql( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + ) pr.delete(db=db) stmt = select( @@ -632,6 +718,7 @@ def test_create_and_process_erasure_request_specific_category_mariadb( db, generate_auth_header, erasure_policy, + run_privacy_request_task, ): customer_email = "customer-1@example.com" customer_id = 1 @@ -641,7 +728,12 @@ def test_create_and_process_erasure_request_specific_category_mariadb( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + ) pr.delete(db=db) stmt = select( @@ -668,6 +760,7 @@ def test_create_and_process_erasure_request_generic_category( db, generate_auth_header, erasure_policy, + run_privacy_request_task, ): # It's safe to change this here since the `erasure_policy` fixture is scoped # at function level @@ -683,7 +776,12 @@ def test_create_and_process_erasure_request_generic_category( "identity": {"email": email}, } - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + ) pr.delete(db=db) stmt = select( @@ -717,6 +815,7 @@ def test_create_and_process_erasure_request_aes_generic_category( db, generate_auth_header, erasure_policy_aes, + run_privacy_request_task, ): # It's safe to change this here since the `erasure_policy` fixture is scoped # at function level @@ -732,7 +831,12 @@ def test_create_and_process_erasure_request_aes_generic_category( "identity": {"email": email}, } - pr = get_privacy_request_results(db, erasure_policy_aes, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy_aes, + run_privacy_request_task, + data, + ) pr.delete(db=db) stmt = select( @@ -767,6 +871,7 @@ def test_create_and_process_erasure_request_with_table_joins( db, cache, erasure_policy, + run_privacy_request_task, ): # It's safe to change this here since the `erasure_policy` fixture is scoped # at function level @@ -782,7 +887,12 @@ def test_create_and_process_erasure_request_with_table_joins( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + ) pr.delete(db=db) stmt = select( @@ -813,6 +923,7 @@ def test_create_and_process_erasure_request_read_access( db, cache, erasure_policy, + run_privacy_request_task, ): customer_email = "customer-2@example.com" customer_id = 2 @@ -822,7 +933,12 @@ def test_create_and_process_erasure_request_read_access( "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + ) errored_execution_logs = pr.execution_logs.filter_by(status="error") assert errored_execution_logs.count() == 9 assert ( @@ -892,6 +1008,7 @@ def test_create_and_process_access_request_snowflake( db, cache, policy, + run_privacy_request_task, ): customer_email = snowflake_resources["email"] customer_name = snowflake_resources["name"] @@ -900,7 +1017,13 @@ def test_create_and_process_access_request_snowflake( "policy_key": policy.key, "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) results = pr.get_results() customer_table_key = ( f"EN_{pr.id}__access_request__snowflake_example_test_dataset:customer" @@ -921,6 +1044,7 @@ def test_create_and_process_erasure_request_snowflake( db, cache, erasure_policy, + run_privacy_request_task, ): customer_email = snowflake_resources["email"] snowflake_client = snowflake_resources["client"] @@ -930,7 +1054,13 @@ def test_create_and_process_erasure_request_snowflake( "policy_key": erasure_policy.key, "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) pr.delete(db=db) stmt = f'select "name", "variant_eg" from "customer" where "email" = {formatted_customer_email};' @@ -996,10 +1126,7 @@ def redshift_resources( @pytest.mark.integration_external @pytest.mark.integration_redshift def test_create_and_process_access_request_redshift( - redshift_resources, - db, - cache, - policy, + redshift_resources, db, cache, policy, run_privacy_request_task ): customer_email = redshift_resources["email"] customer_name = redshift_resources["name"] @@ -1008,7 +1135,13 @@ def test_create_and_process_access_request_redshift( "policy_key": policy.key, "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) results = pr.get_results() customer_table_key = ( f"EN_{pr.id}__access_request__redshift_example_test_dataset:customer" @@ -1039,6 +1172,7 @@ def test_create_and_process_erasure_request_redshift( db, cache, erasure_policy, + run_privacy_request_task, ): customer_email = redshift_resources["email"] data = { @@ -1048,7 +1182,13 @@ def test_create_and_process_erasure_request_redshift( } # Should erase customer name - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) pr.delete(db=db) connector = redshift_resources["connector"] @@ -1073,7 +1213,13 @@ def test_create_and_process_erasure_request_redshift( target.save(db=db) # Should erase state fields on address table - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) pr.delete(db=db) connector = redshift_resources["connector"] @@ -1097,6 +1243,7 @@ def test_create_and_process_access_request_bigquery( db, cache, policy, + run_privacy_request_task, ): customer_email = bigquery_resources["email"] customer_name = bigquery_resources["name"] @@ -1105,7 +1252,13 @@ def test_create_and_process_access_request_bigquery( "policy_key": policy.key, "identity": {"email": customer_email}, } - pr = get_privacy_request_results(db, policy, cache, data) + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) results = pr.get_results() customer_table_key = ( f"EN_{pr.id}__access_request__bigquery_example_test_dataset:customer" @@ -1136,6 +1289,7 @@ def test_create_and_process_erasure_request_bigquery( db, cache, erasure_policy, + run_privacy_request_task, ): customer_email = bigquery_resources["email"] data = { @@ -1145,7 +1299,13 @@ def test_create_and_process_erasure_request_bigquery( } # Should erase customer name - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) pr.delete(db=db) bigquery_client = bigquery_resources["client"] @@ -1168,7 +1328,13 @@ def test_create_and_process_erasure_request_bigquery( target.save(db=db) # Should erase state fields on address table - pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr = get_privacy_request_results( + db, + erasure_policy, + run_privacy_request_task, + data, + task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL, + ) pr.delete(db=db) bigquery_client = bigquery_resources["client"] @@ -1183,23 +1349,20 @@ def test_create_and_process_erasure_request_bigquery( assert row.state is None -class TestPrivacyRequestRunnerRunWebhooks: +class TestRunPrivacyRequestRunsWebhooks: @mock.patch("fidesops.models.privacy_request.PrivacyRequest.trigger_policy_webhook") def test_run_webhooks_halt_received( self, mock_trigger_policy_webhook, db, privacy_request, - privacy_request_runner, policy_pre_execution_webhooks, ): mock_trigger_policy_webhook.side_effect = PrivacyRequestPaused( "Request received to halt" ) - proceed = privacy_request_runner.run_webhooks_and_report_status( - db, privacy_request, PolicyPreWebhook - ) + proceed = run_webhooks_and_report_status(db, privacy_request, PolicyPreWebhook) assert not proceed assert privacy_request.finished_processing_at is None assert privacy_request.status == PrivacyRequestStatus.paused @@ -1211,7 +1374,6 @@ def test_run_webhooks_ap_scheduler_cleanup( mock_trigger_policy_webhook, db, privacy_request, - privacy_request_runner, policy_pre_execution_webhooks, ): config.redis.DEFAULT_TTL_SECONDS = ( @@ -1221,9 +1383,7 @@ def test_run_webhooks_ap_scheduler_cleanup( "Request received to halt" ) - proceed = privacy_request_runner.run_webhooks_and_report_status( - db, privacy_request, PolicyPreWebhook - ) + proceed = run_webhooks_and_report_status(db, privacy_request, PolicyPreWebhook) assert not proceed time.sleep(3) @@ -1239,16 +1399,13 @@ def test_run_webhooks_client_error( mock_trigger_policy_webhook, db, privacy_request, - privacy_request_runner, policy_pre_execution_webhooks, ): mock_trigger_policy_webhook.side_effect = ClientUnsuccessfulException( status_code=500 ) - proceed = privacy_request_runner.run_webhooks_and_report_status( - db, privacy_request, PolicyPreWebhook - ) + proceed = run_webhooks_and_report_status(db, privacy_request, PolicyPreWebhook) assert not proceed assert privacy_request.status == PrivacyRequestStatus.error assert privacy_request.finished_processing_at is not None @@ -1260,16 +1417,13 @@ def test_run_webhooks_validation_error( mock_trigger_policy_webhook, db, privacy_request, - privacy_request_runner, policy_pre_execution_webhooks, ): mock_trigger_policy_webhook.side_effect = ValidationError( errors={}, model=SecondPartyResponseFormat ) - proceed = privacy_request_runner.run_webhooks_and_report_status( - db, privacy_request, PolicyPreWebhook - ) + proceed = run_webhooks_and_report_status(db, privacy_request, PolicyPreWebhook) assert not proceed assert privacy_request.finished_processing_at is not None assert privacy_request.status == PrivacyRequestStatus.error @@ -1281,13 +1435,10 @@ def test_run_webhooks( mock_trigger_policy_webhook, db, privacy_request, - privacy_request_runner, policy_pre_execution_webhooks, ): - proceed = privacy_request_runner.run_webhooks_and_report_status( - db, privacy_request, PolicyPreWebhook - ) + proceed = run_webhooks_and_report_status(db, privacy_request, PolicyPreWebhook) assert proceed assert privacy_request.status == PrivacyRequestStatus.in_processing assert privacy_request.finished_processing_at is None @@ -1299,11 +1450,10 @@ def test_run_webhooks_after_webhook( mock_trigger_policy_webhook, db, privacy_request, - privacy_request_runner, policy_pre_execution_webhooks, ): """Test running webhooks after specific webhook - for when we're resuming privacy request execution""" - proceed = privacy_request_runner.run_webhooks_and_report_status( + proceed = run_webhooks_and_report_status( db, privacy_request, PolicyPreWebhook, policy_pre_execution_webhooks[0].id ) assert proceed diff --git a/tests/tasks/test_celery.py b/tests/tasks/test_celery.py index fe5d80286..6348e22e1 100644 --- a/tests/tasks/test_celery.py +++ b/tests/tasks/test_celery.py @@ -1,10 +1,10 @@ -def test_create_task(celery_app, celery_worker): - @celery_app.task +def test_create_task(celery_session_app, celery_session_worker): + @celery_session_app.task def multiply(x, y): return x * y # Force `celery_app` to register our new task # See: https://github.com/celery/celery/issues/3642#issuecomment-369057682 - celery_worker.reload() + celery_session_worker.reload() assert multiply.run(4, 4) == 16 assert multiply.delay(4, 4).get(timeout=10) == 16