refactor(frontend): minor changes around FrontendInstance constructor (GreptimeTeam#748)

* refactor: minor changes in some testing code

Co-authored-by: luofucong <[email protected]>
2 people authored and paomian committed Oct 19, 2023
1 parent 96f4ff5 commit 5248251
Showing 9 changed files with 90 additions and 144 deletions.
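In short, the commit moves the catalog manager into the FrontendInstance standalone constructor, so callers no longer inject it afterwards. A minimal before/after sketch of building a standalone frontend, assembled from the hunks below (datanode_instance, fe_opts and plugins are the parameters of build_frontend in src/cmd/src/standalone.rs):

    // Before this commit: the catalog manager had to be set on the frontend explicitly.
    let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
    frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
    frontend_instance.set_script_handler(datanode_instance);

    // After this commit: new_standalone clones the datanode's catalog manager itself,
    // so only the script handler still needs to be wired up.
    let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
    frontend_instance.set_script_handler(datanode_instance);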
1 change: 0 additions & 1 deletion src/cmd/src/standalone.rs
@@ -193,7 +193,6 @@ async fn build_frontend(
datanode_instance: InstanceRef,
) -> Result<Frontend<FeInstance>> {
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone());
frontend_instance.set_script_handler(datanode_instance);
Ok(Frontend::new(fe_opts, frontend_instance, plugins))
}
53 changes: 1 addition & 52 deletions src/datanode/src/mock.rs
@@ -24,7 +24,7 @@ use query::QueryEngineFactory;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableId;
use table::table::{TableIdProvider, TableIdProviderRef};
use table::table::TableIdProvider;

use crate::datanode::DatanodeOptions;
use crate::error::Result;
@@ -34,57 +34,6 @@ use crate::script::ScriptExecutor;
use crate::sql::SqlHandler;

impl Instance {
// This method is used in other crate's testing codes, so move it out of "cfg(test)".
// TODO(LFC): Delete it when callers no longer need it.
pub async fn new_mock() -> Result<Self> {
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
let mock_info = meta_srv::mocks::mock_with_memstore().await;
let meta_client = Arc::new(mock_meta_client(mock_info, 0).await);
let (dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;

let logstore = Arc::new(create_local_file_log_store(dir.path().to_str().unwrap()).await?);
let mock_engine = Arc::new(MockMitoEngine::new(
TableEngineConfig::default(),
MockEngine::default(),
object_store,
));

let catalog_manager = Arc::new(
catalog::local::manager::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);

let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine();

let sql_handler = SqlHandler::new(
mock_engine.clone(),
catalog_manager.clone(),
query_engine.clone(),
);
let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone())
.await
.unwrap();

let heartbeat_task = Some(HeartbeatTask::new(
0,
"127.0.0.1:3302".to_string(),
meta_client,
));

let table_id_provider = Some(catalog_manager.clone() as TableIdProviderRef);
Ok(Self {
query_engine,
sql_handler,
catalog_manager,
script_executor,
heartbeat_task,
table_id_provider,
logstore,
})
}

pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result<Self> {
let mock_info = meta_srv::mocks::mock_with_memstore().await;
Self::with_mock_meta_server(opts, mock_info).await
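With Instance::new_mock removed, test code in other crates is expected to go through the mock-meta-client path instead. A minimal sketch of that setup, following the setup_test_instance pattern in the next file (assuming, as its signature suggests, that the name passed to create_tmp_dir_and_datanode_opts only picks the temporary directory):

    // Sketch of the replacement for Instance::new_mock in test code.
    let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("my_test");
    let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
    instance.start().await.unwrap();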
18 changes: 5 additions & 13 deletions src/datanode/src/tests/instance_test.rs
@@ -173,10 +173,10 @@ async fn assert_query_result(instance: &Instance, sql: &str, ts: i64, host: &str
}
}

async fn setup_test_instance() -> Instance {
async fn setup_test_instance(test_name: &str) -> Instance {
common_telemetry::init_default_ut_logging();

let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_insert");
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(test_name);
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();

@@ -193,7 +193,7 @@

#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert() {
let instance = setup_test_instance().await;
let instance = setup_test_instance("test_execute_insert").await;
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
@@ -409,18 +409,10 @@ async fn check_output_stream(output: Output, expected: Vec<&str>) {
assert_eq!(pretty_print, expected);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_alter_table() {
let instance = Instance::new_mock().await.unwrap();
instance.start().await.unwrap();
let instance = setup_test_instance("test_alter_table").await;

test_util::create_test_table(
instance.catalog_manager(),
instance.sql_handler(),
ConcreteDataType::timestamp_millis_datatype(),
)
.await
.unwrap();
// make sure table insertion is ok before altering table
execute_sql(
&instance,
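A recurring edit in this commit swaps #[tokio::test] for #[tokio::test(flavor = "multi_thread")]. The former runs the test on Tokio's single-threaded current-thread runtime, the latter on the multi-threaded runtime; the commit does not state why the switch is needed, so the sketch below (with placeholder test names) only contrasts the two forms:

    // Current-thread runtime: the form these tests used before.
    #[tokio::test]
    async fn on_current_thread_runtime() {}

    // Multi-threaded runtime: the form this commit switches the tests to.
    #[tokio::test(flavor = "multi_thread")]
    async fn on_multi_thread_runtime() {}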
83 changes: 29 additions & 54 deletions src/frontend/src/instance.rs
@@ -56,13 +56,13 @@ use sql::parser::ParserContext;
use sql::statements::create::Partitions;
use sql::statements::insert::Insert;
use sql::statements::statement::Statement;
use table::TableRef;

use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, AlterTableOnInsertionSnafu, CatalogNotFoundSnafu, CatalogSnafu, CreateDatabaseSnafu,
CreateTableSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result,
SchemaNotFoundSnafu,
self, AlterTableOnInsertionSnafu, CatalogSnafu, CreateDatabaseSnafu, CreateTableSnafu,
FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
@@ -90,8 +90,7 @@ pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;

#[derive(Clone)]
pub struct Instance {
/// catalog manager is None in standalone mode, datanode will keep their own
catalog_manager: Option<CatalogManagerRef>,
catalog_manager: CatalogManagerRef,
/// Script handler is None in distributed mode, only works on standalone mode.
script_handler: Option<ScriptHandlerRef>,
create_expr_factory: CreateExprFactoryRef,
@@ -128,7 +127,7 @@ impl Instance {
let dist_instance_ref = Arc::new(dist_instance.clone());

Ok(Instance {
catalog_manager: Some(catalog_manager),
catalog_manager,
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Distributed,
@@ -171,7 +170,7 @@

pub fn new_standalone(dn_instance: DnInstanceRef) -> Self {
Instance {
catalog_manager: None,
catalog_manager: dn_instance.catalog_manager().clone(),
script_handler: None,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
mode: Mode::Standalone,
@@ -182,18 +181,10 @@
}
}

pub fn catalog_manager(&self) -> &Option<CatalogManagerRef> {
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}

pub fn set_catalog_manager(&mut self, catalog_manager: CatalogManagerRef) {
debug_assert!(
self.catalog_manager.is_none(),
"Catalog manager can be set only once!"
);
self.catalog_manager = Some(catalog_manager);
}

pub fn set_script_handler(&mut self, handler: ScriptHandlerRef) {
debug_assert!(
self.script_handler.is_none(),
@@ -293,21 +284,7 @@
table_name: &str,
columns: &[Column],
) -> Result<()> {
match self
.catalog_manager
.as_ref()
.expect("catalog manager cannot be None")
.catalog(catalog_name)
.context(CatalogSnafu)?
.context(CatalogNotFoundSnafu { catalog_name })?
.schema(schema_name)
.context(CatalogSnafu)?
.context(SchemaNotFoundSnafu {
schema_info: schema_name,
})?
.table(table_name)
.context(CatalogSnafu)?
{
match self.find_table(catalog_name, schema_name, table_name)? {
None => {
info!(
"Table {}.{}.{} does not exist, try create table",
@@ -403,8 +380,6 @@ impl Instance {

fn get_catalog(&self, catalog_name: &str) -> Result<CatalogProviderRef> {
self.catalog_manager
.as_ref()
.context(error::CatalogManagerSnafu)?
.catalog(catalog_name)
.context(error::CatalogSnafu)?
.context(error::CatalogNotFoundSnafu { catalog_name })
@@ -419,6 +394,12 @@
})
}

fn find_table(&self, catalog: &str, schema: &str, table: &str) -> Result<Option<TableRef>> {
self.catalog_manager
.table(catalog, schema, table)
.context(CatalogSnafu)
}

async fn sql_dist_insert(&self, insert: Box<Insert>) -> Result<usize> {
let (catalog, schema, table) = insert.full_table_name().context(error::ParseSqlSnafu)?;

@@ -458,23 +439,17 @@ }
}

fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
let catalog_manager = &self.catalog_manager;
if let Some(catalog_manager) = catalog_manager {
ensure!(
catalog_manager
.schema(DEFAULT_CATALOG_NAME, &db)
.context(error::CatalogSnafu)?
.is_some(),
error::SchemaNotFoundSnafu { schema_info: &db }
);

query_ctx.set_current_schema(&db);

Ok(Output::RecordBatches(RecordBatches::empty()))
} else {
// TODO(LFC): Handle "use" stmt here.
unimplemented!()
}
ensure!(
self.catalog_manager
.schema(DEFAULT_CATALOG_NAME, &db)
.context(error::CatalogSnafu)?
.is_some(),
error::SchemaNotFoundSnafu { schema_info: &db }
);

query_ctx.set_current_schema(&db);

Ok(Output::RecordBatches(RecordBatches::empty()))
}
}

@@ -679,11 +654,11 @@ mod tests {
use super::*;
use crate::tests;

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_sql() {
let query_ctx = Arc::new(QueryContext::new());

let instance = tests::create_frontend_instance().await;
let (instance, _guard) = tests::create_frontend_instance("test_execute_sql").await;

let sql = r#"CREATE TABLE demo(
host STRING,
Expand Down Expand Up @@ -761,9 +736,9 @@ mod tests {
};
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_grpc() {
let instance = tests::create_frontend_instance().await;
let (instance, _guard) = tests::create_frontend_instance("test_execute_grpc").await;

// testing data:
let expected_host_col = Column {
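The new find_table helper centralizes the catalog, schema and table lookup that the insertion path previously spelled out inline. A condensed illustration of how the insertion hunk above uses it; the arm bodies are paraphrased comments, not the commit's exact code:

    match self.find_table(catalog_name, schema_name, table_name)? {
        None => {
            // Table does not exist yet: try to create it from the inserted columns.
        }
        Some(_table) => {
            // Table exists: add any new columns the insertion introduces, then insert.
        }
    }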
14 changes: 9 additions & 5 deletions src/frontend/src/instance/opentsdb.rs
@@ -70,9 +70,9 @@ mod tests {
use super::*;
use crate::tests;

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_exec() {
let instance = tests::create_frontend_instance().await;
let (instance, _guard) = tests::create_frontend_instance("test_exec").await;
instance
.exec(
&DataPoint::try_create(
@@ -88,9 +88,10 @@
.unwrap();
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_insert_opentsdb_metric() {
let instance = tests::create_frontend_instance().await;
let (instance, _guard) =
tests::create_frontend_instance("test_insert_opentsdb_metric").await;

let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
@@ -124,7 +125,10 @@ assert!(result.is_ok());
assert!(result.is_ok());

let output = instance
.do_query("select * from my_metric_1", Arc::new(QueryContext::new()))
.do_query(
"select * from my_metric_1 order by greptime_timestamp",
Arc::new(QueryContext::new()),
)
.await
.unwrap();
match output {
5 changes: 3 additions & 2 deletions src/frontend/src/instance/prometheus.rs
@@ -182,10 +182,11 @@ mod tests {
use super::*;
use crate::tests;

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_prometheus_remote_write_and_read() {
common_telemetry::init_default_ut_logging();
let instance = tests::create_frontend_instance().await;
let (instance, _guard) =
tests::create_frontend_instance("test_prometheus_remote_write_and_read").await;

let write_request = WriteRequest {
timeseries: prometheus::mock_timeseries(),