Skip to content

Commit

Permalink
Minor refactoring for switched candidates check
Browse files Browse the repository at this point in the history
- Moved all checking logic to protocol_checks instead of eml class
- Made the switched candidate check one 'public' function with name which corresponds with other checks (prefix check_) and made all other helper functions 'private', i.e. with prefix _get
  • Loading branch information
chrismostert committed Feb 27, 2024
1 parent f55d2cd commit 3047276
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 118 deletions.
61 changes: 17 additions & 44 deletions src/eml.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
ReportingUnitInfo,
PartyIdentifier,
InvalidEmlException,
SwitchedCandidateConfig,
VoteDifference,
SwitchedCandidate,
)
Expand Down Expand Up @@ -44,11 +45,14 @@ class EML:

PARTY_DIFFERENCE_THRESHOLD_PCT: ClassVar[float] = 50.0

# Switched candidate parameters
MINIMUM_REPORTING_UNITS_MUNICIPALITY: ClassVar[int] = 2
MINIMUM_REPORTING_UNITS_NEIGHBOURHOOD: ClassVar[int] = 5
MINIMUM_DEVIATION_FACTOR: ClassVar[int] = 10
MINIMUM_VOTES: ClassVar[int] = 20
SWITCHED_CANDIDATE_CONFIG: ClassVar[SwitchedCandidateConfig] = (
SwitchedCandidateConfig(
minimum_reporting_units_municipality=2,
minimum_reporting_units_neighbourhood=5,
minimum_deviation_factor=10,
minimum_votes=20,
)
)
# ---

