Skip to content

Commit

Permalink
Fix Migration (#116) (#118)
Browse files Browse the repository at this point in the history
Co-authored-by: Peter Kraft <[email protected]>
  • Loading branch information
qianl15 and kraftp authored Sep 25, 2024
1 parent dee7bb4 commit 857890c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
34 changes: 34 additions & 0 deletions dbos/migrations/versions/50f3227f0b4b_fix_job_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""fix_job_queue
Revision ID: 50f3227f0b4b
Revises: eab0cc1d9a14
Create Date: 2024-09-25 14:03:53.308068
"""

from typing import Sequence, Union

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "50f3227f0b4b"
down_revision: Union[str, None] = "eab0cc1d9a14"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.drop_constraint("job_queue_pkey", "job_queue", schema="dbos", type_="primary")

op.create_primary_key(
"job_queue_pkey", "job_queue", ["workflow_uuid"], schema="dbos"
)


def downgrade() -> None:
# Reverting the changes
op.drop_constraint("job_queue_pkey", "job_queue", schema="dbos", type_="primary")

op.create_primary_key(
"job_queue_pkey", "job_queue", ["created_at_epoch_ms"], schema="dbos"
)
15 changes: 11 additions & 4 deletions dbos/queue.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import threading
import time
import traceback
from typing import TYPE_CHECKING, Optional

from dbos.core import P, R, _execute_workflow_id, _start_workflow
from dbos.error import DBOSInitializationError

if TYPE_CHECKING:
from dbos.dbos import DBOS, Workflow, WorkflowHandle
Expand Down Expand Up @@ -31,6 +31,13 @@ def queue_thread(stop_event: threading.Event, dbos: "DBOS") -> None:
while not stop_event.is_set():
time.sleep(1)
for queue_name, queue in dbos._registry.queue_info_map.items():
wf_ids = dbos._sys_db.start_queued_workflows(queue_name, queue.concurrency)
for id in wf_ids:
_execute_workflow_id(dbos, id)
try:
wf_ids = dbos._sys_db.start_queued_workflows(
queue_name, queue.concurrency
)
for id in wf_ids:
_execute_workflow_id(dbos, id)
except Exception:
dbos.logger.warning(
f"Exception encountered in queue thread: {traceback.format_exc()}"
)

0 comments on commit 857890c

Please sign in to comment.