Skip to content

Commit

Permalink
added timings to time and distance filter
Browse files: browse the repository at this point in the history
  • Loading branch information
TeachMeTW committed Nov 5, 2024
1 parent f4f6d19 commit 143f0d6
Show file tree
Hide file tree
Showing 2 changed files with 256 additions and 139 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -12,6 +12,7 @@
import attrdict as ad
import numpy as np
import datetime as pydt
import time

# Our imports
import emission.analysis.point_features as pf
Expand All @@ -20,6 +21,9 @@

import emission.analysis.intake.segmentation.restart_checking as eaisr
import emission.analysis.intake.segmentation.trip_segmentation_methods.trip_end_detection_corner_cases as eaistc
import emission.storage.decorations.stats_queries as esds
import emission.core.wrapper.pipelinestate as ecwp
import emission.core.timer as ect

class DwellSegmentationDistFilter(eaist.TripSegmentationMethod):
def __init__(self, time_threshold, point_threshold, distance_threshold):
Expand All @@ -46,9 +50,23 @@ def segment_into_trips(self, timeseries, time_query):
data that they want from the sensor streams in order to determine the
segmentation points.
"""
self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query)
self.filtered_points_df.loc[:,"valid"] = True
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
# Timer for fetching filtered location points
with ect.Timer() as t_get_filtered_points_df:
self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query)
# Assuming all rows have the same user_id, extract from the first row
user_id = self.filtered_points_df.iloc[0]['user_id'] if not self.filtered_points_df.empty else None
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_filtered_location", time.time(), t_get_filtered_points_df.elapsed)

# Timer for setting 'valid' column
with ect.Timer() as t_set_valid_column:
self.filtered_points_df.loc[:, "valid"] = True
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_valid_column", time.time(), t_set_valid_column.elapsed)

# Timer for fetching transition data
with ect.Timer() as t_get_transition_df:
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df", time.time(), t_get_transition_df.elapsed)

if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
else:
Expand All @@ -62,51 +80,78 @@ def segment_into_trips(self, timeseries, time_query):
last_trip_end_point = None
curr_trip_start_point = None
just_ended = True
for idx, row in self.filtered_points_df.iterrows():
currPoint = ad.AttrDict(row)
currPoint.update({"idx": idx})
logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30)
if curr_trip_start_point is None:
logging.debug("Appending currPoint because the current start point is None")
# segmentation_points.append(currPoint)

# Timer for the entire loop over filtered points
with ect.Timer() as t_loop_over_points:
for idx, row in self.filtered_points_df.iterrows():
# Timer for processing each row
with ect.Timer() as t_process_row:
currPoint = ad.AttrDict(row)
currPoint.update({"idx": idx})
logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30)
if curr_trip_start_point is None:
logging.debug("Appending currPoint because the current start point is None")
# segmentation_points.append(currPoint)

if just_ended:
if self.continue_just_ended(idx, currPoint, self.filtered_points_df):
# We have "processed" the currPoint by deciding to glom it
self.last_ts_processed = currPoint.metadata_write_ts
continue
# else:
# Here's where we deal with the start trip. At this point, the
# distance is greater than the filter.
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False
else:
# Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
# Using .iloc just ends up including points after this one.
# So we reset_index upstream and use it here.
last10Points_df = self.filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
lastPoint = self.find_last_valid_point(idx)
if self.has_trip_ended(lastPoint, currPoint, timeseries):
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx-1))
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
just_ended = True
# Now, we have finished processing the previous point as a trip
# end or not. But we still need to process this point by seeing
# whether it should represent a new trip start, or a glom to the
# previous trip
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
if just_ended:
if self.continue_just_ended(idx, currPoint, self.filtered_points_df):
# We have "processed" the currPoint by deciding to glom it
self.last_ts_processed = currPoint.metadata_write_ts
continue
# else:
# Here's where we deal with the start trip. At this point, the
# distance is greater than the filter.
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False

else:
# Timer for selecting last 10 points
with ect.Timer() as t_select_last10Points:
# Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
# Using .iloc just ends up including points after this one.
# So we reset_index upstream and use it here.
# We are going to use the last 8 points for now.
# TODO: Change this back to last 10 points once we normalize phone and this
last10Points_df = self.filtered_points_df.iloc[max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1]
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/select_last10Points", time.time(), t_select_last10Points.elapsed)

# Timer for finding the last valid point
with ect.Timer() as t_find_last_valid_point:
lastPoint = self.find_last_valid_point(idx)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/find_last_valid_point", time.time(), t_find_last_valid_point.elapsed)

# Timer for checking if trip has ended
with ect.Timer() as t_has_trip_ended:
trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/has_trip_ended", time.time(), t_has_trip_ended.elapsed)

if trip_ended:
# Timer for appending segmentation points and logging
with ect.Timer() as t_append_segmentation:
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx - 1))
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
# We have processed everything up to the trip end by marking it as a completed trip
self.last_ts_processed = currPoint.metadata_write_ts
just_ended = True

# Timer for checking if we should set a new trip start point
with ect.Timer() as t_continue_just_ended:
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
sel_point = currPoint
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
curr_trip_start_point = sel_point
just_ended = False
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_new_trip_start", time.time(), t_continue_just_ended.elapsed)
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/append_segmentation", time.time(), t_append_segmentation.elapsed)
# Store elapsed time for processing the row
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/process_row", time.time(), t_process_row.elapsed)
# Store elapsed time for looping over points
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/loop_over_points", time.time(), t_loop_over_points.elapsed)

# Since we only end a trip when we start a new trip, this means that
# the last trip that was pushed is ignored. Consider the example of
# 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
Expand All @@ -133,15 +178,20 @@ def segment_into_trips(self, timeseries, time_query):
# data for efficiency reasons? Therefore, we also check to see if there
# is a trip_end_detected in this timeframe after the last point. If so,
# then we end the trip at the last point that we have.

if not just_ended and len(self.transition_df) > 0:
stopped_moving_after_last = self.transition_df[(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)]
logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
if len(stopped_moving_after_last) > 0:
logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
segmentation_points.append((curr_trip_start_point, currPoint))
self.last_ts_processed = currPoint.metadata_write_ts
else:
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
# Timer for handling the final trip end point
with ect.Timer() as t_handle_final_trip_end:
stopped_moving_after_last = self.transition_df[(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)]
logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
if len(stopped_moving_after_last) > 0:
logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
segmentation_points.append((curr_trip_start_point, currPoint))
self.last_ts_processed = currPoint.metadata_write_ts
else:
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/handle_final_trip_end", time.time(), t_handle_final_trip_end.elapsed)

return segmentation_points

def has_trip_ended(self, lastPoint, currPoint, timeseries):
Expand Down
Loading

0 comments on commit 143f0d6

Please sign in to comment.