Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Custom replication slot name #543

Merged
merged 27 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f0abd1e
add support for configurable replication_slot_name in logical replica…
RobyBen Nov 23, 2024
b9de248
fixes
RobyBen Nov 23, 2024
d2c6ed5
another fixes
RobyBen Nov 23, 2024
970ec8d
idk why dont work
RobyBen Nov 23, 2024
c7e7236
finaly&
RobyBen Nov 23, 2024
f811255
ruff format
RobyBen Nov 23, 2024
802bda8
little changes client.py and tap.py
RobyBen Nov 24, 2024
e81b989
ruff format again
RobyBen Nov 24, 2024
098b239
changed th.Property
RobyBen Nov 24, 2024
a8e2720
I forgot to delete the asserts
RobyBen Nov 24, 2024
d2b38aa
Move `pattern` parameter to `StringType` instance
edgarrmondragon Nov 25, 2024
eb7bea3
Update tap_postgres/tap.py
edgarrmondragon Nov 25, 2024
1aceb57
Merge branch 'main' into feature/slot_name
edgarrmondragon Nov 25, 2024
4f9deb3
Use keyword argument in tests
edgarrmondragon Nov 25, 2024
80071d2
Add `user`
edgarrmondragon Nov 25, 2024
cc8960f
Add `password`
edgarrmondragon Nov 25, 2024
c61234f
Update tests/test_slot_name.py
edgarrmondragon Nov 25, 2024
14f1854
changed default_config in the test, maybe it will help
RobyBen Nov 30, 2024
e45cd2a
Changed default_config in the test, maybe it will help. Please check …
RobyBen Nov 30, 2024
40a06fd
Merge branch 'main' into feature/slot_name
RobyBen Nov 30, 2024
6f21aa3
Merge branch 'feature/slot_name' of https://github.com/RobyBen/tap-po…
RobyBen Nov 30, 2024
73b7e8b
Ok, the default_config in the test should have changed , maybe that w…
RobyBen Nov 30, 2024
e5a3037
Merge branch 'main' into feature/slot_name
edgarrmondragon Dec 2, 2024
0927913
Changed test_slot_name
RobyBen Dec 2, 2024
2e11ad4
Merge branch 'feature/slot_name' of https://github.com/RobyBen/tap-po…
RobyBen Dec 2, 2024
f6bd4bb
Use pytest-style tests
edgarrmondragon Dec 2, 2024
d567ba4
Use hypothesis
edgarrmondragon Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,11 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, Any]]:
# even though we still want logs with an LSN == start_lsn.
logical_replication_cursor.send_feedback(flush_lsn=start_lsn)

# get the slot name from the configuration or use the default value
replication_slot_name = self.config.get("replication_slot_name", "tappostgres")

