From 755a6db0408c73d985f7628f57e41617293e1cff Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Tue, 13 Jun 2023 15:09:38 +0800
Subject: [PATCH] add schedule error type for vnode mapping

---
 .../src/scheduler/distributed/stage.rs   | 22 +++++++++----------
 src/frontend/src/scheduler/error.rs      |  7 ++++++
 src/frontend/src/scheduler/local.rs      | 22 +++++++++----------
 .../src/scheduler/worker_node_manager.rs | 15 ++++---------
 4 files changed, 32 insertions(+), 34 deletions(-)

diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 070538e131ff0..2ba3c9ed544df 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -638,17 +638,17 @@ impl StageRunner {
     }
 
     #[inline(always)]
-    fn get_streaming_vnode_mapping(&self, table_id: &TableId) -> Option<ParallelUnitMapping> {
-        self.catalog_reader
-            .read_guard()
+    fn get_streaming_vnode_mapping(
+        &self,
+        table_id: &TableId,
+    ) -> SchedulerResult<ParallelUnitMapping> {
+        let reader = self.catalog_reader.read_guard();
+        let table = reader
             .get_table_by_id(table_id)
-            .map(|table| {
-                self.worker_node_manager
-                    .manager
-                    .get_streaming_fragment_mapping(&table.fragment_id)
-            })
-            .ok()
-            .flatten()
+            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
+        self.worker_node_manager
+            .manager
+            .get_streaming_fragment_mapping(&table.fragment_id)
     }
 
     fn choose_worker(
@@ -659,7 +659,7 @@ impl StageRunner {
     ) -> SchedulerResult<Option<WorkerNode>> {
         let plan_node = plan_fragment.root.as_ref().expect("fail to get plan node");
         let vnode_mapping = match dml_table_id {
-            Some(table_id) => self.get_streaming_vnode_mapping(&table_id),
+            Some(table_id) => Some(self.get_streaming_vnode_mapping(&table_id)?),
             None => {
                 if let Some(distributed_lookup_join_node) =
                     Self::find_distributed_lookup_join_node(plan_node)
diff --git a/src/frontend/src/scheduler/error.rs b/src/frontend/src/scheduler/error.rs
index 50a0a245da7a2..2ca737f73ab09 100644
--- a/src/frontend/src/scheduler/error.rs
+++ b/src/frontend/src/scheduler/error.rs
@@ -18,6 +18,7 @@ use risingwave_rpc_client::error::RpcError;
 use thiserror::Error;
 use tonic::{Code, Status};
 
+use crate::catalog::FragmentId;
 use crate::scheduler::plan_fragmenter::QueryId;
 
 #[derive(Error, Debug)]
@@ -31,6 +32,12 @@ pub enum SchedulerError {
     #[error("Empty workers found")]
     EmptyWorkerNodes,
 
+    #[error("Serving vnode mapping not found for fragment {0}")]
+    ServingVnodeMappingNotFound(FragmentId),
+
+    #[error("Streaming vnode mapping not found for fragment {0}")]
+    StreamingVnodeMappingNotFound(FragmentId),
+
     #[error("{0}")]
     TaskExecutionError(String),
 
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index bc48b8dd3b7bc..d34e06e9971f3 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -465,25 +465,23 @@ impl LocalQueryExecution {
     }
 
     #[inline(always)]
-    fn get_streaming_vnode_mapping(&self, table_id: &TableId) -> Option<ParallelUnitMapping> {
+    fn get_streaming_vnode_mapping(
+        &self,
+        table_id: &TableId,
+    ) -> SchedulerResult<ParallelUnitMapping> {
         let reader = self.front_env.catalog_reader().read_guard();
-        reader
+        let table = reader
             .get_table_by_id(table_id)
-            .map(|table| {
-                self.worker_node_manager
-                    .manager
-                    .get_streaming_fragment_mapping(&table.fragment_id)
-            })
-            .ok()
-            .flatten()
+            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
+        self.worker_node_manager
+            .manager
+            .get_streaming_fragment_mapping(&table.fragment_id)
     }
 
     fn choose_worker(&self, stage: &Arc<QueryStage>) -> SchedulerResult<Option<WorkerNode>> {
         if let Some(table_id) = stage.dml_table_id.as_ref() {
             // dml should use streaming vnode mapping
-            let vnode_mapping = self
-                .get_streaming_vnode_mapping(table_id)
-                .ok_or_else(|| SchedulerError::EmptyWorkerNodes)?;
+            let vnode_mapping = self.get_streaming_vnode_mapping(table_id)?;
             let worker_node = {
                 let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec();
                 let candidates = self
diff --git a/src/frontend/src/scheduler/worker_node_manager.rs b/src/frontend/src/scheduler/worker_node_manager.rs
index c012febce40cd..014e747352274 100644
--- a/src/frontend/src/scheduler/worker_node_manager.rs
+++ b/src/frontend/src/scheduler/worker_node_manager.rs
@@ -16,7 +16,6 @@ use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::{Arc, RwLock};
 
-use anyhow::anyhow;
 use rand::seq::SliceRandom;
 use risingwave_common::bail;
 use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
@@ -151,13 +150,14 @@ impl WorkerNodeManager {
     pub fn get_streaming_fragment_mapping(
         &self,
         fragment_id: &FragmentId,
-    ) -> Option<ParallelUnitMapping> {
+    ) -> SchedulerResult<ParallelUnitMapping> {
         self.inner
             .read()
             .unwrap()
             .streaming_fragment_vnode_mapping
             .get(fragment_id)
             .cloned()
+            .ok_or_else(|| SchedulerError::StreamingVnodeMappingNotFound(*fragment_id))
     }
 
     pub fn insert_streaming_fragment_mapping(
@@ -202,7 +202,7 @@ impl WorkerNodeManager {
             .read()
             .unwrap()
             .get_serving_fragment_mapping(fragment_id)
-            .ok_or_else(|| SchedulerError::EmptyWorkerNodes)
+            .ok_or_else(|| SchedulerError::ServingVnodeMappingNotFound(fragment_id))
     }
 
     pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, ParallelUnitMapping>) {
@@ -284,14 +284,7 @@ impl WorkerNodeSelector {
         fragment_id: FragmentId,
     ) -> SchedulerResult<ParallelUnitMapping> {
         if self.enable_barrier_read {
-            self.manager
-                .get_streaming_fragment_mapping(&fragment_id)
-                .ok_or_else(|| {
-                    SchedulerError::Internal(anyhow!(
-                        "vnode mapping for fragment {} not found",
-                        fragment_id
-                    ))
-                })
+            self.manager.get_streaming_fragment_mapping(&fragment_id)
         } else {
             self.manager.serving_fragment_mapping(fragment_id)
         }