From 639c93460a7f3cfca16f68c84cb318a3af2834c0 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 17 Oct 2018 13:02:32 -0700 Subject: [PATCH] Write stage optimizations (#1534) - Testnet dashboard shows that channel pressure for write stage is incrementing on every iteration of write. - This change optimizes ledger writing by removing cloning of map and reducing calls to flush --- src/ledger.rs | 14 ++++++++------ src/write_stage.rs | 9 +++++---- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/ledger.rs b/src/ledger.rs index d8a31607bac2c2..7036ce5ef2a132 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -368,7 +368,7 @@ impl LedgerWriter { Ok(LedgerWriter { index, data }) } - fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> { + pub fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> { let len = serialized_size(&entry).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; @@ -395,13 +395,17 @@ impl LedgerWriter { Ok(()) } - pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { - self.write_entry_noflush(&entry)?; + pub fn flush(&mut self) -> io::Result<()> { self.index.flush()?; self.data.flush()?; Ok(()) } + pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { + self.write_entry_noflush(&entry)?; + self.flush() + } + pub fn write_entries(&mut self, entries: I) -> io::Result<()> where I: IntoIterator, @@ -409,9 +413,7 @@ impl LedgerWriter { for entry in entries { self.write_entry_noflush(&entry)?; } - self.index.flush()?; - self.data.flush()?; - Ok(()) + self.flush() } } diff --git a/src/write_stage.rs b/src/write_stage.rs index e2e381d9d8fc5b..b03eabb4a75096 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -166,15 +166,15 @@ impl WriteStage { let start = Instant::now(); for entries in ventries { - for e in &entries { - num_txs += e.transactions.len(); - } let cluster_info_votes_start = Instant::now(); let votes = &entries.votes(); cluster_info.write().unwrap().insert_votes(&votes); cluster_info_votes_total += duration_as_ms(&cluster_info_votes_start.elapsed()); - ledger_writer.write_entries(entries.clone())?; + for e in &entries { + num_txs += e.transactions.len(); + ledger_writer.write_entry_noflush(&e)?; + } // Once the entries have been written to the ledger, then we can // safely incement entry height *entry_height += entries.len() as u64; @@ -196,6 +196,7 @@ impl WriteStage { entries_send_total += duration_as_ms(&entries_send_start.elapsed()); } + ledger_writer.flush()?; inc_new_counter_info!( "write_stage-time_ms", duration_as_ms(&now.elapsed()) as usize