diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs
index 692043a7f41eb..5e2204fcce1f7 100644
--- a/src/frontend/src/observer/observer_manager.rs
+++ b/src/frontend/src/observer/observer_manager.rs
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use itertools::Itertools;
 use parking_lot::RwLock;
 use risingwave_common::catalog::CatalogVersion;
 use risingwave_common::hash::ParallelUnitMapping;
@@ -377,9 +378,9 @@ impl FrontendObserverNode {
                 self.worker_node_manager
                     .upsert_serving_fragment_mapping(convert_pu_mapping(&mappings));
             }
-            Operation::Delete => self
-                .worker_node_manager
-                .remove_serving_fragment_mapping(mappings.into_iter().map(|m| m.fragment_id)),
+            Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
+                &mappings.into_iter().map(|m| m.fragment_id).collect_vec(),
+            ),
             Operation::Snapshot => {
                 self.worker_node_manager
                     .set_serving_fragment_mapping(convert_pu_mapping(&mappings));
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 2ba3c9ed544df..84cd4b28c1457 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -642,13 +642,15 @@
         &self,
         table_id: &TableId,
     ) -> SchedulerResult<ParallelUnitMapping> {
-        let reader = self.catalog_reader.read_guard();
-        let table = reader
+        let fragment_id = self
+            .catalog_reader
+            .read_guard()
             .get_table_by_id(table_id)
-            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
+            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?
+            .fragment_id;
         self.worker_node_manager
             .manager
-            .get_streaming_fragment_mapping(&table.fragment_id)
+            .get_streaming_fragment_mapping(&fragment_id)
     }
 
     fn choose_worker(
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index d34e06e9971f3..15473591eda31 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -469,13 +469,16 @@
         &self,
         table_id: &TableId,
     ) -> SchedulerResult<ParallelUnitMapping> {
-        let reader = self.front_env.catalog_reader().read_guard();
-        let table = reader
+        let fragment_id = self
+            .front_env
+            .catalog_reader()
+            .read_guard()
             .get_table_by_id(table_id)
-            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
+            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?
+            .fragment_id;
         self.worker_node_manager
             .manager
-            .get_streaming_fragment_mapping(&table.fragment_id)
+            .get_streaming_fragment_mapping(&fragment_id)
     }
 
     fn choose_worker(&self, stage: &Arc<QueryStage>) -> SchedulerResult<Option<WorkerNode>> {
diff --git a/src/frontend/src/scheduler/worker_node_manager.rs b/src/frontend/src/scheduler/worker_node_manager.rs
index 014e747352274..58f73547bc4c9 100644
--- a/src/frontend/src/scheduler/worker_node_manager.rs
+++ b/src/frontend/src/scheduler/worker_node_manager.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::collections::HashMap;
-use std::fmt::Debug;
 use std::sync::{Arc, RwLock};
 
 use rand::seq::SliceRandom;
@@ -117,6 +116,15 @@ impl WorkerNodeManager {
         serving_mapping: HashMap<FragmentId, ParallelUnitMapping>,
     ) {
         let mut write_guard = self.inner.write().unwrap();
+        tracing::info!("Refresh worker nodes {:?}.", nodes);
+        tracing::info!(
+            "Refresh streaming vnode mapping for fragments {:?}.",
+            streaming_mapping.keys()
+        );
+        tracing::info!(
+            "Refresh serving vnode mapping for fragments {:?}.",
+            serving_mapping.keys()
+        );
         write_guard.worker_nodes = nodes;
         write_guard.streaming_fragment_vnode_mapping = streaming_mapping;
         write_guard.serving_fragment_vnode_mapping = serving_mapping;
@@ -207,7 +215,10 @@
 
     pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, ParallelUnitMapping>) {
         let mut guard = self.inner.write().unwrap();
-        tracing::debug!("set serving fragment mapping {:#?}", mappings);
+        tracing::info!(
+            "Set serving vnode mapping for fragments {:?}",
+            mappings.keys()
+        );
         guard.serving_fragment_vnode_mapping = mappings;
     }
 
@@ -216,7 +227,10 @@
         mappings: HashMap<FragmentId, ParallelUnitMapping>,
     ) {
         let mut guard = self.inner.write().unwrap();
-        tracing::debug!("upsert serving fragment mapping {:#?}", mappings);
+        tracing::info!(
+            "Upsert serving vnode mapping for fragments {:?}",
+            mappings.keys()
+        );
         for (fragment_id, mapping) in mappings {
             guard
                 .serving_fragment_vnode_mapping
@@ -224,12 +238,12 @@
         }
     }
 
-    pub fn remove_serving_fragment_mapping(
-        &self,
-        fragment_ids: impl IntoIterator<Item = FragmentId> + Debug,
-    ) {
+    pub fn remove_serving_fragment_mapping(&self, fragment_ids: &[FragmentId]) {
         let mut guard = self.inner.write().unwrap();
-        tracing::debug!("remove serving fragment mapping {:#?}", fragment_ids);
+        tracing::info!(
+            "Delete serving vnode mapping for fragments {:?}",
+            fragment_ids
+        );
         for fragment_id in fragment_ids {
             guard.serving_fragment_vnode_mapping.remove(&fragment_id);
         }
diff --git a/src/meta/src/batch/mod.rs b/src/meta/src/batch/mod.rs
index 535ca44b6977f..242d6cc2c09e5 100644
--- a/src/meta/src/batch/mod.rs
+++ b/src/meta/src/batch/mod.rs
@@ -44,33 +44,45 @@ impl ServingVnodeMapping {
     }
 
     /// Upsert mapping for given fragments according to the latest `workers`.
-    /// Returns new mappings.
+    /// Returns (successful updates, failed updates).
     fn upsert(
         &self,
         streaming_fragment_mappings: impl IntoIterator<Item = FragmentParallelUnitMapping>,
         workers: &[WorkerNode],
-    ) -> HashMap<FragmentId, ParallelUnitMapping> {
+    ) -> (HashMap<FragmentId, ParallelUnitMapping>, Vec<FragmentId>) {
         let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
+        let mut upserted: HashMap<FragmentId, ParallelUnitMapping> = HashMap::default();
+        let mut failed: Vec<FragmentId> = vec![];
         for fragment in streaming_fragment_mappings {
             let new_mapping = {
                 let old_mapping = serving_vnode_mappings.get(&fragment.fragment_id);
                 // Set max serving parallelism to `streaming_parallelism`. It's not a must.
-                let streaming_parallelism =
-                    ParallelUnitMapping::from_protobuf(fragment.mapping.as_ref().unwrap())
+                let streaming_parallelism = match fragment.mapping.as_ref() {
+                    Some(mapping) => ParallelUnitMapping::from_protobuf(mapping)
                         .iter_unique()
-                        .count();
+                        .count(),
+                    None => {
+                        tracing::warn!(
+                            "vnode mapping for fragment {} not found",
+                            fragment.fragment_id
+                        );
+                        1
+                    }
+                };
                 place_vnode(old_mapping, workers, streaming_parallelism)
             };
             match new_mapping {
                 None => {
                     serving_vnode_mappings.remove(&fragment.fragment_id as _);
+                    failed.push(fragment.fragment_id);
                 }
                 Some(mapping) => {
-                    serving_vnode_mappings.insert(fragment.fragment_id, mapping);
+                    serving_vnode_mappings.insert(fragment.fragment_id, mapping.clone());
+                    upserted.insert(fragment.fragment_id, mapping);
                 }
             }
         }
-        serving_vnode_mappings.clone()
+        (upserted, failed)
     }
 
     fn remove(&self, fragment_ids: &[FragmentId]) {
@@ -84,20 +96,8 @@
 async fn all_streaming_fragment_mappings(
     fragment_manager: &FragmentManager,
 ) -> Vec<FragmentParallelUnitMapping> {
-    fragment_manager
-        .list_table_fragments()
-        .await
-        .into_iter()
-        .flat_map(|table_fragments| {
-            table_fragments.fragments.into_values().map(|fragment| {
-                let parallel_unit_mapping = fragment.vnode_mapping.clone().unwrap();
-                FragmentParallelUnitMapping {
-                    fragment_id: fragment.fragment_id,
-                    mapping: Some(parallel_unit_mapping),
-                }
-            })
-        })
-        .collect()
+    let guard = fragment_manager.get_fragment_read_guard().await;
+    guard.all_running_fragment_mappings().collect()
 }
 
 fn to_fragment_parallel_unit_mapping(
@@ -112,6 +112,18 @@
         .collect()
 }
 
+fn to_deleted_fragment_parallel_unit_mapping(
+    fragment_ids: &[FragmentId],
+) -> Vec<FragmentParallelUnitMapping> {
+    fragment_ids
+        .iter()
+        .map(|fragment_id| FragmentParallelUnitMapping {
+            fragment_id: *fragment_id,
+            mapping: None,
+        })
+        .collect()
+}
+
 pub(crate) async fn on_meta_start(
     notification_manager: NotificationManagerRef,
     cluster_manager: ClusterManagerRef,
@@ -119,10 +131,14 @@
     serving_vnode_mapping: ServingVnodeMappingRef,
 ) {
     let streaming_fragment_mappings = all_streaming_fragment_mappings(&fragment_manager).await;
-    let mappings = serving_vnode_mapping.upsert(
+    let (mappings, _) = serving_vnode_mapping.upsert(
         streaming_fragment_mappings,
         &cluster_manager.list_active_serving_compute_nodes().await,
     );
+    tracing::info!(
+        "Initialize serving vnode mapping snapshot for fragments {:?}.",
+        mappings.keys()
+    );
     notification_manager.notify_frontend_without_version(
         Operation::Snapshot,
         Info::ServingParallelUnitMappings(FragmentParallelUnitMappings {
@@ -155,22 +171,33 @@
                 }
                 let workers = cluster_manager.list_active_serving_compute_nodes().await;
                 let all_streaming_mappings = all_streaming_fragment_mappings(&fragment_manager).await;
-                let mappings = serving_vnode_mapping.upsert(all_streaming_mappings, &workers);
+                let (mappings, _) = serving_vnode_mapping.upsert(all_streaming_mappings, &workers);
+                tracing::info!("Update serving vnode mapping snapshot for fragments {:?}.", mappings.keys());
                 notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&mappings) }));
             }
-            LocalNotification::FragmentsAdded(fragment_ids) => {
+            LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
+                if fragment_ids.is_empty() {
+                    continue;
+                }
                 let workers = cluster_manager.list_active_serving_compute_nodes().await;
                 let added_streaming_mappings = all_streaming_fragment_mappings(&fragment_manager).await.into_iter().filter(|f|fragment_ids.contains(&f.fragment_id));
-                let mappings = serving_vnode_mapping.upsert(added_streaming_mappings, &workers);
-                notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&mappings) }));
+                let (upserted, failed) = serving_vnode_mapping.upsert(added_streaming_mappings, &workers);
+                if !upserted.is_empty() {
+                    tracing::info!("Update serving vnode mapping for fragments {:?}.", upserted.keys());
+                    notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&upserted) }));
+                }
+                if !failed.is_empty() {
+                    tracing::info!("Fail to update serving vnode mapping for fragments {:?}.", failed);
+                    notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&failed)}));
+                }
             }
-            LocalNotification::FragmentsDeleted(fragment_ids) => {
+            LocalNotification::FragmentMappingsDelete(fragment_ids) => {
+                if fragment_ids.is_empty() {
+                    continue;
+                }
+                tracing::info!("Delete serving vnode mapping for fragments {:?}.", fragment_ids);
                 serving_vnode_mapping.remove(&fragment_ids);
-                let mappings = fragment_ids.into_iter().map(|fragment_id|FragmentParallelUnitMapping {
-                    fragment_id,
-                    mapping: None,
-                }).collect();
-                notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings }));
+                notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&fragment_ids) }));
             }
             _ => {}
         }
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 372b58e59795a..974586e227944 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -50,7 +50,7 @@ pub struct FragmentManagerCore {
 }
 
 impl FragmentManagerCore {
-    /// List all fragment vnode mapping info that not in `State::Initial`.
+    /// List all fragment vnode mapping info that excludes `exclude_state`.
     pub fn all_running_fragment_mappings(
         &self,
     ) -> impl Iterator<Item = FragmentParallelUnitMapping> + '_ {
@@ -135,10 +135,37 @@
                 self.env
                     .notification_manager()
-                    .notify_frontend(operation, Info::ParallelUnitMapping(fragment_mapping))
+                    .notify_frontend(
+                        operation.clone(),
+                        Info::ParallelUnitMapping(fragment_mapping),
+                    )
                     .await;
             }
         }
+
+        // Update serving vnode mappings.
+        let fragment_ids = table_fragment.fragment_ids().collect();
+        match operation {
+            Operation::Add | Operation::Update => {
+                self.env
+                    .notification_manager()
+                    .notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
+                        fragment_ids,
+                    ))
+                    .await;
+            }
+            Operation::Delete => {
+                self.env
+                    .notification_manager()
+                    .notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
+                        fragment_ids,
+                    ))
+                    .await;
+            }
+            _ => {
+                tracing::warn!("unexpected fragment mapping op");
+            }
+        }
     }
 
     pub async fn select_table_fragments_by_table_id(
@@ -179,15 +206,9 @@
         if map.contains_key(&table_id) {
             bail!("table_fragment already exist: id={}", table_id);
         }
-        let fragment_ids = table_fragment.fragment_ids().collect();
         let mut table_fragments = BTreeMapTransaction::new(map);
         table_fragments.insert(table_id, table_fragment);
-        commit_meta!(self, table_fragments)?;
-        self.env
-            .notification_manager()
-            .notify_local_subscribers(LocalNotification::FragmentsAdded(fragment_ids))
-            .await;
-        Ok(())
+        commit_meta!(self, table_fragments)
     }
 
     /// Called after the barrier collection of `CreateStreamingJob` command, which updates the
@@ -391,24 +412,12 @@
         }
 
         commit_meta!(self, table_fragments)?;
-        for table_fragments in &to_delete_table_fragments {
+        for table_fragments in to_delete_table_fragments {
             if table_fragments.state() != State::Initial {
-                self.notify_fragment_mapping(table_fragments, Operation::Delete)
+                self.notify_fragment_mapping(&table_fragments, Operation::Delete)
                     .await;
             }
         }
-        let all_deleted_fragment_ids = to_delete_table_fragments
-            .iter()
-            .flat_map(|t| t.fragment_ids())
-            .collect_vec();
-        if !all_deleted_fragment_ids.is_empty() {
-            self.env
-                .notification_manager()
-                .notify_local_subscribers(LocalNotification::FragmentsDeleted(
-                    all_deleted_fragment_ids,
-                ))
-                .await;
-        }
 
         Ok(())
     }
diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs
index fa4f2341189ef..029948a7b6fb4 100644
--- a/src/meta/src/manager/notification.rs
+++ b/src/meta/src/manager/notification.rs
@@ -43,8 +43,8 @@ pub enum LocalNotification {
     WorkerNodeActivated(WorkerNode),
     CompactionTaskNeedCancel(CompactTask),
     SystemParamsChange(SystemParamsReader),
-    FragmentsAdded(Vec<FragmentId>),
-    FragmentsDeleted(Vec<FragmentId>),
+    FragmentMappingsUpsert(Vec<FragmentId>),
+    FragmentMappingsDelete(Vec<FragmentId>),
 }
 
 #[derive(Debug)]
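Note (not part of the patch): the sketch below is a simplified, self-contained approximation of the new meta-side flow, where a serving-mapping upsert now returns both the successfully placed fragments and the fragments that could not be placed, and the failures are broadcast as deletes (`mapping: None` per fragment in the real patch). `Mapping`, `Notification`, and `upsert_serving` are illustrative stand-ins, not the actual RisingWave APIs.

use std::collections::HashMap;

type FragmentId = u32;

// Illustrative stand-in for a fragment's serving vnode mapping.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct Mapping(Vec<u32>);

// Illustrative stand-in for the notifications the meta node broadcasts to frontends.
#[derive(Debug)]
enum Notification {
    Update(HashMap<FragmentId, Mapping>),
    Delete(Vec<FragmentId>),
}

// Simplified analogue of `ServingVnodeMapping::upsert` after this patch: try to
// place every fragment on the current serving workers and return the successful
// placements alongside the fragments that could not be placed.
fn upsert_serving(
    fragment_ids: &[FragmentId],
    worker_count: usize,
) -> (HashMap<FragmentId, Mapping>, Vec<FragmentId>) {
    let mut upserted = HashMap::new();
    let mut failed = Vec::new();
    for &fragment_id in fragment_ids {
        if worker_count == 0 {
            // No serving worker available, analogous to `place_vnode` returning `None`.
            failed.push(fragment_id);
        } else {
            upserted.insert(fragment_id, Mapping((0..worker_count as u32).collect()));
        }
    }
    (upserted, failed)
}

fn main() {
    let (upserted, failed) = upsert_serving(&[1, 2, 3], 2);
    // Mirror the split introduced by the patch: successes become an Update
    // notification, failures become a Delete notification.
    let mut notifications = Vec::new();
    if !upserted.is_empty() {
        notifications.push(Notification::Update(upserted));
    }
    if !failed.is_empty() {
        notifications.push(Notification::Delete(failed));
    }
    println!("{notifications:?}");
}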