Skip to content

Commit

Permalink
Allow SessionContext::read_csv, etc to read multiple files (#4908)
Browse files Browse the repository at this point in the history
* Added a trait DataFilePaths to convert strings and vectors of strings to a vector of URLs.

* Added docs and tests. Updated DataFilePaths to accept any vector containing AsRef<str> trait.

* Added docs to read_ methods and extended the SessionContext doc.

* Ran Cargo fmt

* removed CallReadTrait methods

* Update read_csv example

Co-authored-by: Andrew Lamb <[email protected]>

* removed addition to SessionContext example

---------

Co-authored-by: Lakkam Sai Krishna Reddy <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
3 people authored Feb 20, 2023
1 parent ae89960 commit cfbb14d
Showing 1 changed file with 78 additions and 23 deletions.
101 changes: 78 additions & 23 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,43 @@ use super::options::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, ReadOptions,
};

/// DataFilePaths adds a method to convert strings and vectors of strings to a vector of
/// [`ListingTableUrl`] URLs.
///
/// This allows methods such as [`SessionContext::read_csv`] and [`SessionContext::read_avro`]
/// to take either a single file or multiple files.
pub trait DataFilePaths {
    /// Parse to a vector of [`ListingTableUrl`] URLs.
    ///
    /// Takes `self` by value, so the path value is consumed by the conversion.
    fn to_urls(self) -> Result<Vec<ListingTableUrl>>;
}

/// A single string slice is treated as exactly one path.
impl DataFilePaths for &str {
    /// Parses `self` with [`ListingTableUrl::parse`] and wraps the resulting URL in a
    /// one-element vector; a parse failure is returned unchanged.
    fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
        ListingTableUrl::parse(self).map(|url| vec![url])
    }
}

/// An owned `String` is treated as exactly one path.
impl DataFilePaths for String {
    /// Parses the string with [`ListingTableUrl::parse`], propagating any parse error,
    /// and returns the URL as the sole element of the vector.
    fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
        let url = ListingTableUrl::parse(self)?;
        Ok(vec![url])
    }
}

/// A borrowed `String` is treated as exactly one path.
impl DataFilePaths for &String {
    /// Parses the referenced string with [`ListingTableUrl::parse`] and wraps the resulting
    /// URL in a one-element vector; a parse failure is returned unchanged.
    fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
        ListingTableUrl::parse(self).map(|url| vec![url])
    }
}

/// A vector of string-like values is treated as one path per element, in order.
impl<P: AsRef<str>> DataFilePaths for Vec<P> {
    /// Parses every element with [`ListingTableUrl::parse`].
    ///
    /// Stops at, and returns, the first parse error. An empty input vector yields an
    /// empty output vector.
    fn to_urls(self) -> Result<Vec<ListingTableUrl>> {
        let mut urls = Vec::with_capacity(self.len());
        for path in self.iter() {
            urls.push(ListingTableUrl::parse(path)?);
        }
        Ok(urls)
    }
}

/// SessionContext is the main interface for executing queries with DataFusion. It stands for
/// the connection between user and DataFusion/Ballista cluster.
/// The context provides the following functionality
Expand Down Expand Up @@ -627,22 +664,18 @@ impl SessionContext {
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
async fn _read_type<'a>(
async fn _read_type<'a, P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: impl ReadOptions<'a>,
) -> Result<DataFrame> {
let table_path = ListingTableUrl::parse(table_path)?;
let table_paths = table_paths.to_urls()?;
let session_config = self.copied_config();
let listing_options = options.to_listing_options(&session_config);
let resolved_schema = match options
.get_resolved_schema(&session_config, self.state(), table_path.clone())
.await
{
Ok(resolved_schema) => resolved_schema,
Err(e) => return Err(e),
};
let config = ListingTableConfig::new(table_path)
let resolved_schema = options
.get_resolved_schema(&session_config, self.state(), table_paths[0].clone())
.await?;
let config = ListingTableConfig::new_with_multi_paths(table_paths)
.with_listing_options(listing_options)
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?;
Expand All @@ -653,24 +686,28 @@ impl SessionContext {
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_avro(
///
/// For an example, see [`read_csv`](Self::read_csv)
pub async fn read_avro<P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: AvroReadOptions<'_>,
) -> Result<DataFrame> {
self._read_type(table_path, options).await
self._read_type(table_paths, options).await
}

/// Creates a [`DataFrame`] for reading a JSON data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_json(
///
/// For an example, see [`read_csv`](Self::read_csv)
pub async fn read_json<P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: NdJsonReadOptions<'_>,
) -> Result<DataFrame> {
self._read_type(table_path, options).await
self._read_type(table_paths, options).await
}

/// Creates an empty DataFrame.
Expand All @@ -685,24 +722,42 @@ impl SessionContext {
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_csv(
///
/// Example usage is given below:
///
/// ```
/// use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// // You can read a single file using `read_csv`
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // you can also read multiple files:
/// let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn read_csv<P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: CsvReadOptions<'_>,
) -> Result<DataFrame> {
self._read_type(table_path, options).await
self._read_type(table_paths, options).await
}

/// Creates a [`DataFrame`] for reading a Parquet data source.
///
/// For more control such as reading multiple files, you can use
/// [`read_table`](Self::read_table) with a [`ListingTable`].
pub async fn read_parquet(
///
/// For an example, see [`read_csv`](Self::read_csv)
pub async fn read_parquet<P: DataFilePaths>(
&self,
table_path: impl AsRef<str>,
table_paths: P,
options: ParquetReadOptions<'_>,
) -> Result<DataFrame> {
self._read_type(table_path, options).await
self._read_type(table_paths, options).await
}

/// Creates a [`DataFrame`] for a [`TableProvider`] such as a
Expand Down

0 comments on commit cfbb14d

Please sign in to comment.