Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make eth1 caching work with fast synced node #709

Merged
merged 26 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f82e275
Add functions to get deposit_count and deposit_root from deposit cache
pawanjay176 Dec 10, 2019
e331816
Fetch deposit root and deposit count from cache
pawanjay176 Dec 10, 2019
9fbc01c
Fix bugs
pawanjay176 Dec 11, 2019
365d6a2
Add test
pawanjay176 Dec 11, 2019
e3d0325
Compare deposit_count between the caching and http eth1 blocks
pawanjay176 Dec 11, 2019
a8e99da
Revert "Compare deposit_count between the caching and http eth1 blocks"
pawanjay176 Dec 11, 2019
bba7a5d
Fetch deposit cache using binary search instead of linear search
pawanjay176 Dec 13, 2019
3f4f88c
BlockCache waits till DepositCache is in sync
pawanjay176 Dec 13, 2019
1ec606c
Merge branch 'master' into eth1-fast-sync
pawanjay176 Dec 16, 2019
9f14df3
Truncate required_blocks in block_cache up to latest_processed_block i…
pawanjay176 Dec 16, 2019
69f49de
Clean up
pawanjay176 Dec 16, 2019
8d0b5db
Handled getting deposit count before deploying deposit contract
pawanjay176 Dec 16, 2019
ffda83f
More cleanup
pawanjay176 Dec 16, 2019
921dd4a
Remove calls to http get deposit/count
pawanjay176 Dec 16, 2019
cc246aa
Fix block cache tests
pawanjay176 Dec 17, 2019
754eaa2
Merge branch 'master' into eth1-fast-sync
pawanjay176 Dec 17, 2019
7706f8f
Minor changes
pawanjay176 Dec 17, 2019
e709f7e
Fix bootnode ports
pawanjay176 Dec 17, 2019
6378db8
Merge branch 'master' into eth1-fast-sync
pawanjay176 Dec 18, 2019
63c8616
Address some of Paul's comments
pawanjay176 Dec 18, 2019
0777c04
Optimize `get_deposit_root` by caching `DepositDataTree`
pawanjay176 Dec 18, 2019
39a4871
Fix comments and minor changes
pawanjay176 Dec 18, 2019
040a57d
Change eth1 default config parameters
pawanjay176 Dec 18, 2019
35ac316
Use `Vec` instead of `HashMap` to store `deposit_roots`
pawanjay176 Dec 18, 2019
5beb4c8
Minor renaming
pawanjay176 Dec 19, 2019
7132c3c
Merge branch 'master' into eth1-fast-sync
pawanjay176 Dec 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 97 additions & 12 deletions beacon_node/eth1/src/deposit_cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::DepositLog;
use eth2_hashing::hash;
use tree_hash::TreeHash;
use types::{Deposit, Hash256};
use types::{Deposit, Hash256, DEPOSIT_TREE_DEPTH};

#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub enum Error {
/// A deposit log was added when a prior deposit was not already in the cache.
///
Expand All @@ -23,6 +23,8 @@ pub enum Error {
///
/// E.g., you cannot request deposit 10 when the deposit count is 9.
DepositCountInvalid { deposit_count: u64, range_end: u64 },
/// Error with the merkle tree for deposits.
DepositTreeError(merkle_proof::MerkleTreeError),
/// An unexpected condition was encountered.
InternalError(String),
}
Expand Down Expand Up @@ -66,18 +68,56 @@ impl DepositDataTree {
proof.push(Hash256::from_slice(&self.length_bytes()));
(root, proof)
}

/// Add a deposit to the merkle tree.
pub fn push_leaf(&mut self, leaf: Hash256) -> Result<(), Error> {
self.tree
.push_leaf(leaf, self.depth)
.map_err(Error::DepositTreeError)?;
self.mix_in_length += 1;
Ok(())
}
}

/// Mirrors the merkle tree of deposits in the eth1 deposit contract.
///
/// Provides `Deposit` objects with merkle proofs included.
#[derive(Default)]
pub struct DepositCache {
logs: Vec<DepositLog>,
roots: Vec<Hash256>,
leaves: Vec<Hash256>,
deposit_contract_deploy_block: u64,
/// An incremental merkle tree which represents the current state of the
/// deposit contract tree.
deposit_tree: DepositDataTree,
/// Vector of deposit roots. `deposit_roots[i]` denotes `deposit_root` at
/// `deposit_index` `i`.
deposit_roots: Vec<Hash256>,
}

impl Default for DepositCache {
fn default() -> Self {
let deposit_tree = DepositDataTree::create(&[], 0, DEPOSIT_TREE_DEPTH);
let deposit_roots = vec![deposit_tree.root()];
DepositCache {
logs: Vec::new(),
leaves: Vec::new(),
deposit_contract_deploy_block: 1,
deposit_tree,
deposit_roots,
}
}
}

