Skip to content

Commit

Permalink
source/pg: proactively validate schemas
Browse files Browse the repository at this point in the history
The PostgreSQL logical replication protocol only notifies us that a
table has changed schema if that table participates in a transaction
*after* the schema change has occured.

This is problematic because until that table participates in a
transaction Materialize will keep advancing the upper frontier of the
table, asserting that everything is fine and potentially revealing
invalid data.

This PR puts an upper bound on this window of invalidity by proactively
re-validating the schema of all tables in an ingestion in a cadence. The
interval is set to 15s by default but configurable through LD.

Signed-off-by: Petros Angelatos <[email protected]>
  • Loading branch information
petrosagg committed Dec 13, 2024
1 parent bd39bef commit 621bcb1
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 103 deletions.
8 changes: 8 additions & 0 deletions src/storage-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ pub const PG_OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
"Interval to fetch `offset_known`, from `pg_current_wal_lsn`",
);

/// Interval to re-validate the schemas of ingested tables.
pub const PG_SCHEMA_VALIDATION_INTERVAL: Config<Duration> = Config::new(
"pg_schema_validation_interval",
Duration::from_secs(15),
"Interval to re-validate the schemas of ingested tables.",
);

// Networking

/// Whether or not to enforce that external connection addresses are global
Expand Down Expand Up @@ -236,6 +243,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&MYSQL_OFFSET_KNOWN_INTERVAL)
.add(&PG_FETCH_SLOT_RESUME_LSN_INTERVAL)
.add(&PG_OFFSET_KNOWN_INTERVAL)
.add(&PG_SCHEMA_VALIDATION_INTERVAL)
.add(&ENFORCE_EXTERNAL_ADDRESSES)
.add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING)
.add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
Expand Down
102 changes: 82 additions & 20 deletions src/storage/src/source/postgres/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use bytes::Bytes;
Expand All @@ -86,10 +87,10 @@ use mz_ore::collections::HashSet;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_postgres_util::PostgresError;
use mz_postgres_util::{simple_query_opt, Client};
use mz_repr::{Datum, DatumVec, Diff, Row};
use mz_sql_parser::ast::{display::AstDisplay, Ident};
use mz_ssh_util::tunnel_manager::SshTunnelManager;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceTimestamp;
use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
Expand Down Expand Up @@ -234,14 +235,15 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
&config.config.connection_context.ssh_tunnel_manager,
)
.await?;
let metadata_client = Arc::new(metadata_client);

