Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement error reply support #523

Merged
merged 3 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ Enums
.. autocenum:: constants.h::z_sample_kind_t
.. autocenum:: constants.h::z_consolidation_mode_t
.. autocenum:: constants.h::z_reliability_t
.. autocenum:: constants.h::z_reply_tag_t
.. autocenum:: constants.h::z_congestion_control_t
.. autocenum:: constants.h::z_priority_t
.. autocenum:: constants.h::z_query_target_t
Expand Down Expand Up @@ -426,6 +425,10 @@ Primitives
.. autocfunction:: primitives.h::z_undeclare_queryable
.. autocfunction:: primitives.h::z_query_reply_options_default
.. autocfunction:: primitives.h::z_query_reply
.. autocfunction:: primitives.h::z_query_reply_del_options_default
.. autocfunction:: primitives.h::z_query_reply_del
.. autocfunction:: primitives.h::z_query_reply_err_options_default
.. autocfunction:: primitives.h::z_query_reply_err
.. autocfunction:: primitives.h::z_keyexpr_from_str
.. autocfunction:: primitives.h::z_keyexpr_from_substr
.. autocfunction:: primitives.h::z_keyexpr_from_str_autocanonize
Expand Down
6 changes: 5 additions & 1 deletion examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) {
z_string_data(z_loan(replystr)));
z_drop(z_move(replystr));
} else {
printf(">> Received an error\n");
const z_loaned_reply_err_t *err = z_reply_err(reply);
z_owned_string_t errstr;
z_bytes_deserialize_into_string(z_reply_err_payload(err), &errstr);
printf(">> Received an error: %s\n", z_string_data(z_loan(errstr)));
z_drop(z_move(errstr));
}
}

Expand Down
34 changes: 23 additions & 11 deletions examples/unix/c11/z_queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#if Z_FEATURE_QUERYABLE == 1
const char *keyexpr = "demo/example/zenoh-pico-queryable";
const char *value = "Queryable from Pico!";
const char *error = "Demo error";
static int msg_nb = 0;
static z_sample_kind_t reply_kind = Z_SAMPLE_KIND_PUT;
static enum { REPLY_DATA, REPLY_DELETE, REPLY_ERR } reply_kind = REPLY_DATA;
bool reply_err = false;

