To identify index in redis (#399)
* pass the affected index back to pgsync from pg_notify

* optimise trigger function to perform a single query

* add index name to log
toluaina authored Dec 12, 2022
1 parent 9f8f27e commit ca76e53
Showing 8 changed files with 75 additions and 32 deletions.
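As context for the changes below, here is a minimal sketch of the new flow. Everything concrete in it is illustrative: the index name, the payload values, and the final branch body are made up; only the payload keys follow the trigger template in pgsync/trigger.py as changed in this commit.

    import json

    # Hypothetical pg_notify payload emitted by the trigger after this change.
    # The "indices" key is new: it lists the search indices whose schema
    # references the changed table.
    notification = json.dumps({
        "xmin": 1234,
        "new": {"id": 1, "title": "Foo"},
        "old": None,
        "indices": ["book_index"],
        "tg_op": "INSERT",
        "table": "book",
        "schema": "public",
    })

    # Each pgsync worker is bound to a single index; it now ignores payloads
    # whose "indices" list does not mention that index (see poll_db below).
    payload = json.loads(notification)
    if "book_index" in payload["indices"]:
        pass  # queue the payload / push it to Redis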
2 changes: 1 addition & 1 deletion pgsync/__init__.py
@@ -4,4 +4,4 @@

__author__ = "Tolu Aina"
__email__ = "[email protected]"
__version__ = "2.3.4"
__version__ = "2.4.0"
11 changes: 9 additions & 2 deletions pgsync/base.py
@@ -51,7 +51,7 @@

class Payload(object):

__slots__ = ("tg_op", "table", "schema", "old", "new", "xmin")
__slots__ = ("tg_op", "table", "schema", "old", "new", "xmin", "indices")

def __init__(
self,
@@ -61,13 +61,15 @@ def __init__(
old: dict = Optional[None],
new: dict = Optional[None],
xmin: int = Optional[None],
indices: List[str] = Optional[None],
):
self.tg_op: str = tg_op
self.table: str = table
self.schema: str = schema
self.old: dict = old or {}
self.new: dict = new or {}
self.xmin: str = xmin
self.indices: List[str] = indices

@property
def data(self) -> dict:
@@ -481,12 +483,17 @@ def logical_slot_count_changes(

# Views...
def create_view(
self, schema: str, tables: Set, user_defined_fkey_tables: dict
self,
index: str,
schema: str,
tables: Set,
user_defined_fkey_tables: dict,
) -> None:
create_view(
self.engine,
self.models,
self.fetchall,
index,
schema,
tables,
user_defined_fkey_tables,
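A short sketch of how the extended Payload could be built from a decoded notification. The keyword arguments mirror the constructor in the diff above; the concrete values are made up.

    from pgsync.base import Payload

    payload = Payload(
        tg_op="UPDATE",
        table="book",
        schema="public",
        old={"id": 1},
        new={"id": 1, "title": "New title"},
        xmin=5678,
        indices=["book_index"],  # new field: indices affected by this change
    )
    assert "book_index" in payload.indices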
20 changes: 12 additions & 8 deletions pgsync/sync.py
@@ -279,7 +279,9 @@ def setup(self) -> None:
user_defined_fkey_tables.setdefault(node.table, set())
user_defined_fkey_tables[node.table] |= set(columns)
if tables:
self.create_view(schema, tables, user_defined_fkey_tables)
self.create_view(
self.index, schema, tables, user_defined_fkey_tables
)
self.create_triggers(
schema, tables=tables, join_queries=join_queries
)
@@ -1039,9 +1041,10 @@ def poll_db(self) -> None:
notification: AnyStr = conn.notifies.pop(0)
if notification.channel == self.database:
payload = json.loads(notification.payload)
payloads.append(payload)
logger.debug(f"on_notify: {payload}")
self.count["db"] += 1
if self.index in payload["indices"]:
payloads.append(payload)
logger.debug(f"on_notify: {payload}")
self.count["db"] += 1

@exception
def async_poll_db(self) -> None:
@@ -1060,9 +1063,10 @@ def async_poll_db(self) -> None:
notification: AnyStr = self.conn.notifies.pop(0)
if notification.channel == self.database:
payload = json.loads(notification.payload)
self.redis.bulk_push([payload])
logger.debug(f"on_notify: {payload}")
self.count["db"] += 1
if self.index in payload["indices"]:
self.redis.bulk_push([payload])
logger.debug(f"on_notify: {payload}")
self.count["db"] += 1

def refresh_views(self) -> None:
self._refresh_views()
@@ -1181,7 +1185,7 @@ async def async_status(self) -> None:

def _status(self, label: str) -> None:
sys.stdout.write(
f"{label} {self.database} "
f"{label} {self.database}:{self.index} "
f"Xlog: [{self.count['xlog']:,}] => "
f"Db: [{self.count['db']:,}] => "
f"Redis: [total = {self.count['redis']:,} "
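Roughly, the new filtering and the extended status label behave as sketched below; the database name, index name, and counter values are illustrative.

    payload = {
        "indices": ["book_index"],
        "tg_op": "INSERT",
        "table": "book",
        "schema": "public",
    }

    index = "book_index"  # the index this pgsync worker is syncing
    if index in payload["indices"]:
        # only now is the payload queued (poll_db) or pushed to Redis (async_poll_db)
        pass

    # _status("Polling") now prefixes the counters with "<database>:<index>", e.g.:
    # Polling mydb:book_index Xlog: [0] => Db: [1] => Redis: [total = 1 ...]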
10 changes: 6 additions & 4 deletions pgsync/trigger.py
@@ -9,6 +9,7 @@
new_row JSON;
notification JSON;
xmin BIGINT;
_indices TEXT [];
_primary_keys TEXT [];
_foreign_keys TEXT [];
@@ -18,8 +19,8 @@
IF TG_OP = 'DELETE' THEN
SELECT primary_keys
INTO _primary_keys
SELECT primary_keys, indices
INTO _primary_keys, _indices
FROM {MATERIALIZED_VIEW}
WHERE table_name = TG_TABLE_NAME;
@@ -33,8 +34,8 @@
ELSE
IF TG_OP <> 'TRUNCATE' THEN
SELECT primary_keys, foreign_keys
INTO _primary_keys, _foreign_keys
SELECT primary_keys, foreign_keys, indices
INTO _primary_keys, _foreign_keys, _indices
FROM {MATERIALIZED_VIEW}
WHERE table_name = TG_TABLE_NAME;
@@ -61,6 +62,7 @@
'xmin', xmin,
'new', new_row,
'old', old_row,
'indices', _indices,
'tg_op', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA
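The point of the trigger change is that the metadata lookup happens once per branch: a single SELECT now returns the primary keys, foreign keys, and affected indices from the materialized view. A sketch of the consolidated lookup for the non-DELETE branch, written as a Python string in the style of the template above (the view name defaults to _view):

    single_lookup = """
        SELECT primary_keys, foreign_keys, indices
        INTO _primary_keys, _foreign_keys, _indices
        FROM _view
        WHERE table_name = TG_TABLE_NAME;
    """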
49 changes: 37 additions & 12 deletions pgsync/view.py
@@ -194,6 +194,7 @@ def create_view(
engine: sa.engine.Engine,
models: Callable,
fetchall: Callable,
index: str,
schema: str,
tables: Set,
user_defined_fkey_tables: dict,
@@ -213,28 +214,36 @@
So if 'specie' was the only row before, and the next query returns
'unit' and 'structure', we want to end up with the result below.
table_name | primary_keys | foreign_keys
-----------+--------------+--------------
specie | {id} | {id, user_id}
unit | {id} | {id, profile_id}
structure | {id} | {id}
table_name | primary_keys | foreign_keys | indices
------------+--------------+------------------+------------
specie | {id} | {id, user_id} | {foo, bar}
unit | {id} | {id, profile_id} | {foo, bar}
structure | {id} | {id} | {foo, bar}
"""

rows: dict = {}
if MATERIALIZED_VIEW in views:
for table_name, primary_keys, foreign_keys in fetchall(
for table_name, primary_keys, foreign_keys, indices in fetchall(
sa.select(["*"]).select_from(
sa.text(f"{schema}.{MATERIALIZED_VIEW}")
)
):
rows.setdefault(
table_name,
{"primary_keys": set(), "foreign_keys": set()},
{
"primary_keys": set(),
"foreign_keys": set(),
"indices": set(),
},
)
if primary_keys:
rows[table_name]["primary_keys"] = set(primary_keys)
if foreign_keys:
rows[table_name]["foreign_keys"] = set(foreign_keys)
if indices:
rows[table_name]["indices"] = set(indices)

engine.execute(DropView(schema, MATERIALIZED_VIEW))

@@ -245,39 +254,47 @@
for table_name, columns in fetchall(_primary_keys(models, schema, tables)):
rows.setdefault(
table_name,
{"primary_keys": set(), "foreign_keys": set()},
{"primary_keys": set(), "foreign_keys": set(), "indices": set()},
)
if columns:
rows[table_name]["primary_keys"] |= set(columns)
rows[table_name]["indices"] |= set([index])

for table_name, columns in fetchall(_foreign_keys(models, schema, tables)):
rows.setdefault(
table_name,
{"primary_keys": set(), "foreign_keys": set()},
{"primary_keys": set(), "foreign_keys": set(), "indices": set()},
)
if columns:
rows[table_name]["foreign_keys"] |= set(columns)
rows[table_name]["indices"] |= set([index])

if user_defined_fkey_tables:
for table_name, columns in user_defined_fkey_tables.items():
rows.setdefault(
table_name,
{"primary_keys": set(), "foreign_keys": set()},
{
"primary_keys": set(),
"foreign_keys": set(),
"indices": set(),
},
)
if columns:
rows[table_name]["foreign_keys"] |= set(columns)
rows[table_name]["indices"] |= set([index])

if not rows:
rows.setdefault(
None,
{"primary_keys": set(), "foreign_keys": set()},
{"primary_keys": set(), "foreign_keys": set(), "indices": set()},
)

statement = sa.select(
sa.sql.Values(
sa.column("table_name"),
sa.column("primary_keys"),
sa.column("foreign_keys"),
sa.column("indices"),
)
.data(
[
@@ -289,6 +306,9 @@
array(fields.get("foreign_keys"))
if fields.get("foreign_keys")
else None,
array(fields.get("indices"))
if fields.get("indices")
else None,
)
for table_name, fields in rows.items()
]
@@ -299,7 +319,12 @@
engine.execute(CreateView(schema, MATERIALIZED_VIEW, statement))
engine.execute(DropIndex("_idx"))
engine.execute(
CreateIndex("_idx", schema, MATERIALIZED_VIEW, ["table_name"])
CreateIndex(
"_idx",
schema,
MATERIALIZED_VIEW,
["table_name"],
)
)
logger.debug(f"Created view: {schema}.{MATERIALIZED_VIEW}")

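To illustrate what create_view() now records, here is a sketch of the per-table metadata it accumulates before rebuilding the materialized view; the table, key, and index names are illustrative.

    rows = {
        "book": {
            "primary_keys": {"id"},
            "foreign_keys": {"id", "publisher_id"},
            "indices": {"book_index"},  # new: indices built from this table
        },
        "publisher": {
            "primary_keys": {"id"},
            "foreign_keys": {"id"},
            "indices": {"book_index"},
        },
    }
    # Each entry becomes one row of the _view materialized view:
    # (table_name, primary_keys, foreign_keys, indices).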
3 changes: 2 additions & 1 deletion tests/test_sync.py
@@ -311,7 +311,7 @@ def test_status(self, sync):
with patch("pgsync.sync.sys") as mock_sys:
sync._status("mydb")
mock_sys.stdout.write.assert_called_once_with(
"mydb testdb "
"mydb testdb:testdb "
"Xlog: [0] => "
"Db: [0] => "
"Redis: [total = 0 "
@@ -765,6 +765,7 @@ def test_setup(self, mock_teardown, sync):
join_queries=True,
)
mock_create_view.assert_called_once_with(
"testdb",
"public",
{"publisher", "book"},
{"publisher": {"publisher_id", "id"}},
10 changes: 6 additions & 4 deletions tests/test_trigger.py
@@ -18,6 +18,7 @@ def test_trigger_template(self):
new_row JSON;
notification JSON;
xmin BIGINT;
_indices TEXT [];
_primary_keys TEXT [];
_foreign_keys TEXT [];
@@ -27,8 +28,8 @@
IF TG_OP = 'DELETE' THEN
SELECT primary_keys
INTO _primary_keys
SELECT primary_keys, indices
INTO _primary_keys, _indices
FROM _view
WHERE table_name = TG_TABLE_NAME;
@@ -42,8 +43,8 @@
ELSE
IF TG_OP <> 'TRUNCATE' THEN
SELECT primary_keys, foreign_keys
INTO _primary_keys, _foreign_keys
SELECT primary_keys, foreign_keys, indices
INTO _primary_keys, _foreign_keys, _indices
FROM _view
WHERE table_name = TG_TABLE_NAME;
@@ -70,6 +71,7 @@
'xmin', xmin,
'new', new_row,
'old', old_row,
'indices', _indices,
'tg_op', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA
2 changes: 2 additions & 0 deletions tests/test_view.py
@@ -235,6 +235,7 @@ def fetchall(statement):
connection.engine,
pg_base.models,
fetchall,
"testdb",
DEFAULT_SCHEMA,
["book", "publisher"],
user_defined_fkey_tables={},
@@ -254,6 +255,7 @@ def fetchall(statement):
connection.engine,
pg_base.models,
fetchall,
"testdb",
"myschema",
set(["book", "publisher"]),
user_defined_fkey_tables=user_defined_fkey_tables,
