Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement label inference pipeline #825

Merged
merged 8 commits into from
Jul 8, 2021
Empty file.
121 changes: 121 additions & 0 deletions emission/analysis/classification/inference/labels/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Standard imports
import logging

# Our imports
import emission.storage.pipeline_queries as epq
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.core.wrapper.labelprediction as ecwl

def infer_labels(user_id):
    """Run the label inference stage for one user and record its outcome.

    The time range to process is obtained from the pipeline queries module
    before the guarded section. Inside it, a ``LabelInferencePipeline`` is
    run over that range and the stage is marked done with the last trip the
    pipeline processed (``None`` when it processed nothing). Any error raised
    inside the guarded section is logged and the stage is marked failed.

    :param user_id: the user whose trips should have labels inferred
    """
    pending_range = epq.get_time_range_for_label_inference(user_id)
    try:
        pipeline = LabelInferencePipeline()
        pipeline.user_id = user_id
        pipeline.run_prediction_pipeline(user_id, pending_range)
        latest_trip = pipeline.last_trip_done
        if latest_trip is None:
            logging.debug("After run, last_trip_done == None, must be early return")
        epq.mark_label_inference_done(user_id, latest_trip)
    except:
        # Deliberately catch-all: whatever went wrong, the stage must be
        # marked as failed rather than left in its in-progress state.
        logging.exception("Error while inferring labels, timestamp is unchanged")
        epq.mark_label_inference_failed(user_id)

# Code structure based on emission.analysis.classification.inference.mode.pipeline
# and emission.analysis.classification.inference.mode.rule_engine
class LabelInferencePipeline:
    """Predicts labels for a user's cleaned trips and stores each prediction.

    After ``run_prediction_pipeline`` completes, ``last_trip_done`` holds the
    processed trip with the greatest ``data.end_ts``, or ``None`` if no trips
    were processed.
    """

    def __init__(self):
        self._last_trip_done = None

    @property
    def last_trip_done(self):
        """The processed trip with the latest ``data.end_ts``, or ``None``."""
        return self._last_trip_done

    def run_prediction_pipeline(self, user_id, time_range):
        """Predict and store labels for every cleaned trip in ``time_range``.

        :param user_id: the user whose cleaned trips are read and under whose
            id the predictions are inserted
        :param time_range: time query passed to ``esda.get_entries`` to select
            the trips to process
        """
        # Record the user on the instance here, so the insert below does not
        # depend on the caller having assigned ``user_id`` beforehand (which
        # would otherwise surface as an AttributeError inside the loop).
        self.user_id = user_id
        self.ts = esta.TimeSeries.get_time_series(user_id)
        self.toPredictTrips = esda.get_entries(
            esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
        for trip in self.toPredictTrips:
            prediction = predict_trip(trip)
            lp = ecwl.Labelprediction()
            lp.trip_id = trip.get_id()
            lp.prediction = prediction
            # Copy the trip's time bounds onto the prediction
            lp.start_ts = trip.data.start_ts
            lp.end_ts = trip.data.end_ts
            # Insert the Labelprediction into the database as its own independent document
            self.ts.insert_data(self.user_id, esda.INFERRED_LABELS_KEY, lp)
            # Track the latest-ending trip; the entries are not assumed to be
            # sorted, hence the explicit comparison.
            if self._last_trip_done is None or self._last_trip_done.data.end_ts < trip.data.end_ts:
                self._last_trip_done = trip

# This is where the labels for a given trip are actually predicted.
# Though the only information passed in is the trip object, the trip object can provide the
# user_id and other potentially useful information.
def predict_trip(trip):
    """Return the label prediction for ``trip``.

    Currently this simply forwards to ``placeholder_prediction``; it is the
    single place to swap in a real inference algorithm.

    :param trip: the trip object to predict labels for
    :return: whatever ``placeholder_prediction`` returns for this trip
    """
    prediction = placeholder_prediction(trip)
    return prediction

# For testing only!
trip_n = 0
import random


def _bike_or_walk_labels():
    """Build a fresh two-entry prediction: bike/work (0.8) vs. walk/shopping (0.2)."""
    return [
        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2},
    ]


