Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.7.5 mark fragment #258

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
41 changes: 13 additions & 28 deletions c-pallets/file-bank/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,31 @@ impl<T: Config> Pallet<T> {
pub fn generate_file(
file_hash: &Hash,
deal_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
mut miner_task_list: BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>>,
complete_list: BoundedVec<CompleteInfo<T>, T::FragmentCount>,
user_brief: UserBrief<T>,
stat: FileState,
file_size: u128,
) -> DispatchResult {
let mut segment_info_list: BoundedVec<SegmentInfo<T>, T::SegmentCount> = Default::default();
ensure!(complete_list.len() == FRAGMENT_COUNT as usize, Error::<T>::Unexpected);
for segment in deal_info.iter() {
let mut segment_info = SegmentInfo::<T> {
hash: segment.hash,
fragment_list: Default::default(),
};

for miner_task in &mut miner_task_list {
miner_task.fragment_list.sort();
}

for frag_hash in segment.fragment_list.iter() {
for miner_task in &mut miner_task_list {
if let Ok(index) = miner_task.fragment_list.binary_search(&frag_hash) {
let miner = miner_task.miner.clone().ok_or(Error::<T>::Unexpected)?;
let frag_info = FragmentInfo::<T> {
hash: *frag_hash,
avail: true,
miner: miner.clone(),
};
segment_info.fragment_list.try_push(frag_info).map_err(|_e| Error::<T>::BoundedVecError)?;
miner_task.fragment_list.remove(index);
break;
}
}
for (index, fragment_hash) in segment.fragment_list.iter().enumerate() {
let frag_info = FragmentInfo::<T> {
hash: *fragment_hash,
avail: true,
miner: complete_list[index as usize].miner.clone(),
};

segment_info.fragment_list.try_push(frag_info).map_err(|_e| Error::<T>::BoundedVecError)?;
}

segment_info_list.try_push(segment_info).map_err(|_e| Error::<T>::BoundedVecError)?;
}

let cur_block = <frame_system::Pallet<T>>::block_number();

let file_info = FileInfo::<T> {
Expand Down Expand Up @@ -111,7 +101,6 @@ impl<T: Config> Pallet<T> {
pub(super) fn generate_deal(
file_hash: Hash,
file_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
miner_task_list: BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>>,
user_brief: UserBrief<T>,
file_size: u128,
) -> DispatchResult {
Expand All @@ -127,7 +116,6 @@ impl<T: Config> Pallet<T> {
file_size,
segment_list: file_info.clone(),
user: user_brief,
miner_task_list: miner_task_list,
complete_list: Default::default(),
};

Expand Down Expand Up @@ -179,11 +167,8 @@ impl<T: Config> Pallet<T> {
let needed_space = Self::cal_file_size(deal_info.segment_list.len() as u128);
T::StorageHandle::unlock_user_space(&deal_info.user.user, needed_space)?;
// unlock miner space
for miner_task in deal_info.miner_task_list {
if let Some(miner) = miner_task.miner {
let count = miner_task.fragment_list.len() as u128;
T::MinerControl::unlock_space(&miner, FRAGMENT_SIZE * count)?;
}
for complete_info in deal_info.complete_list {
T::MinerControl::unlock_space(&complete_info.miner, FRAGMENT_SIZE * FRAGMENT_COUNT as u128)?;
}

<DealMap<T>>::remove(deal_hash);
Expand Down
37 changes: 37 additions & 0 deletions c-pallets/file-bank/src/impls/dealimpl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use crate::*;

impl<T: Config> DealInfo<T> {
pub fn complete_part(&mut self, miner: AccountOf<T>, index: u8) -> DispatchResult {
for complete_info in &self.complete_list {
ensure!(index != complete_info.index, Error::<T>::Existed);
ensure!(miner != complete_info.miner, Error::<T>::Existed);
}

let complete_info = CompleteInfo::<T> {
index,
miner,
};

self.complete_list.try_push(complete_info).map_err(|_| Error::<T>::BoundedVecError)?;

Ok(())
}

pub fn completed_all(&mut self) -> DispatchResult {
self.stage = 2;
for complete_info in self.complete_list.iter() {
<PendingReplacements<T>>::try_mutate(&complete_info.miner, |pending_space| -> DispatchResult {
let replace_space = FRAGMENT_SIZE
.checked_mul(self.segment_list.len() as u128)
.ok_or(Error::<T>::Overflow)?;

*pending_space = pending_space
.checked_add(replace_space).ok_or(Error::<T>::Overflow)?;

Ok(())
})?;
}

Ok(())
}
}
5 changes: 5 additions & 0 deletions c-pallets/file-bank/src/impls/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod receptionist;
pub use receptionist::*;

pub mod dealimpl;
pub use dealimpl::*;
82 changes: 82 additions & 0 deletions c-pallets/file-bank/src/impls/receptionist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use crate::*;

/// Stateless front-door helper for upload declarations and storage
/// qualification reports; `PhantomData` only carries the runtime `Config`
/// type parameter — no data is stored.
pub struct Receptionist<T: Config>(PhantomData<T>);

impl<T: Config> Receptionist<T> {
    /// "Fast path" upload: the file content already exists on chain, so only
    /// attach `user_brief` as an additional owner and charge its space.
    ///
    /// # Errors
    /// `Error::FileNonExistent` if `file_hash` is not in `File` storage;
    /// `Error::BoundedVecError` if the owner list is full.
    pub fn fly_upload_file(file_hash: Hash, user_brief: UserBrief<T>, used_space: u128) -> DispatchResult {
        // NOTE(review): the literal `1` looks like an "increase" operation
        // code — confirm against the StorageHandle trait definition.
        T::StorageHandle::update_user_space(&user_brief.user, 1, used_space)?;

        // Reuse the user's bucket if it exists, otherwise create it with the
        // file already inside.
        if <Bucket<T>>::contains_key(&user_brief.user, &user_brief.bucket_name) {
            Pallet::<T>::add_file_to_bucket(&user_brief.user, &user_brief.bucket_name, &file_hash)?;
        } else {
            Pallet::<T>::create_bucket_helper(&user_brief.user, &user_brief.bucket_name, Some(file_hash))?;
        }

        Pallet::<T>::add_user_hold_fileslice(&user_brief.user, file_hash, used_space)?;

        // Register the caller as one more owner of the existing file record.
        <File<T>>::try_mutate(&file_hash, |file_opt| -> DispatchResult {
            let file = file_opt.as_mut().ok_or(Error::<T>::FileNonExistent)?;
            file.owner.try_push(user_brief.clone()).map_err(|_e| Error::<T>::BoundedVecError)?;
            Ok(())
        })?;

        Ok(())
    }

    /// "Slow path" upload: the file is new, so lock the declarer's space and
    /// open a storage deal that miners will subsequently report against.
    pub fn generate_deal(
        file_hash: Hash,
        deal_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
        user_brief: UserBrief<T>,
        needed_space: u128,
        file_size: u128,
    ) -> DispatchResult {
        // Space is only locked here; it is converted to "used" (or unlocked on
        // failure) once the deal completes or is torn down.
        T::StorageHandle::lock_user_space(&user_brief.user, needed_space)?;
        // TODO! Replace the file_hash param
        Pallet::<T>::generate_deal(file_hash.clone(), deal_info, user_brief.clone(), file_size)?;

        Ok(())
    }

    /// Process one miner's transfer/qualification report for `deal_hash`:
    /// record the completed part and, when the report is the last one, settle
    /// the whole deal (create the file, move locked space to used, schedule
    /// the tagging deadline, wire up the bucket, emit `StorageCompleted`).
    ///
    /// The statement order below is significant — storage mutations and the
    /// scheduler interaction must happen in this sequence.
    pub fn qualification_report_processing(sender: AccountOf<T>, deal_hash: Hash, deal_info: &mut DealInfo<T>, index: u8) -> DispatchResult {
        // Rejects duplicate indices and duplicate miners (Error::Existed).
        deal_info.complete_part(sender.clone(), index)?;

        // If it is the last submitter of the order.
        if deal_info.complete_list.len() == FRAGMENT_COUNT as usize {
            // Stage -> 2 and credit pending-replacement space per miner.
            deal_info.completed_all()?;
            Pallet::<T>::generate_file(
                &deal_hash,
                deal_info.segment_list.clone(),
                deal_info.complete_list.clone(),
                deal_info.user.clone(),
                FileState::Calculate,
                deal_info.file_size,
            )?;

            // Convert the space locked at declaration time into used space and
            // move the equivalent amount from idle to service accounting.
            let segment_count = deal_info.segment_list.len();
            let needed_space = Pallet::<T>::cal_file_size(segment_count as u128);
            T::StorageHandle::unlock_and_used_user_space(&deal_info.user.user, needed_space)?;
            T::StorageHandle::sub_total_idle_space(needed_space)?;
            T::StorageHandle::add_total_service_space(needed_space)?;
            // Cancelling the transfer-timeout task is best effort: a failure is
            // logged but deliberately does not abort the settlement.
            let result = T::FScheduler::cancel_named(deal_hash.0.to_vec()).map_err(|_| Error::<T>::Unexpected);
            if let Err(_) = result {
                log::info!("transfer report cancel schedule failed: {:?}", deal_hash.clone());
            }
            // Calculate the maximum time required for storage nodes to tag files
            // NOTE(review): the `+11` / `+30` margins appear to be fixed safety
            // buffers (in blocks?) on top of transfer and calculation time —
            // confirm the intended units.
            let max_needed_cal_space = (segment_count as u32).checked_mul(FRAGMENT_SIZE as u32).ok_or(Error::<T>::Overflow)?;
            let mut life: u32 = (max_needed_cal_space / TRANSFER_RATE as u32).checked_add(11).ok_or(Error::<T>::Overflow)?;
            life = (max_needed_cal_space / CALCULATE_RATE as u32)
                .checked_add(30).ok_or(Error::<T>::Overflow)?
                .checked_add(life).ok_or(Error::<T>::Overflow)?;
            Pallet::<T>::start_second_task(deal_hash.0.to_vec(), deal_hash, life)?;
            // Same bucket handling as the fast path: reuse or create.
            if <Bucket<T>>::contains_key(&deal_info.user.user, &deal_info.user.bucket_name) {
                Pallet::<T>::add_file_to_bucket(&deal_info.user.user, &deal_info.user.bucket_name, &deal_hash)?;
            } else {
                Pallet::<T>::create_bucket_helper(&deal_info.user.user, &deal_info.user.bucket_name, Some(deal_hash))?;
            }
            Pallet::<T>::add_user_hold_fileslice(&deal_info.user.user, deal_hash.clone(), needed_space)?;
            Pallet::<T>::deposit_event(Event::<T>::StorageCompleted{ file_hash: deal_hash });
        }

        Ok(())
    }
}
132 changes: 14 additions & 118 deletions c-pallets/file-bank/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ mod functions;
mod constants;
use constants::*;

mod impls;
use impls::receptionist::Receptionist;

use codec::{Decode, Encode};
use frame_support::{
// bounded_vec,
Expand Down Expand Up @@ -423,10 +426,6 @@ pub mod pallet {
origin: OriginFor<T>,
file_hash: Hash,
deal_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
assigned_data: BoundedVec<
BoundedVec<Hash, T::MissionCount>,
ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>,
>,
user_brief: UserBrief<T>,
file_size: u128,
) -> DispatchResult {
Expand All @@ -436,7 +435,7 @@ pub mod pallet {
// Check file specifications.
ensure!(Self::check_file_spec(&deal_info), Error::<T>::SpecError);
// Check whether the user-defined name meets the rules.

let minimum = T::NameMinLength::get();
ensure!(user_brief.file_name.len() as u32 >= minimum, Error::<T>::SpecError);
ensure!(user_brief.bucket_name.len() as u32 >= minimum, Error::<T>::SpecError);
Expand All @@ -448,38 +447,9 @@ pub mod pallet {
ensure!(T::StorageHandle::get_user_avail_space(&user_brief.user)? > needed_space, Error::<T>::InsufficientAvailableSpace);

if <File<T>>::contains_key(&file_hash) {
T::StorageHandle::update_user_space(&user_brief.user, 1, needed_space)?;

if <Bucket<T>>::contains_key(&user_brief.user, &user_brief.bucket_name) {
Self::add_file_to_bucket(&user_brief.user, &user_brief.bucket_name, &file_hash)?;
} else {
Self::create_bucket_helper(&user_brief.user, &user_brief.bucket_name, Some(file_hash))?;
}

Self::add_user_hold_fileslice(&user_brief.user, file_hash, needed_space)?;

<File<T>>::try_mutate(&file_hash, |file_opt| -> DispatchResult {
let file = file_opt.as_mut().ok_or(Error::<T>::FileNonExistent)?;
file.owner.try_push(user_brief.clone()).map_err(|_e| Error::<T>::BoundedVecError)?;
Ok(())
})?;
Receptionist::<T>::fly_upload_file(file_hash, user_brief.clone(), needed_space)?;
} else {
ensure!(assigned_data.len() as u32 == ASSIGN_MINER_IDEAL_QUANTITY, Error::<T>::SpecError);
let mut miner_task_list: BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>> = Default::default();
let mut index = 1;
for fragment_list in assigned_data {
ensure!(fragment_list.len() == deal_info.len(), Error::<T>::SpecError);
let miner_task = MinerTaskList::<T> {
index,
miner: None,
fragment_list,
};
miner_task_list.try_push(miner_task).map_err(|_| Error::<T>::BoundedVecError)?;
index += 1;
}
T::StorageHandle::lock_user_space(&user_brief.user, needed_space)?;
// TODO! Replace the file_hash param
Self::generate_deal(file_hash.clone(), deal_info, miner_task_list, user_brief.clone(), file_size)?;
Receptionist::<T>::generate_deal(file_hash, deal_info, user_brief.clone(), needed_space, file_size)?;
}

Self::deposit_event(Event::<T>::UploadDeclaration { operator: sender, owner: user_brief.user, deal_hash: file_hash });
Expand Down Expand Up @@ -619,81 +589,7 @@ pub mod pallet {
<DealMap<T>>::try_mutate(&deal_hash, |deal_info_opt| -> DispatchResult {
// can use unwrap because there was a judgment above
let deal_info = deal_info_opt.as_mut().unwrap();

if !deal_info.complete_list.contains(&index) {
for task_info in &mut deal_info.miner_task_list {
if task_info.index == index {
match task_info.miner {
Some(_) => Err(Error::<T>::Existed)?,
None => {
task_info.miner = Some(sender.clone());
T::MinerControl::lock_space(&sender, task_info.fragment_list.len() as u128 * FRAGMENT_SIZE)?;
deal_info.complete_list.try_push(index).map_err(|_| Error::<T>::BoundedVecError)?;
},
};
} else {
if let Some(miner) = &task_info.miner {
if miner == &sender {
Err(Error::<T>::Existed)?;
}
};
}
}


// If it is the last submitter of the order.
if deal_info.complete_list.len() == deal_info.miner_task_list.len() {
deal_info.stage = 2;
Self::generate_file(
&deal_hash,
deal_info.segment_list.clone(),
deal_info.miner_task_list.clone(),
deal_info.user.clone(),
FileState::Calculate,
deal_info.file_size,
)?;
let mut max_task_count = 0;
for miner_task in deal_info.miner_task_list.iter() {
let count = miner_task.fragment_list.len() as u128;
if count > max_task_count {
max_task_count = count;
}
// Miners need to report the replaced documents themselves.
// If a challenge is triggered before the report is completed temporarily,
// these documents to be replaced also need to be verified
if let Some(miner) = &miner_task.miner {
<PendingReplacements<T>>::try_mutate(miner, |pending_space| -> DispatchResult {
let replace_space = FRAGMENT_SIZE.checked_mul(count).ok_or(Error::<T>::Overflow)?;
let pending_space_temp = pending_space.checked_add(replace_space).ok_or(Error::<T>::Overflow)?;
*pending_space = pending_space_temp;
Ok(())
})?;
}
}
let needed_space = Self::cal_file_size(deal_info.segment_list.len() as u128);
T::StorageHandle::unlock_and_used_user_space(&deal_info.user.user, needed_space)?;
T::StorageHandle::sub_total_idle_space(needed_space)?;
T::StorageHandle::add_total_service_space(needed_space)?;
let result = T::FScheduler::cancel_named(deal_hash.0.to_vec()).map_err(|_| Error::<T>::Unexpected);
if let Err(_) = result {
log::info!("transfer report cancel schedule failed: {:?}", deal_hash.clone());
}
// Calculate the maximum time required for storage nodes to tag files
let max_needed_cal_space = (max_task_count as u32).checked_mul(FRAGMENT_SIZE as u32).ok_or(Error::<T>::Overflow)?;
let mut life: u32 = (max_needed_cal_space / TRANSFER_RATE as u32).checked_add(11).ok_or(Error::<T>::Overflow)?;
life = (max_needed_cal_space / CALCULATE_RATE as u32)
.checked_add(30).ok_or(Error::<T>::Overflow)?
.checked_add(life).ok_or(Error::<T>::Overflow)?;
Self::start_second_task(deal_hash.0.to_vec(), deal_hash, life)?;
if <Bucket<T>>::contains_key(&deal_info.user.user, &deal_info.user.bucket_name) {
Self::add_file_to_bucket(&deal_info.user.user, &deal_info.user.bucket_name, &deal_hash)?;
} else {
Self::create_bucket_helper(&deal_info.user.user, &deal_info.user.bucket_name, Some(deal_hash))?;
}
Self::add_user_hold_fileslice(&deal_info.user.user, deal_hash.clone(), needed_space)?;
Self::deposit_event(Event::<T>::StorageCompleted{ file_hash: deal_hash });
}
}
Receptionist::<T>::qualification_report_processing(sender.clone(), deal_hash, deal_info, index)?;
Ok(())
})?;

Expand All @@ -719,18 +615,18 @@ pub mod pallet {
let _ = ensure_root(origin)?;

let deal_info = <DealMap<T>>::try_get(&deal_hash).map_err(|_| Error::<T>::NonExistent)?;
for miner_task in deal_info.miner_task_list {
let count = miner_task.fragment_list.len() as u32;
let mut hash_list: Vec<Box<[u8; 256]>> = Default::default();
for fragment_hash in miner_task.fragment_list {
for (index, complete_info) in deal_info.complete_list.iter().enumerate() {
let count = FRAGMENT_COUNT;
let mut hash_list: Vec<Box<[u8; 256]>> = Default::default();
for segment in &deal_info.segment_list {
let fragment_hash = segment.fragment_list[index as usize];
let hash_temp = fragment_hash.binary().map_err(|_| Error::<T>::BugInvalid)?;
hash_list.push(hash_temp);
}
// Accumulate the number of fragments stored by each miner
let unlock_space = FRAGMENT_SIZE.checked_mul(count as u128).ok_or(Error::<T>::Overflow)?;
let miner = miner_task.miner.ok_or(Error::<T>::Unexpected)?;
T::MinerControl::unlock_space_to_service(&miner, unlock_space)?;
T::MinerControl::insert_service_bloom(&miner, hash_list)?;
T::MinerControl::unlock_space_to_service(&complete_info.miner, unlock_space)?;
T::MinerControl::insert_service_bloom(&complete_info.miner, hash_list)?;
}

<File<T>>::try_mutate(&deal_hash, |file_opt| -> DispatchResult {
Expand Down
Loading
Loading