Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker committed Apr 8, 2024
1 parent 223cae8 commit 518d128
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 67 deletions.
27 changes: 8 additions & 19 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use std::{
collections::{HashMap, HashSet},
num::ParseFloatError,
sync::{mpsc::channel, Arc},
time::{Duration, Instant},
sync::Arc,
time::Duration,
};

use dozer_ingestion_connector::{
dozer_types::{
chrono::{self, DateTime},
chrono,
epoch::SourceTime,
log::{debug, error, info},
log::{debug, error},
models::ingestion_types::{IngestionMessage, OracleReplicator, TransactionInfo},
node::OpIdentifier,
rust_decimal::{self, Decimal},
thiserror,
types::{FieldType, Operation, Schema},
rust_decimal, thiserror,
types::{Operation, Schema},
},
Ingestor, SourceSchema, TableIdentifier, TableInfo,
};
Expand All @@ -33,15 +32,15 @@ pub struct Connector {
}

#[derive(Debug, thiserror::Error)]
enum ParseDateError {
pub(crate) enum ParseDateError {
#[error("Invalid date format: {0}")]
Chrono(#[from] chrono::ParseError),
#[error("Invalid oracle format")]
Oracle,
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
pub(crate) enum Error {
#[error("oracle error: {0:?}")]
Oracle(#[from] oracle::Error),
#[error("pdb not found: {0}")]
Expand All @@ -66,26 +65,16 @@ pub enum Error {
DeleteFailedToMatch(String),
#[error("update failed to match: {0}")]
UpdateFailedToMatch(String),
#[error("field {0} not found")]
FieldNotFound(String),
#[error("null value for non-nullable field {0}")]
NullValue(String),
#[error("cannot parse float: {0}")]
ParseFloat(#[from] ParseFloatError),
#[error("cannot parse date time from {1}: {0}")]
ParseDateTime(ParseDateError, String),
#[error("got overflow float number {0}")]
FloatOverflow(Decimal),
#[error("got error when parsing uint {0}")]
ParseUIntFailed(String),
#[error("got error when parsing int {0}")]
ParseIntFailed(String),
#[error("type mismatch for {field}, expected {expected:?}, actual {actual:?}")]
TypeMismatch {
field: String,
expected: FieldType,
actual: FieldType,
},
}

/// `oracle`'s `ToSql` implementation for `&str` uses `NVARCHAR2` type, which Oracle expects to be UTF16 encoded by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use crate::connector::{Error, Scn};
use super::{LogManagerContent, RedoReader};

#[derive(Debug, Clone, Copy)]
pub struct LogMiner {
pub(crate) struct LogMiner {
pub fetch_batch_size: u32,
}

#[derive(Debug)]
pub struct LogMinerIter<'a> {
pub(crate) struct LogMinerIter<'a> {
result_set: ResultSet<'a, LogManagerContent>,
connection: &'a Connection,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use oracle::Connection;
use crate::connector::{Error, Scn};

/// Given a log file name, a redo reader emits `LogManagerContent` rows
pub trait RedoReader {
pub(crate) trait RedoReader {
type Iterator<'a>: Iterator<Item = Result<LogManagerContent, Error>>;

/// Reads the `LogManagerContent` rows that have:
Expand All @@ -20,6 +20,6 @@ pub trait RedoReader {

mod log_miner;

pub use log_miner::LogMiner;
pub(crate) use log_miner::LogMiner;

use super::LogManagerContent;
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::borrow::Cow;

use fxhash::FxHashMap;
use memchr::{memchr, memchr3, memchr3_iter, memmem};
use memchr::{memchr, memchr3_iter, memmem};

use super::ParsedRow;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,22 @@
use std::{borrow::Cow, collections::HashMap, str::FromStr};
use std::{borrow::Cow, collections::HashMap};

use dozer_ingestion_connector::dozer_types::{
chrono::{DateTime, Utc},
log::trace,
rust_decimal::Decimal,
types::{Operation, Schema},
};
use fxhash::FxHashMap;

use crate::connector::{
replicate::transaction::{map::map_row, parse::insert::DmlParser},
Error, Scn,
Error,
};

use super::{
aggregate::{OperationKind, RawOperation, Transaction},
ParsedTransaction,
};

#[derive(Debug, Clone)]
pub struct ParsedOperation<'a> {
pub table_index: usize,
pub kind: ParsedOperationKind<'a>,
}

pub type ParsedRow<'a> = Vec<Option<Cow<'a, str>>>;

#[derive(Debug, Clone)]
pub enum ParsedOperationKind<'a> {
Insert(ParsedRow<'a>),
Delete(ParsedRow<'a>),
Update {
old: ParsedRow<'a>,
new: ParsedRow<'a>,
},
}

#[derive(Debug, Clone, PartialEq)]
pub enum ParsedValue {
String(String),
Number(Decimal),
Null,
}

#[derive(Debug, Clone)]
struct TableInfo {
index: usize,
Expand Down Expand Up @@ -166,21 +140,6 @@ impl<'a, I: Iterator<Item = Transaction>> Iterator for Processor<'a, I> {
}
}

impl FromStr for ParsedValue {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.starts_with('\'') {
Ok(ParsedValue::String(s[1..s.len() - 1].to_string()))
} else {
Ok(ParsedValue::Number(
s.parse()
.map_err(|e| Error::NumberToDecimal(e, s.to_owned()))?,
))
}
}
}

mod delete;
mod insert;
mod row;
Expand Down

0 comments on commit 518d128

Please sign in to comment.