Skip to content

Commit

Permalink
Merge pull request #7276 from drmingdrmer/1-adm
Browse files Browse the repository at this point in the history
refactor(meta-service): add cluster status "last_seq"; add info level logging for critical events
  • Loading branch information
drmingdrmer authored Aug 24, 2022
2 parents 88171b9 + 1ab4f0b commit 42de73d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 4 deletions.
10 changes: 7 additions & 3 deletions src/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,11 @@ impl StateMachine {
/// command safely in case of network failure etc.
#[tracing::instrument(level = "debug", skip(self, entry), fields(log_id=%entry.log_id))]
pub async fn apply(&self, entry: &Entry<LogEntry>) -> Result<AppliedState, MetaStorageError> {
debug!("apply: summary: {}", entry.summary());
debug!("apply: payload: {:?}", entry.payload);
info!(
"apply: summary: {}; payload: {:?}",
entry.summary(),
entry.payload
);

let log_id = &entry.log_id;

Expand All @@ -285,6 +288,7 @@ impl StateMachine {
}

let res = self.apply_cmd(&data.cmd, &txn_tree, kv_pairs.as_ref());
info!("apply_result: summary: {}; res: {:?}", entry.summary(), res);
let applied_state = res?;

if let Some(ref txid) = data.txid {
Expand Down Expand Up @@ -744,7 +748,7 @@ impl StateMachine {
txn_tree: &TransactionSledTree,
kv_pairs: Option<&(DeleteByPrefixKeyMap, DeleteByPrefixKeyMap)>,
) -> Result<AppliedState, MetaStorageError> {
debug!("apply_cmd: {:?}", cmd);
info!("apply_cmd: {:?}", cmd);

match cmd {
Cmd::IncrSeq { ref key } => self.apply_incr_seq_cmd(key, txn_tree),
Expand Down
15 changes: 15 additions & 0 deletions src/meta/service/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ use common_base::base::tokio::task::JoinHandle;
use common_grpc::ConnectionFactory;
use common_grpc::DNSResolver;
use common_meta_raft_store::config::RaftConfig;
use common_meta_raft_store::sled_key_spaces::GenericKV;
use common_meta_raft_store::state_machine::StateMachine;
use common_meta_sled_store::openraft;
use common_meta_sled_store::openraft::DefensiveCheck;
use common_meta_sled_store::openraft::StoreExt;
use common_meta_sled_store::SledKeySpace;
use common_meta_types::protobuf::raft_service_client::RaftServiceClient;
use common_meta_types::protobuf::raft_service_server::RaftServiceServer;
use common_meta_types::protobuf::WatchRequest;
Expand Down Expand Up @@ -106,6 +108,11 @@ pub struct MetaNodeStatus {
pub voters: Vec<Node>,

pub non_voters: Vec<Node>,

/// The last `seq` used by GenericKV sub tree.
///
/// `seq` is a monotonically incremental integer for every value that is inserted or updated.
pub last_seq: u64,
}

// MetaRaft is a impl of the generic Raft handling meta data R/W.
Expand Down Expand Up @@ -755,6 +762,13 @@ impl MetaNode {
} else {
None
};

let last_seq = {
let sm = self.sto.state_machine.read().await;
let last_seq = sm.sequences().get(&GenericKV::NAME.to_string())?;
last_seq.unwrap_or_default().0
};

Ok(MetaNodeStatus {
id: self.sto.id,
endpoint: endpoint.to_string(),
Expand All @@ -770,6 +784,7 @@ impl MetaNode {
leader,
voters,
non_voters,
last_seq,
})
}

Expand Down
26 changes: 25 additions & 1 deletion src/meta/service/src/store/store_bare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ impl RaftStorage<LogEntry, AppliedState> for RaftStoreBare {

#[tracing::instrument(level = "debug", skip(self, hs), fields(id=self.id))]
async fn save_hard_state(&self, hs: &HardState) -> Result<(), StorageError> {
info!("save_hard_state: {:?}", hs);

match self
.raft_state
.write_hard_state(hs)
Expand Down Expand Up @@ -327,6 +329,8 @@ impl RaftStorage<LogEntry, AppliedState> for RaftStoreBare {

#[tracing::instrument(level = "debug", skip(self), fields(id=self.id))]
async fn delete_conflict_logs_since(&self, log_id: LogId) -> Result<(), StorageError> {
info!("delete_conflict_logs_since: {}", log_id);

match self
.log
.range_remove(log_id.index..)
Expand All @@ -343,6 +347,8 @@ impl RaftStorage<LogEntry, AppliedState> for RaftStoreBare {

#[tracing::instrument(level = "debug", skip(self), fields(id=self.id))]
async fn purge_logs_upto(&self, log_id: LogId) -> Result<(), StorageError> {
info!("purge_logs_upto: {}", log_id);

if let Err(err) = self
.log
.set_last_purged(log_id)
Expand All @@ -367,7 +373,10 @@ impl RaftStorage<LogEntry, AppliedState> for RaftStoreBare {

#[tracing::instrument(level = "debug", skip(self, entries), fields(id=self.id))]
async fn append_to_log(&self, entries: &[&Entry<LogEntry>]) -> Result<(), StorageError> {
// TODO(xp): replicated_to_log should not block. Do the actual work in another task.
for ent in entries {
info!("append_to_log: {}", ent.log_id);
}

let entries = entries.iter().map(|x| (*x).clone()).collect::<Vec<_>>();
match self
.log
Expand All @@ -388,6 +397,10 @@ impl RaftStorage<LogEntry, AppliedState> for RaftStoreBare {
&self,
entries: &[&Entry<LogEntry>],
) -> Result<Vec<AppliedState>, StorageError> {
for ent in entries {
info!("apply_to_state_machine: {}", ent.log_id);
}

let mut res = Vec::with_capacity(entries.len());

let sm = self.state_machine.write().await;
Expand Down Expand Up @@ -576,6 +589,11 @@ impl RaftStorage<LogEntry, AppliedState> for RaftStoreBare {
Some(x) => Some(x.1.log_id),
};

info!(
"get_log_state: ({:?},{:?}]",
last_purged_log_id, last_log_id
);

Ok(LogState {
last_purged_log_id,
last_log_id,
Expand Down Expand Up @@ -606,6 +624,12 @@ impl RaftStorage<LogEntry, AppliedState> for RaftStoreBare {
}
Ok(r) => r,
};

info!(
"last_applied_state: applied: {:?}, membership: {:?}",
last_applied, last_membership
);

Ok((last_applied, last_membership))
}
}
Expand Down

1 comment on commit 42de73d

@vercel
Copy link

@vercel vercel bot commented on 42de73d Aug 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend-git-main-databend.vercel.app
databend.vercel.app
databend.rs

Please sign in to comment.