Skip to content

Commit

Permalink
Merge branch 'main' into distributed_copy_into_table_for_query
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jul 4, 2023
2 parents 3bcc456 + 51c36f8 commit fa0d942
Show file tree
Hide file tree
Showing 76 changed files with 1,711 additions and 379 deletions.
8 changes: 4 additions & 4 deletions docs/doc/03-develop/04-rust.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,23 @@ async fn main() {
let sql_db_create = "CREATE DATABASE IF NOT EXISTS book_db;";
conn.exec(sql_db_create).await.unwrap();

let sql_table_create = "CREATE TABLE books (
let sql_table_create = "CREATE TABLE book_db.books (
title VARCHAR,
author VARCHAR,
date VARCHAR
);";

conn.exec(sql_table_create).await.unwrap();
let sql_insert = "INSERT INTO books VALUES ('mybook', 'author', '2022');";
let sql_insert = "INSERT INTO book_db.books VALUES ('mybook', 'author', '2022');";
conn.exec(sql_insert).await.unwrap();

let mut rows = conn.query_iter("SELECT * FROM books;").await.unwrap();
let mut rows = conn.query_iter("SELECT * FROM book_db.books;").await.unwrap();
while let Some(row) = rows.next().await {
let (title, author, date): (String, String, String) = row.unwrap().try_into().unwrap();
println!("{} {} {}", title, author, date);
}

let sql_table_drop = "DROP TABLE books;";
let sql_table_drop = "DROP TABLE book_db.books;";
conn.exec(sql_table_drop).await.unwrap();

let sql_db_drop = "DROP DATABASE book_db;";
Expand Down
8 changes: 8 additions & 0 deletions src/meta/api/src/id_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use common_meta_kvapi::kvapi;

use crate::background_api_keys::ID_GEN_BACKGROUND_JOB;
use crate::data_mask_api_keys::ID_GEN_DATA_MASK;
use crate::schema_api_keys::ID_GEN_CATALOG;
use crate::schema_api_keys::ID_GEN_DATABASE;
use crate::schema_api_keys::ID_GEN_INDEX;
use crate::schema_api_keys::ID_GEN_TABLE;
Expand Down Expand Up @@ -86,6 +87,13 @@ impl IdGenerator {
resource: ID_GEN_BACKGROUND_JOB.to_string(),
}
}

/// Create a key for generating catalog id with kvapi::KVApi
pub fn catalog_id() -> Self {
Self {
resource: ID_GEN_CATALOG.to_string(),
}
}
}

impl kvapi::Key for IdGenerator {
Expand Down
5 changes: 5 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::sync::Arc;

use common_meta_app::schema::CountTablesReply;
use common_meta_app::schema::CountTablesReq;
use common_meta_app::schema::CreateCatalogReply;
use common_meta_app::schema::CreateCatalogReq;
use common_meta_app::schema::CreateDatabaseReply;
use common_meta_app::schema::CreateDatabaseReq;
use common_meta_app::schema::CreateIndexReply;
Expand Down Expand Up @@ -216,5 +218,8 @@ pub trait SchemaApi: Send + Sync {

async fn delete_table_lock_rev(&self, req: DeleteTableLockRevReq) -> Result<(), KVAppError>;

async fn create_catalog(&self, req: CreateCatalogReq)
-> Result<CreateCatalogReply, KVAppError>;

fn name(&self) -> String;
}
82 changes: 82 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use chrono::DateTime;
use chrono::Utc;
use common_meta_app::app_error::AppError;
use common_meta_app::app_error::CatalogAlreadyExists;
use common_meta_app::app_error::CreateDatabaseWithDropTime;
use common_meta_app::app_error::CreateIndexWithDropTime;
use common_meta_app::app_error::CreateTableWithDropTime;
Expand All @@ -45,9 +46,13 @@ use common_meta_app::app_error::UnknownTableId;
use common_meta_app::app_error::VirtualColumnAlreadyExists;
use common_meta_app::app_error::WrongShare;
use common_meta_app::app_error::WrongShareObject;
use common_meta_app::schema::CatalogId;
use common_meta_app::schema::CatalogIdToName;
use common_meta_app::schema::CountTablesKey;
use common_meta_app::schema::CountTablesReply;
use common_meta_app::schema::CountTablesReq;
use common_meta_app::schema::CreateCatalogReply;
use common_meta_app::schema::CreateCatalogReq;
use common_meta_app::schema::CreateDatabaseReply;
use common_meta_app::schema::CreateDatabaseReq;
use common_meta_app::schema::CreateIndexReply;
Expand Down Expand Up @@ -2990,6 +2995,83 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
Ok(())
}

async fn create_catalog(
&self,
req: CreateCatalogReq,
) -> Result<CreateCatalogReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let name_key = &req.name_ident;

let ctx = &func_name!();

let mut trials = txn_trials(None, ctx);

