
Commit

Merge branch 'main' of https://github.com/Eventual-Inc/Daft into connect_distinct

universalmind303 committed Jan 15, 2025
2 parents 778dbe5 + 809e411 commit a189caf
Showing 8 changed files with 177 additions and 9 deletions.
1 change: 0 additions & 1 deletion src/daft-connect/src/connect_service.rs
@@ -200,7 +200,6 @@ impl SparkConnectService for DaftSparkConnectService {
debug!("Ignoring common metadata for relation: {common:?}; not yet implemented");
}
}
let session = self.get_session(&session_id)?;

let translator = SparkAnalyzer::new(&session);
let plan = Box::pin(translator.to_logical_plan(input))
2 changes: 1 addition & 1 deletion src/daft-connect/src/translation/logical_plan.rs
@@ -146,7 +146,7 @@ impl SparkAnalyzer<'_> {
}
RelType::Deduplicate(rel) => self.deduplicate(*rel).await,
RelType::Sort(rel) => self.sort(*rel).await,
plan => not_yet_implemented!("relation type: \"{}\"", rel_name(&plan))?,
plan => not_yet_implemented!(r#"relation type: "{}""#, rel_name(&plan))?,
}
}

99 changes: 98 additions & 1 deletion src/daft-scan/src/builder.rs
@@ -1,7 +1,9 @@
use std::{collections::BTreeMap, sync::Arc};

use common_error::DaftResult;
use common_file_formats::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig};
use common_file_formats::{
CsvSourceConfig, FileFormatConfig, JsonSourceConfig, ParquetSourceConfig,
};
use common_io_config::IOConfig;
use common_scan_info::ScanOperatorRef;
use daft_core::prelude::TimeUnit;
@@ -263,6 +265,101 @@ impl CsvScanBuilder {
}
}

/// An argument builder for a JSON scan operator.
pub struct JsonScanBuilder {
pub glob_paths: Vec<String>,
pub infer_schema: bool,
pub io_config: Option<IOConfig>,
pub schema: Option<SchemaRef>,
pub file_path_column: Option<String>,
pub hive_partitioning: bool,
pub schema_hints: Option<SchemaRef>,
pub buffer_size: Option<usize>,
pub chunk_size: Option<usize>,
}

impl JsonScanBuilder {
pub fn new<T: IntoGlobPath>(glob_paths: T) -> Self {
let glob_paths = glob_paths.into_glob_path();
Self::new_impl(glob_paths)
}

fn new_impl(glob_paths: Vec<String>) -> Self {
Self {
glob_paths,
infer_schema: true,
schema: None,
io_config: None,
file_path_column: None,
hive_partitioning: false,
buffer_size: None,
chunk_size: None,
schema_hints: None,
}
}

pub fn infer_schema(mut self, infer_schema: bool) -> Self {
self.infer_schema = infer_schema;
self
}

pub fn io_config(mut self, io_config: IOConfig) -> Self {
self.io_config = Some(io_config);
self
}

pub fn schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}

pub fn file_path_column(mut self, file_path_column: String) -> Self {
self.file_path_column = Some(file_path_column);
self
}

pub fn hive_partitioning(mut self, hive_partitioning: bool) -> Self {
self.hive_partitioning = hive_partitioning;
self
}

pub fn schema_hints(mut self, schema_hints: SchemaRef) -> Self {
self.schema_hints = Some(schema_hints);
self
}

pub fn buffer_size(mut self, buffer_size: usize) -> Self {
self.buffer_size = Some(buffer_size);
self
}

pub fn chunk_size(mut self, chunk_size: usize) -> Self {
self.chunk_size = Some(chunk_size);
self
}

/// Creates a logical table scan backed by a JSON scan operator.
pub async fn finish(self) -> DaftResult<LogicalPlanBuilder> {
let cfg = JsonSourceConfig {
buffer_size: self.buffer_size,
chunk_size: self.chunk_size,
};
let operator = Arc::new(
GlobScanOperator::try_new(
self.glob_paths,
Arc::new(FileFormatConfig::Json(cfg)),
Arc::new(StorageConfig::new_internal(false, self.io_config)),
self.infer_schema,
self.schema,
self.file_path_column,
self.hive_partitioning,
)
.await?,
);
LogicalPlanBuilder::table_scan(ScanOperatorRef(operator), None)
}
}

#[cfg(feature = "python")]
pub fn delta_scan<T: AsRef<str>>(
glob_path: T,
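For orientation, here is a minimal sketch (not part of this commit) of how the new `JsonScanBuilder` might be driven from Rust. It assumes an async context and that `IntoGlobPath` is implemented for plain string paths, as it is for the existing CSV builder; the glob path and buffer size are illustrative.

```rust
use common_error::DaftResult;
use daft_logical_plan::LogicalPlanBuilder;
use daft_scan::builder::JsonScanBuilder;

// Build a logical table scan over newline-delimited JSON files.
async fn scan_jsonl() -> DaftResult<LogicalPlanBuilder> {
    JsonScanBuilder::new("s3://my-bucket/logs/*.jsonl") // illustrative path
        .infer_schema(true)           // derive the schema from the data
        .hive_partitioning(false)     // no hive-style partition columns
        .buffer_size(4 * 1024 * 1024) // forwarded to JsonSourceConfig::buffer_size
        .finish()
        .await
}
```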
4 changes: 2 additions & 2 deletions src/daft-sql/src/planner.rs
@@ -1060,11 +1060,11 @@ impl<'a> SQLPlanner<'a> {

/// Plan a `FROM <path>` table factor by rewriting to relevant table-value function.
fn plan_relation_path(&self, name: &ObjectName) -> SQLPlannerResult<Relation> {
let path = name.to_string();
let path = &path[1..path.len() - 1]; // strip single-quotes ' '
let path = name.0[0].value.as_str();
let func = match Path::new(path).extension() {
Some(ext) if ext.eq_ignore_ascii_case("csv") => "read_csv",
Some(ext) if ext.eq_ignore_ascii_case("json") => "read_json",
Some(ext) if ext.eq_ignore_ascii_case("jsonl") => "read_json",
Some(ext) if ext.eq_ignore_ascii_case("parquet") => "read_parquet",
Some(_) => invalid_operation_err!("unsupported file path extension: {}", name),
None => invalid_operation_err!("unsupported file path, no extension: {}", name),
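As a worked illustration of the `plan_relation_path` change above (not part of this commit), the extension dispatch can be read as a small standalone function; the name `reader_for_path` is hypothetical and error handling is elided.

```rust
use std::path::Path;

// Maps a bare `FROM '<path>'` relation to the table function that will read it,
// mirroring the match in `plan_relation_path` (now including `.jsonl`).
fn reader_for_path(path: &str) -> Option<&'static str> {
    match Path::new(path).extension()?.to_str()? {
        ext if ext.eq_ignore_ascii_case("csv") => Some("read_csv"),
        ext if ext.eq_ignore_ascii_case("json") || ext.eq_ignore_ascii_case("jsonl") => {
            Some("read_json")
        }
        ext if ext.eq_ignore_ascii_case("parquet") => Some("read_parquet"),
        _ => None,
    }
}

fn main() {
    // `SELECT * FROM 'tests/assets/json-data/small.jsonl'` now rewrites to `read_json(...)`.
    assert_eq!(
        reader_for_path("tests/assets/json-data/small.jsonl"),
        Some("read_json")
    );
}
```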
6 changes: 4 additions & 2 deletions src/daft-sql/src/table_provider/mod.rs
@@ -1,10 +1,12 @@
pub mod read_csv;
pub mod read_json;
pub mod read_parquet;
use std::{collections::HashMap, sync::Arc};

use daft_logical_plan::LogicalPlanBuilder;
use once_cell::sync::Lazy;
use read_csv::ReadCsvFunction;
use read_json::ReadJsonFunction;
use read_parquet::ReadParquetFunction;
use sqlparser::ast::TableFunctionArgs;

@@ -17,11 +19,11 @@ use crate::{

pub(crate) static SQL_TABLE_FUNCTIONS: Lazy<SQLTableFunctions> = Lazy::new(|| {
let mut functions = SQLTableFunctions::new();
functions.add_fn("read_parquet", ReadParquetFunction);
functions.add_fn("read_csv", ReadCsvFunction);
functions.add_fn("read_json", ReadJsonFunction);
functions.add_fn("read_parquet", ReadParquetFunction);
#[cfg(feature = "python")]
functions.add_fn("read_deltalake", ReadDeltalakeFunction);

functions
});

4 changes: 4 additions & 0 deletions src/daft-sql/src/table_provider/read_csv.rs
@@ -16,6 +16,10 @@ impl TryFrom<SQLFunctionArguments> for CsvScanBuilder {
type Error = PlannerError;

fn try_from(args: SQLFunctionArguments) -> Result<Self, Self::Error> {
// TODO validations (unsure if should carry over from python API)
// - schema_hints is deprecated
// - ensure infer_schema is true if schema is None.

let delimiter = args.try_get_named("delimiter")?;
let has_headers: bool = args.try_get_named("has_headers")?.unwrap_or(true);
let double_quote: bool = args.try_get_named("double_quote")?.unwrap_or(true);
68 changes: 68 additions & 0 deletions src/daft-sql/src/table_provider/read_json.rs
@@ -0,0 +1,68 @@
use daft_scan::builder::JsonScanBuilder;

use super::{expr_to_iocfg, SQLTableFunction};
use crate::{error::PlannerError, functions::SQLFunctionArguments};

pub(super) struct ReadJsonFunction;

impl SQLTableFunction for ReadJsonFunction {
fn plan(
&self,
planner: &crate::SQLPlanner,
args: &sqlparser::ast::TableFunctionArgs,
) -> crate::error::SQLPlannerResult<daft_logical_plan::LogicalPlanBuilder> {
let builder: JsonScanBuilder = planner.plan_function_args(
args.args.as_slice(),
&[
"path",
"infer_schema",
// "schema"
"io_config",
"file_path_column",
"hive_partitioning",
// "schema_hints",
"buffer_size",
"chunk_size",
],
1, // (path)
)?;
let runtime = common_runtime::get_io_runtime(true);
let result = runtime.block_on(builder.finish())??;
Ok(result)
}
}

impl TryFrom<SQLFunctionArguments> for JsonScanBuilder {
type Error = PlannerError;

fn try_from(args: SQLFunctionArguments) -> Result<Self, Self::Error> {
// TODO validations (unsure if should carry over from python API)
// - schema_hints is deprecated
// - ensure infer_schema is true if schema is None.

let glob_paths: String = args
.try_get_positional(0)?
.ok_or_else(|| PlannerError::invalid_operation("path is required for `read_json`"))?;

let infer_schema = args.try_get_named("infer_schema")?.unwrap_or(true);
let chunk_size = args.try_get_named("chunk_size")?;
let buffer_size = args.try_get_named("buffer_size")?;
let file_path_column = args.try_get_named("file_path_column")?;
let hive_partitioning = args.try_get_named("hive_partitioning")?.unwrap_or(false);
let schema = None; // TODO
let schema_hints = None; // TODO
let io_config = args.get_named("io_config").map(expr_to_iocfg).transpose()?;

Ok(Self {
glob_paths: vec![glob_paths],
infer_schema,
schema,
io_config,
file_path_column,
hive_partitioning,
schema_hints,
buffer_size,
chunk_size,
})
}
}
2 changes: 0 additions & 2 deletions tests/sql/test_table_funcs.py
@@ -13,14 +13,12 @@ def sample_schema():
return {"a": daft.DataType.float32(), "b": daft.DataType.string()}


@pytest.mark.skip("read_json table function not supported (yet) see github #3196")
def test_sql_read_json():
df = daft.sql("SELECT * FROM read_json('tests/assets/json-data/small.jsonl')").collect()
expected = daft.read_json("tests/assets/json-data/small.jsonl").collect()
assert df.to_pydict() == expected.to_pydict()


@pytest.mark.skip("read_json table function not supported (yet) see github #3196")
def test_sql_read_json_path():
df = daft.sql("SELECT * FROM 'tests/assets/json-data/small.jsonl'").collect()
expected = daft.read_json("tests/assets/json-data/small.jsonl").collect()
