refactor to reduce clone

Instead of collecting (and cloning) every running `FragmentParallelUnitMapping` out of the fragment manager, compute per-fragment streaming parallelisms in `FragmentManagerCore` and pass a `HashMap<FragmentId, usize>` to `ServingVnodeMapping::upsert`.
zwang28 committed Jun 13, 2023
1 parent 9d3318b commit b909f63
Showing 2 changed files with 52 additions and 34 deletions.
48 changes: 14 additions & 34 deletions src/meta/src/batch/mod.rs
@@ -25,8 +25,7 @@ use tokio::sync::oneshot::Sender;
 use tokio::task::JoinHandle;
 
 use crate::manager::{
-    ClusterManagerRef, FragmentManager, FragmentManagerRef, LocalNotification,
-    NotificationManagerRef,
+    ClusterManagerRef, FragmentManagerRef, LocalNotification, NotificationManagerRef,
 };
 use crate::model::FragmentId;
 use crate::storage::MetaStore;
@@ -47,38 +46,26 @@ impl ServingVnodeMapping {
     /// Returns (successful updates, failed updates).
     fn upsert(
         &self,
-        streaming_fragment_mappings: impl IntoIterator<Item = FragmentParallelUnitMapping>,
+        streaming_parallelisms: HashMap<FragmentId, usize>,
         workers: &[WorkerNode],
     ) -> (HashMap<FragmentId, ParallelUnitMapping>, Vec<FragmentId>) {
         let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
         let mut upserted: HashMap<FragmentId, ParallelUnitMapping> = HashMap::default();
         let mut failed: Vec<FragmentId> = vec![];
-        for fragment in streaming_fragment_mappings {
+        for (fragment_id, streaming_parallelism) in streaming_parallelisms {
             let new_mapping = {
-                let old_mapping = serving_vnode_mappings.get(&fragment.fragment_id);
+                let old_mapping = serving_vnode_mappings.get(&fragment_id);
                 // Set max serving parallelism to `streaming_parallelism`. It's not a must.
-                let streaming_parallelism = match fragment.mapping.as_ref() {
-                    Some(mapping) => ParallelUnitMapping::from_protobuf(mapping)
-                        .iter_unique()
-                        .count(),
-                    None => {
-                        tracing::warn!(
-                            "vnode mapping for fragment {} not found",
-                            fragment.fragment_id
-                        );
-                        1
-                    }
-                };
                 place_vnode(old_mapping, workers, streaming_parallelism)
             };
             match new_mapping {
                 None => {
-                    serving_vnode_mappings.remove(&fragment.fragment_id as _);
-                    failed.push(fragment.fragment_id);
+                    serving_vnode_mappings.remove(&fragment_id as _);
+                    failed.push(fragment_id);
                 }
                 Some(mapping) => {
-                    serving_vnode_mappings.insert(fragment.fragment_id, mapping.clone());
-                    upserted.insert(fragment.fragment_id, mapping);
+                    serving_vnode_mappings.insert(fragment_id, mapping.clone());
+                    upserted.insert(fragment_id, mapping);
                 }
             }
         }
@@ -93,13 +80,6 @@ impl ServingVnodeMapping {
     }
 }
 
-async fn all_streaming_fragment_mappings<S: MetaStore>(
-    fragment_manager: &FragmentManager<S>,
-) -> Vec<FragmentParallelUnitMapping> {
-    let guard = fragment_manager.get_fragment_read_guard().await;
-    guard.all_running_fragment_mappings().collect()
-}
-
 fn to_fragment_parallel_unit_mapping(
     mappings: &HashMap<FragmentId, ParallelUnitMapping>,
 ) -> Vec<FragmentParallelUnitMapping> {
@@ -130,9 +110,9 @@ pub(crate) async fn on_meta_start<S: MetaStore>(
     fragment_manager: FragmentManagerRef<S>,
     serving_vnode_mapping: ServingVnodeMappingRef,
 ) {
-    let streaming_fragment_mappings = all_streaming_fragment_mappings(&fragment_manager).await;
+    let streaming_parallelisms = fragment_manager.running_fragment_parallelisms(None).await;
     let (mappings, _) = serving_vnode_mapping.upsert(
-        streaming_fragment_mappings,
+        streaming_parallelisms,
         &cluster_manager.list_active_serving_compute_nodes().await,
     );
     tracing::debug!(
@@ -170,8 +150,8 @@ pub(crate) async fn start_serving_vnode_mapping_worker<S: MetaStore>(
                         continue;
                     }
                     let workers = cluster_manager.list_active_serving_compute_nodes().await;
-                    let all_streaming_mappings = all_streaming_fragment_mappings(&fragment_manager).await;
-                    let (mappings, _) = serving_vnode_mapping.upsert(all_streaming_mappings, &workers);
+                    let streaming_parallelisms = fragment_manager.running_fragment_parallelisms(None).await;
+                    let (mappings, _) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers);
                     tracing::debug!("Update serving vnode mapping snapshot for fragments {:?}.", mappings.keys());
                     notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&mappings) }));
                 }
@@ -180,8 +160,8 @@
                         continue;
                     }
                     let workers = cluster_manager.list_active_serving_compute_nodes().await;
-                    let added_streaming_mappings = all_streaming_fragment_mappings(&fragment_manager).await.into_iter().filter(|f|fragment_ids.contains(&f.fragment_id));
-                    let (upserted, failed) = serving_vnode_mapping.upsert(added_streaming_mappings, &workers);
+                    let streaming_parallelisms = fragment_manager.running_fragment_parallelisms(Some(fragment_ids.into_iter().collect())).await;
+                    let (upserted, failed) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers);
                     if !upserted.is_empty() {
                         tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys());
                         notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&upserted) }));
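
For orientation, the sketch below mirrors the refactored `upsert` flow using simplified stand-in types (`FragmentId`, `ParallelUnitMapping`, and `place_vnode` here are assumptions, not the real meta-crate definitions). The point of the change is visible in the signature: the caller now hands over precomputed `(fragment_id, parallelism)` pairs, so no fragment mappings need to be cloned out of the fragment manager just to count parallel units.

use std::collections::HashMap;

// Stand-in types; the real ones live in the meta crate.
type FragmentId = u32;
type ParallelUnitId = u32;

#[derive(Clone, Debug)]
struct ParallelUnitMapping(Vec<ParallelUnitId>);

// Hypothetical stand-in for `place_vnode`: spread the requested parallelism
// across the available workers, or fail when none are available.
fn place_vnode(
    _old: Option<&ParallelUnitMapping>,
    workers: &[ParallelUnitId],
    parallelism: usize,
) -> Option<ParallelUnitMapping> {
    if workers.is_empty() {
        return None;
    }
    Some(ParallelUnitMapping(
        workers.iter().cycle().take(parallelism).copied().collect(),
    ))
}

/// Mirrors the refactored `upsert`: it consumes only `(fragment_id, parallelism)`
/// pairs instead of full fragment mappings.
fn upsert(
    serving: &mut HashMap<FragmentId, ParallelUnitMapping>,
    streaming_parallelisms: HashMap<FragmentId, usize>,
    workers: &[ParallelUnitId],
) -> (HashMap<FragmentId, ParallelUnitMapping>, Vec<FragmentId>) {
    let mut upserted = HashMap::new();
    let mut failed = vec![];
    for (fragment_id, parallelism) in streaming_parallelisms {
        // Compute the new placement first so the immutable borrow of `serving`
        // ends before it is mutated below.
        let new_mapping = place_vnode(serving.get(&fragment_id), workers, parallelism);
        match new_mapping {
            None => {
                serving.remove(&fragment_id);
                failed.push(fragment_id);
            }
            Some(mapping) => {
                serving.insert(fragment_id, mapping.clone());
                upserted.insert(fragment_id, mapping);
            }
        }
    }
    (upserted, failed)
}

fn main() {
    let mut serving = HashMap::new();
    let parallelisms = HashMap::from([(1, 2), (2, 4)]);
    let (upserted, failed) = upsert(&mut serving, parallelisms, &[10, 11]);
    println!("upserted: {upserted:?}, failed: {failed:?}");
}
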
38 changes: 38 additions & 0 deletions src/meta/src/manager/catalog/fragment.rs
@@ -68,6 +68,34 @@ impl FragmentManagerCore {
                 })
             })
     }
+
+    fn running_fragment_parallelisms(
+        &self,
+        id_filter: Option<HashSet<FragmentId>>,
+    ) -> HashMap<FragmentId, usize> {
+        self.table_fragments
+            .values()
+            .filter(|tf| tf.state() != State::Initial)
+            .flat_map(|table_fragments| {
+                table_fragments.fragments.values().filter_map(|fragment| {
+                    if let Some(id_filter) = id_filter.as_ref() && !id_filter.contains(&fragment.fragment_id) {
+                        return None;
+                    }
+                    let parallelism = match fragment.vnode_mapping.as_ref() {
+                        None => {
+                            tracing::warn!(
+                                "vnode mapping for fragment {} not found",
+                                fragment.fragment_id
+                            );
+                            1
+                        }
+                        Some(m) => ParallelUnitMapping::from_protobuf(m).iter_unique().count(),
+                    };
+                    Some((fragment.fragment_id, parallelism))
+                })
+            })
+            .collect()
+    }
 }
 
 /// `FragmentManager` stores definition and status of fragment as well as the actors inside.
@@ -1043,4 +1071,14 @@ where
 
         Ok(mview_fragment)
     }
+
+    pub async fn running_fragment_parallelisms(
+        &self,
+        id_filter: Option<HashSet<FragmentId>>,
+    ) -> HashMap<FragmentId, usize> {
+        self.core
+            .read()
+            .await
+            .running_fragment_parallelisms(id_filter)
+    }
 }
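
Similarly, a minimal sketch of the aggregation that the new `running_fragment_parallelisms` performs, with a hypothetical `Fragment` stand-in. The real implementation additionally skips table fragments in `State::Initial` and derives the count from the protobuf vnode mapping via `ParallelUnitMapping::from_protobuf(..).iter_unique().count()`, but the `id_filter` semantics and the fallback to a parallelism of 1 are the same:

use std::collections::{HashMap, HashSet};

type FragmentId = u32;

// Simplified stand-in for a fragment; `None` models a missing vnode mapping.
struct Fragment {
    fragment_id: FragmentId,
    parallelism: Option<usize>,
}

/// Apply the optional id filter and fall back to a parallelism of 1
/// (the real code also logs a warning) when the mapping is absent.
fn running_fragment_parallelisms(
    fragments: &[Fragment],
    id_filter: Option<HashSet<FragmentId>>,
) -> HashMap<FragmentId, usize> {
    fragments
        .iter()
        .filter_map(|fragment| {
            if let Some(filter) = id_filter.as_ref() {
                if !filter.contains(&fragment.fragment_id) {
                    return None;
                }
            }
            Some((fragment.fragment_id, fragment.parallelism.unwrap_or(1)))
        })
        .collect()
}

fn main() {
    let fragments = [
        Fragment { fragment_id: 1, parallelism: Some(4) },
        Fragment { fragment_id: 2, parallelism: None },
    ];
    let all = running_fragment_parallelisms(&fragments, None);
    let only_2 = running_fragment_parallelisms(&fragments, Some(HashSet::from([2])));
    println!("all: {all:?}, filtered: {only_2:?}");
}
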
