diff --git a/src/server/src/config.rs b/src/server/src/config.rs index c321dd12..37b9b590 100644 --- a/src/server/src/config.rs +++ b/src/server/src/config.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::path::PathBuf; +use std::{path::PathBuf, time::Duration}; use rocksdb::DBCompressionType; use serde::{Deserialize, Serialize}; -use crate::{ExecutorConfig, NodeConfig, RaftConfig, RootConfig}; +use crate::constants::REPLICA_PER_GROUP; #[derive(Default, Clone, Debug, Deserialize, Serialize)] pub struct Config { @@ -50,6 +50,51 @@ pub struct Config { pub db: DbConfig, } +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct NodeConfig { + /// The limit bytes of each shard chunk during migration. + /// + /// Default: 64KB. + pub shard_chunk_size: usize, + + /// The limit number of keys for gc shard after migration. + /// + /// Default: 256. + pub shard_gc_keys: usize, + + #[serde(default)] + pub replica: ReplicaConfig, + + #[serde(default)] + pub engine: EngineConfig, +} + +#[derive(Clone, Debug, Default)] +pub struct ReplicaTestingKnobs { + pub disable_scheduler_orphan_replica_detecting_intervals: bool, + pub disable_scheduler_durable_task: bool, + pub disable_scheduler_remove_orphan_replica_task: bool, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ReplicaConfig { + /// The limit size of each snapshot files. + /// + /// Default: 64MB. + pub snap_file_size: u64, + + #[serde(skip)] + pub testing_knobs: ReplicaTestingKnobs, +} + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +pub struct EngineConfig { + /// Log slow io requests if it exceeds the specified threshold. + /// + /// Default: disabled + pub engine_slow_io_threshold_ms: Option, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DbConfig { // io related configs @@ -94,6 +139,98 @@ pub struct DbConfig { pub rate_limiter_auto_tuned: bool, } +#[derive(Clone, Debug, Default)] +pub struct RaftTestingKnobs { + pub force_new_peer_receiving_snapshot: bool, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RaftConfig { + /// The intervals of tick, in millis. + /// + /// Default: 500ms. + pub tick_interval_ms: u64, + + /// The size of inflights requests. + /// + /// Default: 102400 + pub max_inflight_requests: usize, + + /// Before a follower begin election, it must wait a randomly election ticks and does not + /// receives any messages from leader. + /// + /// Default: 3. + pub election_tick: usize, + + /// Limit the entries batched in an append message(in size). 0 means one entry per message. + /// + /// Default: 64KB + pub max_size_per_msg: u64, + + /// Limit the total bytes per io batch requests. + /// + /// Default: 64KB + pub max_io_batch_size: u64, + + /// Limit the number of inflights messages which send to one peer. + /// + /// Default: 10K + pub max_inflight_msgs: usize, + + /// Log slow io requests if it exceeds the specified threshold. + /// + /// Default: disabled + pub engine_slow_io_threshold_ms: Option, + + /// Enable recycle log files to reduce allocating overhead? + /// + /// Default: false + pub enable_log_recycle: bool, + + #[serde(skip)] + pub testing_knobs: RaftTestingKnobs, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RootConfig { + pub replicas_per_group: usize, + pub enable_group_balance: bool, + pub enable_replica_balance: bool, + pub enable_shard_balance: bool, + pub enable_leader_balance: bool, + pub liveness_threshold_sec: u64, + pub heartbeat_timeout_sec: u64, + pub schedule_interval_sec: u64, + pub max_create_group_retry_before_rollback: u64, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct ExecutorConfig { + pub event_interval: Option, + pub global_event_interval: Option, + pub max_blocking_threads: Option, +} + +impl Default for NodeConfig { + fn default() -> Self { + NodeConfig { + shard_chunk_size: 64 * 1024 * 1024, + shard_gc_keys: 256, + replica: ReplicaConfig::default(), + engine: EngineConfig::default(), + } + } +} + +impl Default for ReplicaConfig { + fn default() -> Self { + ReplicaConfig { + snap_file_size: 64 * 1024 * 1024 * 1024, + testing_knobs: ReplicaTestingKnobs::default(), + } + } +} + impl DbConfig { pub fn to_options(&self) -> rocksdb::Options { use rocksdb::{BlockBasedIndexType, BlockBasedOptions, Cache, Options}; @@ -202,6 +339,63 @@ impl Default for DbConfig { } } +impl RaftConfig { + pub(crate) fn to_raft_config(&self, replica_id: u64, applied: u64) -> raft::Config { + raft::Config { + id: replica_id, + election_tick: self.election_tick, + heartbeat_tick: 1, + applied, + pre_vote: true, + batch_append: true, + check_quorum: true, + max_size_per_msg: self.max_size_per_msg, + max_inflight_msgs: self.max_inflight_msgs, + max_committed_size_per_ready: self.max_io_batch_size, + read_only_option: raft::ReadOnlyOption::Safe, + ..Default::default() + } + } +} + +impl Default for RaftConfig { + fn default() -> Self { + RaftConfig { + tick_interval_ms: 500, + max_inflight_requests: 102400, + election_tick: 3, + max_size_per_msg: 64 << 10, + max_io_batch_size: 64 << 10, + max_inflight_msgs: 10 * 1000, + engine_slow_io_threshold_ms: None, + enable_log_recycle: false, + testing_knobs: RaftTestingKnobs::default(), + } + } +} + +impl RootConfig { + pub fn heartbeat_interval(&self) -> Duration { + Duration::from_secs(self.liveness_threshold_sec - self.heartbeat_timeout_sec) + } +} + +impl Default for RootConfig { + fn default() -> Self { + Self { + replicas_per_group: REPLICA_PER_GROUP, + enable_group_balance: true, + enable_replica_balance: true, + enable_shard_balance: true, + enable_leader_balance: true, + liveness_threshold_sec: 30, + heartbeat_timeout_sec: 4, + schedule_interval_sec: 3, + max_create_group_retry_before_rollback: 10, + } + } +} + fn adaptive_block_cache_size() -> usize { if cfg!(test) { return 32 << 20; diff --git a/src/server/src/engine/group.rs b/src/server/src/engine/group.rs index b2043b52..f6b8a696 100644 --- a/src/server/src/engine/group.rs +++ b/src/server/src/engine/group.rs @@ -23,24 +23,15 @@ use std::{ use engula_api::{server::v1::*, shard}; use prost::Message; -use serde::{Deserialize, Serialize}; use tracing::{info, warn}; use super::RawDb; use crate::{ constants::{INITIAL_EPOCH, LOCAL_COLLECTION_ID}, serverpb::v1::*, - Error, Result, + EngineConfig, Error, Result, }; -#[derive(Clone, Default, Debug, Serialize, Deserialize)] -pub struct EngineConfig { - /// Log slow io requests if it exceeds the specified threshold. - /// - /// Default: disabled - pub engine_slow_io_threshold_ms: Option, -} - #[derive(Default)] pub struct WriteStates { pub apply_state: Option, diff --git a/src/server/src/engine/mod.rs b/src/server/src/engine/mod.rs index a79bfc2d..05dd4777 100644 --- a/src/server/src/engine/mod.rs +++ b/src/server/src/engine/mod.rs @@ -23,7 +23,7 @@ use std::{ use tracing::info; pub(crate) use self::{ - group::{EngineConfig, GroupEngine, RawIterator, SnapshotMode, WriteBatch, WriteStates}, + group::{GroupEngine, RawIterator, SnapshotMode, WriteBatch, WriteStates}, state::StateEngine, }; use crate::{DbConfig, Result}; diff --git a/src/server/src/lib.rs b/src/server/src/lib.rs index e3fa26eb..d70934cf 100644 --- a/src/server/src/lib.rs +++ b/src/server/src/lib.rs @@ -40,10 +40,7 @@ pub use crate::{ bootstrap::run, config::*, error::{Error, Result}, - node::NodeConfig, - raftgroup::RaftConfig, - root::{diagnosis, RootConfig}, - runtime::ExecutorConfig, + root::diagnosis, service::Server, }; diff --git a/src/server/src/node/mod.rs b/src/server/src/node/mod.rs index f7787dc7..1f476ace 100644 --- a/src/server/src/node/mod.rs +++ b/src/server/src/node/mod.rs @@ -26,13 +26,11 @@ use std::{ use engula_api::server::v1::*; use futures::{channel::mpsc, lock::Mutex}; -use serde::{Deserialize, Serialize}; use tracing::{debug, info, warn}; use self::{ job::StateChannel, migrate::{MigrateController, ShardChunkStream}, - replica::ReplicaConfig, }; pub use self::{ replica::Replica, @@ -40,35 +38,16 @@ pub use self::{ }; use crate::{ constants::ROOT_GROUP_ID, - engine::{EngineConfig, Engines, GroupEngine, RawDb, StateEngine}, + engine::{Engines, GroupEngine, RawDb, StateEngine}, node::replica::{fsm::GroupStateMachine, ExecCtx, LeaseState, LeaseStateObserver, ReplicaInfo}, raftgroup::{snap::RecycleSnapMode, ChannelManager, RaftManager, RaftNodeFacade, SnapManager}, runtime::sync::WaitGroup, schedule::MoveReplicasProvider, serverpb::v1::*, transport::TransportManager, - Config, Error, Result, + Config, EngineConfig, Error, NodeConfig, Result, }; -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct NodeConfig { - /// The limit bytes of each shard chunk during migration. - /// - /// Default: 64KB. - pub shard_chunk_size: usize, - - /// The limit number of keys for gc shard after migration. - /// - /// Default: 256. - pub shard_gc_keys: usize, - - #[serde(default)] - pub replica: ReplicaConfig, - - #[serde(default)] - pub engine: EngineConfig, -} - struct ReplicaContext { #[allow(dead_code)] info: Arc, @@ -689,17 +668,6 @@ impl NodeState { } } -impl Default for NodeConfig { - fn default() -> Self { - NodeConfig { - shard_chunk_size: 64 * 1024 * 1024, - shard_gc_keys: 256, - replica: ReplicaConfig::default(), - engine: EngineConfig::default(), - } - } -} - async fn open_group_engine( cfg: &EngineConfig, raw_db: Arc, diff --git a/src/server/src/node/replica/fsm/checkpoint.rs b/src/server/src/node/replica/fsm/checkpoint.rs index 7099d7cd..08469db6 100644 --- a/src/server/src/node/replica/fsm/checkpoint.rs +++ b/src/server/src/node/replica/fsm/checkpoint.rs @@ -18,10 +18,9 @@ use tracing::{debug, error, info}; use crate::{ engine::{GroupEngine, RawIterator}, - node::replica::ReplicaConfig, raftgroup::SnapshotBuilder, serverpb::v1::ApplyState, - Error, Result, + Error, ReplicaConfig, Result, }; pub struct GroupSnapshotBuilder { @@ -170,8 +169,9 @@ mod tests { use super::*; use crate::{ - engine::{EngineConfig, GroupEngine, WriteBatch, WriteStates}, + engine::{GroupEngine, WriteBatch, WriteStates}, runtime::ExecutorOwner, + EngineConfig, }; async fn create_engine(dir: &Path, group_id: u64, shard_id: u64) -> GroupEngine { diff --git a/src/server/src/node/replica/fsm/mod.rs b/src/server/src/node/replica/fsm/mod.rs index f8b7bf26..6f18d514 100644 --- a/src/server/src/node/replica/fsm/mod.rs +++ b/src/server/src/node/replica/fsm/mod.rs @@ -22,12 +22,12 @@ use engula_api::server::v1::{ }; use tracing::{info, trace, warn}; -use super::{ReplicaConfig, ReplicaInfo}; +use super::ReplicaInfo; use crate::{ engine::{GroupEngine, WriteBatch, WriteStates}, raftgroup::{ApplyEntry, SnapshotBuilder, StateMachine}, serverpb::v1::*, - Result, + ReplicaConfig, Result, }; const SHARD_UPDATE_DELTA: u64 = 1 << 32; diff --git a/src/server/src/node/replica/mod.rs b/src/server/src/node/replica/mod.rs index 1c6ac901..f7278d6f 100644 --- a/src/server/src/node/replica/mod.rs +++ b/src/server/src/node/replica/mod.rs @@ -27,7 +27,7 @@ use engula_api::{ server::v1::{group_request_union::Request, group_response_union::Response, *}, v1::{DeleteResponse, GetResponse, PutResponse}, }; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use tracing::info; pub use self::state::{LeaseState, LeaseStateObserver}; @@ -51,24 +51,6 @@ pub struct ReplicaPerfContext { pub propose: u64, } -#[derive(Clone, Debug, Default)] -pub struct ReplicaTestingKnobs { - pub disable_scheduler_orphan_replica_detecting_intervals: bool, - pub disable_scheduler_durable_task: bool, - pub disable_scheduler_remove_orphan_replica_task: bool, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct ReplicaConfig { - /// The limit size of each snapshot files. - /// - /// Default: 64MB. - pub snap_file_size: u64, - - #[serde(skip)] - pub testing_knobs: ReplicaTestingKnobs, -} - pub struct ReplicaInfo { pub replica_id: u64, pub group_id: u64, @@ -518,15 +500,6 @@ impl ExecCtx { } } -impl Default for ReplicaConfig { - fn default() -> Self { - ReplicaConfig { - snap_file_size: 64 * 1024 * 1024 * 1024, - testing_knobs: ReplicaTestingKnobs::default(), - } - } -} - pub(self) fn is_change_meta_request(request: &Request) -> bool { match request { Request::ChangeReplicas(_) diff --git a/src/server/src/raftgroup/mod.rs b/src/server/src/raftgroup/mod.rs index 2c6c5aae..0450f451 100644 --- a/src/server/src/raftgroup/mod.rs +++ b/src/server/src/raftgroup/mod.rs @@ -28,7 +28,6 @@ use engula_api::server::v1::*; use raft::prelude::{ ConfChangeSingle, ConfChangeTransition, ConfChangeType, ConfChangeV2, ConfState, }; -use serde::{Deserialize, Serialize}; pub use self::{ facade::RaftNodeFacade, @@ -43,61 +42,9 @@ use self::{io::LogWriter, worker::RaftWorker}; use crate::{ raftgroup::io::start_purging_expired_files, runtime::{sync::WaitGroup, TaskPriority}, - Result, + RaftConfig, Result, }; -#[derive(Clone, Debug, Default)] -pub struct RaftTestingKnobs { - pub force_new_peer_receiving_snapshot: bool, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct RaftConfig { - /// The intervals of tick, in millis. - /// - /// Default: 500ms. - pub tick_interval_ms: u64, - - /// The size of inflights requests. - /// - /// Default: 102400 - pub max_inflight_requests: usize, - - /// Before a follower begin election, it must wait a randomly election ticks and does not - /// receives any messages from leader. - /// - /// Default: 3. - pub election_tick: usize, - - /// Limit the entries batched in an append message(in size). 0 means one entry per message. - /// - /// Default: 64KB - pub max_size_per_msg: u64, - - /// Limit the total bytes per io batch requests. - /// - /// Default: 64KB - pub max_io_batch_size: u64, - - /// Limit the number of inflights messages which send to one peer. - /// - /// Default: 10K - pub max_inflight_msgs: usize, - - /// Log slow io requests if it exceeds the specified threshold. - /// - /// Default: disabled - pub engine_slow_io_threshold_ms: Option, - - /// Enable recycle log files to reduce allocating overhead? - /// - /// Default: false - pub enable_log_recycle: bool, - - #[serde(skip)] - pub testing_knobs: RaftTestingKnobs, -} - /// `ReadPolicy` is used to control `RaftNodeFacade::read` behavior. #[derive(Debug, Clone, Copy)] pub enum ReadPolicy { @@ -174,22 +121,6 @@ impl RaftManager { } } -impl Default for RaftConfig { - fn default() -> Self { - RaftConfig { - tick_interval_ms: 500, - max_inflight_requests: 102400, - election_tick: 3, - max_size_per_msg: 64 << 10, - max_io_batch_size: 64 << 10, - max_inflight_msgs: 10 * 1000, - engine_slow_io_threshold_ms: None, - enable_log_recycle: false, - testing_knobs: RaftTestingKnobs::default(), - } - } -} - fn encode_to_conf_change(change_replicas: ChangeReplicas) -> ConfChangeV2 { use prost::Message; diff --git a/src/server/src/raftgroup/node.rs b/src/server/src/raftgroup/node.rs index 6acfe5b3..0aee0f40 100644 --- a/src/server/src/raftgroup/node.rs +++ b/src/server/src/raftgroup/node.rs @@ -98,20 +98,7 @@ where .await?; try_reset_storage_state(replica_id, &mgr.snap_mgr, &mgr.engine, &mut storage).await?; - let config = Config { - id: replica_id, - election_tick: cfg.election_tick, - heartbeat_tick: 1, - applied, - pre_vote: true, - batch_append: true, - check_quorum: true, - max_size_per_msg: cfg.max_size_per_msg, - max_inflight_msgs: cfg.max_inflight_msgs, - max_committed_size_per_ready: cfg.max_io_batch_size, - read_only_option: ReadOnlyOption::Safe, - ..Default::default() - }; + let config = cfg.to_raft_config(replica_id, applied); Ok(RaftNode { group_id, lease_read_requests: Vec::default(), diff --git a/src/server/src/root/allocator/mod.rs b/src/server/src/root/allocator/mod.rs index 7e512216..18d00574 100644 --- a/src/server/src/root/allocator/mod.rs +++ b/src/server/src/root/allocator/mod.rs @@ -12,17 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use engula_api::server::v1::{GroupDesc, NodeDesc}; -use serde::{Deserialize, Serialize}; use self::{ policy_leader_cnt::LeaderCountPolicy, policy_replica_cnt::ReplicaCountPolicy, policy_shard_cnt::ShardCountPolicy, source::NodeFilter, }; use super::{metrics, OngoingStats, RootShared}; -use crate::{constants::REPLICA_PER_GROUP, Result}; +use crate::{constants::REPLICA_PER_GROUP, Result, RootConfig}; #[cfg(test)] mod sim_test; @@ -94,41 +93,6 @@ enum BalanceStatus { Underfull, } -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct RootConfig { - pub replicas_per_group: usize, - pub enable_group_balance: bool, - pub enable_replica_balance: bool, - pub enable_shard_balance: bool, - pub enable_leader_balance: bool, - pub liveness_threshold_sec: u64, - pub heartbeat_timeout_sec: u64, - pub schedule_interval_sec: u64, - pub max_create_group_retry_before_rollback: u64, -} - -impl Default for RootConfig { - fn default() -> Self { - Self { - replicas_per_group: REPLICA_PER_GROUP, - enable_group_balance: true, - enable_replica_balance: true, - enable_shard_balance: true, - enable_leader_balance: true, - liveness_threshold_sec: 30, - heartbeat_timeout_sec: 4, - schedule_interval_sec: 3, - max_create_group_retry_before_rollback: 10, - } - } -} - -impl RootConfig { - pub fn heartbeat_interval(&self) -> Duration { - Duration::from_secs(self.liveness_threshold_sec - self.heartbeat_timeout_sec) - } -} - #[derive(Clone)] pub struct Allocator { alloc_source: Arc, diff --git a/src/server/src/root/mod.rs b/src/server/src/root/mod.rs index 538b8ed9..8944f163 100644 --- a/src/server/src/root/mod.rs +++ b/src/server/src/root/mod.rs @@ -37,22 +37,21 @@ use tokio_util::time::delay_queue; use tracing::{error, info, trace, warn}; pub(crate) use self::schema::*; -pub use self::{ - allocator::RootConfig, - collector::RootCollector, - watch::{WatchHub, Watcher, WatcherInitializer}, -}; use self::{ allocator::SysAllocSource, bg_job::Jobs, diagnosis::Metadata, schedule::ReconcileScheduler, schema::ReplicaNodes, store::RootStore, }; +pub use self::{ + collector::RootCollector, + watch::{WatchHub, Watcher, WatcherInitializer}, +}; use crate::{ constants::{ROOT_GROUP_ID, SHARD_MAX, SHARD_MIN}, node::{Node, Replica, ReplicaRouteTable}, runtime::{self, TaskPriority}, serverpb::v1::{background_job::Job, reconcile_task, *}, transport::TransportManager, - Config, Error, Result, + Config, Error, Result, RootConfig, }; #[derive(Clone)] diff --git a/src/server/src/runtime/executor.rs b/src/server/src/runtime/executor.rs index 4f8f6199..b714e5ba 100644 --- a/src/server/src/runtime/executor.rs +++ b/src/server/src/runtime/executor.rs @@ -20,9 +20,9 @@ use std::{ }; use pin_project::pin_project; -use serde::{Deserialize, Serialize}; use super::metrics::*; +use crate::ExecutorConfig; #[derive(Debug)] pub enum TaskPriority { @@ -39,13 +39,6 @@ enum TaskState { Polled(Duration), } -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct ExecutorConfig { - pub event_interval: Option, - pub global_event_interval: Option, - pub max_blocking_threads: Option, -} - /// A handle that awaits the result of a task. /// /// Dropping a [`JoinHandle`] will detach the task, meaning that there is no longer diff --git a/src/server/src/schedule/scheduler.rs b/src/server/src/schedule/scheduler.rs index ecd80f7c..bf36f9c1 100644 --- a/src/server/src/schedule/scheduler.rs +++ b/src/server/src/schedule/scheduler.rs @@ -29,10 +29,7 @@ use super::{ tasks::{GroupLockTable, GENERATED_TASK_ID}, ScheduleStateObserver, }; -use crate::{ - node::{replica::ReplicaConfig, Replica}, - transport::TransportManager, -}; +use crate::{node::Replica, transport::TransportManager, ReplicaConfig}; #[derive(Clone)] pub struct EventWaker { diff --git a/src/server/src/schedule/setup.rs b/src/server/src/schedule/setup.rs index 0639ae60..777aece1 100644 --- a/src/server/src/schedule/setup.rs +++ b/src/server/src/schedule/setup.rs @@ -19,7 +19,7 @@ use tracing::debug; use super::ScheduleStateObserver; use crate::{ - node::{replica::ReplicaConfig, Replica}, + node::Replica, runtime::{sync::WaitGroup, TaskPriority}, schedule::{ event_source::EventSource, @@ -28,6 +28,7 @@ use crate::{ task::Task, }, transport::TransportManager, + ReplicaConfig, }; pub(crate) fn setup_scheduler( diff --git a/src/server/tests/helper/context.rs b/src/server/tests/helper/context.rs index e64afb4b..10b426f1 100644 --- a/src/server/tests/helper/context.rs +++ b/src/server/tests/helper/context.rs @@ -14,10 +14,8 @@ use std::{collections::HashMap, thread, time::Duration}; use engula_server::{ - node::replica::{ReplicaConfig, ReplicaTestingKnobs}, - raftgroup::RaftTestingKnobs, - runtime::{ExecutorConfig, ExecutorOwner, ShutdownNotifier}, - Config, DbConfig, NodeConfig, RaftConfig, RootConfig, + runtime::{ExecutorOwner, ShutdownNotifier}, + Config, DbConfig, NodeConfig, RaftConfig, RootConfig, *, }; use tempdir::TempDir; use tracing::info;