Skip to content

Commit

Permalink
configurable tx routing horizon (#10251)
Browse files Browse the repository at this point in the history
Community reports that sometimes it takes minutes for a transaction to
be recorded on chain.
My guess is that it takes that much time to record a transaction on
chain, because it gets forwarded to nodes which either:
* fail to produce their chunks
* don't have time to process incoming tx RPCs before producing their
chunks
* the node producing a tx is out of sync, so the set of validators it
  forwards the tx to is already stale

All of these can be fixed by multicasting transactions further into the
future.

This PR is the simplest attempt to send transactions further into the
future.

Tested: Had a testnet node and submitted a tx that was supposed to be
forwarded to chunk producers of the next 100 blocks. In practice it gets
forwarded to 5 validators. Sending a message to a validator takes about
100µs for a trivial tx.

(cherry picked from commit 984d7f9)
  • Loading branch information
nikurt committed Dec 6, 2023
1 parent 873a8d2 commit aa98b1c
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 51 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## [unreleased]

### Protocol Changes

* Restrict the creation of non-implicit top-level accounts that are longer than 32 bytes. Only the registrar account can create them. [#9589](https://github.com/near/nearcore/pull/9589)
* Adjust the number of block producers and chunk producers on testnet to facilitate testing of chunk-only producers [#9563](https://github.com/near/nearcore/pull/9563)


### Non-protocol Changes

* Add prometheus metrics for the internal state of the doomslug. [#9458](https://github.com/near/nearcore/pull/9458)
* Fix `EXPERIMENTAL_protocol_config` to apply overrides from `EpochConfig`. [#9692](https://github.com/near/nearcore/pull/9692)
* Add config option `tx_routing_height_horizon` to configure how many chunk producers are notified about the tx. [#10251](https://github.com/near/nearcore/pull/10251)

## 1.36.0

### Protocol Changes
Expand Down
3 changes: 0 additions & 3 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ const NUM_PARENTS_TO_CHECK_FINALITY: usize = 20;
#[cfg(not(feature = "sandbox"))]
const ACCEPTABLE_TIME_DIFFERENCE: i64 = 12 * 10;

/// Over this block height delta in advance if we are not chunk producer - route tx to upcoming validators.
pub const TX_ROUTING_HEIGHT_HORIZON: BlockHeightDelta = 4;

/// Private constant for 1 NEAR (copy from near/config.rs) used for reporting.
const NEAR_BASE: Balance = 1_000_000_000_000_000_000_000_000;

Expand Down
10 changes: 5 additions & 5 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use near_async::messaging::{CanSend, Sender};
use near_chain::chain::VerifyBlockHashAndSignatureResult;
use near_chain::chain::{
ApplyStatePartsRequest, BlockCatchUpRequest, BlockMissingChunks, BlocksCatchUpState,
OrphanMissingChunks, TX_ROUTING_HEIGHT_HORIZON,
OrphanMissingChunks,
};
use near_chain::flat_storage_creator::FlatStorageCreator;
use near_chain::resharding::StateSplitRequest;
Expand Down Expand Up @@ -1890,8 +1890,8 @@ impl Client {
let maybe_next_epoch_id = self.get_next_epoch_id_if_at_boundary(&head)?;

let mut validators = HashSet::new();
for horizon in
(2..=TX_ROUTING_HEIGHT_HORIZON).chain(vec![TX_ROUTING_HEIGHT_HORIZON * 2].into_iter())
for horizon in (2..=self.config.tx_routing_height_horizon)
.chain(vec![self.config.tx_routing_height_horizon * 2].into_iter())
{
let target_height = head.height + horizon - 1;
let validator =
Expand Down Expand Up @@ -1961,7 +1961,7 @@ impl Client {
+ self.config.epoch_length;

let epoch_boundary_possible =
head.height + TX_ROUTING_HEIGHT_HORIZON >= next_epoch_estimated_height;
head.height + self.config.tx_routing_height_horizon >= next_epoch_estimated_height;
if epoch_boundary_possible {
Ok(Some(self.epoch_manager.get_next_epoch_id_from_prev_block(&head.last_block_hash)?))
} else {
Expand Down Expand Up @@ -2116,7 +2116,7 @@ impl Client {
return Ok(false);
};

for i in 1..=TX_ROUTING_HEIGHT_HORIZON {
for i in 1..=self.config.tx_routing_height_horizon {
let chunk_producer =
self.epoch_manager.get_chunk_producer(&epoch_id, head.height + i, shard_id)?;
if &chunk_producer == account_id {
Expand Down
3 changes: 1 addition & 2 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::{
};
use actix::{Actor, Addr, Handler, SyncArbiter, SyncContext};
use near_async::messaging::CanSend;
use near_chain::chain::TX_ROUTING_HEIGHT_HORIZON;
use near_chain::types::{RuntimeAdapter, Tip};
use near_chain::{
get_epoch_block_producers_view, Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode,
Expand Down Expand Up @@ -480,7 +479,7 @@ impl ViewClientActor {
.epoch_manager
.get_chunk_producer(
&head.epoch_id,
head.height + TX_ROUTING_HEIGHT_HORIZON - 1,
head.height + self.config.tx_routing_height_horizon - 1,
target_shard_id,
)
.map_err(|err| TxStatusError::ChainError(err.into()))?;
Expand Down
73 changes: 55 additions & 18 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@ pub const DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL: u32 =

/// Configuration for garbage collection.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(default)]
pub struct GCConfig {
/// Maximum number of blocks to garbage collect at every garbage collection
/// call.
#[serde(default = "default_gc_blocks_limit")]
pub gc_blocks_limit: NumBlocks,

/// Maximum number of height to go through at each garbage collection step
/// when cleaning forks during garbage collection.
#[serde(default = "default_gc_fork_clean_step")]
pub gc_fork_clean_step: u64,

/// Number of epochs for which we keep store data.
#[serde(default = "default_gc_num_epochs_to_keep")]
pub gc_num_epochs_to_keep: u64,
}

Expand All @@ -56,18 +54,6 @@ impl Default for GCConfig {
}
}

fn default_gc_blocks_limit() -> NumBlocks {
GCConfig::default().gc_blocks_limit
}

fn default_gc_fork_clean_step() -> u64 {
GCConfig::default().gc_fork_clean_step
}

fn default_gc_num_epochs_to_keep() -> u64 {
GCConfig::default().gc_num_epochs_to_keep()
}

impl GCConfig {
pub fn gc_num_epochs_to_keep(&self) -> u64 {
max(MIN_GC_NUM_EPOCHS_TO_KEEP, self.gc_num_epochs_to_keep)
Expand Down Expand Up @@ -125,6 +111,9 @@ pub struct DumpConfig {
/// Feel free to set to `None`, defaults are sensible.
#[serde(skip_serializing_if = "Option::is_none")]
pub iteration_delay: Option<Duration>,
/// Location of a json file with credentials allowing write access to the bucket.
#[serde(skip_serializing_if = "Option::is_none")]
pub credentials_file: Option<PathBuf>,
}

/// Configures how to fetch state parts during state sync.
Expand Down Expand Up @@ -159,6 +148,50 @@ impl SyncConfig {
}
}

/// Configuration for resharding (state split).
///
/// Because of the container-level `#[serde(default)]`, any field missing from
/// the config file falls back to the value from the `Default` impl.
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug)]
#[serde(default)]
pub struct StateSplitConfig {
    /// The soft limit on the size of a single batch. The batch size can be
    /// decreased if resharding is consuming too many resources and interfering
    /// with regular node operation.
    pub batch_size: bytesize::ByteSize,

    /// The delay between writing batches to the db. The batch delay can be
    /// increased if resharding is consuming too many resources and interfering
    /// with regular node operation.
    pub batch_delay: Duration,

    /// The delay between attempts to start resharding while waiting for the
    /// state snapshot to become available.
    pub retry_delay: Duration,

    /// The delay between when the resharding request is received and when the
    /// actor actually starts working on it. This delay should only be used in
    /// tests.
    pub initial_delay: Duration,

    /// The maximum time that the actor will wait for the snapshot to be ready,
    /// before starting resharding. Do not wait indefinitely since we want to
    /// report an error early enough for the node maintainer to have time to
    /// recover.
    pub max_poll_time: Duration,
}

impl Default for StateSplitConfig {
    fn default() -> Self {
        // Deliberately conservative defaults: resharding should proceed
        // slowly and put as little extra load on the node as possible.
        Self {
            batch_size: bytesize::ByteSize::kb(500),
            batch_delay: Duration::from_millis(100),
            retry_delay: Duration::from_secs(10),
            // No artificial startup delay outside of tests.
            initial_delay: Duration::ZERO,
            // The snapshot is typically available within a minute of the
            // epoch start; allow much longer in case we need to wait for
            // state sync.
            max_poll_time: Duration::from_secs(2 * 60 * 60), // 2 hours
        }
    }
}

/// ClientConfig where some fields can be updated at runtime.
#[derive(Clone, serde::Serialize)]
pub struct ClientConfig {
Expand Down Expand Up @@ -262,13 +295,16 @@ pub struct ClientConfig {
pub state_sync_enabled: bool,
/// Options for syncing state.
pub state_sync: StateSyncConfig,
/// Testing only. Makes a state snapshot after every epoch, but also every N blocks. The first snapshot is done after processing the first block.
pub state_snapshot_every_n_blocks: Option<u64>,
/// Limit of the size of per-shard transaction pool measured in bytes. If not set, the size
/// will be unbounded.
pub transaction_pool_size_limit: Option<u64>,
// Allows more detailed logging, for example a list of orphaned blocks.
pub enable_multiline_logging: bool,
// Configuration for resharding.
pub state_split_config: StateSplitConfig,
/// If the node is not a chunk producer within that many blocks, then route
/// to upcoming chunk producers.
pub tx_routing_height_horizon: BlockHeightDelta,
}

impl ClientConfig {
Expand Down Expand Up @@ -341,9 +377,10 @@ impl ClientConfig {
flat_storage_creation_period: Duration::from_secs(1),
state_sync_enabled,
state_sync: StateSyncConfig::default(),
state_snapshot_every_n_blocks: None,
transaction_pool_size_limit: None,
enable_multiline_logging: false,
state_split_config: StateSplitConfig::default(),
tx_routing_height_horizon: 4,
}
}
}
57 changes: 34 additions & 23 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::dyn_config::LOG_CONFIG_FILENAME;
use anyhow::{anyhow, bail, Context};
use near_chain_configs::{
get_initial_supply, ClientConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode,
LogSummaryStyle, MutableConfigValue, StateSyncConfig,
LogSummaryStyle, MutableConfigValue, StateSplitConfig, StateSyncConfig,
};
use near_config_utils::{ValidationError, ValidationErrors};
use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey, Signer};
Expand Down Expand Up @@ -121,10 +121,6 @@ pub const GENESIS_CONFIG_FILENAME: &str = "genesis.json";
pub const NODE_KEY_FILE: &str = "node_key.json";
pub const VALIDATOR_KEY_FILE: &str = "validator_key.json";

pub const MAINNET: &str = "mainnet";
pub const TESTNET: &str = "testnet";
pub const BETANET: &str = "betanet";

pub const MAINNET_TELEMETRY_URL: &str = "https://explorer.mainnet.near.org/api/nodes";
pub const NETWORK_TELEMETRY_URL: &str = "https://explorer.{}.near.org/api/nodes";

Expand Down Expand Up @@ -198,6 +194,10 @@ fn default_transaction_pool_size_limit() -> Option<u64> {
Some(100_000_000) // 100 MB.
}

/// Default for `Config::tx_routing_height_horizon`: a transaction is forwarded
/// to the chunk producers of this many upcoming blocks.
// NOTE(review): this matches the hardcoded `tx_routing_height_horizon: 4` in
// the `ClientConfig` constructor in this commit — presumably the two are meant
// to stay in sync; verify when changing either.
fn default_tx_routing_height_horizon() -> BlockHeightDelta {
    4
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct Consensus {
/// Minimum number of peers to start syncing.
Expand Down Expand Up @@ -306,19 +306,15 @@ pub struct Config {
#[serde(skip_serializing_if = "Option::is_none")]
pub save_trie_changes: Option<bool>,
pub log_summary_style: LogSummaryStyle,
#[serde(default = "default_log_summary_period")]
pub log_summary_period: Duration,
// Allows more detailed logging, for example a list of orphaned blocks.
pub enable_multiline_logging: Option<bool>,
/// Garbage collection configuration.
#[serde(default, flatten)]
#[serde(flatten)]
pub gc: GCConfig,
#[serde(default = "default_view_client_threads")]
pub view_client_threads: usize,
pub epoch_sync_enabled: bool,
#[serde(default = "default_view_client_throttle_period")]
pub view_client_throttle_period: Duration,
#[serde(default = "default_trie_viewer_state_size_limit")]
pub trie_viewer_state_size_limit: Option<u64>,
/// If set, overrides value in genesis configuration.
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -327,14 +323,14 @@ pub struct Config {
pub store: near_store::StoreConfig,
/// Different parameters to configure underlying cold storage.
/// This feature is under development, do not use in production.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none")]
pub cold_store: Option<near_store::StoreConfig>,
/// Configuration for the split storage.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none")]
pub split_storage: Option<SplitStorageConfig>,
/// The node will stop after the head exceeds this height.
/// The node usually stops within several seconds after reaching the target height.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none")]
pub expected_shutdown: Option<BlockHeight>,
/// Whether to use state sync (unreliable and corrupts the DB if fails) or do a block sync instead.
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -348,10 +344,12 @@ pub struct Config {
/// guarantees that the node will use bounded resources to store incoming transactions.
/// Setting this value too low (<1MB) on the validator might lead to production of smaller
/// chunks and underutilizing the capacity of the network.
#[serde(default = "default_transaction_pool_size_limit")]
pub transaction_pool_size_limit: Option<u64>,
/// If a node needs to upload state parts to S3
pub s3_credentials_file: Option<String>,
// Configuration for resharding.
pub state_split_config: StateSplitConfig,
/// If the node is not a chunk producer within that many blocks, then route
/// to upcoming chunk producers.
pub tx_routing_height_horizon: BlockHeightDelta,
}

fn is_false(value: &bool) -> bool {
Expand Down Expand Up @@ -391,8 +389,9 @@ impl Default for Config {
state_sync: None,
state_sync_enabled: None,
transaction_pool_size_limit: default_transaction_pool_size_limit(),
s3_credentials_file: None,
enable_multiline_logging: None,
state_split_config: StateSplitConfig::default(),
tx_routing_height_horizon: default_tx_routing_height_horizon(),
}
}
}
Expand Down Expand Up @@ -503,7 +502,6 @@ impl Config {
None
}

#[allow(unused_variables)]
pub fn set_rpc_addr(&mut self, addr: tcp::ListenerAddr) {
#[cfg(feature = "json_rpc")]
{
Expand Down Expand Up @@ -688,9 +686,10 @@ impl NearConfig {
flat_storage_creation_period: config.store.flat_storage_creation_period,
state_sync_enabled: config.state_sync_enabled.unwrap_or(false),
state_sync: config.state_sync.unwrap_or_default(),
state_snapshot_every_n_blocks: None,
transaction_pool_size_limit: config.transaction_pool_size_limit,
enable_multiline_logging: config.enable_multiline_logging.unwrap_or(true),
state_split_config: config.state_split_config,
tx_routing_height_horizon: config.tx_routing_height_horizon,
},
network_config: NetworkConfig::new(
config.network,
Expand Down Expand Up @@ -898,7 +897,9 @@ fn generate_or_load_keys(
) -> anyhow::Result<()> {
generate_or_load_key(dir, &config.node_key_file, Some("node".parse().unwrap()), None)?;
match chain_id {
MAINNET | TESTNET | BETANET => {
near_primitives::chains::MAINNET
| near_primitives::chains::TESTNET
| near_primitives::chains::BETANET => {
generate_or_load_key(dir, &config.validator_key_file, account_id, None)?;
}
_ => {
Expand Down Expand Up @@ -983,7 +984,7 @@ pub fn init_configs(
// Before finalizing the Config and Genesis, make sure the node and validator keys exist.
generate_or_load_keys(dir, &config, &chain_id, account_id, test_seed)?;
match chain_id.as_ref() {
MAINNET => {
near_primitives::chains::MAINNET => {
if test_seed.is_some() {
bail!("Test seed is not supported for {chain_id}");
}
Expand All @@ -1002,7 +1003,7 @@ pub fn init_configs(
genesis.to_file(dir.join(config.genesis_file));
info!(target: "near", "Generated mainnet genesis file in {}", dir.display());
}
TESTNET | BETANET => {
near_primitives::chains::TESTNET | near_primitives::chains::BETANET => {
if test_seed.is_some() {
bail!("Test seed is not supported for {chain_id}");
}
Expand Down Expand Up @@ -1077,6 +1078,11 @@ pub fn init_configs(
_ => {
// Create new configuration, key files and genesis for one validator.
config.network.skip_sync_wait = true;

// Make sure node tracks all shards, see
// https://github.com/near/nearcore/issues/7388
config.tracked_shards = vec![0];

if fast {
config.consensus.min_block_production_delay =
Duration::from_millis(FAST_MIN_BLOCK_PRODUCTION_DELAY);
Expand Down Expand Up @@ -1431,7 +1437,12 @@ pub fn load_config(
validation_errors.push_errors(e)
};
if validator_signer.is_some()
&& matches!(genesis.config.chain_id.as_ref(), MAINNET | TESTNET | BETANET)
&& matches!(
genesis.config.chain_id.as_ref(),
near_primitives::chains::MAINNET
| near_primitives::chains::TESTNET
| near_primitives::chains::BETANET
)
&& config.tracked_shards.is_empty()
{
// Make sure validators tracks all shards, see
Expand Down
Loading

0 comments on commit aa98b1c

Please sign in to comment.