From fb47b50b583abbd65a8ddf4073dc6cb2c40c1a7b Mon Sep 17 00:00:00 2001 From: Leo Rudczenko Date: Thu, 15 Jun 2023 14:52:47 +0100 Subject: [PATCH 1/2] Add return and raises values to docstrings --- etlhelper/etl.py | 64 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 10 deletions(-) diff --git a/etlhelper/etl.py b/etlhelper/etl.py index 1808026..1a8e528 100644 --- a/etlhelper/etl.py +++ b/etlhelper/etl.py @@ -74,6 +74,10 @@ def iter_chunks( :param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable of rows (possibly of different shape) :param chunk_size: int, size of chunks to group data by + :return: None or a generator returning a list of objects which each + represent a row of data using the given row_factory + :rtype: None or Generator[list[etlhelper.row_factories.Row], None, None] + :raises ETLHelperExtractError: if SQL raises an error """ logger.info("Fetching rows (chunk_size=%s)", chunk_size) logger.debug(f"Fetching:\n\n{select_query}\n\nwith parameters:\n\n" @@ -152,6 +156,9 @@ def iter_rows( :param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable of rows (possibly of different shape) :param chunk_size: int, size of chunks to group data by + :return: A generator returning objects which each represent a row of data + using the given row_factory + :rtype: Generator[etlhelper.row_factories.Row, None, None] """ for chunk in iter_chunks(select_query, conn, row_factory=row_factory, parameters=parameters, transform=transform, @@ -179,6 +186,10 @@ def fetchone( for parsing each row :param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable of rows (possibly of different shape) + :param chunk_size: int, size of chunks to group data by + :return: None or a single object which represents a row of data using the given + row_factory + :rtype: None or etlhelper.row_factories.Row """ try: result = next(iter_rows(select_query, conn, row_factory=row_factory, @@ -203,6 +214,7 @@ def fetchall( ) -> Chunk: """ Get all results of query as a list. See iter_rows for details. + :param select_query: str, SQL query to execute :param conn: dbapi connection :param parameters: sequence or dict of bind variables to insert in the query @@ -210,6 +222,9 @@ def fetchall( :param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable of rows (possibly of different shape) :param chunk_size: int, size of chunks to group data by + :return: A list of objects which each represent a row of data using the + given row_factory + :rtype: list[etlhelper.row_factories.Row] """ return list(iter_rows(select_query, conn, row_factory=row_factory, parameters=parameters, transform=transform, @@ -252,7 +267,9 @@ def executemany( :param on_error: Function to be applied to failed rows in each chunk :param commit_chunks: bool, commit after each chunk has been inserted/updated :param chunk_size: int, size of chunks to group data by - :return processed, failed: (int, int) number of rows processed, failed + :return: The number of rows processed and the number of rows failed + :rtype: tuple[int, int] + :raises ETLHelperInsertError: if SQL raises an error """ logger.info("Executing many (chunk_size=%s)", chunk_size) logger.debug("Executing:\n\n%s\n\nagainst:\n\n%s", query, conn) @@ -335,9 +352,10 @@ def _execute_by_row( (and other?) 
:param query: str, SQL command with placeholders for data - :param chunk: list, list of row parameters :param conn: open dbapi connection, used for transactions - :returns failed_rows: list of (row, exception) tuples + :param chunk: list, list of row parameters + :return: A list of objects which each represent a failed row of data + :rtype: list[etlhelper.row_factories.Row] """ failed_rows: list[FailedRow] = [] @@ -394,7 +412,8 @@ def copy_rows( :param on_error: Function to be applied to failed rows in each chunk :param commit_chunks: bool, commit after each chunk (see executemany) :param chunk_size: int, size of chunks to group data by - :return processed, failed: (int, int) number of rows processed, failed + :return: The number of rows processed and the number of rows failed + :rtype: tuple[int, int] """ rows_generator = iter_rows(select_query, source_conn, parameters=parameters, row_factory=row_factory, @@ -418,6 +437,8 @@ def execute( :param query: str, SQL query to execute :param conn: dbapi connection :param parameters: sequence or dict of bind variables to insert in the query + :rtype: None + :raises ETLHelperQueryError: if SQL raises an error """ logger.info("Executing query") logger.debug(f"Executing:\n\n{query}\n\nwith parameters:\n\n" @@ -465,6 +486,7 @@ def copy_table_rows( tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument. + :param table: name of table :param source_conn: open dbapi connection :param dest_conn: open dbapi connection :param target: name of target table, if different from source @@ -475,9 +497,8 @@ def copy_table_rows( :param on_error: Function to be applied to failed rows in each chunk :param commit_chunks: bool, commit after each chunk (see executemany) :param chunk_size: int, size of chunks to group data by - :param select_sql_suffix: str, SQL clause(s) to append to select statement - e.g. WHERE, ORDER BY, LIMIT - :return processed, failed: (int, int) number of rows processed, failed + :return: The number of rows processed and the number of rows failed + :rtype: tuple[int, int] """ validate_identifier(table) @@ -520,7 +541,8 @@ def load( :param on_error: Function to be applied to failed rows in each chunk :param commit_chunks: bool, commit after each chunk (see executemany) :param chunk_size: int, size of chunks to group data by - :return processed, failed: (int, int) number of rows processed, failed + :return: The number of rows processed and the number of rows failed + :rtype: tuple[int, int] """ # Return early if rows is empty if not rows: @@ -566,8 +588,20 @@ def generate_insert_sql( conn: Connection ) -> str: """Generate insert SQL for table, getting column names from row and the + Generate insert SQL for table, getting column names from row and the placeholder style from the connection. `row` is either a namedtuple or - a dictionary.""" + a dictionary. + + :param table: name of table + :param row: Either a namedtuple or dictionary representing a single row of + data + :param conn: open dbapi connection + :return: An SQL statement used to insert data into the given table + :rtype: str + :raises ETLHelperInsertError: if 'row' is not a namedtuple or a dictionary, + or if the database connection encounters a + parameter error + """ helper = DB_HELPER_FACTORY.from_conn(conn) paramstyles = { "qmark": "?", @@ -621,7 +655,10 @@ def validate_identifier(identifier: str) -> None: Identifiers must comprise alpha-numeric characters, plus `_` or `$` and cannot start with `$`, or numbers. 
- :raises ETLHelperBadIdentifierError: + :param identifier: a database identifier + :rtype: None + :raises ETLHelperBadIdentifierError: if the 'identifier' contains invalid + characters """ # Identifier rules are based on PostgreSQL specifications, defined here: # https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS @@ -644,6 +681,13 @@ def _chunker( ) -> Iterator[tuple[Union[Row, None], ...]]: """Collect data into fixed-length chunks or blocks. Code from recipe at https://docs.python.org/3.6/library/itertools.html + + :param iterable: an iterable object + :param n_chunks: the number of values in each chunk + :param fillvalue: value used to fill empty values in a chunk + :return: generator object returning 'iterable' objects of the length + 'n_chunks' where empty values are filled using 'fillvalue' + :rtype: itertools.zip_longest """ # _chunker('ABCDEFG', 3) --> ABC DEF G" args = [iter(iterable)] * n_chunks From e96db478d1254c3e6859ac25523e71d5cd0cb654 Mon Sep 17 00:00:00 2001 From: Colin Blackburn Date: Thu, 14 Sep 2023 15:14:11 +0100 Subject: [PATCH 2/2] Remove rtype and some other types --- etlhelper/etl.py | 173 +++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 95 deletions(-) diff --git a/etlhelper/etl.py b/etlhelper/etl.py index 1a8e528..57c865f 100644 --- a/etlhelper/etl.py +++ b/etlhelper/etl.py @@ -46,8 +46,6 @@ class FailedRow(NamedTuple): exception: Exception -# iter_chunks is where data are retrieved from source database -# All data extraction processes call this function. def iter_chunks( select_query: str, conn: Connection, @@ -66,17 +64,18 @@ def iter_chunks( The transform function is applied to chunks of data as they are extracted from the database. - :param select_query: str, SQL query to execute + All data extraction functions call this function, directly or indirectly. + + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by - :return: None or a generator returning a list of objects which each + :param transform: function that accepts a list of rows and + returns a list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: generator returning lists of objects which each represent a row of data using the given row_factory - :rtype: None or Generator[list[etlhelper.row_factories.Row], None, None] :raises ETLHelperExtractError: if SQL raises an error """ logger.info("Fetching rows (chunk_size=%s)", chunk_size) @@ -148,17 +147,16 @@ def iter_rows( Run SQL query against connection and return iterator object to loop over results, row-by-row. - :param select_query: str, SQL query to execute + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g.
list) of rows and - returns an iterable of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by - :return: A generator returning objects which each represent a row of data - using the given row_factory - :rtype: Generator[etlhelper.row_factories.Row, None, None] + :param transform: function that accepts a list of rows and + returns a list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: generator returning objects which each + represent a row of data using the given row_factory """ for chunk in iter_chunks(select_query, conn, row_factory=row_factory, parameters=parameters, transform=transform, @@ -179,17 +177,15 @@ def fetchone( Get first result of query. See iter_rows for details. Note: iter_rows is recommended for looping over rows individually. - :param select_query: str, SQL query to execute + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query + :param parameters: bind variables to insert in the query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by - :return: None or a single object which represents a row of data using the given - row_factory - :rtype: None or etlhelper.row_factories.Row + :param transform: function that accepts a list of rows and + returns a list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: None or a row of data using the given row_factory """ try: result = next(iter_rows(select_query, conn, row_factory=row_factory, @@ -215,16 +211,15 @@ def fetchall( """ Get all results of query as a list. See iter_rows for details. - :param select_query: str, SQL query to execute + :param select_query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query - :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and returns an iterable - of rows (possibly of different shape) - :param chunk_size: int, size of chunks to group data by - :return: A list of objects which each represent a row of data using the - given row_factory - :rtype: list[etlhelper.row_factories.Row] + :param parameters: bind variables to insert in the query + :param row_factory: function that accepts a cursor and returns a function + for parsing each row + :param transform: function that accepts a list of rows and + returns a list of rows (possibly of different shape) + :param chunk_size: size of chunks to group data by + :return: a list of rows of data using the given row_factory """ return list(iter_rows(select_query, conn, row_factory=row_factory, parameters=parameters, transform=transform, @@ -259,16 +254,15 @@ def executemany( disadvantage is that investigation may be required to determine exactly which records have been successfully transferred. - :param query: str, SQL insert command with placeholders for data + :param query: SQL insert command with placeholders for data :param conn: dbapi connection - :param rows: List of tuples containing data to be inserted/updated - :param transform: function that accepts an iterable (e.g.
list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk has been inserted/updated - :param chunk_size: int, size of chunks to group data by - :return: The number of rows processed and the number of rows failed - :rtype: tuple[int, int] + :param rows: an iterable of rows containing data to be inserted/updated + :param transform: function that accepts a list of rows and + returns a list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk has been inserted/updated + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed :raises ETLHelperInsertError: if SQL raises an error """ logger.info("Executing many (chunk_size=%s)", chunk_size) @@ -348,14 +342,12 @@ def _execute_by_row( ) -> list[FailedRow]: """ Retry execution of rows individually and return failed rows along with - their errors. Successful inserts are committed. This is because - (and other?) - - :param query: str, SQL command with placeholders for data - :param conn: open dbapi connection, used for transactions - :param chunk: list, list of row parameters - :return: A list of objects which each represent a failed row of data - :rtype: list[etlhelper.row_factories.Row] + their errors. Successful inserts are committed. + + :param query: SQL query with placeholders for data + :param conn: dbapi connection + :param chunk: list of rows + :return: a list of failed rows and their errors """ failed_rows: list[FailedRow] = [] @@ -400,20 +392,19 @@ def copy_rows( tuples. on_error is a function that is called at the end of each chunk, with the list as the only argument. - :param select_query: str, select rows from Oracle. - :param source_conn: open dbapi connection - :param insert_query: - :param dest_conn: open dbapi connection - :param parameters: sequence or dict of bind variables for select query + :param select_query: SQL query to select data + :param source_conn: dbapi connection + :param insert_query: SQL query to insert data + :param dest_conn: dbapi connection + :param parameters: bind variables to insert in the select query :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk (see executemany) - :param chunk_size: int, size of chunks to group data by - :return: The number of rows processed and the number of rows failed - :rtype: tuple[int, int] + :param transform: function that accepts a list of rows and + returns a list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk (see executemany) + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed """ rows_generator = iter_rows(select_query, source_conn, parameters=parameters, row_factory=row_factory, @@ -434,10 +425,9 @@ def execute( """ Run SQL query against connection.
- :param query: str, SQL query to execute + :param query: SQL query to execute :param conn: dbapi connection - :param parameters: sequence or dict of bind variables to insert in the query - :rtype: None + :param parameters: bind variables to insert in the query :raises ETLHelperQueryError: if SQL raises an error """ logger.info("Executing query") @@ -487,18 +477,17 @@ def copy_table_rows( with the list as the only argument. :param table: name of table - :param source_conn: open dbapi connection - :param dest_conn: open dbapi connection + :param source_conn: dbapi connection + :param dest_conn: dbapi connection :param target: name of target table, if different from source :param row_factory: function that accepts a cursor and returns a function for parsing each row - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk (see executemany) - :param chunk_size: int, size of chunks to group data by - :return: The number of rows processed and the number of rows failed - :rtype: tuple[int, int] + :param transform: function that accepts a list of rows and + returns a list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk (see executemany) + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed """ validate_identifier(table) @@ -534,15 +523,14 @@ def load( with the list as the only argument. :param table: name of table - :param conn: open dbapi connection + :param conn: dbapi connection :param rows: iterable of named tuples or dictionaries of data - :param transform: function that accepts an iterable (e.g. list) of rows and - returns an iterable of rows (possibly of different shape) - :param on_error: Function to be applied to failed rows in each chunk - :param commit_chunks: bool, commit after each chunk (see executemany) - :param chunk_size: int, size of chunks to group data by - :return: The number of rows processed and the number of rows failed - :rtype: tuple[int, int] + :param transform: function that accepts a list of rows and + returns a list of rows (possibly of different shape) + :param on_error: function to be applied to failed rows in each chunk + :param commit_chunks: commit after each chunk (see executemany) + :param chunk_size: size of chunks to group data by + :return: the number of rows processed and the number of rows failed """ # Return early if rows is empty if not rows: @@ -593,12 +581,10 @@ def generate_insert_sql( a dictionary. :param table: name of table - :param row: Either a namedtuple or dictionary representing a single row of - data - :param conn: open dbapi connection - :return: An SQL statement used to insert data into the given table - :rtype: str - :raises ETLHelperInsertError: if 'row' is not a namedtuple or a dictionary, + :param row: a single row as a namedtuple or dict + :param conn: dbapi connection + :return: SQL statement to insert data into the given table + :raises ETLHelperInsertError: if 'row' is not a namedtuple or a dict, or if the database connection encounters a parameter error """ @@ -656,7 +642,6 @@ def validate_identifier(identifier: str) -> None: cannot start with `$`, or numbers.
:param identifier: a database identifier - :rtype: None :raises ETLHelperBadIdentifierError: if the 'identifier' contains invalid characters """ @@ -684,11 +669,9 @@ def _chunker( :param iterable: an iterable object :param n_chunks: the number of values in each chunk - :param fillvalue: value used to fill empty values in a chunk - :return: generator object returning 'iterable' objects of the length - 'n_chunks' where empty values are filled using 'fillvalue' - :rtype: itertools.zip_longest + :return: generator returning tuples of rows, of length n_chunks, + where empty values are filled using None """ - # _chunker('ABCDEFG', 3) --> ABC DEF G" + # _chunker((A,B,C,D,E,F,G), 3) --> (A,B,C) (D,E,F) (G,None,None) args = [iter(iterable)] * n_chunks return zip_longest(*args, fillvalue=None)
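As a quick check of the documented return values, a minimal usage sketch along the following lines exercises executemany and fetchall. The in-memory SQLite connection and the point table are illustrative only and not part of the patch; SQLite is simply one of the dbapi connections that DB_HELPER_FACTORY recognises.

    import sqlite3

    from etlhelper.etl import executemany, fetchall

    # Illustrative setup: an in-memory SQLite database stands in for
    # any dbapi connection supported by etlhelper.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE point (x INTEGER, y INTEGER)")

    # executemany returns the number of rows processed and the number failed,
    # as described in the updated docstring.
    processed, failed = executemany(
        "INSERT INTO point (x, y) VALUES (?, ?)",  # qmark placeholders for SQLite
        conn,
        rows=[(1, 2), (3, 4)],
    )

    # fetchall returns a list of rows built using the row_factory.
    rows = fetchall("SELECT x, y FROM point", conn)
    print(processed, failed, rows)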