Skip to content

Commit

Permalink
fix: Don't filter oracle V$LOG by rba
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Apr 7, 2024
1 parent f0bbf05 commit 37dd8b7
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 48 deletions.
9 changes: 5 additions & 4 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,10 @@ mod tests {

env_logger::init();

let replicate_user = "DOZER";
let data_user = "DOZER";
let host = "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com";
let sid = "ORCL";
let replicate_user = "C##DOZER";
let data_user = "CHUBEI";
let host = "localhost";
let sid = "ORCLPDB1";

let mut connector = super::Connector::new(
"oracle".into(),
Expand All @@ -535,6 +535,7 @@ mod tests {
estimate_throughput(iterator);
let checkpoint = handle.join().unwrap().unwrap();

let sid = "ORCLCDB";
let mut connector = super::Connector::new(
"oracle".into(),
replicate_user.into(),
Expand Down
24 changes: 10 additions & 14 deletions dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ pub struct LogManagerContent {
pub seg_owner: Option<String>,
pub table_name: Option<String>,
pub rbasqn: u32,
pub rbablk: u32,
pub rbabyte: u16,
pub sql_redo: Option<String>,
pub csf: u8,
}
Expand Down Expand Up @@ -66,12 +64,11 @@ fn log_reader_loop(
ingestor: &Ingestor,
) {
#[derive(Debug, Clone, Copy)]
struct LastRba {
struct LastScn {
sqn: u32,
blk: u32,
byte: u16,
scn: Scn,
}
let mut last_rba: Option<LastRba> = None;
let mut last_scn: Option<LastScn> = None;

loop {
debug!(target: "oracle_replication", "Listing logs starting from SCN {}", start_scn);
Expand Down Expand Up @@ -99,18 +96,18 @@ fn log_reader_loop(
let log = logs.remove(0);
debug!(target: "oracle_replication",
"Reading log {} ({}) ({}, {}), starting from {:?}",
log.name, log.sequence, log.first_change, log.next_change, last_rba
log.name, log.sequence, log.first_change, log.next_change, last_scn
);

let iterator = {
let last_rba = last_rba.and_then(|last_rba| {
if log.sequence == last_rba.sqn {
Some((last_rba.blk, last_rba.byte))
let last_scn = last_scn.and_then(|last_scn| {
if last_scn.sqn == log.sequence {
Some(last_scn.scn)
} else {
None
}
});
match reader.read(connection, &log.name, last_rba, con_id) {
match reader.read(connection, &log.name, last_scn, con_id) {
Ok(iterator) => iterator,
Err(e) => {
if ingestor.is_closed() {
Expand All @@ -133,10 +130,9 @@ fn log_reader_loop(
break 'replicate_logs;
}
};
last_rba = Some(LastRba {
last_scn = Some(LastScn {
sqn: content.rbasqn,
blk: content.rbablk,
byte: content.rbabyte,
scn: content.scn,
});
if sender.send(content).is_err() {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::env;

use dozer_ingestion_connector::dozer_types::{
chrono::{DateTime, Utc},
log::{error, trace},
Expand Down Expand Up @@ -44,51 +46,65 @@ impl RedoReader for LogMiner {
&self,
connection: &'a Connection,
log_file_name: &str,
last_rba: Option<(u32, u16)>,
last_scn: Option<Scn>,
con_id: Option<u32>,
) -> Result<Self::Iterator<'a>, Error> {
let sql =
"BEGIN DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => :name, OPTIONS => DBMS_LOGMNR.NEW); END;";
trace!(target: "oracle_log_miner", "{}, {}", sql, log_file_name);
connection.execute(sql, &[&str_to_sql!(log_file_name)])?;

let sql = "
if let Some(last_scn) = last_scn {
let start_scn = last_scn + 1;
let sql = "
BEGIN
DBMS_LOGMNR.START_LOGMNR(
STARTSCN => :start_scn,
OPTIONS =>
DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
DBMS_LOGMNR.PRINT_PRETTY_SQL +
DBMS_LOGMNR.NO_ROWID_IN_STMT
);
END;";
trace!(target: "oracle_log_miner", "{}", sql);
connection.execute(sql, &[])?;
trace!(target: "oracle_log_miner", "{}, {}", sql, start_scn);
connection.execute(sql, &[&start_scn])?;
} else {
let sql = "
BEGIN
DBMS_LOGMNR.START_LOGMNR(
OPTIONS =>
DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
DBMS_LOGMNR.PRINT_PRETTY_SQL +
DBMS_LOGMNR.NO_ROWID_IN_STMT
);
END;";
trace!(target: "oracle_log_miner", "{}", sql);
connection.execute(sql, &[])?;
};
let stmt = |sql| {
connection
.statement(sql)
.fetch_array_size(self.fetch_batch_size)
.build()
};

let base_sql = "SELECT SCN, TIMESTAMP, XID, PXID, OPERATION_CODE, SEG_OWNER, TABLE_NAME, RBASQN, RBABLK, RBABYTE, SQL_REDO, CSF FROM V$LOGMNR_CONTENTS";
let rba_filter = "(RBABLK > :last_blk OR (RBABLK = :last_blk AND RBABYTE > :last_byte))";
let base_sql = "SELECT SCN, TIMESTAMP, XID, PXID, OPERATION_CODE, SEG_OWNER, TABLE_NAME, RBASQN, SQL_REDO, CSF FROM V$LOGMNR_CONTENTS";
let operation_code_filter = env::var("DOZER_ORACLE_LOG_MINER_OPERATION_CODE_FILTER").ok();
let con_id_filter = "SRC_CON_ID = :con_id";
let started = std::time::Instant::now();
let result_set = match (last_rba, con_id) {
(Some((last_blk, last_byte)), Some(con_id)) => {
let sql = format!("{} WHERE {} AND {}", base_sql, rba_filter, con_id_filter);
trace!(target: "oracle_log_miner", "{}, {}, {}, {}", sql, last_blk, last_byte, con_id);
stmt(&sql)?.into_result_set_named(&[
("last_blk", &last_blk),
("last_byte", &last_byte),
("con_id", &con_id),
])
let result_set = match (operation_code_filter, con_id) {
(Some(operation_code_filter), Some(con_id)) => {
let sql = format!(
"{} WHERE {} AND {}",
base_sql, operation_code_filter, con_id_filter
);
trace!(target: "oracle_log_miner", "{}, {}", sql, con_id);
stmt(&sql)?.into_result_set_named(&[("con_id", &con_id)])
}
(Some((last_blk, last_byte)), None) => {
let sql = format!("{} WHERE {}", base_sql, rba_filter);
trace!(target: "oracle_log_miner", "{}, {}, {}", sql, last_blk, last_byte);
stmt(&sql)?
.into_result_set_named(&[("last_blk", &last_blk), ("last_byte", &last_byte)])
(Some(operation_code_filter), None) => {
let sql = format!("{} WHERE {}", base_sql, operation_code_filter);
trace!(target: "oracle_log_miner", "{}", sql);
stmt(&sql)?.into_result_set(&[])
}
(None, Some(con_id)) => {
let sql = format!("{} WHERE {}", base_sql, con_id_filter);
Expand Down Expand Up @@ -121,8 +137,6 @@ impl RowValue for LogManagerContent {
seg_owner,
table_name,
rbasqn,
rbablk,
rbabyte,
sql_redo,
csf,
) = <(
Expand All @@ -134,8 +148,6 @@ impl RowValue for LogManagerContent {
Option<String>,
Option<String>,
u32,
u32,
u16,
Option<String>,
u8,
) as RowValue>::get(row)?;
Expand All @@ -148,8 +160,6 @@ impl RowValue for LogManagerContent {
seg_owner,
table_name,
rbasqn,
rbablk,
rbabyte,
sql_redo,
csf,
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use oracle::Connection;

use crate::connector::Error;
use crate::connector::{Error, Scn};

/// Given a log file name, a redo reader emits `LogManagerContent` rows
pub trait RedoReader {
type Iterator<'a>: Iterator<Item = Result<LogManagerContent, Error>>;

/// Reads the `LogManagerContent` rows that have:
///
/// - scn >= start_scn
/// - rba > last_rba.0 || (rba == last_rba.0 && rbabyte > last_rba.1)
/// - scn > last_scn
fn read<'a>(
&self,
connection: &'a Connection,
log_file_name: &str,
last_rba: Option<(u32, u16)>,
last_scn: Option<Scn>,
con_id: Option<u32>,
) -> Result<Self::Iterator<'a>, Error>;
}
Expand Down

0 comments on commit 37dd8b7

Please sign in to comment.