-
Notifications
You must be signed in to change notification settings - Fork 6
/
classes.py
2252 lines (1987 loc) · 100 KB
/
classes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import math
import types
from datetime import datetime, timedelta
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Type, Union
from packaging import version
if TYPE_CHECKING:
import altair as alt
from sktime.forecasting.base import BaseForecaster
import numpy as np
import pandas as pd
import pytz
from pandas.core.groupby import DataFrameGroupBy
from pandas.util._decorators import cache_readonly
from sqlalchemy import (
Column,
DateTime,
Float,
ForeignKey,
Integer,
Interval,
and_,
func,
select,
)
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property
from sqlalchemy.orm import Session, backref, declarative_mixin, relationship
from sqlalchemy.orm.util import AliasedClass
from sqlalchemy.schema import Index
from sqlalchemy.sql.elements import BinaryExpression
from sqlalchemy.sql.expression import Selectable
import timely_beliefs.utils as tb_utils
from timely_beliefs.beliefs import probabilistic_utils
from timely_beliefs.beliefs import utils as belief_utils
from timely_beliefs.beliefs.utils import is_pandas_structure, is_tb_structure, meta_repr
from timely_beliefs.db_base import Base
from timely_beliefs.sensors import utils as sensor_utils
from timely_beliefs.sensors.classes import DBSensor, Sensor, SensorDBMixin
from timely_beliefs.sensors.func_store.knowledge_horizons import ex_ante, ex_post
from timely_beliefs.sources import utils as source_utils
from timely_beliefs.sources.classes import BeliefSource, DBBeliefSource
# Names of the custom attributes that pandas should propagate to derived objects
# (used as ``_metadata`` by both BeliefsSeries and BeliefsDataFrame).
METADATA = ["sensor", "event_resolution"]
# Accepted input types for points in time (event starts, belief times, ...).
DatetimeLike = Union[datetime, str, pd.Timestamp]
# Accepted input types for durations (belief horizons).
TimedeltaLike = Union[timedelta, str, pd.Timedelta]
# Accepted types for the ``custom_join_targets`` of TimedBeliefDBMixin.search_session.
JoinTarget = Union[
    Selectable,
    type,
    AliasedClass,
    types.FunctionType,
]
class TimedBelief(object):
    """
    The basic description of a data point as a belief, which includes the following:
    - a sensor (what the belief is about)
    - an event (an instant or period of time that the belief is about)
    - a horizon (indicating when the belief was formed with respect to the event)
    - a source (who or what formed the belief)
    - a value (what was believed)
    - a cumulative probability (the likelihood of the value being equal or lower than stated)*

    * The default assumption is that the mean value is given (cp=0.5), but if no beliefs about possible other outcomes
    are given, then this will be treated as a deterministic belief (cp=1). As an alternative to specifying a cumulative
    probability explicitly, you can specify an integer number of standard deviations which is translated
    into a cumulative probability assuming a normal distribution (e.g. sigma=-1 becomes cp=0.1587).
    """

    event_start: datetime
    belief_horizon: timedelta
    event_value: float  # todo: allow string to represent beliefs about labels? But what would nominal data mean for the interpretation of cp?
    sensor: Sensor
    source: BeliefSource
    cumulative_probability: float

    def __init__(
        self,
        sensor: Sensor,
        source: BeliefSource | str | int,
        event_value: float | None = None,
        cumulative_probability: float | None = None,
        cp: float | None = None,
        sigma: float | None = None,
        event_start: DatetimeLike | None = None,
        event_time: DatetimeLike | None = None,
        belief_horizon: TimedeltaLike | None = None,
        belief_time: DatetimeLike | None = None,
    ):
        """Create a belief about an event of the given sensor.

        :param sensor: the sensor that the belief is about
        :param source: who or what formed the belief (passed through source_utils.ensure_source_exists)
        :param event_value: what was believed
        :param cumulative_probability: cumulative probability of the value; specify at most one of
                                       cumulative_probability, cp and sigma (0.5 is used if none is given)
        :param cp: alias for cumulative_probability
        :param sigma: number of standard deviations, translated into a cumulative probability
                      assuming a normal distribution
        :param event_start: start of the event; specify exactly one of event_start and event_time
        :param event_time: time of an instantaneous event (only for sensors with a zero event resolution)
        :param belief_horizon: how long before the knowledge time the belief was formed;
                               specify exactly one of belief_horizon and belief_time
        :param belief_time: time at which the belief was formed (stored as a belief horizon)
        :raises ValueError: if the probability, event timing or belief timing arguments are over- or underspecified
        :raises KeyError: if event_time is used for a sensor with a non-zero event resolution
        """
        self.sensor = sensor
        self.source = source_utils.ensure_source_exists(source)
        self.event_value = event_value

        # At most one of the three probability arguments may be given
        if [cumulative_probability, cp, sigma].count(None) not in (2, 3):
            raise ValueError(
                "Must specify either cumulative_probability, cp, sigma or none of them (0.5 is the default value)."
            )
        if cumulative_probability is not None:
            self.cumulative_probability = cumulative_probability
        elif cp is not None:
            self.cumulative_probability = cp
        elif sigma is not None:
            # Cumulative distribution function of the standard normal distribution
            self.cumulative_probability = 1 / 2 + (math.erf(sigma / 2**0.5)) / 2
        else:
            self.cumulative_probability = 0.5

        # Exactly one of the two event timing arguments must be given
        if [event_start, event_time].count(None) != 1:
            raise ValueError("Must specify either an event_start or an event_time.")
        elif event_start is not None:
            self.event_start = tb_utils.parse_datetime_like(event_start, "event_start")
        elif event_time is not None:
            if self.sensor.event_resolution != timedelta():
                raise KeyError(
                    "Sensor has a non-zero resolution, so it doesn't measure instantaneous events. "
                    "Use event_start instead of event_time."
                )
            self.event_start = tb_utils.parse_datetime_like(event_time, "event_time")

        # Exactly one of the two belief timing arguments must be given
        if [belief_horizon, belief_time].count(None) != 1:
            raise ValueError("Must specify either a belief_horizon or a belief_time.")
        elif belief_horizon is not None:
            self.belief_horizon = tb_utils.parse_timedelta_like(belief_horizon)
        elif belief_time is not None:
            belief_time = tb_utils.parse_datetime_like(belief_time, "belief_time")
            # The horizon is measured back from the moment the event can be known
            self.belief_horizon = (
                self.sensor.knowledge_time(self.event_start, self.event_resolution)
                - belief_time
            )

    def __repr__(self):
        """Return a short description of the belief.

        The event value is optional in the constructor, so a missing value is shown as "None"
        rather than being pushed through the float format (which would raise a TypeError).
        """
        if self.event_value is None:
            value_repr = "None"
        else:
            value_repr = "%.2f" % self.event_value
        return (
            "<TimedBelief: at %s, the value of %s is %s (by %s with horizon %s)>"
            % (
                self.event_start,
                self.sensor,
                value_repr,
                self.source,
                self.belief_horizon,
            )
        )

    @hybrid_property
    def event_end(self) -> datetime:
        """End of the event: the event start plus the sensor's event resolution."""
        return self.event_start + self.sensor.event_resolution

    @hybrid_property
    def knowledge_time(self) -> datetime:
        """Knowledge time of the event, as computed by the sensor."""
        return self.sensor.knowledge_time(self.event_start, self.event_resolution)

    @hybrid_property
    def knowledge_horizon(self) -> timedelta:
        """Knowledge horizon of the event, as computed by the sensor."""
        return self.sensor.knowledge_horizon(self.event_start, self.event_resolution)

    @hybrid_property
    def event_resolution(self) -> timedelta:
        """Event resolution of the sensor that this belief is about."""
        return self.sensor.event_resolution

    @hybrid_property
    def belief_time(self) -> datetime:
        """Time at which the belief was formed: the knowledge time minus the belief horizon."""
        return self.knowledge_time - self.belief_horizon

    @property
    def source_id(self):
        """Convenience method so these and DBTimedBelief can be treated equally"""
        if self.source is not None:
            return self.source.name
        return None
