diff --git a/src/meta/src/batch/mod.rs b/src/meta/src/batch/mod.rs
index 15a47f36f7eeb..4da9af3503d94 100644
--- a/src/meta/src/batch/mod.rs
+++ b/src/meta/src/batch/mod.rs
@@ -25,8 +25,7 @@
 use tokio::sync::oneshot::Sender;
 use tokio::task::JoinHandle;
 use crate::manager::{
-    ClusterManagerRef, FragmentManager, FragmentManagerRef, LocalNotification,
-    NotificationManagerRef,
+    ClusterManagerRef, FragmentManagerRef, LocalNotification, NotificationManagerRef,
 };
 use crate::model::FragmentId;
 use crate::storage::MetaStore;
@@ -47,38 +46,26 @@ impl ServingVnodeMapping {
     /// Returns (successful updates, failed updates).
     fn upsert(
         &self,
-        streaming_fragment_mappings: impl IntoIterator<Item = FragmentParallelUnitMapping>,
+        streaming_parallelisms: HashMap<FragmentId, usize>,
         workers: &[WorkerNode],
     ) -> (HashMap<FragmentId, ParallelUnitMapping>, Vec<FragmentId>) {
         let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
         let mut upserted: HashMap<FragmentId, ParallelUnitMapping> = HashMap::default();
         let mut failed: Vec<FragmentId> = vec![];
-        for fragment in streaming_fragment_mappings {
+        for (fragment_id, streaming_parallelism) in streaming_parallelisms {
             let new_mapping = {
-                let old_mapping = serving_vnode_mappings.get(&fragment.fragment_id);
+                let old_mapping = serving_vnode_mappings.get(&fragment_id);
                 // Set max serving parallelism to `streaming_parallelism`. It's not a must.
-                let streaming_parallelism = match fragment.mapping.as_ref() {
-                    Some(mapping) => ParallelUnitMapping::from_protobuf(mapping)
-                        .iter_unique()
-                        .count(),
-                    None => {
-                        tracing::warn!(
-                            "vnode mapping for fragment {} not found",
-                            fragment.fragment_id
-                        );
-                        1
-                    }
-                };
                 place_vnode(old_mapping, workers, streaming_parallelism)
             };
             match new_mapping {
                 None => {
-                    serving_vnode_mappings.remove(&fragment.fragment_id as _);
-                    failed.push(fragment.fragment_id);
+                    serving_vnode_mappings.remove(&fragment_id as _);
+                    failed.push(fragment_id);
                 }
                 Some(mapping) => {
-                    serving_vnode_mappings.insert(fragment.fragment_id, mapping.clone());
-                    upserted.insert(fragment.fragment_id, mapping);
+                    serving_vnode_mappings.insert(fragment_id, mapping.clone());
+                    upserted.insert(fragment_id, mapping);
                 }
             }
         }
@@ -93,13 +80,6 @@ impl ServingVnodeMapping {
     }
 }
 
-async fn all_streaming_fragment_mappings(
-    fragment_manager: &FragmentManager,
-) -> Vec<FragmentParallelUnitMapping> {
-    let guard = fragment_manager.get_fragment_read_guard().await;
-    guard.all_running_fragment_mappings().collect()
-}
-
 fn to_fragment_parallel_unit_mapping(
     mappings: &HashMap<FragmentId, ParallelUnitMapping>,
 ) -> Vec<FragmentParallelUnitMapping> {
@@ -130,9 +110,9 @@ pub(crate) async fn on_meta_start(
     fragment_manager: FragmentManagerRef,
     serving_vnode_mapping: ServingVnodeMappingRef,
 ) {
-    let streaming_fragment_mappings = all_streaming_fragment_mappings(&fragment_manager).await;
+    let streaming_parallelisms = fragment_manager.running_fragment_parallelisms(None).await;
     let (mappings, _) = serving_vnode_mapping.upsert(
-        streaming_fragment_mappings,
+        streaming_parallelisms,
         &cluster_manager.list_active_serving_compute_nodes().await,
     );
     tracing::debug!(
@@ -170,8 +150,8 @@ pub(crate) async fn start_serving_vnode_mapping_worker(
                         continue;
                     }
                     let workers = cluster_manager.list_active_serving_compute_nodes().await;
-                    let all_streaming_mappings = all_streaming_fragment_mappings(&fragment_manager).await;
-                    let (mappings, _) = serving_vnode_mapping.upsert(all_streaming_mappings, &workers);
+                    let streaming_parallelisms = fragment_manager.running_fragment_parallelisms(None).await;
+                    let (mappings, _) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers);
                     tracing::debug!("Update serving vnode mapping snapshot for fragments {:?}.", mappings.keys());
                     notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&mappings) }));
                 }
@@ -180,8 +160,8 @@ pub(crate) async fn start_serving_vnode_mapping_worker(
                         continue;
                     }
                     let workers = cluster_manager.list_active_serving_compute_nodes().await;
-                    let added_streaming_mappings = all_streaming_fragment_mappings(&fragment_manager).await.into_iter().filter(|f| fragment_ids.contains(&f.fragment_id));
-                    let (upserted, failed) = serving_vnode_mapping.upsert(added_streaming_mappings, &workers);
+                    let streaming_parallelisms = fragment_manager.running_fragment_parallelisms(Some(fragment_ids.into_iter().collect())).await;
+                    let (upserted, failed) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers);
                     if !upserted.is_empty() {
                         tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys());
                         notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&upserted) }));
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 55992dc2c3338..2702219695594 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -68,6 +68,34 @@ impl FragmentManagerCore {
             })
         })
     }
+
+    fn running_fragment_parallelisms(
+        &self,
+        id_filter: Option<HashSet<FragmentId>>,
+    ) -> HashMap<FragmentId, usize> {
+        self.table_fragments
+            .values()
+            .filter(|tf| tf.state() != State::Initial)
+            .flat_map(|table_fragments| {
+                table_fragments.fragments.values().filter_map(|fragment| {
+                    if let Some(id_filter) = id_filter.as_ref() && !id_filter.contains(&fragment.fragment_id) {
+                        return None;
+                    }
+                    let parallelism = match fragment.vnode_mapping.as_ref() {
+                        None => {
+                            tracing::warn!(
+                                "vnode mapping for fragment {} not found",
+                                fragment.fragment_id
+                            );
+                            1
+                        }
+                        Some(m) => ParallelUnitMapping::from_protobuf(m).iter_unique().count(),
+                    };
+                    Some((fragment.fragment_id, parallelism))
+                })
+            })
+            .collect()
+    }
 }
 
 /// `FragmentManager` stores definition and status of fragment as well as the actors inside.
@@ -1043,4 +1071,14 @@ where
 
         Ok(mview_fragment)
     }
+
+    pub async fn running_fragment_parallelisms(
+        &self,
+        id_filter: Option<HashSet<FragmentId>>,
+    ) -> HashMap<FragmentId, usize> {
+        self.core
+            .read()
+            .await
+            .running_fragment_parallelisms(id_filter)
+    }
+}
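
For reference, here is a minimal, standalone sketch of the reshaped data flow, with no RisingWave dependencies. `FragmentStore`, the simplified `Fragment` struct, the `serving_workers: usize` stand-in for the active serving compute nodes, and the stubbed `place_vnode` are all hypothetical simplifications; only the shape mirrored from the diff matters: `running_fragment_parallelisms(id_filter)` produces a `fragment_id -> parallelism` map, and `upsert` consumes that map instead of full `FragmentParallelUnitMapping` protos.

```rust
// Standalone sketch (not RisingWave code): simplified stand-ins to illustrate
// feeding per-fragment parallelisms, rather than full vnode mappings, into upsert.
use std::collections::{HashMap, HashSet};

type FragmentId = u32;

/// Simplified stand-in for a running fragment: its id and streaming parallelism.
struct Fragment {
    fragment_id: FragmentId,
    parallelism: usize,
}

struct FragmentStore {
    fragments: Vec<Fragment>,
}

impl FragmentStore {
    /// Mirrors `running_fragment_parallelisms(id_filter)`: collect parallelism per
    /// fragment, optionally restricted to a set of fragment ids.
    fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> HashMap<FragmentId, usize> {
        self.fragments
            .iter()
            .filter(|f| {
                id_filter
                    .as_ref()
                    .map_or(true, |ids| ids.contains(&f.fragment_id))
            })
            .map(|f| (f.fragment_id, f.parallelism))
            .collect()
    }
}

/// Hypothetical stand-in for `place_vnode`: Some(serving parallelism) capped by the
/// streaming parallelism if any serving worker exists, None if placement fails.
fn place_vnode(serving_workers: usize, streaming_parallelism: usize) -> Option<usize> {
    (serving_workers > 0).then(|| streaming_parallelism.min(serving_workers))
}

/// Mirrors the reshaped `upsert`: consume fragment_id -> parallelism and report
/// (successful placements, failed fragment ids).
fn upsert(
    streaming_parallelisms: HashMap<FragmentId, usize>,
    serving_workers: usize,
) -> (HashMap<FragmentId, usize>, Vec<FragmentId>) {
    let mut upserted = HashMap::new();
    let mut failed = vec![];
    for (fragment_id, streaming_parallelism) in streaming_parallelisms {
        match place_vnode(serving_workers, streaming_parallelism) {
            Some(p) => {
                upserted.insert(fragment_id, p);
            }
            None => failed.push(fragment_id),
        }
    }
    (upserted, failed)
}

fn main() {
    let store = FragmentStore {
        fragments: vec![
            Fragment { fragment_id: 1, parallelism: 4 },
            Fragment { fragment_id: 2, parallelism: 8 },
        ],
    };
    // Snapshot path: no filter, all running fragments.
    let all = store.running_fragment_parallelisms(None);
    let (upserted, failed) = upsert(all, 2);
    println!("upserted: {upserted:?}, failed: {failed:?}");
    // Delta path: only the newly added fragment ids.
    let delta = store.running_fragment_parallelisms(Some(HashSet::from([2])));
    let (upserted, _) = upsert(delta, 2);
    println!("delta upserted: {upserted:?}");
}
```

The two call sites in the sketch correspond to the two branches in the worker: the snapshot path passes `None`, while the delta path passes only the affected fragment ids, so the serving-vnode worker no longer has to materialize every fragment's full mapping just to read its parallelism.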