Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: during resharding, reassign outgoing receipts to lowest index child #9362

Merged
merged 8 commits into from
Aug 1, 2023
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,22 @@ expensive_tests = []
test_features = []
delay_detector = ["delay-detector/delay_detector"]
no_cache = ["near-store/no_cache"]
protocol_feature_reject_blocks_with_outdated_protocol_version = ["near-primitives/protocol_feature_reject_blocks_with_outdated_protocol_version"]
protocol_feature_block_header_v4 = ["near-primitives/protocol_feature_block_header_v4"]
protocol_feature_reject_blocks_with_outdated_protocol_version = [
"near-primitives/protocol_feature_reject_blocks_with_outdated_protocol_version",
]
protocol_feature_block_header_v4 = [
"near-primitives/protocol_feature_block_header_v4",
]
protocol_feature_simple_nightshade_v2 = [
"near-primitives/protocol_feature_simple_nightshade_v2",
]


nightly = [
"nightly_protocol",
"protocol_feature_block_header_v4",
"protocol_feature_reject_blocks_with_outdated_protocol_version",
"protocol_feature_simple_nightshade_v2",
"near-chain-configs/nightly",
"near-client-primitives/nightly",
"near-epoch-manager/nightly",
Expand Down
70 changes: 38 additions & 32 deletions chain/chain/src/chain.rs
wacban marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -4879,13 +4879,14 @@ impl<'a> ChainUpdate<'a> {
self.chain_store_update.commit()
}

/// For all the outgoing receipts generated in block `hash` at the shards we are tracking
/// in this epoch,
/// save a mapping from receipt ids to the destination shard ids that the receipt will be sent
/// to in the next block.
/// Note that this function should be called after `save_block` is called on this block because
/// it requires that the block info is available in EpochManager, otherwise it will return an
/// error.
/// For all the outgoing receipts generated in block `hash` at the shards we
/// are tracking in this epoch, save a mapping from receipt ids to the
/// destination shard ids that the receipt will be sent to in the next
/// block.
///
/// Note that this function should be called after `save_block` is called on
/// this block because it requires that the block info is available in
/// EpochManager, otherwise it will return an error.
pub fn save_receipt_id_to_shard_id_for_block(
wacban marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
me: &Option<AccountId>,
Expand All @@ -4894,41 +4895,46 @@ impl<'a> ChainUpdate<'a> {
num_shards: NumShards,
) -> Result<(), Error> {
for shard_id in 0..num_shards {
if self.shard_tracker.care_about_shard(
let care_about_shard = self.shard_tracker.care_about_shard(
me.as_ref(),
&prev_hash,
shard_id as ShardId,
true,
) {
let receipt_id_to_shard_id: HashMap<_, _> = {
// it can be empty if there is no new chunk for this shard
if let Ok(outgoing_receipts) =
self.chain_store_update.get_outgoing_receipts(hash, shard_id)
{
let shard_layout =
self.epoch_manager.get_shard_layout_from_prev_block(hash)?;
outgoing_receipts
.iter()
.map(|receipt| {
(
receipt.receipt_id,
account_id_to_shard_id(&receipt.receiver_id, &shard_layout),
)
})
.collect()
} else {
HashMap::new()
}
};
for (receipt_id, shard_id) in receipt_id_to_shard_id {
self.chain_store_update.save_receipt_id_to_shard_id(receipt_id, shard_id);
}
);
if !care_about_shard {
continue;
}
let receipt_id_to_shard_id = self.get_receipt_id_to_shard_id(hash, shard_id)?;
for (receipt_id, shard_id) in receipt_id_to_shard_id {
self.chain_store_update.save_receipt_id_to_shard_id(receipt_id, shard_id);
}
}

Ok(())
}

/// Builds the map from receipt id to destination shard id for the outgoing
/// receipts stored for block `hash` and shard `shard_id`.
///
/// The destination shard of each receipt is the shard of its receiver under
/// the shard layout returned by `get_shard_layout_from_prev_block(hash)`.
///
/// If the outgoing receipts cannot be read, an empty map is returned rather
/// than an error. A failure to obtain the shard layout is propagated.
fn get_receipt_id_to_shard_id(
    &mut self,
    hash: &CryptoHash,
    shard_id: u64,
) -> Result<HashMap<CryptoHash, u64>, Error> {
    let receipts = match self.chain_store_update.get_outgoing_receipts(hash, shard_id) {
        Ok(receipts) => receipts,
        // Nothing stored for this block and shard: treat it as "no receipts".
        Err(_) => return Ok(HashMap::new()),
    };

    // Only look up the layout once we know there are receipts to map.
    let shard_layout = self.epoch_manager.get_shard_layout_from_prev_block(hash)?;

    let mut receipt_id_to_shard_id = HashMap::new();
    for receipt in receipts.iter() {
        let destination = account_id_to_shard_id(&receipt.receiver_id, &shard_layout);
        receipt_id_to_shard_id.insert(receipt.receipt_id, destination);
    }
    Ok(receipt_id_to_shard_id)
}

fn apply_chunk_postprocessing(
&mut self,
block: &Block,
Expand Down
117 changes: 93 additions & 24 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use near_primitives::errors::InvalidTxError;
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::{MerklePath, PartialMerkleTree};
use near_primitives::receipt::Receipt;
use near_primitives::shard_layout::{account_id_to_shard_id, get_block_shard_uid, ShardUId};
#[cfg(not(feature = "protocol_feature_simple_nightshade_v2"))]
use near_primitives::shard_layout::account_id_to_shard_id;
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout, ShardUId};
use near_primitives::sharding::{
ChunkHash, EncodedShardChunk, PartialEncodedChunk, ReceiptProof, ShardChunk, ShardChunkHeader,
StateSyncInfo,
Expand Down Expand Up @@ -480,35 +482,101 @@ impl ChainStore {
loop {
let block_header = self.get_block_header(&receipts_block_hash)?;

if block_header.height() == last_included_height {
let receipts_shard_layout =
epoch_manager.get_shard_layout(block_header.epoch_id())?;

// get the shard from which the outgoing receipt were generated
let receipts_shard_id = if shard_layout != receipts_shard_layout {
shard_layout.get_parent_shard_id(shard_id)?
} else {
shard_id
};
let mut receipts = self
.get_outgoing_receipts(&receipts_block_hash, receipts_shard_id)
.map(|v| v.to_vec())
.unwrap_or_default();

// filter to receipts that belong to `shard_id` in the current shard layout
if shard_layout != receipts_shard_layout {
receipts.retain(|receipt| {
account_id_to_shard_id(&receipt.receiver_id, &shard_layout) == shard_id
});
}
if block_header.height() != last_included_height {
receipts_block_hash = *block_header.prev_hash();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robin-near, just for my understanding, when is it possible for the prev_block_height to not be last_included_height? Is it when we have an orphan block chain?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's when a block is missing a chunk. Then the last_included_height will be the latest block height where a chunk for the shard is present.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, what @wacban said. It's one of the very cryptic parts of the codebase - chunks can be missing from blocks, and the logic to handle that is very complex.

continue;
}
let receipts_shard_layout = epoch_manager.get_shard_layout(block_header.epoch_id())?;

return Ok(receipts);
// get the shard from which the outgoing receipt were generated
let receipts_shard_id = if shard_layout != receipts_shard_layout {
shard_layout.get_parent_shard_id(shard_id)?
} else {
receipts_block_hash = *block_header.prev_hash();
shard_id
};

let mut receipts = self
.get_outgoing_receipts(&receipts_block_hash, receipts_shard_id)
.map(|v| v.to_vec())
.unwrap_or_default();

if shard_layout != receipts_shard_layout {
#[cfg(feature = "protocol_feature_simple_nightshade_v2")]
wacban marked this conversation as resolved.
Show resolved Hide resolved
Self::reassign_outgoing_receipts_for_resharding_v2(
&shard_layout,
shard_id,
receipts_shard_id,
&mut receipts,
);

#[cfg(not(feature = "protocol_feature_simple_nightshade_v2"))]
Self::reassign_outgoing_receipts_for_resharding_v1(
&shard_layout,
shard_id,
&mut receipts,
);
wacban marked this conversation as resolved.
Show resolved Hide resolved
}

return Ok(receipts);
}
}

/// Reassign the outgoing receipts from the parent shard to the children
/// shards.
///
/// This method does it based on the "lowest child index" approach where it
/// assigns all the receipts from parent to the child shard with the lowest
/// index. It's meant to be used for the resharding from simple nightshade
/// with 4 shards to simple nightshade v2 with 5 shards and subsequent
/// reshardings.
#[cfg(feature = "protocol_feature_simple_nightshade_v2")]
fn reassign_outgoing_receipts_for_resharding_v2(
wacban marked this conversation as resolved.
Show resolved Hide resolved
wacban marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this handle when split shards are in the middle of shard IDs, e.g. 1 -> 1,2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In resharding as you described we would have:
0->0' ; 1->1', 2' ; 2->3' ; 3->4'
shards 0', 1', 3' and 4' will each get all of the outgoing receipts from their respective parent shard;
shard 2' will get no outgoing receipts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I was wondering how this remapping happens for non-split shards (e.g. shards 2 and 3 in the example above), since the function seems to care only about split shard ids.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For non-split shards we treat it as splitting a parent shard into a single child shard. So it is covered here. In the example above shard 3' will get all of the outgoing receipts from its parent shard 2 and shard 4' will get all of the outgoing receipts from its parent 3.

Currently because of how storage keys work we need to copy all of the data from shards that are not split anyway. In the future we'll likely optimize that. Assuming we decide to have unique, incremental shard ids (e.g. 0, 1, 2, 3 -> (splitting 1) -> 0, 2, 3, 4, 5) then the outgoing receipts reassignment will be to keep the receipts where they are for non-split shards and reassign to lowest index child for split shards.

shard_layout: &ShardLayout,
shard_id: ShardId,
parent_shard_id: ShardId,
receipts: &mut Vec<Receipt>,
) {
let split_shard_ids = shard_layout.get_split_shard_ids(parent_shard_id);
wacban marked this conversation as resolved.
Show resolved Hide resolved
let split_shard_ids = if let Some(split_shard_ids) = split_shard_ids {
split_shard_ids
} else {
tracing::error!(target:"store", "split shard ids is none");
return;
};

let lowest_index_child_shard_id = split_shard_ids.iter().min();
wacban marked this conversation as resolved.
Show resolved Hide resolved
let lowest_index_child_shard_id =
if let Some(lowest_index_child_shard_id) = lowest_index_child_shard_id {
*lowest_index_child_shard_id
} else {
tracing::error!(target:"store", "split shard ids is empty");
return;
};

if shard_id == lowest_index_child_shard_id {
// this shard_id is the lowest index child, it gets all the receipts
} else {
// this shard_id is not the lowest index child, it gets no receipts
receipts.clear();
}
}

/// Reassign the outgoing receipts from the parent shard to the children
/// shards.
///
/// This method does it based on the "receipt receiver" approach where the
/// receipt is assigned to the shard of the receiver.
wacban marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(not(feature = "protocol_feature_simple_nightshade_v2"))]
fn reassign_outgoing_receipts_for_resharding_v1(
    shard_layout: &ShardLayout,
    shard_id: ShardId,
    receipts: &mut Vec<Receipt>,
) {
    // Keep only the receipts whose receiver maps to `shard_id` under
    // `shard_layout`; everything else is dropped in place.
    receipts.retain(|receipt| {
        let receiver_shard_id = account_id_to_shard_id(&receipt.receiver_id, shard_layout);
        receiver_shard_id == shard_id
    });
}

/// For a given transaction, it expires if the block that the chunk points to is more than `validity_period`
/// ahead of the block that has `base_block_hash`.
pub fn check_transaction_validity_period(
Expand Down Expand Up @@ -1845,6 +1913,7 @@ impl<'a> ChainStoreUpdate<'a> {
shard_id: ShardId,
outgoing_receipts: Vec<Receipt>,
) {
tracing::debug!(target:"store", block=?hash, shard_id, ?outgoing_receipts, "save outgoing receipts");
self.chain_store_cache_update
.outgoing_receipts
.insert((*hash, shard_id), Arc::new(outgoing_receipts));
Expand Down
1 change: 1 addition & 0 deletions integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ chrono.workspace = true
clap.workspace = true
futures.workspace = true
hex.workspace = true
itertools.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
primitive-types.workspace = true
Expand Down
Loading