diff --git a/CHANGELOG.md b/CHANGELOG.md index 1068fa79d2..470857d837 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,7 +38,8 @@ The types of changes are: ### Developer Experience * Replace user authentication routes with fideslib routes [#811](https://github.com/ethyca/fidesops/pull/811) -* Reduce docker image size [846](https://github.com/ethyca/fidesops/pull/846) +* Reduce docker image size [#846](https://github.com/ethyca/fidesops/pull/846) +* Load Celery config overrides from a dedicated `celery.toml` [#821](https://github.com/ethyca/fidesops/pull/821) ### Docs * Backend UI deployment [#827](https://github.com/ethyca/fidesops/pull/827) diff --git a/celery.toml b/celery.toml new file mode 100644 index 0000000000..3893545e18 --- /dev/null +++ b/celery.toml @@ -0,0 +1,2 @@ +event_queue_prefix = "fidesops_worker" +default_queue_name = "fidesops" \ No newline at end of file diff --git a/data/config/celery.toml b/data/config/celery.toml new file mode 100644 index 0000000000..05f4f95b82 --- /dev/null +++ b/data/config/celery.toml @@ -0,0 +1,2 @@ +event_queue_prefix = "overridden_fidesops_worker" +default_queue_name = "overridden_fidesops" \ No newline at end of file diff --git a/src/fidesops/core/config.py b/src/fidesops/core/config.py index 1dea1023f4..eeb748b43d 100644 --- a/src/fidesops/core/config.py +++ b/src/fidesops/core/config.py @@ -39,9 +39,8 @@ class ExecutionSettings(FidesSettings): TASK_RETRY_BACKOFF: int REQUIRE_MANUAL_REQUEST_APPROVAL: bool = False MASKING_STRICT: bool = True - CELERY_BROKER_URL: Optional[str] = None - CELERY_RESULT_BACKEND: Optional[str] = None WORKER_ENABLED: bool = True + CELERY_CONFIG_PATH: Optional[str] = "celery.toml" class Config: env_prefix = "FIDESOPS__EXECUTION__" diff --git a/src/fidesops/tasks/__init__.py b/src/fidesops/tasks/__init__.py index 41a834d2ec..d3b64c3061 100644 --- a/src/fidesops/tasks/__init__.py +++ b/src/fidesops/tasks/__init__.py @@ -1,25 +1,37 @@ +from typing import Any, Dict, MutableMapping + from celery import Celery from celery.utils.log import get_task_logger +from fideslib.core.config import load_toml from fidesops.core.config import config +from fidesops.util.logger import NotPii logger = get_task_logger(__name__) -def _create_celery() -> Celery: +def _create_celery(config_path: str = config.execution.CELERY_CONFIG_PATH) -> Celery: """ Returns a configured version of the Celery application """ logger.info("Creating Celery app...") app = Celery(__name__) - broker_url = config.execution.CELERY_BROKER_URL or config.redis.CONNECTION_URL - app.conf.update(broker_url=broker_url) + celery_config: Dict[str, Any] = { + # Defaults for the celery config + "broker_url": config.redis.CONNECTION_URL, + "result_backend": config.redis.CONNECTION_URL, + } + + try: + celery_config_overrides: MutableMapping[str, Any] = load_toml([config_path]) + except FileNotFoundError as e: + logger.warning("celery.toml could not be loaded: %s", NotPii(e)) + else: + celery_config.update(celery_config_overrides) + + app.conf.update(celery_config) - result_backend = ( - config.execution.CELERY_RESULT_BACKEND or config.redis.CONNECTION_URL - ) - app.conf.update(result_backend=result_backend) logger.info("Autodiscovering tasks...") app.autodiscover_tasks( [ diff --git a/tests/tasks/test_celery.py b/tests/tasks/test_celery.py index 6348e22e1b..35a54a723f 100644 --- a/tests/tasks/test_celery.py +++ b/tests/tasks/test_celery.py @@ -1,3 +1,7 @@ +from fidesops.core.config import config +from fidesops.tasks import _create_celery + + def test_create_task(celery_session_app, celery_session_worker): @celery_session_app.task def multiply(x, y): @@ -8,3 +12,17 @@ def multiply(x, y): celery_session_worker.reload() assert multiply.run(4, 4) == 16 assert multiply.delay(4, 4).get(timeout=10) == 16 + + +def test_celery_default_config() -> None: + celery_app = _create_celery() + assert celery_app.conf["broker_url"] == config.redis.CONNECTION_URL + assert celery_app.conf["result_backend"] == config.redis.CONNECTION_URL + assert celery_app.conf["event_queue_prefix"] == "fidesops_worker" + assert celery_app.conf["default_queue_name"] == "fidesops" + + +def test_celery_config_override() -> None: + celery_app = _create_celery(config_path="data/config/celery.toml") + assert celery_app.conf["event_queue_prefix"] == "overridden_fidesops_worker" + assert celery_app.conf["default_queue_name"] == "overridden_fidesops"