Skip to content

Commit

Permalink
extract some more
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Feb 16, 2022
1 parent 6d35f1d commit 464026d
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 116 deletions.
117 changes: 2 additions & 115 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1302,13 +1302,13 @@ mod tests {
logical_plan::{col, create_udf, sum, Expr},
};
use crate::{
datasource::{empty::EmptyTable, MemTable},
datasource::MemTable,
logical_plan::create_udaf,
physical_plan::expressions::AvgAccumulator,
};
use arrow::array::{
Array, ArrayRef, DictionaryArray, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray, UInt16Array,
Int32Array, Int64Array, Int8Array, LargeStringArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
use arrow::compute::add;
Expand Down Expand Up @@ -3161,65 +3161,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn create_external_table_with_timestamps() {
let mut ctx = ExecutionContext::new();

let data = "Jorge,2018-12-13T12:12:10.011Z\n\
Andrew,2018-11-13T17:11:10.011Z";

let tmp_dir = TempDir::new().unwrap();
let file_path = tmp_dir.path().join("timestamps.csv");

// scope to ensure the file is closed and written
{
File::create(&file_path)
.expect("creating temp file")
.write_all(data.as_bytes())
.expect("writing data");
}

let sql = format!(
"CREATE EXTERNAL TABLE csv_with_timestamps (
name VARCHAR,
ts TIMESTAMP
)
STORED AS CSV
LOCATION '{}'
",
file_path.to_str().expect("path is utf8")
);

plan_and_collect(&mut ctx, &sql)
.await
.expect("Executing CREATE EXTERNAL TABLE");

let sql = "SELECT * from csv_with_timestamps";
let result = plan_and_collect(&mut ctx, sql).await.unwrap();
let expected = vec![
"+--------+-------------------------+",
"| name | ts |",
"+--------+-------------------------+",
"| Andrew | 2018-11-13 17:11:10.011 |",
"| Jorge | 2018-12-13 12:12:10.011 |",
"+--------+-------------------------+",
];
assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn query_empty_table() {
let mut ctx = ExecutionContext::new();
let empty_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())));
ctx.register_table("test_tbl", empty_table).unwrap();
let sql = "SELECT * FROM test_tbl";
let result = plan_and_collect(&mut ctx, sql)
.await
.expect("Query empty table");
let expected = vec!["++", "++"];
assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn catalogs_not_leaked() {
// the information schema used to introduce cyclic Arcs
Expand All @@ -3243,60 +3184,6 @@ mod tests {
assert_eq!(Weak::strong_count(&catalog_weak), 0);
}

#[tokio::test]
async fn schema_merge_ignores_metadata() {
// Create two parquet files in same table with same schema but different metadata
let tmp_dir = TempDir::new().unwrap();
let table_dir = tmp_dir.path().join("parquet_test");
let table_path = Path::new(&table_dir);

let mut non_empty_metadata: HashMap<String, String> = HashMap::new();
non_empty_metadata.insert("testing".to_string(), "metadata".to_string());

let fields = vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
];
let schemas = vec![
Arc::new(Schema::new_with_metadata(
fields.clone(),
non_empty_metadata.clone(),
)),
Arc::new(Schema::new(fields.clone())),
];

if let Ok(()) = fs::create_dir(table_path) {
for (i, schema) in schemas.iter().enumerate().take(2) {
let filename = format!("part-{}.parquet", i);
let path = table_path.join(&filename);
let file = fs::File::create(path).unwrap();
let mut writer =
ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None)
.unwrap();

// create mock record batch
let ids = Arc::new(Int32Array::from_slice(&[i as i32]));
let names = Arc::new(StringArray::from_slice(&["test"]));
let rec_batch =
RecordBatch::try_new(schema.clone(), vec![ids, names]).unwrap();

writer.write(&rec_batch).unwrap();
writer.close().unwrap();
}
}

// Read the parquet files into a dataframe to confirm results
// (no errors)
let mut ctx = ExecutionContext::new();
let df = ctx
.read_parquet(table_dir.to_str().unwrap().to_string())
.await
.unwrap();
let result = df.collect().await.unwrap();

assert_eq!(result[0].schema().metadata(), result[1].schema().metadata());
}

#[tokio::test]
async fn normalized_column_identifiers() {
// create local execution context
Expand Down
52 changes: 52 additions & 0 deletions datafusion/tests/sql/create_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::io::Write;

use tempfile::TempDir;

use super::*;

#[tokio::test]
Expand Down Expand Up @@ -76,3 +80,51 @@ async fn csv_query_create_external_table() {
];
assert_batches_eq!(expected, &actual);
}



#[tokio::test]
async fn create_external_table_with_timestamps() {
let mut ctx = ExecutionContext::new();

let data = "Jorge,2018-12-13T12:12:10.011Z\n\
Andrew,2018-11-13T17:11:10.011Z";

let tmp_dir = TempDir::new().unwrap();
let file_path = tmp_dir.path().join("timestamps.csv");

// scope to ensure the file is closed and written
{
std::fs::File::create(&file_path)
.expect("creating temp file")
.write_all(data.as_bytes())
.expect("writing data");
}

let sql = format!(
"CREATE EXTERNAL TABLE csv_with_timestamps (
name VARCHAR,
ts TIMESTAMP
)
STORED AS CSV
LOCATION '{}'
",
file_path.to_str().expect("path is utf8")
);

plan_and_collect(&mut ctx, &sql)
.await
.expect("Executing CREATE EXTERNAL TABLE");

let sql = "SELECT * from csv_with_timestamps";
let result = plan_and_collect(&mut ctx, sql).await.unwrap();
let expected = vec![
"+--------+-------------------------+",
"| name | ts |",
"+--------+-------------------------+",
"| Andrew | 2018-11-13 17:11:10.011 |",
"| Jorge | 2018-12-13 12:12:10.011 |",
"+--------+-------------------------+",
];
assert_batches_sorted_eq!(expected, &result);
}
60 changes: 60 additions & 0 deletions datafusion/tests/sql/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use std::{fs, path::Path, collections::HashMap};

use ::parquet::arrow::ArrowWriter;
use tempfile::TempDir;

use super::*;

#[tokio::test]
Expand Down Expand Up @@ -162,3 +167,58 @@ async fn parquet_list_columns() {
assert_eq!(result.value(2), "hij");
assert_eq!(result.value(3), "xyz");
}


#[tokio::test]
async fn schema_merge_ignores_metadata() {
// Create two parquet files in same table with same schema but different metadata
let tmp_dir = TempDir::new().unwrap();
let table_dir = tmp_dir.path().join("parquet_test");
let table_path = Path::new(&table_dir);

let mut non_empty_metadata: HashMap<String, String> = HashMap::new();
non_empty_metadata.insert("testing".to_string(), "metadata".to_string());

let fields = vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
];
let schemas = vec![
Arc::new(Schema::new_with_metadata(
fields.clone(),
non_empty_metadata.clone(),
)),
Arc::new(Schema::new(fields.clone())),
];

if let Ok(()) = fs::create_dir(table_path) {
for (i, schema) in schemas.iter().enumerate().take(2) {
let filename = format!("part-{}.parquet", i);
let path = table_path.join(&filename);
let file = fs::File::create(path).unwrap();
let mut writer =
ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None)
.unwrap();

// create mock record batch
let ids = Arc::new(Int32Array::from_slice(&[i as i32]));
let names = Arc::new(StringArray::from_slice(&["test"]));
let rec_batch =
RecordBatch::try_new(schema.clone(), vec![ids, names]).unwrap();

writer.write(&rec_batch).unwrap();
writer.close().unwrap();
}
}

// Read the parquet files into a dataframe to confirm results
// (no errors)
let mut ctx = ExecutionContext::new();
let df = ctx
.read_parquet(table_dir.to_str().unwrap().to_string())
.await
.unwrap();
let result = df.collect().await.unwrap();

assert_eq!(result[0].schema().metadata(), result[1].schema().metadata());
}
16 changes: 15 additions & 1 deletion datafusion/tests/sql/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use super::*;
use datafusion::{from_slice::FromSlice, physical_plan::collect_partitioned};
use datafusion::{from_slice::FromSlice, physical_plan::collect_partitioned, datasource::empty::EmptyTable};
use tempfile::TempDir;

#[tokio::test]
Expand Down Expand Up @@ -985,3 +985,17 @@ async fn parallel_query_with_filter() -> Result<()> {

Ok(())
}


#[tokio::test]
async fn query_empty_table() {
let mut ctx = ExecutionContext::new();
let empty_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())));
ctx.register_table("test_tbl", empty_table).unwrap();
let sql = "SELECT * FROM test_tbl";
let result = plan_and_collect(&mut ctx, sql)
.await
.expect("Query empty table");
let expected = vec!["++", "++"];
assert_batches_sorted_eq!(expected, &result);
}

0 comments on commit 464026d

Please sign in to comment.