Skip to content

Commit

Permalink
Support tracers, collector_ids and distinct
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Sep 12, 2023
1 parent 0e726ac commit 1badf4e
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 31 deletions.
98 changes: 73 additions & 25 deletions duetector/analyzer/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def __init__(self, config: Optional[Dict[str, Any]] = None, *args, **kwargs):

def query(
self,
tracer: Optional[str] = None,
collector_id: Optional[str] = None,
tracers: Optional[List[str]] = None,
collector_ids: Optional[List[str]] = None,
start_datetime: Optional[datetime] = None,
end_datetime: Optional[datetime] = None,
start: int = 0,
Expand All @@ -93,8 +93,8 @@ def query(
Query all tracking records from database.
Args:
tracer (Optional[str], optional): Tracer's name. Defaults to None, all tracers will be queried.
collector_id (Optional[str], optional): Collector id. Defaults to None, all collector id will be queried.
tracers (Optional[List[str]], optional): Tracer's name. Defaults to None, all tracers will be queried.
collector_ids (Optional[List[str]], optional): Collector id. Defaults to None, all collector id will be queried.
start_datetime (Optional[datetime], optional): Start time. Defaults to None.
end_datetime (Optional[datetime], optional): End time. Defaults to None.
start (int, optional): Start index. Defaults to 0.
Expand All @@ -110,10 +110,10 @@ def query(
"""

tables = self.sm.inspect_all_tables()
if tracer:
tables = [t for t in tables if self.sm.table_name_to_tracer(t) == tracer]
if collector_id:
tables = [t for t in tables if self.sm.table_name_to_collector_id(t) == collector_id]
if tracers:
tables = [t for t in tables if self.sm.table_name_to_tracer(t) in tracers]
if collector_ids:
tables = [t for t in tables if self.sm.table_name_to_collector_id(t) in collector_ids]

r = []
for t in tables:
Expand Down Expand Up @@ -141,7 +141,7 @@ def query(
with self.sm.begin() as session:
r.extend(
[
Tracking(tracer=tracer, **{k: v for k, v in zip(columns, r)})
self._convert_row_to_tracking(columns, r, tracer)
for r in session.execute(statm).fetchall()
]
)
Expand Down Expand Up @@ -172,6 +172,7 @@ def _table_brief(
start_datetime: Optional[datetime] = None,
end_datetime: Optional[datetime] = None,
inspect: bool = True,
distinct: bool = False,
) -> Brief:
"""
Get a brief of a table.
Expand All @@ -184,48 +185,95 @@ def _table_brief(
"""
tracer = self.sm.table_name_to_tracer(table_name)
collector_id = self.sm.table_name_to_collector_id(table_name)
if not inspect:
return Brief(tracer=tracer, collector_id=collector_id)

m = self.sm.get_tracking_model(tracer, collector_id)

start_statm = select(m).order_by(m.dt.asc())
end_statm = select(m).order_by(m.dt.desc())
count_statm = select(func.count()).select_from(m)
if not inspect:
return Brief(tracer=tracer, collector_id=collector_id, fields=m.inspect_fields())
columns = m.inspect_fields().keys()
statm = select(*[getattr(m, k) for k in columns])
if distinct:
statm = statm.distinct()
if start_datetime:
start_statm = start_statm.where(m.dt >= start_datetime)
count_statm = count_statm.where(m.dt >= start_datetime)
statm = statm.where(m.dt >= start_datetime)
if end_datetime:
end_statm = end_statm.where(m.dt <= end_datetime)
count_statm = count_statm.where(m.dt <= end_datetime)
statm = statm.where(m.dt <= end_datetime)

start_statm = statm.order_by(m.dt.asc())
end_statm = statm.order_by(m.dt.desc())
count_statm = select(func.count()).select_from(statm.subquery())
with self.sm.begin() as session:
start_tracking = self._convert_row_to_tracking(
columns, session.execute(start_statm).first(), tracer
)
end_tracking = self._convert_row_to_tracking(
columns, session.execute(end_statm).first(), tracer
)

return Brief(
tracer=tracer,
collector_id=collector_id,
start=session.execute(start_statm).first()[0].dt,
end=session.execute(end_statm).first()[0].dt,
start=start_tracking.dt,
end=end_tracking.dt,
count=session.execute(count_statm).scalar(),
fields=m.inspect_fields(),
)

def _convert_row_to_tracking(self, columns: List[str], row: Any, tracer: str) -> Tracking:
"""
Convert a row to a tracking record.
Args:
columns (List[str]): Columns.
row (Any): Row.
tracer (str): Tracer's name.
Returns:
duetector.analyzer.models.Tracking: A tracking record.
"""
if not row:
return Tracking(tracer=tracer)

return Tracking(tracer=tracer, **{k: v for k, v in zip(columns, row)})

def brief(
self,
tracers: Optional[List[str]] = None,
collector_ids: Optional[List[str]] = None,
start_datetime: Optional[datetime] = None,
end_datetime: Optional[datetime] = None,
with_details: bool = True,
distinct: bool = False,
) -> AnalyzerBrief:
"""
Get a brief of this analyzer.
Args:
tracers (Optional[List[str]], optional):
Tracers. Defaults to None, all tracers will be queried.
If a specific tracer is not found, it will be ignored.
collector_ids (Optional[List[str]], optional):
Collector ids. Defaults to None, all collector ids will be queried.
If a specific collector id is not found, it will be ignored.
start_datetime (Optional[datetime], optional): Start time. Defaults to None.
end_datetime (Optional[datetime], optional): End time. Defaults to None.
with_details (bool, optional): With details. Defaults to True.
distinct (bool, optional): Distinct. Defaults to False.
Returns:
AnalyzerBrief: A brief of this analyzer.
TODO:
Support specify tracers/collector ids/(distinct)fields
"""
tables = self.sm.inspect_all_tables()
if tracers:
tables = [t for t in tables if self.sm.table_name_to_tracer(t) in tracers]
if collector_ids:
tables = [t for t in tables if self.sm.table_name_to_collector_id(t) in collector_ids]

briefs = [
self._table_brief(t, start_datetime, end_datetime, inspect=with_details)
for t in self.sm.inspect_all_tables()
self._table_brief(
t, start_datetime, end_datetime, inspect=with_details, distinct=distinct
)
for t in tables
]

return AnalyzerBrief(
Expand Down
24 changes: 18 additions & 6 deletions tests/test_db_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ def db_analyzer(full_config, c_tracking, collector_id):

def test_query(db_analyzer: DBAnalyzer, a_tracking, collector_id):
assert a_tracking in db_analyzer.query()
assert a_tracking in db_analyzer.query(tracer=a_tracking.tracer)
assert a_tracking in db_analyzer.query(collector_id=collector_id)
assert a_tracking in db_analyzer.query(tracer=a_tracking.tracer, collector_id=collector_id)
assert a_tracking in db_analyzer.query(tracers=[a_tracking.tracer])
assert a_tracking in db_analyzer.query(collector_ids=[collector_id])
assert a_tracking in db_analyzer.query(
tracers=[a_tracking.tracer], collector_ids=[collector_id]
)
assert a_tracking in db_analyzer.query(start_datetime=now - timedelta(days=1))
assert a_tracking in db_analyzer.query(end_datetime=now + timedelta(days=1))
assert a_tracking in db_analyzer.query(order_by_asc=["pid"])
Expand All @@ -79,8 +81,8 @@ def test_query(db_analyzer: DBAnalyzer, a_tracking, collector_id):
fname=a_tracking.fname,
) in db_analyzer.query(columns=["pid", "fname"])

assert not db_analyzer.query(tracer="not-exist")
assert not db_analyzer.query(collector_id="not-exist")
assert not db_analyzer.query(tracers=["not-exist"])
assert not db_analyzer.query(collector_ids=["not-exist"])
assert not db_analyzer.query(start_datetime=now + timedelta(days=1))
assert not db_analyzer.query(end_datetime=now - timedelta(days=1))
assert not db_analyzer.query(start=100)
Expand All @@ -89,8 +91,18 @@ def test_query(db_analyzer: DBAnalyzer, a_tracking, collector_id):

def test_brief(db_analyzer: DBAnalyzer, a_tracking, collector_id):
assert db_analyzer.brief()
assert db_analyzer.brief(tracers=[a_tracking.tracer])
assert db_analyzer.brief(collector_ids=[collector_id])
assert db_analyzer.brief(tracers=[a_tracking.tracer], collector_ids=[collector_id])
assert db_analyzer.brief(start_datetime=now - timedelta(days=1))
assert db_analyzer.brief(end_datetime=now + timedelta(days=1))
assert db_analyzer.brief(with_details=False)
# print(db_analyzer.brief())
assert db_analyzer.brief(distinct=True)

assert not db_analyzer.brief(tracers=["not-exist"]).tracers
assert not db_analyzer.brief(collector_ids=["not-exist"]).collector_ids
assert not db_analyzer.brief(start_datetime=now + timedelta(days=1)).briefs[0].count
assert not db_analyzer.brief(end_datetime=now - timedelta(days=1)).briefs[0].count


if __name__ == "__main__":
Expand Down

0 comments on commit 1badf4e

Please sign in to comment.