Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Example: examples/worker_pool #242

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,36 @@ jobs:
timeout-minutes: 10
run: |
pytest -vv tests

worker_pool:
runs-on: ${{ matrix.os }}

strategy:
fail-fast: false
matrix:
python-version: ["3.12"]
os: ["ubuntu-latest"]

steps:
- name: Install apt packages
if: startsWith(matrix.os, 'ubuntu-')
run: |
sudo apt update
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
cache-dependency-path: '**/setup.py'
- name: Install dependencies
working-directory: examples/worker_pool
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt

- name: Run tests
working-directory: examples/worker_pool
timeout-minutes: 10
run: |
pytest -vv tests
1 change: 1 addition & 0 deletions docs/userguide/examples/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Every example is an independent project and is tested via the
:maxdepth: 1

myworker
worker_pool
rabbitmq_management
range
myutils
Expand Down
94 changes: 94 additions & 0 deletions docs/userguide/examples/worker_pool.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
.. _examples_worker_pool:

=============
worker_pool
=============

:Release: |version|
:Date: |today|

.. contents::
:local:
:depth: 2

Description
===========

This example project demonstrates how to use a different `worker pool <https://docs.celeryq.dev/en/stable/reference/cli.html#cmdoption-celery-worker-P>`_.
The example uses two different methods to run the Celery worker with different pools.

The following guide will explain each method and how they are used.

.. tip::

See first the :ref:`examples_myworker` example before continuing with this one.

Breakdown
=========

File Structure
~~~~~~~~~~~~~~

The following diagram lists the relevant files in the project.

.. code-block:: text

rabbitmq_management/
├── tests/
│ ├── __init__.py
│ └── test_gevent_pool.py
│ └── test_solo_pool.py
└── Dockerfile
└── tasks.py
└── requirements.txt

Dockerfile
~~~~~~~~~~

To use the gevent pool, we create our own image using a similar Dockerfile to the one in the :ref:`examples_myworker` example.
The purpose of this worker is to ensure the gevent dependency is installed.

.. literalinclude:: ../../../examples/worker_pool/Dockerfile
:language: docker
:caption: examples.worker_pool.Dockerfile

.. literalinclude:: ../../../examples/worker_pool/requirements.txt
:language: docker
:caption: examples.worker_pool.requirements.txt

tasks.py
~~~~~~~~

Our tasks module is using the example task from the `Celery gevent example <https://github.com/celery/celery/blob/main/examples/gevent/README.rst>`_.

.. literalinclude:: ../../../examples/worker_pool/tasks.py
:language: python
:caption: examples.worker_pool.tasks.py

test_gevent_pool.py
~~~~~~~~~~~~~~~~~~~

To add a new gevent worker, we create a new :class:`CeleryWorkerContainer <pytest_celery.vendors.worker.container.CeleryWorkerContainer>` to
configure the worker with the gevent pool.

.. literalinclude:: ../../../examples/worker_pool/tests/test_gevent_pool.py
:language: python
:caption: examples.worker_pool.tests.test_gevent_pool.py
:end-before: # ----------------------------

And then we can just use it in our tests.

.. literalinclude:: ../../../examples/worker_pool/tests/test_gevent_pool.py
:language: python
:caption: examples.worker_pool.tests.test_gevent_pool.py
:start-after: # ----------------------------

test_solo_pool.py
~~~~~~~~~~~~~~~~~

The solo pool example on the other hand, reconfigures the default :ref:`built-in-worker`
as it does not require any additional dependencies.

.. literalinclude:: ../../../examples/worker_pool/tests/test_solo_pool.py
:language: python
:caption: examples.worker_pool.tests.test_solo_pool.py
29 changes: 29 additions & 0 deletions examples/worker_pool/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
FROM python:3.11-bookworm

# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user

# Install system dependencies
RUN apt-get update && apt-get install -y build-essential git libevent-dev

# Set arguments
ARG CELERY_LOG_LEVEL=INFO
ARG CELERY_WORKER_NAME=my_worker
ARG CELERY_WORKER_QUEUE=celery
ENV LOG_LEVEL=$CELERY_LOG_LEVEL
ENV WORKER_NAME=$CELERY_WORKER_NAME
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE

# Install packages
COPY --chown=test_user:test_user requirements.txt .
RUN pip install --no-cache-dir --upgrade pip
RUN pip install -r ./requirements.txt

# The workdir must be /app
WORKDIR /app

# Switch to the test_user
USER test_user

