From 6572f9204e341a8caf7b7939d11d25439cc7a74c Mon Sep 17 00:00:00 2001 From: George Hickman Date: Fri, 24 Nov 2023 16:40:26 +0000 Subject: [PATCH] Batch the number of rows we insert psycopg can only send 65535 parameters in one go. Each values dict is currently 5 items, which means we end up with 10 parameters (name+value). Batching at 10k rows keeps the number of parameters under the limit. --- metrics/timescaledb/writer.py | 29 ++++++++++++++++------------- metrics/tools/iter.py | 14 ++++++++++++++ 2 files changed, 30 insertions(+), 13 deletions(-) create mode 100644 metrics/tools/iter.py diff --git a/metrics/timescaledb/writer.py b/metrics/timescaledb/writer.py index 2c0b670d..085459b2 100644 --- a/metrics/timescaledb/writer.py +++ b/metrics/timescaledb/writer.py @@ -5,6 +5,8 @@ from sqlalchemy import create_engine, inspect, schema, text from sqlalchemy.dialects.postgresql import insert +from ..tools.iter import batched + log = structlog.get_logger() @@ -48,20 +50,21 @@ def __exit__(self, *args): # get the primary key name from the given table constraint = inspect(self.engine).get_pk_constraint(self.table.name)["name"] - log.debug("Will insert %s rows", len(self.values), table=self.table.name) with self.engine.begin() as connection: - stmt = insert(self.table).values(self.values) - - # use the constraint for this table to drive upserting where the - # new value (excluded.value) is used to update the row - do_update_stmt = stmt.on_conflict_do_update( - constraint=constraint, - set_={"value": stmt.excluded.value}, - ) - - connection.execute(do_update_stmt) - - log.debug("Inserted %s rows", len(self.values), table=self.table.name) + # batch our values (which are currently 5 item dicts) so we don't + # hit the 65535 params limit + for values in batched(self.values, 10_000): + stmt = insert(self.table).values(values) + + # use the constraint for this table to drive upserting where the + # new value (excluded.value) is used to update the row + do_update_stmt = stmt.on_conflict_do_update( + constraint=constraint, + set_={"value": stmt.excluded.value}, + ) + + connection.execute(do_update_stmt) + log.info("Inserted %s rows", len(values), table=self.table.name) def write(self, date, value, **kwargs): # convert date to a timestamp diff --git a/metrics/tools/iter.py b/metrics/tools/iter.py new file mode 100644 index 00000000..bafd23b1 --- /dev/null +++ b/metrics/tools/iter.py @@ -0,0 +1,14 @@ +import itertools + + +def batched(iterable, n): + """ + Backport of 3.12's itertools.batched + + https://docs.python.org/3/library/itertools.html#itertools.batched + + batched('ABCDEFG', 3) --> ABC DEF G + """ + it = iter(iterable) + while batch := tuple(itertools.islice(it, n)): + yield batch