This repository has been archived by the owner on Dec 3, 2024. It is now read-only.

Merge pull request #238 from EspressoSystems/ag/size-limit
Add block size limits
QuentinI authored Aug 16, 2024
2 parents 9fdf328 + eb5fa6a commit f358101
Showing 5 changed files with 159 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
@@ -13,6 +13,7 @@ async-compatibility-layer = { version = "1.1", default-features = false, feature
async-lock = "2.8"
async-std = { version = "1.9.0", features = ["unstable", "attributes"] }
async-trait = "0.1"
bincode = "1.3"
clap = { version = "4.5", features = ["derive", "env"] }
committable = "0.2"
derivative = "2.2"
@@ -37,4 +38,6 @@ hex = "0.4.3"
hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.70" }

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(async_executor_impl, values("async-std", "tokio"))'] }
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(async_executor_impl, values("async-std", "tokio"))',
] }
58 changes: 51 additions & 7 deletions src/builder_state.rs
@@ -32,7 +32,7 @@ use async_std::task::spawn_blocking
#[cfg(async_executor_impl = "tokio")]
use tokio::task::spawn_blocking;

use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Instant;
@@ -87,6 +87,8 @@ pub struct BuildBlockInfo<TYPES: NodeType> {
pub metadata: <<TYPES as NodeType>::BlockPayload as BlockPayload<TYPES>>::Metadata,
pub vid_trigger: OneShotSender<TriggerStatus>,
pub vid_receiver: UnboundedReceiver<(VidCommitment, VidPrecomputeData)>,
// Could we have included more transactions, but chose not to?
pub truncated: bool,
}

/// Response Message to be put on the response channel
@@ -169,7 +171,7 @@ pub struct BuilderState<TYPES: NodeType> {
pub tx_receiver: BroadcastReceiver<Arc<ReceivedTransaction<TYPES>>>,

/// filtered queue of available transactions, taken from tx_receiver
pub tx_queue: Vec<Arc<ReceivedTransaction<TYPES>>>,
pub tx_queue: VecDeque<Arc<ReceivedTransaction<TYPES>>>,

/// global state handle, defined in the service.rs
pub global_state: Arc<RwLock<GlobalState<TYPES>>>,
@@ -490,17 +492,58 @@ impl<TYPES: NodeType> BuilderState<TYPES> {

async_sleep(sleep_interval).await
}

// Don't build an empty block
if self.tx_queue.is_empty() {
return None;
}

let max_block_size = self.global_state.read_arc().await.max_block_size;
let transactions_to_include = self.tx_queue.iter().scan(0, |total_size, tx| {
let prev_size = *total_size;
*total_size += tx.len;
// We will include one transaction over our target block length
// if it's the first transaction in the queue; otherwise we'd have a possible
// failure state where a single transaction larger than the target block size
// is stuck in the queue and we just build empty blocks forever
if *total_size >= max_block_size && prev_size != 0 {
None
} else {
Some(tx.tx.clone())
}
});
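For illustration, here is a minimal, self-contained sketch of this selection rule (not part of the diff; the `(length, id)` pairs and numbers are made up, standing in for the builder's `VecDeque<Arc<ReceivedTransaction>>`). With a limit of 10, the third transaction would cross the limit and selection stops, but an oversized transaction at the head of the queue would still be taken alone, so it cannot wedge the builder into producing empty blocks forever:

// Standalone demo of the `scan`-based truncation above (illustrative only).
fn main() {
    let max_block_size: u64 = 10;
    let tx_queue = vec![(4u64, "a"), (5, "b"), (3, "c")];

    let included: Vec<&str> = tx_queue
        .iter()
        .scan(0u64, |total_size, (len, id)| {
            let prev_size = *total_size;
            *total_size += len;
            // Stop once the running total reaches the limit, unless this is
            // the very first transaction in the queue (prev_size == 0).
            if *total_size >= max_block_size && prev_size != 0 {
                None
            } else {
                Some(*id)
            }
        })
        .collect();

    // "c" would bring the total to 12 >= 10, so only "a" and "b" are taken.
    assert_eq!(included, vec!["a", "b"]);
}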

if let Ok((payload, metadata)) =
<TYPES::BlockPayload as BlockPayload<TYPES>>::from_transactions(
self.tx_queue.iter().map(|tx| tx.tx.clone()),
transactions_to_include,
&self.validated_state,
&self.instance_state,
)
.await
{
let builder_hash = payload.builder_commitment(&metadata);
// count the number of txns
let txn_count = self.tx_queue.len();
let actual_txn_count = payload.num_transactions(&metadata);

// Payload is empty despite us checking that tx_queue isn't empty earlier.
//
// This means that the block was truncated due to *sequencer* block length
// limits, which are different from our `max_block_size`. There's no good way
// for us to check for this in advance, so we detect transactions too big for
// the sequencer indirectly, by observing that we passed some transactions
// to `<TYPES::BlockPayload as BlockPayload<TYPES>>::from_transactions`, but
// it returned an empty block.
// Thus we deduce that the first transaction in our queue is too big to *ever*
// be included, because it alone goes over the sequencer's block size limit.
// We need to drop it and mark it as "included" so that if we receive
// it again we don't even bother with it.
if actual_txn_count == 0 {
if let Some(txn) = self.tx_queue.pop_front() {
self.txns_in_queue.remove(&txn.commit);
self.included_txns.insert(txn.commit);
};
return None;
}

// insert the recently built block into the builder commitments
self.builder_commitments
@@ -536,7 +579,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
tracing::info!(
"Builder view num {:?}, building block with {:?} txns, with builder hash {:?}",
self.built_from_proposed_block.view_number,
txn_count,
actual_txn_count,
builder_hash
);

@@ -551,6 +594,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
metadata,
vid_trigger: trigger_send,
vid_receiver: unbounded_receiver,
truncated: actual_txn_count < self.tx_queue.len(),
})
} else {
tracing::warn!("build block, returning None");
@@ -744,7 +788,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
qc_receiver: BroadcastReceiver<MessageType<TYPES>>,
req_receiver: BroadcastReceiver<MessageType<TYPES>>,
tx_receiver: BroadcastReceiver<Arc<ReceivedTransaction<TYPES>>>,
tx_queue: Vec<Arc<ReceivedTransaction<TYPES>>>,
tx_queue: VecDeque<Arc<ReceivedTransaction<TYPES>>>,
global_state: Arc<RwLock<GlobalState<TYPES>>>,
num_nodes: NonZeroUsize,
maximize_txn_capture_timeout: Duration,
@@ -841,7 +885,7 @@ impl<TYPES: NodeType> BuilderState<TYPES> {
continue;
}
self.txns_in_queue.insert(tx.commit);
self.tx_queue.push(tx);
self.tx_queue.push_back(tx);
}
Err(async_broadcast::TryRecvError::Empty)
| Err(async_broadcast::TryRecvError::Closed) => {
107 changes: 94 additions & 13 deletions src/service.rs
@@ -60,6 +60,18 @@ use std::{fmt::Display, time::Instant};
use tagged_base64::TaggedBase64;
use tide_disco::{method::ReadState, Url};

// Start assuming we're fine calculating VID for 100 kilobyte blocks
const INITIAL_MAX_BLOCK_SIZE: u64 = 100_000;
// Never go lower than 10 kilobytes
const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;
// When adjusting max block size, it will be decremented or incremented
// by the current value / [`MAX_BLOCK_SIZE_CHANGE_DIVISOR`]
const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;
// We will not increment max block size unless we were able to serve a response
// with at least [`ProxyGlobalState::max_api_waiting_time`]
// / [`VID_RESPONSE_TARGET_MARGIN_DIVISOR`] of the waiting time to spare
const VID_RESPONSE_TARGET_MARGIN_DIVISOR: u32 = 10;
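For intuition, a self-contained sketch of the adjustment arithmetic these constants drive (the `decrement`/`increment` helper names are ours, not the diff's; the real code inlines this in `ProxyGlobalState` when a VID computation times out, or when a truncated block is served with time to spare):

// Worked example of the multiplicative adjustment, clamped to the floor.
const INITIAL_MAX_BLOCK_SIZE: u64 = 100_000;
const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;
const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;

fn decrement(max_block_size: u64) -> u64 {
    // Shrink by a tenth on a VID timeout, never going below the floor.
    std::cmp::max(
        max_block_size - max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
        MAX_BLOCK_SIZE_FLOOR,
    )
}

fn increment(max_block_size: u64) -> u64 {
    // Grow by a tenth when a truncated block was served with margin left.
    max_block_size + max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR)
}

fn main() {
    assert_eq!(decrement(INITIAL_MAX_BLOCK_SIZE), 90_000);
    assert_eq!(decrement(11_000), 10_000); // 11_000 - 1_100 = 9_900, clamped to the floor
    assert_eq!(increment(90_000), 99_000);
}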

// It holds all the necessary information for a block
#[derive(Debug)]
pub struct BlockInfo<Types: NodeType> {
@@ -68,6 +80,8 @@ pub struct BlockInfo<Types: NodeType> {
pub vid_trigger: Arc<RwLock<Option<OneShotSender<TriggerStatus>>>>,
pub vid_receiver: Arc<RwLock<WaitAndKeep<(VidCommitment, VidPrecomputeData)>>>,
pub offered_fee: u64,
// Could we have included more transactions with this block, but chose not to?
pub truncated: bool,
}

// It holds the information for the proposed block
@@ -105,9 +119,11 @@ pub struct BuilderStatesInfo<Types: NodeType> {
pub struct ReceivedTransaction<Types: NodeType> {
// the transaction
pub tx: Types::Transaction,
// its hash
// transaction's hash
pub commit: Commitment<Types::Transaction>,
// its source
// transaction's estimated length
pub len: u64,
// transaction's source
pub source: TransactionSource,
// received time
pub time_in: Instant,
@@ -135,6 +151,9 @@ pub struct GlobalState<Types: NodeType> {

// highest view running builder task
pub highest_view_num_builder_id: BuilderStateId<Types>,

// estimated maximum block size we can build in time
pub max_block_size: u64,
}

impl<Types: NodeType> GlobalState<Types> {
@@ -160,6 +179,7 @@ impl<Types: NodeType> GlobalState<Types> {
last_garbage_collected_view_num,
builder_state_to_last_built_block: Default::default(),
highest_view_num_builder_id: bootstrap_id,
max_block_size: INITIAL_MAX_BLOCK_SIZE,
}
}

@@ -203,6 +223,7 @@ impl<Types: NodeType> GlobalState<Types> {
build_block_info.vid_receiver,
))),
offered_fee: build_block_info.offered_fee,
truncated: build_block_info.truncated,
},
);
}
@@ -234,7 +255,13 @@ impl<Types: NodeType> GlobalState<Types> {
&self,
txns: Vec<<Types as NodeType>::Transaction>,
) -> Vec<Result<Commitment<<Types as NodeType>::Transaction>, BuildError>> {
handle_received_txns(&self.tx_sender, txns, TransactionSource::External).await
handle_received_txns(
&self.tx_sender,
txns,
TransactionSource::External,
self.max_block_size,
)
.await
}

pub fn get_channel_for_matching_builder_or_highest_view_buider(
@@ -599,6 +626,17 @@
Err(_toe) => {
if Instant::now() >= timeout_after {
tracing::warn!("Couldn't get vid commitment in time for block {id}",);
{
// we can't keep up with this block size, reduce max block size
let mut global_state = self.global_state.write_arc().await;
global_state.max_block_size = std::cmp::max(
global_state.max_block_size
- global_state
.max_block_size
.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
MAX_BLOCK_SIZE_FLOOR,
);
}
break Err(BuildError::Error {
message: "Couldn't get vid commitment in time".to_string(),
});
@@ -619,6 +657,21 @@
};

tracing::info!("Got vid commitment for block {id}",);

// This block was truncated, but we got VID in time with margin left.
// Maybe we can handle bigger blocks?
if block_info.truncated
&& timeout_after.duration_since(Instant::now())
> self.max_api_waiting_time / VID_RESPONSE_TARGET_MARGIN_DIVISOR
{
// Increase max block size
let mut global_state = self.global_state.write_arc().await;
global_state.max_block_size = global_state.max_block_size
+ global_state
.max_block_size
.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR);
}

if response_received.is_ok() {
let (vid_commitment, vid_precompute_data) =
response_received.map_err(|err| BuildError::Error {
@@ -785,11 +838,11 @@ pub async fn run_non_permissioned_standalone_builder_service<Types: NodeType, V:
// sending a Decide event from the hotshot to the builder states
decide_sender: BroadcastSender<MessageType<Types>>,

// shared accumulated transactions handle
tx_sender: BroadcastSender<Arc<ReceivedTransaction<Types>>>,

// Url to (re)connect to for the events stream
hotshot_events_api_url: Url,

// Global state
global_state: Arc<RwLock<GlobalState<Types>>>,
) -> Result<(), anyhow::Error> {
// connection to the events stream
let connected = connect_to_events_service::<Types, V>(hotshot_events_api_url.clone()).await;
@@ -801,6 +854,8 @@
let (mut subscribed_events, mut membership) =
connected.context("Failed to connect to events service")?;

let tx_sender = global_state.read_arc().await.tx_sender.clone();

loop {
let event = subscribed_events.next().await;
//tracing::debug!("Builder Event received from HotShot: {:?}", event);
@@ -812,8 +867,14 @@
}
// tx event
EventType::Transactions { transactions } => {
handle_received_txns(&tx_sender, transactions, TransactionSource::HotShot)
.await;
let max_block_size = global_state.read_arc().await.max_block_size;
handle_received_txns(
&tx_sender,
transactions,
TransactionSource::HotShot,
max_block_size,
)
.await;
}
// decide event
EventType::Decide {
@@ -878,9 +939,6 @@ pub async fn run_permissioned_standalone_builder_service<
I: NodeImplementation<Types>,
V: Versions,
>(
// sending received transactions
tx_sender: BroadcastSender<Arc<ReceivedTransaction<Types>>>,

// sending a DA proposal from the hotshot to the builder states
da_sender: BroadcastSender<MessageType<Types>>,

@@ -892,8 +950,12 @@

// hotshot context handle
hotshot_handle: Arc<SystemContextHandle<Types, I, V>>,

// Global state
global_state: Arc<RwLock<GlobalState<Types>>>,
) {
let mut event_stream = hotshot_handle.event_stream();
let tx_sender = global_state.read_arc().await.tx_sender.clone();
loop {
tracing::debug!("Waiting for events from HotShot");
match event_stream.next().await {
@@ -908,8 +970,14 @@
}
// tx event
EventType::Transactions { transactions } => {
handle_received_txns(&tx_sender, transactions, TransactionSource::HotShot)
.await;
let max_block_size = global_state.read_arc().await.max_block_size;
handle_received_txns(
&tx_sender,
transactions,
TransactionSource::HotShot,
max_block_size,
)
.await;
}
// decide event
EventType::Decide { leaf_chain, .. } => {
@@ -1057,17 +1125,30 @@ pub(crate) async fn handle_received_txns<Types: NodeType>(
tx_sender: &BroadcastSender<Arc<ReceivedTransaction<Types>>>,
txns: Vec<Types::Transaction>,
source: TransactionSource,
max_txn_len: u64,
) -> Vec<Result<Commitment<<Types as NodeType>::Transaction>, BuildError>> {
let mut results = Vec::with_capacity(txns.len());
let time_in = Instant::now();
for tx in txns.into_iter() {
let commit = tx.commit();
// This is a rough estimate, but we don't have any other way to get the real
// encoded transaction length. Luckily, this estimate being roughly proportional
// to the encoded length is enough, because we only use this value to gauge
// our limitations on computing the VID in time.
let len = bincode::serialized_size(&tx).unwrap_or_default();
if len > max_txn_len {
results.push(Err(BuildError::Error {
message: format!("Transaction too big (estimated length {len}, currently accepting <= {max_txn_len})"),
}));
continue;
}
let res = tx_sender
.try_broadcast(Arc::new(ReceivedTransaction {
tx,
source: source.clone(),
commit,
time_in,
len,
}))
.inspect(|val| {
if let Some(evicted_txn) = val {
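A minimal sketch of this estimate-and-reject step in isolation (the `Tx` type and `accept` helper are hypothetical, not from the diff; `bincode = "1.3"` and `serde` with the derive feature are assumed):

use serde::Serialize;

#[derive(Serialize)]
struct Tx {
    payload: Vec<u8>,
}

fn accept(tx: &Tx, max_txn_len: u64) -> Result<u64, String> {
    // Rough length estimate, mirroring the check in handle_received_txns.
    let len = bincode::serialized_size(tx).unwrap_or_default();
    if len > max_txn_len {
        return Err(format!(
            "Transaction too big (estimated length {len}, currently accepting <= {max_txn_len})"
        ));
    }
    Ok(len)
}

fn main() {
    let small = Tx { payload: vec![0; 100] };
    let big = Tx { payload: vec![0; 200_000] };
    assert!(accept(&small, 100_000).is_ok());
    assert!(accept(&big, 100_000).is_err());
}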

