From fe3bad7ae1c2a237ed410316000f1ed1a7005060 Mon Sep 17 00:00:00 2001
From: Liam Spencer <76099100+spenny-liam@users.noreply.github.com>
Date: Wed, 26 Jun 2024 13:05:34 +0100
Subject: [PATCH 1/4] Add info about parameters

---
 docs/recipes/database_to_api.rst | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/docs/recipes/database_to_api.rst b/docs/recipes/database_to_api.rst
index 903341b..c927291 100644
--- a/docs/recipes/database_to_api.rst
+++ b/docs/recipes/database_to_api.rst
@@ -7,9 +7,12 @@ ETL for posting data from a database into an HTTP API. The API could be
 a NoSQL document store (e.g. ElasticSearch, Cassandra) or some other
 web service.
 
-This example transfers data from Oracle to ElasticSearch. It uses
+This example posts data from an Oracle database to an HTTP API. It uses
 ``iter_chunks`` to fetch data from the database without loading it all
-into memory at once. A custom transform function creates a dictionary
+into memory at once. `Parameters `__ are sent with the database query to filter
+rows to only those changed within specified time peiod. This is used to
+only transfer data that has changed since the last time this script was
+run. A custom transform function creates a dictionary
 structure from each row of data. This is “dumped” into JSON and posted
 to the API via ``aiohttp``.
 

From 05e6c8ca651248be3b75b942f61e7696cfd93902 Mon Sep 17 00:00:00 2001
From: Liam Spencer <76099100+spenny-liam@users.noreply.github.com>
Date: Wed, 26 Jun 2024 15:54:20 +0100
Subject: [PATCH 2/4] add more comments

---
 docs/code_demos/recipes/database_to_api.py | 23 ++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/docs/code_demos/recipes/database_to_api.py b/docs/code_demos/recipes/database_to_api.py
index 8534f32..9bc3870 100644
--- a/docs/code_demos/recipes/database_to_api.py
+++ b/docs/code_demos/recipes/database_to_api.py
@@ -5,37 +5,47 @@ import datetime as dt
 
 import aiohttp
-from etlhelper import iter_chunks
+import etlhelper as etl
+# import DbParams
 from db import ORACLE_DB
 
 logger = logging.getLogger("copy_sensors_async")
 
+# SQL query to get data from Oracle 
 SELECT_SENSORS = """
     SELECT CODE, DESCRIPTION
     FROM BGS.DIC_SEN_SENSOR
     WHERE date_updated BETWEEN :startdate AND :enddate
     ORDER BY date_updated
     """
+
+# URL of API we want to send data to
 BASE_URL = "http://localhost:9200/"
+# Headers to tell the API we are sending data in JSON format
 HEADERS = {"Content-Type": "application/json"}
 
 
 def copy_sensors(startdate: dt.datetime, enddate: dt.datetime) -> None:
-    """Read sensors from Oracle and post to REST API."""
+    """Read sensors from Oracle and post to REST API.
+    Requires startdate and enddate to filter to rows changed within a certain time period.
+    """
     logger.info("Copying sensors with timestamps from %s to %s",
                 startdate.isoformat(), enddate.isoformat())
     row_count = 0
 
+    #connect using the DbParams we imported
     with ORACLE_DB.connect("ORACLE_PASSWORD") as conn:
         # chunks is a generator that yields lists of dictionaries
-        chunks = iter_chunks(
+        # passing in our select query, connection object, bind variable parameters and custom transform function
+        chunks = etl.iter_chunks(
             SELECT_SENSORS,
             conn,
             parameters={"startdate": startdate, "enddate": enddate},
             transform=transform_sensors,
         )
 
+        # for each chunk of rows, synchronously post them to the API
         for chunk in chunks:
             result = asyncio.run(post_chunk(chunk))
             row_count += len(result)
@@ -65,10 +75,14 @@ def transform_sensors(chunk: list[tuple]) -> list[tuple]:
 
 async def post_chunk(chunk: list[tuple]) -> list:
     """Post multiple items to API asynchronously."""
+    # initialize aiohttp session
     async with aiohttp.ClientSession() as session:
         # Build list of tasks
         tasks = []
+        # add each row to list of tasks for aiohttp to execute
         for item in chunk:
+            # a task is an instance of a function being executed with distinct arguments
+            # in this case, the post_one function called with a dictionary representing a row of data
             tasks.append(post_one(item, session))
 
         # Process tasks in parallel. An exception in any will be raised.
@@ -83,6 +97,7 @@ async def post_one(item: tuple, session: aiohttp.ClientSession) -> int:
     response = await session.post(
         BASE_URL + "sensors/_doc",
         headers=HEADERS,
+        # convert Python dict to a JSON string
         data=json.dumps(item),
     )
 
@@ -108,6 +123,6 @@ async def post_one(item: tuple, session: aiohttp.ClientSession) -> int:
     logger.setLevel(logging.INFO)
     logger.addHandler(handler)
 
-    # Copy data from 1 January 2000 to 00:00:00 today
+    # Copy data that was updated between 1 January 2000 and 00:00:00 today
     today = dt.datetime.combine(dt.date.today(), dt.time.min)
     copy_sensors(dt.datetime(2000, 1, 1), today)

From 62a967742887d9b766c161d837fe5d7217bca63d Mon Sep 17 00:00:00 2001
From: Liam Spencer <76099100+spenny-liam@users.noreply.github.com>
Date: Wed, 26 Jun 2024 15:56:14 +0100
Subject: [PATCH 3/4] Update database_to_api.rst

---
 docs/recipes/database_to_api.rst | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/recipes/database_to_api.rst b/docs/recipes/database_to_api.rst
index c927291..e4cdccf 100644
--- a/docs/recipes/database_to_api.rst
+++ b/docs/recipes/database_to_api.rst
@@ -26,3 +26,7 @@ transfer as opposed to posting records in series.
 
 In this example, failed rows will fail the whole job. Removing the
 ``raise_for_status()`` call will let them just be logged instead.
+
+To provide the database connection, a `DbParams `__ object is
+imported from a separate `db` file.
+

From 530f4923aad023eb49271278909b69389e52a931 Mon Sep 17 00:00:00 2001
From: John A Stevenson
Date: Wed, 26 Jun 2024 16:31:13 +0100
Subject: [PATCH 4/4] Replace links with internal references + fix minor typos

---
 docs/code_demos/recipes/database_to_api.py | 4 ++--
 docs/etl_functions/extract.rst             | 2 ++
 docs/recipes/database_to_api.rst           | 9 +++++----
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/docs/code_demos/recipes/database_to_api.py b/docs/code_demos/recipes/database_to_api.py
index 9bc3870..3b796ff 100644
--- a/docs/code_demos/recipes/database_to_api.py
+++ b/docs/code_demos/recipes/database_to_api.py
@@ -12,7 +12,7 @@
 
 logger = logging.getLogger("copy_sensors_async")
 
-# SQL query to get data from Oracle 
+# SQL query to get data from Oracle
 SELECT_SENSORS = """
     SELECT CODE, DESCRIPTION
     FROM BGS.DIC_SEN_SENSOR
@@ -34,7 +34,7 @@ def copy_sensors(startdate: dt.datetime, enddate: dt.datetime) -> None:
                 startdate.isoformat(), enddate.isoformat())
     row_count = 0
 
-    #connect using the DbParams we imported
+    # Connect using the imported DbParams
     with ORACLE_DB.connect("ORACLE_PASSWORD") as conn:
         # chunks is a generator that yields lists of dictionaries
         # passing in our select query, connection object, bind variable parameters and custom transform function
diff --git a/docs/etl_functions/extract.rst b/docs/etl_functions/extract.rst
index fe3eb88..f5e031c 100644
--- a/docs/etl_functions/extract.rst
+++ b/docs/etl_functions/extract.rst
@@ -56,6 +56,8 @@ Keyword arguments
 
 All extract functions are derived from :func:`iter_chunks() <etlhelper.iter_chunks>`
 and take the same keyword arguments, which are passed through.
 
+.. _parameters:
+
 parameters
 """"""""""
 
diff --git a/docs/recipes/database_to_api.rst b/docs/recipes/database_to_api.rst
index e4cdccf..47468e6 100644
--- a/docs/recipes/database_to_api.rst
+++ b/docs/recipes/database_to_api.rst
@@ -8,9 +8,10 @@ a NoSQL document store (e.g. ElasticSearch, Cassandra) or some other
 web service.
 
 This example posts data from an Oracle database to an HTTP API. It uses
-``iter_chunks`` to fetch data from the database without loading it all
-into memory at once. `Parameters `__ are sent with the database query to filter
-rows to only those changed within specified time peiod. This is used to
+:func:`iter_chunks() <etlhelper.iter_chunks>` to fetch data from the
+database without loading it all
+into memory at once. :ref:`Parameters <parameters>` are sent with the database query to filter
+rows to only those changed within a specified time period. This is used to
 only transfer data that has changed since the last time this script was
 run. A custom transform function creates a dictionary
 structure from each row of data. This is “dumped” into JSON and posted
@@ -27,6 +28,6 @@ transfer as opposed to posting records in series.
 
 In this example, failed rows will fail the whole job. Removing the
 ``raise_for_status()`` call will let them just be logged instead.
 
-To provide the database connection, a `DbParams `__ object is
+To provide the database connection, a :class:`DbParams <etlhelper.DbParams>` object is
 imported from a separate `db` file.
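
Note: the recipe imports ``ORACLE_DB`` from a separate ``db`` module, which is
not shown in the patches above. A minimal sketch of such a ``db.py``, following
etlhelper's documented ``DbParams`` usage, might look like the following; the
host, port, database name and user values are placeholders, and the password is
read at runtime from the environment variable named in the ``connect()`` call:

    """db.py - placeholder connection details for the source Oracle database."""
    from etlhelper import DbParams

    # DbParams holds everything needed to connect except the password, which
    # ORACLE_DB.connect("ORACLE_PASSWORD") reads from that environment variable
    # at runtime.
    ORACLE_DB = DbParams(
        dbtype="ORACLE",      # selects the Oracle driver
        host="localhost",     # placeholder hostname
        port=1521,            # default Oracle listener port
        dbname="mydatabase",  # placeholder service name
        user="oracle_user",   # placeholder username
    )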