Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Survey Assist Using RF #938

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6471a96
add copy of code used in TRB paper
hlu109 Aug 12, 2022
9d6a1af
update user uuid lookup; add documentation note
hlu109 Dec 16, 2022
9bd9b18
Add additional logging to the calculation so that we can monitor the …
shankari Feb 15, 2023
1b9ece0
making `cluster_performance.ipynb`, `generate_figs_for_poster` and `…
humbleOldSage Aug 22, 2023
e7d2a14
Unified Interface for fit function
humbleOldSage Aug 26, 2023
59633e0
Fixing `models.py` to support `regenerate_classification_performance_…
humbleOldSage Aug 30, 2023
0adb5fe
[PARTIALLY TESTED] Single database read and Code Cleanuo
humbleOldSage Sep 14, 2023
e9abd51
[PARTIALLY TESTED] Survey Assist Using RF
humbleOldSage Oct 2, 2023
3820d87
[NOT TESTED]Predict implemented
humbleOldSage Oct 3, 2023
5b2572e
[NOT TESTED] Model storage and Model Testing included
humbleOldSage Oct 9, 2023
bf7f406
[TESTED]Forest Model Integration
humbleOldSage Nov 2, 2023
1d7be5a
Minor fixes
humbleOldSage Nov 2, 2023
b3d0db2
Delete Config file
humbleOldSage Nov 3, 2023
c514fe0
Merge remote-tracking branch 'e-mission-eval-private-data/move-models…
humbleOldSage Nov 3, 2023
3b038a9
removedfile
humbleOldSage Nov 3, 2023
94fc848
Update model.py
humbleOldSage Nov 3, 2023
87f109c
Merge branch 'master' into SurveyAssistUsingRandomForest
humbleOldSage Dec 7, 2023
33cdaab
[Tested, Will fail]Integrating RF model on server and more Unit test
humbleOldSage Dec 9, 2023
01fcb2a
minor fix
humbleOldSage Dec 9, 2023
f5fec64
Delete model.py
humbleOldSage Dec 12, 2023
585cc90
Update TestForestModel.py
humbleOldSage Dec 13, 2023
61bbe3f
Minor Fixes
humbleOldSage Dec 16, 2023
a32ce4f
[Tested]Adding Integration test
humbleOldSage Jan 2, 2024
052cb08
Improving test
humbleOldSage Jan 10, 2024
104dd9a
Integration Testing for forest model
humbleOldSage Feb 5, 2024
1b523ed
[Tested] Improvements for model integration
humbleOldSage Mar 15, 2024
35a1346
Forest Model related data additions
humbleOldSage Mar 21, 2024
19bb394
Update TestForestModelIntegration.py
humbleOldSage Mar 22, 2024
450094c
[TESTED] Updated ForestModelLoadAndSave.py
humbleOldSage Mar 22, 2024
ad968de
Fixing TestForestModelLoadandSave.py
humbleOldSage Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions emission/analysis/modelling/trip_model/forest_classifier.py
shankari marked this conversation as resolved.
Show resolved Hide resolved
shankari marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
import emission.analysis.modelling.trip_model.config as eamtc
import emission.storage.timeseries.builtin_timeseries as estb
import emission.storage.decorations.trip_queries as esdtq
from emission.analysis.modelling.trip_model.models import ForestClassifierModel
from emission.analysis.modelling.trip_model.models import ForestClassifier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still see this form of import
Already flagged in #938 (comment)
and #938 (comment)
but not yet fixed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FIxed.


EARTH_RADIUS = 6371000

class ForestClassifier(eamuu.TripModel):
class ForestClassifierModel(eamuu.TripModel):

def __init__(self,config=None):

Expand Down Expand Up @@ -54,7 +54,24 @@ def __init__(self,config=None):
if config.get(k) is None:
msg = f"cluster trip model config missing expected key {k}"
raise KeyError(msg)
self.model=ForestClassifierModel(config=config)
maxdepth =config['max_depth'] if config['max_depth']!='null' else None
self.model=ForestClassifier( loc_feature=config['loc_feature'],
radius= config['radius'],
size_thresh=config['radius'],
purity_thresh=config['purity_thresh'],
gamma=config['gamma'],
C=config['C'],
n_estimators=config['n_estimators'],
criterion=config['criterion'],
max_depth=maxdepth,
min_samples_split=config['min_samples_split'],
min_samples_leaf=config['min_samples_leaf'],
max_features=config['max_features'],
bootstrap=config['bootstrap'],
random_state=config['random_state'],
# drop_unclustered=False,
use_start_clusters=config['use_start_clusters'],
use_trip_clusters=config['use_trip_clusters'])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this just boilerplate code to copy parameters from one datastructure to another? Why not use something like kwargs instead?

