diff --git a/etlhelper/etl.py b/etlhelper/etl.py index 1a8e528..57c865f 100644 --- a/etlhelper/etl.py +++ b/etlhelper/etl.py @@ -46,8 +46,6 @@ class FailedRow(NamedTuple): exception: Exception -# iter_chunks is where data are retrieved from source database -# All data extraction processes call this function. def iter_chunks( select_query: str, conn: Connection, @@ -66,17 +64,18 @@ def iter_chunks( The transform function is applied to chunks of data as they are extracted from the database. - :param select_query: str, SQL query to execute + All data extraction functions call this function, directly or indirectly. + + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by - :return: None or a generator returning a list of objects which each + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: generator returning a list of objects which each represent a row of data using the given row_factory - :rtype: None or Generator[list[etlhelper.row_factories.Row], None, None] :raises ETLHelperExtractError: if SQL raises an error """ logger.info("Fetching rows (chunk_size=%s)", chunk_size) @@ -148,17 +147,16 @@ def iter_rows( Run SQL query against connection and return iterator object to loop over results, row-by-row. - :param select_query: str, SQL query to execute + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by - :return: A generator returning objects which each represent a row of data - using the given row_factory - :rtype: Generator[etlhelper.row_factories.Row, None, None] + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: generator returning a list of objects which each + represent a row of data using the given row_factory """ for chunk in iter_chunks(select_query, conn, row_factory=row_factory, parameters=parameters, transform=transform, @@ -179,17 +177,15 @@ def fetchone( Get first result of query. See iter_rows for details. Note: iter_rows is recommended for looping over rows individually. - :param select_query: str, SQL query to execute + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by - :return: None or a single object which represents a row of data using the given - row_factory - :rtype: None or etlhelper.row_factories.Row + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: None or a row of data using the given row_factory """ try: result = next(iter_rows(select_query, conn, row_factory=row_factory, @@ -215,16 +211,15 @@ def fetchall( """ Get all results of query as a list. See iter_rows for details. - :param select_query: str, SQL query to execute + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query - :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable - of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by - :return: A list of objects which each represent a row of data using the - given row_factory - :rtype: list[etlhelper.row_factories.Row] + :param parameters: bind variables to insert in the query + :param row_factory: function that accepts a cursor and returns a function + for parsing each row + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: a row of data using the given row_factory """ return list(iter_rows(select_query, conn, row_factory=row_factory, parameters=parameters, transform=transform, @@ -259,16 +254,15 @@ def executemany( disadvantage is that investigation may be required to determine exactly which records have been successfully transferred. - :param query: str, SQL insert command with placeholders for data + :param query: SQL insert command with placeholders for data :param conn: dbapi connection - :param rows: List of tuples containing data to be inserted/updated - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk has been inserted/updated - :param chunk_size: int, size of chunks to group data by - :return: The number of rows processed and the number of rows failed - :rtype: tuple[int, int] + :param rows: an iterable of rows containing data to be inserted/updated + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk has been inserted/updated + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed :raises ETLHelperInsertError: if SQL raises an error """ logger.info("Executing many (chunk_size=%s)", chunk_size) @@ -348,14 +342,12 @@ def _execute_by_row( ) -> list[FailedRow]: """ Retry execution of rows individually and return failed rows along with - their errors. Successful inserts are committed. This is because - (and other?) - - :param query: str, SQL command with placeholders for data - :param conn: open dbapi connection, used for transactions - :param chunk: list, list of row parameters - :return: A list of objects which each represent a failed row of data - :rtype: list[etlhelper.row_factories.Row] + their errors. Successful inserts are committed. + + :param query: SQL query with placeholders for data + :param conn: dbapi connection + :param chunk: list of rows + :return: a list failed rows """ failed_rows: list[FailedRow] = [] @@ -400,20 +392,19 @@ def copy_rows( tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument. - :param select_query: str, select rows from Oracle. - :param source_conn: open dbapi connection - :param insert_query: - :param dest_conn: open dbapi connection - :param parameters: sequence or dict of bind variables for select query + :param select_query: SQL query to select data + :param source_conn: dbapi connection + :param insert_query: SQL query to insert data + :param dest_conn: dbapi connection + :param parameters: bind variables to insert in the select query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk (see executemany) - :param chunk_size: int, size of chunks to group data by - :return: The number of rows processed and the number of rows failed - :rtype: tuple[int, int] + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk (see executemany) + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed """ rows_generator = iter_rows(select_query, source_conn, parameters=parameters, row_factory=row_factory, @@ -434,10 +425,9 @@ def execute( """ Run SQL query against connection. - :param query: str, SQL query to execute + :param query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query - :rtype: None + :param parameters: bind variables to insert in the query :raises ETLHelperQueryError: if SQL raises an error """ logger.info("Executing query") @@ -487,18 +477,17 @@ def copy_table_rows( with the list as the only argument. :param table: name of table - :param source_conn: open dbapi connection - :param dest_conn: open dbapi connection + :param source_conn: dbapi connection + :param dest_conn: dbapi connection :param target: name of target table, if different from source :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk (see executemany) - :param chunk_size: int, size of chunks to group data by - :return: The number of rows processed and the number of rows failed - :rtype: tuple[int, int] + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk (see executemany) + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed """ validate_identifier(table) @@ -534,15 +523,14 @@ def load( with the list as the only argument. :param table: name of table - :param conn: open dbapi connection + :param conn: dbapi connection :param rows: iterable of named tuples or dictionaries of data - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk (see executemany) - :param chunk_size: int, size of chunks to group data by - :return: The number of rows processed and the number of rows failed - :rtype: tuple[int, int] + :param transform: function that accepts a list of rows and + returns an list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk (see executemany) + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed """ # Return early if rows is empty if not rows: @@ -593,12 +581,10 @@ def generate_insert_sql( a dictionary. :param table: name of table - :param row: Either a namedtuple or dictionary representing a single row of - data - :param conn: open dbapi connection - :return: An SQL statement used to insert data into the given table - :rtype: str - :raises ETLHelperInsertError: if 'row' is not a namedtuple or a dictionary, + :param row: a single row as a namedtuple or dict + :param conn: dbapi connection + :return: SQL statement to insert data into the given table + :raises ETLHelperInsertError: if 'row' is not a namedtuple or a dict, or if the database connection encounters a parameter error """ @@ -656,7 +642,6 @@ def validate_identifier(identifier: str) -> None: cannot start with `$`, or numbers. :param identifier: a database identifier - :rtype: None :raises ETLHelperBadIdentifierError: if the 'identifier' contains invalid characters """ @@ -684,11 +669,9 @@ def _chunker( :param iterable: an iterable object :param n_chunks: the number of values in each chunk - :param fillvalue: value used to fill empty values in a chunk - :return: generator object returning 'iterable' objects of the length - 'n_chunks' where empty values are filled using 'fillvalue' - :rtype: itertools.zip_longest + :return: generator returning tuples of rows, of length n_chunks, + where empty values are filled using None """ - # _chunker('ABCDEFG', 3) --> ABC DEF G" + # _chunker((A,B,C,D,E,F,G), 3) --> (A,B,C) (D,E,F) (G,None,None) args = [iter(iterable)] * n_chunks return zip_longest(*args, fillvalue=None)