Skip to content

Commit

Permalink
rust: intercept module tested to work to transfer events
Browse files Browse the repository at this point in the history
  • Loading branch information
rizsotto committed Oct 12, 2024
1 parent 857e7df commit f59a60a
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 88 deletions.
46 changes: 21 additions & 25 deletions rust/intercept/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,40 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

use std::net::{TcpListener, TcpStream};
use std::net::{SocketAddr, TcpListener, TcpStream};

use crossbeam::channel::{Receiver, Sender};
use crossbeam_channel::bounded;
use crossbeam::channel::Sender;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use super::Envelope;

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct SessionLocator(pub String);

pub trait EventCollector {
fn address(&self) -> Result<SessionLocator, anyhow::Error>;
fn address(&self) -> SessionLocator;
fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error>;
fn stop(&self) -> Result<(), anyhow::Error>;
}

pub struct EventCollectorOnTcp {
control_input: Sender<bool>,
control_output: Receiver<bool>,
shutdown: Arc<AtomicBool>,
listener: TcpListener,
address: SocketAddr,
}

impl EventCollectorOnTcp {
pub fn new() -> Result<Self, anyhow::Error> {
let (control_input, control_output) = bounded(0);
let shutdown = Arc::new(AtomicBool::new(false));
let listener = TcpListener::bind("127.0.0.1:0")?;
let address = listener.local_addr()?;

let result = EventCollectorOnTcp {
control_input,
control_output,
shutdown,
listener,
address,
};

Ok(result)
Expand All @@ -67,25 +69,20 @@ impl EventCollectorOnTcp {
}

impl EventCollector for EventCollectorOnTcp {
fn address(&self) -> Result<SessionLocator, anyhow::Error> {
let local_addr = self.listener.local_addr()?;
let locator = SessionLocator(local_addr.to_string());
Ok(locator)
fn address(&self) -> SessionLocator {
SessionLocator(self.address.to_string())
}

fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error> {
loop {
if let Ok(shutdown) = self.control_output.try_recv() {
if shutdown {
break;
}
for stream in self.listener.incoming() {
if self.shutdown.load(Ordering::Relaxed) {
break;
}

match self.listener.accept() {
Ok((stream, _)) => {
println!("Got a connection");
match stream {
Ok(connection) => {
// ... (process the connection in a separate thread or task)
self.send(stream, destination.clone())?;
self.send(connection, destination.clone())?;
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// No new connection available, continue checking for shutdown
Expand All @@ -97,13 +94,12 @@ impl EventCollector for EventCollectorOnTcp {
}
}
}

println!("Server shutting down");
Ok(())
}

fn stop(&self) -> Result<(), anyhow::Error> {
self.control_input.send(true)?;
self.shutdown.store(true, Ordering::Relaxed);
let _ = TcpStream::connect(self.address)?;
Ok(())
}
}
60 changes: 4 additions & 56 deletions rust/intercept/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ pub mod reporter;
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct ReporterId(pub u64);

#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct ProcessId(pub u32);

#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct Execution {
pub executable: PathBuf,
pub arguments: Vec<String>,
Expand All @@ -51,7 +51,7 @@ pub struct Execution {
// terminate), but can be extended later with performance related
// events like monitoring the CPU usage or the memory allocation if
// this information is available.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub enum Event {
Started {
pid: ProcessId,
Expand All @@ -66,7 +66,7 @@ pub enum Event {
},
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct Envelope {
pub rid: ReporterId,
pub timestamp: u64,
Expand Down Expand Up @@ -106,55 +106,3 @@ impl Envelope {
Ok(length)
}
}

#[cfg(test)]
mod test {
use super::*;
use lazy_static::lazy_static;
use std::io::Cursor;

#[test]
fn read_write_works() {
let mut writer = Cursor::new(vec![0; 1024]);
for envelope in ENVELOPES.iter() {
let result = Envelope::write_into(envelope, &mut writer);
assert!(result.is_ok());
}

let mut reader = Cursor::new(writer.get_ref());
for envelope in ENVELOPES.iter() {
let result = Envelope::read_from(&mut reader);
assert!(result.is_ok());
assert_eq!(result.unwrap(), *envelope.clone());
}
}

lazy_static! {
static ref ENVELOPES: Vec<Envelope> = vec![
Envelope {
rid: ReporterId(1),
timestamp: 0,
event: Event::Started {
pid: ProcessId(1),
ppid: ProcessId(0),
execution: Execution {
executable: PathBuf::from("/usr/bin/ls"),
arguments: vec!["-l".to_string()],
working_dir: PathBuf::from("/tmp"),
environment: HashMap::new(),
},
},
},
Envelope {
rid: ReporterId(1),
timestamp: 0,
event: Event::Terminated { status: 0 },
},
Envelope {
rid: ReporterId(1),
timestamp: 0,
event: Event::Signaled { signal: 15 },
},
];
}
}
12 changes: 5 additions & 7 deletions rust/intercept/src/reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,18 @@ impl ReporterId {
// supervisor processes). The events are collected in a common place
// in order to reconstruct of final report of a build process.
pub trait Reporter {
fn report(&mut self, event: Event) -> Result<(), anyhow::Error>;
fn report(&self, event: Event) -> Result<(), anyhow::Error>;
}

struct TcpReporter {
socket: TcpStream,
pub struct TcpReporter {
destination: String,
reporter_id: ReporterId,
}

impl TcpReporter {
pub fn new(destination: String) -> Result<Self, anyhow::Error> {
let socket = TcpStream::connect(destination.clone())?;
let reporter_id = ReporterId::new();
let result = TcpReporter {
socket,
destination,
reporter_id,
};
Expand All @@ -59,9 +56,10 @@ impl TcpReporter {
}

impl Reporter for TcpReporter {
fn report(&mut self, event: Event) -> Result<(), anyhow::Error> {
fn report(&self, event: Event) -> Result<(), anyhow::Error> {
let envelope = Envelope::new(&self.reporter_id, event);
envelope.write_into(&mut self.socket)?;
let mut socket = TcpStream::connect(self.destination.clone())?;
envelope.write_into(&mut socket)?;

Ok(())
}
Expand Down
105 changes: 105 additions & 0 deletions rust/intercept/tests/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use intercept::collector::{EventCollector, EventCollectorOnTcp};
use intercept::reporter::{Reporter, TcpReporter};
use intercept::*;

mod test {
use super::*;
use crossbeam_channel::bounded;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;

// Test that the TCP reporter and the TCP collector work together.
// We create a TCP collector and a TCP reporter, then we send events
// to the reporter and check if the collector receives them.
//
// We use a bounded channel to send the events from the reporter to the
// collector. The collector reads the events from the channel and checks
// if they are the same as the original events.
#[test]
fn tcp_reporter_and_collectors_work() {
let collector = EventCollectorOnTcp::new().unwrap();
let reporter = TcpReporter::new(collector.address().0).unwrap();

// Create wrapper to share the collector across threads.
let thread_collector = Arc::new(collector);
let main_collector = thread_collector.clone();

// Start the collector in a separate thread.
let (input, output) = bounded(EVENTS.len());
let receiver_thread = thread::spawn(move || {
thread_collector.collect(input).unwrap();
});
// Send events to the reporter.
for event in EVENTS.iter() {
let result = reporter.report(event.clone());
assert!(result.is_ok());
}

// Call the stop method to stop the collector. This will close the
// channel and the collector will stop reading from it.
main_collector.stop().unwrap();

// Empty the channel and assert that we received all the events.
let mut count = 0;
for envelope in output.iter() {
assert!(EVENTS.contains(&envelope.event));
count += 1;
}
assert_eq!(count, EVENTS.len());
// shutdown the receiver thread
receiver_thread.join().unwrap();
}

// Test that the serialization and deserialization of the Envelope works.
// We write the Envelope to a buffer and read it back to check if the
// deserialized Envelope is the same as the original one.
#[test]
fn read_write_works() {
let mut writer = Cursor::new(vec![0; 1024]);
for envelope in ENVELOPES.iter() {
let result = Envelope::write_into(envelope, &mut writer);
assert!(result.is_ok());
}

let mut reader = Cursor::new(writer.get_ref());
for envelope in ENVELOPES.iter() {
let result = Envelope::read_from(&mut reader);
assert!(result.is_ok());
assert_eq!(result.unwrap(), envelope.clone());
}
}

lazy_static! {
static ref ENVELOPES: Vec<Envelope> = vec![
Envelope {
rid: ReporterId(1),
timestamp: 0,
event: Event::Started {
pid: ProcessId(1),
ppid: ProcessId(0),
execution: Execution {
executable: PathBuf::from("/usr/bin/ls"),
arguments: vec!["ls".to_string(), "-l".to_string()],
working_dir: PathBuf::from("/tmp"),
environment: HashMap::new(),
},
},
},
Envelope {
rid: ReporterId(1),
timestamp: 0,
event: Event::Terminated { status: 0 },
},
Envelope {
rid: ReporterId(1),
timestamp: 0,
event: Event::Signaled { signal: 15 },
},
];
static ref EVENTS: Vec<Event> = ENVELOPES.iter().map(|e| e.event.clone()).collect();
}
}

0 comments on commit f59a60a

Please sign in to comment.