Skip to content

Commit

Permalink
Merge branch 'add_show_tables' into 'master'
Browse files Browse the repository at this point in the history
添加show tables, show columns 功能

Closes apache#31 and apache#32

See merge request noah/argo_engine!68
  • Loading branch information
高俊 committed Jan 18, 2022
2 parents c989e9e + 4909540 commit cf8fe54
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 74 deletions.
3 changes: 1 addition & 2 deletions ci/branch_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ echo -e "\033[42;37m 开始 build \033[0m"
rustup component add llvm-tools-preview
cargo install grcov
export RUST_BACKTRACE=full
cargo build --release --features "simd mimalloc"
cargo test --release --features "simd mimalloc"
cargo test --release --features "simd standalone"

# 生成测试报告
echo -e "\033[42;37m 生成测试报告 build \033[0m"
Expand Down
55 changes: 55 additions & 0 deletions ci/tests/argo_engine_server_test.rs
Original file line number Diff line number Diff line change
@@ -1 +1,56 @@
use datafusion::assert_batches_eq;
use common::config::ArgoEngineConfig;
use rust_client::argo_engine_context::ArgoEngineContext;

/// Integration test entry point, compiled only with the `standalone` feature.
///
/// Runs `start_server` to completion and then `test_server`, in that order,
/// on the tokio test runtime.
#[cfg(feature = "standalone")]
#[tokio::test]
async fn test_argo_engine_server() {
// The server must be started before the remote-context queries are issued.
start_server().await;
test_server().await;
}

/// Starts the engine via `ArgoEngineContext::standalone` with a default
/// `ArgoEngineConfig`.
///
/// Panics (through `unwrap`) if `standalone` returns an error. The returned
/// context is discarded here.
// NOTE(review): the meaning of the second argument (`1`) is not visible from
// this file — presumably a concurrency/executor count; confirm against
// `ArgoEngineContext::standalone`.
#[cfg(feature = "standalone")]
async fn start_server() {
ArgoEngineContext::standalone(&ArgoEngineConfig::new(), 1)
.await
.unwrap();
}

/// Drives an already-running engine through a remote context on
/// `localhost:50050`.
///
/// Steps:
/// 1. Registers `inte_test_table` as an external CSV table located under
///    `$CARGO_MANIFEST_DIR/data/test_inte/test.csv`.
/// 2. Selects every row from it and compares against the expected batch.
/// 3. Runs `show tables` and compares against the expected catalog listing.
///
/// Panics on any query failure or on a mismatch in the rendered batches.
#[cfg(feature = "standalone")]
async fn test_server() {
    let context_remote = ArgoEngineContext::remote("localhost", 50050, &ArgoEngineConfig::new());

    let dir = env!("CARGO_MANIFEST_DIR");
    let sql = format!("CREATE EXTERNAL TABLE inte_test_table (a INT, b INT) STORED AS CSV LOCATION '{}/data/test_inte/test.csv'", dir);
    println!("sql={}", sql);
    // `expect` rather than `assert!(.. .is_ok())` so that a failure reports the
    // underlying error instead of just "assertion failed".
    context_remote
        .sql(sql.as_str())
        .await
        .expect("CREATE EXTERNAL TABLE inte_test_table should succeed");

    let sql = "select * from inte_test_table";
    println!("sql={}", sql);
    let df = context_remote.sql(sql).await.unwrap();
    let results = df.collect().await.unwrap();
    let expected_rows = vec![
        "+---+---+",
        "| a | b |",
        "+---+---+",
        "| 3 | 4 |",
        "+---+---+",
    ];
    assert_batches_eq!(expected_rows, &results);

    let sql = "show tables";
    println!("sql={}", sql);
    let df = context_remote.sql(sql).await.unwrap();
    let results = df.collect().await.unwrap();
    // Every cell is padded to the column width given by the border line;
    // `assert_batches_eq!` compares the pretty-printed text line by line, so
    // the padding is significant.
    let expected_tables = vec![
        "+---------------+--------------------+-----------------+------------+",
        "| table_catalog | table_schema       | table_name      | table_type |",
        "+---------------+--------------------+-----------------+------------+",
        "| datafusion    | public             | inte_test_table | BASE TABLE |",
        "| datafusion    | information_schema | tables          | VIEW       |",
        "| datafusion    | information_schema | columns         | VIEW       |",
        "+---------------+--------------------+-----------------+------------+",
    ];
    assert_batches_eq!(expected_tables, &results);
}
75 changes: 4 additions & 71 deletions ci/tests/stream_load_flight_service_test.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,6 @@
use arrow::util::pretty;
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use arrow_flight::{Action, Criteria, flight_descriptor, FlightDescriptor, Ticket};
use tokio;

use datafusion::arrow::datatypes::Schema;

use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{flight_descriptor, Action, Criteria, FlightDescriptor, Ticket};
use tokio::sync::Mutex;

use common::config::ArgoEngineConfig;
use common::error::ArgoEngineError;
use tonic::transport::Server;

