diff --git a/pyrcs/line_data_cls/elrs_mileages.py b/pyrcs/line_data_cls/elrs_mileages.py index a5dc022..4debad0 100644 --- a/pyrcs/line_data_cls/elrs_mileages.py +++ b/pyrcs/line_data_cls/elrs_mileages.py @@ -253,6 +253,10 @@ def __init__(self, data_dir=None): # Change directory to "dat\\line-data\\elrs-and-mileages" and sub-directories def cd_em(self, *sub_dir): + """ + :param sub_dir: [str] + :return: [str] + """ path = self.DataDir for x in sub_dir: path = os.path.join(path, x) @@ -260,6 +264,10 @@ def cd_em(self, *sub_dir): # Change directory to "dat\\line-data\\elrs-and-mileages\\dat" and sub-directories def cdd_em(self, *sub_dir): + """ + :param sub_dir: [str] + :return: [str] + """ path = self.cd_em("dat") for x in sub_dir: path = os.path.join(path, x) @@ -275,6 +283,11 @@ def collect_elr_by_initial(self, initial, update=False, verbose=False): [pandas.DataFrame] data of ELRs whose names start with the given 'initial', incl. ELR names, line name, mileages, datum and some notes [str] date of when the data was last updated + + Testing e.g. + initial = 'a' + update = False + verbose = False """ assert initial in string.ascii_letters beginning_with = initial.upper() @@ -343,36 +356,77 @@ def collect_mileage_file_by_elr(self, elr, parsed=True, confirmation_required=Tr - Mileages in parentheses are not on that ELR, but are included for reference, e.g. ANL, (8.67) NORTHOLT [London Underground] - As with the main ELR list, mileages preceded by a tilde (~) are approximate. + + Testing e.g. + elr + parsed = True + confirmation_required = False + pickle_it = False + verbose = False """ if confirmed("To collect mileage file for \"{}\"?".format(elr.upper()), confirmation_required=confirmation_required): + try: # The URL of the mileage file for the ELR url = self.HomeURL + '/elrs/_mileages/{}/{}.shtm'.format(elr[0].lower(), elr.lower()) source = requests.get(url) source_text = bs4.BeautifulSoup(source.text, 'lxml') - # Search for information of the line ----------------------------------------------------------------- line_name, sub_line_name = source_text.find('h3').text, source_text.find('h4') + if line_name == '"404" error: page not found': - return None - else: - line_name = line_name.split('\t') - if sub_line_name: - sub_headers = sub_line_name.text.split('\t') - # assert sub_headers[0] == elr + initial = elr[0] + elr_dat = self.collect_elr_by_initial(initial, verbose=verbose)[initial] + elr_dat = elr_dat[elr_dat.ELR == elr] + + notes = elr_dat.Notes.values[0] + if re.match(r'(Now( part of)? |= |See )[A-Z]{3}(\d)?$', notes): + new_elr = re.search(r'(?<= )[A-Z]{3}(\d)?', notes).group(0) + mileage_file = self.fetch_mileage_file(elr=new_elr, pickle_it=pickle_it) + return mileage_file + + else: + line_name, mileages = elr_dat[['Line name', 'Mileages']].values[0] + if re.match(r'(\w ?)+ \((\w+ \w+)+.\)', line_name): + line_name_ = re.search(r'(?<=\w \()(\w+ \w+)+.(?=\))', line_name).group(0) + try: + location_a, _, location_b = re.split(r' (and|&|to) ', line_name_) + line_name = re.search(r'(\w+ \w+)+.(?= \((\w ?)+\))', line_name).group(0) + except ValueError: + location_a, _, location_b = re.split(r' (and|&|to) ', notes) + line_name = line_name_ + elif elr_dat.Mileages.values[0].startswith('0.00') and elr_dat.Datum.values[0] != '': + location_a = elr_dat.Datum.values[0] + location_b = re.split(r' (and|&|to) ', line_name)[ + 2] if location_a in line_name else line_name + else: + try: + location_a, _, location_b = re.split(r' (and|&|to) ', notes) + except (ValueError, TypeError): + location_a, _, location_b = re.split(r' (and|&|to) ', line_name) + else: + location_a, location_b = '', '' + location_b_ = re.sub(r' Branch| Curve', '', location_b) \ + if re.match(r'.*( Branch| Curve)$', location_b) else location_b + + miles_chains, locations = mileages.split(' - '), [location_a, location_b_] + parsed_content = [[m, l] for m, l in zip(miles_chains, locations)] + else: - sub_headers = ['', ''] - # assert line_name[0] == elr + line_name = line_name.split('\t')[1] + parsed_content = [x.strip().split('\t', 1) + for x in source_text.find('pre').text.splitlines() if x != ''] + parsed_content = [[y.replace(' ', ' ').replace('\t', ' ') for y in x] + for x in parsed_content] + parsed_content = [[''] + x if (len(x) == 1) & ('Note that' not in x[0]) else x + for x in parsed_content] - # Make a dict of line information - line_info = {'ELR': elr, 'Line': line_name[1], 'Sub-Line': sub_headers[1]} + # assert sub_headers[0] == elr + sub_headers = sub_line_name.text.split('\t')[1] if sub_line_name else '' - # Parse the main texts ------------------------------------------------------------------------------ - parsed_content = source_text.find('pre').text.splitlines() - parsed_content = [x.strip().split('\t', 1) for x in parsed_content if x != ''] - parsed_content = [[y.replace(' ', ' ').replace('\t', ' ') for y in x] for x in parsed_content] - parsed_content = [[''] + x if (len(x) == 1) & ('Note that' not in x[0]) else x for x in parsed_content] + # Make a dict of line information + line_info = {'ELR': elr, 'Line': line_name, 'Sub-Line': sub_headers} # Search for note note_temp = min(parsed_content, key=len) @@ -391,9 +445,11 @@ def collect_mileage_file_by_elr(self, elr, parsed=True, confirmation_required=Tr if mileage_data.iloc[-1].Mileage == '': note = [note, mileage_data.iloc[-1].Node] if note else mileage_data.iloc[-1].Node mileage_data = mileage_data[:-1] + if len(mileage_data.iloc[-1].Mileage) > 6: note = [note, mileage_data.iloc[-1].Mileage] if note else mileage_data.iloc[-1].Mileage mileage_data = mileage_data[:-1] + # Make a dict of note note_dat = {'Note': note} @@ -415,7 +471,7 @@ def collect_mileage_file_by_elr(self, elr, parsed=True, confirmation_required=Tr save_pickle(mileage_file, path_to_pickle, verbose) except Exception as e: - print("Failed to collect the mileage file for \"{}.\" {}.".format(elr, e)) + print("Failed to collect the mileage file for \"{}\". {}.".format(elr, e)) mileage_file = None return mileage_file @@ -491,20 +547,31 @@ def search_conn(start_elr, start_em, end_elr, end_em): def get_conn_mileages(self, start_elr, end_elr, update=False, pickle_mileage_file=False, data_dir=None, verbose=False): """ - start_elr, end_elr, update = 'NAY', 'LTN2', False - + :param start_elr: [str] + :param end_elr: [str] + :param update: [bool] (default: False) + :param pickle_mileage_file: [bool] (default: False) + :param data_dir: [str; None (default)] + :param verbose: [bool] (default: False) + :return: [tuple] + + Testing e.g. + start_elr, end_elr = 'NAY', 'LTN2' + update = False + pickle_mileage_file = False + data_dir = None + verbose = True """ start_file = self.fetch_mileage_file(start_elr, update, pickle_mileage_file, data_dir, verbose=verbose) end_file = self.fetch_mileage_file(end_elr, update, pickle_mileage_file, data_dir, verbose=verbose) if start_file is not None and end_file is not None: start_em, end_em = start_file[start_elr], end_file[end_elr] + key_pat = re.compile(r'(Current\s)|(One\s)|(Later\s)|(Usual\s)') if isinstance(start_em, dict): - start_em = start_em[[k for k in start_em.keys() - if re.match(r'(Current\s)|(One\s)|(Later\s)|(Usual\s)', k)][0]] + start_em = start_em[[k for k in start_em.keys() if re.match(key_pat, k)][0]] if isinstance(end_em, dict): - end_em = end_em[[k for k in end_em.keys() - if re.match(r'(Current\s)|(One\s)|(Later\s)|(Usual\s)', k)][0]] + end_em = end_em[[k for k in end_em.keys() if re.match(key_pat, k)][0]] # start_dest_mileage, end_orig_mileage = self.search_conn(start_elr, start_em, end_elr, end_em) @@ -524,15 +591,16 @@ def get_conn_mileages(self, start_elr, end_elr, update=False, pickle_mileage_fil conn_elr = conn_temp.iloc[j] conn_em = self.fetch_mileage_file(conn_elr, update=update) if conn_em is not None: + conn_elr = conn_em['ELR'] conn_em = conn_em[conn_elr] if isinstance(conn_em, dict): - conn_em = conn_em[[k for k in conn_em.keys() - if re.match(r'(Current\s)|(One\s)|(Later\s)|(Usual\s)', k)][0]] + conn_em = conn_em[[k for k in conn_em.keys() if re.match(key_pat, k)][0]] # - start_dest_mileage, conn_orig_mileage = self.search_conn(start_elr, start_em, conn_elr, - conn_em) + start_dest_mileage, conn_orig_mileage = \ + self.search_conn(start_elr, start_em, conn_elr, conn_em) # - conn_dest_mileage, end_orig_mileage = self.search_conn(conn_elr, conn_em, end_elr, end_em) + conn_dest_mileage, end_orig_mileage = \ + self.search_conn(conn_elr, conn_em, end_elr, end_em) if conn_dest_mileage and end_orig_mileage: if not start_dest_mileage: