Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve sql integration test organization #2333

Merged
merged 2 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 284 additions & 0 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,3 +1107,287 @@ async fn aggregate_avg_add() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn case_sensitive_identifiers_aggregates() {
let ctx = SessionContext::new();
ctx.register_table("t", table_with_sequence(1, 1).unwrap())
.unwrap();

let expected = vec![
"+----------+",
"| MAX(t.i) |",
"+----------+",
"| 1 |",
"+----------+",
];

let results = plan_and_collect(&ctx, "SELECT max(i) FROM t")
.await
.unwrap();

assert_batches_sorted_eq!(expected, &results);

let results = plan_and_collect(&ctx, "SELECT MAX(i) FROM t")
.await
.unwrap();
assert_batches_sorted_eq!(expected, &results);

// Using double quotes allows specifying the function name with capitalization
let err = plan_and_collect(&ctx, "SELECT \"MAX\"(i) FROM t")
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Invalid function 'MAX'"
);

let results = plan_and_collect(&ctx, "SELECT \"max\"(i) FROM t")
.await
.unwrap();
assert_batches_sorted_eq!(expected, &results);
}

#[tokio::test]
async fn count_basic() -> Result<()> {
let results =
execute_with_partition("SELECT COUNT(c1), COUNT(c2) FROM test", 1).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----------------+----------------+",
"| COUNT(test.c1) | COUNT(test.c2) |",
"+----------------+----------------+",
"| 10 | 10 |",
"+----------------+----------------+",
];
assert_batches_sorted_eq!(expected, &results);
Ok(())
}

#[tokio::test]
async fn count_partitioned() -> Result<()> {
let results =
execute_with_partition("SELECT COUNT(c1), COUNT(c2) FROM test", 4).await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+----------------+----------------+",
"| COUNT(test.c1) | COUNT(test.c2) |",
"+----------------+----------------+",
"| 40 | 40 |",
"+----------------+----------------+",
];
assert_batches_sorted_eq!(expected, &results);
Ok(())
}

#[tokio::test]
async fn count_aggregated() -> Result<()> {
let results =
execute_with_partition("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?;

let expected = vec![
"+----+----------------+",
"| c1 | COUNT(test.c2) |",
"+----+----------------+",
"| 0 | 10 |",
"| 1 | 10 |",
"| 2 | 10 |",
"| 3 | 10 |",
"+----+----------------+",
];
assert_batches_sorted_eq!(expected, &results);
Ok(())
}

#[tokio::test]
async fn simple_avg() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

let batch1 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int32Array::from_slice(&[1, 2, 3]))],
)?;
let batch2 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int32Array::from_slice(&[4, 5]))],
)?;

let ctx = SessionContext::new();

let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
ctx.register_table("t", Arc::new(provider))?;

let result = plan_and_collect(&ctx, "SELECT AVG(a) FROM t").await?;

let batch = &result[0];
assert_eq!(1, batch.num_columns());
assert_eq!(1, batch.num_rows());

let values = batch
.column(0)
.as_any()
.downcast_ref::<Float64Array>()
.expect("failed to cast version");
assert_eq!(values.len(), 1);
// avg(1,2,3,4,5) = 3.0
assert_eq!(values.value(0), 3.0_f64);
Ok(())
}

#[tokio::test]
async fn query_count_distinct() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));

let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![
Some(0),
Some(1),
None,
Some(3),
Some(3),
]))],
)?;

let table = MemTable::try_new(schema, vec![vec![data]])?;

let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(table))?;
let sql = "SELECT COUNT(DISTINCT c1) FROM test";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------------------+",
"| COUNT(DISTINCT test.c1) |",
"+-------------------------+",
"| 3 |",
"+-------------------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
}

async fn run_count_distinct_integers_aggregated_scenario(
partitions: Vec<Vec<(&str, u64)>>,
) -> Result<Vec<RecordBatch>> {
let tmp_dir = TempDir::new()?;
let ctx = SessionContext::new();
let schema = Arc::new(Schema::new(vec![
Field::new("c_group", DataType::Utf8, false),
Field::new("c_int8", DataType::Int8, false),
Field::new("c_int16", DataType::Int16, false),
Field::new("c_int32", DataType::Int32, false),
Field::new("c_int64", DataType::Int64, false),
Field::new("c_uint8", DataType::UInt8, false),
Field::new("c_uint16", DataType::UInt16, false),
Field::new("c_uint32", DataType::UInt32, false),
Field::new("c_uint64", DataType::UInt64, false),
]));

for (i, partition) in partitions.iter().enumerate() {
let filename = format!("partition-{}.csv", i);
let file_path = tmp_dir.path().join(&filename);
let mut file = File::create(file_path)?;
for row in partition {
let row_str = format!(
"{},{}\n",
row.0,
// Populate values for each of the integer fields in the
// schema.
(0..8)
.map(|_| { row.1.to_string() })
.collect::<Vec<_>>()
.join(","),
);
file.write_all(row_str.as_bytes())?;
}
}
ctx.register_csv(
"test",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new().schema(&schema).has_header(false),
)
.await?;

let results = plan_and_collect(
&ctx,
"
SELECT
c_group,
COUNT(c_uint64),
COUNT(DISTINCT c_int8),
COUNT(DISTINCT c_int16),
COUNT(DISTINCT c_int32),
COUNT(DISTINCT c_int64),
COUNT(DISTINCT c_uint8),
COUNT(DISTINCT c_uint16),
COUNT(DISTINCT c_uint32),
COUNT(DISTINCT c_uint64)
FROM test
GROUP BY c_group
",
)
.await?;

Ok(results)
}

#[tokio::test]
async fn count_distinct_integers_aggregated_single_partition() -> Result<()> {
let partitions = vec![
// The first member of each tuple will be the value for the
// `c_group` column, and the second member will be the value for
// each of the int/uint fields.
vec![
("a", 1),
("a", 1),
("a", 2),
("b", 9),
("c", 9),
("c", 10),
("c", 9),
],
];

let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;

let expected = vec![
"+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
"| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
"+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
"| a | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |",
"| b | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |",
"| c | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |",
"+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()> {
let partitions = vec![
// The first member of each tuple will be the value for the
// `c_group` column, and the second member will be the value for
// each of the int/uint fields.
vec![("a", 1), ("a", 1), ("a", 2), ("b", 9), ("c", 9)],
vec![("a", 1), ("a", 3), ("b", 8), ("b", 9), ("b", 10), ("b", 11)],
];

let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;

let expected = vec![
"+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
"| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
"+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
"| a | 5 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 |",
"| b | 5 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 |",
"| c | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |",
"+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}
Loading