Skip to content

Commit

Permalink
Merge branch 'master' into is3569/flaky-api-server
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrespov authored May 11, 2023
2 parents a51ef00 + 5346f61 commit ac70927
Show file tree
Hide file tree
Showing 14 changed files with 244 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


class ThinDV2LocalhostClient(BaseThinClient):
BASE_ADDRESS: str = "http://localhost:8000" # NOSONAR
BASE_ADDRESS: str = "http://localhost:8000" # NOSONAR

def __init__(self):
self.client = AsyncClient(timeout=Timeout(5))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ class DynamicSidecarEgressSettings(BaseCustomSettings):
description="envoy image to use",
)
DYNAMIC_SIDECAR_ENVOY_LOG_LEVEL: EnvoyLogLevel = Field(
default=EnvoyLogLevel.ERROR, description="log level for envoy proxy service"
default=EnvoyLogLevel.ERROR, # type: ignore
description="log level for envoy proxy service",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class Worker(BaseModel):
metrics: WorkerMetrics


class WorkersDict(DictModel[AnyUrl, Worker]):
...
WorkersDict: TypeAlias = dict[AnyUrl, Worker]


class Scheduler(BaseModel):
Expand All @@ -74,7 +73,7 @@ class Scheduler(BaseModel):

@validator("workers", pre=True, always=True)
@classmethod
def ensure_workers_is_empty_dict(cls, v) -> WorkersDict:
def ensure_workers_is_empty_dict(cls, v):
if v is None:
return {}
return v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class ServiceRemovalState(BaseModel):
description="when True, marks the service as ready to be removed",
)
can_save: bool = Field(
...,
False,
description="when True, saves the internal state and upload outputs of the service",
)
was_removed: bool = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class RunningDynamicServiceDetails(ServiceDetails):

@cached_property
def legacy_service_url(self) -> str:
return f"http://{self.host}:{self.internal_port}{self.basepath}" # NOSONAR
return f"http://{self.host}:{self.internal_port}{self.basepath}" # NOSONAR

@classmethod
def from_scheduler_data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def add_service(
request_dns: str,
request_scheme: str,
request_simcore_user_agent: str,
can_save: str,
can_save: bool,
) -> None:
"""
Adds a new service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
"""

import asyncio
import contextlib
import functools
import logging
from asyncio import sleep
from dataclasses import dataclass
from asyncio import Lock, Queue, Task, sleep
from dataclasses import dataclass, field

from fastapi import FastAPI
from models_library.basic_types import PortInt
from models_library.projects import ProjectID
from models_library.projects_networks import DockerNetworkAlias
Expand All @@ -39,24 +41,13 @@
RetrieveDataOutEnveloped,
)
from .....models.schemas.dynamic_services import (
DynamicSidecarStatus,
RunningDynamicServiceDetails,
SchedulerData,
ServiceState,
ServiceName,
)
from ...api_client import DynamicSidecarClient, get_dynamic_sidecar_client
from ...docker_api import (
get_dynamic_sidecar_state,
get_dynamic_sidecars_to_observe,
remove_pending_volume_removal_services,
update_scheduler_data_label,
)
from ...docker_states import extract_containers_minimum_statuses
from ...errors import (
DockerServiceNotFoundError,
DynamicSidecarError,
DynamicSidecarNotFoundError,
)
from ...docker_api import update_scheduler_data_label
from ...errors import DynamicSidecarError, DynamicSidecarNotFoundError
from .._abc import SchedulerPublicInterface
from . import _scheduler_utils
from ._events_utils import (
Expand All @@ -65,7 +56,6 @@
service_save_state,
)
from ._observer import observing_single_service
from ._scheduler_mixin import SchedulerInternalsMixin

logger = logging.getLogger(__name__)

Expand All @@ -74,7 +64,92 @@


@dataclass
class Scheduler(SchedulerInternalsMixin, SchedulerPublicInterface):
class Scheduler( # pylint: disable=too-many-instance-attributes
SchedulerPublicInterface
):
app: FastAPI

_lock: Lock = field(default_factory=Lock)
_to_observe: dict[ServiceName, SchedulerData] = field(default_factory=dict)
_service_observation_task: dict[ServiceName, asyncio.Task | object | None] = field(
default_factory=dict
)
_keep_running: bool = False
_inverse_search_mapping: dict[NodeID, ServiceName] = field(default_factory=dict)
_scheduler_task: Task | None = None
_cleanup_volume_removal_services_task: Task | None = None
_trigger_observation_queue_task: Task | None = None
_trigger_observation_queue: Queue = field(default_factory=Queue)
_observation_counter: int = 0

