-
Notifications
You must be signed in to change notification settings - Fork 61
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add python side metric example * add doc * fix lint * move pid to init func * update metrics doc * Apply suggestions from code review Co-authored-by: zclzc <[email protected]> * update metrics doc, rm confusing comments Co-authored-by: zclzc <[email protected]>
- Loading branch information
Showing
3 changed files
with
121 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
This is an example demonstrating how to add your customized Python side Prometheus metrics. | ||
|
||
Mosec already has the Rust side metrics, including: | ||
|
||
* throughput for the inference endpoint | ||
* duration for each stage (including the IPC time) | ||
* batch size (only for the `max_batch_size > 1` workers) | ||
* number of remaining tasks to be processed | ||
|
||
If you need to monitor more details about the inference process, you can add some Python side metrics, e.g., the inference result distribution, the duration of some CPU-bound or GPU-bound processing, or the IPC time (derived from `rust_step_duration - python_step_duration`).
|
||
This example has a simple WSGI app as the monitoring metrics service. In each worker process, the `Counter` will collect the inference results and export them to the metrics service. For the inference part, it parses the batch data and compares them with the average value. | ||
|
||
For more information about the multiprocess mode for the metrics, check the [Prometheus doc](https://github.com/prometheus/client_python#multiprocess-mode-eg-gunicorn). | ||
|
||
#### **`python_side_metrics.py`** | ||
|
||
```python | ||
--8<-- "examples/python_side_metrics.py" | ||
``` | ||
|
||
#### Start | ||
|
||
python python_side_metrics.py | ||
|
||
#### Test | ||
|
||
http POST :8000/inference num=1 | ||
|
||
#### Check the Python side metrics | ||
|
||
http :8080 | ||
|
||
#### Check the Rust side metrics | ||
|
||
http :8000/metrics |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import logging | ||
import os | ||
import pathlib | ||
import tempfile | ||
import threading | ||
from typing import List | ||
from wsgiref.simple_server import make_server | ||
|
||
from mosec import Server, Worker | ||
from mosec.errors import ValidationError | ||
|
||
# Configure the root logger so both this script and mosec's own log records
# are emitted at DEBUG level with a pid-tagged format (useful because each
# worker runs in its own process).
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
_handler = logging.StreamHandler()
_handler.setFormatter(
    logging.Formatter(
        "%(asctime)s - %(process)d - %(levelname)s - %(filename)s:%(lineno)s - %(message)s"
    )
)
logger.addHandler(_handler)
|
||
|
||
# The prometheus client reads PROMETHEUS_MULTIPROC_DIR at import time, so the
# variable must be populated *before* `prometheus_client` is imported below.
if not os.environ.get("PROMETHEUS_MULTIPROC_DIR"):
    default_metric_dir = pathlib.Path(tempfile.gettempdir()) / "prometheus_multiproc_dir"
    default_metric_dir.mkdir(parents=True, exist_ok=True)
    os.environ["PROMETHEUS_MULTIPROC_DIR"] = str(default_metric_dir)
|
||
# Imported after the PROMETHEUS_MULTIPROC_DIR bootstrap above on purpose:
# the client picks up the multiprocess directory at import time (hence the
# `noqa: E402` for the non-top-of-file import).
from prometheus_client import ( # type: ignore # noqa: E402
    CONTENT_TYPE_LATEST,
    CollectorRegistry,
    Counter,
    generate_latest,
    multiprocess,
)

# A dedicated registry aggregating metric shards written by every worker
# process (multiprocess mode), served by `metric_app` below.
metric_registry = CollectorRegistry()
multiprocess.MultiProcessCollector(metric_registry)
# Inference result counter, labelled by boolean result ("status") and the
# worker process id ("pid") so per-process contributions stay distinguishable.
counter = Counter("inference_result", "statistic of result", ("status", "pid"))
|
||
|
||
def metric_app(environ, start_response):
    """WSGI application exposing the aggregated multiprocess metrics.

    Renders the current state of ``metric_registry`` in the Prometheus text
    exposition format and returns it as a single-chunk response body.
    """
    payload = generate_latest(metric_registry)
    headers = [
        ("Content-Type", CONTENT_TYPE_LATEST),
        ("Content-Length", str(len(payload))),
    ]
    start_response("200 OK", headers)
    return iter([payload])
|
||
|
||
def metric_service(host="", port=8080):
    """Serve the metrics WSGI app forever (blocking call).

    Intended to run in a daemon thread next to the inference server.
    """
    httpd = make_server(host, port, metric_app)
    try:
        httpd.serve_forever()
    finally:
        # Mirror the context-manager cleanup: release the listening socket.
        httpd.server_close()
|
||
|
||
class Inference(Worker):
    """Batch worker that flags whether each request's number is >= the batch mean.

    Also records the per-result counts into the Python-side Prometheus
    ``counter``, labelled with this worker process's pid.
    """

    def __init__(self):
        super().__init__()
        # Cache the pid once: each worker lives in its own process and the
        # pid label separates the per-process metric shards.
        self.pid = str(os.getpid())

    def deserialize(self, data: bytes) -> int:
        """Extract the ``num`` field of the JSON request body as an int.

        Raises:
            ValidationError: if the payload is not a JSON object with an
                integer-convertible ``num`` field.
        """
        json_data = super().deserialize(data)
        try:
            res = int(json_data.get("num"))
        # Broad on purpose: any malformed payload becomes a validation error.
        except Exception as err:
            # Chain the cause so the original traceback is preserved in logs.
            raise ValidationError(err) from err
        return res

    def forward(self, data: List[int]) -> List[bool]:
        """Compare each number with the batch average and record the counts."""
        avg = sum(data) / len(data)
        ans = [x >= avg for x in data]
        above = sum(ans)  # count once instead of twice
        counter.labels(status="true", pid=self.pid).inc(above)
        counter.labels(status="false", pid=self.pid).inc(len(ans) - above)
        return ans
|
||
|
||
if __name__ == "__main__":
    # Expose the Python-side metrics from a daemon thread so it terminates
    # together with the main inference server.
    threading.Thread(target=metric_service, daemon=True).start()

    # Run the (blocking) inference server in the main thread.
    inference_server = Server()
    inference_server.append_worker(Inference, max_batch_size=8)
    inference_server.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters