Skip to content

Commit

Permalink
Check if worker is initialized
Browse files Browse the repository at this point in the history
Fixes #605.
  • Loading branch information
nkaretnikov committed Dec 13, 2023
1 parent 31550ee commit ea1416e
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""add worker
Revision ID: 0f7e23ff24ee
Revises: 771180018e1b
Create Date: 2023-12-13 21:01:45.546591
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "0f7e23ff24ee"
down_revision = "771180018e1b"
branch_labels = None
depends_on = None


def upgrade():
op.create_table(
"worker",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("initialized", sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("initialized", name="_uc"),
)


def downgrade():
op.drop_table("worker")
17 changes: 17 additions & 0 deletions conda-store-server/conda_store_server/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sqlalchemy import (
JSON,
BigInteger,
Boolean,
Column,
DateTime,
Enum,
Expand Down Expand Up @@ -41,6 +42,22 @@
ARN_ALLOWED_REGEX = re.compile(schema.ARN_ALLOWED)


class Worker(Base):
"""Used to communicate with the worker process"""

__tablename__ = "worker"

id = Column(Integer, primary_key=True)

# Used to check whether the worker is initialized
initialized = Column(Boolean, default=False)

__table_args__ = (
# Ensures no duplicates can be added with this combination of fields.
UniqueConstraint("initialized", name="_uc"),
)


class Namespace(Base):
"""Namespace for resources"""

Expand Down
47 changes: 47 additions & 0 deletions conda-store-server/conda_store_server/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
)
from traitlets.config import Application, catch_config_error

logger = logging.getLogger("app")
logger.setLevel(logging.INFO)


class CondaStoreServer(Application):
aliases = {
Expand Down Expand Up @@ -200,6 +203,12 @@ def initialize(self, *args, **kwargs):
# ensure checks on redis_url
self.conda_store.redis_url

# ensure the worker was started and celery is available
# self.conda_store.celery_app
# import time
# time.sleep(5)
# assert self.conda_store.celery_app.control.inspect().ping() is not None

def init_fastapi_app(self):
def trim_slash(url):
return url[:-1] if url.endswith("/") else url
Expand Down Expand Up @@ -340,16 +349,53 @@ async def favicon():
name="static-storage",
)

@app.on_event("startup")
async def startup_event():
import signal
import time

from conda_store_server import orm

# Makes the loop below exit on Ctrl-C
signal.signal(signal.SIGINT, sys.exit)

# Colors
green = "\x1b[32m"
red = "\x1b[31m"
reset = "\x1b[0m"

# Waits in a loop for worker to become initialized
while True:
with self.conda_store.session_factory() as db:
q = db.query(orm.Worker).first()
if q is not None and q.initialized:
logger.info(f"{green}" "Worker initialized" f"{reset}")
break

time.sleep(5)
logger.critical(
f"{red}"
"Waiting for worker... "
"Use --standalone if running outside of docker"
f"{reset}"
)

return app

def start(self):
fastapi_app = self.init_fastapi_app()

from conda_store_server import orm

with self.conda_store.session_factory() as db:
self.conda_store.ensure_settings(db)
self.conda_store.ensure_namespace(db)
self.conda_store.ensure_conda_channels(db)

# Needs to delete this entry to check if the worker is available
db.query(orm.Worker).delete()
db.commit()

# start worker if in standalone mode
if self.standalone:
import multiprocessing
Expand All @@ -376,6 +422,7 @@ def start(self):
else []
),
)

finally:
if self.standalone:
process.join()
12 changes: 12 additions & 0 deletions conda-store-server/conda_store_server/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
@worker_ready.connect
def at_start(sender, **k):
with sender.app.connection():
sender.app.send_task("task_initialize_worker")
sender.app.send_task("task_update_conda_channels")
sender.app.send_task("task_watch_paths")
sender.app.send_task("task_cleanup_builds")
Expand All @@ -46,6 +47,17 @@ def worker(self):
return self._worker


@shared_task(base=WorkerTask, name="task_initialize_worker", bind=True)
def task_initialize_worker(self):
from conda_store_server import orm

conda_store = self.worker.conda_store

with conda_store.session_factory() as db:
db.add(orm.Worker(initialized=True))
db.commit()


@shared_task(base=WorkerTask, name="task_watch_paths", bind=True)
def task_watch_paths(self):
conda_store = self.worker.conda_store
Expand Down

0 comments on commit ea1416e

Please sign in to comment.