Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into feature/9421
Browse files Browse the repository at this point in the history
# Conflicts:
#	datafusion/proto/src/logical_plan/from_proto.rs
  • Loading branch information
Omega359 committed Mar 3, 2024
2 parents bcd2bd1 + 89aea0a commit 761d873
Show file tree
Hide file tree
Showing 86 changed files with 1,553 additions and 923 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
exclude = ["datafusion-cli"]
members = [
"datafusion/common",
"datafusion/common_runtime",
"datafusion/core",
"datafusion/expr",
"datafusion/execution",
Expand Down Expand Up @@ -72,6 +73,7 @@ ctor = "0.2.0"
dashmap = "5.4.0"
datafusion = { path = "datafusion/core", version = "36.0.0", default-features = false }
datafusion-common = { path = "datafusion/common", version = "36.0.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common_runtime", version = "36.0.0" }
datafusion-execution = { path = "datafusion/execution", version = "36.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "36.0.0" }
datafusion-functions = { path = "datafusion/functions", version = "36.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
[API Docs](https://docs.rs/datafusion/latest/datafusion/) |
[Chat](https://discord.com/channels/885562378132000778/885562378132000781)

<img src="https://arrow.apache.org/datafusion/_images/DataFusion-Logo-Background-White.png" width="256" alt="logo"/>
<img src="./docs/source/_static/images/2x_bgwhite_original.png" width="512" alt="logo"/>

DataFusion is a very fast, extensible query engine for building high-quality data-centric systems in
[Rust](http://rustlang.org), using the [Apache Arrow](https://arrow.apache.org)
Expand Down
9 changes: 9 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions datafusion/common_runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-common-runtime"
description = "Common Runtime functionality for DataFusion query engine"
keywords = ["arrow", "query", "sql"]
readme = "README.md"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
rust-version = { workspace = true }

[lib]
name = "datafusion_common_runtime"
path = "src/lib.rs"

[dependencies]
tokio = { workspace = true }
26 changes: 26 additions & 0 deletions datafusion/common_runtime/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DataFusion Common Runtime

[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.

This crate is a submodule of DataFusion that provides common runtime utilities, such as helpers for spawning and joining Tokio tasks.

[df]: https://crates.io/crates/datafusion
60 changes: 60 additions & 0 deletions datafusion/common_runtime/src/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::future::Future;

use tokio::task::{JoinError, JoinSet};

/// Helper that provides a simple API to spawn a single task and join it.
/// Provides guarantees of aborting on `Drop` to keep it cancel-safe.
///
/// The task lives inside a [`JoinSet`]; dropping a `SpawnedTask` drops that
/// set, and the abort-on-drop guarantee above relies on `JoinSet` aborting
/// its tasks when it is dropped. Call [`SpawnedTask::join`] to wait for the
/// task and obtain its result.
///
/// Technically, it's just a wrapper of `JoinSet` (with size=1).
#[derive(Debug)]
pub struct SpawnedTask<R> {
// Holds exactly one task: every constructor spawns one, and `join` consumes `self`.
inner: JoinSet<R>,
}

impl<R: 'static> SpawnedTask<R> {
    /// Spawns the future `task` and returns a handle that owns it.
    ///
    /// The future is handed to a fresh [`JoinSet`], so it is subject to the
    /// same requirements as `JoinSet::spawn` (per the tokio documentation,
    /// that method panics when called outside of a Tokio runtime).
    pub fn spawn<T>(task: T) -> Self
    where
        T: Future<Output = R> + Send + 'static,
        R: Send,
    {
        let mut join_set = JoinSet::new();
        // The returned abort handle is not needed: the task is reachable
        // through the set, which this handle owns.
        join_set.spawn(task);
        Self { inner: join_set }
    }

    /// Spawns the blocking closure `task` and returns a handle that owns it.
    ///
    /// The closure is submitted through `JoinSet::spawn_blocking`.
    /// NOTE(review): tokio documents that blocking closures cannot be
    /// cancelled once they have started running, so dropping the handle may
    /// not stop an in-flight closure — confirm against the tokio docs.
    pub fn spawn_blocking<T>(task: T) -> Self
    where
        T: FnOnce() -> R + Send + 'static,
        R: Send,
    {
        let mut join_set = JoinSet::new();
        join_set.spawn_blocking(task);
        Self { inner: join_set }
    }

    /// Waits for the task to finish and returns its output.
    ///
    /// Consumes the handle, so a task can be joined at most once.
    ///
    /// # Errors
    ///
    /// Returns the [`JoinError`] reported by the underlying `JoinSet` when
    /// the task did not complete successfully.
    ///
    /// # Panics
    ///
    /// Panics if the inner set is empty. Both constructors insert exactly one
    /// task and this method takes `self` by value, so that indicates a bug.
    pub async fn join(mut self) -> Result<R, JoinError> {
        let outcome = self.inner.join_next().await;
        outcome.expect("`SpawnedTask` instance always contains exactly 1 task")
    }
}
20 changes: 20 additions & 0 deletions datafusion/common_runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod common;

pub use common::SpawnedTask;
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ bzip2 = { version = "0.4.3", optional = true }
chrono = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store"] }
datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1510,6 +1510,7 @@ mod tests {
use arrow::array::{self, Int32Array};
use arrow::datatypes::DataType;
use datafusion_common::{Constraint, Constraints};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::{
avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum,
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame,
Expand Down Expand Up @@ -2169,15 +2170,14 @@ mod tests {
}

#[tokio::test]
#[allow(clippy::disallowed_methods)]
async fn sendable() {
let df = test_table().await.unwrap();
// dataframes should be sendable between threads/tasks
let task = tokio::task::spawn(async move {
let task = SpawnedTask::spawn(async move {
df.select_columns(&["c1"])
.expect("should be usable in a task")
});
task.await.expect("task completed successfully");
task.join().await.expect("task completed successfully");
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Fields, Schema};
use bytes::{BufMut, BytesMut};
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::common::SpawnedTask;
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/file_format/write/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArr
use arrow_schema::{DataType, Schema};
use datafusion_common::cast::as_string_array;
use datafusion_common::{exec_datafusion_err, DataFusionError};

use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;

use futures::StreamExt;
use object_store::path::Path;

use rand::distributions::DistString;

use datafusion_physical_plan::common::SpawnedTask;
use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};

type RecordBatchReceiver = Receiver<RecordBatch>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ use crate::physical_plan::SendableRecordBatchStream;

use arrow_array::RecordBatch;
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;

use bytes::Bytes;
use datafusion_physical_plan::common::SpawnedTask;
use futures::try_join;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use async_trait::async_trait;
use futures::StreamExt;

use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_plan::common::SpawnedTask;
use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2222,6 +2222,7 @@ mod tests {
use crate::test_util::{plan_and_collect, populate_csv_partitions};
use crate::variable::VarType;
use async_trait::async_trait;
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::Expr;
use std::env;
use std::path::PathBuf;
Expand Down Expand Up @@ -2321,7 +2322,6 @@ mod tests {
}

#[tokio::test]
#[allow(clippy::disallowed_methods)]
async fn send_context_to_threads() -> Result<()> {
// ensure SessionContexts can be used in a multi-threaded
// environment. Use case is for concurrent planning.
Expand All @@ -2332,7 +2332,7 @@ mod tests {
let threads: Vec<_> = (0..2)
.map(|_| ctx.clone())
.map(|ctx| {
tokio::spawn(async move {
SpawnedTask::spawn(async move {
// Ensure we can create logical plan code on a separate thread.
ctx.sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
.await
Expand All @@ -2341,7 +2341,7 @@ mod tests {
.collect();

for handle in threads {
handle.await.unwrap().unwrap();
handle.join().await.unwrap().unwrap();
}
Ok(())
}
Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,11 @@ pub use parquet;
/// re-export of [`datafusion_common`] crate
///
/// Also exposes the [`datafusion_common_runtime`] crate as the nested
/// `runtime` module.
pub mod common {
pub use datafusion_common::*;

/// re-export of [`datafusion_common_runtime`] crate
pub mod runtime {
pub use datafusion_common_runtime::*;
}
}

// Backwards compatibility
Expand Down Expand Up @@ -524,7 +529,7 @@ pub mod functions {
/// re-export of [`datafusion_functions_array`] crate, if "array_expressions" feature is enabled
pub mod functions_array {
#[cfg(feature = "array_expressions")]
pub use datafusion_functions::*;
pub use datafusion_functions_array::*;
}

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion::physical_plan::windows::{
use datafusion::physical_plan::{collect, ExecutionPlan, InputOrderMode};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::type_coercion::aggregates::coerce_types;
use datafusion_expr::{
AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound,
Expand Down Expand Up @@ -123,8 +124,7 @@ async fn window_bounded_window_random_comparison() -> Result<()> {
for i in 0..n {
let idx = i % test_cases.len();
let (pb_cols, ob_cols, search_mode) = test_cases[idx].clone();
#[allow(clippy::disallowed_methods)] // spawn allowed only in tests
let job = tokio::spawn(run_window_test(
let job = SpawnedTask::spawn(run_window_test(
make_staggered_batches::<true>(1000, n_distinct, i as u64),
i as u64,
pb_cols,
Expand All @@ -134,7 +134,7 @@ async fn window_bounded_window_random_comparison() -> Result<()> {
handles.push(job);
}
for job in handles {
job.await.unwrap()?;
job.join().await.unwrap()?;
}
}
Ok(())
Expand Down
9 changes: 5 additions & 4 deletions datafusion/expr/src/field_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,19 @@ impl GetFieldAccessSchema {
Self::ListIndex{ key_dt } => {
match (data_type, key_dt) {
(DataType::List(lt), DataType::Int64) => Ok(Field::new("list", lt.data_type().clone(), true)),
(DataType::List(_), _) => plan_err!(
"Only ints are valid as an indexed field in a list"
(DataType::LargeList(lt), DataType::Int64) => Ok(Field::new("large_list", lt.data_type().clone(), true)),
(DataType::List(_), _) | (DataType::LargeList(_), _) => plan_err!(
"Only ints are valid as an indexed field in a List/LargeList"
),
(other, _) => plan_err!("The expression to get an indexed field is only valid for `List` or `Struct` types, got {other}"),
(other, _) => plan_err!("The expression to get an indexed field is only valid for `List`, `LargeList` or `Struct` types, got {other}"),
}
}
Self::ListRange { start_dt, stop_dt, stride_dt } => {
match (data_type, start_dt, stop_dt, stride_dt) {
(DataType::List(_), DataType::Int64, DataType::Int64, DataType::Int64) => Ok(Field::new("list", data_type.clone(), true)),
(DataType::LargeList(_), DataType::Int64, DataType::Int64, DataType::Int64) => Ok(Field::new("large_list", data_type.clone(), true)),
(DataType::List(_), _, _, _) | (DataType::LargeList(_), _, _, _)=> plan_err!(
"Only ints are valid as an indexed field in a list"
"Only ints are valid as an indexed field in a List/LargeList"
),
(other, _, _, _) => plan_err!("The expression to get an indexed field is only valid for `List`, `LargeList` or `Struct` types, got {other}"),
}
Expand Down
Loading

0 comments on commit 761d873

Please sign in to comment.