Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial p2p sync implementation #1674

Merged
merged 10 commits into from
Jan 25, 2024
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 23 additions & 5 deletions crates/common/src/header.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::{
BlockHash, BlockNumber, BlockTimestamp, ClassCommitment, EventCommitment, GasPrice,
SequencerAddress, StarknetVersion, StateCommitment, StateUpdate, StorageCommitment,
TransactionCommitment,
};
use crate::prelude::*;
use crate::BlockCommitmentSignature;
use fake::Dummy;

#[derive(Debug, Clone, PartialEq, Eq, Default, Dummy)]
Expand All @@ -24,6 +21,23 @@ pub struct BlockHeader {
pub event_count: usize,
}

#[derive(Debug, Clone, PartialEq, Default)]
pub struct SignedBlockHeader {
pub header: BlockHeader,
pub signature: BlockCommitmentSignature,
}

impl SignedBlockHeader {
/// Returns true if the signature is correct for the block header.
///
/// Note that this does not imply that a given state diff is correct.
/// TODO: improve this documentation somehow.
pub fn verify_signature(&self) -> bool {
// TODO: implement this.
true
}
}

pub struct BlockHeaderBuilder(BlockHeader);

impl BlockHeader {
Expand All @@ -47,6 +61,10 @@ impl BlockHeader {
.with_block_hash(self.hash)
.with_state_commitment(self.state_commitment)
}

pub fn verify_hash(&self) -> bool {
todo!();
}
}

impl BlockHeaderBuilder {
Expand Down
6 changes: 5 additions & 1 deletion crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod trie;
pub use signature::BlockCommitmentSignature;
pub use state_update::StateUpdate;

pub use header::{BlockHeader, BlockHeaderBuilder};
pub use header::{BlockHeader, BlockHeaderBuilder, SignedBlockHeader};

impl ContractAddress {
/// The contract at 0x1 is special. It was never deployed and therefore
Expand Down Expand Up @@ -254,6 +254,10 @@ impl BlockNumber {
Some(*self - 1)
}
}

pub fn is_zero(&self) -> bool {
self == &Self::GENESIS
}
}

