From a130c50441607d6ae5ebc9134e7a7b10abfc24c7 Mon Sep 17 00:00:00 2001 From: Matthew Somerville Date: Sat, 5 Sep 2020 17:42:48 +0100 Subject: [PATCH] [UK] Update to new Written API. --- wrans-2014/parse.py | 329 +++++++++++++++----------------------------- 1 file changed, 111 insertions(+), 218 deletions(-) diff --git a/wrans-2014/parse.py b/wrans-2014/parse.py index bcdbe37ef..46aab99e8 100755 --- a/wrans-2014/parse.py +++ b/wrans-2014/parse.py @@ -1,8 +1,8 @@ #!/usr/bin/env python # # UK Parliament Written Answers are now in a new database-driven website. This -# site has a nice search, and RSS feeds, but for now we still need a bit of -# scraping to fetch the data. Ever so slightly simpler than the past, though! +# site has a nice-ish search, but for now we still need a bit of scraping to +# fetch the data. Ever so slightly simpler than the past, though! import argparse import datetime @@ -13,14 +13,13 @@ from xml.sax.saxutils import escape import bs4 -import dateutil.parser import requests import requests_cache # Command line arguments yesterday = datetime.date.today() - datetime.timedelta(days=1) -parser = argparse.ArgumentParser(description='Scrape/parse new Written Answers database.') +parser = argparse.ArgumentParser(description='Scrape/parse new Written Answers/Statements database.') parser.add_argument('--house', required=True, choices=['commons', 'lords'], help='Which house to fetch') parser.add_argument('--type', required=True, choices=['answers', 'statements'], help='What sort of thing to fetch') parser.add_argument('--date', default=yesterday.isoformat(), help='date to fetch') @@ -32,90 +31,43 @@ cache_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cache') requests_cache.install_cache(cache_path, expire_after=60*60*12) -HOST = 'https://www.parliament.uk' -URL_ROOT = '%s/business/publications/written-questions-answers-statements/' % HOST +HOST = 'https://questions-statements.parliament.uk' +API_HOST = 'https://writtenquestions-api.parliament.uk' if ARGS.type == 'answers': - URL_INDEX = URL_ROOT + 'written-questions-answers/' + URL_INDEX = HOST + '/written-questions' + API_INDEX = API_HOST + '/api/writtenquestions/questions' else: - URL_INDEX = URL_ROOT + 'written-statements/' - - -def _lord_name_on_date(p, date): - for n in PERSONS[p]: - if n.get('start_date', '0000-00-00') <= date <= n.get('end_date', '9999-12-31'): - return _lord_name(n) - # Not available on this date (e.g. deceased) - return '' - -def _lord_name(n): - name = n['honorific_prefix'] - if name in ('Bishop', 'Archbishop'): - name = 'Lord %s' % name - if n['lordname']: - name += ' %s' % n['lordname'] - if n['lordofname']: - name += ' of %s' % n['lordofname'] - # Earl of Clancarty is in the peerage of Ireland but the Lords - # still uses it :/ Should really use peers-aliases.xml here. - if name == 'Viscount Clancarty': - name = 'The Earl of Clancarty' - return name - + URL_INDEX = HOST + '/written-statements' + API_INDEX = API_HOST + '/api/writtenstatements/statements' with open(ARGS.members) as fp: MEMBERS = json.load(fp) -if ARGS.house == 'lords': - MEMBERS_BY_NAME = {} - PERSONS = {p['id']: [n for n in p.get('other_names', []) if n['note']=='Main'] for p in MEMBERS['persons']} - for m in MEMBERS['memberships']: - if m.get('organization_id') != 'house-of-lords': continue - if not m['start_date'] <= ARGS.date <= m.get('end_date', '9999-12-31'): continue - name = _lord_name_on_date(m['person_id'], ARGS.date).lower() - if name: - MEMBERS_BY_NAME[name] = m['person_id'] -else: DATADOTPARL_ID_TO_PERSON_ID = {} for person in MEMBERS['persons']: for i in person.get('identifiers', []): if i['scheme'] == 'datadotparl_id': - DATADOTPARL_ID_TO_PERSON_ID[i['identifier']] = person['id'] + DATADOTPARL_ID_TO_PERSON_ID[int(i['identifier'])] = person['id'] def main(): - params = urllib.urlencode({ - 'page': 1, - 'max': 100, - 'questiontype': 'QuestionsWithAnswersOnly', # 'AllQuestions', - 'house': ARGS.house, - 'use-dates': 'True', - 'answered-from': ARGS.date, - 'answered-to': ARGS.date, - }) - url_page = '%s?%s' % (URL_INDEX, params) + params = { + 'take': 20, + 'house': ARGS.house.title(), + 'expandMember': 'true', + } if ARGS.type == 'answers': writtens = Questions() + get_from_list(writtens, dict(params, answeredWhenFrom=ARGS.date, answeredWhenTo=ARGS.date)) + get_from_list(writtens, dict(params, correctedWhenFrom=ARGS.date, correctedWhenTo=ARGS.date)) + # Make sure we have all grouped questions (some might actually not have + # been returned due to being on another day) + for uin, qn in writtens.by_id.items(): + qn.groupedQuestions = map(writtens.get_by_uin, qn.groupedQuestions) else: writtens = Statements() - errors = 0 - while url_page: - r = requests.get(url_page) - if 'There are no results' in r.content: - break - if 'Server Error' in r.content: - requests.Session().cache.delete_url(url_page) - errors += 1 - if errors >= 5: - raise Exception, 'Too many server errors, giving up' - continue - writtens.add_from_html(r.content) - url_page = writtens.next_page - - # Make sure we have all grouped questions (some might actually not have - # been returned due to bugs/being on another day) - if ARGS.type == 'answers': - for uid, qn in writtens.by_id.items(): - for a in qn.groupedquestions: - writtens.get_by_id(a) + params['madeWhenFrom'] = ARGS.date + params['madeWhenTo'] = ARGS.date + get_from_list(writtens, params) output = ('%s' % writtens).encode('utf-8') if output: @@ -129,29 +81,51 @@ def main(): print "* %s Written %s: found %d new items" % (ARGS.house.title(), ARGS.type.title(), writtens.number) +def get_from_list(writtens, params): + params = urllib.urlencode(params) + url_page = '%s?%s' % (API_INDEX, params) + errors = 0 + skip = 0 + while url_page: + url = '%s&skip=%d' % (url_page, skip) + print url + j = requests.get(url).json() + if not j['results']: + break + if j.get('status') == 500: + requests.Session().cache.delete_url(url_page) + errors += 1 + if errors >= 5: + raise Exception, 'Too many server errors, giving up: %s' % j['title'] + continue + writtens.add_from_json(j) + if writtens.number < j['totalResults']: + skip += 20 + else: + url_page = None + + class AttrDict(dict): def __init__(self, *args, **kwargs): super(AttrDict, self).__init__(*args, **kwargs) self.__dict__ = self -class WrittenThing(object): - def find_speaker(self, h, date): - name = h.a.text.strip() - if ARGS.house == 'lords': - # Loop through all, match on name and date - name = name.lower().replace(u'\u2019', "'") - person_id = MEMBERS_BY_NAME[re.sub('^the ', '', name)] - else: - speaker_id = re.search('(\d+)$', h.a['href']).group(1) - person_id = DATADOTPARL_ID_TO_PERSON_ID[speaker_id] - return AttrDict(id=person_id, name=name) +class WrittenThing(AttrDict): + def find_speaker(self, speaker): + person_id = DATADOTPARL_ID_TO_PERSON_ID[speaker['id']] + return AttrDict(id=person_id, name=speaker['name']) + + def find_date(self, date): + return date.replace('T00:00:00', '') - def find_date(self, d, regex): - date = re.match('\s*%s on:\s+(?P.*)' % regex, d.text) - date = date.group('date') - date = dateutil.parser.parse(date).date().isoformat() - return date + def fix_text(self, text): + soup = bs4.BeautifulSoup(text) + return ''.join(map(unicode, soup.body.contents)) + + def get_detail(self): + url = '%s/%s' % (API_INDEX, self['id']) + return requests.get(url).json()['value'] class WrittenThings(object): @@ -159,25 +133,13 @@ def __init__(self): self.by_id = {} self.by_dept = {} - def wanted_thing(self, st): - return True - - def add_from_html(self, data): - """Provide a page of HTML, parse out all its things""" - soup = bs4.BeautifulSoup(data) - for item in soup.find_all(class_="qna-result-container"): + def add_from_json(self, data): + """Provide the API JSON, parse out all its things""" + for result in data['results']: + item = result['value'] item = self.model(item, self) - if self.wanted_thing(item): - self.by_id[item.uid] = item - self.by_dept.setdefault(item.dept, []).append(item) - - n = soup.find(text='Next').parent - if n.name == 'span': - self.next_page = None - elif n.name == 'a': - self.next_page = '%s%s' % (HOST, n['href']) - else: - raise Exception + self.by_id[item.uin] = item + self.by_dept.setdefault(item.answeringBodyName, []).append(item) @property def number(self): @@ -190,7 +152,7 @@ def __str__(self): out = '\n' for dept, deptitems in self.by_dept.items(): out += ''' - + {dept} '''.format(dept=dept, item=deptitems[0], gid=self.gid_type) @@ -201,116 +163,61 @@ def __str__(self): class Statement(WrittenThing): def __init__(self, st, sts): - self.uid = escape(st.find(class_="qna-result-ws-uin").a.text) - self.dept = escape(st.find(class_="qna-result-writtenstatement-answeringbody").text) - self.heading = escape(st.find(class_="qna-result-ws-content-heading").text) - - date = st.find(class_="qna-result-ws-date") - self.date = self.find_date(date, 'Made') - speaker = st.find(class_='qna-result-writtenstatement-made-by') - self.speaker = self.find_speaker(speaker, self.date) + super(Statement, self).__init__(st) + self.date = self.find_date(self.dateMade) + self.uin = self.uin.lower() + self.heading = escape(self.title) + self.speaker = self.find_speaker(self.member) - joint = st.find(class_='qna-result-writtenstatement-joint-statement-row') - if joint: - a = joint.find('a') - a['href'] = HOST + a['href'] - - statement = st.find(class_="qna-result-writtenstatement-text") - self.statement = statement.renderContents().decode('utf-8').strip() + data = self.get_detail() + self.statement = self.fix_text(data['text']) + for a in data['attachments']: + self.statement += '

