Skip to content

Commit

Permalink
add an optional start timestamp to the rpc to get blocks (#89)
Browse files Browse the repository at this point in the history
In the follow blocks script, we end up getting all the blocks from the
beginning up to the latest when what we really wanna do is see the
blocks minted starting from the time when we run the script. Add an
optional start timestamp parameter to the `GetBlocks` request to support
this.
  • Loading branch information
aditiharini authored Nov 26, 2024
1 parent 40ca68f commit 6b5f0a2
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 20 deletions.
27 changes: 12 additions & 15 deletions src/bin/collect_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,14 @@ async fn main() {
});

let start = Instant::now();
let start_farcaster_time = current_time();
let mut stats_calculation_timer = time::interval(STATS_CALCULATION_INTERVAL);
let mut block_count = 0;
let mut num_messages_confirmed = 0;
let mut num_messages_submitted = 0;
let mut pending_messages = HashSet::new();
let mut time_to_confirmation = vec![];
let mut block_times = vec![];
let mut last_block_time = start_farcaster_time;
let mut last_block_time = current_time();
let mut iteration = 0;
loop {
select! {
Expand All @@ -129,19 +128,17 @@ async fn main() {
},
Some(block) = blocks_rx.recv() => {
let block_timestamp = block.header.as_ref().unwrap().timestamp;
if block_timestamp > start_farcaster_time {
block_count += 1;
block_times.push(block_timestamp - last_block_time);
last_block_time = block_timestamp;
for chunk in &block.shard_chunks {
for tx in &chunk.transactions {
for msg in &tx.user_messages {
let msg_data = MessageData::decode(msg.data_bytes.as_ref().unwrap().as_slice()).unwrap();
let msg_timestamp = msg_data.timestamp;
time_to_confirmation.push(block_timestamp - msg_timestamp as u64);
num_messages_confirmed += 1;
pending_messages.remove(&hex::encode(msg.hash.clone()));
}
block_count += 1;
block_times.push(block_timestamp - last_block_time);
last_block_time = block_timestamp;
for chunk in &block.shard_chunks {
for tx in &chunk.transactions {
for msg in &tx.user_messages {
let msg_data = MessageData::decode(msg.data_bytes.as_ref().unwrap().as_slice()).unwrap();
let msg_timestamp = msg_data.timestamp;
time_to_confirmation.push(block_timestamp - msg_timestamp as u64);
num_messages_confirmed += 1;
pending_messages.remove(&hex::encode(msg.hash.clone()));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,9 @@ impl Proposer for BlockProposer {
let destination_addr = format!("http://{}", rpc_address.clone());
let mut rpc_client = SnapchainServiceClient::connect(destination_addr).await?;
let request = Request::new(BlocksRequest {
shard_id: self.shard_id.shard_id(),
start_block_number: prev_block_number + 1,
stop_block_number: None,
start_timestamp: None,
});
let missing_blocks = rpc_client.get_blocks(request).await?;
for block in missing_blocks.get_ref().blocks.clone() {
Expand Down
8 changes: 8 additions & 0 deletions src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl SnapchainService for MySnapchainService {
) -> Result<Response<BlocksResponse>, Status> {
let start_block_number = request.get_ref().start_block_number;
let stop_block_number = request.get_ref().stop_block_number;
let start_timestamp = request.get_ref().start_timestamp;

info!( {start_block_number, stop_block_number}, "Received call to [get_blocks] RPC");

Expand All @@ -78,6 +79,13 @@ impl SnapchainService for MySnapchainService {
{
Err(err) => Err(Status::from_error(Box::new(err))),
Ok(blocks) => {
let blocks = match start_timestamp {
None => blocks,
Some(start_timestamp) => blocks
.into_iter()
.filter(|block| block.header.as_ref().unwrap().timestamp >= start_timestamp)
.collect(),
};
let response = Response::new(BlocksResponse { blocks });
Ok(response)
}
Expand Down
6 changes: 3 additions & 3 deletions src/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import "blocks.proto";
import "hub_event.proto";

message BlocksRequest {
uint32 shard_id = 1;
uint64 start_block_number = 2;
optional uint64 stop_block_number = 3;
uint64 start_block_number = 1;
optional uint64 stop_block_number = 2;
optional uint64 start_timestamp = 3;
}

message BlocksResponse {
Expand Down
3 changes: 2 additions & 1 deletion src/utils/cli.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::consensus::proposer::current_time;
use crate::proto::msg as message;
use crate::proto::rpc::snapchain_service_client::SnapchainServiceClient;
use crate::proto::{rpc, snapchain::Block};
Expand Down Expand Up @@ -41,9 +42,9 @@ pub async fn follow_blocks(

loop {
let msg = rpc::BlocksRequest {
shard_id: 0,
start_block_number: i,
stop_block_number: Some(i + FETCH_SIZE),
start_timestamp: Some(current_time()),
};

let request = tonic::Request::new(msg);
Expand Down

0 comments on commit 6b5f0a2

Please sign in to comment.