Skip to content

Commit

Permalink
Add docstring documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismostert committed Mar 18, 2024
1 parent eb44687 commit 61fca69
Show file tree
Hide file tree
Showing 8 changed files with 426 additions and 91 deletions.
36 changes: 33 additions & 3 deletions src/eml.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
SwitchedCandidateConfig,
VoteDifference,
)
from neighbourhood import NeighbourhoodData
from neighbourhood import NeighbourhoodData, ReportingNeighbourhoods


@dataclass
class CheckResult:
"""Container representing the result of running all checks
on a given EML file.
"""

zero_votes: bool
inexplicable_difference: int
explanation_sum_difference: int
Expand All @@ -32,6 +36,11 @@ class CheckResult:

@dataclass
class EML:
"""Main container for all information which has been loaded from an .eml.xml file.
Contains all the necessary information for running all checks, and additionally
contains the configuration for all the checks.
"""

eml_file_id: str
main_unit_info: ReportingUnitInfo
reporting_units_info: Dict[str, ReportingUnitInfo]
Expand Down Expand Up @@ -59,9 +68,17 @@ class EML:
def run_protocol(
self, neighbourhood_data: Optional[NeighbourhoodData] = None
) -> Dict[str, CheckResult]:
"""Run all specified protocol checks on this EML instance.
Args:
neighbourhood_data: If NeighbourhoodData is specified, also run some checks at neighbourhood level.
Returns:
Dictionary mapping reporting unit ids to resulting CheckResults obtained by running all checks
"""
# Generate reporting neighbourhoods data which can be reused for all individual
# polling stations
reporting_neighbourhoods = (
reporting_neighbourhoods: Optional[ReportingNeighbourhoods] = (
neighbourhood_data.fetch_reporting_neighbourhoods(
self.metadata.reporting_unit_zips, self.reporting_units_info
)
Expand Down Expand Up @@ -115,7 +132,20 @@ def run_protocol(
return protocol_results

@staticmethod
def from_xml(file_path):
def from_xml(file_path: str) -> "EML":
"""Static method for constructing an instance of the EML class
from a given file_path
Args:
file_path: Path to the .eml.xml file to read.
Raises:
InvalidEmlException: when specified .eml.xml is of incorrect type.
InvalidEmlException: when any reporting unit does not have an id.
Returns:
EML class instance with all relevant data to run the protocol checks.
"""
# Root element of the XML file
xml_root = xml_parser.parse_xml(file_path)

Expand Down
23 changes: 23 additions & 0 deletions src/eml_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

@dataclass
class EmlMetadata:
"""Dataclass which holds EML metadata like the creation_date_time of
the EML file or derived mappings from reporting_unit_id to zip or name.
"""

creation_date_time: Optional[str]
authority_id: Optional[str]
authority_name: Optional[str]
Expand All @@ -19,18 +23,27 @@ class EmlMetadata:

@dataclass(frozen=True, order=True)
class PartyIdentifier:
    """Identify a party so that it can be matched across dicts.

    The dataclass is frozen and ordered: instances are immutable, hashable
    (usable as dict keys) and compare field by field in declaration order.

    Attributes:
        id: Integer id of the party.
        name: Name of the party, `None` when no name is available.
    """

    id: int
    name: Optional[str]


@dataclass(frozen=True, order=True)
class CandidateIdentifier:
    """Identify a candidate so that it can be matched across dicts.

    The dataclass is frozen and ordered: instances are immutable, hashable
    (usable as dict keys) and compare field by field in declaration order,
    i.e. by party first and by candidate id second.

    Attributes:
        party: Identifier of the party the candidate belongs to.
        cand_id: Integer id of the candidate.
    """

    party: PartyIdentifier
    cand_id: int


@dataclass
class ReportingUnitInfo:
"""Container which holds the main information for a given reporting unit, containing
vote counts at candidate and party level, and information about those votes
(amount of blank votes, invalid votes etc..)
"""

reporting_unit_id: Optional[str]
reporting_unit_name: Optional[str]
cast: int
Expand All @@ -43,11 +56,15 @@ class ReportingUnitInfo:

@dataclass
class VoteDifferenceAmount:
    """Thin wrapper around a single integer value.

    Attributes:
        value: The wrapped integer.
    """

    value: int


@dataclass
class VoteDifferencePercentage:
    """Thin wrapper around a single float value.

    Attributes:
        value: The wrapped float.
    """

    value: float


Expand All @@ -56,6 +73,8 @@ class VoteDifferencePercentage:

@dataclass
class SwitchedCandidate:
"""Container representing a potentially switched candidate."""

candidate_with_fewer: CandidateIdentifier
candidate_with_fewer_received: int
candidate_with_fewer_expected: int
Expand All @@ -80,6 +99,10 @@ def __str__(self) -> str:

@dataclass
class SwitchedCandidateConfig:
"""Container for configuration parameters for the switched candidate
check
"""

minimum_reporting_units_municipality: int
minimum_reporting_units_neighbourhood: int
minimum_deviation_factor: int
Expand Down
31 changes: 25 additions & 6 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,36 @@
from typing import Optional

import csv_write
from eml import EML
from neighbourhood import NeighbourhoodData
from odt import ODT


def create_csv_files(
path_to_xml,
dest_a,
dest_b,
dest_c,
path_to_odt=None,
path_to_neighbourhood_data=None,
path_to_xml: str,
dest_a: str,
dest_b: str,
dest_c: str,
path_to_odt: Optional[str] = None,
path_to_neighbourhood_data: Optional[str] = None,
) -> None:
"""Main entry point for running HCP on a given .eml.xml file. We can optionally specify
the following data:
- path_to_odt: if a path to the corresponding ODT (proces verbaal) is specified
then HCP additionally checks if a given reporting unit has already recounted
and thus is exempt from certain mandatory recounts.
- path_to_neighbourhood_data: if a path to neighbourhood data is specified
then we run some checks at a neighbourhood level in addition to the municipality
level.
Args:
path_to_xml: Path to the .eml.xml file to run HCP on.
dest_a: Path to write output file a (inexplicable differences) to.
dest_b: Path to write output file b (warnings and remarkable results) to.
dest_c: Path to write output file c (percentage deviation per reporting unit per affiliation) to.
path_to_odt: Path to the ODT (proces verbaal) corresponding to the provided .eml.xml.
path_to_neighbourhood_data: Path to either .csv or .parquet file containing neighbourhood data.
"""
# Parse the eml from the path
eml = EML.from_xml(path_to_xml)

Expand Down
78 changes: 74 additions & 4 deletions src/neighbourhood.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,84 @@ def _add_dict(a: Dict[T, int], b: Dict[T, int]) -> Dict[T, int]:

@dataclass
class ReportingNeighbourhoods:
    """Lookup tables used for running checks at a neighbourhood level.

    Combining the mappings makes it possible to find the reference
    `ReportingUnitInfo` for a reporting unit by hopping from
    `reporting_unit_id` -> `neighbourhood_id` -> `ReportingUnitInfo`.

    Such a 'reference group' is a `ReportingUnitInfo` instance holding the sum
    of the vote counts of all reporting units located in the same
    neighbourhood as the reporting unit that was asked for.

    Attributes:
        reporting_unit_id_to_neighbourhood_id: Neighbourhood id for each
            reporting unit id; the value may be `None`.
        neighbourhood_id_to_reporting_unit_ids: All reporting unit ids which
            are in a given neighbourhood.
        neighbourhood_id_to_reference_group: Reference group for a given
            neighbourhood id.
    """

    reporting_unit_id_to_neighbourhood_id: Dict[str, Optional[str]]
    neighbourhood_id_to_reporting_unit_ids: Dict[str, Set[str]]
    neighbourhood_id_to_reference_group: Dict[str, ReportingUnitInfo]

    def get_reference_group(
        self, reporting_unit_id: str
    ) -> Optional[ReportingUnitInfo]:
        """Look up the reference group belonging to a reporting unit.

        Args:
            reporting_unit_id: The reporting unit id to query for.

        Returns:
            The `ReportingUnitInfo` holding the summed vote counts of all
            reporting units in the unit's neighbourhood, or `None` when the
            unit has no (non-empty) neighbourhood id.

        Raises:
            KeyError: when the reporting unit id, or its neighbourhood id, is
                missing from the respective mapping.
        """
        neighbourhood = self.reporting_unit_id_to_neighbourhood_id[reporting_unit_id]
        if neighbourhood:
            return self.neighbourhood_id_to_reference_group[neighbourhood]
        return None

    def get_reference_size(self, reporting_unit_id: str) -> int:
        """Count the reporting units in the neighbourhood of a reporting unit.

        The queried reporting unit itself is part of that count.

        Args:
            reporting_unit_id: The reporting unit id to query for.

        Returns:
            The number of reporting units in the corresponding neighbourhood,
            0 when the unit has no (non-empty) neighbourhood id.

        Raises:
            KeyError: when the reporting unit id, or its neighbourhood id, is
                missing from the respective mapping.
        """
        neighbourhood = self.reporting_unit_id_to_neighbourhood_id[reporting_unit_id]
        members = (
            self.neighbourhood_id_to_reporting_unit_ids[neighbourhood]
            if neighbourhood
            else ()
        )
        return len(members)


@dataclass
class NeighbourhoodData:
data: pl.LazyFrame
"""Class containing a **lazy** dataframe containing neighbourhood data.
This allows you to call the defined methods on this lazyframe, without
having to load in all the neighbourhood data to memory.
"""

def __init__(self, data) -> None:
self.data = data
data: pl.LazyFrame

def fetch_neighbourhood_code(self, zip_code: str) -> Optional[str]:
"""Given a specified zip_code, return the corresponding neighbourhood
code as specified in the neighbourhood data.
Args:
zip_code: Zip code to query the data for, without spaces (e.g. `1011AB`)
Returns:
The corresponding neighbourhood code (e.g. `WK0363AF`) if the zip code was found
"""
queried_result = (
self.data.filter(pl.col("zip_code") == zip_code)
.select("neighbourhood_code")
Expand All @@ -65,6 +117,19 @@ def fetch_reporting_neighbourhoods(
reporting_unit_zips: Dict[str, Optional[str]],
reporting_unit_info: Dict[str, ReportingUnitInfo],
) -> ReportingNeighbourhoods:
"""Constructs a `ReportingNeighbourhoods` instance for the given
neighbourhood data. The reference groups are calculated by summing
up the votes per party and the votes per candidate for all reporting
units and constructing a new `ReportingUnitInfo` instance for each
neighbourhood.
Args:
reporting_unit_zips: Mapping from reporting unit id to the associated zip code.
reporting_unit_info: Mapping from reporting unit id to the `ReportingUnitInfo`.
Returns:
Instance of `ReportingNeighbourhoods`.
"""
# Fetch the neighbourhood codes for all unique zips
zips = set((zip for zip in reporting_unit_zips.values() if zip is not None))
zips_to_neighbourhoods = {
Expand Down Expand Up @@ -141,7 +206,12 @@ def fetch_reporting_neighbourhoods(
)

@staticmethod
def from_path(str_path: Optional[str]):
def from_path(str_path: Optional[str]) -> Optional["NeighbourhoodData"]:
"""Construct an instance of `NeighbourhoodData` from a given path.
Returns:
`NeighbourhoodData` instance if path was specified, `None` otherwise.
"""
if str_path is None:
return None

Expand Down
32 changes: 27 additions & 5 deletions src/odt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,40 @@


class ODT_TYPE(Enum):
    """Enumerate the supported types of PV (proces verbaal).

    Member values are assigned automatically via `auto()`; only the member
    identity is meant to be compared.
    """

    na31_1 = auto()
    na31_2 = auto()


@dataclass(frozen=True)
class PollingStation:
    """Identify a polling station so that polling stations can be compared.

    The dataclass is frozen: instances are immutable and hashable, and two
    instances are equal when all of their fields are equal.

    Attributes:
        id: Integer id of the polling station.
        name: Name of the polling station.
        zip: Zip code of the polling station, `None` when not available.
    """

    id: int
    name: str
    zip: Optional[str]


@dataclass
class ODT:
"""Container for an ODT file (proces verbaal) which specifies the type of PV
and the XML `ElementTree` of the contents of the ODT file.
"""

type: ODT_TYPE
odt_xml: XmlElement

@staticmethod
def from_path(odt_path):
def from_path(odt_path: Optional[str]) -> Optional["ODT"]:
"""Constructs an `ODT` instance from a given file path.
Returns:
ODT class instance with all needed fields set for running methods.
"""
if not odt_path:
return None

try:
if "Model_Na31-1.odt" in odt_path:
return ODT(
Expand All @@ -41,11 +57,17 @@ def from_path(odt_path):
return ODT(
type=ODT_TYPE.na31_2, odt_xml=_extract_odt_xml_root(odt_path)
)
return None
except Exception:
return None

def get_already_recounted_polling_stations(self) -> List[PollingStation]:
"""Get a list of polling stations which according to the ODT (PV)
have already recounted.
Returns:
List of polling stations, list is empty if either no polling stations recounted
or some exception occurred when parsing the XML DOM-tree
"""
try:
if self.type == ODT_TYPE.na31_1:
return _get_polling_stations_with_recounts_na31_1(self.odt_xml)
Expand Down Expand Up @@ -76,9 +98,9 @@ def _table_rows_to_polling_stations(
".//text:span[@text:description]", NAMESPACE
)

polling_station_id = xml_parser.get_text(polling_station_descriptors[0])
polling_station_name = xml_parser.get_text(polling_station_descriptors[1])
polling_station_zip = xml_parser.get_text(polling_station_descriptors[2])
polling_station_id = xml_parser._get_text(polling_station_descriptors[0])
polling_station_name = xml_parser._get_text(polling_station_descriptors[1])
polling_station_zip = xml_parser._get_text(polling_station_descriptors[2])

# Polling station id and name is *required* to be sure that a polling station
# can be matched. Thus if these are not present we skip this polling station.
Expand Down
Loading

0 comments on commit 61fca69

Please sign in to comment.