Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EP Merge: Fix health check / app startup based on health of rabbitmq and postgres connections #2505

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions domain-ee/ee-ep-merge-app/end_to_end/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import pytest_asyncio
from src.python_src.api import on_shut_down, on_start_up
from src.python_src.api import on_shut_down, start_hoppy


@pytest_asyncio.fixture(autouse=True, scope="session")
async def endpoint_lifecycle():
await on_start_up()
await start_hoppy()
yield
await on_shut_down()
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from httpx import AsyncClient
from src.python_src.api import app
from src.python_src.schema.merge_job import JobState
from src.python_src.service.job_store import job_store
from src.python_src.service.job_store import JOB_STORE

ACCEPTABLE_JOB_PROCESSING_DURATION = 0.2

Expand Down Expand Up @@ -54,7 +54,7 @@ def assert_job(job_id,
expected_state: JobState,
expected_error_state: JobState | None = None,
expected_num_errors: int = 0):
job = job_store.get_merge_job(job_id)
job = JOB_STORE.get_merge_job(job_id)
assert job is not None
assert job.pending_claim_id == pending_claim_id
assert job.ep400_claim_id == ep400_claim_id
Expand Down
14 changes: 7 additions & 7 deletions domain-ee/ee-ep-merge-app/end_to_end/test_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from uuid import uuid4

import pytest
from src.python_src.api import resume_job_state_machine
from src.python_src.api import JOB_RUNNER
from src.python_src.schema.merge_job import JobState, MergeJob
from src.python_src.service.job_store import job_store
from src.python_src.service.job_store import JOB_STORE

PENDING_CLAIM_ID = 10000
EP400_WITH_DUPLICATE = 10001
Expand All @@ -30,7 +30,7 @@ def assert_job(job_id,
expected_state: JobState,
expected_error_state: JobState | None = None,
expected_num_errors: int = 0):
job = job_store.get_merge_job(job_id)
job = JOB_STORE.get_merge_job(job_id)
assert job is not None
assert job.pending_claim_id == pending_claim_id
assert job.ep400_claim_id == ep400_claim_id
Expand Down Expand Up @@ -63,9 +63,9 @@ async def test(self, starting_state, pending_claim_id, ep400_claim_id):
state=starting_state,
created_at=NOW,
updated_at=NOW)
job_store.submit_merge_job(job)
JOB_STORE.submit_merge_job(job)

await asyncio.get_event_loop().run_in_executor(None, resume_job_state_machine, job)
await asyncio.get_event_loop().run_in_executor(None, JOB_RUNNER.resume_job, job)
assert_job(job_id, pending_claim_id, ep400_claim_id, JobState.COMPLETED_SUCCESS)

class TestError:
Expand Down Expand Up @@ -120,7 +120,7 @@ async def test(self, starting_state, pending_claim_id, ep400_claim_id, expected_
state=starting_state,
created_at=NOW,
updated_at=NOW)
job_store.submit_merge_job(job)
JOB_STORE.submit_merge_job(job)

await asyncio.get_event_loop().run_in_executor(None, resume_job_state_machine, job)
await asyncio.get_event_loop().run_in_executor(None, JOB_RUNNER.resume_job, job)
assert_job(job_id, pending_claim_id, ep400_claim_id, JobState.COMPLETED_ERROR, expected_error_state, expected_num_errors)
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from uuid import uuid4

import pytest
from src.python_src.api import resume_job_state_machine
from src.python_src.api import JOB_RUNNER
from src.python_src.schema.merge_job import JobState, MergeJob
from src.python_src.service.job_store import job_store
from src.python_src.service.job_store import JOB_STORE

PENDING_CLAIM_ID = 10000
EP400_WITH_DUPLICATE = 10001
Expand All @@ -26,7 +26,7 @@ def assert_job(job_id,
expected_state: JobState,
expected_error_state: JobState | None = None,
expected_num_errors: int = 0):
job = job_store.get_merge_job(job_id)
job = JOB_STORE.get_merge_job(job_id)
assert job is not None
assert job.pending_claim_id == pending_claim_id
assert job.ep400_claim_id == ep400_claim_id
Expand Down Expand Up @@ -57,9 +57,9 @@ async def test(self, pending_claim_id, ep400_claim_id):
state=JobState.RUNNING_ADD_CLAIM_NOTE_TO_EP400,
created_at=NOW,
updated_at=NOW)
job_store.submit_merge_job(job)
JOB_STORE.submit_merge_job(job)

