Skip to content

Commit

Permalink
Reimplement send_with_reply() using DBusPendingCall
Browse files Browse the repository at this point in the history
  • Loading branch information
albel727 committed Jun 11, 2017
1 parent 63e453c commit 1dfee19
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 24 deletions.
74 changes: 52 additions & 22 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ extern "C" fn object_path_message_cb(conn: *mut ffi::DBusConnection, msg: *mut f
ffi::DBusHandlerResult::Handled
}

extern "C" fn pending_call_cb<F: FnOnce(Message)>(pending: *mut ffi::DBusPendingCall, user_data: *mut c_void) {
let message = unsafe { ffi::dbus_pending_call_steal_reply(pending) };
assert!(!message.is_null());
let message = super::message::message_from_ptr(message, false);

let user_closure: *mut Option<Box<F>> = user_data as *mut Option<Box<F>>;
let user_closure = unsafe { (*user_closure).take().unwrap() };
(*user_closure)(message);
}

extern "C" fn pending_call_data_free_cb<F: FnOnce(Message)>(user_data: *mut c_void) {
let user_closure: *mut Option<Box<F>> = user_data as *mut Option<Box<F>>;
let user_closure = unsafe { Box::from_raw(user_closure) };
drop(user_closure)
}

impl Connection {

#[inline(always)]
Expand Down Expand Up @@ -204,10 +220,39 @@ impl Connection {
Ok(serial)
}

/// Sends a message over the D-Bus. The resulting handler can be added to a connectionitem handler.
pub fn send_with_reply<'a, F: FnOnce(&Message) + 'a>(&self, msg: Message, f: F) -> MessageReply<F> {
let serial = self.send(msg).unwrap();
MessageReply(Some(f), serial)
/// Sends a message over the D-Bus without waiting, but calls the given closure when the reply is received.
pub fn send_with_reply<'a, F: FnOnce(Message) + 'a>(&self, mut msg: Message, f: F) -> Result<(),()> {
// Ensure allocation of a fresh serial, so that callbacks work as expected.
super::message::message_set_serial(&mut msg, 0);

let mut pc: *mut ffi::DBusPendingCall = ::std::ptr::null_mut();
let r = unsafe {
ffi::dbus_connection_send_with_reply(
self.conn(),
super::message::get_message_ptr(&msg),
&mut pc,
ffi::DBUS_TIMEOUT_INFINITE
)
};
if pc.is_null() { return Err(()); }
let pc = super::pending::pending_call_from_ptr(pc, false);
if r == 0 { return Err(()); }

let callback: Box<Option<Box<F>>> = Box::new(Some(Box::new(f)));
let callback = Box::into_raw(callback) as *mut c_void;
let r = unsafe {
ffi::dbus_pending_call_set_notify(
super::pending::get_pending_call_ptr(&pc),
Some(pending_call_cb::<F>),
callback,
Some(pending_call_data_free_cb::<F>)
)
};
if r == 0 {
drop(unsafe { Box::from_raw(callback) });
return Err(());
}
Ok(())
}

/// Get the connection's unique name.
Expand Down Expand Up @@ -377,34 +422,19 @@ pub struct MsgHandlerResult {
pub reply: Vec<Message>,
}

pub struct MessageReply<F>(Option<F>, u32);

impl<'a, F: FnOnce(&Message) + 'a> MsgHandler for MessageReply<F> {
fn handle_ci(&mut self, ci: &ConnectionItem) -> Option<MsgHandlerResult> {
if let ConnectionItem::MethodReturn(ref msg) = *ci {
if msg.get_reply_serial() == Some(self.1) {
self.0.take().unwrap()(msg);
return Some(MsgHandlerResult { handled: true, done: true, reply: Vec::new() })
}
}
None
}
}


#[test]
fn message_reply() {
use std::{cell, rc};
let c = Connection::get_private(BusType::Session).unwrap();
let m = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "ListNames").unwrap();
let quit = rc::Rc::new(cell::Cell::new(false));
let quit2 = quit.clone();
let reply = c.send_with_reply(m, move |result| {
c.send_with_reply(m, move |result| {
let r = result;
let _: ::arg::Array<&str, _> = r.get1().unwrap();
quit2.set(true);
});
for _ in c.iter(1000).with(reply) { if quit.get() { return; } }
}).unwrap();
for _ in c.iter(1000) { if quit.get() { return; } }
assert!(false);
}

2 changes: 0 additions & 2 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -900,8 +900,6 @@ impl<'a, C: ::std::ops::Deref<Target=Connection>> ConnPath<'a, C> {
}
}

// For purpose of testing the library only.
#[cfg(test)]
pub fn message_set_serial(m: &mut Message, s: u32) {
unsafe { ffi::dbus_message_set_serial(m.msg, s) };
}
Expand Down

0 comments on commit 1dfee19

Please sign in to comment.