async def start(self) -> None:
    """Start the scheduler: spawn its background tasks, then discover services.

    Three asyncio tasks are created, in this order, and kept on the instance:

    * ``_scheduler_task`` running ``_run_scheduler_task``
    * ``_trigger_observation_queue_task`` running
      ``_run_trigger_observation_queue_task``
    * ``_cleanup_volume_removal_services_task`` running
      ``_scheduler_utils.cleanup_volume_removal_services`` with ``self.app``

    Afterwards ``_scheduler_utils.discover_running_services`` is awaited with
    this scheduler instance.
    """
    logger.info("Starting dynamic-sidecar scheduler")
    self._keep_running = True

    # everything below runs in the background; only the discovery is awaited
    spawn = asyncio.create_task

    self._scheduler_task = spawn(
        self._run_scheduler_task(),
        name="dynamic-scheduler",
    )
    self._trigger_observation_queue_task = spawn(
        self._run_trigger_observation_queue_task(),
        name="dynamic-scheduler-trigger-obs-queue",
    )
    self._cleanup_volume_removal_services_task = spawn(
        _scheduler_utils.cleanup_volume_removal_services(self.app),
        name="dynamic-scheduler-cleanup-volume-removal-services",
    )

    await _scheduler_utils.discover_running_services(self)

async def shutdown(self) -> None:
    """Stop the scheduler and cancel every task it holds.

    Steps, in order:

    1. clear the keep-running flag and reset ``_inverse_search_mapping`` and
       ``_to_observe`` to empty dicts;
    2. cancel and await the volume-removal cleanup task, then the scheduler
       task, resetting each attribute to ``None``;
    3. put ``None`` on the trigger-observation queue, cancel and await the
       trigger-observation task, then replace the queue with a fresh one;
    4. cancel every ``Task`` found in ``_service_observation_task`` and wait
       up to 5 seconds for all of them, logging either the unexpected
       errors they returned or the timeout.
    """

    async def _cancel_and_wait(task: Task) -> None:
        # awaiting a cancelled task raises CancelledError, which is expected here
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task

    logger.info("Shutting down dynamic-sidecar scheduler")
    self._keep_running = False
    self._inverse_search_mapping = {}
    self._to_observe = {}

    if self._cleanup_volume_removal_services_task is not None:
        await _cancel_and_wait(self._cleanup_volume_removal_services_task)
        self._cleanup_volume_removal_services_task = None

    if self._scheduler_task is not None:
        await _cancel_and_wait(self._scheduler_task)
        self._scheduler_task = None

    if self._trigger_observation_queue_task is not None:
        # the None item is enqueued before the task is cancelled
        await self._trigger_observation_queue.put(None)
        await _cancel_and_wait(self._trigger_observation_queue_task)
        self._trigger_observation_queue_task = None
        self._trigger_observation_queue = Queue()

    # let's properly cleanup remaining observation tasks
    # (entries that are not Task instances are ignored)
    running_tasks = []
    for candidate in self._service_observation_task.values():
        if isinstance(candidate, Task):
            candidate.cancel()
            running_tasks.append(candidate)

    MAX_WAIT_TIME_SECONDS = 5
    try:
        results = await asyncio.wait_for(
            asyncio.gather(*running_tasks, return_exceptions=True),
            timeout=MAX_WAIT_TIME_SECONDS,
        )
    except asyncio.TimeoutError:
        logger.error(
            "Timed-out waiting for %s to complete. Action: Check why this is blocking",
            f"{running_tasks=}",
        )
    else:
        bad_results = [r for r in results if isinstance(r, Exception)]
        if bad_results:
            logger.error(
                "Following observation tasks completed with an unexpected error:%s",
                f"{bad_results}",
            )

def toggle_observation(self, node_uuid: NodeID, disable: bool) -> bool:
"""
returns True if it managed to enable/disable observation of the service
Expand Down Expand Up @@ -320,7 +395,6 @@ async def remove_service_from_observation(self, node_uuid: NodeID) -> None:
logger.debug("Removed service '%s' from scheduler", service_name)

async def get_stack_status(self, node_uuid: NodeID) -> RunningDynamicServiceDetails:
# pylint: disable=too-many-return-statements
"""
raises DynamicSidecarNotFoundError
Expand All @@ -330,73 +404,8 @@ async def get_stack_status(self, node_uuid: NodeID) -> RunningDynamicServiceDeta
service_name = self._inverse_search_mapping[node_uuid]

scheduler_data: SchedulerData = self._to_observe[service_name]

# check if there was an error picked up by the scheduler
# and marked this service as failed
if scheduler_data.dynamic_sidecar.status.current != DynamicSidecarStatus.OK:
return RunningDynamicServiceDetails.from_scheduler_data(
node_uuid=node_uuid,
scheduler_data=scheduler_data,
service_state=ServiceState.FAILED,
service_message=scheduler_data.dynamic_sidecar.status.info,
)

# is the service stopping?
if scheduler_data.dynamic_sidecar.service_removal_state.can_remove:
return RunningDynamicServiceDetails.from_scheduler_data(
node_uuid=node_uuid,
scheduler_data=scheduler_data,
service_state=ServiceState.STOPPING,
service_message=scheduler_data.dynamic_sidecar.status.info,
)

# the service should be either running or starting
try:
sidecar_state, sidecar_message = await get_dynamic_sidecar_state(
# the service_name is unique and will not collide with other names
# it can be used in place of the service_id here, as the docker API accepts both
service_id=scheduler_data.service_name
)
except DockerServiceNotFoundError:
# in this case, the service is starting, so state is pending
return RunningDynamicServiceDetails.from_scheduler_data(
node_uuid=node_uuid,
scheduler_data=scheduler_data,
service_state=ServiceState.PENDING,
service_message=scheduler_data.dynamic_sidecar.status.info,
)

# while the dynamic-sidecar state is not RUNNING report its state
if sidecar_state != ServiceState.RUNNING:
return RunningDynamicServiceDetails.from_scheduler_data(
node_uuid=node_uuid,
scheduler_data=scheduler_data,
service_state=sidecar_state,
service_message=sidecar_message,
)

# NOTE: This will be repeatedly called until the
# user services are effectively started

# wait for containers to start
if len(scheduler_data.dynamic_sidecar.containers_inspect) == 0:
# marks status as waiting for containers
return RunningDynamicServiceDetails.from_scheduler_data(
node_uuid=node_uuid,
scheduler_data=scheduler_data,
service_state=ServiceState.STARTING,
service_message="",
)

# compute composed containers states
container_state, container_message = extract_containers_minimum_statuses(
scheduler_data.dynamic_sidecar.containers_inspect
)
return RunningDynamicServiceDetails.from_scheduler_data(
node_uuid=node_uuid,
scheduler_data=scheduler_data,
service_state=container_state,
service_message=container_message,
return await _scheduler_utils.get_stack_status_from_scheduler_data(
scheduler_data
)

async def retrieve_service_inputs(
Expand Down Expand Up @@ -562,47 +571,3 @@ async def _run_scheduler_task(self) -> None:

await sleep(settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS)
self._observation_counter += 1

async def _discover_running_services(self) -> None:
    """Discover previously started services and add them to the scheduler.

    Reads the dynamic-sidecar settings from the app state, awaits
    ``get_dynamic_sidecars_to_observe`` with them, logs what was returned
    and then awaits ``_add_service`` for each entry, one after the other.
    """
    sidecar_settings = self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR
    found_services = await get_dynamic_sidecars_to_observe(sidecar_settings)

    logger.info("The following services need to be observed: %s", found_services)

    for found in found_services:
        await self._add_service(found)

async def _cleanup_volume_removal_services(self) -> None:
    """Loop forever: sleep for the configured interval, then run the cleanup.

    Each round awaits ``remove_pending_volume_removal_services`` with the
    dynamic-sidecar settings. A cancellation raised by that call is logged
    and re-raised; any other exception is logged with its traceback and the
    loop carries on with the next round.
    """
    dynamic_services = self.app.state.settings.DYNAMIC_SERVICES
    scheduler_settings = dynamic_services.DYNAMIC_SCHEDULER
    sidecar_settings = dynamic_services.DYNAMIC_SIDECAR

    logger.debug(
        "dynamic-sidecars cleanup pending volume removal services every %s seconds",
        scheduler_settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_PENDING_VOLUME_REMOVAL_INTERVAL_S,
    )

    while True:
        # the sleep comes first, so the first cleanup runs one interval after start
        await asyncio.sleep(
            scheduler_settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_PENDING_VOLUME_REMOVAL_INTERVAL_S
        )
        logger.debug("Removing pending volume removal services...")

        try:
            await remove_pending_volume_removal_services(sidecar_settings)
        except asyncio.CancelledError:
            logger.info("Stopped pending volume removal services task")
            raise
        except Exception:  # pylint: disable=broad-except
            logger.exception(
                "Unexpected error while cleaning up pending volume removal services"
            )
Loading

0 comments on commit ac70927

Please sign in to comment.