Postgres date-time types #2890

Open · wants to merge 1 commit into base: main
2 changes: 2 additions & 0 deletions Cargo.lock


3 changes: 2 additions & 1 deletion crates/factor-outbound-pg/Cargo.toml
@@ -6,6 +6,7 @@ edition = { workspace = true }

[dependencies]
anyhow = { workspace = true }
chrono = "0.4"
native-tls = "0.2"
postgres-native-tls = "0.5"
spin-core = { path = "../core" }
@@ -14,7 +15,7 @@ spin-factors = { path = "../factors" }
spin-resource-table = { path = "../table" }
spin-world = { path = "../world" }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-postgres = "0.7"
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
tracing = { workspace = true }

[dev-dependencies]
155 changes: 123 additions & 32 deletions crates/factor-outbound-pg/src/client.rs
@@ -2,8 +2,9 @@ use anyhow::{anyhow, Result};
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use spin_world::async_trait;
use spin_world::v2::postgres::{self as v2};
use spin_world::v2::rdbms_types::{Column, DbDataType, DbValue, ParameterValue, RowSet};
use spin_world::spin::postgres::postgres::{
self as v3, Column, DbDataType, DbValue, ParameterValue, RowSet,
};
use tokio_postgres::types::Type;
use tokio_postgres::{config::SslMode, types::ToSql, Row};
use tokio_postgres::{Client as TokioClient, NoTls, Socket};
@@ -18,13 +19,13 @@ pub trait Client {
&self,
statement: String,
params: Vec<ParameterValue>,
) -> Result<u64, v2::Error>;
) -> Result<u64, v3::Error>;

async fn query(
&self,
statement: String,
params: Vec<ParameterValue>,
) -> Result<RowSet, v2::Error>;
) -> Result<RowSet, v3::Error>;
}

#[async_trait]
@@ -54,33 +55,43 @@ impl Client for TokioClient {
&self,
statement: String,
params: Vec<ParameterValue>,
) -> Result<u64, v2::Error> {
let params: Vec<&(dyn ToSql + Sync)> = params
) -> Result<u64, v3::Error> {
let params = params
.iter()
.map(to_sql_parameter)
.collect::<Result<Vec<_>>>()
.map_err(|e| v2::Error::ValueConversionFailed(format!("{:?}", e)))?;
.map_err(|e| v3::Error::ValueConversionFailed(format!("{:?}", e)))?;

self.execute(&statement, params.as_slice())
let params_refs: Vec<&(dyn ToSql + Sync)> = params
.iter()
.map(|b| b.as_ref() as &(dyn ToSql + Sync))
.collect();

self.execute(&statement, params_refs.as_slice())
.await
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))
.map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))
}

async fn query(
&self,
statement: String,
params: Vec<ParameterValue>,
) -> Result<RowSet, v2::Error> {
let params: Vec<&(dyn ToSql + Sync)> = params
) -> Result<RowSet, v3::Error> {
let params = params
.iter()
.map(to_sql_parameter)
.collect::<Result<Vec<_>>>()
.map_err(|e| v2::Error::BadParameter(format!("{:?}", e)))?;
.map_err(|e| v3::Error::BadParameter(format!("{:?}", e)))?;

let params_refs: Vec<&(dyn ToSql + Sync)> = params
.iter()
.map(|b| b.as_ref() as &(dyn ToSql + Sync))
.collect();

let results = self
.query(&statement, params.as_slice())
.query(&statement, params_refs.as_slice())
.await
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?;
.map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))?;

if results.is_empty() {
return Ok(RowSet {
@@ -94,7 +105,7 @@
.iter()
.map(convert_row)
.collect::<Result<Vec<_>, _>>()
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?;
.map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))?;

Ok(RowSet { columns, rows })
}
@@ -111,22 +122,43 @@ where
});
}

fn to_sql_parameter(value: &ParameterValue) -> Result<&(dyn ToSql + Sync)> {
fn to_sql_parameter(value: &ParameterValue) -> Result<Box<dyn ToSql + Send + Sync>> {
match value {
ParameterValue::Boolean(v) => Ok(v),
ParameterValue::Int32(v) => Ok(v),
ParameterValue::Int64(v) => Ok(v),
ParameterValue::Int8(v) => Ok(v),
ParameterValue::Int16(v) => Ok(v),
ParameterValue::Floating32(v) => Ok(v),
ParameterValue::Floating64(v) => Ok(v),
ParameterValue::Uint8(_)
| ParameterValue::Uint16(_)
| ParameterValue::Uint32(_)
| ParameterValue::Uint64(_) => Err(anyhow!("Postgres does not support unsigned integers")),
ParameterValue::Str(v) => Ok(v),
ParameterValue::Binary(v) => Ok(v),
ParameterValue::DbNull => Ok(&PgNull),
ParameterValue::Boolean(v) => Ok(Box::new(*v)),
ParameterValue::Int32(v) => Ok(Box::new(*v)),
ParameterValue::Int64(v) => Ok(Box::new(*v)),
ParameterValue::Int8(v) => Ok(Box::new(*v)),
ParameterValue::Int16(v) => Ok(Box::new(*v)),
ParameterValue::Floating32(v) => Ok(Box::new(*v)),
ParameterValue::Floating64(v) => Ok(Box::new(*v)),
ParameterValue::Str(v) => Ok(Box::new(v.clone())),
ParameterValue::Binary(v) => Ok(Box::new(v.clone())),
ParameterValue::Date((y, mon, d)) => {
let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into())
.ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?;
Ok(Box::new(naive_date))
}
ParameterValue::Time((h, min, s, ns)) => {
Inline review comment (Contributor):

Would we support time zone input? It looks like Postgres allows some standard timezone extensions that a user could provide as an optional parameter.

Alternatively, we can document in the WIT that UTC is assumed.
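
If UTC ends up being the documented convention, a caller-side normalization step keeps the tuple unambiguous. Below is a minimal sketch of that option, not part of this change: the helper name is hypothetical, and it assumes the caller holds a timezone-aware chrono value that it converts to UTC before building the naive (year, month, day, hour, minute, second, nanosecond) tuple.

use chrono::{DateTime, Datelike, FixedOffset, Timelike, Utc};

// Hypothetical guest/host helper, for illustration only: normalize a
// timezone-aware value to UTC, then flatten it into the naive WIT tuple.
fn to_utc_tuple(value: DateTime<FixedOffset>) -> (i32, u8, u8, u8, u8, u8, u32) {
    let utc: DateTime<Utc> = value.with_timezone(&Utc);
    (
        utc.year(),
        utc.month() as u8,
        utc.day() as u8,
        utc.hour() as u8,
        utc.minute() as u8,
        utc.second() as u8,
        utc.nanosecond(),
    )
}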

let naive_time =
chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns)
.ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?;
Ok(Box::new(naive_time))
}
ParameterValue::Datetime((y, mon, d, h, min, s, ns)) => {
let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into())
.ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?;
let naive_time =
chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns)
.ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?;
let dt = chrono::NaiveDateTime::new(naive_date, naive_time);
Ok(Box::new(dt))
}
ParameterValue::Timestamp(v) => {
let ts = chrono::DateTime::<chrono::Utc>::from_timestamp(*v, 0)
.ok_or_else(|| anyhow!("invalid epoch timestamp {v}"))?;
Ok(Box::new(ts))
}
ParameterValue::DbNull => Ok(Box::new(PgNull)),
}
}

@@ -155,22 +187,25 @@ fn convert_data_type(pg_type: &Type) -> DbDataType {
Type::INT4 => DbDataType::Int32,
Type::INT8 => DbDataType::Int64,
Type::TEXT | Type::VARCHAR | Type::BPCHAR => DbDataType::Str,
Type::TIMESTAMP | Type::TIMESTAMPTZ => DbDataType::Timestamp,
Type::DATE => DbDataType::Date,
Type::TIME => DbDataType::Time,
_ => {
tracing::debug!("Couldn't convert Postgres type {} to WIT", pg_type.name(),);
DbDataType::Other
}
}
}

fn convert_row(row: &Row) -> Result<Vec<DbValue>, tokio_postgres::Error> {
fn convert_row(row: &Row) -> anyhow::Result<Vec<DbValue>> {
let mut result = Vec::with_capacity(row.len());
for index in 0..row.len() {
result.push(convert_entry(row, index)?);
}
Ok(result)
}

fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Error> {
fn convert_entry(row: &Row, index: usize) -> anyhow::Result<DbValue> {
let column = &row.columns()[index];
let value = match column.type_() {
&Type::BOOL => {
@@ -229,6 +264,27 @@ fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Err
None => DbValue::DbNull,
}
}
&Type::TIMESTAMP | &Type::TIMESTAMPTZ => {
let value: Option<chrono::NaiveDateTime> = row.try_get(index)?;
match value {
Some(v) => DbValue::Datetime(tuplify_date_time(v)?),
None => DbValue::DbNull,
}
}
&Type::DATE => {
let value: Option<chrono::NaiveDate> = row.try_get(index)?;
match value {
Some(v) => DbValue::Date(tuplify_date(v)?),
None => DbValue::DbNull,
}
}
&Type::TIME => {
let value: Option<chrono::NaiveTime> = row.try_get(index)?;
match value {
Some(v) => DbValue::Time(tuplify_time(v)?),
None => DbValue::DbNull,
}
}
t => {
tracing::debug!(
"Couldn't convert Postgres type {} in column {}",
@@ -241,6 +297,41 @@ fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Err
Ok(value)
}

// Functions to convert from the chrono types to the WIT interface tuples
fn tuplify_date_time(
value: chrono::NaiveDateTime,
) -> anyhow::Result<(i32, u8, u8, u8, u8, u8, u32)> {
use chrono::{Datelike, Timelike};
Ok((
value.year(),
value.month().try_into()?,
value.day().try_into()?,
value.hour().try_into()?,
value.minute().try_into()?,
value.second().try_into()?,
value.nanosecond(),
))
}

fn tuplify_date(value: chrono::NaiveDate) -> anyhow::Result<(i32, u8, u8)> {
use chrono::Datelike;
Ok((
value.year(),
value.month().try_into()?,
value.day().try_into()?,
))
}

fn tuplify_time(value: chrono::NaiveTime) -> anyhow::Result<(u8, u8, u8, u32)> {
use chrono::Timelike;
Ok((
value.hour().try_into()?,
value.minute().try_into()?,
value.second().try_into()?,
value.nanosecond(),
))
}

/// Although the Postgres crate converts Rust Option::None to Postgres NULL,
/// it enforces the type of the Option as it does so. (For example, trying to
/// pass an Option::<i32>::None to a VARCHAR column fails conversion.) As we
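
To make the point in the comment above concrete: tokio-postgres ties Option::None to the Option's element type when binding NULL, so a typeless wrapper is needed. A minimal sketch of that behavior, assuming a tokio_postgres::Client named client and a hypothetical table t(name VARCHAR), neither of which is part of this change:

// Sketch only: shows why a typeless NULL wrapper such as PgNull is useful.
// The connection and the table t(name VARCHAR) are assumed for illustration.
async fn null_binding_demo(client: &tokio_postgres::Client) -> anyhow::Result<()> {
    // A typed None carries its element type (INT4 here), so binding it to a
    // VARCHAR column fails at parameter conversion time.
    let typed_none: Option<i32> = None;
    let res = client
        .execute("INSERT INTO t (name) VALUES ($1)", &[&typed_none])
        .await;
    assert!(res.is_err());

    // A None of the matching Rust type is accepted, but that forces the caller
    // to know the column type up front, which is what PgNull avoids.
    let matching_none: Option<String> = None;
    client
        .execute("INSERT INTO t (name) VALUES ($1)", &[&matching_none])
        .await?;
    Ok(())
}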