let catalog_id = loop {
trials.next().unwrap()?;

// Get catalog by name to ensure absence
let (catalog_id_seq, catalog_id) = get_u64_value(self, name_key).await?;
debug!(catalog_id_seq, catalog_id, ?name_key, "get_catalog");

if catalog_id_seq > 0 {
return if req.if_not_exists {
Ok(CreateCatalogReply { catalog_id })
} else {
Err(KVAppError::AppError(AppError::CatalogAlreadyExists(
CatalogAlreadyExists::new(
&name_key.catalog_name,
format!("create catalog: tenant: {}", name_key.tenant),
),
)))
};
}

// Create catalog by inserting these record:
// (tenant, catalog_name) -> catalog_id
// (catalog_id) -> catalog_meta
// (catalog_id) -> (tenant, catalog_name)
let catalog_id = fetch_id(self, IdGenerator::catalog_id()).await?;
let id_key = CatalogId { catalog_id };
let id_to_name_key = CatalogIdToName { catalog_id };

debug!(catalog_id, name_key = debug(&name_key), "new catalog id");

{
let condition = vec![
txn_cond_seq(name_key, Eq, 0),
txn_cond_seq(&id_to_name_key, Eq, 0),
];
let if_then = vec![
txn_op_put(name_key, serialize_u64(catalog_id)?), /* (tenant, catalog_name) -> catalog_id */
txn_op_put(&id_key, serialize_struct(&req.meta)?), /* (catalog_id) -> catalog_meta */
txn_op_put(&id_to_name_key, serialize_struct(name_key)?), /* __fd_catalog_id_to_name/<catalog_id> -> (tenant,catalog_name) */
];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _) = send_txn(self, txn_req).await?;

debug!(
name = debug(&name_key),
id = debug(&id_key),
succ = display(succ),
"create_catalog"
);

if succ {
break catalog_id;
}
}
};

Ok(CreateCatalogReply { catalog_id })
}

fn name(&self) -> String {
"SchemaApiImpl".to_string()
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta/api/src/schema_api_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ pub(crate) const ID_GEN_TABLE: &str = "table_id";
pub(crate) const ID_GEN_DATABASE: &str = "database_id";
pub(crate) const ID_GEN_TABLE_LOCK: &str = "table_lock_id";
pub(crate) const ID_GEN_INDEX: &str = "index_id";

pub(crate) const ID_GEN_CATALOG: &str = "catalog_id";
26 changes: 26 additions & 0 deletions src/meta/app/src/app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ impl DatabaseAlreadyExists {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, thiserror::Error)]
#[error("CatalogAlreadyExists: `{catalog_name}` while `{context}`")]
pub struct CatalogAlreadyExists {
catalog_name: String,
context: String,
}

impl CatalogAlreadyExists {
pub fn new(catalog_name: impl Into<String>, context: impl Into<String>) -> Self {
Self {
catalog_name: catalog_name.into(),
context: context.into(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, thiserror::Error)]
#[error("DatamaskAlreadyExists: `{name}` while `{context}`")]
pub struct DatamaskAlreadyExists {
Expand Down Expand Up @@ -760,6 +776,9 @@ pub enum AppError {
#[error(transparent)]
DatabaseAlreadyExists(#[from] DatabaseAlreadyExists),

#[error(transparent)]
CatalogAlreadyExists(#[from] CatalogAlreadyExists),

#[error(transparent)]
CreateDatabaseWithDropTime(#[from] CreateDatabaseWithDropTime),

Expand Down Expand Up @@ -891,6 +910,12 @@ impl AppErrorMessage for DatabaseAlreadyExists {
}
}

impl AppErrorMessage for CatalogAlreadyExists {
fn message(&self) -> String {
format!("Catalog '{}' already exists", self.catalog_name)
}
}

impl AppErrorMessage for CreateDatabaseWithDropTime {
fn message(&self) -> String {
format!("Create database '{}' with drop time", self.db_name)
Expand Down Expand Up @@ -1153,6 +1178,7 @@ impl From<AppError> for ErrorCode {
AppError::UnknownTableId(err) => ErrorCode::UnknownTableId(err.message()),
AppError::UnknownTable(err) => ErrorCode::UnknownTable(err.message()),
AppError::DatabaseAlreadyExists(err) => ErrorCode::DatabaseAlreadyExists(err.message()),
AppError::CatalogAlreadyExists(err) => ErrorCode::CatalogAlreadyExists(err.message()),
AppError::CreateDatabaseWithDropTime(err) => {
ErrorCode::CreateDatabaseWithDropTime(err.message())
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/app/src/principal/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub enum StageType {
/// LegacyInternal will be deprecated.
///
/// Please never use this variant except in `proto_conv`. We keep this
/// stage type for backword compatible.
/// stage type for backward compatible.
///
/// TODO(xuanwo): remove this when we are releasing v0.9.
LegacyInternal,
Expand Down Expand Up @@ -135,7 +135,7 @@ impl FromStr for StageFileCompression {
"xz" => Ok(StageFileCompression::Xz),
"none" => Ok(StageFileCompression::None),
_ => Err("Unknown file compression type, must one of { auto | gzip | bz2 | brotli | zstd | deflate | raw_deflate | lzo | snappy | xz | none }"
.to_string()),
.to_string()),
}
}
}
Expand Down
Loading

0 comments on commit fa0d942

Please sign in to comment.