[Issue #1686] load data from legacy (Oracle) tables to staging tables (#1852)

## Summary
Fixes #1686

## Changes proposed
- Add a class `LoadOracleDataTask` that loops through each legacy table and loads the data into the corresponding staging table.
- Add a module of functions, `src.data_migration.load.sql`, that generates SQLAlchemy SQL expressions for the loads.
- Add a script, `seed_local_legacy_tables.py`, to generate fake data in the local legacy tables.
- Add unit tests.

## Context for reviewers
This is a new batch task that loads data from the legacy (Oracle) tables into the staging tables.

The load is incremental and handles the following situations:

| Situation | Detection logic | Action |
| ------------- | ------------- | ------------- |
| New row in source table | Primary key not present in destination table | Insert into destination, set `transformed_at=NULL` |
| Updated row in source | Primary key exists in both source & destination, AND `last_upd_date` is newer | Update in destination, reset `transformed_at=NULL` |
| Deleted row in source | Primary key does not exist in source, AND not `is_deleted` in destination | Set `is_deleted=TRUE`, reset `transformed_at=NULL` |
| Unchanged | Primary key exists in both source & destination, AND `last_upd_date` is not newer | No write to destination |
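
As a rough illustration of the detection logic (the real builders are in `src.data_migration.load.sql`, shown below), here is a minimal SQLAlchemy sketch; the single-column tables `src` and `dst` are hypothetical stand-ins for a foreign/staging pair:

```python
import sqlalchemy

metadata = sqlalchemy.MetaData()

# Hypothetical stand-ins for a legacy (foreign) table and its staging twin.
src = sqlalchemy.Table(
    "src", metadata,
    sqlalchemy.Column("opportunity_id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP),
)
dst = sqlalchemy.Table(
    "dst", metadata,
    sqlalchemy.Column("opportunity_id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP),
    sqlalchemy.Column("is_deleted", sqlalchemy.Boolean),
)

# New row: primary key present in the source but not the destination -> INSERT.
is_new = sqlalchemy.tuple_(src.c.opportunity_id).not_in(
    sqlalchemy.select(dst.c.opportunity_id)
)
# Updated row: key in both tables and the source copy is newer -> UPDATE.
is_updated = dst.c.last_upd_date < src.c.last_upd_date
# Deleted row: key gone from the source and not yet flagged -> is_deleted=TRUE.
is_gone = sqlalchemy.tuple_(dst.c.opportunity_id).not_in(
    sqlalchemy.select(src.c.opportunity_id)
) & (dst.c.is_deleted == sqlalchemy.false())
```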

## Additional information
To run the code locally with fake data:

```
make setup-foreign-tables  # once
make seed-local-legacy-tables  # repeat to add & update data
poetry run python3 -m src.data_migration.load.load_oracle_data_task
```
jamesbursa authored May 8, 2024
1 parent ad2fb48 commit 03cf6e9
Showing 10 changed files with 620 additions and 3 deletions.
3 changes: 3 additions & 0 deletions api/Makefile
@@ -249,6 +249,9 @@ copy-oracle-data:
setup-foreign-tables:
$(FLASK_CMD) data-migration setup-foreign-tables

seed-local-legacy-tables:
$(PY_RUN_CMD) python3 -m tests.lib.seed_local_legacy_tables

##################################################
# Miscellaneous Utilities
##################################################
Empty file.
173 changes: 173 additions & 0 deletions api/src/data_migration/load/load_oracle_data_task.py
@@ -0,0 +1,173 @@
#
# Load data from legacy (Oracle) tables to staging tables.
#

import logging
import time

import sqlalchemy

import src.db.foreign
import src.db.models.staging
import src.logging
import src.task.task
from src.adapters import db
from src.util import datetime_util

from . import sql

logger = logging.getLogger(__name__)


class LoadOracleDataTask(src.task.task.Task):
"""Task to load data from legacy tables to staging tables."""

def __init__(
self,
db_session: db.Session,
foreign_tables: dict[str, sqlalchemy.Table],
staging_tables: dict[str, sqlalchemy.Table],
) -> None:
if foreign_tables.keys() != staging_tables.keys():
raise ValueError("keys of foreign_tables and staging_tables must be equal")

super().__init__(db_session)
self.foreign_tables = foreign_tables
self.staging_tables = staging_tables

    def run_task(self) -> None:
        """Main task process, called by run()."""
        with self.db_session.begin():
            self.log_database_settings()
        # Each table is loaded in its own transaction (see load_data below).
        self.load_data()

def log_database_settings(self) -> None:
"""Log database settings related to foreign tables for easier troubleshooting."""
metadata = sqlalchemy.MetaData()
engine = self.db_session.bind

# Use reflection to define built-in views as Table objects.
foreign_servers = sqlalchemy.Table(
"foreign_servers", metadata, autoload_with=engine, schema="information_schema"
)
foreign_server_options = sqlalchemy.Table(
"foreign_server_options", metadata, autoload_with=engine, schema="information_schema"
)

logger.info(
"foreign server settings",
extra={
"foreign_servers": self.db_session.execute(
sqlalchemy.select(foreign_servers)
).all(),
"foreign_server_options": self.db_session.execute(
sqlalchemy.select(foreign_server_options)
).all(),
},
)

def load_data(self) -> None:
"""Load the data for all tables defined in the mapping."""
for table_name in self.foreign_tables:
try:
with self.db_session.begin():
self.load_data_for_table(table_name)
except Exception:
logger.exception("table load error", extra={"table": table_name})

def load_data_for_table(self, table_name: str) -> None:
"""Load new and updated rows for a single table from the foreign table to the staging table."""
logger.info("process table", extra={"table": table_name})
foreign_table = self.foreign_tables[table_name]
staging_table = self.staging_tables[table_name]

self.log_row_count("before", foreign_table, staging_table)

self.do_update(foreign_table, staging_table)
self.do_insert(foreign_table, staging_table)
self.do_mark_deleted(foreign_table, staging_table)

self.log_row_count("after", staging_table)

def do_insert(self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.Table) -> int:
"""Determine new rows by primary key, and copy them into the staging table."""

insert_from_select_sql, select_sql = sql.build_insert_select_sql(
foreign_table, staging_table
)

# COUNT has to be a separate query as INSERTs don't return a rowcount.
insert_count = self.db_session.query(select_sql.subquery()).count()

self.increment("count.insert.total", insert_count)
self.set_metrics({f"count.insert.{staging_table.name}": insert_count})

# Execute the INSERT.
t0 = time.monotonic()
self.db_session.execute(insert_from_select_sql)
t1 = time.monotonic()

self.set_metrics({f"time.insert.{staging_table.name}": round(t1 - t0, 3)})

return insert_count

def do_update(self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.Table) -> int:
"""Find updated rows using last_upd_date, copy them, and reset transformed_at to NULL."""

update_sql = sql.build_update_sql(foreign_table, staging_table).values(transformed_at=None)

t0 = time.monotonic()
result = self.db_session.execute(update_sql)
t1 = time.monotonic()
update_count = result.rowcount

self.increment("count.update.total", update_count)
self.set_metrics({f"count.update.{staging_table.name}": update_count})
self.set_metrics({f"time.update.{staging_table.name}": round(t1 - t0, 3)})

return update_count

def do_mark_deleted(
self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.Table
) -> int:
"""Find deleted rows, set is_deleted=TRUE, and reset transformed_at to NULL."""

update_sql = sql.build_mark_deleted_sql(foreign_table, staging_table).values(
transformed_at=None,
deleted_at=datetime_util.utcnow(),
)

t0 = time.monotonic()
result = self.db_session.execute(update_sql)
t1 = time.monotonic()
delete_count = result.rowcount

self.increment("count.delete.total", delete_count)
self.set_metrics({f"count.delete.{staging_table.name}": delete_count})
self.set_metrics({f"time.delete.{staging_table.name}": round(t1 - t0, 3)})

return delete_count

def log_row_count(self, message: str, *tables: sqlalchemy.Table) -> None:
"""Log the number of rows in each of the tables using SQL COUNT()."""
extra = {}
for table in tables:
count = self.db_session.query(table).count()
extra[f"count.{table.schema}.{table.name}"] = count
self.set_metrics({f"count.{message}.{table.schema}.{table.name}": count})
logger.info(f"row count {message}", extra=extra, stacklevel=2)


def main() -> None:
with src.logging.init(__package__):
db_client = db.PostgresDBClient()

foreign_tables = {t.name: t for t in src.db.foreign.metadata.tables.values()}
staging_tables = {t.name: t for t in src.db.models.staging.metadata.tables.values()}

with db_client.get_session() as db_session:
LoadOracleDataTask(db_session, foreign_tables, staging_tables).run()


if __name__ == "__main__":
main()
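
Because the constructor takes two name-keyed mappings and checks that their key sets match, a run can be narrowed to a subset of tables. A minimal sketch, assuming a hypothetical table name `topportunity`:

```python
import src.db.foreign
import src.db.models.staging
from src.adapters import db
from src.data_migration.load.load_oracle_data_task import LoadOracleDataTask

TABLE = "topportunity"  # hypothetical table name, for illustration only

foreign = {t.name: t for t in src.db.foreign.metadata.tables.values() if t.name == TABLE}
staging = {t.name: t for t in src.db.models.staging.metadata.tables.values() if t.name == TABLE}

with db.PostgresDBClient().get_session() as db_session:
    # Mismatched key sets would raise ValueError in the constructor.
    LoadOracleDataTask(db_session, foreign, staging).run()
```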
118 changes: 118 additions & 0 deletions api/src/data_migration/load/sql.py
@@ -0,0 +1,118 @@
#
# SQL building for data load process.
#

import sqlalchemy


def build_insert_select_sql(
source_table: sqlalchemy.Table, destination_table: sqlalchemy.Table
) -> tuple[sqlalchemy.Insert, sqlalchemy.Select]:
"""Build an `INSERT INTO ... SELECT ... FROM ...` query for new rows."""

all_columns = tuple(c.name for c in source_table.columns)

# Optimization: use a Common Table Expression (`WITH`) marked as MATERIALIZED. This directs the PostgreSQL
# optimizer to run it first (prevents folding it into the parent query), so it only fetches the primary keys and
# last_upd_date columns from Oracle to perform the date comparison. Without this materialized CTE, it fetches all
# columns and all rows from Oracle before applying the WHERE, which is very slow for large tables.
#
# See https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-CTE-MATERIALIZATION

# `WITH insert_pks AS MATERIALIZED (`
cte = (
# `SELECT id1, id2, id3, ... FROM <source_table>` (id1, id2, ... is the multipart primary key)
sqlalchemy.select(*source_table.primary_key.columns)
.where(
# `WHERE (id1, id2, id3, ...) NOT IN`
sqlalchemy.tuple_(*source_table.primary_key.columns).not_in(
# `(SELECT (id1, id2, id3, ...) FROM <destination_table>)` (subquery)
sqlalchemy.select(*destination_table.primary_key.columns)
)
)
.cte("insert_pks")
.prefix_with("MATERIALIZED")
)

# `SELECT col1, col2, ..., FALSE AS is_deleted FROM <source_table>`
select_sql = sqlalchemy.select(
source_table, sqlalchemy.literal_column("FALSE").label("is_deleted")
).where(
# `WHERE (id1, id2, ...)
# IN (SELECT insert_pks.id1, insert_pks.id2
# FROM insert_pks)`
sqlalchemy.tuple_(*source_table.primary_key.columns).in_(sqlalchemy.select(*cte.columns)),
)
# `INSERT INTO <destination_table> (col1, col2, ..., is_deleted) SELECT ...`
insert_from_select_sql = sqlalchemy.insert(destination_table).from_select(
all_columns + (destination_table.c.is_deleted,), select_sql
)

return insert_from_select_sql, select_sql


def build_update_sql(
source_table: sqlalchemy.Table, destination_table: sqlalchemy.Table
) -> sqlalchemy.Update:
"""Build an `UPDATE ... SET ... WHERE ...` statement for updated rows."""

# Optimization: use a Common Table Expression (`WITH`) marked as MATERIALIZED. This directs the PostgreSQL
# optimizer to run it first (prevents folding it into the parent query), so it only fetches the primary keys and
# last_upd_date columns from Oracle to perform the date comparison. Without this materialized CTE, it fetches all
# columns and all rows from Oracle before applying the WHERE, which is very slow for large tables.
#
# See https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-CTE-MATERIALIZATION

# `WITH update_pks AS MATERIALIZED (`
cte = (
# `SELECT id1, id2, id3, ... FROM <destination_table>`
sqlalchemy.select(*destination_table.primary_key.columns)
.join(
# `JOIN <source_table>
# ON (id1, id2, ...) = (id1, id2, ...)`
source_table,
sqlalchemy.tuple_(*destination_table.primary_key.columns)
== sqlalchemy.tuple_(*source_table.primary_key.columns),
)
# `WHERE ...`
.where(destination_table.c.last_upd_date < source_table.c.last_upd_date)
.cte("update_pks")
.prefix_with("MATERIALIZED")
)

return (
# `UPDATE <destination_table>`
sqlalchemy.update(destination_table)
# `SET col1=source_table.col1, col2=source_table.col2, ...`
.values(dict(source_table.columns))
# `WHERE ...`
.where(
sqlalchemy.tuple_(*destination_table.primary_key.columns)
== sqlalchemy.tuple_(*source_table.primary_key.columns),
sqlalchemy.tuple_(*destination_table.primary_key.columns).in_(
sqlalchemy.select(*cte.columns)
),
)
)


def build_mark_deleted_sql(
source_table: sqlalchemy.Table, destination_table: sqlalchemy.Table
) -> sqlalchemy.Update:
"""Build an `UPDATE ... SET is_deleted = TRUE WHERE ...` statement for deleted rows."""
return (
# `UPDATE <destination_table>`
sqlalchemy.update(destination_table)
# `SET is_deleted = TRUE`
.values(is_deleted=True)
# `WHERE`
.where(
# `is_deleted == FALSE`
destination_table.c.is_deleted == False, # noqa: E712
# `AND (id1, id2, id3, ...) NOT IN` (id1, id2, ... is the multipart primary key)
sqlalchemy.tuple_(*destination_table.primary_key.columns).not_in(
# `(SELECT (id1, id2, id3, ...) FROM <source_table>)` (subquery)
sqlalchemy.select(*source_table.primary_key.columns)
),
)
)
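
To see the SQL these builders emit, including the `MATERIALIZED` CTE discussed in the comments above, a minimal sketch compiles a statement against the PostgreSQL dialect; the two-column tables here are hypothetical stand-ins for a real foreign/staging pair:

```python
import sqlalchemy
from sqlalchemy.dialects import postgresql

from src.data_migration.load.sql import build_insert_select_sql

metadata = sqlalchemy.MetaData()
# Hypothetical stand-ins for a foreign/staging table pair.
src_t = sqlalchemy.Table(
    "foreign_example", metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP),
)
dst_t = sqlalchemy.Table(
    "staging_example", metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP),
    sqlalchemy.Column("is_deleted", sqlalchemy.Boolean),
    sqlalchemy.Column("transformed_at", sqlalchemy.TIMESTAMP),
)

insert_sql, _ = build_insert_select_sql(src_t, dst_t)
# Renders the INSERT ... SELECT with its WITH insert_pks AS MATERIALIZED (...) prefix.
print(insert_sql.compile(dialect=postgresql.dialect()))
```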
5 changes: 4 additions & 1 deletion api/src/db/foreign/dialect.py
@@ -24,7 +24,10 @@ def visit_create_table(self, create, **kw):

 def post_create_table(self, table):
     # Add foreign options at the end.
-    return f" SERVER grants OPTIONS (schema 'EGRANTSADMIN', table '{table.name.upper()}')"
+    return (
+        f" SERVER grants OPTIONS (schema 'EGRANTSADMIN', table '{table.name.upper()}', "
+        "readonly 'true', prefetch '1000')"
+    )

 def visit_create_column(self, create, first_pk=False, **kw):
     column = create.element
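
For context, with this change the string appended to the generated `CREATE FOREIGN TABLE` DDL for a hypothetical table named `topportunity` would be:

```python
# Roughly what post_create_table now returns for a hypothetical table named
# "topportunity" (the name is upper-cased to match Oracle):
suffix = (
    " SERVER grants OPTIONS (schema 'EGRANTSADMIN', table 'TOPPORTUNITY', "
    "readonly 'true', prefetch '1000')"
)
```

Assuming the `grants` server is backed by oracle_fdw, `readonly 'true'` rejects writes through the foreign table and `prefetch` controls how many rows are fetched from Oracle per round trip, which speeds up bulk reads.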