%s (%s, %.1fKB)

' % (a['url'], a['title'], a['fileType'], a['fileSizeBytes']/1024.0) def __str__(self): return u''' - + {st.heading} - + {st.statement} -'''.format(st=self, house=ARGS.house.title(), url_root=URL_ROOT) +'''.format(st=self, url_root=HOST) class Question(WrittenThing): def __init__(self, qn, qns): - self.qns = qns - - self.uid = escape(qn.find(class_="qna-result-question-uin").a.text) - self.dept = escape(qn.find(class_="qna-result-question-answeringbody").text) - try: - hdg = qn.find(class_="qna-result-question-hansardheading") - self.heading = escape(hdg.text) - except: - self.heading = '*No heading*' - - date_asked = qn.find(class_="qna-result-question-date") - self.date_asked = self.find_date(date_asked, 'Asked') - self.date = self.date_asked - asker = qn.find(class_='qna-result-question-title') - self.asker = self.find_speaker(asker, self.date_asked) - question = qn.find(class_="qna-result-question-text").text.strip() - self.question = escape(question) - - self.answers = [] - for answer_ctr in qn.findAll(class_='qna-result-answer-container'): - date_answer = answer_ctr.find(class_="qna-result-answer-date") - date_answer = self.find_date(date_answer, '(Answered|Corrected)') - answerer = answer_ctr.find(class_="qna-result-answer-title") - answerer = self.find_speaker(answerer, date_answer) - - groupedquestions_container = answer_ctr.find(class_="qna-result-groupedquestions-container") - groupedquestions = [] - if groupedquestions_container: - groupedquestions = answer_ctr.find_all(class_='qna-result-groupedquestion-container') - groupedquestions = [x.a.text for x in groupedquestions] - groupedquestions_container.extract() - - answer = answer_ctr.find(class_="qna-result-answer-content") - # Remove show/hide changes JS-only code at top - for l in answer.findAll('label'): l.extract() - for l in answer.findAll('input'): l.extract() - # Replace spans with semantic elements - for l in answer.findAll('span', class_='ins'): - l.name = 'ins' - for l in answer.findAll('span', class_='del'): - l.name = 'del' - - answer = answer.renderContents().decode('utf-8').strip() - - # Attachments are okay just left in the answer - # attachments = answer_ctr.find(class_="qna-result-attachments-container") - # if attachments: - # self.attachments = answer_ctr.find_all(class_='qna-result-attachment-container') - - self.answers.append(AttrDict({ - 'answerer': answerer, - 'date': date_answer, - 'answer': answer, - 'groupedquestions': groupedquestions, - })) - - @property - def groupedquestions(self): - if len(self.answers): - return self.answers[-1].groupedquestions - return [] + super(Question, self).__init__(qn) + self.date = self.find_date(self.dateTabled) + self.heading = escape(self.heading or 'Question') + self.asker = self.find_speaker(self.askingMember) + + data = self.get_detail() + self.question = escape(data['questionText']) + self.answerer = self.find_speaker(self.answeringMember) + self.date_answer = self.find_date(self.dateAnswered) + self.answer = self.fix_text(data['answerText']) + for a in data['attachments']: + self.answer += '