await asyncio.get_event_loop().run_in_executor(None, resume_job_state_machine, job)
await asyncio.get_event_loop().run_in_executor(None, JOB_RUNNER.resume_job, job)
assert_job(job_id, pending_claim_id, ep400_claim_id, JobState.COMPLETED_SUCCESS)


Expand All @@ -85,7 +85,7 @@ async def test(self, pending_claim_id, ep400_claim_id, expected_error_state, exp
state=JobState.RUNNING_ADD_CLAIM_NOTE_TO_EP400,
created_at=NOW,
updated_at=NOW)
job_store.submit_merge_job(job)
JOB_STORE.submit_merge_job(job)

await asyncio.get_event_loop().run_in_executor(None, resume_job_state_machine, job)
await asyncio.get_event_loop().run_in_executor(None, JOB_RUNNER.resume_job, job)
assert_job(job_id, pending_claim_id, ep400_claim_id, JobState.COMPLETED_ERROR, expected_error_state, expected_num_errors)
14 changes: 7 additions & 7 deletions domain-ee/ee-ep-merge-app/end_to_end/test_resume_at_cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from uuid import uuid4

import pytest
from src.python_src.api import resume_job_state_machine
from src.python_src.schema.merge_job import JobState, MergeJob
from src.python_src.service.job_store import job_store
from src.python_src.service.job_runner import JOB_RUNNER
from src.python_src.service.job_store import JOB_STORE

PENDING_CLAIM_ID = 10000
EP400_WITH_DUPLICATE = 10001
Expand All @@ -27,7 +27,7 @@ def assert_job(job_id,
expected_state: JobState,
expected_error_state: JobState | None = None,
expected_num_errors: int = 0):
job = job_store.get_merge_job(job_id)
job = JOB_STORE.get_merge_job(job_id)
assert job is not None
assert job.pending_claim_id == pending_claim_id
assert job.ep400_claim_id == ep400_claim_id
Expand Down Expand Up @@ -58,9 +58,9 @@ async def test(self, pending_claim_id, ep400_claim_id):
state=JobState.RUNNING_CANCEL_EP400_CLAIM,
created_at=NOW,
updated_at=NOW)
job_store.submit_merge_job(job)
JOB_STORE.submit_merge_job(job)

await asyncio.get_event_loop().run_in_executor(None, resume_job_state_machine, job)
await asyncio.get_event_loop().run_in_executor(None, JOB_RUNNER.resume_job, job)
assert_job(job_id, pending_claim_id, ep400_claim_id, JobState.COMPLETED_SUCCESS)


Expand Down Expand Up @@ -91,7 +91,7 @@ async def test(self, pending_claim_id, ep400_claim_id, expected_error_state, exp
state=JobState.RUNNING_CANCEL_EP400_CLAIM,
created_at=NOW,
updated_at=NOW)
job_store.submit_merge_job(job)
JOB_STORE.submit_merge_job(job)

await asyncio.get_event_loop().run_in_executor(None, resume_job_state_machine, job)
await asyncio.get_event_loop().run_in_executor(None, JOB_RUNNER.resume_job, job)
assert_job(job_id, pending_claim_id, ep400_claim_id, JobState.COMPLETED_ERROR, expected_error_state, expected_num_errors)
5 changes: 3 additions & 2 deletions domain-ee/ee-ep-merge-app/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest
import pytest_asyncio
from integration.mq_endpoint import MqEndpoint
from src.python_src.api import on_shut_down, on_start_up
from src.python_src.api import on_shut_down, start_hoppy
from src.python_src.config import EXCHANGES, QUEUES, REPLY_QUEUES, ClientName