logical_replication_cursor.start_replication(
slot_name="tappostgres",
slot_name=replication_slot_name, # use slot name
decode=True,
start_lsn=start_lsn,
status_interval=status_interval,
Expand Down Expand Up @@ -460,13 +463,20 @@ def logical_replication_connection(self):
Uses a direct psycopg2 implementation rather than through sqlalchemy.
"""
connection_string = (
f"dbname={self.config['database']} user={self.config['user']} password="
f"{self.config['password']} host={self.config['host']} port="
f"{self.config['port']}"
f"dbname={self.config['database']} "
f"user={self.config['user']} "
f"password{self.config['password']} "
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
f"host={self.config['host']} "
f"port={self.config['port']}"
"replication=database"
)
return psycopg2.connect(
connection_string,
application_name="tap_postgres",
# add slot name to application_name
application_name=(
f"tap_postgres_{self.config.get(
'replication_slot_name', "f"'tappostgres')}"
),
connection_factory=extras.LogicalReplicationConnection,
)

Expand Down
44 changes: 44 additions & 0 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,23 @@ def __init__(
See https://github.com/MeltanoLabs/tap-postgres/issues/141
"""
super().__init__(*args, **kwargs)

# Add replication_slot_name validation
if self.config.get("replication_method") == "LOG_BASED" and self.config.get(
"replication_slot_name"
):
slot_name = self.config["replication_slot_name"]
assert (
slot_name.isalnum() or "_" in slot_name
), "Replication slot name must contain letters, numbers and underscores"
max_slot_name_len = 63
assert (
len(slot_name) <= max_slot_name_len
), "Replication slot name must be less than 63 characters"
assert not slot_name.startswith(
"pg_"
), "Replication slot name cannot start with 'pg_'"
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved

assert (self.config.get("sqlalchemy_url") is not None) or (
self.config.get("host") is not None
and self.config.get("port") is not None
Expand Down Expand Up @@ -98,6 +115,16 @@ def __init__(
)

config_jsonschema = th.PropertiesList(
th.Property(
"replication_slot_name",
th.StringType,
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
default="tappostgres",
description=(
"Name of the replication slot to use for logical replication. "
"Must be unique for parallel extractions. "
"Only applicable when replication_method is LOG_BASED."
),
), # New Property
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
th.Property(
"host",
th.StringType,
Expand Down Expand Up @@ -617,3 +644,20 @@ def discover_streams(self) -> Sequence[Stream]:
PostgresStream(self, catalog_entry, connector=self.connector)
)
return streams


# Configuration example for parallel replication.
#
# Each tap instance that runs log-based replication in parallel should be
# given its own `replication_slot_name`, so the two extractions do not share
# a slot.
#
# The database name is supplied under the `database` key: that is the key the
# tap's logical replication connection reads when it builds its connection
# string (the key is not `dbname`).

config_1 = {
    "host": "database1.example.com",
    "port": 5432,
    "database": "example_db_1",
    "replication_slot_name": "slot_1",
}

config_2 = {
    "host": "database2.example.com",
    "port": 5432,
    "database": "example_db_2",
    "replication_slot_name": "slot_2",
}
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
51 changes: 51 additions & 0 deletions tests/test_slot_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import unittest

from tap_postgres.tap import TapPostgres


class TestReplicationSlot(unittest.TestCase):
    """Tests for the ``replication_slot_name`` setting of ``TapPostgres``."""

    def setUp(self):
        """Build the base connection config shared by every test."""
        # The tap's logical replication connection reads the `database`,
        # `user` and `password` settings, so the config uses those key names
        # (the database name is not passed as `dbname`).
        self.default_config = {
            "host": "localhost",
            "port": 5432,
            "database": "test_db",
            "user": "postgres",
            "password": "postgres",
        }

    def test_default_slot_name(self):
        """The slot name falls back to ``tappostgres`` when it is not set."""
        # Pass the config by keyword: the tap forwards its arguments to the
        # base class constructor unchanged.
        # NOTE(review): presumably the SDK base constructor accepts `config`
        # as a keyword argument only -- confirm against the SDK version in use.
        tap = TapPostgres(config=self.default_config)
        self.assertEqual(
            tap.config.get("replication_slot_name", "tappostgres"), "tappostgres"
        )

    def test_custom_slot_name(self):
        """A slot name supplied in the config is kept as given."""
        config = {**self.default_config, "replication_slot_name": "custom_slot"}
        tap = TapPostgres(config=config)
        self.assertEqual(tap.config["replication_slot_name"], "custom_slot")

    def test_multiple_slots(self):
        """Two taps configured with different slot names keep them distinct."""
        config_1 = {**self.default_config, "replication_slot_name": "slot_1"}
        config_2 = {**self.default_config, "replication_slot_name": "slot_2"}

        tap_1 = TapPostgres(config=config_1)
        tap_2 = TapPostgres(config=config_2)

        self.assertNotEqual(
            tap_1.config["replication_slot_name"],
            tap_2.config["replication_slot_name"],
        )
        self.assertEqual(tap_1.config["replication_slot_name"], "slot_1")
        self.assertEqual(tap_2.config["replication_slot_name"], "slot_2")

    def test_invalid_slot_name(self):
        """A slot name with spaces or punctuation is rejected."""
        invalid_config = {
            **self.default_config,
            # The tap only validates the slot name when replication_method is
            # LOG_BASED; without this key the check is skipped entirely and
            # no error would be raised.
            "replication_method": "LOG_BASED",
            "replication_slot_name": "invalid slot name!",
        }

        # The validation in the tap's __init__ is written with `assert`
        # statements, so a bad name surfaces as AssertionError rather than
        # ValueError.
        with self.assertRaises(AssertionError):
            TapPostgres(config=invalid_config)
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
Loading