Skip to content

Commit

Permalink
refactor to a map based pattern to ensure parallel return structure
Browse files Browse the repository at this point in the history
  • Loading branch information
alexytsu committed Jun 8, 2023
1 parent c859afe commit 12a4ae8
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 67 deletions.
205 changes: 138 additions & 67 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,84 +540,158 @@ impl Actor {
let curr_epoch = rt.curr_epoch();

let sector_results = rt.transaction(|st: &mut State, rt| {
let mut sector_results: Vec<Option<ActivateDealsResult>> = Vec::new();
let mut deal_states: Vec<(DealID, DealState)> = vec![];
for p in params.sectors {
let proposal_array = st.get_proposal_array(rt.store())?;
let proposals = get_proposals(&proposal_array, &p.deal_ids, st.next_id)?;

let deal_spaces = {
validate_and_return_deal_space(
let sector_results: Vec<Option<ActivateDealsResult>> = params
.sectors
.iter()
.map(|p| {
if p.deal_ids.is_empty() {
return Some(ActivateDealsResult {
nonverified_deal_space: BigInt::default(),
verified_infos: Vec::default(),
});
}

let proposal_array = match st.get_proposal_array(rt.store()) {
Ok(proposal_array) => proposal_array,
Err(e) => {
log::warn!("failed to activate deals {:?}: {}", p.deal_ids, e);
return None;
}
};
let proposals = match get_proposals(&proposal_array, &p.deal_ids, st.next_id) {
Ok(proposals) => proposals,
Err(e) => {
log::warn!("failed to activate deals {:?}: {}", p.deal_ids, e);
return None;
}
};

let deal_spaces = match validate_and_return_deal_space(
&proposals,
&miner_addr,
p.sector_expiry,
curr_epoch,
None,
)
.context("failed to validate deal proposals for activation")?
};

// Update deal states
let mut verified_infos = Vec::new();

for (deal_id, proposal) in proposals {
// This construction could be replaced with a single "update deal state"
// state method, possibly batched over all deal ids at once.
let s = st.find_deal_state(rt.store(), deal_id)?;

if s.is_some() {
return Err(actor_error!(
illegal_argument,
"deal {} already activated",
deal_id
));
}
) {
Ok(ds) => ds,
Err(e) => {
log::warn!("failed to activate deals {:?}: {}", p.deal_ids, e);
return None;
}
};

// Update deal states
let mut verified_infos = Vec::new();

for (deal_id, proposal) in proposals {
// This construction could be replaced with a single "update deal state"
// state method, possibly batched over all deal ids at once.
let s = match st.find_deal_state(rt.store(), deal_id) {
Ok(s) => s,
Err(e) => {
log::warn!(
"failed to activate deals {:?} due to error in {}: {}",
p.deal_ids,
deal_id,
e
);
return None;
}
};

if s.is_some() {
log::warn!(
"skipping activation of deals {:?} as deal {} is already activated",
p.deal_ids,
deal_id
);
return None;
}

let propc = rt_deal_cid(rt, &proposal)?;
let propc = match rt_deal_cid(rt, &proposal) {
Ok(propc) => propc,
Err(e) => {
log::warn!(
"failed to activate deals {:?} due to error in {}: {}",
p.deal_ids,
deal_id,
e
);
return None;
}
};

// Confirm the deal is in the pending proposals queue.
// It will be removed from this queue later, during cron.
let has = match st.has_pending_deal(rt.store(), propc) {
Ok(has) => has,
Err(e) => {
log::warn!(
"failed to activate deals {:?} due to error in {}: {}",
p.deal_ids,
deal_id,
e
);
return None;
}
};

if !has {
log::warn!(
"tried to activate deal that was not in the pending set ({})",
propc
);
return None;
}

// Confirm the deal is in the pending proposals queue.
// It will be removed from this queue later, during cron.
let has = st.has_pending_deal(rt.store(), propc)?;
// Extract and remove any verified allocation ID for the pending deal.
let allocation = match st
.remove_pending_deal_allocation_id(rt.store(), &deal_id_key(deal_id))
{
Ok(allocation) => allocation,
Err(e) => {
log::warn!(
"failed to activate deals {:?} due to error in {}: {}",
p.deal_ids,
deal_id,
e
);
return None;
}
};

let allocation =
allocation.unwrap_or((BytesKey(vec![]), NO_ALLOCATION_ID)).1;

if allocation != NO_ALLOCATION_ID {
verified_infos.push(VerifiedDealInfo {
client: proposal.client.id().unwrap(),
allocation_id: allocation,
data: proposal.piece_cid,
size: proposal.piece_size,
})
}

if !has {
return Err(actor_error!(
illegal_state,
"tried to activate deal that was not in the pending set ({})",
propc
deal_states.push((
deal_id,
DealState {
sector_start_epoch: curr_epoch,
last_updated_epoch: EPOCH_UNDEFINED,
slash_epoch: EPOCH_UNDEFINED,
verified_claim: allocation,
},
));
}

// Extract and remove any verified allocation ID for the pending deal.
let allocation = st
.remove_pending_deal_allocation_id(rt.store(), &deal_id_key(deal_id))?
.unwrap_or((BytesKey(vec![]), NO_ALLOCATION_ID))
.1;

if allocation != NO_ALLOCATION_ID {
verified_infos.push(VerifiedDealInfo {
client: proposal.client.id().unwrap(),
allocation_id: allocation,
data: proposal.piece_cid,
size: proposal.piece_size,
})
}
Some(ActivateDealsResult {
nonverified_deal_space: deal_spaces.deal_space,
verified_infos,
})
})
.collect();

deal_states.push((
deal_id,
DealState {
sector_start_epoch: curr_epoch,
last_updated_epoch: EPOCH_UNDEFINED,
slash_epoch: EPOCH_UNDEFINED,
verified_claim: allocation,
},
));
}

sector_results.push(Some(ActivateDealsResult {
nonverified_deal_space: deal_spaces.deal_space,
verified_infos,
}));
}
st.put_deal_states(rt.store(), &deal_states)?;
Ok(sector_results)
})?;
Expand All @@ -637,11 +711,8 @@ impl Actor {

rt.transaction(|st: &mut State, rt| {
let mut deal_states: Vec<(DealID, DealState)> = vec![];
// TODO: if "continue", make sure to add a None to the results

for id in params.deal_ids {
// TODO: if no deals, return success

let deal = st.find_proposal(rt.store(), id)?;
// The deal may have expired and been deleted before the sector is terminated.
// Nothing to do, but continue execution for the other deals.
Expand Down
8 changes: 8 additions & 0 deletions actors/miner/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,14 @@ impl ActorHarness {
}
} else {
// empty deal ids
sector_activation_params.push(ActivateDealsParams {
deal_ids: vec![],
sector_expiry: pc.info.expiration,
});
sector_activation_results.push(Some(ActivateDealsResult {
nonverified_deal_space: BigInt::zero(),
verified_infos: vec![],
}));
valid_pcs.push(pc);
}
}
Expand Down

0 comments on commit 12a4ae8

Please sign in to comment.