Skip to content

Commit

Permalink
Merge pull request #1712 from eqlabs/mirko/separate-pending
Browse files Browse the repository at this point in the history
feat(sync): poll pending continuously
  • Loading branch information
Mirko-von-Leipzig authored Jan 31, 2024
2 parents 70e8ed1 + dbc852b commit 34e48cf
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 212 deletions.
58 changes: 25 additions & 33 deletions crates/pathfinder/src/state/sync/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,12 @@ where
};

let t_block = std::time::Instant::now();
// Next block and state update which we can get for free when exiting poll pending mode
let mut next_block = None;
let mut next_state_update = None;

let mut pending_handle = None;

let (block, commitments) = loop {
match download_block(
next,
// Reuse the next full block if we got it for free when polling pending
std::mem::take(&mut next_block),
chain,
chain_id,
head_meta.map(|h| h.1),
Expand All @@ -140,19 +137,26 @@ where
// Not implemented yet for P2P
tracing::info!("Skipping the pending blocks polling");
tokio::time::sleep(PENDING_POLL_INTERVAL).await;
} else {
tracing::trace!("Polling pending blocks");
let head =
head_meta.expect("Head hash should exist when entering pending mode");
(next_block, next_state_update) = pending::poll_pending(
} else if pending_handle.is_none() {
tracing::info!("At head of chain, enabling polling of pending data");
pending_handle = Some(tokio::spawn(pending::poll_pending(
tx_event.clone(),
&sequencer,
(head.1, head.2),
sequencer.clone(),
PENDING_POLL_INTERVAL,
storage.clone(),
)
.await
.context("Polling pending block")?;
)));
}

// Poll the head until it changes. This query is very quick and cheap to perform.
// Once its changed we exit the loop to try download the next block.
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
let (_, hash) = sequencer.head().await.context("Polling head of chain")?;
if hash != head.unwrap_or_default().1 {
break;
}
interval.tick().await;
}
}
DownloadBlock::Reorg => {
Expand Down Expand Up @@ -211,15 +215,10 @@ where
let block_hash = block.block_hash;
let t_update = std::time::Instant::now();

let state_update = match next_state_update {
// Reuse the next full state update if we got it for free when polling pending
Some(state_update) if state_update.block_hash == block_hash => state_update,
// We were unlucky or poll pending is disabled
Some(_) | None => sequencer
.state_update(block_hash.into())
.await
.with_context(|| format!("Fetch state diff for block {next:?} from sequencer"))?,
};
let state_update = sequencer
.state_update(block_hash.into())
.await
.with_context(|| format!("Fetch state diff for block {next:?} from sequencer"))?;

anyhow::ensure!(
state_update.block_hash != BlockHash::ZERO,
Expand Down Expand Up @@ -424,8 +423,6 @@ pub enum BlockValidationMode {

async fn download_block(
block_number: BlockNumber,
// Poll pending could exit when it encountered a finalized block, so we'd like to reuse it
next_block: Option<Block>,
chain: Chain,
chain_id: ChainId,
prev_block_hash: Option<BlockHash>,
Expand All @@ -436,12 +433,8 @@ async fn download_block(
error::KnownStarknetErrorCode::BlockNotFound, reply::MaybePendingBlock,
};

let result = match next_block {
// Reuse a finalized block downloaded before pending mode exited
Some(block) if block.block_number == block_number => Ok(MaybePendingBlock::Block(block)),
// Bad luck or poll pending is disabled
Some(_) | None => sequencer.block(block_number.into()).await,
};
// TODO: merge block and state update call.
let result = sequencer.block(block_number.into()).await;

let result = match result {
Ok(MaybePendingBlock::Block(block)) => {
Expand Down Expand Up @@ -568,7 +561,6 @@ async fn reorg(

match download_block(
previous_block_number,
None,
chain,
chain_id,
Some(previous.0),
Expand Down
223 changes: 44 additions & 179 deletions crates/pathfinder/src/state/sync/pending.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use anyhow::Context;
use pathfinder_common::BlockHash;
use pathfinder_common::BlockId;
use pathfinder_common::StateUpdate;
use pathfinder_storage::Storage;
use starknet_gateway_client::GatewayApi;
use starknet_gateway_types::reply::Block;
use starknet_gateway_types::reply::MaybePendingBlock;
use tokio::time::Instant;

Expand All @@ -20,84 +19,60 @@ use crate::state::sync::SyncEvent;
/// A full block or full state update can be returned from this function if it is encountered during polling.
pub async fn poll_pending<S: GatewayApi + Clone + Send + 'static>(
tx_event: tokio::sync::mpsc::Sender<SyncEvent>,
sequencer: &S,
head: (
pathfinder_common::BlockHash,
pathfinder_common::StateCommitment,
),
sequencer: S,
poll_interval: std::time::Duration,
storage: Storage,
) -> anyhow::Result<(Option<Block>, Option<StateUpdate>)> {
// The transaction count of the last emitted pending block. This is used
// as a proxy for freshness of the pending data. Feeder gateways are not 100%
// in sync wrt pending data, and as a result it is possible for us to receive
// pending data which is older than the one we received previously.
) -> anyhow::Result<()> {
let mut prev_tx_count = 0;
let mut prev_hash = BlockHash::default();

loop {
let t_fetch = Instant::now();

// Fetches the pending block _and_ state update in a single request.
// Starknet 0.12.2 introduced a feeder gateway API for fetching both the block and the state update, so
// that we get _consistent_ data.
let (block, state_update) = sequencer
.state_update_with_block(BlockId::Pending)
.await
.context("Downloading pending block and state update")?;

match block {
MaybePendingBlock::Block(block) if block.block_hash == head.0 => {
// Sequencer `pending` may return the latest full block for quite some time, so ignore it.
tracing::trace!(hash=%block.block_hash, "Found current head from pending mode");
}
MaybePendingBlock::Block(block) => {
tracing::trace!(hash=%block.block_hash, "Found full block, exiting pending mode.");
return Ok((Some(block), Some(state_update)));
}
MaybePendingBlock::Pending(pending) if pending.parent_hash != head.0 => {
tracing::trace!(
pending=%pending.parent_hash, head=%head.0,
"Pending block's parent hash does not match head, exiting pending mode"
);
return Ok((None, None));
}
MaybePendingBlock::Pending(pending)
if state_update.parent_state_commitment != head.1 =>
{
tracing::trace!(
pending=%pending.parent_hash, head=%head.0,
"Pending state update's parent state commitment does not match head, exiting pending mode"
);
return Ok((None, None));
}
MaybePendingBlock::Pending(block) if block.transactions.len() > prev_tx_count => {
// Download, process and emit all missing classes. This can occasionally
// fail when querying a desync'd feeder gateway which isn't aware of the
// new pending classes. In this case, ignore the new pending data as it
// is incomplete.
if let Err(e) = super::l2::download_new_classes(
&state_update,
sequencer,
&tx_event,
&block.starknet_version,
storage.clone(),
)
// The sequencer sometimes returns full blocks, ignore these.
let MaybePendingBlock::Pending(block) = block else {
tracing::trace!("Full block received");
tokio::time::sleep_until(t_fetch + poll_interval).await;
continue;
};

// Use the transaction count as a proxy for freshness of the pending data.
//
// The sequencer has multiple feeder gateways which are not 100% in sync making
// it possible for us to receive stale data, older than the previous data.
if block.parent_hash == prev_hash && block.transactions.len() <= prev_tx_count {
tracing::trace!("No change in pending block data");
tokio::time::sleep_until(t_fetch + poll_interval).await;
continue;
}

// Download, process and emit all missing classes. This can occasionally
// fail when querying a desync'd feeder gateway which isn't aware of the
// new pending classes. In this case, ignore the new pending data as it
// is incomplete.
if let Err(e) = super::l2::download_new_classes(
&state_update,
&sequencer,
&tx_event,
&block.starknet_version,
storage.clone(),
)
.await
{
tracing::debug!(reason=?e, "Failed to download pending classes");
} else {
prev_tx_count = block.transactions.len();
prev_hash = block.parent_hash;
tracing::trace!("Emitting a pending update");
tx_event
.send(SyncEvent::Pending(Box::new((block, state_update))))
.await
{
tracing::debug!(reason=?e, "Failed to download pending classes");
} else {
prev_tx_count = block.transactions.len();
tracing::trace!("Emitting a pending update");
tx_event
.send(SyncEvent::Pending(Box::new((block, state_update))))
.await
.context("Event channel closed")?;
}
}
MaybePendingBlock::Pending(_) => {
// Pending data was not newer than the previous iteration.
tracing::trace!("No change in pending block data");
}
.context("Event channel closed")?;
}

tokio::time::sleep_until(t_fetch + poll_interval).await;
Expand Down Expand Up @@ -173,114 +148,6 @@ mod tests {
/// need to timeout naturally which may be forever.
const TEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

#[tokio::test]
async fn exits_on_full_block() {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let mut sequencer = MockGatewayApi::new();

// Give a pending state update and full block.
sequencer
.expect_state_update_with_block()
.returning(move |_| {
Ok((
MaybePendingBlock::Block(NEXT_BLOCK.clone()),
PENDING_UPDATE.clone(),
))
});

let sequencer = Arc::new(sequencer);

let jh = tokio::spawn(async move {
poll_pending(
tx,
&sequencer,
(PARENT_HASH, PARENT_ROOT),
std::time::Duration::ZERO,
Storage::in_memory().unwrap(),
)
.await
});

let result = tokio::time::timeout(TEST_TIMEOUT, rx.recv())
.await
.expect("Channel should be dropped");
assert_matches!(result, None);

let (full_block, _) = jh.await.unwrap().unwrap();
assert_eq!(full_block.unwrap(), *NEXT_BLOCK);
}

#[tokio::test]
async fn exits_on_block_discontinuity() {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let mut sequencer = MockGatewayApi::new();

let mut pending_block = PENDING_BLOCK.clone();
pending_block.parent_hash = block_hash!("0xFFFFFF");
sequencer
.expect_state_update_with_block()
.returning(move |_| {
Ok((
MaybePendingBlock::Pending(pending_block.clone()),
PENDING_UPDATE.clone(),
))
});
let sequencer = Arc::new(sequencer);

let jh = tokio::spawn(async move {
poll_pending(
tx,
&sequencer,
(PARENT_HASH, PARENT_ROOT),
std::time::Duration::ZERO,
Storage::in_memory().unwrap(),
)
.await
});

let result = tokio::time::timeout(TEST_TIMEOUT, rx.recv())
.await
.expect("Channel should be dropped");
assert_matches!(result, None);
jh.await.unwrap().unwrap();
}

#[tokio::test]
async fn exits_on_state_diff_discontinuity() {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let mut sequencer = MockGatewayApi::new();

let disconnected_diff = PENDING_UPDATE
.clone()
.with_parent_state_commitment(state_commitment_bytes!(b"different old root"));
sequencer
.expect_state_update_with_block()
.returning(move |_| {
Ok((
MaybePendingBlock::Pending(PENDING_BLOCK.clone()),
disconnected_diff.clone(),
))
});
let sequencer = Arc::new(sequencer);

let jh = tokio::spawn(async move {
poll_pending(
tx,
&sequencer,
(PARENT_HASH, PARENT_ROOT),
std::time::Duration::ZERO,
Storage::in_memory().unwrap(),
)
.await
});

let result = tokio::time::timeout(TEST_TIMEOUT, rx.recv())
.await
.expect("Channel should be dropped");
assert_matches!(result, None);
jh.await.unwrap().unwrap();
}

#[tokio::test]
async fn success() {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
Expand All @@ -299,8 +166,7 @@ mod tests {
let _jh = tokio::spawn(async move {
poll_pending(
tx,
&sequencer,
(PARENT_HASH, PARENT_ROOT),
sequencer,
std::time::Duration::ZERO,
Storage::in_memory().unwrap(),
)
Expand Down Expand Up @@ -378,8 +244,7 @@ mod tests {
let _jh = tokio::spawn(async move {
poll_pending(
tx,
&sequencer,
(PARENT_HASH, PARENT_ROOT),
sequencer,
std::time::Duration::ZERO,
Storage::in_memory().unwrap(),
)
Expand Down

0 comments on commit 34e48cf

Please sign in to comment.