Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(config): support for setting default visibility_mode via config #10186

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ pub struct BatchConfig {
#[serde(default)]
pub distributed_query_limit: Option<u64>,

#[serde(default = "default::batch::enable_barrier_read")]
pub enable_barrier_read: bool,

#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,
}
Expand Down Expand Up @@ -855,6 +858,12 @@ mod default {
system_param::default::telemetry_enabled()
}
}

/// Default values for fields of `BatchConfig`.
pub mod batch {
    /// Serde default for `BatchConfig::enable_barrier_read`.
    ///
    /// Barrier read is enabled unless the config says otherwise.
    pub fn enable_barrier_read() -> bool {
        true
    }
}
}

pub struct StorageMemoryConfig {
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::info;

use crate::error::{ErrorCode, RwError};
use crate::session_config::transaction_isolation_level::IsolationLevel;
use crate::session_config::visibility_mode::VisibilityMode;
pub use crate::session_config::visibility_mode::VisibilityMode;
use crate::util::epoch::Epoch;

// This is a hack: `&'static str` is not allowed as a const generic argument.
Expand Down Expand Up @@ -652,8 +652,8 @@ impl ConfigMap {
self.search_path.clone()
}

pub fn only_checkpoint_visible(&self) -> bool {
matches!(self.visibility_mode, VisibilityMode::Checkpoint)
pub fn get_visible_mode(&self) -> VisibilityMode {
self.visibility_mode
}

pub fn get_query_epoch(&self) -> Option<Epoch> {
Expand Down
11 changes: 11 additions & 0 deletions src/common/src/session_config/visibility_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ use crate::session_config::VISIBILITY_MODE;

/// Visibility mode of a session, controlling which kind of read is used.
///
/// The variants are rendered as `default`, `all` and `checkpoint`.
#[derive(Copy, Default, Debug, Clone, PartialEq, Eq)]
pub enum VisibilityMode {
/// Apply the frontend config instead of a session-specific choice.
#[default]
Default,
/// Read barrier data from streaming compute nodes.
All,
/// Read checkpoint data from serving compute nodes.
Checkpoint,
}

Expand All @@ -51,6 +55,8 @@ impl TryFrom<&[&str]> for VisibilityMode {
Ok(Self::All)
} else if s.eq_ignore_ascii_case("checkpoint") {
Ok(Self::Checkpoint)
} else if s.eq_ignore_ascii_case("default") {
Ok(Self::Default)
} else {
Err(InvalidConfigValue {
config_entry: Self::entry_name().to_string(),
Expand All @@ -63,6 +69,7 @@ impl TryFrom<&[&str]> for VisibilityMode {
impl std::fmt::Display for VisibilityMode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Default => write!(f, "default"),
Self::All => write!(f, "all"),
Self::Checkpoint => write!(f, "checkpoint"),
}
Expand Down Expand Up @@ -91,6 +98,10 @@ mod tests {
VisibilityMode::try_from(["checkPoint"].as_slice()).unwrap(),
VisibilityMode::Checkpoint
);
assert_eq!(
VisibilityMode::try_from(["default"].as_slice()).unwrap(),
VisibilityMode::Default
);
assert!(VisibilityMode::try_from(["ab"].as_slice()).is_err());
}
}
3 changes: 3 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ split_group_size_limit = 21474836480
do_not_config_object_storage_lifecycle = false
partition_vnode_count = 64

[batch]
enable_barrier_read = true

[batch.developer]
batch_connector_message_buffer_size = 16
batch_output_channel_size = 64
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async fn do_handle_explain(
Convention::Batch => {
let worker_node_manager_reader = WorkerNodeSelector::new(
session.env().worker_node_manager_ref(),
!session.config().only_checkpoint_visible(),
session.is_barrier_read(),
);
batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
worker_node_manager_reader,
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ fn gen_batch_plan_fragmenter(
);
let worker_node_manager_reader = WorkerNodeSelector::new(
session.env().worker_node_manager_ref(),
!session.config().only_checkpoint_visible(),
session.is_barrier_read(),
);
let plan_fragmenter = BatchPlanFragmenter::new(
worker_node_manager_reader,
Expand Down Expand Up @@ -311,7 +311,7 @@ async fn execute(
..
} = plan_fragmenter_result;

let only_checkpoint_visible = session.config().only_checkpoint_visible();
let is_barrier_read = session.is_barrier_read();
let query_start_time = Instant::now();
let query = plan_fragmenter.generate_complete_query().await?;
tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
Expand All @@ -336,7 +336,7 @@ async fn execute(
let hummock_snapshot_manager = session.env().hummock_snapshot_manager();
let query_id = query.query_id().clone();
let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?;
PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, only_checkpoint_visible)
PinnedHummockSnapshot::FrontendPinned(pinned_snapshot, is_barrier_read)
};
match query_mode {
QueryMode::Auto => unreachable!(),
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ struct OverrideConfigOpts {
#[clap(long, env = "RW_METRICS_LEVEL")]
#[override_opts(path = server.metrics_level)]
pub metrics_level: Option<u32>,

#[clap(long, env = "RW_ENABLE_BARRIER_READ")]
#[override_opts(path = batch.enable_barrier_read)]
pub enable_barrier_read: Option<bool>,
}

impl Default for FrontendOpts {
Expand Down
16 changes: 8 additions & 8 deletions src/frontend/src/scheduler/hummock_snapshot_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
pub enum PinnedHummockSnapshot {
FrontendPinned(
HummockSnapshotGuard,
// `only_checkpoint_visible`.
// `is_barrier_read`.
// It's embedded here because we always use it together with snapshot.
bool,
),
Expand All @@ -49,8 +49,8 @@ pub enum PinnedHummockSnapshot {
impl PinnedHummockSnapshot {
pub fn get_batch_query_epoch(&self) -> BatchQueryEpoch {
match self {
PinnedHummockSnapshot::FrontendPinned(s, checkpoint) => {
s.get_batch_query_epoch(*checkpoint)
PinnedHummockSnapshot::FrontendPinned(s, is_barrier_read) => {
s.get_batch_query_epoch(*is_barrier_read)
}
PinnedHummockSnapshot::Other(e) => BatchQueryEpoch {
epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
Expand All @@ -60,7 +60,7 @@ impl PinnedHummockSnapshot {

pub fn support_barrier_read(&self) -> bool {
match self {
PinnedHummockSnapshot::FrontendPinned(_, checkpoint) => !*checkpoint,
PinnedHummockSnapshot::FrontendPinned(_, is_barrier_read) => *is_barrier_read,
PinnedHummockSnapshot::Other(_) => false,
}
}
Expand Down Expand Up @@ -105,11 +105,11 @@ pub struct HummockSnapshotGuard {
}

impl HummockSnapshotGuard {
pub fn get_batch_query_epoch(&self, checkpoint: bool) -> BatchQueryEpoch {
let epoch = if checkpoint {
batch_query_epoch::Epoch::Committed(self.snapshot.committed_epoch)
} else {
pub fn get_batch_query_epoch(&self, is_barrier_read: bool) -> BatchQueryEpoch {
let epoch = if is_barrier_read {
batch_query_epoch::Epoch::Current(self.snapshot.current_epoch)
} else {
batch_query_epoch::Epoch::Committed(self.snapshot.committed_epoch)
};
BatchQueryEpoch { epoch: Some(epoch) }
}
Expand Down
11 changes: 10 additions & 1 deletion src/frontend/src/scheduler/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ impl WorkerNodeManager {
if node.property.as_ref().map_or(false, |p| p.is_serving) {
write_guard.serving_fragment_vnode_mapping.clear();
}
// update
for w in &mut write_guard.worker_nodes {
if w.id == node.id {
*w = node;
return;
}
}
// insert
write_guard.worker_nodes.push(node);
}

Expand All @@ -106,7 +114,7 @@ impl WorkerNodeManager {
if node.property.as_ref().map_or(false, |p| p.is_serving) {
write_guard.serving_fragment_vnode_mapping.clear();
}
write_guard.worker_nodes.retain(|x| *x != node);
write_guard.worker_nodes.retain(|x| x.id != node.id);
}

pub fn refresh(
Expand Down Expand Up @@ -231,6 +239,7 @@ impl WorkerNodeManagerInner {
.worker_nodes
.iter()
.filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
.sorted_by_key(|w| w.id)
.map(|w| (w.id, w.parallel_units.clone()))
.collect();
let serving_pus_total_num = all_serving_pus.values().map(|p| p.len()).sum::<usize>();
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use risingwave_common::catalog::{
use risingwave_common::config::{load_config, BatchConfig};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::session_config::ConfigMap;
use risingwave_common::session_config::{ConfigMap, VisibilityMode};
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
Expand Down Expand Up @@ -678,6 +678,14 @@ impl SessionImpl {
tracing::trace!("notice to user:{}", notice);
self.notices.write().push(notice);
}

/// Returns whether this session should use barrier read.
///
/// An explicit session visibility mode takes precedence: `All` always
/// enables barrier read and `Checkpoint` always disables it. `Default`
/// defers to `enable_barrier_read` in the frontend's batch config.
pub fn is_barrier_read(&self) -> bool {
    let mode = self.config().get_visible_mode();
    match mode {
        VisibilityMode::Checkpoint => false,
        VisibilityMode::All => true,
        // No session-level override: fall back to the frontend config.
        VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
    }
}
}

pub struct SessionManagerImpl {
Expand Down
11 changes: 5 additions & 6 deletions src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,14 @@ where
pub async fn activate_worker_node(&self, host_address: HostAddress) -> MetaResult<()> {
let mut core = self.core.write().await;
let mut worker = core.get_worker_by_host_checked(host_address.clone())?;
if worker.worker_node.state == State::Running as i32 {
return Ok(());
if worker.worker_node.state != State::Running as i32 {
worker.worker_node.state = State::Running as i32;
worker.insert(self.env.meta_store()).await?;
core.update_worker_node(worker.clone());
}
worker.worker_node.state = State::Running as i32;
worker.insert(self.env.meta_store()).await?;

core.update_worker_node(worker.clone());

// Notify frontends of new compute node.
// Always notify because a running worker's property may have been changed.
let worker_type = worker.worker_type();
if worker_type == WorkerType::ComputeNode {
self.env
Expand Down