Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch ClaimAllocations during ProveCommitAggregate #1304

Merged
merged 10 commits into from
Jun 5, 2023
19 changes: 14 additions & 5 deletions actors/miner/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub mod market {
}
}

#[derive(Serialize_tuple, Deserialize_tuple)]
#[derive(Serialize_tuple, Deserialize_tuple, Clone)]
pub struct ActivateDealsResult {
#[serde(with = "bigint_ser")]
pub nonverified_deal_space: BigInt,
Expand Down Expand Up @@ -207,13 +207,22 @@ pub mod verifreg {

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct ClaimAllocationsParams {
pub sectors: Vec<SectorAllocationClaim>,
pub allocations: Vec<SectorAllocationClaim>,
pub all_or_nothing: bool,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct ClaimAllocationsReturn {
pub batch_info: BatchReturn,

#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize_tuple, Deserialize_tuple)]
#[serde(transparent)]
pub struct SectorAllocationClaimResult {
#[serde(with = "bigint_ser")]
pub claimed_space: BigInt,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
#[serde(transparent)]
pub struct ClaimAllocationsReturn {
/// claim_results is parallel to ClaimAllocationsParams.allocations with failed allocations
/// being represented by claimed_space == BigInt::zero()
pub claim_results: Vec<SectorAllocationClaimResult>,
alexytsu marked this conversation as resolved.
Show resolved Hide resolved
}
}
185 changes: 157 additions & 28 deletions actors/miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::ops::Neg;
use anyhow::{anyhow, Error};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use cid::Cid;
use ext::market::ActivateDealsResult;
use fvm_ipld_bitfield::{BitField, Validate};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::{from_slice, BytesDe, CborStore};
Expand Down Expand Up @@ -3593,6 +3594,13 @@ struct ExtendExpirationsInner {
claims: Option<BTreeMap<SectorNumber, (u64, u64)>>,
}

#[derive(Debug, Clone)]
pub struct DealsActivationInfo {
pub deal_ids: Vec<DealID>,
pub sector_expiry: ChainEpoch,
pub sector_number: SectorNumber,
}

#[derive(Clone, Debug, PartialEq)]
pub struct ValidatedExpirationExtension {
pub deadline: u64,
Expand Down Expand Up @@ -4795,43 +4803,36 @@ fn confirm_sector_proofs_valid_internal(
// Ideally, we'd combine some of these operations, but at least we have
// a constant number of them.
let activation = rt.curr_epoch();
// Pre-commits for new sectors.
let mut valid_pre_commits = Vec::default();

for pre_commit in pre_commits {
match activate_deals_and_claim_allocations(
rt,
pre_commit.clone().info.deal_ids,
pre_commit.info.expiration,
pre_commit.info.sector_number,
)? {
None => {
info!(
"failed to activate deals on sector {}, dropping from prove commit set",
pre_commit.info.sector_number,
);
continue;
}
Some(deal_spaces) => valid_pre_commits.push((pre_commit, deal_spaces)),
};
}
let deals_activation_infos: Vec<DealsActivationInfo> = pre_commits
.iter()
.map(|pc| DealsActivationInfo {
deal_ids: pc.info.deal_ids.clone(),
sector_expiry: pc.info.expiration,
sector_number: pc.info.sector_number,
})
.collect();

// When all prove commits have failed abort early
if valid_pre_commits.is_empty() {
return Err(actor_error!(illegal_argument, "all prove commits failed to validate"));
}
let activated_sectors =
batch_activate_deals_and_claim_allocations(rt, &deals_activation_infos)?;

let (total_pledge, newly_vested) = rt.transaction(|state: &mut State, rt| {
let policy = rt.policy();
let store = rt.store();
let info = get_miner_info(store, state)?;

let mut new_sector_numbers = Vec::<SectorNumber>::with_capacity(valid_pre_commits.len());
let mut new_sector_numbers = Vec::<SectorNumber>::with_capacity(activated_sectors.len());
let mut deposit_to_unlock = TokenAmount::zero();
let mut new_sectors = Vec::<SectorOnChainInfo>::new();
let mut total_pledge = TokenAmount::zero();

for (pre_commit, deal_spaces) in valid_pre_commits {
for (pre_commit, deal_spaces) in pre_commits.iter().zip(activated_sectors) {
// skip sectors that weren't activated
if deal_spaces.is_none() {
continue;
}
let deal_spaces = deal_spaces.unwrap();

// compute initial pledge
let duration = pre_commit.info.expiration - activation;

Expand Down Expand Up @@ -4886,7 +4887,7 @@ fn confirm_sector_proofs_valid_internal(
sector_number: pre_commit.info.sector_number,
seal_proof: pre_commit.info.seal_proof,
sealed_cid: pre_commit.info.sealed_cid,
deal_ids: pre_commit.info.deal_ids,
deal_ids: pre_commit.info.deal_ids.clone(),
expiration: pre_commit.info.expiration,
activation,
deal_weight,
Expand Down Expand Up @@ -4962,6 +4963,131 @@ fn confirm_sector_proofs_valid_internal(
Ok(())
}

/// Activates the deals then claims allocations for any verified deals
/// The returned vector is parallel (with same length and corresponding indices) to the requested
/// activations. If activation a deal set fails, a None entry appears in the vector. If the final
/// ClaimAllocations fails, the function returns an error.
fn batch_activate_deals_and_claim_allocations(
anorth marked this conversation as resolved.
Show resolved Hide resolved
alexytsu marked this conversation as resolved.
Show resolved Hide resolved
rt: &impl Runtime,
activation_infos: &[DealsActivationInfo],
) -> Result<Vec<Option<ext::market::DealSpaces>>, ActorError> {
// Fills with ActivateDealResults per DealActivationInfo
// ActivateDeals implicitly succeeds when no deal_ids are specified
// If deal activation fails, a None is recorded in place
let mut activation_results = Vec::new();

// TODO: https://github.com/filecoin-project/builtin-actors/pull/1303 enable activation batching
for activation_info in activation_infos {
// Check (and activate) storage deals associated to sector
let deal_ids = activation_info.deal_ids.clone();
if deal_ids.is_empty() {
activation_results.push(Some(ActivateDealsResult {
nonverified_deal_space: BigInt::default(),
verified_infos: Vec::default(),
}));
continue;
}

let activate_raw = extract_send_result(rt.send_simple(
&STORAGE_MARKET_ACTOR_ADDR,
ext::market::ACTIVATE_DEALS_METHOD,
IpldBlock::serialize_cbor(&ext::market::ActivateDealsParams {
deal_ids,
sector_expiry: activation_info.sector_expiry,
})?,
TokenAmount::zero(),
));
let activate_res: Option<ext::market::ActivateDealsResult> = match activate_raw {
Ok(res) => Some(deserialize_block(res)?),
Err(e) => {
info!(
"error activating deals on sector {}: {}",
activation_info.sector_number,
e.msg()
);
None
}
};
activation_results.push(activate_res);
}

// When all prove commits have failed abort early
if activation_results.iter().all(Option::is_none) {
return Err(actor_error!(illegal_argument, "all deals failed to activate"));
}

// Flattened claims across all sectors
let mut sectors_claims = Vec::new();
for (activation_info, activate_res) in activation_infos.iter().zip(activation_results.clone()) {
if activate_res.is_none() {
continue;
}
let activate_res = activate_res.unwrap();

let mut sector_claims = activate_res
.verified_infos
.iter()
.map(|info| ext::verifreg::SectorAllocationClaim {
client: info.client,
allocation_id: info.allocation_id,
data: info.data,
size: info.size,
sector: activation_info.sector_number,
sector_expiry: activation_info.sector_expiry,
})
.collect();
sectors_claims.append(&mut sector_claims);
}

let claim_res = match sectors_claims.is_empty() {
true => Vec::default(),
false => {
let claim_raw = extract_send_result(rt.send_simple(
&VERIFIED_REGISTRY_ACTOR_ADDR,
ext::verifreg::CLAIM_ALLOCATIONS_METHOD,
IpldBlock::serialize_cbor(&ext::verifreg::ClaimAllocationsParams {
allocations: sectors_claims,
all_or_nothing: true,
})?,
TokenAmount::zero(),
))?;

let claim_res: ext::verifreg::ClaimAllocationsReturn = deserialize_block(claim_raw)
.with_context_code(ExitCode::USR_ILLEGAL_ARGUMENT, || {
"failed to claim allocations during batch activation"
})?;

claim_res.claim_results
}
};

// claim res is an iterator of successful claim results parallel to `sector_claims`
let mut claim_res = claim_res.into_iter();

// reassociate the verified claims within each sector to the original activation requests
let activation_and_claim_results: Vec<Option<ext::market::DealSpaces>> = activation_results
.iter()
.map(|activation_res| {
// not all deals were activated but we return explicit None corresponding to unactivated requests
activation_res.as_ref().map(|res| {
// each activation contributed as many claims as verified_info entries it had
let number_of_claims = res.verified_infos.len();
// we consume these claim results from the iterator, then reduce the claims into one
// value for the original activation request
let verified_deal_space = claim_res
.by_ref()
.take(number_of_claims)
.fold(BigInt::zero(), |acc, claim| acc + claim.claimed_space);
ext::market::DealSpaces {
verified_deal_space,
deal_space: res.nonverified_deal_space.clone(),
}
})
})
.collect();
Ok(activation_and_claim_results)
}

// activate deals with builtin market and claim allocations with verified registry actor
// returns an error in case of a fatal programmer error
// returns Ok(None) in case deal activation or verified allocation claim fails
Expand Down Expand Up @@ -5013,7 +5139,7 @@ fn activate_deals_and_claim_allocations(
&VERIFIED_REGISTRY_ACTOR_ADDR,
ext::verifreg::CLAIM_ALLOCATIONS_METHOD,
IpldBlock::serialize_cbor(&ext::verifreg::ClaimAllocationsParams {
sectors: sector_claims,
allocations: sector_claims,
all_or_nothing: true,
})?,
TokenAmount::zero(),
Expand All @@ -5027,7 +5153,10 @@ fn activate_deals_and_claim_allocations(
};
Ok(Some(ext::market::DealSpaces {
deal_space: activate_res.nonverified_deal_space,
verified_deal_space: claim_res.claimed_space,
verified_deal_space: claim_res
.claim_results
.iter()
.fold(BigInt::zero(), |acc, claim_result| acc + claim_result.claimed_space.clone()),
}))
}

Expand Down
56 changes: 34 additions & 22 deletions actors/miner/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use fil_actor_market::{
use fil_actor_miner::ext::market::ON_MINER_SECTORS_TERMINATE_METHOD;
use fil_actor_miner::ext::power::{UPDATE_CLAIMED_POWER_METHOD, UPDATE_PLEDGE_TOTAL_METHOD};
use fil_actor_miner::ext::verifreg::{
ClaimAllocationsParams, ClaimAllocationsReturn, SectorAllocationClaim, CLAIM_ALLOCATIONS_METHOD,
ClaimAllocationsParams, ClaimAllocationsReturn, SectorAllocationClaim,
SectorAllocationClaimResult, CLAIM_ALLOCATIONS_METHOD,
};
use fil_actor_miner::{
aggregate_pre_commit_network_fee, aggregate_prove_commit_network_fee, consensus_fault_penalty,
Expand Down Expand Up @@ -43,7 +44,7 @@ use fil_actor_miner::ext::verifreg::{
};

use fil_actors_runtime::runtime::{DomainSeparationTag, Policy, Runtime, RuntimePolicy};
use fil_actors_runtime::{test_utils::*, BatchReturn, BatchReturnGen};
use fil_actors_runtime::{test_utils::*, BatchReturnGen};
use fil_actors_runtime::{
ActorDowncast, ActorError, Array, DealWeight, MessageAccumulator, BURNT_FUNDS_ACTOR_ADDR,
INIT_ACTOR_ADDR, REWARD_ACTOR_ADDR, STORAGE_MARKET_ACTOR_ADDR, STORAGE_POWER_ACTOR_ADDR,
Expand Down Expand Up @@ -1053,6 +1054,9 @@ impl ActorHarness {
pcs: &[SectorPreCommitOnChainInfo],
) {
let mut valid_pcs = Vec::new();
// claim FIL+ allocations
let mut sectors_claims: Vec<SectorAllocationClaim> = Vec::new();

for pc in pcs {
if !pc.info.deal_ids.is_empty() {
let deal_spaces = cfg.deal_spaces(&pc.info.sector_number);
Expand Down Expand Up @@ -1091,8 +1095,7 @@ impl ActorHarness {
valid_pcs.push(pc);
}
} else {
// calim FIL+ allocations
let sector_claims = ret
let mut sector_claims: Vec<SectorAllocationClaim> = ret
.verified_infos
.iter()
.map(|info| SectorAllocationClaim {
Expand All @@ -1104,31 +1107,40 @@ impl ActorHarness {
sector_expiry: pc.info.expiration,
})
.collect();

let claim_allocation_params =
ClaimAllocationsParams { sectors: sector_claims, all_or_nothing: true };

// TODO handle failures of claim allocations
// use exit code map for claim allocations in config
sectors_claims.append(&mut sector_claims);
valid_pcs.push(pc);
let claim_allocs_ret = ClaimAllocationsReturn {
batch_info: BatchReturn::ok(ret.verified_infos.len() as u32),
claimed_space: deal_spaces.verified_deal_space,
};
rt.expect_send_simple(
VERIFIED_REGISTRY_ACTOR_ADDR,
CLAIM_ALLOCATIONS_METHOD as u64,
IpldBlock::serialize_cbor(&claim_allocation_params).unwrap(),
TokenAmount::zero(),
IpldBlock::serialize_cbor(&claim_allocs_ret).unwrap(),
ExitCode::OK,
);
}
} else {
// empty deal ids
valid_pcs.push(pc);
}
}

if !sectors_claims.is_empty() {
let claim_allocation_params = ClaimAllocationsParams {
allocations: sectors_claims.clone(),
all_or_nothing: true,
};

// TODO handle failures of claim allocations
// use exit code map for claim allocations in config

let claim_allocs_ret = ClaimAllocationsReturn {
claim_results: sectors_claims
.iter()
.map(|claim| SectorAllocationClaimResult { claimed_space: claim.size.0.into() })
.collect(),
};
rt.expect_send_simple(
VERIFIED_REGISTRY_ACTOR_ADDR,
CLAIM_ALLOCATIONS_METHOD as u64,
IpldBlock::serialize_cbor(&claim_allocation_params).unwrap(),
TokenAmount::zero(),
IpldBlock::serialize_cbor(&claim_allocs_ret).unwrap(),
ExitCode::OK,
);
}

if !valid_pcs.is_empty() {
let mut expected_pledge = TokenAmount::zero();
let mut expected_qa_power = BigInt::from(0);
Expand Down
Loading