while let Some(_) = slot_ready_input.next().await {
// Wait for the slot to be created
}
tracing::info!(%id, "ensuring replication slot {slot} exists");
super::ensure_replication_slot(&replication_client, slot).await?;
let slot_metadata = super::fetch_slot_metadata(
&metadata_client,
&*metadata_client,
slot,
mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
.get(config.config.config_set()),
Expand Down Expand Up @@ -366,7 +368,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
let stream_result = raw_stream(
&config,
replication_client,
metadata_client,
Arc::clone(&metadata_client),
&connection.publication_details.slot,
&connection.publication_details.timeline_id,
&connection.publication,
Expand Down Expand Up @@ -417,6 +419,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
// compatible with `columnation`, to Vec<u8> data that is.
let mut col_temp: Vec<Vec<u8>> = vec![];
let mut row_temp = vec![];
let mut last_schema_validation = Instant::now();
while let Some(event) = stream.as_mut().next().await {
use LogicalReplicationMessage::*;
use ReplicationMessage::*;
Expand All @@ -427,10 +430,9 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(

let mut tx = pin!(extract_transaction(
stream.by_ref(),
&*metadata_client,
commit_lsn,
&table_info,
&connection_config,
&config.config.connection_context.ssh_tunnel_manager,
&metrics,
&connection.publication,
&mut errored
Expand Down Expand Up @@ -492,12 +494,75 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
_ => return Err(TransientError::BareTransactionEvent),
},
Ok(PrimaryKeepAlive(keepalive)) => {
trace!(
%id,
"timely-{worker_id} received \
keepalive lsn={}",
trace!( %id,
"timely-{worker_id} received keepalive lsn={}",
keepalive.wal_end()
);
let validation_interval = mz_storage_types::dyncfgs::PG_SCHEMA_VALIDATION_INTERVAL.get(config.config.config_set());
if last_schema_validation.elapsed() > validation_interval {
trace!(%id, "timely-{worker_id} validating schemas");
let upstream_info = {
match mz_postgres_util::publication_info(&*metadata_client, &connection.publication)
.await
{
Ok(info) => info.into_iter().map(|t| (t.oid, t)).collect(),
// If the replication stream cannot be obtained in a definite way there is
// nothing else to do. These errors are not retractable.
Err(PostgresError::PublicationMissing(publication)) => {
let err = DefiniteError::PublicationDropped(publication);
// If the publication is missing there is nothing else to
// do. These errors are not retractable.
for (oid, outputs) in table_info.iter() {
for output_index in outputs.keys() {
let update = (
(
*oid,
*output_index,
Err(DataflowError::from(err.clone())),
),
data_cap_set[0].time().clone(),
1,
);
data_output.give_fueled(&data_cap_set[0], update).await;
}
}
definite_error_handle.give(
&definite_error_cap_set[0],
ReplicationError::Definite(Rc::new(err)),
);
return Ok(());
}
Err(e) => Err(TransientError::from(e))?,
}
};
for (&oid, outputs) in table_info.iter() {
for (output_index, info) in outputs {
if errored.contains(output_index) {
trace!(%id, "timely-{worker_id} output index {output_index} \
for oid {oid} skipped");
continue;
}
match verify_schema(oid, &info.desc, &upstream_info, &*info.casts) {
Ok(()) => {
trace!(%id, "timely-{worker_id} schema of output \
index {output_index} for oid {oid} valid");
}
Err(err) => {
trace!(%id, "timely-{worker_id} schema of output \
index {output_index} for oid {oid} invalid");
let update = (
(oid, *output_index, Err(err.into())),
data_cap_set[0].time().clone(),
1,
);
data_output.give_fueled(&data_cap_set[0], update).await;
errored.insert(*output_index);
}
}
}
}
last_schema_validation = Instant::now();
}
data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
}
Ok(_) => return Err(TransientError::UnknownReplicationMessage),
Expand Down Expand Up @@ -570,7 +635,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
async fn raw_stream<'a>(
config: &'a RawSourceCreationConfig,
replication_client: Client,
metadata_client: Client,
metadata_client: Arc<Client>,
slot: &'a str,
timeline_id: &'a Option<u64>,
publication: &'a str,
Expand All @@ -597,7 +662,7 @@ async fn raw_stream<'a>(
>,
TransientError,
> {
if let Err(err) = ensure_publication_exists(&metadata_client, publication).await? {
if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
// If the publication gets deleted there is nothing else to do. These errors
// are not retractable.
return Ok(Err(err));
Expand Down Expand Up @@ -628,7 +693,7 @@ async fn raw_stream<'a>(
// Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
// Postgres versions disallow SHOW commands from within replication connection.
// See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
let row = simple_query_opt(&metadata_client, "SHOW wal_sender_timeout;")
let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
.await?
.unwrap();
let wal_sender_timeout = match row.get("wal_sender_timeout") {
Expand Down Expand Up @@ -680,7 +745,7 @@ async fn raw_stream<'a>(
// cannot use the replication client to do that because it's already in CopyBoth mode.
// [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
let slot_metadata = super::fetch_slot_metadata(
&metadata_client,
&*metadata_client,
slot,
mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
.get(config.config.config_set()),
Expand All @@ -704,7 +769,7 @@ async fn raw_stream<'a>(
while !probe_tx.is_closed() {
interval.tick().await;
let probe_ts = mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
let probe_or_err = super::fetch_max_lsn(&metadata_client)
let probe_or_err = super::fetch_max_lsn(&*metadata_client)
.await
.map(|lsn| Probe {
probe_ts,
Expand Down Expand Up @@ -800,10 +865,9 @@ async fn raw_stream<'a>(
fn extract_transaction<'a>(
stream: impl AsyncStream<Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>>
+ 'a,
metadata_client: &'a Client,
commit_lsn: MzOffset,
table_info: &'a BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
connection_config: &'a mz_postgres_util::Config,
ssh_tunnel_manager: &'a SshTunnelManager,
metrics: &'a PgSourceMetrics,
publication: &'a str,
errored_outputs: &'a mut HashSet<usize>,
Expand Down Expand Up @@ -919,11 +983,9 @@ fn extract_transaction<'a>(
// to check the current local schema against the current remote schema to
// ensure e.g. we haven't received a schema update with the same terminal
// column name which is actually a different column.
let client = connection_config
.connect("replication schema verification", ssh_tunnel_manager)
.await?;
let upstream_info =
mz_postgres_util::publication_info(&client, publication).await?;
mz_postgres_util::publication_info(metadata_client, publication)
.await?;
let upstream_info = upstream_info.into_iter().map(|t| (t.oid, t)).collect();

for (output_index, output) in valid_outputs {
Expand Down
14 changes: 5 additions & 9 deletions test/pg-cdc-old-syntax/alter-source.td
Original file line number Diff line number Diff line change
Expand Up @@ -288,42 +288,38 @@ true
> SELECT * FROM table_a;
1
2
3

# If you add columns you can re-ingest them
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE table_a ADD COLUMN f2 text;
INSERT INTO table_a VALUES (4, 'four');
INSERT INTO table_a VALUES (3, 'three');

> SELECT * FROM table_a;
1
2
3
4

> DROP SOURCE table_a;
> ALTER SOURCE mz_source ADD SUBSOURCE table_a;

> SELECT * FROM table_a;
1 <null>
2 <null>
3 <null>
4 four
3 three

# If you add a NOT NULL constraint, you can propagate it.
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER TABLE table_a ADD COLUMN f3 int DEFAULT 1 NOT NULL;
INSERT INTO table_a VALUES (5, 'five', 5);
INSERT INTO table_a VALUES (4, 'four', 4);

> DROP SOURCE table_a;
> ALTER SOURCE mz_source ADD SUBSOURCE table_a;

> SELECT * FROM table_a;
1 <null> 1
2 <null> 1
3 <null> 1
4 four 1
5 five 5
3 three 1
4 four 4

? EXPLAIN SELECT * FROM table_a WHERE f3 IS NULL;
Explained Query (fast path):
Expand Down
Loading

0 comments on commit 621bcb1

Please sign in to comment.