diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..a12e979 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,2 @@ +[run] +omit = duetector/cli/* diff --git a/dev-tools/entrypoint-server.py b/dev-tools/entrypoint-server.py index 01d09ed..2d03e4d 100644 --- a/dev-tools/entrypoint-server.py +++ b/dev-tools/entrypoint-server.py @@ -1,5 +1,7 @@ import os +import importlib_metadata + os.chdir(os.path.dirname(os.path.abspath(__file__))) os.environ["DUETECTOR_LOG_LEVEL"] = "DEBUG" @@ -7,7 +9,14 @@ import sys from pathlib import Path -from pkg_resources import load_entry_point + +def load_entry_point(distribution, group, name): + dist_obj = importlib_metadata.distribution(distribution) + eps = [ep for ep in dist_obj.entry_points if ep.group == group and ep.name == name] + if not eps: + raise ImportError("Entry point %r not found" % ((group, name),)) + return eps[0].load() + db_file = Path("./duetector-dbcollector.sqlite3") config_file = Path("./config.toml") diff --git a/dev-tools/entrypoint.py b/dev-tools/entrypoint.py index e638dff..7d4a4d8 100644 --- a/dev-tools/entrypoint.py +++ b/dev-tools/entrypoint.py @@ -1,5 +1,7 @@ import os +import importlib_metadata + os.chdir(os.path.dirname(os.path.abspath(__file__))) os.environ["DUETECTOR_LOG_LEVEL"] = "DEBUG" @@ -7,7 +9,14 @@ import sys from pathlib import Path -from pkg_resources import load_entry_point + +def load_entry_point(distribution, group, name): + dist_obj = importlib_metadata.distribution(distribution) + eps = [ep for ep in dist_obj.entry_points if ep.group == group and ep.name == name] + if not eps: + raise ImportError("Entry point %r not found" % ((group, name),)) + return eps[0].load() + db_file = Path("./duetector-dbcollector.sqlite3") config_file = Path("./config.toml") diff --git a/duetector/analyzer/base.py b/duetector/analyzer/base.py index 52600d5..8c5c8b9 100644 --- a/duetector/analyzer/base.py +++ b/duetector/analyzer/base.py @@ -82,6 +82,7 @@ def brief( end_datetime: Optional[datetime] = None, with_details: bool = True, distinct: bool = False, + inspect_type: bool = False, ) -> AnalyzerBrief: """ Get a brief of this analyzer. @@ -97,6 +98,7 @@ def brief( end_datetime (Optional[datetime], optional): End time. Defaults to None. with_details (bool, optional): With details. Defaults to True. distinct (bool, optional): Distinct. Defaults to False. + inspect_type (bool, optional): Weather fileds's value is type or type name. Defaults to False, type name. Returns: AnalyzerBrief: A brief of this analyzer. diff --git a/duetector/analyzer/db.py b/duetector/analyzer/db.py index 5e8a3b2..df9fdee 100644 --- a/duetector/analyzer/db.py +++ b/duetector/analyzer/db.py @@ -6,6 +6,7 @@ from duetector.analyzer.base import Analyzer from duetector.analyzer.models import AnalyzerBrief, Brief, Tracking from duetector.db import SessionManager +from duetector.log import logger class DBAnalyzer(Analyzer): @@ -140,6 +141,7 @@ def query( if order_by_desc: statm = statm.order_by(*[getattr(m, k).desc() for k in order_by_desc]) + logger.debug(f"Querying {tracer}@{collector_id} with statm: {statm}") with self.sm.begin() as session: r.extend( [ @@ -174,6 +176,7 @@ def _table_brief( start_datetime: Optional[datetime] = None, end_datetime: Optional[datetime] = None, inspect: bool = True, + inspect_type: bool = False, distinct: bool = False, ) -> Brief: """ @@ -191,7 +194,12 @@ def _table_brief( m = self.sm.get_tracking_model(tracer, collector_id) if not inspect: - return Brief(tracer=tracer, collector_id=collector_id, fields=m.inspect_fields()) + logger.debug(f"Briefing {tracer}@{collector_id} without inspect") + return Brief( + tracer=tracer, + collector_id=collector_id, + fields=m.inspect_fields(value_as_type=inspect_type), + ) columns = m.inspect_fields().keys() statm = select(*[getattr(m, k) for k in columns]) if distinct: @@ -204,6 +212,7 @@ def _table_brief( start_statm = statm.order_by(m.dt.asc()) end_statm = statm.order_by(m.dt.desc()) count_statm = select(func.count()).select_from(statm.subquery()) + logger.debug(f"Briefing {tracer}@{collector_id} with statm: {start_statm}") with self.sm.begin() as session: start_tracking = self._convert_row_to_tracking( columns, session.execute(start_statm).first(), tracer @@ -218,7 +227,7 @@ def _table_brief( start=start_tracking.dt, end=end_tracking.dt, count=session.execute(count_statm).scalar(), - fields=m.inspect_fields(), + fields=m.inspect_fields(value_as_type=inspect_type), ) def _convert_row_to_tracking(self, columns: List[str], row: Any, tracer: str) -> Tracking: @@ -246,6 +255,7 @@ def brief( end_datetime: Optional[datetime] = None, with_details: bool = True, distinct: bool = False, + inspect_type: bool = False, ) -> AnalyzerBrief: """ Get a brief of this analyzer. @@ -265,15 +275,21 @@ def brief( Returns: AnalyzerBrief: A brief of this analyzer. """ + tables = self.sm.inspect_all_tables() if tracers: tables = [t for t in tables if self.sm.table_name_to_tracer(t) in tracers] if collector_ids: tables = [t for t in tables if self.sm.table_name_to_collector_id(t) in collector_ids] - briefs = [ + briefs: List[Brief] = [ self._table_brief( - t, start_datetime, end_datetime, inspect=with_details, distinct=distinct + t, + start_datetime, + end_datetime, + inspect=with_details, + distinct=distinct, + inspect_type=inspect_type, ) for t in tables ] @@ -281,5 +297,5 @@ def brief( return AnalyzerBrief( tracers=set([brief.tracer for brief in briefs]), collector_ids=set([brief.collector_id for brief in briefs]), - briefs=briefs, + briefs={f"{brief.tracer}@{brief.collector_id}": brief for brief in briefs}, ) diff --git a/duetector/analyzer/models.py b/duetector/analyzer/models.py index da50e1d..1093278 100644 --- a/duetector/analyzer/models.py +++ b/duetector/analyzer/models.py @@ -68,7 +68,7 @@ class Brief(pydantic.BaseModel): fields: Dict[str, Any] = {} def __repr__(self): - fields_repr = ", ".join([f"{k}: {v.__name__}" for k, v in self.fields.items()]) + fields_repr = ", ".join([f"{k}: {v}" for k, v in self.fields.items()]) s = f""" {self.tracer}@{self.collector_id} with {self.count} records @@ -97,10 +97,12 @@ class AnalyzerBrief(pydantic.BaseModel): Set of collector ids """ - briefs: List[Brief] + briefs: Dict[str, Brief] def __repr__(self): - briefs_repr = "\n".join([f"\n----------------{b}----------------" for b in self.briefs]) + briefs_repr = "\n".join( + [f"\n----------------{b}----------------" for b in self.briefs.values()] + ) s = f""" Available tracers: {self.tracers} Available collector ids: {self.collector_ids} diff --git a/duetector/cli/main.py b/duetector/cli/main.py index 75d8830..2654f6f 100644 --- a/duetector/cli/main.py +++ b/duetector/cli/main.py @@ -170,7 +170,7 @@ def _shutdown(sig=None, frame=None): if brief: try: logger.info("Generating brief...") - logger.info(str(DBAnalyzer(c).brief())) + logger.info(str(DBAnalyzer(c).brief(inspect_type=False))) except Exception as e: logger.error("Exception when generating brief") logger.exception(e) diff --git a/duetector/db.py b/duetector/db.py index ebcf18e..5cd4bd9 100644 --- a/duetector/db.py +++ b/duetector/db.py @@ -61,10 +61,10 @@ def to_analyzer_tracking(self) -> AT: raise NotImplementedError @classmethod - def inspect_fields(cls) -> Dict[str, Any]: - """ - Inspect fields of this model. - """ + def inspect_fields( + cls, + value_as_type: bool = False, + ) -> Dict[str, Any]: raise NotImplementedError @@ -210,6 +210,8 @@ def get_tracking_model( Returns: type: a sqlalchemy model for tracking """ + if tracer in self._tracking_models: + return self._tracking_models[tracer] with self.mutex: if tracer in self._tracking_models: @@ -248,9 +250,14 @@ def to_analyzer_tracking(self) -> AT: ) @classmethod - def inspect_fields(cls) -> Dict[str, Any]: + def inspect_fields( + cls, + value_as_type: bool = False, + ) -> Dict[str, Any]: return { - c.name: c.type.python_type for c in cls.__table__.columns if c.name != "id" + c.name: c.type.python_type if value_as_type else c.type.python_type.__name__ + for c in cls.__table__.columns + if c.name != "id" } try: diff --git a/duetector/service/app.py b/duetector/service/app.py index 0a9aeb7..75f8c39 100644 --- a/duetector/service/app.py +++ b/duetector/service/app.py @@ -1,13 +1,34 @@ -from fastapi import FastAPI +from fastapi import Depends, FastAPI, HTTPException +from fastapi.security import APIKeyQuery +from starlette.status import HTTP_403_FORBIDDEN from duetector.__init__ import __version__ +from duetector.service.config import Config, get_server_config from duetector.service.control.routes import r as cr from duetector.service.query.routes import r as qr + +async def verify_token( + server_config: Config = Depends(get_server_config), + token: str = Depends(APIKeyQuery(name="token", auto_error=False)), +): + if server_config.token and token != server_config.token: + raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="Not authenticated") + + app = FastAPI( title="Duetector", description="Data Usage Extensible Detector for data usage observability", version=__version__, + dependencies=[Depends(verify_token)], ) app.include_router(qr) app.include_router(cr) + + +@app.get("/") +async def root(): + """ + Just a simple health check, returns a message. + """ + return {"message": "Hello World"} diff --git a/duetector/service/base.py b/duetector/service/base.py new file mode 100644 index 0000000..cd14fcb --- /dev/null +++ b/duetector/service/base.py @@ -0,0 +1,22 @@ +from typing import Any, Dict, Optional + +from fastapi import Depends + +from duetector.config import Configuable +from duetector.service.config import get_config + + +class Controller(Configuable): + config_scope = None + + default_config = {} + + def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs): + super().__init__(config, *args, **kwargs) + + +def get_controller(controller_type: type): + def _(config: dict = Depends(get_config)) -> Controller: + return controller_type(config) + + return _ diff --git a/duetector/service/config.py b/duetector/service/config.py index c352135..570c208 100644 --- a/duetector/service/config.py +++ b/duetector/service/config.py @@ -1,12 +1,14 @@ import os from typing import Any, Dict +from fastapi import Depends + try: from functools import cache except ImportError: from functools import lru_cache as cache -from duetector.config import ConfigLoader +from duetector.config import Config, ConfigLoader, Configuable CONFIG_PATH_ENV = "DUETECTOR_SERVER_CONFIG_PATH" @@ -24,3 +26,14 @@ def get_config() -> Dict[str, Any]: load_env=False, dump_when_load=False, ).load_config() + + +class ServerConfig(Configuable): + config_scope = "server" + default_config = {"token": ""} + + +def get_server_config( + config: Dict[str, Any] = Depends(get_config), +) -> Config: + return ServerConfig(config).config diff --git a/duetector/service/control/controller.py b/duetector/service/control/controller.py new file mode 100644 index 0000000..4c55957 --- /dev/null +++ b/duetector/service/control/controller.py @@ -0,0 +1,5 @@ +from duetector.service.base import Controller + + +class DaemonControler(Controller): + pass diff --git a/duetector/service/control/models.py b/duetector/service/control/models.py new file mode 100644 index 0000000..e69de29 diff --git a/duetector/service/control/routes.py b/duetector/service/control/routes.py index 9cac799..ca5042d 100644 --- a/duetector/service/control/routes.py +++ b/duetector/service/control/routes.py @@ -4,9 +4,10 @@ r = APIRouter( prefix="/control", + tags=["control"], ) @r.get("/") async def root(config: dict = Depends(get_config)): - return config + pass diff --git a/duetector/service/exceptions.py b/duetector/service/exceptions.py new file mode 100644 index 0000000..85b264f --- /dev/null +++ b/duetector/service/exceptions.py @@ -0,0 +1,9 @@ +from fastapi import HTTPException +from starlette.status import HTTP_404_NOT_FOUND + + +class NotFoundError(HTTPException): + def __init__(self, what: str = ""): + if what: + what = f"{what} " + super().__init__(status_code=HTTP_404_NOT_FOUND, detail=f"{what}Not found") diff --git a/duetector/service/query/controller.py b/duetector/service/query/controller.py new file mode 100644 index 0000000..fd5a29a --- /dev/null +++ b/duetector/service/query/controller.py @@ -0,0 +1,52 @@ +from typing import Any, Dict, List, Optional + +from duetector.analyzer.base import Analyzer +from duetector.analyzer.db import DBAnalyzer +from duetector.service.base import Controller +from duetector.service.exceptions import NotFoundError + + +class AnalyzerController(Controller): + def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs): + super().__init__(config, *args, **kwargs) + + # TODO: Make this configurable, may intro a manager for analyzer + self._avaliable_analyzers = [DBAnalyzer] + + self._analyzers: Dict[str, Analyzer] = { + analyzer.config_scope: self._init_analyzer(analyzer) + for analyzer in self._avaliable_analyzers + } + + def _init_analyzer(self, analyzer: type): + analyzer_config = getattr(self.config, analyzer.config_scope)._config_dict + return analyzer(analyzer_config) + + @property + def avaliable_analyzer_names(self) -> List[str]: + return [a.config_scope for a in self._avaliable_analyzers] + + def get_analyzer(self, analyzer_name: str) -> Analyzer: + """ + Get analyzer by name + + Args: + analyzer_name (str): analyzer name, should be one of avaliable_analyzer_names + + Raises: + NotFoundError: if analyzer not found + + Returns: + Analyzer: analyzer instance + + Allow use "-" or "_" in analyzer name, for example, both "db-analyzer" and "db_analyzer" are ok. + + Examples: + >>> controller.get_analyzer("db-analyzer") + """ + a = self._analyzers.get(analyzer_name) or self._analyzers.get( + analyzer_name.replace("-", "_") + ) + if not a: + raise NotFoundError(analyzer_name) + return a diff --git a/duetector/service/query/models.py b/duetector/service/query/models.py new file mode 100644 index 0000000..c74df63 --- /dev/null +++ b/duetector/service/query/models.py @@ -0,0 +1,54 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel + +from duetector.analyzer.models import AnalyzerBrief, Tracking + + +class AvaliableAnalyzers(BaseModel): + analyzers: List[str] + + +class QueryBody(BaseModel): + tracers: Optional[List[str]] = None + collector_ids: Optional[List[str]] = None + start_datetime: Optional[datetime] = None + end_datetime: Optional[datetime] = None + start: int = 0 + limit: int = 0 + columns: Optional[List[str]] = None + where: Optional[Dict[str, Any]] = None + distinct: bool = False + order_by_asc: Optional[List[str]] = None + order_by_desc: Optional[List[str]] = None + + model_config = { + "json_schema_extra": { + "examples": [ + { + "tracers": [], + "collector_ids": [], + "start_datetime": datetime.fromtimestamp(0), + "end_datetime": datetime.now(), + "start": 0, + "limit": 0, + "columns": [], + "where": {}, + "distinct": False, + "order_by_asc": [], + "order_by_desc": [], + } + ] + } + } + + +class QueryResult(BaseModel): + trackings: List[Tracking] + count: int + + +class BriefResult(BaseModel): + brief: AnalyzerBrief + analyzer_name: str diff --git a/duetector/service/query/routes.py b/duetector/service/query/routes.py index e8671e7..7700760 100644 --- a/duetector/service/query/routes.py +++ b/duetector/service/query/routes.py @@ -1,12 +1,57 @@ -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Body, Depends -from duetector.service.config import get_config +from duetector.service.base import get_controller +from duetector.service.query.controller import AnalyzerController +from duetector.service.query.models import ( + AvaliableAnalyzers, + BriefResult, + QueryBody, + QueryResult, +) r = APIRouter( prefix="/query", + tags=["query"], ) -@r.get("/") -async def root(config: dict = Depends(get_config)): - return config +@r.get("/", response_model=AvaliableAnalyzers) +async def root( + controller: AnalyzerController = Depends(get_controller(AnalyzerController)), +): + """ + Response available analyzer + """ + return AvaliableAnalyzers(analyzers=controller.avaliable_analyzer_names) + + +@r.post("/{analyzer_name}", response_model=QueryResult) +async def query( + analyzer_name: str, + query_param: QueryBody = Body(default=QueryBody()), + controller: AnalyzerController = Depends(get_controller(AnalyzerController)), +): + """ + Query data from analyzer + """ + analyzer = controller.get_analyzer(analyzer_name) + trackings = analyzer.query(**query_param.model_dump()) + + return QueryResult( + trackings=trackings, + count=len(trackings), + ) + + +@r.get("/{analyzer_name}/brief", response_model=BriefResult) +async def query_brief( + analyzer_name: str, + controller: AnalyzerController = Depends(get_controller(AnalyzerController)), +): + # type is not serializable, so we need to get analyzer without inspect type + analyzer = controller.get_analyzer(analyzer_name) + + return BriefResult( + brief=analyzer.brief(inspect_type=False), + analyzer_name=analyzer_name, + ) diff --git a/duetector/static/config.toml b/duetector/static/config.toml index c5ea18d..6f5d8d1 100644 --- a/duetector/static/config.toml +++ b/duetector/static/config.toml @@ -104,3 +104,6 @@ table_prefix = "duetector_tracking" [db_analyzer.db.engine] url = "sqlite:///duetector-dbcollector.sqlite3" + +[server] +token = "" diff --git a/duetector/tools/config_generator.py b/duetector/tools/config_generator.py index 7b6703d..05962e1 100644 --- a/duetector/tools/config_generator.py +++ b/duetector/tools/config_generator.py @@ -9,6 +9,7 @@ from duetector.log import logger from duetector.managers import CollectorManager, FilterManager, TracerManager from duetector.monitors import BccMonitor, ShMonitor +from duetector.service.config import ServerConfig def _recursive_load(config_scope: str, config_dict: dict, default_config: dict): @@ -62,6 +63,8 @@ class ConfigGenerator: All analyzers to inspect. """ + others = [ServerConfig] + def __init__( self, load: bool = True, @@ -87,6 +90,9 @@ def __init__( for a in self.analyzer: _recursive_load(a.config_scope, self.dynamic_config, a.default_config) + + for o in self.others: + _recursive_load(o.config_scope, self.dynamic_config, o.default_config) # This will generate default config file if not exists if load: self.loaded_config = ConfigLoader(path, load_env, dump_when_load=False).load_config() diff --git a/pyproject.toml b/pyproject.toml index 88fdf08..51283ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ 'Programming Language :: Python :: 3.11', ] [project.optional-dependencies] -test = ["pytest", "pytest-cov", "pytype"] +test = ["pytest", "pytest-cov", "pytype", "httpx"] docs = ["Sphinx<=7.2.4", "sphinx-rtd-theme", "sphinx-click", "autodoc_pydantic"] [project.scripts] diff --git a/tests/config.toml b/tests/config.toml index 7b3dc63..72129ac 100644 --- a/tests/config.toml +++ b/tests/config.toml @@ -7,7 +7,7 @@ [filter] disabled = false - +include_extension = true [filter.patternfilter] disabled = false @@ -31,6 +31,16 @@ re_exclude_gcustom = ["ignore_custom*"] [tracer] disabled = false +include_extension = true + +[tracer.clonetracer] +disabled = false +attach_event = "__x64_sys_clone" +poll_timeout = 10 + +[tracer.tcpconnecttracer] +disabled = false +poll_timeout = 10 [tracer.unametracer] disabled = false @@ -38,14 +48,20 @@ enable_cache = true [tracer.opentracer] disabled = false +attach_event = "do_sys_openat2" +poll_timeout = 10 [collector] disabled = false +include_extension = true [collector.dbcollector] disabled = false id = "unittest" +[collector.dbcollector.backend_args] +max_workers = 10 + [collector.dbcollector.db] table_prefix = "duetector_tracking" @@ -57,17 +73,36 @@ disabled = true id = "unittest" maxlen = 1024 +[collector.dequecollector.backend_args] +max_workers = 10 + [monitor.bcc] disabled = false auto_init = true +continue_on_exception = true + +[monitor.bcc.backend_args] +max_workers = 10 + +[monitor.bcc.poller] +interval_ms = 500 [monitor.sh] disabled = false auto_init = true timeout = 5 +[monitor.sh.backend_args] +max_workers = 10 + +[monitor.sh.poller] +interval_ms = 500 + [db_analyzer.db] table_prefix = "duetector_tracking" [db_analyzer.db.engine] url = "sqlite:///:memory:" + +[server] +token = "" diff --git a/tests/service/test_query.py b/tests/service/test_query.py new file mode 100644 index 0000000..48378a0 --- /dev/null +++ b/tests/service/test_query.py @@ -0,0 +1,40 @@ +import pytest +from fastapi.testclient import TestClient + +from duetector.analyzer.db import DBAnalyzer +from duetector.service.app import app +from duetector.service.config import get_config + + +@pytest.fixture +def configed_app(full_config): + app.dependency_overrides = {get_config: lambda: full_config} + return app + + +@pytest.fixture +def client(configed_app): + with TestClient(configed_app) as client: + yield client + + +def test_query(client: TestClient): + response = client.get(f"/query/") + assert response.status_code == 200 + assert response.json() == {"analyzers": [DBAnalyzer.config_scope]} + + +def test_query_brief(client: TestClient): + response = client.get(f"/query/{DBAnalyzer.config_scope}/brief") + assert response.status_code == 200 + assert response.json() + + +def test_query_analyzer(client: TestClient): + response = client.post(f"/query/{DBAnalyzer.config_scope}") + assert response.status_code == 200 + assert response.json() + + +if __name__ == "__main__": + pytest.main(["-vv", "-s", __file__]) diff --git a/tests/service/test_token.py b/tests/service/test_token.py new file mode 100644 index 0000000..bb43ec2 --- /dev/null +++ b/tests/service/test_token.py @@ -0,0 +1,28 @@ +import pytest +from fastapi.testclient import TestClient + +from duetector.service.app import app +from duetector.service.config import get_config + +app.dependency_overrides = { + get_config: lambda: { + "server": { + "token": "test_token", + } + } +} + + +@pytest.fixture +def client(): + with TestClient(app) as client: + yield client + + +def test_root(client: TestClient): + response = client.get("/", params={"token": "test_token"}) + assert response.status_code == 200 + + +if __name__ == "__main__": + pytest.main(["-vv", "-s", __file__]) diff --git a/tests/test_db_analyzer.py b/tests/test_db_analyzer.py index 1595687..ba4a05f 100644 --- a/tests/test_db_analyzer.py +++ b/tests/test_db_analyzer.py @@ -101,8 +101,12 @@ def test_brief(db_analyzer: DBAnalyzer, a_tracking, collector_id): assert not db_analyzer.brief(tracers=["not-exist"]).tracers assert not db_analyzer.brief(collector_ids=["not-exist"]).collector_ids - assert not db_analyzer.brief(start_datetime=now + timedelta(days=1)).briefs[0].count - assert not db_analyzer.brief(end_datetime=now - timedelta(days=1)).briefs[0].count + assert not list(db_analyzer.brief(start_datetime=now + timedelta(days=1)).briefs.values())[ + 0 + ].count + assert not list(db_analyzer.brief(end_datetime=now - timedelta(days=1)).briefs.values())[ + 0 + ].count if __name__ == "__main__": diff --git a/tests/test_service.py b/tests/test_service.py deleted file mode 100644 index 15c51b9..0000000 --- a/tests/test_service.py +++ /dev/null @@ -1,5 +0,0 @@ -# TODO - - -def test_service(): - pass