Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add is_express to put/delete #508

Merged
merged 1 commit into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,15 @@ typedef struct {
* z_congestion_control_t congestion_control: The congestion control to apply when routing this message.
* z_priority_t priority: The priority of this message when routed.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* z_owned_bytes_t *attachment: An optional attachment to the publication.
*/
typedef struct {
z_owned_encoding_t *encoding;
z_congestion_control_t congestion_control;
z_priority_t priority;
z_timestamp_t *timestamp;
_Bool is_express;
z_owned_bytes_t *attachment;
} z_put_options_t;

Expand All @@ -273,11 +275,13 @@ typedef struct {
* Members:
* z_congestion_control_t congestion_control: The congestion control to apply when routing this message.
* z_priority_t priority: The priority of this message when router.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
*/
typedef struct {
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
z_timestamp_t *timestamp;
} z_delete_options_t;

Expand All @@ -287,11 +291,13 @@ typedef struct {
*
* Members:
* z_owned_encoding_t *encoding: The encoding of the payload.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* z_owned_bytes_t *attachment: An optional attachment to the publication.
*/
typedef struct {
z_owned_encoding_t *encoding;
_Bool is_express;
z_timestamp_t *timestamp;
z_owned_bytes_t *attachment;
} z_publisher_put_options_t;
Expand All @@ -301,9 +307,11 @@ typedef struct {
* sent via :c:func:`z_publisher_delete`.
*
* Members:
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
*/
typedef struct {
_Bool is_express;
z_timestamp_t *timestamp;
} z_publisher_delete_options_t;

Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub);
* kind: The kind of the value.
* cong_ctrl: The congestion control of this write. Possible values defined
* in :c:type:`_z_congestion_control_t`.
* is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* timestamp: The timestamp of this write. The API level timestamp (e.g. of the data when it was created).
* attachment: An optional attachment to this write.
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
#endif

#if Z_FEATURE_SUBSCRIPTION == 1
Expand Down
20 changes: 15 additions & 5 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -919,12 +919,14 @@ void z_put_options_default(z_put_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->encoding = NULL;
options->is_express = false;
options->timestamp = NULL;
options->attachment = NULL;
}

void z_delete_options_default(z_delete_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->is_express = false;
options->timestamp = NULL;
options->priority = Z_PRIORITY_DEFAULT;
}
Expand All @@ -939,13 +941,14 @@ int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_
opt.congestion_control = options->congestion_control;
opt.encoding = options->encoding;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
}

ret = _z_write(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority,
opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));

// Trigger local subscriptions
_z_trigger_local_subscriptions(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_from_owned_bytes(payload),
Expand All @@ -966,10 +969,11 @@ int8_t z_delete(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
if (options != NULL) {
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
}
ret = _z_write(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
opt.congestion_control, opt.priority, opt.timestamp, _z_bytes_null());
opt.congestion_control, opt.priority, opt.is_express, opt.timestamp, _z_bytes_null());

return ret;
}
Expand Down Expand Up @@ -1020,10 +1024,14 @@ int8_t z_undeclare_publisher(z_owned_publisher_t *pub) { return _z_undeclare_and
void z_publisher_put_options_default(z_publisher_put_options_t *options) {
options->encoding = NULL;
options->attachment = NULL;
options->is_express = false;
options->timestamp = NULL;
}

void z_publisher_delete_options_default(z_publisher_delete_options_t *options) { options->timestamp = NULL; }
void z_publisher_delete_options_default(z_publisher_delete_options_t *options) {
options->is_express = false;
options->timestamp = NULL;
}

int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload,
const z_publisher_put_options_t *options) {
Expand All @@ -1033,6 +1041,7 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload
z_publisher_put_options_default(&opt);
if (options != NULL) {
opt.encoding = options->encoding;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
}
Expand All @@ -1041,7 +1050,7 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload
// Write value
ret = _z_write(&pub->_zn.in->val, pub->_key, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, pub->_congestion_control,
pub->_priority, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
pub->_priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub->_zn.in->val, pub->_key, _z_bytes_from_owned_bytes(payload), _Z_N_QOS_DEFAULT,
Expand All @@ -1058,10 +1067,11 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del
z_publisher_delete_options_t opt;
z_publisher_delete_options_default(&opt);
if (options != NULL) {
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
}
return _z_write(&pub->_zn.in->val, pub->_key, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, opt.timestamp, _z_bytes_null());
pub->_congestion_control, pub->_priority, opt.is_express, opt.timestamp, _z_bytes_null());
}

z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
Expand Down
6 changes: 3 additions & 3 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) {
/*------------------ Write ------------------*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
const _z_timestamp_t *timestamp, const _z_bytes_t attachment) {
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;
_z_network_message_t msg;
switch (kind) {
Expand All @@ -139,7 +139,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p
._body._push =
{
._key = keyexpr,
._qos = _z_n_qos_make(0, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._qos = _z_n_qos_make(is_express, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._timestamp = _z_timestamp_null(),
._body._is_put = true,
._body._body._put =
Expand All @@ -159,7 +159,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p
._body._push =
{
._key = keyexpr,
._qos = _z_n_qos_make(0, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._qos = _z_n_qos_make(is_express, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._timestamp = _z_timestamp_null(),
._body._is_put = false,
._body._body._del = {._commons = {._timestamp =
Expand Down
Loading