Data copy from one server to the other: new data is not processed #845

Open

shankari opened this issue Jan 14, 2023 · 24 comments

@shankari

As we have separate enclaves for different projects, we need to have mechanisms for copying data from one enclave to another.

  • A concrete example might be a study/program that ends, but some users want to keep on collecting data using the open access study
  • A second example is copying over data from the CanBikeCO server while transitioning to NREL OpenPATH

The second example has actually occurred, and I copied over the data using a combination of bin/debug/extract_timeline_for_day_range_and_user.py and bin/debug/load_multi_timeline_for_range.py

The data was copied over correctly, and the user confirmed that it was displayed correctly.
However, data collected after the migration is not processed correctly and always remains in draft mode.

On investigating further, this is because the CLEAN_AND_RESAMPLE stage fails with the following error:


2023-01-14T06:16:40.952Z    2023-01-14 06:16:40,952:DEBUG:140037033350976:get_entry_at_ts query = {'user_id': [redacted], 'metadata.key': 'background/filtered_location', 'data.ts': 1672435958.12}
    2023-01-14T06:16:41.521Z    2023-01-14 06:16:41,521:DEBUG:140037033350976:get_entry_at_ts result = None
    2023-01-14T06:16:41.522Z    2023-01-14 06:16:41,521:ERROR:140037033350976:Found error 'NoneType' object is not iterable while processing trip Entry({'_id': ObjectId('63b1db470f8d9992c7b4455f'), 'user_id': [redacted], 'metadata': {'key': 'segmentation/raw_trip', 'platform': 'server', 'write_ts': 1672600391.668694, 'time_zone': 'America/Los_Angeles', 'write_local_dt': {'year': 2023, 'month': 1, 'day': 1, 'hour': 11, 'minute': 13, 'second': 11, 'weekday': 6, 'timezone': 'America/Los_Angeles'}, 'write_fmt_time': '2023-01-01T11:13:11.668694-08:00'}, 'data': {'source': 'DwellSegmentationTimeFilter', 'start_ts': 1672586043.121, 'start_local_dt': {'year': 2023, 'month': 1, 'day': 1, 'hour': 8, 'minute': 14, 'second': 3, 'weekday': 6, 'timezone': 'America/Denver'}, 'start_fmt_time': '2023-01-01T08:14:03.121000-07:00', 'start_place': ObjectId('63af64b6aef87808843e480c'), 'start_loc': {'type': 'Point', '
coordinates': [-104.9209856, 39.9478752]}, 'end_ts': 1672588188.026, 'end_local_dt': {'year': 2023, 'month': 1, 'day': 1, 'hour': 8, 'minute': 49, 'second': 48, 'weekday': 6, 'timezone': 'America/Denver'}, 'end_fmt_time': '2023-01-01T08:49:48.026000-07:00', 'end_place': ObjectId('63b1db470f8d9992c7b44560'), 'end_loc': {'type': 'Point', 'coordinates': [-105.056373, 40.0321814]}, 'duration': 2144.9049999713898, 'distance': 14863.150328118767}})
    2023-01-14T06:16:41.522Z    Traceback (most recent call last):
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 121, in save_cleaned_segments_for_timeline
    2023-01-14T06:16:41.522Z    filtered_trip = get_filtered_trip(ts, trip)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 187, in get_filtered_trip
    2023-01-14T06:16:41.522Z    (filtered_section_entry, point_list) = get_filtered_section(filtered_trip_entry, section)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 292, in get_filtered_section
    2023-01-14T06:16:41.522Z    with_speeds_df = get_filtered_points(section, filtered_section_data)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 391, in get_filtered_points
    2023-01-14T06:16:41.522Z    filtered_loc_df = _add_start_point(filtered_loc_df, raw_start_place, ts, section.data.sensed_mode)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 562, in _add_start_point
    2023-01-14T06:16:41.522Z    raw_start_place_enter_loc_entry = _get_raw_place_enter_loc_entry(ts, raw_start_place)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 696, in _get_raw_place_enter_loc_entry
    2023-01-14T06:16:41.522Z    raw_place.data.enter_ts))
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/core/wrapper/wrapperbase.py", line 42, in __init__
    2023-01-14T06:16:41.522Z    super(WrapperBase, self).__init__(*args, **kwargs)
    2023-01-14T06:16:41.522Z    File "/root/miniconda-4.12.0/envs/emission/lib/python3.7/site-packages/attrdict/dictionary.py", line 17, in __init__
    2023-01-14T06:16:41.522Z    super(AttrDict, self).__init__(*args, **kwargs)
    2023-01-14T06:16:41.522Z    TypeError: 'NoneType' object is not iterable

We need to see why the start point is not available

@shankari

Related code is

def _get_raw_place_enter_loc_entry(ts, raw_place):
    if raw_place.data.enter_ts is not None:
        raw_start_place_enter_loc_entry = ecwe.Entry(
            ts.get_entry_at_ts("background/filtered_location", "data.ts",
                               raw_place.data.enter_ts))
    else:
        # These are not strictly accurate because the exit ts for the place
        # corresponds to the ts of the first point in the section. We are trying
        # to determine the correct exit_ts here. But its a reasonable estimate,
        # at least for the time zone, which is required when we extrapolate
        # note that this will fail for the specific case in which the first point outside
        # the geofence of the first place in a trip chain is in a different timezone
        # than the point itself. We can work around that by storing the enter_ts even
        # for the first place.
        dummy_section_start_loc_doc = {
            "loc": raw_place.data.location,
            "latitude": raw_place.data.location.coordinates[1],
            "longitude": raw_place.data.location.coordinates[0],
            "ts": raw_place.data.exit_ts,
            "fmt_time": raw_place.data.exit_fmt_time,
            "local_dt": raw_place.data.exit_local_dt
        }
        raw_start_place_enter_loc_entry = ecwe.Entry.create_entry(raw_place.user_id,
                                                                  "background/filtered_location",
                                                                  dummy_section_start_loc_doc)
    logging.debug("Raw place is %s and corresponding location is %s" %
                  (raw_place.get_id(), raw_start_place_enter_loc_entry.get_id()))
    return raw_start_place_enter_loc_entry

@shankari

So my current speculation is that the raw place for this trip was copied over from the old data and the link up with the new data is broken somehow. However, if we copied everything, we should have copied over all of the location timestamps as well. Why was this missing?

Let's first get the raw place (unnecessary fields redacted):

{'_id': ObjectId('63af64b6aef87808843e480c'), 'metadata': {'key': 'segmentation/raw_place', 'write_fmt_time': '2022-12-30T14:22:46.134360-08:00'}, 'data': {'source': 'DwellSegmentationTimeFilter', 'enter_ts': 1672435958.12, 'enter_fmt_time': '2022-12-30T14:32:38.120000-07:00', 'ending_trip': ObjectId('63af64b6aef87808843e480b'), 'exit_ts': 1672586043.121, 'exit_fmt_time': '2023-01-01T08:14:03.121000-07:00', 'starting_trip': ObjectId('63b1db470f8d9992c7b4455f'), 'duration': 150085.00100016594}}

As expected, the enter timestamp is from the 30th, so before the copy.
Let's confirm that there really isn't a location point with that timestamp. There isn't, even if we expand the range a bit.

>>> edb.get_timeseries_db().find_one({'user_id': UUID(...), 'metadata.key': 'background/filtered_location', 'data.ts': 1672435958.12})
>>> edb.get_timeseries_db().find_one({'user_id': UUID(...), 'metadata.key': 'background/filtered_location', 'data.ts': {"$gte": 1672435900, "$lte": 1672436000}})

@shankari

Did all the locations not get exported properly? I was worried about that, but I checked the number of entries and it seemed to match. Let's double-check the location profile. Now that we have loaded a bunch of other data on top, our queries have to be more complicated. #$#$# trying to do this at the last minute...

@shankari

Hm, searching for the last three entries before the start_ts, they are from Oct

>>> list(edb.get_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'background/filtered_location', 'data.ts': {"$lte": 1672435800}}).sort("data.ts", -1).limit(3))
               data.fmt_time
0  2022-10-02T08:47:59-06:00
1  2022-10-02T08:47:56-06:00
2  2022-10-02T08:47:53-06:00

@shankari

Let's look at everything up to a little after the exit_ts to see if there is a transition:

>>> pd.json_normalize(list(edb.get_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'background/filtered_location', 'data.ts': {"$lte": 1672586043.121 + 10 * 60}}).sort("data.ts", -1).limit(20)))[["data.fmt_time"]]
                       data.fmt_time
0   2023-01-01T08:23:48.014000-07:00
1   2023-01-01T08:23:17.079000-07:00
2   2023-01-01T08:22:47.037000-07:00
3   2023-01-01T08:21:14.033000-07:00
4   2023-01-01T08:20:12.074000-07:00
5   2023-01-01T08:19:41.096000-07:00
6   2023-01-01T08:19:11.055000-07:00
7   2023-01-01T08:18:40.019000-07:00
8   2023-01-01T08:18:09.021000-07:00
9   2023-01-01T08:17:38.002000-07:00
10  2023-01-01T08:16:36.016000-07:00
11  2023-01-01T08:16:05.105000-07:00
12  2023-01-01T08:15:35.090000-07:00
13  2023-01-01T08:15:05.083000-07:00
14  2023-01-01T08:14:35.025000-07:00
15  2023-01-01T08:14:03.121000-07:00
16         2022-10-02T08:47:59-06:00
17         2022-10-02T08:47:56-06:00
18         2022-10-02T08:47:53-06:00
19         2022-10-02T08:47:50-06:00

@shankari commented Jan 15, 2023

Checked the mongodump and confirmed that there are location entries there.
So we need to change bin/debug/extract_timeline_for_day_range_and_user.py to retry until we get to the actual end of the list. Depending on how we do this, we may end up with a few duplicate entries at the borders. So we will need to change
bin/debug/load_multi_timeline_for_range.py to deal with duplicate entries.

I could also write a special script just for this that finds the missing entries from the timeseries and copies them over, but it seems like the more general fix addresses the underlying problem as well, and will fix this one too.
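For the duplicate handling in bin/debug/load_multi_timeline_for_range.py, it should be enough to skip entries whose _id already exists before inserting, since exported entries keep their original ObjectIds. A minimal sketch, assuming we have a pymongo collection handle (tsdb) and the list of entries to load; the real script's structure may differ:

import logging

def insert_skipping_duplicates(tsdb, entries):
    # entries is a list of dicts that retain their original "_id" values;
    # look up which of those ids are already present in the collection
    candidate_ids = [e["_id"] for e in entries]
    existing_ids = set(doc["_id"] for doc in
                       tsdb.find({"_id": {"$in": candidate_ids}}, {"_id": 1}))
    new_entries = [e for e in entries if e["_id"] not in existing_ids]
    logging.debug("Skipping %d duplicates out of %d entries",
                  len(entries) - len(new_entries), len(entries))
    if len(new_entries) > 0:
        tsdb.insert_many(new_entries)
    return len(new_entries)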

>>> pd.json_normalize(list(edb.get_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'background/filtered_location', 'data.ts': {"$lte": 1672586043.121 + 10 * 60}}).sort("data.ts", -1).limit(20)))[["data.fmt_time"]]
                       data.fmt_time
0   2022-12-30T14:35:41.025000-07:00
1   2022-12-30T14:35:10.056000-07:00
2   2022-12-30T14:34:40.047000-07:00
3   2022-12-30T14:34:09.100000-07:00
4   2022-12-30T14:33:38.201000-07:00
5   2022-12-30T14:33:08.162000-07:00
6   2022-12-30T14:32:38.120000-07:00
7   2022-12-30T14:32:08.073000-07:00
8   2022-12-30T14:31:37.184000-07:00
9   2022-12-30T14:31:07.156000-07:00
10  2022-12-30T14:30:37.096000-07:00
11  2022-12-30T14:30:07.053000-07:00
12  2022-12-30T14:29:36.122000-07:00
13  2022-12-30T14:29:06.083000-07:00
14  2022-12-30T14:28:36.065000-07:00
15  2022-12-30T14:28:01.315000-07:00
16  2022-12-30T14:27:31.296000-07:00
17  2022-12-30T14:27:01.227000-07:00
18  2022-12-30T14:26:31.197000-07:00
19  2022-12-30T14:26:01.190000-07:00

Checking to see whether this happens for the recreated locations as well. I'm guessing not, since the original copy seemed to work fine.

>>> pd.json_normalize(list(edb.get_analysis_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'analysis/recreated_location', 'data.ts': {"$lte": 1672586043.121 + 10 * 60}}).sort("data.ts", -1).limit(20)))[["data.fmt_time"]]
                       data.fmt_time
0   2022-12-30T14:32:38.120000-07:00
1   2022-12-30T14:32:16.888177-07:00
2   2022-12-30T14:31:46.888177-07:00
3   2022-12-30T14:31:16.888177-07:00
4   2022-12-30T14:30:46.888177-07:00
5   2022-12-30T14:30:16.888177-07:00
6   2022-12-30T14:29:46.888177-07:00
7   2022-12-30T14:29:16.888177-07:00
8   2022-12-30T14:28:46.888177-07:00
9   2022-12-30T14:28:16.888177-07:00
10  2022-12-30T14:27:46.888177-07:00
11  2022-12-30T14:27:16.888177-07:00
12  2022-12-30T14:26:46.888177-07:00
13  2022-12-30T14:26:16.888177-07:00
14  2022-12-30T14:25:46.888177-07:00
15  2022-12-30T14:25:16.888177-07:00
16  2022-12-30T14:24:46.888177-07:00
17  2022-12-30T14:24:16.888177-07:00
18  2022-12-30T14:23:46.888177-07:00
19  2022-12-30T14:23:16.888177-07:00

BINGO! As expected, the recreated locations have in fact been copied over correctly.
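For reference, a quick way to compare the two stores over the suspect window is to count entries on each side; a rough sketch along the lines of the queries above (assuming edb is the usual emission.core.get_database alias, and using the raw place's enter/exit timestamps as the window):

from uuid import UUID
import emission.core.get_database as edb  # assumed alias, matching the snippets above

gap_query = {"$gte": 1672435958.12, "$lte": 1672586043.121}  # raw place enter_ts .. exit_ts
raw_count = edb.get_timeseries_db().count_documents(
    {"user_id": UUID("..."), "metadata.key": "background/filtered_location",
     "data.ts": gap_query})
recreated_count = edb.get_analysis_timeseries_db().count_documents(
    {"user_id": UUID("..."), "metadata.key": "analysis/recreated_location",
     "data.ts": gap_query})
# raw_count == 0 while recreated_count > 0 confirms that only the analysis side was copied
print(raw_count, recreated_count)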

@shankari

Tried the obvious solution of retrying until we get to the end, similar to
https://github.com/MobilityNet/mobilitynet-analysis-scripts/blob/master/emeval/input/spec_details.py#L173

Code diff
diff --git a/bin/debug/extract_timeline_for_day_range_and_user.py b/bin/debug/extract_timeline_for_day_range_and_user.py
index 9fe0ee2a..e9ff091e 100644
--- a/bin/debug/extract_timeline_for_day_range_and_user.py
+++ b/bin/debug/extract_timeline_for_day_range_and_user.py
@@ -18,6 +18,7 @@ import json
 import bson.json_util as bju
 import arrow
 import argparse
+import copy

 import emission.core.wrapper.user as ecwu
 import emission.storage.timeseries.abstract_timeseries as esta
@@ -28,6 +29,32 @@ import emission.storage.timeseries.cache_series as estcs
 # https://github.com/e-mission/e-mission-docs/issues/356#issuecomment-520630934
 import emission.net.usercache.abstract_usercache as enua

+def check_done(curr_batch, list_so_far, query):
+    if len(curr_batch) == 0:
+        logging.debug(f"No more entries, finished reading for {query}")
+        return True
+    elif len(curr_batch) == 1 and curr_batch[0]["_id"] == list_so_far[-1]["_id"]:
+        logging.debug(f"Re-read the same single entry, finished reading for {query}")
+        return True
+    else:
+        logging.debug(f"curr_batch has length {len(curr_batch)}, not finished reading for {query}")
+        return False
+
+def get_with_retry(user_id, in_query):
+    # Let's clone the query since we are going to modify it
+    query = copy.copy(in_query)
+    # converts "data.ts" = ["data", "ts"]
+    timeTypeSplit = query.timeType.split(".")
+    list_so_far = []
+    done = False
+    while not done:
+        curr_batch = list(estcs.find_entries(user_id, key_list=None, time_query=query))
+        logging.debug(f"Retrieved {len(curr_batch)} entries for {query}")
+        done = check_done(curr_batch, list_so_far, query)
+        list_so_far.extend(curr_batch)
+        query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]]
+    return list_so_far
+

However, we get a concatenation of the ts and uc data from estcs.
So if the timeseries has > limit entries, we will retrieve the first "limit" entries; let's say the last of those is at t1.
We then retrieve the usercache, the last entry of which may be at t2 > t1. Since the retry restarts from the last entry of the combined batch, we will again skip the timeseries entries between t1 and t2.

In practice, while testing with the user above, it looks like t1 > t2, so we end up with an infinite loop:

DEBUG:root:Retrieved 2787 entries for TimeQuery data.ts with range [1672195088.341, 1703980800)
DEBUG:root:curr_batch has length 2787, not finished reading for TimeQuery data.ts with range [1672195088.341, 1703980800)
DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1672195088.341}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 2491
DEBUG:root:finished querying values for None, count = 295
DEBUG:root:orig_ts_db_matches = 2491, analysis_ts_db_matches = 295
DEBUG:root:last timeseries entry is 1672435958.12
DEBUG:root:Found 1 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1672195088.341}}
DEBUG:root:last uc entry is 1672195088.341

Regardless, we need to check the timeseries and the usercache separately, which means that we need to change find_entries. Let's add a flag (retry) for backwards compatibility and default it to False. For the common case where we are requesting data for a week at a time, we will not hit the limit and will not need to take the performance hit of retrying.
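Roughly, the retry would then be a loop like the one below, run once per underlying store rather than over the concatenated results. This is only a sketch: fetch_batch is a stand-in for whatever reads a single store (just the timeseries, or just the usercache), and the time query object is assumed to have the startTs/timeType attributes used in the diff above.

import copy
import logging

def read_all_entries(fetch_batch, in_query):
    # run this separately against each store (timeseries, analysis timeseries,
    # usercache) so that one store's last timestamp cannot skip over another's
    query = copy.copy(in_query)
    parts = query.timeType.split(".")  # e.g. "data.ts" -> ["data", "ts"]
    entries, seen_ids = [], set()
    while True:
        curr_batch = list(fetch_batch(query))
        # drop boundary re-reads, since startTs is inclusive of the last entry
        new_entries = [e for e in curr_batch if e["_id"] not in seen_ids]
        logging.debug("read %d entries (%d new) for %s",
                      len(curr_batch), len(new_entries), query)
        if len(new_entries) == 0:
            return entries
        entries.extend(new_entries)
        seen_ids.update(e["_id"] for e in new_entries)
        query.startTs = new_entries[-1][parts[0]][parts[1]]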

@shankari commented Jan 16, 2023

Added the retry flag and some more logging. And even without the retry, we do read all the points successfully while reading from a restored mongodump. Not quite sure why we didn't read them all on the server - maybe we had dropped down the max limit of entries read? Alas, the server is now shut down, so I can't verify.

DEBUG:root:start_day_ts = 1577836800 (2020-01-01T00:00:00+00:00), end_day_ts = 1703980800 (2023-12-31T00:00:00+00:00)
WARNING:root:Called find_entries with retry=False but duration = 126144000 > 6 months, result is likely TRUNCATED
DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1577836800}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 601169
DEBUG:root:finished querying values for None, count = 103488
DEBUG:root:orig_ts_db_matches = 601169, analysis_ts_db_matches = 103488
DEBUG:root:after reading 353488 entries, last timeseries entry is 1672435958.12(2022-12-30T21:32:38.120000+00:00)
DEBUG:root:Found 1030 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1577836800}}
DEBUG:root:after reading 1030 entries, last uc entry is 1672195088.341(2022-12-28T02:38:08.341000+00:00)

Or maybe there's just something very weird going on locally - we should be limiting to 250k, but the orig_ts_db_matches is 601k.

{
    "timeseries": {
        "url": "localhost",
        "result_limit": 250000
    }
}

@shankari

Actually, even retrying at the cache_series level is not enough, because we call `find_entries` in the timeseries to read the entries, and even that is a concatenation of the timeseries and analysis timeseries results.
We have to read each of them separately to retry.

DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1672435958.12}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 363
DEBUG:root:finished querying values for None, count = 1
DEBUG:root:orig_ts_db_matches = 363, analysis_ts_db_matches = 1
DEBUG:root:Retrieved 364 entries for TimeQuery data.ts with range [1672435958.12, 1703980800)
DEBUG:root:curr_batch has length 364, not finished reading for TimeQuery data.ts with range [1672435958.12, 1703980800)
DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1672435958.12}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 363
DEBUG:root:finished querying values for None, count = 1
DEBUG:root:orig_ts_db_matches = 363, analysis_ts_db_matches = 1
DEBUG:root:Retrieved 364 entries for TimeQuery data.ts with range [1672435958.12, 1703980800)
DEBUG:root:curr_batch has length 364, not finished reading for TimeQuery data.ts with range [1672435958.12, 1703980800)

@shankari

Wrt #845 (comment), this is the same reason. Note that there are 601,169 entries in the timeseries and 103,488 entries in the analysis timeseries, but our final batch size is only 353,488 entries (i.e. the 250,000-entry truncated timeseries read plus the 103,488 analysis results). The last entry is in December, but it is from the analysis timeseries (recreated_location) that was appended at the end.

DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1577836800}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 601169
DEBUG:root:finished querying values for None, count = 103488
DEBUG:root:orig_ts_db_matches = 601169, analysis_ts_db_matches = 103488
DEBUG:root:after reading 353488 entries, last timeseries entry is 1672435958.12(2022-12-30T21:32:38.120000+00:00)
DEBUG:root:Found 1030 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1577836800}}
DEBUG:root:after reading 1030 entries, last uc entry is 1672195088.341(2022-12-28T02:38:08.341000+00:00)
DEBUG:root:Found 68 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.key': 'background/motion_activity'}], 'metadata.write_ts': {'$lte': 1703980800, '$gte': 1577836800}}

@shankari

Note also that the dump script has a validate_truncation function, but it compares the list length to the exact max limit (250k).
Since we combine the timeseries and analysis results, we actually end up with 353k > 250k, but the checks are for equality, so they no longer work.

def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
    MAX_LIMIT = 25 * 10000
    if len(loc_entry_list) == MAX_LIMIT:
        logging.warning("loc_entry_list length = %d, probably truncated" % len(loc_entry_list))
    if len(trip_entry_list) == MAX_LIMIT:
        logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list))
    if len(place_entry_list) == MAX_LIMIT:
        logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list))

The only principled solution is to actually read the three types of entries directly from the database and to retry each of them separately.
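In the meantime, the existing check could at least be made more defensive by warning whenever a list reaches the limit instead of only on exact equality. A sketch (MAX_LIMIT should ideally be read from the same config as the database's result_limit):

import logging

MAX_LIMIT = 25 * 10000

def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
    for name, entry_list in [("loc_entry_list", loc_entry_list),
                             ("trip_entry_list", trip_entry_list),
                             ("place_entry_list", place_entry_list)]:
        # >= instead of ==, so that concatenated (timeseries + analysis) results
        # that overshoot the limit are still flagged
        if len(entry_list) >= MAX_LIMIT:
            logging.warning("%s length = %d >= %d, probably truncated",
                            name, len(entry_list), MAX_LIMIT)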

shankari added a commit to shankari/e-mission-server that referenced this issue Jan 17, 2023
We need to pass in the user_id as well

Testing done:
works without retry
e-mission/e-mission-docs#845 (comment)
@shankari

Porting the changes from master to the gis branch, which we actually use everywhere, was a bit challenging since all the export code in the GIS branch is pulled out into a separate file (emission/export/export.py). I went through the changes carefully, and copied the new get_with_retry over exactly.

To test, I ran the export pipeline on both master and the gis branch.
The master export had one additional location entry.

<             "$oid": "6377b6074202ae5631d96451"
<         "metadata": {
<             "key": "background/filtered_location",
<         "data": {
<             "fmt_time": "2022-11-18T09:09:11.620000-07:00",

Re-generating to see how the logs differ

@shankari

On master:

DEBUG:root:finished querying values for None, count = 79243
DEBUG:root:Retrieved batch of size 79243, cumulative 79243 entries of total 79243 documents for TimeQuery data.ts with range [1664582400, 1703980800)
DEBUG:root:finished querying values for None, count = 12768
DEBUG:root:Retrieved batch of size 12768, cumulative 12768 entries of total 12768 documents for TimeQuery data.ts with range [1664582400, 1703980800)
DEBUG:root:For 802667b6-371f-45b2-9c7a-bb051244836a, found 664 messages in usercache
DEBUG:root:Found 664 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1664582400}}
DEBUG:root:Retrieved batch of size 664, cumulative 664 entries of total 664 documents for TimeQuery data.ts with range [1664582400, 1703980800)

On GIS:

DEBUG:root:finished querying values for None, count = 79242
DEBUG:root:Retrieved batch of size 79242, cumulative 79242 entries of total 79242 documents for TimeQuery data.ts with range [1664582400, 1703980800)
DEBUG:root:finished querying values for None, count = 12768
DEBUG:root:Retrieved batch of size 12768, cumulative 12768 entries of total 12768 documents for TimeQuery data.ts with range [1664582400, 1703980800)
DEBUG:root:For 802667b6-371f-45b2-9c7a-bb051244836a, found 664 messages in usercache
DEBUG:root:Found 664 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1664582400}}
DEBUG:root:Retrieved batch of size 664, cumulative 664 entries of total 664 documents for TimeQuery data.ts with range [1664582400, 1703980800)

So get_entries_for_timeseries actually returns different results on master and gis? This seems whack

@shankari

Running the query directly against the database while on the GIS branch does give the right number

>>> edb.get_timeseries_db().count_documents({'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'),'data.ts': {'$lte': 1703980800, '$gte': 1664582400}})
79243

@shankari

OK, so the difference is due to the invalid check:

>>> edb.get_timeseries_db().count_documents({'invalid': {'$exists': False}, 'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1664582400}})
79242
>>> edb.get_timeseries_db().count_documents({'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1664582400}})
79243

Double-checking, the new entry is in fact invalid

>>> edb.get_timeseries_db().find_one({"_id": boi.ObjectId("6377b6074202ae5631d96451")})
{'_id': ObjectId('6377b6074202ae5631d96451'), 'user_id': UUID('...'), 'metadata': {'key': 'background/filtered_location', 'write_fmt_time': '2022-11-18T09:09:15.153000-07:00'}, 'data': {'fmt_time': '2022-11-18T09:09:11.620000-07:00'}, 'invalid': True}

Also true in the diff

<             "$oid": "6377b6074202ae5631d96451"
<         },
...
<         },
<         "invalid": true
<     },

And the GIS branch does in fact include the invalid check in the query

    def _get_query(self, key_list = None, time_query = None, geo_query = None,
                   extra_query_list = []):
...
        ret_query = {"invalid": {"$exists": False}}
        ret_query.update(self.user_query)

while the master branch does not

    def _get_query(self, key_list = None, time_query = None, geo_query = None,
                   extra_query_list = []):
...
        ret_query = {}

Phew! I thought I was going crazy for a minute there

@shankari

As a side note, it looks like we don't have the invalid key as an index in the database. Since we include it in every single search, we should probably add one; it will improve performance.
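Something along these lines should do it; a sketch only, assuming edb is the usual emission.core.get_database alias and that the get_*_db() helpers return plain pymongo Collections:

import emission.core.get_database as edb  # assumed alias, as in the snippets above

for coll in [edb.get_timeseries_db(), edb.get_analysis_timeseries_db()]:
    # use a regular (non-sparse) index so that documents without the
    # "invalid" field are also represented in the index
    coll.create_index("invalid")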

@shankari commented Jan 17, 2023

Restored for the test user:

INFO:root:Loading file or prefix /tmp/test_smaller_retrieve_gis_2
INFO:root:Found 2 matching files for prefix /tmp/test_smaller_retrieve_gis_2
INFO:root:files are ['/tmp/test_smaller_retrieve_gis_2_....gz', '/tmp/test_smaller_retrieve_gis_2_pipelinestate_....gz'] ... ['/tmp/test_smaller_retrieve_gis_2_....gz']
INFO:root:==================================================
INFO:root:Loading data from file /tmp/test_smaller_retrieve_gis_2_....gz
INFO:root:Analyzing timeline...
INFO:root:timeline has 97206 entries
INFO:root:timeline has data from 1 users
INFO:root:timeline has the following unique keys {'statemachine/transition', 'analysis/inferred_section', 'stats/client_time', 'background/location', 'analysis/inferred_trip', 'segmentation/raw_untracked', 'inference/prediction', 'background/filtered_location', 'stats/pipeline_error', 'manual/mode_confirm', 'segmentation/raw_section', 'analysis/cleaned_stop', 'analysis/cleaned_untracked', 'stats/client_nav_event', 'analysis/inferred_labels', 'background/motion_activity', 'background/battery', 'analysis/cleaned_section', 'stats/client_error', 'manual/replaced_mode', 'analysis/expected_trip', 'segmentation/raw_place', 'analysis/cleaned_place', 'manual/purpose_confirm', 'stats/pipeline_time', 'analysis/confirmed_trip', 'analysis/cleaned_trip', 'segmentation/raw_trip', 'segmentation/raw_stop', 'inference/labels', 'analysis/recreated_location', 'stats/server_api_time'}
INFO:root:timeline for user ... contains analysis results
For uuid ..., finished loading 7 entries into the usercache and 0 entries into the timeseries
Loading pipeline state for ... from /tmp/test_smaller_retrieve_gis_2_pipelinestate_....gz
INFO:root:ignoring duplicate key error while restoring pipeline state
INFO:root:Creating user entries for 1 users
INFO:root:pattern = user-%01d
INFO:root:For 1 users, loaded 345442 raw entries, 66001 processed entries and 13 pipeline states

We now have those location entries:

pd.json_normalize(list(edb.get_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'background/filtered_location', 'data.ts': {"$lte": 1672586043.121 + 10 * 60}}).sort("data.ts", -1).limit(20)))[["data.fmt_time"]]
                       data.fmt_time
0   2023-01-01T08:23:48.014000-07:00
1   2023-01-01T08:23:17.079000-07:00
2   2023-01-01T08:22:47.037000-07:00
3   2023-01-01T08:21:14.033000-07:00
4   2023-01-01T08:20:12.074000-07:00
5   2023-01-01T08:19:41.096000-07:00
6   2023-01-01T08:19:11.055000-07:00
7   2023-01-01T08:18:40.019000-07:00
8   2023-01-01T08:18:09.021000-07:00
9   2023-01-01T08:17:38.002000-07:00
10  2023-01-01T08:16:36.016000-07:00
11  2023-01-01T08:16:05.105000-07:00
12  2023-01-01T08:15:35.090000-07:00
13  2023-01-01T08:15:05.083000-07:00
14  2023-01-01T08:14:35.025000-07:00
15  2023-01-01T08:14:03.121000-07:00
16  2022-12-30T14:35:41.025000-07:00
17  2022-12-30T14:35:10.056000-07:00
18  2022-12-30T14:34:40.047000-07:00
19  2022-12-30T14:34:09.100000-07:00

@shankari

And the pipeline has since run successfully

For ...: on samsung, trip count = 984, location count = 64374, first trip = 2022-01-02T08:57:02.028000-07:00, last trip = 2023-01-17T13:38:56.016000-07:00, confirmed_pct  = approximately 0.98, last install: 2023-01-17T05:25:49.791000-07:00

@shankari

Will close this once we have copied over data for all the other participants as well

@shankari commented Jan 19, 2023

Looking at the other participants

| id | last copied over location entry | last trip | last location entry | Issue | OS |
|----|---------------------------------|-----------|---------------------|-------|----|
| 12 | 2022-12-31T22:50:21.373000-07:00 | 2023-01-02T11:36:53.495000-07:00 | 2023-01-02T11:39:11.811000-07:00 | No more location points; app uninstall? | TCL |
| 13 | 2022-12-05T23:25:32.409000-07:00 | 2022-12-30T13:30:14.693000-07:00 | 2023-01-14T18:54:40.730000-07:00 | Copy over missing data | motorola |
| 14 | 2022-12-04T10:43:39.999537-07:00 | 2022-10-27T23:41:00.000556+10:00 | 2023-01-18T15:44:07.005393-07:00 | Copy over missing data | Apple |
| 15 | 2022-12-31T03:03:39.999470-07:00 | 2023-01-04T14:30:00.000063-07:00 | 2023-01-04T14:30:00.000063-07:00 | No more location points, app uninstall? | Apple |
| 16 | 2022-12-15T14:38:05.979000-07:00 | 2022-12-30T06:16:34.483000-07:00 | 2023-01-17T14:46:00.200000-07:00 | Copy over missing data | Samsung |
| 17 | 2022-09-29T19:22:23.999000-06:00 | 2023-01-06T16:42:31.998000-07:00 | 2023-01-06T19:07:04.114000-07:00 | No more location points, app uninstall? | TCL |

[1] But we have data until 12-31 and the missing point is from 12-30? The data from 12-31 is from the new app; the previous data copy only goes up to 12-05.
[2] We have data from December, but the missing location point is from 2022-10-27, and the location entries before that are from 2022-02-04?! Searching around, it looks like after February we only have entries from 2022-11-05 onwards. So we will likely need to copy in multiple batches.

@shankari commented Jan 19, 2023

Copied over:

  • 13 (2022-12-05 to 2023-01-02)
  • 16 (2022-12-05 to 2023-01-02)
  • 14 (in chunks of one month each, starting with 2022-02-01 to 2022-03-02)

@shankari

So 16 is weird. The missing item is not the location entry, but the place for the trip

2023-01-18 13:59:13,231:ERROR:140693321615168:Sectioning failed for user ...
2023-01-18 13:59:13,226:INFO:140693321615168:++++++++++++++++++++Processing trip 61d1ddc9200e4476992912a8 for user ....++++++++++++++++++++
Traceback (most recent call last):
File "/usr/src/app/emission/analysis/intake/segmentation/section_segmentation.py", line 52, in segment_current_sections
segment_trip_into_sections(user_id, trip_entry, trip_entry.data.source)
File "/usr/src/app/emission/analysis/intake/segmentation/section_segmentation.py", line 66, in segment_trip_into_sections
distance_from_place = _get_distance_from_start_place_to_end(trip_entry)
File "/usr/src/app/emission/analysis/intake/segmentation/section_segmentation.py", line 200, in _get_distance_from_start_place_to_end
start_place = esda.get_object(esda.RAW_PLACE_KEY, start_place_id)
File "/usr/src/app/emission/storage/decorations/analysis_timeseries_queries.py", line 45, in get_object
return get_entry(key, object_id).data
AttributeError: 'NoneType' object has no attribute 'data'

The missing place is from 2021-12-31, and the missing trip is from 2022-01-02.
And the reason we were processing these old trips is that the section segmentation was reset completely:

2023-01-18 13:59:13,019:INFO:140693321615168:For stage PipelineStages.SECTION_SEGMENTATION, start_ts is None

Although it was not reset in the original mongodump.

>>> list(edb.get_pipeline_state_db().find({"user_id": UUID("..."), "pipeline_stage": 2}))
[{'_id': ObjectId('60d532c55d607068e6913d4c'), 'user_id': UUID('...'), 'pipeline_stage': 2, 'curr_run_ts': None, 'last_processed_ts': 1672406199.483, 'last_ts_run': 1672416825.4551017}]

>>> list(edb.get_pipeline_state_db().find({"user_id": UUID(".."), "pipeline_stage": 2}))
[{'_id': ObjectId('63b19ee41ed87bba98d73510'), 'user_id': UUID('...'), 'pipeline_stage': 2, 'curr_run_ts': None, 'last_processed_ts': None, 'last_ts_run': None}]

And we are missing analysis data for the user before Jan 2022

{'user_id': UUID('...'), 'metadata': {'key': 'analysis/confirmed_trip', 'end_fmt_time': '2021-06-25T06:42:50-06:00', 'start_fmt_time': '2021-06-25T06:36:50-06:00'}

{'user_id': UUID('fbff5e08-b7f2-4a94-ab4b-2d7371999ef7'), 'metadata': {'key': 'analysis/confirmed_trip', 'end_fmt_time': '2022-01-02T09:31:06-07:00', 'start_fmt_time': '2022-01-02T07:55:07.322381-07:00'}

OK, so it looks like I copied over only data from 2022-01-01 onwards.
I then reset the pipeline (at least for the section stage).
This meant that the start place of the first trip was not present, causing us to fail.

(1) One fix is to reset the pipeline completely
(2) A second fix is to get data from 2021 and copy it over

Let's go with (2) so that we retain the object ids in case we want to handle any more fixes.

@shankari commented Jan 19, 2023

Found another gap for user 16 from 2022-08-09 to 2022-09-12

0   2022-09-12
1   2022-09-12
2   2022-09-12
3   2022-09-12
4   2022-08-09
5   2022-08-09
6   2022-08-09
7   2022-08-09

@shankari

After loading that gap, I am currently at 2022-06-11T16:14:11-06:00.