@declarative_mixin
class TimedBeliefDBMixin(TimedBelief):
    """
    Mixin class for a table with beliefs.

    The fields source and sensor do not point to another table - overwrite them to make that happen.
    """

    @declared_attr
    def __table_args__(cls):
        """Define the indexes that support the queries built in search_session."""
        return (
            Index(
                f"{cls.__tablename__}_search_session_idx",
                "event_start",
                "sensor_id",
                "source_id",
                postgresql_include=[
                    "belief_horizon",  # we use min() on this (most_recent_beliefs_only)
                ],
            ),
            Index(
                f"{cls.__tablename__}_search_session_singleevent_idx",
                "event_start",
                "sensor_id",
            ),
        )

    # Together with sensor_id and source_id, these columns form the composite primary key
    event_start = Column(DateTime(timezone=True), primary_key=True, index=True)
    belief_horizon = Column(Interval(), nullable=False, primary_key=True)
    cumulative_probability = Column(
        Float, nullable=False, primary_key=True, default=0.5
    )
    event_value = Column(Float, nullable=False)

    @declared_attr
    def sensor_id(cls):
        """Foreign key to the sensor table (part of the primary key)."""
        return Column(
            Integer(),
            ForeignKey("sensor.id", ondelete="CASCADE"),
            primary_key=True,
            index=True,
        )

    @declared_attr
    def source_id(cls):
        """Foreign key to the belief source table (part of the primary key)."""
        return Column(Integer, ForeignKey("belief_source.id"), primary_key=True)

    def __init__(
        self,
        sensor: DBSensor,
        source: DBBeliefSource,
        event_value: float | None = None,
        cumulative_probability: float | None = None,
        cp: float | None = None,
        sigma: float | None = None,
        event_start: DatetimeLike | None = None,
        event_time: DatetimeLike | None = None,
        belief_horizon: TimedeltaLike | None = None,
        belief_time: DatetimeLike | None = None,
    ):
        """Set the foreign keys from the given sensor and source, then initialise as a TimedBelief.

        See TimedBelief.__init__ for the meaning of the arguments.
        """
        self.sensor_id = sensor.id
        self.source_id = source.id
        TimedBelief.__init__(
            self,
            sensor=sensor,
            source=source,
            event_value=event_value,
            cumulative_probability=cumulative_probability,
            cp=cp,
            sigma=sigma,
            event_start=event_start,
            event_time=event_time,
            belief_horizon=belief_horizon,
            belief_time=belief_time,
        )

    @classmethod
    def add_to_session(
        cls,
        session: Session,
        beliefs_data_frame: "BeliefsDataFrame",
        expunge_session: bool = False,
        allow_overwrite: bool = False,
        bulk_save_objects: bool = True,
        commit_transaction: bool = False,
    ):
        """Add a BeliefsDataFrame as timed beliefs to a database session.

        If you are adding lots of beliefs, it's most efficient to use expunge_session=True and allow_overwrite=False

        :param session: the database session to use
        :param beliefs_data_frame: the BeliefsDataFrame to be persisted
        :param expunge_session: if True, all non-flushed instances are removed from the session before adding beliefs.
                                Expunging can resolve problems you might encounter with states of objects in your session.
                                When using this option, you might want to flush newly-created objects which are not beliefs
                                (e.g. a sensor or data source object).
        :param allow_overwrite: if True, new objects are merged
                                if False, objects are added to the session or bulk saved
        :param bulk_save_objects: if True, objects are bulk saved with session.bulk_save_objects(),
                                  which is quite fast but has several caveats, see:
                                  https://docs.sqlalchemy.org/orm/persistence_techniques.html#bulk-operations-caveats
                                  if False, objects are added to the session with session.add_all()
        :param commit_transaction: if True, the session is committed
                                   if False, you can still add other data to the session
                                   and commit it all within an atomic transaction
        """
        if beliefs_data_frame.empty:
            return

        # Belief timing is stored as the belief horizon rather than as the belief time
        beliefs_data_frame = (
            beliefs_data_frame.convert_index_from_belief_time_to_horizon().reset_index()
        )
        beliefs = [
            cls(sensor=beliefs_data_frame.sensor, **d)
            for d in beliefs_data_frame.to_dict("records")
        ]

        if expunge_session:
            session.expunge_all()

        if bulk_save_objects:
            # serialize sources and sensor, while adding new sources

            # serialize sources
            beliefs_data_frame["source_id"] = beliefs_data_frame["source"].apply(
                lambda x: x.id
            )

            # Add new sources
            newbies = pd.isnull(beliefs_data_frame["source_id"])
            if any(newbies):
                session.add_all(beliefs_data_frame.loc[newbies, "source"])
                session.flush()  # assign IDs
                beliefs_data_frame.loc[newbies, "source_id"] = beliefs_data_frame.loc[
                    newbies, "source"
                ].apply(lambda x: x.id)

            # serialize sensor
            beliefs_data_frame["sensor_id"] = beliefs_data_frame.sensor.id
            beliefs_data_frame = beliefs_data_frame.drop(columns=["source"])

            smt = insert(cls).values(beliefs_data_frame.to_dict("records"))
            if allow_overwrite:
                # On a primary key conflict, only the event value is replaced
                smt = smt.on_conflict_do_update(
                    index_elements=[
                        "event_start",
                        "belief_horizon",
                        "source_id",
                        "sensor_id",
                        "cumulative_probability",
                    ],
                    set_=dict(event_value=smt.excluded.event_value),
                )
            session.execute(smt)
        else:
            if allow_overwrite:
                for belief in beliefs:
                    session.merge(belief)
            else:
                session.add_all(beliefs)

        if commit_transaction:
            session.commit()

    @classmethod
    def search_session(  # noqa: C901
        cls,
        session: Session,
        sensor: SensorDBMixin | int,
        sensor_class: Type[SensorDBMixin] | None = DBSensor,
        event_starts_after: datetime | None = None,
        event_ends_after: datetime | None = None,
        event_starts_before: datetime | None = None,
        event_ends_before: datetime | None = None,
        beliefs_after: datetime | None = None,
        beliefs_before: datetime | None = None,
        horizons_at_least: timedelta | None = None,
        horizons_at_most: timedelta | None = None,
        source: BeliefSource | list[BeliefSource] | None = None,
        most_recent_beliefs_only: bool = False,
        most_recent_events_only: bool = False,
        most_recent_only: bool = False,
        place_beliefs_in_sensor_timezone: bool = True,
        place_events_in_sensor_timezone: bool = True,
        custom_filter_criteria: list[BinaryExpression] | None = None,
        custom_join_targets: list[JoinTarget] | None = None,
    ) -> "BeliefsDataFrame":
        """Search a database session for beliefs about sensor events.

        The optional arguments represent optional filters, with two exceptions:
        - sensor_class makes it possible to create a query on sensor subclasses
        - custom_join_targets makes it possible to add custom filters using other (incl. subclassed) targets

        A word about query speed:
        As data sets can become quite large (and get wrapped into a BeliefsDataFrame), we recommend to filter by sensor and source, as well as a time range, in order to limit processing time.
        You should also consider selecting only the most recent beliefs (for each event, there might have been more than one).
        Also, look into selecting only the most recent events (per source).
        These two latter tips decrease the dataset and thus post-processing time. They use sub-queries, though, so be sure to use the main filtering, as well, like time range.
        Finally, if you only need one most recent value (one result row), there is a fast-track (most_recent_only).

        :param session: the database session to use
        :param sensor: sensor to which the beliefs pertain, or its unique sensor id
        :param sensor_class: optionally pass the sensor (sub)class explicitly (only needed if you pass a sensor id instead of a sensor, and your sensor class is not DBSensor); the class should be mapped to a database table
        :param event_starts_after: only return beliefs about events that start after this datetime (inclusive)
        :param event_ends_after: only return beliefs about events that end after this datetime (exclusive for non-instantaneous events, inclusive for instantaneous events)
                                 note that the first event may transpire partially before this datetime
        :param event_starts_before: only return beliefs about events that start before this datetime (exclusive for non-instantaneous events, inclusive for instantaneous events)
                                    note that the last event may transpire partially after this datetime
        :param event_ends_before: only return beliefs about events that end before this datetime (inclusive)
        :param beliefs_after: only return beliefs formed after this datetime (inclusive)
        :param beliefs_before: only return beliefs formed before this datetime (inclusive)
        :param horizons_at_least: only return beliefs with a belief horizon equal or greater than this timedelta (for example, use timedelta(0) to get ante knowledge time beliefs)
        :param horizons_at_most: only return beliefs with a belief horizon equal or less than this timedelta (for example, use timedelta(0) to get post knowledge time beliefs)
        :param source: only return beliefs formed by the given source or list of sources. This speeds up your query, so if you know the source, let the query know.
        :param most_recent_beliefs_only: only return the most recent beliefs for each event from each source (minimum belief horizon)
        :param most_recent_events_only: only return (post knowledge time) beliefs for the most recent event (maximum event start) for each source
        :param most_recent_only: only looks up one most recent event and then only returns the most recent belief about that event that it finds. This is a considerable fast-track if only one most recent belief is all you need.
        :param place_beliefs_in_sensor_timezone: if True (the default), belief times are converted to the timezone of the sensor
        :param place_events_in_sensor_timezone: if True (the default), event starts are converted to the timezone of the sensor
        :param custom_filter_criteria: additional filters, such as ones that rely on subclasses
        :param custom_join_targets: additional join targets, to accommodate filters that rely on other targets (e.g. subclasses)
        :returns: a multi-index DataFrame with all relevant beliefs
        :raises ValueError: if a sensor id is passed with a sensor_class that is not a SensorDBMixin subclass,
                            if no sensor with the given id exists,
                            or if most_recent_only is combined with most_recent_beliefs_only or most_recent_events_only
        """
        source_class = cls.source.property.mapper.class_

        # Check for timezone-aware datetime input
        # (the second argument names the parameter being parsed, consistently with the signature)
        if not pd.isnull(event_starts_after):
            event_starts_after = tb_utils.parse_datetime_like(
                event_starts_after, "event_starts_after"
            )
        if not pd.isnull(event_ends_after):
            event_ends_after = tb_utils.parse_datetime_like(
                event_ends_after, "event_ends_after"
            )
        if not pd.isnull(event_starts_before):
            event_starts_before = tb_utils.parse_datetime_like(
                event_starts_before, "event_starts_before"
            )
        if not pd.isnull(event_ends_before):
            event_ends_before = tb_utils.parse_datetime_like(
                event_ends_before, "event_ends_before"
            )
        if not pd.isnull(beliefs_after):
            beliefs_after = tb_utils.parse_datetime_like(
                beliefs_after, "beliefs_after"
            )
        if not pd.isnull(beliefs_before):
            beliefs_before = tb_utils.parse_datetime_like(
                beliefs_before, "beliefs_before"
            )

        # Query sensor, required for its timing properties
        if isinstance(sensor, int):
            # Check for proper sensor class (None is allowed by the signature, but cannot be queried)
            if not (
                isinstance(sensor_class, type)
                and issubclass(sensor_class, SensorDBMixin)
            ):
                raise ValueError(
                    f"sensor class {sensor_class} (given to look up sensor {sensor}) is not a subclass of {SensorDBMixin}"
                )
            sensor = session.execute(
                select(sensor_class).filter(sensor_class.id == sensor)
            ).scalar_one_or_none()
            if sensor is None:
                raise ValueError("No such sensor")

        # Parse source parameter
        sources: list = []
        if source is not None:
            sources = [source] if not isinstance(source, list) else source
            # Fast-track empty list of sources
            # (only when a source filter was actually passed; no filter means all sources)
            if sources == []:
                return BeliefsDataFrame(sensor=sensor, beliefs=[])

        # Get bounds on the knowledge horizon (so we can already roughly filter by belief time)
        (
            knowledge_horizon_min,
            knowledge_horizon_max,
        ) = sensor_utils.eval_verified_knowledge_horizon_fnc(
            sensor.knowledge_horizon_fnc,
            sensor.knowledge_horizon_par,
            event_resolution=sensor.event_resolution,
            get_bounds=True,
        )

        def apply_event_timing_filters(q):
            """Apply filters that concern the event time."""
            if not pd.isnull(event_starts_after):
                q = q.filter(cls.event_start >= event_starts_after)
            if not pd.isnull(event_ends_after):
                if sensor.event_resolution == timedelta(0):
                    # inclusive
                    q = q.filter(cls.event_start >= event_ends_after)
                else:
                    # exclusive
                    q = q.filter(
                        cls.event_start > event_ends_after - sensor.event_resolution
                    )
            if not pd.isnull(event_starts_before):
                if sensor.event_resolution == timedelta(0):
                    # inclusive
                    q = q.filter(cls.event_start <= event_starts_before)
                else:
                    # exclusive
                    q = q.filter(cls.event_start < event_starts_before)
            if not pd.isnull(event_ends_before):
                q = q.filter(
                    cls.event_start <= event_ends_before - sensor.event_resolution
                )
            return q

        def apply_belief_timing_filters(q):
            """Apply filters that concern the belief timing.

            This includes any custom filters
            """
            # Apply rough belief time filter
            if not pd.isnull(
                beliefs_after
            ) and belief_utils.extreme_timedeltas_not_equal(
                knowledge_horizon_min, timedelta.min
            ):
                q = q.filter(
                    cls.event_start - cls.belief_horizon
                    >= beliefs_after + knowledge_horizon_min
                )
            if not pd.isnull(
                beliefs_before
            ) and belief_utils.extreme_timedeltas_not_equal(
                knowledge_horizon_max, timedelta.max
            ):
                q = q.filter(
                    cls.event_start - cls.belief_horizon
                    <= beliefs_before + knowledge_horizon_max
                )

            # Apply belief horizon filter
            if not pd.isnull(horizons_at_least):
                q = q.filter(cls.belief_horizon >= horizons_at_least)
            if not pd.isnull(horizons_at_most):
                q = q.filter(cls.belief_horizon <= horizons_at_most)

            # Apply custom filter criteria and join targets
            if custom_filter_criteria is not None:
                q = q.filter(*custom_filter_criteria)
            if custom_join_targets is not None:
                for target in custom_join_targets:
                    q = q.join(target)
            return q

        # Main query
        q = select(
            cls.event_start,
            cls.belief_horizon,
            cls.source_id,
            cls.cumulative_probability,
            cls.event_value,
        ).filter(cls.sensor_id == sensor.id)
        q = apply_event_timing_filters(q)
        q = apply_belief_timing_filters(q)

        # Apply source filter
        if len(sources) > 0:
            q = q.join(source_class).filter(cls.source_id.in_([s.id for s in sources]))

        # Switch to fast-track if user wants both most recent events & beliefs and one source is requested.
        # In this case, we know only one row will be returned. (If only one source exists in the to-be-returned dataset,
        # we'd get one row without filtering for source, but we cannot know if that happens before we query.)
        if (most_recent_beliefs_only and most_recent_events_only) and len(sources) == 1:
            most_recent_only = True
            most_recent_events_only = False
            most_recent_beliefs_only = False
        # Otherwise, we should not allow to mix the two most-recent-X approaches, for clarity
        elif (most_recent_beliefs_only or most_recent_events_only) and (
            most_recent_only
        ):
            raise ValueError(
                "most_recent_events|beliefs_only can not be used with most_recent_only."
            )

        # Apply most recent beliefs filter as subquery
        most_recent_beliefs_only_incompatible_criteria = (
            beliefs_before is not None or beliefs_after is not None
        ) and sensor.knowledge_horizon_fnc not in (ex_ante.__name__, ex_post.__name__)
        if (
            most_recent_beliefs_only
            and not most_recent_beliefs_only_incompatible_criteria
        ):
            subq = select(
                cls.event_start,
                cls.source_id,
                func.min(cls.belief_horizon).label("most_recent_belief_horizon"),
            )
            # Apply event and belief timing filters to the subquery, too,
            # before taking the minimum horizon (the former is crucial for speed)
            subq = apply_event_timing_filters(subq)
            subq = apply_belief_timing_filters(subq)
            subq = (
                subq.filter(cls.sensor_id == sensor.id)
                .group_by(cls.event_start, cls.source_id)
                .subquery()
            )
            q = q.join(
                subq,
                and_(
                    cls.event_start == subq.c.event_start,
                    cls.source_id == subq.c.source_id,
                    cls.belief_horizon == subq.c.most_recent_belief_horizon,
                ),
            )

        # Apply most recent events filter as subquery
        if most_recent_events_only:
            subq_most_recent_events = select(
                cls.source_id,
                func.max(cls.event_start).label("most_recent_event_start"),
            )
            subq_most_recent_events = apply_event_timing_filters(
                subq_most_recent_events
            )
            subq_most_recent_events = apply_belief_timing_filters(
                subq_most_recent_events
            )
            subq_most_recent_events = (
                subq_most_recent_events.filter(cls.sensor_id == sensor.id)
                .group_by(cls.source_id)
                .subquery()
            )
            q = q.join(
                subq_most_recent_events,
                and_(
                    cls.source_id == subq_most_recent_events.c.source_id,
                    cls.event_start
                    == subq_most_recent_events.c.most_recent_event_start,
                ),
            )

        # Apply fast-track most-recent-only approach
        # Note that currently, this only works for a deterministic belief. A probabilistic belief would have multiple rows
        # (sharing the same event start and belief horizon).
        if most_recent_only:
            q = q.order_by(cls.event_start.desc(), cls.belief_horizon.asc()).limit(1)

        # Useful debugging code, let's keep it here
        # from sqlalchemy.dialects import postgresql
        # print(q.compile(dialect=postgresql.dialect()))

        # Build our DataFrame of beliefs
        df = pd.DataFrame(session.execute(q))
        if df.empty:
            return BeliefsDataFrame(sensor=sensor)
        df.columns = [
            "event_start",
            "belief_horizon",
            "source_id",
            "cumulative_probability",
            "event_value",
        ]

        # Fill in sources
        if source is None:
            # No source filter was given, so look up the sources that occur in the results
            source_ids = df["source_id"].unique().tolist()
            sources = session.scalars(
                select(source_class).filter(source_class.id.in_(source_ids))
            ).all()
        source_map = {source.id: source for source in sources}
        df["source_id"] = df["source_id"].map(source_map)
        df = df.rename(columns={"source_id": "source"})

        # Build our BeliefsDataFrame
        df = BeliefsDataFrame(df, sensor=sensor)
        df = df.convert_index_from_belief_horizon_to_time()

        # Actually filter by belief time
        if beliefs_after is not None:
            df = df[df.index.get_level_values("belief_time") >= beliefs_after]
        if beliefs_before is not None:
            df = df[df.index.get_level_values("belief_time") <= beliefs_before]

        # Select most recent beliefs using postprocessing in case of incompatible search criteria
        if most_recent_beliefs_only and most_recent_beliefs_only_incompatible_criteria:
            df = belief_utils.select_most_recent_belief(df)

        # Convert timezone of beliefs and events to sensor timezone
        if place_beliefs_in_sensor_timezone:
            df = df.convert_timezone_of_belief_timing_index(sensor.timezone)
        if place_events_in_sensor_timezone:
            df = df.convert_timezone_of_event_timing_index(sensor.timezone)

        return df
