diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index c4126e9a7d24..56ea6af559a3 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -283,6 +283,9 @@ pub struct BatchConfig {
     #[serde(default)]
     pub distributed_query_limit: Option<u64>,
 
+    #[serde(default = "default::batch::enable_barrier_read")]
+    pub enable_barrier_read: bool,
+
     #[serde(default, flatten)]
     pub unrecognized: Unrecognized<Self>,
 }
@@ -848,6 +851,12 @@ mod default {
             system_param::default::telemetry_enabled()
         }
     }
+
+    pub mod batch {
+        pub fn enable_barrier_read() -> bool {
+            true
+        }
+    }
 }
 
 pub struct StorageMemoryConfig {
diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs
index 6e7da0e8eba2..96ef175993fc 100644
--- a/src/common/src/session_config/mod.rs
+++ b/src/common/src/session_config/mod.rs
@@ -29,7 +29,7 @@ use tracing::info;
 
 use crate::error::{ErrorCode, RwError};
 use crate::session_config::transaction_isolation_level::IsolationLevel;
-use crate::session_config::visibility_mode::VisibilityMode;
+pub use crate::session_config::visibility_mode::VisibilityMode;
 use crate::util::epoch::Epoch;
 
 // This is a hack, &'static str is not allowed as a const generics argument.
@@ -637,8 +637,8 @@ impl ConfigMap {
         self.search_path.clone()
     }
 
-    pub fn only_checkpoint_visible(&self) -> bool {
-        matches!(self.visibility_mode, VisibilityMode::Checkpoint)
+    pub fn get_visible_mode(&self) -> VisibilityMode {
+        self.visibility_mode
     }
 
     pub fn get_query_epoch(&self) -> Option<Epoch> {
diff --git a/src/common/src/session_config/visibility_mode.rs b/src/common/src/session_config/visibility_mode.rs
index 88e63814acbe..78e559ae369b 100644
--- a/src/common/src/session_config/visibility_mode.rs
+++ b/src/common/src/session_config/visibility_mode.rs
@@ -23,8 +23,12 @@ use crate::session_config::VISIBILITY_MODE;
 
 #[derive(Copy, Default, Debug, Clone, PartialEq, Eq)]
 pub enum VisibilityMode {
+    // Apply the frontend `enable_barrier_read` config.
     #[default]
+    Default,
+    // Read the latest barrier epoch from streaming compute nodes.
     All,
+    // Read the latest checkpoint epoch from serving compute nodes.
     Checkpoint,
 }
 
@@ -51,6 +55,8 @@ impl TryFrom<&[&str]> for VisibilityMode {
             Ok(Self::All)
         } else if s.eq_ignore_ascii_case("checkpoint") {
             Ok(Self::Checkpoint)
+        } else if s.eq_ignore_ascii_case("default") {
+            Ok(Self::Default)
         } else {
             Err(InvalidConfigValue {
                 config_entry: Self::entry_name().to_string(),
@@ -63,6 +69,7 @@ impl TryFrom<&[&str]> for VisibilityMode {
 impl std::fmt::Display for VisibilityMode {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
+            Self::Default => write!(f, "default"),
             Self::All => write!(f, "all"),
             Self::Checkpoint => write!(f, "checkpoint"),
         }
@@ -91,6 +98,10 @@ mod tests {
             VisibilityMode::try_from(["checkPoint"].as_slice()).unwrap(),
             VisibilityMode::Checkpoint
         );
+        assert_eq!(
+            VisibilityMode::try_from(["default"].as_slice()).unwrap(),
+            VisibilityMode::Default
+        );
         assert!(VisibilityMode::try_from(["ab"].as_slice()).is_err());
     }
 }
diff --git a/src/config/example.toml b/src/config/example.toml
index f20d2480a183..030d09769e09 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -28,6 +28,9 @@ move_table_size_limit = 2147483648
 split_group_size_limit = 21474836480
 do_not_config_object_storage_lifecycle = false
 
+[batch]
+enable_barrier_read = true
+
 [batch.developer]
 batch_connector_message_buffer_size = 16
 batch_output_channel_size = 64
diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs
index 71b34e60e6bb..c300989fce15 100644
--- a/src/frontend/src/handler/explain.rs
+++ b/src/frontend/src/handler/explain.rs
@@ -181,7 +181,7 @@ async fn do_handle_explain(
             Convention::Batch => {
                 let worker_node_manager_reader = WorkerNodeSelector::new(
                     session.env().worker_node_manager_ref(),
-                    !session.config().only_checkpoint_visible(),
+                    session.is_barrier_read(),
                 );
                 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
                     worker_node_manager_reader,
diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs
index 291b131a1581..36d5b536a10f 100644
--- a/src/frontend/src/handler/query.rs
+++ b/src/frontend/src/handler/query.rs
@@ -280,7 +280,7 @@ fn gen_batch_plan_fragmenter(
     );
     let worker_node_manager_reader = WorkerNodeSelector::new(
         session.env().worker_node_manager_ref(),
-        !session.config().only_checkpoint_visible(),
+        session.is_barrier_read(),
     );
     let plan_fragmenter = BatchPlanFragmenter::new(
         worker_node_manager_reader,
@@ -311,7 +311,7 @@ async fn execute(
         ..
     } = plan_fragmenter_result;
 
-    let only_checkpoint_visible = session.config().only_checkpoint_visible();
+    let is_barrier_read = session.is_barrier_read();
     let query_start_time = Instant::now();
     let query = plan_fragmenter.generate_complete_query().await?;
     tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
@@ -336,7 +336,7 @@ async fn execute(
         let hummock_snapshot_manager = session.env().hummock_snapshot_manager();
         let query_id = query.query_id().clone();
         let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?;
-        PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, only_checkpoint_visible)
+        PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, is_barrier_read)
     };
     match query_mode {
         QueryMode::Auto => unreachable!(),
diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs
index 01e816bef70d..950cbf6bd178 100644
--- a/src/frontend/src/lib.rs
+++ b/src/frontend/src/lib.rs
@@ -136,6 +136,10 @@ struct OverrideConfigOpts {
     #[clap(long, env = "RW_METRICS_LEVEL")]
     #[override_opts(path = server.metrics_level)]
     pub metrics_level: Option<u32>,
+
+    #[clap(long, env = "RW_ENABLE_BARRIER_READ")]
+    #[override_opts(path = batch.enable_barrier_read)]
+    pub enable_barrier_read: Option<bool>,
 }
 
 impl Default for FrontendOpts {
diff --git a/src/frontend/src/scheduler/hummock_snapshot_manager.rs b/src/frontend/src/scheduler/hummock_snapshot_manager.rs
index 9c1595fc8249..f76515259166 100644
--- a/src/frontend/src/scheduler/hummock_snapshot_manager.rs
+++ b/src/frontend/src/scheduler/hummock_snapshot_manager.rs
@@ -36,7 +36,7 @@ pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
 pub enum PinnedHummockSnapshot {
     FrontendPinned(
         HummockSnapshotGuard,
-        // `only_checkpoint_visible`.
+        // `is_barrier_read`.
         // It's embedded here because we always use it together with snapshot.
         bool,
     ),
@@ -49,8 +49,8 @@
 impl PinnedHummockSnapshot {
     pub fn get_batch_query_epoch(&self) -> BatchQueryEpoch {
         match self {
-            PinnedHummockSnapshot::FrontendPinned(s, checkpoint) => {
-                s.get_batch_query_epoch(*checkpoint)
+            PinnedHummockSnapshot::FrontendPinned(s, is_barrier_read) => {
+                s.get_batch_query_epoch(*is_barrier_read)
             }
             PinnedHummockSnapshot::Other(e) => BatchQueryEpoch {
                 epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
@@ -60,7 +60,7 @@ impl PinnedHummockSnapshot {
 
     pub fn support_barrier_read(&self) -> bool {
         match self {
-            PinnedHummockSnapshot::FrontendPinned(_, checkpoint) => !*checkpoint,
+            PinnedHummockSnapshot::FrontendPinned(_, is_barrier_read) => *is_barrier_read,
             PinnedHummockSnapshot::Other(_) => false,
         }
     }
@@ -105,11 +105,11 @@ pub struct HummockSnapshotGuard {
 }
 
 impl HummockSnapshotGuard {
-    pub fn get_batch_query_epoch(&self, checkpoint: bool) -> BatchQueryEpoch {
-        let epoch = if checkpoint {
-            batch_query_epoch::Epoch::Committed(self.snapshot.committed_epoch)
-        } else {
+    pub fn get_batch_query_epoch(&self, is_barrier_read: bool) -> BatchQueryEpoch {
+        let epoch = if is_barrier_read {
             batch_query_epoch::Epoch::Current(self.snapshot.current_epoch)
+        } else {
+            batch_query_epoch::Epoch::Committed(self.snapshot.committed_epoch)
         };
         BatchQueryEpoch { epoch: Some(epoch) }
     }
diff --git a/src/frontend/src/scheduler/worker_node_manager.rs b/src/frontend/src/scheduler/worker_node_manager.rs
index 61eb9140845c..cec4939a48ef 100644
--- a/src/frontend/src/scheduler/worker_node_manager.rs
+++ b/src/frontend/src/scheduler/worker_node_manager.rs
@@ -98,6 +98,14 @@ impl WorkerNodeManager {
         if node.property.as_ref().map_or(false, |p| p.is_serving) {
             write_guard.serving_fragment_vnode_mapping.clear();
         }
+        // Update the existing entry if this worker is already registered.
+        for w in &mut write_guard.worker_nodes {
+            if w.id == node.id {
+                *w = node;
+                return;
+            }
+        }
+        // Otherwise insert it as a new worker.
         write_guard.worker_nodes.push(node);
     }
 
@@ -106,7 +114,7 @@
         if node.property.as_ref().map_or(false, |p| p.is_serving) {
             write_guard.serving_fragment_vnode_mapping.clear();
         }
-        write_guard.worker_nodes.retain(|x| *x != node);
+        write_guard.worker_nodes.retain(|x| x.id != node.id);
     }
 
     pub fn refresh(
@@ -231,6 +239,7 @@
             .worker_nodes
             .iter()
             .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
+            .sorted_by_key(|w| w.id)
            .map(|w| (w.id, w.parallel_units.clone()))
             .collect();
         let serving_pus_total_num = all_serving_pus.values().map(|p| p.len()).sum::<usize>();
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index 557d8d119aee..a0223ff57268 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -34,7 +34,7 @@ use risingwave_common::catalog::{
 use risingwave_common::config::{load_config, BatchConfig};
 use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::monitor::process_linux::monitor_process;
-use risingwave_common::session_config::ConfigMap;
+use risingwave_common::session_config::{ConfigMap, VisibilityMode};
 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
 use risingwave_common::telemetry::manager::TelemetryManager;
 use risingwave_common::telemetry::telemetry_env_enabled;
@@ -678,6 +678,14 @@ impl SessionImpl {
         tracing::trace!("notice to user:{}", notice);
         self.notices.write().push(notice);
     }
+
+    pub fn is_barrier_read(&self) -> bool {
+        match self.config().get_visible_mode() {
+            VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
+            VisibilityMode::All => true,
+            VisibilityMode::Checkpoint => false,
+        }
+    }
 }
 
 pub struct SessionManagerImpl {
diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs
index 76d78d33e8fa..3d4b21835d4f 100644
--- a/src/meta/src/manager/cluster.rs
+++ b/src/meta/src/manager/cluster.rs
@@ -151,15 +151,14 @@ where
     pub async fn activate_worker_node(&self, host_address: HostAddress) -> MetaResult<()> {
         let mut core = self.core.write().await;
         let mut worker = core.get_worker_by_host_checked(host_address.clone())?;
-        if worker.worker_node.state == State::Running as i32 {
-            return Ok(());
+        if worker.worker_node.state != State::Running as i32 {
+            worker.worker_node.state = State::Running as i32;
+            worker.insert(self.env.meta_store()).await?;
+            core.update_worker_node(worker.clone());
         }
-        worker.worker_node.state = State::Running as i32;
-        worker.insert(self.env.meta_store()).await?;
-
-        core.update_worker_node(worker.clone());
 
         // Notify frontends of new compute node.
+        // Always notify because a running worker's property may have been changed.
         let worker_type = worker.worker_type();
         if worker_type == WorkerType::ComputeNode {
             self.env
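
Reviewer note (illustration only, not part of the patch): the pieces above combine into a single decision. The session-level visibility_mode wins unless it is left at the new 'default' value, in which case the frontend node's batch.enable_barrier_read config decides whether batch queries read the current barrier epoch or the last committed checkpoint. A minimal standalone sketch of that resolution order follows; the enum and function here are simplified stand-ins for the real VisibilityMode and SessionImpl::is_barrier_read, with hypothetical names.

// Simplified mirror of the patched logic above; names are stand-ins, not patch content.
#[derive(Clone, Copy)]
enum VisibilityMode {
    Default,    // defer to the frontend's `batch.enable_barrier_read` setting
    All,        // always read the latest barrier (uncommitted) epoch
    Checkpoint, // always read the latest committed (checkpoint) epoch
}

// Mirrors `SessionImpl::is_barrier_read` from the patch.
fn is_barrier_read(session_mode: VisibilityMode, enable_barrier_read: bool) -> bool {
    match session_mode {
        VisibilityMode::Default => enable_barrier_read,
        VisibilityMode::All => true,
        VisibilityMode::Checkpoint => false,
    }
}

fn main() {
    // Session left at `default`: the node-level config decides.
    assert!(is_barrier_read(VisibilityMode::Default, true));
    assert!(!is_barrier_read(VisibilityMode::Default, false));
    // An explicit session setting overrides the node-level config.
    assert!(is_barrier_read(VisibilityMode::All, false));
    assert!(!is_barrier_read(VisibilityMode::Checkpoint, true));
}

In practice, an explicit session setting (e.g. SET visibility_mode = 'all' or 'checkpoint') overrides the node default, while RW_ENABLE_BARRIER_READ and the [batch] enable_barrier_read entry only take effect when the session stays on 'default'.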