Skip to content

Commit

Permalink
Merge pull request #187 from BritishGeologicalSurvey/etl-post-type-hi…
Browse files Browse the repository at this point in the history
…nt-docs

Etl post type hint docs
  • Loading branch information
volcan01010 authored Sep 15, 2023
2 parents 3729ba6 + e96db47 commit cbd83f3
Showing 1 changed file with 96 additions and 69 deletions.
165 changes: 96 additions & 69 deletions etlhelper/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ class FailedRow(NamedTuple):
exception: Exception


# iter_chunks is where data are retrieved from source database
# All data extraction processes call this function.
def iter_chunks(
select_query: str,
conn: Connection,
Expand All @@ -66,14 +64,19 @@ def iter_chunks(
The transform function is applied to chunks of data as they are extracted
from the database.
:param select_query: str, SQL query to execute
All data extraction functions call this function, directly or indirectly.
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: generator returning a list of objects which each
represent a row of data using the given row_factory
:raises ETLHelperExtractError: if SQL raises an error
"""
logger.info("Fetching rows (chunk_size=%s)", chunk_size)
logger.debug(f"Fetching:\n\n{select_query}\n\nwith parameters:\n\n"
Expand Down Expand Up @@ -144,14 +147,16 @@ def iter_rows(
Run SQL query against connection and return iterator object to loop over
results, row-by-row.
:param select_query: str, SQL query to execute
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: generator returning objects, one per row, which each
represent a row of data using the given row_factory
"""
for chunk in iter_chunks(select_query, conn, row_factory=row_factory,
parameters=parameters, transform=transform,
Expand All @@ -172,13 +177,15 @@ def fetchone(
Get first result of query. See iter_rows for details. Note: iter_rows is
recommended for looping over rows individually.
:param select_query: str, SQL query to execute
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: None or a row of data using the given row_factory
"""
try:
result = next(iter_rows(select_query, conn, row_factory=row_factory,
Expand All @@ -203,13 +210,16 @@ def fetchall(
) -> Chunk:
"""
Get all results of query as a list. See iter_rows for details.
:param select_query: str, SQL query to execute
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable
of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: a list of rows of data using the given row_factory
"""
return list(iter_rows(select_query, conn, row_factory=row_factory,
parameters=parameters, transform=transform,
Expand Down Expand Up @@ -244,15 +254,16 @@ def executemany(
disadvantage is that investigation may be required to determine exactly
which records have been successfully transferred.
:param query: str, SQL insert command with placeholders for data
:param query: SQL insert command with placeholders for data
:param conn: dbapi connection
:param rows: List of tuples containing data to be inserted/updated
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk has been inserted/updated
:param chunk_size: int, size of chunks to group data by
:return processed, failed: (int, int) number of rows processed, failed
:param rows: an iterable of rows containing data to be inserted/updated
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk has been inserted/updated
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows failed
:raises ETLHelperInsertError: if SQL raises an error
"""
logger.info("Executing many (chunk_size=%s)", chunk_size)
logger.debug("Executing:\n\n%s\n\nagainst:\n\n%s", query, conn)
Expand Down Expand Up @@ -331,13 +342,12 @@ def _execute_by_row(
) -> list[FailedRow]:
"""
Retry execution of rows individually and return failed rows along with
their errors. Successful inserts are committed. This is because
(and other?)
their errors. Successful inserts are committed.
:param query: str, SQL command with placeholders for data
:param chunk: list, list of row parameters
:param conn: open dbapi connection, used for transactions
:returns failed_rows: list of (row, exception) tuples
:param query: SQL query with placeholders for data
:param conn: dbapi connection
:param chunk: list of rows
:return: a list of failed rows, each paired with its exception
"""
failed_rows: list[FailedRow] = []

Expand Down Expand Up @@ -382,19 +392,19 @@ def copy_rows(
tuples. on_error is a function that is called at the end of each chunk,
with the list as the only argument.
:param select_query: str, select rows from Oracle.
:param source_conn: open dbapi connection
:param insert_query:
:param dest_conn: open dbapi connection
:param parameters: sequence or dict of bind variables for select query
:param select_query: SQL query to select data
:param source_conn: dbapi connection
:param insert_query: SQL query to insert data
:param dest_conn: dbapi connection
:param parameters: bind variables to insert in the select query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk (see executemany)
:param chunk_size: int, size of chunks to group data by
:return processed, failed: (int, int) number of rows processed, failed
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk (see executemany)
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows failed
"""
rows_generator = iter_rows(select_query, source_conn,
parameters=parameters, row_factory=row_factory,
Expand All @@ -415,9 +425,10 @@ def execute(
"""
Run SQL query against connection.
:param query: str, SQL query to execute
:param query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:raises ETLHelperQueryError: if SQL raises an error
"""
logger.info("Executing query")
logger.debug(f"Executing:\n\n{query}\n\nwith parameters:\n\n"
Expand Down Expand Up @@ -465,19 +476,18 @@ def copy_table_rows(
tuples. on_error is a function that is called at the end of each chunk,
with the list as the only argument.
:param source_conn: open dbapi connection
:param dest_conn: open dbapi connection
:param table: name of table
:param source_conn: dbapi connection
:param dest_conn: dbapi connection
:param target: name of target table, if different from source
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk (see executemany)
:param chunk_size: int, size of chunks to group data by
:param select_sql_suffix: str, SQL clause(s) to append to select statement
e.g. WHERE, ORDER BY, LIMIT
:return processed, failed: (int, int) number of rows processed, failed
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk (see executemany)
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows failed
"""
validate_identifier(table)

Expand Down Expand Up @@ -513,14 +523,14 @@ def load(
with the list as the only argument.
:param table: name of table
:param conn: open dbapi connection
:param conn: dbapi connection
:param rows: iterable of named tuples or dictionaries of data
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk (see executemany)
:param chunk_size: int, size of chunks to group data by
:return processed, failed: (int, int) number of rows processed, failed
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk (see executemany)
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows failed
"""
# Return early if rows is empty
if not rows:
Expand Down Expand Up @@ -566,8 +576,18 @@ def generate_insert_sql(
conn: Connection
) -> str:
"""Generate insert SQL for table, getting column names from row and the
Generate insert SQL for table, getting column names from row and the
placeholder style from the connection. `row` is either a namedtuple or
a dictionary."""
a dictionary.
:param table: name of table
:param row: a single row as a namedtuple or dict
:param conn: dbapi connection
:return: SQL statement to insert data into the given table
:raises ETLHelperInsertError: if 'row' is not a namedtuple or a dict,
or if the database connection encounters a
parameter error
"""
helper = DB_HELPER_FACTORY.from_conn(conn)
paramstyles = {
"qmark": "?",
Expand Down Expand Up @@ -621,7 +641,9 @@ def validate_identifier(identifier: str) -> None:
Identifiers must comprise alpha-numeric characters, plus `_` or `$` and
cannot start with `$`, or numbers.
:raises ETLHelperBadIdentifierError:
:param identifier: a database identifier
:raises ETLHelperBadIdentifierError: if the 'identifier' contains invalid
characters
"""
# Identifier rules are based on PostgreSQL specifications, defined here:
# https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
Expand All @@ -644,7 +666,12 @@ def _chunker(
) -> Iterator[tuple[Union[Row, None], ...]]:
"""Collect data into fixed-length chunks or blocks.
Code from recipe at https://docs.python.org/3.6/library/itertools.html
:param iterable: an iterable object
:param n_chunks: the number of values in each chunk
:return: generator returning tuples of rows, of length n_chunks,
where empty values are filled using None
"""
# _chunker('ABCDEFG', 3) --> ABC DEF G"
# _chunker((A,B,C,D,E,F,G), 3) --> (A,B,C) (D,E,F) (G,None,None)
args = [iter(iterable)] * n_chunks
return zip_longest(*args, fillvalue=None)

0 comments on commit cbd83f3

Please sign in to comment.