diff --git a/src/entry_stream.rs b/src/entry_stream.rs
index ac77a32c7a9884..31086ab2ca77f1 100644
--- a/src/entry_stream.rs
+++ b/src/entry_stream.rs
@@ -6,87 +6,256 @@
 use crate::entry::Entry;
 use crate::leader_scheduler::LeaderScheduler;
 use crate::result::Result;
 use chrono::{SecondsFormat, Utc};
+use solana_sdk::hash::Hash;
+use std::cell::RefCell;
 use std::io::prelude::*;
 use std::net::Shutdown;
 use std::os::unix::net::UnixStream;
 use std::path::Path;
 use std::sync::{Arc, RwLock};
 
-pub trait EntryStreamHandler {
-    fn stream_entries(&mut self, entries: &[Entry]) -> Result<()>;
+pub trait EntryWriter: std::fmt::Debug {
+    fn write(&self, payload: String) -> Result<()>;
 }
 
-pub struct EntryStream {
-    pub socket: String,
-    leader_scheduler: Arc<RwLock<LeaderScheduler>>,
+#[derive(Debug, Default)]
+pub struct EntryVec {
+    values: RefCell<Vec<String>>,
 }
 
-impl EntryStream {
-    pub fn new(socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
-        EntryStream {
-            socket,
-            leader_scheduler,
+impl EntryWriter for EntryVec {
+    fn write(&self, payload: String) -> Result<()> {
+        self.values.borrow_mut().push(payload);
+        Ok(())
+    }
+}
+
+impl EntryVec {
+    pub fn new() -> Self {
+        EntryVec {
+            values: RefCell::new(Vec::new()),
         }
     }
+
+    pub fn entries(&self) -> Vec<String> {
+        self.values.borrow().clone()
+    }
 }
 
-impl EntryStreamHandler for EntryStream {
-    fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> {
+#[derive(Debug)]
+pub struct EntrySocket {
+    socket: String,
+}
+
+impl EntryWriter for EntrySocket {
+    fn write(&self, payload: String) -> Result<()> {
         let mut socket = UnixStream::connect(Path::new(&self.socket))?;
-        for entry in entries {
-            let json = serde_json::to_string(&entry)?;
-            let (slot, slot_leader) = {
-                let leader_scheduler = self.leader_scheduler.read().unwrap();
-                let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
-                (slot, leader_scheduler.get_leader_for_slot(slot))
-            };
-            let payload = format!(
-                r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#,
-                Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
-                slot,
-                slot_leader,
-                json
-            );
-            socket.write_all(payload.as_bytes())?;
-        }
+        socket.write_all(payload.as_bytes())?;
         socket.shutdown(Shutdown::Write)?;
         Ok(())
    }
 }
 
-pub struct MockEntryStream {
-    pub socket: Vec<String>,
-    leader_scheduler: Arc<RwLock<LeaderScheduler>>,
+pub trait EntryStreamHandler {
+    fn emit_entry_event(&self, slot: u64, leader_id: &str, entries: &Entry) -> Result<()>;
+    fn emit_block_event(
+        &self,
+        slot: u64,
+        leader_id: &str,
+        tick_height: u64,
+        last_id: Hash,
+    ) -> Result<()>;
+}
+
+#[derive(Debug)]
+pub struct EntryStream<T: EntryWriter> {
+    pub output: T,
+    pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
+    pub queued_block: Option<EntryStreamBlock>,
+}
+
+impl<T> EntryStreamHandler for EntryStream<T>
+where
+    T: EntryWriter,
+{
+    fn emit_entry_event(&self, slot: u64, leader_id: &str, entry: &Entry) -> Result<()> {
+        let json_entry = serde_json::to_string(&entry)?;
+        let payload = format!(
+            r#"{{"dt":"{}","t":"entry","s":{},"l":{:?},"entry":{}}}"#,
+            Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
+            slot,
+            leader_id,
+            json_entry,
+        );
+        self.output.write(payload)?;
+        Ok(())
+    }
+
+    fn emit_block_event(
+        &self,
+        slot: u64,
+        leader_id: &str,
+        tick_height: u64,
+        last_id: Hash,
+    ) -> Result<()> {
+        let payload = format!(
+            r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":{:?},"id":"{:?}"}}"#,
+            Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
+            slot,
+            tick_height,
+            leader_id,
+            last_id,
+        );
+        self.output.write(payload)?;
+        Ok(())
+    }
+}
+
+pub type SocketEntryStream = EntryStream<EntrySocket>;
+
+impl SocketEntryStream {
+    pub fn new(socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
+        EntryStream {
+            output: EntrySocket { socket },
+            leader_scheduler,
+            queued_block: None,
+        }
+    }
 }
 
+pub type MockEntryStream = EntryStream<EntryVec>;
+
 impl MockEntryStream {
-    #[allow(clippy::needless_pass_by_value)]
-    pub fn new(_socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
-        MockEntryStream {
-            socket: Vec::new(),
+    pub fn new(_: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
+        EntryStream {
+            output: EntryVec::new(),
             leader_scheduler,
+            queued_block: None,
         }
     }
+
+    pub fn entries(&self) -> Vec<String> {
+        self.output.entries()
+    }
+}
+
+#[derive(Debug)]
+pub struct EntryStreamBlock {
+    pub tick_height: u64,
+    pub id: Hash,
 }
 
-impl EntryStreamHandler for MockEntryStream {
-    fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> {
-        for entry in entries {
-            let json = serde_json::to_string(&entry)?;
-            let (slot, slot_leader) = {
-                let leader_scheduler = self.leader_scheduler.read().unwrap();
-                let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
-                (slot, leader_scheduler.get_leader_for_slot(slot))
-            };
-            let payload = format!(
-                r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#,
-                Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
-                slot,
-                slot_leader,
-                json
-            );
-            self.socket.push(payload);
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::bank::Bank;
+    use crate::entry::Entry;
+    use crate::genesis_block::GenesisBlock;
+    use crate::leader_scheduler::LeaderSchedulerConfig;
+    use chrono::{DateTime, FixedOffset};
+    use serde_json::Value;
+    use solana_sdk::hash::Hash;
+    use std::collections::HashSet;
+
+    #[test]
+    fn test_entry_stream() -> () {
+        // Set up bank and leader_scheduler
+        let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10);
+        let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
+        let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
+        // Set up entry stream
+        let entry_stream =
+            MockEntryStream::new("test_stream".to_string(), bank.leader_scheduler.clone());
+        let ticks_per_slot = bank.leader_scheduler.read().unwrap().ticks_per_slot;
+
+        let mut last_id = Hash::default();
+        let mut entries = Vec::new();
+        let mut expected_entries = Vec::new();
+
+        let tick_height_initial = 0;
+        let tick_height_final = tick_height_initial + ticks_per_slot + 2;
+        let mut previous_slot = bank
+            .leader_scheduler
+            .read()
+            .unwrap()
+            .tick_height_to_slot(tick_height_initial);
+        let leader_id = bank
+            .leader_scheduler
+            .read()
+            .unwrap()
+            .get_leader_for_slot(previous_slot)
+            .map(|leader| leader.to_string())
+            .unwrap_or_else(|| "None".to_string());
+
+        for tick_height in tick_height_initial..=tick_height_final {
+            bank.leader_scheduler
+                .write()
+                .unwrap()
+                .update_tick_height(tick_height, &bank);
+            let curr_slot = bank
+                .leader_scheduler
+                .read()
+                .unwrap()
+                .tick_height_to_slot(tick_height);
+            if curr_slot != previous_slot {
+                entry_stream
+                    .emit_block_event(previous_slot, &leader_id, tick_height - 1, last_id)
+                    .unwrap();
+            }
+            let entry = Entry::new(&mut last_id, tick_height, 1, vec![]); // just ticks
+            last_id = entry.id;
+            previous_slot = curr_slot;
+            entry_stream
+                .emit_entry_event(curr_slot, &leader_id, &entry)
+                .unwrap();
+            expected_entries.push(entry.clone());
+            entries.push(entry);
         }
-        Ok(())
+
+        assert_eq!(
+            entry_stream.entries().len() as u64,
+            // one entry per tick (0..=N+2) is +3, plus one block
+            ticks_per_slot + 3 + 1
+        );
+
+        let mut j = 0;
+        let mut matched_entries = 0;
+        let mut matched_slots = HashSet::new();
+        let mut matched_blocks = HashSet::new();
+
+        for item in entry_stream.entries() {
+            let json: Value = serde_json::from_str(&item).unwrap();
+            let dt_str = json["dt"].as_str().unwrap();
+
+            // Ensure `dt` field parses as valid DateTime
+            let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
+
+            let item_type = json["t"].as_str().unwrap();
+            match item_type {
+                "block" => {
+                    let id = json["id"].to_string();
+                    matched_blocks.insert(id);
+                }
+
+                "entry" => {
+                    let slot = json["s"].as_u64().unwrap();
+                    matched_slots.insert(slot);
+                    let entry_obj = json["entry"].clone();
+                    let entry: Entry = serde_json::from_value(entry_obj).unwrap();
+
+                    assert_eq!(entry, expected_entries[j]);
+                    matched_entries += 1;
+                    j += 1;
+                }
+
+                _ => {
+                    assert!(false, "unknown item type {}", item);
+                }
+            }
+        }
+
+        assert_eq!(matched_entries, expected_entries.len());
+        assert_eq!(matched_slots.len(), 2);
+        assert_eq!(matched_blocks.len(), 1);
     }
 }
diff --git a/src/entry_stream_stage.rs b/src/entry_stream_stage.rs
index 3a31a1a0a3c599..569052ecddb7a0 100644
--- a/src/entry_stream_stage.rs
+++ b/src/entry_stream_stage.rs
@@ -3,11 +3,11 @@
 //! real-time access to entries.
 
 use crate::entry::{EntryReceiver, EntrySender};
-#[cfg(not(test))]
-use crate::entry_stream::EntryStream;
-use crate::entry_stream::EntryStreamHandler;
 #[cfg(test)]
 use crate::entry_stream::MockEntryStream as EntryStream;
+#[cfg(not(test))]
+use crate::entry_stream::SocketEntryStream as EntryStream;
+use crate::entry_stream::{EntryStreamBlock, EntryStreamHandler};
 use crate::leader_scheduler::LeaderScheduler;
 use crate::result::{Error, Result};
 use crate::service::Service;
@@ -59,9 +59,39 @@ impl EntryStreamStage {
     ) -> Result<()> {
         let timeout = Duration::new(1, 0);
         let entries = ledger_entry_receiver.recv_timeout(timeout)?;
-        entry_stream.stream_entries(&entries).unwrap_or_else(|e| {
-            error!("Entry Stream error: {:?}, {:?}", e, entry_stream.socket);
-        });
+        let leader_scheduler = entry_stream.leader_scheduler.read().unwrap();
+
+        for entry in &entries {
+            let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
+            let leader_id = leader_scheduler
+                .get_leader_for_slot(slot)
+                .map(|leader| leader.to_string())
+                .unwrap_or_else(|| "None".to_string());
+
+            if entry.is_tick() && entry_stream.queued_block.is_some() {
+                let queued_block = entry_stream.queued_block.as_ref();
+                let block_tick_height = queued_block.unwrap().tick_height;
+                let block_id = queued_block.unwrap().id;
+                entry_stream
+                    .emit_block_event(slot, &leader_id, block_tick_height, block_id)
+                    .unwrap_or_else(|e| {
+                        error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
+                    });
+                entry_stream.queued_block = None;
+            }
+            entry_stream
+                .emit_entry_event(slot, &leader_id, &entry)
+                .unwrap_or_else(|e| {
+                    error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
+                });
+            if 0 == leader_scheduler.num_ticks_left_in_slot(entry.tick_height) {
+                entry_stream.queued_block = Some(EntryStreamBlock {
+                    tick_height: entry.tick_height,
+                    id: entry.id,
+                });
+            }
+        }
+
         entry_stream_sender.send(entries)?;
         Ok(())
     }
@@ -85,11 +115,14 @@ mod test {
     use chrono::{DateTime, FixedOffset};
     use serde_json::Value;
     use solana_sdk::hash::Hash;
+    use solana_sdk::signature::{Keypair, KeypairUtil};
+    use solana_sdk::system_transaction::SystemTransaction;
 
     #[test]
     fn test_entry_stream_stage_process_entries() {
         // Set up bank and leader_scheduler
-        let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10);
+        let ticks_per_slot = 5;
+        let leader_scheduler_config = LeaderSchedulerConfig::new(ticks_per_slot, 2, 10);
         let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
         let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
         // Set up entry stream
@@ -103,12 +136,18 @@ mod test {
         let mut last_id = Hash::default();
         let mut entries = Vec::new();
         let mut expected_entries = Vec::new();
-        for x in 0..5 {
+        for x in 0..6 {
             let entry = Entry::new(&mut last_id, x, 1, vec![]); //just ticks
             last_id = entry.id;
             expected_entries.push(entry.clone());
             entries.push(entry);
         }
+        let keypair = Keypair::new();
+        let tx = SystemTransaction::new_account(&keypair, keypair.pubkey(), 1, Hash::default(), 0);
+        let entry = Entry::new(&mut last_id, ticks_per_slot - 1, 1, vec![tx]);
+        expected_entries.insert(ticks_per_slot as usize, entry.clone());
+        entries.insert(ticks_per_slot as usize, entry);
+
         ledger_entry_sender.send(entries).unwrap();
         EntryStreamStage::process_entries(
             &ledger_entry_receiver,
@@ -116,19 +155,38 @@ mod test {
             &mut entry_stream,
         )
         .unwrap();
-        assert_eq!(entry_stream.socket.len(), 5);
-
-        for (i, item) in entry_stream.socket.iter().enumerate() {
-            let json: Value = serde_json::from_str(&item).unwrap();
-            let dt_str = json["dt"].as_str().unwrap();
-
-            // Ensure `ts` field parses as valid DateTime
-            let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
+        assert_eq!(entry_stream.entries().len(), 8);
+        let (entry_events, block_events): (Vec<Value>, Vec<Value>) = entry_stream
+            .entries()
+            .iter()
+            .map(|item| {
+                let json: Value = serde_json::from_str(&item).unwrap();
+                let dt_str = json["dt"].as_str().unwrap();
+                // Ensure `dt` field parses as valid DateTime
+                let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
+                json
+            })
+            .partition(|json| {
+                let item_type = json["t"].as_str().unwrap();
+                item_type == "entry"
+            });
 
+        for (i, json) in entry_events.iter().enumerate() {
             let entry_obj = json["entry"].clone();
-            let entry: Entry = serde_json::from_value(entry_obj).unwrap();
-            assert_eq!(entry, expected_entries[i]);
+            let tx = entry_obj["transactions"].as_array().unwrap();
+            if tx.len() == 0 {
+                // TODO: There is a bug in Transaction deserialize methods such that
+                // `serde_json::from_str` does not work for populated Entries.
+                // Remove this `if` when fixed.
+                let entry: Entry = serde_json::from_value(entry_obj).unwrap();
+                assert_eq!(entry, expected_entries[i]);
+            }
+        }
+        for json in block_events {
+            let height = json["h"].as_u64().unwrap();
+            assert_eq!(ticks_per_slot - 1, height);
         }
+
         // Ensure entries pass through stage unadulterated
         let recv_entries = entry_stream_receiver.recv().unwrap();
         assert_eq!(expected_entries, recv_entries);
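
Note on the emitted wire format (illustrative only, not part of the diff): a consumer of the stream now sees two event shapes, produced by the format! strings in emit_entry_event and emit_block_event. Roughly, with placeholder values standing in for real timestamps, pubkeys, and hashes:

    {"dt":"<rfc3339 timestamp>","t":"entry","s":0,"l":"<leader pubkey>","entry":{...}}
    {"dt":"<rfc3339 timestamp>","t":"block","s":0,"h":4,"l":"<leader pubkey>","id":"<last entry hash>"}

A block event is queued by EntryStreamStage when an entry has zero ticks left in its slot and is flushed on the next tick entry, so "h" carries the tick height of the slot's final entry (ticks_per_slot - 1 for slot 0, which is what the test above asserts).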