From ad3ed388b678b84c359d93f31b5a3e3f32032fa6 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Mon, 10 Jun 2024 17:20:59 +0200 Subject: [PATCH] Add reply_del support --- include/zenoh_commons.h | 52 ++++++++++++++++++++++++++ src/queryable.rs | 83 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+) diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index a734488b8..e8b899a47 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -622,6 +622,36 @@ typedef struct z_query_reply_options_t { */ struct z_owned_bytes_t *attachment; } z_query_reply_options_t; +/** + * Represents the set of options that can be applied to a query delete reply, + * sent via `z_query_reply_del()`. + */ +typedef struct z_query_reply_del_options_t { + /** + * The congestion control to apply when routing the reply. + */ + enum z_congestion_control_t congestion_control; + /** + * The priority of the reply. + */ + enum z_priority_t priority; + /** + * If true, Zenoh will not wait to batch this operation with others to reduce the bandwith. + */ + bool is_express; + /** + * The timestamp of the reply. + */ + struct z_timestamp_t *timestamp; + /** + * The source info for the reply. + */ + struct z_owned_source_info_t *source_info; + /** + * The attachment to this reply. + */ + struct z_owned_bytes_t *attachment; +} z_query_reply_del_options_t; /** * Represents the set of options that can be applied to a query reply error, * sent via `z_query_reply_err()`. @@ -2389,6 +2419,28 @@ z_error_t z_query_reply(const struct z_loaned_query_t *this_, const struct z_loaned_keyexpr_t *key_expr, struct z_owned_bytes_t *payload, struct z_query_reply_options_t *options); +/** + * Sends a delete reply to a query. + * + * This function must be called inside of a Queryable callback passing the + * query received as parameters of the callback function. This function can + * be called multiple times to send multiple replies to a query. The reply + * will be considered complete when the Queryable callback returns. + * + * @param this: The query to reply to. + * @param key_expr: The key of this delete reply. + * @param options: The options of this delete reply. All owned fields will be consumed. + * + * @return 0 in case of success, negative error code otherwise. + */ +ZENOHC_API +z_error_t z_query_reply_del(const struct z_loaned_query_t *this_, + const struct z_loaned_keyexpr_t *key_expr, + struct z_query_reply_del_options_t *options); +/** + * Constructs the default value for `z_query_reply_del_options_t`. + */ +ZENOHC_API void z_query_reply_del_options_default(struct z_query_reply_del_options_t *this_); /** * Sends a error reply to a query. * diff --git a/src/queryable.rs b/src/queryable.rs index 48319d022..20f0cf10a 100644 --- a/src/queryable.rs +++ b/src/queryable.rs @@ -162,6 +162,39 @@ pub extern "C" fn z_query_reply_err_options_default(this: &mut z_query_reply_err }; } +/// Represents the set of options that can be applied to a query delete reply, +/// sent via `z_query_reply_del()`. +#[allow(non_camel_case_types)] +#[repr(C)] +pub struct z_query_reply_del_options_t { + /// The congestion control to apply when routing the reply. + pub congestion_control: z_congestion_control_t, + /// The priority of the reply. + pub priority: z_priority_t, + /// If true, Zenoh will not wait to batch this operation with others to reduce the bandwith. + pub is_express: bool, + /// The timestamp of the reply. + pub timestamp: *mut z_timestamp_t, + /// The source info for the reply. + pub source_info: *mut z_owned_source_info_t, + /// The attachment to this reply. + pub attachment: *mut z_owned_bytes_t, +} + +/// Constructs the default value for `z_query_reply_del_options_t`. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn z_query_reply_del_options_default(this: &mut z_query_reply_del_options_t) { + *this = z_query_reply_del_options_t { + congestion_control: CongestionControl::default().into(), + priority: Priority::default().into(), + is_express: false, + timestamp: null_mut(), + source_info: null_mut(), + attachment: null_mut(), + }; +} + /// Constructs a Queryable for the given key expression. /// /// @param this_: An uninitialized memory location where queryable will be constructed. @@ -326,6 +359,56 @@ pub unsafe extern "C" fn z_query_reply_err( errors::Z_OK } +/// Sends a delete reply to a query. +/// +/// This function must be called inside of a Queryable callback passing the +/// query received as parameters of the callback function. This function can +/// be called multiple times to send multiple replies to a query. The reply +/// will be considered complete when the Queryable callback returns. +/// +/// @param this: The query to reply to. +/// @param key_expr: The key of this delete reply. +/// @param options: The options of this delete reply. All owned fields will be consumed. +/// +/// @return 0 in case of success, negative error code otherwise. +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub unsafe extern "C" fn z_query_reply_del( + this: &z_loaned_query_t, + key_expr: &z_loaned_keyexpr_t, + options: Option<&mut z_query_reply_del_options_t>, +) -> errors::z_error_t { + let query = this.transmute_ref(); + let key_expr = key_expr.transmute_ref(); + + let mut reply = query.reply_del(key_expr); + if let Some(options) = options { + if let Some(source_info) = unsafe { options.source_info.as_mut() } { + let source_info = source_info.transmute_mut().extract(); + reply = reply.source_info(source_info); + }; + if let Some(attachment) = unsafe { options.attachment.as_mut() } { + let attachment = attachment.transmute_mut().extract(); + reply = reply.attachment(attachment); + } + if !options.timestamp.is_null() { + let timestamp = *unsafe { options.timestamp.as_mut() } + .unwrap() + .transmute_ref(); + reply = reply.timestamp(Some(timestamp)); + } + reply = reply.priority(options.priority.into()); + reply = reply.congestion_control(options.congestion_control.into()); + reply = reply.express(options.is_express); + } + + if let Err(e) = reply.wait() { + log::error!("{}", e); + return errors::Z_EGENERIC; + } + errors::Z_OK +} + /// Gets query key expression. #[allow(clippy::missing_safety_doc)] #[no_mangle]