Implement label inference pipeline #825

Merged 8 commits on Jul 8, 2021
155 changes: 155 additions & 0 deletions emission/analysis/classification/inference/labels/pipeline.py
@@ -0,0 +1,155 @@
# Standard imports
import logging
import random
import copy

# Our imports
import emission.storage.pipeline_queries as epq
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.core.wrapper.labelprediction as ecwl

# Does all the work necessary for a given user
def infer_labels(user_id):
    time_query = epq.get_time_range_for_label_inference(user_id)
    try:
        lip = LabelInferencePipeline()
        lip.user_id = user_id
        lip.run_prediction_pipeline(user_id, time_query)
        if lip.last_trip_done is None:
            logging.debug("After run, last_trip_done == None, must be early return")
        epq.mark_label_inference_done(user_id, lip.last_trip_done)
    except:
        logging.exception("Error while inferring labels, timestamp is unchanged")
        epq.mark_label_inference_failed(user_id)

# A set of placeholder predictors to allow pipeline development without a real inference algorithm.
# For the moment, the system is configured to work with two labels, "mode_confirm" and
# "purpose_confirm", so I'll do that.

# The first placeholder scenario represents a case where it is hard to distinguish between
# biking and walking (e.g., because the user is a very slow biker) and hard to distinguish
# between work and shopping at the grocery store (e.g., because the user works at the
# grocery store), but whenever the user bikes to the location it is to work and whenever
# the user walks to the location it is to shop (e.g., because they don't have a basket on
# their bike), and the user bikes to the location four times more than they walk there.
# Obviously, it is a simplification.
def placeholder_predictor_0(trip):
    return [
        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
    ]


# The next placeholder scenario provides that same set of labels in 75% of cases and no
# labels in the rest.
def placeholder_predictor_1(trip):
    return [
        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
    ] if random.random() > 0.25 else []


# This third scenario provides labels designed to test the soundness and resilience of
# the client-side inference processing algorithms.
trip_n = 0 # ugly use of globals for testing only
def placeholder_predictor_2(trip):
    global trip_n  # ugly use of globals for testing only
    trip_n %= 6
    trip_n += 1
    return [
        [],
        [
            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
        ],
        [
            {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
        ],
        [
            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
        ],
        [
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
        ],
        [
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
        ]
    ][6-trip_n]

# For each algorithm in ecwl.AlgorithmTypes that runs on a trip (e.g., not the ensemble, which
# runs on the results of other algorithms), primary_algorithms specifies a corresponding
# function to run. This makes it easy to plug in additional algorithms later.
primary_algorithms = {
    # This can be edited to select a different placeholder predictor
    ecwl.AlgorithmTypes.PLACEHOLDER: placeholder_predictor_2
}

# Code structure based on emission.analysis.classification.inference.mode.pipeline
# and emission.analysis.classification.inference.mode.rule_engine
class LabelInferencePipeline:
    def __init__(self):
        self._last_trip_done = None

    @property
    def last_trip_done(self):
        return self._last_trip_done

    # For a given user and time range, runs all the primary algorithms and ensemble, saves results
    # to the database, and records progress
    def run_prediction_pipeline(self, user_id, time_range):
        self.ts = esta.TimeSeries.get_time_series(user_id)
        self.toPredictTrips = esda.get_entries(
            esda.CONFIRMED_TRIP_KEY, user_id, time_query=time_range)
        for trip in self.toPredictTrips:
            results = self.compute_and_save_algorithms(trip)
            ensemble = self.compute_and_save_ensemble(trip, results)

            # Add final prediction to the confirmed trip entry in the database
            trip["data"]["inferred_labels"] = ensemble["prediction"]
            self.ts.update(trip)
            if self._last_trip_done is None or self._last_trip_done.data.end_ts < trip.data.end_ts:
                self._last_trip_done = trip

    # This is where the labels for a given trip are actually predicted.
    # Though the only information passed in is the trip object, the trip object can provide the
    # user_id and other potentially useful information.
    def compute_and_save_algorithms(self, trip):
        predictions = []
        for algorithm_id, algorithm_fn in primary_algorithms.items():
            prediction = algorithm_fn(trip)
            lp = ecwl.Labelprediction()
            lp.trip_id = trip.get_id()
            lp.algorithm_id = algorithm_id
            lp.prediction = prediction
            lp.start_ts = trip.data.start_ts
            lp.end_ts = trip.data.end_ts
            self.ts.insert_data(self.user_id, "inference/labels", lp)
            predictions.append(lp)
        return predictions

    # Combine all our predictions into a single ensemble prediction.
    # As a placeholder, we just take the first prediction.
    # TODO: implement a real combination algorithm.
    def compute_and_save_ensemble(self, trip, predictions):
        il = ecwl.Labelprediction()
        il.trip_id = trip.get_id()
        # Since this is not a real ensemble yet, we will not mark it as such
        # il.algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE
        il.algorithm_id = ecwl.AlgorithmTypes(predictions[0]["algorithm_id"])
        il.start_ts = trip.data.start_ts
        il.end_ts = trip.data.end_ts

        il.prediction = copy.copy(predictions[0]["prediction"])

        self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
        return il
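
Two extension points in pipeline.py are worth a closer look: the primary_algorithms registry, which is where additional per-trip predictors would be plugged in, and compute_and_save_ensemble, whose TODO leaves the real combination step open. The sketch below is illustrative only and not part of this diff; SECOND_PLACEHOLDER and average_ensemble are hypothetical names, and averaging probabilities is just one plausible combination strategy.

# Hypothetical extension sketch -- not part of this PR
from collections import defaultdict

# Plugging in another per-trip predictor would only need a new AlgorithmTypes member
# (SECOND_PLACEHOLDER is made up here) plus one more registry entry:
# primary_algorithms[ecwl.AlgorithmTypes.SECOND_PLACEHOLDER] = placeholder_predictor_1

def average_ensemble(predictions):
    # Merge the per-algorithm prediction lists by averaging the probability assigned
    # to each distinct label combination across all algorithms
    totals = defaultdict(float)
    for lp in predictions:
        for possibility in lp["prediction"]:
            key = tuple(sorted(possibility["labels"].items()))  # hashable form of the label set
            totals[key] += possibility["p"]
    n_algorithms = len(predictions)
    return [{"labels": dict(key), "p": total / n_algorithms}
            for key, total in sorted(totals.items(), key=lambda kv: -kv[1])]

With a combiner along these lines, compute_and_save_ensemble could set il.algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE and il.prediction = average_ensemble(predictions) instead of copying the first prediction; whether that is the intended direction is left open by the TODO.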
1 change: 0 additions & 1 deletion emission/analysis/userinput/matcher.py
@@ -96,4 +96,3 @@ def get_user_input_dict(ts, tct, input_key_list):
    tct_userinput[ikey_name] = matched_userinput.data.label
    logging.debug("for trip %s, returning user input dict %s" % (tct.get_id(), tct_userinput))
    return tct_userinput

1 change: 1 addition & 0 deletions emission/core/wrapper/confirmedtrip.py
@@ -16,6 +16,7 @@ class Confirmedtrip(ecwt.Trip):
        # https://github.com/e-mission/e-mission-docs/issues/476#issuecomment-738120752
        "primary_section": ecwb.WrapperBase.Access.WORM,
        "inferred_primary_mode": ecwb.WrapperBase.Access.WORM,
        "inferred_labels": ecwb.WrapperBase.Access.WORM,
        # the user input will have all `manual/*` entries
        # let's make that be somewhat flexible instead of hardcoding into the data model
        "user_input": ecwb.WrapperBase.Access.WORM
8 changes: 6 additions & 2 deletions emission/core/wrapper/entry.py
@@ -119,13 +119,17 @@ def _getData2Wrapper():
            # the generated model for the random forest based mode inference
            # saved so that it can be used for prediction without retraining
            "mode_inference/model": "modeinfermodel",
            # the predicted mode for a particular section
            # the predicted mode for a particular section (one entry per algorithm)
            "inference/prediction": "modeprediction",
            # the predicted labels for a particular trip (one entry per algorithm)
            "inference/labels": "labelprediction",
            # equivalent of cleaned_section, but with the mode set to the
            # inferred mode instead of just walk/bike/motorized
            # used for consistency and to make the client work whether or not we were
            # running the inference step
            # the final inferred section mode (possibly an ensemble result)
            "analysis/inferred_section": "inferredsection",
            # the final inferred label data structure (possibly an ensemble result)
            "analysis/inferred_labels": "labelprediction",
            ### ** END: prediction objects
            ### ** BEGIN: confirmed objects which combine inferred and user input values
            "analysis/confirmed_trip": "confirmedtrip",
33 changes: 33 additions & 0 deletions emission/core/wrapper/labelprediction.py
@@ -0,0 +1,33 @@
# Based on modeprediction.py
import emission.core.wrapper.wrapperbase as ecwb
import enum

# The "prediction" data structure is a list of label possibilities, each one consisting of a set of labels and a probability:
# [
# {"labels": {"labeltype1": "labelvalue1", "labeltype2": "labelvalue2"}, "p": 0.61},
# {"labels": {"labeltype1": "labelvalue3", "labeltype2": "labelvalue4"}, "p": 0.27},
# ...
# ]


class AlgorithmTypes(enum.Enum):
    ENSEMBLE = 0
    PLACEHOLDER = 1


class Labelprediction(ecwb.WrapperBase):
    props = {"trip_id": ecwb.WrapperBase.Access.WORM,      # the trip that this is part of
             "algorithm_id": ecwb.WrapperBase.Access.WORM, # the algorithm that made this prediction
             "prediction": ecwb.WrapperBase.Access.WORM,   # What we predict -- see above
             "start_ts": ecwb.WrapperBase.Access.WORM,     # start time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline
             "end_ts": ecwb.WrapperBase.Access.WORM,       # end time for the prediction, so that it can be captured in time-based queries, e.g. to reset the pipeline
             }

    enums = {
        "algorithm_id": AlgorithmTypes
    }
    geojson = {}
    local_dates = {}

    def _populateDependencies(self):
        pass
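
The prediction structure documented at the top of labelprediction.py is what downstream consumers (for example, a UI that suggests labels for a trip) would work with. A minimal sketch of collapsing it to the single most probable label combination, using the placeholder values from this PR (most_likely_labels is a hypothetical helper, not part of the diff):

def most_likely_labels(prediction):
    # prediction is a list of {"labels": {...}, "p": ...} possibilities;
    # an empty list means the algorithm made no guess for this trip
    if not prediction:
        return {}
    return max(prediction, key=lambda possibility: possibility["p"])["labels"]

example = [
    {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
    {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
]
assert most_likely_labels(example) == {"mode_confirm": "bike", "purpose_confirm": "work"}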
1 change: 1 addition & 0 deletions emission/core/wrapper/pipelinestate.py
@@ -19,6 +19,7 @@ class PipelineStages(enum.Enum):
    CLEAN_RESAMPLING = 11
    MODE_INFERENCE = 4
    CREATE_CONFIRMED_OBJECTS = 13
    LABEL_INFERENCE = 14
    TOUR_MODEL = 5
    ALTERNATIVES = 10
    USER_MODEL = 7
9 changes: 9 additions & 0 deletions emission/pipeline/intake_stage.py
@@ -30,6 +30,7 @@
import emission.analysis.intake.cleaning.location_smoothing as eaicl
import emission.analysis.intake.cleaning.clean_and_resample as eaicr
import emission.analysis.classification.inference.mode.pipeline as eacimp
import emission.analysis.classification.inference.labels.pipeline as eacilp
import emission.net.ext_service.habitica.executor as autocheck

import emission.storage.decorations.stats_queries as esds
@@ -166,6 +167,14 @@ def run_intake_pipeline_for_user(uuid):
    esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS.name,
                             time.time(), crt.elapsed)

    with ect.Timer() as crt:
        logging.info("*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10)
        print(str(arrow.now()) + "*" * 10 + "UUID %s: inferring labels" % uuid + "*" * 10)
        eacilp.infer_labels(uuid)

    esds.store_pipeline_time(uuid, ecwp.PipelineStages.LABEL_INFERENCE.name,
                             time.time(), crt.elapsed)

    with ect.Timer() as ogt:
        logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10)
        print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10)
1 change: 1 addition & 0 deletions emission/storage/decorations/analysis_timeseries_queries.py
@@ -34,6 +34,7 @@
METRICS_DAILY_MEAN_DURATION = "metrics/daily_mean_duration"
METRICS_DAILY_USER_MEDIAN_SPEED = "metrics/daily_user_median_speed"
METRICS_DAILY_MEAN_MEDIAN_SPEED = "metrics/daily_mean_median_speed"
INFERRED_LABELS_KEY = "inference/labels"

# General methods

16 changes: 16 additions & 0 deletions emission/storage/pipeline_queries.py
@@ -185,6 +185,22 @@ def mark_mode_inference_done(user_id, last_section_done):
def mark_mode_inference_failed(user_id):
    mark_stage_failed(user_id, ps.PipelineStages.MODE_INFERENCE)

def get_time_range_for_label_inference(user_id):
    tq = get_time_range_for_stage(user_id, ps.PipelineStages.LABEL_INFERENCE)
    tq.timeType = "data.end_ts"
    return tq

# This stage operates on trips, not sections
def mark_label_inference_done(user_id, last_trip_done):
    if last_trip_done is None:
        mark_stage_done(user_id, ps.PipelineStages.LABEL_INFERENCE, None)
    else:
        mark_stage_done(user_id, ps.PipelineStages.LABEL_INFERENCE,
                        last_trip_done.data.end_ts + END_FUZZ_AVOID_LTE)

def mark_label_inference_failed(user_id):
    mark_stage_failed(user_id, ps.PipelineStages.LABEL_INFERENCE)

def get_time_range_for_output_gen(user_id):
    return get_time_range_for_stage(user_id, ps.PipelineStages.OUTPUT_GEN)
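
The END_FUZZ_AVOID_LTE offset in mark_label_inference_done follows the convention of the other stages in this file: the stored checkpoint is nudged just past the last processed trip's end_ts so that the next run's time query (inclusive at the lower bound, as the constant's name suggests) cannot match that trip again. A small self-contained illustration of that comparison, with a made-up fuzz value (the real constant is defined elsewhere in pipeline_queries.py):

END_FUZZ_AVOID_LTE = 0.001  # illustrative value only

def selected_by_next_run(trip_end_ts, checkpoint_ts):
    # sketch of an inclusive range query on data.end_ts when resuming the stage
    return trip_end_ts >= checkpoint_ts

last_trip_end_ts = 1625270400.0
checkpoint = last_trip_end_ts + END_FUZZ_AVOID_LTE
assert not selected_by_next_run(last_trip_end_ts, checkpoint)   # already-processed trip is skipped
assert selected_by_next_run(last_trip_end_ts + 60, checkpoint)  # later trips are still picked up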

2 changes: 2 additions & 0 deletions emission/storage/timeseries/builtin_timeseries.py
@@ -81,7 +81,9 @@ def __init__(self, user_id):
"metrics/daily_user_median_speed": self.analysis_timeseries_db,
"metrics/daily_mean_median_speed": self.analysis_timeseries_db,
"inference/prediction": self.analysis_timeseries_db,
"inference/labels": self.analysis_timeseries_db,
"analysis/inferred_section": self.analysis_timeseries_db,
"analysis/inferred_labels": self.analysis_timeseries_db,
"analysis/confirmed_trip": self.analysis_timeseries_db,
"analysis/confirmed_section": self.analysis_timeseries_db
}
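
Because both new keys are routed to the analysis timeseries here, the stored predictions can be read back through the regular timeseries interface. A minimal sketch, assuming the existing find_entries() API (the dump_label_predictions helper is not part of this diff):

import emission.storage.timeseries.abstract_timeseries as esta

def dump_label_predictions(user_id):
    ts = esta.TimeSeries.get_time_series(user_id)
    # per-algorithm predictions written by compute_and_save_algorithms()
    for entry in ts.find_entries(["inference/labels"]):
        print(entry["data"]["trip_id"], entry["data"]["prediction"])
    # final predictions written by compute_and_save_ensemble()
    for entry in ts.find_entries(["analysis/inferred_labels"]):
        print(entry["data"]["trip_id"], entry["data"]["prediction"])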