Modify collect_mileage_file_by_elr()
mikeqfu committed Sep 26, 2019
1 parent fe6373d commit 3a4b210
Showing 1 changed file with 96 additions and 28 deletions.
124 changes: 96 additions & 28 deletions pyrcs/line_data_cls/elrs_mileages.py
@@ -253,13 +253,21 @@ def __init__(self, data_dir=None):

# Change directory to "dat\\line-data\\elrs-and-mileages" and sub-directories
def cd_em(self, *sub_dir):
"""
:param sub_dir: [str]
:return: [str]
"""
path = self.DataDir
for x in sub_dir:
path = os.path.join(path, x)
return path
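# A hypothetical usage sketch (not part of this commit), assuming this class is instantiated
# as `em` and that self.DataDir points at "dat\\line-data\\elrs-and-mileages":
#   em.cd_em("testing")   # -> "dat\\line-data\\elrs-and-mileages\\testing"
#   em.cdd_em("testing")  # -> "dat\\line-data\\elrs-and-mileages\\dat\\testing" (see cdd_em() below)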

# Change directory to "dat\\line-data\\elrs-and-mileages\\dat" and sub-directories
def cdd_em(self, *sub_dir):
"""
:param sub_dir: [str]
:return: [str]
"""
path = self.cd_em("dat")
for x in sub_dir:
path = os.path.join(path, x)
@@ -275,6 +283,11 @@ def collect_elr_by_initial(self, initial, update=False, verbose=False):
[pandas.DataFrame] data of ELRs whose names start with the given 'initial', including ELR names,
line names, mileages, datum and notes
[str] date when the data was last updated
Testing e.g.
initial = 'a'
update = False
verbose = False
"""
assert initial in string.ascii_letters
beginning_with = initial.upper()
@@ -343,36 +356,77 @@ def collect_mileage_file_by_elr(self, elr, parsed=True, confirmation_required=Tr
- Mileages in parentheses are not on that ELR, but are included for reference,
e.g. ANL, (8.67) NORTHOLT [London Underground]
- As with the main ELR list, mileages preceded by a tilde (~) are approximate.
Testing e.g.
elr
parsed = True
confirmation_required = False
pickle_it = False
verbose = False
"""
if confirmed("To collect mileage file for \"{}\"?".format(elr.upper()),
confirmation_required=confirmation_required):

try:
# The URL of the mileage file for the ELR
url = self.HomeURL + '/elrs/_mileages/{}/{}.shtm'.format(elr[0].lower(), elr.lower())
source = requests.get(url)
source_text = bs4.BeautifulSoup(source.text, 'lxml')

# Search for information of the line -----------------------------------------------------------------
line_name, sub_line_name = source_text.find('h3').text, source_text.find('h4')

if line_name == '"404" error: page not found':
return None
else:
line_name = line_name.split('\t')
if sub_line_name:
sub_headers = sub_line_name.text.split('\t')
# assert sub_headers[0] == elr
initial = elr[0]
elr_dat = self.collect_elr_by_initial(initial, verbose=verbose)[initial]
elr_dat = elr_dat[elr_dat.ELR == elr]

notes = elr_dat.Notes.values[0]
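# Notes such as "Now XXX", "= XXX" or "See XXX" mean the ELR has been renamed or superseded,
# so fetch the mileage file of the referenced ELR instead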
if re.match(r'(Now( part of)? |= |See )[A-Z]{3}(\d)?$', notes):
new_elr = re.search(r'(?<= )[A-Z]{3}(\d)?', notes).group(0)
mileage_file = self.fetch_mileage_file(elr=new_elr, pickle_it=pickle_it)
return mileage_file

else:
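# Otherwise, derive the start/end locations from the 'Line name', 'Mileages' and 'Datum' columns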
line_name, mileages = elr_dat[['Line name', 'Mileages']].values[0]
if re.match(r'(\w ?)+ \((\w+ \w+)+.\)', line_name):
line_name_ = re.search(r'(?<=\w \()(\w+ \w+)+.(?=\))', line_name).group(0)
try:
location_a, _, location_b = re.split(r' (and|&|to) ', line_name_)
line_name = re.search(r'(\w+ \w+)+.(?= \((\w ?)+\))', line_name).group(0)
except ValueError:
location_a, _, location_b = re.split(r' (and|&|to) ', notes)
line_name = line_name_
elif elr_dat.Mileages.values[0].startswith('0.00') and elr_dat.Datum.values[0] != '':
location_a = elr_dat.Datum.values[0]
location_b = re.split(r' (and|&|to) ', line_name)[2] \
if location_a in line_name else line_name
else:
try:
location_a, _, location_b = re.split(r' (and|&|to) ', notes)
except (ValueError, TypeError):
location_a, _, location_b = re.split(r' (and|&|to) ', line_name)
else:
location_a, location_b = '', ''
location_b_ = re.sub(r' Branch| Curve', '', location_b) \
if re.match(r'.*( Branch| Curve)$', location_b) else location_b

miles_chains, locations = mileages.split(' - '), [location_a, location_b_]
parsed_content = [[m, l] for m, l in zip(miles_chains, locations)]

else:
sub_headers = ['', '']
# assert line_name[0] == elr
line_name = line_name.split('\t')[1]
parsed_content = [x.strip().split('\t', 1)
for x in source_text.find('pre').text.splitlines() if x != '']
parsed_content = [[y.replace('\xa0', ' ').replace('\t', ' ') for y in x]
for x in parsed_content]
parsed_content = [[''] + x if (len(x) == 1) & ('Note that' not in x[0]) else x
for x in parsed_content]

# Make a dict of line information
line_info = {'ELR': elr, 'Line': line_name[1], 'Sub-Line': sub_headers[1]}
# assert sub_headers[0] == elr
sub_headers = sub_line_name.text.split('\t')[1] if sub_line_name else ''

# Parse the main texts ------------------------------------------------------------------------------
parsed_content = source_text.find('pre').text.splitlines()
parsed_content = [x.strip().split('\t', 1) for x in parsed_content if x != '']
parsed_content = [[y.replace('\xa0', ' ').replace('\t', ' ') for y in x] for x in parsed_content]
parsed_content = [[''] + x if (len(x) == 1) & ('Note that' not in x[0]) else x for x in parsed_content]
# Make a dict of line information
line_info = {'ELR': elr, 'Line': line_name, 'Sub-Line': sub_headers}

# Search for note
note_temp = min(parsed_content, key=len)
@@ -391,9 +445,11 @@ def collect_mileage_file_by_elr(self, elr, parsed=True, confirmation_required=Tr
if mileage_data.iloc[-1].Mileage == '':
note = [note, mileage_data.iloc[-1].Node] if note else mileage_data.iloc[-1].Node
mileage_data = mileage_data[:-1]

if len(mileage_data.iloc[-1].Mileage) > 6:
note = [note, mileage_data.iloc[-1].Mileage] if note else mileage_data.iloc[-1].Mileage
mileage_data = mileage_data[:-1]

# Make a dict of note
note_dat = {'Note': note}

@@ -415,7 +471,7 @@ def collect_mileage_file_by_elr(self, elr, parsed=True, confirmation_required=Tr
save_pickle(mileage_file, path_to_pickle, verbose)

except Exception as e:
print("Failed to collect the mileage file for \"{}.\" {}.".format(elr, e))
print("Failed to collect the mileage file for \"{}\". {}.".format(elr, e))
mileage_file = None

return mileage_file
@@ -491,20 +547,31 @@ def search_conn(start_elr, start_em, end_elr, end_em):
def get_conn_mileages(self, start_elr, end_elr, update=False, pickle_mileage_file=False, data_dir=None,
verbose=False):
"""
start_elr, end_elr, update = 'NAY', 'LTN2', False
:param start_elr: [str]
:param end_elr: [str]
:param update: [bool] (default: False)
:param pickle_mileage_file: [bool] (default: False)
:param data_dir: [str; None (default)]
:param verbose: [bool] (default: False)
:return: [tuple]
Testing e.g.
start_elr, end_elr = 'NAY', 'LTN2'
update = False
pickle_mileage_file = False
data_dir = None
verbose = True
"""
start_file = self.fetch_mileage_file(start_elr, update, pickle_mileage_file, data_dir, verbose=verbose)
end_file = self.fetch_mileage_file(end_elr, update, pickle_mileage_file, data_dir, verbose=verbose)

if start_file is not None and end_file is not None:
start_em, end_em = start_file[start_elr], end_file[end_elr]
key_pat = re.compile(r'(Current\s)|(One\s)|(Later\s)|(Usual\s)')
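# When a mileage file holds more than one mileage set, pick the one whose key starts with
# "Current", "One", "Later" or "Usual"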
if isinstance(start_em, dict):
start_em = start_em[[k for k in start_em.keys()
if re.match(r'(Current\s)|(One\s)|(Later\s)|(Usual\s)', k)][0]]
start_em = start_em[[k for k in start_em.keys() if re.match(key_pat, k)][0]]
if isinstance(end_em, dict):
end_em = end_em[[k for k in end_em.keys()
if re.match(r'(Current\s)|(One\s)|(Later\s)|(Usual\s)', k)][0]]
end_em = end_em[[k for k in end_em.keys() if re.match(key_pat, k)][0]]
#
start_dest_mileage, end_orig_mileage = self.search_conn(start_elr, start_em, end_elr, end_em)

@@ -524,15 +591,16 @@ def get_conn_mileages(self, start_elr, end_elr, update=False, pickle_mileage_fil
conn_elr = conn_temp.iloc[j]
conn_em = self.fetch_mileage_file(conn_elr, update=update)
if conn_em is not None:
conn_elr = conn_em['ELR']
conn_em = conn_em[conn_elr]
if isinstance(conn_em, dict):
conn_em = conn_em[[k for k in conn_em.keys()
if re.match(r'(Current\s)|(One\s)|(Later\s)|(Usual\s)', k)][0]]
conn_em = conn_em[[k for k in conn_em.keys() if re.match(key_pat, k)][0]]
#
start_dest_mileage, conn_orig_mileage = self.search_conn(start_elr, start_em, conn_elr,
conn_em)
start_dest_mileage, conn_orig_mileage = \
self.search_conn(start_elr, start_em, conn_elr, conn_em)
#
conn_dest_mileage, end_orig_mileage = self.search_conn(conn_elr, conn_em, end_elr, end_em)
conn_dest_mileage, end_orig_mileage = \
self.search_conn(conn_elr, conn_em, end_elr, end_em)

if conn_dest_mileage and end_orig_mileage:
if not start_dest_mileage: