Skip to content

Commit

Permalink
Batch the number of rows we insert
Browse files Browse the repository at this point in the history
psycopg can only send 65535 parameters in one go. Each values dict
currently has 5 items, which means we end up with 10 parameters
(name+value). Batching at 10k rows keeps the number of parameters under
the limit.
  • Loading branch information
ghickman committed Nov 27, 2023
1 parent e570bc0 commit 6572f92
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
29 changes: 16 additions & 13 deletions metrics/timescaledb/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from sqlalchemy import create_engine, inspect, schema, text
from sqlalchemy.dialects.postgresql import insert

from ..tools.iter import batched


log = structlog.get_logger()

Expand Down Expand Up @@ -48,20 +50,21 @@ def __exit__(self, *args):
# get the primary key name from the given table
constraint = inspect(self.engine).get_pk_constraint(self.table.name)["name"]

log.debug("Will insert %s rows", len(self.values), table=self.table.name)
with self.engine.begin() as connection:
stmt = insert(self.table).values(self.values)

# use the constraint for this table to drive upserting where the
# new value (excluded.value) is used to update the row
do_update_stmt = stmt.on_conflict_do_update(
constraint=constraint,
set_={"value": stmt.excluded.value},
)

connection.execute(do_update_stmt)

log.debug("Inserted %s rows", len(self.values), table=self.table.name)
# batch our values (which are currently 5 item dicts) so we don't
# hit the 65535 params limit
for values in batched(self.values, 10_000):
stmt = insert(self.table).values(values)

# use the constraint for this table to drive upserting where the
# new value (excluded.value) is used to update the row
do_update_stmt = stmt.on_conflict_do_update(
constraint=constraint,
set_={"value": stmt.excluded.value},
)

connection.execute(do_update_stmt)
log.info("Inserted %s rows", len(values), table=self.table.name)

def write(self, date, value, **kwargs):
# convert date to a timestamp
Expand Down
14 changes: 14 additions & 0 deletions metrics/tools/iter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import itertools


def batched(iterable, n):
    """
    Backport of 3.12's itertools.batched

    https://docs.python.org/3/library/itertools.html#itertools.batched

    Yield successive tuples of at most ``n`` items taken from ``iterable``.
    Every tuple holds exactly ``n`` items except possibly the last, which
    holds whatever is left over.  An empty iterable yields nothing.

    batched('ABCDEFG', 3) --> ABC DEF G

    Raises ValueError if ``n`` is less than one.  Because this is a
    generator function the error surfaces when iteration starts, not when
    ``batched`` is called.
    """
    # Without this guard n == 0 would make islice() return an empty tuple on
    # the first pass, ending the loop immediately and silently dropping every
    # item instead of reporting the bad batch size.
    if n < 1:
        raise ValueError("n must be at least one")

    it = iter(iterable)
    # islice pulls up to n items from the shared iterator on each pass; an
    # empty tuple means the iterator is exhausted, which ends the loop.
    while batch := tuple(itertools.islice(it, n)):
        yield batch

0 comments on commit 6572f92

Please sign in to comment.