Copy link
Contributor Author

@humbleOldSage humbleOldSage Mar 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I have used kwargs now.



def fit(self,trips: List[ecwc.Confirmedtrip]):
Expand Down Expand Up @@ -89,7 +106,7 @@ def predict(self, trip: List[float]) -> Tuple[List[Dict], int]:
msg = f'model.predict cannot be called with an empty trips'
raise Exception(msg)
# CONVERT LIST OF TRIPS TO dataFrame
test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",[trip])
test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",trip)
labeled_trip_df = esdtq.filter_labeled_trips(test_df)
expanded_labeled_trip_df= esdtq.expand_userinputs(labeled_trip_df)
predcitions_df= self.model.predict(expanded_labeled_trip_df)
Expand Down Expand Up @@ -128,8 +145,14 @@ def to_dict(self):
## confirm this includes all the extra encoders/models
attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper'])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is trip_grouper and where did it come from? Why do we need to save it?
Why are we serializing attribute-by-attribute instead of saving a JSON representation of the model, which would be be obvious fit for our document-based storage system?

Copy link
Contributor Author

@humbleOldSage humbleOldSage Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even for JSON serialization, we'll have to work attribute by attribute, since each instance here contain instances of other classes (ForestClassifierModel has ForestClassifier which has 3 RandomForestClassifiers and 2 OneHotWrappers ( this further has oneHotEncoders). We'll have to add for every class its own serialization method ( since JSON only serializes basic data structures by default) so that json.dumps (obj) can work. The current version was written with least disturbance to other parts of codebase in mind. However, prioritizing the way serialization fits our DB is an important thought. Do you want me to try the JSON way?

for attribute_name in attr:
if not hasattr(self.model,attribute_name):
raise ValueError(f"Attribute {attribute_name} not found in the model")

buffer=BytesIO()
joblib.dump(getattr(self.model,attribute_name),buffer)
try:
joblib.dump(getattr(self.model,attribute_name),buffer)
except Exception as e:
raise RuntimeError(f"Error serializing { attribute_name}: {str(e)}")
buffer.seek(0)
data[attribute_name]=buffer.getvalue()

Expand All @@ -144,14 +167,14 @@ def from_dict(self,model: Dict):
## TODO : confirm this includes all the extra encoders/models
attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper'])
for attribute_name in attr:
if attribute_name not in model:
raise ValueError(f"Attribute {attribute_name} missing in the model")
try:
if attribute_name in model:
buffer = BytesIO(model[attribute_name])
setattr(self.model,attribute_name, joblib.load(buffer))
buffer = BytesIO(model[attribute_name])
setattr(self.model,attribute_name, joblib.load(buffer))
except Exception as e:
print(f"Error loading {attribute_name}: {str(e)}")
# If we do not wish to raise the exception after logging the error, comment the line below
raise e
raise RuntimeError(f"Error deserializing { attribute_name}: {str(e)}")
# If we do not wish to raise the exception after logging the error, comment the line above

