Skip to content

Commit

Permalink
Remove rtype and some other types
Browse files Browse the repository at this point in the history
  • Loading branch information
ximenesuk committed Sep 14, 2023
1 parent fb47b50 commit e96db47
Showing 1 changed file with 78 additions and 95 deletions.
173 changes: 78 additions & 95 deletions etlhelper/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ class FailedRow(NamedTuple):
exception: Exception


# iter_chunks is where data are retrieved from source database
# All data extraction processes call this function.
def iter_chunks(
select_query: str,
conn: Connection,
Expand All @@ -66,17 +64,18 @@ def iter_chunks(
The transform function is applied to chunks of data as they are extracted
from the database.
:param select_query: str, SQL query to execute
All data extraction functions call this function, directly or indirectly.
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:return: None or a generator returning a list of objects which each
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: generator returning a list of objects which each
represent a row of data using the given row_factory
:rtype: None or Generator[list[etlhelper.row_factories.Row], None, None]
:raises ETLHelperExtractError: if SQL raises an error
"""
logger.info("Fetching rows (chunk_size=%s)", chunk_size)
Expand Down Expand Up @@ -148,17 +147,16 @@ def iter_rows(
Run SQL query against connection and return iterator object to loop over
results, row-by-row.
:param select_query: str, SQL query to execute
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:return: A generator returning objects which each represent a row of data
using the given row_factory
:rtype: Generator[etlhelper.row_factories.Row, None, None]
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: generator returning objects which each represent a row
of data using the given row_factory
"""
for chunk in iter_chunks(select_query, conn, row_factory=row_factory,
parameters=parameters, transform=transform,
Expand All @@ -179,17 +177,15 @@ def fetchone(
Get first result of query. See iter_rows for details. Note: iter_rows is
recommended for looping over rows individually.
:param select_query: str, SQL query to execute
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:return: None or a single object which represents a row of data using the given
row_factory
:rtype: None or etlhelper.row_factories.Row
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: None or a row of data using the given row_factory
"""
try:
result = next(iter_rows(select_query, conn, row_factory=row_factory,
Expand All @@ -215,16 +211,15 @@ def fetchall(
"""
Get all results of query as a list. See iter_rows for details.
:param select_query: str, SQL query to execute
:param select_query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable
of rows (possibly of different shape)
:param chunk_size: int, size of chunks to group data by
:return: A list of objects which each represent a row of data using the
given row_factory
:rtype: list[etlhelper.row_factories.Row]
:param parameters: bind variables to insert in the query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param chunk_size: size of chunks to group data by
:return: a list of rows of data using the given row_factory
"""
return list(iter_rows(select_query, conn, row_factory=row_factory,
parameters=parameters, transform=transform,
Expand Down Expand Up @@ -259,16 +254,15 @@ def executemany(
disadvantage is that investigation may be required to determine exactly
which records have been successfully transferred.
:param query: str, SQL insert command with placeholders for data
:param query: SQL insert command with placeholders for data
:param conn: dbapi connection
:param rows: List of tuples containing data to be inserted/updated
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk has been inserted/updated
:param chunk_size: int, size of chunks to group data by
:return: The number of rows processed and the number of rows failed
:rtype: tuple[int, int]
:param rows: an iterable of rows containing data to be inserted/updated
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk has been inserted/updated
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows failed
:raises ETLHelperInsertError: if SQL raises an error
"""
logger.info("Executing many (chunk_size=%s)", chunk_size)
Expand Down Expand Up @@ -348,14 +342,12 @@ def _execute_by_row(
) -> list[FailedRow]:
"""
Retry execution of rows individually and return failed rows along with
their errors. Successful inserts are committed. This is because
(and other?)
:param query: str, SQL command with placeholders for data
:param conn: open dbapi connection, used for transactions
:param chunk: list, list of row parameters
:return: A list of objects which each represent a failed row of data
:rtype: list[etlhelper.row_factories.Row]
their errors. Successful inserts are committed.
:param query: SQL query with placeholders for data
:param conn: dbapi connection
:param chunk: list of rows
:return: a list of failed rows
"""
failed_rows: list[FailedRow] = []

Expand Down Expand Up @@ -400,20 +392,19 @@ def copy_rows(
tuples. on_error is a function that is called at the end of each chunk,
with the list as the only argument.
:param select_query: str, select rows from Oracle.
:param source_conn: open dbapi connection
:param insert_query:
:param dest_conn: open dbapi connection
:param parameters: sequence or dict of bind variables for select query
:param select_query: SQL query to select data
:param source_conn: dbapi connection
:param insert_query: SQL query to insert data
:param dest_conn: dbapi connection
:param parameters: bind variables to insert in the select query
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk (see executemany)
:param chunk_size: int, size of chunks to group data by
:return: The number of rows processed and the number of rows failed
:rtype: tuple[int, int]
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk (see executemany)
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows failed
"""
rows_generator = iter_rows(select_query, source_conn,
parameters=parameters, row_factory=row_factory,
Expand All @@ -434,10 +425,9 @@ def execute(
"""
Run SQL query against connection.
:param query: str, SQL query to execute
:param query: SQL query to execute
:param conn: dbapi connection
:param parameters: sequence or dict of bind variables to insert in the query
:rtype: None
:param parameters: bind variables to insert in the query
:raises ETLHelperQueryError: if SQL raises an error
"""
logger.info("Executing query")
Expand Down Expand Up @@ -487,18 +477,17 @@ def copy_table_rows(
with the list as the only argument.
:param table: name of table
:param source_conn: open dbapi connection
:param dest_conn: open dbapi connection
:param source_conn: dbapi connection
:param dest_conn: dbapi connection
:param target: name of target table, if different from source
:param row_factory: function that accepts a cursor and returns a function
for parsing each row
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk (see executemany)
:param chunk_size: int, size of chunks to group data by
:return: The number of rows processed and the number of rows failed
:rtype: tuple[int, int]
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk (see executemany)
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows failed
"""
validate_identifier(table)

Expand Down Expand Up @@ -534,15 +523,14 @@ def load(
with the list as the only argument.
:param table: name of table
:param conn: open dbapi connection
:param conn: dbapi connection
:param rows: iterable of named tuples or dictionaries of data
:param transform: function that accepts an iterable (e.g. list) of rows and
returns an iterable of rows (possibly of different shape)
:param on_error: Function to be applied to failed rows in each chunk
:param commit_chunks: bool, commit after each chunk (see executemany)
:param chunk_size: int, size of chunks to group data by
:return: The number of rows processed and the number of rows failed
:rtype: tuple[int, int]
:param transform: function that accepts a list of rows and
returns a list of rows (possibly of different shape)
:param on_error: function to be applied to failed rows in each chunk
:param commit_chunks: commit after each chunk (see executemany)
:param chunk_size: size of chunks to group data by
:return: the number of rows processed and the number of rows failed
"""
# Return early if rows is empty
if not rows:
Expand Down Expand Up @@ -593,12 +581,10 @@ def generate_insert_sql(
a dictionary.
:param table: name of table
:param row: Either a namedtuple or dictionary representing a single row of
data
:param conn: open dbapi connection
:return: An SQL statement used to insert data into the given table
:rtype: str
:raises ETLHelperInsertError: if 'row' is not a namedtuple or a dictionary,
:param row: a single row as a namedtuple or dict
:param conn: dbapi connection
:return: SQL statement to insert data into the given table
:raises ETLHelperInsertError: if 'row' is not a namedtuple or a dict,
or if the database connection encounters a
parameter error
"""
Expand Down Expand Up @@ -656,7 +642,6 @@ def validate_identifier(identifier: str) -> None:
cannot start with `$`, or numbers.
:param identifier: a database identifier
:rtype: None
:raises ETLHelperBadIdentifierError: if the 'identifier' contains invalid
characters
"""
Expand Down Expand Up @@ -684,11 +669,9 @@ def _chunker(
:param iterable: an iterable object
:param n_chunks: the number of values in each chunk
:param fillvalue: value used to fill empty values in a chunk
:return: generator object returning 'iterable' objects of the length
'n_chunks' where empty values are filled using 'fillvalue'
:rtype: itertools.zip_longest
:return: generator returning tuples of rows, of length n_chunks,
where empty values are filled using None
"""
# _chunker('ABCDEFG', 3) --> ABC DEF G"
# _chunker((A,B,C,D,E,F,G), 3) --> (A,B,C) (D,E,F) (G,None,None)
args = [iter(iterable)] * n_chunks
return zip_longest(*args, fillvalue=None)

0 comments on commit e96db47

Please sign in to comment.