Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move encoding value on receive side. #490

Merged
merged 6 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ typedef struct {
} _z_queryable_t;

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
uint32_t request_id, const _z_bytes_t attachment);
void _z_queryable_clear(_z_queryable_t *qbl);
void _z_queryable_free(_z_queryable_t **qbl);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
void _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t encoding, z_sample_kind_t kind,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment);

typedef struct _z_pending_reply_t {
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src);
_z_sample_t _z_sample_duplicate(const _z_sample_t *src);

_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, _z_timestamp_t timestamp,
_z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment);

#endif /* ZENOH_PICO_SAMPLE_NETAPI_H */
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ typedef struct {
_z_value_t _z_value_null(void);
_z_value_t _z_value_steal(_z_value_t *value);
int8_t _z_value_copy(_z_value_t *dst, const _z_value_t *src);
void _z_value_move(_z_value_t *dst, _z_value_t *src);
void _z_value_clear(_z_value_t *src);
void _z_value_free(_z_value_t **hello);

Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t

int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr,
const _z_msg_put_t *msg);
_z_msg_put_t *msg);
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
void _z_flush_pending_queries(_z_session_t *zn);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ _z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, cons
_z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t key);

_z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q);
int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid,
int8_t _z_trigger_queryables(_z_session_t *zn, _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid,
const _z_bytes_t attachment);
void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *q);
void _z_flush_session_queryable(_z_session_t *zn);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t

_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment);
void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub);
void _z_flush_subscriptions(_z_session_t *zn);
Expand Down
4 changes: 2 additions & 2 deletions src/net/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void _z_query_free(_z_query_t **query) {
}

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
uint32_t request_id, const _z_bytes_t attachment) {
_z_query_t q = _z_query_null();
q._request_id = request_id;
Expand All @@ -71,7 +71,7 @@ _z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_
q._anyke = (strstr(q._parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
q._key = _z_keyexpr_steal(key);
_z_bytes_copy(&q.attachment, &attachment);
_z_value_copy(&q._value, value); // FIXME: Move encoding, Issue #482
_z_value_move(&q._value, value);
return q;
}

Expand Down
6 changes: 3 additions & 3 deletions src/net/reply.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void _z_pending_reply_clear(_z_pending_reply_t *pr) {
}

_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t encoding, z_sample_kind_t kind,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment) {
_z_reply_t reply = _z_reply_null();
reply._tag = tag;
Expand All @@ -96,13 +96,13 @@ _z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id,
reply.data.sample.timestamp = _z_timestamp_duplicate(timestamp);
_z_bytes_copy(&reply.data.sample.payload, &payload);
_z_bytes_copy(&reply.data.sample.attachment, &attachment);
_z_encoding_copy(&reply.data.sample.encoding, &encoding); // FIXME: Move encoding, Issue #482
_z_encoding_move(&reply.data.sample.encoding, encoding);
}
return reply;
}
#else
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t encoding, z_sample_kind_t kind,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment) {
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(tag);
Expand Down
6 changes: 3 additions & 3 deletions src/net/sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src) {

#if Z_FEATURE_SUBSCRIPTION == 1
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp,
_z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment) {
_z_sample_t s = _z_sample_null();
s.keyexpr = _z_keyexpr_steal(key);
Expand All @@ -88,12 +88,12 @@ _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const
s.qos = qos;
_z_bytes_copy(&s.payload, &payload);
_z_bytes_copy(&s.attachment, &attachment);
_z_encoding_copy(&s.encoding, &encoding); // FIXME: Move encoding, Issue #482
_z_encoding_move(&s.encoding, encoding);
return s;
}
#else
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp,
_z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment) {
_ZP_UNUSED(key);
_ZP_UNUSED(payload);
Expand Down
5 changes: 5 additions & 0 deletions src/protocol/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,8 @@ int8_t _z_hello_copy(_z_hello_t *dst, const _z_hello_t *src) {
_z_hello_t _z_hello_null(void) {
return (_z_hello_t){.zid = _z_id_empty(), .version = 0, .whatami = 0x0, .locators = _z_string_svec_make(0)};
}

void _z_value_move(_z_value_t *dst, _z_value_t *src) {
_z_encoding_move(&dst->encoding, &src->encoding);
_z_bytes_move(&dst->payload, &src->payload);
}
2 changes: 1 addition & 1 deletion src/protocol/definitions/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_push_body_clear(&msg->_body);
void _z_msg_put_clear(_z_msg_put_t *msg) {
_z_bytes_drop(&msg->_payload);
_z_bytes_drop(&msg->_attachment);
_z_encoding_clear(&msg->_encoding); // FIXME: Remove when possible, Issue #482
_z_encoding_clear(&msg->_encoding);
_z_timestamp_clear(&msg->_commons._timestamp);
}

Expand Down
15 changes: 11 additions & 4 deletions src/session/push.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) {

// TODO check body to know where to dispatch
#if Z_FEATURE_SUBSCRIPTION == 1
_z_bytes_t payload = push->_body._is_put ? push->_body._body._put._payload : _z_bytes_null();
_z_encoding_t encoding = push->_body._is_put ? push->_body._body._put._encoding : _z_encoding_null();

size_t kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE;
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos,
push->_body._body._put._attachment);
if (push->_body._is_put) {
ret =
_z_trigger_subscriptions(zn, push->_key, push->_body._body._put._payload, &push->_body._body._put._encoding,
kind, push->_timestamp, push->_qos, push->_body._body._put._attachment);
} else {
_z_encoding_t encoding = _z_encoding_null();
_z_bytes_t payload = _z_bytes_null();
ret = _z_trigger_subscriptions(zn, push->_key, payload, &encoding, kind, push->_timestamp, push->_qos,
push->_body._body._put._attachment);
}
#else
_ZP_UNUSED(zn);
_ZP_UNUSED(push);
Expand Down
4 changes: 2 additions & 2 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
}

