Skip to content

Commit

Permalink
chore: apply suggestion from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 17, 2023
1 parent 1cf139d commit a73b55c
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 77 deletions.
2 changes: 1 addition & 1 deletion src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec<Reg
let region_numbers = &table.table_info().meta.region_numbers;
region_number += region_numbers.len() as u64;

match table.region_stats().await {
match table.region_stats() {
Ok(stats) => {
let stats = stats.into_iter().map(|stat| RegionStat {
region_id: stat.region_id,
Expand Down
45 changes: 10 additions & 35 deletions src/datanode/src/heartbeat/handler/close_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use catalog::{CatalogManagerRef, DeregisterTableRequest, RegisterTableRequest};
use catalog::{CatalogManagerRef, DeregisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply};
use common_telemetry::error;
Expand All @@ -23,7 +23,6 @@ use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
use table::engine::{EngineContext, TableReference};
use table::requests::CloseTableRequest;
use table::TableRef;

use crate::error::{self, Result};
use crate::heartbeat::handler::HeartbeatResponseHandler;
Expand Down Expand Up @@ -53,18 +52,17 @@ impl HeartbeatResponseHandler for CloseRegionHandler {

let RegionIdent {
engine,
table_id,
catalog,
schema,
table,
region_number,
..
} = region_ident;

common_runtime::spawn_bg(async move {
let result = self_ref
.close_region_inner(
engine,
table_id,
&TableReference::full(&catalog, &schema, &table),
vec![region_number],
)
Expand Down Expand Up @@ -128,7 +126,6 @@ impl CloseRegionHandler {
let region_exist =
table
.contains_region(*r)
.await
.with_context(|_| error::CheckRegionSnafu {
table_name: format_full_table_name(
catalog_name,
Expand All @@ -149,7 +146,6 @@ impl CloseRegionHandler {
async fn close_region_inner(
&self,
engine: String,
table_id: u32,
table_ref: &TableReference<'_>,
region_numbers: Vec<RegionNumber>,
) -> Result<bool> {
Expand All @@ -173,12 +169,12 @@ impl CloseRegionHandler {
return Ok(true);
}

if let Some(table) =
engine
.get_table(&ctx, table_ref)
.with_context(|_| error::GetTableSnafu {
table_name: table_ref.to_string(),
})?
if engine
.get_table(&ctx, table_ref)
.with_context(|_| error::GetTableSnafu {
table_name: table_ref.to_string(),
})?
.is_some()
{
return if engine
.close_table(
Expand All @@ -198,8 +194,8 @@ impl CloseRegionHandler {
// Deregister table if The table released.
self.deregister_table(table_ref).await
} else {
// Registers table (update)
self.register_table(table_ref, table_id, table).await
// Requires caller to update the region_numbers
Ok(true)
};
}

Expand All @@ -219,25 +215,4 @@ impl CloseRegionHandler {
table_name: table_ref.to_string(),
})
}

async fn register_table(
&self,

table_ref: &TableReference<'_>,
table_id: u32,
table: TableRef,
) -> Result<bool> {
self.catalog_manager
.register_table(RegisterTableRequest {
catalog: table_ref.catalog.to_string(),
schema: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
table,
table_id,
})
.await
.with_context(|_| error::DeregisterTableSnafu {
table_name: table_ref.to_string(),
})
}
}
1 change: 0 additions & 1 deletion src/datanode/src/heartbeat/handler/open_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ impl OpenRegionHandler {
let region_exist =
table
.contains_region(*r)
.await
.with_context(|_| error::CheckRegionSnafu {
table_name: format_full_table_name(
catalog_name,
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl Instance {
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine();

let handlder_executor = HandlerGroupExecutor::new(vec![
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(OpenRegionHandler::new(
catalog_manager.clone(),
Expand All @@ -222,7 +222,7 @@ impl Instance {
opts.rpc_hostname.clone(),
meta_client.as_ref().unwrap().clone(),
catalog_manager.clone(),
Arc::new(handlder_executor),
Arc::new(handlers_executor),
)),
};

Expand Down
19 changes: 8 additions & 11 deletions src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,9 @@ fn validate_create_table_request(request: &CreateTableRequest) -> Result<()> {
Ok(())
}

async fn all_regions_open(table: TableRef, regions: &[RegionNumber]) -> TableResult<bool> {
fn all_regions_open(table: TableRef, regions: &[RegionNumber]) -> TableResult<bool> {
for r in regions {
let region_exist = table.contains_region(*r).await?;
let region_exist = table.contains_region(*r)?;
if !region_exist {
return Ok(false);
}
Expand All @@ -402,12 +402,12 @@ async fn all_regions_open(table: TableRef, regions: &[RegionNumber]) -> TableRes

impl<S: StorageEngine> MitoEngineInner<S> {
/// Returns Some(table) contains all specific regions
async fn check_regions(
fn check_regions(
&self,
table: TableRef,
regions: &[RegionNumber],
) -> TableResult<Option<TableRef>> {
if all_regions_open(table.clone(), regions).await? {
if all_regions_open(table.clone(), regions)? {
// If all regions have been opened
Ok(Some(table))
} else {
Expand Down Expand Up @@ -541,7 +541,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {

// TODO(weny): Returns an error earlier if the target region does not exist in the meta.
for region_number in region_numbers {
if table.contains_region(*region_number).await? {
if table.contains_region(*region_number)? {
continue;
}

Expand Down Expand Up @@ -572,7 +572,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
};

if let Some(table) = self.get_table(&table_ref) {
if let Some(table) = self.check_regions(table, &request.region_numbers).await? {
if let Some(table) = self.check_regions(table, &request.region_numbers)? {
return Ok(Some(table));
}
}
Expand All @@ -585,10 +585,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_mito_table(&table_ref) {
// Contains all regions or target region
if let Some(table) = self
.check_regions(table.clone(), &request.region_numbers)
.await?
{
if let Some(table) = self.check_regions(table.clone(), &request.region_numbers)? {
Some(table)
} else {
// Loads missing regions
Expand Down Expand Up @@ -633,7 +630,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table_id = table.table_info().ident.table_id;

table
.close(&regions)
.close_regions(&regions)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Expand Down
49 changes: 25 additions & 24 deletions src/mito/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use table::requests::{
use table::table::scan::SimpleTableScan;
use table::table::{AlterContext, Table};
use table::{error as table_error, RegionStat};
use tokio::sync::{Mutex, RwLock};
use tokio::sync::Mutex;

use crate::error;
use crate::error::{
Expand All @@ -59,7 +59,6 @@ use crate::error::{
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;

#[inline]
fn table_manifest_dir(table_dir: &str) -> String {
format!("{table_dir}/manifest/")
Expand All @@ -70,7 +69,7 @@ pub struct MitoTable<R: Region> {
manifest: TableManifest,
// guarded by `self.alter_lock`
table_info: ArcSwap<TableInfo>,
regions: Arc<RwLock<HashMap<RegionNumber, R>>>,
regions: ArcSwap<HashMap<RegionNumber, R>>,
alter_lock: Mutex<()>,
}

Expand All @@ -88,7 +87,7 @@ impl<R: Region> Table for MitoTable<R> {
if request.columns_values.is_empty() {
return Ok(0);
}
let regions_guard = self.regions.read().await;
let regions_guard = self.regions.load();
let region = regions_guard
.get(&request.region_number)
.with_context(|| RegionNotFoundSnafu {
Expand Down Expand Up @@ -143,7 +142,7 @@ impl<R: Region> Table for MitoTable<R> {
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
let read_ctx = ReadContext::default();
let region_guard = self.regions.read().await;
let region_guard = self.regions.load();

let mut readers = Vec::with_capacity(region_guard.len());
let mut first_schema: Option<Arc<Schema>> = None;
Expand Down Expand Up @@ -261,7 +260,7 @@ impl<R: Region> Table for MitoTable<R> {
if request.key_column_values.is_empty() {
return Ok(0);
}
let regions_guard = self.regions.read().await;
let regions_guard = self.regions.load();
let mut rows_deleted = 0;
// TODO(hl): Should be tracked by procedure.
// TODO(hl): Parse delete request into region->keys instead of delete in each region
Expand Down Expand Up @@ -302,7 +301,7 @@ impl<R: Region> Table for MitoTable<R> {
reason: FlushReason::Manually,
})
.unwrap_or_default();
let regions_guard = self.regions.read().await;
let regions_guard = self.regions.load();

if let Some(region_number) = region_number {
if let Some(region) = regions_guard.get(&region_number) {
Expand All @@ -326,12 +325,8 @@ impl<R: Region> Table for MitoTable<R> {
Ok(())
}

async fn close(&self, region_numbers: &[RegionNumber]) -> TableResult<()> {
self.close_regions(region_numbers).await
}

async fn region_stats(&self) -> TableResult<Vec<RegionStat>> {
let regions_guard = self.regions.read().await;
fn region_stats(&self) -> TableResult<Vec<RegionStat>> {
let regions_guard = self.regions.load();

Ok(regions_guard
.values()
Expand All @@ -342,8 +337,8 @@ impl<R: Region> Table for MitoTable<R> {
.collect())
}

async fn contains_region(&self, region: RegionNumber) -> TableResult<bool> {
let regions_guard = self.regions.read().await;
fn contains_region(&self, region: RegionNumber) -> TableResult<bool> {
let regions_guard = self.regions.load();

Ok(regions_guard.contains_key(&region))
}
Expand Down Expand Up @@ -381,7 +376,7 @@ impl<R: Region> MitoTable<R> {
) -> Self {
Self {
table_info: ArcSwap::new(Arc::new(table_info)),
regions: Arc::new(RwLock::new(regions)),
regions: ArcSwap::new(Arc::new(regions)),
manifest,
alter_lock: Mutex::new(()),
}
Expand Down Expand Up @@ -521,7 +516,7 @@ impl<R: Region> MitoTable<R> {
/// Closes regions
/// Notes: Please release regions in StorageEngine.
pub async fn close_regions(&self, region_numbers: &[RegionNumber]) -> TableResult<()> {
let mut regions_guard = self.regions.write().await;
let regions_guard = self.regions.load_full();

let regions = region_numbers
.iter()
Expand All @@ -532,22 +527,28 @@ impl<R: Region> MitoTable<R> {
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
for region in region_numbers {
regions_guard.remove(region);
}

self.regions.rcu(|regions| {
let mut regions = HashMap::clone(regions);
for region in region_numbers {
regions.remove(region);
}

Arc::new(regions)
});

Ok(())
}

pub async fn is_releasable(&self) -> bool {
let regions_guard = self.regions.read().await;
let regions_guard = self.regions.load();

regions_guard.is_empty()
}

#[inline]
pub async fn region_ids(&self) -> Vec<RegionNumber> {
let regions_guard = self.regions.read().await;
let regions_guard = self.regions.load();
regions_guard.iter().map(|(k, _)| *k).collect()
}

Expand All @@ -572,7 +573,7 @@ impl<R: Region> MitoTable<R> {
table_version: TableVersion,
alter_op: &AlterOperation,
) -> TableResult<()> {
let regions_guard = self.regions.read().await;
let regions_guard = self.regions.load();
for region in regions_guard.values() {
let region_meta = region.in_memory_metadata();
if u64::from(region_meta.version()) > table_version {
Expand Down Expand Up @@ -602,7 +603,7 @@ impl<R: Region> MitoTable<R> {
}

pub async fn load_region(&self, region_number: RegionNumber, _region: R) -> TableResult<()> {
let info = self.table_info.load_full();
let info = self.table_info.load();

// TODO(weny): Supports to load the region
warn!(
Expand Down
1 change: 0 additions & 1 deletion src/table/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ pub struct CloseTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
/// Donothing if region_numbers is empty
pub region_numbers: Vec<RegionNumber>,
}

Expand Down
4 changes: 2 additions & 2 deletions src/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ pub trait Table: Send + Sync {
}

/// Get region stats in this table.
async fn region_stats(&self) -> Result<Vec<RegionStat>> {
fn region_stats(&self) -> Result<Vec<RegionStat>> {
UnsupportedSnafu {
operation: "REGION_STATS",
}
.fail()?
}

/// Return true if contains the region
async fn contains_region(&self, _region: RegionNumber) -> Result<bool> {
fn contains_region(&self, _region: RegionNumber) -> Result<bool> {
UnsupportedSnafu {
operation: "contain_region",
}
Expand Down

0 comments on commit a73b55c

Please sign in to comment.