Skip to content

Commit

Permalink
Move timestamp related tests out of context.rs and into sql integrati…
Browse files Browse the repository at this point in the history
…on test (#1696)

* Move some tests out of context.rs and into sql

* Move support test out of context.rs and into sql tests

* Fixup tests and make them compile
  • Loading branch information
alamb authored Jan 28, 2022
1 parent ed1de63 commit ab145c8
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 258 deletions.
165 changes: 0 additions & 165 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2265,121 +2265,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_sum() -> Result<()> {
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, 1).await?;
ctx.register_table("t", test::table_with_timestamps())
.unwrap();

let results = plan_and_collect(
&mut ctx,
"SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t",
)
.await
.unwrap_err();

assert_eq!(results.to_string(), "Error during planning: The function Sum does not support inputs of type Timestamp(Nanosecond, None).");

Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_count() -> Result<()> {
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, 1).await?;
ctx.register_table("t", test::table_with_timestamps())
.unwrap();

let results = plan_and_collect(
&mut ctx,
"SELECT count(nanos), count(micros), count(millis), count(secs) FROM t",
)
.await
.unwrap();

let expected = vec![
"+----------------+-----------------+-----------------+---------------+",
"| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) |",
"+----------------+-----------------+-----------------+---------------+",
"| 3 | 3 | 3 | 3 |",
"+----------------+-----------------+-----------------+---------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_min() -> Result<()> {
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, 1).await?;
ctx.register_table("t", test::table_with_timestamps())
.unwrap();

let results = plan_and_collect(
&mut ctx,
"SELECT min(nanos), min(micros), min(millis), min(secs) FROM t",
)
.await
.unwrap();

let expected = vec![
"+----------------------------+----------------------------+-------------------------+---------------------+",
"| MIN(t.nanos) | MIN(t.micros) | MIN(t.millis) | MIN(t.secs) |",
"+----------------------------+----------------------------+-------------------------+---------------------+",
"| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 |",
"+----------------------------+----------------------------+-------------------------+---------------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_max() -> Result<()> {
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, 1).await?;
ctx.register_table("t", test::table_with_timestamps())
.unwrap();

let results = plan_and_collect(
&mut ctx,
"SELECT max(nanos), max(micros), max(millis), max(secs) FROM t",
)
.await
.unwrap();

let expected = vec![
"+-------------------------+-------------------------+-------------------------+---------------------+",
"| MAX(t.nanos) | MAX(t.micros) | MAX(t.millis) | MAX(t.secs) |",
"+-------------------------+-------------------------+-------------------------+---------------------+",
"| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 |",
"+-------------------------+-------------------------+-------------------------+---------------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_avg() -> Result<()> {
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, 1).await?;
ctx.register_table("t", test::table_with_timestamps())
.unwrap();

let results = plan_and_collect(
&mut ctx,
"SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t",
)
.await
.unwrap_err();

assert_eq!(results.to_string(), "Error during planning: The function Avg does not support inputs of type Timestamp(Nanosecond, None).");
Ok(())
}

#[tokio::test]
async fn aggregate_avg_add() -> Result<()> {
let results = execute(
Expand Down Expand Up @@ -2418,56 +2303,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn join_timestamp() -> Result<()> {
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, 1).await?;
ctx.register_table("t", test::table_with_timestamps())
.unwrap();

let expected = vec![
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
"| nanos | micros | millis | secs | name | nanos | micros | millis | secs | name |",
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
"| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 |",
"| 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 | 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 |",
"| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 |",
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
];

let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
JOIN (SELECT * FROM t) as t2 \
ON t1.nanos = t2.nanos",
)
.await
.unwrap();
assert_batches_sorted_eq!(expected, &results);

let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
JOIN (SELECT * FROM t) as t2 \
ON t1.micros = t2.micros",
)
.await
.unwrap();
assert_batches_sorted_eq!(expected, &results);

