Skip to content

Commit

Permalink
Merge pull request #34 from AlirezaRa94/timeseries_query
Browse files — browse the repository at this point in the history
Timeseries query
  • Loading branch information
shankari authored May 1, 2023
2 parents a079493 + d977851 commit f58a052
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 62 deletions.
14 changes: 8 additions & 6 deletions pages/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pandas as pd
from dash.exceptions import PreventUpdate

from utils.permissions import has_permission, get_uuids_columns, get_trips_columns, get_additional_trip_columns
from utils import permissions as perm_utils

register_page(__name__, path="/data")

Expand Down Expand Up @@ -38,13 +38,15 @@ def render_content(tab, store_uuids, store_trips):
data, columns, has_perm = None, [], False
if tab == 'tab-uuids-datatable':
data = store_uuids["data"]
columns = get_uuids_columns()
has_perm = has_permission('data_uuids')
columns = perm_utils.get_uuids_columns()
has_perm = perm_utils.has_permission('data_uuids')
elif tab == 'tab-trips-datatable':
data = store_trips["data"]
columns = get_trips_columns()
columns.update(set(col['label'] for col in get_additional_trip_columns()))
has_perm = has_permission('data_trips')
columns = perm_utils.get_allowed_trip_columns()
columns.update(
col['label'] for col in perm_utils.get_allowed_named_trip_columns()
)
has_perm = perm_utils.has_permission('data_trips')
df = pd.DataFrame(data)
if df.empty or not has_perm:
return None
Expand Down
19 changes: 16 additions & 3 deletions utils/constants.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
valid_trip_columns = [
REQUIRED_NAMED_COLS = [
{'label': 'trip_start_time_str', 'path': 'data.start_fmt_time'},
{'label': 'trip_end_time_str', 'path': 'data.end_fmt_time'},
{'label': 'start_coordinates', 'path': 'data.start_loc.coordinates'},
{'label': 'end_coordinates', 'path': 'data.end_loc.coordinates'},
]

VALID_TRIP_COLS = [
"data.source",
"data.start_ts",
"data.start_local_dt",
"data.start_fmt_time",
"data.start_place",
"data.start_loc",
"data.end_ts",
"data.end_local_dt",
"data.end_fmt_time",
"data.end_place",
"data.end_loc",
"data.duration",
"data.distance",
"data.start_loc.coordinates",
"data.end_loc.coordinates",
"metadata.key",
"metadata.platform",
"metadata.write_ts",
Expand All @@ -21,6 +28,12 @@
"user_id"
]

BINARY_TRIP_COLS = [
'user_id',
'data.start_place',
'data.end_place',
]

valid_uuids_columns = [
'user_token',
'user_id',
Expand Down
66 changes: 22 additions & 44 deletions utils/db_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import logging
from datetime import datetime, timezone
import sys

import pandas as pd
import logging

import emission.core.get_database as edb
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt

from utils.permissions import get_trips_columns, get_additional_trip_columns
from utils import constants
from utils import permissions as perm_utils


def query_uuids(start_date, end_date):
Expand Down Expand Up @@ -43,55 +43,33 @@ def query_uuids(start_date, end_date):
return df

def query_confirmed_trips(start_date, end_date):
query = {
'$and': [
{'metadata.key': 'analysis/confirmed_trip'},
{'data.start_ts': {'$exists': True}},
]
}
start_ts, end_ts = None, datetime.max.timestamp()
if start_date is not None:
start_time = datetime.combine(start_date, datetime.min.time())
query['$and'][1]['data.start_ts']['$gte'] = start_time.timestamp()
start_ts = datetime.combine(start_date, datetime.min.time()).timestamp()

if end_date is not None:
end_time = datetime.combine(end_date, datetime.max.time())
query['$and'][1]['data.start_ts']['$lt'] = end_time.timestamp()

projection = {
'_id': 0,
'user_id': 1,
'trip_start_time_str': '$data.start_fmt_time',
'trip_end_time_str': '$data.end_fmt_time',
'timezone': '$data.start_local_dt.timezone',
'start_coordinates': '$data.start_loc.coordinates',
'end_coordinates': '$data.end_loc.coordinates',
}

for column in get_trips_columns():
projection[column] = 1

for column in get_additional_trip_columns():
projection[column['label']] = column['path']
end_ts = datetime.combine(end_date, datetime.max.time()).timestamp()

ts = esta.TimeSeries.get_aggregate_time_series()
# Note to self, allow end_ts to also be null in the timequery
# we can then remove the start_time, end_time logic
# Alireza TODO: Replace with proper parsing for the start and end timestamp
entries = ts.find_entries(["analysis/confirmed_trip"],
time_query = estt.TimeQuery("data.start_ts", 0, sys.maxsize))
entries = ts.find_entries(
key_list=["analysis/confirmed_trip"],
time_query=estt.TimeQuery("data.start_ts", start_ts, end_ts),
)
df = pd.json_normalize(list(entries))
# Alireza TODO: Make this be configurable, to support only the projection needed

# logging.warn("Before filtering, df columns are %s" % df.columns)
df = df[["user_id", "data.start_fmt_time", "data.end_fmt_time", "data.distance", "data.duration", "data.start_loc.coordinates", "data.end_loc.coordinates"]]
# logging.warn("After filtering, df columns are %s" % df.columns)
if not df.empty:
df['user_id'] = df['user_id'].apply(str)
df['trip_start_time_str'] = df['data.start_fmt_time']
df['trip_end_time_str'] = df['data.end_fmt_time']
df['start_coordinates'] = df['data.start_loc.coordinates']
df['end_coordinates'] = df['data.end_loc.coordinates']
if 'data.start_place' in df.columns:
df['data.start_place'] = df['data.start_place'].apply(str)
if 'data.end_place' in df.columns:
df['data.end_place'] = df['data.end_place'].apply(str)
columns = [col for col in perm_utils.get_all_trip_columns() if col in df.columns]
df = df[columns]
for col in constants.BINARY_TRIP_COLS:
if col in df.columns:
df[col] = df[col].apply(str)
for named_col in perm_utils.get_all_named_trip_columns():
if named_col['path'] in df.columns:
df[named_col['label']] = df[named_col['path']]
df = df.drop(columns=[named_col['path']])

# logging.warn("After filtering, df columns are %s" % df.columns)
return df
40 changes: 31 additions & 9 deletions utils/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

import requests

from utils.constants import valid_trip_columns, valid_uuids_columns

from utils import constants

STUDY_NAME = os.getenv('STUDY_NAME')
PATH = os.getenv('CONFIG_PATH')
Expand All @@ -17,23 +16,46 @@ def has_permission(perm):
return True if permissions.get(perm) is True else False


def get_trips_columns():
columns = set(valid_trip_columns)
def get_allowed_named_trip_columns():
    """Return the deployer-configured extra trip columns (label/path dicts).

    Reads 'additional_trip_columns' from the loaded permissions config;
    an empty list when the key is absent.
    """
    extra_cols = permissions.get('additional_trip_columns', [])
    return extra_cols


def get_required_columns():
    """Return the set of column paths that must always be kept for trips.

    'user_id' plus the 'path' of every entry in REQUIRED_NAMED_COLS.
    """
    required_paths = [col['path'] for col in constants.REQUIRED_NAMED_COLS]
    return {'user_id', *required_paths}


def get_all_named_trip_columns():
    """Return every named (label/path) trip column.

    Combines the always-required named columns with any deployer-configured
    additional columns. A fresh list is returned on each call so callers can
    mutate it without affecting REQUIRED_NAMED_COLS.
    """
    # list(...) makes the copy directly instead of the manual
    # `[item for item in ...]` comprehension (ruff PERF402).
    return list(constants.REQUIRED_NAMED_COLS) + get_allowed_named_trip_columns()

def get_all_trip_columns():
    """Return the full set of trip column paths that may appear in the output.

    Union of the allowed flat columns, the paths behind the named
    (label/path) columns, and the always-required columns.
    """
    named_paths = {col['path'] for col in get_allowed_named_trip_columns()}
    return get_allowed_trip_columns() | named_paths | get_required_columns()


def get_allowed_trip_columns():
    """Return the valid trip columns minus any the deployment excludes.

    Exclusions come from 'data_trips_columns_exclude' in the permissions
    config; excluding a column that is not in VALID_TRIP_COLS is a no-op.
    """
    excluded = permissions.get("data_trips_columns_exclude", [])
    return set(constants.VALID_TRIP_COLS) - set(excluded)


def get_uuids_columns():
    """Return the valid UUID-table columns minus any configured exclusions.

    Exclusions come from 'data_uuids_columns_exclude' in the permissions
    config; discard() ignores names that are not present.
    """
    # Note: the pre-rename line `set(valid_uuids_columns)` (an undefined
    # name after the switch to `from utils import constants`) is removed;
    # only the qualified constants lookup remains.
    columns = set(constants.valid_uuids_columns)
    for column in permissions.get("data_uuids_columns_exclude", []):
        columns.discard(column)
    return columns


def get_token_prefix():
    """Return the configured token prefix suffixed with '_', or '' if unset.

    A missing, empty, or otherwise falsy 'token_prefix' yields ''.
    """
    prefix = permissions.get('token_prefix')
    if prefix:
        return permissions['token_prefix'] + '_'
    return ''


def get_additional_trip_columns():
    """Backward-compatible alias for get_allowed_named_trip_columns().

    Both functions read 'additional_trip_columns' from the permissions
    config; delegate so the lookup (and its default) lives in one place.
    """
    return get_allowed_named_trip_columns()

0 comments on commit f58a052

Please sign in to comment.