Skip to content

Commit

Permalink
feat: Custom replication slot name (#543)
Browse files Browse the repository at this point in the history
Issue #540
**client.py**
Introduced support for specifying replication_slot_name in the
replication logic
Added replication=database to the connection string
Updated application_name

**tap.py**
Validation for replication_slot_name: at most 63 characters, and must not
start with pg_.
Added replication_slot_name as a configurable parameter in
config_jsonschema, with a default value of "tappostgres".
Passed replication_slot_name to the PostgresConnector.
Added configuration examples

**Added test_slot_name.py to the tests folder**

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
Co-authored-by: Edgar Ramírez-Mondragón <[email protected]>
  • Loading branch information
3 people authored Dec 3, 2024
1 parent 880cfbf commit 25bce26
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 168 deletions.
395 changes: 233 additions & 162 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ extras = ["faker"]

[tool.poetry.group.dev.dependencies]
faker = ">=18.5.1"
hypothesis = ">=6.122.1"
mypy = ">=1.8.0"
pre-commit = ">=3.0.4"
ruff = "~=0.8.0"
Expand Down Expand Up @@ -102,7 +103,7 @@ select = [
"ICN", # flake8-import-conventions
"RET", # flake8-return
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
"TC", # flake8-type-checking
"ERA", # eradicate
"PGH", # pygrep-hooks
"PL", # Pylint
Expand Down
15 changes: 10 additions & 5 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class PostgresLogBasedStream(SQLStream):

connector_class = PostgresConnector

# JSONB Objects won't be selected without type_confomance_level to ROOT_ONLY
# JSONB Objects won't be selected without type_conformance_level to ROOT_ONLY
TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY

replication_key = "_sdc_lsn"
Expand Down Expand Up @@ -339,8 +339,11 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
# even though we still want logs with an LSN == start_lsn.
logical_replication_cursor.send_feedback(flush_lsn=start_lsn)

# get the slot name from the configuration or use the default value
replication_slot_name = self.config.get("replication_slot_name", "tappostgres")

logical_replication_cursor.start_replication(
slot_name="tappostgres",
slot_name=replication_slot_name, # use slot name
decode=True,
start_lsn=start_lsn,
status_interval=status_interval,
Expand Down Expand Up @@ -459,9 +462,11 @@ def logical_replication_connection(self):
Uses a direct psycopg2 implementation rather than through sqlalchemy.
"""
connection_string = (
f"dbname={self.config['database']} user={self.config['user']} password="
f"{self.config['password']} host={self.config['host']} port="
f"{self.config['port']}"
f"dbname={self.config['database']} "
f"user={self.config['user']} "
f"password={self.config['password']} "
f"host={self.config['host']} "
f"port={self.config['port']}"
)
return psycopg2.connect(
connection_string,
Expand Down
16 changes: 16 additions & 0 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
from collections.abc import Mapping, Sequence


# Validation pattern for the ``replication_slot_name`` setting.
# PostgreSQL only allows lowercase letters, digits and underscores in a
# replication slot name, so uppercase letters are rejected here instead of
# surfacing later as a server-side error. The name must be 1-63 characters
# long, and the negative lookahead refuses names starting with "pg_".
REPLICATION_SLOT_PATTERN = r"^(?!pg_)[a-z0-9_]{1,63}$"


class TapPostgres(SQLTap):
"""Singer tap for Postgres."""

Expand Down Expand Up @@ -98,6 +101,19 @@ def __init__(
)

config_jsonschema = th.PropertiesList(
th.Property(
    "replication_slot_name",
    th.StringType(pattern=REPLICATION_SLOT_PATTERN),
    default="tappostgres",
    # Adjacent string literals are concatenated verbatim, so every fragment
    # except the last ends with a space to keep the sentences separated.
    description=(
        "Name of the replication slot to use for logical replication. "
        "Must be unique for parallel extractions. "
        "Only applicable when replication_method is LOG_BASED. "
        "The name must contain only letters, numbers, and underscores, "
        "be at most 63 characters long, "
        "and not start with 'pg_'."
    ),
),
th.Property(
"host",
th.StringType,
Expand Down
60 changes: 60 additions & 0 deletions tests/test_slot_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pytest
from hypothesis import given
from hypothesis import strategies as st
from singer_sdk.exceptions import ConfigValidationError

from tap_postgres.tap import REPLICATION_SLOT_PATTERN, TapPostgres
from tests.settings import DB_SQLALCHEMY_URL


@pytest.fixture
def default_config():
    """Provide a tap configuration that only sets the SQLAlchemy URL."""
    base_settings = {"sqlalchemy_url": DB_SQLALCHEMY_URL}
    return base_settings


def test_default_slot_name(default_config: dict):
    """Check that the slot name resolves to "tappostgres" when not configured."""
    expected_name = "tappostgres"
    tap = TapPostgres(config=default_config, setup_mapper=False)
    # Same lookup-with-fallback the stream performs when starting replication.
    resolved_name = tap.config.get("replication_slot_name", "tappostgres")
    assert resolved_name == expected_name


@given(st.from_regex(REPLICATION_SLOT_PATTERN))
def test_custom_slot_name(s: str):
    """Check that a generated slot name matching the pattern is kept as-is."""
    tap = TapPostgres(
        config={
            "sqlalchemy_url": DB_SQLALCHEMY_URL,
            "replication_slot_name": s,
        },
        setup_mapper=False,
    )
    configured_name = tap.config["replication_slot_name"]
    assert configured_name == s


def test_multiple_slots(default_config: dict):
    """Check that two taps configured with different slot names keep them apart."""
    first_settings = dict(default_config, replication_slot_name="slot_1")
    second_settings = dict(default_config, replication_slot_name="slot_2")

    first_tap = TapPostgres(config=first_settings, setup_mapper=False)
    second_tap = TapPostgres(config=second_settings, setup_mapper=False)

    first_name = first_tap.config["replication_slot_name"]
    second_name = second_tap.config["replication_slot_name"]

    # Each tap must report its own slot name, and the two must differ.
    assert first_name != second_name
    assert first_name == "slot_1"
    assert second_name == "slot_2"


def test_invalid_slot_name(default_config: dict):
    """Check that a slot name violating the pattern fails config validation."""
    bad_name = "invalid slot name!"
    bad_settings = dict(default_config, replication_slot_name=bad_name)

    with pytest.raises(ConfigValidationError, match="does not match") as raised:
        TapPostgres(config=bad_settings, setup_mapper=False)

    # Exactly one validation error is expected, and it must quote the bad name.
    reported = raised.value.errors
    assert len(reported) == 1
    assert bad_name in reported[0]
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ min_version = 4

[testenv]
deps =
hypothesis
pytest
commands =
pytest
Expand Down

0 comments on commit 25bce26

Please sign in to comment.