Skip to content

Commit

Permalink
fix: during resharding, reassign outgoing receipts to lowest index ch…
Browse files Browse the repository at this point in the history
…ild (#9362)

As described on [zulip](https://near.zulipchat.com/#narrow/stream/295558-pagoda.2Fcore/topic/outgoing.20receipts.20and.20resharding), the current way of reassigning outgoing receipts from the parent shard to the children shards won't work for any future reshardings. It only worked for the 1->4 resharding because in that case all children share the same parent.

This PR implements a new approach where all outgoing receipts are going to be assigned to the child with the lowest shard id.
  • Loading branch information
wacban authored Aug 1, 2023
1 parent 291fb82 commit 83d6d08
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 133 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions chain/chain-primitives/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ pub enum Error {
/// Invalid block merkle root.
#[error("Invalid Block Merkle Root")]
InvalidBlockMerkleRoot,
/// Invalid split shard ids.
#[error("Invalid Split Shard Ids when resharding. shard_id: {0}, parent_shard_id: {1}")]
InvalidSplitShardsIds(u64, u64),
/// Someone is not a validator. Usually happens in signature verification
#[error("Not A Validator")]
NotAValidator,
Expand Down Expand Up @@ -271,6 +274,7 @@ impl Error {
| Error::InvalidStatePayload
| Error::InvalidTransactions
| Error::InvalidChallenge
| Error::InvalidSplitShardsIds(_, _)
| Error::MaliciousChallenge
| Error::IncorrectNumberOfChunkHeaders
| Error::InvalidEpochHash
Expand Down
9 changes: 8 additions & 1 deletion chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,18 @@ expensive_tests = []
test_features = []
delay_detector = ["delay-detector/delay_detector"]
no_cache = ["near-store/no_cache"]
protocol_feature_reject_blocks_with_outdated_protocol_version = ["near-primitives/protocol_feature_reject_blocks_with_outdated_protocol_version"]

protocol_feature_reject_blocks_with_outdated_protocol_version = [
"near-primitives/protocol_feature_reject_blocks_with_outdated_protocol_version",
]
protocol_feature_simple_nightshade_v2 = [
"near-primitives/protocol_feature_simple_nightshade_v2",
]

nightly = [
"nightly_protocol",
"protocol_feature_reject_blocks_with_outdated_protocol_version",
"protocol_feature_simple_nightshade_v2",
"near-chain-configs/nightly",
"near-client-primitives/nightly",
"near-epoch-manager/nightly",
Expand Down
70 changes: 38 additions & 32 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4887,13 +4887,14 @@ impl<'a> ChainUpdate<'a> {
self.chain_store_update.commit()
}

/// For all the outgoing receipts generated in block `hash` at the shards we are tracking
/// in this epoch,
/// save a mapping from receipt ids to the destination shard ids that the receipt will be sent
/// to in the next block.
/// Note that this function should be called after `save_block` is called on this block because
/// it requires that the block info is available in EpochManager, otherwise it will return an
/// error.
/// For all the outgoing receipts generated in block `hash` at the shards we
/// are tracking in this epoch, save a mapping from receipt ids to the
/// destination shard ids that the receipt will be sent to in the next
/// block.
///
/// Note that this function should be called after `save_block` is called on
/// this block because it requires that the block info is available in
/// EpochManager, otherwise it will return an error.
pub fn save_receipt_id_to_shard_id_for_block(
&mut self,
me: &Option<AccountId>,
Expand All @@ -4902,41 +4903,46 @@ impl<'a> ChainUpdate<'a> {
num_shards: NumShards,
) -> Result<(), Error> {
for shard_id in 0..num_shards {
if self.shard_tracker.care_about_shard(
let care_about_shard = self.shard_tracker.care_about_shard(
me.as_ref(),
&prev_hash,
shard_id as ShardId,
true,
) {
let receipt_id_to_shard_id: HashMap<_, _> = {
// it can be empty if there is no new chunk for this shard
if let Ok(outgoing_receipts) =
self.chain_store_update.get_outgoing_receipts(hash, shard_id)
{
let shard_layout =
self.epoch_manager.get_shard_layout_from_prev_block(hash)?;
outgoing_receipts
.iter()
.map(|receipt| {
(
receipt.receipt_id,
account_id_to_shard_id(&receipt.receiver_id, &shard_layout),
)
})
.collect()
} else {
HashMap::new()
}
};
for (receipt_id, shard_id) in receipt_id_to_shard_id {
self.chain_store_update.save_receipt_id_to_shard_id(receipt_id, shard_id);
}
);
if !care_about_shard {
continue;
}
let receipt_id_to_shard_id = self.get_receipt_id_to_shard_id(hash, shard_id)?;
for (receipt_id, shard_id) in receipt_id_to_shard_id {
self.chain_store_update.save_receipt_id_to_shard_id(receipt_id, shard_id);
}
}

Ok(())
}

/// Builds the map from receipt id to destination shard id for the outgoing
/// receipts stored for block `hash` and shard `shard_id`.
///
/// The destination of each receipt is the shard that its `receiver_id`
/// maps to in the shard layout obtained via
/// `get_shard_layout_from_prev_block(hash)`.
///
/// # Errors
///
/// A failure to read the outgoing receipts is NOT an error: an empty map is
/// returned instead (presumably because no new chunk exists for the shard
/// in that block). A failure to obtain the shard layout is propagated.
fn get_receipt_id_to_shard_id(
    &mut self,
    hash: &CryptoHash,
    shard_id: u64,
) -> Result<HashMap<CryptoHash, u64>, Error> {
    let receipts = match self.chain_store_update.get_outgoing_receipts(hash, shard_id) {
        Ok(receipts) => receipts,
        // Any read failure is treated as "no outgoing receipts".
        Err(_) => return Ok(HashMap::new()),
    };
    let shard_layout = self.epoch_manager.get_shard_layout_from_prev_block(hash)?;

    let mut receipt_id_to_shard_id = HashMap::new();
    for receipt in receipts.iter() {
        let target_shard_id = account_id_to_shard_id(&receipt.receiver_id, &shard_layout);
        receipt_id_to_shard_id.insert(receipt.receipt_id, target_shard_id);
    }
    Ok(receipt_id_to_shard_id)
}

fn apply_chunk_postprocessing(
&mut self,
block: &Block,
Expand Down
149 changes: 125 additions & 24 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ use near_cache::CellLruCache;
use near_chain_primitives::error::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::block::Tip;
#[cfg(feature = "protocol_feature_simple_nightshade_v2")]
use near_primitives::checked_feature;
use near_primitives::errors::InvalidTxError;
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::{MerklePath, PartialMerkleTree};
use near_primitives::receipt::Receipt;
use near_primitives::shard_layout::{account_id_to_shard_id, get_block_shard_uid, ShardUId};
use near_primitives::shard_layout::account_id_to_shard_id;
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout, ShardUId};
use near_primitives::sharding::{
ChunkHash, EncodedShardChunk, PartialEncodedChunk, ReceiptProof, ShardChunk, ShardChunkHeader,
StateSyncInfo,
Expand All @@ -37,6 +40,7 @@ use near_primitives::utils::{
get_block_shard_id, get_outcome_id_block_hash, get_outcome_id_block_hash_rev, index_to_bytes,
to_timestamp,
};
use near_primitives::version::ProtocolVersion;
use near_primitives::views::LightClientBlockView;
use near_store::{
DBCol, KeyForStateChanges, ShardTries, Store, StoreUpdate, WrappedTrieChanges, CHUNK_TAIL_KEY,
Expand Down Expand Up @@ -480,35 +484,132 @@ impl ChainStore {
loop {
let block_header = self.get_block_header(&receipts_block_hash)?;

if block_header.height() == last_included_height {
let receipts_shard_layout =
epoch_manager.get_shard_layout(block_header.epoch_id())?;

// get the shard from which the outgoing receipt were generated
let receipts_shard_id = if shard_layout != receipts_shard_layout {
shard_layout.get_parent_shard_id(shard_id)?
} else {
shard_id
};
let mut receipts = self
.get_outgoing_receipts(&receipts_block_hash, receipts_shard_id)
.map(|v| v.to_vec())
.unwrap_or_default();

// filter to receipts that belong to `shard_id` in the current shard layout
if shard_layout != receipts_shard_layout {
receipts.retain(|receipt| {
account_id_to_shard_id(&receipt.receiver_id, &shard_layout) == shard_id
});
}
if block_header.height() != last_included_height {
receipts_block_hash = *block_header.prev_hash();
continue;
}
let receipts_shard_layout = epoch_manager.get_shard_layout(block_header.epoch_id())?;

return Ok(receipts);
// get the shard from which the outgoing receipts were generated
let receipts_shard_id = if shard_layout != receipts_shard_layout {
shard_layout.get_parent_shard_id(shard_id)?
} else {
receipts_block_hash = *block_header.prev_hash();
shard_id
};

let mut receipts = self
.get_outgoing_receipts(&receipts_block_hash, receipts_shard_id)
.map(|v| v.to_vec())
.unwrap_or_default();

if shard_layout != receipts_shard_layout {
// the shard layout has changed so we need to reassign the outgoing receipts
let epoch_id = epoch_manager.get_epoch_id_from_prev_block(&prev_block_hash)?;
let protocol_version = epoch_manager.get_epoch_protocol_version(&epoch_id)?;
Self::reassign_outgoing_receipts_for_resharding(
&mut receipts,
protocol_version,
&shard_layout,
shard_id,
receipts_shard_id,
)?;
}

return Ok(receipts);
}
}

/// Reassigns the outgoing receipts of a parent shard to one of its children
/// after the shard layout has changed.
///
/// `receipts` holds the outgoing receipts read from the parent shard
/// `receipts_shard_id`; on return it contains only the receipts that the
/// child shard `shard_id` is responsible for under `shard_layout`.
///
/// The strategy is selected as follows:
/// * when compiled with `protocol_feature_simple_nightshade_v2` and the
///   `SimpleNightshadeV2` feature is enabled for `protocol_version`, the
///   "lowest child index" strategy (v2) is used;
/// * otherwise the "receipt receiver" strategy (v1) is used. Keep in mind
///   that v1 only works for 1 shard -> n shards reshardings, otherwise
///   receipts get lost.
fn reassign_outgoing_receipts_for_resharding(
    receipts: &mut Vec<Receipt>,
    protocol_version: ProtocolVersion,
    shard_layout: &ShardLayout,
    shard_id: u64,
    receipts_shard_id: u64,
) -> Result<(), Error> {
    tracing::trace!(target: "resharding", ?protocol_version, shard_id, receipts_shard_id, "reassign_outgoing_receipts_for_resharding");

    // Prefer the v2 reassignment whenever it is compiled in and enabled.
    #[cfg(feature = "protocol_feature_simple_nightshade_v2")]
    if checked_feature!("stable", SimpleNightshadeV2, protocol_version) {
        return Self::reassign_outgoing_receipts_for_resharding_v2(
            receipts,
            shard_layout,
            shard_id,
            receipts_shard_id,
        );
    }

    // Fall back to the legacy reassignment.
    Self::reassign_outgoing_receipts_for_resharding_v1(receipts, shard_layout, shard_id)
}

/// Hands all outgoing receipts of a parent shard to exactly one of its
/// children: the child with the lowest shard id.
///
/// This "lowest child index" approach is meant for the resharding from
/// simple nightshade with 4 shards to simple nightshade v2 with 5 shards,
/// and for subsequent reshardings.
///
/// For example, in the resharding
///   0->0', 1->1', 2->2', 3->3',4'
/// * 0' gets all outgoing receipts of its parent 0
/// * 1' gets all outgoing receipts of its parent 1
/// * 2' gets all outgoing receipts of its parent 2
/// * 3' gets all outgoing receipts of its parent 3
/// * 4' gets no outgoing receipts of its parent 3
///
/// Every receipt is therefore delivered to the children exactly once.
///
/// On return `receipts` is left untouched when `shard_id` is the lowest
/// index child of `receipts_shard_id`, and is emptied otherwise.
///
/// # Errors
///
/// Returns `Error::InvalidSplitShardsIds(shard_id, receipts_shard_id)` when
/// `shard_layout` reports no split shards for `receipts_shard_id`, or when
/// the reported list is empty.
#[cfg(feature = "protocol_feature_simple_nightshade_v2")]
fn reassign_outgoing_receipts_for_resharding_v2(
    receipts: &mut Vec<Receipt>,
    shard_layout: &ShardLayout,
    shard_id: ShardId,
    receipts_shard_id: ShardId,
) -> Result<(), Error> {
    // The child that inherits the receipts is the one with the lowest id.
    let lowest_child_id = shard_layout
        .get_split_shard_ids(receipts_shard_id)
        .and_then(|children| children.iter().min().copied())
        .ok_or(Error::InvalidSplitShardsIds(shard_id, receipts_shard_id))?;

    // Every other child gets no receipts at all.
    if shard_id != lowest_child_id {
        receipts.clear();
    }
    Ok(())
}

/// Keeps only the outgoing receipts whose receiver belongs to `shard_id`
/// under the new `shard_layout`.
///
/// This is the "receipt receiver" approach: each receipt of the parent
/// shard is assigned to the shard that its receiver account maps to.
///
/// It worked well for the 1->4 shards resharding but does not work for
/// later reshardings: only the child shards look at the parent shard's
/// outgoing receipts, so a receipt whose receiver does not fall within one
/// of the children shards is lost.
///
/// This function always returns `Ok(())`.
fn reassign_outgoing_receipts_for_resharding_v1(
    receipts: &mut Vec<Receipt>,
    shard_layout: &ShardLayout,
    shard_id: ShardId,
) -> Result<(), Error> {
    receipts.retain(|receipt| {
        let receiver_shard_id = account_id_to_shard_id(&receipt.receiver_id, shard_layout);
        receiver_shard_id == shard_id
    });
    Ok(())
}

/// For a given transaction, it expires if the block that the chunk points to is more than `validity_period`
/// ahead of the block that has `base_block_hash`.
pub fn check_transaction_validity_period(
Expand Down
1 change: 1 addition & 0 deletions core/primitives/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ impl TestBlockBuilder {
}

pub fn build(self) -> Block {
tracing::debug!(target: "test", height=self.height, ?self.epoch_id, "produce block");
Block::produce(
PROTOCOL_VERSION,
PROTOCOL_VERSION,
Expand Down
1 change: 1 addition & 0 deletions integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ chrono.workspace = true
clap.workspace = true
futures.workspace = true
hex.workspace = true
itertools.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
primitive-types.workspace = true
Expand Down
Loading

0 comments on commit 83d6d08

Please sign in to comment.