diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index f5f789fd0fc..c0c808922f9 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -1,5 +1,5 @@ use crate::util::slow::SlowQueryConfig; -use spacetimedb_sats::relation::{Column, FieldName, Header}; +use spacetimedb_lib::relation::{Column, FieldName, Header}; use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue}; use spacetimedb_vm::errors::{ConfigError, ErrorVm}; use spacetimedb_vm::relation::MemTable; diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index f6cc4a626a2..a1fd5fd7c8a 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -12,7 +12,7 @@ use crate::{ SystemTable, ST_CLIENTS_ID, ST_CLIENT_IDX, ST_COLUMNS_ID, ST_COLUMNS_IDX, ST_COLUMNS_NAME, ST_CONSTRAINTS_ID, ST_CONSTRAINTS_IDX, ST_CONSTRAINTS_NAME, ST_INDEXES_ID, ST_INDEXES_IDX, ST_INDEXES_NAME, ST_MODULE_ID, ST_MODULE_IDX, ST_RESERVED_SEQUENCE_RANGE, ST_SEQUENCES_ID, - ST_SEQUENCES_IDX, ST_SEQUENCES_NAME, ST_TABLES_ID, ST_TABLES_IDX, + ST_SEQUENCES_IDX, ST_SEQUENCES_NAME, ST_TABLES_ID, ST_TABLES_IDX, ST_VAR_ID, ST_VAR_IDX, }, traits::TxData, }, @@ -204,6 +204,8 @@ impl CommittedState { self.create_table(ST_CLIENTS_ID, schemas[ST_CLIENT_IDX].clone()); + self.create_table(ST_VAR_ID, schemas[ST_VAR_IDX].clone()); + // Insert the sequences into `st_sequences` let (st_sequences, blob_store) = self.get_table_and_blob_store_or_create(ST_SEQUENCES_ID, &schemas[ST_SEQUENCES_IDX]); diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 57d02795642..8139efd1b2d 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -862,7 +862,7 @@ impl spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi mod tests { use super::*; use crate::db::datastore::system_tables::{ - system_tables, StColumnRow, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_COLUMNS_ID, + system_tables, StColumnRow, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, StVarValue, ST_COLUMNS_ID, ST_CONSTRAINTS_ID, ST_INDEXES_ID, ST_RESERVED_SEQUENCE_RANGE, ST_SEQUENCES_ID, }; use crate::db::datastore::traits::{IsolationLevel, MutTx}; @@ -1240,6 +1240,7 @@ mod tests { TableRow { id: 4, name: "st_constraints", ty: StTableType::System, access: StAccess::Public }, TableRow { id: 5, name: "st_module", ty: StTableType::System, access: StAccess::Public }, TableRow { id: 6, name: "st_clients", ty: StTableType::System, access: StAccess::Public }, + TableRow { id: 7, name: "st_var", ty: StTableType::System, access: StAccess::Public }, ])); #[rustfmt::skip] assert_eq!(query.scan_st_columns()?, map_array([ @@ -1282,6 +1283,9 @@ mod tests { ColRow { table: 6, pos: 0, name: "identity", ty: Identity::get_type() }, ColRow { table: 6, pos: 1, name: "address", ty: Address::get_type() }, + + ColRow { table: 7, pos: 0, name: "name", ty: AlgebraicType::String }, + ColRow { table: 7, pos: 1, name: "value", ty: StVarValue::type_of() }, ])); #[rustfmt::skip] assert_eq!(query.scan_st_indexes()?, map_array([ @@ -1292,6 +1296,7 @@ mod tests { IndexRow { id: 4, table: 3, col: col(0), name: "idx_st_indexes_index_id_primary_key_auto_unique", unique: true }, IndexRow { id: 5, table: 4, col: col(0), name: "idx_st_constraints_constraint_id_primary_key_auto_unique", unique: true }, IndexRow { id: 6, table: 6, col: col_list![0, 1], name: "idx_st_clients_identity_address_unique", unique: true }, + IndexRow { id: 7, table: 7, col: col(0), name: "idx_st_var_name_primary_key_unique", unique: true }, ])); let start = FIRST_NON_SYSTEM_ID as i128; #[rustfmt::skip] @@ -1316,6 +1321,7 @@ mod tests { ConstraintRow { constraint_id: 4, table_id: 3, columns: col(0), constraints: Constraints::primary_key_auto(), constraint_name: "ct_st_indexes_index_id_primary_key_auto" }, ConstraintRow { constraint_id: 5, table_id: 4, columns: col(0), constraints: Constraints::primary_key_auto(), constraint_name: "ct_st_constraints_constraint_id_primary_key_auto" }, ConstraintRow { constraint_id: 6, table_id: 6, columns: col_list![0, 1], constraints: Constraints::unique(), constraint_name: "ct_st_clients_identity_address_unique" }, + ConstraintRow { constraint_id: 7, table_id: 7, columns: col(0), constraints: Constraints::primary_key(), constraint_name: "ct_st_var_name_primary_key" }, ])); // Verify we get back the tables correctly with the proper ids... @@ -1697,6 +1703,7 @@ mod tests { IndexRow { id: 4, table: 3, col: col(0), name: "idx_st_indexes_index_id_primary_key_auto_unique", unique: true }, IndexRow { id: 5, table: 4, col: col(0), name: "idx_st_constraints_constraint_id_primary_key_auto_unique", unique: true }, IndexRow { id: 6, table: 6, col: col_list![0, 1], name: "idx_st_clients_identity_address_unique", unique: true }, + IndexRow { id: 7, table: 7, col: col(0), name: "idx_st_var_name_primary_key_unique", unique: true }, IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "id_idx", unique: true }, IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "name_idx", unique: true }, IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "age_idx", unique: true }, @@ -1742,6 +1749,7 @@ mod tests { IndexRow { id: 4, table: 3, col: col(0), name: "idx_st_indexes_index_id_primary_key_auto_unique", unique: true }, IndexRow { id: 5, table: 4, col: col(0), name: "idx_st_constraints_constraint_id_primary_key_auto_unique", unique: true }, IndexRow { id: 6, table: 6, col: col_list![0, 1], name: "idx_st_clients_identity_address_unique", unique: true }, + IndexRow { id: 7, table: 7, col: col(0), name: "idx_st_var_name_primary_key_unique", unique: true }, IndexRow { id: seq_start , table: FIRST_NON_SYSTEM_ID, col: col(0), name: "id_idx", unique: true }, IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "name_idx", unique: true }, IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "age_idx", unique: true }, @@ -1787,6 +1795,7 @@ mod tests { IndexRow { id: 4, table: 3, col: col(0), name: "idx_st_indexes_index_id_primary_key_auto_unique", unique: true }, IndexRow { id: 5, table: 4, col: col(0), name: "idx_st_constraints_constraint_id_primary_key_auto_unique", unique: true }, IndexRow { id: 6, table: 6, col: col_list![0, 1], name: "idx_st_clients_identity_address_unique", unique: true }, + IndexRow { id: 7, table: 7, col: col(0), name: "idx_st_var_name_primary_key_unique", unique: true }, IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "id_idx", unique: true }, IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "name_idx", unique: true }, ].map(Into::into)); diff --git a/crates/core/src/db/datastore/system_tables.rs b/crates/core/src/db/datastore/system_tables.rs index 91ed4a6597d..c7b569f1347 100644 --- a/crates/core/src/db/datastore/system_tables.rs +++ b/crates/core/src/db/datastore/system_tables.rs @@ -1,18 +1,26 @@ +use crate::db::relational_db::RelationalDB; use crate::error::{DBError, TableError}; +use crate::execution_context::ExecutionContext; use core::fmt; -use spacetimedb_lib::{Address, Identity}; +use derive_more::From; +use spacetimedb_lib::{Address, Identity, SumType}; use spacetimedb_primitives::*; use spacetimedb_sats::db::auth::{StAccess, StTableType}; use spacetimedb_sats::db::def::*; use spacetimedb_sats::hash::Hash; use spacetimedb_sats::product_value::InvalidFieldError; use spacetimedb_sats::{ - impl_deserialize, impl_serialize, product, AlgebraicType, AlgebraicValue, ArrayValue, ProductValue, + impl_deserialize, impl_serialize, product, AlgebraicType, AlgebraicValue, ArrayValue, ProductValue, SumTypeVariant, + SumValue, }; use spacetimedb_table::table::RowRef; use std::ops::Deref as _; +use std::str::FromStr; use strum::Display; +use super::locking_tx_datastore::tx::TxId; +use super::locking_tx_datastore::MutTxId; + /// The static ID of the table that defines tables pub(crate) const ST_TABLES_ID: TableId = TableId(0); /// The static ID of the table that defines columns @@ -26,9 +34,10 @@ pub(crate) const ST_CONSTRAINTS_ID: TableId = TableId(4); /// The static ID of the table that defines the stdb module associated with /// the database pub(crate) const ST_MODULE_ID: TableId = TableId(5); - /// The static ID of the table that defines connected clients pub(crate) const ST_CLIENTS_ID: TableId = TableId(6); +/// The static ID of the table that defines system variables +pub(crate) const ST_VAR_ID: TableId = TableId(7); pub(crate) const ST_TABLES_NAME: &str = "st_table"; pub(crate) const ST_COLUMNS_NAME: &str = "st_columns"; @@ -37,6 +46,7 @@ pub(crate) const ST_INDEXES_NAME: &str = "st_indexes"; pub(crate) const ST_CONSTRAINTS_NAME: &str = "st_constraints"; pub(crate) const ST_MODULE_NAME: &str = "st_module"; pub(crate) const ST_CLIENTS_NAME: &str = "st_clients"; +pub(crate) const ST_VAR_NAME: &str = "st_var"; /// Reserved range of sequence values used for system tables. /// @@ -63,7 +73,7 @@ pub enum SystemTable { st_indexes, st_constraints, } -pub(crate) fn system_tables() -> [TableSchema; 7] { +pub(crate) fn system_tables() -> [TableSchema; 8] { [ st_table_schema(), st_columns_schema(), @@ -71,6 +81,7 @@ pub(crate) fn system_tables() -> [TableSchema; 7] { st_constraints_schema(), st_module_schema(), st_clients_schema(), + st_var_schema(), // Is important this is always last, so the starting sequence for each // system table is correct. st_sequences_schema(), @@ -84,7 +95,8 @@ pub(crate) const ST_INDEXES_IDX: usize = 2; pub(crate) const ST_CONSTRAINTS_IDX: usize = 3; pub(crate) const ST_MODULE_IDX: usize = 4; pub(crate) const ST_CLIENT_IDX: usize = 5; -pub(crate) const ST_SEQUENCES_IDX: usize = 6; +pub(crate) const ST_VAR_IDX: usize = 6; +pub(crate) const ST_SEQUENCES_IDX: usize = 7; macro_rules! st_fields_enum { ($(#[$attr:meta])* enum $ty_name:ident { $($name:expr, $var:ident = $discr:expr,)* }) => { #[derive(Copy, Clone, Debug)] @@ -187,6 +199,11 @@ st_fields_enum!(enum StClientsFields { "identity", Identity = 0, "address", Address = 1, }); +// WARNING: For a stable schema, don't change the field names and discriminants. +st_fields_enum!(enum StVarFields { + "name", Name = 0, + "value", Value = 1, +}); /// System Table [ST_TABLES_NAME] /// @@ -352,6 +369,24 @@ fn st_clients_schema() -> TableSchema { .into_schema(ST_CLIENTS_ID) } +/// System Table [ST_VAR_NAME] +/// +/// | name | value | +/// |-------------|-----------| +/// | "row_limit" | (U64 = 5) | +pub fn st_var_schema() -> TableSchema { + TableDef::new( + ST_VAR_NAME.into(), + vec![ + ColumnDef::sys(StVarFields::Name.name(), AlgebraicType::String), + ColumnDef::sys(StVarFields::Value.name(), StVarValue::type_of()), + ], + ) + .with_type(StTableType::System) + .with_column_constraint(Constraints::primary_key(), StVarFields::Name) + .into_schema(ST_VAR_ID) +} + pub(crate) fn table_name_is_system(table_name: &str) -> bool { table_name.starts_with("st_") } @@ -716,10 +751,360 @@ impl From<&StClientsRow> for ProductValue { } } +/// A handle for reading system variables from `st_var` +pub struct StVarTable; + +impl StVarTable { + /// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var` + pub fn row_limit(ctx: &ExecutionContext, db: &RelationalDB, tx: &TxId) -> Result, DBError> { + if let Some(StVarValue::U64(limit)) = Self::read_var(ctx, db, tx, StVarName::RowLimit)? { + return Ok(Some(limit)); + } + Ok(None) + } + + /// Read the value of [ST_VARNAME_SLOW_QRY] from `st_var` + pub fn query_limit(ctx: &ExecutionContext, db: &RelationalDB, tx: &TxId) -> Result, DBError> { + if let Some(StVarValue::U64(ms)) = Self::read_var(ctx, db, tx, StVarName::SlowQryThreshold)? { + return Ok(Some(ms)); + } + Ok(None) + } + + /// Read the value of [ST_VARNAME_SLOW_SUB] from `st_var` + pub fn sub_limit(ctx: &ExecutionContext, db: &RelationalDB, tx: &TxId) -> Result, DBError> { + if let Some(StVarValue::U64(ms)) = Self::read_var(ctx, db, tx, StVarName::SlowQryThreshold)? { + return Ok(Some(ms)); + } + Ok(None) + } + + /// Read the value of [ST_VARNAME_SLOW_INC] from `st_var` + pub fn incr_limit(ctx: &ExecutionContext, db: &RelationalDB, tx: &TxId) -> Result, DBError> { + if let Some(StVarValue::U64(ms)) = Self::read_var(ctx, db, tx, StVarName::SlowQryThreshold)? { + return Ok(Some(ms)); + } + Ok(None) + } + + /// Read the value of a system variable from `st_var` + pub fn read_var( + ctx: &ExecutionContext, + db: &RelationalDB, + tx: &TxId, + name: StVarName, + ) -> Result, DBError> { + if let Some(row_ref) = db + .iter_by_col_eq(ctx, tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? + .next() + { + return Ok(Some(StVarRow::try_from(row_ref)?.value)); + } + Ok(None) + } + + /// Update the value of a system variable in `st_var` + pub fn write_var( + ctx: &ExecutionContext, + db: &RelationalDB, + tx: &mut MutTxId, + name: StVarName, + value: StVarValue, + ) -> Result<(), DBError> { + if let Some(row_ref) = db + .iter_by_col_eq_mut(ctx, tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())? + .next() + { + db.delete(tx, ST_VAR_ID, [row_ref.pointer()]); + } + db.insert(tx, ST_VAR_ID, ProductValue::from(StVarRow { name, value }))?; + Ok(()) + } +} + +/// A row in the system table `st_var` +pub struct StVarRow { + pub name: StVarName, + pub value: StVarValue, +} + +impl StVarRow { + pub fn type_of() -> AlgebraicType { + AlgebraicType::product([("name", AlgebraicType::String), ("value", StVarValue::type_of())]) + } +} + +impl From for ProductValue { + fn from(var: StVarRow) -> Self { + product!(var.name, var.value) + } +} + +impl From for AlgebraicValue { + fn from(row: StVarRow) -> Self { + AlgebraicValue::Product(row.into()) + } +} + +/// A system variable that defines a row limit for queries and subscriptions. +/// If the cardinality of a query is estimated to exceed this limit, +/// it will be rejected before being executed. +pub const ST_VARNAME_ROW_LIMIT: &str = "row_limit"; +/// A system variable that defines a threshold for logging slow queries. +pub const ST_VARNAME_SLOW_QRY: &str = "slow_ad_hoc_query_ms"; +/// A system variable that defines a threshold for logging slow subscriptions. +pub const ST_VARNAME_SLOW_SUB: &str = "slow_subscription_query_ms"; +/// A system variable that defines a threshold for logging slow tx updates. +pub const ST_VARNAME_SLOW_INC: &str = "slow_tx_update_ms"; + +/// The name of a system variable in `st_var` +#[derive(Debug, Clone, Copy)] +pub enum StVarName { + RowLimit, + SlowQryThreshold, + SlowSubThreshold, + SlowIncThreshold, +} + +impl From for AlgebraicValue { + fn from(value: StVarName) -> Self { + match value { + StVarName::RowLimit => AlgebraicValue::String(ST_VARNAME_ROW_LIMIT.to_string().into_boxed_str()), + StVarName::SlowQryThreshold => AlgebraicValue::String(ST_VARNAME_SLOW_QRY.to_string().into_boxed_str()), + StVarName::SlowSubThreshold => AlgebraicValue::String(ST_VARNAME_SLOW_SUB.to_string().into_boxed_str()), + StVarName::SlowIncThreshold => AlgebraicValue::String(ST_VARNAME_SLOW_INC.to_string().into_boxed_str()), + } + } +} + +impl FromStr for StVarName { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + ST_VARNAME_ROW_LIMIT => Ok(StVarName::RowLimit), + ST_VARNAME_SLOW_QRY => Ok(StVarName::SlowQryThreshold), + ST_VARNAME_SLOW_SUB => Ok(StVarName::SlowSubThreshold), + ST_VARNAME_SLOW_INC => Ok(StVarName::SlowIncThreshold), + _ => Err(anyhow::anyhow!("Invalid system variable {}", s)), + } + } +} + +impl StVarName { + pub fn type_of(&self) -> AlgebraicType { + match self { + StVarName::RowLimit + | StVarName::SlowQryThreshold + | StVarName::SlowSubThreshold + | StVarName::SlowIncThreshold => AlgebraicType::U64, + } + } +} + +/// The value of a system variable in `st_var` +#[derive(Debug, Clone, From)] +pub enum StVarValue { + Bool(bool), + I8(i8), + U8(u8), + I16(i16), + U16(u16), + I32(i32), + U32(u32), + I64(i64), + U64(u64), + I128(i128), + U128(u128), + F32(f32), + F64(f64), + String(Box), +} + +impl StVarValue { + pub fn type_of() -> AlgebraicType { + AlgebraicType::Sum(SumType::new(Box::new([ + SumTypeVariant::new_named(AlgebraicType::Bool, "Bool"), + SumTypeVariant::new_named(AlgebraicType::I8, "I8"), + SumTypeVariant::new_named(AlgebraicType::U8, "U8"), + SumTypeVariant::new_named(AlgebraicType::I16, "I16"), + SumTypeVariant::new_named(AlgebraicType::U16, "U16"), + SumTypeVariant::new_named(AlgebraicType::I32, "I32"), + SumTypeVariant::new_named(AlgebraicType::U32, "U32"), + SumTypeVariant::new_named(AlgebraicType::I64, "I64"), + SumTypeVariant::new_named(AlgebraicType::U64, "U64"), + SumTypeVariant::new_named(AlgebraicType::I128, "I128"), + SumTypeVariant::new_named(AlgebraicType::U128, "U128"), + SumTypeVariant::new_named(AlgebraicType::F32, "F32"), + SumTypeVariant::new_named(AlgebraicType::F64, "F64"), + SumTypeVariant::new_named(AlgebraicType::String, "String"), + ]))) + } + + pub fn try_from_primitive(value: AlgebraicValue) -> Result { + match value { + AlgebraicValue::Bool(v) => Ok(StVarValue::Bool(v)), + AlgebraicValue::I8(v) => Ok(StVarValue::I8(v)), + AlgebraicValue::U8(v) => Ok(StVarValue::U8(v)), + AlgebraicValue::I16(v) => Ok(StVarValue::I16(v)), + AlgebraicValue::U16(v) => Ok(StVarValue::U16(v)), + AlgebraicValue::I32(v) => Ok(StVarValue::I32(v)), + AlgebraicValue::U32(v) => Ok(StVarValue::U32(v)), + AlgebraicValue::I64(v) => Ok(StVarValue::I64(v)), + AlgebraicValue::U64(v) => Ok(StVarValue::U64(v)), + AlgebraicValue::I128(v) => Ok(StVarValue::I128(v.0)), + AlgebraicValue::U128(v) => Ok(StVarValue::U128(v.0)), + AlgebraicValue::F32(v) => Ok(StVarValue::F32(v.into_inner())), + AlgebraicValue::F64(v) => Ok(StVarValue::F64(v.into_inner())), + AlgebraicValue::String(v) => Ok(StVarValue::String(v)), + _ => Err(value), + } + } + + pub fn try_from_sum(value: AlgebraicValue) -> Result { + value.into_sum()?.try_into() + } +} + +impl TryFrom for StVarValue { + type Error = AlgebraicValue; + + fn try_from(sum: SumValue) -> Result { + match sum.tag { + 0 => Ok(StVarValue::Bool(sum.value.into_bool()?)), + 1 => Ok(StVarValue::I8(sum.value.into_i8()?)), + 2 => Ok(StVarValue::U8(sum.value.into_u8()?)), + 3 => Ok(StVarValue::I16(sum.value.into_i16()?)), + 4 => Ok(StVarValue::U16(sum.value.into_u16()?)), + 5 => Ok(StVarValue::I32(sum.value.into_i32()?)), + 6 => Ok(StVarValue::U32(sum.value.into_u32()?)), + 7 => Ok(StVarValue::I64(sum.value.into_i64()?)), + 8 => Ok(StVarValue::U64(sum.value.into_u64()?)), + 9 => Ok(StVarValue::I128(sum.value.into_i128()?.0)), + 10 => Ok(StVarValue::U128(sum.value.into_u128()?.0)), + 11 => Ok(StVarValue::F32(sum.value.into_f32()?.into_inner())), + 12 => Ok(StVarValue::F64(sum.value.into_f64()?.into_inner())), + 13 => Ok(StVarValue::String(sum.value.into_string()?)), + _ => Err(*sum.value), + } + } +} + +impl From for AlgebraicValue { + fn from(value: StVarValue) -> Self { + AlgebraicValue::Sum(value.into()) + } +} + +impl From for SumValue { + fn from(value: StVarValue) -> Self { + match value { + StVarValue::Bool(v) => SumValue { + tag: 0, + value: Box::new(AlgebraicValue::Bool(v)), + }, + StVarValue::I8(v) => SumValue { + tag: 1, + value: Box::new(AlgebraicValue::I8(v)), + }, + StVarValue::U8(v) => SumValue { + tag: 2, + value: Box::new(AlgebraicValue::U8(v)), + }, + StVarValue::I16(v) => SumValue { + tag: 3, + value: Box::new(AlgebraicValue::I16(v)), + }, + StVarValue::U16(v) => SumValue { + tag: 4, + value: Box::new(AlgebraicValue::U16(v)), + }, + StVarValue::I32(v) => SumValue { + tag: 5, + value: Box::new(AlgebraicValue::I32(v)), + }, + StVarValue::U32(v) => SumValue { + tag: 6, + value: Box::new(AlgebraicValue::U32(v)), + }, + StVarValue::I64(v) => SumValue { + tag: 7, + value: Box::new(AlgebraicValue::I64(v)), + }, + StVarValue::U64(v) => SumValue { + tag: 8, + value: Box::new(AlgebraicValue::U64(v)), + }, + StVarValue::I128(v) => SumValue { + tag: 9, + value: Box::new(AlgebraicValue::I128(v.into())), + }, + StVarValue::U128(v) => SumValue { + tag: 10, + value: Box::new(AlgebraicValue::U128(v.into())), + }, + StVarValue::F32(v) => SumValue { + tag: 11, + value: Box::new(AlgebraicValue::F32(v.into())), + }, + StVarValue::F64(v) => SumValue { + tag: 12, + value: Box::new(AlgebraicValue::F64(v.into())), + }, + StVarValue::String(v) => SumValue { + tag: 13, + value: Box::new(AlgebraicValue::String(v)), + }, + } + } +} + +impl TryFrom> for StVarRow { + type Error = DBError; + + fn try_from(row: RowRef<'_>) -> Result { + // The position of the `value` column in `st_var` + let col_pos = StVarFields::Value.col_id(); + + // An error when reading the `value` column in `st_var` + let invalid_value = InvalidFieldError { + col_pos, + name: Some(StVarFields::Value.name()), + }; + + let name = row.read_col::>(StVarFields::Name.col_id())?; + let name = StVarName::from_str(&name)?; + match row.read_col::(col_pos)? { + AlgebraicValue::Sum(sum) => Ok(StVarRow { + name, + value: sum.try_into().map_err(|_| invalid_value)?, + }), + _ => Err(invalid_value.into()), + } + } +} + #[cfg(test)] mod tests { + use crate::db::relational_db::tests_utils::TestDB; + use super::*; + #[test] + fn test_system_variables() { + let db = TestDB::durable().expect("failed to create db"); + let ctx = ExecutionContext::default(); + let _ = db.with_auto_commit(&ctx, |tx| { + StVarTable::write_var(&ctx, &db, tx, StVarName::RowLimit, StVarValue::U64(5)) + }); + assert_eq!( + 5, + db.with_read_only(&ctx, |tx| StVarTable::row_limit(&ctx, &db, tx)) + .expect("failed to read from st_var") + .expect("row_limit does not exist") + ); + } + #[test] fn test_sequences_within_reserved_range() { let mut num_tables = 0; diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index be684ef3261..95426f1202b 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -866,6 +866,7 @@ impl RelationalDB { pub(crate) fn set_config(&self, key: &str, value: AlgebraicValue) -> Result<(), ErrorVm> { self.config.write().set_config(key, value) } + /// Read the runtime configurations settings of the database pub(crate) fn read_config(&self) -> DatabaseConfig { *self.config.read() diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 5b0d2a8a552..01cb8a2d118 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -103,10 +103,11 @@ impl ModuleSubscriptions { check_row_limit( &execution_set, + &ctx, + &self.relational_db, &tx, |execution_set, tx| execution_set.row_estimate(tx), &auth, - &config, )?; let database_update = execution_set.eval(&ctx, sender.protocol, &self.relational_db, &tx); diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 6717624eb27..48225a51a98 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -1,7 +1,8 @@ //! The [DbProgram] that execute arbitrary queries & code against the database. -use crate::config::DatabaseConfig; +use crate::db::datastore::locking_tx_datastore::tx::TxId; use crate::db::datastore::locking_tx_datastore::IterByColRange; +use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow, StVarTable, StVarValue}; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; use crate::estimation; @@ -23,6 +24,8 @@ use spacetimedb_vm::iterators::RelIter; use spacetimedb_vm::program::{ProgramVm, Sources}; use spacetimedb_vm::rel_ops::{EmptyRelOps, RelOps}; use spacetimedb_vm::relation::{MemTable, RelValue}; +use std::str::FromStr; +use std::sync::Arc; pub enum TxMode<'a> { MutTx(&'a mut MutTx), @@ -402,13 +405,14 @@ pub struct DbProgram<'db, 'tx> { /// reject the request if the estimated cardinality exceeds the limit. pub fn check_row_limit( queries: &QuerySet, - tx: &Tx, - row_est: impl Fn(&QuerySet, &Tx) -> u64, + ctx: &ExecutionContext, + db: &RelationalDB, + tx: &TxId, + row_est: impl Fn(&QuerySet, &TxId) -> u64, auth: &AuthCtx, - config: &DatabaseConfig, ) -> Result<(), DBError> { if auth.caller != auth.owner { - if let Some(limit) = config.row_limit { + if let Some(limit) = StVarTable::row_limit(ctx, db, tx)? { let estimate = row_est(queries, tx); if estimate > limit { return Err(DBError::Other(anyhow::anyhow!( @@ -429,10 +433,11 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { if let TxMode::Tx(tx) = self.tx { check_row_limit( query, + self.ctx, + self.db, tx, |expr, tx| estimation::num_rows(tx, expr), &self.auth, - &self.db.read_config(), )?; } @@ -576,9 +581,33 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { fn _read_config(&self, name: String) -> Result { let config = self.db.read_config(); - Ok(Code::Table(config.read_key_into_table(&name)?)) } + + fn _set_var(&mut self, name: String, value: AlgebraicValue) -> Result { + let var = StVarName::from_str(&name)?; + let tx = self.tx.unwrap_mut(); + let value = StVarValue::try_from_primitive(value) + .map_err(|v| anyhow::anyhow!("Invalid value for system variable {name}: {v:?}"))?; + StVarTable::write_var(self.ctx, self.db, tx, var, value)?; + Ok(Code::Pass(None)) + } + + fn _read_var(&self, name: String) -> Result { + fn read_key_into_table(env: &DbProgram, name: &str) -> Result { + if let TxMode::Tx(tx) = &env.tx { + let name = StVarName::from_str(name)?; + if let Some(value) = StVarTable::read_var(env.ctx, env.db, tx, name)? { + return Ok(MemTable::from_iter( + Arc::new(st_var_schema().into()), + [ProductValue::from(StVarRow { name, value })], + )); + } + } + Ok(MemTable::from_iter(Arc::new(st_var_schema().into()), [])) + } + Ok(Code::Table(read_key_into_table(self, &name)?)) + } } impl ProgramVm for DbProgram<'_, '_> { @@ -593,7 +622,10 @@ impl ProgramVm for DbProgram<'_, '_> { CrudExpr::Delete { query } => self._delete_query(&query, sources), CrudExpr::CreateTable { table } => self._create_table(*table), CrudExpr::Drop { name, kind, .. } => self._drop(&name, kind), - CrudExpr::SetVar { name, value } => self._set_config(name, value), + CrudExpr::SetVar { name, value } => { + self._set_var(name.clone(), value.clone())?; + self._set_config(name, value) + } CrudExpr::ReadVar { name } => self._read_config(name), } } diff --git a/crates/sats/src/relation.rs b/crates/sats/src/relation.rs index 7a7f26069b6..f1751614e9f 100644 --- a/crates/sats/src/relation.rs +++ b/crates/sats/src/relation.rs @@ -1,11 +1,13 @@ use crate::algebraic_value::AlgebraicValue; use crate::db::auth::{StAccess, StTableType}; +use crate::db::def::{ColumnSchema, TableSchema}; use crate::db::error::{RelationError, TypeError}; use crate::satn::Satn; use crate::{algebraic_type, AlgebraicType}; use core::fmt; use core::hash::Hash; use derive_more::From; +use itertools::Itertools; use spacetimedb_primitives::{ColId, ColList, ColListBuilder, Constraints, TableId}; use std::sync::Arc; @@ -87,6 +89,18 @@ impl Column { } } +impl From for Column { + fn from(schema: ColumnSchema) -> Self { + Column { + field: FieldName { + table: schema.table_id, + col: schema.col_pos, + }, + algebraic_type: schema.col_type, + } + } +} + #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct Header { pub table_id: TableId, @@ -95,6 +109,26 @@ pub struct Header { pub constraints: Vec<(ColList, Constraints)>, } +impl From for Header { + fn from(schema: TableSchema) -> Self { + Header { + table_id: schema.table_id, + table_name: schema.table_name.clone(), + fields: schema + .columns() + .iter() + .cloned() + .map(|schema| schema.into()) + .collect_vec(), + constraints: schema + .constraints + .into_iter() + .map(|schema| (schema.columns, schema.constraints)) + .collect_vec(), + } + } +} + impl Header { pub fn new( table_id: TableId,