From dc7ea5f26fadd621bbdec980c81c5eeb48b56517 Mon Sep 17 00:00:00 2001
From: AlirezaRa94
Date: Tue, 18 Apr 2023 14:21:32 +1000
Subject: [PATCH 1/3] Added date filters to the time query of trips.

---
 utils/db_utils.py | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)

diff --git a/utils/db_utils.py b/utils/db_utils.py
index efb522e..408f1df 100644
--- a/utils/db_utils.py
+++ b/utils/db_utils.py
@@ -43,19 +43,12 @@ def query_uuids(start_date, end_date):
     return df
 
 def query_confirmed_trips(start_date, end_date):
-    query = {
-        '$and': [
-            {'metadata.key': 'analysis/confirmed_trip'},
-            {'data.start_ts': {'$exists': True}},
-        ]
-    }
+    start_ts, end_ts = None, datetime.max.timestamp()
     if start_date is not None:
-        start_time = datetime.combine(start_date, datetime.min.time())
-        query['$and'][1]['data.start_ts']['$gte'] = start_time.timestamp()
+        start_ts = datetime.combine(start_date, datetime.min.time()).timestamp()
 
     if end_date is not None:
-        end_time = datetime.combine(end_date, datetime.max.time())
-        query['$and'][1]['data.start_ts']['$lt'] = end_time.timestamp()
+        end_ts = datetime.combine(end_date, datetime.max.time()).timestamp()
 
     projection = {
         '_id': 0,
@@ -76,9 +69,10 @@
     ts = esta.TimeSeries.get_aggregate_time_series()
     # Note to self, allow end_ts to also be null in the timequery
     # we can then remove the start_time, end_time logic
-    # Alireza TODO: Replace with proper parsing for the start and end timestamp
-    entries = ts.find_entries(["analysis/confirmed_trip"],
-                              time_query = estt.TimeQuery("data.start_ts", 0, sys.maxsize))
+    entries = ts.find_entries(
+        key_list=["analysis/confirmed_trip"],
+        time_query=estt.TimeQuery("data.start_ts", start_ts, end_ts),
+    )
     df = pd.json_normalize(list(entries))
     # Alireza TODO: Make this be configurable, to support only the projection needed
     # logging.warn("Before filtering, df columns are %s" % df.columns)
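The date handling added here can be exercised on its own. Below is a minimal, standalone sketch (the helper name is made up for illustration; the body mirrors the diff, including the `datetime.max` fallback when no end date is supplied):

```python
from datetime import date, datetime

def date_range_to_timestamps(start_date, end_date):
    # Mirrors the patched query_confirmed_trips(): open-ended bounds fall back
    # to None / datetime.max, closed bounds span the whole calendar day.
    start_ts, end_ts = None, datetime.max.timestamp()
    if start_date is not None:
        start_ts = datetime.combine(start_date, datetime.min.time()).timestamp()
    if end_date is not None:
        end_ts = datetime.combine(end_date, datetime.max.time()).timestamp()
    return start_ts, end_ts

print(date_range_to_timestamps(date(2023, 4, 1), date(2023, 4, 18)))
print(date_range_to_timestamps(None, None))
```

Since `datetime.combine` yields naive datetimes, `.timestamp()` interprets them in the server's local timezone; those epoch values are what `estt.TimeQuery("data.start_ts", start_ts, end_ts)` receives in the hunk above. (Note that `datetime.max.timestamp()` can raise on platforms whose `mktime` cannot represent year 9999, e.g. some Windows builds.)
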
From 8fe0f5268b7cf359b1b9a4a69248c8d32822dcba Mon Sep 17 00:00:00 2001
From: AlirezaRa94
Date: Tue, 18 Apr 2023 16:40:02 +1000
Subject: [PATCH 2/3] Configured needed columns for trips dataframe.

---
 pages/data.py        | 14 ++++++++------
 utils/constants.py   | 20 +++++++++++++++++---
 utils/db_utils.py    | 45 ++++++++++++++------------------------------
 utils/permissions.py | 36 ++++++++++++++++++++++++++---------
 4 files changed, 66 insertions(+), 49 deletions(-)

diff --git a/pages/data.py b/pages/data.py
index e10394d..1fbfe3b 100644
--- a/pages/data.py
+++ b/pages/data.py
@@ -9,7 +9,7 @@
 import pandas as pd
 from dash.exceptions import PreventUpdate
 
-from utils.permissions import has_permission, get_uuids_columns, get_trips_columns, get_additional_trip_columns
+from utils import permissions as perm_utils
 
 
 register_page(__name__, path="/data")
@@ -38,13 +38,15 @@ def render_content(tab, store_uuids, store_trips):
     data, columns, has_perm = None, [], False
     if tab == 'tab-uuids-datatable':
         data = store_uuids["data"]
-        columns = get_uuids_columns()
-        has_perm = has_permission('data_uuids')
+        columns = perm_utils.get_uuids_columns()
+        has_perm = perm_utils.has_permission('data_uuids')
     elif tab == 'tab-trips-datatable':
         data = store_trips["data"]
-        columns = get_trips_columns()
-        columns.update(set(col['label'] for col in get_additional_trip_columns()))
-        has_perm = has_permission('data_trips')
+        columns = perm_utils.get_allowed_trip_columns()
+        columns.update(
+            col['label'] for col in perm_utils.get_allowed_named_trip_columns()
+        )
+        has_perm = perm_utils.has_permission('data_trips')
     df = pd.DataFrame(data)
     if df.empty or not has_perm:
         return None
diff --git a/utils/constants.py b/utils/constants.py
index ecbc67d..0c79bdc 100644
--- a/utils/constants.py
+++ b/utils/constants.py
@@ -1,17 +1,25 @@
-valid_trip_columns = [
+REQUIRED_NAMED_COLS = [
+    {'label': 'user_id', 'path': 'user_id'},
+    {'label': 'trip_start_time_str', 'path': 'data.start_fmt_time'},
+    {'label': 'trip_end_time_str', 'path': 'data.end_fmt_time'},
+    {'label': 'start_coordinates', 'path': 'data.start_loc.coordinates'},
+    {'label': 'end_coordinates', 'path': 'data.end_loc.coordinates'},
+]
+
+VALID_TRIP_COLS = [
     "data.source",
     "data.start_ts",
     "data.start_local_dt",
     "data.start_fmt_time",
     "data.start_place",
-    "data.start_loc",
     "data.end_ts",
     "data.end_local_dt",
     "data.end_fmt_time",
     "data.end_place",
-    "data.end_loc",
     "data.duration",
     "data.distance",
+    "data.start_loc.coordinates",
+    "data.end_loc.coordinates",
     "metadata.key",
     "metadata.platform",
     "metadata.write_ts",
@@ -21,6 +29,12 @@
     "user_id"
 ]
 
+BINARY_TRIP_COLS = [
+    'user_id',
+    'data.start_place',
+    'data.end_place',
+]
+
 valid_uuids_columns = [
     'user_token',
     'user_id',
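The new constants pair a display `label` with a dotted `path`; the paths line up with the column names `pd.json_normalize` produces when it flattens nested trip documents. A small illustration follows (the trip dict and the extra `mode_confirm` entry are invented for the example, shaped like the `additional_trip_columns` config the permissions module reads):

```python
import pandas as pd

additional_trip_columns = [
    {'label': 'mode_confirm', 'path': 'data.user_input.mode_confirm'},
]

entries = [
    {'user_id': 'u1',
     'data': {'start_loc': {'coordinates': [-105.08, 39.57]},
              'user_input': {'mode_confirm': 'bike'}}},
]
df = pd.json_normalize(entries)
print(df.columns.tolist())
# e.g. ['user_id', 'data.start_loc.coordinates', 'data.user_input.mode_confirm']

# Copy each configured path into its friendlier label, as db_utils now does
for col in additional_trip_columns:
    df[col['label']] = df[col['path']]
```
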
diff --git a/utils/db_utils.py b/utils/db_utils.py
index 408f1df..7aee1ea 100644
--- a/utils/db_utils.py
+++ b/utils/db_utils.py
@@ -1,14 +1,14 @@
+import logging
 from datetime import datetime, timezone
-import sys
 
 import pandas as pd
-import logging
 
 import emission.core.get_database as edb
 import emission.storage.timeseries.abstract_timeseries as esta
 import emission.storage.timeseries.timequery as estt
 
-from utils.permissions import get_trips_columns, get_additional_trip_columns
+from utils import constants
+from utils.permissions import get_all_trip_columns, get_all_named_trip_columns
 
 
 def query_uuids(start_date, end_date):
@@ -50,22 +50,6 @@ def query_confirmed_trips(start_date, end_date):
     if end_date is not None:
         end_ts = datetime.combine(end_date, datetime.max.time()).timestamp()
 
-    projection = {
-        '_id': 0,
-        'user_id': 1,
-        'trip_start_time_str': '$data.start_fmt_time',
-        'trip_end_time_str': '$data.end_fmt_time',
-        'timezone': '$data.start_local_dt.timezone',
-        'start_coordinates': '$data.start_loc.coordinates',
-        'end_coordinates': '$data.end_loc.coordinates',
-    }
-
-    for column in get_trips_columns():
-        projection[column] = 1
-
-    for column in get_additional_trip_columns():
-        projection[column['label']] = column['path']
-
     ts = esta.TimeSeries.get_aggregate_time_series()
     # Note to self, allow end_ts to also be null in the timequery
     # we can then remove the start_time, end_time logic
@@ -74,18 +58,17 @@ def query_confirmed_trips(start_date, end_date):
         time_query=estt.TimeQuery("data.start_ts", start_ts, end_ts),
     )
     df = pd.json_normalize(list(entries))
-    # Alireza TODO: Make this be configurable, to support only the projection needed
+
     # logging.warn("Before filtering, df columns are %s" % df.columns)
-    df = df[["user_id", "data.start_fmt_time", "data.end_fmt_time", "data.distance", "data.duration", "data.start_loc.coordinates", "data.end_loc.coordinates"]]
-    # logging.warn("After filtering, df columns are %s" % df.columns)
     if not df.empty:
-        df['user_id'] = df['user_id'].apply(str)
-        df['trip_start_time_str'] = df['data.start_fmt_time']
-        df['trip_end_time_str'] = df['data.end_fmt_time']
-        df['start_coordinates'] = df['data.start_loc.coordinates']
-        df['end_coordinates'] = df['data.end_loc.coordinates']
-        if 'data.start_place' in df.columns:
-            df['data.start_place'] = df['data.start_place'].apply(str)
-        if 'data.end_place' in df.columns:
-            df['data.end_place'] = df['data.end_place'].apply(str)
+        columns = [col for col in get_all_trip_columns() if col in df.columns]
+        df = df[columns]
+        for col in constants.BINARY_TRIP_COLS:
+            if col in df.columns:
+                df[col] = df[col].apply(str)
+        for named_col in get_all_named_trip_columns():
+            if named_col['path'] in df.columns:
+                df[named_col['label']] = df[named_col['path']]
+
+        # logging.warn("After filtering, df columns are %s" % df.columns)
     return df
diff --git a/utils/permissions.py b/utils/permissions.py
index ecdbcc2..2b6337c 100644
--- a/utils/permissions.py
+++ b/utils/permissions.py
@@ -3,8 +3,7 @@
 
 import requests
 
-from utils.constants import valid_trip_columns, valid_uuids_columns
-
+from utils import constants
 
 STUDY_NAME = os.getenv('STUDY_NAME')
 PATH = os.getenv('CONFIG_PATH')
@@ -17,15 +16,38 @@ def has_permission(perm):
     return True if permissions.get(perm) is True else False
 
 
-def get_trips_columns():
-    columns = set(valid_trip_columns)
+def get_allowed_named_trip_columns():
+    return permissions.get('additional_trip_columns', [])
+
+
+def get_all_named_trip_columns():
+    named_columns = constants.REQUIRED_NAMED_COLS
+    named_columns.extend(
+        get_allowed_named_trip_columns()
+    )
+    return named_columns
+
+def get_all_trip_columns():
+    columns = set()
+    columns.update(get_allowed_trip_columns())
+    columns.update(
+        col['path'] for col in get_allowed_named_trip_columns()
+    )
+    columns.update(
+        col['path'] for col in constants.REQUIRED_NAMED_COLS
+    )
+    return columns
+
+
+def get_allowed_trip_columns():
+    columns = set(constants.VALID_TRIP_COLS)
     for column in permissions.get("data_trips_columns_exclude", []):
         columns.discard(column)
     return columns
 
 
 def get_uuids_columns():
-    columns = set(valid_uuids_columns)
+    columns = set(constants.valid_uuids_columns)
     for column in permissions.get("data_uuids_columns_exclude", []):
         columns.discard(column)
     return columns
@@ -33,7 +55,3 @@
 
 def get_token_prefix():
     return permissions['token_prefix'] + '_' if permissions.get('token_prefix') else ''
-
-
-def get_additional_trip_columns():
-    return permissions.get('additional_trip_columns', [])
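Putting the new permission helpers together: the allow-list (`VALID_TRIP_COLS` minus any `data_trips_columns_exclude` entries) is unioned with the paths of the required and study-configured named columns. A self-contained sketch with a made-up `permissions` dict (the real one is loaded from the study config):

```python
VALID_TRIP_COLS = ["data.duration", "data.distance", "data.start_ts"]
REQUIRED_NAMED_COLS = [
    {'label': 'trip_start_time_str', 'path': 'data.start_fmt_time'},
]
permissions = {
    "data_trips_columns_exclude": ["data.start_ts"],
    "additional_trip_columns": [
        {'label': 'mode_confirm', 'path': 'data.user_input.mode_confirm'},
    ],
}

def get_allowed_trip_columns():
    columns = set(VALID_TRIP_COLS)
    for column in permissions.get("data_trips_columns_exclude", []):
        columns.discard(column)
    return columns

def get_all_trip_columns():
    columns = set(get_allowed_trip_columns())
    columns.update(col['path'] for col in permissions.get("additional_trip_columns", []))
    columns.update(col['path'] for col in REQUIRED_NAMED_COLS)
    return columns

print(sorted(get_all_trip_columns()))
# ['data.distance', 'data.duration', 'data.start_fmt_time',
#  'data.user_input.mode_confirm']  -- excluded column gone, named paths kept
```
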
From d977851530c2b6e23ef1978f3643befb93bad26a Mon Sep 17 00:00:00 2001
From: AlirezaRa94
Date: Mon, 24 Apr 2023 14:15:44 +1000
Subject: [PATCH 3/3] Drop paths of named columns after labeling.

---
 utils/constants.py   |  1 -
 utils/db_utils.py    |  7 ++++---
 utils/permissions.py | 12 ++++++++----
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/utils/constants.py b/utils/constants.py
index 0c79bdc..7863658 100644
--- a/utils/constants.py
+++ b/utils/constants.py
@@ -1,5 +1,4 @@
 REQUIRED_NAMED_COLS = [
-    {'label': 'user_id', 'path': 'user_id'},
     {'label': 'trip_start_time_str', 'path': 'data.start_fmt_time'},
     {'label': 'trip_end_time_str', 'path': 'data.end_fmt_time'},
     {'label': 'start_coordinates', 'path': 'data.start_loc.coordinates'},
     {'label': 'end_coordinates', 'path': 'data.end_loc.coordinates'},
diff --git a/utils/db_utils.py b/utils/db_utils.py
index 7aee1ea..8570f85 100644
--- a/utils/db_utils.py
+++ b/utils/db_utils.py
@@ -8,7 +8,7 @@
 import emission.storage.timeseries.timequery as estt
 
 from utils import constants
-from utils.permissions import get_all_trip_columns, get_all_named_trip_columns
+from utils import permissions as perm_utils
 
 
 def query_uuids(start_date, end_date):
@@ -61,14 +61,15 @@ def query_confirmed_trips(start_date, end_date):
 
     # logging.warn("Before filtering, df columns are %s" % df.columns)
     if not df.empty:
-        columns = [col for col in get_all_trip_columns() if col in df.columns]
+        columns = [col for col in perm_utils.get_all_trip_columns() if col in df.columns]
         df = df[columns]
         for col in constants.BINARY_TRIP_COLS:
             if col in df.columns:
                 df[col] = df[col].apply(str)
-        for named_col in get_all_named_trip_columns():
+        for named_col in perm_utils.get_all_named_trip_columns():
             if named_col['path'] in df.columns:
                 df[named_col['label']] = df[named_col['path']]
+                df = df.drop(columns=[named_col['path']])
 
     # logging.warn("After filtering, df columns are %s" % df.columns)
     return df
diff --git a/utils/permissions.py b/utils/permissions.py
index 2b6337c..493f3f0 100644
--- a/utils/permissions.py
+++ b/utils/permissions.py
@@ -20,8 +20,14 @@ def get_allowed_named_trip_columns():
     return permissions.get('additional_trip_columns', [])
 
 
+def get_required_columns():
+    required_cols = {'user_id'}
+    required_cols.update(col['path'] for col in constants.REQUIRED_NAMED_COLS)
+    return required_cols
+
+
 def get_all_named_trip_columns():
-    named_columns = constants.REQUIRED_NAMED_COLS
+    named_columns = [item for item in constants.REQUIRED_NAMED_COLS]
     named_columns.extend(
        get_allowed_named_trip_columns()
     )
@@ -33,9 +39,7 @@ def get_all_trip_columns():
     columns = set()
     columns.update(get_allowed_trip_columns())
     columns.update(
         col['path'] for col in get_allowed_named_trip_columns()
     )
-    columns.update(
-        col['path'] for col in constants.REQUIRED_NAMED_COLS
-    )
+    columns.update(get_required_columns())
     return columns
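Two details of this last patch are easy to miss. First, copying `REQUIRED_NAMED_COLS` before `extend()` matters: the previous version bound the module-level list directly, so every call to `get_all_named_trip_columns()` would have appended the configured extras to the constant itself. Second, `df.drop(columns=[named_col['path']])` leaves only the labeled column in the table sent to the page. A standalone illustration of the aliasing point (not part of the patch; the sample entries are invented):

```python
REQUIRED_NAMED_COLS = [{'label': 'trip_start_time_str', 'path': 'data.start_fmt_time'}]
extra = [{'label': 'mode_confirm', 'path': 'data.user_input.mode_confirm'}]

def all_named_aliased():
    named_columns = REQUIRED_NAMED_COLS                      # alias, as before this patch
    named_columns.extend(extra)
    return named_columns

def all_named_copied():
    named_columns = [item for item in REQUIRED_NAMED_COLS]   # fresh list, as patched
    named_columns.extend(extra)
    return named_columns

all_named_aliased(); all_named_aliased()
print(len(REQUIRED_NAMED_COLS))   # 3 -- the "constant" grew on each call

REQUIRED_NAMED_COLS[:] = [{'label': 'trip_start_time_str', 'path': 'data.start_fmt_time'}]
all_named_copied(); all_named_copied()
print(len(REQUIRED_NAMED_COLS))   # 1 -- unchanged
```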