Skip to content
This repository has been archived by the owner on Jan 11, 2024. It is now read-only.

Commit

Permalink
FM-36: Save state root history (#38)
Browse files Browse the repository at this point in the history
* FM-36: Save state root history

* FM-36: Use the state root history in queries.
  • Loading branch information
aakoshh authored Feb 20, 2023
1 parent 727638b commit 9aac09c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 17 deletions.
96 changes: 81 additions & 15 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use cid::Cid;
use fendermint_abci::Application;
use fendermint_storage::{Codec, Encode, KVRead, KVReadable, KVStore, KVWritable, KVWrite};
use fendermint_storage::{
Codec, Encode, KVCollection, KVRead, KVReadable, KVStore, KVWritable, KVWrite,
};
use fendermint_vm_interpreter::bytes::{
BytesMessageApplyRet, BytesMessageCheckRet, BytesMessageQuery, BytesMessageQueryRet,
};
Expand All @@ -25,6 +27,7 @@ use fvm_shared::version::NetworkVersion;
use serde::{Deserialize, Serialize};
use tendermint::abci::request::CheckTxKind;
use tendermint::abci::{request, response, Code, Event, EventAttribute};
use tendermint::block::Height;

const VERSION: &str = env!("CARGO_PKG_VERSION");

Expand All @@ -36,6 +39,9 @@ macro_rules! must_encode {
};
}

// Different type from `ChainEpoch` just because we might use epoch in a more traditional sense for checkpointing.
pub type BlockHeight = u64;

#[derive(Serialize)]
#[repr(u8)]
pub enum AppStoreKey {
Expand All @@ -55,8 +61,12 @@ enum AppError {

#[derive(Serialize, Deserialize)]
pub struct AppState {
block_height: u64,
state_root: Cid, // TODO: Use TCid
/// Last committed block height.
block_height: BlockHeight,
/// Last committed state hash.
state_root: Cid,
/// Oldest state hash height.
oldest_state_height: BlockHeight,
network_version: NetworkVersion,
base_fee: TokenAmount,
circ_supply: TokenAmount,
Expand All @@ -72,33 +82,51 @@ where
db: Arc<DB>,
/// Namespace to store app state.
namespace: S::Namespace,
/// Collection of past state hashes.
///
/// We store the state hash for the height of the block where it was committed,
/// which is different from how Tendermint Core will refer to it in queries,
/// shifte by one, because Tendermint Core will use the height where the hash
/// *appeared*, which is in the block *after* the one which was committed.
state_hist: KVCollection<S, BlockHeight, Cid>,
/// Interpreter for block lifecycle events.
interpreter: Arc<I>,
/// State accumulating changes during block execution.
exec_state: Arc<Mutex<Option<FvmState<DB>>>>,
/// Projected partial state accumulating during transaction checks.
check_state: Arc<tokio::sync::Mutex<Option<FvmCheckState<DB>>>>,
/// How much history to keep.
///
/// Zero means unlimited.
state_hist_size: u64,
}

impl<DB, S, I> App<DB, S, I>
where
S: KVStore + Codec<AppState> + Encode<AppStoreKey>,
S: KVStore + Codec<AppState> + Encode<AppStoreKey> + Encode<BlockHeight> + Codec<Cid>,
DB: Blockstore + KVWritable<S> + KVReadable<S> + Clone + 'static,
{
pub fn new(db: DB, namespace: S::Namespace, interpreter: I) -> Self {
pub fn new(
db: DB,
app_namespace: S::Namespace,
hist_namespace: S::Namespace,
interpreter: I,
) -> Self {
Self {
db: Arc::new(db),
namespace,
namespace: app_namespace,
state_hist: KVCollection::new(hist_namespace),
interpreter: Arc::new(interpreter),
exec_state: Arc::new(Mutex::new(None)),
check_state: Arc::new(tokio::sync::Mutex::new(None)),
state_hist_size: 24 * 60 * 60,
}
}
}

impl<DB, S, I> App<DB, S, I>
where
S: KVStore + Codec<AppState> + Encode<AppStoreKey>,
S: KVStore + Codec<AppState> + Encode<AppStoreKey> + Encode<BlockHeight> + Codec<Cid>,
DB: Blockstore + KVWritable<S> + KVReadable<S> + 'static + Clone,
{
/// Get an owned clone of the database.
Expand All @@ -114,9 +142,27 @@ where
}

/// Set the last committed state.
fn set_committed_state(&self, state: AppState) {
fn set_committed_state(&self, mut state: AppState) {
self.db
.with_write(|tx| tx.put(&self.namespace, &AppStoreKey::State, &state))
.with_write(|tx| {
// Insert latest state history point.
self.state_hist
.put(tx, &state.block_height, &state.state_root)?;

// Prune state history.
if self.state_hist_size > 0 && state.block_height >= self.state_hist_size {
let prune_height = state.block_height.saturating_sub(self.state_hist_size);
while state.oldest_state_height <= prune_height {
self.state_hist.delete(tx, &state.oldest_state_height)?;
state.oldest_state_height += 1;
}
}

// Update the application state.
tx.put(&self.namespace, &AppStoreKey::State, &state)?;

Ok(())
})
.expect("commit failed");
}

Expand Down Expand Up @@ -144,6 +190,28 @@ where
self.put_exec_state(state);
Ok(ret)
}

/// Look up a past state hash at a particular height Tendermint Core is looking for,
/// which will be +1 shifted from what we saved. If the height is zero, it means it
/// wants the latest height.
///
/// Returns the CID and the height of the block which committed it.
fn state_root_at_height(&self, height: Height) -> (Cid, BlockHeight) {
if height.value() > 0 {
let h = height.value() - 1;
let tx = self.db.read();
let sh = self
.state_hist
.get(&tx, &h)
.expect("error looking up history");

if let Some(cid) = sh {
return (cid, h);
}
}
let state = self.committed_state();
(state.state_root, state.block_height)
}
}

// NOTE: The `Application` interface doesn't allow failures at the moment. The protobuf
Expand All @@ -154,7 +222,7 @@ where
#[async_trait]
impl<DB, S, I> Application for App<DB, S, I>
where
S: KVStore + Codec<AppState> + Encode<AppStoreKey>,
S: KVStore + Codec<AppState> + Encode<AppStoreKey> + Encode<BlockHeight> + Codec<Cid>,
S::Namespace: Sync + Send,
DB: Blockstore + KVWritable<S> + KVReadable<S> + Clone + Send + Sync + 'static,
I: Interpreter<
Expand Down Expand Up @@ -202,10 +270,8 @@ where
/// Query the application for data at the current or past height.
async fn query(&self, request: request::Query) -> response::Query {
let db = self.clone_db();
// TODO: Store the state for each height, or the last N heights, then use `request.height`.
let state = self.committed_state();
let block_height = state.block_height;
let state = FvmQueryState::new(db, state.state_root).expect("error creating query state");
let (state_root, block_height) = self.state_root_at_height(request.height);
let state = FvmQueryState::new(db, state_root).expect("error creating query state");
let qry = (request.path, request.data.to_vec());

let (_, result) = self
Expand Down Expand Up @@ -463,7 +529,7 @@ fn to_events(kind: &str, stamped_events: Vec<StampedEvent>) -> Vec<Event> {
}

/// Map to query results.
fn to_query(ret: FvmQueryRet, block_height: u64) -> response::Query {
fn to_query(ret: FvmQueryRet, block_height: BlockHeight) -> response::Query {
let exit_code = match ret {
FvmQueryRet::Ipld(None) | FvmQueryRet::ActorState(None) => ExitCode::USR_NOT_FOUND,
FvmQueryRet::Ipld(_) | FvmQueryRet::ActorState(_) => ExitCode::OK,
Expand Down
5 changes: 3 additions & 2 deletions fendermint/app/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ async fn main() {
let interpreter = BytesMessageInterpreter::new(interpreter);

let db = open_db();
let ns = db.new_cf_handle("app").unwrap();
let app = app::App::<_, AppStore, _>::new(db, ns, interpreter);
let app_ns = db.new_cf_handle("app").unwrap();
let state_hist_ns = db.new_cf_handle("state_hist").unwrap();
let app = app::App::<_, AppStore, _>::new(db, app_ns, state_hist_ns, interpreter);
let _service = ApplicationService(app);
}

Expand Down

0 comments on commit 9aac09c

Please sign in to comment.