impl DepositCache {
/// Create new `DepositCache` given block number at which deposit
/// contract was deployed.
pub fn new(deposit_contract_deploy_block: u64) -> Self {
DepositCache {
deposit_contract_deploy_block,
..Self::default()
}
}

/// Returns the number of deposits available in the cache.
pub fn len(&self) -> usize {
self.logs.len()
Expand Down Expand Up @@ -114,10 +154,11 @@ impl DepositCache {
/// - If a log with `log.index` is already known, but the given `log` is distinct to it.
pub fn insert_log(&mut self, log: DepositLog) -> Result<(), Error> {
if log.index == self.logs.len() as u64 {
self.roots
.push(Hash256::from_slice(&log.deposit_data.tree_hash_root()));
let deposit = Hash256::from_slice(&log.deposit_data.tree_hash_root());
self.leaves.push(deposit);
self.logs.push(log);

self.deposit_tree.push_leaf(deposit)?;
self.deposit_roots.push(self.deposit_tree.root());
Ok(())
} else if log.index < self.logs.len() as u64 {
if self.logs[log.index as usize] == log {
Expand Down Expand Up @@ -163,18 +204,18 @@ impl DepositCache {
requested: end,
known_deposits: self.logs.len(),
})
} else if deposit_count > self.roots.len() as u64 {
} else if deposit_count > self.leaves.len() as u64 {
// There are not `deposit_count` known deposit roots, so we can't build the merkle tree
// to prove into.
Err(Error::InsufficientDeposits {
requested: deposit_count,
known_deposits: self.logs.len(),
})
} else {
let roots = self
.roots
let leaves = self
.leaves
.get(0..deposit_count as usize)
.ok_or_else(|| Error::InternalError("Unable to get known root".into()))?;
.ok_or_else(|| Error::InternalError("Unable to get known leaves".into()))?;

// Note: there is likely a more optimal solution than recreating the `DepositDataTree`
// each time this function is called.
Expand All @@ -183,7 +224,7 @@ impl DepositCache {
// last finalized eth1 deposit count. Then, that tree could be cloned and extended for
// each of these calls.

let tree = DepositDataTree::create(roots, deposit_count as usize, tree_depth);
let tree = DepositDataTree::create(leaves, deposit_count as usize, tree_depth);

let deposits = self
.logs
Expand All @@ -203,6 +244,50 @@ impl DepositCache {
Ok((tree.root(), deposits))
}
}

/// Gets the deposit count at block height = block_number.
///
/// Fetches the `DepositLog` that was emitted at or just before `block_number`
/// and returns the deposit count as `index + 1`.
///
/// Returns `None` if the queried block number is 0 or is before the deposit contract deploy block.
pub fn get_deposit_count_from_cache(&self, block_number: u64) -> Option<u64> {
// Contract cannot be deployed in 0'th block
if block_number == 0 {
return None;
}
if block_number < self.deposit_contract_deploy_block {
return None;
}
// Return 0 if the queried block_number is before the first deposit
if let Some(first_deposit) = self.logs.first() {
if first_deposit.block_number > block_number {
return Some(0);
}
}
let index = self
.logs
.binary_search_by(|deposit| deposit.block_number.cmp(&block_number));
match index {
Ok(index) => return self.logs.get(index).map(|x| x.index + 1),
Err(next) => {
return Some(
self.logs
.get(next.saturating_sub(1))
.map_or(0, |x| x.index + 1),
)
}
}
}

/// Gets the deposit root at block height = block_number.
///
/// Fetches the `deposit_count` on or just before the queried `block_number`
/// and queries the `deposit_roots` map to get the corresponding `deposit_root`.
pub fn get_deposit_root_from_cache(&self, block_number: u64) -> Option<Hash256> {
let index = self.get_deposit_count_from_cache(block_number)?;
Some(self.deposit_roots.get(index as usize)?.clone())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how you indexed by deposit count, this is better than my suggestion.

}
}

/// Returns `int` as little-endian bytes with a length of 32.
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/eth1/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ pub struct DepositUpdater {
pub last_processed_block: Option<u64>,
}

impl DepositUpdater {
pub fn new(deposit_contract_deploy_block: u64) -> Self {
let cache = DepositCache::new(deposit_contract_deploy_block);
DepositUpdater {
cache,
last_processed_block: None,
}
}
}

#[derive(Default)]
pub struct Inner {
pub block_cache: RwLock<BlockCache>,
Expand Down
63 changes: 31 additions & 32 deletions beacon_node/eth1/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use crate::metrics;
use crate::{
block_cache::{BlockCache, Error as BlockCacheError, Eth1Block},
deposit_cache::Error as DepositCacheError,
http::{
get_block, get_block_number, get_deposit_count, get_deposit_logs_in_range, get_deposit_root,
},
http::{get_block, get_block_number, get_deposit_logs_in_range},
inner::{DepositUpdater, Inner},
DepositLog,
};
Expand All @@ -27,14 +25,10 @@ const STANDARD_TIMEOUT_MILLIS: u64 = 15_000;
const BLOCK_NUMBER_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getBlockByNumber call.
const GET_BLOCK_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_call to read the deposit contract root.
const GET_DEPOSIT_ROOT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_call to read the deposit contract deposit count.
const GET_DEPOSIT_COUNT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getLogs to read the deposit contract logs.
const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;

#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub enum Error {
/// The remote node is less synced that we expect, it is not useful until has done more
/// syncing.
Expand Down Expand Up @@ -118,8 +112,8 @@ impl Default for Config {
Self {
endpoint: "http://localhost:8545".into(),
deposit_contract_address: "0x0000000000000000000000000000000000000000".into(),
deposit_contract_deploy_block: 0,
lowest_cached_block_number: 0,
deposit_contract_deploy_block: 1,
lowest_cached_block_number: 1,
follow_distance: 128,
block_cache_truncation: Some(4_096),
auto_update_interval_millis: 7_000,
Expand Down Expand Up @@ -147,6 +141,9 @@ impl Service {
pub fn new(config: Config, log: Logger) -> Self {
Self {
inner: Arc::new(Inner {
deposit_cache: RwLock::new(DepositUpdater::new(
config.deposit_contract_deploy_block,
)),
config: RwLock::new(config),
..Inner::default()
}),
Expand Down Expand Up @@ -254,6 +251,7 @@ impl Service {
"Updated eth1 deposit cache";
"cached_deposits" => inner_1.deposit_cache.read().cache.len(),
"logs_imported" => logs_imported,
"last_processed_eth1_block" => inner_1.deposit_cache.read().last_processed_block,
),
Err(e) => error!(
log_a,
Expand Down Expand Up @@ -491,6 +489,7 @@ impl Service {
let cache_3 = self.inner.clone();
let cache_4 = self.inner.clone();
let cache_5 = self.inner.clone();
let cache_6 = self.inner.clone();

let block_cache_truncation = self.config().block_cache_truncation;
let max_blocks_per_update = self
Expand Down Expand Up @@ -527,7 +526,6 @@ impl Service {
let max_size = block_cache_truncation
.map(|n| n as u64)
.unwrap_or_else(u64::max_value);

if range_size > max_size {
// If the range of required blocks is larger than `max_size`, drop all
// existing blocks and download `max_size` count of blocks.
Expand All @@ -543,14 +541,22 @@ impl Service {
})
// Download the range of blocks and sequentially import them into the cache.
.and_then(move |required_block_numbers| {
// Last processed block in deposit cache
let latest_in_cache = cache_6
.deposit_cache
.read()
.last_processed_block
.unwrap_or(0);

let required_block_numbers = required_block_numbers
.into_iter()
.take(max_blocks_per_update);

.filter(|x| *x <= latest_in_cache)
.take(max_blocks_per_update)
.collect::<Vec<_>>();
// Produce a stream from the list of required block numbers and return a future that
// consumes it.
stream::unfold(
required_block_numbers,
required_block_numbers.into_iter(),
move |mut block_numbers| match block_numbers.next() {
Some(block_number) => Some(
download_eth1_block(cache_2.clone(), block_number)
Expand Down Expand Up @@ -639,31 +645,24 @@ fn download_eth1_block<'a>(
cache: Arc<Inner>,
block_number: u64,
) -> impl Future<Item = Eth1Block, Error = Error> + 'a {
let deposit_root = cache
.deposit_cache
.read()
.cache
.get_deposit_root_from_cache(block_number);
let deposit_count = cache
.deposit_cache
.read()
.cache
.get_deposit_count_from_cache(block_number);
// Performs a `get_blockByNumber` call to an eth1 node.
get_block(
&cache.config.read().endpoint,
block_number,
Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS),
)
.map_err(Error::BlockDownloadFailed)
.join3(
// Perform 2x `eth_call` via an eth1 node to read the deposit contract root and count.
get_deposit_root(
&cache.config.read().endpoint,
&cache.config.read().deposit_contract_address,
block_number,
Duration::from_millis(GET_DEPOSIT_ROOT_TIMEOUT_MILLIS),
)
.map_err(Error::GetDepositRootFailed),
get_deposit_count(
&cache.config.read().endpoint,
&cache.config.read().deposit_contract_address,
block_number,
Duration::from_millis(GET_DEPOSIT_COUNT_TIMEOUT_MILLIS),
)
.map_err(Error::GetDepositCountFailed),
)
.map(|(http_block, deposit_root, deposit_count)| Eth1Block {
.map(move |http_block| Eth1Block {
hash: http_block.hash,
number: http_block.number,
timestamp: http_block.timestamp,
Expand Down
Loading