Skip to content

Commit

Permalink
✨ adding background check for running services (⚠️ OPS) 🗃️ (#4925)
Browse files Browse the repository at this point in the history
On behalf of @matusdrobuliak66
  • Loading branch information
matusdrobuliak66 authored Nov 6, 2023
1 parent 1f92ca7 commit 73c5414
Show file tree
Hide file tree
Showing 20 changed files with 619 additions and 47 deletions.
4 changes: 4 additions & 0 deletions .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ REGISTRY_SSL=True
REGISTRY_URL=registry.osparc-master.speag.com
REGISTRY_USER=admin

RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_CHECK_ENABLED=1
RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC=300
RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_COUNTER_FAIL=6

# NOTE: 172.17.0.1 is the docker0 interface, which redirect from inside a container onto the host network interface.
R_CLONE_PROVIDER=MINIO
R_CLONE_OPTION_TRANSFERS=5
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""resource_tracker_service_runs helpers for missing heartbeat
Revision ID: 22404057a50c
Revises: d0d544695487
Create Date: 2023-10-25 19:17:29.928871+00:00
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "22404057a50c"
down_revision = "d0d544695487"
branch_labels = None
depends_on = None


def upgrade():
    """Add the missed-heartbeat helper columns to ``resource_tracker_service_runs``.

    Two columns are added:
    - ``service_run_status_msg``: nullable string.
    - ``missed_heartbeat_counter``: non-nullable small integer whose server
      default is ``"0"``.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    table_name = "resource_tracker_service_runs"

    status_msg_column = sa.Column(
        "service_run_status_msg", sa.String(), nullable=True
    )
    op.add_column(table_name, status_msg_column)

    # The server-side default lets the NOT NULL column be added without
    # supplying a value for rows that already exist.
    missed_heartbeat_column = sa.Column(
        "missed_heartbeat_counter",
        sa.SmallInteger(),
        server_default="0",
        nullable=False,
    )
    op.add_column(table_name, missed_heartbeat_column)
    # ### end Alembic commands ###


def downgrade():
    """Drop the columns added by ``upgrade``, in reverse order of creation."""
    # ### commands auto generated by Alembic - please adjust! ###
    for column_name in ("missed_heartbeat_counter", "service_run_status_msg"):
        op.drop_column("resource_tracker_service_runs", column_name)
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,17 @@ class ResourceTrackerServiceRunStatus(str, enum.Enum):
nullable=False,
doc="Timestamp when was the last heartbeat",
),
sa.Column(
"service_run_status_msg",
sa.String,
nullable=True,
doc="Custom message/comment, for example to help understand root cause of the error during investigation",
),
sa.Column(
"missed_heartbeat_counter",
sa.SmallInteger,
nullable=False,
default=0,
doc="How many heartbeat checks have been missed",
),
)
9 changes: 6 additions & 3 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,17 @@ services:
- PROMETHEUS_URL=${RESOURCE_USAGE_TRACKER_PROMETHEUS_URL}
- PROMETHEUS_USERNAME=${RESOURCE_USAGE_TRACKER_PROMETHEUS_USERNAME}
- PROMETHEUS_PASSWORD=${RESOURCE_USAGE_TRACKER_PROMETHEUS_PASSWORD}
- RESOURCE_USAGE_TRACKER_LOGLEVEL=${LOG_LEVEL:-INFO}
- REDIS_HOST=${REDIS_HOST}
- REDIS_PORT=${REDIS_PORT}
- RABBIT_HOST=${RABBIT_HOST}
- RABBIT_PASSWORD=${RABBIT_PASSWORD}
- RABBIT_PORT=${RABBIT_PORT}
- RABBIT_SECURE=${RABBIT_SECURE}
- RABBIT_USER=${RABBIT_USER}
- REDIS_HOST=${REDIS_HOST}
- REDIS_PORT=${REDIS_PORT}
- RESOURCE_USAGE_TRACKER_LOGLEVEL=${LOG_LEVEL:-INFO}
- RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_CHECK_ENABLED=${RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_CHECK_ENABLED}
- RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC=${RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC}
- RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_COUNTER_FAIL=${RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_COUNTER_FAIL}

dynamic-scheduler:
image: ${DOCKER_REGISTRY:-itisfoundation}/dynamic-scheduler:${DOCKER_IMAGE_TAG:-latest}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,12 @@ async def _send_resource_tracking_stop(platform_status: SimcorePlatformStatus):
"Containers killed to to OOMKiller: %s", container_states
)
else:
simcore_platform_status = SimcorePlatformStatus.BAD
# NOTE: MD/ANE discussed: the initial idea was to use SimcorePlatformStatus to
# inform RUT that there was a problem on the Simcore side, so that the user
# would not be billed for running the service. This still needs to be discussed;
# therefore, for now, the status is always reported as OK.
# NOTE: https://github.com/ITISFoundation/osparc-simcore/issues/4952
simcore_platform_status = SimcorePlatformStatus.OK

await send_service_stopped(app, simcore_platform_status)

Expand All @@ -293,7 +298,8 @@ async def _send_resource_tracking_stop(platform_status: SimcorePlatformStatus):
result = await docker_compose_rm(shared_store.compose_spec, settings)
_raise_for_errors(result, "rm")
except Exception:
await _send_resource_tracking_stop(SimcorePlatformStatus.BAD)
# NOTE: https://github.com/ITISFoundation/osparc-simcore/issues/4952
await _send_resource_tracking_stop(SimcorePlatformStatus.OK)
raise

await _send_resource_tracking_stop(SimcorePlatformStatus.OK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ async def test_user_services_fail_to_stop_or_save_data(
assert isinstance(heart_beat_message, RabbitResourceTrackingHeartbeatMessage)
for stop_message in stop_messages:
assert isinstance(stop_message, RabbitResourceTrackingStoppedMessage)
assert stop_message.simcore_platform_status == SimcorePlatformStatus.BAD
assert stop_message.simcore_platform_status == SimcorePlatformStatus.OK


async def _simulate_container_crash(container_names: list[str]) -> None:
Expand Down Expand Up @@ -443,4 +443,4 @@ async def test_user_services_crash_when_running(

for stop_message in resource_tracking_messages:
assert isinstance(stop_message, RabbitResourceTrackingStoppedMessage)
assert stop_message.simcore_platform_status == expected_platform_state
assert stop_message.simcore_platform_status == SimcorePlatformStatus.OK
4 changes: 4 additions & 0 deletions services/resource-usage-tracker/.env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ POSTGRES_USER=test
POSTGRES_PASSWORD=test
POSTGRES_DB=test
POSTGRES_HOST=localhost

RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_CHECK_ENABLED=1
RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC=300
RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_COUNTER_FAIL=6
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,15 @@ class ApplicationSettings(MinimalApplicationSettings):
These settings includes extra configuration for the http-API
"""

RESOURCE_USAGE_TRACKER_EVALUATION_INTERVAL_SEC: datetime.timedelta = Field(
default=datetime.timedelta(minutes=15),
description="Interval to evaluate the resource usage (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)",
RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_CHECK_ENABLED: bool = Field(
default=True,
description="Possibility to disable RUT background task for checking heartbeats.",
)
# Interval of the heartbeat check for running services; defaults to 5 minutes.
RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC: datetime.timedelta = Field(
    default=datetime.timedelta(minutes=5),
    description="Interval to check heartbeat of running services. (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formatting)",
)
# Missed-heartbeat counter limit at which a service is considered unhealthy; defaults to 6.
RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_COUNTER_FAIL: int = Field(
    default=6,
    description="Heartbeat counter limit when RUT considers service as unhealthy.",
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from models_library.services import ServiceKey, ServiceVersion
from models_library.users import UserID
from models_library.wallets import WalletID
from pydantic import BaseModel, PositiveInt
from pydantic import BaseModel, NonNegativeInt, PositiveInt


class ServiceRunCreate(BaseModel):
Expand Down Expand Up @@ -56,6 +56,7 @@ class ServiceRunStoppedAtUpdate(BaseModel):
service_run_id: ServiceRunId
stopped_at: datetime
service_run_status: ServiceRunStatus
service_run_status_msg: str | None


class ServiceRunDB(BaseModel):
Expand All @@ -82,6 +83,8 @@ class ServiceRunDB(BaseModel):
service_run_status: ServiceRunStatus
modified: datetime
last_heartbeat_at: datetime
service_run_status_msg: str | None
missed_heartbeat_counter: NonNegativeInt

class Config:
orm_mode = True
Expand All @@ -98,3 +101,13 @@ class Config:
class ServiceRunPage(NamedTuple):
items: list[ServiceRunGet]
total: PositiveInt


class ServiceRunForCheckDB(BaseModel):
    """Reduced view of a service run, holding only the columns read for the background heartbeat check."""

    # Identifier of the service run
    service_run_id: ServiceRunId
    # Timestamp of the last heartbeat recorded for this run
    last_heartbeat_at: datetime
    # How many heartbeat checks have been missed; validated as >= 0
    missed_heartbeat_counter: NonNegativeInt
    # Value of the row's `modified` column
    modified: datetime

    class Config:
        # Lets the model be built from database rows via `from_orm`
        orm_mode = True
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime
from decimal import Decimal
from typing import cast

Expand Down Expand Up @@ -56,6 +57,7 @@
from ....models.resource_tracker_service_runs import (
ServiceRunCreate,
ServiceRunDB,
ServiceRunForCheckDB,
ServiceRunLastHeartbeatUpdate,
ServiceRunStoppedAtUpdate,
ServiceRunWithCreditsDB,
Expand Down Expand Up @@ -87,7 +89,7 @@ async def create_service_run(self, data: ServiceRunCreate) -> ServiceRunId:
user_id=data.user_id,
user_email=data.user_email,
project_id=f"{data.project_id}",
project_name=data.product_name,
project_name=data.project_name,
node_id=f"{data.node_id}",
node_name=data.node_name,
service_key=data.service_key,
Expand Down Expand Up @@ -118,7 +120,9 @@ async def update_service_run_last_heartbeat(
update_stmt = (
resource_tracker_service_runs.update()
.values(
modified=sa.func.now(), last_heartbeat_at=data.last_heartbeat_at
modified=sa.func.now(),
last_heartbeat_at=data.last_heartbeat_at,
missed_heartbeat_counter=0,
)
.where(
(
Expand Down Expand Up @@ -152,6 +156,7 @@ async def update_service_run_stopped_at(
modified=sa.func.now(),
stopped_at=data.stopped_at,
service_run_status=data.service_run_status,
service_run_status_msg=data.service_run_status_msg,
)
.where(
(
Expand Down Expand Up @@ -207,6 +212,8 @@ async def list_service_runs_by_product_and_user_and_wallet(
resource_tracker_service_runs.c.service_run_status,
resource_tracker_service_runs.c.modified,
resource_tracker_service_runs.c.last_heartbeat_at,
resource_tracker_service_runs.c.service_run_status_msg,
resource_tracker_service_runs.c.missed_heartbeat_counter,
resource_tracker_credit_transactions.c.osparc_credits,
resource_tracker_credit_transactions.c.transaction_status,
)
Expand Down Expand Up @@ -271,6 +278,83 @@ async def total_service_runs_by_product_and_user_and_wallet(
row = result.first()
return cast(PositiveInt, row[0]) if row else 0

### For Background check purpose:

async def list_service_runs_with_running_status_across_all_products(
    self,
    *,
    offset: int,
    limit: int,
) -> list[ServiceRunForCheckDB]:
    """Return one page of the service runs whose status is RUNNING.

    Only the columns needed by ``ServiceRunForCheckDB`` are selected and no
    product filter is applied.

    Args:
        offset: number of rows to skip.
        limit: maximum number of rows to return.

    Returns:
        The requested page, most recently started runs first.
    """
    runs = resource_tracker_service_runs
    query = (
        sa.select(
            runs.c.service_run_id,
            runs.c.last_heartbeat_at,
            runs.c.missed_heartbeat_counter,
            runs.c.modified,
        )
        .where(runs.c.service_run_status == ServiceRunStatus.RUNNING)
        # NOTE: OFFSET/LIMIT paging is only reproducible with a total order.
        # `started_at` alone can tie, so `service_run_id` is the tie-breaker.
        # NOTE(review): rows that leave RUNNING between two page requests
        # still shift the offsets of later pages - confirm callers tolerate it.
        .order_by(runs.c.started_at.desc(), runs.c.service_run_id)
        .offset(offset)
        .limit(limit)
    )

    async with self.db_engine.begin() as conn:
        result = await conn.execute(query)
        rows = result.fetchall()

    return [ServiceRunForCheckDB.from_orm(row) for row in rows]

async def total_service_runs_with_running_status_across_all_products(
    self,
) -> PositiveInt:
    """Count the service runs whose status is RUNNING, regardless of product.

    Returns:
        The count reported by the database, or 0 when the query yields no row.
    """
    runs = resource_tracker_service_runs
    is_running = runs.c.service_run_status == ServiceRunStatus.RUNNING
    count_query = sa.select(sa.func.count()).select_from(runs).where(is_running)

    async with self.db_engine.begin() as conn:
        first_row = (await conn.execute(count_query)).first()

    if not first_row:
        return 0
    return cast(PositiveInt, first_row[0])

async def update_service_missed_heartbeat_counter(
    self,
    service_run_id: ServiceRunId,
    last_heartbeat_at: datetime,
    missed_heartbeat_counter: int,
) -> ServiceRunDB | None:
    """Store a new missed-heartbeat counter on a service run that is still RUNNING.

    The row is updated only if it matches ``service_run_id``, its status is
    RUNNING and its stored ``last_heartbeat_at`` equals the given value.
    ``modified`` is set to the database ``now()`` in the same statement.

    Args:
        service_run_id: identifier of the service run to update.
        last_heartbeat_at: heartbeat timestamp the row is expected to hold.
        missed_heartbeat_counter: new counter value to store.

    Returns:
        The updated row as ``ServiceRunDB``, or ``None`` when no row matched.
    """
    runs = resource_tracker_service_runs
    # All three conditions must hold for the update to apply.
    row_matches = sa.and_(
        runs.c.service_run_id == service_run_id,
        runs.c.service_run_status == ServiceRunStatus.RUNNING,
        runs.c.last_heartbeat_at == last_heartbeat_at,
    )
    update_stmt = (
        runs.update()
        .values(
            modified=sa.func.now(),
            missed_heartbeat_counter=missed_heartbeat_counter,
        )
        .where(row_matches)
        .returning(sa.literal_column("*"))
    )

    async with self.db_engine.begin() as conn:
        updated_row = (await conn.execute(update_stmt)).first()

    return None if updated_row is None else ServiceRunDB.from_orm(updated_row)

#################################
# Credit transactions
#################################
Expand Down
Loading

0 comments on commit 73c5414

Please sign in to comment.