Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Feb 1, 2022
1 parent 5764e4f commit 57024c7
Show file tree
Hide file tree
Showing 33 changed files with 145 additions and 142 deletions.
1 change: 1 addition & 0 deletions ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ log = "0.4"
tokio = "1.0"
tempfile = "3"
sqlparser = "0.13"
parking_lot = "0.11"

datafusion = { path = "../../../datafusion", version = "6.0.0" }

Expand Down
19 changes: 10 additions & 9 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

//! Distributed execution context.
use parking_lot::Mutex;
use sqlparser::ast::Statement;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use ballista_core::config::BallistaConfig;
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
Expand Down Expand Up @@ -142,7 +143,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
Expand All @@ -162,7 +163,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
Expand All @@ -186,7 +187,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
Expand All @@ -203,7 +204,7 @@ impl BallistaContext {
name: &str,
table: Arc<dyn TableProvider>,
) -> Result<()> {
let mut state = self.state.lock().unwrap();
let mut state = self.state.lock();
state.tables.insert(name.to_owned(), table);
Ok(())
}
Expand Down Expand Up @@ -280,7 +281,7 @@ impl BallistaContext {
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
let state = self.state.lock().unwrap();
let state = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&state.scheduler_host,
state.scheduler_port,
Expand All @@ -291,7 +292,7 @@ impl BallistaContext {
let is_show = self.is_show_statement(sql).await?;
// the show tables、 show columns sql can not run at scheduler because the tables is store at client
if is_show {
let state = self.state.lock().unwrap();
let state = self.state.lock();
ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
Expand All @@ -301,7 +302,7 @@ impl BallistaContext {

// register tables with DataFusion context
{
let state = self.state.lock().unwrap();
let state = self.state.lock();
for (name, prov) in &state.tables {
ctx.register_table(
TableReference::Bare { table: name },
Expand Down Expand Up @@ -483,7 +484,7 @@ mod tests {
.unwrap();

{
let mut guard = context.state.lock().unwrap();
let mut guard = context.state.lock();
let csv_table = guard.tables.get("single_nan");

if let Some(table_provide) = csv_table {
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ parse_arg = "0.1.3"
arrow-flight = { version = "8.0.0" }
datafusion = { path = "../../../datafusion", version = "6.0.0" }

parking_lot = "0.11"

[dev-dependencies]
tempfile = "3"

Expand Down
5 changes: 3 additions & 2 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

//! Client API for sending requests to executors.
use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -154,7 +155,7 @@ impl Stream for FlightDataStream {
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut stream = self.stream.lock().expect("mutex is bad");
let mut stream = self.stream.lock();
stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
Expand Down
3 changes: 2 additions & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.
use parking_lot::Mutex;
use std::fs::File;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Instant;
use std::{any::Any, pin::Pin};

Expand Down
1 change: 1 addition & 0 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
hyper = "0.14.4"
parking_lot = "0.11"

[dev-dependencies]

Expand Down
1 change: 1 addition & 0 deletions ballista/rust/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tokio-stream = { version = "0.1", features = ["net"], optional = true }
tonic = "0.6"
tower = { version = "0.4" }
warp = "0.3"
parking_lot = "0.11"

[dev-dependencies]
ballista-core = { path = "../core", version = "0.6.0" }
Expand Down
1 change: 1 addition & 0 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ avro-rs = { version = "0.13", features = ["snappy"], optional = true }
num-traits = { version = "0.2", optional = true }
pyo3 = { version = "0.15", optional = true }
tempfile = "3"
parking_lot = "0.11"

[dev-dependencies]
criterion = "0.3"
Expand Down
5 changes: 3 additions & 2 deletions datafusion/benches/aggregate_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use crate::criterion::Criterion;
use data_utils::create_table_provider;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;
use tokio::runtime::Runtime;

fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/benches/math_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
extern crate criterion;
use criterion::Criterion;

use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;

use tokio::runtime::Runtime;

Expand All @@ -40,7 +41,7 @@ fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();

// execute the query
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
rt.block_on(df.collect()).unwrap();
}

Expand Down
13 changes: 7 additions & 6 deletions datafusion/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::object_store::local::LocalFileSystem;

use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;

extern crate arrow;
extern crate datafusion;
Expand All @@ -38,7 +39,7 @@ fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();

// execute the query
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
rt.block_on(df.collect()).unwrap();
}

Expand Down Expand Up @@ -81,18 +82,18 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
rt.block_on(async {
// create local execution context
let mut ctx = ExecutionContext::new();
ctx.state.lock().unwrap().config.target_partitions = 1;
let runtime = ctx.state.lock().unwrap().runtime_env.clone();
ctx.state.lock().config.target_partitions = 1;
let runtime = ctx.state.lock().runtime_env.clone();

let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime)
.await
.unwrap();
ctx.register_table("aggregate_test_100", Arc::new(mem_table))
.unwrap();
ctx_holder.lock().unwrap().push(Arc::new(Mutex::new(ctx)))
ctx_holder.lock().push(Arc::new(Mutex::new(ctx)))
});

let ctx = ctx_holder.lock().unwrap().get(0).unwrap().clone();
let ctx = ctx_holder.lock().get(0).unwrap().clone();
ctx
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/benches/window_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use crate::criterion::Criterion;
use data_utils::create_table_provider;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;
use tokio::runtime::Runtime;

fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();
let df = rt.block_on(ctx.lock().unwrap().sql(sql)).unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}

Expand Down
15 changes: 8 additions & 7 deletions datafusion/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
//! representing collections of named schemas.
use crate::catalog::schema::SchemaProvider;
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::sync::Arc;

/// Represent a list of named catalogs
pub trait CatalogList: Sync + Send {
Expand Down Expand Up @@ -75,17 +76,17 @@ impl CatalogList for MemoryCatalogList {
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
let mut catalogs = self.catalogs.write().unwrap();
let mut catalogs = self.catalogs.write();
catalogs.insert(name, catalog)
}

fn catalog_names(&self) -> Vec<String> {
let catalogs = self.catalogs.read().unwrap();
let catalogs = self.catalogs.read();
catalogs.keys().map(|s| s.to_string()).collect()
}

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
let catalogs = self.catalogs.read().unwrap();
let catalogs = self.catalogs.read();
catalogs.get(name).cloned()
}
}
Expand Down Expand Up @@ -129,7 +130,7 @@ impl MemoryCatalogProvider {
name: impl Into<String>,
schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
let mut schemas = self.schemas.write().unwrap();
let mut schemas = self.schemas.write();
schemas.insert(name.into(), schema)
}
}
Expand All @@ -140,12 +141,12 @@ impl CatalogProvider for MemoryCatalogProvider {
}

fn schema_names(&self) -> Vec<String> {
let schemas = self.schemas.read().unwrap();
let schemas = self.schemas.read();
schemas.keys().cloned().collect()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let schemas = self.schemas.read().unwrap();
let schemas = self.schemas.read();
schemas.get(name).cloned()
}
}
13 changes: 7 additions & 6 deletions datafusion/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
//! Describes the interface and built-in implementations of schemas,
//! representing collections of named tables.
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::sync::Arc;

use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -91,12 +92,12 @@ impl SchemaProvider for MemorySchemaProvider {
}

fn table_names(&self) -> Vec<String> {
let tables = self.tables.read().unwrap();
let tables = self.tables.read();
tables.keys().cloned().collect()
}

fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let tables = self.tables.read().unwrap();
let tables = self.tables.read();
tables.get(name).cloned()
}

Expand All @@ -111,17 +112,17 @@ impl SchemaProvider for MemorySchemaProvider {
name
)));
}
let mut tables = self.tables.write().unwrap();
let mut tables = self.tables.write();
Ok(tables.insert(name, table))
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let mut tables = self.tables.write().unwrap();
let mut tables = self.tables.write();
Ok(tables.remove(name))
}

fn table_exist(&self, name: &str) -> bool {
let tables = self.tables.read().unwrap();
let tables = self.tables.read();
tables.contains_key(name)
}
}
Expand Down
Loading

0 comments on commit 57024c7

Please sign in to comment.