class DBTimedBelief(Base, TimedBeliefDBMixin):
    """Database representation of TimedBelief.

    We get fields from the Mixin and configure sensor and source relationships.
    We are not sure why the relationships cannot live in the Mixin as declared attributes,
    but they have to be here (thus other custom implementations need to include them, as well).
    """

    __tablename__ = "timed_beliefs"

    # Each relationship also adds a "beliefs" backref on the related class
    sensor = relationship(
        "DBSensor",
        backref=backref(
            "beliefs", lazy=True, cascade="all, delete-orphan", passive_deletes=True
        ),
    )
    source = relationship(
        "DBBeliefSource",
        backref=backref(
            "beliefs", lazy=True, cascade="all, delete-orphan", passive_deletes=True
        ),
    )

    def __init__(
        self,
        sensor: DBSensor,
        source: DBBeliefSource,
        event_value: float | None = None,
        cumulative_probability: float | None = None,
        cp: float | None = None,
        sigma: float | None = None,
        event_start: DatetimeLike | None = None,
        event_time: DatetimeLike | None = None,
        belief_horizon: TimedeltaLike | None = None,
        belief_time: DatetimeLike | None = None,
    ):
        """Initialise through the mixin (all arguments are passed on unchanged), then through Base."""
        TimedBeliefDBMixin.__init__(
            self,
            sensor=sensor,
            source=source,
            event_value=event_value,
            cumulative_probability=cumulative_probability,
            cp=cp,
            sigma=sigma,
            event_start=event_start,
            event_time=event_time,
            belief_horizon=belief_horizon,
            belief_time=belief_time,
        )
        Base.__init__(self)
