Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added counts.csv file to record the number of people in each age group and cell #265

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pyEpiabm/pyEpiabm/sweep/initial_demographics_sweep.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ def __init__(self, dem_file_params: typing.Dict):
self.titles.append("kw_or_chr")
self.writer = _CsvDictWriter(
folder, file_name, self.titles)
num_age_groups = (len(Parameters.instance().age_proportions)
if Parameters.instance().age_proportions.size
and self.age_output
else 1)
self.count_titles = (["Cell"] +
[f"Age group {i}" for i in range(num_age_groups)])
self.counts_writer = _CsvDictWriter(
folder, "counts.csv", self.count_titles)

def __call__(self, *args):
"""During the initial sweeps, this will be called, and will loop
Expand All @@ -70,12 +78,18 @@ def __call__(self, *args):
age_group (int, optional), location_x (float, optional), location_y
(float, optional), kw_or_chr (str).

Furthermore, we write up the counts of how many people there are per
age group within each cell in a contingency table .csv file titled
"counts.csv".

Note that kw_or_chr stands for 'key worker or care home resident'. For
kw_or_chr, 'W' refers to a key worker, 'C' refers to a care home
resident and 'X' refers to a person who is neither.

"""
for cell in self._population.cells:
cell_count_dict = {title: 0 for title in self.count_titles}
cell_count_dict["Cell"] = cell.id
for person in cell.persons:
data = {"id": person.id,
"age": person.age
Expand All @@ -88,3 +102,10 @@ def __call__(self, *args):
("C" if person.care_home_resident else "X")}
data = {k: data[k] for k in data if data[k] is not None}
self.writer.write(data)
# Update cell_count_dict
age_str = (f"Age group {person.age_group}"
if self.age_output else "Age group 0")
# The below line will increment the age group count in the
# dictionary
cell_count_dict.update({age_str: cell_count_dict[age_str] + 1})
self.counts_writer.write(cell_count_dict)
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,22 @@ def setUp(self) -> None:
microcell_0_0 = self.test_population.cells[0].microcells[0]
microcell_0_0.add_household([microcell_0_0.persons[0]])
person_0_0_0_0 = microcell_0_0.households[0].persons[0]
person_0_0_0_0.age_group = 5
person_0_0_0_0.age = 25
person_0_0_0_0.key_worker = True
microcell_1_0 = self.test_population.cells[1].microcells[0]
microcell_1_0.add_household([microcell_1_0.persons[0],
microcell_1_0.persons[1]])
person_1_0_0_0 = microcell_1_0.households[0].persons[0]
person_1_0_0_0.age_group = 13
person_1_0_0_0.age = 65
person_1_0_0_0.care_home_resident = True
person_1_0_0_1 = microcell_1_0.households[0].persons[1]
person_1_0_0_1.age_group = 8
person_1_0_0_1.age = 40
self.dem_file_params = {"output_dir":
"pyEpiabm/pyEpiabm/tests/test_output/mock"}
self.count_titles = ["Cell"] + [f"Age group {i}" for i in range(17)]

@patch('os.makedirs')
def test_construct_invalid_input(self, mock_mkdir):
Expand Down Expand Up @@ -65,20 +69,37 @@ def test_construct_invalid_input(self, mock_mkdir):

@patch('os.makedirs')
def test_construct(self, mock_mkdir):
file_name = os.path.join(os.getcwd(),
self.dem_file_params["output_dir"],
"demographics.csv")
counts_file_name = os.path.join(os.getcwd(),
self.dem_file_params["output_dir"],
"counts.csv")
mo = mock_open()
with patch('pyEpiabm.output._csv_dict_writer.open', mo):
# Now test init using valid input
dem_sweep = pe.sweep.InitialDemographicsSweep(self.dem_file_params)
dem_sweep.bind_population(self.test_population)
self.assertEqual(dem_sweep.spatial_output, False)
self.assertEqual(dem_sweep.age_output, False)
file_name = os.path.join(os.getcwd(),
self.dem_file_params["output_dir"],
"demographics.csv")
self.assertEqual(dem_sweep.titles, ["id", "kw_or_chr"])
self.assertEqual(dem_sweep.count_titles, ["Cell",
"Age group 0"])
del dem_sweep.writer
mo.assert_called_with(file_name, 'w')
del dem_sweep.counts_writer
mo.assert_has_calls([call(file_name, 'w'),
call(counts_file_name, 'w')],
any_order=True)

