Skip to content

Commit

Permalink
Merge pull request #435 from ZettaScaleLabs/add_reply_del
Browse files Browse the repository at this point in the history
Add reply_del support
  • Loading branch information
milyin authored Jun 10, 2024
2 parents c11eed3 + ad3ed38 commit 3d95166
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 0 deletions.
52 changes: 52 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,36 @@ typedef struct z_query_reply_options_t {
*/
struct z_owned_bytes_t *attachment;
} z_query_reply_options_t;
/**
* Represents the set of options that can be applied to a query delete reply,
* sent via `z_query_reply_del()`.
*/
typedef struct z_query_reply_del_options_t {
/**
* The congestion control to apply when routing the reply.
*/
enum z_congestion_control_t congestion_control;
/**
* The priority of the reply.
*/
enum z_priority_t priority;
/**
* If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
*/
bool is_express;
/**
* The timestamp of the reply.
*/
struct z_timestamp_t *timestamp;
/**
* The source info for the reply.
*/
struct z_owned_source_info_t *source_info;
/**
* The attachment to this reply.
*/
struct z_owned_bytes_t *attachment;
} z_query_reply_del_options_t;
/**
* Represents the set of options that can be applied to a query reply error,
* sent via `z_query_reply_err()`.
Expand Down Expand Up @@ -2389,6 +2419,28 @@ z_error_t z_query_reply(const struct z_loaned_query_t *this_,
const struct z_loaned_keyexpr_t *key_expr,
struct z_owned_bytes_t *payload,
struct z_query_reply_options_t *options);
/**
* Sends a delete reply to a query.
*
* This function must be called inside of a Queryable callback passing the
* query received as parameters of the callback function. This function can
* be called multiple times to send multiple replies to a query. The reply
* will be considered complete when the Queryable callback returns.
*
* @param this: The query to reply to.
* @param key_expr: The key of this delete reply.
* @param options: The options of this delete reply. All owned fields will be consumed.
*
* @return 0 in case of success, negative error code otherwise.
*/
ZENOHC_API
z_error_t z_query_reply_del(const struct z_loaned_query_t *this_,
const struct z_loaned_keyexpr_t *key_expr,
struct z_query_reply_del_options_t *options);
/**
* Constructs the default value for `z_query_reply_del_options_t`.
*/
ZENOHC_API void z_query_reply_del_options_default(struct z_query_reply_del_options_t *this_);
/**
* Sends a error reply to a query.
*
Expand Down
83 changes: 83 additions & 0 deletions src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,39 @@ pub extern "C" fn z_query_reply_err_options_default(this: &mut z_query_reply_err
};
}

/// Represents the set of options that can be applied to a query delete reply,
/// sent via `z_query_reply_del()`.
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct z_query_reply_del_options_t {
/// The congestion control to apply when routing the reply.
pub congestion_control: z_congestion_control_t,
/// The priority of the reply.
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
pub is_express: bool,
/// The timestamp of the reply.
pub timestamp: *mut z_timestamp_t,
/// The source info for the reply.
pub source_info: *mut z_owned_source_info_t,
/// The attachment to this reply.
pub attachment: *mut z_owned_bytes_t,
}

/// Constructs the default value for `z_query_reply_del_options_t`.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub extern "C" fn z_query_reply_del_options_default(this: &mut z_query_reply_del_options_t) {
*this = z_query_reply_del_options_t {
congestion_control: CongestionControl::default().into(),
priority: Priority::default().into(),
is_express: false,
timestamp: null_mut(),
source_info: null_mut(),
attachment: null_mut(),
};
}

/// Constructs a Queryable for the given key expression.
///
/// @param this_: An uninitialized memory location where queryable will be constructed.
Expand Down Expand Up @@ -326,6 +359,56 @@ pub unsafe extern "C" fn z_query_reply_err(
errors::Z_OK
}

/// Sends a delete reply to a query.
///
/// This function must be called inside of a Queryable callback passing the
/// query received as parameters of the callback function. This function can
/// be called multiple times to send multiple replies to a query. The reply
/// will be considered complete when the Queryable callback returns.
///
/// @param this: The query to reply to.
/// @param key_expr: The key of this delete reply.
/// @param options: The options of this delete reply. All owned fields will be consumed.
///
/// @return 0 in case of success, negative error code otherwise.
#[allow(clippy::missing_safety_doc)]
#[no_mangle]
pub unsafe extern "C" fn z_query_reply_del(
this: &z_loaned_query_t,
key_expr: &z_loaned_keyexpr_t,
options: Option<&mut z_query_reply_del_options_t>,
) -> errors::z_error_t {
let query = this.transmute_ref();
let key_expr = key_expr.transmute_ref();

let mut reply = query.reply_del(key_expr);
if let Some(options) = options {
if let Some(source_info) = unsafe { options.source_info.as_mut() } {
let source_info = source_info.transmute_mut().extract();
reply = reply.source_info(source_info);
};
if let Some(attachment) = unsafe { options.attachment.as_mut() } {
let attachment = attachment.transmute_mut().extract();
reply = reply.attachment(attachment);
}
if !options.timestamp.is_null() {
let timestamp = *unsafe { options.timestamp.as_mut() }
.unwrap()
.transmute_ref();
reply = reply.timestamp(Some(timestamp));
}
reply = reply.priority(options.priority.into());
reply = reply.congestion_control(options.congestion_control.into());
reply = reply.express(options.is_express);
}

if let Err(e) = reply.wait() {
log::error!("{}", e);
return errors::Z_EGENERIC;
}
errors::Z_OK
}

/// Gets query key expression.
#[allow(clippy::missing_safety_doc)]
#[no_mangle]
Expand Down

0 comments on commit 3d95166

Please sign in to comment.