From 14d7cb705d0f5c39ad148e7603b74f9b8b559a95 Mon Sep 17 00:00:00 2001 From: wunder957 Date: Fri, 10 Nov 2023 17:39:23 +0800 Subject: [PATCH] Demo for jaeger client --- duetector/analyzer/jaeger/analyzer.py | 15 +++++++++++++++ duetector/service/query/routes.py | 8 +++----- duetector/service/utils.py | 11 +++++++++++ 3 files changed, 29 insertions(+), 5 deletions(-) create mode 100644 duetector/service/utils.py diff --git a/duetector/analyzer/jaeger/analyzer.py b/duetector/analyzer/jaeger/analyzer.py index 421d065..5d9c9c8 100644 --- a/duetector/analyzer/jaeger/analyzer.py +++ b/duetector/analyzer/jaeger/analyzer.py @@ -1,2 +1,17 @@ +import asyncio + +import grpc + from duetector.analyzer.jaeger.proto.query_pb2 import * from duetector.analyzer.jaeger.proto.query_pb2_grpc import * + + +async def run() -> None: + async with grpc.aio.insecure_channel("localhost:16685") as channel: + stub = QueryServiceStub(channel) + response = await stub.GetServices(GetServicesRequest()) + print(response) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/duetector/service/query/routes.py b/duetector/service/query/routes.py index f25b889..2d241b2 100644 --- a/duetector/service/query/routes.py +++ b/duetector/service/query/routes.py @@ -1,7 +1,4 @@ -from asyncio import sleep - from fastapi import APIRouter, Body, Depends -from fastapi.concurrency import run_in_threadpool from duetector.service.base import get_controller from duetector.service.query.controller import AnalyzerController @@ -11,6 +8,7 @@ QueryBody, QueryResult, ) +from duetector.service.utils import ensure_async r = APIRouter( prefix="/query", @@ -38,7 +36,7 @@ async def query( Query data from analyzer """ analyzer = controller.get_analyzer(analyzer_name) - trackings = await run_in_threadpool(analyzer.query, **query_param.model_dump()) + trackings = await ensure_async(analyzer.query, **query_param.model_dump()) return QueryResult( trackings=trackings, @@ -53,7 +51,7 @@ async def query_brief( ): # type is not serializable, so we need to get analyzer without inspect type analyzer = controller.get_analyzer(analyzer_name) - brief = await run_in_threadpool(analyzer.brief, inspect_type=False) + brief = await ensure_async(analyzer.brief, inspect_type=False) return BriefResult( brief=brief, analyzer_name=analyzer_name, diff --git a/duetector/service/utils.py b/duetector/service/utils.py new file mode 100644 index 0000000..2da94ee --- /dev/null +++ b/duetector/service/utils.py @@ -0,0 +1,11 @@ +import asyncio + +from fastapi.concurrency import run_in_threadpool + + +async def ensure_async(f: callable, *args, **kwargs): + # await async function, run sync function in thread pool + if asyncio.iscoroutinefunction(f): + return await f(*args, **kwargs) + + return await run_in_threadpool(f, *args, **kwargs)