Skip to content

Commit

Permalink
support client event to server/cloud instance
Browse files Browse the repository at this point in the history
  • Loading branch information
tianweidut committed Dec 21, 2023
1 parent 4949eea commit f418c08
Show file tree
Hide file tree
Showing 11 changed files with 393 additions and 1 deletion.
3 changes: 3 additions & 0 deletions client/starwhale/api/_impl/evaluation/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from starwhale.base.type import RunSubDirType, PredictLogMode
from starwhale.api.service import Service
from starwhale.utils.error import ParameterError, FieldTypeOrValueError
from starwhale.utils.event import event
from starwhale.base.context import Context
from starwhale.core.job.store import JobStorage
from starwhale.api._impl.dataset import Dataset
Expand Down Expand Up @@ -186,6 +187,7 @@ def _wrapper(*args: t.Any, **kwargs: t.Any) -> None:
return _wrapper

@_record_status # type: ignore
@event("run Starwhale Model evaluation")
def _starwhale_internal_run_evaluate(self) -> None:
now = now_str()
try:
Expand Down Expand Up @@ -279,6 +281,7 @@ def _do_evaluate(self) -> t.Any:
func()

@_record_status # type: ignore
@event("run Starwhale Model prediction")
def _starwhale_internal_run_predict(self) -> None:
if not self.dataset_uris:
raise FieldTypeOrValueError("context.dataset_uris is empty")
Expand Down
2 changes: 2 additions & 0 deletions client/starwhale/api/_impl/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from starwhale.utils import console
from starwhale.consts import SHORT_VERSION_CNT, DecoratorInjectAttr
from starwhale.utils.event import add_event
from starwhale.base.context import Context
from starwhale.api._impl.model import build as build_starwhale_model
from starwhale.api._impl.dataset import Dataset
Expand Down Expand Up @@ -95,6 +96,7 @@ def _run_wrapper(*args: t.Any, **kw: t.Any) -> t.Any:
)

# TODO: support arguments from command line
add_event(f"start to finetune model by {func.__qualname__} function")
ret = func(*inject_args)

if auto_build_model:
Expand Down
2 changes: 2 additions & 0 deletions client/starwhale/api/_impl/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from starwhale.utils import console, disable_progress_bar
from starwhale.utils.fs import blake2b_content
from starwhale.consts.env import SWEnv
from starwhale.utils.event import event

from .job import Handler
from ...base.uri.project import Project
Expand All @@ -18,6 +19,7 @@
_called_build_lock = threading.Lock()


