Skip to content

Commit

Permalink
support to_timestamp with optional chrono formats (#8886)
Browse files Browse the repository at this point in the history
* Support to_timestamp with chrono formatting #5398

* Updated user guide's to_timestamp to include chrono formatting information #5398

* Minor comment update.

* Small documentation updates for to_timestamp functions.

* Cargo fmt and clippy improvements.

* Switched to assert and unwrap_err based on feedback

* Fixed assert, code compiles and runs as expected now.

* Fix fmt (again).

* Add additional to_timestamp tests covering usage with tables with and without valid formats.

* to_timestamp documentation fixes.

* - Changed internal_err! -> exec_err! for unsupported data type errors.
- Extracted out to_timestamp_impl method to reduce code duplication as per PR feedback.
- Extracted out validate_to_timestamp_data_types to reduce code duplication as per PR feedback.
- Added additional tests for argument validation and invalid arguments.
- Removed unnecessary shim function 'string_to_timestamp_nanos_with_format_shim'

* Resolved merge conflict, updated toStringXXX methods to reflect upstream change

* prettier

* Fix clippy

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
Omega359 and alamb authored Jan 20, 2024
1 parent d0c84cc commit e7c0482
Show file tree
Hide file tree
Showing 7 changed files with 1,115 additions and 179 deletions.
109 changes: 109 additions & 0 deletions datafusion-examples/examples/dataframe_to_timestamp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::assert_contains;

/// This example demonstrates how to use the to_timestamp function in the DataFrame API as well as via sql.
#[tokio::main]
async fn main() -> Result<()> {
// define a schema.
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]));

// define data.
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec![
"2020-09-08T13:42:29Z",
"2020-09-08T13:42:29.190855-05:00",
"2020-08-09 12:13:29",
"2020-01-02",
])),
Arc::new(StringArray::from(vec![
"2020-09-08T13:42:29Z",
"2020-09-08T13:42:29.190855-05:00",
"08-09-2020 13/42/29",
"09-27-2020 13:42:29-05:30",
])),
],
)?;

// declare a new context. In spark API, this corresponds to a new spark SQLsession
let ctx = SessionContext::new();

// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
ctx.register_batch("t", batch)?;
let df = ctx.table("t").await?;

// use to_timestamp function to convert col 'a' to timestamp type using the default parsing
let df = df.with_column("a", to_timestamp(vec![col("a")]))?;
// use to_timestamp_seconds function to convert col 'b' to timestamp(Seconds) type using a list of chrono formats to try
let df = df.with_column(
"b",
to_timestamp_seconds(vec![
col("b"),
lit("%+"),
lit("%d-%m-%Y %H/%M/%S"),
lit("%m-%d-%Y %H:%M:%S%#z"),
]),
)?;

let df = df.select_columns(&["a", "b"])?;

// print the results
df.show().await?;

// use sql to convert col 'a' to timestamp using the default parsing
let df = ctx.sql("select to_timestamp(a) from t").await?;

// print the results
df.show().await?;

// use sql to convert col 'b' to timestamp using a list of chrono formats to try
let df = ctx.sql("select to_timestamp(b, '%+', '%d-%m-%Y %H/%M/%S', '%m-%d-%Y %H:%M:%S%#z') from t").await?;

// print the results
df.show().await?;

// use sql to convert a static string to a timestamp using a list of chrono formats to try
let df = ctx.sql("select to_timestamp('01-14-2023 01:01:30+05:30', '%+', '%d-%m-%Y %H/%M/%S', '%m-%d-%Y %H:%M:%S%#z')").await?;

// print the results
df.show().await?;

// use sql to convert a static string to a timestamp using a non-matching chrono format to try
let result = ctx
.sql("select to_timestamp('01-14-2023 01/01/30', '%d-%m-%Y %H:%M:%S')")
.await?
.collect()
.await;

let expected = "Error parsing timestamp from '01-14-2023 01/01/30' using format '%d-%m-%Y %H:%M:%S': input contains invalid characters";
assert_contains!(result.unwrap_err().to_string(), expected);

Ok(())
}
68 changes: 7 additions & 61 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1053,67 +1053,13 @@ impl BuiltinScalarFunction {
vec![Exact(vec![Utf8, Int64]), Exact(vec![LargeUtf8, Int64])],
self.volatility(),
),
BuiltinScalarFunction::ToTimestamp => Signature::uniform(
1,
vec![
Int64,
Float64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestampMillis => Signature::uniform(
1,
vec![
Int64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestampMicros => Signature::uniform(
1,
vec![
Int64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestampNanos => Signature::uniform(
1,
vec![
Int64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestampSeconds => Signature::uniform(
1,
vec![
Int64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestamp
| BuiltinScalarFunction::ToTimestampSeconds
| BuiltinScalarFunction::ToTimestampMillis
| BuiltinScalarFunction::ToTimestampMicros
| BuiltinScalarFunction::ToTimestampNanos => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::FromUnixtime => {
Signature::uniform(1, vec![Int64], self.volatility())
}
Expand Down
25 changes: 13 additions & 12 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,29 +885,30 @@ nary_scalar_expr!(
scalar_expr!(DatePart, date_part, part date, "extracts a subfield from the date");
scalar_expr!(DateTrunc, date_trunc, part date, "truncates the date to a specified level of precision");
scalar_expr!(DateBin, date_bin, stride source origin, "coerces an arbitrary timestamp to the start of the nearest specified interval");
scalar_expr!(
nary_scalar_expr!(
ToTimestamp,
to_timestamp,
"converts a string and optional formats to a `Timestamp(Nanoseconds, None)`"
);
nary_scalar_expr!(
ToTimestampMillis,
to_timestamp_millis,
date,
"converts a string to a `Timestamp(Milliseconds, None)`"
"converts a string and optional formats to a `Timestamp(Milliseconds, None)`"
);
scalar_expr!(
nary_scalar_expr!(
ToTimestampMicros,
to_timestamp_micros,
date,
"converts a string to a `Timestamp(Microseconds, None)`"
"converts a string and optional formats to a `Timestamp(Microseconds, None)`"
);
scalar_expr!(
nary_scalar_expr!(
ToTimestampNanos,
to_timestamp_nanos,
date,
"converts a string to a `Timestamp(Nanoseconds, None)`"
"converts a string and optional formats to a `Timestamp(Nanoseconds, None)`"
);
scalar_expr!(
nary_scalar_expr!(
ToTimestampSeconds,
to_timestamp_seconds,
date,
"converts a string to a `Timestamp(Seconds, None)`"
"converts a string and optional formats to a `Timestamp(Seconds, None)`"
);
scalar_expr!(
FromUnixtime,
Expand Down
Loading

0 comments on commit e7c0482

Please sign in to comment.