diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml index 12c79f9405..8b2d3cd75f 100644 --- a/libafl/Cargo.toml +++ b/libafl/Cargo.toml @@ -40,7 +40,8 @@ anymap_debug = ["serde_json"] # uses serde_json to Debug the anymap trait. Disab derive = ["libafl_derive"] # provide derive(SerdeAny) macro. llmp_small_maps = [] # reduces initial map size for llmp llmp_debug = ["backtrace"] # Enables debug output for LLMP -llmp_compression = [] #llmp compression using GZip +llmp_compression = [] # llmp compression using GZip +llmp_bind_public = [] # If set, llmp will bind to 0.0.0.0, allowing cross-device communication. Binds to localhost by default. [[example]] name = "llmp_test" @@ -61,6 +62,7 @@ libafl_derive = { version = "0.1.0", optional = true, path = "../libafl_derive" serde_json = { version = "1.0", optional = true, default-features = false, features = ["alloc"] } # an easy way to debug print SerdeAnyMap compression = { version = "0.1.5" } num_enum = "0.5.1" +hostname = "^0.3" # Is there really no gethostname in the stdlib? [target.'cfg(target_os = "android")'.dependencies] backtrace = { version = "0.3", optional = true, default-features = false, features = ["std", "libbacktrace"] } # for llmp_debug diff --git a/libafl/examples/llmp_test/main.rs b/libafl/examples/llmp_test/main.rs index 9b6cde1ecb..0324dace4f 100644 --- a/libafl/examples/llmp_test/main.rs +++ b/libafl/examples/llmp_test/main.rs @@ -83,7 +83,7 @@ fn large_msg_loop(port: u16) -> ! { fn broker_message_hook( client_id: u32, tag: llmp::Tag, - _flags: llmp::Flag, + _flags: llmp::Flags, message: &[u8], ) -> Result { match tag { @@ -120,22 +120,31 @@ fn main() { let mode = std::env::args() .nth(1) - .expect("no mode specified, chose 'broker', 'ctr', 'adder', or 'large'"); + .expect("no mode specified, chose 'broker', 'b2b', 'ctr', 'adder', or 'large'"); let port: u16 = std::env::args() .nth(2) .unwrap_or("1337".into()) .parse::() .unwrap(); + // in the b2b use-case, this is our "own" port, we connect to the "normal" broker node on startup. + let b2b_port: u16 = std::env::args() + .nth(3) + .unwrap_or("4242".into()) + .parse::() + .unwrap(); println!("Launching in mode {} on port {}", mode, port); match mode.as_str() { "broker" => { let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new().unwrap()).unwrap(); - broker - .launch_listener(llmp::Listener::Tcp( - std::net::TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap(), - )) - .unwrap(); + broker.launch_tcp_listener_on(port).unwrap(); + broker.loop_forever(&mut broker_message_hook, Some(Duration::from_millis(5))) + } + "b2b" => { + let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new().unwrap()).unwrap(); + broker.launch_tcp_listener_on(b2b_port).unwrap(); + // connect back to the main broker. + broker.connect_b2b(("127.0.0.1", port)).unwrap(); broker.loop_forever(&mut broker_message_hook, Some(Duration::from_millis(5))) } "ctr" => { diff --git a/libafl/src/bolts/compress.rs b/libafl/src/bolts/compress.rs index 1da1f84cb8..9b34cc682d 100644 --- a/libafl/src/bolts/compress.rs +++ b/libafl/src/bolts/compress.rs @@ -42,6 +42,7 @@ impl GzipCompressor { /// Decompression. /// Flag is used to indicate if it's compressed or not + #[allow(clippy::unused_self)] pub fn decompress(&self, buf: &[u8]) -> Result, Error> { Ok(buf .iter() diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs index 565f6c769c..439e212399 100644 --- a/libafl/src/bolts/llmp.rs +++ b/libafl/src/bolts/llmp.rs @@ -2,9 +2,9 @@ A library for low level message passing To send new messages, the clients place a new message at the end of their -client_out_map. If the ringbuf is filled up, they start place a +client_out_map. If the current map is filled up, they place a LLMP_AGE_END_OF_PAGE_V1 msg and alloc a new shmap. -Once the broker mapped a page, it flags it save for unmapping. +Once the broker mapped this same page, it flags it as safe for unmapping. ```text [client0] [client1] ... [clientN] @@ -41,7 +41,7 @@ current map. [client0] [client1] ... [clientN] ``` -In the future, if we need zero copy, the current_broadcast_map could instead +In the future, if we would need zero copy, the current_broadcast_map could instead list the client_out_map ID an offset for each message. In that case, the clients also need to create new shmaps once their bufs are filled up. @@ -50,11 +50,14 @@ To use, you will have to create a broker using llmp_broker_new(). Then register some clientloops using llmp_broker_register_threaded_clientloop (or launch them as seperate processes) and call llmp_broker_run(); +For broker2broker communication, all messages are forwarded via network sockets. + */ use alloc::{string::String, vec::Vec}; use core::{ cmp::max, + convert::TryFrom, fmt::Debug, mem::size_of, ptr, slice, @@ -64,9 +67,11 @@ use core::{ use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] use std::{ + convert::TryInto, env, io::{Read, Write}, - net::{SocketAddr, TcpListener, TcpStream}, + net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}, + sync::mpsc::channel, thread, }; @@ -104,8 +109,23 @@ const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471; /// The sender on this map is exiting (if broker exits, clients should exit gracefully); const LLMP_TAG_EXITING: Tag = 0x13C5171; -pub const LLMP_FLAG_INITIALIZED: Flag = 0x0; -pub const LLMP_FLAG_COMPRESSED: Flag = 0x1; +/// Unused... +pub const LLMP_FLAG_INITIALIZED: Flags = 0x0; +/// This message was compressed in transit +pub const LLMP_FLAG_COMPRESSED: Flags = 0x1; +/// From another broker. +pub const LLMP_FLAG_FROM_B2B: Flags = 0x2; + +/// Timt the broker 2 broker connection waits for incoming data, +/// before checking for own data to forward again. +const _LLMP_B2B_BLOCK_TIME: Duration = Duration::from_millis(3_000); + +/// If broker2broker is enabled, bind to public IP +#[cfg(feature = "llmp_bind_public")] +const _LLMP_BIND_ADDR: &str = "0.0.0.0"; +/// If broker2broker is disabled, bind to localhost +#[cfg(not(feature = "llmp_bind_public"))] +const _LLMP_BIND_ADDR: &str = "127.0.0.1"; /// An env var of this value indicates that the set value was a NULL PTR const _NULL_ENV_STR: &str = "_NULL"; @@ -127,29 +147,89 @@ static mut GLOBAL_SIGHANDLER_STATE: LlmpBrokerSignalHandler = LlmpBrokerSignalHa /// TAGs used thorughout llmp pub type Tag = u32; -pub type Flag = u64; +/// The client ID == the sender id. +pub type ClientId = u32; +/// The broker ID, for broker 2 broker communication. +pub type BrokerId = u32; +/// The flags, indicating, for example, enabled compression. +pub type Flags = u32; +/// The message ID, an ever-increasing number, unique only to a sharedmap/page. +pub type MessageId = u64; /// This is for the server the broker will spawn. /// If an llmp connection is local - use sharedmaps /// or remote (broker2broker) - forwarded via tcp +#[derive(Serialize, Deserialize, Debug, Clone)] pub enum TcpRequest { - LocalClientHello { shmem: ShMemDescription }, - RemoteBrokerHello, - RemoteNewMessage { tag: Tag, payload: Vec }, + /// We would like to be a local client. + LocalClientHello { shmem_description: ShMemDescription }, + /// We would like to establish a b2b connection. + RemoteBrokerHello { hostname: String }, +} + +impl TryFrom<&Vec> for TcpRequest { + type Error = crate::Error; + + fn try_from(bytes: &Vec) -> Result { + Ok(postcard::from_bytes(bytes)?) + } +} + +/// Messages for broker 2 broker connection. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct TcpRemoteNewMessage { + // The client ID of the original broker + client_id: ClientId, + // The message tag + tag: Tag, + // The flags + flags: Flags, + // The actual content of the message + payload: Vec, +} + +impl TryFrom<&Vec> for TcpRemoteNewMessage { + type Error = crate::Error; + + fn try_from(bytes: &Vec) -> Result { + Ok(postcard::from_bytes(bytes)?) + } } /// Responses for requests to the server. +#[derive(Serialize, Deserialize, Debug, Clone)] pub enum TcpResponse { + /// After receiving a new connection, the broker immediately sends a Hello. + BrokerConnectHello { + /// The broker page a new local client can listen on + broker_map_description: ShMemDescription, + /// This broker's hostname + hostname: String, + }, LocalClientAccepted { - client_id: u32, - shmem: ShMemDescription, + /// The ClientId this client should send messages as + /// Mainly used for client-side deduplication of incoming messages + client_id: ClientId, }, RemoteBrokerAccepted { - broker_id: u32, - hostname: String, + /// The broker id of this element + broker_id: BrokerId, + }, + /// Something went wrong when processing the request. + Error { + /// Error description + description: String, }, } +impl TryFrom<&Vec> for TcpResponse { + type Error = crate::Error; + + fn try_from(bytes: &Vec) -> Result { + Ok(postcard::from_bytes(bytes)?) + } +} + /// Abstraction for listeners #[cfg(feature = "std")] pub enum Listener { @@ -226,6 +306,59 @@ fn msg_offset_from_env(env_name: &str) -> Result, Error> { }) } +/// Send one message as `u32` len and `[u8;len]` bytes +#[cfg(feature = "std")] +fn send_tcp_msg(stream: &mut TcpStream, msg: &T) -> Result<(), Error> +where + T: Serialize, +{ + let msg = postcard::to_allocvec(msg)?; + if msg.len() > u32::MAX as usize { + return Err(Error::IllegalState(format!( + "Trying to send message a tcp message > u32! (size: {})", + msg.len() + ))); + } + + #[cfg(feature = "llmp_debug")] + println!("LLMP TCP: Sending {} bytes", msg.len()); + + let size_bytes = (msg.len() as u32).to_be_bytes(); + stream.write_all(&size_bytes)?; + stream.write_all(&msg)?; + + #[cfg(feature = "llmp_debug")] + println!("LLMP TCP: Sending {} bytes finished.", msg.len()); + + Ok(()) +} + +/// Receive one message of `u32` len and `[u8; len]` bytes +#[cfg(feature = "std")] +fn recv_tcp_msg(stream: &mut TcpStream) -> Result, Error> { + // Always receive one be u32 of size, then the command. + + #[cfg(feature = "llmp_debug")] + println!( + "LLMP TCP: Waiting for packet... (Timeout: {:?})", + stream.read_timeout().unwrap_or(None) + ); + + let mut size_bytes = [0u8; 4]; + stream.read_exact(&mut size_bytes)?; + let size = u32::from_be_bytes(size_bytes); + let mut bytes = vec![]; + bytes.resize(size as usize, 0u8); + + #[cfg(feature = "llmp_debug")] + println!("LLMP TCP: Receiving payload of size {}", size); + + stream + .read_exact(&mut bytes) + .expect("Failed to read message body"); + Ok(bytes) +} + /// In case we don't have enough space, make sure the next page will be large /// enough. For now, we want to have at least enough space to store 2 of the /// largest messages we encountered (plus message one new_page message). @@ -324,18 +457,22 @@ pub enum LlmpMsgHookResult { #[repr(C, packed)] pub struct LlmpMsg { /// A tag - pub tag: Tag, + pub tag: Tag, //u32 /// Sender of this messge - pub sender: u32, + pub sender: ClientId, //u32 + /// ID of another Broker, for b2b messages + pub broker: BrokerId, //u32 /// flags, currently only used for indicating compression - pub flags: Flag, + pub flags: Flags, //u32 /// The message ID, unique per page - pub message_id: u64, + pub message_id: MessageId, //u64 /// Buffer length as specified by the user pub buf_len: u64, /// (Actual) buffer length after padding + // Padding makes sure the next msg is aligned. pub buf_len_padded: u64, - /// The buf + /// The actual payload buf + // We try to keep the start of buf 64-bit aligned! pub buf: [u8; 0], } @@ -399,7 +536,7 @@ where #[cfg(feature = "std")] /// Creates either a broker, if the tcp port is not bound, or a client, connected to this port. pub fn on_port(shmem_provider: SP, port: u16) -> Result { - match TcpListener::bind(format!("127.0.0.1:{}", port)) { + match TcpListener::bind(format!("{}:{}", _LLMP_BIND_ADDR, port)) { Ok(listener) => { // We got the port. We are the broker! :) dbg!("We're the broker"); @@ -449,7 +586,7 @@ where } } - pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flag) -> Result<(), Error> { + pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flags) -> Result<(), Error> { match self { LlmpConnection::IsBroker { broker } => broker.send_buf_with_flags(tag, flags, buf), LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf), @@ -700,7 +837,7 @@ where #[cfg(all(feature = "llmp_debug", feature = "std"))] dbg!( page, - (*page), + *page, (*page).size_used, complete_msg_size, EOP_MSG_SIZE, @@ -918,7 +1055,7 @@ where } } - pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> { + pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> { // Make sure we don't reuse already allocated tags if tag == LLMP_TAG_NEW_SHM_CLIENT || tag == LLMP_TAG_END_OF_PAGE @@ -1159,8 +1296,9 @@ where } } + /// Receive the buffer, also reading the LLMP internal message flags #[inline] - pub fn recv_buf_with_flags(&mut self) -> Result, Error> { + pub fn recv_buf_with_flags(&mut self) -> Result, Error> { unsafe { Ok(match self.recv()? { Some(msg) => Some(( @@ -1176,7 +1314,7 @@ where /// Returns the next sender, tag, buf, looping until it becomes available #[inline] - pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> { + pub fn recv_buf_blocking(&mut self) -> Result<(ClientId, Tag, &[u8]), Error> { unsafe { let msg = self.recv_blocking()?; Ok(( @@ -1233,7 +1371,7 @@ where SHM: ShMem, { /// Creates a new page, initializing the passed shared mem struct - pub fn new(sender: u32, mut new_map: SHM) -> Self { + pub fn new(sender: ClientId, mut new_map: SHM) -> Self { #[cfg(all(feature = "llmp_debug", feature = "std"))] println!( "LLMP_DEBUG: Initializing map on {} with size {}", @@ -1403,7 +1541,7 @@ impl Handler for LlmpBrokerSignalHandler { /// It may intercept messages passing through. impl LlmpBroker where - SP: ShMemProvider, + SP: ShMemProvider + 'static, { /// Create and initialize a new llmp_broker pub fn new(mut shmem_provider: SP) -> Result { @@ -1447,6 +1585,69 @@ where }); } + /// Connects to a broker running on another machine. + /// This will spawn a new background thread, registered as client, that proxies all messages to a remote machine. + /// Returns the description of the new page that still needs to be announced/added to the broker afterwards. + #[cfg(feature = "std")] + pub fn connect_b2b(&mut self, addr: A) -> Result<(), Error> + where + A: ToSocketAddrs, + { + let mut stream = TcpStream::connect(addr)?; + println!("B2B: Connected to {:?}", stream); + + match (&recv_tcp_msg(&mut stream)?).try_into()? { + TcpResponse::BrokerConnectHello { + broker_map_description: _, + hostname, + } => println!("B2B: Connected to {}", hostname), + _ => { + return Err(Error::IllegalState( + "Unexpected response from B2B server received.".to_string(), + )) + } + }; + + let hostname = hostname::get() + .unwrap_or_else(|_| "".into()) + .to_string_lossy() + .into(); + + send_tcp_msg(&mut stream, &TcpRequest::RemoteBrokerHello { hostname })?; + + let broker_id = match (&recv_tcp_msg(&mut stream)?).try_into()? { + TcpResponse::RemoteBrokerAccepted { broker_id } => { + println!("B2B: Got Connection Ack, broker_id {}", broker_id); + broker_id + } + _ => { + return Err(Error::IllegalState( + "Unexpected response from B2B server received.".to_string(), + )); + } + }; + + // TODO: use broker ids! + println!("B2B: We are broker {}", broker_id); + + // TODO: handle broker_ids properly/at all. + let map_description = Self::b2b_thread_on( + stream, + &self.shmem_provider, + self.llmp_clients.len() as ClientId, + &self.llmp_out.out_maps.first().unwrap().shmem.description(), + )?; + + let new_map = + LlmpSharedMap::existing(self.shmem_provider.from_description(map_description)?); + + { + self.register_client(new_map); + } + + Ok(()) + } + /// For internal use: Forward the current message to the out map. unsafe fn forward_msg(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> { let mut out: *mut LlmpMsg = self.alloc_next((*msg).buf_len_padded as usize)?; @@ -1471,7 +1672,7 @@ where #[inline] pub fn once(&mut self, on_new_msg: &mut F) -> Result<(), Error> where - F: FnMut(u32, Tag, Flag, &[u8]) -> Result, + F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, { compiler_fence(Ordering::SeqCst); for i in 0..self.llmp_clients.len() { @@ -1502,7 +1703,7 @@ where /// 5 millis of sleep can't hurt to keep busywait not at 100% pub fn loop_forever(&mut self, on_new_msg: &mut F, sleep_time: Option) where - F: FnMut(u32, Tag, Flag, &[u8]) -> Result, + F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, { #[cfg(unix)] if let Err(_e) = unsafe { setup_signal_handler(&mut GLOBAL_SIGHANDLER_STATE) } { @@ -1539,20 +1740,221 @@ where self.llmp_out.send_buf(tag, buf) } - pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> { + pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> { self.llmp_out.send_buf_with_flags(tag, flags, buf) } - #[cfg(feature = "std")] /// Launches a thread using a tcp listener socket, on which new clients may connect to this broker /// Does so on the given port. + #[cfg(feature = "std")] pub fn launch_tcp_listener_on(&mut self, port: u16) -> Result, Error> { - let listener = TcpListener::bind(format!("127.0.0.1:{}", port))?; + let listener = TcpListener::bind(format!("{}:{}", _LLMP_BIND_ADDR, port))?; // accept connections and process them, spawning a new thread for each one println!("Server listening on port {}", port); self.launch_listener(Listener::Tcp(listener)) } + /// Announces a new client on the given shared map. + /// Called from a background thread, typically. + /// Upon receiving this message, the broker should map the announced page and start trckang it for new messages. + #[allow(dead_code)] + fn announce_new_client( + sender: &mut LlmpSender, + shmem_description: &ShMemDescription, + ) -> Result<(), Error> { + unsafe { + let msg = sender + .alloc_next(size_of::()) + .expect("Could not allocate a new message in shared map."); + (*msg).tag = LLMP_TAG_NEW_SHM_CLIENT; + let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; + (*pageinfo).shm_str = *shmem_description.id.as_slice(); + (*pageinfo).map_size = shmem_description.size; + sender.send(msg) + } + } + + /// For broker to broker connections: + /// Launches a proxy thread. + /// It will read outgoing messages from the given broker map (and handle EOP by mapping a new page). + /// This function returns the ShMemDescription the client uses to place incoming messages. + /// The thread exits, when the remote broker disconnects. + #[cfg(feature = "std")] + fn b2b_thread_on( + mut stream: TcpStream, + shmem_provider: &SP, + b2b_client_id: ClientId, + broker_map_description: &ShMemDescription, + ) -> Result { + let broker_map_description = *broker_map_description; + let mut shmem_provider_clone = shmem_provider.clone(); + + // A channel to get the new "client's" sharedmap id from + let (send, recv) = channel(); + + // (For now) the thread remote broker 2 broker just acts like a "normal" llmp client, except it proxies all messages to the attached socket, in both directions. + thread::spawn(move || { + // as always, call post_fork to potentially reconnect the provider (for threaded/forked use) + shmem_provider_clone.post_fork(); + + #[cfg(fature = "llmp_debug")] + println!("B2b: Spawned proxy thread"); + + // The background thread blocks on the incoming connection for 15 seconds (if no data is available), then checks if it should forward own messages, then blocks some more. + stream + .set_read_timeout(Some(_LLMP_B2B_BLOCK_TIME)) + .expect("Failed to set tcp stream timeout"); + + let mut new_sender = + match LlmpSender::new(shmem_provider_clone.clone(), b2b_client_id, false) { + Ok(new_sender) => new_sender, + Err(e) => { + panic!("B2B: Could not map shared map: {}", e); + } + }; + + send.send(new_sender.out_maps.first().unwrap().shmem.description()) + .expect("B2B: Error sending map description to channel!"); + + // the receiver receives from the local broker, and forwards it to the tcp stream. + let mut local_receiver = LlmpReceiver::on_existing_from_description( + shmem_provider_clone, + &LlmpDescription { + last_message_offset: None, + shmem: broker_map_description, + }, + ) + .expect("Failed to map local page in broker 2 broker thread!"); + + #[cfg(all(feature = "llmp_debug", feature = "std"))] + dbg!("B2B: Starting proxy loop :)"); + + loop { + // first, forward all data we have. + while let Some((client_id, tag, flags, payload)) = local_receiver + .recv_buf_with_flags() + .expect("Error reading from local page!") + { + if client_id == b2b_client_id { + dbg!("Ignored message we probably sent earlier (same id)", tag); + continue; + } + + #[cfg(all(feature = "llmp_debug", feature = "std"))] + dbg!( + "Fowarding message via broker2broker connection", + payload.len() + ); + // We got a new message! Forward... + send_tcp_msg( + &mut stream, + &TcpRemoteNewMessage { + client_id, + tag, + flags, + payload: payload.to_vec(), + }, + ) + .expect("Error sending message via broker 2 broker"); + } + + // Then, see if we can receive something. + // We set a timeout on the receive earlier. + // This makes sure we will still forward our own stuff. + // Forwarding happens between each recv, too, as simplification. + // We ignore errors completely as they may be timeout, or stream closings. + // Instead, we catch stream close when/if we next try to send. + if let Ok(val) = recv_tcp_msg(&mut stream) { + let msg: TcpRemoteNewMessage = (&val).try_into().expect( + "Illegal message received from broker 2 broker connection - shutting down.", + ); + + #[cfg(all(feature = "llmp_debug", feature = "std"))] + dbg!( + "Fowarding incoming message from broker2broker connection", + msg.payload.len() + ); + + // TODO: Could probably optimize this somehow to forward all queued messages between locks... oh well. + // Todo: somehow mangle in the other broker id? ClientId? + new_sender + .send_buf_with_flags(msg.tag, msg.flags | LLMP_FLAG_FROM_B2B, &msg.payload) + .expect("B2B: Error forwarding message. Exiting."); + } else { + #[cfg(all(feature = "llmp_debug", feature = "std"))] + dbg!("Received no input, timeout or closed. Looping back up :)"); + } + } + }); + + let ret = recv.recv().map_err(|_| { + Error::Unknown("Error launching background thread for b2b communcation".to_string()) + }); + + #[cfg(all(feature = "llmp_debug", feature = "std"))] + dbg!("B2B: returning from loop. Success: {}", ret.is_ok()); + + ret + } + + /// handles a single tcp request in the current context. + #[cfg(feature = "std")] + fn handle_tcp_request( + mut stream: TcpStream, + request: &TcpRequest, + current_client_id: &mut u32, + sender: &mut LlmpSender, + shmem_provider: &SP, + broker_map_description: &ShMemDescription, + ) { + match request { + TcpRequest::LocalClientHello { shmem_description } => { + match Self::announce_new_client(sender, shmem_description) { + Ok(()) => (), + Err(e) => println!("Error forwarding client on map: {:?}", e), + }; + + if let Err(e) = send_tcp_msg( + &mut stream, + &TcpResponse::LocalClientAccepted { + client_id: *current_client_id, + }, + ) { + println!("An error occurred sending via tcp {}", e); + }; + *current_client_id += 1; + } + TcpRequest::RemoteBrokerHello { hostname } => { + println!("B2B new client: {}", hostname); + + // TODO: Clean up broker ids. + if send_tcp_msg( + &mut stream, + &TcpResponse::RemoteBrokerAccepted { + broker_id: *current_client_id, + }, + ) + .is_err() + { + println!("Error accepting broker, ignoring."); + return; + } + + if let Ok(shmem_description) = Self::b2b_thread_on( + stream, + shmem_provider, + *current_client_id, + &broker_map_description, + ) { + if Self::announce_new_client(sender, &shmem_description).is_err() { + println!("B2B: Error announcing client {:?}", shmem_description); + }; + *current_client_id += 1; + } + } + }; + } + #[cfg(feature = "std")] /// Launches a thread using a listener socket, on which new clients may connect to this broker pub fn launch_listener(&mut self, listener: Listener) -> Result, Error> { @@ -1562,37 +1964,43 @@ where // to read from the initial map id. let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem; - let broadcast_map_description = postcard::to_allocvec(&client_out_map_mem.description())?; - - let mut incoming_map_description_serialized = vec![0u8; broadcast_map_description.len()]; + let broker_map_description = client_out_map_mem.description(); + let hostname = hostname::get() + .unwrap_or_else(|_| "".into()) + .to_string_lossy() + .into(); + let broker_hello = TcpResponse::BrokerConnectHello { + broker_map_description, + hostname, + }; - let llmp_tcp_id = self.llmp_clients.len() as u32; + let llmp_tcp_id = self.llmp_clients.len() as ClientId; // Tcp out map sends messages from background thread tcp server to foreground client let tcp_out_map = LlmpSharedMap::new( llmp_tcp_id, self.shmem_provider.new_map(LLMP_CFG_INITIAL_MAP_SIZE)?, ); - let shmem_id = tcp_out_map.shmem.id(); - let tcp_out_map_str = *shmem_id.as_slice(); - let tcp_out_map_size = tcp_out_map.shmem.len(); + let tcp_out_map_description = tcp_out_map.shmem.description(); self.register_client(tcp_out_map); let mut shmem_provider_clone = self.shmem_provider.clone(); Ok(thread::spawn(move || { + // Call `post_fork` (even though this is not forked) so we get a new connection to the cloned `ShMemServer` if we are using a `ServedShMemProvider` shmem_provider_clone.post_fork(); - // Clone so we get a new connection to the AshmemServer if we are using - // ServedShMemProvider - let mut new_client_sender = LlmpSender { - id: 0, + + let mut current_client_id = llmp_tcp_id + 1; + + let mut tcp_incoming_sender = LlmpSender { + id: llmp_tcp_id, last_msg_sent: ptr::null_mut(), out_maps: vec![LlmpSharedMap::existing( shmem_provider_clone - .from_id_and_size(ShMemId::from_slice(&tcp_out_map_str), tcp_out_map_size) + .from_description(tcp_out_map_description) .unwrap(), )], - // drop pages to the broker if it already read them + // drop pages to the broker, if it already read them. keep_pages_forever: false, shmem_provider: shmem_provider_clone.clone(), }; @@ -1601,38 +2009,40 @@ where match listener.accept() { ListenerStream::Tcp(mut stream, addr) => { dbg!("New connection", addr, stream.peer_addr().unwrap()); - match stream.write(&broadcast_map_description) { - Ok(_) => {} // fire & forget + + // Send initial information, without anyone asking. + // This makes it a tiny bit easier to map the broker map for new Clients. + match send_tcp_msg(&mut stream, &broker_hello) { + Ok(()) => {} Err(e) => { - dbg!("Could not send to shmap to client", e); + dbg!("Error sending initial hello: {:?}", e); continue; } - }; - match stream.read_exact(&mut incoming_map_description_serialized) { - Ok(()) => (), + } + + let buf = match recv_tcp_msg(&mut stream) { + Ok(buf) => buf, Err(e) => { - dbg!("Ignoring failed read from client", e); + dbg!("Error receving from tcp", e); continue; } }; - if let Ok(incoming_map_description) = postcard::from_bytes::( - &incoming_map_description_serialized, - ) { - unsafe { - let msg = new_client_sender - .alloc_next(size_of::()) - .expect("Could not allocate a new message in shared map."); - (*msg).tag = LLMP_TAG_NEW_SHM_CLIENT; - let pageinfo = - (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; - (*pageinfo).shm_str = *incoming_map_description.id.as_slice(); - (*pageinfo).map_size = incoming_map_description.size; - match new_client_sender.send(msg) { - Ok(()) => (), - Err(e) => println!("Error forwarding client on map: {:?}", e), - }; + let req = match (&buf).try_into() { + Ok(req) => req, + Err(e) => { + dbg!("Could not deserialize tcp message", e); + continue; } - } + }; + + Self::handle_tcp_request( + stream, + &req, + &mut current_client_id, + &mut tcp_incoming_sender, + &shmem_provider_clone, + &broker_map_description, + ); } ListenerStream::Empty() => { continue; @@ -1646,7 +2056,7 @@ where #[inline] unsafe fn handle_new_msgs(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error> where - F: FnMut(u32, Tag, Flag, &[u8]) -> Result, + F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, { let mut next_id = self.llmp_clients.len() as u32; @@ -1880,7 +2290,7 @@ where self.sender.send_buf(tag, buf) } - pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> { + pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> { self.sender.send_buf_with_flags(tag, flags, buf) } @@ -1943,7 +2353,7 @@ where self.receiver.recv_buf_blocking() } - pub fn recv_buf_with_flags(&mut self) -> Result, Error> { + pub fn recv_buf_with_flags(&mut self) -> Result, Error> { self.receiver.recv_buf_with_flags() } @@ -1957,26 +2367,48 @@ where #[cfg(feature = "std")] /// Create a LlmpClient, getting the ID from a given port pub fn create_attach_to_tcp(mut shmem_provider: SP, port: u16) -> Result { - let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?; + let mut stream = TcpStream::connect(format!("{}:{}", _LLMP_BIND_ADDR, port))?; println!("Connected to port {}", port); - // First, get the serialized description size by serializing a dummy. - let dummy_description = ShMemDescription { - size: 0, - id: ShMemId::default(), + let broker_map_description = if let TcpResponse::BrokerConnectHello { + broker_map_description, + hostname: _, + } = (&recv_tcp_msg(&mut stream)?).try_into()? + { + broker_map_description + } else { + return Err(Error::IllegalState( + "Received unexpected Broker Hello".to_string(), + )); }; - let mut new_broker_map_str = postcard::to_allocvec(&dummy_description)?; - stream.read_exact(&mut new_broker_map_str)?; + let map = LlmpSharedMap::existing(shmem_provider.from_description(broker_map_description)?); + let mut ret = Self::new(shmem_provider, map)?; - let broker_map_description: ShMemDescription = postcard::from_bytes(&new_broker_map_str)?; + let client_hello_req = TcpRequest::LocalClientHello { + shmem_description: ret.sender.out_maps.first().unwrap().shmem.description(), + }; - let map = LlmpSharedMap::existing(shmem_provider.from_description(broker_map_description)?); - let ret = Self::new(shmem_provider, map)?; + send_tcp_msg(&mut stream, &client_hello_req)?; + + let client_id = if let TcpResponse::LocalClientAccepted { client_id } = + (&recv_tcp_msg(&mut stream)?).try_into()? + { + client_id + } else { + return Err(Error::IllegalState( + "Unexpected Response from Broker".to_string(), + )); + }; + + // Set our ID to the one the broker sent us.. + // This is mainly so we can filter out our own msgs later. + ret.sender.id = client_id; + // Also set the sender on our initial llmp map correctly. + unsafe { + (*ret.sender.out_maps.first_mut().unwrap().page_mut()).sender = client_id; + } - let own_map_description_bytes = - postcard::to_allocvec(&ret.sender.out_maps.first().unwrap().shmem.description())?; - stream.write_all(&own_map_description_bytes)?; Ok(ret) } } diff --git a/libafl/src/bolts/shmem.rs b/libafl/src/bolts/shmem.rs index 2b64dfc04f..9640e60212 100644 --- a/libafl/src/bolts/shmem.rs +++ b/libafl/src/bolts/shmem.rs @@ -171,8 +171,8 @@ pub trait ShMemProvider: Send + Clone + Default + Debug { )) } - /// This method should be called after a fork or thread creation event, allowing the ShMem to - /// reset thread specific info. + /// This method should be called after a fork or after cloning/a thread creation event, allowing the ShMem to + /// reset thread specific info, and potentially reconnect. fn post_fork(&mut self) { // do nothing } @@ -183,6 +183,9 @@ pub trait ShMemProvider: Send + Clone + Default + Debug { } } +/// A Refernce Counted shared map, +/// that can use internal mutability. +/// Useful if the `ShMemProvider` needs to keep local state. #[derive(Debug, Clone)] pub struct RcShMem { internal: ManuallyDrop, @@ -216,6 +219,9 @@ impl Drop for RcShMem { } } +/// A Refernce Counted `ShMemProvider`, +/// that can use internal mutability. +/// Useful if the `ShMemProvider` needs to keep local state. #[derive(Debug, Clone)] pub struct RcShMemProvider { internal: Rc>, @@ -274,6 +280,11 @@ where } } +/// A Unix sharedmem implementation. +/// +/// On Android, this is partially reused to wrap `Ashmem`, +/// Although for an `AshmemShMemProvider using a unix domain socket +/// Is needed on top. #[cfg(all(unix, feature = "std"))] pub mod unix_shmem { diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index 90fd00f864..865f02ebda 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -15,7 +15,7 @@ use crate::bolts::{ use crate::{ bolts::{ - llmp::{self, Flag, LlmpClientDescription, LlmpSender, Tag}, + llmp::{self, Flags, LlmpClientDescription, LlmpSender, Tag}, shmem::ShMemProvider, }, corpus::CorpusScheduler, @@ -173,7 +173,7 @@ where #[cfg(feature = "llmp_compression")] let compressor = &self.compressor; broker.loop_forever( - &mut |sender_id: u32, tag: Tag, _flags: Flag, msg: &[u8]| { + &mut |sender_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| { if tag == LLMP_TAG_EVENT_TO_BOTH { #[cfg(not(feature = "llmp_compression"))] let event_bytes = msg; @@ -376,7 +376,7 @@ where #[cfg(feature = "llmp_compression")] fn fire(&mut self, _state: &mut S, event: Event) -> Result<(), Error> { let serialized = postcard::to_allocvec(&event)?; - let flags: Flag = LLMP_FLAG_INITIALIZED; + let flags: Flags = LLMP_FLAG_INITIALIZED; match self.compressor.compress(&serialized)? { Some(comp_buf) => {