From a878f58ac320f7fb8cbdca8adda2093e2e976435 Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Wed, 13 Dec 2023 20:55:48 +0100 Subject: [PATCH 01/11] Check if worker is initialized Fixes #605. --- .../versions/0f7e23ff24ee_add_worker.py | 29 +++++++++++ conda-store-server/conda_store_server/orm.py | 17 +++++++ .../conda_store_server/server/app.py | 51 +++++++++++++++++++ .../conda_store_server/worker/tasks.py | 14 +++++ 4 files changed, 111 insertions(+) create mode 100644 conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py diff --git a/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py b/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py new file mode 100644 index 000000000..5987190e8 --- /dev/null +++ b/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py @@ -0,0 +1,29 @@ +"""add worker + +Revision ID: 0f7e23ff24ee +Revises: 771180018e1b +Create Date: 2023-12-13 21:01:45.546591 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "0f7e23ff24ee" +down_revision = "771180018e1b" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "worker", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("initialized", sa.Boolean(), nullable=True), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("initialized", name="_uc"), + ) + + +def downgrade(): + op.drop_table("worker") diff --git a/conda-store-server/conda_store_server/orm.py b/conda-store-server/conda_store_server/orm.py index 0275f21f5..e415a5b48 100644 --- a/conda-store-server/conda_store_server/orm.py +++ b/conda-store-server/conda_store_server/orm.py @@ -11,6 +11,7 @@ from sqlalchemy import ( JSON, BigInteger, + Boolean, Column, DateTime, Enum, @@ -41,6 +42,22 @@ ARN_ALLOWED_REGEX = re.compile(schema.ARN_ALLOWED) +class Worker(Base): + """Used to communicate with the worker process""" + + __tablename__ = "worker" + + id = Column(Integer, primary_key=True) + + # Used to check whether the worker is initialized + initialized = Column(Boolean, default=False) + + __table_args__ = ( + # Ensures no duplicates can be added with this combination of fields. + UniqueConstraint("initialized", name="_uc"), + ) + + class Namespace(Base): """Namespace for resources""" diff --git a/conda-store-server/conda_store_server/server/app.py b/conda-store-server/conda_store_server/server/app.py index e851a444a..e6bc4caec 100644 --- a/conda-store-server/conda_store_server/server/app.py +++ b/conda-store-server/conda_store_server/server/app.py @@ -28,6 +28,9 @@ ) from traitlets.config import Application, catch_config_error +logger = logging.getLogger("app") +logger.setLevel(logging.INFO) + class CondaStoreServer(Application): aliases = { @@ -340,16 +343,64 @@ async def favicon(): name="static-storage", ) + @app.on_event("startup") + async def startup_event(): + import signal + import time + + from conda_store_server import orm + + # This adds a signal handler because uvicorn ignores all signals in + # the startup event. You wouldn't be able to terminate this loop via + # Ctrl-C otherwise + signal.signal(signal.SIGINT, sys.exit) + + # Colors to make the output more visible + green = "\x1b[32m" + red = "\x1b[31m" + reset = "\x1b[0m" + + # Waits in a loop for the worker to become ready, which is + # communicated via task_initialize_worker + while True: + with self.conda_store.session_factory() as db: + q = db.query(orm.Worker).first() + if q is not None and q.initialized: + logger.info(f"{green}" "Worker initialized" f"{reset}") + break + + time.sleep(5) + logger.critical( + f"{red}" + "Waiting for worker... " + "Use --standalone if running outside of docker" + f"{reset}" + ) + return app def start(self): fastapi_app = self.init_fastapi_app() + from conda_store_server import orm + with self.conda_store.session_factory() as db: self.conda_store.ensure_settings(db) self.conda_store.ensure_namespace(db) self.conda_store.ensure_conda_channels(db) + # We need to ensure the database has no entries in the Worker table + # when the server is started. This can happen when the server was + # terminated while the Worker table was populated in the previous + # run. The check in startup_event expects the worker to populate the + # said table (via task_initialize_worker) only if things are working + # as expected. The deletion code needs to be run on the server side + # (here) and before the worker is started to avoid a race condition. + # We want to make sure that the worker is able to create a new entry + # in the database if the worker is actually running. + db.query(orm.Worker).delete() + db.commit() + # start worker if in standalone mode if self.standalone: import multiprocessing diff --git a/conda-store-server/conda_store_server/worker/tasks.py b/conda-store-server/conda_store_server/worker/tasks.py index 6441921d9..894afcfdd 100644 --- a/conda-store-server/conda_store_server/worker/tasks.py +++ b/conda-store-server/conda_store_server/worker/tasks.py @@ -25,6 +25,7 @@ @worker_ready.connect def at_start(sender, **k): with sender.app.connection(): + sender.app.send_task("task_initialize_worker") sender.app.send_task("task_update_conda_channels") sender.app.send_task("task_watch_paths") sender.app.send_task("task_cleanup_builds") @@ -59,6 +60,19 @@ def _shutdown(*args, **kwargs): return self._worker +# Signals to the server that the worker is running, see startup_event in +# CondaStoreServer.init_fastapi_app +@shared_task(base=WorkerTask, name="task_initialize_worker", bind=True) +def task_initialize_worker(self): + from conda_store_server import orm + + conda_store = self.worker.conda_store + + with conda_store.session_factory() as db: + db.add(orm.Worker(initialized=True)) + db.commit() + + @shared_task(base=WorkerTask, name="task_watch_paths", bind=True) def task_watch_paths(self): conda_store = self.worker.conda_store From fda15d887dd407df9cfd09c71b34c7436b21af89 Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Fri, 15 Dec 2023 21:34:26 +0100 Subject: [PATCH 02/11] Raname constraint since name is already taken --- .../alembic/versions/0f7e23ff24ee_add_worker.py | 2 +- conda-store-server/conda_store_server/orm.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py b/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py index 5987190e8..fb71c5dff 100644 --- a/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py +++ b/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py @@ -21,7 +21,7 @@ def upgrade(): sa.Column("id", sa.Integer(), nullable=False), sa.Column("initialized", sa.Boolean(), nullable=True), sa.PrimaryKeyConstraint("id"), - sa.UniqueConstraint("initialized", name="_uc"), + sa.UniqueConstraint("initialized", name="_uc_worker"), ) diff --git a/conda-store-server/conda_store_server/orm.py b/conda-store-server/conda_store_server/orm.py index e415a5b48..282bf610f 100644 --- a/conda-store-server/conda_store_server/orm.py +++ b/conda-store-server/conda_store_server/orm.py @@ -54,7 +54,7 @@ class Worker(Base): __table_args__ = ( # Ensures no duplicates can be added with this combination of fields. - UniqueConstraint("initialized", name="_uc"), + UniqueConstraint("initialized", name="_uc_worker"), ) From 6b56fa91408e7f433ee3eb873cdf8996988becc9 Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Fri, 15 Dec 2023 21:53:40 +0100 Subject: [PATCH 03/11] Do not block when checking for worker --- .../conda_store_server/server/app.py | 83 +++++++++++-------- 1 file changed, 48 insertions(+), 35 deletions(-) diff --git a/conda-store-server/conda_store_server/server/app.py b/conda-store-server/conda_store_server/server/app.py index e6bc4caec..4dd912790 100644 --- a/conda-store-server/conda_store_server/server/app.py +++ b/conda-store-server/conda_store_server/server/app.py @@ -343,27 +343,55 @@ async def favicon(): name="static-storage", ) - @app.on_event("startup") - async def startup_event(): - import signal - import time + return app + + def start(self): + import time + from threading import Thread - from conda_store_server import orm + from conda_store_server import orm + from sqlalchemy.pool import QueuePool - # This adds a signal handler because uvicorn ignores all signals in - # the startup event. You wouldn't be able to terminate this loop via - # Ctrl-C otherwise - signal.signal(signal.SIGINT, sys.exit) + fastapi_app = self.init_fastapi_app() - # Colors to make the output more visible + with self.conda_store.session_factory() as db: + self.conda_store.ensure_settings(db) + self.conda_store.ensure_namespace(db) + self.conda_store.ensure_conda_channels(db) + + # This ensures the database has no Worker table entires when the + # server starts, which is necessary for the worker to signal that + # it's ready via task_initialize_worker. Old Worker entries could + # still be in the database on startup after they were added on the + # previous run and the server was terminated. + # + # Note that this cleanup is deliberately done on startup because the + # server could be terminated due to a power failure, which would + # leave no chance for cleanup actions to run on shutdown. + # + # The database is used for worker-server communication because it + # will work regardless of celery_broker_url used, which can be Redis + # or just point to a database connection. + db.query(orm.Worker).delete() + db.commit() + + def check_worker(): + # Uses colors to make the output more visible green = "\x1b[32m" red = "\x1b[31m" reset = "\x1b[0m" + # Creates a new DB connection since this will be run in a separate + # thread and connections cannot be shared between threads + session_factory = orm.new_session_factory( + url=self.conda_store.database_url, + poolclass=QueuePool, + ) + # Waits in a loop for the worker to become ready, which is - # communicated via task_initialize_worker + # communicated by the worker via task_initialize_worker while True: - with self.conda_store.session_factory() as db: + with session_factory() as db: q = db.query(orm.Worker).first() if q is not None and q.initialized: logger.info(f"{green}" "Worker initialized" f"{reset}") @@ -377,29 +405,13 @@ async def startup_event(): f"{reset}" ) - return app - - def start(self): - fastapi_app = self.init_fastapi_app() - - from conda_store_server import orm - - with self.conda_store.session_factory() as db: - self.conda_store.ensure_settings(db) - self.conda_store.ensure_namespace(db) - self.conda_store.ensure_conda_channels(db) - - # We need to ensure the database has no entries in the Worker table - # when the server is started. This can happen when the server was - # terminated while the Worker table was populated in the previous - # run. The check in startup_event expects the worker to populate the - # said table (via task_initialize_worker) only if things are working - # as expected. The deletion code needs to be run on the server side - # (here) and before the worker is started to avoid a race condition. - # We want to make sure that the worker is able to create a new entry - # in the database if the worker is actually running. - db.query(orm.Worker).delete() - db.commit() + # We cannot check whether the worker is ready right away and block. When + # running via docker, the worker is started *after* the server is + # running because it relies on config files created by the server. + # So we just keep checking in a separate thread until the worker is + # ready. + worker_checker = Thread(target=check_worker) + worker_checker.start() # start worker if in standalone mode if self.standalone: @@ -442,3 +454,4 @@ def start(self): finally: if self.standalone: process.join() + worker_checker.join() From a8ebf2f450b8b99e18ee71e039738d23984e0624 Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Mon, 18 Dec 2023 22:57:55 +0100 Subject: [PATCH 04/11] Update the comment --- conda-store-server/conda_store_server/worker/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/conda-store-server/conda_store_server/worker/tasks.py b/conda-store-server/conda_store_server/worker/tasks.py index 894afcfdd..f978b328a 100644 --- a/conda-store-server/conda_store_server/worker/tasks.py +++ b/conda-store-server/conda_store_server/worker/tasks.py @@ -60,8 +60,8 @@ def _shutdown(*args, **kwargs): return self._worker -# Signals to the server that the worker is running, see startup_event in -# CondaStoreServer.init_fastapi_app +# Signals to the server that the worker is running, see check_worker in +# CondaStoreServer.start @shared_task(base=WorkerTask, name="task_initialize_worker", bind=True) def task_initialize_worker(self): from conda_store_server import orm From 9484b16d9f1e12a1ccf8cc1c77ce8bb133e2385d Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Mon, 18 Dec 2023 23:00:46 +0100 Subject: [PATCH 05/11] Use `self.log`, change critical to warning --- conda-store-server/conda_store_server/server/app.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/conda-store-server/conda_store_server/server/app.py b/conda-store-server/conda_store_server/server/app.py index 4dd912790..83d82133b 100644 --- a/conda-store-server/conda_store_server/server/app.py +++ b/conda-store-server/conda_store_server/server/app.py @@ -28,9 +28,6 @@ ) from traitlets.config import Application, catch_config_error -logger = logging.getLogger("app") -logger.setLevel(logging.INFO) - class CondaStoreServer(Application): aliases = { @@ -394,11 +391,11 @@ def check_worker(): with session_factory() as db: q = db.query(orm.Worker).first() if q is not None and q.initialized: - logger.info(f"{green}" "Worker initialized" f"{reset}") + self.log.info(f"{green}" "Worker initialized" f"{reset}") break time.sleep(5) - logger.critical( + self.log.warning( f"{red}" "Waiting for worker... " "Use --standalone if running outside of docker" From 7df438611baad3f3fc8f1b7fa17c6be73b547309 Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Mon, 18 Dec 2023 23:07:00 +0100 Subject: [PATCH 06/11] Improve wording in comments --- conda-store-server/conda_store_server/orm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/conda-store-server/conda_store_server/orm.py b/conda-store-server/conda_store_server/orm.py index 282bf610f..e1776e4f5 100644 --- a/conda-store-server/conda_store_server/orm.py +++ b/conda-store-server/conda_store_server/orm.py @@ -43,13 +43,13 @@ class Worker(Base): - """Used to communicate with the worker process""" + """For communicating with the worker process""" __tablename__ = "worker" id = Column(Integer, primary_key=True) - # Used to check whether the worker is initialized + # For checking whether the worker is initialized initialized = Column(Boolean, default=False) __table_args__ = ( From ddb5a5a4741bcf8880fb96ea8cb49eeb272dfb8a Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Sat, 20 Jan 2024 11:38:22 +0100 Subject: [PATCH 07/11] Move imports out of function scope --- conda-store-server/conda_store_server/server/app.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/conda-store-server/conda_store_server/server/app.py b/conda-store-server/conda_store_server/server/app.py index 83d82133b..e493ab1aa 100644 --- a/conda-store-server/conda_store_server/server/app.py +++ b/conda-store-server/conda_store_server/server/app.py @@ -2,11 +2,13 @@ import os import posixpath import sys +import time +from threading import Thread import conda_store_server import conda_store_server.dbutil as dbutil import uvicorn -from conda_store_server import __version__, storage +from conda_store_server import __version__, orm, storage from conda_store_server.app import CondaStore from conda_store_server.server import auth, views from fastapi import FastAPI, HTTPException, Request @@ -14,6 +16,7 @@ from fastapi.responses import FileResponse, JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates +from sqlalchemy.pool import QueuePool from starlette.middleware.sessions import SessionMiddleware from traitlets import ( Bool, @@ -343,12 +346,6 @@ async def favicon(): return app def start(self): - import time - from threading import Thread - - from conda_store_server import orm - from sqlalchemy.pool import QueuePool - fastapi_app = self.init_fastapi_app() with self.conda_store.session_factory() as db: From cebc24bb03e96a40950dc8ecb516e788129aea20 Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Sat, 20 Jan 2024 11:47:33 +0100 Subject: [PATCH 08/11] Make `check_worker` a private class method --- .../conda_store_server/server/app.py | 62 +++++++++---------- .../conda_store_server/worker/tasks.py | 4 +- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/conda-store-server/conda_store_server/server/app.py b/conda-store-server/conda_store_server/server/app.py index e493ab1aa..ca86fed80 100644 --- a/conda-store-server/conda_store_server/server/app.py +++ b/conda-store-server/conda_store_server/server/app.py @@ -345,6 +345,36 @@ async def favicon(): return app + def _check_worker(self, delay=5): + # Uses colors to make the output more visible + green = "\x1b[32m" + red = "\x1b[31m" + reset = "\x1b[0m" + + # Creates a new DB connection since this will be run in a separate + # thread and connections cannot be shared between threads + session_factory = orm.new_session_factory( + url=self.conda_store.database_url, + poolclass=QueuePool, + ) + + # Waits in a loop for the worker to become ready, which is + # communicated by the worker via task_initialize_worker + while True: + with session_factory() as db: + q = db.query(orm.Worker).first() + if q is not None and q.initialized: + self.log.info(f"{green}" "Worker initialized" f"{reset}") + break + + time.sleep(delay) + self.log.warning( + f"{red}" + "Waiting for worker... " + "Use --standalone if running outside of docker" + f"{reset}" + ) + def start(self): fastapi_app = self.init_fastapi_app() @@ -369,42 +399,12 @@ def start(self): db.query(orm.Worker).delete() db.commit() - def check_worker(): - # Uses colors to make the output more visible - green = "\x1b[32m" - red = "\x1b[31m" - reset = "\x1b[0m" - - # Creates a new DB connection since this will be run in a separate - # thread and connections cannot be shared between threads - session_factory = orm.new_session_factory( - url=self.conda_store.database_url, - poolclass=QueuePool, - ) - - # Waits in a loop for the worker to become ready, which is - # communicated by the worker via task_initialize_worker - while True: - with session_factory() as db: - q = db.query(orm.Worker).first() - if q is not None and q.initialized: - self.log.info(f"{green}" "Worker initialized" f"{reset}") - break - - time.sleep(5) - self.log.warning( - f"{red}" - "Waiting for worker... " - "Use --standalone if running outside of docker" - f"{reset}" - ) - # We cannot check whether the worker is ready right away and block. When # running via docker, the worker is started *after* the server is # running because it relies on config files created by the server. # So we just keep checking in a separate thread until the worker is # ready. - worker_checker = Thread(target=check_worker) + worker_checker = Thread(target=self._check_worker) worker_checker.start() # start worker if in standalone mode diff --git a/conda-store-server/conda_store_server/worker/tasks.py b/conda-store-server/conda_store_server/worker/tasks.py index f978b328a..fa7dd5cee 100644 --- a/conda-store-server/conda_store_server/worker/tasks.py +++ b/conda-store-server/conda_store_server/worker/tasks.py @@ -60,8 +60,8 @@ def _shutdown(*args, **kwargs): return self._worker -# Signals to the server that the worker is running, see check_worker in -# CondaStoreServer.start +# Signals to the server that the worker is running, see _check_worker in +# CondaStoreServer @shared_task(base=WorkerTask, name="task_initialize_worker", bind=True) def task_initialize_worker(self): from conda_store_server import orm From c141c963c22258280eade55328a2bec54e0bb85a Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Sat, 20 Jan 2024 11:59:25 +0100 Subject: [PATCH 09/11] Move colors into a global private enum --- .../conda_store_server/server/app.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/conda-store-server/conda_store_server/server/app.py b/conda-store-server/conda_store_server/server/app.py index ca86fed80..034584bb3 100644 --- a/conda-store-server/conda_store_server/server/app.py +++ b/conda-store-server/conda_store_server/server/app.py @@ -3,6 +3,7 @@ import posixpath import sys import time +from enum import Enum from threading import Thread import conda_store_server @@ -32,6 +33,12 @@ from traitlets.config import Application, catch_config_error +class _Color(str, Enum): + GREEN = "\x1b[32m" + RED = "\x1b[31m" + RESET = "\x1b[0m" + + class CondaStoreServer(Application): aliases = { "config": "CondaStoreServer.config_file", @@ -346,11 +353,6 @@ async def favicon(): return app def _check_worker(self, delay=5): - # Uses colors to make the output more visible - green = "\x1b[32m" - red = "\x1b[31m" - reset = "\x1b[0m" - # Creates a new DB connection since this will be run in a separate # thread and connections cannot be shared between threads session_factory = orm.new_session_factory( @@ -364,15 +366,17 @@ def _check_worker(self, delay=5): with session_factory() as db: q = db.query(orm.Worker).first() if q is not None and q.initialized: - self.log.info(f"{green}" "Worker initialized" f"{reset}") + self.log.info( + f"{_Color.GREEN}" "Worker initialized" f"{_Color.RESET}" + ) break time.sleep(delay) self.log.warning( - f"{red}" + f"{_Color.RED}" "Waiting for worker... " "Use --standalone if running outside of docker" - f"{reset}" + f"{_Color.RESET}" ) def start(self): From ed595af76a08dfdc47aaedcc8573b70fb1f0c5ab Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Sat, 20 Jan 2024 12:05:03 +0100 Subject: [PATCH 10/11] Remove `UniqueConstraint` from the `Worker` table --- .../alembic/versions/0f7e23ff24ee_add_worker.py | 1 - conda-store-server/conda_store_server/orm.py | 5 ----- 2 files changed, 6 deletions(-) diff --git a/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py b/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py index fb71c5dff..ca086ebd0 100644 --- a/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py +++ b/conda-store-server/conda_store_server/alembic/versions/0f7e23ff24ee_add_worker.py @@ -21,7 +21,6 @@ def upgrade(): sa.Column("id", sa.Integer(), nullable=False), sa.Column("initialized", sa.Boolean(), nullable=True), sa.PrimaryKeyConstraint("id"), - sa.UniqueConstraint("initialized", name="_uc_worker"), ) diff --git a/conda-store-server/conda_store_server/orm.py b/conda-store-server/conda_store_server/orm.py index e1776e4f5..ff3fea384 100644 --- a/conda-store-server/conda_store_server/orm.py +++ b/conda-store-server/conda_store_server/orm.py @@ -52,11 +52,6 @@ class Worker(Base): # For checking whether the worker is initialized initialized = Column(Boolean, default=False) - __table_args__ = ( - # Ensures no duplicates can be added with this combination of fields. - UniqueConstraint("initialized", name="_uc_worker"), - ) - class Namespace(Base): """Namespace for resources""" From 1863ec04ab7aef201be7de245d170eb12d09efcb Mon Sep 17 00:00:00 2001 From: Nikita Karetnikov Date: Sat, 20 Jan 2024 17:10:51 +0100 Subject: [PATCH 11/11] Fix a typo --- conda-store-server/conda_store_server/server/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conda-store-server/conda_store_server/server/app.py b/conda-store-server/conda_store_server/server/app.py index 034584bb3..a23ef150f 100644 --- a/conda-store-server/conda_store_server/server/app.py +++ b/conda-store-server/conda_store_server/server/app.py @@ -387,7 +387,7 @@ def start(self): self.conda_store.ensure_namespace(db) self.conda_store.ensure_conda_channels(db) - # This ensures the database has no Worker table entires when the + # This ensures the database has no Worker table entries when the # server starts, which is necessary for the worker to signal that # it's ready via task_initialize_worker. Old Worker entries could # still be in the database on startup after they were added on the