Add SC scraper #441

Merged · 3 commits · Feb 24, 2022
10 changes: 8 additions & 2 deletions warn/cache.py
@@ -1,6 +1,7 @@
 import csv
 import logging
 import os
+import typing
 from os.path import expanduser, join
 from pathlib import Path

@@ -71,13 +72,16 @@ def read_csv(self, name):
         with open(path) as fh:
             return list(csv.reader(fh))

-    def download(self, name: str, url: str, **kwargs) -> Path:
+    def download(
+        self, name: str, url: str, encoding: typing.Optional[str] = None, **kwargs
+    ) -> Path:
         """
         Download the provided URL and save it in the cache.

         Args:
             name (str): The path where the file will be saved. Can be a simple string like "ia/data.xlsx"
             url (str): The URL to download
+            encoding (str): The encoding of the response. Optional.
             **kwargs: Additional arguments to pass to requests.get()

         Returns: The Path where the file was saved
@@ -86,7 +90,9 @@ def download(self, name: str, url: str, **kwargs) -> Path:
         logger.debug(f"Requesting {url}")
         with get_url(url, stream=True, **kwargs) as r:
             # If there's no encoding, set it
-            if r.encoding is None:
+            if encoding:
+                r.encoding = encoding
+            elif r.encoding is None:
                 r.encoding = "utf-8"

             # Open the local Path
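The new encoding argument lets a scraper override the character set that requests detects before the response body is written to the cache, while the existing UTF-8 fallback still applies when nothing is passed. Below is a minimal usage sketch, not code from this pull request; the state key, file name and URL are placeholders.

# Usage sketch only: "xx/data.csv" and the URL are hypothetical.
from warn import utils
from warn.cache import Cache

cache = Cache(utils.WARN_CACHE_DIR)

# Force latin-1 decoding for a source that mislabels its text responses;
# omit the argument to keep the old behavior (detected encoding, else utf-8).
csv_path = cache.download(
    "xx/data.csv",
    "https://example.com/warn/data.csv",
    encoding="latin-1",
)
print(csv_path)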
131 changes: 131 additions & 0 deletions warn/scrapers/sc.py
@@ -0,0 +1,131 @@
import re
from datetime import datetime
from pathlib import Path

import pdfplumber
from bs4 import BeautifulSoup

from .. import utils
from ..cache import Cache

__authors__ = ["palewire"]
__tags__ = [
"html",
"pdf",
]


def scrape(
    data_dir: Path = utils.WARN_DATA_DIR,
    cache_dir: Path = utils.WARN_CACHE_DIR,
) -> Path:
"""
Scrape data from South Carolina.

Keyword arguments:
data_dir -- the Path were the result will be saved (default WARN_DATA_DIR)
cache_dir -- the Path where results can be cached (default WARN_CACHE_DIR)

Returns: the Path where the file is written
"""
    # Get URL
    r = utils.get_url(
        "https://scworks.org/employer/employer-programs/at-risk-of-closing/layoff-notification-reports",
        verify=False,
    )
    html = r.text

    # Save it to the cache
    cache = Cache(cache_dir)
    cache.write("sc/source.html", html)

    # Parse out the PDF links
    soup = BeautifulSoup(html, "html.parser")
    link_list = soup.find_all("a")
    pdf_list = [a for a in link_list if "pdf" in a["href"]]

    # Patterns to find and extract data cells
    naics_re = re.compile("^[0-9]{6}$")
    date_re = re.compile("^[0-9]{1,2}/[0-9]{1,2}[/]{1,2}[0-9]{2}")
    jobs_re = re.compile("^[0-9]{1,5}$")

    current_year = datetime.now().year
    output_rows = []
    for pdf in pdf_list:
        pdf_year = int(pdf.text[:4].strip())
        cache_key = f"sc/{pdf_year}.pdf"
        if cache.exists(cache_key) and pdf_year < (current_year - 1):
            pdf_path = cache.path / cache_key
        else:
            pdf_path = cache.download(
                cache_key, f"https://scworks.org/{pdf['href']}", verify=False
            )

        # Open the PDF
        with pdfplumber.open(pdf_path) as pdf:

            # Loop through the pages
            for page in pdf.pages:

                # Pull out the table
                row_list = page.extract_table()

                # Skip empty pages
                if not row_list:
                    continue

                # Skip skinny and empty rows
                real_rows = []
                for row in row_list:
                    values = [v for v in row if v]
                    if len(values) < 4:
                        continue
                    real_rows.append(row)

                # Loop through each row in the table
                for row in real_rows:

                    # Clean values
                    cell_list = [_clean_cell(c) for c in row if _clean_cell(c)]

                    # Pluck out the values based on our regex
                    d = {}
                    for cell in cell_list:
                        if naics_re.search(cell):
                            d["naics"] = cell
                        elif date_re.search(cell):
                            d["date"] = cell
                        elif jobs_re.search(cell):
                            d["jobs"] = int(cell)

                    # If there haven't been at least two matches, it must be junk
                    if len(d) < 2:
                        continue

                    # The first one should be the company
                    d["company"] = cell_list[0]

                    # The second one should be the location
                    d["location"] = cell_list[1]

                    # Keep what we got
                    output_rows.append(d)

    # Write out the data to a CSV
    data_path = data_dir / "sc.csv"
    headers = ["company", "location", "date", "jobs", "naics"]
    utils.write_dict_rows_to_csv(data_path, headers, output_rows, extrasaction="ignore")

    # Return the Path to the CSV
    return data_path


def _clean_cell(cell):
"""Clean the value in the provided cell."""
if cell is None:
return None
return cell.strip().replace("\n", "")


if __name__ == "__main__":
    scrape()
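
Because the PDF tables do not have a fixed column order, the scraper identifies each cell by pattern rather than by position: six digits reads as a NAICS code, a slash-separated value as a date, and a short run of digits as a job count, with the first two cleaned cells treated as company and location. The standalone sketch below walks that logic over a single made-up row of the kind page.extract_table() might return; the row values are invented for illustration.

# Worked example of the cell-classification step above; the row is invented.
import re

naics_re = re.compile("^[0-9]{6}$")
date_re = re.compile("^[0-9]{1,2}/[0-9]{1,2}[/]{1,2}[0-9]{2}")
jobs_re = re.compile("^[0-9]{1,5}$")

row = ["Acme Manufacturing Co.\n", "Greenville County", "1/15/22", "125", "333111"]

# Mirror _clean_cell(): strip whitespace and embedded newlines, drop empties
cell_list = [c.strip().replace("\n", "") for c in row if c and c.strip()]

d = {}
for cell in cell_list:
    if naics_re.search(cell):
        d["naics"] = cell
    elif date_re.search(cell):
        d["date"] = cell
    elif jobs_re.search(cell):
        d["jobs"] = int(cell)

# Rows with fewer than two pattern matches would be discarded as junk
assert len(d) >= 2
d["company"] = cell_list[0]
d["location"] = cell_list[1]

print(d)
# {'date': '1/15/22', 'jobs': 125, 'naics': '333111',
#  'company': 'Acme Manufacturing Co.', 'location': 'Greenville County'}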