Skip to content

Commit

Permalink
feat: orphan chunk state witness pool (#10613)
Browse files Browse the repository at this point in the history
### Description
This PR adds a pool for orphaned `ChunkStateWitnesses`.
To process a `ChunkStateWitness` we need the previous block, but
sometimes it isn't available immediately. The node might receive a
`ChunkStateWitness` before the block that's required to process it. In
such cases the witness becomes an "orphaned chunk state witness" and
it's put in `OrphanChunkStateWitnessPool`, where it waits for the
desired block to appear. Once a new block is accepted, we fetch all
orphaned witnesses that were waiting for this block from the pool and
process them.

### Design of `OrphanStateWitnessPool`

`OrphanStateWitnessPool` keeps a cache which maps `shard_id` and
`height` to an orphan `ChunkStateWitness` with these parameters:
```rust
witness_cache: LruCache<(ShardId, BlockHeight), ChunkStateWitness>,
```

All `ChunkStateWitnesses` go through basic validation before being put
in the orphan cache.
* The signature is checked to make sure that this witness really comes
from the right chunk producer that should produce a witness at this
height and shard_id.
* Client keeps only witnesses which are within 5 blocks of the current
chain head to prevent spam attacks. Without this limitation a single
malicious chunk producer could fill the whole cache with their fake
witnesses.
* There's also a limitation on witness size to limit the amount of
memory consumed by the pool. During StatelessNet loadtests performed by
`@staffik` and `@Longarithm` the observed `ChunkStateWitness` sIze was
16-32MB, so a 40MB limit should be alright. This PR only limits the size
of orphaned witnesses, limiting the size of non-orphan witnesses is much
more tricky, see the discussion in
#10615.

It's impossible to fully validate an orphaned witness, but this partial
validation should be enough to protect against attacks on the orphan
pool.

Under normal circumstances there should be only a few orphaned witnesses
per shard. If the node has fallen behind by more than a few blocks, it
has to catch up and its chunk endorsements don't matter.
The default cache capacity is set to 25 witnesses. With 5 shards it
provides capacity for 5 orphaned witnesses on each shard, which should
be enough.
Assuming that a single witness can take up 40 MB, the pool will consume
at most 1GB at full capacity.

The changes are divided into individual commits, they can be reviewed
commit-by-commit.

### Fixes
Fixes: #10552
Fixes: near/stakewars-iv#15
  • Loading branch information
jancionear authored Feb 23, 2024
1 parent 0c0dd1b commit 2158533
Show file tree
Hide file tree
Showing 25 changed files with 1,739 additions and 124 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use near_epoch_manager::types::BlockHeaderInfo;
use near_epoch_manager::{EpochManagerAdapter, RngSeed};
use near_pool::types::TransactionGroupIterator;
use near_primitives::account::{AccessKey, Account};
use near_primitives::block::Tip;
use near_primitives::block_header::{Approval, ApprovalInner};
use near_primitives::epoch_manager::block_info::BlockInfo;
use near_primitives::epoch_manager::epoch_info::EpochInfo;
Expand Down Expand Up @@ -954,6 +955,14 @@ impl EpochManagerAdapter for MockEpochManager {
Ok(true)
}

fn verify_chunk_state_witness_signature_in_epoch(
&self,
_state_witness: &ChunkStateWitness,
_epoch_id: &EpochId,
) -> Result<bool, Error> {
Ok(true)
}

fn cares_about_shard_from_prev_block(
&self,
parent_hash: &CryptoHash,
Expand Down Expand Up @@ -1002,6 +1011,14 @@ impl EpochManagerAdapter for MockEpochManager {
Ok(shard_layout != next_shard_layout)
}

fn possible_epochs_of_height_around_tip(
&self,
_tip: &Tip,
_height: BlockHeight,
) -> Result<Vec<EpochId>, EpochError> {
unimplemented!();
}

#[cfg(feature = "new_epoch_sync")]
fn get_all_epoch_hashes(
&self,
Expand Down
1 change: 1 addition & 0 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
yansi.workspace = true
bytesize.workspace = true

near-async.workspace = true
near-cache.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ impl Client {
network_adapter.clone().into_sender(),
runtime_adapter.clone(),
chunk_endorsement_tracker.clone(),
config.orphan_state_witness_pool_size,
);
let chunk_distribution_network = ChunkDistributionNetwork::from_config(&config);
Ok(Self {
Expand Down Expand Up @@ -1640,6 +1641,8 @@ impl Client {

self.shards_manager_adapter
.send(ShardsManagerRequestFromClient::CheckIncompleteChunks(*block.hash()));

self.process_ready_orphan_chunk_state_witnesses(&block);
}

/// Reconcile the transaction pool after processing a block.
Expand Down
36 changes: 7 additions & 29 deletions chain/client/src/client_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1824,35 +1824,13 @@ impl ClientActionHandler<SyncMessage> for ClientActions {
}
}

impl ClientActions {
pub fn handle_state_witness_message(
&mut self,
msg: ChunkStateWitnessMessage,
ctx: &mut dyn DelayedActionRunner<Self>,
) {
let peer_id = msg.peer_id.clone();
let attempts_remaining = msg.attempts_remaining;
match self.client.process_chunk_state_witness(msg.witness, msg.peer_id, None) {
Err(err) => {
tracing::error!(target: "client", ?err, "Error processing chunk state witness");
}
Ok(Some(witness)) => {
if attempts_remaining > 0 {
ctx.run_later(Duration::from_millis(100), move |actions, ctx| {
actions.handle_state_witness_message(
ChunkStateWitnessMessage {
witness,
peer_id,
attempts_remaining: attempts_remaining - 1,
},
ctx,
);
});
} else {
tracing::error!(target: "client", "Failed to process chunk state witness even after 5 tries due to missing parent block");
}
}
Ok(None) => {}
impl ClientActionHandler<ChunkStateWitnessMessage> for ClientActions {
type Result = ();

#[perf]
fn handle(&mut self, msg: ChunkStateWitnessMessage) -> Self::Result {
if let Err(err) = self.client.process_chunk_state_witness(msg.witness, msg.peer_id, None) {
tracing::error!(target: "client", ?err, "Error processing chunk state witness");
}
}
}
Expand Down
13 changes: 0 additions & 13 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use near_chunks::adapter::ShardsManagerRequestFromClient;
use near_client_primitives::types::Error;
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::{EpochManagerAdapter, RngSeed};
use near_network::client::ChunkStateWitnessMessage;
use near_network::types::PeerManagerAdapter;
use near_o11y::{handler_debug_span, WithSpanContext};
use near_primitives::network::PeerId;
Expand Down Expand Up @@ -151,18 +150,6 @@ where
}
}

// This one requires the context for further scheduling of messages, so
// we can't use the generic wrapper above.
impl Handler<WithSpanContext<ChunkStateWitnessMessage>> for ClientActor {
type Result = ();

fn handle(&mut self, msg: WithSpanContext<ChunkStateWitnessMessage>, ctx: &mut Context<Self>) {
self.wrap(msg, ctx, "ChunkStateWitnessMessage", |this, msg, ctx| {
this.actions.handle_state_witness_message(msg, ctx)
})
}
}

/// Returns random seed sampled from the current thread
pub fn random_seed_from_thread() -> RngSeed {
let mut rng_seed: RngSeed = [0; 32];
Expand Down
4 changes: 4 additions & 0 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ pub use crate::client::{Client, ProduceChunkResult};
pub use crate::client_actions::NetworkAdversarialMessage;
pub use crate::client_actor::{start_client, ClientActor};
pub use crate::config_updater::ConfigUpdater;
pub use crate::stateless_validation::chunk_validator::orphan_witness_handling::{
HandleOrphanWitnessOutcome, MAX_ORPHAN_WITNESS_SIZE,
};
pub use crate::sync::adapter::{SyncAdapter, SyncMessage};
pub use crate::view_client::{start_view_client, ViewClientActor};
pub use near_client_primitives::debug::DebugStatus;
pub use near_network::client::{
BlockApproval, BlockResponse, ProcessTxRequest, ProcessTxResponse, SetNetworkInfo,
};
pub use stateless_validation::processing_tracker::{ProcessingDoneTracker, ProcessingDoneWaiter};

pub mod adapter;
pub mod adversarial;
Expand Down
28 changes: 28 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,34 @@ pub(crate) static CHUNK_STATE_WITNESS_TOTAL_SIZE: Lazy<HistogramVec> = Lazy::new
.unwrap()
});

pub(crate) static ORPHAN_CHUNK_STATE_WITNESSES_TOTAL_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_orphan_chunk_state_witness_total_count",
"Total number of orphaned chunk state witnesses that were saved for later processing",
&["shard_id"],
)
.unwrap()
});

pub(crate) static ORPHAN_CHUNK_STATE_WITNESS_POOL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_orphan_chunk_state_witness_pool_size",
"Number of orphaned witnesses kept in OrphanStateWitnessPool (by shard_id)",
&["shard_id"],
)
.unwrap()
});

