Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
  • Loading branch information
anorth committed Aug 10, 2023
1 parent 887b7a9 commit 9aa6ab7
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 66 deletions.
9 changes: 5 additions & 4 deletions actors/market/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod account {
pub mod miner {
use super::*;
use cid::Cid;
use fvm_ipld_encoding::RawBytes;
use fvm_shared::clock::ChainEpoch;
use fvm_shared::error::ExitCode;
use fvm_shared::piece::PaddedPieceSize;
Expand Down Expand Up @@ -57,7 +58,7 @@ pub mod miner {
// The relevant state must be already committed so the receiver can observe any impacts
// at the sending miner actor.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
// TODO REVIEW: #[serde(transparent)]?
#[serde(transparent)]
pub struct SectorContentChangedParams {
// Distinct sectors with changed content.
pub sectors: Vec<SectorChanges>,
Expand Down Expand Up @@ -85,20 +86,20 @@ pub mod miner {
pub size: PaddedPieceSize,
// A receiver-specific identifier.
// E.g. an encoded deal ID which the provider claims this piece satisfies.
pub payload: Vec<u8>,
pub payload: RawBytes,
}

// For each piece in each sector, the notifee returns an exit code and
// (possibly-empty) result data.
// The miner actor will pass through results to its caller.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
// TODO REVIEW: #[serde(transparent)]?
#[serde(transparent)]
pub struct SectorContentChangedReturn {
// A result for each sector that was notified, in the same order.
pub sectors: Vec<SectorReturn>,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
// TODO REVIEW: #[serde(transparent)]?
#[serde(transparent)]
pub struct SectorReturn {
// A result for each piece for the sector that was notified, in the same order.
pub added: Vec<PieceReturn>,
Expand Down
43 changes: 30 additions & 13 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,9 @@ impl Actor {
let mut sectors_deals: Vec<(SectorNumber, SectorDealIDs)> = vec![];

'sector: for p in params.sectors {
let mut verified_infos = Vec::new();
let mut nonverified_deal_space: BigInt = BigInt::zero();
let mut validated_proposals = vec![];
// Iterate once to validate all the requested deals.
// If a deal fails, skip the whole sector.
for deal_id in &p.deal_ids {
// Check each deal is present only once, within and across sectors.
if activated_deals.contains(deal_id) {
Expand All @@ -591,12 +592,18 @@ impl Actor {
continue 'sector;
}
};
validated_proposals.push(proposal);
}

// No continue below here, to ensure state changes are consistent.
// Any error is an abort.
let mut verified_infos = vec![];
let mut nonverified_deal_space: BigInt = BigInt::zero();
// Given that all deals validated, prepare the state updates for them all.
// There's no continue below here to ensure updates are consistent.
// Any error must abort.
for (deal_id, proposal) in p.deal_ids.iter().zip(validated_proposals) {
activated_deals.insert(*deal_id);
// Extract and remove any verified allocation ID for the pending deal.
let alloc_id = remove_pending_deal_allocation_id(
// FIXME move outside inner loop
&mut pending_deal_allocation_ids,
*deal_id,
)?
Expand All @@ -613,6 +620,7 @@ impl Actor {
nonverified_deal_space += proposal.piece_size.0;
}

// Prepare initial deal state.
deal_states.push((
*deal_id,
DealState {
Expand All @@ -624,9 +632,6 @@ impl Actor {
},
));
}
for id in &p.deal_ids {
activated_deals.insert(*id);
}

sectors_deals.push((p.sector_number, SectorDealIDs { deals: p.deal_ids.clone() }));
activations.push(SectorDealActivation { nonverified_deal_space, verified_infos });
Expand Down Expand Up @@ -669,7 +674,17 @@ impl Actor {
let mut sector_deal_ids: Vec<DealID> = vec![];
let mut pieces_ret: Vec<PieceReturn> = vec![];
for piece in &sector.added {
let deal_id: DealID = deserialize(&piece.payload.clone().into(), "deal id")?;
let deal_id: DealID = match deserialize(&piece.payload.clone(), "deal id") {
Ok(v) => v,
Err(e) => {
log::warn!("failed to deserialize deal id: {}", e);
pieces_ret.push(PieceReturn {
code: ExitCode::USR_SERIALIZATION,
data: vec![],
});
continue;
}
};
if activated_deals.contains(&deal_id) {
log::warn!("failed to activate duplicated deal {}", deal_id);
pieces_ret.push(PieceReturn {
Expand Down Expand Up @@ -1273,12 +1288,14 @@ fn preactivate_deal<BS: Blockstore>(
return Ok(Err(actor_error!(illegal_argument, "deal {} already activated", deal_id)));
}

let deal_cid = deal_cid(rt, &proposal)?;

// Confirm the deal is in the pending proposals queue.
// Confirm the deal is in the pending proposals set.
// It will be removed from this queue later, during cron.
// Failing this check is an internal invariant violation.
// The pending deals set exists to prevent duplicate proposals.
// It should be impossible to have a proposal, no deal state, and not be in pending deals.
let deal_cid = deal_cid(rt, &proposal)?;
if !has_pending_deal(pending_proposals, &deal_cid)? {
return Ok(Err(actor_error!(illegal_argument, "deal {} is not in pending set", deal_cid)));
return Ok(Err(actor_error!(illegal_state, "deal {} is not in pending set", deal_cid)));
}

Ok(Ok(proposal))
Expand Down
2 changes: 1 addition & 1 deletion actors/market/tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,6 @@ pub fn piece_info_from_deal(id: DealID, deal: &DealProposal) -> PieceInfo {
PieceInfo {
data: deal.piece_cid,
size: deal.piece_size,
payload: serialize(&id, "deal id").unwrap().to_vec(),
payload: serialize(&id, "deal id").unwrap(),
}
}
3 changes: 2 additions & 1 deletion actors/market/tests/random_cron_epoch_during_publish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2019-2022 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use fil_actor_market::EX_DEAL_EXPIRED;
use fil_actors_runtime::network::EPOCHS_IN_DAY;
use fil_actors_runtime::runtime::Policy;
use fil_actors_runtime::BURNT_FUNDS_ACTOR_ADDR;
Expand Down Expand Up @@ -153,7 +154,7 @@ fn activation_after_deal_start_epoch_but_before_it_is_processed_fails() {

let res =
activate_deals(&rt, SECTOR_EXPIRY, PROVIDER_ADDR, curr_epoch, SECTOR_NUMBER, &[deal_id]);
assert_eq!(res.activation_results.codes(), vec![ExitCode::USR_ILLEGAL_ARGUMENT]);
assert_eq!(res.activation_results.codes(), vec![EX_DEAL_EXPIRED]);
check_state(&rt);
}

Expand Down
33 changes: 28 additions & 5 deletions actors/market/tests/sector_content_changed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ fn piece_must_match_deal() {
pieces.push(PieceInfo {
data: Cid::new_v1(0, Sha2_256.digest(&[1, 2, 3, 4])),
size: PaddedPieceSize(1234),
payload: serialize(&1234, "deal id").unwrap().to_vec(),
payload: serialize(&1234, "deal id").unwrap(),
});

let changes =
Expand All @@ -196,6 +196,32 @@ fn piece_must_match_deal() {
check_state(&rt);
}

#[test]
fn invalid_deal_id_rejected() {
let rt = setup();
let deals = create_deals(&rt, 1);

rt.set_caller(*ACCOUNT_ACTOR_CODE_ID, WORKER_ADDR);
let deal_ids =
publish_deals(&rt, &MINER_ADDRESSES, &deals, TokenAmount::zero(), NO_ALLOCATION_ID);
let mut pieces = pieces_from_deals(&deal_ids, &deals);
// Append a byte to the deal ID.
let mut buf = pieces[0].payload.to_vec();
buf.push(123);
pieces[0].payload = buf.into();

let changes =
vec![SectorChanges { sector: 1, minimum_commitment_epoch: END_EPOCH + 10, added: pieces }];
let ret = sector_content_changed(&rt, PROVIDER_ADDR, changes).unwrap();
assert_eq!(1, ret.sectors.len());
assert_eq!(
vec![PieceReturn { code: ExitCode::USR_SERIALIZATION, data: vec![] },],
ret.sectors[0].added
);

check_state(&rt);
}

#[test]
fn failures_isolated() {
let rt = setup();
Expand Down Expand Up @@ -332,7 +358,4 @@ fn pieces_from_deals(deal_ids: &[DealID], deals: &[DealProposal]) -> Vec<PieceIn
// TODO

// - See activate_deals_failures
// - test bad deal ID, serialise, notfound
// - test not pending
// - test bad epoch, provider
// - test already activated
// - test bad deal ID, serialise
23 changes: 12 additions & 11 deletions actors/market/tests/verify_deals_for_activation_test.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
// Copyright 2019-2022 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod harness;

use fil_actor_market::{Actor as MarketActor, Method, SectorDeals, VerifyDealsForActivationParams};
use fil_actors_runtime::runtime::builtins::Type;
use fil_actors_runtime::test_utils::{
expect_abort, expect_abort_contains_message, make_piece_cid, ACCOUNT_ACTOR_CODE_ID,
MINER_ACTOR_CODE_ID,
};
use fil_actors_runtime::EPOCHS_IN_DAY;
use fvm_ipld_encoding::ipld_block::IpldBlock;
use fvm_shared::address::Address;
use fvm_shared::bigint::BigInt;
Expand All @@ -18,9 +9,19 @@ use fvm_shared::econ::TokenAmount;
use fvm_shared::error::ExitCode;
use fvm_shared::piece::PieceInfo;
use fvm_shared::sector::RegisteredSealProof;
use harness::*;
use num_traits::Zero;

use fil_actor_market::{Actor as MarketActor, Method, SectorDeals, VerifyDealsForActivationParams};
use fil_actors_runtime::runtime::builtins::Type;
use fil_actors_runtime::test_utils::{
expect_abort, expect_abort_contains_message, make_piece_cid, ACCOUNT_ACTOR_CODE_ID,
MINER_ACTOR_CODE_ID,
};
use fil_actors_runtime::EPOCHS_IN_DAY;
use harness::*;

mod harness;

const START_EPOCH: ChainEpoch = 10;
const CURR_EPOCH: ChainEpoch = START_EPOCH;
const END_EPOCH: ChainEpoch = 200 * EPOCHS_IN_DAY;
Expand Down Expand Up @@ -267,7 +268,7 @@ fn fail_when_current_epoch_is_greater_than_proposal_start_epoch() {
}],
};
expect_abort(
ExitCode::USR_ILLEGAL_ARGUMENT,
fil_actor_market::EX_DEAL_EXPIRED,
rt.call::<MarketActor>(
Method::VerifyDealsForActivation as u64,
IpldBlock::serialize_cbor(&params).unwrap(),
Expand Down
44 changes: 13 additions & 31 deletions actors/miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,6 @@ impl Actor {
&rew.this_epoch_baseline_power,
&rew.this_epoch_reward_smoothed,
&pwr.quality_adj_power_smoothed,
info,
)?;

// Compute and burn the aggregate network fee. We need to re-load the state as
Expand Down Expand Up @@ -1164,7 +1163,7 @@ impl Actor {
})
.collect();
let (batch_return, deals_spaces) =
batch_activate_deals_and_claim_allocations(rt, &activation_infos, info.sector_size)?;
batch_activate_deals_and_claim_allocations(rt, &activation_infos)?;

// associate the successfully activated sectors with the ReplicaUpdateInner and SectorOnChainInfo
let validated_updates: Vec<(&UpdateAndSectorInfo, ext::market::DealSpaces)> =
Expand Down Expand Up @@ -2106,14 +2105,12 @@ impl Actor {
"failed to load pre-committed sectors",
)
})?;
let info = get_miner_info(rt.store(), &st)?;
confirm_sector_proofs_valid_internal(
rt,
precommited_sectors,
&params.reward_baseline_power,
&params.reward_smoothed,
&params.quality_adj_power_smoothed,
info,
)
}

Expand Down Expand Up @@ -4751,7 +4748,6 @@ fn confirm_sector_proofs_valid_internal(
this_epoch_baseline_power: &BigInt,
this_epoch_reward_smoothed: &FilterEstimate,
quality_adj_power_smoothed: &FilterEstimate,
info: MinerInfo,
) -> Result<(), ActorError> {
// get network stats from other actors
let circulating_supply = rt.total_fil_circ_supply();
Expand All @@ -4771,11 +4767,12 @@ fn confirm_sector_proofs_valid_internal(
.collect();

let (batch_return, activated_sectors) =
batch_activate_deals_and_claim_allocations(rt, &deals_activation_infos, info.sector_size)?;
batch_activate_deals_and_claim_allocations(rt, &deals_activation_infos)?;

let (total_pledge, newly_vested) = rt.transaction(|state: &mut State, rt| {
let policy = rt.policy();
let store = rt.store();
let info = get_miner_info(store, state)?;

let mut new_sector_numbers = Vec::<SectorNumber>::with_capacity(activated_sectors.len());
let mut deposit_to_unlock = TokenAmount::zero();
Expand Down Expand Up @@ -4920,7 +4917,6 @@ fn confirm_sector_proofs_valid_internal(
fn batch_activate_deals_and_claim_allocations(
rt: &impl Runtime,
activation_infos: &[DealsActivationInfo],
sector_size: SectorSize,
) -> Result<(BatchReturn, Vec<ext::market::DealSpaces>), ActorError> {
let batch_activation_res = match activation_infos.iter().all(|p| p.deal_ids.is_empty()) {
true => ext::market::BatchActivateDealsResult {
Expand Down Expand Up @@ -4970,30 +4966,16 @@ fn batch_activate_deals_and_claim_allocations(
for (activation_info, activate_res) in
successful_activation_infos.iter().zip(&batch_activation_res.activations)
{
let mut total_verified_space = BigInt::zero();
let mut sector_claims = Vec::new();
for verified in &activate_res.verified_infos {
total_verified_space += verified.size.0;
sector_claims.push(ext::verifreg::AllocationClaim {
client: verified.client,
allocation_id: verified.allocation_id,
data: verified.data,
size: verified.size,
});
}

// Verify the deals for each sector fit in the sector size.
// This check is redundant if the sector's unsealed CID (CommD) is verified as
// compatible with the deal pieces.
let total_deal_space = total_verified_space + &activate_res.nonverified_deal_space;
if total_deal_space > BigInt::from(sector_size as u64) {
return Err(actor_error!(
illegal_argument,
"deal space {} exceed sector size {}",
total_deal_space,
sector_size
));
}
let sector_claims = activate_res
.verified_infos
.iter()
.map(|info| ext::verifreg::AllocationClaim {
client: info.client,
allocation_id: info.allocation_id,
data: info.data,
size: info.size,
})
.collect();

verified_claims.push(ext::verifreg::SectorAllocationClaims {
sector: activation_info.sector_number,
Expand Down

0 comments on commit 9aa6ab7

Please sign in to comment.