int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, const _z_keyexpr_t keyexpr,
const _z_msg_put_t *msg) {
_z_msg_put_t *msg) {
int8_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
Expand All @@ -115,7 +115,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons

// Build the reply
_z_reply_t reply = _z_reply_create(expanded_ke, Z_REPLY_TAG_DATA, zn->_local_zid, msg->_payload,
&msg->_commons._timestamp, msg->_encoding, Z_SAMPLE_KIND_PUT, msg->_attachment);
&msg->_commons._timestamp, &msg->_encoding, Z_SAMPLE_KIND_PUT, msg->_attachment);

// Verify if this is a newer reply, free the old one in case it is
if ((ret == _Z_RES_OK) && ((pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST) ||
Expand Down
2 changes: 1 addition & 1 deletion src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_se
return ret;
}

int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid,
int8_t _z_trigger_queryables(_z_session_t *zn, _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid,
const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;

Expand Down
62 changes: 31 additions & 31 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,28 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
switch (msg->_tag) {
case _Z_N_DECLARE: {
_Z_DEBUG("Handling _Z_N_DECLARE");
_z_n_msg_declare_t decl = msg->_body._declare;
switch (decl._decl._tag) {
_z_n_msg_declare_t *decl = &msg->_body._declare;
switch (decl->_decl._tag) {
case _Z_DECL_KEXPR: {
if (_z_register_resource(zn, decl._decl._body._decl_kexpr._keyexpr,
decl._decl._body._decl_kexpr._id, local_peer_id) == 0) {
if (_z_register_resource(zn, decl->_decl._body._decl_kexpr._keyexpr,
decl->_decl._body._decl_kexpr._id, local_peer_id) == 0) {
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
}
} break;
case _Z_UNDECL_KEXPR: {
_z_unregister_resource(zn, decl._decl._body._undecl_kexpr._id, local_peer_id);
_z_unregister_resource(zn, decl->_decl._body._undecl_kexpr._id, local_peer_id);
} break;
case _Z_DECL_SUBSCRIBER: {
_z_interest_process_declares(zn, &decl._decl);
_z_interest_process_declares(zn, &decl->_decl);
} break;
case _Z_DECL_QUERYABLE: {
_z_interest_process_declares(zn, &decl._decl);
_z_interest_process_declares(zn, &decl->_decl);
} break;
case _Z_UNDECL_SUBSCRIBER: {
_z_interest_process_undeclares(zn, &decl._decl);
_z_interest_process_undeclares(zn, &decl->_decl);
} break;
case _Z_UNDECL_QUERYABLE: {
_z_interest_process_undeclares(zn, &decl._decl);
_z_interest_process_undeclares(zn, &decl->_decl);
} break;
case _Z_DECL_TOKEN: {
// TODO: add support or explicitly discard
Expand All @@ -72,10 +72,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
} break;
case _Z_DECL_FINAL: {
// Check that interest id is valid
if (!decl.has_interest_id) {
if (!decl->has_interest_id) {
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
_z_interest_process_declare_final(zn, decl._interest_id);
_z_interest_process_declare_final(zn, decl->_interest_id);
} break;
}
} break;
Expand All @@ -86,56 +86,56 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
} break;
case _Z_N_REQUEST: {
_Z_DEBUG("Handling _Z_N_REQUEST");
_z_n_msg_request_t req = msg->_body._request;
switch (req._tag) {
_z_n_msg_request_t *req = &msg->_body._request;
switch (req->_tag) {
case _Z_REQUEST_QUERY: {
#if Z_FEATURE_QUERYABLE == 1
_z_msg_query_t *query = &req._body._query;
ret = _z_trigger_queryables(zn, query, req._key, (uint32_t)req._rid,
req._body._query._ext_attachment);
_z_msg_query_t *query = &req->_body._query;
ret = _z_trigger_queryables(zn, query, req->_key, (uint32_t)req->_rid,
req->_body._query._ext_attachment);
#else
_Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported");
#endif
} break;
case _Z_REQUEST_PUT: {
#if Z_FEATURE_SUBSCRIPTION == 1
_z_msg_put_t put = req._body._put;
ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp, req._ext_qos, put._attachment);
_z_msg_put_t put = req->_body._put;
ret = _z_trigger_subscriptions(zn, req->_key, put._payload, &put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp, req->_ext_qos, put._attachment);
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t final = _z_n_msg_make_response_final(req._rid);
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
ret |= _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
}
} break;
case _Z_REQUEST_DEL: {
#if Z_FEATURE_SUBSCRIPTION == 1
_z_msg_del_t del = req._body._del;
ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_null(), _z_encoding_null(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos,
_z_bytes_null());
_z_msg_del_t del = req->_body._del;
_z_encoding_t encoding = _z_encoding_null();
ret = _z_trigger_subscriptions(zn, req->_key, _z_bytes_null(), &encoding, Z_SAMPLE_KIND_DELETE,
del._commons._timestamp, req->_ext_qos, _z_bytes_null());
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t final = _z_n_msg_make_response_final(req._rid);
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
ret |= _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
}
} break;
}
} break;
case _Z_N_RESPONSE: {
_Z_DEBUG("Handling _Z_N_RESPONSE");
_z_n_msg_response_t response = msg->_body._response;
switch (response._tag) {
_z_n_msg_response_t *response = &msg->_body._response;
switch (response->_tag) {
case _Z_RESPONSE_BODY_REPLY: {
_z_msg_reply_t *reply = &response._body._reply;
ret = _z_trigger_reply_partial(zn, response._request_id, response._key, reply);
_z_msg_reply_t *reply = &response->_body._reply;
ret = _z_trigger_reply_partial(zn, response->_request_id, response->_key, reply);
} break;
case _Z_RESPONSE_BODY_ERR: {
// @TODO: expose zenoh errors to the user
_z_msg_err_t error = response._body._err;
_z_msg_err_t error = response->_body._err;
_z_slice_t payload = _z_bytes_try_get_contiguous(&error._payload);
_ZP_UNUSED(payload); // Unused when logs are deactivated
_Z_ERROR("Received Err for query %zu: message=%.*s", response._request_id, (int)payload.len,
_Z_ERROR("Received Err for query %zu: message=%.*s", response->_request_id, (int)payload.len,
payload.start);
} break;
}
Expand Down
4 changes: 2 additions & 2 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,13 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_n_qos_t qos, const _z_bytes_t attachment) {
_z_encoding_t encoding = _z_encoding_null();
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, payload, encoding, Z_SAMPLE_KIND_PUT, _z_timestamp_null(), qos,
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, payload, &encoding, Z_SAMPLE_KIND_PUT, _z_timestamp_null(), qos,
attachment);
(void)ret;
}

int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;

Expand Down
Loading