class BeliefsSeries(pd.Series):
    """Just for slicing, to keep around the metadata."""

    # Attributes that are propagated from one object to the next in __finalize__
    _metadata = METADATA

    # Pre-Pandas 2.0, call __finalize__() after construction to inherit metadata.
    if version.parse(pd.__version__) < version.parse("2.0.0"):

        @property
        def _constructor(self):
            def f(*args, **kwargs):
                return BeliefsSeries(*args, **kwargs).__finalize__(
                    self, method="inherit"
                )

            return f

    else:

        @property
        def _constructor(self):
            return partial(BeliefsSeries)

    @property
    def _constructor_expanddim(self):
        """Return the constructor used when this series is expanded into a frame."""

        def f(*args, **kwargs):
            """Call __finalize__() after construction to inherit metadata."""
            # adapted from https://github.com/pandas-dev/pandas/issues/19850#issuecomment-367934440
            return BeliefsDataFrame(*args, **kwargs).__finalize__(
                self, method="inherit"
            )

        # workaround from https://github.com/pandas-dev/pandas/issues/32860#issuecomment-697993089
        f._get_axis_number = super(BeliefsSeries, self)._get_axis_number

        return f

    def __finalize__(self, other, method=None, **kwargs):
        """Propagate metadata from other to self."""
        for name in self._metadata:
            object.__setattr__(self, name, getattr(other, name, None))
        # The series name is carried over as well, if the other object has one
        if hasattr(other, "name"):
            object.__setattr__(self, "name", getattr(other, "name"))
        return self

    def __init__(self, *args, **kwargs):
        """Initialise as a regular pandas Series."""
        super().__init__(*args, **kwargs)

    def __repr__(self):
        """Add the sensor and event resolution to the string representation of the BeliefsSeries."""
        return super().__repr__() + "\n" + meta_repr(self)

    @property
    def event_frequency(self) -> timedelta | None:
        """Duration between observations of events.

        :returns: a timedelta for regularly spaced observations
                  None for irregularly spaced observations
        """
        inferred_freq = pd.infer_freq(self.index.unique("event_start"))
        if inferred_freq is None:
            # No regular spacing could be inferred
            return None
        # Go through an offset, because the inferred frequency may lack a multiplier
        # (e.g. "H" or "D"), which pd.Timedelta does not accept as a string.
        return pd.Timedelta(pd.tseries.frequencies.to_offset(inferred_freq))
