Skip to content

Commit

Permalink
feat: flush or compact table and region functions (#3363)
Browse files Browse the repository at this point in the history
* feat: adds Requester to process table flush and compaction request

* feat: admin_fn macros for administration functions

* test: add query result

* feat: impl flush_region, flush_table, compact_region, and flush_region functions

* docs: add Arguments to admin_fn macro

* chore: apply suggestion

Co-authored-by: Zhenchi <[email protected]>

* chore: apply suggestion

Co-authored-by: Zhenchi <[email protected]>

* fix: group_requests_by_peer and adds log

* Update src/common/macro/src/admin_fn.rs

Co-authored-by: Ruihang Xia <[email protected]>

* feat: adds todo for spawan thread

* feat: rebase with main

---------

Co-authored-by: Zhenchi <[email protected]>
Co-authored-by: Ruihang Xia <[email protected]>
  • Loading branch information
3 people authored Feb 27, 2024
1 parent dbb1ce1 commit 4b36c28
Show file tree
Hide file tree
Showing 44 changed files with 1,596 additions and 395 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl RegionRequester {

check_response_header(header)?;

Ok(affected_rows)
Ok(affected_rows as _)
}

pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
Expand Down
2 changes: 2 additions & 0 deletions src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub mod readable_size;
use core::any::Any;
use std::sync::{Arc, Mutex, MutexGuard};

pub type AffectedRows = usize;

pub use bit_vec::BitVec;

/// [`Plugins`] is a wrapper of Arc contents.
Expand Down
2 changes: 2 additions & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
chrono-tz = "0.6"
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
Expand All @@ -33,6 +34,7 @@ serde_json.workspace = true
session.workspace = true
snafu.workspace = true
statrs = "0.16"
store-api.workspace = true
table.workspace = true

[dev-dependencies]
Expand Down
28 changes: 25 additions & 3 deletions src/common/function/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use table::requests::{DeleteRequest, InsertRequest};

pub type AffectedRows = usize;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};

/// A trait for handling table mutations in `QueryEngine`.
#[async_trait]
Expand All @@ -30,6 +30,28 @@ pub trait TableMutationHandler: Send + Sync {

/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;

/// Trigger a flush task for table.
async fn flush(&self, request: FlushTableRequest, ctx: QueryContextRef)
-> Result<AffectedRows>;

/// Trigger a compaction task for table.
async fn compact(
&self,
request: CompactTableRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows>;

/// Trigger a flush task for a table region.
async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
-> Result<AffectedRows>;

/// Trigger a compaction task for a table region.
async fn compact_region(
&self,
region_id: RegionId,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
}

/// A trait for handling procedure service requests in `QueryEngine`.
Expand Down
16 changes: 16 additions & 0 deletions src/common/function/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_query::error::{InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use snafu::ResultExt;

/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
Expand All @@ -27,3 +31,15 @@ pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>)

Signature::one_of(sigs, Volatility::Immutable)
}

/// Cast a [`ValueRef`] to u64, returns `None` if fails
pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
cast((*value).into(), &ConcreteDataType::uint64_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint64, actual type: {:#?}",
value.data_type(),
),
})
.map(|v| v.as_u64())
}
63 changes: 61 additions & 2 deletions src/common/function/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@ impl FunctionState {

use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};

use crate::handlers::ProcedureServiceHandler;
use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
struct MockProcedureServiceHandler;
struct MockTableMutationHandler;
const ROWS: usize = 42;

#[async_trait]
impl ProcedureServiceHandler for MockProcedureServiceHandler {
Expand All @@ -56,8 +64,59 @@ impl FunctionState {
}
}

#[async_trait]
impl TableMutationHandler for MockTableMutationHandler {
async fn insert(
&self,
_request: InsertRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn delete(
&self,
_request: DeleteRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn flush(
&self,
_request: FlushTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn compact(
&self,
_request: CompactTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn flush_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}

async fn compact_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
}

Self {
table_mutation_handler: None,
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
}
}
Expand Down
Loading

0 comments on commit 4b36c28

Please sign in to comment.