Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/deferred credits abs fixes #3203

Merged
merged 14 commits into from
Nov 16, 2022
46 changes: 17 additions & 29 deletions massa-execution-worker/src/active_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use massa_ledger_exports::{
use massa_models::{
address::Address, amount::Amount, operation::OperationId, prehash::PreHashMap, slot::Slot,
};
use std::collections::{BTreeMap, VecDeque};
use std::collections::VecDeque;

#[derive(Default)]
/// History of the outputs of recently executed slots.
Expand Down Expand Up @@ -143,49 +143,37 @@ impl ActiveHistory {
})
}

/// Traverse the whole history and return every deferred credit of `addr` _after_ `slot` (included).
///
/// # Arguments
/// * `slot`: slot _after_ which we fetch the credits
/// * `addr`: address to fetch the credits from
#[allow(dead_code)]
pub fn fetch_deferred_credits_after(
&self,
slot: &Slot,
addr: &Address,
) -> BTreeMap<Slot, Amount> {
/// Gets all the deferred credits that will be credited at a given slot
pub fn get_all_deferred_credits_for(&self, slot: &Slot) -> PreHashMap<Address, Amount> {
self.0
.iter()
.flat_map(|output| {
.filter_map(|output| {
output
.state_changes
.pos_changes
.deferred_credits
.0
.range(slot..)
.filter_map(|(&slot, credits)| credits.get(addr).map(|&amount| (slot, amount)))
.get(slot)
.cloned()
})
.flatten()
.collect()
}

/// Traverse the whole history and return every deferred credit _at_ `slot`
///
/// # Arguments
/// * `slot`: slot _at_ which we fetch the credits
pub fn fetch_all_deferred_credits_at(&self, slot: &Slot) -> PreHashMap<Address, Amount> {
self.0
/// Gets the deferred credits for a given address that will be credited at a given slot
pub(crate) fn get_adress_deferred_credit_for(&self, addr: &Address, slot: &Slot) -> Option<Amount> {
for hist_item in self.0
.iter()
.filter_map(|output| {
output
.rev() {
if let Some(v) = hist_item
.state_changes
.pos_changes
.deferred_credits
.0
.get(slot)
.cloned()
})
.flatten()
.collect()
.get_address_deferred_credit_for_slot(addr, slot) {
return Some(v);
}
}
None
}

/// Gets the index of a slot in history
Expand Down
70 changes: 47 additions & 23 deletions massa-execution-worker/src/speculative_roll_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,34 +118,33 @@ impl SpeculativeRollState {
)));
}

let cur_cycle = slot.get_cycle(periods_per_cycle);

// remove the rolls
let current_rolls = self
.added_changes
.roll_changes
.entry(*seller_addr)
.or_insert_with(|| owned_count);
*current_rolls = owned_count.saturating_sub(roll_count);

// compute deferred credit slot
let cur_cycle = slot.get_cycle(periods_per_cycle);
let target_slot = Slot::new_last_of_cycle(
cur_cycle
.checked_add(3)
.expect("unexpected cycle overflow in try_sell_rolls"),
periods_per_cycle,
thread_count,
)
.expect("unexepected slot overflot in try_sell_rolls");
.expect("unexpected slot overflow in try_sell_rolls");

// add deferred reimbursement corresponding to the sold rolls value
let credit = self
// Note 1: Deferred credits are stored as absolute value
let new_deferred_credits = self
.get_address_deferred_credit_for_slot(seller_addr, &target_slot)
.unwrap_or_default()
.saturating_add(roll_price.saturating_mul_u64(roll_count));

// Remove the rolls
self
.added_changes
.deferred_credits
.0
.entry(target_slot)
.or_insert_with(PreHashMap::default);
credit.insert(*seller_addr, roll_price.saturating_mul_u64(roll_count));
.roll_changes
damip marked this conversation as resolved.
Show resolved Hide resolved
.insert(*seller_addr, owned_count.saturating_sub(roll_count));

// Add deferred credits (reimbursement) corresponding to the sold rolls value
self.added_changes
.deferred_credits
.insert(*seller_addr, target_slot, new_deferred_credits);

Ok(())
}
Expand Down Expand Up @@ -218,10 +217,7 @@ impl SpeculativeRollState {
if owned_count != 0 {
if let Some(amount) = roll_price.checked_mul_u64(owned_count) {
target_credits.insert(addr, amount);
self.added_changes
.roll_changes
.entry(addr)
.or_insert_with(|| 0);
self.added_changes.roll_changes.insert(addr, 0);
}
}
}
Expand Down Expand Up @@ -279,6 +275,34 @@ impl SpeculativeRollState {
res.into_iter().filter(|(_s, v)| !v.is_zero()).collect()
}

