-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SP] convert wrans intermediate XML to PW format
- Loading branch information
Showing
2 changed files
with
192 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
""" | ||
Convert the structured data from Scottish Parliament to | ||
the XML format used by TheyWorkForYou | ||
Link to TWFY IDs for members and debate items. | ||
""" | ||
|
||
import datetime | ||
import re | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
from typing import Optional | ||
|
||
from lxml import etree | ||
|
||
from .resolvenames import get_unique_person_id, is_member_vote | ||
|
||
|
||
def slugify(text: str) -> str:
    """
    Turn a string into a lower-case, hyphen-separated slug.

    Every character that is neither a word character nor whitespace is
    dropped, then each space character becomes a hyphen. Other whitespace
    (tabs, newlines) is left untouched and runs of spaces are not collapsed.
    """
    lowered = text.lower()
    cleaned = re.sub(r"[^\w\s]", "", lowered)
    # splitting on a literal space and re-joining swaps each space for "-"
    return "-".join(cleaned.split(" "))
|
||
|
||
@dataclass
class IDFactory:
    """
    Hand out sequential ids of the form
    <base_id>[<committee_slug>/]<iso_date>.<major>.<minor>

    The committee slug segment is left out for the slugs
    "meeting-of-the-parliament" and "plenary".
    """

    # slug of the committee the ids belong to
    committee_slug: str
    # date string embedded in every id
    iso_date: str
    # common prefix of every id
    base_id: str = "uk.org.publicwhip/spor/"
    # counters start at -1 so the first major id issued is numbered 0
    latest_major: int = -1
    latest_minor: int = -1

    def _current_id(self) -> str:
        """Build the id string for the current counter values."""
        if self.committee_slug in ("meeting-of-the-parliament", "plenary"):
            prefix = self.base_id
        else:
            prefix = self.base_id + self.committee_slug + "/"
        counters = f"{self.latest_major}.{self.latest_minor}"
        return f"{prefix}{self.iso_date}.{counters}"

    def get_next_major_id(self) -> str:
        """Advance the major counter, reset the minor counter to 0, return the id."""
        self.latest_major, self.latest_minor = self.latest_major + 1, 0
        return self._current_id()

    def get_next_minor_id(self) -> str:
        """Advance the minor counter and return the id."""
        self.latest_minor = self.latest_minor + 1
        return self._current_id()
|
||
|
||
def slugify_committee(name: str) -> str:
    """
    Slugify a committee name, dropping a trailing date if one is present.
    """
    slug = slugify(name)
    if not slug[-4:].isdigit():
        return slug
    # the slug ends in digits (a four digit year is expected) - assume it is
    # the end of a date and drop the last three hyphen-separated elements
    parts = slug.split("-")
    return "-".join(parts[:-3])
|
||
|
||
def convert_wrans_xml_to_twfy(file_path: Path, output_dir: Path, verbose: bool = False):
    """
    Convert from the loose structured xml format to the
    TWFY xml format

    Reads the intermediate written-answers xml at file_path and writes the
    converted document to output_dir / "sp-written" / "spwa<date>.xml",
    where <date> is the "date" attribute of the source root element.
    Parent directories of the destination are created if needed.

    When verbose is true, progress and unresolved question/answer speaker
    names are printed. Unresolved mspname entries are always printed.
    """
    if verbose:
        print(f"Converting {file_path}")

    # Parse from bytes rather than decoded text: lxml refuses str input that
    # carries an XML encoding declaration, and a text-mode read would decode
    # with the platform default encoding instead of the one the file declares.
    source = etree.fromstring(file_path.read_bytes())

    # root of the tree is a publicwhip object
    root = etree.Element("publicwhip")

    iso_date = source.get("date")

    # long-form date for the heading, e.g. "Thursday 09 June 2005"
    # (%d is zero padded)
    date_str = datetime.date.fromisoformat(iso_date).strftime("%A %d %B %Y")

    committee_slug = "sp-written"

    dest_path = output_dir / committee_slug / f"spwa{iso_date}.xml"
    dest_path.parent.mkdir(parents=True, exist_ok=True)

    id_factory = IDFactory(committee_slug=committee_slug, iso_date=iso_date)

    # there is only one major heading: the questions for the day
    major_heading = etree.Element("major-heading")
    major_heading.set("id", id_factory.get_next_major_id())
    major_heading.set("nospeaker", "True")
    major_heading.text = f"Written Questions for {date_str}"
    root.append(major_heading)

    # source sub-item tag -> tag of the element written to the output
    speech_tags = {"question": "ques", "answer": "reply"}

    for item in source.iter("spwrans"):
        # each question is a minor heading using the id as the title because
        # we don't have anything else to use
        minor_heading = etree.Element("minor-heading")
        minor_heading.set("id", id_factory.get_next_minor_id())
        minor_heading.text = f"Question {item.get('id')}"
        root.append(minor_heading)

        parsed = item.find("parsed")
        if parsed is None:
            # nothing to convert for this item; keep the heading and move on
            # instead of failing on iteration over None
            if verbose:
                print(f"No parsed content for question {item.get('id')}")
            continue

        # names already reported for this item, so each is printed only once
        missing_speakers = []
        for subitem in parsed:
            speech_tag = speech_tags.get(subitem.tag)
            if speech_tag is None:
                # only questions and answers are carried over
                continue

            speaker_name = subitem.get("speaker_name")
            person_id = get_unique_person_id(speaker_name, iso_date)
            if (
                person_id is None
                and speaker_name not in missing_speakers
                and verbose
            ):
                print(f"Could not find person id for {speaker_name}")
                missing_speakers.append(speaker_name)

            speech = etree.Element(speech_tag)
            speech.set("id", id_factory.get_next_minor_id())
            speech.set("url", item.get("url") or "")
            speech.set("speakername", speaker_name)
            speech.set("person_id", person_id or "unknown")
            # appending re-parents each child, so walk a snapshot of the
            # children rather than the element that is being emptied
            for child in list(subitem):
                speech.append(child)
            root.append(speech)

    # for all mspname elements, we need to create an ID property
    for mspname in root.iter("mspname"):
        person_name = mspname.text
        person_id = is_member_vote(person_name, iso_date)
        if person_id is None:
            print(f"Could not find person id for {person_name}")
        mspname.set("id", person_id or "unknown")

    # write the new xml to a file
    etree.indent(root, space=" ")

    with dest_path.open("wb") as f:
        f.write(etree.tostring(root, pretty_print=True))
|
||
|
||
def convert_to_twfy(
    cache_dir: Path,
    output_dir: Path,
    partial_file_name: Optional[str] = None,
    verbose: bool = False,
):
    """
    Given a cache directory, convert the intermediate written-answers xml
    files in it to the TWFY xml format, written under output_dir.

    If partial_file_name is given, only files in cache_dir whose names start
    with it are converted; otherwise every *.xml file in cache_dir is.
    """
    pattern = f"{partial_file_name}*" if partial_file_name else "*.xml"
    # sorted() fixes the processing order and snapshots the listing before
    # any output is written
    for xml in sorted(cache_dir.glob(pattern)):
        # the converter defined in this module is convert_wrans_xml_to_twfy;
        # the previous call to convert_xml_to_twfy named a function that does
        # not exist here and raised NameError
        convert_wrans_xml_to_twfy(xml, output_dir, verbose=verbose)