Skip to content
This repository has been archived by the owner on Nov 18, 2024. It is now read-only.

Commit

Permalink
server: move specific config to configs mod (#1123)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Nov 10, 2022
1 parent 622fb0b commit 3e5bbbf
Show file tree
Hide file tree
Showing 16 changed files with 222 additions and 229 deletions.
198 changes: 196 additions & 2 deletions src/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::PathBuf;
use std::{path::PathBuf, time::Duration};

use rocksdb::DBCompressionType;
use serde::{Deserialize, Serialize};

use crate::{ExecutorConfig, NodeConfig, RaftConfig, RootConfig};
use crate::constants::REPLICA_PER_GROUP;

#[derive(Default, Clone, Debug, Deserialize, Serialize)]
pub struct Config {
Expand Down Expand Up @@ -50,6 +50,51 @@ pub struct Config {
pub db: DbConfig,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct NodeConfig {
/// The limit bytes of each shard chunk during migration.
///
/// Default: 64KB.
pub shard_chunk_size: usize,

/// The limit number of keys for gc shard after migration.
///
/// Default: 256.
pub shard_gc_keys: usize,

#[serde(default)]
pub replica: ReplicaConfig,

#[serde(default)]
pub engine: EngineConfig,
}

#[derive(Clone, Debug, Default)]
pub struct ReplicaTestingKnobs {
pub disable_scheduler_orphan_replica_detecting_intervals: bool,
pub disable_scheduler_durable_task: bool,
pub disable_scheduler_remove_orphan_replica_task: bool,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ReplicaConfig {
/// The limit size of each snapshot files.
///
/// Default: 64MB.
pub snap_file_size: u64,

#[serde(skip)]
pub testing_knobs: ReplicaTestingKnobs,
}

#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct EngineConfig {
/// Log slow io requests if it exceeds the specified threshold.
///
/// Default: disabled
pub engine_slow_io_threshold_ms: Option<u64>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DbConfig {
// io related configs
Expand Down Expand Up @@ -94,6 +139,98 @@ pub struct DbConfig {
pub rate_limiter_auto_tuned: bool,
}

#[derive(Clone, Debug, Default)]
pub struct RaftTestingKnobs {
pub force_new_peer_receiving_snapshot: bool,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RaftConfig {
/// The intervals of tick, in millis.
///
/// Default: 500ms.
pub tick_interval_ms: u64,

/// The size of inflights requests.
///
/// Default: 102400
pub max_inflight_requests: usize,

/// Before a follower begin election, it must wait a randomly election ticks and does not
/// receives any messages from leader.
///
/// Default: 3.
pub election_tick: usize,

/// Limit the entries batched in an append message(in size). 0 means one entry per message.
///
/// Default: 64KB
pub max_size_per_msg: u64,

/// Limit the total bytes per io batch requests.
///
/// Default: 64KB
pub max_io_batch_size: u64,

/// Limit the number of inflights messages which send to one peer.
///
/// Default: 10K
pub max_inflight_msgs: usize,

/// Log slow io requests if it exceeds the specified threshold.
///
/// Default: disabled
pub engine_slow_io_threshold_ms: Option<u64>,

/// Enable recycle log files to reduce allocating overhead?
///
/// Default: false
pub enable_log_recycle: bool,

#[serde(skip)]
pub testing_knobs: RaftTestingKnobs,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RootConfig {
pub replicas_per_group: usize,
pub enable_group_balance: bool,
pub enable_replica_balance: bool,
pub enable_shard_balance: bool,
pub enable_leader_balance: bool,
pub liveness_threshold_sec: u64,
pub heartbeat_timeout_sec: u64,
pub schedule_interval_sec: u64,
pub max_create_group_retry_before_rollback: u64,
}

#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct ExecutorConfig {
pub event_interval: Option<u32>,
pub global_event_interval: Option<u32>,
pub max_blocking_threads: Option<usize>,
}

impl Default for NodeConfig {
fn default() -> Self {
NodeConfig {
shard_chunk_size: 64 * 1024 * 1024,
shard_gc_keys: 256,
replica: ReplicaConfig::default(),
engine: EngineConfig::default(),
}
}
}

impl Default for ReplicaConfig {
fn default() -> Self {
ReplicaConfig {
snap_file_size: 64 * 1024 * 1024 * 1024,
testing_knobs: ReplicaTestingKnobs::default(),
}
}
}

impl DbConfig {
pub fn to_options(&self) -> rocksdb::Options {
use rocksdb::{BlockBasedIndexType, BlockBasedOptions, Cache, Options};
Expand Down Expand Up @@ -202,6 +339,63 @@ impl Default for DbConfig {
}
}

impl RaftConfig {
pub(crate) fn to_raft_config(&self, replica_id: u64, applied: u64) -> raft::Config {
raft::Config {
id: replica_id,
election_tick: self.election_tick,
heartbeat_tick: 1,
applied,
pre_vote: true,
batch_append: true,
check_quorum: true,
max_size_per_msg: self.max_size_per_msg,
max_inflight_msgs: self.max_inflight_msgs,
max_committed_size_per_ready: self.max_io_batch_size,
read_only_option: raft::ReadOnlyOption::Safe,
..Default::default()
}
}
}

impl Default for RaftConfig {
fn default() -> Self {
RaftConfig {
tick_interval_ms: 500,
max_inflight_requests: 102400,
election_tick: 3,
max_size_per_msg: 64 << 10,
max_io_batch_size: 64 << 10,
max_inflight_msgs: 10 * 1000,
engine_slow_io_threshold_ms: None,
enable_log_recycle: false,
testing_knobs: RaftTestingKnobs::default(),
}
}
}

impl RootConfig {
pub fn heartbeat_interval(&self) -> Duration {
Duration::from_secs(self.liveness_threshold_sec - self.heartbeat_timeout_sec)
}
}

impl Default for RootConfig {
fn default() -> Self {
Self {
replicas_per_group: REPLICA_PER_GROUP,
enable_group_balance: true,
enable_replica_balance: true,
enable_shard_balance: true,
enable_leader_balance: true,
liveness_threshold_sec: 30,
heartbeat_timeout_sec: 4,
schedule_interval_sec: 3,
max_create_group_retry_before_rollback: 10,
}
}
}

fn adaptive_block_cache_size() -> usize {
if cfg!(test) {
return 32 << 20;
Expand Down
11 changes: 1 addition & 10 deletions src/server/src/engine/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,15 @@ use std::{

use engula_api::{server::v1::*, shard};
use prost::Message;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};

use super::RawDb;
use crate::{
constants::{INITIAL_EPOCH, LOCAL_COLLECTION_ID},
serverpb::v1::*,
Error, Result,
EngineConfig, Error, Result,
};

#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct EngineConfig {
/// Log slow io requests if it exceeds the specified threshold.
///
/// Default: disabled
pub engine_slow_io_threshold_ms: Option<u64>,
}

#[derive(Default)]
pub struct WriteStates {
pub apply_state: Option<ApplyState>,
Expand Down
2 changes: 1 addition & 1 deletion src/server/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
use tracing::info;

pub(crate) use self::{
group::{EngineConfig, GroupEngine, RawIterator, SnapshotMode, WriteBatch, WriteStates},
group::{GroupEngine, RawIterator, SnapshotMode, WriteBatch, WriteStates},
state::StateEngine,
};
use crate::{DbConfig, Result};
Expand Down
5 changes: 1 addition & 4 deletions src/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ pub use crate::{
bootstrap::run,
config::*,
error::{Error, Result},
node::NodeConfig,
raftgroup::RaftConfig,
root::{diagnosis, RootConfig},
runtime::ExecutorConfig,
root::diagnosis,
service::Server,
};

Expand Down
36 changes: 2 additions & 34 deletions src/server/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,49 +26,28 @@ use std::{

use engula_api::server::v1::*;
use futures::{channel::mpsc, lock::Mutex};
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

use self::{
job::StateChannel,
migrate::{MigrateController, ShardChunkStream},
replica::ReplicaConfig,
};
pub use self::{
replica::Replica,
route_table::{RaftRouteTable, ReplicaRouteTable},
};
use crate::{
constants::ROOT_GROUP_ID,
engine::{EngineConfig, Engines, GroupEngine, RawDb, StateEngine},
engine::{Engines, GroupEngine, RawDb, StateEngine},
node::replica::{fsm::GroupStateMachine, ExecCtx, LeaseState, LeaseStateObserver, ReplicaInfo},
raftgroup::{snap::RecycleSnapMode, ChannelManager, RaftManager, RaftNodeFacade, SnapManager},
runtime::sync::WaitGroup,
schedule::MoveReplicasProvider,
serverpb::v1::*,
transport::TransportManager,
Config, Error, Result,
Config, EngineConfig, Error, NodeConfig, Result,
};

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct NodeConfig {
/// The limit bytes of each shard chunk during migration.
///
/// Default: 64KB.
pub shard_chunk_size: usize,

/// The limit number of keys for gc shard after migration.
///
/// Default: 256.
pub shard_gc_keys: usize,

#[serde(default)]
pub replica: ReplicaConfig,

#[serde(default)]
pub engine: EngineConfig,
}

struct ReplicaContext {
#[allow(dead_code)]
info: Arc<ReplicaInfo>,
Expand Down Expand Up @@ -689,17 +668,6 @@ impl NodeState {
}
}

impl Default for NodeConfig {
fn default() -> Self {
NodeConfig {
shard_chunk_size: 64 * 1024 * 1024,
shard_gc_keys: 256,
replica: ReplicaConfig::default(),
engine: EngineConfig::default(),
}
}
}

async fn open_group_engine(
cfg: &EngineConfig,
raw_db: Arc<RawDb>,
Expand Down
6 changes: 3 additions & 3 deletions src/server/src/node/replica/fsm/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ use tracing::{debug, error, info};

use crate::{
engine::{GroupEngine, RawIterator},
node::replica::ReplicaConfig,
raftgroup::SnapshotBuilder,
serverpb::v1::ApplyState,
Error, Result,
Error, ReplicaConfig, Result,
};

pub struct GroupSnapshotBuilder {
Expand Down Expand Up @@ -170,8 +169,9 @@ mod tests {

use super::*;
use crate::{
engine::{EngineConfig, GroupEngine, WriteBatch, WriteStates},
engine::{GroupEngine, WriteBatch, WriteStates},
runtime::ExecutorOwner,
EngineConfig,
};

async fn create_engine(dir: &Path, group_id: u64, shard_id: u64) -> GroupEngine {
Expand Down
4 changes: 2 additions & 2 deletions src/server/src/node/replica/fsm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ use engula_api::server::v1::{
};
use tracing::{info, trace, warn};

use super::{ReplicaConfig, ReplicaInfo};
use super::ReplicaInfo;
use crate::{
engine::{GroupEngine, WriteBatch, WriteStates},
raftgroup::{ApplyEntry, SnapshotBuilder, StateMachine},
serverpb::v1::*,
Result,
ReplicaConfig, Result,
};

const SHARD_UPDATE_DELTA: u64 = 1 << 32;
Expand Down
Loading

0 comments on commit 3e5bbbf

Please sign in to comment.