use datafusion::assert_batches_eq;
use datafusion::prelude::ExecutionConfig;
use rust_client::argo_engine_context::ArgoEngineContext;
use rust_client::steam_load_flight_client::StreamLoadFlightClient;

Expand All @@ -39,7 +23,6 @@ async fn test_get() {
test_get_schema().await;
test_get_flight_info().await;
test_list_flight().await;
test_server().await;
}

#[cfg(feature = "standalone")]
Expand Down Expand Up @@ -127,57 +110,7 @@ async fn test_list_flight() {

/// Starts the engine via `ArgoEngineContext::standalone` with a default
/// `ArgoEngineConfig`.
///
/// The engine is started exactly once. Panics (through `unwrap`) if
/// `standalone` returns an error. The returned context is discarded here.
// NOTE(review): the meaning of the second argument (`4`) is not visible from
// this file — presumably a concurrency/executor count; confirm against
// `ArgoEngineContext::standalone`.
#[cfg(feature = "standalone")]
async fn start_server() {
    ArgoEngineContext::standalone(&ArgoEngineConfig::new(), 4)
        .await
        .unwrap();
}


/// Smoke-tests an already-running engine through a remote context on
/// `localhost:50050`.
///
/// Registers `inte_test_table` as an external CSV table located under
/// `$CARGO_MANIFEST_DIR/data/test_inte/test.csv`, prints the result of the
/// DDL statement, then selects every row, prints the batches and asserts
/// that at least one batch was returned.
///
/// Panics on any query, collection or printing failure.
#[cfg(feature = "standalone")]
async fn test_server() {
    let context_remote =
        ArgoEngineContext::remote("localhost", 50050, &ArgoEngineConfig::new());

    let dir = env!("CARGO_MANIFEST_DIR");
    let sql = format!("CREATE EXTERNAL TABLE inte_test_table (a INT, b INT) STORED AS CSV LOCATION '{}/data/test_inte/test.csv'", dir);
    println!("sql={}", sql);
    let df = context_remote.sql(sql.as_str()).await.unwrap();
    let results = df.collect().await.unwrap();
    pretty::print_batches(&results).unwrap();

    let sql = "select * from inte_test_table";
    println!("sql={}", sql);
    let df = context_remote.sql(sql).await.unwrap();
    let results = df.collect().await.unwrap();
    pretty::print_batches(&results).unwrap();
    assert!(!results.is_empty());
}
1 change: 1 addition & 0 deletions client/rust-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ futures = "0.3.16"
tokio = "1.10.0"
log = "0.4"
tonic = "0.6"
sqlparser = "0.13"

server = {path = "../../server" , optional = true }
common = { path= "../../common" }
Expand Down
40 changes: 39 additions & 1 deletion client/rust-client/src/argo_engine_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::sql::parser::FileType;
use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};
use sqlparser::ast::Statement;

pub struct ArgoEngineContextState {
/// Ballista configuration
Expand Down Expand Up @@ -122,6 +124,13 @@ impl ArgoEngineContext {
)
};

let is_show = self.is_show_statement(sql).await?;
// `show tables` / `show columns` statements cannot run on the scheduler because the tables are stored on the client
if is_show {
ctx =
ExecutionContext::with_config(ExecutionConfig::new().with_information_schema(true));
}

// get the catalogs from ArgoEngineServer
let catalog_data = self.get_catalogs().await?;
// register catalog with DataFusion context
Expand Down Expand Up @@ -185,6 +194,35 @@ impl ArgoEngineContext {
}
}

/// Reports whether `sql` is a single `SHOW ...` statement.
///
/// The text is parsed with `DFParser::parse_sql`. The result is `Ok(true)`
/// when the one parsed statement is a plain SQL statement of kind
/// `Statement::ShowVariable` or `Statement::ShowColumns`, and `Ok(false)`
/// for any other single statement.
///
/// # Errors
///
/// * Propagates the parser error if `sql` cannot be parsed.
/// * Returns `DataFusionError::NotImplemented` unless `sql` contains exactly
///   one statement.
pub async fn is_show_statement(&self, sql: &str) -> Result<bool> {
    let parsed = DFParser::parse_sql(sql)?;

    // Guard: exactly one statement is supported.
    if parsed.len() != 1 {
        return Err(DataFusionError::NotImplemented(
            "The context currently only supports a single SQL statement".to_string(),
        ));
    }

    let is_show = match &parsed[0] {
        DFStatement::Statement(inner) => {
            let stmt: &Statement = inner;
            matches!(
                stmt,
                Statement::ShowVariable { .. } | Statement::ShowColumns { .. }
            )
        }
        // DataFusion-specific statements are never `SHOW`.
        _ => false,
    };

    Ok(is_show)
}

/// get catalogs from scheduler
pub async fn get_catalogs(&self) -> Result<Arc<dyn CatalogList>> {
let scheduler_url = format!(
Expand Down

0 comments on commit cf8fe54

Please sign in to comment.