Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First draft of relaxed time periods #59

Merged
merged 18 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Scripts/assignment/assignment_period.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ def assign(self, modes: Iterable[str]
mtxs = self._get_impedances(modes)
for ass_cl in param.car_classes:
mtxs["cost"][ass_cl] += self._dist_unit_cost[ass_cl] * mtxs["dist"][ass_cl]
for ass_cl in param.car_classes + param.transit_classes:
if ass_cl in mtxs["dist"]:
del mtxs["dist"][ass_cl]
return mtxs

def end_assign(self) -> Dict[str, Dict[str, numpy.ndarray]]:
Expand Down
2 changes: 1 addition & 1 deletion Scripts/assignment/departure_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def calc_gaps(self) -> Dict[str,float]:
max_gap : float
Maximum gap for OD pair in car work demand matrix
"""
car_demand = self.demand[self.time_periods[0]]["car_work"]
car_demand = self.demand[next(iter(self.time_periods))]["car_work"]
max_gap = numpy.abs(car_demand - self.old_car_demand).max()
try:
old_sum = self.old_car_demand.sum()
Expand Down
21 changes: 15 additions & 6 deletions Scripts/assignment/emme_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import parameters.assignment as param
from assignment.abstract_assignment import AssignmentModel
from assignment.assignment_period import AssignmentPeriod
import assignment.off_peak_period as periods
from assignment.freight_assignment import FreightAssignmentPeriod
if TYPE_CHECKING:
from assignment.emme_bindings.emme_project import EmmeProject
Expand Down Expand Up @@ -45,8 +46,11 @@ class EmmeAssignmentModel(AssignmentModel):
delete_extra_matrices : bool (optional)
If True, only matrices needed for demand calculation will be
returned from end assignment.
time_periods : list of str (optional)
time_periods : dict (optional)
key : str
Time period names, default is aht, pt, iht
value : str
Name of `AssignmentPeriod` sub-class
first_matrix_id : int (optional)
Where to save matrices (if saved),
300 matrix ids will be reserved, starting from first_matrix_id.
Expand All @@ -60,7 +64,7 @@ def __init__(self,
use_free_flow_speeds: bool = False,
use_stored_speeds: bool = False,
delete_extra_matrices: bool = False,
time_periods: List[str] = param.time_periods,
time_periods: dict[str, str] = param.time_periods,
first_matrix_id: int = 100):
self.separate_emme_scenarios = separate_emme_scenarios
self.save_matrices = save_matrices
Expand Down Expand Up @@ -112,7 +116,7 @@ def prepare_network(self, car_dist_unit_cost: Dict[str, float]):
scen_id = self.mod_scenario.number
emme_matrices = self._create_matrices(
tp, i*hundred + self.first_matrix_id, id_ten)
self.assignment_periods.append(AssignmentPeriod(
self.assignment_periods.append(vars(periods)[self.time_periods[tp]](
tp, scen_id, self.emme_project, emme_matrices,
separate_emme_scenarios=self.separate_emme_scenarios,
use_free_flow_speeds=self.use_free_flow_speeds,
Expand Down Expand Up @@ -494,13 +498,18 @@ def _create_matrices(self, time_period, id_hundred, id_ten):
value : str
EMME matrix id
"""
tag = time_period if self.save_matrices else ""
emme_matrices = {}
for i, ass_class in enumerate(param.emme_matrices, start=1):
is_off_peak = (ass_class in param.transit_classes
and param.time_periods[time_period] in (
"OffPeakPeriod", "TransitAssignmentPeriod"))
matrix_ids = {}
for mtx_type in param.emme_matrices[ass_class]:
_id_hundred = (id_hundred
if self.save_matrices or mtx_type == "demand" else 0)
save_matrices = (self.save_matrices
or mtx_type == "demand"
or is_off_peak)
_id_hundred = id_hundred if save_matrices else 0
tag = time_period if save_matrices else ""
matrix_ids[mtx_type] = "mf{}".format(
_id_hundred + id_ten[mtx_type] + i)
description = f"{mtx_type}_{ass_class}_{tag}"
Expand Down
78 changes: 70 additions & 8 deletions Scripts/assignment/mock_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
class MockAssignmentModel(AssignmentModel):
def __init__(self, matrices: MatrixData,
use_free_flow_speeds: bool = False,
time_periods: List[str]=param.time_periods,
time_periods: Dict[str, str]=param.time_periods,
delete_extra_matrices: bool = False):
self.matrices = matrices
log.info("Reading matrices from " + str(self.matrices.path))
Expand All @@ -29,22 +29,24 @@ def __init__(self, matrices: MatrixData,
else:
end_assignment_classes -= set(
param.long_distance_transit_classes)
self.time_periods = time_periods
self.assignment_periods = [MockPeriod(
self.time_periods = {}
cls = globals()
for tp, class_name in time_periods.items():
self.time_periods[tp] = (cls[class_name] if class_name in cls
else MockPeriod)
self.assignment_periods = [self.time_periods[tp](
tp, matrices, end_assignment_classes)
for tp in time_periods]

@property
def zone_numbers(self) -> numpy.array:
"""Numpy array of all zone numbers."""
with self.matrices.open("time", self.time_periods[0]) as mtx:
zone_numbers = mtx.zone_numbers
return zone_numbers
return next(iter(self.assignment_periods)).zone_numbers

@property
def mapping(self):
"""dict: Dictionary of zone numbers and corresponding indices."""
with self.matrices.open("time", self.time_periods[0]) as mtx:
with self.matrices.open("time", next(iter(self.assignment_periods))) as mtx:
mapping = mtx.mapping
return mapping

Expand Down Expand Up @@ -91,12 +93,15 @@ def zone_numbers(self):
zone_numbers = mtx.zone_numbers
return zone_numbers

def init_assign(self):
pass

def assign_trucks_init(self):
pass

def assign(self, modes: Iterable[str]
) -> Dict[str, Dict[str, numpy.ndarray]]:
""" Get travel impedance matrices for one time period from files.
"""Get travel impedance matrices for one time period from files.

Parameters
----------
Expand All @@ -113,6 +118,9 @@ def assign(self, modes: Iterable[str]
for ass_cl in param.car_classes:
mtxs["cost"][ass_cl] += (self.dist_unit_cost[ass_cl]
* mtxs["dist"][ass_cl])
for ass_cl in param.car_classes + param.transit_classes:
if ass_cl in mtxs["dist"]:
del mtxs["dist"][ass_cl]
return mtxs

def end_assign(self) -> Dict[str, Dict[str, numpy.ndarray]]:
Expand Down Expand Up @@ -179,3 +187,57 @@ def set_matrix(self,
matrix: numpy.ndarray):
with self.matrices.open("demand", self.name, self.zone_numbers, m='a') as mtx:
mtx[ass_class] = matrix


class OffPeakPeriod(MockPeriod):
def assign(self, *args) -> Dict[str, Dict[str, numpy.ndarray]]:
"""Get travel impedance matrices for one time period from files.

Returns
-------
dict
Type (time/cost/dist) : dict
Assignment class (car_work/transit/...) : numpy 2-d matrix
"""
mtxs = self._get_impedances(
param.car_classes + param.local_transit_classes)
for ass_cl in param.car_classes:
mtxs["cost"][ass_cl] += (self.dist_unit_cost[ass_cl]
* mtxs["dist"][ass_cl])
del mtxs["dist"]
return mtxs


class TransitAssignmentPeriod(MockPeriod):
def assign(self, *args) -> Dict[str, Dict[str, numpy.ndarray]]:
"""Get local transit impedance matrices for one time period.

Returns
-------
dict
Type (time/cost/dist) : dict
Assignment class (transit_work/transit_leisure) : numpy 2-d matrix
"""
mtxs = self._get_impedances(param.local_transit_classes)
del mtxs["dist"]
return mtxs

def end_assign(self) -> Dict[str, Dict[str, numpy.ndarray]]:
"""Get transit impedance matrices for one time period.

Long-distance mode impedances are included if assignment period
was created with delete_extra_matrices option disabled.

Returns
-------
dict
Type (time/cost/dist) : dict
Assignment class (transit_work/...) : numpy 2-d matrix
"""
self._end_assignment_classes -= set(
param.private_classes + param.freight_classes)
return self._get_impedances(self._end_assignment_classes)

class EndAssignmentOnlyPeriod(MockPeriod):
def assign(self, *args) -> None:
return None
94 changes: 94 additions & 0 deletions Scripts/assignment/off_peak_period.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from typing import Dict, Iterable
from numpy.core import ndarray
import copy

from assignment.assignment_period import AssignmentPeriod
import parameters.assignment as param


class OffPeakPeriod(AssignmentPeriod):
"""Off-peak assignment period.

The major difference compared to a regular assignment period is that
bus speeds are taken from free-flow assignment in demand-calculation loop
and transit assignment is hence not iterated.

Car assignment is performed as usual.
"""

def init_assign(self):
"""Assign transit for one time period with free-flow bus speed."""
self._set_car_vdfs(use_free_flow_speeds=True)
stopping_criteria = copy.copy(
param.stopping_criteria["coarse"])
stopping_criteria["max_iterations"] = 0
self._assign_cars(stopping_criteria)
self._assign_transit(param.transit_classes)

def assign(self, *args) -> Dict[str, Dict[str, ndarray]]:
"""Assign cars for one time period.

Get travel impedance matrices for one time period from assignment.
Transit impedance is fetched from free-flow init assignment.

Returns
-------
dict
Type (time/cost/dist) : dict
Assignment class (car_work/transit/...) : numpy 2-d matrix
"""
if not self._separate_emme_scenarios:
self._calc_background_traffic(include_trucks=True)
self._assign_cars(self.stopping_criteria["coarse"])
mtxs = self._get_impedances(
param.car_classes + param.local_transit_classes)
for ass_cl in param.car_classes:
mtxs["cost"][ass_cl] += self._dist_unit_cost[ass_cl] * mtxs["dist"][ass_cl]
del mtxs["dist"]
return mtxs


class TransitAssignmentPeriod(OffPeakPeriod):
"""Transit-only assignment period.

The major difference compared to a regular assignment period is that
bus speeds are taken from free-flow assignment and transit assignment
is hence not iterated.

Car assignment is not performed at all.
"""

def assign(self, *args) -> Dict[str, Dict[str, ndarray]]:
"""Get local transit impedance matrices for one time period.

Returns
-------
dict
Type (time/cost/dist) : dict
Assignment class (transit_work/transit_leisure) : numpy 2-d matrix
"""
mtxs = self._get_impedances(param.local_transit_classes)
del mtxs["dist"]
return mtxs

def end_assign(self) -> Dict[str, Dict[str, ndarray]]:
"""Get transit impedance matrices for one time period.

Long-distance mode impedances are included if assignment period
was created with delete_extra_matrices option disabled.

Returns
-------
dict
Type (time/cost/dist) : dict
Assignment class (transit_work/...) : numpy 2-d matrix
"""
self._calc_transit_network_results()
self._end_assignment_classes -= set(
param.private_classes + param.freight_classes)
return self._get_impedances(self._end_assignment_classes)


class EndAssignmentOnlyPeriod(AssignmentPeriod):
def assign(self, *args) -> None:
return None
6 changes: 5 additions & 1 deletion Scripts/datatypes/purpose.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,19 @@ def transform_impedance(self, impedance):
cols = self.dest_interval
day_imp = {}
for mode in self.impedance_share:
share_sum = 0
day_imp[mode] = defaultdict(float)
ass_class = mode.replace("pax", assignment_classes[self.name])
for time_period in self.impedance_share[mode]:
for mtx_type in impedance[time_period]:
if ass_class in impedance[time_period][mtx_type]:
share = self.impedance_share[mode][time_period]
imp = impedance[time_period][mtx_type][ass_class]
share = self.impedance_share[mode][time_period]
share_sum += sum(share)
day_imp[mode][mtx_type] += share[0] * imp[rows, cols]
day_imp[mode][mtx_type] += share[1] * imp[cols, rows].T
if abs(share_sum/len(day_imp[mode]) - 2) > 0.001:
raise ValueError(f"False impedance shares: {self.name} : {mode}")
# Apply cost change to validate model elasticities
if self.mtx_adjustment is not None:
for t in self.mtx_adjustment:
Expand Down
3 changes: 2 additions & 1 deletion Scripts/lem.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import utils.log as log
from assignment.emme_assignment import EmmeAssignmentModel
from assignment.mock_assignment import MockAssignmentModel
from assignment.assignment_period import AssignmentPeriod
from modelsystem import ModelSystem, AgentModelSystem
from datahandling.matrixdata import MatrixData

Expand Down Expand Up @@ -68,7 +69,7 @@ def main(args):
"delete_extra_matrices": args.delete_extra_matrices,
}
if calculate_long_dist_demand:
kwargs["time_periods"] = ["vrk"]
kwargs["time_periods"] = {"vrk": "AssignmentPeriod"}
if args.do_not_use_emme:
log.info("Initializing MockAssignmentModel...")
mock_result_path = results_path / "Matrices" / args.submodel
Expand Down
8 changes: 5 additions & 3 deletions Scripts/modelsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ def __init__(self,
purpose = new_tour_purpose(
json.loads(file.read_text("utf-8")), self.zdata_forecast,
self.resultdata, cost_data["cost_changes"])
if (sorted(next(iter(purpose.impedance_share.values())))
== sorted(assignment_model.time_periods)):
required_time_periods = sorted(
{tp for m in purpose.impedance_share.values() for tp in m})
if required_time_periods == sorted(assignment_model.time_periods):
if isinstance(purpose, SecDestPurpose):
sec_dest_purposes.append(purpose)
elif purpose.orig == "home":
Expand Down Expand Up @@ -247,7 +248,6 @@ def assign_base_demand(self,
self.dtm = dt.DirectDepartureTimeModel(self.ass_model)

if not self.ass_model.use_free_flow_speeds:
self.ass_model.init_assign()
log.info("Get long-distance trip matrices")
self._add_external_demand(self.long_dist_matrices)
log.info("Get freight matrices")
Expand Down Expand Up @@ -278,6 +278,8 @@ def assign_base_demand(self,
transport_classes=transport_classes) as mtx:
for ass_class in transport_classes:
self.dtm.demand[tp][ass_class] = mtx[ass_class]
if not self.ass_model.use_free_flow_speeds:
ap.init_assign()
ap.assign_trucks_init()
impedance[tp] = (ap.end_assign() if is_end_assignment
else ap.assign(self.travel_modes))
Expand Down
9 changes: 8 additions & 1 deletion Scripts/parameters/assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
"aht": 6,
"pt": 6,
"iht": 6,
"it": 6,
"vrk": 6,
},
}
Expand Down Expand Up @@ -405,6 +406,7 @@
volume_factors["aux_transit"] = volume_factors["transit"]
for mode in volume_factors:
volume_factors[mode]["vrk"] = 1
volume_factors[mode]["it"] = 0
# Factor for converting weekday traffic into yearly day average
years_average_day_factor = 0.85
# Factor for converting day traffic into 7:00-22:00 traffic
Expand All @@ -430,7 +432,12 @@
}

### ASSIGNMENT REFERENCES ###
time_periods: List[str] = ["aht", "pt", "iht"]
time_periods = {
"aht": "AssignmentPeriod",
"pt": "OffPeakPeriod",
"iht": "AssignmentPeriod",
"it": "TransitAssignmentPeriod",
}
car_classes = (
"car_work",
"car_leisure",
Expand Down
Loading