Skip to content

Commit

Permalink
add schedule error type for vnode mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Jun 13, 2023
1 parent 42a4af4 commit 755a6db
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 34 deletions.
22 changes: 11 additions & 11 deletions src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,17 +638,17 @@ impl StageRunner {
}

#[inline(always)]
fn get_streaming_vnode_mapping(&self, table_id: &TableId) -> Option<ParallelUnitMapping> {
self.catalog_reader
.read_guard()
fn get_streaming_vnode_mapping(
&self,
table_id: &TableId,
) -> SchedulerResult<ParallelUnitMapping> {
let reader = self.catalog_reader.read_guard();
let table = reader
.get_table_by_id(table_id)
.map(|table| {
self.worker_node_manager
.manager
.get_streaming_fragment_mapping(&table.fragment_id)
})
.ok()
.flatten()
.map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
self.worker_node_manager
.manager
.get_streaming_fragment_mapping(&table.fragment_id)
}

fn choose_worker(
Expand All @@ -659,7 +659,7 @@ impl StageRunner {
) -> SchedulerResult<Option<WorkerNode>> {
let plan_node = plan_fragment.root.as_ref().expect("fail to get plan node");
let vnode_mapping = match dml_table_id {
Some(table_id) => self.get_streaming_vnode_mapping(&table_id),
Some(table_id) => Some(self.get_streaming_vnode_mapping(&table_id)?),
None => {
if let Some(distributed_lookup_join_node) =
Self::find_distributed_lookup_join_node(plan_node)
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/scheduler/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_rpc_client::error::RpcError;
use thiserror::Error;
use tonic::{Code, Status};

use crate::catalog::FragmentId;
use crate::scheduler::plan_fragmenter::QueryId;

#[derive(Error, Debug)]
Expand All @@ -31,6 +32,12 @@ pub enum SchedulerError {
#[error("Empty workers found")]
EmptyWorkerNodes,

#[error("Serving vnode mapping not found for fragment {0}")]
ServingVnodeMappingNotFound(FragmentId),

#[error("Streaming vnode mapping not found for fragment {0}")]
StreamingVnodeMappingNotFound(FragmentId),

#[error("{0}")]
TaskExecutionError(String),

Expand Down
22 changes: 10 additions & 12 deletions src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,25 +465,23 @@ impl LocalQueryExecution {
}

#[inline(always)]
fn get_streaming_vnode_mapping(&self, table_id: &TableId) -> Option<ParallelUnitMapping> {
fn get_streaming_vnode_mapping(
&self,
table_id: &TableId,
) -> SchedulerResult<ParallelUnitMapping> {
let reader = self.front_env.catalog_reader().read_guard();
reader
let table = reader
.get_table_by_id(table_id)
.map(|table| {
self.worker_node_manager
.manager
.get_streaming_fragment_mapping(&table.fragment_id)
})
.ok()
.flatten()
.map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
self.worker_node_manager
.manager
.get_streaming_fragment_mapping(&table.fragment_id)
}

fn choose_worker(&self, stage: &Arc<QueryStage>) -> SchedulerResult<Vec<WorkerNode>> {
if let Some(table_id) = stage.dml_table_id.as_ref() {
// dml should use streaming vnode mapping
let vnode_mapping = self
.get_streaming_vnode_mapping(table_id)
.ok_or_else(|| SchedulerError::EmptyWorkerNodes)?;
let vnode_mapping = self.get_streaming_vnode_mapping(table_id)?;
let worker_node = {
let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec();
let candidates = self
Expand Down
15 changes: 4 additions & 11 deletions src/frontend/src/scheduler/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};

use anyhow::anyhow;
use rand::seq::SliceRandom;
use risingwave_common::bail;
use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
Expand Down Expand Up @@ -151,13 +150,14 @@ impl WorkerNodeManager {
pub fn get_streaming_fragment_mapping(
&self,
fragment_id: &FragmentId,
) -> Option<ParallelUnitMapping> {
) -> SchedulerResult<ParallelUnitMapping> {
self.inner
.read()
.unwrap()
.streaming_fragment_vnode_mapping
.get(fragment_id)
.cloned()
.ok_or_else(|| SchedulerError::StreamingVnodeMappingNotFound(*fragment_id))
}

pub fn insert_streaming_fragment_mapping(
Expand Down Expand Up @@ -202,7 +202,7 @@ impl WorkerNodeManager {
.read()
.unwrap()
.get_serving_fragment_mapping(fragment_id)
.ok_or_else(|| SchedulerError::EmptyWorkerNodes)
.ok_or_else(|| SchedulerError::ServingVnodeMappingNotFound(fragment_id))
}

pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, ParallelUnitMapping>) {
Expand Down Expand Up @@ -284,14 +284,7 @@ impl WorkerNodeSelector {
fragment_id: FragmentId,
) -> SchedulerResult<ParallelUnitMapping> {
if self.enable_barrier_read {
self.manager
.get_streaming_fragment_mapping(&fragment_id)
.ok_or_else(|| {
SchedulerError::Internal(anyhow!(
"vnode mapping for fragment {} not found",
fragment_id
))
})
self.manager.get_streaming_fragment_mapping(&fragment_id)
} else {
self.manager.serving_fragment_mapping(fragment_id)
}
Expand Down

0 comments on commit 755a6db

Please sign in to comment.