feat: log based replication #249

Merged · 20 commits · Oct 16, 2023
Changes shown from 14 commits
60 changes: 60 additions & 0 deletions README.md
@@ -193,3 +193,63 @@ To connect through SSH, you will need to determine the following pieces of infor
- The private key you use for authentication with the bastion server, provided in the `ssh.private_key` configuration option. If your private key is protected by a password (alternatively called a "private key passphrase"), provide it in the `ssh.private_key_password` configuration option. If your private key doesn't have a password, you can safely leave this field blank.

After everything has been configured, be sure to indicate your use of an SSH tunnel to the tap by setting the `ssh.enable` configuration option to `True`. Then, you should be able to connect to your privately accessible Postgres database through the bastion server.

## Log-Based Replication

Log-based replication is an alternative to full-table and incremental syncs and syncs all changes to the database, including deletes. This feature is built on [Postgres replication slots](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html#LOGICALDECODING-REPLICATION-SLOTS).

### Negatives of Log-Based Replication

1. Managing replication slots - Log-based replication has to be set up and maintained on the database. This tap attempts to abstract away as much complexity as possible, but some manual effort may still be needed.
2. Log files - Once a replication slot is set up, the WAL it retains will continue to grow until it is consumed. This can cause disk-space issues if the tap doesn't ingest the logs quickly enough, for example during outages (see the monitoring sketch below this list).
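
One way to keep an eye on retained WAL (a sketch using psycopg2, run outside the tap; the connection string is a placeholder) is to compare each slot's confirmed flush position with the current WAL position:

```python
# Sketch: report how much WAL each replication slot is retaining (PostgreSQL 10+).
import psycopg2

with psycopg2.connect("dbname=postgres user=postgres host=localhost") as conn:
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT slot_name,
                   pg_size_pretty(
                       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
                   ) AS retained_wal
            FROM pg_replication_slots
            """
        )
        for slot_name, retained_wal in cur.fetchall():
            print(slot_name, retained_wal)
```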

visch marked this conversation as resolved.
Show resolved Hide resolved
If you find more, please add them to this list!

### Implementation Details
Log-based replication will modify the schemas output by the tap. Specifically, all fields will be made nullable and non-required. The reason for this is that when the tap sends a message indicating that a record has been deleted, that message leaves all fields for that record (except primary keys) as null. The stream's schema must be capable of accommodating these messages, even if a source field in the database is not nullable. As a result, log-based schemas have all fields nullable.
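
For illustration, a delete might surface in the stream roughly like the record below (a hypothetical example; the columns `id` and `name` are assumptions, with `id` as the primary key):

```python
# Hypothetical shape of a record emitted for a deleted row.
deleted_record = {
    "id": 42,                                    # primary-key values are preserved
    "name": None,                                # non-key columns come through as null
    "_sdc_deleted_at": "2023-10-16T12:00:00Z",   # deletion timestamp added by the tap
    "_sdc_lsn": 12345678,                        # WAL log sequence number of the change
}
```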

Note that changing which streams are selected after log-based replication has already begun can have unexpected consequences. To ensure consistent output, it is best to keep the selected streams the same across invocations of the tap.

### How to Set Up Log-Based Replication

1. Ensure you are using PostgreSQL 9.4 or higher.
1. Ensure you have access to the primary (master) Postgres instance.
1. Install the wal2json plugin for your database. Example instructions are given below for a Postgres 15.0 database running on Ubuntu 22.04. For more information, or for alternative versions/operating systems, refer to the [wal2json documentation](https://github.com/eulerto/wal2json).
- Update and upgrade apt if necessary.
```bash
sudo apt update
sudo apt upgrade -y
```
- Install the prerequisite packages.
```bash
sudo apt install curl ca-certificates
sudo install -d /usr/share/postgresql-common/pgdg
```
- Import the repository keys for the Postgres Apt repository.
```bash
sudo curl -o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc --fail https://www.postgresql.org/media/keys/ACCC4CF8.asc
```
- Create the pgdg.list file.
```bash
sudo sh -c 'echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] https://apt.postgresql.org/pub/repos/apt jammy-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
```
- Use the Postgres Apt repository to install wal2json.
```bash
sudo apt update
sudo apt-get install postgresql-server-dev-15
export PATH=/usr/lib/postgresql/15/bin:$PATH
sudo apt-get install postgresql-15-wal2json
```
1. Configure your database with wal2json enabled.
- Edit your `postgresql.conf` configuration file so the following parameters are appropriately set.
```
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
```
- Restart PostgreSQL.
- Create a replication slot for tap-postgres.
```sql
SELECT * FROM pg_create_logical_replication_slot('tappostgres', 'wal2json');
```
1. Ensure your configuration for tap-postgres specifies host, port, user, password, and database manually, without relying on an SQLAlchemy URL. A sketch of such a configuration is shown below.
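
For example, a minimal configuration might look like the sketch below (placeholder values, written as a Python dict for illustration; these settings normally live in the tap's JSON config):

```python
# Sketch of a tap-postgres configuration for log-based replication.
# All values are placeholders; adjust them for your environment.
config = {
    "host": "localhost",
    "port": 5432,
    "user": "postgres",
    "password": "s3cr3tpa55w0rd",
    "database": "postgres",
    "replication_method": "LOG_BASED",
}
```
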
283 changes: 282 additions & 1 deletion tap_postgres/client.py
@@ -5,13 +5,21 @@
from __future__ import annotations

import datetime
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Type, Union
import json
import select
import typing
from functools import cached_property
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Type, Union

import psycopg2
import singer_sdk.helpers._typing
import sqlalchemy
from psycopg2 import extras
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.helpers._state import increment_state
from singer_sdk.helpers._typing import TypeConformanceLevel
from sqlalchemy import nullsfirst
from sqlalchemy.engine import Engine
@@ -219,6 +227,85 @@ def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
return self.config["filter_schemas"]
return super().get_schema_names(engine, inspected)

def discover_catalog_entry(
self,
engine: Engine, # noqa: ARG002
inspected: Inspector,
schema_name: str,
table_name: str,
is_view: bool, # noqa: FBT001
) -> CatalogEntry:
"""Override to manually specify a replication key for LOG_BASED replication."""
# Initialize unique stream name
unique_stream_id = self.get_fully_qualified_name(
db_name=None,
schema_name=schema_name,
table_name=table_name,
delimiter="-",
)

# Detect key properties
possible_primary_keys: list[list[str]] = []
pk_def = inspected.get_pk_constraint(table_name, schema=schema_name)
if pk_def and "constrained_columns" in pk_def:
possible_primary_keys.append(pk_def["constrained_columns"])

possible_primary_keys.extend(
index_def["column_names"]
for index_def in inspected.get_indexes(table_name, schema=schema_name)
if index_def.get("unique", False)
)

key_properties = next(iter(possible_primary_keys), None)

# Initialize available replication methods
replication_method = self.config["replication_method"]

# Initialize columns list
table_schema = th.PropertiesList()
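# For LOG_BASED replication, every column is emitted as non-required (see the
# README): delete messages only carry primary-key values, so all other fields
# must be allowed to be null or absent.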
for column_def in inspected.get_columns(table_name, schema=schema_name):
column_name = column_def["name"]
is_nullable = column_def.get("nullable", False)
jsonschema_type: dict = self.to_jsonschema_type(
typing.cast(sqlalchemy.types.TypeEngine, column_def["type"]),
)
table_schema.append(
[Review comment by @visch (Member), Oct 13, 2023: This should happen in the Log Based Stream class, not here.]
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
required=False
if replication_method == "LOG_BASED"
else not is_nullable,
),
)
schema = table_schema.to_dict()

replication_key = None
if replication_method == "LOG_BASED":
[Review comment (Member): This shouldn't happen at discovery; this should happen after discovery in the LogBased Stream class.]
replication_key = "_sdc_lsn"

# Create the catalog entry object
return CatalogEntry(
tap_stream_id=unique_stream_id,
stream=unique_stream_id,
table=table_name,
key_properties=key_properties,
schema=Schema.from_dict(schema),
is_view=is_view,
replication_method=replication_method,
metadata=MetadataMapping.get_standard_metadata(
schema_name=schema_name,
schema=schema,
replication_method=replication_method,
key_properties=key_properties,
valid_replication_keys=[replication_key] if replication_key else None,
),
database=None, # Expects single-database context
row_count=None,
stream_alias=None,
replication_key=replication_key,
)


class PostgresStream(SQLStream):
"""Stream class for Postgres streams."""
@@ -275,3 +362,197 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
with self.connector._connect() as con:
for row in con.execute(query):
yield dict(row)


class PostgresLogBasedStream(SQLStream):
"""Stream class for Postgres log-based streams."""

connector_class = PostgresConnector

# JSONB objects won't be selected unless TYPE_CONFORMANCE_LEVEL is set to ROOT_ONLY
TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY

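# Bookmark on the WAL log sequence number (LSN) so that a sync can resume from
# the position flushed by the previous sync.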
replication_key = "_sdc_lsn"

@property
def config(self) -> Mapping[str, Any]:
"""Return a read-only config dictionary."""
return MappingProxyType(self._config)

@cached_property
def schema(self) -> dict:
"""Override schema for log-based replication adding _sdc columns."""
schema_dict = typing.cast(dict, self._singer_catalog_entry.schema.to_dict())
schema_dict["properties"].update({"_sdc_deleted_at": {"type": ["string"]}})
schema_dict["properties"].update({"_sdc_lsn": {"type": ["integer"]}})
return schema_dict

def _increment_stream_state(
self,
latest_record: dict[str, Any],
*,
context: dict | None = None,
) -> None:
"""Update state of stream or partition with data from the provided record.

The default implementation does not advance any bookmarks unless
`self.replication_method == 'INCREMENTAL'`. For us, `self.replication_method ==
'LOG_BASED'`, so an override is required.
"""
# This also creates a state entry if one does not yet exist:
state_dict = self.get_context_state(context)

# Advance state bookmark values if applicable
if latest_record: # This is the only line that has been overridden.
if not self.replication_key:
msg = (
f"Could not detect replication key for '{self.name}' "
f"stream(replication method={self.replication_method})"
)
raise ValueError(msg)
treat_as_sorted = self.is_sorted
if not treat_as_sorted and self.state_partitioning_keys is not None:
# Streams with custom state partitioning are not resumable.
treat_as_sorted = False
increment_state(
state_dict,
replication_key=self.replication_key,
latest_record=latest_record,
is_sorted=treat_as_sorted,
check_sorted=self.check_sorted,
)

def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
"""Return a generator of row-type dictionary objects."""
status_interval = 5.0 # if no records in 5 seconds the tap can exit
start_lsn = self.get_starting_replication_key_value(context=context)
if start_lsn is None:
start_lsn = 0
logical_replication_connection = self.logical_replication_connection()
logical_replication_cursor = logical_replication_connection.cursor()

# Flush logs from the previous sync. send_feedback() will only flush LSNs before
# the value of flush_lsn, not including the value of flush_lsn, so this is safe
# even though we still want logs with an LSN == start_lsn.
logical_replication_cursor.send_feedback(flush_lsn=start_lsn)

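# Begin streaming changes from the replication slot created during setup.
# wal2json's "format-version" 2 emits one JSON document per change, and
# "add-tables" restricts the output to this stream's table.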
logical_replication_cursor.start_replication(
slot_name="tappostgres",
decode=True,
start_lsn=start_lsn,
status_interval=status_interval,
options={
"format-version": 2,
"include-transaction": False,
"add-tables": self.fully_qualified_name,
},
)

# Using scaffolding layout from:
# https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor
while True:
message = logical_replication_cursor.read_message()
if message:
row = self.consume(message)
if row:
yield row
else:
timeout = (
status_interval
- (
datetime.datetime.now()
- logical_replication_cursor.feedback_timestamp
).total_seconds()
)
try:
# If the timeout has passed and the cursor still has no new
# messages, the sync has completed.
if (
select.select(
[logical_replication_cursor], [], [], max(0, timeout)
)[0]
== []
):
break
except InterruptedError:
pass

logical_replication_cursor.close()
logical_replication_connection.close()

def consume(self, message) -> dict | None:
"""Ingest WAL message."""
try:
message_payload = json.loads(message.payload)
except json.JSONDecodeError:
self.logger.warning(
"A message payload of %s could not be converted to JSON",
message.payload,
)
return

row = {}

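# wal2json format-version 2 action codes:
#   I = insert, U = update, D = delete, T = truncate,
#   B = begin transaction, C = commit transaction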
upsert_actions = {"I", "U"}
delete_actions = {"D"}
truncate_actions = {"T"}
transaction_actions = {"B", "C"}

if message_payload["action"] in upsert_actions:
for column in message_payload["columns"]:
row.update({column["name"]: column["value"]})
row.update({"_sdc_deleted_at": None})
row.update({"_sdc_lsn": message.data_start})
elif message_payload["action"] in delete_actions:
for column in message_payload["identity"]:
row.update({column["name"]: column["value"]})
row.update(
{
"_sdc_deleted_at": datetime.datetime.utcnow().strftime(
r"%Y-%m-%dT%H:%M:%SZ"
)
}
)
row.update({"_sdc_lsn": message.data_start})
elif message_payload["action"] in truncate_actions:
self.logger.debug(
(
"A message payload of %s (corresponding to a truncate action) "
"could not be processed."
),
message.payload,
)
elif message_payload["action"] in transaction_actions:
self.logger.debug(
(
"A message payload of %s (corresponding to a transaction beginning "
"or commit) could not be processed."
),
message.payload,
)
else:
raise RuntimeError(
f"A message payload of {message.payload} (corresponding to an unknown "
"action type) could not be processed."
)

return row

def logical_replication_connection(self):
"""A logical replication connection to the database.

Uses a direct psycopg2 implementation rather than through sqlalchemy.
"""
connection_string = (
f"dbname={self.config['database']} user={self.config['user']} password="
f"{self.config['password']} host={self.config['host']} port="
f"{self.config['port']}"
)
return psycopg2.connect(
connection_string,
application_name="tap_postgres",
connection_factory=extras.LogicalReplicationConnection,
)