Skip to content

Commit

Permalink
Clean up and add links to issues
Browse files Browse the repository at this point in the history
  • Loading branch information
elmattic committed Sep 11, 2024
1 parent ac83758 commit c228195
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 189 deletions.
28 changes: 1 addition & 27 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use crate::shim::{clock::ChainEpoch, state_tree::StateTree};
use crate::utils::db::BlockstoreExt as _;
use crate::utils::encoding::from_slice_with_fallback;
use anyhow::{anyhow, bail, Context, Error, Result};
use byteorder::{BigEndian, ByteOrder};
use cbor4ii::core::dec::Decode as _;
use cbor4ii::core::Value;
use cid::Cid;
Expand All @@ -53,7 +52,6 @@ use num::{BigInt, Zero as _};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::time::Duration;
use std::{ops::Add, sync::Arc};

const MASKED_ID_PREFIX: [u8; 12] = [0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
Expand Down Expand Up @@ -88,17 +86,6 @@ const EMPTY_ROOT: &str = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc00162
/// The address used in messages to actors that have since been deleted.
const REVERTED_ETH_ADDRESS: &str = "0xff0000000000000000000000ffffffffffffffff";

/// Corresponds to 3 epochs.
const EVENT_READ_TIMEOUT: Duration = Duration::from_secs(90);

pub fn eth_bloom_set(bytes: &mut EthBytes, data: &[u8]) {
let hash = keccak_hash::keccak(data);
for i in 0..3 {
let n = BigEndian::read_u16(&hash[i * 2..]) % BLOOM_SIZE as u16;
bytes.0[(BLOOM_SIZE / 8) - (n / 8) as usize - 1] |= 1 << (n % 8);
}
}

// TODO(forest): https://github.com/ChainSafe/forest/issues/4436
// use ethereum_types::U256 or use lotus_json::big_int
#[derive(
Expand Down Expand Up @@ -1142,20 +1129,7 @@ async fn new_eth_tx_receipt<DB: Blockstore>(
receipt.contract_address = Some(ret.eth_address.into());
}

if message_lookup.receipt.events_root().is_some() {
let logs = ctx
.eth_event_handler
.get_eth_logs_for_block_and_transaction(ctx, &tx.block_hash, &tx.hash)
.await?;
receipt.logs = logs;
}

for log in receipt.logs.iter() {
for topic in log.topics.iter() {
eth_bloom_set(&mut receipt.logs_bloom, topic.0.as_bytes());
}
eth_bloom_set(&mut receipt.logs_bloom, log.address.0.as_bytes());
}
// TODO(elmattic): https://github.com/ChainSafe/forest/issues/4759

Ok(receipt)
}
Expand Down
14 changes: 2 additions & 12 deletions src/rpc/methods/eth/filter/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,21 @@ impl Filter for EventFilter {
}
}

pub struct EventIndex {}

impl EventIndex {
pub fn is_height_past(&self, _height: ChainEpoch) -> anyhow::Result<bool> {
todo!()
}
}

/// The `EventFilterManager` structure maintains a set of filters, allowing new filters to be
/// installed or existing ones to be removed. It ensures that each filter is uniquely identifiable
/// by its ID and that a maximum number of results can be configured for each filter.
pub struct EventFilterManager {
filters: RwLock<HashMap<FilterID, Arc<EventFilter>>>,
max_filter_results: usize,

// TODO(elmattic): implement similar functionality
pub event_index: Option<Arc<EventIndex>>,
// TODO(elmattic): https://github.com/ChainSafe/forest/issues/4740
//pub event_index: Option<Arc<EventIndex>>,
}

impl EventFilterManager {
pub fn new(max_filter_results: usize) -> Arc<Self> {
Arc::new(Self {
filters: RwLock::new(HashMap::new()),
max_filter_results,
event_index: None,
})
}

Expand Down
150 changes: 0 additions & 150 deletions src/rpc/methods/eth/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,12 @@ mod mempool;
mod store;
mod tipset;

use super::eth_filter_logs_from_events;
use super::BlockNumberOrHash;
use super::EthLog;
use super::Predefined;
use super::EVENT_READ_TIMEOUT;
use crate::rpc::eth::filter::event::*;
use crate::rpc::eth::filter::mempool::*;
use crate::rpc::eth::filter::tipset::*;
use crate::rpc::eth::types::*;
use crate::rpc::eth::CollectedEvent;
use crate::rpc::Ctx;
use crate::shim::address::Address;
use crate::shim::clock::ChainEpoch;
use crate::utils::misc::env::env_or_default;
Expand Down Expand Up @@ -111,151 +106,6 @@ impl EthEventHandler {
}
}