pub(crate) static ORPHAN_CHUNK_STATE_WITNESS_POOL_MEMORY_USED: Lazy<IntGaugeVec> =
Lazy::new(|| {
try_create_int_gauge_vec(
"near_orphan_chunk_state_witness_pool_memory_used",
"Memory in bytes consumed by the OrphanStateWitnessPool (by shard_id)",
&["shard_id"],
)
.unwrap()
});

pub(crate) static BLOCK_PRODUCER_ENDORSED_STAKE_RATIO: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_block_producer_endorsed_stake_ratio",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod orphan_witness_handling;
pub mod orphan_witness_pool;

use super::processing_tracker::ProcessingDoneTracker;
use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTracker;
use crate::{metrics, Client};
Expand Down Expand Up @@ -31,6 +34,7 @@ use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::ShardId;
use near_primitives::validator_signer::ValidatorSigner;
use near_store::PartialStorage;
use orphan_witness_pool::OrphanStateWitnessPool;
use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -53,6 +57,7 @@ pub struct ChunkValidator {
network_sender: Sender<PeerManagerMessageRequest>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>,
orphan_witness_pool: OrphanStateWitnessPool,
}

impl ChunkValidator {
Expand All @@ -62,13 +67,15 @@ impl ChunkValidator {
network_sender: Sender<PeerManagerMessageRequest>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>,
orphan_witness_pool_size: usize,
) -> Self {
Self {
my_signer,
epoch_manager,
network_sender,
runtime_adapter,
chunk_endorsement_tracker,
orphan_witness_pool: OrphanStateWitnessPool::new(orphan_witness_pool_size),
}
}

Expand Down Expand Up @@ -639,14 +646,40 @@ impl Client {
witness: ChunkStateWitness,
peer_id: PeerId,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<Option<ChunkStateWitness>, Error> {
) -> Result<(), Error> {
let prev_block_hash = witness.inner.chunk_header.prev_block_hash();
if self.chain.get_block(prev_block_hash).is_err() {
return Ok(Some(witness));
let prev_block = match self.chain.get_block(prev_block_hash) {
Ok(block) => block,
Err(Error::DBNotFoundErr(_)) => {
// Previous block isn't available at the moment, add this witness to the orphan pool.
self.handle_orphan_state_witness(witness)?;
return Ok(());
}
Err(err) => return Err(err),
};
self.process_chunk_state_witness_with_prev_block(
witness,
peer_id,
&prev_block,
processing_done_tracker,
)
}

pub fn process_chunk_state_witness_with_prev_block(
&mut self,
witness: ChunkStateWitness,
peer_id: PeerId,
prev_block: &Block,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<(), Error> {
if witness.inner.chunk_header.prev_block_hash() != prev_block.hash() {
return Err(Error::Other(format!(
"process_chunk_state_witness_with_prev_block - prev_block doesn't match ({} != {})",
witness.inner.chunk_header.prev_block_hash(),
prev_block.hash()
)));
}

// TODO(#10265): If the previous block does not exist, we should
// queue this (similar to orphans) to retry later.
let result = self.chunk_validator.start_validating_chunk(
witness,
&self.chain,
Expand All @@ -661,6 +694,6 @@ impl Client {
},
));
}
result.map(|_| None)
result
}
}
Loading

0 comments on commit 2158533

Please sign in to comment.