def _four_way_labels():
    """Build a fresh four-entry prediction mixing walk, drove_alone and shared_ride."""
    return [
        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
        {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
        {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
        {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05},
    ]


# A placeholder predictor to allow pipeline development without a real inference algorithm
def placeholder_prediction(trip, scenario=2):
    """Return canned label predictions so the pipeline can run end to end.

    For the moment, the system is configured to work with two labels,
    "mode_confirm" and "purpose_confirm", so those are the labels used here.

    :param trip: the trip being predicted; not consulted by this placeholder
    :param scenario: index (0, 1 or 2) of the placeholder scenario to return
    :return: a list of ``{"labels": {...}, "p": probability}`` dicts, possibly
        empty
    :raises IndexError: if ``scenario`` is not a valid index into the three
        scenarios
    """
    # For testing only!
    # Module-level call counter that cycles through 1..6 on successive calls.
    global trip_n
    trip_n = trip_n % 6 + 1

    # The first placeholder scenario represents a case where it is hard to distinguish between
    # biking and walking (e.g., because the user is a very slow biker) and hard to distinguish
    # between work and shopping at the grocery store (e.g., because the user works at the
    # grocery store), but whenever the user bikes to the location it is to work and whenever
    # the user walks to the location it is to shop (e.g., because they don't have a basket on
    # their bike), and the user bikes to the location four times more than they walk there.
    # Obviously, it is a simplification.
    always_ambiguous = _bike_or_walk_labels()

    # The next placeholder scenario provides that same set of labels in 75% of cases and no
    # labels in the rest. The random draw happens on every call, whichever scenario is chosen.
    if random.random() > 0.25:
        sometimes_empty = _bike_or_walk_labels()
    else:
        sometimes_empty = []

    # This third scenario provides labels designed to test the soundness and resilience of
    # the client-side inference processing algorithms. Successive calls walk backwards
    # through this list (index 5 first, index 0 last) and then wrap around.
    rotation = [
        [],
        _bike_or_walk_labels(),
        [
            {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
        ],
        _bike_or_walk_labels(),
        _four_way_labels(),
        _four_way_labels(),
    ]
    rotating = rotation[6 - trip_n]

    scenarios = [always_ambiguous, sometimes_empty, rotating]
    return scenarios[scenario]

9 changes: 9 additions & 0 deletions emission/analysis/userinput/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def create_confirmed_trips(user_id, timerange):
confirmed_trip_dict["data"]["cleaned_trip"] = tct.get_id()
confirmed_trip_dict["data"]["user_input"] = \
get_user_input_dict(ts, tct, input_key_list)
confirmed_trip_dict["data"]["inferred_labels"] = get_inferred_labels_list(tct)
confirmed_trip_entry = ecwe.Entry(confirmed_trip_dict)
# save the entry
ts.insert(confirmed_trip_entry)
Expand All @@ -97,3 +98,11 @@ def get_user_input_dict(ts, tct, input_key_list):
logging.debug("for trip %s, returning user input dict %s" % (tct.get_id(), tct_userinput))
return tct_userinput

# For a given trip, find the corresponding list of label inferences if it has been generated
def get_inferred_labels_list(trip):
    """Look up the stored label prediction for ``trip``.

    :param trip: trip object providing ``user_id`` and ``get_id()``
    :return: the ``prediction`` field of the single matching entry, or an
        empty dict ``{}`` when no entry exists for this trip
    :raises AssertionError: if more than one entry matches the trip
    """
    matches = esdt.get_sections_for_trip(
        esda.INFERRED_LABELS_KEY, trip.user_id, trip.get_id())
    match_count = len(matches)
    if match_count == 0:
        # Perhaps we have not run the inference step for this trip
        return {}
    assert match_count == 1, \
        "Multiple label inference list objects for trip "+str(trip.get_id())
    only_match = matches[0]
    return only_match["data"]["prediction"]
GabrielKS marked this conversation as resolved.
Show resolved Hide resolved

1 change: 1 addition & 0 deletions emission/core/wrapper/confirmedtrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Confirmedtrip(ecwt.Trip):
# https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-738120752
"primary_section": ecwb.WrapperBase.Access.WORM,
"inferred_primary_mode": ecwb.WrapperBase.Access.WORM,
"inferred_labels": ecwb.WrapperBase.Access.WORM,
# the user input will have all `manual/*` entries
# let's make that be somewhat flexible instead of hardcoding into the data model
"user_input": ecwb.WrapperBase.Access.WORM
Expand Down
2 changes: 2 additions & 0 deletions emission/core/wrapper/entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def _getData2Wrapper():
"mode_inference/model": "modeinfermodel",
# the predicted mode for a particular section
"inference/prediction": "modeprediction",
# the predicted labels for a particular trip
"inference/labels": "labelprediction",
# equivalent of cleaned_section, but with the mode set to the
# inferred mode instead of just walk/bike/motorized
# used for consistency and to make the client work whether or not we were
Expand Down
16 changes: 16 additions & 0 deletions emission/core/wrapper/labelprediction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Based on modeprediction.py
import emission.core.wrapper.wrapperbase as ecwb

class Labelprediction(ecwb.WrapperBase):
    """Wrapper describing the labels predicted for a single trip.

    Every property below is declared with ``Access.WORM``.
    """

    props = {
        # the trip that this is part of
        "trip_id": ecwb.WrapperBase.Access.WORM,
        # What we predict
        "prediction": ecwb.WrapperBase.Access.WORM,
        # start time for the prediction, so that it can be captured in
        # time-based queries, e.g. to reset the pipeline
        "start_ts": ecwb.WrapperBase.Access.WORM,
        # end time for the prediction, so that it can be captured in
        # time-based queries, e.g. to reset the pipeline
        "end_ts": ecwb.WrapperBase.Access.WORM,
    }

    # No enum, geojson or local-date fields are declared for this wrapper
    enums = {}
    geojson = {}
    local_dates = {}

    def _populateDependencies(self):
        # Nothing to populate
        pass
1 change: 1 addition & 0 deletions emission/core/wrapper/pipelinestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class PipelineStages(enum.Enum):
JUMP_SMOOTHING = 3
CLEAN_RESAMPLING = 11
MODE_INFERENCE = 4
LABEL_INFERENCE = 14
CREATE_CONFIRMED_OBJECTS = 13
TOUR_MODEL = 5
ALTERNATIVES = 10
Expand Down
9 changes: 9 additions & 0 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import emission.analysis.intake.cleaning.location_smoothing as eaicl
import emission.analysis.intake.cleaning.clean_and_resample as eaicr
import emission.analysis.classification.inference.mode.pipeline as eacimp
import emission.analysis.classification.inference.labels.pipeline as eacilp
import emission.net.ext_service.habitica.executor as autocheck

import emission.storage.decorations.stats_queries as esds
Expand Down Expand Up @@ -158,6 +159,14 @@ def run_intake_pipeline_for_user(uuid):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.MODE_INFERENCE.name,
time.time(), crt.elapsed)

with ect.Timer() as crt:
logging.info("*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10)
eacilp.infer_labels(uuid)

esds.store_pipeline_time(uuid, ecwp.PipelineStages.LABEL_INFERENCE.name,
time.time(), crt.elapsed)

with ect.Timer() as crt:
logging.info("*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: creating confirmed objects " % uuid + "*" * 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
METRICS_DAILY_MEAN_DURATION = "metrics/daily_mean_duration"
METRICS_DAILY_USER_MEDIAN_SPEED = "metrics/daily_user_median_speed"
METRICS_DAILY_MEAN_MEDIAN_SPEED = "metrics/daily_mean_median_speed"
INFERRED_LABELS_KEY = "inference/labels"

# General methods

Expand Down
16 changes: 16 additions & 0 deletions emission/storage/pipeline_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,22 @@ def mark_mode_inference_done(user_id, last_section_done):
def mark_mode_inference_failed(user_id):
    """Report the MODE_INFERENCE stage as failed for ``user_id`` via ``mark_stage_failed``."""
    failed_stage = ps.PipelineStages.MODE_INFERENCE
    mark_stage_failed(user_id, failed_stage)

def get_time_range_for_label_inference(user_id):
    """Return the time query that the LABEL_INFERENCE stage should process.

    The query comes from ``get_time_range_for_stage`` and is then switched to
    filter on ``data.end_ts``.

    :param user_id: the user whose pipeline state is consulted
    :return: the stage's time query with ``timeType`` set to ``"data.end_ts"``
    """
    label_query = get_time_range_for_stage(
        user_id, ps.PipelineStages.LABEL_INFERENCE)
    label_query.timeType = "data.end_ts"
    return label_query

# This stage operates on trips, not sections
def mark_label_inference_done(user_id, last_trip_done):
    """Mark the LABEL_INFERENCE stage as done for ``user_id``.

    :param user_id: the user whose pipeline state is updated
    :param last_trip_done: the last trip processed by the stage, or ``None``
        if nothing was processed; when given, its ``data.end_ts`` plus
        ``END_FUZZ_AVOID_LTE`` is passed on as the processed timestamp
    """
    if last_trip_done is not None:
        processed_ts = last_trip_done.data.end_ts + END_FUZZ_AVOID_LTE
    else:
        processed_ts = None
    mark_stage_done(user_id, ps.PipelineStages.LABEL_INFERENCE, processed_ts)

def mark_label_inference_failed(user_id):
    """Report the LABEL_INFERENCE stage as failed for ``user_id`` via ``mark_stage_failed``."""
    failed_stage = ps.PipelineStages.LABEL_INFERENCE
    mark_stage_failed(user_id, failed_stage)

def get_time_range_for_output_gen(user_id):
    """Return the time query for the OUTPUT_GEN stage, as given by ``get_time_range_for_stage``."""
    output_gen_query = get_time_range_for_stage(
        user_id, ps.PipelineStages.OUTPUT_GEN)
    return output_gen_query

Expand Down
1 change: 1 addition & 0 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(self, user_id):
"metrics/daily_user_median_speed": self.analysis_timeseries_db,
"metrics/daily_mean_median_speed": self.analysis_timeseries_db,
"inference/prediction": self.analysis_timeseries_db,
"inference/labels": self.analysis_timeseries_db,
"analysis/inferred_section": self.analysis_timeseries_db,
"analysis/confirmed_trip": self.analysis_timeseries_db,
"analysis/confirmed_section": self.analysis_timeseries_db
Expand Down