diff --git a/docs/api.rst b/docs/api.rst index 648dfb7fe..9828487af 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -27,7 +27,6 @@ Enums .. autocenum:: constants.h::z_sample_kind_t .. autocenum:: constants.h::z_consolidation_mode_t .. autocenum:: constants.h::z_reliability_t -.. autocenum:: constants.h::z_reply_tag_t .. autocenum:: constants.h::z_congestion_control_t .. autocenum:: constants.h::z_priority_t .. autocenum:: constants.h::z_query_target_t @@ -426,6 +425,10 @@ Primitives .. autocfunction:: primitives.h::z_undeclare_queryable .. autocfunction:: primitives.h::z_query_reply_options_default .. autocfunction:: primitives.h::z_query_reply +.. autocfunction:: primitives.h::z_query_reply_del_options_default +.. autocfunction:: primitives.h::z_query_reply_del +.. autocfunction:: primitives.h::z_query_reply_err_options_default +.. autocfunction:: primitives.h::z_query_reply_err .. autocfunction:: primitives.h::z_keyexpr_from_str .. autocfunction:: primitives.h::z_keyexpr_from_substr .. autocfunction:: primitives.h::z_keyexpr_from_str_autocanonize diff --git a/examples/unix/c11/z_get.c b/examples/unix/c11/z_get.c index 62dbb6537..242482b40 100644 --- a/examples/unix/c11/z_get.c +++ b/examples/unix/c11/z_get.c @@ -44,7 +44,11 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) { z_string_data(z_loan(replystr))); z_drop(z_move(replystr)); } else { - printf(">> Received an error\n"); + const z_loaned_reply_err_t *err = z_reply_err(reply); + z_owned_string_t errstr; + z_bytes_deserialize_into_string(z_reply_err_payload(err), &errstr); + printf(">> Received an error: %s\n", z_string_data(z_loan(errstr))); + z_drop(z_move(errstr)); } } diff --git a/examples/unix/c11/z_queryable.c b/examples/unix/c11/z_queryable.c index 34fade86c..2039231b7 100644 --- a/examples/unix/c11/z_queryable.c +++ b/examples/unix/c11/z_queryable.c @@ -21,8 +21,10 @@ #if Z_FEATURE_QUERYABLE == 1 const char *keyexpr = "demo/example/zenoh-pico-queryable"; const char *value = "Queryable from Pico!"; +const char *error = "Demo error"; static int msg_nb = 0; -static z_sample_kind_t reply_kind = Z_SAMPLE_KIND_PUT; +static enum { REPLY_DATA, REPLY_DELETE, REPLY_ERR } reply_kind = REPLY_DATA; +bool reply_err = false; void query_handler(const z_loaned_query_t *query, void *ctx) { (void)(ctx); @@ -40,20 +42,27 @@ void query_handler(const z_loaned_query_t *query, void *ctx) { } z_drop(z_move(payload_string)); - // Reply value encoding - z_owned_bytes_t reply_payload; - z_bytes_serialize_from_str(&reply_payload, value); - switch (reply_kind) { - case Z_SAMPLE_KIND_PUT: + case REPLY_DATA: { + // Reply value encoding + z_owned_bytes_t reply_payload; + z_bytes_serialize_from_str(&reply_payload, value); + z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), NULL); break; - case Z_SAMPLE_KIND_DELETE: + } + case REPLY_DELETE: { z_query_reply_del(query, z_query_keyexpr(query), NULL); break; - default: - printf("Unknown reply kind\n"); + } + case REPLY_ERR: { + // Reply error encoding + z_owned_bytes_t reply_payload; + z_bytes_serialize_from_str(&reply_payload, error); + + z_query_reply_err(query, z_move(reply_payload), NULL); break; + } } msg_nb++; } @@ -65,7 +74,7 @@ int main(int argc, char **argv) { int n = 0; int opt; - while ((opt = getopt(argc, argv, "k:e:m:v:l:n:d")) != -1) { + while ((opt = getopt(argc, argv, "k:e:m:v:l:n:df")) != -1) { switch (opt) { case 'k': keyexpr = optarg; @@ -86,7 +95,10 @@ int main(int argc, char **argv) { n = atoi(optarg); break; case 'd': - reply_kind = Z_SAMPLE_KIND_DELETE; + reply_kind = REPLY_DELETE; + break; + case 'f': + reply_kind = REPLY_ERR; break; case '?': if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' || diff --git a/include/zenoh-pico/api/constants.h b/include/zenoh-pico/api/constants.h index 2f396d4a5..d91fc3441 100644 --- a/include/zenoh-pico/api/constants.h +++ b/include/zenoh-pico/api/constants.h @@ -227,16 +227,6 @@ typedef enum { typedef enum { Z_RELIABILITY_BEST_EFFORT = 1, Z_RELIABILITY_RELIABLE = 0 } z_reliability_t; #define Z_RELIABILITY_DEFAULT Z_RELIABILITY_RELIABLE -/** - * Reply tag values. - * - * Enumerators: - * Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data. - * Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more - * replies for this query. - */ -typedef enum { Z_REPLY_TAG_DATA = 0, Z_REPLY_TAG_FINAL = 1 } z_reply_tag_t; - /** * Congestion control values. * diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 9ee3271ba..eeefc0ef9 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -1780,6 +1780,34 @@ void z_query_reply_del_options_default(z_query_reply_del_options_t *options); */ int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, const z_query_reply_del_options_t *options); + +/** + * Builds a :c:type:`z_query_reply_err_options_t` with default values. + * + * Parameters: + * options: Pointer to an uninitialized :c:type:`z_query_reply_err_options_t`. + */ +void z_query_reply_err_options_default(z_query_reply_err_options_t *options); + +/** + * Sends a reply error to a query. + * + * This function must be called inside of a :c:type:`z_owned_closure_query_t` callback associated to the + * :c:type:`z_owned_queryable_t`, passing the received query as parameters of the callback function. This function can + * be called multiple times to send multiple replies to a query. The reply will be considered complete when the callback + * returns. + * + * Parameters: + * query: Pointer to a :c:type:`z_loaned_query_t` to reply. + * payload: Pointer to the reply error data. + * options: Pointer to a :c:type:`z_query_reply_err_options_t` to configure the reply error. + * + * Return: + * ``0`` if reply operation successful, ``negative value`` otherwise. + */ +int8_t z_query_reply_err(const z_loaned_query_t *query, z_owned_bytes_t *payload, + const z_query_reply_err_options_t *options); + #endif /** diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index 9a77fe1bf..f33a5378e 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -276,6 +276,16 @@ typedef struct { z_owned_bytes_t *attachment; } z_query_reply_del_options_t; +/** + * Represents the configuration used to configure a query reply error sent via :c:func:`z_query_reply_err. + * + * Members: + * z_owned_encoding_t *encoding: The encoding of the payload. + */ +typedef struct { + z_owned_encoding_t *encoding; +} z_query_reply_err_options_t; + /** * Represents the configuration used to configure a put operation sent via via :c:func:`z_put`. * diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index e950445e7..61b20f27a 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -204,6 +204,20 @@ int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, const const _z_value_t payload, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority, _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment); +/** + * Send a reply error to a query. + * + * This function must be called inside of a Queryable callback passing the + * query received as parameters of the callback function. This function can + * be called multiple times to send multiple replies to a query. The reply + * will be considered complete when the Queryable callback returns. + * + * Parameters: + * query: The query to reply to. The caller keeps its ownership. + * key: The resource key of this reply. The caller keeps the ownership. + * payload: The value of this reply, the caller keeps ownership. + */ +int8_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_value_t payload); #endif #if Z_FEATURE_QUERY == 1 diff --git a/include/zenoh-pico/net/reply.h b/include/zenoh-pico/net/reply.h index a95a99c85..874b1b7ea 100644 --- a/include/zenoh-pico/net/reply.h +++ b/include/zenoh-pico/net/reply.h @@ -32,6 +32,7 @@ * */ typedef struct _z_reply_data_t { + _z_value_t error; _z_sample_t sample; _z_id_t replier_id; } _z_reply_data_t; @@ -43,18 +44,29 @@ _z_reply_t _z_reply_move(_z_reply_t *src_reply); _Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy) _Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t) +/** + * Reply tag values. + * + * Enumerators: + * _Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data. + * _Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more + * replies for this query. + * _Z_REPLY_TAG_ERROR: Tag identifying that the reply contains error + */ +typedef enum { _Z_REPLY_TAG_DATA = 0, _Z_REPLY_TAG_FINAL = 1, _Z_REPLY_TAG_ERROR = 2 } _z_reply_tag_t; + /** * An reply to a :c:func:`z_query`. * * Members: * _z_reply_t_Tag tag: Indicates if the reply contains data or if it's a FINAL reply. * _z_reply_data_t data: The reply data if :c:member:`_z_reply_t.tag` equals - * :c:member:`_z_reply_t_Tag.Z_REPLY_TAG_DATA`. + * :c:member:`_z_reply_t_Tag._Z_REPLY_TAG_DATA`. * */ typedef struct _z_reply_t { _z_reply_data_t data; - z_reply_tag_t _tag; + _z_reply_tag_t _tag; } _z_reply_t; _z_reply_t _z_reply_null(void); @@ -62,9 +74,10 @@ _Bool _z_reply_check(const _z_reply_t *reply); void _z_reply_clear(_z_reply_t *src); void _z_reply_free(_z_reply_t **hello); int8_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src); -_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload, +_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment); +_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding); typedef struct _z_pending_reply_t { _z_reply_t _reply; diff --git a/include/zenoh-pico/protocol/definitions/message.h b/include/zenoh-pico/protocol/definitions/message.h index dcd3eacc9..d58ec38f9 100644 --- a/include/zenoh-pico/protocol/definitions/message.h +++ b/include/zenoh-pico/protocol/definitions/message.h @@ -53,7 +53,7 @@ /// +---------------+ #define _Z_FLAG_Z_E_E 0x40 typedef struct { - _z_encoding_t encoding; + _z_encoding_t _encoding; _z_source_info_t _ext_source_info; _z_bytes_t _payload; } _z_msg_err_t; diff --git a/include/zenoh-pico/session/query.h b/include/zenoh-pico/session/query.h index 35b29847c..acdc3a6b3 100644 --- a/include/zenoh-pico/session/query.h +++ b/include/zenoh-pico/session/query.h @@ -27,6 +27,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq); int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr, _z_msg_put_t *msg, z_sample_kind_t kind); +int8_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *msg); int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id); void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq); void _z_flush_pending_queries(_z_session_t *zn); diff --git a/include/zenoh-pico/session/reply.h b/include/zenoh-pico/session/reply.h index 050f81ee5..719b2fee1 100644 --- a/include/zenoh-pico/session/reply.h +++ b/include/zenoh-pico/session/reply.h @@ -24,6 +24,8 @@ int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key, _z_msg_reply_t *reply); +int8_t _z_trigger_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *error); + int8_t _z_trigger_reply_final(_z_session_t *zn, _z_n_msg_response_final_t *final); #endif /* ZENOH_PICO_SESSION_REPLY_H */ diff --git a/src/api/api.c b/src/api/api.c index 5ec9ba3f7..d5734e7da 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1218,19 +1218,11 @@ int8_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, co return ret; } -_Bool z_reply_is_ok(const z_loaned_reply_t *reply) { - _ZP_UNUSED(reply); - // For the moment always return TRUE. - // FIXME: The support for reply errors will come in the next release. - return true; -} +_Bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->_tag != _Z_REPLY_TAG_ERROR; } const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data.sample; } -const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) { - _ZP_UNUSED(reply); - return NULL; -} +const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) { return &reply->data.error; } #endif #if Z_FEATURE_QUERYABLE == 1 @@ -1361,6 +1353,34 @@ int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t z_bytes_drop(opts.attachment); return ret; } + +void z_query_reply_err_options_default(z_query_reply_err_options_t *options) { options->encoding = NULL; } + +int8_t z_query_reply_err(const z_loaned_query_t *query, z_owned_bytes_t *payload, + const z_query_reply_err_options_t *options) { + // Try upgrading session weak to rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade(&query->in->val._zn); + if (sess_rc.in == NULL) { + return _Z_ERR_CONNECTION_CLOSED; + } + z_query_reply_err_options_t opts; + if (options == NULL) { + z_query_reply_err_options_default(&opts); + } else { + opts = *options; + } + // Set value + _z_value_t value = {.payload = _z_bytes_from_owned_bytes(payload), + .encoding = _z_encoding_from_owned(opts.encoding)}; + + int8_t ret = _z_send_reply_err(&query->in->val, &sess_rc, value); + if (payload != NULL) { + z_bytes_drop(payload); + } + // Clean-up + z_encoding_drop(opts.encoding); + return ret; +} #endif int8_t z_keyexpr_from_str_autocanonize(z_owned_keyexpr_t *key, const char *name) { diff --git a/src/net/primitives.c b/src/net/primitives.c index b8019b936..b34998f1c 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -406,6 +406,36 @@ int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, _z_ke return ret; } + +int8_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_value_t payload) { + int8_t ret = _Z_RES_OK; + _z_session_t *zn = &zsrc->in->val; + + // Build the reply context decorator. This is NOT the final reply. + _z_id_t zid = zn->_local_zid; + _z_zenoh_message_t msg = { + ._tag = _Z_N_RESPONSE, + ._body._response = + { + ._request_id = query->_request_id, + ._ext_responder = {._zid = zid, ._eid = 0}, + ._ext_qos = _z_n_qos_make(false, true, Z_PRIORITY_DEFAULT), + ._ext_timestamp = _z_timestamp_null(), + ._tag = _Z_RESPONSE_BODY_ERR, + ._body._err = + { + ._payload = payload.payload, + ._encoding = payload.encoding, + ._ext_source_info = _z_source_info_null(), + }, + }, + }; + if (_z_send_n_msg(zn, &msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + ret = _Z_ERR_TRANSPORT_TX_FAILED; + } + + return ret; +} #endif #if Z_FEATURE_QUERY == 1 diff --git a/src/net/reply.c b/src/net/reply.c index 2b9501319..941cf2a4d 100644 --- a/src/net/reply.c +++ b/src/net/reply.c @@ -25,7 +25,7 @@ _z_reply_data_t _z_reply_data_null(void) { } _z_reply_t _z_reply_null(void) { - _z_reply_t r = {._tag = Z_REPLY_TAG_DATA, .data = _z_reply_data_null()}; + _z_reply_t r = {._tag = _Z_REPLY_TAG_DATA, .data = _z_reply_data_null()}; return r; } @@ -92,12 +92,12 @@ void _z_pending_reply_clear(_z_pending_reply_t *pr) { _z_timestamp_clear(&pr->_tstamp); } -_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload, +_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) { _z_reply_t reply = _z_reply_null(); reply._tag = tag; - if (tag == Z_REPLY_TAG_DATA) { + if (tag == _Z_REPLY_TAG_DATA) { reply.data.replier_id = id; // Create reply sample reply.data.sample.keyexpr = _z_keyexpr_steal(&keyexpr); @@ -109,8 +109,16 @@ _z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, } return reply; } + +_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding) { + _z_reply_t reply = _z_reply_null(); + reply._tag = _Z_REPLY_TAG_ERROR; + _z_bytes_copy(&reply.data.error.payload, &payload); + _z_encoding_move(&reply.data.error.encoding, encoding); + return reply; +} #else -_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload, +_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) { _ZP_UNUSED(keyexpr); @@ -123,4 +131,10 @@ _z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, _ZP_UNUSED(attachment); return _z_reply_null(); } + +_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding) { + _ZP_UNUSED(payload); + _ZP_UNUSED(encoding); + return _z_reply_null(); +} #endif diff --git a/src/protocol/codec/message.c b/src/protocol/codec/message.c index 343093d71..73e552816 100644 --- a/src/protocol/codec/message.c +++ b/src/protocol/codec/message.c @@ -532,7 +532,7 @@ int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) { uint8_t header = _Z_MID_Z_ERR; // Encode header - _Bool has_encoding = _z_encoding_check(&err->encoding); + _Bool has_encoding = _z_encoding_check(&err->_encoding); if (has_encoding) { _Z_SET_FLAG(header, _Z_FLAG_Z_E_E); } @@ -544,7 +544,7 @@ int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); // Encode encoding if (has_encoding) { - _Z_RETURN_IF_ERR(_z_encoding_encode(wbf, &err->encoding)); + _Z_RETURN_IF_ERR(_z_encoding_encode(wbf, &err->_encoding)); } // Encode extensions if (has_sinfo_ext) { @@ -576,7 +576,7 @@ int8_t _z_err_decode(_z_msg_err_t *err, _z_zbuf_t *zbf, uint8_t header) { *err = (_z_msg_err_t){0}; if (_Z_HAS_FLAG(header, _Z_FLAG_Z_E_E)) { - _Z_RETURN_IF_ERR(_z_encoding_decode(&err->encoding, zbf)); + _Z_RETURN_IF_ERR(_z_encoding_decode(&err->_encoding, zbf)); } if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_err_decode_extension, err)); diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index 9de920ae3..68bf5d225 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -373,7 +373,7 @@ int8_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t head /*------------------ Response Final Message ------------------*/ int8_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_final_t *msg) { int8_t ret = _Z_RES_OK; - _Z_DEBUG("Encoding _Z_MID_N_RESPONSE"); + _Z_DEBUG("Encoding _Z_MID_N_RESPONSE_FINAL"); uint8_t header = _Z_MID_N_RESPONSE_FINAL; _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_request_id)); diff --git a/src/protocol/definitions/message.c b/src/protocol/definitions/message.c index 27acf439b..3001b3372 100644 --- a/src/protocol/definitions/message.c +++ b/src/protocol/definitions/message.c @@ -43,6 +43,6 @@ void _z_msg_query_clear(_z_msg_query_t *msg) { _z_value_clear(&msg->_ext_value); } void _z_msg_err_clear(_z_msg_err_t *err) { - _z_encoding_clear(&err->encoding); + _z_encoding_clear(&err->_encoding); _z_bytes_drop(&err->_payload); } diff --git a/src/session/query.c b/src/session/query.c index 05996914f..18eb303a2 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -112,7 +112,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons } // Build the reply - _z_reply_t reply = _z_reply_create(expanded_ke, Z_REPLY_TAG_DATA, zn->_local_zid, msg->_payload, + _z_reply_t reply = _z_reply_create(expanded_ke, _Z_REPLY_TAG_DATA, zn->_local_zid, msg->_payload, &msg->_commons._timestamp, &msg->_encoding, kind, msg->_attachment); _Bool drop = false; @@ -177,6 +177,36 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons return ret; } +int8_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *msg) { + int8_t ret = _Z_RES_OK; + + _zp_session_lock_mutex(zn); + + _z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id); + if ((ret == _Z_RES_OK) && (pen_qry == NULL)) { + ret = _Z_ERR_ENTITY_UNKNOWN; + } + + // Build the reply + _z_reply_t reply = _z_reply_err_create(msg->_payload, &msg->_encoding); + + _zp_session_unlock_mutex(zn); + + // Trigger the user callback + if (ret == _Z_RES_OK) { + _z_reply_t cb_reply = _z_reply_null(); + cb_reply = _z_reply_move(&reply); + pen_qry->_callback(&cb_reply, pen_qry->_arg); + _z_reply_clear(&cb_reply); + } + + if (ret != _Z_RES_OK) { + _z_reply_clear(&reply); + } + + return ret; +} + int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) { int8_t ret = _Z_RES_OK; diff --git a/src/session/reply.c b/src/session/reply.c index 79e256e65..19835aa49 100644 --- a/src/session/reply.c +++ b/src/session/reply.c @@ -36,6 +36,21 @@ int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key return ret; } +int8_t _z_trigger_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *error) { + int8_t ret = _Z_RES_OK; + + // TODO check id to know where to dispatch + +#if Z_FEATURE_QUERY == 1 + ret = _z_trigger_query_reply_err(zn, id, error); +#else + _ZP_UNUSED(zn); + _ZP_UNUSED(id); + _ZP_UNUSED(error); +#endif + return ret; +} + int8_t _z_trigger_reply_final(_z_session_t *zn, _z_n_msg_response_final_t *final) { int8_t ret = _Z_RES_OK; diff --git a/src/session/rx.c b/src/session/rx.c index 6ff784618..de49473b3 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -132,12 +132,8 @@ int8_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *msg, ret = _z_trigger_reply_partial(zn, response->_request_id, response->_key, reply); } break; case _Z_RESPONSE_BODY_ERR: { - // @TODO: expose zenoh errors to the user - _z_msg_err_t error = response->_body._err; - _z_slice_t payload = _z_bytes_try_get_contiguous(&error._payload); - _ZP_UNUSED(payload); // Unused when logs are deactivated - _Z_ERROR("Received Err for query %zu: message=%.*s", response->_request_id, (int)payload.len, - payload.start); + _z_msg_err_t *error = &response->_body._err; + ret = _z_trigger_reply_err(zn, response->_request_id, error); } break; } } break; diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 6412d17c5..30a55aae3 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -1266,14 +1266,14 @@ void query_message(void) { _z_msg_err_t gen_err(void) { size_t len = 1 + gen_uint8(); return (_z_msg_err_t){ - .encoding = gen_encoding(), + ._encoding = gen_encoding(), ._ext_source_info = gen_bool() ? gen_source_info() : _z_source_info_null(), ._payload = gen_payload(len), // Hangs if 0 }; } void assert_eq_err(const _z_msg_err_t *left, const _z_msg_err_t *right) { - assert_eq_encoding(&left->encoding, &right->encoding); + assert_eq_encoding(&left->_encoding, &right->_encoding); assert_eq_source_info(&left->_ext_source_info, &right->_ext_source_info); assert_eq_bytes(&left->_payload, &right->_payload); }