From 57024c73bae3895a3d72a4e000880c367b89a4c8 Mon Sep 17 00:00:00 2001 From: DreaMer963 Date: Tue, 1 Feb 2022 22:34:36 +0800 Subject: [PATCH] fmt --- ballista/rust/client/Cargo.toml | 1 + ballista/rust/client/src/context.rs | 19 +++--- ballista/rust/core/Cargo.toml | 2 + ballista/rust/core/src/client.rs | 5 +- .../src/execution_plans/shuffle_writer.rs | 3 +- ballista/rust/executor/Cargo.toml | 1 + ballista/rust/scheduler/Cargo.toml | 1 + datafusion/Cargo.toml | 1 + datafusion/benches/aggregate_query_sql.rs | 5 +- datafusion/benches/math_query_sql.rs | 5 +- datafusion/benches/sort_limit_query_sql.rs | 13 ++-- datafusion/benches/window_query_sql.rs | 5 +- datafusion/src/catalog/catalog.rs | 15 +++-- datafusion/src/catalog/schema.rs | 13 ++-- datafusion/src/datasource/object_store/mod.rs | 16 ++--- datafusion/src/execution/context.rs | 67 +++++++------------ datafusion/src/execution/dataframe_impl.rs | 15 +++-- datafusion/src/execution/disk_manager.rs | 6 +- datafusion/src/execution/memory_manager.rs | 27 ++++---- datafusion/src/physical_plan/cross_join.rs | 8 +-- datafusion/src/physical_plan/metrics/mod.rs | 7 +- datafusion/src/physical_plan/metrics/value.rs | 12 ++-- datafusion/src/physical_plan/sorts/mod.rs | 9 +-- .../sorts/sort_preserving_merge.rs | 5 +- datafusion/tests/custom_sources.rs | 4 +- datafusion/tests/parquet_pruning.rs | 2 +- datafusion/tests/sql/aggregates.rs | 2 +- datafusion/tests/sql/avro.rs | 2 +- datafusion/tests/sql/errors.rs | 2 +- datafusion/tests/sql/explain_analyze.rs | 6 +- datafusion/tests/sql/mod.rs | 2 +- datafusion/tests/sql/parquet.rs | 4 +- datafusion/tests/sql/timestamp.rs | 2 +- 33 files changed, 145 insertions(+), 142 deletions(-) diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml index aa8297f8d06d..4ec1abe77654 100644 --- a/ballista/rust/client/Cargo.toml +++ b/ballista/rust/client/Cargo.toml @@ -35,6 +35,7 @@ log = "0.4" tokio = "1.0" tempfile = "3" sqlparser = "0.13" +parking_lot = "0.11" datafusion = { path = "../../../datafusion", version = "6.0.0" } diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 3fb347bddbce..4cd5a219461e 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -17,11 +17,12 @@ //! Distributed execution context. +use parking_lot::Mutex; use sqlparser::ast::Statement; use std::collections::HashMap; use std::fs; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use ballista_core::config::BallistaConfig; use ballista_core::utils::create_df_ctx_with_ballista_query_planner; @@ -142,7 +143,7 @@ impl BallistaContext { // use local DataFusion context for now but later this might call the scheduler let mut ctx = { - let guard = self.state.lock().unwrap(); + let guard = self.state.lock(); create_df_ctx_with_ballista_query_planner( &guard.scheduler_host, guard.scheduler_port, @@ -162,7 +163,7 @@ impl BallistaContext { // use local DataFusion context for now but later this might call the scheduler let mut ctx = { - let guard = self.state.lock().unwrap(); + let guard = self.state.lock(); create_df_ctx_with_ballista_query_planner( &guard.scheduler_host, guard.scheduler_port, @@ -186,7 +187,7 @@ impl BallistaContext { // use local DataFusion context for now but later this might call the scheduler let mut ctx = { - let guard = self.state.lock().unwrap(); + let guard = self.state.lock(); create_df_ctx_with_ballista_query_planner( &guard.scheduler_host, guard.scheduler_port, @@ -203,7 +204,7 @@ impl BallistaContext { name: &str, table: Arc, ) -> Result<()> { - let mut state = self.state.lock().unwrap(); + let mut state = self.state.lock(); state.tables.insert(name.to_owned(), table); Ok(()) } @@ -280,7 +281,7 @@ impl BallistaContext { /// might require the schema to be inferred. pub async fn sql(&self, sql: &str) -> Result> { let mut ctx = { - let state = self.state.lock().unwrap(); + let state = self.state.lock(); create_df_ctx_with_ballista_query_planner( &state.scheduler_host, state.scheduler_port, @@ -291,7 +292,7 @@ impl BallistaContext { let is_show = self.is_show_statement(sql).await?; // the show tables、 show columns sql can not run at scheduler because the tables is store at client if is_show { - let state = self.state.lock().unwrap(); + let state = self.state.lock(); ctx = ExecutionContext::with_config( ExecutionConfig::new().with_information_schema( state.config.default_with_information_schema(), @@ -301,7 +302,7 @@ impl BallistaContext { // register tables with DataFusion context { - let state = self.state.lock().unwrap(); + let state = self.state.lock(); for (name, prov) in &state.tables { ctx.register_table( TableReference::Bare { table: name }, @@ -483,7 +484,7 @@ mod tests { .unwrap(); { - let mut guard = context.state.lock().unwrap(); + let mut guard = context.state.lock(); let csv_table = guard.tables.get("single_nan"); if let Some(table_provide) = csv_table { diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index fa68be6b0ead..043f79a962b2 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -48,6 +48,8 @@ parse_arg = "0.1.3" arrow-flight = { version = "8.0.0" } datafusion = { path = "../../../datafusion", version = "6.0.0" } +parking_lot = "0.11" + [dev-dependencies] tempfile = "3" diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index b40c8788320a..aae4b2bb1bb2 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -17,7 +17,8 @@ //! Client API for sending requests to executors. -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; use std::{collections::HashMap, pin::Pin}; use std::{ convert::{TryFrom, TryInto}, @@ -154,7 +155,7 @@ impl Stream for FlightDataStream { self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let mut stream = self.stream.lock().expect("mutex is bad"); + let mut stream = self.stream.lock(); stream.poll_next_unpin(cx).map(|x| match x { Some(flight_data_chunk_result) => { let converted_chunk = flight_data_chunk_result diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 4f027c1f28bd..724bb3518d74 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -20,10 +20,11 @@ //! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query //! will use the ShuffleReaderExec to read these results. +use parking_lot::Mutex; use std::fs::File; use std::iter::Iterator; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Instant; use std::{any::Any, pin::Pin}; diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 4eba2a152328..cfe11b1b6642 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -46,6 +46,7 @@ tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } hyper = "0.14.4" +parking_lot = "0.11" [dev-dependencies] diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml index 10b3723712da..fdeb7e726d57 100644 --- a/ballista/rust/scheduler/Cargo.toml +++ b/ballista/rust/scheduler/Cargo.toml @@ -53,6 +53,7 @@ tokio-stream = { version = "0.1", features = ["net"], optional = true } tonic = "0.6" tower = { version = "0.4" } warp = "0.3" +parking_lot = "0.11" [dev-dependencies] ballista-core = { path = "../core", version = "0.6.0" } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 422a776448d9..eea2ee87eeab 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -78,6 +78,7 @@ avro-rs = { version = "0.13", features = ["snappy"], optional = true } num-traits = { version = "0.2", optional = true } pyo3 = { version = "0.15", optional = true } tempfile = "3" +parking_lot = "0.11" [dev-dependencies] criterion = "0.3" diff --git a/datafusion/benches/aggregate_query_sql.rs b/datafusion/benches/aggregate_query_sql.rs index dc40c61db41d..e587fe58cd44 100644 --- a/datafusion/benches/aggregate_query_sql.rs +++ b/datafusion/benches/aggregate_query_sql.rs @@ -25,12 +25,13 @@ use crate::criterion::Criterion; use data_utils::create_table_provider; use datafusion::error::Result; use datafusion::execution::context::ExecutionContext; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; use tokio::runtime::Runtime; fn query(ctx: Arc>, sql: &str) { let rt = Runtime::new().unwrap(); - let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap(); + let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); criterion::black_box(rt.block_on(df.collect()).unwrap()); } diff --git a/datafusion/benches/math_query_sql.rs b/datafusion/benches/math_query_sql.rs index 85d11fa67ab5..6195937dc4e5 100644 --- a/datafusion/benches/math_query_sql.rs +++ b/datafusion/benches/math_query_sql.rs @@ -19,7 +19,8 @@ extern crate criterion; use criterion::Criterion; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; use tokio::runtime::Runtime; @@ -40,7 +41,7 @@ fn query(ctx: Arc>, sql: &str) { let rt = Runtime::new().unwrap(); // execute the query - let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap(); + let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); rt.block_on(df.collect()).unwrap(); } diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs index 41f8c171e236..3828014e9892 100644 --- a/datafusion/benches/sort_limit_query_sql.rs +++ b/datafusion/benches/sort_limit_query_sql.rs @@ -22,7 +22,8 @@ use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::listing::{ListingOptions, ListingTable}; use datafusion::datasource::object_store::local::LocalFileSystem; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; extern crate arrow; extern crate datafusion; @@ -38,7 +39,7 @@ fn query(ctx: Arc>, sql: &str) { let rt = Runtime::new().unwrap(); // execute the query - let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap(); + let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); rt.block_on(df.collect()).unwrap(); } @@ -81,18 +82,18 @@ fn create_context() -> Arc> { rt.block_on(async { // create local execution context let mut ctx = ExecutionContext::new(); - ctx.state.lock().unwrap().config.target_partitions = 1; - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + ctx.state.lock().config.target_partitions = 1; + let runtime = ctx.state.lock().runtime_env.clone(); let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime) .await .unwrap(); ctx.register_table("aggregate_test_100", Arc::new(mem_table)) .unwrap(); - ctx_holder.lock().unwrap().push(Arc::new(Mutex::new(ctx))) + ctx_holder.lock().push(Arc::new(Mutex::new(ctx))) }); - let ctx = ctx_holder.lock().unwrap().get(0).unwrap().clone(); + let ctx = ctx_holder.lock().get(0).unwrap().clone(); ctx } diff --git a/datafusion/benches/window_query_sql.rs b/datafusion/benches/window_query_sql.rs index bca4a38360fe..dad838eb7f62 100644 --- a/datafusion/benches/window_query_sql.rs +++ b/datafusion/benches/window_query_sql.rs @@ -25,12 +25,13 @@ use crate::criterion::Criterion; use data_utils::create_table_provider; use datafusion::error::Result; use datafusion::execution::context::ExecutionContext; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; use tokio::runtime::Runtime; fn query(ctx: Arc>, sql: &str) { let rt = Runtime::new().unwrap(); - let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap(); + let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); criterion::black_box(rt.block_on(df.collect()).unwrap()); } diff --git a/datafusion/src/catalog/catalog.rs b/datafusion/src/catalog/catalog.rs index 7dbfa5a80c3e..d5f509f62bcc 100644 --- a/datafusion/src/catalog/catalog.rs +++ b/datafusion/src/catalog/catalog.rs @@ -19,9 +19,10 @@ //! representing collections of named schemas. use crate::catalog::schema::SchemaProvider; +use parking_lot::RwLock; use std::any::Any; use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; /// Represent a list of named catalogs pub trait CatalogList: Sync + Send { @@ -75,17 +76,17 @@ impl CatalogList for MemoryCatalogList { name: String, catalog: Arc, ) -> Option> { - let mut catalogs = self.catalogs.write().unwrap(); + let mut catalogs = self.catalogs.write(); catalogs.insert(name, catalog) } fn catalog_names(&self) -> Vec { - let catalogs = self.catalogs.read().unwrap(); + let catalogs = self.catalogs.read(); catalogs.keys().map(|s| s.to_string()).collect() } fn catalog(&self, name: &str) -> Option> { - let catalogs = self.catalogs.read().unwrap(); + let catalogs = self.catalogs.read(); catalogs.get(name).cloned() } } @@ -129,7 +130,7 @@ impl MemoryCatalogProvider { name: impl Into, schema: Arc, ) -> Option> { - let mut schemas = self.schemas.write().unwrap(); + let mut schemas = self.schemas.write(); schemas.insert(name.into(), schema) } } @@ -140,12 +141,12 @@ impl CatalogProvider for MemoryCatalogProvider { } fn schema_names(&self) -> Vec { - let schemas = self.schemas.read().unwrap(); + let schemas = self.schemas.read(); schemas.keys().cloned().collect() } fn schema(&self, name: &str) -> Option> { - let schemas = self.schemas.read().unwrap(); + let schemas = self.schemas.read(); schemas.get(name).cloned() } } diff --git a/datafusion/src/catalog/schema.rs b/datafusion/src/catalog/schema.rs index 08707ea3347a..60894d1098d0 100644 --- a/datafusion/src/catalog/schema.rs +++ b/datafusion/src/catalog/schema.rs @@ -18,9 +18,10 @@ //! Describes the interface and built-in implementations of schemas, //! representing collections of named tables. +use parking_lot::RwLock; use std::any::Any; use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; @@ -91,12 +92,12 @@ impl SchemaProvider for MemorySchemaProvider { } fn table_names(&self) -> Vec { - let tables = self.tables.read().unwrap(); + let tables = self.tables.read(); tables.keys().cloned().collect() } fn table(&self, name: &str) -> Option> { - let tables = self.tables.read().unwrap(); + let tables = self.tables.read(); tables.get(name).cloned() } @@ -111,17 +112,17 @@ impl SchemaProvider for MemorySchemaProvider { name ))); } - let mut tables = self.tables.write().unwrap(); + let mut tables = self.tables.write(); Ok(tables.insert(name, table)) } fn deregister_table(&self, name: &str) -> Result>> { - let mut tables = self.tables.write().unwrap(); + let mut tables = self.tables.write(); Ok(tables.remove(name)) } fn table_exist(&self, name: &str) -> bool { - let tables = self.tables.read().unwrap(); + let tables = self.tables.read(); tables.contains_key(name) } } diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index c77489689a86..4ca0d54c4092 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -19,11 +19,12 @@ pub mod local; +use parking_lot::RwLock; use std::collections::HashMap; use std::fmt::{self, Debug}; use std::io::Read; use std::pin::Pin; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use async_trait::async_trait; use chrono::{DateTime, Utc}; @@ -175,12 +176,7 @@ impl fmt::Debug for ObjectStoreRegistry { f.debug_struct("ObjectStoreRegistry") .field( "schemes", - &self - .object_stores - .read() - .unwrap() - .keys() - .collect::>(), + &self.object_stores.read().keys().collect::>(), ) .finish() } @@ -211,13 +207,13 @@ impl ObjectStoreRegistry { scheme: String, store: Arc, ) -> Option> { - let mut stores = self.object_stores.write().unwrap(); + let mut stores = self.object_stores.write(); stores.insert(scheme, store) } /// Get the store registered for scheme pub fn get(&self, scheme: &str) -> Option> { - let stores = self.object_stores.read().unwrap(); + let stores = self.object_stores.read(); stores.get(scheme).cloned() } @@ -231,7 +227,7 @@ impl ObjectStoreRegistry { uri: &'a str, ) -> Result<(Arc, &'a str)> { if let Some((scheme, path)) = uri.split_once("://") { - let stores = self.object_stores.read().unwrap(); + let stores = self.object_stores.read(); let store = stores .get(&*scheme.to_lowercase()) .map(Clone::clone) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 023d3a0023be..deec84d5a0ff 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -39,13 +39,11 @@ use crate::{ }, }; use log::debug; +use parking_lot::Mutex; +use std::collections::{HashMap, HashSet}; use std::path::Path; use std::string::String; use std::sync::Arc; -use std::{ - collections::{HashMap, HashSet}, - sync::Mutex, -}; use std::{fs, path::PathBuf}; use futures::{StreamExt, TryStreamExt}; @@ -201,7 +199,7 @@ impl ExecutionContext { /// Return the [RuntimeEnv] used to run queries with this [ExecutionContext] pub fn runtime_env(&self) -> Arc { - self.state.lock().unwrap().runtime_env.clone() + self.state.lock().runtime_env.clone() } /// Creates a dataframe that will execute a SQL query. @@ -242,12 +240,7 @@ impl ExecutionContext { format: file_format, collect_stat: false, file_extension: file_extension.to_owned(), - target_partitions: self - .state - .lock() - .unwrap() - .config - .target_partitions, + target_partitions: self.state.lock().config.target_partitions, table_partition_cols: vec![], }; @@ -312,7 +305,7 @@ impl ExecutionContext { } // create a query planner - let state = self.state.lock().unwrap().clone(); + let state = self.state.lock().clone(); let query_planner = SqlToRel::new(&state); query_planner.statement_to_plan(&statements[0]) } @@ -325,7 +318,6 @@ impl ExecutionContext { ) { self.state .lock() - .unwrap() .execution_props .add_var_provider(variable_type, provider); } @@ -340,7 +332,6 @@ impl ExecutionContext { pub fn register_udf(&mut self, f: ScalarUDF) { self.state .lock() - .unwrap() .scalar_functions .insert(f.name.clone(), Arc::new(f)); } @@ -355,7 +346,6 @@ impl ExecutionContext { pub fn register_udaf(&mut self, f: AggregateUDF) { self.state .lock() - .unwrap() .aggregate_functions .insert(f.name.clone(), Arc::new(f)); } @@ -369,7 +359,7 @@ impl ExecutionContext { ) -> Result> { let uri: String = uri.into(); let (object_store, path) = self.object_store(&uri)?; - let target_partitions = self.state.lock().unwrap().config.target_partitions; + let target_partitions = self.state.lock().config.target_partitions; Ok(Arc::new(DataFrameImpl::new( self.state.clone(), &LogicalPlanBuilder::scan_avro( @@ -400,7 +390,7 @@ impl ExecutionContext { ) -> Result> { let uri: String = uri.into(); let (object_store, path) = self.object_store(&uri)?; - let target_partitions = self.state.lock().unwrap().config.target_partitions; + let target_partitions = self.state.lock().config.target_partitions; Ok(Arc::new(DataFrameImpl::new( self.state.clone(), &LogicalPlanBuilder::scan_csv( @@ -422,7 +412,7 @@ impl ExecutionContext { ) -> Result> { let uri: String = uri.into(); let (object_store, path) = self.object_store(&uri)?; - let target_partitions = self.state.lock().unwrap().config.target_partitions; + let target_partitions = self.state.lock().config.target_partitions; let logical_plan = LogicalPlanBuilder::scan_parquet(object_store, path, None, target_partitions) .await? @@ -477,8 +467,8 @@ impl ExecutionContext { uri: &str, options: CsvReadOptions<'_>, ) -> Result<()> { - let listing_options = options - .to_listing_options(self.state.lock().unwrap().config.target_partitions); + let listing_options = + options.to_listing_options(self.state.lock().config.target_partitions); self.register_listing_table( name, @@ -495,7 +485,7 @@ impl ExecutionContext { /// executed against this context. pub async fn register_parquet(&mut self, name: &str, uri: &str) -> Result<()> { let (target_partitions, enable_pruning) = { - let m = self.state.lock().unwrap(); + let m = self.state.lock(); (m.config.target_partitions, m.config.parquet_pruning) }; let file_format = ParquetFormat::default().with_enable_pruning(enable_pruning); @@ -521,8 +511,8 @@ impl ExecutionContext { uri: &str, options: AvroReadOptions<'_>, ) -> Result<()> { - let listing_options = options - .to_listing_options(self.state.lock().unwrap().config.target_partitions); + let listing_options = + options.to_listing_options(self.state.lock().config.target_partitions); self.register_listing_table(name, uri, listing_options, options.schema) .await?; @@ -542,7 +532,7 @@ impl ExecutionContext { ) -> Option> { let name = name.into(); - let state = self.state.lock().unwrap(); + let state = self.state.lock(); let catalog = if state.config.information_schema { Arc::new(CatalogWithInformationSchema::new( Arc::downgrade(&state.catalog_list), @@ -557,7 +547,7 @@ impl ExecutionContext { /// Retrieves a `CatalogProvider` instance by name pub fn catalog(&self, name: &str) -> Option> { - self.state.lock().unwrap().catalog_list.catalog(name) + self.state.lock().catalog_list.catalog(name) } /// Registers a object store with scheme using a custom `ObjectStore` so that @@ -573,7 +563,6 @@ impl ExecutionContext { self.state .lock() - .unwrap() .object_store_registry .register_store(scheme, object_store) } @@ -585,7 +574,6 @@ impl ExecutionContext { ) -> Result<(Arc, &'a str)> { self.state .lock() - .unwrap() .object_store_registry .get_by_uri(uri) .map_err(DataFusionError::from) @@ -605,7 +593,6 @@ impl ExecutionContext { let table_ref = table_ref.into(); self.state .lock() - .unwrap() .schema_for_ref(table_ref)? .register_table(table_ref.table().to_owned(), provider) } @@ -620,7 +607,6 @@ impl ExecutionContext { let table_ref = table_ref.into(); self.state .lock() - .unwrap() .schema_for_ref(table_ref)? .deregister_table(table_ref.table()) } @@ -634,7 +620,7 @@ impl ExecutionContext { table_ref: impl Into>, ) -> Result> { let table_ref = table_ref.into(); - let schema = self.state.lock().unwrap().schema_for_ref(table_ref)?; + let schema = self.state.lock().schema_for_ref(table_ref)?; match schema.table(table_ref.table()) { Some(ref provider) => { let plan = LogicalPlanBuilder::scan( @@ -664,7 +650,6 @@ impl ExecutionContext { Ok(self .state .lock() - .unwrap() // a bare reference will always resolve to the default catalog and schema .schema_for_ref(TableReference::Bare { table: "" })? .table_names() @@ -703,7 +688,7 @@ impl ExecutionContext { logical_plan: &LogicalPlan, ) -> Result> { let (state, planner) = { - let mut state = self.state.lock().unwrap(); + let mut state = self.state.lock(); state.execution_props.start_execution(); // We need to clone `state` to release the lock that is not `Send`. We could @@ -815,7 +800,7 @@ impl ExecutionContext { where F: FnMut(&LogicalPlan, &dyn OptimizerRule), { - let state = &mut self.state.lock().unwrap(); + let state = &mut self.state.lock(); let execution_props = &mut state.execution_props.clone(); let optimizers = &state.config.optimizers; @@ -840,15 +825,15 @@ impl From>> for ExecutionContext { impl FunctionRegistry for ExecutionContext { fn udfs(&self) -> HashSet { - self.state.lock().unwrap().udfs() + self.state.lock().udfs() } fn udf(&self, name: &str) -> Result> { - self.state.lock().unwrap().udf(name) + self.state.lock().udf(name) } fn udaf(&self, name: &str) -> Result> { - self.state.lock().unwrap().udaf(name) + self.state.lock().udaf(name) } } @@ -1512,7 +1497,7 @@ mod tests { let physical_plan = ctx.create_physical_plan(&logical_plan).await?; - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let results = collect_partitioned(physical_plan, runtime).await?; // note that the order of partitions is not deterministic @@ -1561,7 +1546,7 @@ mod tests { let tmp_dir = TempDir::new()?; let partition_count = 4; let ctx = create_ctx(&tmp_dir, partition_count).await?; - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let table = ctx.table("test")?; let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()) @@ -1669,7 +1654,7 @@ mod tests { assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("b", physical_plan.schema().field(0).name().as_str()); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let batches = collect(physical_plan, runtime).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -3279,7 +3264,7 @@ mod tests { let plan = ctx.optimize(&plan)?; let plan = ctx.create_physical_plan(&plan).await?; - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let result = collect(plan, runtime).await?; let expected = vec![ @@ -3585,7 +3570,7 @@ mod tests { ctx.register_catalog("my_catalog", catalog); let catalog_list_weak = { - let state = ctx.state.lock().unwrap(); + let state = ctx.state.lock(); Arc::downgrade(&state.catalog_list) }; diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index d3f62bbb46db..3fcaa28af973 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -17,8 +17,9 @@ //! Implementation of DataFrame API. +use parking_lot::Mutex; use std::any::Any; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use crate::arrow::datatypes::Schema; use crate::arrow::datatypes::SchemaRef; @@ -61,7 +62,7 @@ impl DataFrameImpl { /// Create a physical plan async fn create_physical_plan(&self) -> Result> { - let state = self.ctx_state.lock().unwrap().clone(); + let state = self.ctx_state.lock().clone(); let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); let plan = ctx.optimize(&self.plan)?; ctx.create_physical_plan(&plan).await @@ -221,7 +222,7 @@ impl DataFrame for DataFrameImpl { /// execute it, collecting all resulting batches into memory async fn collect(&self) -> Result> { let plan = self.create_physical_plan().await?; - let runtime = self.ctx_state.lock().unwrap().runtime_env.clone(); + let runtime = self.ctx_state.lock().runtime_env.clone(); Ok(collect(plan, runtime).await?) } @@ -241,7 +242,7 @@ impl DataFrame for DataFrameImpl { /// execute it, returning a stream over a single partition async fn execute_stream(&self) -> Result { let plan = self.create_physical_plan().await?; - let runtime = self.ctx_state.lock().unwrap().runtime_env.clone(); + let runtime = self.ctx_state.lock().runtime_env.clone(); execute_stream(plan, runtime).await } @@ -250,7 +251,7 @@ impl DataFrame for DataFrameImpl { /// partitioning async fn collect_partitioned(&self) -> Result>> { let plan = self.create_physical_plan().await?; - let runtime = self.ctx_state.lock().unwrap().runtime_env.clone(); + let runtime = self.ctx_state.lock().runtime_env.clone(); Ok(collect_partitioned(plan, runtime).await?) } @@ -258,7 +259,7 @@ impl DataFrame for DataFrameImpl { /// execute it, returning a stream for each partition async fn execute_stream_partitioned(&self) -> Result> { let plan = self.create_physical_plan().await?; - let runtime = self.ctx_state.lock().unwrap().runtime_env.clone(); + let runtime = self.ctx_state.lock().runtime_env.clone(); Ok(execute_stream_partitioned(plan, runtime).await?) } @@ -275,7 +276,7 @@ impl DataFrame for DataFrameImpl { } fn registry(&self) -> Arc { - let registry = self.ctx_state.lock().unwrap().clone(); + let registry = self.ctx_state.lock().clone(); Arc::new(registry) } diff --git a/datafusion/src/execution/disk_manager.rs b/datafusion/src/execution/disk_manager.rs index 4486f53a21b8..c4fe6b4160fa 100644 --- a/datafusion/src/execution/disk_manager.rs +++ b/datafusion/src/execution/disk_manager.rs @@ -20,9 +20,10 @@ use crate::error::{DataFusionError, Result}; use log::debug; +use parking_lot::Mutex; use rand::{thread_rng, Rng}; +use std::path::PathBuf; use std::sync::Arc; -use std::{path::PathBuf, sync::Mutex}; use tempfile::{Builder, NamedTempFile, TempDir}; /// Configuration for temporary disk access @@ -95,7 +96,7 @@ impl DiskManager { /// Return a temporary file from a randomized choice in the configured locations pub fn create_tmp_file(&self) -> Result { - let mut local_dirs = self.local_dirs.lock().unwrap(); + let mut local_dirs = self.local_dirs.lock(); // Create a temporary directory if needed if local_dirs.is_empty() { @@ -169,7 +170,6 @@ mod tests { fn local_dir_snapshot(dm: &DiskManager) -> Vec { dm.local_dirs .lock() - .unwrap() .iter() .map(|p| p.path().into()) .collect() diff --git a/datafusion/src/execution/memory_manager.rs b/datafusion/src/execution/memory_manager.rs index 5015f466c674..d39eaab3c215 100644 --- a/datafusion/src/execution/memory_manager.rs +++ b/datafusion/src/execution/memory_manager.rs @@ -21,10 +21,11 @@ use crate::error::{DataFusionError, Result}; use async_trait::async_trait; use hashbrown::HashSet; use log::debug; +use parking_lot::{Condvar, Mutex}; use std::fmt; use std::fmt::{Debug, Display, Formatter}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::Arc; static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0); @@ -302,12 +303,12 @@ impl MemoryManager { } fn get_requester_total(&self) -> usize { - *self.requesters_total.lock().unwrap() + *self.requesters_total.lock() } /// Register a new memory requester pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) { - self.requesters.lock().unwrap().insert(requester_id.clone()); + self.requesters.lock().insert(requester_id.clone()); } fn max_mem_for_requesters(&self) -> usize { @@ -317,8 +318,8 @@ impl MemoryManager { /// Grow memory attempt from a consumer, return if we could grant that much to it async fn can_grow_directly(&self, required: usize, current: usize) -> bool { - let num_rqt = self.requesters.lock().unwrap().len(); - let mut rqt_current_used = self.requesters_total.lock().unwrap(); + let num_rqt = self.requesters.lock().len(); + let mut rqt_current_used = self.requesters_total.lock(); let mut rqt_max = self.max_mem_for_requesters(); let granted; @@ -339,7 +340,7 @@ impl MemoryManager { } else if current < min_per_rqt { // if we cannot acquire at lease 1/2n memory, just wait for others // to spill instead spill self frequently with limited total mem - rqt_current_used = self.cv.wait(rqt_current_used).unwrap(); + self.cv.wait(&mut rqt_current_used); } else { granted = false; break; @@ -351,8 +352,8 @@ impl MemoryManager { granted } - fn record_free_then_acquire(&self, freed: usize, acquired: usize) { - let mut requesters_total = self.requesters_total.lock().unwrap(); + fn record_free_then_acquire(&self, freed: usize, acquired: usize) -> usize { + let mut requesters_total = self.requesters_total.lock(); assert!(*requesters_total >= freed); *requesters_total -= freed; *requesters_total += acquired; @@ -363,9 +364,9 @@ impl MemoryManager { pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) { // find in requesters first { - let mut requesters = self.requesters.lock().unwrap(); + let mut requesters = self.requesters.lock(); if requesters.remove(id) { - let mut total = self.requesters_total.lock().unwrap(); + let mut total = self.requesters_total.lock(); assert!(*total >= mem_used); *total -= mem_used; } @@ -381,7 +382,7 @@ impl Display for MemoryManager { "MemoryManager usage statistics: total {}, trackers used {}, total {} requesters used: {}", human_readable_size(self.pool_size), human_readable_size(self.get_tracker_total()), - self.requesters.lock().unwrap().len(), + self.requesters.lock().len(), human_readable_size(self.get_requester_total()), ) } @@ -558,7 +559,7 @@ mod tests { requester1.do_with_mem(10).await.unwrap(); assert_eq!(requester1.get_spills(), 0); assert_eq!(requester1.mem_used(), 50); - assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 50); + assert_eq!(*runtime.memory_manager.requesters_total.lock(), 50); let requester2 = DummyRequester::new(0, runtime.clone()); runtime.register_requester(requester2.id()); @@ -572,7 +573,7 @@ mod tests { assert_eq!(requester1.get_spills(), 1); assert_eq!(requester1.mem_used(), 10); - assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 40); + assert_eq!(*runtime.memory_manager.requesters_total.lock(), 40); } #[tokio::test] diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 48301f0916fe..e4369c180c85 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -192,7 +192,7 @@ impl ExecutionPlan for CrossJoinExec { schema: self.schema.clone(), left_data, right: stream, - right_batch: Arc::new(std::sync::Mutex::new(None)), + right_batch: Arc::new(parking_lot::Mutex::new(None)), left_index: 0, num_input_batches: 0, num_input_rows: 0, @@ -299,7 +299,7 @@ struct CrossJoinStream { /// Current value on the left left_index: usize, /// Current batch being processed from the right side - right_batch: Arc>>, + right_batch: Arc>>, /// number of input batches num_input_batches: usize, /// number of input rows @@ -354,7 +354,7 @@ impl Stream for CrossJoinStream { if self.left_index > 0 && self.left_index < self.left_data.num_rows() { let start = Instant::now(); let right_batch = { - let right_batch = self.right_batch.lock().unwrap(); + let right_batch = self.right_batch.lock(); right_batch.clone().unwrap() }; let result = @@ -389,7 +389,7 @@ impl Stream for CrossJoinStream { } self.left_index = 1; - let mut right_batch = self.right_batch.lock().unwrap(); + let mut right_batch = self.right_batch.lock(); *right_batch = Some(batch); Some(result) diff --git a/datafusion/src/physical_plan/metrics/mod.rs b/datafusion/src/physical_plan/metrics/mod.rs index e609beb08c37..021f2df823ae 100644 --- a/datafusion/src/physical_plan/metrics/mod.rs +++ b/datafusion/src/physical_plan/metrics/mod.rs @@ -23,10 +23,11 @@ mod composite; mod tracker; mod value; +use parking_lot::Mutex; use std::{ borrow::Cow, fmt::{Debug, Display}, - sync::{Arc, Mutex}, + sync::Arc, }; use hashbrown::HashMap; @@ -339,12 +340,12 @@ impl ExecutionPlanMetricsSet { /// Add the specified metric to the underlying metric set pub fn register(&self, metric: Arc) { - self.inner.lock().expect("not poisoned").push(metric) + self.inner.lock().push(metric) } /// Return a clone of the inner MetricsSet pub fn clone_inner(&self) -> MetricsSet { - let guard = self.inner.lock().expect("not poisoned"); + let guard = self.inner.lock(); (*guard).clone() } } diff --git a/datafusion/src/physical_plan/metrics/value.rs b/datafusion/src/physical_plan/metrics/value.rs index 6ac282a496ee..43a0ad236500 100644 --- a/datafusion/src/physical_plan/metrics/value.rs +++ b/datafusion/src/physical_plan/metrics/value.rs @@ -22,11 +22,13 @@ use std::{ fmt::Display, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, Mutex, + Arc, }, time::{Duration, Instant}, }; +use parking_lot::Mutex; + use chrono::{DateTime, Utc}; /// A counter to record things such as number of input or output rows @@ -229,7 +231,7 @@ impl Timestamp { /// Sets the timestamps value to a specified time pub fn set(&self, now: DateTime) { - *self.timestamp.lock().unwrap() = Some(now); + *self.timestamp.lock() = Some(now); } /// return the timestamps value at the last time `record()` was @@ -237,7 +239,7 @@ impl Timestamp { /// /// Returns `None` if `record()` has not been called pub fn value(&self) -> Option> { - *self.timestamp.lock().unwrap() + *self.timestamp.lock() } /// sets the value of this timestamp to the minimum of this and other @@ -249,7 +251,7 @@ impl Timestamp { (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }), }; - *self.timestamp.lock().unwrap() = min; + *self.timestamp.lock() = min; } /// sets the value of this timestamp to the maximum of this and other @@ -261,7 +263,7 @@ impl Timestamp { (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }), }; - *self.timestamp.lock().unwrap() = max; + *self.timestamp.lock() = max; } } diff --git a/datafusion/src/physical_plan/sorts/mod.rs b/datafusion/src/physical_plan/sorts/mod.rs index 64ec29179b19..818546f316fc 100644 --- a/datafusion/src/physical_plan/sorts/mod.rs +++ b/datafusion/src/physical_plan/sorts/mod.rs @@ -28,11 +28,12 @@ use futures::channel::mpsc; use futures::stream::FusedStream; use futures::Stream; use hashbrown::HashMap; +use parking_lot::RwLock; use std::borrow::BorrowMut; use std::cmp::Ordering; use std::fmt::{Debug, Formatter}; use std::pin::Pin; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::task::{Context, Poll}; pub mod sort; @@ -135,7 +136,7 @@ impl SortKeyCursor { .collect::>(); self.init_cmp_if_needed(other, &zipped)?; - let map = self.batch_comparators.read().unwrap(); + let map = self.batch_comparators.read(); let cmp = map.get(&other.batch_id).ok_or_else(|| { DataFusionError::Execution(format!( "Failed to find comparator for {} cmp {}", @@ -172,10 +173,10 @@ impl SortKeyCursor { other: &SortKeyCursor, zipped: &[((&ArrayRef, &ArrayRef), &SortOptions)], ) -> Result<()> { - let hm = self.batch_comparators.read().unwrap(); + let hm = self.batch_comparators.read(); if !hm.contains_key(&other.batch_id) { drop(hm); - let mut map = self.batch_comparators.write().unwrap(); + let mut map = self.batch_comparators.write(); let cmp = map .borrow_mut() .entry(other.batch_id) diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index 7b9d5d5de328..ddc9ff1f9e47 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -21,11 +21,12 @@ use crate::physical_plan::common::AbortOnDropMany; use crate::physical_plan::metrics::{ ExecutionPlanMetricsSet, MemTrackingMetrics, MetricsSet, }; +use parking_lot::Mutex; use std::any::Any; use std::collections::{BinaryHeap, VecDeque}; use std::fmt::Debug; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; use arrow::{ @@ -368,7 +369,7 @@ impl SortPreservingMergeStream { } let mut empty_batch = false; { - let mut streams = self.streams.streams.lock().unwrap(); + let mut streams = self.streams.streams.lock(); let stream = &mut streams[idx]; if stream.is_terminated() { diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index e069dd750c18..0e7f733232fa 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -242,7 +242,7 @@ async fn custom_source_dataframe() -> Result<()> { assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let batches = collect(physical_plan, runtime).await?; let origin_rec_batch = TEST_CUSTOM_RECORD_BATCH!()?; assert_eq!(1, batches.len()); @@ -289,7 +289,7 @@ async fn optimizers_catch_all_statistics() { ) .unwrap(); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let actual = collect(physical_plan, runtime).await.unwrap(); assert_eq!(actual.len(), 1); diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs index 9abf3fd55a64..9869a1f6b16a 100644 --- a/datafusion/tests/parquet_pruning.rs +++ b/datafusion/tests/parquet_pruning.rs @@ -537,7 +537,7 @@ impl ContextWithParquet { .await .expect("creating physical plan"); - let runtime = self.ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = self.ctx.state.lock().runtime_env.clone(); let results = datafusion::physical_plan::collect(physical_plan.clone(), runtime) .await .expect("Running"); diff --git a/datafusion/tests/sql/aggregates.rs b/datafusion/tests/sql/aggregates.rs index a025d4eeec86..fd1d15cc0ca7 100644 --- a/datafusion/tests/sql/aggregates.rs +++ b/datafusion/tests/sql/aggregates.rs @@ -26,7 +26,7 @@ async fn csv_query_avg_multi_batch() -> Result<()> { let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let results = collect(plan, runtime).await.unwrap(); let batch = &results[0]; let column = batch.column(0); diff --git a/datafusion/tests/sql/avro.rs b/datafusion/tests/sql/avro.rs index d0cdf71b0868..82d91a0bd481 100644 --- a/datafusion/tests/sql/avro.rs +++ b/datafusion/tests/sql/avro.rs @@ -124,7 +124,7 @@ async fn avro_single_nan_schema() { let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let results = collect(plan, runtime).await.unwrap(); for batch in results { assert_eq!(1, batch.num_rows()); diff --git a/datafusion/tests/sql/errors.rs b/datafusion/tests/sql/errors.rs index 05ca0642bae0..92b634dd5e96 100644 --- a/datafusion/tests/sql/errors.rs +++ b/datafusion/tests/sql/errors.rs @@ -37,7 +37,7 @@ async fn test_cast_expressions_error() -> Result<()> { let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let result = collect(plan, runtime).await; match result { diff --git a/datafusion/tests/sql/explain_analyze.rs b/datafusion/tests/sql/explain_analyze.rs index 2bd78ec728f5..2051bdd1b80b 100644 --- a/datafusion/tests/sql/explain_analyze.rs +++ b/datafusion/tests/sql/explain_analyze.rs @@ -41,7 +41,7 @@ async fn explain_analyze_baseline_metrics() { let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let results = collect(physical_plan.clone(), runtime).await.unwrap(); let formatted = arrow::util::pretty::pretty_format_batches(&results) .unwrap() @@ -329,7 +329,7 @@ async fn csv_explain_plans() { // // Execute plan let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let results = collect(plan, runtime).await.expect(&msg); let actual = result_vec(&results); // flatten to a single string @@ -527,7 +527,7 @@ async fn csv_explain_verbose_plans() { // // Execute plan let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let results = collect(plan, runtime).await.expect(&msg); let actual = result_vec(&results); // flatten to a single string diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs index 90fe5138ac44..ea6829969462 100644 --- a/datafusion/tests/sql/mod.rs +++ b/datafusion/tests/sql/mod.rs @@ -543,7 +543,7 @@ async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) -> Vec Result<()> { let plan = ctx.create_physical_plan(&plan).await.expect(&msg); let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); - let runtime = ctx.state.lock().unwrap().runtime_env.clone(); + let runtime = ctx.state.lock().runtime_env.clone(); let res = collect(plan, runtime).await.expect(&msg); let actual = result_vec(&res);