Skip to content

Commit

Permalink
Implement potentially switched candidates for neighbourhoods
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismostert committed Feb 22, 2024
1 parent 3d415a4 commit b2c3142
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 32 deletions.
22 changes: 20 additions & 2 deletions src/eml.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class CheckResult:
parties_with_high_difference_percentage: List[str]
party_difference_percentages: Dict[PartyIdentifier, float]
potentially_switched_candidates: List[SwitchedCandidate]
potentially_switched_neighbourhood_candidates: Optional[List[SwitchedCandidate]]
already_recounted: bool


Expand All @@ -48,11 +49,17 @@ class EML:
MINIMUM_VOTES: ClassVar[int] = 20

def run_protocol(
self, neighbourhood_data: Optional[ReportingNeighbourhoods] = None
self, reporting_neighbourhoods: Optional[ReportingNeighbourhoods] = None
) -> Dict[str, CheckResult]:
protocol_results = {}

for polling_station_id, polling_station in self.reporting_units_info.items():
neighbourhood_reference_group = (
(reporting_neighbourhoods.get_reference_group(polling_station_id))
if reporting_neighbourhoods
else None
)

check_result = CheckResult(
zero_votes=protocol_checks.check_zero_votes(polling_station),
inexplicable_difference=protocol_checks.check_inexplicable_difference(
Expand Down Expand Up @@ -83,12 +90,23 @@ def run_protocol(
potentially_switched_candidates=protocol_checks.get_potentially_switched_candidates(
self.main_unit_info,
polling_station,
neighbourhood_data,
amount_of_reporting_units=self.metadata.reporting_unit_amount,
minimum_reporting_units=EML.MINIMUM_REPORTING_UNITS,
minimum_deviation_factor=EML.MINIMUM_DEVIATION_FACTOR,
minimum_votes=EML.MINIMUM_VOTES,
),
potentially_switched_neighbourhood_candidates=(
protocol_checks.get_potentially_switched_candidates(
neighbourhood_reference_group,
polling_station,
amount_of_reporting_units=self.metadata.reporting_unit_amount,
minimum_reporting_units=EML.MINIMUM_REPORTING_UNITS,
minimum_deviation_factor=EML.MINIMUM_DEVIATION_FACTOR,
minimum_votes=EML.MINIMUM_VOTES,
)
if neighbourhood_reference_group
else None
),
already_recounted=False,
)

Expand Down
5 changes: 2 additions & 3 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from odt import ODT
from neighbourhood import NeighbourhoodData
import csv_write
from pprint import pprint


def create_csv_files(
Expand All @@ -20,12 +19,12 @@ def create_csv_files(
neighbourhood_data = NeighbourhoodData.from_path(path_to_neighbourhood_data)
if neighbourhood_data:
reporting_neighbourhouds = neighbourhood_data.fetch_reporting_neighbourhoods(
eml.metadata.reporting_unit_zips
eml.metadata.reporting_unit_zips, eml.reporting_units_info
)
else:
reporting_neighbourhouds = None

check_results = eml.run_protocol(neighbourhood_data=reporting_neighbourhouds)
check_results = eml.run_protocol(reporting_neighbourhoods=reporting_neighbourhouds)
eml_metadata = eml.metadata

# If odt_path is specified we try to read the file and extract the relevant
Expand Down
102 changes: 77 additions & 25 deletions src/neighbourhood.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
import polars as pl
from pathlib import Path
from typing import Optional, List, Dict
from typing import Optional, List, Dict, TypeVar, Set
from dataclasses import dataclass
from collections import defaultdict
from eml_types import ReportingUnitInfo

T = TypeVar("T")


def _add_dict(a: Dict[T, int], b: Dict[T, int]) -> Dict[T, int]:
return {key_a: a[key_a] + b[key_a] for key_a in a.keys()}


@dataclass
class ReportingNeighbourhoods:
reporting_unit_to_neighbourhood: Dict[str, Optional[str]]
neighbourhood_to_reporting_units: Dict[str, List[str]]

def get_reference_group(self, reporting_unit_id: str) -> List[str]:
neighbourhood = self.reporting_unit_to_neighbourhood.get(reporting_unit_id)
if not neighbourhood:
return []
else:
return [
comparison_id
for comparison_id in self.neighbourhood_to_reporting_units[
neighbourhood
]
if comparison_id != reporting_unit_id
]
reporting_unit_id_to_neighbourhood_id: Dict[str, Optional[str]]
neighbourhood_id_to_reporting_unit_ids: Dict[str, Set[str]]
neighbourhood_id_to_reference_group: Dict[str, ReportingUnitInfo]

def get_reference_group(
self, reporting_unit_id: str
) -> Optional[ReportingUnitInfo]:
neighbourhood_id = self.reporting_unit_id_to_neighbourhood_id.get(
reporting_unit_id
)
if neighbourhood_id is None:
return None

return self.neighbourhood_id_to_reference_group[neighbourhood_id]


class NeighbourhoodData:
Expand All @@ -39,35 +45,81 @@ def fetch_neighbourhood_code(self, zip_code: str) -> str:
return queried_result.item()

def fetch_reporting_neighbourhoods(
self, reporting_unit_zips: Dict[str, Optional[str]]
self,
reporting_unit_zips: Dict[str, Optional[str]],
reporting_unit_info: Dict[str, ReportingUnitInfo],
) -> ReportingNeighbourhoods:
# Fetch the neighbourhood codes for all unique zips
zips = set((zip for zip in reporting_unit_zips.values() if zip is not None))
zips_to_neighbourhoods = {
zip: self.fetch_neighbourhood_code(zip) for zip in zips
}

reporting_unit_to_neighbourhood = {}
# Construct mapping from reporting unit id to neighbourhood id
reporting_unit_id_to_neighbourhood_id = {}
for reporting_unit_id, zip in reporting_unit_zips.items():
if zip is None:
reporting_unit_to_neighbourhood[reporting_unit_id] = None
reporting_unit_id_to_neighbourhood_id[reporting_unit_id] = None
else:
reporting_unit_to_neighbourhood[reporting_unit_id] = (
reporting_unit_id_to_neighbourhood_id[reporting_unit_id] = (
zips_to_neighbourhoods[zip]
)

neighbourhood_to_reporting_units = defaultdict(list)
# Construct mapping from neighbourhood id to reporting unit id list
neighbourhood_id_to_reporting_unit_ids = defaultdict(set)
for (
reporting_unit_id,
neighbourhood_code,
) in reporting_unit_to_neighbourhood.items():
) in reporting_unit_id_to_neighbourhood_id.items():
if neighbourhood_code:
neighbourhood_to_reporting_units[neighbourhood_code].append(
neighbourhood_id_to_reporting_unit_ids[neighbourhood_code].add(
reporting_unit_id
)

# Given the mapping from neighbourhood ids to reporting unit ids, constuct reporting unit objects
neighbourhood_id_to_reference_group = {}
for (
neighbourhood_id,
reporting_unit_ids,
) in neighbourhood_id_to_reporting_unit_ids.items():

# Construct the reference group vote counts by adding dicts of the reporting ids
# that belong to this neighbourhood
summed_votes_per_party = {}
summed_votes_per_candidate = {}
for reporting_unit_id, reporting_unit in reporting_unit_info.items():
if reporting_unit_id not in reporting_unit_ids:
continue

if len(summed_votes_per_party) == 0:
summed_votes_per_party = reporting_unit.votes_per_party
else:
summed_votes_per_party = _add_dict(
summed_votes_per_party, reporting_unit.votes_per_party
)

if len(summed_votes_per_candidate) == 0:
summed_votes_per_candidate = reporting_unit.votes_per_candidate
else:
summed_votes_per_candidate = _add_dict(
summed_votes_per_candidate, reporting_unit.votes_per_candidate
)

neighbourhood_id_to_reference_group[neighbourhood_id] = ReportingUnitInfo(
reporting_unit_id=neighbourhood_id,
reporting_unit_name=f"Reference group for {neighbourhood_id}",
cast=0,
total_counted=0,
rejected_votes={},
uncounted_votes={},
votes_per_party=summed_votes_per_party,
votes_per_candidate=summed_votes_per_candidate,
)

return ReportingNeighbourhoods(
reporting_unit_to_neighbourhood=reporting_unit_to_neighbourhood,
neighbourhood_to_reporting_units=neighbourhood_to_reporting_units,
reporting_unit_id_to_neighbourhood_id=reporting_unit_id_to_neighbourhood_id,
neighbourhood_id_to_reporting_unit_ids=neighbourhood_id_to_reporting_unit_ids,
neighbourhood_id_to_reference_group=neighbourhood_id_to_reference_group,
)

@staticmethod
Expand Down
2 changes: 0 additions & 2 deletions src/protocol_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
)
from typing import Dict, Optional, TypeVar
from itertools import product as cartesian_product
from neighbourhood import ReportingNeighbourhoods

T = TypeVar("T")
N = TypeVar("N", int, float)
Expand Down Expand Up @@ -140,7 +139,6 @@ def get_expected_candidate_votes(
def get_potentially_switched_candidates(
main_unit: ReportingUnitInfo,
reporting_unit: ReportingUnitInfo,
neighbourhood_data: Optional[ReportingNeighbourhoods],
amount_of_reporting_units: int,
minimum_reporting_units: int,
minimum_deviation_factor: int,
Expand Down

0 comments on commit b2c3142

Please sign in to comment.