let results = plan_and_collect(
&mut ctx,
"SELECT * FROM t as t1 \
JOIN (SELECT * FROM t) as t2 \
ON t1.millis = t2.millis",
)
.await
.unwrap();
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn count_basic() -> Result<()> {
let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test", 1).await?;
Expand Down
92 changes: 1 addition & 91 deletions datafusion/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ use crate::datasource::{MemTable, PartitionedFile, TableProvider};
use crate::error::Result;
use crate::from_slice::FromSlice;
use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use array::{
Array, ArrayRef, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
};
use array::{Array, ArrayRef};
use arrow::array::{self, DecimalBuilder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -185,14 +182,6 @@ pub fn make_partition(sz: i32) -> RecordBatch {
RecordBatch::try_new(schema, vec![arr]).unwrap()
}

/// Return a new table provider containing all of the supported timestamp types
pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
let batch = make_timestamps();
let schema = batch.schema();
let partitions = vec![vec![batch]];
Arc::new(MemTable::try_new(schema, partitions).unwrap())
}

/// Return a new table which provide this decimal column
pub fn table_with_decimal() -> Arc<dyn TableProvider> {
let batch_decimal = make_decimal();
Expand All @@ -214,85 +203,6 @@ fn make_decimal() -> RecordBatch {
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}

/// Return record batch with all of the supported timestamp types
/// values
///
/// Columns are named:
/// "nanos" --> TimestampNanosecondArray
/// "micros" --> TimestampMicrosecondArray
/// "millis" --> TimestampMillisecondArray
/// "secs" --> TimestampSecondArray
/// "names" --> StringArray
pub fn make_timestamps() -> RecordBatch {
let ts_strings = vec![
Some("2018-11-13T17:11:10.011375885995"),
Some("2011-12-13T11:13:10.12345"),
None,
Some("2021-1-1T05:11:10.432"),
];

let ts_nanos = ts_strings
.into_iter()
.map(|t| {
t.map(|t| {
t.parse::<chrono::NaiveDateTime>()
.unwrap()
.timestamp_nanos()
})
})
.collect::<Vec<_>>();

let ts_micros = ts_nanos
.iter()
.map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
.collect::<Vec<_>>();

let ts_millis = ts_nanos
.iter()
.map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
.collect::<Vec<_>>();

let ts_secs = ts_nanos
.iter()
.map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
.collect::<Vec<_>>();

let names = ts_nanos
.iter()
.enumerate()
.map(|(i, _)| format!("Row {}", i))
.collect::<Vec<_>>();

let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
let arr_secs = TimestampSecondArray::from_opt_vec(ts_secs, None);

let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
let arr_names = StringArray::from(names);

let schema = Schema::new(vec![
Field::new("nanos", arr_nanos.data_type().clone(), true),
Field::new("micros", arr_micros.data_type().clone(), true),
Field::new("millis", arr_millis.data_type().clone(), true),
Field::new("secs", arr_secs.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

RecordBatch::try_new(
schema,
vec![
Arc::new(arr_nanos),
Arc::new(arr_micros),
Arc::new(arr_millis),
Arc::new(arr_secs),
Arc::new(arr_names),
],
)
.unwrap()
}

/// Asserts that given future is pending.
pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
let waker = futures::task::noop_waker();
Expand Down
102 changes: 102 additions & 0 deletions datafusion/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,105 @@ async fn csv_query_array_agg_distinct() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_sum() -> Result<()> {
let mut ctx = ExecutionContext::new();
ctx.register_table("t", table_with_timestamps()).unwrap();

let results = plan_and_collect(
&mut ctx,
"SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t",
)
.await
.unwrap_err();

assert_eq!(results.to_string(), "Error during planning: The function Sum does not support inputs of type Timestamp(Nanosecond, None).");

Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_count() -> Result<()> {
let mut ctx = ExecutionContext::new();
ctx.register_table("t", table_with_timestamps()).unwrap();

let results = execute_to_batches(
&mut ctx,
"SELECT count(nanos), count(micros), count(millis), count(secs) FROM t",
)
.await;

let expected = vec![
"+----------------+-----------------+-----------------+---------------+",
"| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) |",
"+----------------+-----------------+-----------------+---------------+",
"| 3 | 3 | 3 | 3 |",
"+----------------+-----------------+-----------------+---------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_min() -> Result<()> {
let mut ctx = ExecutionContext::new();
ctx.register_table("t", table_with_timestamps()).unwrap();

let results = execute_to_batches(
&mut ctx,
"SELECT min(nanos), min(micros), min(millis), min(secs) FROM t",
)
.await;

let expected = vec![
"+----------------------------+----------------------------+-------------------------+---------------------+",
"| MIN(t.nanos) | MIN(t.micros) | MIN(t.millis) | MIN(t.secs) |",
"+----------------------------+----------------------------+-------------------------+---------------------+",
"| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 |",
"+----------------------------+----------------------------+-------------------------+---------------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_max() -> Result<()> {
let mut ctx = ExecutionContext::new();
ctx.register_table("t", table_with_timestamps()).unwrap();

let results = execute_to_batches(
&mut ctx,
"SELECT max(nanos), max(micros), max(millis), max(secs) FROM t",
)
.await;

let expected = vec![
"+-------------------------+-------------------------+-------------------------+---------------------+",
"| MAX(t.nanos) | MAX(t.micros) | MAX(t.millis) | MAX(t.secs) |",
"+-------------------------+-------------------------+-------------------------+---------------------+",
"| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 |",
"+-------------------------+-------------------------+-------------------------+---------------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn aggregate_timestamps_avg() -> Result<()> {
let mut ctx = ExecutionContext::new();
ctx.register_table("t", table_with_timestamps()).unwrap();

let results = plan_and_collect(
&mut ctx,
"SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t",
)
.await
.unwrap_err();

assert_eq!(results.to_string(), "Error during planning: The function Avg does not support inputs of type Timestamp(Nanosecond, None).");
Ok(())
}
Loading

0 comments on commit ab145c8

Please sign in to comment.