Skip to content

Commit

Permalink
fmt + fix warn
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 25, 2024
1 parent b593ea2 commit 3f2696f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 55 deletions.
23 changes: 18 additions & 5 deletions src/connector/src/connector_common/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
use std::collections::HashMap;
use std::fmt;

use tokio_postgres::types::Kind as PgKind;

use anyhow::anyhow;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres_openssl::MakeTlsConnector;
Expand All @@ -29,11 +27,11 @@ use serde_derive::Deserialize;
use sqlx::postgres::{PgConnectOptions, PgSslMode};
use sqlx::PgPool;
use thiserror_ext::AsReport;
use tokio_postgres::types::Kind as PgKind;
use tokio_postgres::{Client as PgClient, NoTls};

use super::maybe_tls_connector::MaybeMakeTlsConnector;
use crate::error::ConnectorResult;
use crate::source::cdc::external::ExternalTableConfig;

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
Expand Down Expand Up @@ -236,14 +234,29 @@ impl PostgresExternalTable {
SeaType::Array(_) => bail!("nested array type is not supported"),
SeaType::Unknown(name) => {
// Treat as enum type
Ok(PgType::new(name.clone(), 0, PgKind::Array(PgType::new(name.clone(), 0, PgKind::Enum(vec![]), "".into())), "".into()))
Ok(PgType::new(
name.clone(),
0,
PgKind::Array(PgType::new(
name.clone(),
0,
PgKind::Enum(vec![]),
"".into(),
)),
"".into(),
))
}
_ => bail!("unsupported array type: {:?}", t),
}
}
SeaType::Unknown(name) => {
// Treat as enum type
Ok(PgType::new(name.clone(), 0, PgKind::Enum(vec![]), "".into()))
Ok(PgType::new(
name.clone(),
0,
PgKind::Enum(vec![]),
"".into(),
))
}
_ => bail!("unsupported type: {:?}", discovered_type),
}
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/scalar_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::str::FromStr;
use anyhow::anyhow;
use bytes::BytesMut;
use pg_bigdecimal::PgNumeric;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Decimal, Int256, ListValue, ScalarImpl, ScalarRefImpl};
use thiserror_ext::AsReport;
use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, Kind, ToSql, Type};
Expand Down
41 changes: 4 additions & 37 deletions src/connector/src/sink/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,15 @@ use std::sync::Arc;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use itertools::Itertools;
use risingwave_common::array::data_chunk_iter::RowRefIter;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use simd_json::prelude::ArrayTrait;
use thiserror_ext::AsReport;
use tokio_postgres::Statement;
use with_options::WithOptions;

use super::{
SinkError, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
Expand All @@ -49,7 +45,10 @@ macro_rules! rw_row_to_pg_values {
let ty = &$statement.params()[i];
match ScalarAdapter::from_scalar(d, ty) {
Ok(scalar) => Some(scalar),
Err(e) => None,
Err(e) => {
tracing::error!(error=%e.as_report(), scalar=?d, "Failed to convert scalar to pg value");
None
}
}
})
})
Expand Down Expand Up @@ -411,7 +410,6 @@ impl PostgresSinkWriter {
let mut unmatched_update_insert = 0;
for chunk in self.buffer.drain() {
for (op, row) in chunk.rows() {
let mut expect_update_delete = false;
match op {
Op::Insert => {
self.client
Expand Down Expand Up @@ -491,37 +489,6 @@ impl SinkWriter for PostgresSinkWriter {
}
}

fn data_type_not_supported(data_type_name: &str) -> SinkError {
SinkError::Postgres(anyhow!(format!(
"{data_type_name} is not supported in SQL Server"
)))
}

fn check_data_type_compatibility(data_type: DataType) -> Result<()> {
match data_type {
DataType::Boolean
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Float32
| DataType::Float64
| DataType::Decimal
| DataType::Date
| DataType::Varchar
| DataType::Time
| DataType::Timestamp
| DataType::Timestamptz
| DataType::Jsonb
| DataType::Interval
| DataType::Bytea => Ok(()),
DataType::Struct(_) => Err(data_type_not_supported("Struct")),
DataType::List(_) => Err(data_type_not_supported("List")),
DataType::Serial => Err(data_type_not_supported("Serial")),
DataType::Int256 => Err(data_type_not_supported("Int256")),
DataType::Map(_) => Err(data_type_not_supported("Map")),
}
}

fn create_insert_sql(schema: &Schema, table_name: &str) -> String {
let columns: String = schema
.fields()
Expand Down
16 changes: 4 additions & 12 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,26 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashMap;

use anyhow::{anyhow, Context};
use anyhow::Context;
use futures::stream::BoxStream;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::catalog::Schema;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, ScalarImpl, StructType};
use risingwave_common::util::iter_util::ZipEqFast;
use sea_schema::postgres::def::{ColumnType, TableInfo};
use sea_schema::postgres::discovery::SchemaDiscovery;
use serde_derive::{Deserialize, Serialize};
use sqlx::postgres::{PgConnectOptions, PgSslMode};
use sqlx::PgPool;
use thiserror_ext::AsReport;
use tokio_postgres::types::PgLsn;

use crate::connector_common::postgres::type_to_rw_type;
use crate::connector_common::{create_pg_client, PostgresExternalTable};
use crate::connector_common::create_pg_client;
#[cfg(not(madsim))]
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::postgres_row_to_owned_row;
use crate::parser::scalar_adapter::ScalarAdapter;
use crate::source::cdc::external::{
CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
SchemaTableName, SslMode,
SchemaTableName,
};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
Expand Down

0 comments on commit 3f2696f

Please sign in to comment.