@event(msg="build Starwhale model")
def build(
modules: t.Optional[t.List[t.Any]] = None,
workdir: t.Optional[_path_T] = None,
Expand Down
27 changes: 27 additions & 0 deletions client/starwhale/base/client/api/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from __future__ import annotations

from starwhale.base.uri.instance import Instance
from starwhale.base.client.client import Client, TypeWrapper
from starwhale.base.client.models.base import ResponseCode
from starwhale.base.client.models.models import EventRequest, ResponseMessageListEventVo


class EventApi(Client):
def __init__(self, instance: Instance) -> None:
super().__init__(instance.url, instance.token)

def list(
self, project: str, job: str, task: str = "", run: str = ""
) -> TypeWrapper[ResponseMessageListEventVo]:
uri = f"/api/v1/project/{project}/job/{job}"
return TypeWrapper(
ResponseMessageListEventVo,
self.http_get(uri, params={"taskId": task, "runId": run}),
)

def add(
self, project: str, job: str, event: EventRequest
) -> TypeWrapper[ResponseCode]:
uri = f"/api/v1/project/{project}/job/{job}"
data = self.http_post(uri, event)
return TypeWrapper(ResponseCode, data)
5 changes: 4 additions & 1 deletion client/starwhale/base/uri/instance.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import Any, Dict, Tuple, Optional
from urllib.parse import urlparse

from starwhale.utils import config
from starwhale.base.uri.exceptions import NoMatchException


def _get_instances() -> Dict[str, Dict]:
from starwhale.utils import config

return config.load_swcli_config().get("instances", {}) # type: ignore[no-any-return]


def _get_default_instance_alias() -> str:
from starwhale.utils import config

return config.load_swcli_config().get("current_instance", "") # type: ignore[no-any-return]


Expand Down
3 changes: 3 additions & 0 deletions client/starwhale/consts/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ class SWEnv:
instance_uri = "SW_INSTANCE_URI"
project = "SW_PROJECT"
task_id = "SW_TASK_ID"
run_id = "SW_RUN_ID"
job_id = "SW_JOB_ID"
project_uri = "SW_PROJECT_URI"
job_version = "SW_JOB_VERSION"
model_version = "SW_MODEL_VERSION"
runtime_version = "SW_RUNTIME_VERSION"
Expand Down
3 changes: 3 additions & 0 deletions client/starwhale/core/model/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from starwhale.api.service import Service
from starwhale.base.bundle import BaseBundle, LocalStorageBundleMixin
from starwhale.utils.error import NoSupportError
from starwhale.utils.event import event, add_event
from starwhale.base.context import Context
from starwhale.api._impl.job import Handler, generate_jobs_yaml
from starwhale.base.scheduler import Step, Scheduler
Expand Down Expand Up @@ -197,6 +198,7 @@ def _get_cls(cls, uri: Instance) -> t.Union[t.Type[StandaloneModel], t.Type[Clou
raise NoSupportError(f"model uri:{uri}")

@classmethod
@event(msg="copy Starwhale Model")
def copy(
cls,
src_uri: Resource,
Expand Down Expand Up @@ -883,6 +885,7 @@ def serve(
port: int,
) -> None:
svc = cls._get_service(model_config.run.modules, model_src_dir)
add_event("serving Starwhale Model")
svc.serve(host, port, model_config.name)

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions client/starwhale/core/runtime/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
ExclusiveArgsError,
UnExpectedConfigFieldError,
)
from starwhale.utils.event import event
from starwhale.utils.process import check_call
from starwhale.utils.progress import run_with_progress_bar
from starwhale.base.bundle_copy import BundleCopy
Expand Down Expand Up @@ -765,6 +766,7 @@ def __str__(self) -> str:
return f"Starwhale Runtime: {self.uri}"

@classmethod
@event(msg="copy Starwhale Runtime")
def copy(
cls,
src_uri: Resource,
Expand Down
2 changes: 2 additions & 0 deletions client/starwhale/core/runtime/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
check_valid_conda_prefix,
)
from starwhale.utils.error import NoSupportError, FieldTypeOrValueError
from starwhale.utils.event import event
from starwhale.utils.process import check_call
from starwhale.core.model.model import StandaloneModel
from starwhale.base.uri.resource import Resource, ResourceType
Expand Down Expand Up @@ -120,6 +121,7 @@ def run_arbitrary_cmd(
capture_stdout=not live_stream,
)

@event(msg="restore Starwhale Runtime")
def _restore_runtime(
self,
force_restore: bool = False,
Expand Down
185 changes: 185 additions & 0 deletions client/starwhale/utils/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from __future__ import annotations

import os
import json
import queue
import atexit
import typing as t
import threading
from types import TracebackType
from functools import wraps

from starwhale.consts.env import SWEnv
from starwhale.utils.debug import console
from starwhale.base.uri.project import Project
from starwhale.base.client.api.event import EventApi
from starwhale.base.client.models.models import (
Source,
EventType,
EventRequest,
RelatedResource,
EventResourceType,
)


def event(*deco_args: t.Any, **deco_kw: t.Any) -> t.Any:
"""Event decorator for recording event to server or standalone instance
Arguments:
msg: [str, optional] The event message. Default is the function name.
external: [Dict, optional] The external data for the event. Default is None.
The external data should be json serializable.
event_type: [str|EventType, optional] The event type. Default is EventType.info.
str accepts: INFO, WARNING, ERROR.
ignore_end: [bool, optional] Whether to ignore the event when the function is finished. Default is False.
"""
if len(deco_args) == 1 and len(deco_kw) == 0 and callable(deco_args[0]):
return event()(deco_args[0])
else:

def _decorator(func: t.Callable) -> t.Any:
@wraps(func)
def _wrapper(*f_args: t.Any, **f_kw: t.Any) -> t.Any:
msg = deco_kw.get("msg", f"func-{func.__qualname__}")
external = deco_kw.get("external")
event_type = deco_kw.get("event_type", EventType.info)
ignore_end = deco_kw.get("ignore_end", False)

add_event(msg, external, event_type)
try:
_rt = func(*f_args, **f_kw)
except Exception as e:
if not ignore_end:
add_event(f"{msg}, run failed:{e}", external, EventType.error)
raise
else:
if not ignore_end:
add_event(f"{msg}, run success", external, event_type)

return _rt

return _wrapper

return _decorator


def add_event(
msg: str, external: t.Any = None, event_type: str | EventType = EventType.info
) -> None:
with Event._lock:
if Event._instance is None:
Event._instance = Event()
event = Event._instance

event.add(msg, external, event_type)


class Event(threading.Thread):
_instance: Event | None = None
_lock = threading.Lock()

def __init__(self, maxsize: int = 1000) -> None:
super().__init__(name="EventThread")
self.queue: queue.Queue[t.Tuple | None] = queue.Queue(maxsize)

self.daemon = True
self.start()
atexit.register(self.close)

def add(
self,
msg: str,
external: t.Any = None,
event_type: str | EventType = EventType.info,
) -> None:
msg = msg.strip()
if not msg:
return

if external:
external = json.dumps(external)

if isinstance(event_type, str):
event_type = EventType(event_type)

self.queue.put((msg, external, event_type))

def __enter__(self) -> Event:
return self

def __exit__(
self,
type: t.Optional[t.Type[BaseException]],
value: t.Optional[BaseException],
trace: TracebackType,
) -> None:
if value: # pragma: no cover
console.warning(f"type:{type}, exception:{value}, traceback:{trace}")

self.close()

def close(self) -> None:
atexit.unregister(self.close)
self.queue.put(None)

self.flush()
self.join()

def flush(self) -> None:
self.queue.join()

def _dispatch_to_server(
self,
project: Project,
msg: str,
external: t.Any,
event_type: EventType = EventType.info,
) -> None:
job_id = os.environ.get("SW_JOB_ID")
run_id = os.environ.get("SW_RUN_ID")
if not job_id or not run_id:
console.warning(
f"failed to dispatch event({msg}) to server, "
f"project={project} job_id={job_id} run_id={run_id}"
)
return

EventApi(project.instance).add(
project=project.id,
job=job_id,
event=EventRequest(
event_type=event_type,
source=Source.client,
related_resource=RelatedResource(
event_resource_type=EventResourceType.run,
id=int(run_id),
),
message=msg,
data=external,
),
).raise_on_error()

def run(self) -> None:
while True:
item = self.queue.get()
if item is None:
self.queue.task_done()
break

try:
msg, external, typ = item
console.info(f"[EVENT][{typ.value}] msg={msg} external={external}")
project_uri = os.environ.get(SWEnv.project_uri)
if project_uri:
project = Project(project_uri)
else:
project = None

if project and project.instance.is_cloud:
self._dispatch_to_server(project, msg, external, event_type=typ)
# TODO: support event dispatch to standalone
except Exception as e:
# Event can ignore dispatch error, event is not critical
console.exception(f"failed to dispatch event({item}): {e}")
finally:
self.queue.task_done()
Loading

0 comments on commit f418c08

Please sign in to comment.