Skip to content

Commit

Permalink
Adds celery.toml for loading custom Celery config [#821] (#865)
Browse files Browse the repository at this point in the history
* adds option to configure EVENT_QUEUE_PREFIX for celery

* provide the option to specify a default queue name too

* update celery config to load in from its own config toml file

* updates changelog

* update value for event_queue_prefix

* test celery config overrides

* include config_path arg

* add type def

* add config path to execution settings

* correct values
  • Loading branch information
Sean Preston authored Jul 15, 2022
1 parent 842cd96 commit 8e106c2
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 10 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ The types of changes are:

### Developer Experience
* Replace user authentication routes with fideslib routes [#811](https://github.com/ethyca/fidesops/pull/811)
* Reduce docker image size [846](https://github.com/ethyca/fidesops/pull/846)
* Reduce docker image size [#846](https://github.com/ethyca/fidesops/pull/846)
* Load Celery config overrides from a dedicated `celery.toml` [#821](https://github.com/ethyca/fidesops/pull/821)

### Docs
* Backend UI deployment [#827](https://github.com/ethyca/fidesops/pull/827)
Expand Down
2 changes: 2 additions & 0 deletions celery.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
event_queue_prefix = "fidesops_worker"
default_queue_name = "fidesops"
2 changes: 2 additions & 0 deletions data/config/celery.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
event_queue_prefix = "overridden_fidesops_worker"
default_queue_name = "overridden_fidesops"
3 changes: 1 addition & 2 deletions src/fidesops/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ class ExecutionSettings(FidesSettings):
TASK_RETRY_BACKOFF: int
REQUIRE_MANUAL_REQUEST_APPROVAL: bool = False
MASKING_STRICT: bool = True
CELERY_BROKER_URL: Optional[str] = None
CELERY_RESULT_BACKEND: Optional[str] = None
WORKER_ENABLED: bool = True
CELERY_CONFIG_PATH: Optional[str] = "celery.toml"

class Config:
env_prefix = "FIDESOPS__EXECUTION__"
Expand Down
26 changes: 19 additions & 7 deletions src/fidesops/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
from typing import Any, Dict, MutableMapping

from celery import Celery
from celery.utils.log import get_task_logger
from fideslib.core.config import load_toml

from fidesops.core.config import config
from fidesops.util.logger import NotPii

logger = get_task_logger(__name__)


def _create_celery() -> Celery:
def _create_celery(config_path: str = config.execution.CELERY_CONFIG_PATH) -> Celery:
"""
Returns a configured version of the Celery application
"""
logger.info("Creating Celery app...")
app = Celery(__name__)

broker_url = config.execution.CELERY_BROKER_URL or config.redis.CONNECTION_URL
app.conf.update(broker_url=broker_url)
celery_config: Dict[str, Any] = {
# Defaults for the celery config
"broker_url": config.redis.CONNECTION_URL,
"result_backend": config.redis.CONNECTION_URL,
}

try:
celery_config_overrides: MutableMapping[str, Any] = load_toml([config_path])
except FileNotFoundError as e:
logger.warning("celery.toml could not be loaded: %s", NotPii(e))
else:
celery_config.update(celery_config_overrides)

app.conf.update(celery_config)

result_backend = (
config.execution.CELERY_RESULT_BACKEND or config.redis.CONNECTION_URL
)
app.conf.update(result_backend=result_backend)
logger.info("Autodiscovering tasks...")
app.autodiscover_tasks(
[
Expand Down
18 changes: 18 additions & 0 deletions tests/tasks/test_celery.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from fidesops.core.config import config
from fidesops.tasks import _create_celery


def test_create_task(celery_session_app, celery_session_worker):
@celery_session_app.task
def multiply(x, y):
Expand All @@ -8,3 +12,17 @@ def multiply(x, y):
celery_session_worker.reload()
assert multiply.run(4, 4) == 16
assert multiply.delay(4, 4).get(timeout=10) == 16


def test_celery_default_config() -> None:
celery_app = _create_celery()
assert celery_app.conf["broker_url"] == config.redis.CONNECTION_URL
assert celery_app.conf["result_backend"] == config.redis.CONNECTION_URL
assert celery_app.conf["event_queue_prefix"] == "fidesops_worker"
assert celery_app.conf["default_queue_name"] == "fidesops"


def test_celery_config_override() -> None:
celery_app = _create_celery(config_path="data/config/celery.toml")
assert celery_app.conf["event_queue_prefix"] == "overridden_fidesops_worker"
assert celery_app.conf["default_queue_name"] == "overridden_fidesops"

0 comments on commit 8e106c2

Please sign in to comment.