async fn eth_get_events_for_filter<DB: Blockstore>(
&self,
ctx: &Ctx<DB>,
spec: EthFilterSpec,
) -> anyhow::Result<Vec<CollectedEvent>> {
let event_filter_manager = self
.event_filter_manager
.as_ref()
.context("not supported")?
.clone();

let _event_index = event_filter_manager
.event_index
.as_ref()
.context("cannot use eth_get_logs if historical event index is disabled")?
.clone();

let pf = self.parse_eth_filter_spec(ctx, &spec)?;
// Should pf.tipset_cid be an Option?
let mut max_height = pf.max_height;
if max_height == -1 {
// heaviest tipset doesn't have events because its messages haven't been executed yet
max_height = ctx.chain_store().heaviest_tipset().epoch() - 1;
}

if max_height < 0 {
bail!("max_height requested is less than 0");
}

// we can't return events for the heaviest tipset as the transactions in that tipset will be executed
// in the next non-null tipset (because of Filecoin's "deferred execution" model)
if max_height > ctx.chain_store().heaviest_tipset().epoch() - 1 {
bail!("max_height requested is greater than the heaviest tipset");
}

self.wait_for_height_processed(ctx, max_height).await?;
// TODO: Ideally we should also check that events for the epoch at `pf.minheight` have been indexed
// However, it is currently tricky to check/guarantee this for two reasons:
// a) Event Index is not aware of null-blocks. This means that the Event Index won't be able to say whether the block at
// `pf.minheight` is a null block or whether it has no events
// b) There can be holes in the index where events at certain epoch simply haven't been indexed because of edge cases around
// node restarts while indexing. This needs a long term "auto-repair"/"automated-backfilling" implementation in the index
// So, for now, the best we can do is ensure that the event index has events for events at height >= `pf.maxHeight`

// Create a temporary filter
let filter = event_filter_manager
.install(pf)
.context("failed to install event filter")?;
// TODO
//let events = filter.take_collected_events();

if event_filter_manager.remove(filter.id()).is_none() {
bail!("failed to uninstall filter");
}

Ok(vec![])
}

async fn wait_for_height_processed<DB: Blockstore>(
&self,
ctx: &Ctx<DB>,
height: ChainEpoch,
) -> anyhow::Result<()> {
let event_filter_manager = self
.event_filter_manager
.as_ref()
.context("not supported")?
.clone();

let event_index = event_filter_manager
.event_index
.as_ref()
.context("cannot use eth_get_logs if historical event index is disabled")?
.clone();

if height > ctx.chain_store().heaviest_tipset().epoch() {
bail!("height is in the future");
}

let result: Result<anyhow::Result<()>, tokio::time::error::Elapsed> =
tokio::time::timeout(EVENT_READ_TIMEOUT, async {
// do nothing if the height we're interested in has already been indexed
if event_index
.is_height_past(height)
.context("failed to check if event index has events for given height")?
{
return Ok(());
}
// TODO(elmattic): subscribe for updates to the event index

// it could be that the event index was updated while the subscription was being
// processed -> check if index has what we need now
if event_index
.is_height_past(height)
.context("failed to check if event index has events for given height")?
{
return Ok(());
}

// wait for the update

Ok(())
})
.await;
match result {
Err(_) => bail!("timeouted"),
Ok(res) => Ok(res?),
}
}

fn parse_eth_filter_spec<DB: Blockstore>(
&self,
ctx: &Ctx<DB>,
filter_spec: &EthFilterSpec,
) -> anyhow::Result<ParsedFilter> {
EthFilterSpec::parse_eth_filter_spec(
filter_spec,
ctx.chain_store().heaviest_tipset().epoch(),
self.max_filter_height_range,
)
}

pub async fn get_eth_logs_for_block_and_transaction<DB: Blockstore>(
&self,
ctx: &Ctx<DB>,
block_hash: &EthHash,
tx_hash: &EthHash,
) -> anyhow::Result<Vec<EthLog>> {
let events = self
.eth_get_events_for_filter(
ctx,
EthFilterSpec {
block_hash: Some(block_hash.clone()),
..EthFilterSpec::default()
},
)
.await?;
let logs = eth_filter_logs_from_events(ctx, &events)?;
let filtered: Vec<EthLog> = logs
.into_iter()
.filter(|e| &e.transaction_hash == tx_hash)
.collect();
Ok(filtered)
}

fn install_filter(
&self,
filter_manager: &Option<Arc<dyn FilterManager>>,
Expand Down

0 comments on commit c228195

Please sign in to comment.