
Commit

add logs and refactor
zwang28 committed Jun 13, 2023
1 parent 755a6db commit 5842fed
Showing 7 changed files with 132 additions and 76 deletions.
7 changes: 4 additions & 3 deletions src/frontend/src/observer/observer_manager.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::catalog::CatalogVersion;
use risingwave_common::hash::ParallelUnitMapping;
@@ -377,9 +378,9 @@ impl FrontendObserverNode {
self.worker_node_manager
.upsert_serving_fragment_mapping(convert_pu_mapping(&mappings));
}
Operation::Delete => self
.worker_node_manager
.remove_serving_fragment_mapping(mappings.into_iter().map(|m| m.fragment_id)),
Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
&mappings.into_iter().map(|m| m.fragment_id).collect_vec(),
),
Operation::Snapshot => {
self.worker_node_manager
.set_serving_fragment_mapping(convert_pu_mapping(&mappings));
10 changes: 6 additions & 4 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -642,13 +642,15 @@ impl StageRunner {
&self,
table_id: &TableId,
) -> SchedulerResult<ParallelUnitMapping> {
let reader = self.catalog_reader.read_guard();
let table = reader
let fragment_id = self
.catalog_reader
.read_guard()
.get_table_by_id(table_id)
.map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
.map_err(|e| SchedulerError::Internal(anyhow!(e)))?
.fragment_id;
self.worker_node_manager
.manager
.get_streaming_fragment_mapping(&table.fragment_id)
.get_streaming_fragment_mapping(&fragment_id)
}

fn choose_worker(
11 changes: 7 additions & 4 deletions src/frontend/src/scheduler/local.rs
@@ -469,13 +469,16 @@ impl LocalQueryExecution {
&self,
table_id: &TableId,
) -> SchedulerResult<ParallelUnitMapping> {
let reader = self.front_env.catalog_reader().read_guard();
let table = reader
let fragment_id = self
.front_env
.catalog_reader()
.read_guard()
.get_table_by_id(table_id)
.map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
.map_err(|e| SchedulerError::Internal(anyhow!(e)))?
.fragment_id;
self.worker_node_manager
.manager
.get_streaming_fragment_mapping(&table.fragment_id)
.get_streaming_fragment_mapping(&fragment_id)
}

fn choose_worker(&self, stage: &Arc<QueryStage>) -> SchedulerResult<Vec<WorkerNode>> {
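In both src/frontend/src/scheduler/distributed/stage.rs and src/frontend/src/scheduler/local.rs above, the lookup is reshaped so that only the fragment_id is read while the catalog read guard is held: chaining read_guard() into a single expression makes the guard a temporary that is dropped at the end of that statement, before worker_node_manager.get_streaming_fragment_mapping runs. Below is a minimal, self-contained sketch of that guard-scoping pattern; the Catalog and CatalogReader types are simplified stand-ins, not the real frontend types.

use std::sync::{RwLock, RwLockReadGuard};

// Simplified stand-ins for illustration only.
struct Catalog {
    fragment_id_of_table: u32,
}

struct CatalogReader(RwLock<Catalog>);

impl CatalogReader {
    fn read_guard(&self) -> RwLockReadGuard<'_, Catalog> {
        self.0.read().unwrap()
    }
}

fn fragment_id_for_lookup(reader: &CatalogReader) -> u32 {
    // The guard returned by read_guard() is a temporary of this statement, so the
    // catalog lock is already released when the caller goes on to query the
    // worker node manager.
    let fragment_id = reader.read_guard().fragment_id_of_table;
    fragment_id
}

fn main() {
    let reader = CatalogReader(RwLock::new(Catalog {
        fragment_id_of_table: 42,
    }));
    assert_eq!(fragment_id_for_lookup(&reader), 42);
}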
30 changes: 22 additions & 8 deletions src/frontend/src/scheduler/worker_node_manager.rs
@@ -13,7 +13,6 @@
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};

use rand::seq::SliceRandom;
@@ -117,6 +116,15 @@ impl WorkerNodeManager {
serving_mapping: HashMap<FragmentId, ParallelUnitMapping>,
) {
let mut write_guard = self.inner.write().unwrap();
tracing::info!("Refresh worker nodes {:?}.", nodes);
tracing::info!(
"Refresh streaming vnode mapping for fragments {:?}.",
streaming_mapping.keys()
);
tracing::info!(
"Refresh serving vnode mapping for fragments {:?}.",
serving_mapping.keys()
);
write_guard.worker_nodes = nodes;
write_guard.streaming_fragment_vnode_mapping = streaming_mapping;
write_guard.serving_fragment_vnode_mapping = serving_mapping;
@@ -207,7 +215,10 @@ impl WorkerNodeManager

pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, ParallelUnitMapping>) {
let mut guard = self.inner.write().unwrap();
tracing::debug!("set serving fragment mapping {:#?}", mappings);
tracing::info!(
"Set serving vnode mapping for fragments {:?}",
mappings.keys()
);
guard.serving_fragment_vnode_mapping = mappings;
}

@@ -216,20 +227,23 @@
mappings: HashMap<FragmentId, ParallelUnitMapping>,
) {
let mut guard = self.inner.write().unwrap();
tracing::debug!("upsert serving fragment mapping {:#?}", mappings);
tracing::info!(
"Upsert serving vnode mapping for fragments {:?}",
mappings.keys()
);
for (fragment_id, mapping) in mappings {
guard
.serving_fragment_vnode_mapping
.insert(fragment_id, mapping);
}
}

pub fn remove_serving_fragment_mapping(
&self,
fragment_ids: impl IntoIterator<Item = FragmentId> + Debug,
) {
pub fn remove_serving_fragment_mapping(&self, fragment_ids: &[FragmentId]) {
let mut guard = self.inner.write().unwrap();
tracing::debug!("remove serving fragment mapping {:#?}", fragment_ids);
tracing::info!(
"Delete serving vnode mapping for fragments {:?}",
fragment_ids
);
for fragment_id in fragment_ids {
guard.serving_fragment_vnode_mapping.remove(&fragment_id);
}
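remove_serving_fragment_mapping now takes a plain &[FragmentId] instead of impl IntoIterator<Item = FragmentId> + Debug, which is why the frontend observer above collects the fragment ids into a Vec before the call. A toy sketch of the new call shape follows; ServingMappings is a simplified stand-in, not the real WorkerNodeManager (which keeps this map behind an RwLock alongside worker nodes and streaming mappings).

use std::collections::HashMap;

type FragmentId = u32;

// Simplified stand-in holding only the serving fragment mappings.
struct ServingMappings(HashMap<FragmentId, &'static str>);

impl ServingMappings {
    fn remove_serving_fragment_mapping(&mut self, fragment_ids: &[FragmentId]) {
        for fragment_id in fragment_ids {
            self.0.remove(fragment_id);
        }
    }
}

fn main() {
    let mut mappings = ServingMappings(HashMap::from([(1, "m1"), (2, "m2")]));
    // Callers collect the ids first and pass a slice.
    let ids: Vec<FragmentId> = vec![1, 2];
    mappings.remove_serving_fragment_mapping(&ids);
    assert!(mappings.0.is_empty());
}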
91 changes: 59 additions & 32 deletions src/meta/src/batch/mod.rs
@@ -44,33 +44,45 @@ impl ServingVnodeMapping {
}

/// Upsert mapping for given fragments according to the latest `workers`.
/// Returns new mappings.
/// Returns (successful updates, failed updates).
fn upsert(
&self,
streaming_fragment_mappings: impl IntoIterator<Item = FragmentParallelUnitMapping>,
workers: &[WorkerNode],
) -> HashMap<FragmentId, ParallelUnitMapping> {
) -> (HashMap<FragmentId, ParallelUnitMapping>, Vec<FragmentId>) {
let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
let mut upserted: HashMap<FragmentId, ParallelUnitMapping> = HashMap::default();
let mut failed: Vec<FragmentId> = vec![];
for fragment in streaming_fragment_mappings {
let new_mapping = {
let old_mapping = serving_vnode_mappings.get(&fragment.fragment_id);
// Set max serving parallelism to `streaming_parallelism`. It's not a must.
let streaming_parallelism =
ParallelUnitMapping::from_protobuf(fragment.mapping.as_ref().unwrap())
let streaming_parallelism = match fragment.mapping.as_ref() {
Some(mapping) => ParallelUnitMapping::from_protobuf(mapping)
.iter_unique()
.count();
.count(),
None => {
tracing::warn!(
"vnode mapping for fragment {} not found",
fragment.fragment_id
);
1
}
};
place_vnode(old_mapping, workers, streaming_parallelism)
};
match new_mapping {
None => {
serving_vnode_mappings.remove(&fragment.fragment_id as _);
failed.push(fragment.fragment_id);
}
Some(mapping) => {
serving_vnode_mappings.insert(fragment.fragment_id, mapping);
serving_vnode_mappings.insert(fragment.fragment_id, mapping.clone());
upserted.insert(fragment.fragment_id, mapping);
}
}
}
serving_vnode_mappings.clone()
(upserted, failed)
}

fn remove(&self, fragment_ids: &[FragmentId]) {
@@ -84,20 +96,8 @@ impl ServingVnodeMapping {
async fn all_streaming_fragment_mappings<S: MetaStore>(
fragment_manager: &FragmentManager<S>,
) -> Vec<FragmentParallelUnitMapping> {
fragment_manager
.list_table_fragments()
.await
.into_iter()
.flat_map(|table_fragments| {
table_fragments.fragments.into_values().map(|fragment| {
let parallel_unit_mapping = fragment.vnode_mapping.clone().unwrap();
FragmentParallelUnitMapping {
fragment_id: fragment.fragment_id,
mapping: Some(parallel_unit_mapping),
}
})
})
.collect()
let guard = fragment_manager.get_fragment_read_guard().await;
guard.all_running_fragment_mappings().collect()
}

fn to_fragment_parallel_unit_mapping(
@@ -112,17 +112,33 @@ fn to_fragment_parallel_unit_mapping(
.collect()
}

fn to_deleted_fragment_parallel_unit_mapping(
fragment_ids: &[FragmentId],
) -> Vec<FragmentParallelUnitMapping> {
fragment_ids
.iter()
.map(|fragment_id| FragmentParallelUnitMapping {
fragment_id: *fragment_id,
mapping: None,
})
.collect()
}

pub(crate) async fn on_meta_start<S: MetaStore>(
notification_manager: NotificationManagerRef<S>,
cluster_manager: ClusterManagerRef<S>,
fragment_manager: FragmentManagerRef<S>,
serving_vnode_mapping: ServingVnodeMappingRef,
) {
let streaming_fragment_mappings = all_streaming_fragment_mappings(&fragment_manager).await;
let mappings = serving_vnode_mapping.upsert(
let (mappings, _) = serving_vnode_mapping.upsert(
streaming_fragment_mappings,
&cluster_manager.list_active_serving_compute_nodes().await,
);
tracing::info!(
"Initialize serving vnode mapping snapshot for fragments {:?}.",
mappings.keys()
);
notification_manager.notify_frontend_without_version(
Operation::Snapshot,
Info::ServingParallelUnitMappings(FragmentParallelUnitMappings {
@@ -155,22 +171,33 @@ pub(crate) async fn start_serving_vnode_mapping_worker<S: MetaStore>(
}
let workers = cluster_manager.list_active_serving_compute_nodes().await;
let all_streaming_mappings = all_streaming_fragment_mappings(&fragment_manager).await;
let mappings = serving_vnode_mapping.upsert(all_streaming_mappings, &workers);
let (mappings, _) = serving_vnode_mapping.upsert(all_streaming_mappings, &workers);
tracing::info!("Update serving vnode mapping snapshot for fragments {:?}.", mappings.keys());
notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&mappings) }));
}
LocalNotification::FragmentsAdded(fragment_ids) => {
LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
if fragment_ids.is_empty() {
continue;
}
let workers = cluster_manager.list_active_serving_compute_nodes().await;
let added_streaming_mappings = all_streaming_fragment_mappings(&fragment_manager).await.into_iter().filter(|f|fragment_ids.contains(&f.fragment_id));
let mappings = serving_vnode_mapping.upsert(added_streaming_mappings, &workers);
notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&mappings) }));
let (upserted, failed) = serving_vnode_mapping.upsert(added_streaming_mappings, &workers);
if !upserted.is_empty() {
tracing::info!("Update serving vnode mapping for fragments {:?}.", upserted.keys());
notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&upserted) }));
}
if !failed.is_empty() {
tracing::info!("Fail to update serving vnode mapping for fragments {:?}.", failed);
notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&failed)}));
}
}
LocalNotification::FragmentsDeleted(fragment_ids) => {
LocalNotification::FragmentMappingsDelete(fragment_ids) => {
if fragment_ids.is_empty() {
continue;
}
tracing::info!("Delete serving vnode mapping for fragments {:?}.", fragment_ids);
serving_vnode_mapping.remove(&fragment_ids);
let mappings = fragment_ids.into_iter().map(|fragment_id|FragmentParallelUnitMapping {
fragment_id,
mapping: None,
}).collect();
notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings }));
notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&fragment_ids) }));
}
_ => {}
}
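ServingVnodeMapping::upsert no longer returns a clone of the whole mapping table; it reports only the delta as (upserted, failed), so the worker loop above can broadcast a targeted Update for placements that succeeded and a Delete (with mapping: None) for fragments that could not be placed. A toy model of that contract is sketched below, with Mapping standing in for ParallelUnitMapping.

use std::collections::HashMap;

type FragmentId = u32;
// Stand-in for ParallelUnitMapping: just a list of parallel unit ids.
type Mapping = Vec<u32>;

// Every input fragment ends up either in `upserted` (a placement was found) or
// in `failed` (no placement, e.g. no serving workers), never in both.
fn upsert(
    current: &mut HashMap<FragmentId, Mapping>,
    placements: Vec<(FragmentId, Option<Mapping>)>,
) -> (HashMap<FragmentId, Mapping>, Vec<FragmentId>) {
    let mut upserted = HashMap::new();
    let mut failed = Vec::new();
    for (fragment_id, placement) in placements {
        match placement {
            Some(mapping) => {
                current.insert(fragment_id, mapping.clone());
                upserted.insert(fragment_id, mapping);
            }
            None => {
                current.remove(&fragment_id);
                failed.push(fragment_id);
            }
        }
    }
    (upserted, failed)
}

fn main() {
    let mut current = HashMap::new();
    let (upserted, failed) = upsert(&mut current, vec![(1, Some(vec![0, 1])), (2, None)]);
    assert_eq!(upserted.len(), 1);
    assert_eq!(failed, vec![2]);
    assert!(!current.contains_key(&2));
}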
55 changes: 32 additions & 23 deletions src/meta/src/manager/catalog/fragment.rs
@@ -50,7 +50,7 @@ pub struct FragmentManagerCore {
}

impl FragmentManagerCore {
/// List all fragment vnode mapping info that not in `State::Initial`.
/// List all fragment vnode mapping info that excludes `exclude_state`.
pub fn all_running_fragment_mappings(
&self,
) -> impl Iterator<Item = FragmentParallelUnitMapping> + '_ {
@@ -135,10 +135,37 @@ where

self.env
.notification_manager()
.notify_frontend(operation, Info::ParallelUnitMapping(fragment_mapping))
.notify_frontend(
operation.clone(),
Info::ParallelUnitMapping(fragment_mapping),
)
.await;
}
}

// Update serving vnode mappings.
let fragment_ids = table_fragment.fragment_ids().collect();
match operation {
Operation::Add | Operation::Update => {
self.env
.notification_manager()
.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
fragment_ids,
))
.await;
}
Operation::Delete => {
self.env
.notification_manager()
.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
fragment_ids,
))
.await;
}
_ => {
tracing::warn!("unexpected fragment mapping op");
}
}
}

pub async fn select_table_fragments_by_table_id(
@@ -179,15 +206,9 @@ where
if map.contains_key(&table_id) {
bail!("table_fragment already exist: id={}", table_id);
}
let fragment_ids = table_fragment.fragment_ids().collect();
let mut table_fragments = BTreeMapTransaction::new(map);
table_fragments.insert(table_id, table_fragment);
commit_meta!(self, table_fragments)?;
self.env
.notification_manager()
.notify_local_subscribers(LocalNotification::FragmentsAdded(fragment_ids))
.await;
Ok(())
commit_meta!(self, table_fragments)
}

/// Called after the barrier collection of `CreateStreamingJob` command, which updates the
@@ -391,24 +412,12 @@ where
}
commit_meta!(self, table_fragments)?;

for table_fragments in &to_delete_table_fragments {
for table_fragments in to_delete_table_fragments {
if table_fragments.state() != State::Initial {
self.notify_fragment_mapping(table_fragments, Operation::Delete)
self.notify_fragment_mapping(&table_fragments, Operation::Delete)
.await;
}
}
let all_deleted_fragment_ids = to_delete_table_fragments
.iter()
.flat_map(|t| t.fragment_ids())
.collect_vec();
if !all_deleted_fragment_ids.is_empty() {
self.env
.notification_manager()
.notify_local_subscribers(LocalNotification::FragmentsDeleted(
all_deleted_fragment_ids,
))
.await;
}

Ok(())
}
4 changes: 2 additions & 2 deletions src/meta/src/manager/notification.rs
@@ -43,8 +43,8 @@ pub enum LocalNotification {
WorkerNodeActivated(WorkerNode),
CompactionTaskNeedCancel(CompactTask),
SystemParamsChange(SystemParamsReader),
FragmentsAdded(Vec<FragmentId>),
FragmentsDeleted(Vec<FragmentId>),
FragmentMappingsUpsert(Vec<FragmentId>),
FragmentMappingsDelete(Vec<FragmentId>),
}

#[derive(Debug)]
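The local notification variants are renamed from FragmentsAdded/FragmentsDeleted to FragmentMappingsUpsert/FragmentMappingsDelete, matching the new dispatch from notify_fragment_mapping in fragment.rs above. The sketch below shows how a local subscriber such as the serving-vnode-mapping worker in src/meta/src/batch/mod.rs matches on them; the enum here is a toy copy with only these two variants, not the real LocalNotification.

type FragmentId = u32;

// Toy copy of the two renamed variants.
enum LocalNotification {
    FragmentMappingsUpsert(Vec<FragmentId>),
    FragmentMappingsDelete(Vec<FragmentId>),
}

fn handle(notification: LocalNotification) {
    match notification {
        LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
            // Recompute serving mappings for these fragments and notify frontends.
            println!("upsert serving mappings for {:?}", fragment_ids);
        }
        LocalNotification::FragmentMappingsDelete(fragment_ids) => {
            // Drop serving mappings for these fragments and notify frontends.
            println!("delete serving mappings for {:?}", fragment_ids);
        }
    }
}

fn main() {
    handle(LocalNotification::FragmentMappingsUpsert(vec![1, 2]));
    handle(LocalNotification::FragmentMappingsDelete(vec![1]));
}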
