Skip to content

Commit

Permalink
PIP Server Implementation (#61)
Browse files Browse the repository at this point in the history
* Add token validation

* Switch token to security and testing it

* Intro httpx for testing

* Support query and brief in server

* Fix testing

* Intro fast release for mutex

* Testing query
  • Loading branch information
Wh1isper authored Sep 15, 2023
1 parent 2b75221 commit 4cb165e
Show file tree
Hide file tree
Showing 26 changed files with 414 additions and 34 deletions.
2 changes: 2 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[run]
omit = duetector/cli/*
11 changes: 10 additions & 1 deletion dev-tools/entrypoint-server.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import os

import importlib_metadata

os.chdir(os.path.dirname(os.path.abspath(__file__)))
os.environ["DUETECTOR_LOG_LEVEL"] = "DEBUG"

import re
import sys
from pathlib import Path

from pkg_resources import load_entry_point

def load_entry_point(distribution, group, name):
dist_obj = importlib_metadata.distribution(distribution)
eps = [ep for ep in dist_obj.entry_points if ep.group == group and ep.name == name]
if not eps:
raise ImportError("Entry point %r not found" % ((group, name),))
return eps[0].load()


db_file = Path("./duetector-dbcollector.sqlite3")
config_file = Path("./config.toml")
Expand Down
11 changes: 10 additions & 1 deletion dev-tools/entrypoint.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import os

import importlib_metadata

os.chdir(os.path.dirname(os.path.abspath(__file__)))
os.environ["DUETECTOR_LOG_LEVEL"] = "DEBUG"

import re
import sys
from pathlib import Path

from pkg_resources import load_entry_point

def load_entry_point(distribution, group, name):
dist_obj = importlib_metadata.distribution(distribution)
eps = [ep for ep in dist_obj.entry_points if ep.group == group and ep.name == name]
if not eps:
raise ImportError("Entry point %r not found" % ((group, name),))
return eps[0].load()


db_file = Path("./duetector-dbcollector.sqlite3")
config_file = Path("./config.toml")
Expand Down
2 changes: 2 additions & 0 deletions duetector/analyzer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def brief(
end_datetime: Optional[datetime] = None,
with_details: bool = True,
distinct: bool = False,
inspect_type: bool = False,
) -> AnalyzerBrief:
"""
Get a brief of this analyzer.
Expand All @@ -97,6 +98,7 @@ def brief(
end_datetime (Optional[datetime], optional): End time. Defaults to None.
with_details (bool, optional): With details. Defaults to True.
distinct (bool, optional): Distinct. Defaults to False.
inspect_type (bool, optional): Weather fileds's value is type or type name. Defaults to False, type name.
Returns:
AnalyzerBrief: A brief of this analyzer.
Expand Down
26 changes: 21 additions & 5 deletions duetector/analyzer/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from duetector.analyzer.base import Analyzer
from duetector.analyzer.models import AnalyzerBrief, Brief, Tracking
from duetector.db import SessionManager
from duetector.log import logger


class DBAnalyzer(Analyzer):
Expand Down Expand Up @@ -140,6 +141,7 @@ def query(
if order_by_desc:
statm = statm.order_by(*[getattr(m, k).desc() for k in order_by_desc])

logger.debug(f"Querying {tracer}@{collector_id} with statm: {statm}")
with self.sm.begin() as session:
r.extend(
[
Expand Down Expand Up @@ -174,6 +176,7 @@ def _table_brief(
start_datetime: Optional[datetime] = None,
end_datetime: Optional[datetime] = None,
inspect: bool = True,
inspect_type: bool = False,
distinct: bool = False,
) -> Brief:
"""
Expand All @@ -191,7 +194,12 @@ def _table_brief(
m = self.sm.get_tracking_model(tracer, collector_id)

if not inspect:
return Brief(tracer=tracer, collector_id=collector_id, fields=m.inspect_fields())
logger.debug(f"Briefing {tracer}@{collector_id} without inspect")
return Brief(
tracer=tracer,
collector_id=collector_id,
fields=m.inspect_fields(value_as_type=inspect_type),
)
columns = m.inspect_fields().keys()
statm = select(*[getattr(m, k) for k in columns])
if distinct:
Expand All @@ -204,6 +212,7 @@ def _table_brief(
start_statm = statm.order_by(m.dt.asc())
end_statm = statm.order_by(m.dt.desc())
count_statm = select(func.count()).select_from(statm.subquery())
logger.debug(f"Briefing {tracer}@{collector_id} with statm: {start_statm}")
with self.sm.begin() as session:
start_tracking = self._convert_row_to_tracking(
columns, session.execute(start_statm).first(), tracer
Expand All @@ -218,7 +227,7 @@ def _table_brief(
start=start_tracking.dt,
end=end_tracking.dt,
count=session.execute(count_statm).scalar(),
fields=m.inspect_fields(),
fields=m.inspect_fields(value_as_type=inspect_type),
)

def _convert_row_to_tracking(self, columns: List[str], row: Any, tracer: str) -> Tracking:
Expand Down Expand Up @@ -246,6 +255,7 @@ def brief(
end_datetime: Optional[datetime] = None,
with_details: bool = True,
distinct: bool = False,
inspect_type: bool = False,
) -> AnalyzerBrief:
"""
Get a brief of this analyzer.
Expand All @@ -265,21 +275,27 @@ def brief(
Returns:
AnalyzerBrief: A brief of this analyzer.
"""

tables = self.sm.inspect_all_tables()
if tracers:
tables = [t for t in tables if self.sm.table_name_to_tracer(t) in tracers]
if collector_ids:
tables = [t for t in tables if self.sm.table_name_to_collector_id(t) in collector_ids]

briefs = [
briefs: List[Brief] = [
self._table_brief(
t, start_datetime, end_datetime, inspect=with_details, distinct=distinct
t,
start_datetime,
end_datetime,
inspect=with_details,
distinct=distinct,
inspect_type=inspect_type,
)
for t in tables
]

return AnalyzerBrief(
tracers=set([brief.tracer for brief in briefs]),
collector_ids=set([brief.collector_id for brief in briefs]),
briefs=briefs,
briefs={f"{brief.tracer}@{brief.collector_id}": brief for brief in briefs},
)
8 changes: 5 additions & 3 deletions duetector/analyzer/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class Brief(pydantic.BaseModel):
fields: Dict[str, Any] = {}

def __repr__(self):
fields_repr = ", ".join([f"{k}: {v.__name__}" for k, v in self.fields.items()])
fields_repr = ", ".join([f"{k}: {v}" for k, v in self.fields.items()])

s = f"""
{self.tracer}@{self.collector_id} with {self.count} records
Expand Down Expand Up @@ -97,10 +97,12 @@ class AnalyzerBrief(pydantic.BaseModel):
Set of collector ids
"""

briefs: List[Brief]
briefs: Dict[str, Brief]

def __repr__(self):
briefs_repr = "\n".join([f"\n----------------{b}----------------" for b in self.briefs])
briefs_repr = "\n".join(
[f"\n----------------{b}----------------" for b in self.briefs.values()]
)
s = f"""
Available tracers: {self.tracers}
Available collector ids: {self.collector_ids}
Expand Down
2 changes: 1 addition & 1 deletion duetector/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def _shutdown(sig=None, frame=None):
if brief:
try:
logger.info("Generating brief...")
logger.info(str(DBAnalyzer(c).brief()))
logger.info(str(DBAnalyzer(c).brief(inspect_type=False)))
except Exception as e:
logger.error("Exception when generating brief")
logger.exception(e)
Expand Down
19 changes: 13 additions & 6 deletions duetector/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ def to_analyzer_tracking(self) -> AT:
raise NotImplementedError

@classmethod
def inspect_fields(cls) -> Dict[str, Any]:
"""
Inspect fields of this model.
"""
def inspect_fields(
cls,
value_as_type: bool = False,
) -> Dict[str, Any]:
raise NotImplementedError


Expand Down Expand Up @@ -210,6 +210,8 @@ def get_tracking_model(
Returns:
type: a sqlalchemy model for tracking
"""
if tracer in self._tracking_models:
return self._tracking_models[tracer]

with self.mutex:
if tracer in self._tracking_models:
Expand Down Expand Up @@ -248,9 +250,14 @@ def to_analyzer_tracking(self) -> AT:
)

@classmethod
def inspect_fields(cls) -> Dict[str, Any]:
def inspect_fields(
cls,
value_as_type: bool = False,
) -> Dict[str, Any]:
return {
c.name: c.type.python_type for c in cls.__table__.columns if c.name != "id"
c.name: c.type.python_type if value_as_type else c.type.python_type.__name__
for c in cls.__table__.columns
if c.name != "id"
}

try:
Expand Down
23 changes: 22 additions & 1 deletion duetector/service/app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
from fastapi import FastAPI
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import APIKeyQuery
from starlette.status import HTTP_403_FORBIDDEN

from duetector.__init__ import __version__
from duetector.service.config import Config, get_server_config
from duetector.service.control.routes import r as cr
from duetector.service.query.routes import r as qr


async def verify_token(
server_config: Config = Depends(get_server_config),
token: str = Depends(APIKeyQuery(name="token", auto_error=False)),
):
if server_config.token and token != server_config.token:
raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="Not authenticated")


app = FastAPI(
title="Duetector",
description="Data Usage Extensible Detector for data usage observability",
version=__version__,
dependencies=[Depends(verify_token)],
)
app.include_router(qr)
app.include_router(cr)


@app.get("/")
async def root():
"""
Just a simple health check, returns a message.
"""
return {"message": "Hello World"}
22 changes: 22 additions & 0 deletions duetector/service/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Any, Dict, Optional

from fastapi import Depends

from duetector.config import Configuable
from duetector.service.config import get_config


class Controller(Configuable):
config_scope = None

default_config = {}

def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):
super().__init__(config, *args, **kwargs)


def get_controller(controller_type: type):
def _(config: dict = Depends(get_config)) -> Controller:
return controller_type(config)

return _
15 changes: 14 additions & 1 deletion duetector/service/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import os
from typing import Any, Dict

from fastapi import Depends

try:
from functools import cache
except ImportError:
from functools import lru_cache as cache

from duetector.config import ConfigLoader
from duetector.config import Config, ConfigLoader, Configuable

CONFIG_PATH_ENV = "DUETECTOR_SERVER_CONFIG_PATH"

Expand All @@ -24,3 +26,14 @@ def get_config() -> Dict[str, Any]:
load_env=False,
dump_when_load=False,
).load_config()


class ServerConfig(Configuable):
config_scope = "server"
default_config = {"token": ""}


def get_server_config(
config: Dict[str, Any] = Depends(get_config),
) -> Config:
return ServerConfig(config).config
5 changes: 5 additions & 0 deletions duetector/service/control/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from duetector.service.base import Controller


class DaemonControler(Controller):
pass
Empty file.
3 changes: 2 additions & 1 deletion duetector/service/control/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

r = APIRouter(
prefix="/control",
tags=["control"],
)


@r.get("/")
async def root(config: dict = Depends(get_config)):
return config
pass
9 changes: 9 additions & 0 deletions duetector/service/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from fastapi import HTTPException
from starlette.status import HTTP_404_NOT_FOUND


class NotFoundError(HTTPException):
def __init__(self, what: str = ""):
if what:
what = f"{what} "
super().__init__(status_code=HTTP_404_NOT_FOUND, detail=f"{what}Not found")
Loading

0 comments on commit 4cb165e

Please sign in to comment.