%s (%s, %.1fKB)

' % (a['url'], a['title'], a['fileType'], a['fileSizeBytes']/1024.0) @property def secondary_group_question(self): - return self.groupedquestions and self.uid > min(self.groupedquestions) + return self.groupedQuestions and self.uin > min(map(lambda x: x['uin'], self.groupedQuestions)) @property def questions_xml(self): - qns = [self] + [self.qns.by_id[q] for q in self.groupedquestions] + qns = [self] + self.groupedQuestions return ''.join([u''' - -

{qn.question}

-
'''.format(i=i, qn=qn, house=ARGS.house.title(), url_root=URL_ROOT) for i, qn in enumerate(qns)]) + +

{qn.question}

+
'''.format(i=i, qn=qn, url_root=HOST) for i, qn in enumerate(qns)]) @property def answers_xml(self): - return ''.join([u''' - - {answer.answer} -'''.format(i=i, qn=self, answer=answer) for i, answer in enumerate(self.answers)]) + return u''' + + {qn.answer} +'''.format(i=0, qn=self) def __str__(self): # TODO If we were to import unanswered questions, we would want to @@ -324,7 +231,7 @@ def __str__(self): # return '\n' % (oldgid, newgid, matchtype) return u''' - + {qn.heading} {qn.questions_xml} @@ -341,29 +248,15 @@ class Questions(WrittenThings): gid_type = 'wrans' model = Question - def wanted_thing(self, qn): - # If the question was answered/corrected on our date - if ARGS.date in [ a.date for a in qn.answers ]: - return True - return False - - def get_by_id(self, uid): - # It is possible that when we need to look up a grouped ID, it's not - # present. Either it was on a different day (possible), or there's a - # mistake in what has been returned from the search. Perform a separate - # scrape/parse of the question if so, to get its data. - if uid in self.by_id: - return self.by_id[uid] - r = requests.get(URL_INDEX, params={'uin': uid}) - if 'There are no results' in r.content: - return - if 'Server Error' in r.content: - return - soup = bs4.BeautifulSoup(r.content) - qn = soup.find(class_='qna-result-container') + def get_by_uin(self, uin): + if uin in self.by_id: + return self.by_id[uin] + + qn = requests.get(API_INDEX, params={'expandMember': 'true', 'uin': uin}).json() + qn = qn['results'][0]['value'] qn = Question(qn, self) - self.by_id[qn.uid] = qn - self.by_dept.setdefault(qn.dept, []).append(qn) + self.by_id[qn.uin] = qn + self.by_dept.setdefault(qn.answeringBodyName, []).append(qn) return qn