Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure EVM database integrity and consistency with DVM database #2783

Merged
merged 18 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions lib/ain-evm/src/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use crate::{
filters::FilterService,
log::{LogService, Notification},
receipt::ReceiptService,
storage::{traits::BlockStorage, Storage},
storage::{
traits::{BlockStorage, FlushableStorage},
Storage,
},
transaction::{cache::TransactionCache, SignedTx},
trie::GENESIS_STATE_ROOT,
Result,
Expand Down Expand Up @@ -252,27 +255,24 @@ impl EVMServices {
/// across all usages. Note: To be replaced with a proper lock flow later.
///
pub unsafe fn commit_block(&self, template: &BlockTemplate) -> Result<()> {
{
let Some(BlockData { block, receipts }) = template.block_data.clone() else {
return Err(format_err!("no constructed EVM block exist in template id").into());
};

debug!(
"[finalize_block] Finalizing block number {:#x}, state_root {:#x}",
block.header.number, block.header.state_root
);
let Some(BlockData { block, receipts }) = template.block_data.clone() else {
return Err(format_err!("no constructed EVM block exist in template id").into());
};

self.block.connect_block(&block)?;
self.logs
.generate_logs_from_receipts(&receipts, block.header.number)?;
self.receipt.put_receipts(receipts)?;
self.channel
.sender
.send(Notification::Block(block.header.hash()))
.map_err(|e| format_err!(e.to_string()))?;
}
self.core.clear_account_nonce();
debug!(
"[finalize_block] Finalizing block number {:#x}, state_root {:#x}",
block.header.number, block.header.state_root
);

self.block.connect_block(&block)?;
self.logs
.generate_logs_from_receipts(&receipts, block.header.number)?;
self.receipt.put_receipts(receipts)?;
self.channel
.sender
.send(Notification::Block(block.header.hash()))
.map_err(|e| format_err!(e.to_string()))?;
self.core.clear_account_nonce();
Ok(())
}

Expand Down Expand Up @@ -464,6 +464,17 @@ impl EVMServices {
template.backend.increase_tx_count();
Ok(())
}

///
/// # Safety
///
/// Result cannot be used safety unless `cs_main` lock is taken on C++ side
/// across all usages. Note: To be replaced with a proper lock flow later.
///
pub unsafe fn flush_state_to_db(&self) -> Result<()> {
self.core.flush()?;
self.storage.flush()
}
}

// Block template methods
Expand Down
2 changes: 2 additions & 0 deletions lib/ain-evm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub enum EVMError {
StorageError(String),
#[error("EVM: serde_json error")]
JsonError(#[from] serde_json::Error),
#[error("EVM: serde_json error")]
JsonRpcError(#[from] jsonrpsee_core::Error),
#[error("EVM: rocksdb error")]
RocksDBError(#[from] rocksdb::Error),
#[error("EVM: ethabi error")]
Expand Down
21 changes: 11 additions & 10 deletions lib/ain-evm/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
thread::{self, JoinHandle},
};

use anyhow::Result;
use anyhow::{format_err, Result};
use jsonrpsee_server::ServerHandle;
use parking_lot::Mutex;
use tokio::{
Expand Down Expand Up @@ -74,33 +74,34 @@ impl Services {
{
let json_rpc_handles = self.json_rpc_handles.lock();
for server in &*json_rpc_handles {
server.stop().unwrap();
server.stop()?;
}
}

{
let websocket_handles = self.websocket_handles.lock();
for server in &*websocket_handles {
server.stop().unwrap();
server.stop()?;
}
}

// TODO: Propogate error
Ok(())
}

pub fn stop(&self) {
pub fn stop(&self) -> Result<()> {
let _ = self.tokio_runtime_channel_tx.blocking_send(());

self.tokio_worker
.lock()
.take()
.expect("runtime terminated?")
.ok_or(format_err!(
"failed to stop tokio runtime, early termination"
))?
.join()
.unwrap();
.map_err(|_| format_err!("failed to stop tokio runtime"))?;

// Persist EVM State to disk
self.evm.core.flush().expect("Could not flush evm state");
self.evm.storage.flush().expect("Could not flush storage");
self.evm.core.flush()?;
self.evm.storage.flush()?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions lib/ain-evm/src/storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl BlockStorage for BlockStore {
if let Some(block) = block {
let latest_block_cf = self.column::<columns::LatestBlockNumber>();
let block_number = block.header.number;
latest_block_cf.put(&"latest_block", &block_number)?;
latest_block_cf.put(&"", &block_number)?;
}
Ok(())
}
Expand Down Expand Up @@ -228,7 +228,7 @@ impl Rollback for BlockStore {

if let Some(block) = self.get_block_by_hash(&block.header.parent_hash)? {
let latest_block_cf = self.column::<columns::LatestBlockNumber>();
latest_block_cf.put(&"latest_block", &block.header.number)?;
latest_block_cf.put(&"", &block.header.number)?;
}

let logs_cf = self.column::<columns::AddressLogsMap>();
Expand Down
5 changes: 3 additions & 2 deletions lib/ain-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,12 @@ fn is_services_init_called() -> bool {
IS_SERVICES_INIT_CALL.load(Ordering::SeqCst)
}

pub fn stop_services() {
pub fn stop_services() -> Result<()> {
if is_services_init_called() {
info!("Shutdown rs services");
SERVICES.stop();
SERVICES.stop()?;
}
Ok(())
}

pub fn wipe_evm_folder() -> Result<()> {
Expand Down
71 changes: 36 additions & 35 deletions lib/ain-rs-exports/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,57 @@
use crate::{ffi::CrossBoundaryResult, prelude::*};
use crate::{ffi, prelude::*};
use ain_macros::ffi_fallible;
use anyhow::Result;

pub fn ain_rs_preinit(result: &mut CrossBoundaryResult) {
#[ffi_fallible]
pub fn ain_rs_preinit() -> Result<()> {
ain_grpc::preinit();
cross_boundary_success(result);
Ok(())
}

pub fn ain_rs_init_logging(result: &mut CrossBoundaryResult) {
#[ffi_fallible]
pub fn ain_rs_init_logging() -> Result<()> {
ain_grpc::init_logging();
cross_boundary_success(result);
Ok(())
}

pub fn ain_rs_init_core_services(result: &mut CrossBoundaryResult) {
#[ffi_fallible]
pub fn ain_rs_init_core_services() -> Result<()> {
ain_grpc::init_services();
result.ok = true;
Ok(())
}

pub fn ain_rs_stop_core_services(result: &mut CrossBoundaryResult) {
ain_grpc::stop_services();
cross_boundary_success(result);
#[ffi_fallible]
pub fn ain_rs_stop_core_services() -> Result<()> {
ain_grpc::stop_services()?;
Ok(())
}

pub fn ain_rs_init_network_json_rpc_service(result: &mut CrossBoundaryResult, addr: String) {
match ain_grpc::init_network_json_rpc_service(addr) {
Ok(()) => cross_boundary_success(result),
Err(e) => cross_boundary_error_return(result, e.to_string()),
}
#[ffi_fallible]
pub fn ain_rs_init_network_json_rpc_service(addr: String) -> Result<()> {
ain_grpc::init_network_json_rpc_service(addr)?;
Ok(())
}

pub fn ain_rs_init_network_grpc_service(result: &mut CrossBoundaryResult, addr: String) {
match ain_grpc::init_network_grpc_service(addr) {
Ok(()) => cross_boundary_success(result),
Err(e) => cross_boundary_error_return(result, e.to_string()),
}
#[ffi_fallible]
pub fn ain_rs_init_network_grpc_service(addr: String) -> Result<()> {
ain_grpc::init_network_grpc_service(addr)?;
Ok(())
}

pub fn ain_rs_init_network_subscriptions_service(result: &mut CrossBoundaryResult, addr: String) {
match ain_grpc::init_network_subscriptions_service(addr) {
Ok(()) => cross_boundary_success(result),
Err(e) => cross_boundary_error_return(result, e.to_string()),
}
#[ffi_fallible]
pub fn ain_rs_init_network_subscriptions_service(addr: String) -> Result<()> {
ain_grpc::init_network_subscriptions_service(addr)?;
Ok(())
}

pub fn ain_rs_stop_network_services(result: &mut CrossBoundaryResult) {
match ain_grpc::stop_network_services() {
Ok(()) => cross_boundary_success(result),
Err(e) => cross_boundary_error_return(result, e.to_string()),
}
#[ffi_fallible]
pub fn ain_rs_stop_network_services() -> Result<()> {
ain_grpc::stop_network_services()?;
Ok(())
}

pub fn ain_rs_wipe_evm_folder(result: &mut CrossBoundaryResult) {
match ain_grpc::wipe_evm_folder() {
Ok(()) => cross_boundary_success(result),
Err(e) => cross_boundary_error_return(result, e.to_string()),
}
#[ffi_fallible]
pub fn ain_rs_wipe_evm_folder() -> Result<()> {
ain_grpc::wipe_evm_folder()?;
Ok(())
}
29 changes: 29 additions & 0 deletions lib/ain-rs-exports/src/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,15 @@ fn evm_try_get_block_number_by_hash(hash: XHash) -> Result<u64> {
Ok(block_number)
}

/// Return the block header for a given blockhash.
///
/// # Arguments
///
/// * `hash` - The hash of the block we want to get the block header.
///
/// # Returns
///
/// Returns the block header associated with the given blockhash.
#[ffi_fallible]
fn evm_try_get_block_header_by_hash(hash: XHash) -> Result<ffi::EVMBlockHeader> {
let hash = H256::from(hash);
Expand Down Expand Up @@ -637,6 +646,21 @@ fn evm_try_get_block_header_by_hash(hash: XHash) -> Result<ffi::EVMBlockHeader>
Ok(out)
}

/// Return the latest block header from storage.
///
/// # Returns
///
/// Returns the latest block header.
#[ffi_fallible]
fn evm_try_get_latest_block_hash() -> Result<[u8; 32]> {
let block = SERVICES
.evm
.storage
.get_latest_block()?
.ok_or(format_err!("latest EVM block not found"))?;
Ok(block.header.hash().to_fixed_bytes())
}

#[ffi_fallible]
fn evm_try_get_tx_by_hash(tx_hash: XHash) -> Result<ffi::EVMTransaction> {
let tx_hash = H256::from(tx_hash);
Expand Down Expand Up @@ -834,3 +858,8 @@ fn evm_try_dispatch_pending_transactions_event(raw_tx: &str) -> Result<()> {
.send(Notification::Transaction(signed_tx.hash()))
.map_err(|e| format_err!(e.to_string()))?)
}

#[ffi_fallible]
fn evm_try_flush_db() -> Result<()> {
unsafe { SERVICES.evm.flush_state_to_db() }
}
4 changes: 4 additions & 0 deletions lib/ain-rs-exports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ pub mod ffi {
hash: [u8; 32],
) -> EVMBlockHeader;

fn evm_try_get_latest_block_hash(result: &mut CrossBoundaryResult) -> [u8; 32];

fn evm_try_get_tx_by_hash(
result: &mut CrossBoundaryResult,
tx_hash: [u8; 32],
Expand Down Expand Up @@ -339,6 +341,8 @@ pub mod ffi {
result: &mut CrossBoundaryResult,
raw_tx: &str,
);

fn evm_try_flush_db(result: &mut CrossBoundaryResult);
}

// ========= Debug ==========
Expand Down
2 changes: 2 additions & 0 deletions src/dfi/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ class DeFiErrors {
static Res InvalidBlockNumberString(const std::string &number) {
return Res::Err("Invalid block number: %s", number);
}

static Res InvalidBlockHashString(const std::string &hash) { return Res::Err("Invalid block hash: %s", hash); }
};

#endif // DEFI_DFI_ERRORS_H
3 changes: 3 additions & 0 deletions src/dfi/rpc_evm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ UniValue vmmap(const JSONRPCRequest &request) {
throwInvalidParam(dvmBlockHash.msg);
}
CBlockIndex *pindex = LookupBlockIndex(uint256S(*dvmBlockHash.val));
if (!pindex) {
throwInvalidParam(DeFiErrors::InvalidBlockHashString(*dvmBlockHash.val).msg);
}
uint64_t blockNumber = pindex->GetBlockHeader().deprecatedHeight;
return ResVal<std::string>(std::to_string(blockNumber), Res::Ok());
};
Expand Down
31 changes: 31 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2044,6 +2044,37 @@ bool AppInitMain(InitInterfaces& interfaces)
break;
}

// State consistency check is skipped for regtest (EVM state can be initialized with state input)
if (Params().NetworkIDString() != CBaseChainParams::REGTEST) {
// Check that EVM db and DVM db states are consistent
auto res = XResultValueLogged(evm_try_get_latest_block_hash(result));
if (res) {
// After EVM activation
auto evmBlockHash = uint256::FromByteArray(*res).GetHex();
auto dvmBlockHash = pcustomcsview->GetVMDomainBlockEdge(VMDomainEdge::EVMToDVM, evmBlockHash);
if (!dvmBlockHash.val.has_value()) {
strLoadError = _("Unable to get DVM block hash from latest EVM block hash, inconsistent chainstate detected. "
"This may be due to corrupted block databases between DVM and EVM, and you will need to "
"rebuild the database using -reindex.").translated;
break;
}
CBlockIndex *pindex = LookupBlockIndex(uint256S(*dvmBlockHash.val));
sieniven marked this conversation as resolved.
Show resolved Hide resolved
if (!pindex) {
strLoadError = _("Unable to get DVM block index from block hash, possible corrupted block database detected. "
"You will need to rebuild the database using -reindex.").translated;
break;
}
auto dvmBlockHeight = pindex->nHeight;

if (dvmBlockHeight != ::ChainActive().Tip()->nHeight) {
strLoadError = _("Inconsistent chainstate detected between DVM block database and EVM block database. "
"This may be due to corrupted block databases between DVM and EVM, and you will need to "
"rebuild the database using -reindex.").translated;
break;
}
}
}

fLoaded = true;
LogPrintf(" block index %15dms\n", GetTimeMillis() - load_block_index_start_time);
} while(false);
Expand Down
7 changes: 7 additions & 0 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3583,6 +3583,13 @@ bool CChainState::FlushStateToDisk(const CChainParams &chainparams,
if (!CoinsTip().Flush() || !pcustomcsDB->Flush()) {
return AbortNode(state, "Failed to write to coin or masternode db to disk");
}
// Flush the EVM chainstate
if (IsEVMEnabled(pcustomcsview->GetAttributes())) {
auto res = XResultStatusLogged(evm_try_flush_db(result));
if (!res) {
return AbortNode(state, "Failed to write to EVM db to disk");
}
}
if (!compactBegin.empty() && !compactEnd.empty()) {
auto time = GetTimeMillis();
pcustomcsDB->Compact(compactBegin, compactEnd);
Expand Down
Loading