Skip to content

Commit

Permalink
Add is_express to put/delete (#508)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc authored Jul 5, 2024
1 parent 23919e2 commit 9e73622
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 9 deletions.
8 changes: 8 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,15 @@ typedef struct {
* z_congestion_control_t congestion_control: The congestion control to apply when routing this message.
* z_priority_t priority: The priority of this message when routed.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* z_owned_bytes_t *attachment: An optional attachment to the publication.
*/
typedef struct {
z_owned_encoding_t *encoding;
z_congestion_control_t congestion_control;
z_priority_t priority;
z_timestamp_t *timestamp;
_Bool is_express;
z_owned_bytes_t *attachment;
} z_put_options_t;

Expand All @@ -273,11 +275,13 @@ typedef struct {
* Members:
* z_congestion_control_t congestion_control: The congestion control to apply when routing this message.
* z_priority_t priority: The priority of this message when router.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
*/
typedef struct {
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
z_timestamp_t *timestamp;
} z_delete_options_t;

Expand All @@ -287,11 +291,13 @@ typedef struct {
*
* Members:
* z_owned_encoding_t *encoding: The encoding of the payload.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* z_owned_bytes_t *attachment: An optional attachment to the publication.
*/
typedef struct {
z_owned_encoding_t *encoding;
_Bool is_express;
z_timestamp_t *timestamp;
z_owned_bytes_t *attachment;
} z_publisher_put_options_t;
Expand All @@ -301,9 +307,11 @@ typedef struct {
* sent via :c:func:`z_publisher_delete`.
*
* Members:
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
*/
typedef struct {
_Bool is_express;
z_timestamp_t *timestamp;
} z_publisher_delete_options_t;

Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub);
* kind: The kind of the value.
* cong_ctrl: The congestion control of this write. Possible values defined
* in :c:type:`_z_congestion_control_t`.
* is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* timestamp: The timestamp of this write. The API level timestamp (e.g. of the data when it was created).
* attachment: An optional attachment to this write.
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
#endif

#if Z_FEATURE_SUBSCRIPTION == 1
Expand Down
20 changes: 15 additions & 5 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -919,12 +919,14 @@ void z_put_options_default(z_put_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->encoding = NULL;
options->is_express = false;
options->timestamp = NULL;
options->attachment = NULL;
}

void z_delete_options_default(z_delete_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->is_express = false;
options->timestamp = NULL;
options->priority = Z_PRIORITY_DEFAULT;
}
Expand All @@ -939,13 +941,14 @@ int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_
opt.congestion_control = options->congestion_control;
opt.encoding = options->encoding;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
}

ret = _z_write(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority,
opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));

// Trigger local subscriptions
_z_trigger_local_subscriptions(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_from_owned_bytes(payload),
Expand All @@ -966,10 +969,11 @@ int8_t z_delete(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
if (options != NULL) {
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
}
ret = _z_write(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
opt.congestion_control, opt.priority, opt.timestamp, _z_bytes_null());
opt.congestion_control, opt.priority, opt.is_express, opt.timestamp, _z_bytes_null());

return ret;
}
Expand Down Expand Up @@ -1020,10 +1024,14 @@ int8_t z_undeclare_publisher(z_owned_publisher_t *pub) { return _z_undeclare_and
void z_publisher_put_options_default(z_publisher_put_options_t *options) {
options->encoding = NULL;
options->attachment = NULL;
options->is_express = false;
options->timestamp = NULL;
}

void z_publisher_delete_options_default(z_publisher_delete_options_t *options) { options->timestamp = NULL; }
void z_publisher_delete_options_default(z_publisher_delete_options_t *options) {
options->is_express = false;
options->timestamp = NULL;
}

int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload,
const z_publisher_put_options_t *options) {
Expand All @@ -1033,6 +1041,7 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload
z_publisher_put_options_default(&opt);
if (options != NULL) {
opt.encoding = options->encoding;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
}
Expand All @@ -1041,7 +1050,7 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload
// Write value
ret = _z_write(&pub->_zn.in->val, pub->_key, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, pub->_congestion_control,
pub->_priority, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
pub->_priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub->_zn.in->val, pub->_key, _z_bytes_from_owned_bytes(payload), _Z_N_QOS_DEFAULT,
Expand All @@ -1058,10 +1067,11 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del
z_publisher_delete_options_t opt;
z_publisher_delete_options_default(&opt);
if (options != NULL) {
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
}
return _z_write(&pub->_zn.in->val, pub->_key, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, opt.timestamp, _z_bytes_null());
pub->_congestion_control, pub->_priority, opt.is_express, opt.timestamp, _z_bytes_null());
}

z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
Expand Down
6 changes: 3 additions & 3 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) {
/*------------------ Write ------------------*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
const _z_timestamp_t *timestamp, const _z_bytes_t attachment) {
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;
_z_network_message_t msg;
switch (kind) {
Expand All @@ -139,7 +139,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p
._body._push =
{
._key = keyexpr,
._qos = _z_n_qos_make(0, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._qos = _z_n_qos_make(is_express, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._timestamp = _z_timestamp_null(),
._body._is_put = true,
._body._body._put =
Expand All @@ -159,7 +159,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p
._body._push =
{
._key = keyexpr,
._qos = _z_n_qos_make(0, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._qos = _z_n_qos_make(is_express, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._timestamp = _z_timestamp_null(),
._body._is_put = false,
._body._body._del = {._commons = {._timestamp =
Expand Down

0 comments on commit 9e73622

Please sign in to comment.