# Start the celery worker
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE
5 changes: 5 additions & 0 deletions examples/worker_pool/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[pytest]
log_cli = true
log_cli_level = INFO
log_cli_format = %(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)
log_cli_date_format = %Y-%m-%d %H:%M:%S
5 changes: 5 additions & 0 deletions examples/worker_pool/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pytest>=7.4.4
pytest-xdist>=3.5.0
pytest-subtests>=0.11.0
celery[gevent]
pytest-celery[all]@git+https://github.com/celery/pytest-celery.git
16 changes: 16 additions & 0 deletions examples/worker_pool/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Based on https://github.com/celery/celery/blob/main/examples/gevent/tasks.py

import requests
from celery import shared_task


@shared_task(ignore_result=True)
def urlopen(url):
print(f"Opening: {url}")
try:
requests.get(url)
except requests.exceptions.RequestException as exc:
print(f"Exception for {url}: {exc!r}")
return url, 0
print(f"Done with: {url}")
return url, 1
Empty file.
87 changes: 87 additions & 0 deletions examples/worker_pool/tests/test_gevent_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

import pytest
import tasks
from celery import Celery
from celery.canvas import Signature
from celery.canvas import group
from celery.result import AsyncResult
from pytest_docker_tools import build
from pytest_docker_tools import container
from pytest_docker_tools import fxtr

from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
from pytest_celery import CeleryTestWorker
from pytest_celery import CeleryWorkerCluster
from pytest_celery import CeleryWorkerContainer
from pytest_celery import defaults
from pytest_celery import ping


class GeventWorkerContainer(CeleryWorkerContainer):
@classmethod
def command(cls, *args: str) -> list[str]:
return super().command("-P", "gevent", "-c", "1000")


gevent_worker_image = build(
path=".",
dockerfile="Dockerfile",
tag="pytest-celery/examples/worker_pool:gevent",
buildargs=GeventWorkerContainer.buildargs(),
)


gevent_worker_container = container(
image="{gevent_worker_image.id}",
environment=fxtr("default_worker_env"),
network="{default_pytest_celery_network.name}",
volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME},
wrapper_class=GeventWorkerContainer,
timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT,
command=GeventWorkerContainer.command(),
)


@pytest.fixture
def gevent_worker(gevent_worker_container: GeventWorkerContainer, celery_setup_app: Celery) -> CeleryTestWorker:
worker = CeleryTestWorker(gevent_worker_container, app=celery_setup_app)
yield worker
worker.teardown()


@pytest.fixture
def celery_worker_cluster(gevent_worker: CeleryTestWorker) -> CeleryWorkerCluster:
cluster = CeleryWorkerCluster(gevent_worker)
yield cluster
cluster.teardown()


@pytest.fixture
def default_worker_tasks(default_worker_tasks: set) -> set:
default_worker_tasks.add(tasks)
return default_worker_tasks


# ----------------------------


class TestGeventPool:
def test_celery_banner(self, gevent_worker: CeleryTestWorker):
gevent_worker.assert_log_exists("concurrency: 1000 (gevent)")

def test_ping(self, celery_setup: CeleryTestSetup):
sig: Signature = ping.s()
res: AsyncResult = sig.apply_async()
assert res.get(timeout=RESULT_TIMEOUT) == "pong"

def test_celery_gevent_example(self, celery_setup: CeleryTestSetup):
"""Based on https://github.com/celery/celery/tree/main/examples/gevent"""
LIST_OF_URLS = [
"https://github.com/celery",
"https://github.com/celery/celery",
"https://github.com/celery/pytest-celery",
]
group(tasks.urlopen.s(url) for url in LIST_OF_URLS).apply_async()
celery_setup.worker.assert_log_does_not_exist("Exception for")
37 changes: 37 additions & 0 deletions examples/worker_pool/tests/test_solo_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from __future__ import annotations

import pytest
from celery.canvas import Signature
from celery.result import AsyncResult

from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
from pytest_celery import CeleryTestWorker
from pytest_celery import CeleryWorkerContainer
from pytest_celery import ping


class SoloPoolWorker(CeleryWorkerContainer):
@classmethod
def command(cls, *args: str) -> list[str]:
return super().command("-P", "solo")


@pytest.fixture
def default_worker_container_cls() -> type[CeleryWorkerContainer]:
return SoloPoolWorker


@pytest.fixture(scope="session")
def default_worker_container_session_cls() -> type[CeleryWorkerContainer]:
return SoloPoolWorker


class TestSoloPool:
def test_celery_banner(self, celery_worker: CeleryTestWorker):
celery_worker.assert_log_exists("solo")

def test_ping(self, celery_setup: CeleryTestSetup):
sig: Signature = ping.s()
res: AsyncResult = sig.apply_async()
assert res.get(timeout=RESULT_TIMEOUT) == "pong"