class BeliefsDataFrame(pd.DataFrame):
"""Beliefs about a sensor.
A BeliefsDataFrame object is a pandas.DataFrame with the following specific data columns and MultiIndex levels:
columns: ["event_value"]
index levels: ["event_start", "belief_time", "source", "cumulative_probability"]
To initialize, pass sensor=Sensor("sensor_name"), together with data through one of these methods:
Method 1: pass a list of TimedBelief objects.
Method 2: pass a pandas DataFrame with columns ["event_start", "belief_time", "source", "event_value"]
- Optional column: "cumulative_probability" (the default is 0.5)
- Alternatively, use keyword arguments to replace columns containing unique values for each belief
Method 3: pass a pandas Series with DatetimeIndex and keyword arguments for "belief_time" or "belief_horizon", and "source"
- Alternatively, use the "event_start" keyword argument to ignore the index
In addition to the standard DataFrame constructor arguments,
BeliefsDataFrame also accepts the following keyword arguments:
:param beliefs: a list of TimedBelief objects used to initialize the BeliefsDataFrame
:param sensor: the Sensor object that each belief pertains to
:param source: the source of each belief in the input DataFrame (a BeliefSource, str or int)
:param event_start: the start of the event that each belief pertains to (a datetime)
:param belief_time: the time at which each belief was formed (a datetime) - use this as alternative to belief_horizon
:param belief_horizon: how long before (the event could be known) each belief was formed (a timedelta) - use this as alternative to belief_time
:param cumulative_probability: a float in the range [0, 1] describing the cumulative probability of each belief - use this e.g. to initialize a BeliefsDataFrame containing only the values at 95% cumulative probability
"""
_metadata = METADATA
@property
def _constructor(self):
    """Return a factory for new BeliefsDataFrames that inherit this frame's metadata."""

    def build_frame(*args, **kwargs):
        """Call __finalize__() after construction to inherit metadata."""
        new_frame = BeliefsDataFrame(*args, **kwargs)
        return new_frame.__finalize__(self, method="inherit")

    return build_frame
