Skip to content
This repository has been archived by the owner on Nov 30, 2022. It is now read-only.

Commit

Permalink
Execute Privacy Requests with Celery (#621)
Browse files Browse the repository at this point in the history
 Changes how privacy requests are dispatched for processing: instead of running in a background process, each request is now queued as a Celery task
  • Loading branch information
Sean Preston authored Jun 22, 2022
1 parent 24d41e8 commit f548b25
Show file tree
Hide file tree
Showing 21 changed files with 664 additions and 425 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ The types of changes are:
* Bumped Python to version 3.9.13 in the `Dockerfile` [#630](https://github.com/ethyca/fidesops/pull/630)
* Matched the path to the migrations in the mypy settings with the new location [#634](https://github.com/ethyca/fidesops/pull/634)
* Sort ConnectionConfig by name ascending [#668](https://github.com/ethyca/fidesops/pull/672)
* Install MSSQL By Default [#664] (https://github.com/ethyca/fidesops/pull/664)
* Install MSSQL By Default [#664](https://github.com/ethyca/fidesops/pull/664)
* [Admin UI] Change "Policy Name" to "Request Type" on SR list page. [#546](https://github.com/ethyca/fidesops/pull/696)
* Queue PrivacyRequests into a Celery queue for execution [#621](https://github.com/ethyca/fidesops/pull/621)

### Developer Experience

Expand All @@ -70,6 +71,9 @@ The types of changes are:
* The `[package]` config section no longer exists [#620](https://github.com/ethyca/fidesops/pull/620)


### Changed
* Process privacy requests as Celery tasks and not background processes [#621](https://github.com/ethyca/fidesops/pull/621)

## [1.5.2](https://github.com/ethyca/fidesops/compare/1.5.1...1.5.2)

### Added
Expand Down
2 changes: 0 additions & 2 deletions data/config/fidesops.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ DRP_JWT_SECRET = "testdrpsecret"
LOG_LEVEL = "DEBUG"

[execution]
CELERY_BROKER_URL = "redis://:testpassword@redis:6379"
CELERY_RESULT_BACKEND = "redis://:testpassword@redis:6379"
TASK_RETRY_COUNT = 0
TASK_RETRY_DELAY = 1
TASK_RETRY_BACKOFF = 1
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
context: .
dockerfile: Dockerfile
depends_on:
- celery
- db
- redis
expose:
Expand Down Expand Up @@ -62,7 +63,7 @@ services:
build:
context: .
dockerfile: Dockerfile
command: celery -A fidesops.tasks beat -l info --logfile=/var/log/celery.log
command: celery -A fidesops.tasks.celery_app worker --loglevel=info
depends_on:
redis:
condition: service_started
Expand Down
2 changes: 0 additions & 2 deletions fidesops.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ DRP_JWT_SECRET = "secret"
LOG_LEVEL = "INFO"

[execution]
CELERY_BROKER_URL = "redis://:testpassword@redis:6379"
CELERY_RESULT_BACKEND = "redis://:testpassword@redis:6379"
MASKING_STRICT = true
REQUIRE_MANUAL_REQUEST_APPROVAL = false
TASK_RETRY_COUNT = 0
Expand Down
9 changes: 4 additions & 5 deletions src/fidesops/api/v1/endpoints/drp_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
from fidesops.schemas.privacy_request import PrivacyRequestDRPStatusResponse
from fidesops.schemas.redis_cache import PrivacyRequestIdentity
from fidesops.service.drp.drp_fidesops_mapper import DrpFidesopsMapper
from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner
from fidesops.service.privacy_request.request_runner_service import (
queue_privacy_request,
)
from fidesops.service.privacy_request.request_service import (
build_required_privacy_request_kwargs,
cache_data,
Expand Down Expand Up @@ -98,10 +100,7 @@ def create_drp_privacy_request(

cache_data(privacy_request, policy, mapped_identity, None, data)

PrivacyRequestRunner(
cache=cache,
privacy_request=privacy_request,
).submit()
queue_privacy_request(privacy_request.id)

return PrivacyRequestDRPStatusResponse(
request_id=privacy_request.id,
Expand Down
74 changes: 40 additions & 34 deletions src/fidesops/api/v1/endpoints/privacy_request_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@
PRIVACY_REQUEST_RETRY,
REQUEST_PREVIEW,
)
from fidesops.common_exceptions import TraversalError, ValidationError
from fidesops.common_exceptions import (
FunctionalityNotConfigured,
TraversalError,
ValidationError,
)
from fidesops.core.config import config
from fidesops.graph.config import CollectionAddress
from fidesops.graph.graph import DatasetGraph, Node
Expand Down Expand Up @@ -69,7 +73,9 @@
RowCountRequest,
StoppedCollection,
)
from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner
from fidesops.service.privacy_request.request_runner_service import (
queue_privacy_request,
)
from fidesops.service.privacy_request.request_service import (
build_required_privacy_request_kwargs,
cache_data,
Expand Down Expand Up @@ -109,7 +115,6 @@ def get_privacy_request_or_error(
)
def create_privacy_request(
*,
cache: FidesopsRedis = Depends(deps.get_cache),
db: Session = Depends(deps.get_db),
data: conlist(PrivacyRequestCreate, max_items=50) = Body(...), # type: ignore
) -> BulkPostPrivacyRequests:
Expand All @@ -119,6 +124,11 @@ def create_privacy_request(
You cannot update privacy requests after they've been created.
"""
if not config.redis.ENABLED:
raise FunctionalityNotConfigured(
"Application redis cache required, but it is currently disabled! Please update your application configuration to enable integration with a redis cache."
)

created = []
failed = []
# Optional fields to validate here are those that are both nullable in the DB, and exist
Expand Down Expand Up @@ -177,10 +187,7 @@ def create_privacy_request(
)

if not config.execution.REQUIRE_MANUAL_REQUEST_APPROVAL:
PrivacyRequestRunner(
cache=cache,
privacy_request=privacy_request,
).submit()
queue_privacy_request(privacy_request.id)

except common_exceptions.RedisConnectionError as exc:
logger.error("RedisConnectionError: %s", exc)
Expand Down Expand Up @@ -615,11 +622,10 @@ def resume_privacy_request(
privacy_request.status = PrivacyRequestStatus.in_processing
privacy_request.save(db=db)

PrivacyRequestRunner(
cache=cache,
privacy_request=privacy_request,
).submit(from_webhook=webhook)

queue_privacy_request(
privacy_request_id=privacy_request.id,
from_webhook_id=webhook.id,
)
return privacy_request


Expand Down Expand Up @@ -713,10 +719,10 @@ def resume_privacy_request_with_manual_input(
privacy_request.status = PrivacyRequestStatus.in_processing
privacy_request.save(db=db)

PrivacyRequestRunner(
cache=cache,
privacy_request=privacy_request,
).submit(from_step=paused_step)
queue_privacy_request(
privacy_request_id=privacy_request.id,
from_step=paused_step.value,
)

return privacy_request

Expand Down Expand Up @@ -820,19 +826,20 @@ def restart_privacy_request_from_failure(

privacy_request.status = PrivacyRequestStatus.in_processing
privacy_request.save(db=db)

PrivacyRequestRunner(
cache=cache,
privacy_request=privacy_request,
).submit(from_step=failed_step)
queue_privacy_request(
privacy_request_id=privacy_request.id,
from_step=failed_step.value,
)

privacy_request.cache_failed_collection_details() # Reset failed step and collection to None

return privacy_request


def review_privacy_request(
db: Session, cache: FidesopsRedis, request_ids: List[str], process_request: Callable
db: Session,
request_ids: List[str],
process_request_function: Callable,
) -> BulkReviewResponse:
"""Helper method shared between the approve and deny privacy request endpoints"""
succeeded: List[PrivacyRequest] = []
Expand All @@ -859,7 +866,7 @@ def review_privacy_request(
continue

try:
process_request(privacy_request, cache)
process_request_function(privacy_request)
except Exception:
failure = {
"message": "Privacy request could not be updated",
Expand All @@ -883,7 +890,6 @@ def review_privacy_request(
def approve_privacy_request(
*,
db: Session = Depends(deps.get_db),
cache: FidesopsRedis = Depends(deps.get_cache),
client: ClientDetail = Security(
verify_oauth_client,
scopes=[PRIVACY_REQUEST_REVIEW],
Expand All @@ -893,20 +899,19 @@ def approve_privacy_request(
"""Approve and dispatch a list of privacy requests and/or report failure"""
user_id = client.user_id

def _process_request(privacy_request: PrivacyRequest, cache: FidesopsRedis) -> None:
def _approve_request(privacy_request: PrivacyRequest) -> None:
"""Method for how to process requests - approved"""
privacy_request.status = PrivacyRequestStatus.approved
privacy_request.reviewed_at = datetime.utcnow()
privacy_request.reviewed_by = user_id
privacy_request.save(db=db)

PrivacyRequestRunner(
cache=cache,
privacy_request=privacy_request,
).submit()
queue_privacy_request(privacy_request_id=privacy_request.id)

return review_privacy_request(
db, cache, privacy_requests.request_ids, _process_request
db=db,
request_ids=privacy_requests.request_ids,
process_request_function=_approve_request,
)


Expand All @@ -918,7 +923,6 @@ def _process_request(privacy_request: PrivacyRequest, cache: FidesopsRedis) -> N
def deny_privacy_request(
*,
db: Session = Depends(deps.get_db),
cache: FidesopsRedis = Depends(deps.get_cache),
client: ClientDetail = Security(
verify_oauth_client,
scopes=[PRIVACY_REQUEST_REVIEW],
Expand All @@ -928,8 +932,8 @@ def deny_privacy_request(
"""Deny a list of privacy requests and/or report failure"""
user_id = client.user_id

def _process_denial_request(
privacy_request: PrivacyRequest, _: FidesopsRedis
def _deny_request(
privacy_request: PrivacyRequest,
) -> None:
"""Method for how to process requests - denied"""

Expand All @@ -948,5 +952,7 @@ def _process_denial_request(
privacy_request.save(db=db)

return review_privacy_request(
db, cache, privacy_requests.request_ids, _process_denial_request
db=db,
request_ids=privacy_requests.request_ids,
process_request_function=_deny_request,
)
4 changes: 2 additions & 2 deletions src/fidesops/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class ExecutionSettings(FidesSettings):
TASK_RETRY_BACKOFF: int
REQUIRE_MANUAL_REQUEST_APPROVAL: bool = False
MASKING_STRICT: bool = True
CELERY_BROKER_URL: str
CELERY_RESULT_BACKEND: str
CELERY_BROKER_URL: str = "redis://:testpassword@redis:6379/1"
CELERY_RESULT_BACKEND: str = "redis://:testpassword@redis:6379/1"

class Config:
env_prefix = "FIDESOPS__EXECUTION__"
Expand Down
22 changes: 22 additions & 0 deletions src/fidesops/models/privacy_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from enum import Enum as EnumType
from typing import Any, Dict, List, Optional

from celery.result import AsyncResult
from pydantic import root_validator
from sqlalchemy import Column, DateTime
from sqlalchemy import Enum as EnumColumn
Expand Down Expand Up @@ -40,6 +41,7 @@
from fidesops.util.cache import (
FidesopsRedis,
get_all_cache_keys_for_privacy_request,
get_async_task_tracking_cache_key,
get_cache,
get_drp_request_body_cache_key,
get_encryption_cache_key,
Expand Down Expand Up @@ -213,6 +215,26 @@ def cache_identity(self, identity: PrivacyRequestIdentity) -> None:
value,
)

def cache_task_id(self, task_id: str) -> None:
    """Record the ID of the asynchronous task executing this privacy request.

    The ID is written to the application cache under the async task
    tracking key derived from this privacy request's ``id``.
    """
    redis_cache: FidesopsRedis = get_cache()
    tracking_key = get_async_task_tracking_cache_key(self.id)
    redis_cache.set(tracking_key, task_id)

def get_cached_task_id(self) -> Optional[str]:
    """Read back the task ID cached for this privacy request's execution.

    Returns whatever the application cache holds under this request's
    async task tracking key.
    """
    redis_cache: FidesopsRedis = get_cache()
    return redis_cache.get(get_async_task_tracking_cache_key(self.id))

def get_async_execution_task(self) -> "Optional[AsyncResult]":
    """Return the task handle for this privacy request's asynchronous execution.

    Looks up the cached task ID and wraps it in an ``AsyncResult``.

    Returns:
        An ``AsyncResult`` for the cached task ID, or ``None`` when no task
        ID has been cached for this privacy request.
    """
    task_id = self.get_cached_task_id()
    if task_id is None:
        # AsyncResult rejects a missing id with ValueError; honour the
        # Optional return type instead of letting that propagate.
        return None
    return AsyncResult(task_id)

def cache_drp_request_body(self, drp_request_body: DrpPrivacyRequestCreate) -> None:
"""Sets the identity's values at their specific locations in the Fidesops app cache"""
cache: FidesopsRedis = get_cache()
Expand Down
10 changes: 4 additions & 6 deletions src/fidesops/service/privacy_request/onetrust_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
ONETRUST_GET_SUBTASKS_BY_REF_ID,
ONETRUST_PUT_SUBTASK_STATUS,
)
from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner
from fidesops.util.cache import get_cache
from fidesops.service.privacy_request.request_runner_service import (
queue_privacy_request,
)
from fidesops.util.storage_authenticator import get_onetrust_access_token

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -153,10 +154,7 @@ def _create_privacy_request( # pylint: disable=R0913
privacy_request: PrivacyRequest = PrivacyRequest.create(db=db, data=kwargs)
privacy_request.cache_identity(identity)
try:
PrivacyRequestRunner(
cache=get_cache(),
privacy_request=privacy_request,
).submit()
queue_privacy_request(privacy_request_id=privacy_request.id)
request_status = OneTrustSubtaskStatus.COMPLETED
except BaseException: # pylint: disable=W0703
request_status = OneTrustSubtaskStatus.FAILED
Expand Down
Loading

0 comments on commit f548b25

Please sign in to comment.