Expand Down Expand Up @@ -62,7 +62,8 @@ async def endpoint_lifecycle(get_claim_endpoint: MqEndpoint,
await update_claim_contentions_endpoint.start(event_loop)
await cancel_claim_endpoint.start(event_loop)
await add_claim_note_endpoint.start(event_loop)
await on_start_up()
await start_hoppy()

yield

get_claim_endpoint.stop()
Expand Down
114 changes: 83 additions & 31 deletions domain-ee/ee-ep-merge-app/src/python_src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,47 @@
from uuid import UUID, uuid4

import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, status
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, Request, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from pydantic_models import (
HealthResponse,
MergeEndProductsErrorResponse,
MergeEndProductsRequest,
MergeJobResponse,
MergeJobsResponse,
)
from schema.merge_job import JobState, MergeJob
from service.ep_merge_machine import EpMergeMachine, Workflow
from service.ep_merge_machine import EpMergeMachine
from service.hoppy_service import HOPPY
from service.job_store import job_store
from service.job_runner import JOB_RUNNER
from service.job_store import JOB_STORE
from sqlalchemy.exc import SQLAlchemyError
from starlette.responses import JSONResponse
from util.sanitizer import sanitize

CONNECT_TO_DATABASE_FAILURE = "Cannot connect to database."
CONNECT_TO_RABBIT_MQ_FAILURE = "Cannot connect to RabbitMQ."


@asynccontextmanager
async def lifespan(api: FastAPI):
await on_start_up()
on_start_up()
yield
await on_shut_down()


async def on_start_up():
def on_start_up():
loop = asyncio.get_event_loop()
await HOPPY.start_hoppy_clients(loop)
jobs_to_restart = job_store.get_all_incomplete_jobs()
for job in jobs_to_restart:
asyncio.get_event_loop().run_in_executor(None, resume_job_state_machine, job)
loop.create_task(start_job_runner())
loop.create_task(start_hoppy())


async def start_job_runner():
await JOB_RUNNER.start()


async def start_hoppy():
await HOPPY.start_hoppy_clients()


async def on_shut_down():
Expand Down Expand Up @@ -67,30 +79,65 @@ async def on_shut_down():
)


@app.get("/health")
@app.get("/health",
response_model=HealthResponse,
response_model_exclude_none=True)
def get_health_status():
return {"status": "ok"}
if is_healthy():
return {"status": "healthy"}
else:
errors = []
if not HOPPY.is_ready():
errors.append("Cannot connect to RabbitMQ.")
if not JOB_STORE.is_ready():
errors.append("Cannot connect to database.")
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"status": "unhealthy",
"errors": errors
})


def is_healthy():
return HOPPY.is_ready() and JOB_STORE.is_ready()
dfitchett marked this conversation as resolved.
Show resolved Hide resolved


@app.post("/merge",
status_code=status.HTTP_202_ACCEPTED,
response_model=MergeJobResponse,
response_model_exclude_none=True)
async def merge_claims(merge_request: MergeEndProductsRequest, background_tasks: BackgroundTasks):
async def merge_claims(request: Request, merge_request: MergeEndProductsRequest, background_tasks: BackgroundTasks):
validate_merge_request(merge_request)

if not is_healthy():
errors = []
if not HOPPY.is_ready():
errors.append(CONNECT_TO_RABBIT_MQ_FAILURE)
if not JOB_STORE.is_ready():
errors.append(CONNECT_TO_DATABASE_FAILURE)
logging.error(f"event=mergeJobRejected errors={errors}")
return JSONResponse(
status_code=500,
content=jsonable_encoder({
"method": "POST",
"url": str(request.url),
"errors": errors
})
)

job_id = uuid4()
logging.info(f"event=mergeJobSubmitted "
f"job_id={job_id} "
f"pending_claim_id={sanitize(merge_request.pending_claim_id)} "
f"ep400_claim_id={sanitize(merge_request.ep400_claim_id)}")

merge_job = MergeJob(job_id=job_id,
pending_claim_id=merge_request.pending_claim_id,
ep400_claim_id=merge_request.ep400_claim_id)
job_store.submit_merge_job(merge_job)
JOB_STORE.submit_merge_job(merge_job)