@property
def _constructor_sliced(self):
    """Return the factory pandas calls to create a series sliced from this frame.

    The returned callable builds a BeliefsSeries from whatever arguments it
    receives and then calls its __finalize__() with this frame and
    method="inherit".
    """

    def build_series(*args, **kwargs):
        """Call __finalize__() after construction to inherit metadata."""
        # adapted from https://github.com/pandas-dev/pandas/issues/19850#issuecomment-367934440
        new_series = BeliefsSeries(*args, **kwargs)
        return new_series.__finalize__(self, method="inherit")

    return build_series
def __finalize__(self, other, method=None, **kwargs):
    """Propagate metadata from other to self.

    Which object the metadata is taken from depends on the operation:

    - method == "merge": the left object (``other.left``)
    - method == "concat": the first object (``other.objs[0]``)
    - anything else: ``other`` itself

    Before propagating, if ``other`` exposes ``objs`` (the inputs of a
    combining operation), the sources found in their "source" index level
    are checked for unique names.

    :param other: the object (or operation wrapper) to take metadata from
    :param method: name of the operation that produced self
    :returns: self, with every attribute listed in ``_metadata`` set
              (to None if the originating object lacks it)
    :raises ValueError: if the combined objects contain distinct sources
                        that share the same name
    """
    # Check if sources have unique names
    if hasattr(other, "objs"):
        sources = set()
        for df in other.objs:
            # Test the index *level names*: `"source" in df.index` would test
            # whether "source" is a key of the index, which is not what the
            # check is meant to do and made it silently skip every frame.
            if "source" in df.index.names:
                sources.update(
                    df.index.get_level_values(level="source")
                    .unique()
                    .to_numpy(dtype="object")
                )
        # Fall back to the source itself if it has no name attribute
        # (e.g. a plain string), so the check cannot fail with AttributeError.
        source_names = {getattr(source, "name", source) for source in sources}
        if len(source_names) != len(sources):
            raise ValueError(
                "Source names must be unique. Cannot initialise BeliefsDataFrame given the following unique sources:\n%s"
                % sources
            )

    if method == "merge":
        # merge operation: using metadata of the left object
        origin = other.left
    elif method == "concat":
        # concat operation: using metadata of the first object
        origin = other.objs[0]
    else:
        origin = other

    for name in self._metadata:
        # object.__setattr__ sets the attribute without going through any
        # __setattr__ override further down the class hierarchy
        object.__setattr__(self, name, getattr(origin, name, None))
    return self
