Skip to content

Commit

Permalink
0.7.5 upload file (#256)
Browse files Browse the repository at this point in the history
* feat: upload file

* feat: upload file change

* fix: fix complete error
  • Loading branch information
ytqaljn authored Nov 8, 2023
1 parent ea61db4 commit a257bab
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 286 deletions.
250 changes: 22 additions & 228 deletions c-pallets/file-bank/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ impl<T: Config> Pallet<T> {
file_hash: &Hash,
deal_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
mut miner_task_list: BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>>,
share_info: Vec<SegmentInfo<T>>,
user_brief: UserBrief<T>,
stat: FileState,
file_size: u128,
Expand All @@ -29,48 +28,26 @@ impl<T: Config> Pallet<T> {
fragment_list: Default::default(),
};

let mut mark_miner: Vec<AccountOf<T>> = Default::default();

let mut flag = true;
for share_segment_info in &share_info {
if segment.hash == share_segment_info.hash {
segment_info.fragment_list = share_segment_info.fragment_list.clone();
flag = false;
break;
}
for miner_task in &mut miner_task_list {
miner_task.fragment_list.sort();
}

if flag {
for frag_hash in segment.fragment_list.iter() {
for miner_task in &mut miner_task_list {
miner_task.fragment_list.sort();
}

let best_count: u32 = (SEGMENT_SIZE * 15 / 10 / FRAGMENT_SIZE) as u32;
let cur_count: u32 = miner_task_list.len() as u32;
let flag = best_count == cur_count;

for frag_hash in segment.fragment_list.iter() {
for miner_task in &mut miner_task_list {
if flag {
if mark_miner.contains(&miner_task.miner) {
continue;
}
}
if let Ok(index) = miner_task.fragment_list.binary_search(&frag_hash) {
let frag_info = FragmentInfo::<T> {
hash: *frag_hash,
avail: true,
miner: miner_task.miner.clone(),
};
segment_info.fragment_list.try_push(frag_info).map_err(|_e| Error::<T>::BoundedVecError)?;
miner_task.fragment_list.remove(index);
mark_miner.push(miner_task.miner.clone());
break;
}
if let Ok(index) = miner_task.fragment_list.binary_search(&frag_hash) {
let miner = miner_task.miner.clone().ok_or(Error::<T>::Unexpected)?;
let frag_info = FragmentInfo::<T> {
hash: *frag_hash,
avail: true,
miner: miner.clone(),
};
segment_info.fragment_list.try_push(frag_info).map_err(|_e| Error::<T>::BoundedVecError)?;
miner_task.fragment_list.remove(index);
break;
}
}
}

segment_info_list.try_push(segment_info).map_err(|_e| Error::<T>::BoundedVecError)?;
}

Expand Down Expand Up @@ -134,11 +111,10 @@ impl<T: Config> Pallet<T> {
pub(super) fn generate_deal(
file_hash: Hash,
file_info: BoundedVec<SegmentList<T>, T::SegmentCount>,
miner_task_list: BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>>,
user_brief: UserBrief<T>,
file_size: u128,
) -> DispatchResult {
let miner_task_list = Self::random_assign_miner(&file_info)?;

let space = Self::cal_file_size(file_info.len() as u128);

let life = space / TRANSFER_RATE + 1;
Expand All @@ -150,10 +126,8 @@ impl<T: Config> Pallet<T> {
count: 0,
file_size,
segment_list: file_info.clone(),
needed_list: file_info,
user: user_brief,
assigned_miner: miner_task_list,
share_info: Default::default(),
miner_task_list: miner_task_list,
complete_list: Default::default(),
};

Expand All @@ -176,7 +150,7 @@ impl<T: Config> Pallet<T> {
Option::None,
schedule::HARD_DEADLINE,
frame_system::RawOrigin::Root.into(),
Call::deal_reassign_miner{deal_hash: deal_hash, count: count, life: life}.into(),
Call::deal_timing_task{deal_hash: deal_hash, count: count, life: life}.into(),
).map_err(|_| Error::<T>::Unexpected)?;

Ok(())
Expand Down Expand Up @@ -205,178 +179,18 @@ impl<T: Config> Pallet<T> {
let needed_space = Self::cal_file_size(deal_info.segment_list.len() as u128);
T::StorageHandle::unlock_user_space(&deal_info.user.user, needed_space)?;
// unlock mienr space
for miner_task in deal_info.assigned_miner {
let count = miner_task.fragment_list.len() as u128;
T::MinerControl::unlock_space(&miner_task.miner, FRAGMENT_SIZE * count)?;
for miner_task in deal_info.miner_task_list {
if let Some(miner) = miner_task.miner {
let count = miner_task.fragment_list.len() as u128;
T::MinerControl::unlock_space(&miner, FRAGMENT_SIZE * count)?;
}
}

<DealMap<T>>::remove(deal_hash);

Ok(())
}

pub(super) fn reassign_miner(
task_list: BoundedVec<MinerTaskList<T>, T::FragmentCount>,
selected_miner: BoundedVec<AccountOf<T>, T::FragmentCount>,
) -> Result<BoundedVec<MinerTaskList<T>, T::FragmentCount>, DispatchError> {
let mut all_miner = T::MinerControl::get_all_miner()?;
let mut total = all_miner.len() as u32;
let mut seed = <frame_system::Pallet<T>>::block_number().saturated_into();
// Used to calculate the upper limit of the number of random selections
let random_count_limit = task_list.len() as u32 * 10 + 10;
let mut new_task_list: BoundedVec<MinerTaskList<T>, T::FragmentCount> = Default::default();
for miner_task in &task_list {
// Used to count the number of random selections.
let mut cur_count = 0;
let miner = loop {
if random_count_limit == cur_count {
Err(Error::<T>::NotQualified)?;
}

if total == 0 {
Err(Error::<T>::NotQualified)?;
}

let index = Self::generate_random_number(seed)? as usize % all_miner.len();
seed = seed.checked_add(1).ok_or(Error::<T>::Overflow)?;

let miner = all_miner[index as usize].clone();
all_miner.remove(index as usize);
cur_count += 1;
total = total - 1;

if selected_miner.contains(&miner) {
continue;
}

if Self::miner_failed_exceeds_limit(&miner) {
continue;
}

let result = T::MinerControl::is_positive(&miner)?;
if !result {
continue;
}

let (idle, _) = T::MinerControl::get_power(&miner)?;
let needed_size = miner_task.fragment_list.len() as u128 * FRAGMENT_SIZE;
if idle < needed_size {
continue;
}
T::MinerControl::lock_space(&miner, miner_task.fragment_list.len() as u128 * FRAGMENT_SIZE)?;

break miner;
};

new_task_list.try_push(MinerTaskList::<T>{miner: miner, fragment_list: miner_task.fragment_list.clone()}).map_err(|_| Error::<T>::BugInvalid)?;
}

return Ok(new_task_list)
}

pub(super) fn random_assign_miner(
needed_list: &BoundedVec<SegmentList<T>, T::SegmentCount>
) -> Result<BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>>, DispatchError> {
let mut miner_task_list: BoundedVec<MinerTaskList<T>, ConstU32<ASSIGN_MINER_IDEAL_QUANTITY>> = Default::default();
let mut miner_idle_space_list: Vec<u128> = Default::default();
// The optimal number of miners required for storage.
// segment_size * 1.5 / fragment_size.
let miner_count: u32 = (SEGMENT_SIZE * 15 / 10 / FRAGMENT_SIZE) as u32;
let mut seed = <frame_system::Pallet<T>>::block_number().saturated_into();

let mut all_miner = T::MinerControl::get_all_miner()?;
let mut total = all_miner.len() as u32;

// ensure!(total > miner_count, Error::<T>::NodesInsufficient);
let max_count = miner_count * 5;
let mut cur_count = 0;
let mut total_idle_space = 0;

// start random choose miner
loop {
// Get a random subscript.
if total == 0 {
break;
}

let index = Self::generate_random_number(seed)? as u32 % total;
// seed + 1
seed = seed.checked_add(1).ok_or(Error::<T>::Overflow)?;

// When the number of cycles reaches the upper limit, the cycle ends.
if cur_count == max_count {
break;
}

// Number of cycles plus 1
cur_count += 1;

// Judge whether the idle space of the miners is sufficient.
let miner = all_miner[index as usize].clone();
all_miner.remove(index as usize);
total = total - 1;

if Self::miner_failed_exceeds_limit(&miner) {
continue;
}
let result = T::MinerControl::is_positive(&miner)?;
if !result {
log::info!("Isn't positive");
continue;
}

let cur_space: u128 = T::MinerControl::get_miner_idle_space(&miner)?;
// If sufficient, the miner is selected.
if cur_space > needed_list.len() as u128 * FRAGMENT_SIZE {
// Accumulate all idle space of currently selected miners
total_idle_space = total_idle_space.checked_add(&cur_space).ok_or(Error::<T>::Overflow)?;
let miner_task = MinerTaskList::<T>{
miner: miner,
fragment_list: Default::default(),
};
miner_task_list.try_push(miner_task).map_err(|_e| Error::<T>::BoundedVecError)?;
miner_idle_space_list.push(cur_space);
}
// If the selected number of miners has reached the optimal number, the cycle ends.
if miner_task_list.len() as u32 == miner_count {
break;
}
}

ensure!(miner_task_list.len() != 0, Error::<T>::BugInvalid);
ensure!(total_idle_space > SEGMENT_SIZE * 15 / 10, Error::<T>::NodesInsufficient);

// According to the selected miner.
// Assign responsible documents to miners.
for segment_list in needed_list {
let mut index = 0;
for hash in &segment_list.fragment_list {
// To prevent the number of miners from not meeting the fragment number.
// It may occur that one miner stores multiple fragments
loop {
// Obtain the account of the storage node through the subscript.
// To prevent exceeding the boundary, use '%'.
let temp_index = index % miner_task_list.len();
let cur_space = miner_idle_space_list[temp_index];
// To prevent a miner from storing multiple fragments,
// the idle space is insufficient
if cur_space > (miner_task_list[temp_index].fragment_list.len() as u128 + 1) * FRAGMENT_SIZE {
miner_task_list[temp_index].fragment_list.try_push(*hash).map_err(|_e| Error::<T>::BoundedVecError)?;
break;
}
index = index.checked_add(1).ok_or(Error::<T>::PanicOverflow)?;
}
index = index.checked_add(1).ok_or(Error::<T>::PanicOverflow)?;
}
}
// lock miner space
for miner_task in miner_task_list.iter() {
T::MinerControl::lock_space(&miner_task.miner, miner_task.fragment_list.len() as u128 * FRAGMENT_SIZE)?;
}

Ok(miner_task_list)
}

pub(super) fn cal_file_size(len: u128) -> u128 {
len * (SEGMENT_SIZE * 15 / 10)
}
Expand Down Expand Up @@ -650,26 +464,6 @@ impl<T: Config> Pallet<T> {
true
}

pub(super) fn add_task_failed_count(miner: &AccountOf<T>) -> DispatchResult {
TaskFailedCount::<T>::try_mutate(miner, |count| -> DispatchResult {
*count = count.checked_add(1).unwrap_or(255);
Ok(())
})
}

pub(super) fn zero_task_failed_count(miner: &AccountOf<T>) -> DispatchResult {
TaskFailedCount::<T>::try_mutate(miner, |count| -> DispatchResult {
*count = 0;
Ok(())
})
}

pub(super) fn miner_failed_exceeds_limit(miner: &AccountOf<T>) -> bool {
let count = TaskFailedCount::<T>::get(miner);

count >= 5
}

pub(super) fn get_segment_length_from_deal(deal_hash: &Hash) -> u32 {
if let Ok(deal) = <DealMap<T>>::try_get(deal_hash) {
return deal.segment_list.len() as u32;
Expand Down
Loading

0 comments on commit a257bab

Please sign in to comment.