Skip to content

Commit

Permalink
Update query, reply and error messages format (#372)
Browse files Browse the repository at this point in the history
* feat: new declare interest packet format

* refactor: use flag macros

* fix: encode correct flag value

* feat: update query messages

* fix: remove unitialized memory warning

* fix: remove codec test memory leak

* fix: set correct tag in codec test response

* feat: add sample kind and push body in replies

* feat: update error message format

* chore: code format
  • Loading branch information
jean-roland authored Mar 19, 2024
1 parent d47c33a commit 8311045
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 274 deletions.
3 changes: 2 additions & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle);
* key: The resource key of this reply. The caller keeps the ownership.
* payload: The value of this reply, the caller keeps ownership.
*/
int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload);
int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload,
const z_sample_kind_t kind);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
90 changes: 43 additions & 47 deletions include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,58 +48,25 @@
#define _Z_FRAG_BUFF_BASE_SIZE 128 // Arbitrary base size of the buffer to encode a fragment message header

// Flags:
// - T: Timestamp If T==1 then the timestamp if present
// - X: Reserved
// - E: Encoding If E==1 then the encoding is present
// - Z: Extension If Z==1 then at least one extension is present
//
// 7 6 5 4 3 2 1 0
// +-+-+-+-+-+-+-+-+
// |Z|E|T| REPLY |
// |Z|E|X| ERR |
// +-+-+-+---------+
// ~ ts: <u8;z16> ~ if T==1
// +---------------+
// ~ encoding ~ if E==1
// +---------------+
// ~ [repl_exts] ~ if Z==1
// +---------------+
// ~ pl: <u8;z32> ~ -- Payload
// +---------------+
typedef struct {
_z_timestamp_t _timestamp;
_z_value_t _value;
_z_source_info_t _ext_source_info;
z_consolidation_mode_t _ext_consolidation;
#if Z_FEATURE_ATTACHMENT == 1
_z_owned_encoded_attachment_t _ext_attachment;
#endif
} _z_msg_reply_t;
void _z_msg_reply_clear(_z_msg_reply_t *msg);
#define _Z_FLAG_Z_R_T 0x20
#define _Z_FLAG_Z_R_E 0x40

// Flags:
// - T: Timestamp If T==1 then the timestamp if present
// - I: Infrastructure If I==1 then the error is related to the infrastructure else to the user
// - Z: Extension If Z==1 then at least one extension is present
//
// 7 6 5 4 3 2 1 0
// +-+-+-+-+-+-+-+-+
// |Z|I|T| ERR |
// +-+-+-+---------+
// % code:z16 %
// +---------------+
// ~ ts: <u8;z16> ~ if T==1
// % encoding %
// +---------------+
// ~ [err_exts] ~ if Z==1
// +---------------+
#define _Z_FLAG_Z_E_T 0x20
#define _Z_FLAG_Z_E_I 0x40
/// ~ pl: <u8;z32> ~ Payload
/// +---------------+
#define _Z_FLAG_Z_E_E 0x40
typedef struct {
uint16_t _code;
_Bool _is_infrastructure;
_z_timestamp_t _timestamp;
_z_encoding_t encoding;
_z_source_info_t _ext_source_info;
_z_value_t _ext_value;
_z_bytes_t _payload;
} _z_msg_err_t;
void _z_msg_err_clear(_z_msg_err_t *err);

Expand Down Expand Up @@ -166,31 +133,60 @@ void _z_msg_put_clear(_z_msg_put_t *);
/*------------------ Query Message ------------------*/
// 7 6 5 4 3 2 1 0
// +-+-+-+-+-+-+-+-+
// |Z|C|P| QUERY |
// |Z|P|C| QUERY |
// +-+-+-+---------+
// ~ params ~ if P==1 -- <utf8;z32>
// +---------------+
// ~ consolidation ~ if C==1 -- u8
// +---------------+
// ~ params ~ if P==1 -- <utf8;z16>
// +---------------+
// ~ [qry_exts] ~ if Z==1
// +---------------+
#define _Z_FLAG_Z_Q_P 0x20 // 1 << 6 | Period if P==1 then a period is present
#define _Z_FLAG_Z_Q_C 0x20 // 1 << 5 | Consolidation if C==1 then consolidation is present
#define _Z_FLAG_Z_Q_P 0x40 // 1 << 6 | Params if P==1 then parameters are present
typedef struct {
_z_bytes_t _parameters;
_z_source_info_t _ext_info;
_z_value_t _ext_value;
z_consolidation_mode_t _ext_consolidation;
z_consolidation_mode_t _consolidation;
#if Z_FEATURE_ATTACHMENT == 1
_z_owned_encoded_attachment_t _ext_attachment;
#endif
} _z_msg_query_t;
typedef struct {
_Bool info;
_Bool body;
_Bool consolidation;
_Bool attachment;
} _z_msg_query_reqexts_t;
_z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg);
void _z_msg_query_clear(_z_msg_query_t *msg);

