Replace Blob Ids with Forward property (solana-labs#2734)
* Replace Blob Id with Blob forwarding

* Update simulation to properly propagate blobs
sagar-solana authored Feb 12, 2019
1 parent 1173cf7 commit 8b39eb5
Showing 12 changed files with 139 additions and 132 deletions.
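The gist of the change: each blob previously carried the leader's Pubkey (written with set_id, read with id), which receivers compared against the scheduled leader to decide whether to retransmit. That 32-byte identity is replaced by a single boolean the sender sets. A minimal sketch of what the new accessor pair might look like — the header offset and storage layout here are assumptions for illustration, not the crate's actual blob format:

    // Sketch only: a one-byte "forward" flag in the blob header, replacing
    // the 32-byte leader id. FORWARD_OFFSET is a hypothetical position.
    const FORWARD_OFFSET: usize = 0;

    pub struct Blob {
        pub data: Vec<u8>,
    }

    impl Blob {
        /// Sender marks whether recipients should forward this blob.
        pub fn forward(&mut self, forward: bool) {
            self.data[FORWARD_OFFSET] = u8::from(forward);
        }

        /// Recipients read the flag back out of the header.
        pub fn should_forward(&self) -> bool {
            self.data[FORWARD_OFFSET] != 0
        }
    }

Everything below is mechanical fallout from that swap: call sites that passed a Keypair or Pubkey only to stamp blobs lose that parameter.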
10 changes: 5 additions & 5 deletions src/blocktree.rs
@@ -1261,7 +1261,7 @@ pub fn create_new_ledger(ledger_path: &str, genesis_block: &GenesisBlock) -> Res
Ok((1, entries[0].id))
}

-pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Result<()>
+pub fn genesis<'a, I>(ledger_path: &str, entries: I) -> Result<()>
where
I: IntoIterator<Item = &'a Entry>,
{
@@ -1274,7 +1274,7 @@ where
.map(|(idx, entry)| {
let mut b = entry.borrow().to_blob();
b.set_index(idx as u64);
-b.set_id(&keypair.pubkey());
+b.forward(true);
b.set_slot(DEFAULT_SLOT_HEIGHT);
b
})
@@ -1408,7 +1408,7 @@ mod tests {
fn test_read_blobs_bytes() {
let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
let slot = DEFAULT_SLOT_HEIGHT;
-index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, &[slot; 10]);
+index_blobs(&shared_blobs, &mut 0, &[slot; 10]);

let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@@ -1779,7 +1779,7 @@ mod tests {

let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
{
-genesis(&ledger_path, &Keypair::new(), &entries).unwrap();
+genesis(&ledger_path, &entries).unwrap();

let ledger = Blocktree::open(&ledger_path).expect("open failed");

@@ -1797,7 +1797,7 @@
let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
{
// put entries except last 2 into ledger
-genesis(&ledger_path, &Keypair::new(), &entries[..entries.len() - 2]).unwrap();
+genesis(&ledger_path, &entries[..entries.len() - 2]).unwrap();

let ledger = Blocktree::open(&ledger_path).expect("open failed");

2 changes: 1 addition & 1 deletion src/broadcast_service.rs
@@ -82,7 +82,7 @@ impl Broadcast {
.collect();

// TODO: blob_index should be slot-relative...
-index_blobs(&blobs, &self.id, &mut self.blob_index, &slots);
+index_blobs(&blobs, &mut self.blob_index, &slots);

let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());

4 changes: 2 additions & 2 deletions src/chacha.rs
@@ -166,11 +166,11 @@ mod tests {
use bs58;
// golden needs to be updated if blob stuff changes....
let golden = Hash::new(
&bs58::decode("nzxMWDQVsftBZbMGA1ika8X6bAKy7vya1jfXnVZSErt")
&bs58::decode("8NMJBwpXoBoA7YrA5CemRtGtfAqoY15bvnCqVjh4LYpS")
.into_vec()
.unwrap(),
);
-assert_eq!(hasher.result(), golden,);
+assert_eq!(hasher.result(), golden);
remove_file(out_path).unwrap();
}
}
1 change: 0 additions & 1 deletion src/cluster_info.rs
@@ -583,7 +583,6 @@ impl ClusterInfo {
let s = obj.read().unwrap();
(s.my_data().clone(), peers)
};
-blob.write().unwrap().set_id(&me.id);
let rblob = blob.read().unwrap();
trace!("retransmit orders {}", orders.len());
let errs: Vec<_> = orders
67 changes: 21 additions & 46 deletions src/db_window.rs
@@ -16,24 +16,24 @@ use std::sync::{Arc, RwLock};

pub const MAX_REPAIR_LENGTH: usize = 128;

-pub fn retransmit_all_leader_blocks(
+pub fn retransmit_blobs(
dq: &[SharedBlob],
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
retransmit: &BlobSender,
id: &Pubkey,
) -> Result<()> {
let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
for b in dq {
-// Check if the blob is from the scheduled leader for its slot. If so,
-// add to the retransmit_queue
+// Don't add blobs generated by this node to the retransmit queue
let slot = b.read().unwrap().slot();
if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) {
if leader_id != *id {
-add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
+retransmit_queue.push(b.clone());
}
}
}

+//todo maybe move this to where retransmit is actually happening
submit(
influxdb::Point::new("retransmit-queue")
.add_field(
@@ -50,24 +50,6 @@ pub fn retransmit_all_leader_blocks(
Ok(())
}

-pub fn add_blob_to_retransmit_queue(
-b: &SharedBlob,
-leader_id: Pubkey,
-retransmit_queue: &mut Vec<SharedBlob>,
-) {
-let p = b.read().unwrap();
-if p.id() == leader_id {
-let nv = SharedBlob::default();
-{
-let mut mnv = nv.write().unwrap();
-let sz = p.meta.size;
-mnv.meta.size = sz;
-mnv.data[..sz].copy_from_slice(&p.data[..sz]);
-}
-retransmit_queue.push(nv);
-}
-}
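With the helper gone, the filter reduces to a slot-leader check and a cheap Arc clone; no blob is copied or rewritten on this path anymore. A standalone sketch of the surviving logic, with simplified stand-ins for the real SharedBlob, LeaderScheduler, and Pubkey types:

    use std::sync::{Arc, RwLock};

    // Simplified stand-ins; the real types live in the crate.
    type Pubkey = [u8; 32];

    struct Blob { slot: u64 }
    type SharedBlob = Arc<RwLock<Blob>>;

    struct LeaderScheduler;
    impl LeaderScheduler {
        fn get_leader_for_slot(&self, _slot: u64) -> Option<Pubkey> { Some([1; 32]) }
    }

    // Keep every blob whose slot has a scheduled leader other than this node;
    // no copying, no id rewriting -- the retransmit stage handles the rest.
    fn filter_for_retransmit(
        dq: &[SharedBlob],
        leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
        id: &Pubkey,
    ) -> Vec<SharedBlob> {
        dq.iter()
            .filter(|b| {
                let slot = b.read().unwrap().slot;
                leader_scheduler
                    .read()
                    .unwrap()
                    .get_leader_for_slot(slot)
                    .map_or(false, |leader_id| leader_id != *id)
            })
            .cloned()
            .collect()
    }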

/// Process a blob: Add blob to the ledger window.
pub fn process_blob(
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
@@ -216,7 +198,7 @@ mod test {
}

#[test]
-pub fn test_retransmit() {
+pub fn test_send_to_retransmit_stage() {
let leader = Keypair::new().pubkey();
let nonleader = Keypair::new().pubkey();
let mut leader_scheduler = LeaderScheduler::default();
@@ -226,39 +208,38 @@

let (blob_sender, blob_receiver) = channel();

-// Expect blob from leader to be retransmitted
-blob.write().unwrap().set_id(&leader);
-retransmit_all_leader_blocks(
+// Expect all blobs to be sent to retransmit_stage
+blob.write().unwrap().forward(false);
+retransmit_blobs(
&vec![blob.clone()],
&leader_scheduler,
&blob_sender,
&nonleader,
)
.expect("Expect successful retransmit");
-let output_blob = blob_receiver
+let _ = blob_receiver
.try_recv()
.expect("Expect input blob to be retransmitted");

-// Retransmitted blob should be missing the leader id
-assert_ne!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
-// Set the leader in the retransmitted blob, should now match the original
-output_blob[0].write().unwrap().set_id(&leader);
-assert_eq!(*output_blob[0].read().unwrap(), *blob.read().unwrap());

-// Expect blob from nonleader to not be retransmitted
-blob.write().unwrap().set_id(&nonleader);
-retransmit_all_leader_blocks(
+blob.write().unwrap().forward(true);
+retransmit_blobs(
&vec![blob.clone()],
&leader_scheduler,
&blob_sender,
&nonleader,
)
.expect("Expect successful retransmit");
-assert!(blob_receiver.try_recv().is_err());
+let output_blob = blob_receiver
+.try_recv()
+.expect("Expect input blob to be retransmitted");

+// retransmit_blobs shouldn't be modifying the blob. That is retransmit stage's job now
+assert_eq!(*output_blob[0].read().unwrap(), *blob.read().unwrap());

// Expect blob from leader while currently leader to not be retransmitted
-blob.write().unwrap().set_id(&leader);
-retransmit_all_leader_blocks(&vec![blob], &leader_scheduler, &blob_sender, &leader)
+// Even when forward is set
+blob.write().unwrap().forward(true);
+retransmit_blobs(&vec![blob], &leader_scheduler, &blob_sender, &leader)
.expect("Expect successful retransmit");
assert!(blob_receiver.try_recv().is_err());
}
@@ -470,12 +451,7 @@ mod test {
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();

-index_blobs(
-&shared_blobs,
-&Keypair::new().pubkey(),
-&mut 0,
-&vec![slot; num_entries],
-);
+index_blobs(&shared_blobs, &mut 0, &vec![slot; num_entries]);

let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@@ -572,7 +548,6 @@ mod test {

index_blobs(
&shared_blobs,
-&Keypair::new().pubkey(),
&mut 0,
&vec![DEFAULT_SLOT_HEIGHT; num_entries],
);
3 changes: 1 addition & 2 deletions src/entry.rs
@@ -447,7 +447,6 @@ pub fn make_large_test_entries(num_entries: usize) -> Vec<Entry> {

#[cfg(test)]
pub fn make_consecutive_blobs(
-id: &Pubkey,
num_blobs_to_make: u64,
start_height: u64,
start_hash: Hash,
@@ -460,7 +459,7 @@
for blob in &blobs {
let mut blob = blob.write().unwrap();
blob.set_index(index);
-blob.set_id(id);
+blob.forward(true);
blob.meta.set_addr(addr);
index += 1;
}
25 changes: 9 additions & 16 deletions src/erasure.rs
@@ -329,14 +329,14 @@ impl CodingGenerator {
for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] {
let index = data_blob.index();
let slot = data_blob.slot();
-let id = data_blob.id();
+let should_forward = data_blob.should_forward();

let coding_blob = SharedBlob::default();
{
let mut coding_blob = coding_blob.write().unwrap();
coding_blob.set_index(index);
coding_blob.set_slot(slot);
-coding_blob.set_id(&id);
+coding_blob.forward(should_forward);
coding_blob.set_size(max_data_size);
coding_blob.set_coding();
}
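Erasure coding blobs are generated from data blobs, so the flag has to ride along the same way the index and slot already do; otherwise a reconstructed window would lose the forwarding hint. A hedged sketch of that metadata copy — the Blob fields here are stand-ins, since the real implementation packs them into the serialized header:

    // Sketch: a new coding blob inherits index, slot, and the forward bit
    // from the data blob it encodes. Field layout is assumed.
    #[derive(Default)]
    struct Blob {
        index: u64,
        slot: u64,
        forwarded: bool,
        is_coding: bool,
        size: usize,
    }

    fn make_coding_blob(data_blob: &Blob, max_data_size: usize) -> Blob {
        Blob {
            index: data_blob.index,
            slot: data_blob.slot,
            // One bit of policy replaces the old 32-byte leader id copy.
            forwarded: data_blob.forwarded,
            is_coding: true,
            size: max_data_size,
        }
    }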
@@ -509,7 +509,6 @@ pub mod test {
use crate::window::WindowSlot;
use rand::{thread_rng, Rng};
use solana_sdk::pubkey::Pubkey;
-use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;

#[test]
@@ -756,23 +755,23 @@ pub mod test {
for i in 0..max_data_size {
coding_wl.data[i] = 0;
}
-// copy index and id from the data blob
+// copy index and forward flag from the data blob
if let Some(data) = &window[n].data {
let data_rl = data.read().unwrap();

let index = data_rl.index();
let slot = data_rl.slot();
-let id = data_rl.id();
+let should_forward = data_rl.should_forward();

trace!(
"{} copying index {} id {:?} from data to coding",
id,
"{} copying index {} should_forward {:?} from data to coding",
should_forward,
index,
id
should_forward
);
coding_wl.set_index(index);
coding_wl.set_slot(slot);
-coding_wl.set_id(&id);
+coding_wl.forward(should_forward);
}
coding_wl.set_size(max_data_size);
coding_wl.set_coding();
@@ -890,12 +889,7 @@ pub mod test {
}

// Make some dummy slots
-index_blobs(
-&blobs,
-&Keypair::new().pubkey(),
-&mut (offset as u64),
-&vec![slot; blobs.len()],
-);
+index_blobs(&blobs, &mut (offset as u64), &vec![slot; blobs.len()]);

for b in blobs {
let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
@@ -910,7 +904,6 @@

index_blobs(
&blobs,
-&Keypair::new().pubkey(),
&mut (offset as u64),
&vec![DEFAULT_SLOT_HEIGHT; blobs.len()],
);
15 changes: 5 additions & 10 deletions src/fullnode.rs
@@ -794,16 +794,11 @@ mod tests {

let tvu_address = &validator_info.tvu;

-let msgs = make_consecutive_blobs(
-&leader_id,
-blobs_to_send,
-ledger_initial_len,
-last_id,
-&tvu_address,
-)
-.into_iter()
-.rev()
-.collect();
+let msgs =
+make_consecutive_blobs(blobs_to_send, ledger_initial_len, last_id, &tvu_address)
+.into_iter()
+.rev()
+.collect();
s_responder.send(msgs).expect("send");
t_responder
};