def __init__( # noqa: C901 todo: refactor, e.g. by detecting initialization method
self, *args, **kwargs
):
"""Initialise a multi-index DataFrame with beliefs about a unique sensor."""
# Initialized with a BeliefsSeries or BeliefsDataFrame
if len(args) > 0 and isinstance(args[0], (BeliefsSeries, BeliefsDataFrame)):
super().__init__(*args, **kwargs)
assign_sensor_and_event_resolution(
self, args[0].sensor, args[0].event_resolution
)
return
# Obtain parameters that are specific to our DataFrame subclass
sensor: Sensor = kwargs.pop("sensor", None)
event_resolution: TimedeltaLike = kwargs.pop("event_resolution", None)
source: BeliefSource | str | int = kwargs.pop("source", None)
source: BeliefSource = source_utils.ensure_source_exists(
source, allow_none=True
)
event_start: DatetimeLike = kwargs.pop("event_start", None)
belief_time: DatetimeLike = kwargs.pop("belief_time", None)
belief_horizon: datetime = kwargs.pop("belief_horizon", None)
cumulative_probability: float = kwargs.pop("cumulative_probability", None)
beliefs: list[TimedBelief] = kwargs.pop("beliefs", None)
if beliefs is None: # check if args contains a list of beliefs
for i, arg in enumerate(args):
if isinstance(arg, list):
if all(isinstance(b, TimedBelief) for b in arg):
args = list(args)
beliefs = args.pop(
i
) # arg contains beliefs, and we simultaneously remove it from args
args = tuple(args)
break
# Define our columns and indices
columns = ["event_value"]
indices = ["event_start", "belief_time", "source", "cumulative_probability"]
default_types = {
"event_value": float,
"event_start": datetime,
"event_end": datetime,
"belief_time": datetime,
"belief_horizon": timedelta,
"source": BeliefSource,
"cumulative_probability": float,
}
# Pick an initialization method
if beliefs:
# Method 1
# Call the pandas DataFrame constructor with the right input
kwargs["columns"] = columns
# Check for different sensors
unique_sensors = set(belief.sensor for belief in beliefs)
if len(unique_sensors) != 1:
raise ValueError("BeliefsDataFrame cannot describe multiple sensors.")
sensor = list(unique_sensors)[0]
# Check for different sources with the same name
unique_sources = set(str(belief.source) for belief in beliefs)
unique_source_string_representations = set(
str(source) for source in unique_sources
)
if len(unique_source_string_representations) != len(unique_sources):
raise ValueError(
"String representations of sources must be unique. Cannot initialise BeliefsDataFrame given the following unique sources:\n%s"
% unique_sources
)
# Construct data and index from beliefs before calling super class
beliefs = sorted(
set(beliefs),
key=lambda b: (
b.event_start,
b.belief_time,
b.source,
b.cumulative_probability,
),
)
kwargs["data"] = [[getattr(i, j) for j in columns] for i in beliefs]
kwargs["index"] = pd.MultiIndex.from_tuples(
[[getattr(i, j) for j in indices] for i in beliefs], names=indices
)
super().__init__(*args, **kwargs)
else:
# Method 2 and 3
# Interpret initialisation with a pandas Series (preprocessing step of method 3)
if len(args) > 0 and isinstance(args[0], pd.Series):
args = list(args)
args[0] = args[0].copy() # avoid inplace operations
args[0] = args[0].to_frame(
name="event_value" if not args[0].name else args[0].name
)
if isinstance(args[0].index, pd.DatetimeIndex) and event_start is None:
args[0].index.name = (
"event_start" if not args[0].index.name else args[0].index.name
)
args[0].reset_index(inplace=True)
args = tuple(args)
elif len(args) > 0 and isinstance(args[0], pd.DataFrame):
# Avoid inplace operations on the input DataFrame
args = list(args)
args[0] = args[0].copy() # avoid inplace operations
args = tuple(args)
super().__init__(*args, **kwargs)
if len(args) == 0 or (self.empty and is_pandas_structure(args[0])):
set_columns_and_indices_for_empty_frame(
self, columns, indices, default_types
)
elif is_pandas_structure(args[0]) and not is_tb_structure(args[0]):
# Set (possibly overwrite) each index level to a unique value if set explicitly
if source is not None:
self["source"] = source_utils.ensure_source_exists(source)
elif "source" not in self:
raise KeyError("DataFrame should contain column named 'source'.")
elif not isinstance(self["source"].dtype, BeliefSource):
self["source"] = self["source"].apply(
source_utils.ensure_source_exists
)