def run_protocol(
Expand All @@ -57,44 +61,6 @@ def run_protocol(
protocol_results = {}

for polling_station_id, polling_station in self.reporting_units_info.items():
neighbourhood_reference_group = (
(reporting_neighbourhoods.get_reference_group(polling_station_id))
if reporting_neighbourhoods
else None
)

potentially_switched_municipality_candidates = (
protocol_checks.get_potentially_switched_candidates(
self.main_unit_info,
polling_station,
amount_of_reporting_units=self.metadata.reporting_unit_amount,
minimum_reporting_units=EML.MINIMUM_REPORTING_UNITS_MUNICIPALITY,
minimum_deviation_factor=EML.MINIMUM_DEVIATION_FACTOR,
minimum_votes=EML.MINIMUM_VOTES,
)
)

potentially_switched_neighbourhood_candidates = (
protocol_checks.get_potentially_switched_candidates(
neighbourhood_reference_group,
polling_station,
amount_of_reporting_units=reporting_neighbourhoods.get_reference_size(
polling_station_id
),
minimum_reporting_units=EML.MINIMUM_REPORTING_UNITS_NEIGHBOURHOOD,
minimum_deviation_factor=EML.MINIMUM_DEVIATION_FACTOR,
minimum_votes=EML.MINIMUM_VOTES,
)
if neighbourhood_reference_group and reporting_neighbourhoods
else None
)

potentially_switched_candidates = (
protocol_checks.get_switched_candidate_combination(
potentially_switched_municipality_candidates,
potentially_switched_neighbourhood_candidates,
)
)

check_result = CheckResult(
zero_votes=protocol_checks.check_zero_votes(polling_station),
Expand Down Expand Up @@ -123,7 +89,14 @@ def run_protocol(
party_difference_percentages=protocol_checks.get_party_difference_percentages(
self.main_unit_info, polling_station
),
potentially_switched_candidates=potentially_switched_candidates,
potentially_switched_candidates=protocol_checks.check_potentially_switched_candidates(
polling_station_id,
self.main_unit_info,
polling_station,
self.metadata.reporting_unit_amount,
reporting_neighbourhoods,
EML.SWITCHED_CANDIDATE_CONFIG,
),
already_recounted=False,
)

Expand Down
8 changes: 8 additions & 0 deletions src/eml_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,13 @@ def __str__(self) -> str:
)


@dataclass
class SwitchedCandidateConfig:
minimum_reporting_units_municipality: int
minimum_reporting_units_neighbourhood: int
minimum_deviation_factor: int
minimum_votes: int


class InvalidEmlException(Exception):
pass
185 changes: 116 additions & 69 deletions src/protocol_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
VoteDifferencePercentage,
CandidateIdentifier,
SwitchedCandidate,
SwitchedCandidateConfig,
)
from typing import Dict, Optional, TypeVar
from itertools import product as cartesian_product

from neighbourhood import ReportingNeighbourhoods

T = TypeVar("T")
N = TypeVar("N", int, float)

Expand Down Expand Up @@ -117,26 +120,94 @@ def check_parties_with_large_percentage_difference(
)


def get_expected_candidate_votes(
main_unit: ReportingUnitInfo, reporting_unit: ReportingUnitInfo
) -> Dict[CandidateIdentifier, float]:
party_votes_without_current = _subtract_part_dictionary(
main_unit.votes_per_party, reporting_unit.votes_per_party
def check_potentially_switched_candidates(
polling_station_id: str,
main_unit: ReportingUnitInfo,
polling_station: ReportingUnitInfo,
reporting_unit_amount: int,
reporting_neighbourhoods: Optional[ReportingNeighbourhoods],
config: SwitchedCandidateConfig,
) -> List[SwitchedCandidate]:
neighbourhood_reference_group = (
(reporting_neighbourhoods.get_reference_group(polling_station_id))
if reporting_neighbourhoods
else None
)
cand_votes_without_current = _subtract_part_dictionary(
main_unit.votes_per_candidate, reporting_unit.votes_per_candidate

potentially_switched_municipality_candidates = _get_potentially_switched_candidates(
main_unit,
polling_station,
amount_of_reporting_units=reporting_unit_amount,
minimum_reporting_units=config.minimum_reporting_units_municipality,
minimum_deviation_factor=config.minimum_deviation_factor,
minimum_votes=config.minimum_votes,
)
cand_ratios = _get_candidate_ratios(
party_votes_without_current, cand_votes_without_current

potentially_switched_neighbourhood_candidates = (
_get_potentially_switched_candidates(
neighbourhood_reference_group,
polling_station,
amount_of_reporting_units=reporting_neighbourhoods.get_reference_size(
polling_station_id
),
minimum_reporting_units=config.minimum_reporting_units_neighbourhood,
minimum_deviation_factor=config.minimum_deviation_factor,
minimum_votes=config.minimum_votes,
)
if neighbourhood_reference_group and reporting_neighbourhoods
else None
)

return {
cand_id: ratio * reporting_unit.votes_per_party[cand_id.party]
for cand_id, ratio in cand_ratios.items()
}
return _get_switched_candidate_combination(
potentially_switched_municipality_candidates,
potentially_switched_neighbourhood_candidates,
)


# Implementation details


def _get_total_votes(reporting_unit: ReportingUnitInfo) -> int:
rejected_votes = reporting_unit.rejected_votes
return (
reporting_unit.total_counted
+ rejected_votes["ongeldig"]
+ rejected_votes["blanco"]
)


def get_potentially_switched_candidates(
def _get_differences(reporting_unit: ReportingUnitInfo) -> int:
admitted_voters = reporting_unit.uncounted_votes.get("toegelaten kiezers") or 0
total_votes = _get_total_votes(reporting_unit)

return abs(total_votes - admitted_voters)


def _percentage(part: int, total: int) -> Optional[float]:
try:
return part / total * 100
except ZeroDivisionError:
return None


def _get_percentages(dictionary: Dict[T, int], total: int) -> Dict[T, float]:
return_dict = {}
for key in dictionary.keys():
try:
return_dict[key] = dictionary[key] / total * 100
except ZeroDivisionError:
return_dict[key] = 0

return return_dict


def _subtract_part_dictionary(
total: Dict[T, N], part_dictionary: Dict[T, N]
) -> Dict[T, N]:
return {key: total[key] - part_dictionary[key] for key in part_dictionary.keys()}


def _get_potentially_switched_candidates(
main_unit: ReportingUnitInfo,
reporting_unit: ReportingUnitInfo,
amount_of_reporting_units: int,
Expand All @@ -149,7 +220,7 @@ def get_potentially_switched_candidates(
return []

received_votes = reporting_unit.votes_per_candidate
expected_votes = get_expected_candidate_votes(main_unit, reporting_unit)
expected_votes = _get_expected_candidate_votes(main_unit, reporting_unit)

cands_with_more_votes: List[CandidateIdentifier] = []
cands_with_less_votes: List[CandidateIdentifier] = []
Expand Down Expand Up @@ -190,7 +261,36 @@ def get_potentially_switched_candidates(
return result


def get_switched_candidate_combination(
def _get_expected_candidate_votes(
main_unit: ReportingUnitInfo, reporting_unit: ReportingUnitInfo
) -> Dict[CandidateIdentifier, float]:
party_votes_without_current = _subtract_part_dictionary(
main_unit.votes_per_party, reporting_unit.votes_per_party
)
cand_votes_without_current = _subtract_part_dictionary(
main_unit.votes_per_candidate, reporting_unit.votes_per_candidate
)
cand_ratios = _get_candidate_ratios(
party_votes_without_current, cand_votes_without_current
)

return {
cand_id: ratio * reporting_unit.votes_per_party[cand_id.party]
for cand_id, ratio in cand_ratios.items()
}


def _get_candidate_ratios(
party_votes: Dict[PartyIdentifier, int],
candidate_votes: Dict[CandidateIdentifier, int],
) -> Dict[CandidateIdentifier, float]:
return {
cand_id: votes / party_votes[cand_id.party] if votes > 0 else 0
for cand_id, votes in candidate_votes.items()
}


def _get_switched_candidate_combination(
municipality_switched: List[SwitchedCandidate],
neighbourhood_switched: Optional[List[SwitchedCandidate]],
) -> List[SwitchedCandidate]:
Expand Down Expand Up @@ -223,56 +323,3 @@ def get_switched_candidate_combination(
result.append(neighbourhood_switched_obj)

return result


# Helper functions


def _get_total_votes(reporting_unit: ReportingUnitInfo) -> int:
rejected_votes = reporting_unit.rejected_votes
return (
reporting_unit.total_counted
+ rejected_votes["ongeldig"]
+ rejected_votes["blanco"]
)


def _get_differences(reporting_unit: ReportingUnitInfo) -> int:
admitted_voters = reporting_unit.uncounted_votes.get("toegelaten kiezers") or 0
total_votes = _get_total_votes(reporting_unit)

return abs(total_votes - admitted_voters)


def _percentage(part: int, total: int) -> Optional[float]:
try:
return part / total * 100
except ZeroDivisionError:
return None


def _get_percentages(dictionary: Dict[T, int], total: int) -> Dict[T, float]:
return_dict = {}
for key in dictionary.keys():
try:
return_dict[key] = dictionary[key] / total * 100
except ZeroDivisionError:
return_dict[key] = 0

return return_dict


def _subtract_part_dictionary(
total: Dict[T, N], part_dictionary: Dict[T, N]
) -> Dict[T, N]:
return {key: total[key] - part_dictionary[key] for key in part_dictionary.keys()}


def _get_candidate_ratios(
party_votes: Dict[PartyIdentifier, int],
candidate_votes: Dict[CandidateIdentifier, int],
) -> Dict[CandidateIdentifier, float]:
return {
cand_id: votes / party_votes[cand_id.party] if votes > 0 else 0
for cand_id, votes in candidate_votes.items()
}
10 changes: 5 additions & 5 deletions test/test_protocol_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,13 @@ def test_check_explanation_sum_difference(
"main_unit, reporting_unit, expected",
[(switched_main_unit, expected_reporting_unit, expected_cand_votes)],
)
def test_check_expected_candidate_votes(
def test_get_expected_candidate_votes(
main_unit: ReportingUnitInfo,
reporting_unit: ReportingUnitInfo,
expected: Dict[CandidateIdentifier, float],
) -> None:
assert (
protocol_checks.get_expected_candidate_votes(main_unit, reporting_unit)
protocol_checks._get_expected_candidate_votes(main_unit, reporting_unit)
== expected
)

Expand Down Expand Up @@ -448,7 +448,7 @@ def test_check_expected_candidate_votes(
"main_unit, reporting_unit, expected, amount_of_reporting_units, minimum_reporting_units, minimum_deviation_factor, minimum_votes",
switched_test_cases,
)
def test_check_switched_candidate(
def test_get_switched_candidate(
main_unit,
reporting_unit,
expected,
Expand All @@ -458,7 +458,7 @@ def test_check_switched_candidate(
minimum_votes,
) -> None:
assert (
protocol_checks.get_potentially_switched_candidates(
protocol_checks._get_potentially_switched_candidates(
main_unit,
reporting_unit,
amount_of_reporting_units,
Expand Down Expand Up @@ -555,7 +555,7 @@ def test_get_switched_candidate_combination(
expected: List[SwitchedCandidate],
) -> None:
assert (
protocol_checks.get_switched_candidate_combination(
protocol_checks._get_switched_candidate_combination(
switched_municipality, switched_neighbourhood
)
== expected
Expand Down

0 comments on commit 3047276

Please sign in to comment.