From 8fa80cf0feb3fbc9c6a54d2dfe0efa0ea7cbe595 Mon Sep 17 00:00:00 2001 From: Ben Butler-Cole Date: Tue, 24 Sep 2024 10:00:01 +0100 Subject: [PATCH 1/2] Reuse connections when resetting tables --- metrics/timescaledb/db.py | 71 ++++++++++++++-------------- tests/metrics/timescaledb/test_db.py | 30 +++++------- 2 files changed, 48 insertions(+), 53 deletions(-) diff --git a/metrics/timescaledb/db.py b/metrics/timescaledb/db.py index 3309550..7b24aa3 100644 --- a/metrics/timescaledb/db.py +++ b/metrics/timescaledb/db.py @@ -13,9 +13,10 @@ def reset_table(table, batch_size=None): - _drop_table(table, batch_size) - _ensure_table(table) - log.info("Reset table", table=table.name) + with _get_engine().begin() as connection: + _drop_table(connection, table, batch_size) + _ensure_table(connection, table) + log.info("Reset table", table=table.name) def write(table, rows): @@ -28,15 +29,15 @@ def write(table, rows): def upsert(table, rows): - _ensure_table(table) - batch_size = _batch_size(table) - non_pk_columns = set(table.columns) - set(table.primary_key.columns) + with _get_engine().begin() as connection: + _ensure_table(connection, table) + batch_size = _batch_size(table) + non_pk_columns = set(table.columns) - set(table.primary_key.columns) - # use the primary key constraint to match rows to be updated, - # using default constraint name if not otherwise specified - constraint = table.primary_key.name or table.name + "_pkey" + # use the primary key constraint to match rows to be updated, + # using default constraint name if not otherwise specified + constraint = table.primary_key.name or table.name + "_pkey" - with _get_engine().begin() as connection: for values in batched(rows, batch_size): # Construct a PostgreSQL "INSERT..ON CONFLICT" style upsert statement # https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert @@ -68,30 +69,29 @@ def _batch_size(table): return max_params // len(table.columns) -def _drop_table(table, batch_size): - with _get_engine().begin() as connection: - log.debug("Removing table: %s", table.name) +def _drop_table(connection, table, batch_size): + log.debug("Removing table: %s", table.name) - if not _has_table(connection, table): - return + if not _has_table(connection, table): + return - if _is_hypertable(table): - # We have limited shared memory in our hosted database, so we can't DROP or - # TRUNCATE our hypertables. Instead for each "raw" table we need to: - # * empty the raw rows (from the named table) in batches - # * drop the sharded "child" tables in batches - # * drop the now empty raw table - while _has_rows(connection, table): - _delete_rows(connection, table, batch_size) + if _is_hypertable(table): + # We have limited shared memory in our hosted database, so we can't DROP or + # TRUNCATE our hypertables. Instead for each "raw" table we need to: + # * empty the raw rows (from the named table) in batches + # * drop the sharded "child" tables in batches + # * drop the now empty raw table + while _has_rows(connection, table): + _delete_rows(connection, table, batch_size) - log.debug("Removed all raw rows", table=table.name) + log.debug("Removed all raw rows", table=table.name) - _drop_child_tables(connection, table) - log.debug("Removed all child tables", table=table.name) + _drop_child_tables(connection, table) + log.debug("Removed all child tables", table=table.name) - connection.execute(text(f"DROP TABLE {table.name}")) + connection.execute(text(f"DROP TABLE {table.name}")) - log.debug("Removed raw table", table=table.name) + log.debug("Removed raw table", table=table.name) def _has_table(connection, table): @@ -142,16 +142,15 @@ def _drop_child_tables(connection, table): connection.execute(text(f"DROP TABLE IF EXISTS {tables}")) -def _ensure_table(table): - with _get_engine().begin() as connection: - connection.execute(schema.CreateTable(table, if_not_exists=True)) +def _ensure_table(connection, table): + connection.execute(schema.CreateTable(table, if_not_exists=True)) - if _is_hypertable(table): - connection.execute( - text( - f"SELECT create_hypertable('{table.name}', 'time', if_not_exists => TRUE);" - ) + if _is_hypertable(table): + connection.execute( + text( + f"SELECT create_hypertable('{table.name}', 'time', if_not_exists => TRUE);" ) + ) @functools.cache diff --git a/tests/metrics/timescaledb/test_db.py b/tests/metrics/timescaledb/test_db.py index 58f17e8..a16976e 100644 --- a/tests/metrics/timescaledb/test_db.py +++ b/tests/metrics/timescaledb/test_db.py @@ -34,7 +34,7 @@ def get_rows(engine, table): return connection.execute(select(table)).all() -def assert_is_hypertable(engine, table): +def assert_is_hypertable(connection, engine, table): sql = """ SELECT count(*) @@ -46,8 +46,7 @@ def assert_is_hypertable(engine, table): trigger_name = 'ts_insert_blocker'; """ - with engine.connect() as connection: - result = connection.execute(text(sql), {"table_name": table.name}).fetchone() + result = connection.execute(text(sql), {"table_name": table.name}).fetchone() # We should have one trigger called ts_insert_blocker for a hypertable. assert result[0] == 1, result @@ -75,25 +74,19 @@ def hypertable(request): def test_ensure_table(engine, table): with engine.begin() as connection: assert not db._has_table(connection, table) - - db._ensure_table(table) - - with engine.begin() as connection: + db._ensure_table(connection, table) assert db._has_table(connection, table) def test_ensure_hypertable(engine, hypertable): with engine.begin() as connection: assert not db._has_table(connection, hypertable) - - db._ensure_table(hypertable) - - with engine.begin() as connection: + db._ensure_table(connection, hypertable) assert db._has_table(connection, hypertable) - # check there are timescaledb child tables - # https://stackoverflow.com/questions/1461722/how-to-find-child-tables-that-inherit-from-another-table-in-psql - assert_is_hypertable(engine, hypertable) + # check there are timescaledb child tables + # https://stackoverflow.com/questions/1461722/how-to-find-child-tables-that-inherit-from-another-table-in-psql + assert_is_hypertable(connection, engine, hypertable) def test_get_url(monkeypatch): @@ -114,7 +107,8 @@ def test_get_url_with_prefix(monkeypatch): def test_reset_table(engine, table): - db._ensure_table(table) + with engine.begin() as connection: + db._ensure_table(connection, table) # put enough rows in the db to make sure we exercise the batch removal of rows batch_size = 5 @@ -126,7 +120,8 @@ def test_reset_table(engine, table): def test_reset_hypertable(engine, hypertable): - db._ensure_table(hypertable) + with engine.begin() as connection: + db._ensure_table(connection, hypertable) # put enough rows in the db to make sure we exercise the batch removal of rows batch_size = 5 @@ -156,7 +151,8 @@ def check_reset(batch_size, engine, rows, table): def test_write(engine, table): # set up a table to write to - db._ensure_table(table) + with engine.begin() as connection: + db._ensure_table(connection, table) rows = [{"value": "write" + str(i)} for i in range(1, 4)] db.write(table, rows) From 6d5404d33f1c5419e2a99696fe67701fe104c2cd Mon Sep 17 00:00:00 2001 From: Ben Butler-Cole Date: Tue, 24 Sep 2024 10:01:42 +0100 Subject: [PATCH 2/2] Move explanatory comment into function --- tests/metrics/timescaledb/test_db.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/metrics/timescaledb/test_db.py b/tests/metrics/timescaledb/test_db.py index a16976e..ee2fca8 100644 --- a/tests/metrics/timescaledb/test_db.py +++ b/tests/metrics/timescaledb/test_db.py @@ -35,6 +35,9 @@ def get_rows(engine, table): def assert_is_hypertable(connection, engine, table): + # check there are timescaledb child tables + # https://stackoverflow.com/questions/1461722/how-to-find-child-tables-that-inherit-from-another-table-in-psql + sql = """ SELECT count(*) @@ -83,9 +86,6 @@ def test_ensure_hypertable(engine, hypertable): assert not db._has_table(connection, hypertable) db._ensure_table(connection, hypertable) assert db._has_table(connection, hypertable) - - # check there are timescaledb child tables - # https://stackoverflow.com/questions/1461722/how-to-find-child-tables-that-inherit-from-another-table-in-psql assert_is_hypertable(connection, engine, hypertable)