Skip to content

Commit

Permalink
Add option to delete finished jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
mxd4 committed Nov 27, 2020
1 parent f2395ca commit 7be0500
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 39 deletions.
20 changes: 12 additions & 8 deletions procrastinate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,20 @@ async def run_worker_async(self, **kwargs) -> None:
logs (defaults to ``None`` which will result in the worker named
``worker``).
timeout : ``float``
Indicates the maximum duration (in seconds) procrastinate
workers wait between each database job poll.
Raising this parameter can lower the rate of workers making queries to the
database for requesting jobs.
Indicates the maximum duration (in seconds) the worker waits between
each database job poll. Raising this parameter can lower the rate at which
the worker makes queries to the database for requesting jobs.
(defaults to 5.0)
listen_notify : ``bool``
If ``True``, worker will dedicate a connection from the pool to listening to
database events, notifying of newly available jobs. If ``False``, workers
will just poll the database periodically (see ``timeout``). (defaults to
True)
If ``True``, the worker will dedicate a connection from the pool to
listening to database events, notifying of newly available jobs.
If ``False``, the worker will just poll the database periodically
(see ``timeout``). (defaults to ``True``)
delete_jobs : ``str``
If ``always``, the worker will automatically delete all jobs on completion.
If ``successful`` the worker will only delete successful jobs.
If ``never``, the worker will keep the jobs in database.
(defaults to ``never``)
"""
self.perform_import_paths()
worker = self._worker(**kwargs)
Expand Down
6 changes: 6 additions & 0 deletions procrastinate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ def close_connection(procrastinate_app: procrastinate.App, *args, **kwargs):
default=True,
help="Whether to actively listen for new jobs or periodically poll",
)
@click.option(
"--delete-jobs",
type=click.Choice([v.value for v in worker.DeleteJobCondition]),
default=worker.DeleteJobCondition.NEVER.value,
help="Whether to delete jobs on completion",
)
@handle_errors()
def worker_(app: procrastinate.App, queues: str, **kwargs):
"""
Expand Down
2 changes: 2 additions & 0 deletions procrastinate/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ async def finish_job(
self,
job: jobs.Job,
status: jobs.Status,
delete_job: bool,
) -> None:
"""
Set a job to its final state (``succeeded`` or ``failed``).
Expand All @@ -207,6 +208,7 @@ async def finish_job(
query=sql.queries["finish_job"],
job_id=job.id,
status=status.value,
delete_job=delete_job,
)

async def retry_job(
Expand Down
2 changes: 1 addition & 1 deletion procrastinate/periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def worker(self) -> None:

def register_task(self, task: tasks.Task, cron: str) -> PeriodicTask:
logger.info(
f"Registring task {task.name} to run periodically with cron {cron}",
f"Registering task {task.name} to run periodically with cron {cron}",
extra={
"action": "registering_periodic_task",
"task": task.name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
-- remove old procrastinate_finish_job function
-- remove old procrastinate_finish_job functions
-- https://github.com/peopledoc/procrastinate/pull/336
DROP FUNCTION IF EXISTS procrastinate_finish_job(integer, procrastinate_job_status, timestamp with time zone);
-- https://github.com/peopledoc/procrastinate/pull/354
DROP FUNCTION IF EXISTS procrastinate_finish_job(integer, procrastinate_job_status);
14 changes: 14 additions & 0 deletions procrastinate/sql/migrations/00.17.00_02_delete_finished_jobs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, delete_job boolean) RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
IF delete_job THEN
DELETE FROM procrastinate_jobs WHERE id = job_id;
ELSE
UPDATE procrastinate_jobs
SET status = end_status,
attempts = attempts + 1
WHERE id = job_id;
END IF;
END;
$$;
2 changes: 1 addition & 1 deletion procrastinate/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ WHERE id IN (

-- finish_job --
-- Finish a job, changing it from "doing" to "succeeded" or "failed"
SELECT procrastinate_finish_job(%(job_id)s, %(status)s);
SELECT procrastinate_finish_job(%(job_id)s, %(status)s, %(delete_job)s);

-- retry_job --
-- Retry a job, changing it from "doing" to "todo"
Expand Down
41 changes: 29 additions & 12 deletions procrastinate/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ CREATE FUNCTION procrastinate_defer_job(
scheduled_at timestamp with time zone
) RETURNS bigint
LANGUAGE plpgsql
AS $$
AS $$
DECLARE
job_id bigint;
BEGIN
Expand All @@ -94,7 +94,7 @@ CREATE FUNCTION procrastinate_defer_periodic_job(
_defer_timestamp bigint
) RETURNS bigint
LANGUAGE plpgsql
AS $$
AS $$
DECLARE
_job_id bigint;
_defer_id bigint;
Expand Down Expand Up @@ -141,7 +141,7 @@ $$;

CREATE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) RETURNS procrastinate_jobs
LANGUAGE plpgsql
AS $$
AS $$
DECLARE
found_jobs procrastinate_jobs;
BEGIN
Expand Down Expand Up @@ -176,7 +176,7 @@ $$;
-- to remove after 1.0.0 is released
CREATE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, next_scheduled_at timestamp with time zone) RETURNS void
LANGUAGE plpgsql
AS $$
AS $$
BEGIN
UPDATE procrastinate_jobs
SET status = end_status,
Expand All @@ -186,10 +186,11 @@ BEGIN
END;
$$;

-- procrastinate_finish_job
-- procrastinate_finish_job – old version
-- to remove after 1.0.0 is released
CREATE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status) RETURNS void
LANGUAGE plpgsql
AS $$
AS $$
BEGIN
UPDATE procrastinate_jobs
SET status = end_status,
Expand All @@ -198,9 +199,25 @@ BEGIN
END;
$$;

-- procrastinate_finish_job
CREATE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, delete_job boolean) RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
IF delete_job THEN
DELETE FROM procrastinate_jobs WHERE id = job_id;
ELSE
UPDATE procrastinate_jobs
SET status = end_status,
attempts = attempts + 1
WHERE id = job_id;
END IF;
END;
$$;

CREATE FUNCTION procrastinate_retry_job(job_id integer, retry_at timestamp with time zone) RETURNS void
LANGUAGE plpgsql
AS $$
AS $$
BEGIN
UPDATE procrastinate_jobs
SET status = 'todo',
Expand All @@ -212,7 +229,7 @@ $$;

CREATE FUNCTION procrastinate_notify_queue() RETURNS trigger
LANGUAGE plpgsql
AS $$
AS $$
BEGIN
PERFORM pg_notify('procrastinate_queue#' || NEW.queue_name, NEW.task_name);
PERFORM pg_notify('procrastinate_any_queue', NEW.task_name);
Expand All @@ -222,7 +239,7 @@ $$;

CREATE FUNCTION procrastinate_trigger_status_events_procedure_insert() RETURNS trigger
LANGUAGE plpgsql
AS $$
AS $$
BEGIN
INSERT INTO procrastinate_events(job_id, type)
VALUES (NEW.id, 'deferred'::procrastinate_job_event_type);
Expand All @@ -232,7 +249,7 @@ $$;

CREATE FUNCTION procrastinate_trigger_status_events_procedure_update() RETURNS trigger
LANGUAGE plpgsql
AS $$
AS $$
BEGIN
WITH t AS (
SELECT CASE
Expand Down Expand Up @@ -267,7 +284,7 @@ $$;

CREATE FUNCTION procrastinate_trigger_scheduled_events_procedure() RETURNS trigger
LANGUAGE plpgsql
AS $$
AS $$
BEGIN
INSERT INTO procrastinate_events(job_id, type, at)
VALUES (NEW.id, 'scheduled'::procrastinate_job_event_type, NEW.scheduled_at);
Expand All @@ -278,7 +295,7 @@ $$;

CREATE FUNCTION procrastinate_unlink_periodic_defers() RETURNS trigger
LANGUAGE plpgsql
AS $$
AS $$
BEGIN
UPDATE procrastinate_periodic_defers
SET job_id = NULL
Expand Down
6 changes: 5 additions & 1 deletion procrastinate/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,11 @@ def fetch_job_one(self, queues: Optional[Iterable[str]]) -> Dict:

return {"id": None}

def finish_job_run(self, job_id: int, status: str) -> None:
def finish_job_run(self, job_id: int, status: str, delete_job: bool) -> None:
if delete_job:
self.jobs.pop(job_id)
return

job_row = self.jobs[job_id]
job_row["status"] = status
job_row["attempts"] += 1
Expand Down
26 changes: 24 additions & 2 deletions procrastinate/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import contextlib
import logging
import time
from enum import Enum
from typing import Dict, Iterable, Optional, Set, Union

from procrastinate import app, exceptions, job_context, jobs, signals, tasks, utils
Expand All @@ -14,16 +15,27 @@
WORKER_CONCURRENCY = 1 # parallel task(s)


class DeleteJobCondition(Enum):
"""
An enumeration with all the possible conditions to delete a job
"""

NEVER = "never" #: Keep jobs in database after completion
SUCCESSFUL = "successful" #: Delete only successful jobs
ALWAYS = "always" #: Always delete jobs at completion


class Worker:
def __init__(
self,
app: app.App,
queues: Optional[Iterable[str]] = None,
name: Optional[str] = None,
concurrency: int = 1,
concurrency: int = WORKER_CONCURRENCY,
wait: bool = True,
timeout: float = WORKER_TIMEOUT,
listen_notify: bool = True,
delete_jobs: str = DeleteJobCondition.NEVER.value,
):
self.app = app
self.queues = queues
Expand All @@ -33,6 +45,7 @@ def __init__(
self.timeout = timeout
self.wait = wait
self.listen_notify = listen_notify
self.delete_jobs = DeleteJobCondition(delete_jobs)

# Handling the info about the currently running task.
self.known_missing_tasks: Set[str] = set()
Expand Down Expand Up @@ -171,7 +184,16 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None:
await self.job_manager.retry_job(job=job, retry_at=retry_at)
else:
assert status is not None
await self.job_manager.finish_job(job=job, status=status)

delete_job = {
DeleteJobCondition.ALWAYS: True,
DeleteJobCondition.NEVER: False,
DeleteJobCondition.SUCCESSFUL: status == jobs.Status.SUCCEEDED,
}[self.delete_jobs]

await self.job_manager.finish_job(
job=job, status=status, delete_job=delete_job
)

self.logger.debug(
f"Acknowledged job completion {job.call_string}",
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def test_worker(entrypoint, click_app, mocker):
click_app.run_worker = mocker.MagicMock()
result = entrypoint(
"--app yay worker --queues a,b --name=w1 --timeout=8.3 "
"--one-shot --concurrency=10 --no-listen-notify"
"--one-shot --concurrency=10 --no-listen-notify --delete-jobs=always"
)

assert result.output.strip() == "Launching a worker on a, b"
Expand All @@ -45,6 +45,7 @@ def test_worker(entrypoint, click_app, mocker):
timeout=8.3,
wait=False,
listen_notify=False,
delete_jobs="always",
)


Expand Down
24 changes: 16 additions & 8 deletions tests/integration/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,12 @@ async def test_delete_old_jobs_multiple_jobs(
job_a = await pg_job_manager.fetch_job(queues=["queue_a"])
job_b = await pg_job_manager.fetch_job(queues=["queue_b"])
# We finish both jobs
await pg_job_manager.finish_job(job_a, status=jobs.Status.SUCCEEDED)
await pg_job_manager.finish_job(job_b, status=jobs.Status.SUCCEEDED)
await pg_job_manager.finish_job(
job_a, status=jobs.Status.SUCCEEDED, delete_job=False
)
await pg_job_manager.finish_job(
job_b, status=jobs.Status.SUCCEEDED, delete_job=False
)
# We back date the events for job_a
await aiopg_connector.execute_query_async(
f"UPDATE procrastinate_events SET at=at - INTERVAL '2 hours'"
Expand All @@ -157,7 +161,7 @@ async def test_delete_old_job_filter_on_end_date(
# We start the job
job = await pg_job_manager.fetch_job(queues=["queue_a"])
# We finish the job
await pg_job_manager.finish_job(job, status=jobs.Status.SUCCEEDED)
await pg_job_manager.finish_job(job, status=jobs.Status.SUCCEEDED, delete_job=False)
# We back date only the start event
await aiopg_connector.execute_query_async(
f"UPDATE procrastinate_events SET at=at - INTERVAL '2 hours'"
Expand Down Expand Up @@ -202,7 +206,7 @@ async def test_delete_old_jobs_parameters(
# We start a job
job = await pg_job_manager.fetch_job(queues=["queue_a"])
# We finish the job
await pg_job_manager.finish_job(job, status=status)
await pg_job_manager.finish_job(job, status=status, delete_job=False)
# We back date its events
await aiopg_connector.execute_query_async(
f"UPDATE procrastinate_events SET at=at - INTERVAL '2 hours'"
Expand Down Expand Up @@ -231,15 +235,17 @@ async def test_finish_job(get_all, pg_job_manager, job_factory):
assert started_at.date() == datetime.datetime.utcnow().date()
assert await get_all("procrastinate_jobs", "attempts") == [{"attempts": 0}]

await pg_job_manager.finish_job(job=job, status=jobs.Status.SUCCEEDED)
await pg_job_manager.finish_job(
job=job, status=jobs.Status.SUCCEEDED, delete_job=False
)
expected = [{"status": "succeeded", "attempts": 1}]
assert await get_all("procrastinate_jobs", "status", "attempts") == expected


async def test_finish_job_retry(get_all, pg_job_manager, job_factory):
await pg_job_manager.defer_job_async(job_factory())
job1 = await pg_job_manager.fetch_job(queues=None)
await pg_job_manager.finish_job(job=job1, status=jobs.Status.TODO)
await pg_job_manager.finish_job(job=job1, status=jobs.Status.TODO, delete_job=False)

job2 = await pg_job_manager.fetch_job(queues=None)

Expand Down Expand Up @@ -346,7 +352,7 @@ async def fixture_jobs(pg_job_manager, job_factory):
task_kwargs={"key": "b"},
)
j2 = j2.evolve(id=await pg_job_manager.defer_job_async(job=j2))
await pg_job_manager.finish_job(job=j2, status=jobs.Status.FAILED)
await pg_job_manager.finish_job(job=j2, status=jobs.Status.FAILED, delete_job=False)

j3 = job_factory(
queue="q2",
Expand All @@ -356,7 +362,9 @@ async def fixture_jobs(pg_job_manager, job_factory):
task_kwargs={"key": "c"},
)
j3 = j3.evolve(id=await pg_job_manager.defer_job_async(job=j3))
await pg_job_manager.finish_job(job=j3, status=jobs.Status.SUCCEEDED)
await pg_job_manager.finish_job(
job=j3, status=jobs.Status.SUCCEEDED, delete_job=False
)

return [j1, j2, j3]

Expand Down
3 changes: 2 additions & 1 deletion tests/migration/test_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
@pytest.fixture
def run_migrations(db_execute):
def _(dbname):
migrations = sorted(pathlib.Path("procrastinate/sql/migrations").glob("*.sql"))
folder = pathlib.Path(__file__).parents[2] / "procrastinate/sql/migrations"
migrations = sorted(folder.glob("*.sql"))

for migration in migrations:
with db_execute(dbname) as execute:
Expand Down
Loading

0 comments on commit 7be0500

Please sign in to comment.