Skip to content

Commit

Permalink
feat: cross-shard congestion control
Browse files Browse the repository at this point in the history
This PR builds on other parts of NEP-539.
It uses the congestion info and the outgoing receipts
buffers introduced in previous commits.
  • Loading branch information
jakmeier committed Apr 21, 2024
1 parent a4a4a84 commit 63910c5
Show file tree
Hide file tree
Showing 9 changed files with 476 additions and 45 deletions.
43 changes: 38 additions & 5 deletions chain/chain/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,16 @@ impl RuntimeAdapter for NightshadeRuntime {
// Total amount of gas burnt for converting transactions towards receipts.
let mut total_gas_burnt = 0;
let mut total_size = 0u64;
// TODO: Update gas limit for transactions
let transactions_gas_limit = gas_limit / 2;

let transactions_gas_limit =
if ProtocolFeature::Nep539CongestionControl.protocol_version() <= protocol_version {
let own_congestion =
prev_block.congestion_info.get(&shard_id).or_else(|| Default::default());
own_congestion.unwrap().process_tx_limit()
} else {
gas_limit / 2
};

let mut result = PreparedTransactions {
transactions: Vec::new(),
limited_by: None,
Expand Down Expand Up @@ -792,10 +800,16 @@ impl RuntimeAdapter for NightshadeRuntime {
result.limited_by = Some(PrepareTransactionsLimit::Size);
break;
}
if result.transactions.len() >= new_receipt_count_limit {
result.limited_by = Some(PrepareTransactionsLimit::ReceiptCount);
break;
if ProtocolFeature::Nep539CongestionControl.protocol_version() > protocol_version {
// Keep this for the upgrade phase, afterwards it can be
// removed. It does not need to be kept because it does not
// affect replayability.
if result.transactions.len() >= new_receipt_count_limit {
result.limited_by = Some(PrepareTransactionsLimit::ReceiptCount);
break;
}
}

if let Some(time_limit) = &time_limit {
if start_time.elapsed() >= *time_limit {
result.limited_by = Some(PrepareTransactionsLimit::Time);
Expand All @@ -806,6 +820,25 @@ impl RuntimeAdapter for NightshadeRuntime {
if let Some(iter) = transaction_groups.next() {
while let Some(tx) = iter.next() {
num_checked_transactions += 1;

if ProtocolFeature::Nep539CongestionControl.protocol_version()
<= protocol_version
{
let receiving_shard = EpochManagerAdapter::account_id_to_shard_id(
self.epoch_manager.as_ref(),
&tx.transaction.receiver_id,
&epoch_id,
)?;
if let Some(shard_congestion) =
prev_block.congestion_info.get(&receiving_shard)
{
if !shard_congestion.shard_accepts_transactions() {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "discarding transaction due to congestion");
continue;
}
}
}

// Verifying the transaction is on the same chain and hasn't expired yet.
if !chain_validate(&tx) {
tracing::trace!(target: "runtime", tx=?tx.get_hash(), "discarding transaction that failed chain validation");
Expand Down
8 changes: 6 additions & 2 deletions chain/chain/src/runtime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,12 @@ impl TestEnv {
}

pub fn view_account(&self, account_id: &AccountId) -> AccountView {
let shard_id =
self.epoch_manager.account_id_to_shard_id(account_id, &self.head.epoch_id).unwrap();
let shard_id = EpochInfoProvider::account_id_to_shard_id(
self.epoch_manager.as_ref(),
account_id,
&self.head.epoch_id,
)
.unwrap();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &self.head.epoch_id).unwrap();
self.runtime
.view_account(&shard_uid, self.state_roots[shard_id as usize], account_id)
Expand Down
8 changes: 8 additions & 0 deletions chain/epoch-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ impl EpochInfoProvider for EpochManagerHandle {
let epoch_manager = self.read();
epoch_manager.config.chain_id().into()
}

fn account_id_to_shard_id(
&self,
account_id: &AccountId,
epoch_id: &EpochId,
) -> Result<ShardId, EpochError> {
EpochManagerAdapter::account_id_to_shard_id(self, account_id, epoch_id)
}
}

/// Tracks epoch information across different forks, such as validators.
Expand Down
69 changes: 65 additions & 4 deletions core/primitives/src/congestion_info.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
use near_primitives_core::types::{Gas, ShardId};

// TODO: find better home for the const?
const MAX_CONGESTION_INCOMING_GAS: Gas = 20 * 10u64.pow(15);
const MAX_CONGESTION_OUTGOING_GAS: Gas = 2 * 10u64.pow(15);
const MAX_CONGESTION_MEMORY_CONSUMPTION: u64 = bytesize::ByteSize::mb(1000u64).0;
const MIN_GAS_FORWARDING: Gas = 1 * 10u64.pow(15);
const MAX_GAS_FORWARDING: Gas = 300 * 10u64.pow(15);
const RED_GAS: Gas = 1 * 10u64.pow(15);
const MIN_TX_GAS: Gas = 20 * 10u64.pow(12);
const MAX_TX_GAS: Gas = 500 * 10u64.pow(12);
// 0.25 * MAX_CONGESTION_INCOMING_GAS
const REJECT_TX_CONGESTION_THRESHOLD: Gas = MAX_CONGESTION_INCOMING_GAS / 4;

/// Stores the congestion level of a shard.
///
/// [`CongestionInfo`] should remain an internal struct that is not borsh
Expand All @@ -19,18 +31,67 @@ pub struct CongestionInfo {

impl CongestionInfo {
/// How much gas another shard can send to us in the next block.
pub fn outgoing_limit(&self, _sender_shard: ShardId) -> Gas {
todo!()
pub fn outgoing_limit(&self, sender_shard: ShardId) -> Gas {
let incoming_congestion = self.incoming_congestion();
let outgoing_congestion = self.outgoing_congestion();
let memory_congestion = self.memory_congestion();

let congestion = incoming_congestion.max(outgoing_congestion).max(memory_congestion);

if congestion == u16::MAX {
// Red traffic light: reduce to minimum speed
if sender_shard == self.allowed_shard {
RED_GAS
} else {
0
}
} else {
mix(MAX_GAS_FORWARDING, MIN_GAS_FORWARDING, congestion)
}
}

fn incoming_congestion(&self) -> u16 {
u16_fraction(self.delayed_receipts_gas, MAX_CONGESTION_INCOMING_GAS)
}
fn outgoing_congestion(&self) -> u16 {
u16_fraction(self.buffered_receipts_gas, MAX_CONGESTION_OUTGOING_GAS)
}
fn memory_congestion(&self) -> u16 {
u16_fraction(self.receipt_bytes as u128, MAX_CONGESTION_MEMORY_CONSUMPTION)
}

/// How much gas we accept for executing new transactions going to any
/// uncongested shards.
pub fn process_tx_limit(&self) -> Gas {
todo!()
mix(MAX_TX_GAS, MIN_TX_GAS, self.incoming_congestion())
}

/// Whether we can accept new transaction with the receiver set to this shard.
pub fn shard_accepts_transactions(&self) -> bool {
todo!()
self.delayed_receipts_gas > REJECT_TX_CONGESTION_THRESHOLD as u128
}
}

#[inline]
fn u16_fraction(value: u128, max: u64) -> u16 {
let bounded_value = std::cmp::min(value as u128, max as u128);
let in_u16_range = bounded_value * u16::MAX as u128 / max as u128;
in_u16_range as u16
}

// linearly interpolate between two values
//
// This method treats u16 as a fraction of u16::MAX.
// This makes multiplication of numbers on the upper end of `u128` better behaved
// than using f64 which lacks precision for such high numbers.
//
// (TODO: overkill? maybe just use f64 and hope that we never have platform incompatibilities)
fn mix(left: u64, right: u64, ratio: u16) -> u64 {
let left_part = left as u128 * (u16::MAX - ratio) as u128;
let right_part = right as u128 * ratio as u128;
let total = (left_part + right_part) / u16::MAX as u128;

// conversion is save because left and right were both u64 and the result is
// between the two
return total as u64;
}
10 changes: 9 additions & 1 deletion core/primitives/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use near_async::time::Clock;
use near_crypto::vrf::Value;
use near_crypto::{EmptySigner, InMemorySigner, KeyType, PublicKey, SecretKey, Signature, Signer};
use near_primitives_core::account::id::AccountIdRef;
use near_primitives_core::types::ProtocolVersion;
use near_primitives_core::types::{ProtocolVersion, ShardId};
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -625,6 +625,14 @@ impl EpochInfoProvider for MockEpochInfoProvider {
fn chain_id(&self) -> String {
"localnet".into()
}

fn account_id_to_shard_id(
&self,
account_id: &AccountId,
epoch_id: &EpochId,
) -> Result<ShardId, EpochError> {
todo!()
}
}

/// Encode array of `u64` to be passed as a smart contract argument.
Expand Down
7 changes: 7 additions & 0 deletions core/primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,13 @@ pub trait EpochInfoProvider {

/// Get the chain_id of the chain this epoch belongs to
fn chain_id(&self) -> String;

/// Which shard the account belongs to in the given epoch.
fn account_id_to_shard_id(
&self,
account_id: &AccountId,
epoch_id: &EpochId,
) -> Result<ShardId, EpochError>;
}

/// Mode of the trie cache.
Expand Down
1 change: 0 additions & 1 deletion integration-tests/src/user/runtime_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use near_chain_configs::MIN_GAS_PRICE;
use near_crypto::{PublicKey, Signer};
use near_jsonrpc_primitives::errors::ServerError;
use near_parameters::RuntimeConfig;
use near_primitives::congestion_info::CongestionInfo;
use near_primitives::errors::{RuntimeError, TxExecutionError};
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::Receipt;
Expand Down
Loading

0 comments on commit 63910c5

Please sign in to comment.