logging.info(f"event=mergeJobSubmitted "
f"job_id={job_id} "
f"pending_claim_id={sanitize(merge_request.pending_claim_id)} "
f"ep400_claim_id={sanitize(merge_request.ep400_claim_id)}")

background_tasks.add_task(start_job_state_machine, merge_job)
background_tasks.add_task(JOB_RUNNER.start_job, merge_job)

return {"job": merge_job}

Expand All @@ -104,15 +151,6 @@ def start_job_state_machine(merge_job):
EpMergeMachine(merge_job).start()


def resume_job_state_machine(in_progress_job):
if in_progress_job.state == JobState.RUNNING_CANCEL_EP400_CLAIM:
EpMergeMachine(in_progress_job, Workflow.RESUME_CANCEL_EP400).start()
elif in_progress_job.state == JobState.RUNNING_ADD_CLAIM_NOTE_TO_EP400:
EpMergeMachine(in_progress_job, Workflow.RESUME_ADD_NOTE).start()
else:
EpMergeMachine(in_progress_job, Workflow.RESTART).start()


@app.get("/merge/{job_id}",
response_model=MergeJobResponse,
responses={
Expand All @@ -122,7 +160,7 @@ def resume_job_state_machine(in_progress_job):
},
response_model_exclude_none=True)
async def get_merge_request_by_job_id(job_id: UUID):
job = job_store.get_merge_job(job_id)
job = JOB_STORE.get_merge_job(job_id)
if job:
logging.info(f"event=getMergeJobByJobId job={jsonable_encoder(job)}")
return {"job": job}
Expand All @@ -142,7 +180,7 @@ async def get_merge_request_by_job_id(job_id: UUID):
async def get_merge_jobs(state: Annotated[list[JobState], Query()] = JobState.incomplete_states(),
page: int = 1,
size: int = 10):
jobs, total = job_store.query(states=state, offset=page, limit=size)
jobs, total = JOB_STORE.query(states=state, offset=page, limit=size)
logging.info(f"event=getMergeJobs "
f"total={total} "
f"page={sanitize(page)} "
Expand All @@ -152,5 +190,19 @@ async def get_merge_jobs(state: Annotated[list[JobState], Query()] = JobState.in
return MergeJobsResponse(states=state, total=total, page=page, size=size, jobs=jobs)


@app.exception_handler(SQLAlchemyError)
async def sqlalchemy_exception_handler(request: Request, err: SQLAlchemyError):
msg = str(err).replace('\n', ' ')
logging.error(f"event=requestFailed method={request.method} url={request.url} resource={'Database'} error={msg}")
return JSONResponse(
status_code=500,
content=jsonable_encoder({
"method": request.method,
"url": str(request.url),
"errors": [CONNECT_TO_DATABASE_FAILURE]
})
)


if __name__ == "__main__":
uvicorn.run(app, host="localhost", port=8140)
uvicorn.run(app, host="localhost", port=8140, loop='asyncio')
dfitchett marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 9 additions & 0 deletions domain-ee/ee-ep-merge-app/src/python_src/db/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from config import SQLALCHEMY_DATABASE_URI
from fastapi.encoders import jsonable_encoder
from sqlalchemy import create_engine, desc, inspect
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker

engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
Expand Down Expand Up @@ -47,5 +48,13 @@ def update(self, obj, db):
db.query(obj.Meta.orm_model).filter(primary_key == getattr(obj, primary_key.name)).update(as_json)
db.commit()

@with_connection
def is_ready(self, obj, db):
try:
db.query(obj.Meta.orm_model).count()
return True
except SQLAlchemyError:
return False


database = Database()
5 changes: 5 additions & 0 deletions domain-ee/ee-ep-merge-app/src/python_src/pydantic_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
from typing_extensions import Annotated


class HealthResponse(BaseModel):
status: str
errors: list[str] | None = None


class MergeEndProductsRequest(BaseModel):
pending_claim_id: Annotated[int, Field(strict=True)]
ep400_claim_id: Annotated[int, Field(strict=True)]
Expand Down
Loading