From 6d5d8ebe4d00d039545e975f0c2203709a6a7840 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Thu, 21 Sep 2023 15:00:07 +0200 Subject: [PATCH 1/2] fix/dotgraph: write dot graph even if conversion to svg fails Signed-off-by: Bernhard Schuster --- orchestra/proc-macro/src/graph.rs | 66 ++++++++++++-------- orchestra/proc-macro/tests/assets/sample.dot | 14 +++++ 2 files changed, 55 insertions(+), 25 deletions(-) create mode 100644 orchestra/proc-macro/tests/assets/sample.dot diff --git a/orchestra/proc-macro/src/graph.rs b/orchestra/proc-macro/src/graph.rs index 9cad334..d58cbce 100644 --- a/orchestra/proc-macro/src/graph.rs +++ b/orchestra/proc-macro/src/graph.rs @@ -184,7 +184,6 @@ impl<'a> ConnectionGraph<'a> { dest: impl AsRef, ) -> anyhow::Result<()> { use self::graph_helpers::color_scheme; - use fs_err as fs; let dot_content = format!( r#"digraph {{ @@ -201,32 +200,10 @@ node [colorscheme={}] let dest = dest.as_ref(); let dest = dest.to_path_buf(); - let svg_content = { - let mut parser = dotlay::gv::DotParser::new(dot_content.as_str()); - let graph = parser.process().map_err(|err_msg| { - anyhow::anyhow!(dbg!(err_msg)).context("Failed to parse dotfile") - })?; - let mut svg = dotlay::backends::svg::SVGWriter::new(); - let mut builder = dotlay::gv::GraphBuilder::default(); - builder.visit_graph(&graph); - let mut vg = builder.get(); - vg.do_it(false, false, false, &mut svg); - svg.finalize() - }; + write_to_disk(&dest, "dot", &dot_content)?; - fn write_to_disk( - dest: impl AsRef, - ext: &str, - content: impl AsRef<[u8]>, - ) -> std::io::Result<()> { - let dest = dest.as_ref().with_extension(ext); - print!("Writing {} to {} ..", ext, dest.display()); - fs::write(dest, content.as_ref())?; - println!(" OK"); - Ok(()) - } + let svg_content = convert_dot_to_svg(&dot_content)?; - write_to_disk(&dest, "dot", &dot_content)?; write_to_disk(&dest, "svg", &svg_content)?; Ok(()) @@ -388,6 +365,35 @@ node [colorscheme={}] } } +#[cfg(feature = "dotgraph")] +fn convert_dot_to_svg(dot_content: impl AsRef) -> anyhow::Result { + let mut parser = dotlay::gv::DotParser::new(dot_content.as_ref()); + let graph = parser.process().map_err(|err_msg| { + parser.print_error(); + anyhow::anyhow!(err_msg).context("Failed to parse dotfile content") + })?; + let mut svg = dotlay::backends::svg::SVGWriter::new(); + let mut builder = dotlay::gv::GraphBuilder::default(); + builder.visit_graph(&graph); + let mut vg = builder.get(); + vg.do_it(true, false, false, &mut svg); + Ok(svg.finalize()) +} + +#[cfg(feature = "dotgraph")] +fn write_to_disk( + dest: impl AsRef, + ext: &str, + content: impl AsRef<[u8]>, +) -> std::io::Result<()> { + use fs_err as fs; + let dest = dest.as_ref().with_extension(ext); + print!("Writing {} to {} ..", ext, dest.display()); + fs::write(dest, content.as_ref())?; + println!(" OK"); + Ok(()) +} + const GREEK_ALPHABET_SIZE: usize = 24; fn greek_alphabet() -> [char; GREEK_ALPHABET_SIZE] { @@ -490,4 +496,14 @@ mod tests { assert_eq!(sccs.len(), 2); // `f` and everything else assert_eq!(sccs[0].len(), 5); // every node but `f` } + + #[cfg(feature = "dotgraph")] + #[ignore] + #[test] + fn dot_to_svg_works() { + use super::convert_dot_to_svg; + + let s = include_str!("../tests/assets/sample.dot"); + convert_dot_to_svg(s).unwrap(); + } } diff --git a/orchestra/proc-macro/tests/assets/sample.dot b/orchestra/proc-macro/tests/assets/sample.dot new file mode 100644 index 0000000..b7e4d7d --- /dev/null +++ b/orchestra/proc-macro/tests/assets/sample.dot @@ -0,0 +1,14 @@ +digraph { + fontname="Cantarell" + bgcolor="white" + label = "orchestra message flow between subsystems" +node [colorscheme=rdylgn10] + 0 [ label="AvailabilityStore"] + 1 [ label="Provisioner"] + 2 [ label="RuntimeApi"] + 3 [ label="ChainApi"] + 4 [ label="MlExecution"] + 6 [ label="NetworkBridgeTx"] + 7 [ color="/rdylgn10/1",fontcolor="/rdylgn10/1",xlabel=<α>,label="GossipSupport"] + 1 -> 0 [ label="AvailabilityStoreMessage"] +} From a5a9330069311970204de708c0f5f9d72137e752 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 25 Sep 2023 14:54:23 +0100 Subject: [PATCH 2/2] Avoid writing length on each IO for async channel (#62) --- metered-channel/src/bounded.rs | 26 +++++++++++++++++++++----- metered-channel/src/lib.rs | 14 ++++++++++++-- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/metered-channel/src/bounded.rs b/metered-channel/src/bounded.rs index 8280e93..7efcc57 100644 --- a/metered-channel/src/bounded.rs +++ b/metered-channel/src/bounded.rs @@ -242,6 +242,11 @@ impl MeteredReceiver { /// Get an updated accessor object for all metrics collected. pub fn meter(&self) -> &Meter { + // For async_channel we can update channel length in the meter access + // to avoid more expensive updates on each RW operation + #[cfg(feature = "async_channel")] + self.meter.note_channel_len(self.len()); + &self.meter } @@ -262,7 +267,6 @@ impl MeteredReceiver { Err(err) => Err(err), }; - self.meter.note_channel_len(self.len()); result } @@ -275,7 +279,6 @@ impl MeteredReceiver { Err(err) => Err(err.into()), }; - self.meter.note_channel_len(self.len()); result } @@ -288,7 +291,6 @@ impl MeteredReceiver { Err(err) => Err(err), }; - self.meter.note_channel_len(self.len()); result } @@ -297,6 +299,12 @@ impl MeteredReceiver { pub fn len(&self) -> usize { self.inner.len() } + + #[cfg(feature = "futures_channel")] + /// Returns the current number of messages in the channel based on meter approximation + pub fn len(&self) -> usize { + self.meter.calculate_channel_len() + } } impl futures::stream::FusedStream for MeteredReceiver { @@ -346,6 +354,10 @@ impl MeteredSender { /// Get an updated accessor object for all metrics collected. pub fn meter(&self) -> &Meter { + // For async_channel we can update channel length in the meter access + // to avoid more expensive updates on each RW operation + #[cfg(feature = "async_channel")] + self.meter.note_channel_len(self.len()); &self.meter } @@ -382,7 +394,6 @@ impl MeteredSender { SendError::Closed(err.0.into()) }); - self.meter.note_channel_len(self.len()); result } @@ -420,7 +431,6 @@ impl MeteredSender { TrySendError::from(e) }); - self.meter.note_channel_len(self.len()); result } @@ -429,4 +439,10 @@ impl MeteredSender { pub fn len(&self) -> usize { self.inner.len() } + + #[cfg(feature = "futures_channel")] + /// Returns the current number of messages in the channel based on meter approximation + pub fn len(&self) -> usize { + self.meter.calculate_channel_len() + } } diff --git a/metered-channel/src/lib.rs b/metered-channel/src/lib.rs index adecf06..1fa32be 100644 --- a/metered-channel/src/lib.rs +++ b/metered-channel/src/lib.rs @@ -42,6 +42,9 @@ use coarsetime::Instant as CoarseInstant; #[cfg(test)] mod tests; +/// Defines the maximum number of time of flight values to be stored. +const TOF_QUEUE_SIZE: usize = 100; + /// A peek into the inner state of a meter. #[derive(Debug, Clone)] pub struct Meter { @@ -54,7 +57,7 @@ pub struct Meter { channel_len: Arc, // Number of times senders blocked while sending messages to a subsystem. blocked: Arc, - // Atomic ringbuffer of the last 50 time of flight values + // Atomic ringbuffer of the last `TOF_QUEUE_SIZE` time of flight values tof: Arc>, } @@ -66,7 +69,7 @@ impl std::default::Default for Meter { #[cfg(feature = "async_channel")] channel_len: Arc::new(AtomicUsize::new(0)), blocked: Arc::new(AtomicUsize::new(0)), - tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)), + tof: Arc::new(crossbeam_queue::ArrayQueue::new(TOF_QUEUE_SIZE)), } } } @@ -140,6 +143,13 @@ impl Meter { fn note_time_of_flight(&self, tof: CoarseDuration) { let _ = self.tof.force_push(tof); } + + #[cfg(feature = "futures_channel")] + fn calculate_channel_len(&self) -> usize { + let sent = self.sent.load(Ordering::Relaxed); + let received = self.received.load(Ordering::Relaxed); + sent.saturating_sub(received) as usize + } } /// Determine if this instance shall be measured