feat(config): support for setting default visibility_mode via config (#…

zwang28 committed Jun 8, 2023
1 parent f37383b commit 3edab3d
Showing 11 changed files with 66 additions and 23 deletions.
src/common/src/config.rs (9 additions, 0 deletions)

@@ -283,6 +283,9 @@ pub struct BatchConfig {
     #[serde(default)]
     pub distributed_query_limit: Option<u64>,
 
+    #[serde(default = "default::batch::enable_barrier_read")]
+    pub enable_barrier_read: bool,
+
     #[serde(default, flatten)]
     pub unrecognized: Unrecognized<Self>,
 }
@@ -848,6 +851,12 @@ mod default {
             system_param::default::telemetry_enabled()
         }
     }
+
+    pub mod batch {
+        pub fn enable_barrier_read() -> bool {
+            true
+        }
+    }
 }
 
 pub struct StorageMemoryConfig {
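For reference, the serde pattern above in a minimal, self-contained form (RisingWave's other batch fields and the Unrecognized catch-all omitted; the toml crate is assumed only for the demo):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct BatchConfig {
    // serde calls default::batch::enable_barrier_read() whenever the key
    // is absent from the parsed input.
    #[serde(default = "default::batch::enable_barrier_read")]
    pub enable_barrier_read: bool,
}

mod default {
    pub mod batch {
        pub fn enable_barrier_read() -> bool {
            true
        }
    }
}

fn main() {
    // An empty TOML document still yields the default.
    let cfg: BatchConfig = toml::from_str("").unwrap();
    assert!(cfg.enable_barrier_read);
}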
src/common/src/session_config/mod.rs (3 additions, 3 deletions)

@@ -29,7 +29,7 @@ use tracing::info;
 
 use crate::error::{ErrorCode, RwError};
 use crate::session_config::transaction_isolation_level::IsolationLevel;
-use crate::session_config::visibility_mode::VisibilityMode;
+pub use crate::session_config::visibility_mode::VisibilityMode;
 use crate::util::epoch::Epoch;
 
 // This is a hack, &'static str is not allowed as a const generics argument.
@@ -637,8 +637,8 @@ impl ConfigMap {
         self.search_path.clone()
     }
 
-    pub fn only_checkpoint_visible(&self) -> bool {
-        matches!(self.visibility_mode, VisibilityMode::Checkpoint)
+    pub fn get_visible_mode(&self) -> VisibilityMode {
+        self.visibility_mode
     }
 
     pub fn get_query_epoch(&self) -> Option<Epoch> {
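The getter is deliberately widened from a bool to the enum: only_checkpoint_visible() could not distinguish the new Default variant from All, while get_visible_mode() lets the session layer treat Default as "defer to the node-level batch config" (see src/frontend/src/session.rs below). The switch to pub use re-exports VisibilityMode so callers outside session_config can name the type.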
src/common/src/session_config/visibility_mode.rs (11 additions, 0 deletions)

@@ -23,8 +23,12 @@ use crate::session_config::VISIBILITY_MODE;
 
 #[derive(Copy, Default, Debug, Clone, PartialEq, Eq)]
 pub enum VisibilityMode {
+    // apply frontend config.
     #[default]
+    Default,
+    // read barrier from streaming compute node.
     All,
+    // read checkpoint from serving compute node.
     Checkpoint,
 }
 
@@ -51,6 +55,8 @@ impl TryFrom<&[&str]> for VisibilityMode {
             Ok(Self::All)
         } else if s.eq_ignore_ascii_case("checkpoint") {
             Ok(Self::Checkpoint)
+        } else if s.eq_ignore_ascii_case("default") {
+            Ok(Self::Default)
         } else {
             Err(InvalidConfigValue {
                 config_entry: Self::entry_name().to_string(),
@@ -63,6 +69,7 @@
 impl std::fmt::Display for VisibilityMode {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
+            Self::Default => write!(f, "default"),
             Self::All => write!(f, "all"),
             Self::Checkpoint => write!(f, "checkpoint"),
         }
@@ -91,6 +98,10 @@ mod tests {
             VisibilityMode::try_from(["checkPoint"].as_slice()).unwrap(),
             VisibilityMode::Checkpoint
        );
+        assert_eq!(
+            VisibilityMode::try_from(["default"].as_slice()).unwrap(),
+            VisibilityMode::Default
+        );
         assert!(VisibilityMode::try_from(["ab"].as_slice()).is_err());
     }
 }
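A quick round-trip under the impls above (parsing is ASCII-case-insensitive, Display writes the lowercase name):

let mode = VisibilityMode::try_from(["DeFault"].as_slice()).unwrap();
assert_eq!(mode, VisibilityMode::Default);
assert_eq!(mode.to_string(), "default");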
src/config/example.toml (3 additions, 0 deletions)

@@ -28,6 +28,9 @@ move_table_size_limit = 2147483648
 split_group_size_limit = 21474836480
 do_not_config_object_storage_lifecycle = false
 
+[batch]
+enable_barrier_read = true
+
 [batch.developer]
 batch_connector_message_buffer_size = 16
 batch_output_channel_size = 64
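This file-level default is the bottom of the override chain introduced by this commit: the frontend's RW_ENABLE_BARRIER_READ override (src/frontend/src/lib.rs below) rewrites batch.enable_barrier_read at startup, and a session's visibility_mode wins over both whenever it is not default (src/frontend/src/session.rs below).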
src/frontend/src/handler/explain.rs (1 addition, 1 deletion)

@@ -181,7 +181,7 @@ async fn do_handle_explain(
             Convention::Batch => {
                 let worker_node_manager_reader = WorkerNodeSelector::new(
                     session.env().worker_node_manager_ref(),
-                    !session.config().only_checkpoint_visible(),
+                    session.is_barrier_read(),
                 );
                 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
                     worker_node_manager_reader,
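The polarity is preserved at the call sites: !only_checkpoint_visible() already meant "may read barrier epochs", and is_barrier_read() computes the same predicate with the new Default mode resolved against the batch config. The same substitution repeats in query.rs below.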
src/frontend/src/handler/query.rs (3 additions, 3 deletions)

@@ -280,7 +280,7 @@ fn gen_batch_plan_fragmenter(
     );
     let worker_node_manager_reader = WorkerNodeSelector::new(
         session.env().worker_node_manager_ref(),
-        !session.config().only_checkpoint_visible(),
+        session.is_barrier_read(),
     );
     let plan_fragmenter = BatchPlanFragmenter::new(
         worker_node_manager_reader,
@@ -311,7 +311,7 @@ async fn execute(
         ..
     } = plan_fragmenter_result;
 
-    let only_checkpoint_visible = session.config().only_checkpoint_visible();
+    let is_barrier_read = session.is_barrier_read();
     let query_start_time = Instant::now();
     let query = plan_fragmenter.generate_complete_query().await?;
     tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
@@ -336,7 +336,7 @@ async fn execute(
         let hummock_snapshot_manager = session.env().hummock_snapshot_manager();
         let query_id = query.query_id().clone();
         let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?;
-        PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, only_checkpoint_visible)
+        PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, is_barrier_read)
     };
     match query_mode {
         QueryMode::Auto => unreachable!(),
src/frontend/src/lib.rs (4 additions, 0 deletions)

@@ -136,6 +136,10 @@ struct OverrideConfigOpts {
     #[clap(long, env = "RW_METRICS_LEVEL")]
     #[override_opts(path = server.metrics_level)]
     pub metrics_level: Option<u32>,
+
+    #[clap(long, env = "RW_ENABLE_BARRIER_READ")]
+    #[override_opts(path = batch.enable_barrier_read)]
+    pub enable_barrier_read: Option<bool>,
 }
 
 impl Default for FrontendOpts {
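Stripped of the RisingWave-specific #[override_opts] attribute, the flag wiring above reduces to this standalone sketch (struct name invented; assumes clap with the derive and env features enabled):

use clap::Parser;

#[derive(Parser, Debug)]
struct Opts {
    // Settable as `--enable-barrier-read <true|false>` or through the
    // RW_ENABLE_BARRIER_READ environment variable; None leaves the
    // config-file value untouched.
    #[clap(long, env = "RW_ENABLE_BARRIER_READ")]
    enable_barrier_read: Option<bool>,
}

fn main() {
    println!("{:?}", Opts::parse().enable_barrier_read);
}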
src/frontend/src/scheduler/hummock_snapshot_manager.rs (8 additions, 8 deletions)

@@ -36,7 +36,7 @@ pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
 pub enum PinnedHummockSnapshot {
     FrontendPinned(
         HummockSnapshotGuard,
-        // `only_checkpoint_visible`.
+        // `is_barrier_read`.
         // It's embedded here because we always use it together with snapshot.
         bool,
     ),
@@ -49,8 +49,8 @@ pub enum PinnedHummockSnapshot {
 impl PinnedHummockSnapshot {
     pub fn get_batch_query_epoch(&self) -> BatchQueryEpoch {
         match self {
-            PinnedHummockSnapshot::FrontendPinned(s, checkpoint) => {
-                s.get_batch_query_epoch(*checkpoint)
+            PinnedHummockSnapshot::FrontendPinned(s, is_barrier_read) => {
+                s.get_batch_query_epoch(*is_barrier_read)
             }
             PinnedHummockSnapshot::Other(e) => BatchQueryEpoch {
                 epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
@@ -60,7 +60,7 @@ impl PinnedHummockSnapshot {
 
     pub fn support_barrier_read(&self) -> bool {
         match self {
-            PinnedHummockSnapshot::FrontendPinned(_, checkpoint) => !*checkpoint,
+            PinnedHummockSnapshot::FrontendPinned(_, is_barrier_read) => *is_barrier_read,
             PinnedHummockSnapshot::Other(_) => false,
         }
     }
@@ -105,11 +105,11 @@ pub struct HummockSnapshotGuard {
 }
 
 impl HummockSnapshotGuard {
-    pub fn get_batch_query_epoch(&self, checkpoint: bool) -> BatchQueryEpoch {
-        let epoch = if checkpoint {
-            batch_query_epoch::Epoch::Committed(self.snapshot.committed_epoch)
-        } else {
+    pub fn get_batch_query_epoch(&self, is_barrier_read: bool) -> BatchQueryEpoch {
+        let epoch = if is_barrier_read {
             batch_query_epoch::Epoch::Current(self.snapshot.current_epoch)
+        } else {
+            batch_query_epoch::Epoch::Committed(self.snapshot.committed_epoch)
         };
         BatchQueryEpoch { epoch: Some(epoch) }
     }
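The boolean's meaning inverts along with its rename: checkpoint == true used to select the Committed epoch, whereas is_barrier_read == true now selects the Current epoch. That is why support_barrier_read() drops its negation and the branches in get_batch_query_epoch() swap, matching the flipped call sites in explain.rs and query.rs above.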
src/frontend/src/scheduler/worker_node_manager.rs (10 additions, 1 deletion)

@@ -98,6 +98,14 @@ impl WorkerNodeManager {
         if node.property.as_ref().map_or(false, |p| p.is_serving) {
             write_guard.serving_fragment_vnode_mapping.clear();
         }
+        // update
+        for w in &mut write_guard.worker_nodes {
+            if w.id == node.id {
+                *w = node;
+                return;
+            }
+        }
+        // insert
         write_guard.worker_nodes.push(node);
     }
 
@@ -106,7 +114,7 @@
         if node.property.as_ref().map_or(false, |p| p.is_serving) {
             write_guard.serving_fragment_vnode_mapping.clear();
         }
-        write_guard.worker_nodes.retain(|x| *x != node);
+        write_guard.worker_nodes.retain(|x| x.id != node.id);
     }
 
     pub fn refresh(
@@ -231,6 +239,7 @@ impl WorkerNodeManagerInner {
             .worker_nodes
             .iter()
            .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
+            .sorted_by_key(|w| w.id)
             .map(|w| (w.id, w.parallel_units.clone()))
             .collect();
         let serving_pus_total_num = all_serving_pus.values().map(|p| p.len()).sum::<usize>();
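Restated as a self-contained sketch (WorkerNode pared down to an id; function names shortened): registration is now an upsert keyed on id, removal matches on id rather than whole-struct equality, and the added sorted_by_key(|w| w.id) gives the serving-worker listing a stable order.

#[derive(Clone, Debug)]
struct WorkerNode {
    id: u32,
}

// Upsert: replace the entry with a matching id in place, else append.
fn add_worker(nodes: &mut Vec<WorkerNode>, node: WorkerNode) {
    for w in nodes.iter_mut() {
        if w.id == node.id {
            *w = node; // update
            return;
        }
    }
    nodes.push(node); // insert
}

// Remove by id; comparing whole structs would miss a re-registered
// worker whose properties changed since it was added.
fn remove_worker(nodes: &mut Vec<WorkerNode>, node: &WorkerNode) {
    nodes.retain(|w| w.id != node.id);
}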
src/frontend/src/session.rs (9 additions, 1 deletion)

@@ -34,7 +34,7 @@ use risingwave_common::catalog::{
 use risingwave_common::config::{load_config, BatchConfig};
 use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::monitor::process_linux::monitor_process;
-use risingwave_common::session_config::ConfigMap;
+use risingwave_common::session_config::{ConfigMap, VisibilityMode};
 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
 use risingwave_common::telemetry::manager::TelemetryManager;
 use risingwave_common::telemetry::telemetry_env_enabled;
@@ -678,6 +678,14 @@ impl SessionImpl {
         tracing::trace!("notice to user:{}", notice);
         self.notices.write().push(notice);
     }
+
+    pub fn is_barrier_read(&self) -> bool {
+        match self.config().get_visible_mode() {
+            VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
+            VisibilityMode::All => true,
+            VisibilityMode::Checkpoint => false,
+        }
+    }
 }
 
 pub struct SessionManagerImpl {
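Combined with enable_barrier_read = true from example.toml, the fallback resolves as in this standalone restatement of the method above (free function for illustration only):

fn resolve(session_mode: VisibilityMode, config_enable_barrier_read: bool) -> bool {
    match session_mode {
        // No session preference: defer to the frontend's [batch] config.
        VisibilityMode::Default => config_enable_barrier_read,
        VisibilityMode::All => true,
        VisibilityMode::Checkpoint => false,
    }
}

// resolve(VisibilityMode::Default, true)    == true  (from config)
// resolve(VisibilityMode::Checkpoint, true) == false (session wins)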
src/meta/src/manager/cluster.rs (5 additions, 6 deletions)

@@ -151,15 +151,14 @@ where
     pub async fn activate_worker_node(&self, host_address: HostAddress) -> MetaResult<()> {
         let mut core = self.core.write().await;
         let mut worker = core.get_worker_by_host_checked(host_address.clone())?;
-        if worker.worker_node.state == State::Running as i32 {
-            return Ok(());
+        if worker.worker_node.state != State::Running as i32 {
+            worker.worker_node.state = State::Running as i32;
+            worker.insert(self.env.meta_store()).await?;
+            core.update_worker_node(worker.clone());
         }
-        worker.worker_node.state = State::Running as i32;
-        worker.insert(self.env.meta_store()).await?;
-
-        core.update_worker_node(worker.clone());
 
         // Notify frontends of new compute node.
+        // Always notify because a running worker's property may have been changed.
         let worker_type = worker.worker_type();
         if worker_type == WorkerType::ComputeNode {
             self.env
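The early return is removed so the notification below always fires: re-activating an already-running worker can still carry changed properties (e.g. is_serving), and frontends need to observe them, as the added comment notes.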