void query_handler(const z_loaned_query_t *query, void *ctx) {
(void)(ctx);
Expand All @@ -40,20 +42,27 @@ void query_handler(const z_loaned_query_t *query, void *ctx) {
}
z_drop(z_move(payload_string));

// Reply value encoding
z_owned_bytes_t reply_payload;
z_bytes_serialize_from_str(&reply_payload, value);

switch (reply_kind) {
case Z_SAMPLE_KIND_PUT:
case REPLY_DATA: {
// Reply value encoding
z_owned_bytes_t reply_payload;
z_bytes_serialize_from_str(&reply_payload, value);

z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), NULL);
break;
case Z_SAMPLE_KIND_DELETE:
}
case REPLY_DELETE: {
z_query_reply_del(query, z_query_keyexpr(query), NULL);
break;
default:
printf("Unknown reply kind\n");
}
case REPLY_ERR: {
// Reply error encoding
z_owned_bytes_t reply_payload;
z_bytes_serialize_from_str(&reply_payload, error);

z_query_reply_err(query, z_move(reply_payload), NULL);
break;
}
}
msg_nb++;
}
Expand All @@ -65,7 +74,7 @@ int main(int argc, char **argv) {
int n = 0;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:v:l:n:d")) != -1) {
while ((opt = getopt(argc, argv, "k:e:m:v:l:n:df")) != -1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

df -> d:f ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, ":" means that option value expected
https://www.man7.org/linux/man-pages/man3/getopt.3.html

switch (opt) {
case 'k':
keyexpr = optarg;
Expand All @@ -86,7 +95,10 @@ int main(int argc, char **argv) {
n = atoi(optarg);
break;
case 'd':
reply_kind = Z_SAMPLE_KIND_DELETE;
reply_kind = REPLY_DELETE;
break;
case 'f':
reply_kind = REPLY_ERR;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' ||
Expand Down
10 changes: 0 additions & 10 deletions include/zenoh-pico/api/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,6 @@ typedef enum {
typedef enum { Z_RELIABILITY_BEST_EFFORT = 1, Z_RELIABILITY_RELIABLE = 0 } z_reliability_t;
#define Z_RELIABILITY_DEFAULT Z_RELIABILITY_RELIABLE

/**
* Reply tag values.
*
* Enumerators:
* Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data.
* Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more
* replies for this query.
*/
typedef enum { Z_REPLY_TAG_DATA = 0, Z_REPLY_TAG_FINAL = 1 } z_reply_tag_t;

/**
* Congestion control values.
*
Expand Down
28 changes: 28 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1780,6 +1780,34 @@ void z_query_reply_del_options_default(z_query_reply_del_options_t *options);
*/
int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr,
const z_query_reply_del_options_t *options);

/**
* Builds a :c:type:`z_query_reply_err_options_t` with default values.
*
* Parameters:
* options: Pointer to an uninitialized :c:type:`z_query_reply_err_options_t`.
*/
void z_query_reply_err_options_default(z_query_reply_err_options_t *options);

/**
* Sends a reply error to a query.
*
* This function must be called inside of a :c:type:`z_owned_closure_query_t` callback associated to the
* :c:type:`z_owned_queryable_t`, passing the received query as parameters of the callback function. This function can
* be called multiple times to send multiple replies to a query. The reply will be considered complete when the callback
* returns.
*
* Parameters:
* query: Pointer to a :c:type:`z_loaned_query_t` to reply.
* payload: Pointer to the reply error data.
* options: Pointer to a :c:type:`z_query_reply_err_options_t` to configure the reply error.
*
* Return:
* ``0`` if reply operation successful, ``negative value`` otherwise.
*/
int8_t z_query_reply_err(const z_loaned_query_t *query, z_owned_bytes_t *payload,
const z_query_reply_err_options_t *options);

#endif

/**
Expand Down
10 changes: 10 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,16 @@ typedef struct {
z_owned_bytes_t *attachment;
} z_query_reply_del_options_t;

/**
* Represents the configuration used to configure a query reply error sent via :c:func:`z_query_reply_err.
*
* Members:
* z_owned_encoding_t *encoding: The encoding of the payload.
*/
typedef struct {
z_owned_encoding_t *encoding;
} z_query_reply_err_options_t;

/**
* Represents the configuration used to configure a put operation sent via via :c:func:`z_put`.
*
Expand Down
14 changes: 14 additions & 0 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,20 @@ int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, const
const _z_value_t payload, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
z_priority_t priority, _Bool is_express, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment);
/**
* Send a reply error to a query.
*
* This function must be called inside of a Queryable callback passing the
* query received as parameters of the callback function. This function can
* be called multiple times to send multiple replies to a query. The reply
* will be considered complete when the Queryable callback returns.
*
* Parameters:
* query: The query to reply to. The caller keeps its ownership.
* key: The resource key of this reply. The caller keeps the ownership.
* payload: The value of this reply, the caller keeps ownership.
*/
int8_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_value_t payload);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
19 changes: 16 additions & 3 deletions include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*
*/
typedef struct _z_reply_data_t {
_z_value_t error;
_z_sample_t sample;
_z_id_t replier_id;
} _z_reply_data_t;
Expand All @@ -43,28 +44,40 @@ _z_reply_t _z_reply_move(_z_reply_t *src_reply);
_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy)
_Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t)

/**
* Reply tag values.
*
* Enumerators:
* _Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data.
* _Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more
* replies for this query.
* _Z_REPLY_TAG_ERROR: Tag identifying that the reply contains error
*/
typedef enum { _Z_REPLY_TAG_DATA = 0, _Z_REPLY_TAG_FINAL = 1, _Z_REPLY_TAG_ERROR = 2 } _z_reply_tag_t;

/**
* An reply to a :c:func:`z_query`.
*
* Members:
* _z_reply_t_Tag tag: Indicates if the reply contains data or if it's a FINAL reply.
* _z_reply_data_t data: The reply data if :c:member:`_z_reply_t.tag` equals
* :c:member:`_z_reply_t_Tag.Z_REPLY_TAG_DATA`.
* :c:member:`_z_reply_t_Tag._Z_REPLY_TAG_DATA`.
*
*/
typedef struct _z_reply_t {
_z_reply_data_t data;
z_reply_tag_t _tag;
_z_reply_tag_t _tag;
} _z_reply_t;

_z_reply_t _z_reply_null(void);
_Bool _z_reply_check(const _z_reply_t *reply);
void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
int8_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment);
_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding);

