Skip to content

Commit

Permalink
Claim allocations processes batches by sector (#1337)
Browse files Browse the repository at this point in the history
* Claim allocations processes batches by sector

* review

* restore illegal argument on batch failure

* fmt
  • Loading branch information
anorth authored Jul 31, 2023
1 parent 64a103e commit 1b282d4
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 230 deletions.
4 changes: 2 additions & 2 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ impl Actor {
let (activations, batch_ret) = rt.transaction(|st: &mut State, rt| {
let mut deal_states: Vec<(DealID, DealState)> = vec![];
let mut batch_gen = BatchReturnGen::new(params.sectors.len());
let mut activations: Vec<DealActivation> = vec![];
let mut activations: Vec<SectorDealActivation> = vec![];
let mut activated_deals: HashSet<DealID> = HashSet::new();

for p in params.sectors {
Expand Down Expand Up @@ -651,7 +651,7 @@ impl Actor {

match update_result {
Ok(_) => {
activations.push(DealActivation {
activations.push(SectorDealActivation {
nonverified_deal_space: deal_spaces.deal_space,
verified_infos,
});
Expand Down
6 changes: 4 additions & 2 deletions actors/market/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ pub struct SectorDealData {
#[serde(transparent)]
pub struct BatchActivateDealsParams {
/// Deals to activate, grouped by sector.
/// A failed deal activation will cause other deals in the same sector group to also fail,
/// but allow other sectors to proceed.
pub sectors: Vec<SectorDeals>,
}

Expand All @@ -113,7 +115,7 @@ pub struct VerifiedDealInfo {

// Information about a sector-grouping of deals that have been activated.
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct DealActivation {
pub struct SectorDealActivation {
/// The total size of the non-verified deals activated.
#[serde(with = "bigint_ser")]
pub nonverified_deal_space: BigInt,
Expand All @@ -126,7 +128,7 @@ pub struct BatchActivateDealsResult {
/// Status of each sector grouping of deals.
pub activation_results: BatchReturn,
/// Activation information for the sector groups that were activated.
pub activations: Vec<DealActivation>,
pub activations: Vec<SectorDealActivation>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
Expand Down
19 changes: 12 additions & 7 deletions actors/miner/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,31 +202,36 @@ pub mod verifreg {
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct SectorAllocationClaim {
pub struct SectorAllocationClaims {
pub sector: SectorNumber,
pub expiry: ChainEpoch,
pub claims: Vec<AllocationClaim>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct AllocationClaim {
pub client: ActorID,
pub allocation_id: AllocationID,
pub data: Cid,
pub size: PaddedPieceSize,
pub sector: SectorNumber,
pub sector_expiry: ChainEpoch,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct ClaimAllocationsParams {
pub allocations: Vec<SectorAllocationClaim>,
pub sectors: Vec<SectorAllocationClaims>,
pub all_or_nothing: bool,
}

#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize_tuple, Deserialize_tuple)]
#[serde(transparent)]
pub struct SectorAllocationClaimResult {
pub struct SectorClaimSummary {
#[serde(with = "bigint_ser")]
pub claimed_space: BigInt,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct ClaimAllocationsReturn {
pub claim_results: BatchReturn,
pub claims: Vec<SectorAllocationClaimResult>,
pub sector_results: BatchReturn,
pub sector_claims: Vec<SectorClaimSummary>,
}
}
47 changes: 22 additions & 25 deletions actors/miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5010,32 +5010,39 @@ fn batch_activate_deals_and_claim_allocations(
for (activation_info, activate_res) in
successful_activation_infos.iter().zip(&batch_activation_res.activations)
{
let mut sector_claims = activate_res
let sector_claims = activate_res
.verified_infos
.iter()
.map(|info| ext::verifreg::SectorAllocationClaim {
.map(|info| ext::verifreg::AllocationClaim {
client: info.client,
allocation_id: info.allocation_id,
data: info.data,
size: info.size,
sector: activation_info.sector_number,
sector_expiry: activation_info.sector_expiry,
})
.collect();
verified_claims.append(&mut sector_claims);
verified_claims.push(ext::verifreg::SectorAllocationClaims {
sector: activation_info.sector_number,
expiry: activation_info.sector_expiry,
claims: sector_claims,
});
}

let claim_res = match verified_claims.is_empty() {
let claim_res = match verified_claims.iter().all(|sector| sector.claims.is_empty()) {
// Short-circuit the call if there are no claims,
// but otherwise send a group for each sector (even if empty) to ease association of results.
true => ext::verifreg::ClaimAllocationsReturn {
claim_results: BatchReturn::empty(),
claims: Vec::default(),
sector_results: BatchReturn::ok(verified_claims.len() as u32),
sector_claims: vec![
ext::verifreg::SectorClaimSummary { claimed_space: BigInt::zero() };
verified_claims.len()
],
},
false => {
let claim_raw = extract_send_result(rt.send_simple(
&VERIFIED_REGISTRY_ACTOR_ADDR,
ext::verifreg::CLAIM_ALLOCATIONS_METHOD,
IpldBlock::serialize_cbor(&ext::verifreg::ClaimAllocationsParams {
allocations: verified_claims,
sectors: verified_claims,
all_or_nothing: true,
})?,
TokenAmount::zero(),
Expand All @@ -5048,29 +5055,19 @@ fn batch_activate_deals_and_claim_allocations(
};

assert!(
claim_res.claim_results.all_ok() || claim_res.claim_results.success_count == 0,
claim_res.sector_results.all_ok() || claim_res.sector_results.success_count == 0,
"batch return of claim allocations partially succeeded but request was all_or_nothing {:?}",
claim_res
);

let mut claims = claim_res.claims.into_iter();
// reassociate the verified claims with corresponding DealActivation information
let activation_and_claim_results: Vec<ext::market::DealSpaces> = batch_activation_res
let activation_and_claim_results = batch_activation_res
.activations
.iter()
.map(|deal_activation| {
// each activation contributed as many claims as verified_info entries it had
let number_of_claims = deal_activation.verified_infos.len();
// we consume these claims from the iterator, then combine the claims into one
// value per DealActivation
let verified_deal_space = claims
.by_ref()
.take(number_of_claims)
.fold(BigInt::zero(), |acc, claim| acc + claim.claimed_space);
ext::market::DealSpaces {
verified_deal_space,
deal_space: deal_activation.nonverified_deal_space.clone(),
}
.zip(claim_res.sector_claims)
.map(|(sector_deals, sector_claim)| ext::market::DealSpaces {
verified_deal_space: sector_claim.claimed_space,
deal_space: sector_deals.nonverified_deal_space.clone(),
})
.collect();

Expand Down
62 changes: 33 additions & 29 deletions actors/miner/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@

use fil_actor_account::Method as AccountMethod;
use fil_actor_market::{
BatchActivateDealsParams, BatchActivateDealsResult, DealActivation, DealSpaces,
Method as MarketMethod, OnMinerSectorsTerminateParams, SectorDealData, SectorDeals,
BatchActivateDealsParams, BatchActivateDealsResult, DealSpaces, Method as MarketMethod,
OnMinerSectorsTerminateParams, SectorDealActivation, SectorDealData, SectorDeals,
VerifiedDealInfo, VerifyDealsForActivationParams, VerifyDealsForActivationReturn,
};
use fil_actor_miner::ext::market::ON_MINER_SECTORS_TERMINATE_METHOD;
use fil_actor_miner::ext::power::{UPDATE_CLAIMED_POWER_METHOD, UPDATE_PLEDGE_TOTAL_METHOD};
use fil_actor_miner::ext::verifreg::{
ClaimAllocationsParams, ClaimAllocationsReturn, SectorAllocationClaim,
SectorAllocationClaimResult, CLAIM_ALLOCATIONS_METHOD,
};
use fil_actor_miner::ext::verifreg::CLAIM_ALLOCATIONS_METHOD;
use fil_actor_miner::{
aggregate_pre_commit_network_fee, aggregate_prove_commit_network_fee, consensus_fault_penalty,
initial_pledge_for_power, locked_reward_from_reward, max_prove_commit_duration,
ext, initial_pledge_for_power, locked_reward_from_reward, max_prove_commit_duration,
new_deadline_info_from_offset_and_epoch, pledge_penalty_for_continued_fault, power_for_sectors,
qa_power_for_sector, qa_power_for_weight, reward_for_consensus_slash_report, ActiveBeneficiary,
Actor, ApplyRewardParams, BeneficiaryTerm, BitFieldQueue, ChangeBeneficiaryParams,
Expand Down Expand Up @@ -1056,11 +1053,11 @@ impl ActorHarness {
let mut valid_pcs = Vec::new();

// claim FIL+ allocations
let mut sectors_claims: Vec<SectorAllocationClaim> = Vec::new();
let mut sectors_claims: Vec<ext::verifreg::SectorAllocationClaims> = Vec::new();

// build expectations per sector
let mut sector_activation_params: Vec<SectorDeals> = Vec::new();
let mut sector_activations: Vec<DealActivation> = Vec::new();
let mut sector_activations: Vec<SectorDealActivation> = Vec::new();
let mut sector_activation_results = BatchReturnGen::new(pcs.len());

for pc in pcs {
Expand All @@ -1073,7 +1070,7 @@ impl ActorHarness {
};
sector_activation_params.push(activate_params);

let ret = DealActivation {
let ret = SectorDealActivation {
nonverified_deal_space: deal_spaces.deal_space,
verified_infos: cfg
.verified_deal_infos
Expand All @@ -1094,25 +1091,23 @@ impl ActorHarness {
}
}

if ret.verified_infos.is_empty() {
if activate_deals_exit == ExitCode::OK {
valid_pcs.push(pc);
}
} else {
let mut sector_claims: Vec<SectorAllocationClaim> = ret
if activate_deals_exit == ExitCode::OK {
valid_pcs.push(pc);
let sector_claims = ret
.verified_infos
.iter()
.map(|info| SectorAllocationClaim {
.map(|info| ext::verifreg::AllocationClaim {
client: info.client,
allocation_id: info.allocation_id,
data: info.data,
size: info.size,
sector: pc.info.sector_number,
sector_expiry: pc.info.expiration,
})
.collect();
sectors_claims.append(&mut sector_claims);
valid_pcs.push(pc);
sectors_claims.push(ext::verifreg::SectorAllocationClaims {
sector: pc.info.sector_number,
expiry: pc.info.expiration,
claims: sector_claims,
});
}
} else {
// empty deal ids
Expand All @@ -1121,11 +1116,16 @@ impl ActorHarness {
sector_expiry: pc.info.expiration,
sector_type: RegisteredSealProof::StackedDRG8MiBV1,
});
sector_activations.push(DealActivation {
sector_activations.push(SectorDealActivation {
nonverified_deal_space: BigInt::zero(),
verified_infos: vec![],
});
sector_activation_results.add_success();
sectors_claims.push(ext::verifreg::SectorAllocationClaims {
sector: pc.info.sector_number,
expiry: pc.info.expiration,
claims: vec![],
});
valid_pcs.push(pc);
}
}
Expand All @@ -1148,20 +1148,24 @@ impl ActorHarness {
);
}

if !sectors_claims.is_empty() {
let claim_allocation_params = ClaimAllocationsParams {
allocations: sectors_claims.clone(),
if !sectors_claims.iter().all(|c| c.claims.is_empty()) {
let claim_allocation_params = ext::verifreg::ClaimAllocationsParams {
sectors: sectors_claims.clone(),
all_or_nothing: true,
};

// TODO handle failures of claim allocations
// use exit code map for claim allocations in config
let claim_allocs_ret = ClaimAllocationsReturn {
claims: sectors_claims
let claim_allocs_ret = ext::verifreg::ClaimAllocationsReturn {
sector_results: BatchReturn::ok(sectors_claims.len() as u32),
sector_claims: sectors_claims
.iter()
.map(|claim| SectorAllocationClaimResult { claimed_space: claim.size.0.into() })
.map(|sector| ext::verifreg::SectorClaimSummary {
claimed_space: BigInt::from(
sector.claims.iter().map(|c| c.size.0).sum::<u64>(),
),
})
.collect(),
claim_results: BatchReturn::ok(sectors_claims.len() as u32),
};
rt.expect_send_simple(
VERIFIED_REGISTRY_ACTOR_ADDR,
Expand Down
Loading

0 comments on commit 1b282d4

Please sign in to comment.