def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]:
"""
Expand Down
2 changes: 1 addition & 1 deletion emission/analysis/modelling/trip_model/model_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def build(self, config=None) -> eamuu.TripModel:
# Dict[ModelType, TripModel]
MODELS = {
ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning,
ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifier
ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifierModel
}
model = MODELS.get(self)
if model is None:
Expand Down
250 changes: 105 additions & 145 deletions emission/tests/modellingTests/TestForestModel.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,66 @@
import unittest
import logging
import numpy as np
import uuid
import json
import os

import emission.analysis.modelling.trip_model.model_storage as eamums
import emission.analysis.modelling.trip_model.model_type as eamumt
import emission.analysis.modelling.trip_model.run_model as eamur
import emission.analysis.modelling.trip_model.model_type as eamumt
import emission.analysis.modelling.trip_model.model_storage as eamums
import emission.storage.json_wrappers as esj
import emission.storage.timeseries.abstract_timeseries as esta
import emission.tests.modellingTests.modellingTestAssets as etmm
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.core.get_database as edb
import emission.storage.pipeline_queries as epq
import emission.core.wrapper.pipelinestate as ecwp
import numpy as np
import emission.core.wrapper.entry as ecwe
import emission.storage.decorations.analysis_timeseries_queries as esdatq

class TestRunForestModel(unittest.TestCase):
"""these tests were copied forward during a refactor of the tour model
[https://github.com/e-mission/e-mission-server/blob/10772f892385d44e11e51e796b0780d8f6609a2c/emission/analysis/modelling/tour_model_first_only/load_predict.py#L114]
class TestForestModel(unittest.TestCase):

it's uncertain what condition they are in besides having been refactored to
use the more recent tour modeling code.
"""

def setUp(self):
"""
sets up the end-to-end run model test with Confirmedtrip data
"""
logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
level=logging.DEBUG)

# configuration for randomly-generated test data
self.user_id = user_id = 'TestRunForestModel-TestData'
self.origin = (-105.1705977, 39.7402654,)
self.destination = (-105.1755606, 39.7673075)
self.min_trips = 14
self.total_trips = 100
self.clustered_trips = 33 # must have at least self.min_trips similar trips by default
self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant
# $clustered_trips * $has_label_percent > self.min_trips
# must be correct or else this test could fail under some random test cases.

# for a negative test, below
self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl'

# test data can be saved between test invocations, check if data exists before generating
ts = esta.TimeSeries.get_time_series(user_id)
test_data = list(ts.find_entries(["analysis/confirmed_trip"]))
if len(test_data) == 0:
# generate test data for the database
logging.debug(f"inserting mock Confirmedtrips into database")

# generate labels with a known sample weight that we can rely on in the test
label_data = {
"mode_confirm": ['ebike', 'bike'],
"purpose_confirm": ['happy-hour', 'dog-park'],
"replaced_mode": ['walk'],
"mode_weights": [0.9, 0.1],
"purpose_weights": [0.1, 0.9]
}

train = etmm.generate_mock_trips(
user_id=user_id,
trips=self.total_trips,
origin=self.origin,
destination=self.destination,
trip_part='od',
label_data=label_data,
within_threshold=self.clustered_trips,
threshold=0.004, # ~400m
has_label_p=self.has_label_percent
)

ts.bulk_insert(train)

# confirm data write did not fail
test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None)
if len(test_data) != self.total_trips:
logging.debug(f'test invariant failed after generating test data')
self.fail()
else:
logging.debug(f'found {self.total_trips} trips in database')

self.user_id = uuid.UUID('aa9fdec9-2944-446c-8ee2-50d79b3044d3')
self.ts = esta.TimeSeries.get_time_series(self.user_id)
self.new_trips_per_invocation = 3
self.model_type = eamumt.ModelType.RANDOM_FOREST_CLASSIFIER
self.model_storage = eamums.ModelStorage.DOCUMENT_DATABASE
sim_threshold = 500 # meters
self.forest_model_config= {
"loc_feature" : "coordinates",
"radius": 500,
"size_thresh":1,
"purity_thresh":1.0,
"gamma":0.05,
"C":1,
"n_estimators":100,
"criterion":"gini",
"max_depth":'null',
"min_samples_split":2,
"min_samples_leaf":1,
"max_features":"sqrt",
"bootstrap":True,
"random_state":42,
"use_start_clusters":False,
"use_trip_clusters":True
}

existing_entries_for_user = list(self.ts.find_entries([esdatq.CONFIRMED_TRIP_KEY]))
if len(existing_entries_for_user) != 0:
raise Exception(f"test invariant failed, there should be no entries for user {self.user_id}")

# load in trips from a test file source
input_file = 'emission/tests/data/real_examples/shankari_2016-06-20.expected_confirmed_trips'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted for this file because it was used in one other tests for greedySimilarityBinning. However, for the user mentioned (above), there are just 6 trips. I wanted to confirm with you if I am free to use other files from this location.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other files in this location are used in other tests so I don't see any reason why they cannot be used. As you point out, these are daily snapshots, so they are unlikely to give very high quality results, but there is no restriction on their use.

with open(input_file, 'r') as f:
trips_json = json.load(f, object_hook=esj.wrapped_object_hook)
self.trips = [ecwe.Entry(r) for r in trips_json]
logging.debug(f'loaded {len(self.trips)} trips from {input_file}')

def tearDown(self):
"""
clean up database
Expand All @@ -88,86 +70,64 @@ def tearDown(self):
edb.get_pipeline_state_db().delete_many({'user_id': self.user_id})


# def test_model_consistency(self):
# """
# Test to ensure that the model's predictions on the mock data remain consistent.
# """
# # Get the mock data from the parent class's setup
# mock_data = self.mock_data

# # Predict using the model
# current_predictions = eamur.predict_labels_with_n(
# trip=mock_data,
# model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER,
# model_storage=eamums.ModelStorage.DOCUMENT_DATABASE
# ) # assuming this is how you get predictions
# ## TODO :
# # Check if there are any previously stored predictions
# stored_predictions = list(self.collection.find({}))

# if len(stored_predictions) == 0:
# # If not, store the current predictions as the ground truth
# self.collection.insert_many([{"index": i, "prediction": p} for i, p in enumerate(current_predictions)])
# logging.debug("Stored current model predictions as ground truth.")
# else:
# # If there are stored predictions, compare them with the current predictions
# for stored_pred in stored_predictions:
# index, stored_value = stored_pred["index"], stored_pred["prediction"]
# current_value = current_predictions[index]

# self.assertEqual(stored_value, current_value, f"Prediction at index {index} has changed! Expected {stored_value}, but got {current_value}.")

# logging.debug("Model predictions are consistent with previously stored predictions.")

## TODO : Fix regression Tests

# def test_regression(self):
# """
# Regression test to ensure consistent model results.
# """
# # Load the previously stored predictions (if any)
# previous_predictions = self.load_previous_predictions()
def testRandomForestRegression(self):
"""
test to ensure consistent model results. Load data for a user from json, split
into train and test. After training, we generate predictions and match them with
predictions from last time. If the code is run for the first time, the current predicitons
will be stored as ground truth.
"""
file_path= 'emission/tests/modellingTests/data.json'
split=int(0.9*len(self.trips))
train_data= self.trips[:split]

self.ts.bulk_insert(train_data)

# confirm write to database succeeded
self.initial_data = list(self.ts.find_entries([esdatq.CONFIRMED_TRIP_KEY]))
if len(self.initial_data) == 0:
logging.debug(f'Writing train data failed')
self.fail()

test_data=self.trips[split:]
logging.debug(f'LENDATA{len(train_data),len(test_data)}')
eamur.update_trip_model(
user_id=self.user_id,
model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER,
model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
min_trips=4,
model_config=self.forest_model_config
)
model = eamur._load_stored_trip_model(
user_id=self.user_id,
model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER,
model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
model_config=self.forest_model_config
)

# # Run the current model to get predictions
# current_predictions = self.run_current_model()

# # If there are no previous predictions, store the current predictions
# if previous_predictions is None:
# self.store_predictions(current_predictions)
# else:
# # Compare the current predictions with the previous predictions
# self.assertPredictionsMatch(previous_predictions, current_predictions)

# def load_previous_predictions(self):
# # Retrieve stored predictions from the database
# # Using get_analysis_timeseries_db as an example, replace with the correct method if needed
# db = edb.get_analysis_timeseries_db()
# predictions = db.find_one({"user_id": self.user_id, "metadata.key": "predictions"})
# return predictions

# def run_current_model(self):
# # Placeholder: Run the current model and get predictions
# # Replace this with the actual model running code
# predictions = None
# return predictions

# def store_predictions(self, predictions):
# # Store the predictions in the database
# # Using get_analysis_timeseries_db as an example, replace with the correct method if needed
# db = edb.get_analysis_timeseries_db()
# entry = {
# "user_id": self.user_id,
# "metadata": {
# "key": "predictions",
# "write_ts": pd.Timestamp.now().timestamp() # Using pandas timestamp as an example
# },
# "data": predictions
# }
# db.insert_one(entry)

# def assertPredictionsMatch(self, prev, curr):
# # Placeholder: Check if the predictions match
# # This will depend on the format and type of your predictions
# # For example, if predictions are lists or arrays, you can use numpy
# if not np.array_equal(prev, curr):
# self.fail("Current model predictions do not match previously stored predictions!")
curr_predictions_list = eamur.predict_labels_with_n(
trip_list = [test_data],
model=model
)


## predictions take the form like :
#
#{'labels': {'mode_confirm': 'ebike', 'replaced_mode': 'walk', 'purpose_confirm': 'dog-park'}, 'p': 1.0}
# we can store these predictions in a json and then for every run other than the first we
# can load the predictions and compare

try:
if os.path.exists(file_path) and os.path.getsize(file_path)>0:
with open(file_path, 'r') as f:
prev_predictions_list = json.load(f)
logging.debug()
self.assertEqual(prev_predictions_list,curr_predictions_list," previous predictions should match current predictions")
else:
with open(file_path,'w') as file:
json.dump(curr_predictions_list,file,indent=4)
logging.debug("Previous predicitons stored for future matching" )
except json.JSONDecodeError:
logging.debug("jsonDecodeErrorError")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is questionable. I wanted to know if dumping ( writing predictions of first run ever to be used as ground truth) in a JSON here is a good practice or not?

Other option is to store the past predictions in db. But I think the tear-down will clean it up while exiting. So I opted for JSON. Is there a way to protect the writing. Let me know if I am missing something here.

return " decoding JSON."
Loading
Loading