/// Gets the deferred credits for a given address that will be credited at a given slot
fn get_address_deferred_credit_for_slot(&self, addr: &Address, slot: &Slot) -> Option<Amount> {

// search in the added changes
if let Some(v) = self.added_changes
.deferred_credits
.get_address_deferred_credit_for_slot(addr, slot) {
return Some(v);
}

// search in the history
if let Some(v) = self.active_history
.read()
.get_adress_deferred_credit_for(addr, slot) {
return Some(v);
}

// search in the final state
if let Some(v) = self.final_state
.read()
.pos_state.deferred_credits
.get_address_deferred_credit_for_slot(addr, slot) {
return Some(v);
}

None
}

/// Get the production statistics for a given address at a given cycle.
pub fn get_address_cycle_infos(
&self,
Expand Down Expand Up @@ -466,7 +490,7 @@ impl SpeculativeRollState {
credits.extend(
self.active_history
.read()
.fetch_all_deferred_credits_at(slot),
.get_all_deferred_credits_for(slot)
);

// added deferred credits
Expand Down
2 changes: 1 addition & 1 deletion massa-execution-worker/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn get_sample_state() -> Result<(Arc<RwLock<FinalState>>, NamedTempFile, Tem
let (_, selector_controller) = start_selector_worker(SelectorConfig::default())
.expect("could not start selector controller");
let mut final_state =
FinalState::new(cfg, Box::new(ledger), selector_controller.clone()).unwrap();
FinalState::new(cfg, Box::new(ledger), selector_controller).unwrap();
final_state.compute_initial_draws().unwrap();
final_state.pos_state.create_initial_cycle();
Ok((Arc::new(RwLock::new(final_state)), tempfile, tempdir))
Expand Down
1 change: 1 addition & 0 deletions massa-execution-worker/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@

mod mock;
mod scenarios_mandatories;
mod tests_active_history;
78 changes: 69 additions & 9 deletions massa-execution-worker/src/tests/scenarios_mandatories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use serial_test::serial;
use std::{
cmp::Reverse, collections::BTreeMap, collections::HashMap, str::FromStr, time::Duration,
};
use num::rational::Ratio;

#[test]
#[serial]
Expand Down Expand Up @@ -458,14 +459,23 @@ pub fn roll_buy() {
#[test]
#[serial]
pub fn roll_sell() {

// Try to sell 10 rolls (operation 1) then 1 rolls (operation 2)
// Check for resulting roll count + resulting deferred credits

// setup the period duration
let exec_cfg = ExecutionConfig {
let mut exec_cfg = ExecutionConfig {
t0: 100.into(),
periods_per_cycle: 2,
thread_count: 2,
cursor_delay: 0.into(),
..Default::default()
};
// turn off roll selling on missed block opportunities
// otherwise balance will be credited with those sold roll (and we need to check the balance for
// if the deferred credits are reimbursed
exec_cfg.max_miss_ratio = Ratio::new(1, 1);

// get a sample final state
let (sample_state, _keep_file, _keep_dir) = get_sample_state().unwrap();

Expand All @@ -482,20 +492,43 @@ pub fn roll_sell() {
// generate the keypair and its corresponding address
let keypair = KeyPair::from_str("S1JJeHiZv1C1zZN5GLFcbz6EXYiccmUPLkYuDFA3kayjxP39kFQ").unwrap();
let address = Address::from_public_key(&keypair.get_public_key());
// create the operation
let operation = Operation::new_wrapped(

// get initial balance
let balance_initial = sample_state
.read()
.ledger
.get_balance(&address)
.unwrap();

// get initial roll count
let roll_count_initial = sample_state.read().pos_state.get_rolls_for(&address);
let roll_sell_1 = 10;
let roll_sell_2 = 1;

// create operation 1
let operation1 = Operation::new_wrapped(
Operation {
fee: Amount::zero(),
expire_period: 10,
op: OperationType::RollSell { roll_count: 10 },
op: OperationType::RollSell { roll_count: roll_sell_1 },
},
OperationSerializer::new(),
&keypair,
)
.unwrap();
let operation2 = Operation::new_wrapped(
Operation {
fee: Amount::zero(),
expire_period: 10,
op: OperationType::RollSell { roll_count: roll_sell_2 },
},
OperationSerializer::new(),
&keypair,
)
.unwrap();
// create the block containing the roll buy operation
storage.store_operations(vec![operation.clone()]);
let block = create_block(KeyPair::generate(), vec![operation], Slot::new(1, 0)).unwrap();
storage.store_operations(vec![operation1.clone(), operation2.clone()]);
let block = create_block(KeyPair::generate(), vec![operation1, operation2], Slot::new(1, 0)).unwrap();
// store the block in storage
storage.store_block(block.clone());
// set the block as final so the sell and credits are processed
Expand All @@ -508,18 +541,45 @@ pub fn roll_sell() {
Default::default(),
block_storage.clone(),
);
std::thread::sleep(Duration::from_millis(350));
std::thread::sleep(Duration::from_millis(1000));
// check roll count deferred credits and candidate balance of the seller address
let sample_read = sample_state.read();
let mut credits = PreHashMap::default();
credits.insert(address, Amount::from_str("1000").unwrap());
assert_eq!(sample_read.pos_state.get_rolls_for(&address), 90);
let roll_remaining = roll_count_initial - roll_sell_1 - roll_sell_2;
let roll_sold = roll_sell_1 + roll_sell_2;
credits.insert(address, exec_cfg.roll_price
.checked_mul_u64(roll_sold)
.unwrap()
);

assert_eq!(sample_read.pos_state.get_rolls_for(&address), roll_remaining);
assert_eq!(
sample_read
.pos_state
.get_deferred_credits_at(&Slot::new(7, 1)),
credits
);

// Check that deferred credit are reimbursed
let credits = PreHashMap::default();
assert_eq!(
sample_read
.pos_state
.get_deferred_credits_at(&Slot::new(8, 1)),
credits
);

// Now check balance
let balances = controller
.get_final_and_candidate_balance(&[address]);
let candidate_balance = balances.get(0).unwrap().1.unwrap();

assert_eq!(candidate_balance, exec_cfg.roll_price
.checked_mul_u64(roll_sell_1 + roll_sell_2)
.unwrap()
.checked_add(balance_initial)
.unwrap());

// stop the execution controller
manager.stop();
}
Expand Down
67 changes: 67 additions & 0 deletions massa-execution-worker/src/tests/tests_active_history.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::collections::{BTreeMap, VecDeque};
use massa_execution_exports::ExecutionOutput;
use crate::active_history::ActiveHistory;
use massa_models::slot::Slot;

use serial_test::serial;
use massa_final_state::StateChanges;
use massa_hash::Hash;
use massa_models::address::Address;
use massa_models::amount::Amount;
use massa_models::prehash::{CapacityAllocator, PreHashMap};
use massa_pos_exports::{DeferredCredits, PoSChanges};

#[test]
#[serial]
fn test_active_history_deferred_credits() {

let slot1 = Slot::new(2, 2);
let slot2 = Slot::new(4, 11);

let addr1 = Address(Hash::compute_from("A1".as_bytes()));
let addr2 = Address(Hash::compute_from("A2".as_bytes()));

let amount_a1_s1 = Amount::from_raw(500);
let amount_a2_s1 = Amount::from_raw(2702);
let amount_a1_s2 = Amount::from_raw(37);
let amount_a2_s2 = Amount::from_raw(3);

let mut ph1 = PreHashMap::with_capacity(2);
ph1.insert(addr1, amount_a1_s1);
ph1.insert(addr2, amount_a2_s1);
let mut ph2 = PreHashMap::with_capacity(2);
ph2.insert(addr1, amount_a1_s2);
ph2.insert(addr2, amount_a2_s2);

let credits = DeferredCredits { 0: BTreeMap::from(
[
(slot1, ph1),
(slot2, ph2)
]
)};

let exec_output_1 = ExecutionOutput {
slot: Slot::new(1, 0),
block_id: None,
state_changes: StateChanges {
ledger_changes: Default::default(),
async_pool_changes: Default::default(),
pos_changes: PoSChanges {
seed_bits: Default::default(),
roll_changes: Default::default(),
production_stats: Default::default(),
deferred_credits: credits
},
executed_ops_changes: Default::default()
},
events: Default::default()
};

let active_history = ActiveHistory { 0: VecDeque::from([exec_output_1]) };

assert_eq!(active_history.get_adress_deferred_credit_for(&addr1, &slot2), Some(amount_a1_s2));

let deferred_credit_for_slot1 = active_history.get_all_deferred_credits_for(&slot1);
assert_eq!(deferred_credit_for_slot1.get(&addr1), Some(&amount_a1_s1));
assert_eq!(deferred_credit_for_slot1.get(&addr2), Some(&amount_a2_s1));
}
Loading