Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cubesql): Support to_timestamp(epoch) pg function, fix #8901 #8902

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/udf/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,46 @@ fn postgres_datetime_format_to_iso(format: String) -> String {
.replace(".MS", "%.3f")
}

pub fn create_epoch_to_timestamp_udf() -> ScalarUDF {
let fun: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync> =
Arc::new(move |args: &[ColumnarValue]| match args {
[ColumnarValue::Scalar(ScalarValue::Int64(Some(value)))] => Ok(ColumnarValue::Scalar(
ScalarValue::TimestampNanosecond(Some(value.clone() * 1_000_000_000), None),
)),
[ColumnarValue::Scalar(ScalarValue::Float64(Some(value)))] => {
let seconds = value.round() as i64;
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(seconds * 1_000_000_000),
None,
)))
}
[ColumnarValue::Scalar(ScalarValue::Decimal128(Some(value), _, _))] => {
let seconds = (*value) as i64;
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(seconds * 1_000_000_000),
None,
)))
}
_ => Err(DataFusionError::Execution(
"Unsupported arguments for to_timestamp".to_string(),
)),
});

let return_type: ReturnTypeFunction =
Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None))));

let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int64]),
TypeSignature::Exact(vec![DataType::Float64]),
TypeSignature::Exact(vec![DataType::Decimal(10, 0)]),
],
Volatility::Immutable,
);

ScalarUDF::new("epoch_to_timestamp", &signature, &return_type, &fun)
}

pub fn create_str_to_date_udf() -> ScalarUDF {
let fun: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync> =
Arc::new(move |args: &[ColumnarValue]| {
Expand Down
16 changes: 16 additions & 0 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16251,4 +16251,20 @@ LIMIT {{ limit }}{% endif %}"#.to_string(),
displayable(physical_plan.as_ref()).indent()
);
}

#[tokio::test]
async fn test_to_timestamp() -> Result<(), CubeError> {
let query = r#"
SELECT to_timestamp(1618449331) AS result
UNION ALL
SELECT to_timestamp('2021-08-31 11:05:10.400000', '%Y-%m-%d %H:%i:%s.%f') AS result
"#;

insta::assert_snapshot!(
"to_timestamp",
execute_query(query.to_string(), DatabaseProtocol::PostgreSQL).await?
);

Ok(())
}
}
1 change: 1 addition & 0 deletions rust/cubesql/cubesql/src/compile/query_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ impl QueryEngine for SqlQueryEngine {
ctx.register_udf(create_dayofyear_udf());
ctx.register_udf(create_date_sub_udf());
ctx.register_udf(create_date_add_udf());
ctx.register_udf(create_epoch_to_timestamp_udf());
ctx.register_udf(create_str_to_date_udf());
ctx.register_udf(create_current_timestamp_udf("current_timestamp"));
ctx.register_udf(create_current_timestamp_udf("localtimestamp"));
Expand Down
1 change: 1 addition & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,7 @@ impl LogicalPlanAnalysis {
|| &fun.name == "date_sub"
|| &fun.name == "date"
|| &fun.name == "date_to_timestamp"
|| &fun.name == "epoch_to_timestamp"
{
Self::eval_constant_expr(&egraph, &expr)
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
source: cubesql/src/compile/mod.rs
assertion_line: 16263
expression: "execute_query(query.to_string(), DatabaseProtocol::PostgreSQL).await?"
---
+-------------------------+
| result |
+-------------------------+
| 2021-04-15T01:15:31.000 |
| 2021-08-31T11:05:10.400 |
+-------------------------+
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
source: cubesql/src/compile/test/test_udfs.rs
assertion_line: 307
expression: "execute_query(\"SELECT epoch_to_timestamp(1621123456)\".to_string(),\nDatabaseProtocol::PostgreSQL).await?"
---
+---------------------------------------+
| epoch_to_timestamp(Int64(1621123456)) |
+---------------------------------------+
| 2021-05-16T00:04:16.000 |
+---------------------------------------+
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
source: cubesql/src/compile/test/test_udfs.rs
assertion_line: 316
expression: "execute_query(\"\n SELECT epoch_to_timestamp(1621123456)\n UNION ALL\n SELECT epoch_to_timestamp(1621123456.789)\n UNION ALL\n SELECT epoch_to_timestamp(cast(1621123456 as numeric(10)))\n \".to_string(),\nDatabaseProtocol::PostgreSQL).await?"
---
+---------------------------------------+
| epoch_to_timestamp(Int64(1621123456)) |
+---------------------------------------+
| 2021-05-16T00:04:16.000 |
| 2021-05-16T00:04:17.000 |
| 2021-05-16T00:04:16.000 |
+---------------------------------------+
30 changes: 30 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/test_udfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,36 @@ async fn test_pg_backend_pid() -> Result<(), CubeError> {
Ok(())
}

#[tokio::test]
async fn test_epoch_to_timestamp() -> Result<(), CubeError> {
insta::assert_snapshot!(
"epoch_to_timestamp_1",
execute_query(
"SELECT epoch_to_timestamp(1621123456)".to_string(),
DatabaseProtocol::PostgreSQL
)
.await?
);

insta::assert_snapshot!(
"epoch_to_timestamp_2",
execute_query(
"
SELECT epoch_to_timestamp(1621123456)
UNION ALL
SELECT epoch_to_timestamp(1621123456.789)
UNION ALL
SELECT epoch_to_timestamp(cast(1621123456 as numeric(10)))
"
.to_string(),
DatabaseProtocol::PostgreSQL
)
.await?
);

Ok(())
}

#[tokio::test]
async fn test_to_char_udf() -> Result<(), CubeError> {
insta::assert_snapshot!(
Expand Down
30 changes: 25 additions & 5 deletions rust/cubesql/cubesql/src/sql/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ impl<'ast> Visitor<'ast, ConnectionError> for RedshiftDatePartReplacer {
}
}

/// Postgres to_timestamp clashes with Datafusion to_timestamp so we replace it with str_to_date
/// Postgres to_timestamp clashes with Datafusion to_timestamp so we replace it with str_to_date/epoch_to_timestamp
#[derive(Debug)]
pub struct ToTimestampReplacer {}

Expand All @@ -940,11 +940,31 @@ impl ToTimestampReplacer {
}

impl<'ast> Visitor<'ast, ConnectionError> for ToTimestampReplacer {
fn visit_identifier(&mut self, identifier: &mut Ident) -> Result<(), ConnectionError> {
if identifier.value.to_lowercase() == "to_timestamp" {
identifier.value = "str_to_date".to_string()
};
fn visit_function(&mut self, fun: &mut Function) -> Result<(), ConnectionError> {
if fun.name.to_string().to_lowercase() == "to_timestamp" {
if fun.args.len() == 1 {
fun.name = ObjectName(vec![Ident {
value: "epoch_to_timestamp".to_string(),
quote_style: None,
}]);
} else {
fun.name = ObjectName(vec![Ident {
value: "str_to_date".to_string(),
quote_style: None,
}]);
}
}

// Continue visiting function arguments
self.visit_function_args(&mut fun.args)?;
if let Some(over) = &mut fun.over {
for res in over.partition_by.iter_mut() {
self.visit_expr(res)?;
}
for order_expr in over.order_by.iter_mut() {
self.visit_expr(&mut order_expr.expr)?;
}
}
Ok(())
}
}
Expand Down