typedef struct {
_Bool _is_put;
union {
_z_msg_del_t _del;
_z_msg_put_t _put;
} _body;
} _z_reply_body_t;
// Flags:
// - C: Consolidation If C==1 then consolidation is present
// - X: Reserved
// - Z: Extension If Z==1 then at least one extension is present
//
// 7 6 5 4 3 2 1 0
// +-+-+-+-+-+-+-+-+
// |Z|X|C| REPLY |
// +-+-+-+---------+
// ~ consolidation ~ if C==1
// +---------------+
// ~ [repl_exts] ~ if Z==1
// +---------------+
// ~ ReplyBody ~ -- Payload
// +---------------+
typedef struct {
z_consolidation_mode_t _consolidation;
_z_reply_body_t _body;
} _z_msg_reply_t;
void _z_msg_reply_clear(_z_msg_reply_t *msg);
#define _Z_FLAG_Z_R_C 0x20

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_MESSAGE_H */
10 changes: 2 additions & 8 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,7 @@ typedef struct {
_z_n_msg_request_exts_t _z_n_msg_request_needed_exts(const _z_n_msg_request_t *msg);
void _z_n_msg_request_clear(_z_n_msg_request_t *msg);

typedef struct {
_Bool _is_put;
union {
_z_msg_del_t _del;
_z_msg_put_t _put;
} _body;
} _z_push_body_t;
typedef _z_reply_body_t _z_push_body_t;
_z_push_body_t _z_push_body_null(void);
_z_push_body_t _z_push_body_steal(_z_push_body_t *msg);
void _z_push_body_clear(_z_push_body_t *msg);
Expand Down Expand Up @@ -236,7 +230,7 @@ _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_byt
z_attachment_t attachment
#endif
);
_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_value_t) value);
_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_ack(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key);
_z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid);
_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration);
Expand Down
3 changes: 1 addition & 2 deletions include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t

int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr,
const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind,
const _z_timestamp_t timestamp);
const _z_msg_put_t *msg);
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
void _z_flush_pending_queries(_z_session_t *zn);
Expand Down
2 changes: 1 addition & 1 deletion src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui
.len = payload_len,
},
.encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}};
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value);
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT);
return _Z_ERR_GENERIC;
}
#endif
Expand Down
44 changes: 26 additions & 18 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
return _Z_RES_OK;
}

int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) {
int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload,
const z_sample_kind_t kind) {
int8_t ret = _Z_RES_OK;

_z_keyexpr_t q_ke;
Expand All @@ -359,23 +360,30 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val
// Build the reply context decorator. This is NOT the final reply.
_z_id_t zid = ((_z_session_t *)query->_zn)->_local_zid;
_z_keyexpr_t ke = _z_keyexpr_alias(keyexpr);
_z_zenoh_message_t z_msg = {
._tag = _Z_N_RESPONSE,
._body._response =
{
._request_id = query->_request_id,
._key = ke,
._ext_responder = {._zid = zid, ._eid = 0},
._ext_qos = _Z_N_QOS_DEFAULT,
._ext_timestamp = _z_timestamp_null(),
._tag = _Z_RESPONSE_BODY_REPLY,
._body._reply = {._value = payload,
._timestamp = _z_timestamp_null(),
._ext_consolidation = Z_CONSOLIDATION_MODE_AUTO,
._ext_source_info = _z_source_info_null()},
},
};

_z_zenoh_message_t z_msg;
switch (kind) {
default:
return _Z_ERR_GENERIC;
break;
case Z_SAMPLE_KIND_PUT:
z_msg._tag = _Z_N_RESPONSE;
z_msg._body._response._request_id = query->_request_id;
z_msg._body._response._key = ke;
z_msg._body._response._ext_responder._zid = zid;
z_msg._body._response._ext_responder._eid = 0;
z_msg._body._response._ext_qos = _Z_N_QOS_DEFAULT;
z_msg._body._response._ext_timestamp = _z_timestamp_null();
z_msg._body._response._tag = _Z_RESPONSE_BODY_REPLY;
z_msg._body._response._body._reply._consolidation = Z_CONSOLIDATION_MODE_DEFAULT;
z_msg._body._response._body._reply._body._is_put = true;
z_msg._body._response._body._reply._body._body._put._payload = payload.payload;
z_msg._body._response._body._reply._body._body._put._encoding = payload.encoding;
z_msg._body._response._body._reply._body._body._put._attachment.body.decoded = z_attachment_null();
z_msg._body._response._body._reply._body._body._put._attachment.is_encoded = false;
z_msg._body._response._body._reply._body._body._put._commons._timestamp = _z_timestamp_null();
z_msg._body._response._body._reply._body._body._put._commons._source_info = _z_source_info_null();
break;
}
if (_z_send_n_msg(query->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
Expand Down
Loading

0 comments on commit 8311045

Please sign in to comment.