@patch('os.makedirs')
def test_construct_age_spatial(self, mock_mkdir):
file_name = os.path.join(os.getcwd(),
self.dem_file_params["output_dir"],
"demographics.csv")
counts_file_name = os.path.join(os.getcwd(),
self.dem_file_params["output_dir"],
"counts.csv")
mo = mock_open()
self.dem_file_params["spatial_output"] = True
self.dem_file_params["age_output"] = True
with patch('pyEpiabm.output._csv_dict_writer.open', mo):
Expand All @@ -87,14 +108,16 @@ def test_construct(self, mock_mkdir):
dem_sweep.bind_population(self.test_population)
self.assertEqual(dem_sweep.spatial_output, True)
self.assertEqual(dem_sweep.age_output, True)
file_name = os.path.join(os.getcwd(),
self.dem_file_params["output_dir"],
"demographics.csv")
self.assertEqual(dem_sweep.titles, ["id", "age",
"location_x", "location_y",
"kw_or_chr"])
self.assertEqual(dem_sweep.count_titles, ["Cell"] +
[f"Age group {i}" for i in range(17)])
del dem_sweep.writer
mo.assert_called_with(file_name, 'w')
del dem_sweep.counts_writer
mo.assert_has_calls([call(file_name, 'w'),
call(counts_file_name, 'w')],
any_order=True)

@patch('os.makedirs')
def test_write_to_file_no_age_no_spatial(self, mock_mkdir):
Expand All @@ -111,12 +134,19 @@ def test_write_to_file_no_age_no_spatial(self, mock_mkdir):
person_0_0_0_0_data = {"id": "0.0.0.0", "kw_or_chr": 'W'}
person_1_0_0_0_data = {"id": "1.0.0.0", "kw_or_chr": 'C'}
person_1_0_0_1_data = {"id": "1.0.0.1", "kw_or_chr": 'X'}
cell_0_data = {"Cell": "0", "Age group 0": 1}
cell_1_data = {"Cell": "1", "Age group 0": 2}
with patch.object(dem_sweep.writer, 'write') as mock:
dem_sweep()
mock.assert_has_calls([call(person_0_0_0_0_data),
call(person_1_0_0_0_data),
call(person_1_0_0_1_data)])
self.assertEqual(mock.call_count, 3)
with patch.object(dem_sweep.counts_writer,
'write') as mock_count_write:
dem_sweep()
mock.assert_has_calls([call(person_0_0_0_0_data),
call(person_1_0_0_0_data),
call(person_1_0_0_1_data)])
self.assertEqual(mock.call_count, 3)
mock_count_write.assert_has_calls([call(cell_0_data),
call(cell_1_data)])
self.assertEqual(mock_count_write.call_count, 2)
mock_mkdir.assert_called_with(os.path.join(os.getcwd(),
self.dem_file_params[
"output_dir"]))
Expand All @@ -139,12 +169,24 @@ def test_write_to_file_age_no_spatial(self, mock_mkdir):
"kw_or_chr": 'C'}
person_1_0_0_1_data = {"id": "1.0.0.1", "age": 40,
"kw_or_chr": 'X'}
cell_0_data = {title: 0 for title in self.count_titles}
cell_0_data["Cell"] = "0"
cell_0_data["Age group 5"] = 1
cell_1_data = {title: 0 for title in self.count_titles}
cell_1_data["Cell"] = "1"
cell_1_data["Age group 13"] = 1
cell_1_data["Age group 8"] = 1
with patch.object(dem_sweep.writer, 'write') as mock:
dem_sweep()
mock.assert_has_calls([call(person_0_0_0_0_data),
call(person_1_0_0_0_data),
call(person_1_0_0_1_data)])
self.assertEqual(mock.call_count, 3)
with patch.object(dem_sweep.counts_writer,
'write') as mock_count_write:
dem_sweep()
mock.assert_has_calls([call(person_0_0_0_0_data),
call(person_1_0_0_0_data),
call(person_1_0_0_1_data)])
self.assertEqual(mock.call_count, 3)
mock_count_write.assert_has_calls([call(cell_0_data),
call(cell_1_data)])
self.assertEqual(mock_count_write.call_count, 2)
mock_mkdir.assert_called_with(os.path.join(os.getcwd(),
self.dem_file_params[
"output_dir"]))
Expand Down
Loading