Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeseries query #34

Merged
merged 3 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions pages/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pandas as pd
from dash.exceptions import PreventUpdate

from utils.permissions import has_permission, get_uuids_columns, get_trips_columns, get_additional_trip_columns
from utils import permissions as perm_utils

register_page(__name__, path="/data")

Expand Down Expand Up @@ -38,13 +38,15 @@ def render_content(tab, store_uuids, store_trips):
data, columns, has_perm = None, [], False
if tab == 'tab-uuids-datatable':
data = store_uuids["data"]
columns = get_uuids_columns()
has_perm = has_permission('data_uuids')
columns = perm_utils.get_uuids_columns()
has_perm = perm_utils.has_permission('data_uuids')
elif tab == 'tab-trips-datatable':
data = store_trips["data"]
columns = get_trips_columns()
columns.update(set(col['label'] for col in get_additional_trip_columns()))
has_perm = has_permission('data_trips')
columns = perm_utils.get_allowed_trip_columns()
columns.update(
col['label'] for col in perm_utils.get_allowed_named_trip_columns()
)
has_perm = perm_utils.has_permission('data_trips')
df = pd.DataFrame(data)
if df.empty or not has_perm:
return None
Expand Down
19 changes: 16 additions & 3 deletions utils/constants.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
valid_trip_columns = [
REQUIRED_NAMED_COLS = [
{'label': 'trip_start_time_str', 'path': 'data.start_fmt_time'},
{'label': 'trip_end_time_str', 'path': 'data.end_fmt_time'},
{'label': 'start_coordinates', 'path': 'data.start_loc.coordinates'},
{'label': 'end_coordinates', 'path': 'data.end_loc.coordinates'},
]

VALID_TRIP_COLS = [
"data.source",
"data.start_ts",
"data.start_local_dt",
"data.start_fmt_time",
"data.start_place",
"data.start_loc",
"data.end_ts",
"data.end_local_dt",
"data.end_fmt_time",
"data.end_place",
"data.end_loc",
"data.duration",
"data.distance",
"data.start_loc.coordinates",
"data.end_loc.coordinates",
"metadata.key",
"metadata.platform",
"metadata.write_ts",
Expand All @@ -21,6 +28,12 @@
"user_id"
]

BINARY_TRIP_COLS = [
'user_id',
'data.start_place',
'data.end_place',
]

valid_uuids_columns = [
'user_token',
'user_id',
Expand Down
66 changes: 22 additions & 44 deletions utils/db_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import logging
from datetime import datetime, timezone
import sys

import pandas as pd
import logging

import emission.core.get_database as edb
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt

from utils.permissions import get_trips_columns, get_additional_trip_columns
from utils import constants
from utils import permissions as perm_utils


def query_uuids(start_date, end_date):
Expand Down Expand Up @@ -43,55 +43,33 @@ def query_uuids(start_date, end_date):
return df

def query_confirmed_trips(start_date, end_date):
query = {
'$and': [
{'metadata.key': 'analysis/confirmed_trip'},
{'data.start_ts': {'$exists': True}},
]
}
start_ts, end_ts = None, datetime.max.timestamp()
if start_date is not None:
start_time = datetime.combine(start_date, datetime.min.time())
query['$and'][1]['data.start_ts']['$gte'] = start_time.timestamp()
start_ts = datetime.combine(start_date, datetime.min.time()).timestamp()

if end_date is not None:
end_time = datetime.combine(end_date, datetime.max.time())
query['$and'][1]['data.start_ts']['$lt'] = end_time.timestamp()

projection = {
'_id': 0,
'user_id': 1,
'trip_start_time_str': '$data.start_fmt_time',
'trip_end_time_str': '$data.end_fmt_time',
'timezone': '$data.start_local_dt.timezone',
'start_coordinates': '$data.start_loc.coordinates',
'end_coordinates': '$data.end_loc.coordinates',
}

for column in get_trips_columns():
projection[column] = 1

for column in get_additional_trip_columns():
projection[column['label']] = column['path']
end_ts = datetime.combine(end_date, datetime.max.time()).timestamp()

ts = esta.TimeSeries.get_aggregate_time_series()
# Note to self, allow end_ts to also be null in the timequery
# we can then remove the start_time, end_time logic
# Alireza TODO: Replace with proper parsing for the start and end timestamp
entries = ts.find_entries(["analysis/confirmed_trip"],
time_query = estt.TimeQuery("data.start_ts", 0, sys.maxsize))
entries = ts.find_entries(
key_list=["analysis/confirmed_trip"],
time_query=estt.TimeQuery("data.start_ts", start_ts, end_ts),
)
df = pd.json_normalize(list(entries))
# Alireza TODO: Make this be configurable, to support only the projection needed

# logging.warn("Before filtering, df columns are %s" % df.columns)
df = df[["user_id", "data.start_fmt_time", "data.end_fmt_time", "data.distance", "data.duration", "data.start_loc.coordinates", "data.end_loc.coordinates"]]
# logging.warn("After filtering, df columns are %s" % df.columns)
if not df.empty:
df['user_id'] = df['user_id'].apply(str)
df['trip_start_time_str'] = df['data.start_fmt_time']
df['trip_end_time_str'] = df['data.end_fmt_time']
df['start_coordinates'] = df['data.start_loc.coordinates']
df['end_coordinates'] = df['data.end_loc.coordinates']
if 'data.start_place' in df.columns:
df['data.start_place'] = df['data.start_place'].apply(str)
if 'data.end_place' in df.columns:
df['data.end_place'] = df['data.end_place'].apply(str)
columns = [col for col in perm_utils.get_all_trip_columns() if col in df.columns]
df = df[columns]
for col in constants.BINARY_TRIP_COLS:
if col in df.columns:
df[col] = df[col].apply(str)
for named_col in perm_utils.get_all_named_trip_columns():
if named_col['path'] in df.columns:
df[named_col['label']] = df[named_col['path']]
shankari marked this conversation as resolved.
Show resolved Hide resolved
df = df.drop(columns=[named_col['path']])

# logging.warn("After filtering, df columns are %s" % df.columns)
return df
40 changes: 31 additions & 9 deletions utils/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

import requests

from utils.constants import valid_trip_columns, valid_uuids_columns

from utils import constants

STUDY_NAME = os.getenv('STUDY_NAME')
PATH = os.getenv('CONFIG_PATH')
Expand All @@ -17,23 +16,46 @@ def has_permission(perm):
return True if permissions.get(perm) is True else False


def get_trips_columns():
columns = set(valid_trip_columns)
def get_allowed_named_trip_columns():
return permissions.get('additional_trip_columns', [])


def get_required_columns():
required_cols = {'user_id'}
required_cols.update(col['path'] for col in constants.REQUIRED_NAMED_COLS)
return required_cols


def get_all_named_trip_columns():
named_columns = [item for item in constants.REQUIRED_NAMED_COLS]
named_columns.extend(
get_allowed_named_trip_columns()
)
return named_columns

def get_all_trip_columns():
columns = set()
columns.update(get_allowed_trip_columns())
columns.update(
col['path'] for col in get_allowed_named_trip_columns()
)
columns.update(get_required_columns())
return columns


def get_allowed_trip_columns():
columns = set(constants.VALID_TRIP_COLS)
for column in permissions.get("data_trips_columns_exclude", []):
columns.discard(column)
return columns


def get_uuids_columns():
columns = set(valid_uuids_columns)
columns = set(constants.valid_uuids_columns)
for column in permissions.get("data_uuids_columns_exclude", []):
columns.discard(column)
return columns


def get_token_prefix():
return permissions['token_prefix'] + '_' if permissions.get('token_prefix') else ''


def get_additional_trip_columns():
return permissions.get('additional_trip_columns', [])