impl std::ops::Add<u64> for BlockNumber {
Expand Down
1 change: 1 addition & 0 deletions crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rust-version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
async-stream = "0.3.5"
async-trait = { workspace = true }
base64 = { workspace = true }
clap = { workspace = true, features = ["derive", "env", "wrap_help"] }
Expand Down
96 changes: 94 additions & 2 deletions crates/p2p/src/client/peer_agnostic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use p2p_proto::transaction::TransactionsRequest;
use pathfinder_common::{
event::Event,
transaction::{DeployAccountTransactionV0V1, DeployAccountTransactionV3, TransactionVariant},
BlockHash, BlockNumber, ContractAddress, TransactionHash,
BlockHash, BlockNumber, ContractAddress, SignedBlockHeader, TransactionHash,
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use tokio::sync::RwLock;

use crate::client::peer_aware;
Expand All @@ -33,6 +33,19 @@ mod parse;

use parse::ParserState;

/// Data received from a specific peer.
#[derive(Debug)]
pub struct PeerData<T> {
pub peer: PeerId,
pub data: T,
}

impl<T> PeerData<T> {
pub fn new(peer: PeerId, data: T) -> Self {
Self { peer, data }
}
}

#[derive(Clone, Debug)]
pub struct Client {
inner: peer_aware::Client,
Expand Down Expand Up @@ -105,6 +118,85 @@ impl Client {
peers
}

pub fn header_stream(
self,
start: BlockNumber,
stop: BlockNumber,
reverse: bool,
) -> impl futures::Stream<Item = PeerData<SignedBlockHeader>> {
let (mut start, stop, direction) = match reverse {
true => (stop, start, Direction::Backward),
false => (start, stop, Direction::Forward),
};

async_stream::stream! {
// Loop which refreshes peer set once we exhaust it.
loop {
let peers = self
.get_update_peers_with_sync_capability(protocol::Headers::NAME)
.await;

// Attempt each peer.
'next_peer: for peer in peers {
CHr15F0x marked this conversation as resolved.
Show resolved Hide resolved
let limit = start.get().max(stop.get()) - start.get().min(stop.get());

let request = BlockHeadersRequest {
iteration: Iteration {
start: start.get().into(),
direction,
limit,
step: 1.into(),
},
};

let responses = match self.inner.send_headers_sync_request(peer, request).await
{
Ok(x) => x,
Err(error) => {
// Failed to establish connection, try next peer.
tracing::debug!(%peer, reason=%error, "Headers request failed");
continue 'next_peer;
}
};

let mut responses = responses
.flat_map(|response| futures::stream::iter(response.parts))
.chunks(2)
.scan((), |(), chunk| async { parse::handle_signed_header_chunk(chunk) })
.boxed();

while let Some(signed_header) = responses.next().await {
let signed_header = match signed_header {
Ok(signed_header) => signed_header,
Err(error) => {
tracing::debug!(%peer, %error, "Header stream failed");
continue 'next_peer;
}
};

// Small sanity check. We cannot reliably check the hash here,
// its easier for the caller to ensure it matches expectations.
if signed_header.header.number != start {
tracing::debug!(%peer, "Wrong block number");
continue 'next_peer;
}

start = match direction {
Direction::Forward => start + 1,
// unwrap_or_default is safe as this is the genesis edge case,
// at which point the loop will complete at the end of this iteration.
Direction::Backward => start.parent().unwrap_or_default(),
};

yield PeerData::new(peer, signed_header);
}

// TODO: track how much and how fast this peer responded with i.e. don't let them drip feed us etc.
}
}
}
}

pub async fn block_headers(
&self,
start_block: BlockNumber,
Expand Down
74 changes: 74 additions & 0 deletions crates/p2p/src/client/peer_agnostic/parse.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
use anyhow::Context;
use p2p_proto::{block::BlockHeadersResponsePart, common::ConsensusSignature};
use pathfinder_common::{
BlockCommitmentSignature, BlockCommitmentSignatureElem, SignedBlockHeader,
};

use crate::client::types::TryFromDto;

pub(crate) trait ParserState {
type Dto;
type Inner;
Expand Down Expand Up @@ -713,3 +721,69 @@ pub(crate) mod events {
impl_take_parsed_and_should_stop!(events);
}
}

/// Parses (header, signature) pairs. Expects chunks(2) as input,
/// will panic if given a chunk with length not one or two.
///
/// Errors if the chunk is not a (header, signature) pair or a
/// successful [BlockHeadersResponsePart::Fin].
pub(crate) fn handle_signed_header_chunk(
chunk: Vec<BlockHeadersResponsePart>,
) -> Option<anyhow::Result<SignedBlockHeader>> {
use anyhow::anyhow;

if let [single] = chunk.as_slice() {
match single {
Header(_) => {
return Some(Err(anyhow!("Stream finalized with header")));
}
Signatures(_) => {
return Some(Err(anyhow!("Stream finalized with signature")));
}
Fin(p2p_proto::common::Fin { error: Some(error) }) => {
CHr15F0x marked this conversation as resolved.
Show resolved Hide resolved
return Some(Err(anyhow!("Stream finalized with error: {error:?}")));
}
Fin(_) => return None,
}
}

let [a, b] = chunk.as_slice() else {
panic!("Expected exactly two items in the chunk");
};

use BlockHeadersResponsePart::*;
let result = match (a, b) {
(Fin(_), _) | (_, Fin(_)) => Err(anyhow!("Received unexpected Fin")),
(Signatures(_), _) => Err(anyhow!("Received signature without header")),
CHr15F0x marked this conversation as resolved.
Show resolved Hide resolved
(_, Header(_)) => Err(anyhow!("Received header without signature")),
CHr15F0x marked this conversation as resolved.
Show resolved Hide resolved
(Header(header), Signatures(signatures)) => {
if header.hash != signatures.block.hash {
return Some(Err(anyhow!("Signature and header block hash mismatch")));
}
if header.number != signatures.block.number {
return Some(Err(anyhow!("Signature and header block number mismatch")));
}

let header = match pathfinder_common::BlockHeader::try_from_dto(header)
.context("Parsing header")
{
Ok(header) => header,
Err(e) => return Some(Err(e)),
};

let signature = match signatures.signatures.as_slice() {
&[ConsensusSignature { r, s }] => BlockCommitmentSignature {
r: BlockCommitmentSignatureElem(r),
s: BlockCommitmentSignatureElem(s),
},
other => {
return Some(Err(anyhow!("Bad signature length: {}", other.len(),)));
}
};

Ok(SignedBlockHeader { header, signature })
}
};

Some(result)
}
33 changes: 33 additions & 0 deletions crates/p2p/src/client/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,39 @@ impl From<BlockHeader> for MaybeSignedBlockHeader {
}
}

impl<T> TryFromDto<T> for pathfinder_common::BlockHeader
where
T: AsRef<p2p_proto::block::BlockHeader>,
{
fn try_from_dto(dto: T) -> anyhow::Result<Self> {
let dto = dto.as_ref();
Ok(Self {
hash: BlockHash(dto.hash.0),
parent_hash: BlockHash(dto.parent_hash.0),
number: BlockNumber::new(dto.number)
.ok_or(anyhow::anyhow!("Invalid block number > i64::MAX"))?,
timestamp: BlockTimestamp::new(
dto.time.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
)
.ok_or(anyhow::anyhow!("Invalid block timestamp"))?,
sequencer_address: SequencerAddress(dto.sequencer_address.0),
eth_l1_gas_price: GasPrice::from_be_slice(dto.gas_price.as_slice())?,
starknet_version: StarknetVersion::from(dto.starknet_version.clone()),
// State commitments may only be calculated intermittently in the future to save on compute.
state_commitment: StateCommitment(dto.state_commitment.unwrap_or_default().0),
event_commitment: pathfinder_common::EventCommitment(dto.events.root.0),
event_count: dto.events.n_leaves as usize,
transaction_commitment: pathfinder_common::TransactionCommitment(
dto.transactions.root.0,
),
transaction_count: dto.transactions.n_leaves as usize,
strk_l1_gas_price: Default::default(),
class_commitment: Default::default(),
storage_commitment: Default::default(),
})
}
}

/// Simple state update meant for the temporary p2p client hidden behind
/// the gateway client api, ie.:
/// - does not contain any commitments
Expand Down
1 change: 1 addition & 0 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod test_utils;
mod tests;
mod transport;

pub use client::peer_agnostic::PeerData;
pub use libp2p;
pub use peers::Peers;
pub use sync::protocol::PROTOCOLS;
Expand Down
1 change: 1 addition & 0 deletions crates/pathfinder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod monitoring;
pub mod state;
mod sync;

#[cfg(feature = "p2p")]
pub mod p2p_network;
3 changes: 3 additions & 0 deletions crates/pathfinder/src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#[cfg(feature = "p2p")]
#[allow(dead_code)]
mod p2p;
Loading
Loading