typedef struct _z_pending_reply_t {
_z_reply_t _reply;
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
/// +---------------+
#define _Z_FLAG_Z_E_E 0x40
typedef struct {
_z_encoding_t encoding;
_z_encoding_t _encoding;
_z_source_info_t _ext_source_info;
_z_bytes_t _payload;
} _z_msg_err_t;
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t
int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr,
_z_msg_put_t *msg, z_sample_kind_t kind);
int8_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *msg);
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
void _z_flush_pending_queries(_z_session_t *zn);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key, _z_msg_reply_t *reply);

int8_t _z_trigger_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *error);

int8_t _z_trigger_reply_final(_z_session_t *zn, _z_n_msg_response_final_t *final);

#endif /* ZENOH_PICO_SESSION_REPLY_H */
40 changes: 30 additions & 10 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1218,19 +1218,11 @@ int8_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, co
return ret;
}

_Bool z_reply_is_ok(const z_loaned_reply_t *reply) {
_ZP_UNUSED(reply);
// For the moment always return TRUE.
// FIXME: The support for reply errors will come in the next release.
return true;
}
_Bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->_tag != _Z_REPLY_TAG_ERROR; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this modification, this PR probably closes #345 as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems, yes


const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data.sample; }

const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) {
_ZP_UNUSED(reply);
return NULL;
}
const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) { return &reply->data.error; }
#endif

#if Z_FEATURE_QUERYABLE == 1
Expand Down Expand Up @@ -1361,6 +1353,34 @@ int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t
z_bytes_drop(opts.attachment);
return ret;
}

void z_query_reply_err_options_default(z_query_reply_err_options_t *options) { options->encoding = NULL; }

int8_t z_query_reply_err(const z_loaned_query_t *query, z_owned_bytes_t *payload,
const z_query_reply_err_options_t *options) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&query->in->val._zn);
if (sess_rc.in == NULL) {
return _Z_ERR_CONNECTION_CLOSED;
}
z_query_reply_err_options_t opts;
if (options == NULL) {
z_query_reply_err_options_default(&opts);
} else {
opts = *options;
}
// Set value
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(payload),
.encoding = _z_encoding_from_owned(opts.encoding)};

int8_t ret = _z_send_reply_err(&query->in->val, &sess_rc, value);
if (payload != NULL) {
z_bytes_drop(payload);
}
// Clean-up
z_encoding_drop(opts.encoding);
return ret;
}
#endif

int8_t z_keyexpr_from_str_autocanonize(z_owned_keyexpr_t *key, const char *name) {
Expand Down
30 changes: 30 additions & 0 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,36 @@ int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, _z_ke

return ret;
}

int8_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_value_t payload) {
int8_t ret = _Z_RES_OK;
_z_session_t *zn = &zsrc->in->val;

// Build the reply context decorator. This is NOT the final reply.
_z_id_t zid = zn->_local_zid;
_z_zenoh_message_t msg = {
._tag = _Z_N_RESPONSE,
._body._response =
{
._request_id = query->_request_id,
._ext_responder = {._zid = zid, ._eid = 0},
._ext_qos = _z_n_qos_make(false, true, Z_PRIORITY_DEFAULT),
._ext_timestamp = _z_timestamp_null(),
._tag = _Z_RESPONSE_BODY_ERR,
._body._err =
{
._payload = payload.payload,
._encoding = payload.encoding,
._ext_source_info = _z_source_info_null(),
},
},
};
if (_z_send_n_msg(zn, &msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}

return ret;
}
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
Loading
Loading