Skip to content

Commit

Permalink
Write stage optimizations (#1534)
Browse files Browse the repository at this point in the history
- Testnet dashboard shows that channel pressure for write stage
  is incrementing on every iteration of write.
- This change optimizes ledger writing by removing cloning of map
  and reducing calls to flush
  • Loading branch information
pgarg66 authored Oct 17, 2018
1 parent 7611730 commit 639c934
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
14 changes: 8 additions & 6 deletions src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ impl LedgerWriter {
Ok(LedgerWriter { index, data })
}

fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> {
pub fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> {
let len = serialized_size(&entry).map_err(err_bincode_to_io)?;

serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
Expand All @@ -395,23 +395,25 @@ impl LedgerWriter {
Ok(())
}

pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
self.write_entry_noflush(&entry)?;
pub fn flush(&mut self) -> io::Result<()> {
self.index.flush()?;
self.data.flush()?;
Ok(())
}

pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
self.write_entry_noflush(&entry)?;
self.flush()
}

pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()>
where
I: IntoIterator<Item = Entry>,
{
for entry in entries {
self.write_entry_noflush(&entry)?;
}
self.index.flush()?;
self.data.flush()?;
Ok(())
self.flush()
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/write_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,15 @@ impl WriteStage {

let start = Instant::now();
for entries in ventries {
for e in &entries {
num_txs += e.transactions.len();
}
let cluster_info_votes_start = Instant::now();
let votes = &entries.votes();
cluster_info.write().unwrap().insert_votes(&votes);
cluster_info_votes_total += duration_as_ms(&cluster_info_votes_start.elapsed());

ledger_writer.write_entries(entries.clone())?;
for e in &entries {
num_txs += e.transactions.len();
ledger_writer.write_entry_noflush(&e)?;
}
// Once the entries have been written to the ledger, then we can
// safely incement entry height
*entry_height += entries.len() as u64;
Expand All @@ -196,6 +196,7 @@ impl WriteStage {

entries_send_total += duration_as_ms(&entries_send_start.elapsed());
}
ledger_writer.flush()?;
inc_new_counter_info!(
"write_stage-time_ms",
duration_as_ms(&now.elapsed()) as usize
Expand Down

0 comments on commit 639c934

Please sign in to comment.