From 71fd7b43e41c21815d682912f7ebc18d2332b9dc Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Tue, 2 Jul 2024 21:56:13 +0100 Subject: [PATCH] fix PG_LOGICAL_SLOT_PEEK_CHANGES query crashe #546 --- examples/book/schema.json | 3 +-- pgsync/base.py | 36 ++++++++++++++++++++++++++++++++++++ pgsync/querybuilder.py | 2 +- pgsync/sync.py | 25 +++++++++---------------- requirements/base.txt | 12 ++++++------ requirements/dev.txt | 24 ++++++++++++------------ tests/test_sync.py | 15 ++++++--------- 7 files changed, 71 insertions(+), 46 deletions(-) diff --git a/examples/book/schema.json b/examples/book/schema.json index aaf23852..63520e1e 100644 --- a/examples/book/schema.json +++ b/examples/book/schema.json @@ -55,8 +55,7 @@ "Hero", "Villain", "Geometry", - "Infinity", - "TextEmbedding3Small" + "Infinity" ], "nodes": { "table": "book", diff --git a/pgsync/base.py b/pgsync/base.py index 33ab6465..07fac265 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -529,6 +529,42 @@ def _logical_slot_changes( statement = statement.offset(offset) return statement + def max_lsn( + self, + slot_name: str, + txmin: t.Optional[int] = None, + txmax: t.Optional[int] = None, + ): + filters: list = [] + statement: sa.sql.Select = sa.select( + sa.func.MAX(sa.text("lsn")), + ).select_from( + sa.func.PG_LOGICAL_SLOT_PEEK_CHANGES( + slot_name, + None, + None, + ) + ) + if txmin is not None: + filters.append( + sa.cast( + sa.cast(sa.column("xid"), sa.Text), + sa.BigInteger, + ) + >= txmin + ) + if txmax is not None: + filters.append( + sa.cast( + sa.cast(sa.column("xid"), sa.Text), + sa.BigInteger, + ) + < txmax + ) + if filters: + statement = statement.where(sa.and_(*filters)) + return self.fetchone(statement)[0] + def logical_slot_get_changes( self, slot_name: str, diff --git a/pgsync/querybuilder.py b/pgsync/querybuilder.py index b32575a4..8b58a78b 100644 --- a/pgsync/querybuilder.py +++ b/pgsync/querybuilder.py @@ -34,7 +34,7 @@ def _eval_expression( ): # handle UUID typed expressions: # psycopg2.errors.UndefinedFunction: operator does not exist: uuid = integer - return expression.left == None + return expression.left is None return expression diff --git a/pgsync/sync.py b/pgsync/sync.py index 311f066d..453aa994 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -377,25 +377,21 @@ def logical_slot_changes( # minimize the tmp file disk usage when calling # PG_LOGICAL_SLOT_PEEK_CHANGES and PG_LOGICAL_SLOT_GET_CHANGES # by limiting to a smaller batch size. - offset: int = 0 - total: int = 0 - limit: int = settings.LOGICAL_SLOT_CHUNK_SIZE - count: int = self.logical_slot_count_changes( - self.__name, - txmin=txmin, - txmax=txmax, - upto_nchanges=upto_nchanges, - ) + + upto_nchanges: int = upto_nchanges or settings.LOGICAL_SLOT_CHUNK_SIZE + + # this is the max lsn we can go upto + max_lsn: int = self.max_lsn(self.__name, txmin=txmin, txmax=txmax) + while True: changes: int = self.logical_slot_peek_changes( self.__name, txmin=txmin, txmax=txmax, upto_nchanges=upto_nchanges, - limit=limit, - offset=offset, + upto_lsn=max_lsn, ) - if not changes or total > count: + if not changes: break rows: list = [] @@ -450,11 +446,8 @@ def logical_slot_changes( txmin=txmin, txmax=txmax, upto_nchanges=upto_nchanges, - limit=limit, - offset=offset, + upto_lsn=max_lsn, ) - offset += limit - total += len(changes) self.count["xlog"] += len(rows) def _root_primary_key_resolver( diff --git a/requirements/base.txt b/requirements/base.txt index 3c1c87aa..81607fc1 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -6,9 +6,9 @@ # async-timeout==4.0.3 # via redis -boto3==1.34.124 +boto3==1.34.138 # via -r requirements/base.in -botocore==1.34.124 +botocore==1.34.138 # via # boto3 # s3transfer @@ -61,7 +61,7 @@ python-dotenv==1.0.1 # via # -r requirements/base.in # environs -redis==5.0.5 +redis==5.0.7 # via -r requirements/base.in requests==2.32.3 # via @@ -69,7 +69,7 @@ requests==2.32.3 # requests-aws4auth requests-aws4auth==1.2.3 # via -r requirements/base.in -s3transfer==0.10.1 +s3transfer==0.10.2 # via boto3 six==1.16.0 # via @@ -77,7 +77,7 @@ six==1.16.0 # opensearch-py # python-dateutil # requests-aws4auth -sqlalchemy==2.0.30 +sqlalchemy==2.0.31 # via -r requirements/base.in sqlparse==0.5.0 # via -r requirements/base.in @@ -85,7 +85,7 @@ typing-extensions==4.12.2 # via # elasticsearch-dsl # sqlalchemy -urllib3==1.26.18 +urllib3==1.26.19 # via # botocore # elastic-transport diff --git a/requirements/dev.txt b/requirements/dev.txt index 792437e2..69539d3b 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -8,9 +8,9 @@ async-timeout==4.0.3 # via redis black==24.4.2 # via -r requirements/dev.in -boto3==1.34.124 +boto3==1.34.138 # via -r requirements/base.in -botocore==1.34.124 +botocore==1.34.138 # via # boto3 # s3transfer @@ -27,7 +27,7 @@ click==8.1.7 # via # -r requirements/base.in # black -coverage[toml]==7.5.3 +coverage[toml]==7.5.4 # via # -r requirements/dev.in # pytest-cov @@ -47,11 +47,11 @@ events==0.5 # via opensearch-py exceptiongroup==1.2.1 # via pytest -faker==25.8.0 +faker==26.0.0 # via -r requirements/dev.in -filelock==3.14.0 +filelock==3.15.4 # via virtualenv -flake8==7.0.0 +flake8==7.1.0 # via -r requirements/dev.in freezegun==1.5.1 # via -r requirements/dev.in @@ -100,7 +100,7 @@ pre-commit==3.5.0 # via -r requirements/dev.in psycopg2-binary==2.9.9 # via -r requirements/base.in -pycodestyle==2.11.1 +pycodestyle==2.12.0 # via flake8 pyflakes==3.2.0 # via flake8 @@ -127,7 +127,7 @@ python-dotenv==1.0.1 # environs pyyaml==6.0.1 # via pre-commit -redis==5.0.5 +redis==5.0.7 # via -r requirements/base.in requests==2.32.3 # via @@ -135,7 +135,7 @@ requests==2.32.3 # requests-aws4auth requests-aws4auth==1.2.3 # via -r requirements/base.in -s3transfer==0.10.1 +s3transfer==0.10.2 # via boto3 six==1.16.0 # via @@ -143,7 +143,7 @@ six==1.16.0 # opensearch-py # python-dateutil # requests-aws4auth -sqlalchemy==2.0.30 +sqlalchemy==2.0.31 # via -r requirements/base.in sqlparse==0.5.0 # via -r requirements/base.in @@ -157,11 +157,11 @@ typing-extensions==4.12.2 # black # elasticsearch-dsl # sqlalchemy -urllib3==1.26.18 +urllib3==1.26.19 # via # botocore # elastic-transport # opensearch-py # requests -virtualenv==20.26.2 +virtualenv==20.26.3 # via pre-commit diff --git a/tests/test_sync.py b/tests/test_sync.py index 009eb6e2..f5efc35b 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -81,9 +81,8 @@ def test_logical_slot_changes(self, mock_logger, sync): "testdb_testdb", txmin=None, txmax=None, - upto_nchanges=None, - limit=settings.LOGICAL_SLOT_CHUNK_SIZE, - offset=0, + upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE, + upto_lsn=None, ) mock_sync.assert_not_called() @@ -98,9 +97,8 @@ def test_logical_slot_changes(self, mock_logger, sync): "testdb_testdb", txmin=None, txmax=None, - upto_nchanges=None, - limit=settings.LOGICAL_SLOT_CHUNK_SIZE, - offset=0, + upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE, + upto_lsn=None, ) mock_sync.assert_not_called() @@ -127,9 +125,8 @@ def test_logical_slot_changes(self, mock_logger, sync): "testdb_testdb", txmin=None, txmax=None, - upto_nchanges=None, - limit=settings.LOGICAL_SLOT_CHUNK_SIZE, - offset=0, + upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE, + upto_lsn=None, ) mock_get.assert_called_once() mock_sync.assert_called_once()