Skip to content

Commit

Permalink
fix PG_LOGICAL_SLOT_PEEK_CHANGES query crashe #546
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Jul 2, 2024
1 parent 0f22a46 commit 71fd7b4
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 46 deletions.
3 changes: 1 addition & 2 deletions examples/book/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@
"Hero",
"Villain",
"Geometry",
"Infinity",
"TextEmbedding3Small"
"Infinity"
],
"nodes": {
"table": "book",
Expand Down
36 changes: 36 additions & 0 deletions pgsync/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,42 @@ def _logical_slot_changes(
statement = statement.offset(offset)
return statement

def max_lsn(
self,
slot_name: str,
txmin: t.Optional[int] = None,
txmax: t.Optional[int] = None,
):
filters: list = []
statement: sa.sql.Select = sa.select(
sa.func.MAX(sa.text("lsn")),
).select_from(
sa.func.PG_LOGICAL_SLOT_PEEK_CHANGES(
slot_name,
None,
None,
)
)
if txmin is not None:
filters.append(
sa.cast(
sa.cast(sa.column("xid"), sa.Text),
sa.BigInteger,
)
>= txmin
)
if txmax is not None:
filters.append(
sa.cast(
sa.cast(sa.column("xid"), sa.Text),
sa.BigInteger,
)
< txmax
)
if filters:
statement = statement.where(sa.and_(*filters))
return self.fetchone(statement)[0]

def logical_slot_get_changes(
self,
slot_name: str,
Expand Down
2 changes: 1 addition & 1 deletion pgsync/querybuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _eval_expression(
):
# handle UUID typed expressions:
# psycopg2.errors.UndefinedFunction: operator does not exist: uuid = integer
return expression.left == None
return expression.left is None

return expression

Expand Down
25 changes: 9 additions & 16 deletions pgsync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,25 +377,21 @@ def logical_slot_changes(
# minimize the tmp file disk usage when calling
# PG_LOGICAL_SLOT_PEEK_CHANGES and PG_LOGICAL_SLOT_GET_CHANGES
# by limiting to a smaller batch size.
offset: int = 0
total: int = 0
limit: int = settings.LOGICAL_SLOT_CHUNK_SIZE
count: int = self.logical_slot_count_changes(
self.__name,
txmin=txmin,
txmax=txmax,
upto_nchanges=upto_nchanges,
)

upto_nchanges: int = upto_nchanges or settings.LOGICAL_SLOT_CHUNK_SIZE

# this is the max lsn we can go upto
max_lsn: int = self.max_lsn(self.__name, txmin=txmin, txmax=txmax)

while True:
changes: int = self.logical_slot_peek_changes(
self.__name,
txmin=txmin,
txmax=txmax,
upto_nchanges=upto_nchanges,
limit=limit,
offset=offset,
upto_lsn=max_lsn,
)
if not changes or total > count:
if not changes:
break

rows: list = []
Expand Down Expand Up @@ -450,11 +446,8 @@ def logical_slot_changes(
txmin=txmin,
txmax=txmax,
upto_nchanges=upto_nchanges,
limit=limit,
offset=offset,
upto_lsn=max_lsn,
)
offset += limit
total += len(changes)
self.count["xlog"] += len(rows)

def _root_primary_key_resolver(
Expand Down
12 changes: 6 additions & 6 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#
async-timeout==4.0.3
# via redis
boto3==1.34.124
boto3==1.34.138
# via -r requirements/base.in
botocore==1.34.124
botocore==1.34.138
# via
# boto3
# s3transfer
Expand Down Expand Up @@ -61,31 +61,31 @@ python-dotenv==1.0.1
# via
# -r requirements/base.in
# environs
redis==5.0.5
redis==5.0.7
# via -r requirements/base.in
requests==2.32.3
# via
# opensearch-py
# requests-aws4auth
requests-aws4auth==1.2.3
# via -r requirements/base.in
s3transfer==0.10.1
s3transfer==0.10.2
# via boto3
six==1.16.0
# via
# opensearch-dsl
# opensearch-py
# python-dateutil
# requests-aws4auth
sqlalchemy==2.0.30
sqlalchemy==2.0.31
# via -r requirements/base.in
sqlparse==0.5.0
# via -r requirements/base.in
typing-extensions==4.12.2
# via
# elasticsearch-dsl
# sqlalchemy
urllib3==1.26.18
urllib3==1.26.19
# via
# botocore
# elastic-transport
Expand Down
24 changes: 12 additions & 12 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ async-timeout==4.0.3
# via redis
black==24.4.2
# via -r requirements/dev.in
boto3==1.34.124
boto3==1.34.138
# via -r requirements/base.in
botocore==1.34.124
botocore==1.34.138
# via
# boto3
# s3transfer
Expand All @@ -27,7 +27,7 @@ click==8.1.7
# via
# -r requirements/base.in
# black
coverage[toml]==7.5.3
coverage[toml]==7.5.4
# via
# -r requirements/dev.in
# pytest-cov
Expand All @@ -47,11 +47,11 @@ events==0.5
# via opensearch-py
exceptiongroup==1.2.1
# via pytest
faker==25.8.0
faker==26.0.0
# via -r requirements/dev.in
filelock==3.14.0
filelock==3.15.4
# via virtualenv
flake8==7.0.0
flake8==7.1.0
# via -r requirements/dev.in
freezegun==1.5.1
# via -r requirements/dev.in
Expand Down Expand Up @@ -100,7 +100,7 @@ pre-commit==3.5.0
# via -r requirements/dev.in
psycopg2-binary==2.9.9
# via -r requirements/base.in
pycodestyle==2.11.1
pycodestyle==2.12.0
# via flake8
pyflakes==3.2.0
# via flake8
Expand All @@ -127,23 +127,23 @@ python-dotenv==1.0.1
# environs
pyyaml==6.0.1
# via pre-commit
redis==5.0.5
redis==5.0.7
# via -r requirements/base.in
requests==2.32.3
# via
# opensearch-py
# requests-aws4auth
requests-aws4auth==1.2.3
# via -r requirements/base.in
s3transfer==0.10.1
s3transfer==0.10.2
# via boto3
six==1.16.0
# via
# opensearch-dsl
# opensearch-py
# python-dateutil
# requests-aws4auth
sqlalchemy==2.0.30
sqlalchemy==2.0.31
# via -r requirements/base.in
sqlparse==0.5.0
# via -r requirements/base.in
Expand All @@ -157,11 +157,11 @@ typing-extensions==4.12.2
# black
# elasticsearch-dsl
# sqlalchemy
urllib3==1.26.18
urllib3==1.26.19
# via
# botocore
# elastic-transport
# opensearch-py
# requests
virtualenv==20.26.2
virtualenv==20.26.3
# via pre-commit
15 changes: 6 additions & 9 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ def test_logical_slot_changes(self, mock_logger, sync):
"testdb_testdb",
txmin=None,
txmax=None,
upto_nchanges=None,
limit=settings.LOGICAL_SLOT_CHUNK_SIZE,
offset=0,
upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE,
upto_lsn=None,
)
mock_sync.assert_not_called()

Expand All @@ -98,9 +97,8 @@ def test_logical_slot_changes(self, mock_logger, sync):
"testdb_testdb",
txmin=None,
txmax=None,
upto_nchanges=None,
limit=settings.LOGICAL_SLOT_CHUNK_SIZE,
offset=0,
upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE,
upto_lsn=None,
)
mock_sync.assert_not_called()

Expand All @@ -127,9 +125,8 @@ def test_logical_slot_changes(self, mock_logger, sync):
"testdb_testdb",
txmin=None,
txmax=None,
upto_nchanges=None,
limit=settings.LOGICAL_SLOT_CHUNK_SIZE,
offset=0,
upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE,
upto_lsn=None,
)
mock_get.assert_called_once()
mock_sync.assert_called_once()
Expand Down

0 comments on commit 71fd7b4

Please sign in to comment.